@@ -28,9 +28,9 @@ use datafusion_expr::{
2828    CreateMemoryTable ,  DdlStatement ,  Distinct ,  Expr ,  LogicalPlan ,  LogicalPlanBuilder , 
2929} ; 
3030use  sqlparser:: ast:: { 
31-     Expr  as  SQLExpr ,  Ident ,  LimitClause ,  Offset ,  OffsetRows ,   OrderBy ,   OrderByExpr , 
32-     OrderByKind ,  PipeOperator ,  Query ,  SelectInto ,  SetExpr ,   SetOperator ,   SetQuantifier , 
33-     TableAlias , 
31+     Expr  as  SQLExpr ,  ExprWithAliasAndOrderBy ,   Ident ,  LimitClause ,  Offset ,  OffsetRows , 
32+     OrderBy ,   OrderByExpr ,   OrderByKind ,  PipeOperator ,  Query ,  SelectInto ,  SetExpr , 
33+     SetOperator ,   SetQuantifier ,   TableAlias , 
3434} ; 
3535use  sqlparser:: tokenizer:: Span ; 
3636
@@ -188,6 +188,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
188188                queries, 
189189                planner_context, 
190190            ) , 
191+             PipeOperator :: Aggregate  { 
192+                 full_table_exprs, 
193+                 group_by_expr, 
194+             }  => self . pipe_operator_aggregate ( 
195+                 plan, 
196+                 full_table_exprs, 
197+                 group_by_expr, 
198+                 planner_context, 
199+             ) , 
191200
192201            x => not_impl_err ! ( "`{x}` pipe operator is not supported yet" ) , 
193202        } 
@@ -294,6 +303,76 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
294303        } 
295304    } 
296305
306+     /// Handle AGGREGATE pipe operator 
307+ fn  pipe_operator_aggregate ( 
308+         & self , 
309+         plan :  LogicalPlan , 
310+         full_table_exprs :  Vec < ExprWithAliasAndOrderBy > , 
311+         group_by_expr :  Vec < ExprWithAliasAndOrderBy > , 
312+         planner_context :  & mut  PlannerContext , 
313+     )  -> Result < LogicalPlan >  { 
314+         // Convert aggregate expressions directly 
315+         let  aggr_exprs:  Vec < Expr >  = full_table_exprs
316+             . into_iter ( ) 
317+             . map ( |expr_with_alias_and_order_by| { 
318+                 let  expr_with_alias = expr_with_alias_and_order_by. expr ; 
319+                 let  sql_expr = expr_with_alias. expr ; 
320+                 let  alias = expr_with_alias. alias ; 
321+ 
322+                 // Convert SQL expression to DataFusion expression 
323+                 let  df_expr =
324+                     self . sql_to_expr ( sql_expr,  plan. schema ( ) ,  planner_context) ?; 
325+ 
326+                 // Apply alias if present, but handle the case where the expression might already be aliased 
327+                 match  alias { 
328+                     Some ( alias_ident)  => { 
329+                         // If the expression is already an alias, replace the alias name 
330+                         match  df_expr { 
331+                             Expr :: Alias ( alias_expr)  => { 
332+                                 Ok ( alias_expr. expr . alias ( alias_ident. value ) ) 
333+                             } 
334+                             _ => Ok ( df_expr. alias ( alias_ident. value ) ) , 
335+                         } 
336+                     } 
337+                     None  => Ok ( df_expr) , 
338+                 } 
339+             } ) 
340+             . collect :: < Result < Vec < _ > > > ( ) ?; 
341+ 
342+         // Convert group by expressions directly 
343+         let  group_by_exprs:  Vec < Expr >  = group_by_expr
344+             . into_iter ( ) 
345+             . map ( |expr_with_alias_and_order_by| { 
346+                 let  expr_with_alias = expr_with_alias_and_order_by. expr ; 
347+                 let  sql_expr = expr_with_alias. expr ; 
348+                 let  alias = expr_with_alias. alias ; 
349+ 
350+                 // Convert SQL expression to DataFusion expression 
351+                 let  df_expr =
352+                     self . sql_to_expr ( sql_expr,  plan. schema ( ) ,  planner_context) ?; 
353+ 
354+                 // Apply alias if present (though group by aliases are less common) 
355+                 match  alias { 
356+                     Some ( alias_ident)  => { 
357+                         // If the expression is already an alias, replace the alias name 
358+                         match  df_expr { 
359+                             Expr :: Alias ( alias_expr)  => { 
360+                                 Ok ( alias_expr. expr . alias ( alias_ident. value ) ) 
361+                             } 
362+                             _ => Ok ( df_expr. alias ( alias_ident. value ) ) , 
363+                         } 
364+                     } 
365+                     None  => Ok ( df_expr) , 
366+                 } 
367+             } ) 
368+             . collect :: < Result < Vec < _ > > > ( ) ?; 
369+ 
370+         // Create the aggregate logical plan 
371+         LogicalPlanBuilder :: from ( plan) 
372+             . aggregate ( group_by_exprs,  aggr_exprs) ?
373+             . build ( ) 
374+     } 
375+ 
297376    /// Wrap the logical plan in a `SelectInto` 
298377fn  select_into ( 
299378        & self , 
0 commit comments