Skip to content

Commit b018d97

Browse files
committed
[FLINK-38532] improve operation converter test
1 parent c793dc8 commit b018d97

File tree

1 file changed

+84
-83
lines changed

1 file changed

+84
-83
lines changed

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java

Lines changed: 84 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
import org.assertj.core.api.HamcrestCondition;
8686
import org.assertj.core.api.InstanceOfAssertFactories;
8787
import org.junit.jupiter.api.Test;
88+
import org.junit.jupiter.params.ParameterizedTest;
89+
import org.junit.jupiter.params.provider.Arguments;
90+
import org.junit.jupiter.params.provider.MethodSource;
8891

8992
import javax.annotation.Nullable;
9093

@@ -96,7 +99,9 @@
9699
import java.util.Optional;
97100
import java.util.Set;
98101
import java.util.TreeMap;
102+
import java.util.function.Consumer;
99103
import java.util.stream.Collectors;
104+
import java.util.stream.Stream;
100105

101106
import static org.apache.flink.table.api.Expressions.$;
102107
import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
@@ -1404,97 +1409,93 @@ void testFailedToAlterTableDropDistribution() throws Exception {
14041409
checkAlterNonExistTable("alter table %s nonexistent drop watermark");
14051410
}
14061411

1407-
@Test
1408-
void createMaterializedTableWithRefreshModeContinuous() throws Exception {
1409-
final String sql =
1410-
"CREATE MATERIALIZED TABLE users_shops ("
1411-
+ " PRIMARY KEY (user_id) not enforced)"
1412-
+ " WITH(\n"
1413-
+ " 'format' = 'debezium-json'\n"
1414-
+ " )\n"
1415-
+ " FRESHNESS = INTERVAL '30' SECOND\n"
1416-
+ " REFRESH_MODE = CONTINUOUS\n"
1417-
+ " AS SELECT 1 as shop_id, 2 as user_id ";
1418-
1419-
final String expectedSummaryString =
1420-
"CREATE MATERIALIZED TABLE: (materializedTable: "
1421-
+ "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
1422-
+ " `shop_id` INT NOT NULL,\n"
1423-
+ " `user_id` INT NOT NULL,\n"
1424-
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1425-
+ "), comment='null', distribution=null, partitionKeys=[], "
1426-
+ "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
1427-
+ "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
1428-
+ "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"
1429-
+ " `shop_id` INT NOT NULL,\n"
1430-
+ " `user_id` INT NOT NULL,\n"
1431-
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1432-
+ ")}], identifier: [`builtin`.`default`.`users_shops`])";
1412+
@ParameterizedTest(name = "[{index}] {0}")
1413+
@MethodSource("provideCreateMaterializedTableTestCases")
1414+
void createMaterializedTableWithVariousOptions(
1415+
String testName,
1416+
String sql,
1417+
String expectedSummaryString,
1418+
Consumer<CreateMaterializedTableOperation> additionalAssertions) {
14331419

14341420
final Operation operation = parse(sql);
14351421

14361422
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
14371423
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
1424+
14381425
final CreateMaterializedTableOperation createMaterializedTableOperation =
14391426
(CreateMaterializedTableOperation) operation;
1440-
assertThat(
1441-
createMaterializedTableOperation
1442-
.getCatalogMaterializedTable()
1443-
.getDefinitionFreshness())
1444-
.isEqualTo(IntervalFreshness.ofSecond("30"));
1445-
assertThat(createMaterializedTableOperation.getCatalogMaterializedTable().getRefreshMode())
1446-
.isSameAs(RefreshMode.CONTINUOUS);
14471427

1448-
prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");
1449-
}
1450-
1451-
@Test
1452-
void createMaterializedTableWithDistribution() throws Exception {
1453-
final String sql =
1454-
"CREATE MATERIALIZED TABLE users_shops ("
1455-
+ " PRIMARY KEY (user_id) not enforced)"
1456-
+ " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"
1457-
+ " WITH(\n"
1458-
+ " 'format' = 'debezium-json'\n"
1459-
+ " )\n"
1460-
+ " FRESHNESS = INTERVAL '30' SECOND\n"
1461-
+ " AS SELECT 1 as shop_id, 2 as user_id ";
1462-
1463-
final String expectedSummaryString =
1464-
"CREATE MATERIALIZED TABLE: (materializedTable: "
1465-
+ "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
1466-
+ " `shop_id` INT NOT NULL,\n"
1467-
+ " `user_id` INT NOT NULL,\n"
1468-
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1469-
+ "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
1470-
+ "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
1471-
+ "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, "
1472-
+ "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"
1473-
+ " `shop_id` INT NOT NULL,\n"
1474-
+ " `user_id` INT NOT NULL,\n"
1475-
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1476-
+ ")}], identifier: [`builtin`.`default`.`users_shops`])";
1477-
1478-
final Operation operation = parse(sql);
1479-
1480-
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
1481-
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
1482-
assertThat(
1483-
((CreateMaterializedTableOperation) operation)
1484-
.getCatalogMaterializedTable()
1485-
.getDistribution()
1486-
.get())
1487-
.isEqualTo(TableDistribution.of(Kind.HASH, 7, List.of("user_id")));
1488-
1489-
prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");
1490-
1491-
assertThatThrownBy(
1492-
() ->
1493-
parse(
1494-
"alter MATERIALIZED table cat1.db1.tb2 modify distribution into 3 buckets"))
1495-
.isInstanceOf(ValidationException.class)
1496-
.hasMessageContaining(
1497-
"Materialized table `cat1`.`db1`.`tb2` does not have a distribution to modify.");
1428+
additionalAssertions.accept(createMaterializedTableOperation);
1429+
}
1430+
1431+
private static Stream<Arguments> provideCreateMaterializedTableTestCases() {
1432+
return Stream.of(
1433+
Arguments.of(
1434+
"with refresh mode continuous",
1435+
"CREATE MATERIALIZED TABLE users_shops ("
1436+
+ " PRIMARY KEY (user_id) not enforced)"
1437+
+ " WITH(\n"
1438+
+ " 'format' = 'debezium-json'\n"
1439+
+ " )\n"
1440+
+ " FRESHNESS = INTERVAL '30' SECOND\n"
1441+
+ " REFRESH_MODE = CONTINUOUS\n"
1442+
+ " AS SELECT 1 as shop_id, 2 as user_id ",
1443+
"CREATE MATERIALIZED TABLE: (materializedTable: "
1444+
+ "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
1445+
+ " `shop_id` INT NOT NULL,\n"
1446+
+ " `user_id` INT NOT NULL,\n"
1447+
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1448+
+ "), comment='null', distribution=null, partitionKeys=[], "
1449+
+ "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
1450+
+ "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
1451+
+ "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"
1452+
+ " `shop_id` INT NOT NULL,\n"
1453+
+ " `user_id` INT NOT NULL,\n"
1454+
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1455+
+ ")}], identifier: [`builtin`.`default`.`users_shops`])",
1456+
(Consumer<CreateMaterializedTableOperation>)
1457+
op -> {
1458+
assertThat(
1459+
op.getCatalogMaterializedTable()
1460+
.getDefinitionFreshness())
1461+
.isEqualTo(IntervalFreshness.ofSecond("30"));
1462+
assertThat(op.getCatalogMaterializedTable().getRefreshMode())
1463+
.isSameAs(RefreshMode.CONTINUOUS);
1464+
}),
1465+
Arguments.of(
1466+
"with distribution",
1467+
"CREATE MATERIALIZED TABLE users_shops ("
1468+
+ " PRIMARY KEY (user_id) not enforced)"
1469+
+ " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"
1470+
+ " WITH(\n"
1471+
+ " 'format' = 'debezium-json'\n"
1472+
+ " )\n"
1473+
+ " FRESHNESS = INTERVAL '30' SECOND\n"
1474+
+ " AS SELECT 1 as shop_id, 2 as user_id ",
1475+
"CREATE MATERIALIZED TABLE: (materializedTable: "
1476+
+ "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
1477+
+ " `shop_id` INT NOT NULL,\n"
1478+
+ " `user_id` INT NOT NULL,\n"
1479+
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1480+
+ "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
1481+
+ "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
1482+
+ "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, "
1483+
+ "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"
1484+
+ " `shop_id` INT NOT NULL,\n"
1485+
+ " `user_id` INT NOT NULL,\n"
1486+
+ " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"
1487+
+ ")}], identifier: [`builtin`.`default`.`users_shops`])",
1488+
(Consumer<CreateMaterializedTableOperation>)
1489+
op ->
1490+
assertThat(
1491+
op.getCatalogMaterializedTable()
1492+
.getDistribution()
1493+
.get())
1494+
.isEqualTo(
1495+
TableDistribution.of(
1496+
Kind.HASH,
1497+
7,
1498+
List.of("user_id")))));
14981499
}
14991500

15001501
@Test

0 commit comments

Comments
 (0)