Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
Expand All @@ -28,7 +27,6 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
Expand All @@ -41,8 +39,9 @@

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
Expand All @@ -66,11 +65,6 @@ public class PaimonLakeCatalog implements LakeCatalog {

private final Catalog paimonCatalog;

// for fluss config
private static final String FLUSS_CONF_PREFIX = "fluss.";
// for paimon config
private static final String PAIMON_CONF_PREFIX = "paimon.";

public PaimonLakeCatalog(Configuration configuration) {
this.paimonCatalog =
CatalogFactory.createCatalog(
Expand All @@ -86,7 +80,7 @@ protected Catalog getPaimonCatalog() {
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
throws TableAlreadyExistException {
// then, create the table
Identifier paimonPath = toPaimonIdentifier(tablePath);
Identifier paimonPath = toPaimon(tablePath);
Schema paimonSchema = toPaimonSchema(tableDescriptor);
try {
createTable(paimonPath, paimonSchema);
Expand All @@ -111,9 +105,8 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
throws TableNotExistException {
try {
Identifier paimonPath = toPaimonIdentifier(tablePath);
List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(tableChanges, this::getFlussPropertyKeyToPaimon);
Identifier paimonPath = toPaimon(tablePath);
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
alterTable(paimonPath, paimonSchemaChanges);
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
// shouldn't happen before we support schema change
Expand Down Expand Up @@ -149,97 +142,6 @@ private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
}
}

private Identifier toPaimonIdentifier(TablePath tablePath) {
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
}

private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
Schema.Builder schemaBuilder = Schema.newBuilder();
Options options = new Options();

// set default properties
setPaimonDefaultProperties(options);

// When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
List<String> bucketKeys = tableDescriptor.getBucketKeys();
if (!bucketKeys.isEmpty()) {
int numBuckets =
tableDescriptor
.getTableDistribution()
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
.orElseThrow(
() ->
new IllegalArgumentException(
"Bucket count should be set."));
options.set(CoreOptions.BUCKET, numBuckets);
options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
} else {
options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
}

// set schema
for (org.apache.fluss.metadata.Schema.Column column :
tableDescriptor.getSchema().getColumns()) {
String columnName = column.getName();
if (SYSTEM_COLUMNS.containsKey(columnName)) {
throw new InvalidTableException(
"Column "
+ columnName
+ " conflicts with a system column name of paimon table, please rename the column.");
}
schemaBuilder.column(
columnName,
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
column.getComment().orElse(null));
}

// add system metadata columns to schema
for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
}

// set pk
if (tableDescriptor.hasPrimaryKey()) {
schemaBuilder.primaryKey(
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
options.set(
CoreOptions.CHANGELOG_PRODUCER.key(),
CoreOptions.ChangelogProducer.INPUT.toString());
}
// set partition keys
schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());

// set properties to paimon schema
tableDescriptor.getProperties().forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
tableDescriptor
.getCustomProperties()
.forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
schemaBuilder.options(options.toMap());
return schemaBuilder.build();
}

private void setPaimonDefaultProperties(Options options) {
// set partition.legacy-name to false, otherwise paimon will use toString for all types,
// which will cause inconsistent partition value for a same binary value
options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
}

private void setFlussPropertyToPaimon(String key, String value, Options options) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
} else {
options.set(FLUSS_CONF_PREFIX + key, value);
}
}

private String getFlussPropertyKeyToPaimon(String key) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
return key.substring(PAIMON_CONF_PREFIX.length());
} else {
return FLUSS_CONF_PREFIX + key;
}
}

@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,52 @@

package org.apache.fluss.lake.paimon.utils;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.Map;
import java.util.Set;

import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
Copy link

Copilot AI Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PaimonConversions now statically depends on PaimonLakeCatalog via SYSTEM_COLUMNS while PaimonLakeCatalog depends on PaimonConversions, creating tight coupling. Consider moving SYSTEM_COLUMNS to a dedicated constants/util class to avoid circular dependencies and improve modularity.

Suggested change
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.PaimonLakeConstants.SYSTEM_COLUMNS;

Copilot uses AI. Check for mistakes.

/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {

// for fluss config
private static final String FLUSS_CONF_PREFIX = "fluss.";
// for paimon config
private static final String PAIMON_CONF_PREFIX = "paimon.";

/** Paimon config options set by Fluss should not be set by users. */
@VisibleForTesting public static final Set<String> PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();

static {
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key());
}

