diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 03ffd0cdc9..b11d5adf2f 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -19,7 +19,6 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.lake.lakestorage.LakeCatalog; @@ -28,7 +27,6 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.utils.IOUtils; -import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -41,8 +39,9 @@ import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; @@ -66,11 +65,6 @@ public class PaimonLakeCatalog implements LakeCatalog { private final Catalog paimonCatalog; - // for fluss config - private static final String FLUSS_CONF_PREFIX = "fluss."; - // for paimon config - private static final String PAIMON_CONF_PREFIX = "paimon."; - public PaimonLakeCatalog(Configuration configuration) { this.paimonCatalog = CatalogFactory.createCatalog( @@ -86,7 +80,7 @@ protected Catalog getPaimonCatalog() { public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) throws TableAlreadyExistException { // then, create the table - Identifier paimonPath = toPaimonIdentifier(tablePath); + Identifier paimonPath = toPaimon(tablePath); Schema paimonSchema = toPaimonSchema(tableDescriptor); try { createTable(paimonPath, paimonSchema); @@ -111,9 +105,8 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) public void alterTable(TablePath tablePath, List tableChanges) throws TableNotExistException { try { - Identifier paimonPath = toPaimonIdentifier(tablePath); - List paimonSchemaChanges = - toPaimonSchemaChanges(tableChanges, this::getFlussPropertyKeyToPaimon); + Identifier paimonPath = toPaimon(tablePath); + List paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); alterTable(paimonPath, paimonSchemaChanges); } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { // shouldn't happen before we support schema change @@ -149,97 +142,6 @@ private void alterTable(Identifier tablePath, List tableChanges) } } - private Identifier toPaimonIdentifier(TablePath tablePath) { - return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); - } - - private Schema toPaimonSchema(TableDescriptor tableDescriptor) { - Schema.Builder schemaBuilder = Schema.newBuilder(); - Options options = new Options(); - - // set default properties - setPaimonDefaultProperties(options); - - // When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode. - List bucketKeys = tableDescriptor.getBucketKeys(); - if (!bucketKeys.isEmpty()) { - int numBuckets = - tableDescriptor - .getTableDistribution() - .flatMap(TableDescriptor.TableDistribution::getBucketCount) - .orElseThrow( - () -> - new IllegalArgumentException( - "Bucket count should be set.")); - options.set(CoreOptions.BUCKET, numBuckets); - options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys)); - } else { - options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue()); - } - - // set schema - for (org.apache.fluss.metadata.Schema.Column column : - tableDescriptor.getSchema().getColumns()) { - String columnName = column.getName(); - if (SYSTEM_COLUMNS.containsKey(columnName)) { - throw new InvalidTableException( - "Column " - + columnName - + " conflicts with a system column name of paimon table, please rename the column."); - } - schemaBuilder.column( - columnName, - column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE), - column.getComment().orElse(null)); - } - - // add system metadata columns to schema - for (Map.Entry systemColumn : SYSTEM_COLUMNS.entrySet()) { - schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue()); - } - - // set pk - if (tableDescriptor.hasPrimaryKey()) { - schemaBuilder.primaryKey( - tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames()); - options.set( - CoreOptions.CHANGELOG_PRODUCER.key(), - CoreOptions.ChangelogProducer.INPUT.toString()); - } - // set partition keys - schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys()); - - // set properties to paimon schema - tableDescriptor.getProperties().forEach((k, v) -> setFlussPropertyToPaimon(k, v, options)); - tableDescriptor - .getCustomProperties() - .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options)); - schemaBuilder.options(options.toMap()); - return schemaBuilder.build(); - } - - private void setPaimonDefaultProperties(Options options) { - // set partition.legacy-name to false, otherwise paimon will use toString for all types, - // which will cause inconsistent partition value for a same binary value - options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false); - } - - private void setFlussPropertyToPaimon(String key, String value, Options options) { - if (key.startsWith(PAIMON_CONF_PREFIX)) { - options.set(key.substring(PAIMON_CONF_PREFIX.length()), value); - } else { - options.set(FLUSS_CONF_PREFIX + key, value); - } - } - - private String getFlussPropertyKeyToPaimon(String key) { - if (key.startsWith(PAIMON_CONF_PREFIX)) { - return key.substring(PAIMON_CONF_PREFIX.length()); - } else { - return FLUSS_CONF_PREFIX + key; - } - } - @Override public void close() { IOUtils.closeQuietly(paimonCatalog, "paimon catalog"); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index a9491659f1..2c42c471ee 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -17,26 +17,52 @@ package org.apache.fluss.lake.paimon.utils; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType; import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow; import org.apache.fluss.metadata.TableChange; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.ChangeType; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.function.Function; +import java.util.Map; +import java.util.Set; + +import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS; /** Utils for conversion between Paimon and Fluss. */ public class PaimonConversions { + // for fluss config + private static final String FLUSS_CONF_PREFIX = "fluss."; + // for paimon config + private static final String PAIMON_CONF_PREFIX = "paimon."; + + /** Paimon config options set by Fluss should not be set by users. */ + @VisibleForTesting public static final Set PAIMON_UNSETTABLE_OPTIONS = new HashSet<>(); + + static { + PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key()); + PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key()); + PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PARTITION_GENERATE_LEGCY_NAME.key()); + } + public static RowKind toRowKind(ChangeType changeType) { switch (changeType) { case APPEND_ONLY: @@ -80,8 +106,7 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) { .getFieldOrNull(flussRowAsPaimonRow); } - public static List toPaimonSchemaChanges( - List tableChanges, Function optionKeyTransformer) { + public static List toPaimonSchemaChanges(List tableChanges) { List schemaChanges = new ArrayList<>(tableChanges.size()); for (TableChange tableChange : tableChanges) { @@ -89,13 +114,13 @@ public static List toPaimonSchemaChanges( TableChange.SetOption setOption = (TableChange.SetOption) tableChange; schemaChanges.add( SchemaChange.setOption( - optionKeyTransformer.apply(setOption.getKey()), + convertFlussPropertyKeyToPaimon(setOption.getKey()), setOption.getValue())); } else if (tableChange instanceof TableChange.ResetOption) { TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange; schemaChanges.add( SchemaChange.removeOption( - optionKeyTransformer.apply(resetOption.getKey()))); + convertFlussPropertyKeyToPaimon(resetOption.getKey()))); } else { throw new UnsupportedOperationException( "Unsupported table change: " + tableChange.getClass()); @@ -104,4 +129,112 @@ public static List toPaimonSchemaChanges( return schemaChanges; } + + public static Schema toPaimonSchema(TableDescriptor tableDescriptor) { + // validate paimon options first + validatePaimonOptions(tableDescriptor.getProperties()); + validatePaimonOptions(tableDescriptor.getCustomProperties()); + + Schema.Builder schemaBuilder = Schema.newBuilder(); + Options options = new Options(); + + // set default properties + setPaimonDefaultProperties(options); + + // When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode. + List bucketKeys = tableDescriptor.getBucketKeys(); + if (!bucketKeys.isEmpty()) { + int numBuckets = + tableDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount) + .orElseThrow( + () -> + new IllegalArgumentException( + "Bucket count should be set.")); + options.set(CoreOptions.BUCKET, numBuckets); + options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys)); + } else { + options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue()); + } + + // set schema + for (org.apache.fluss.metadata.Schema.Column column : + tableDescriptor.getSchema().getColumns()) { + String columnName = column.getName(); + if (SYSTEM_COLUMNS.containsKey(columnName)) { + throw new InvalidTableException( + "Column " + + columnName + + " conflicts with a system column name of paimon table, please rename the column."); + } + schemaBuilder.column( + columnName, + column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE), + column.getComment().orElse(null)); + } + + // add system metadata columns to schema + for (Map.Entry systemColumn : SYSTEM_COLUMNS.entrySet()) { + schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue()); + } + + // set pk + if (tableDescriptor.hasPrimaryKey()) { + schemaBuilder.primaryKey( + tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames()); + options.set( + CoreOptions.CHANGELOG_PRODUCER.key(), + CoreOptions.ChangelogProducer.INPUT.toString()); + } + // set partition keys + schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys()); + + // set properties to paimon schema + tableDescriptor.getProperties().forEach((k, v) -> setFlussPropertyToPaimon(k, v, options)); + tableDescriptor + .getCustomProperties() + .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options)); + schemaBuilder.options(options.toMap()); + return schemaBuilder.build(); + } + + private static void validatePaimonOptions(Map properties) { + properties.forEach( + (k, v) -> { + String paimonKey = k; + if (k.startsWith(PAIMON_CONF_PREFIX)) { + paimonKey = k.substring(PAIMON_CONF_PREFIX.length()); + } + if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)) { + throw new InvalidConfigException( + String.format( + "The Paimon option %s will be set automatically by Fluss " + + "and should not be set manually.", + k)); + } + }); + } + + private static void setPaimonDefaultProperties(Options options) { + // set partition.legacy-name to false, otherwise paimon will use toString for all types, + // which will cause inconsistent partition value for the same binary value + options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false); + } + + private static void setFlussPropertyToPaimon(String key, String value, Options options) { + if (key.startsWith(PAIMON_CONF_PREFIX)) { + options.set(key.substring(PAIMON_CONF_PREFIX.length()), value); + } else { + options.set(FLUSS_CONF_PREFIX + key, value); + } + } + + private static String convertFlussPropertyKeyToPaimon(String key) { + if (key.startsWith(PAIMON_CONF_PREFIX)) { + return key.substring(PAIMON_CONF_PREFIX.length()); + } else { + return FLUSS_CONF_PREFIX + key; + } + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 99f33b05ee..37033ea74f 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.stream.Stream; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -370,6 +371,36 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception { BUCKET_NUM); } + @Test + void testCreateLakeEnableTableWithUnsettablePaimonOptions() { + Map customProperties = new HashMap<>(); + + for (String key : PAIMON_UNSETTABLE_OPTIONS) { + customProperties.clear(); + customProperties.put(key, "v"); + + TableDescriptor table = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + TablePath tablePath = TablePath.of(DATABASE, "table_unsettable_paimon_option"); + assertThatThrownBy(() -> admin.createTable(tablePath, table, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessage( + String.format( + "The Paimon option %s will be set automatically by Fluss and should not be set manually.", + key)); + } + } + @Test void testAlterLakeEnabledLogTable() throws Exception { Map customProperties = new HashMap<>();