Skip to content

Commit abd3da9

Browse files
Airbladertwalthr
authored andcommitted
[FLINK-22475][table-api-java-bridge] Add placeholder options for datagen connector
This closes #15896.
1 parent 6be47cc commit abd3da9

File tree

5 files changed

+167
-85
lines changed

5 files changed

+167
-85
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.factories;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.ConfigOption;
23+
import org.apache.flink.configuration.ConfigOptions;
24+
25+
import static org.apache.flink.configuration.ConfigOptions.key;
26+
27+
/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
28+
@Internal
29+
public class DataGenOptions {
30+
31+
public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
32+
33+
public static final String FIELDS = "fields";
34+
public static final String KIND = "kind";
35+
public static final String START = "start";
36+
public static final String END = "end";
37+
public static final String MIN = "min";
38+
public static final String MAX = "max";
39+
public static final String LENGTH = "length";
40+
41+
public static final String SEQUENCE = "sequence";
42+
public static final String RANDOM = "random";
43+
44+
public static final ConfigOption<Long> ROWS_PER_SECOND =
45+
key("rows-per-second")
46+
.longType()
47+
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
48+
.withDescription("Rows per second to control the emit rate.");
49+
50+
public static final ConfigOption<Long> NUMBER_OF_ROWS =
51+
key("number-of-rows")
52+
.longType()
53+
.noDefaultValue()
54+
.withDescription(
55+
"Total number of rows to emit. By default, the source is unbounded.");
56+
57+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
58+
public static final ConfigOption<String> FIELD_KIND =
59+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, KIND))
60+
.stringType()
61+
.defaultValue("random")
62+
.withDescription("Generator of this '#' field. Can be 'sequence' or 'random'.");
63+
64+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
65+
public static final ConfigOption<String> FIELD_MIN =
66+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MIN))
67+
.stringType()
68+
.noDefaultValue()
69+
.withDescription(
70+
"Minimum value to generate for fields of kind 'random'. Minimum value possible for the type of the field.");
71+
72+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
73+
public static final ConfigOption<String> FIELD_MAX =
74+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX))
75+
.stringType()
76+
.noDefaultValue()
77+
.withDescription(
78+
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");
79+
80+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
81+
public static final ConfigOption<Integer> FIELD_LENGTH =
82+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
83+
.intType()
84+
.defaultValue(100)
85+
.withDescription(
86+
"Size or length of the collection for generating char/varchar/string/array/map/multiset types.");
87+
88+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
89+
public static final ConfigOption<String> FIELD_START =
90+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, START))
91+
.stringType()
92+
.noDefaultValue()
93+
.withDescription("Start value of sequence generator.");
94+
95+
/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
96+
public static final ConfigOption<String> FIELD_END =
97+
ConfigOptions.key(String.format("%s.#.%s", FIELDS, END))
98+
.stringType()
99+
.noDefaultValue()
100+
.withDescription("End value of sequence generator.");
101+
102+
private DataGenOptions() {}
103+
}

flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -46,31 +46,6 @@
4646
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
4747

4848
public static final String IDENTIFIER = "datagen";
49-
public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
50-
51-
public static final ConfigOption<Long> ROWS_PER_SECOND =
52-
key("rows-per-second")
53-
.longType()
54-
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
55-
.withDescription("Rows per second to control the emit rate.");
56-
57-
public static final ConfigOption<Long> NUMBER_OF_ROWS =
58-
key("number-of-rows")
59-
.longType()
60-
.noDefaultValue()
61-
.withDescription(
62-
"Total number of rows to emit. By default, the source is unbounded.");
63-
64-
public static final String FIELDS = "fields";
65-
public static final String KIND = "kind";
66-
public static final String START = "start";
67-
public static final String END = "end";
68-
public static final String MIN = "min";
69-
public static final String MAX = "max";
70-
public static final String LENGTH = "length";
71-
72-
public static final String SEQUENCE = "sequence";
73-
public static final String RANDOM = "random";
7449

7550
@Override
7651
public String factoryIdentifier() {
@@ -85,8 +60,17 @@ public Set<ConfigOption<?>> requiredOptions() {
8560
@Override
8661
public Set<ConfigOption<?>> optionalOptions() {
8762
Set<ConfigOption<?>> options = new HashSet<>();
88-
options.add(ROWS_PER_SECOND);
89-
options.add(NUMBER_OF_ROWS);
63+
options.add(DataGenOptions.ROWS_PER_SECOND);
64+
options.add(DataGenOptions.NUMBER_OF_ROWS);
65+
66+
// Placeholder options
67+
options.add(DataGenOptions.FIELD_KIND);
68+
options.add(DataGenOptions.FIELD_MIN);
69+
options.add(DataGenOptions.FIELD_MAX);
70+
options.add(DataGenOptions.FIELD_LENGTH);
71+
options.add(DataGenOptions.FIELD_START);
72+
options.add(DataGenOptions.FIELD_END);
73+
9074
return options;
9175
}
9276

@@ -105,7 +89,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
10589
DataType type = schema.getFieldDataTypes()[i];
10690

10791
ConfigOption<String> kind =
108-
key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
92+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.KIND)
93+
.stringType()
94+
.defaultValue(DataGenOptions.RANDOM);
10995
DataGeneratorContainer container =
11096
createContainer(name, type, options.get(kind), options);
11197
fieldGenerators[i] = container.getGenerator();
@@ -118,8 +104,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
118104

119105
Set<String> consumedOptionKeys = new HashSet<>();
120106
consumedOptionKeys.add(CONNECTOR.key());
121-
consumedOptionKeys.add(ROWS_PER_SECOND.key());
122-
consumedOptionKeys.add(NUMBER_OF_ROWS.key());
107+
consumedOptionKeys.add(DataGenOptions.ROWS_PER_SECOND.key());
108+
consumedOptionKeys.add(DataGenOptions.NUMBER_OF_ROWS.key());
123109
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
124110
FactoryUtil.validateUnconsumedKeys(
125111
factoryIdentifier(), options.keySet(), consumedOptionKeys);
@@ -129,16 +115,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
129115
fieldGenerators,
130116
name,
131117
schema,
132-
options.get(ROWS_PER_SECOND),
133-
options.get(NUMBER_OF_ROWS));
118+
options.get(DataGenOptions.ROWS_PER_SECOND),
119+
options.get(DataGenOptions.NUMBER_OF_ROWS));
134120
}
135121

