File tree Expand file tree Collapse file tree 2 files changed +23
-1
lines changed
flink-table/flink-table-common/src
main/java/org/apache/flink/table/factories
test/java/org/apache/flink/table/factories Expand file tree Collapse file tree 2 files changed +23
-1
lines changed Original file line number Diff line number Diff line change @@ -107,6 +107,12 @@ public final class FactoryUtil {
107107 */
108108 public static final String FORMAT_SUFFIX = ".format" ;
109109
110+ /**
111+ * The placeholder symbol to be used for keys of options which can be templated. See {@link
112+ * Factory} for details.
113+ */
114+ public static final String PLACEHOLDER_SYMBOL = "#" ;
115+
110116 /**
111117 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
112118 *
@@ -362,6 +368,9 @@ public static void validateFactoryOptions(
362368
363369 final List <String > missingRequiredOptions =
364370 requiredOptions .stream ()
371+ // Templated options will never appear with their template key, so we need
372+ // to ignore them as required properties here
373+ .filter (option -> !option .key ().contains (PLACEHOLDER_SYMBOL ))
365374 .filter (option -> readOption (options , option ) == null )
366375 .map (ConfigOption ::key )
367376 .sorted ()
@@ -384,7 +393,7 @@ public static void validateUnconsumedKeys(
384393 String factoryIdentifier , Set <String > allOptionKeys , Set <String > consumedOptionKeys ) {
385394 final Set <String > remainingOptionKeys = new HashSet <>(allOptionKeys );
386395 remainingOptionKeys .removeAll (consumedOptionKeys );
387- if (remainingOptionKeys .size () > 0 ) {
396+ if (! remainingOptionKeys .isEmpty () ) {
388397 throw new ValidationException (
389398 String .format (
390399 "Unsupported options found for '%s'.\n \n "
Original file line number Diff line number Diff line change 1818
1919package org .apache .flink .table .factories ;
2020
21+ import org .apache .flink .configuration .ConfigOption ;
22+ import org .apache .flink .configuration .ConfigOptions ;
23+ import org .apache .flink .configuration .Configuration ;
2124import org .apache .flink .table .api .TableSchema ;
2225import org .apache .flink .table .api .ValidationException ;
2326import org .apache .flink .table .catalog .Catalog ;
3740
3841import java .util .Collections ;
3942import java .util .HashMap ;
43+ import java .util .HashSet ;
4044import java .util .List ;
4145import java .util .Map ;
4246import java .util .Optional ;
47+ import java .util .Set ;
4348import java .util .function .Consumer ;
4449
4550import static org .apache .flink .core .testutils .FlinkMatchers .containsCause ;
@@ -277,6 +282,14 @@ public void testConnectorErrorHint() {
277282 }
278283 }
279284
285+ @ Test
286+ public void testRequiredPlaceholderOption () {
287+ final Set <ConfigOption <?>> requiredOptions = new HashSet <>();
288+ requiredOptions .add (ConfigOptions .key ("fields.#.min" ).intType ().noDefaultValue ());
289+
290+ FactoryUtil .validateFactoryOptions (requiredOptions , new HashSet <>(), new Configuration ());
291+ }
292+
280293 @ Test
281294 public void testCreateCatalog () {
282295 final Map <String , String > options = new HashMap <>();
You can’t perform that action at this time.
0 commit comments