diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 6fa7793872..e95b3a129f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -487,14 +487,43 @@
     public List<CatalogPartitionSpec> listPartitions(
             ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
             throws TableNotExistException, TableNotPartitionedException,
                     PartitionSpecInvalidException, CatalogException {
+        String tableName = objectPath.getObjectName();
+        TablePath tablePath = toTablePath(objectPath);
+        try {
+            if (tableName.contains(LAKE_TABLE_SPLITTER)) {
+                String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
-        // TODO lake table should support.
-        if (objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
-            return Collections.emptyList();
-        }
+                // Validate the table name format - only the "table_name$lake" pattern is supported
+                if (tableComponents.length != 1) {
+                    throw new IllegalArgumentException(
+                            "listPartitions only supports table_name$lake pattern");
+                }
-        try {
-            TablePath tablePath = toTablePath(objectPath);
+                TableInfo tableInfo =
+                        admin.getTableInfo(
+                                        TablePath.of(
+                                                objectPath.getDatabaseName(), tableComponents[0]))
+                                .get();
+                if (!tableInfo.getTableConfig().isDataLakeEnabled()) {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "The table %s is not a data lake table, please check the table config.",
+                                    objectPath));
+                }
+                ObjectPath lakeObjectPath =
+                        ObjectPath.fromString(
+                                objectPath.getDatabaseName() + "." + tableComponents[0]);
+                if (catalogPartitionSpec == null) {
+                    // TODO: for now, the iceberg-flink connector only supports listing partitions
+                    // without a partition spec.
+                    return lakeCatalog
+                            .getLakeCatalog(tableInfo.getProperties())
+                            .listPartitions(lakeObjectPath);
+                }
+                return lakeCatalog
+                        .getLakeCatalog(tableInfo.getProperties())
+                        .listPartitions(lakeObjectPath, catalogPartitionSpec);
+            }
+
             List<PartitionInfo> partitionInfos;
             if (catalogPartitionSpec != null) {
                 Map<String, String> partitionSpec = catalogPartitionSpec.getPartitionSpec();
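With this change, `listPartitions` on a `table$lake` path is delegated to the underlying lake catalog (Paimon or Iceberg) instead of returning an empty list. A minimal sketch of how a caller might exercise the new path, assuming a running Fluss cluster at `localhost:9123` and a datalake-enabled table `fluss.orders` (both hypothetical); the constructor and `open()` usage mirror the test base classes below:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.fluss.flink.catalog.FlinkCatalog;

public class ListLakePartitionsExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical catalog name, default database, and bootstrap address.
        FlinkCatalog catalog =
                new FlinkCatalog(
                        "fluss_catalog",
                        "fluss",
                        "localhost:9123",
                        Thread.currentThread().getContextClassLoader(),
                        Collections.emptyMap());
        catalog.open();
        try {
            // The "$lake" suffix now routes the call to the lake catalog.
            List<CatalogPartitionSpec> lakePartitions =
                    catalog.listPartitions(ObjectPath.fromString("fluss.orders$lake"));
            lakePartitions.forEach(spec -> System.out.println(spec.getPartitionSpec()));
        } finally {
            catalog.close();
        }
    }
}
```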
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 84a85deade..9fd8c336b1 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -683,6 +683,14 @@ void testOperatePartitions() throws Exception {
                 .hasMessage(
                         "Failed to list partitions of table fluss.partitioned_t1 in test-catalog, by partitionSpec CatalogPartitionSpec{{second=}}");
 
+        ObjectPath illegalPartitionTablePath =
+                new ObjectPath(DEFAULT_DB, "partitioned_t1$lake$snapshots");
+        assertThatThrownBy(() -> catalog.listPartitions(illegalPartitionTablePath))
+                .isInstanceOf(CatalogException.class)
+                .rootCause()
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("listPartitions only supports table_name$lake pattern");
+
         // NEW: Test dropPartition functionality
         CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
         catalog.dropPartition(path2, firstPartSpec, false);
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 168f51cd2a..33923dfa88 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -33,6 +33,8 @@
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
@@ -115,6 +117,22 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
 
         if (isPartitioned) {
+            List<CatalogPartitionSpec> lakeCatalogPartitionSpecs =
+                    flinkCatalog.listPartitions(ObjectPath.fromString(t1 + "$lake"));
+            List<CatalogPartitionSpec> catalogPartitionSpecs =
+                    flinkCatalog.listPartitions(ObjectPath.fromString(t1.toString()));
+            assertThat(lakeCatalogPartitionSpecs).hasSize(catalogPartitionSpecs.size());
+            // Iceberg includes bucket specs; check only "p" partition values
+            List<String> lakePartitions =
+                    lakeCatalogPartitionSpecs.stream()
+                            .map(s -> s.getPartitionSpec().get("p"))
+                            .collect(Collectors.toList());
+            List<String> flussPartitions =
+                    catalogPartitionSpecs.stream()
+                            .map(s -> s.getPartitionSpec().get("p"))
+                            .collect(Collectors.toList());
+            assertThat(lakePartitions).containsExactlyInAnyOrderElementsOf(flussPartitions);
+
             // get first partition
             String partition = waitUntilPartitions(t1).values().iterator().next();
             String sqlWithPartitionFilter =
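A side note on the `tableComponents.length != 1` validation that `FlinkCatalogTest` exercises above: splitting on the literal `$lake` pattern returns a single element only for the plain `table_name$lake` form, which is why `partitioned_t1$lake$snapshots` is rejected. A small self-contained illustration; the table names are made up, and `LAKE_TABLE_SPLITTER` is assumed to be `"$lake"` as in `FlinkCatalog`:

```java
public class LakeSplitterCheck {
    // Assumed to mirror LAKE_TABLE_SPLITTER in FlinkCatalog.
    private static final String LAKE_TABLE_SPLITTER = "$lake";

    public static void main(String[] args) {
        // "orders$lake".split("\\$lake") -> ["orders"]: length 1, accepted.
        System.out.println("orders$lake".split("\\" + LAKE_TABLE_SPLITTER).length);
        // "orders$lake$snapshots".split("\\$lake") -> ["orders", "$snapshots"]: length 2, rejected.
        System.out.println("orders$lake$snapshots".split("\\" + LAKE_TABLE_SPLITTER).length);
    }
}
```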
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
index adb9a86866..81819a96f3 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.fluss.lake.iceberg.flink;
 
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
 
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,8 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for iceberg union read test. */
@@ -39,6 +42,7 @@ class FlinkUnionReadTestBase extends FlinkIcebergTieringTestBase {
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
     StreamTableEnvironment streamTEnv;
+    FlinkCatalog flinkCatalog;
 
     @BeforeAll
     protected static void beforeAll() {
@@ -59,6 +63,7 @@ public void beforeEach() {
         batchTEnv.executeSql("use catalog " + CATALOG_NAME);
         batchTEnv.executeSql("use " + DEFAULT_DB);
         buildStreamTEnv(null);
+        buildFlinkCatalog();
     }
 
     protected StreamTableEnvironment buildStreamTEnv(@Nullable String savepointPath) {
@@ -79,4 +84,16 @@ protected StreamTableEnvironment buildStreamTEnv(@Nullable String savepointPath)
         streamTEnv.executeSql("use " + DEFAULT_DB);
         return streamTEnv;
     }
+
+    protected void buildFlinkCatalog() {
+        String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        flinkCatalog =
+                new FlinkCatalog(
+                        CATALOG_NAME,
+                        DEFAULT_DB,
+                        bootstrapServers,
+                        Thread.currentThread().getContextClassLoader(),
+                        Collections.emptyMap());
+        flinkCatalog.open();
+    }
 }
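The new `flinkCatalog` field gives subclasses direct access to the Flink `Catalog` API, which the `batchTEnv`/`streamTEnv` SQL used elsewhere in these tests does not expose for partition listing. A hedged sketch of how a subclass might use it; the table name `my_part_table` is hypothetical, and the table is assumed to be partitioned and datalake-enabled:

```java
import java.util.List;

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class MyUnionReadITCase extends FlinkUnionReadTestBase {

    @Test
    void lakePartitionsMatchFlussPartitions() throws Exception {
        // Hypothetical partitioned, datalake-enabled table created elsewhere in the test.
        List<CatalogPartitionSpec> flussSpecs =
                flinkCatalog.listPartitions(ObjectPath.fromString(DEFAULT_DB + ".my_part_table"));
        List<CatalogPartitionSpec> lakeSpecs =
                flinkCatalog.listPartitions(ObjectPath.fromString(DEFAULT_DB + ".my_part_table$lake"));

        // Once tiering has caught up, both views should report the same partition count.
        assertThat(lakeSpecs).hasSize(flussSpecs.size());
    }
}
```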
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index cdb88dcbf1..c48f40319c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -27,6 +27,8 @@
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
@@ -109,6 +111,14 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
 
         if (isPartitioned) {
+            List<CatalogPartitionSpec> lakeCatalogPartitionSpecs =
+                    flinkCatalog.listPartitions(ObjectPath.fromString(t1 + "$lake"));
+            List<CatalogPartitionSpec> catalogPartitionSpecs =
+                    flinkCatalog.listPartitions(ObjectPath.fromString(t1.toString()));
+            assertThat(lakeCatalogPartitionSpecs).hasSize(catalogPartitionSpecs.size());
+            assertThat(lakeCatalogPartitionSpecs)
+                    .containsExactlyInAnyOrderElementsOf(catalogPartitionSpecs);
+
             // get first partition
             String partition = waitUntilPartitions(t1).values().iterator().next();
             String sqlWithPartitionFilter =
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 94ba4fd1ce..7f810573fa 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.paimon.flink;
 
 import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
@@ -30,6 +31,8 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for Flink union read test. */
@@ -45,6 +48,7 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
     StreamTableEnvironment streamTEnv;
+    FlinkCatalog flinkCatalog;
 
     @BeforeAll
     protected static void beforeAll() {
@@ -56,6 +60,7 @@ public void beforeEach() {
         super.beforeEach();
         buildBatchTEnv();
         buildStreamTEnv();
+        buildFlinkCatalog();
     }
 
     @Override
@@ -98,4 +103,16 @@ public void buildBatchTEnv() {
         batchTEnv.executeSql("use catalog " + CATALOG_NAME);
         batchTEnv.executeSql("use " + DEFAULT_DB);
     }
+
+    protected void buildFlinkCatalog() {
+        String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        flinkCatalog =
+                new FlinkCatalog(
+                        CATALOG_NAME,
+                        DEFAULT_DB,
+                        bootstrapServers,
+                        Thread.currentThread().getContextClassLoader(),
+                        Collections.emptyMap());
+        flinkCatalog.open();
+    }
 }
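For completeness, the new `listPartitions` path also rejects `$lake` lookups on tables that are not datalake-enabled. A sketch of asserting that behavior in the same AssertJ style as `FlinkCatalogTest` above; the table `plain_t1` is hypothetical, and the `CatalogException` wrapping is assumed to match the `IllegalArgumentException` case tested earlier:

```java
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.fluss.flink.catalog.FlinkCatalog;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

class NonLakeTableListPartitionsSketch {

    // "plain_t1" is assumed to exist without datalake enabled in its table config.
    static void assertNonLakeTableRejected(FlinkCatalog catalog) {
        ObjectPath path = ObjectPath.fromString("fluss.plain_t1$lake");
        assertThatThrownBy(() -> catalog.listPartitions(path))
                .isInstanceOf(CatalogException.class)
                .rootCause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not a data lake table");
    }
}
```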