2020
2121import org .apache .flink .table .api .DataTypes ;
2222import org .apache .flink .table .expressions .Expression ;
23+ import org .apache .flink .table .expressions .UnresolvedCallExpression ;
2324import org .apache .flink .table .expressions .UnresolvedReferenceExpression ;
2425import org .apache .flink .table .types .DataType ;
2526import org .apache .flink .table .types .logical .DecimalType ;
2829import java .math .BigDecimal ;
2930
3031import static org .apache .flink .table .expressions .ApiExpressionUtils .unresolvedRef ;
32+ import static org .apache .flink .table .planner .expressions .ExpressionBuilder .cast ;
3133import static org .apache .flink .table .planner .expressions .ExpressionBuilder .ifThenElse ;
3234import static org .apache .flink .table .planner .expressions .ExpressionBuilder .isNull ;
3335import static org .apache .flink .table .planner .expressions .ExpressionBuilder .literal ;
3436import static org .apache .flink .table .planner .expressions .ExpressionBuilder .minus ;
3537import static org .apache .flink .table .planner .expressions .ExpressionBuilder .plus ;
38+ import static org .apache .flink .table .planner .expressions .ExpressionBuilder .typeLiteral ;
3639
3740/** built-in sum0 aggregate function. */
3841public abstract class Sum0AggFunction extends DeclarativeAggregateFunction {
@@ -56,20 +59,25 @@ public DataType[] getAggBufferTypes() {
5659 @ Override
5760 public Expression [] accumulateExpressions () {
5861 return new Expression [] {
59- /* sum0 = */ ifThenElse (isNull (operand (0 )), sum0 , plus (sum0 , operand (0 )))
62+ /* sum0 = */ adjustSumType ( ifThenElse (isNull (operand (0 )), sum0 , plus (sum0 , operand (0 ) )))
6063 };
6164 }
6265
6366 @ Override
6467 public Expression [] retractExpressions () {
6568 return new Expression [] {
66- /* sum0 = */ ifThenElse (isNull (operand (0 )), sum0 , minus (sum0 , operand (0 )))
69+ /* sum0 = */ adjustSumType (
70+ ifThenElse (isNull (operand (0 )), sum0 , minus (sum0 , operand (0 ))))
6771 };
6872 }
6973
7074 @ Override
7175 public Expression [] mergeExpressions () {
72- return new Expression [] {/* sum0 = */ plus (sum0 , mergeOperand (sum0 ))};
76+ return new Expression [] {/* sum0 = */ adjustSumType (plus (sum0 , mergeOperand (sum0 )))};
77+ }
78+
79+ private UnresolvedCallExpression adjustSumType (UnresolvedCallExpression sumExpr ) {
80+ return cast (sumExpr , typeLiteral (getResultType ()));
7381 }
7482
7583 @ Override
0 commit comments