136122
private DataGeneratorContainer createContainer(
137123
String name, DataType type, String kind, ReadableConfig options) {
138124
switch (kind) {
139-
case RANDOM:
125+
case DataGenOptions.RANDOM:
140126
return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
141-
case SEQUENCE:
127+
case DataGenOptions.SEQUENCE:
142128
return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
143129
default:
144130
throw new ValidationException("Unsupported generator kind: " + kind);

flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.data.GenericArrayData;
2929
import org.apache.flink.table.data.GenericMapData;
3030
import org.apache.flink.table.data.StringData;
31+
import org.apache.flink.table.factories.DataGenOptions;
3132
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
3233
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
3334
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
@@ -55,10 +56,6 @@
5556
import java.util.stream.Collectors;
5657

5758
import static org.apache.flink.configuration.ConfigOptions.key;
58-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
59-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
60-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
61-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
6259

6360
/** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
6461
@Internal
@@ -76,8 +73,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
7673
public RandomGeneratorVisitor(String name, ReadableConfig config) {
7774
super(name, config);
7875

79-
this.minKey = key(FIELDS + "." + name + "." + MIN);
80-
this.maxKey = key(FIELDS + "." + name + "." + MAX);
76+
this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
77+
this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
8178
}
8279

8380
@Override
@@ -88,7 +85,7 @@ public DataGeneratorContainer visit(BooleanType booleanType) {
8885
@Override
8986
public DataGeneratorContainer visit(CharType booleanType) {
9087
ConfigOption<Integer> lenOption =
91-
key(FIELDS + "." + name + "." + LENGTH)
88+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
9289
.intType()
9390
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
9491
return DataGeneratorContainer.of(
@@ -98,7 +95,7 @@ public DataGeneratorContainer visit(CharType booleanType) {
9895
@Override
9996
public DataGeneratorContainer visit(VarCharType booleanType) {
10097
ConfigOption<Integer> lenOption =
101-
key(FIELDS + "." + name + "." + LENGTH)
98+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
10299
.intType()
103100
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
104101
return DataGeneratorContainer.of(
@@ -190,7 +187,7 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
190187
@Override
191188
public DataGeneratorContainer visit(ArrayType arrayType) {
192189
ConfigOption<Integer> lenOption =
193-
key(FIELDS + "." + name + "." + LENGTH)
190+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
194191
.intType()
195192
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
196193

@@ -208,7 +205,7 @@ public DataGeneratorContainer visit(ArrayType arrayType) {
208205
@Override
209206
public DataGeneratorContainer visit(MultisetType multisetType) {
210207
ConfigOption<Integer> lenOption =
211-
key(FIELDS + "." + name + "." + LENGTH)
208+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
212209
.intType()
213210
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
214211

@@ -230,7 +227,7 @@ public DataGeneratorContainer visit(MultisetType multisetType) {
230227
@Override
231228
public DataGeneratorContainer visit(MapType mapType) {
232229
ConfigOption<Integer> lenOption =
233-
key(FIELDS + "." + name + "." + LENGTH)
230+
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
234231
.intType()
235232
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
236233

flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
2727
import org.apache.flink.table.api.ValidationException;
2828
import org.apache.flink.table.data.StringData;
29+
import org.apache.flink.table.factories.DataGenOptions;
2930
import org.apache.flink.table.types.logical.BigIntType;
3031
import org.apache.flink.table.types.logical.BooleanType;
3132
import org.apache.flink.table.types.logical.CharType;
@@ -38,9 +39,6 @@
3839
import org.apache.flink.table.types.logical.VarCharType;
3940

4041
import static org.apache.flink.configuration.ConfigOptions.key;
41-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
42-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
43-
import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
4442

4543
/** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
4644
@Internal
@@ -65,8 +63,8 @@ public SequenceGeneratorVisitor(String name, ReadableConfig config) {
6563

6664
this.config = config;
6765

68-
this.startKeyStr = FIELDS + "." + name + "." + START;
69-
this.endKeyStr = FIELDS + "." + name + "." + END;
66+
this.startKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.START;
67+
this.endKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.END;
7068

7169
ConfigOptions.OptionBuilder startKey = key(startKeyStr);
7270
ConfigOptions.OptionBuilder endKey = key(endKeyStr);

0 commit comments

Comments
 (0)