@@ -528,6 +528,15 @@ impl DataFrame {
528528 self . session_state . optimize ( & self . plan )
529529 }
530530
531+ /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
532+ /// as a table view using [`SessionContext::register_table`].
533+ ///
534+ /// Note: This discards the [`SessionState`] associated with this
535+ /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
536+ pub fn into_view ( self ) -> Arc < dyn TableProvider > {
537+ Arc :: new ( DataFrameTableProvider { plan : self . plan } )
538+ }
539+
531540 /// Return the optimized logical plan represented by this DataFrame.
532541 ///
533542 /// Note: This method should not be used outside testing, as it loses the snapshot
@@ -766,9 +775,12 @@ impl DataFrame {
766775 }
767776}
768777
769- // TODO: This will introduce a ref cycle (#2659)
778+ struct DataFrameTableProvider {
779+ plan : LogicalPlan ,
780+ }
781+
770782#[ async_trait]
771- impl TableProvider for DataFrame {
783+ impl TableProvider for DataFrameTableProvider {
772784 fn as_any ( & self ) -> & dyn Any {
773785 self
774786 }
@@ -796,34 +808,27 @@ impl TableProvider for DataFrame {
796808
797809 async fn scan (
798810 & self ,
799- _state : & SessionState ,
811+ state : & SessionState ,
800812 projection : Option < & Vec < usize > > ,
801813 filters : & [ Expr ] ,
802814 limit : Option < usize > ,
803815 ) -> Result < Arc < dyn ExecutionPlan > > {
804- let mut expr = self . clone ( ) ;
816+ let mut expr = LogicalPlanBuilder :: from ( self . plan . clone ( ) ) ;
805817 if let Some ( p) = projection {
806- let schema = TableProvider :: schema ( & expr) . project ( p) ?;
807- let names = schema
808- . fields ( )
809- . iter ( )
810- . map ( |field| field. name ( ) . as_str ( ) )
811- . collect :: < Vec < _ > > ( ) ;
812- expr = expr. select_columns ( names. as_slice ( ) ) ?;
818+ expr = expr. select ( p. iter ( ) . copied ( ) ) ?
813819 }
814820
815821 // Add filter when given
816822 let filter = filters. iter ( ) . cloned ( ) . reduce ( |acc, new| acc. and ( new) ) ;
817823 if let Some ( filter) = filter {
818824 expr = expr. filter ( filter) ?
819825 }
826+ // add a limit if given
820827 if let Some ( l) = limit {
821828 expr = expr. limit ( 0 , Some ( l) ) ?
822829 }
823- // add a limit if given
824- Self :: new ( self . session_state . clone ( ) , expr. plan )
825- . create_physical_plan ( )
826- . await
830+ let plan = expr. build ( ) ?;
831+ state. create_physical_plan ( & plan) . await
827832 }
828833}
829834
@@ -1098,7 +1103,7 @@ mod tests {
10981103 let df_impl = DataFrame :: new ( ctx. state ( ) , df. plan . clone ( ) ) ;
10991104
11001105 // register a dataframe as a table
1101- ctx. register_table ( "test_table" , Arc :: new ( df_impl. clone ( ) ) ) ?;
1106+ ctx. register_table ( "test_table" , df_impl. clone ( ) . into_view ( ) ) ?;
11021107
11031108 // pull the table out
11041109 let table = ctx. table ( "test_table" ) . await ?;
@@ -1297,7 +1302,7 @@ mod tests {
12971302 let df = test_table ( ) . await ?. select_columns ( & [ "c1" , "c2" , "c3" ] ) ?;
12981303 let ctx = SessionContext :: new ( ) ;
12991304
1300- let table = Arc :: new ( df ) ;
1305+ let table = df . into_view ( ) ;
13011306 ctx. register_table ( "t1" , table. clone ( ) ) ?;
13021307 ctx. register_table ( "t2" , table) ?;
13031308 let df = ctx
@@ -1386,7 +1391,7 @@ mod tests {
13861391 )
13871392 . await ?;
13881393
1389- ctx. register_table ( "t1" , Arc :: new ( ctx. table ( "test" ) . await ?) ) ?;
1394+ ctx. register_table ( "t1" , ctx. table ( "test" ) . await ?. into_view ( ) ) ?;
13901395
13911396 let df = ctx
13921397 . table ( "t1" )
0 commit comments