
Commit 5318e71

[FLINK-22378][table] Derive type of SOURCE_WATERMARK() from time attribute

1 parent 19ca330

File tree

40 files changed: +376 -166 lines changed
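
The commit threads the time attribute's data type from the schema resolver down to SQL expression resolution, so that the output type of an expression such as SOURCE_WATERMARK() is derived from the time column. A hedged illustration of the kind of declaration this affects, using the public Schema API (the column names and the TIMESTAMP_LTZ(3) type are made up for the example):

// Illustrative only: with this commit the type of "ts" is passed as the expected
// output type when "SOURCE_WATERMARK()" is resolved, so the watermark expression
// follows the time attribute's type (here TIMESTAMP_LTZ(3)).
Schema schema =
        Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                .watermark("ts", "SOURCE_WATERMARK()")
                .build();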

flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java

Lines changed: 1 addition & 5 deletions
@@ -20,7 +20,6 @@

 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -36,7 +35,6 @@
 import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseException;
@@ -80,7 +78,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.function.Supplier;

 /** A Parser that uses Hive's planner to parse a statement. */
@@ -176,13 +173,12 @@ public class HiveParser extends ParserImpl {
             CatalogManager catalogManager,
             Supplier<FlinkPlannerImpl> validatorSupplier,
             Supplier<CalciteParser> calciteParserSupplier,
-            Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator,
             PlannerContext plannerContext) {
         super(
                 catalogManager,
                 validatorSupplier,
                 calciteParserSupplier,
-                sqlExprToRexConverterCreator);
+                plannerContext.getSqlExprToRexConverterFactory());
         this.plannerContext = plannerContext;
         this.catalogReader =
                 plannerContext.createCatalogReader(

flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java

Lines changed: 0 additions & 6 deletions
@@ -23,7 +23,6 @@
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.delegation.ParserFactory;
 import org.apache.flink.table.planner.delegation.PlannerContext;

@@ -36,18 +35,13 @@ public class HiveParserFactory implements ParserFactory {

     @Override
     public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
-        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
-                plannerContext::createSqlExprToRexConverter;
         return new HiveParser(
                 catalogManager,
                 () ->
                         plannerContext.createFlinkPlanner(
                                 catalogManager.getCurrentCatalog(),
                                 catalogManager.getCurrentDatabase()),
                 plannerContext::createCalciteParser,
-                tableSchema ->
-                        sqlExprToRexConverterFactory.create(
-                                plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                 plannerContext);
     }


flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java

Lines changed: 4 additions & 2 deletions
@@ -261,9 +261,11 @@ protected TableEnvironmentImpl(
                         return Optional.empty();
                     }
                 },
-                (sqlExpression, inputSchema) -> {
+                (sqlExpression, inputRowType, outputType) -> {
                     try {
-                        return getParser().parseSqlExpression(sqlExpression, inputSchema);
+                        return getParser()
+                                .parseSqlExpression(
+                                        sqlExpression, inputRowType, outputType);
                     } catch (Throwable t) {
                         throw new ValidationException(
                                 String.format("Invalid SQL expression: %s", sqlExpression),

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java

Lines changed: 16 additions & 9 deletions
@@ -144,7 +144,8 @@ private ComputedColumn resolveComputedColumn(
             UnresolvedComputedColumn unresolvedColumn, List<Column> inputColumns) {
         final ResolvedExpression resolvedExpression;
         try {
-            resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression());
+            resolvedExpression =
+                    resolveExpression(inputColumns, unresolvedColumn.getExpression(), null);
         } catch (Exception e) {
             throw new ValidationException(
                     String.format(
@@ -189,22 +190,26 @@ private List<WatermarkSpec> resolveWatermarkSpecs(
         final ResolvedExpression watermarkExpression;
         try {
             watermarkExpression =
-                    resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression());
+                    resolveExpression(
+                            inputColumns,
+                            watermarkSpec.getWatermarkExpression(),
+                            validatedTimeColumn.getDataType());
         } catch (Exception e) {
             throw new ValidationException(
                     String.format(
                             "Invalid expression for watermark '%s'.", watermarkSpec.toString()),
                     e);
         }
-        validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType());
+        final LogicalType outputType = watermarkExpression.getOutputDataType().getLogicalType();
+        final LogicalType timeColumnType = validatedTimeColumn.getDataType().getLogicalType();
+        validateWatermarkExpression(outputType);

-        if (!(watermarkExpression.getOutputDataType().getLogicalType().getTypeRoot()
-                == validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) {
+        if (!(outputType.getTypeRoot() == timeColumnType.getTypeRoot())) {
             throw new ValidationException(
                     String.format(
-                            "The watermark output type %s is different from input time filed type %s.",
-                            watermarkExpression.getOutputDataType(),
-                            validatedTimeColumn.getDataType()));
+                            "The watermark declaration's output data type '%s' is different "
+                                    + "from the time field's data type '%s'.",
+                            outputType, timeColumnType));
         }

         return Collections.singletonList(
@@ -348,13 +353,15 @@ private void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> column
         }
     }

-    private ResolvedExpression resolveExpression(List<Column> columns, Expression expression) {
+    private ResolvedExpression resolveExpression(
+            List<Column> columns, Expression expression, @Nullable DataType outputDataType) {
         final LocalReferenceExpression[] localRefs =
                 columns.stream()
                         .map(c -> localRef(c.getName(), c.getDataType()))
                         .toArray(LocalReferenceExpression[]::new);
         return resolverBuilder
                 .withLocalReferences(localRefs)
+                .withOutputDataType(outputDataType)
                 .build()
                 .resolve(Collections.singletonList(expression))
                 .get(0);

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java

Lines changed: 8 additions & 3 deletions
@@ -19,11 +19,14 @@
 package org.apache.flink.table.delegation;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;

 import java.util.List;

@@ -58,11 +61,13 @@ public interface Parser {
      * Entry point for parsing SQL expressions expressed as a String.
      *
      * @param sqlExpression the SQL expression to parse
-     * @param inputSchema the schema of the fields in sql expression
+     * @param inputRowType the fields available in the SQL expression
+     * @param outputType expected top-level output type if available
      * @return resolved expression
      * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
      */
-    ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
+    ResolvedExpression parseSqlExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);

     /**
      * Returns completion hints for the given statement at the given cursor position. The completion
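
A minimal sketch of calling the revised Parser entry point (the parser instance, field names, and types are assumptions for illustration, not part of the commit):

// "parser" is assumed to be an existing org.apache.flink.table.delegation.Parser.
// The caller now supplies both the available fields and, when known, the expected
// output type, which lets SOURCE_WATERMARK() take the time attribute's type.
RowType inputRowType =
        RowType.of(
                new LogicalType[] {new BigIntType(), new LocalZonedTimestampType(3)},
                new String[] {"id", "ts"});
LogicalType expectedOutputType = new LocalZonedTimestampType(3); // the time attribute's type
ResolvedExpression watermark =
        parser.parseSqlExpression("SOURCE_WATERMARK()", inputRowType, expectedOutputType);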

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java

Lines changed: 18 additions & 0 deletions
@@ -43,6 +43,8 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;

+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -117,6 +119,8 @@ public static List<ResolverRule> getAllResolverRules() {

     private final Map<String, LocalReferenceExpression> localReferences;

+    private final @Nullable DataType outputDataType;
+
     private final Map<Expression, LocalOverWindow> localOverWindows;

     private final boolean isGroupedAggregation;
@@ -130,6 +134,7 @@ private ExpressionResolver(
             FieldReferenceLookup fieldLookup,
             List<OverWindow> localOverWindows,
             List<LocalReferenceExpression> localReferences,
+            @Nullable DataType outputDataType,
             boolean isGroupedAggregation) {
         this.config = Preconditions.checkNotNull(config).getConfiguration();
         this.tableLookup = Preconditions.checkNotNull(tableLookup);
@@ -149,6 +154,7 @@ private ExpressionResolver(
                                             "Duplicate local reference: " + u);
                                 },
                                 LinkedHashMap::new));
+        this.outputDataType = outputDataType;
         this.localOverWindows = prepareOverWindows(localOverWindows);
         this.isGroupedAggregation = isGroupedAggregation;
     }
@@ -323,6 +329,11 @@ public List<LocalReferenceExpression> getLocalReferences() {
            return new ArrayList<>(localReferences.values());
        }

+       @Override
+       public Optional<DataType> getOutputDataType() {
+           return Optional.ofNullable(outputDataType);
+       }
+
        @Override
        public Optional<LocalOverWindow> getOverWindow(Expression alias) {
            return Optional.ofNullable(localOverWindows.get(alias));
@@ -443,6 +454,7 @@ public static class ExpressionResolverBuilder {
        private final SqlExpressionResolver sqlExpressionResolver;
        private List<OverWindow> logicalOverWindows = new ArrayList<>();
        private List<LocalReferenceExpression> localReferences = new ArrayList<>();
+       private @Nullable DataType outputDataType;
        private boolean isGroupedAggregation;

        private ExpressionResolverBuilder(
@@ -471,6 +483,11 @@ public ExpressionResolverBuilder withLocalReferences(
            return this;
        }

+       public ExpressionResolverBuilder withOutputDataType(@Nullable DataType outputDataType) {
+           this.outputDataType = outputDataType;
+           return this;
+       }
+
        public ExpressionResolverBuilder withGroupedAggregation(boolean isGroupedAggregation) {
            this.isGroupedAggregation = isGroupedAggregation;
            return this;
@@ -486,6 +503,7 @@ public ExpressionResolver build() {
                        new FieldReferenceLookup(queryOperations),
                        logicalOverWindows,
                        localReferences,
+                       outputDataType,
                        isGroupedAggregation);
        }
    }
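
A minimal usage sketch of the new builder option, following the wiring shown in DefaultSchemaResolver above (the resolverBuilder instance and the TIMESTAMP_LTZ(3) time column are assumptions for illustration):

// localRef(...) is the helper from ApiExpressionUtils that DefaultSchemaResolver also uses.
final LocalReferenceExpression[] localRefs = {localRef("ts", DataTypes.TIMESTAMP_LTZ(3))};
final ResolvedExpression resolved =
        resolverBuilder
                .withLocalReferences(localRefs)
                .withOutputDataType(DataTypes.TIMESTAMP_LTZ(3)) // expected type of the expression
                .build()
                .resolve(Collections.singletonList(new SqlCallExpression("SOURCE_WATERMARK()")))
                .get(0);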

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/SqlExpressionResolver.java

Lines changed: 6 additions & 2 deletions
@@ -19,14 +19,18 @@
 package org.apache.flink.table.expressions.resolver;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;

 /** Translates a SQL expression string into a {@link ResolvedExpression}. */
 @Internal
 public interface SqlExpressionResolver {

     /** Translates the given SQL expression string or throws a {@link ValidationException}. */
-    ResolvedExpression resolveExpression(String sqlExpression, TableSchema inputSchema);
+    ResolvedExpression resolveExpression(
+            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);
 }

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java

Lines changed: 35 additions & 10 deletions
@@ -19,13 +19,19 @@
 package org.apache.flink.table.expressions.resolver.rules;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;

+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;

@@ -37,32 +43,49 @@ final class ResolveSqlCallRule implements ResolverRule {

     @Override
     public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
-        return expression.stream()
-                .map(expr -> expr.accept(new TranslateSqlCallsVisitor(context)))
-                .collect(Collectors.toList());
+        // only the top-level expressions may access the output data type
+        final LogicalType outputType =
+                context.getOutputDataType().map(DataType::getLogicalType).orElse(null);
+        final TranslateSqlCallsVisitor visitor = new TranslateSqlCallsVisitor(context, outputType);
+        return expression.stream().map(expr -> expr.accept(visitor)).collect(Collectors.toList());
     }

     private static class TranslateSqlCallsVisitor extends RuleExpressionVisitor<Expression> {

-        TranslateSqlCallsVisitor(ResolutionContext resolutionContext) {
+        private final @Nullable LogicalType outputType;
+
+        TranslateSqlCallsVisitor(
+                ResolutionContext resolutionContext, @Nullable LogicalType outputType) {
             super(resolutionContext);
+            this.outputType = outputType;
         }

         @Override
         public Expression visit(SqlCallExpression sqlCall) {
             final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();

-            final TableSchema.Builder builder = TableSchema.builder();
+            final List<RowField> fields = new ArrayList<>();
             // input references
             resolutionContext
                     .referenceLookup()
                     .getAllInputFields()
-                    .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+                    .forEach(
+                            f ->
+                                    fields.add(
+                                            new RowField(
+                                                    f.getName(),
+                                                    f.getOutputDataType().getLogicalType())));
             // local references
             resolutionContext
                     .getLocalReferences()
-                    .forEach(refs -> builder.field(refs.getName(), refs.getOutputDataType()));
-            return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
+                    .forEach(
+                            refs ->
+                                    fields.add(
+                                            new RowField(
+                                                    refs.getName(),
+                                                    refs.getOutputDataType().getLogicalType())));
+            return resolver.resolveExpression(
+                    sqlCall.getSqlExpression(), new RowType(false, fields), outputType);
         }

         @Override
@@ -76,8 +99,10 @@ protected Expression defaultMethod(Expression expression) {
         }

         private List<Expression> resolveChildren(List<Expression> lookupChildren) {
+            final TranslateSqlCallsVisitor visitor =
+                    new TranslateSqlCallsVisitor(resolutionContext, null);
             return lookupChildren.stream()
-                    .map(child -> child.accept(this))
+                    .map(child -> child.accept(visitor))
                     .collect(Collectors.toList());
         }
     }

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;

 import java.util.List;
 import java.util.Optional;
@@ -87,6 +88,9 @@ interface ResolutionContext {
        /** Access to available local references. */
        List<LocalReferenceExpression> getLocalReferences();

+       /** Access to the expected top-level output data type. */
+       Optional<DataType> getOutputDataType();
+
        /** Access to available local over windows. */
        Optional<LocalOverWindow> getOverWindow(Expression alias);

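
A hypothetical sketch of a rule consuming the new accessor (the rule name is invented; a real rule would live in the same package as ResolverRule):

// Invented example rule: it reads the expected top-level output type, if any, and
// returns the expressions unchanged. It only demonstrates the new context accessor.
final class InspectOutputTypeRule implements ResolverRule {
    @Override
    public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
        final Optional<DataType> expectedOutputType = context.getOutputDataType();
        // a real rule could adapt the resolved expressions to expectedOutputType here
        return expression;
    }
}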
