@@ -31,6 +31,8 @@ use datafusion::common::UnnestOptions;
3131use datafusion:: config:: { CsvOptions , TableParquetOptions } ;
3232use datafusion:: dataframe:: { DataFrame , DataFrameWriteOptions } ;
3333use datafusion:: execution:: SendableRecordBatchStream ;
34+ use datafusion:: logical_expr:: utils:: find_window_exprs;
35+ use datafusion:: logical_expr:: LogicalPlanBuilder ;
3436use datafusion:: parquet:: basic:: { BrotliLevel , Compression , GzipLevel , ZstdLevel } ;
3537use datafusion:: prelude:: * ;
3638use pyo3:: exceptions:: { PyTypeError , PyValueError } ;
@@ -176,6 +178,56 @@ impl PyDataFrame {
176178 }
177179
178180 fn with_column ( & self , name : & str , expr : PyExpr ) -> PyResult < Self > {
181+ println ! ( "\n \n \n adding column: {:?} with expr: {:?}" , name, expr) ;
182+ let df = self . df . as_ref ( ) . clone ( ) ;
183+ let plan = df. logical_plan ( ) . clone ( ) ;
184+ let expr = expr. expr ;
185+ let window_func_exprs = find_window_exprs ( & [ expr. clone ( ) ] ) ;
186+ println ! ( "window_func_exprs: {:?}" , window_func_exprs) ;
187+ let ( plan, mut col_exists, window_func) = if window_func_exprs. is_empty ( ) {
188+ ( plan, false , false )
189+ } else {
190+ (
191+ LogicalPlanBuilder :: window_plan ( plan, window_func_exprs) ?,
192+ true ,
193+ true ,
194+ )
195+ } ;
196+ println ! ( "col_exists: {:?}, window_func: {:?}" , col_exists, window_func) ;
197+ println ! ( "plan: {}" , plan. display_indent( ) ) ;
198+
199+ let new_column = expr. clone ( ) . alias ( name) ;
200+ let mut fields: Vec < Expr > = plan
201+ . schema ( )
202+ . iter ( )
203+ . map ( |( qualifier, field) | {
204+ println ! ( "qualifier: {:?}, field: {:?}" , qualifier, field) ;
205+ if field. name ( ) == name {
206+ println ! ( "adding new column with same name" ) ;
207+ col_exists = true ;
208+ new_column. clone ( )
209+ } else if window_func && qualifier. is_none ( ) {
210+ println ! ( "adding window function with alias" ) ;
211+ col ( Column :: from ( ( qualifier, field) ) ) . alias ( name)
212+ } else {
213+ println ! ( "adding column" ) ;
214+ col ( Column :: from ( ( qualifier, field) ) )
215+ }
216+ } )
217+ . collect ( ) ;
218+
219+ if !col_exists {
220+ println ! ( "col does not exist - pushing {:?}" , new_column) ;
221+ fields. push ( new_column) ;
222+ } else {
223+ println ! ( "col exists - not pushing {:?}" , new_column) ;
224+ }
225+
226+ let project_plan = LogicalPlanBuilder :: from ( plan) . project ( fields) ?. build ( ) ?;
227+ println ! ( "project_plan: {}" , project_plan. display_indent( ) ) ;
228+
229+
230+ // Ok(DataFrame::new(df.session_state, project_plan))
179231 let df = self . df . as_ref ( ) . clone ( ) . with_column ( name, expr. into ( ) ) ?;
180232 Ok ( Self :: new ( df) )
181233 }
0 commit comments