@@ -487,14 +487,43 @@ public List<CatalogPartitionSpec> listPartitions(
ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
String tableName = objectPath.getObjectName();
TablePath tablePath = toTablePath(objectPath);
try {
if (tableName.contains(LAKE_TABLE_SPLITTER)) {
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);

// TODO lake table should support.
if (objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
return Collections.emptyList();
}
// Validate table name format - only the "table_name$lake" pattern is supported
if (tableComponents.length != 1) {
throw new IllegalArgumentException(
"listPartitions only supports table_name$lake pattern");
}

try {
TablePath tablePath = toTablePath(objectPath);
TableInfo tableInfo =
admin.getTableInfo(
TablePath.of(
objectPath.getDatabaseName(), tableComponents[0]))
.get();
if (!tableInfo.getTableConfig().isDataLakeEnabled()) {
throw new UnsupportedOperationException(
String.format(
"The table %s is not a data lake table, please check the table config.",
objectPath));
}
ObjectPath lakeObjectPath =
ObjectPath.fromString(
objectPath.getDatabaseName() + "." + tableComponents[0]);
if (catalogPartitionSpec == null) {
// TODO: For now the iceberg-flink connector only supports listing partitions
// without a partition spec.
return lakeCatalog
.getLakeCatalog(tableInfo.getProperties())
.listPartitions(lakeObjectPath);
}
return lakeCatalog
.getLakeCatalog(tableInfo.getProperties())
.listPartitions(lakeObjectPath, catalogPartitionSpec);
}
List<PartitionInfo> partitionInfos;
if (catalogPartitionSpec != null) {
Map<String, String> partitionSpec = catalogPartitionSpec.getPartitionSpec();
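
For context, a minimal usage sketch (not part of this change) of how the new branch above is meant to be exercised through the Flink Catalog API. The database and table names are taken from the tests below, LAKE_TABLE_SPLITTER is assumed to be the "$lake" suffix used throughout this PR, and the class and method names here are hypothetical.

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of listing partitions for a Fluss table and its "$lake" view. */
public class ListLakePartitionsSketch {

    static void demo(Catalog flinkCatalog) throws Exception {
        // Plain Fluss table: partitions are listed from the Fluss cluster, as before.
        List<CatalogPartitionSpec> flussSpecs =
                flinkCatalog.listPartitions(ObjectPath.fromString("fluss.partitioned_t1"));

        // "table$lake": the call is now delegated to the lake catalog (Iceberg or Paimon)
        // resolved from the table's properties, so the lake-side partitions are returned.
        List<CatalogPartitionSpec> lakeSpecs =
                flinkCatalog.listPartitions(ObjectPath.fromString("fluss.partitioned_t1$lake"));

        // Anything beyond a single "$lake" suffix fails the new length check:
        // "partitioned_t1$lake$snapshots".split("\\$lake") -> ["partitioned_t1", "$snapshots"],
        // so an IllegalArgumentException is thrown and surfaces to the caller wrapped in a
        // CatalogException (see the test added below).
        String[] parts = "partitioned_t1$lake$snapshots".split("\\$lake");

        System.out.println(Arrays.toString(parts));
        System.out.println(flussSpecs.size() + " Fluss vs " + lakeSpecs.size() + " lake partition specs");
    }
}
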
@@ -683,6 +683,14 @@ void testOperatePartitions() throws Exception {
.hasMessage(
"Failed to list partitions of table fluss.partitioned_t1 in test-catalog, by partitionSpec CatalogPartitionSpec{{second=}}");

ObjectPath illegalPartitionTablePath =
new ObjectPath(DEFAULT_DB, "partitioned_t1$lake$snapshots");
assertThatThrownBy(() -> catalog.listPartitions(illegalPartitionTablePath))
.isInstanceOf(CatalogException.class)
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("listPartitions only supports table_name$lake pattern");

// NEW: Test dropPartition functionality
CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
catalog.dropPartition(path2, firstPartSpec, false);
@@ -33,6 +33,8 @@
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
@@ -115,6 +117,22 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);

if (isPartitioned) {
List<CatalogPartitionSpec> lakeCatalogPartitionSpecs =
flinkCatalog.listPartitions(ObjectPath.fromString(t1 + "$lake"));
List<CatalogPartitionSpec> catalogPartitionSpecs =
flinkCatalog.listPartitions(ObjectPath.fromString(t1.toString()));
assertThat(lakeCatalogPartitionSpecs).hasSize(catalogPartitionSpecs.size());
// Iceberg includes bucket specs; check only "p" partition values
List<String> lakePartitions =
lakeCatalogPartitionSpecs.stream()
.map(s -> s.getPartitionSpec().get("p"))
.collect(Collectors.toList());
List<String> flussPartitions =
catalogPartitionSpecs.stream()
.map(s -> s.getPartitionSpec().get("p"))
.collect(Collectors.toList());
assertThat(lakePartitions).containsExactlyInAnyOrderElementsOf(flussPartitions);

// get first partition
String partition = waitUntilPartitions(t1).values().iterator().next();
String sqlWithPartitionFilter =
@@ -19,6 +19,7 @@
package org.apache.fluss.lake.iceberg.flink;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.catalog.FlinkCatalog;
import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;

import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,8 @@

import javax.annotation.Nullable;

import java.util.Collections;

import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;

/** Base class for iceberg union read test. */
@@ -39,6 +42,7 @@ class FlinkUnionReadTestBase extends FlinkIcebergTieringTestBase {
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
StreamTableEnvironment streamTEnv;
FlinkCatalog flinkCatalog;

@BeforeAll
protected static void beforeAll() {
@@ -59,6 +63,7 @@ public void beforeEach() {
batchTEnv.executeSql("use catalog " + CATALOG_NAME);
batchTEnv.executeSql("use " + DEFAULT_DB);
buildStreamTEnv(null);
buildFlinkCatalog();
}

protected StreamTableEnvironment buildStreamTEnv(@Nullable String savepointPath) {
@@ -79,4 +84,16 @@ protected StreamTableEnvironment buildStreamTEnv(@Nullable String savepointPath)
streamTEnv.executeSql("use " + DEFAULT_DB);
return streamTEnv;
}

protected void buildFlinkCatalog() {
String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
flinkCatalog =
new FlinkCatalog(
CATALOG_NAME,
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap());
flinkCatalog.open();
}
}
@@ -27,6 +27,8 @@
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
@@ -109,6 +111,14 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);

if (isPartitioned) {
List<CatalogPartitionSpec> lakeCatalogPartitionSpecs =
flinkCatalog.listPartitions(ObjectPath.fromString(t1 + "$lake"));
List<CatalogPartitionSpec> catalogPartitionSpecs =
flinkCatalog.listPartitions(ObjectPath.fromString(t1.toString()));
assertThat(lakeCatalogPartitionSpecs).hasSize(catalogPartitionSpecs.size());
assertThat(lakeCatalogPartitionSpecs)
.containsExactlyInAnyOrderElementsOf(catalogPartitionSpecs);

// get first partition
String partition = waitUntilPartitions(t1).values().iterator().next();
String sqlWithPartitionFilter =
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon.flink;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.catalog.FlinkCatalog;
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
import org.apache.fluss.server.testutils.FlussClusterExtension;

@@ -30,6 +31,8 @@

import javax.annotation.Nullable;

import java.util.Collections;

import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;

/** Base class for Flink union read test. */
@@ -45,6 +48,7 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
StreamTableEnvironment streamTEnv;
FlinkCatalog flinkCatalog;

@BeforeAll
protected static void beforeAll() {
@@ -56,6 +60,7 @@ public void beforeEach() {
super.beforeEach();
buildBatchTEnv();
buildStreamTEnv();
buildFlinkCatalog();
}

@Override
@@ -98,4 +103,16 @@ public void buildBatchTEnv() {
batchTEnv.executeSql("use catalog " + CATALOG_NAME);
batchTEnv.executeSql("use " + DEFAULT_DB);
}

protected void buildFlinkCatalog() {
String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
flinkCatalog =
new FlinkCatalog(
CATALOG_NAME,
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap());
flinkCatalog.open();
}
}