public static RowKind toRowKind(ChangeType changeType) {
switch (changeType) {
case APPEND_ONLY:
Expand Down Expand Up @@ -80,22 +106,21 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
.getFieldOrNull(flussRowAsPaimonRow);
}

public static List<SchemaChange> toPaimonSchemaChanges(
List<TableChange> tableChanges, Function<String, String> optionKeyTransformer) {
public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> tableChanges) {
List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size());

for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.SetOption) {
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
schemaChanges.add(
SchemaChange.setOption(
optionKeyTransformer.apply(setOption.getKey()),
convertFlussPropertyKeyToPaimon(setOption.getKey()),
setOption.getValue()));
} else if (tableChange instanceof TableChange.ResetOption) {
TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange;
schemaChanges.add(
SchemaChange.removeOption(
optionKeyTransformer.apply(resetOption.getKey())));
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
Expand All @@ -104,4 +129,112 @@ public static List<SchemaChange> toPaimonSchemaChanges(

return schemaChanges;
}

public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
// validate paimon options first
validatePaimonOptions(tableDescriptor.getProperties());
validatePaimonOptions(tableDescriptor.getCustomProperties());

Schema.Builder schemaBuilder = Schema.newBuilder();
Options options = new Options();

// set default properties
setPaimonDefaultProperties(options);

// When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
List<String> bucketKeys = tableDescriptor.getBucketKeys();
if (!bucketKeys.isEmpty()) {
int numBuckets =
tableDescriptor
.getTableDistribution()
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
.orElseThrow(
() ->
new IllegalArgumentException(
"Bucket count should be set."));
options.set(CoreOptions.BUCKET, numBuckets);
options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
} else {
options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
}

// set schema
for (org.apache.fluss.metadata.Schema.Column column :
tableDescriptor.getSchema().getColumns()) {
String columnName = column.getName();
if (SYSTEM_COLUMNS.containsKey(columnName)) {
throw new InvalidTableException(
"Column "
+ columnName
+ " conflicts with a system column name of paimon table, please rename the column.");
}
schemaBuilder.column(
columnName,
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
column.getComment().orElse(null));
}

// add system metadata columns to schema
for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
}

// set pk
if (tableDescriptor.hasPrimaryKey()) {
schemaBuilder.primaryKey(
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
options.set(
CoreOptions.CHANGELOG_PRODUCER.key(),
CoreOptions.ChangelogProducer.INPUT.toString());
}
// set partition keys
schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());

// set properties to paimon schema
tableDescriptor.getProperties().forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
tableDescriptor
.getCustomProperties()
.forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
schemaBuilder.options(options.toMap());
return schemaBuilder.build();
}

private static void validatePaimonOptions(Map<String, String> properties) {
properties.forEach(
(k, v) -> {
String paimonKey = k;
if (k.startsWith(PAIMON_CONF_PREFIX)) {
paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
}
if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)) {
throw new InvalidConfigException(
String.format(
"The Paimon option %s will be set automatically by Fluss "
+ "and should not be set manually.",
k));
}
});
}

private static void setPaimonDefaultProperties(Options options) {
// set partition.legacy-name to false, otherwise paimon will use toString for all types,
// which will cause inconsistent partition value for the same binary value
options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
}

private static void setFlussPropertyToPaimon(String key, String value, Options options) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
} else {
options.set(FLUSS_CONF_PREFIX + key, value);
}
}

private static String convertFlussPropertyKeyToPaimon(String key) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
return key.substring(PAIMON_CONF_PREFIX.length());
} else {
return FLUSS_CONF_PREFIX + key;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
Expand Down Expand Up @@ -370,6 +371,36 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
BUCKET_NUM);
}

@Test
void testCreateLakeEnableTableWithUnsettablePaimonOptions() {
Map<String, String> customProperties = new HashMap<>();

for (String key : PAIMON_UNSETTABLE_OPTIONS) {
customProperties.clear();
customProperties.put(key, "v");

TableDescriptor table =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("c1", DataTypes.INT())
.column("c2", DataTypes.STRING())
.build())
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
.customProperties(customProperties)
.distributedBy(BUCKET_NUM, "c1", "c2")
.build();
TablePath tablePath = TablePath.of(DATABASE, "table_unsettable_paimon_option");
assertThatThrownBy(() -> admin.createTable(tablePath, table, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage(
String.format(
"The Paimon option %s will be set automatically by Fluss and should not be set manually.",
key));
}
}

@Test
void testAlterLakeEnabledLogTable() throws Exception {
Map<String, String> customProperties = new HashMap<>();
Expand Down