From 9c21f26a339ece9782f5ad983418a759f583b332 Mon Sep 17 00:00:00 2001 From: arjit3251 Date: Mon, 22 Sep 2025 22:04:44 +0530 Subject: [PATCH] Evolve iceberg table schema for partition copy --- .../IcebergOverwritePartitionsStep.java | 7 +- .../copy/iceberg/IcebergPartitionDataset.java | 1 + .../IcebergPartitionDatasetFinder.java | 29 +++ .../management/copy/iceberg/IcebergTable.java | 39 +++- .../IcebergTableMetadataValidatorUtils.java | 28 +-- .../IcebergOverwritePartitionsStepTest.java | 23 ++- .../IcebergPartitionDatasetFinderTest.java | 171 ++++++++++++++++++ .../copy/iceberg/IcebergTableTest.java | 6 +- 8 files changed, 257 insertions(+), 47 deletions(-) create mode 100644 gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinderTest.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java index 30dc2094f34..381e6dbceaa 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; import com.github.rholder.retry.Attempt; @@ -57,6 +58,7 @@ @Slf4j public class IcebergOverwritePartitionsStep implements CommitStep { private final String destTableIdStr; + private final Schema updatedSchema; private final Properties properties; // data files are populated once all the copy tasks are done. 
Each IcebergPartitionCopyableFile has a serialized data file @Setter private List dataFiles; @@ -75,8 +77,9 @@ public class IcebergOverwritePartitionsStep implements CommitStep { * @param destTableIdStr the identifier of the destination table as a string * @param properties the properties containing configuration */ - public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) { + public IcebergOverwritePartitionsStep(String destTableIdStr, Schema updatedSchema, String partitionColName, String partitionValue, Properties properties) { this.destTableIdStr = destTableIdStr; + this.updatedSchema = updatedSchema; this.partitionColName = partitionColName; this.partitionValue = partitionValue; this.properties = properties; @@ -110,7 +113,7 @@ public void execute() throws IOException { ); Retryer overwritePartitionsRetryer = createOverwritePartitionsRetryer(); overwritePartitionsRetryer.call(() -> { - destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue); + destTable.overwriteSchemaAndPartition(dataFiles, updatedSchema, this.partitionColName, this.partitionValue); return null; }); log.info("~{}~ Successful partition overwrite - partition: {}; value: {}", diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java index 5600b47f08f..7ea3e8bd16e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java @@ -194,6 +194,7 @@ private Path addUUIDToPath(String filePathStr) { private PostPublishStep createOverwritePostPublishStep() { IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep( this.getDestIcebergTable().getTableId().toString(), + this.getSrcIcebergTable().getUnderlyingIcebergTable().schema(), this.partitionColumnName, this.partitionColValue, this.properties diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java index 3d3e5c19178..1c833269d7b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -22,6 +22,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateSchema; import com.google.common.base.Preconditions; @@ -53,6 +56,8 @@ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, Ice boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY, DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY)); + validateSchemaEvolution(srcIcebergTable.getUnderlyingIcebergTable(), destIcebergTable.getUnderlyingIcebergTable()); + IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure( srcIcebergTable.accessTableMetadata(), 
destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality); @@ -69,4 +74,28 @@ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, Ice return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties), partitionColumnName, partitionColumnValue); } + + protected static void validateSchemaEvolution(Table tableA, Table tableB) { + Schema schemaA = tableA.schema(); + Schema schemaB = tableB.schema(); + + if (schemaA.sameSchema(schemaB)) { + log.info("Schemas are identical for source & destination tables"); + return; + } + // Take all fields from schemaA and add them to schemaB. If the resulting schema is the same as schemaA, + // then schema evolution is possible, because schemaB may be a subset of schemaA after the source has evolved. + // TODO: unionByNameWith does not handle addition of a new required column in schemaA. + UpdateSchema updateSchemaTxn = tableB.updateSchema().unionByNameWith(schemaA); + Schema updatedSchema = updateSchemaTxn.apply(); + + if (updatedSchema.sameSchema(schemaA)) { + log.info("Schema evolution supported for source & destination tables; schemas: {} and {}", + schemaA, schemaB); + } else { + String errMsg = String.format("Schema evolution not supported between schemas: %s and %s", + schemaA, schemaB); + throw new RuntimeException(errMsg); + } + } } \ No newline at end of file
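Aside on the check above: unionByNameWith merges the source schema into the destination schema by name, so validateSchemaEvolution is direction-sensitive: a destination whose schema is a subset of the source's passes, while a destination holding columns the source lacks fails. A minimal sketch, assuming hypothetical srcTable/destTable handles and Iceberg's UpdateSchema.unionByNameWith(Schema) and Schema.sameSchema(Schema) semantics:

    // src: (id required string, data optional string); dest: (id required string)
    Schema srcSchema = srcTable.schema();
    Schema unioned = destTable.updateSchema()
        .unionByNameWith(srcSchema)
        .apply();                                       // preview only; nothing is committed
    boolean evolvable = unioned.sameSchema(srcSchema);  // true: dest is a subset of src
    // With the roles swapped, the union keeps the extra destination column, so
    // sameSchema(...) is false and validateSchemaEvolution throws.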
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index fab570120bf..367ec3a24d3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -36,11 +36,14 @@ import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestReader; import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -102,6 +105,10 @@ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException { return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current)); } + public Table getUnderlyingIcebergTable() { + return table; + } + /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */ public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException { TableMetadata current = accessTableMetadata(); @@ -292,7 +299,7 @@ public List getPartitionSpecificDataFiles(Predicate iceber * @param partitionColName the partition column name whose data files are to be replaced * @param partitionValue the partition column value on which data files will be replaced */ - protected void overwritePartition(List dataFiles, String partitionColName, String partitionValue) + protected void overwriteSchemaAndPartition(List dataFiles, Schema updatedSchema, String partitionColName, String partitionValue) throws TableNotFoundException { if (dataFiles.isEmpty()) { return; } @@ -304,10 +311,10 @@ protected void overwritePartition(List dataFiles, String partitionColN } else { log.warn("~{}~ No current snapshot found before overwrite", tableId); } - OverwriteFiles overwriteFiles = this.table.newOverwrite(); - overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); - dataFiles.forEach(overwriteFiles::addFile); - overwriteFiles.commit(); + Transaction transaction = this.table.newTransaction(); + updateSchema(transaction, updatedSchema); + overwritePartition(transaction, dataFiles, partitionColName, partitionValue); + transaction.commitTransaction(); this.tableOps.refresh(); // Note : this would only arise in a high-frequency commit scenario, but there's no guarantee that the current // snapshot is necessarily the one from the commit just before. another writer could have just raced to commit @@ -315,4 +322,30 @@ protected void overwritePartition(List dataFiles, String partitionColN log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId()); } + private void updateSchema(Transaction transaction, Schema updatedSchema) { + if (updatedSchema.sameSchema(table.schema())) { + log.info("~{}~ No schema update needed; existing schema is identical", tableId); + return; + } + UpdateSchema updateSchema = transaction.updateSchema().unionByNameWith(updatedSchema); + Schema modifiedSchema = updateSchema.apply(); + if (modifiedSchema.sameSchema(updatedSchema)) { + // stage the schema change in the transaction; commitTransaction() applies it + updateSchema.commit(); + log.info("~{}~ Updated schema to: {}", tableId, modifiedSchema); + } else { + log.warn("~{}~ Post-update schema differs from requested update; existing: {}, requested: {}, actual: {}", + tableId, table.schema(), updatedSchema, modifiedSchema); + throw new RuntimeException("Failed to update schema as requested"); + } + } + + private void overwritePartition(Transaction transaction, List dataFiles, String partitionColName, + String partitionValue) { + OverwriteFiles overwriteFiles = transaction.newOverwrite(); + overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); + dataFiles.forEach(overwriteFiles::addFile); + // stage the overwrite in the transaction; commitTransaction() applies it + overwriteFiles.commit(); + } }
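For reference, overwriteSchemaAndPartition follows Iceberg's multi-operation transaction pattern: each operation opened from a Transaction stages its changes with its own commit(), and commitTransaction() then applies everything in a single atomic table commit, so readers never observe the schema change without the rewritten partition or vice versa. A compact sketch of the same flow, with hypothetical table, evolvedSchema, and newDataFiles handles (the column name and value mirror the test data):

    Transaction txn = table.newTransaction();
    // stage a by-name schema union (a no-op if the schemas already match)
    txn.updateSchema().unionByNameWith(evolvedSchema).commit();
    // stage a partition overwrite: drop rows matching the filter, add the new files
    OverwriteFiles overwrite = txn.newOverwrite();
    overwrite.overwriteByRowFilter(Expressions.equal("id", "2"));
    newDataFiles.forEach(overwrite::addFile);
    overwrite.commit();
    // nothing is visible to readers until this single atomic commit
    txn.commitTransaction();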
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java index 7c47f2dfb81..90dc90c4120 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java @@ -20,7 +20,6 @@ import java.io.IOException; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import lombok.extern.slf4j.Slf4j; @@ -36,11 +35,7 @@ private IcebergTableMetadataValidatorUtils() { } /** - * Compares the metadata of the given two iceberg tables. - * <ul> - *   <li>First compares the schema of the metadata.</li> - *   <li>Then compares the partition spec of the metadata.</li> - * </ul>
+ * Compares the partition spec of the metadata. * @param tableMetadataA the metadata of the first table * @param tableMetadataB the metadata of the second table * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison @@ -52,27 +47,6 @@ public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA, tableMetadataA.metadataFileLocation(), tableMetadataB.metadataFileLocation()); - Schema schemaA = tableMetadataA.schema(); - Schema schemaB = tableMetadataB.schema(); - // TODO: Need to add support for schema evolution - // This check needs to be broken down into multiple checks to support schema evolution - // Possible cases - schemaA == schemaB, - // - schemaA is subset of schemaB [ schemaB Evolved ], - // - schemaA is superset of schemaB [ schemaA Evolved ], - // - Other cases? - // Also consider using Strategy or any other design pattern for this to make it a better solution - if (!schemaA.sameSchema(schemaB)) { - String errMsg = String.format( - "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}", - tableMetadataA.metadataFileLocation(), - schemaA.schemaId(), - tableMetadataB.metadataFileLocation(), - schemaB.schemaId() - ); - log.error(errMsg); - throw new IOException(errMsg); - } - PartitionSpec partitionSpecA = tableMetadataA.spec(); PartitionSpec partitionSpecB = tableMetadataB.spec(); // .compatibleWith() doesn't match for specId of partition spec and fieldId of partition fields while .equals() does diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java index e4c1774e9aa..67c158585e2 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; import org.mockito.Mockito; import org.testng.Assert; @@ -40,6 +41,7 @@ public class IcebergOverwritePartitionsStepTest { private final String testPartitionColName = "testPartition"; private final String testPartitionColValue = "testValue"; private IcebergTable mockIcebergTable; + private Schema mockSchema; private IcebergCatalog mockIcebergCatalog; private Properties mockProperties; private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep; @@ -47,10 +49,11 @@ public class IcebergOverwritePartitionsStepTest { @BeforeMethod public void setUp() throws IOException { mockIcebergTable = Mockito.mock(IcebergTable.class); + mockSchema = Mockito.mock(Schema.class); mockIcebergCatalog = Mockito.mock(IcebergCatalog.class); mockProperties = new Properties(); - spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, + spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema, testPartitionColName, testPartitionColValue, mockProperties)); spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles()); @@ -67,10 +70,10 @@ public void testNeverIsCompleted() { @Test public void testExecute() { try { - 
Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(), + Mockito.doNothing().when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); spyIcebergOverwritePartitionsStep.execute(); - Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(1)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); } catch (IOException e) { Assert.fail(String.format("Unexpected IOException : %s", e)); @@ -81,10 +84,10 @@ public void testExecute() { public void testExecuteWithRetry() { try { // first call throw exception which will be retried and on second call nothing happens - Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); spyIcebergOverwritePartitionsStep.execute(); - Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(2)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); } catch (IOException e) { Assert.fail(String.format("Unexpected IOException : %s", e)); @@ -95,11 +98,11 @@ public void testExecuteWithRetry() { public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException { try { // Always throw exception - Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); spyIcebergOverwritePartitionsStep.execute(); } catch (RuntimeException e) { - Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(3)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); assertRetryTimes(e, 3); } catch (IOException e) { @@ -112,7 +115,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException { int retryCount = 7; mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." 
+ RETRY_TIMES, Integer.toString(retryCount)); - spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, + spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema, testPartitionColName, testPartitionColValue, mockProperties)); spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles()); @@ -121,11 +124,11 @@ public void testExecuteWithCustomRetryConfig() throws IOException { Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog(); try { // Always throw exception - Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(), + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); spyIcebergOverwritePartitionsStep.execute(); } catch (RuntimeException e) { - Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(), + Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(), Mockito.anyString()); assertRetryTimes(e, retryCount); } catch (IOException e) { diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinderTest.java new file mode 100644 index 00000000000..ac66b13fcfa --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinderTest.java @@ -0,0 +1,171 @@ +package org.apache.gobblin.data.management.copy.iceberg; + +import java.util.Collections; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hive.HiveMetastoreTest; +import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; +import org.junit.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit tests for {@link IcebergPartitionDatasetFinder}. 
+ */ public class IcebergPartitionDatasetFinderTest extends HiveMetastoreTest { + + // schema1 -> id (string, required) + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 = + SchemaBuilder.record("test") + .fields() + .name("id") + .type() + .stringType() + .noDefault() + .endRecord(); + + // schema2 -> id (string, required), data (string, optional, default = "default") + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 = + SchemaBuilder.record("test") + .fields() + .name("id") + .type() + .stringType() + .noDefault() + .name("data") + .type() + .stringType() + .stringDefault("default") + .endRecord(); + + // schema3 -> id (string, required), details (string, optional, default = "default") + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 = + SchemaBuilder.record("test") + .fields() + .name("id") + .type() + .stringType() + .noDefault() + .name("details") + .type() + .stringType() + .stringDefault("default") + .endRecord(); + + // schema4 -> id (string, optional) + private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 = + SchemaBuilder.record("test") + .fields() + .name("id") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord(); + + private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1); + private static final Schema schema2 = AvroSchemaUtil.toIceberg(avroDataSchema2); + private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3); + private static final Schema schema4 = AvroSchemaUtil.toIceberg(avroDataSchema4); + + private static final PartitionSpec icebergPartitionSpec = PartitionSpec.builderFor(schema1) + .identity("id") + .build(); + + private final String dbName = "myicebergdb"; + private final String tableName1 = "table1"; + private final String tableName2 = "table2"; + private final String tableName3 = "table3"; + private final String tableName4 = "table4"; + private TableIdentifier tableId1 = TableIdentifier.of(dbName, tableName1); + private TableIdentifier tableId2 = TableIdentifier.of(dbName, tableName2); + private TableIdentifier tableId3 = TableIdentifier.of(dbName, tableName3); + private TableIdentifier tableId4 = TableIdentifier.of(dbName, tableName4); + private Table table1; + private Table table2; + private Table table3; + private Table table4; + + @BeforeClass + public void setUp() throws Exception { + startMetastore(); + catalog.createNamespace(Namespace.of(dbName)); + } + + @BeforeMethod + public void setUpEachTest() { + table1 = catalog.createTable(tableId1, schema1, icebergPartitionSpec, Collections.singletonMap("format-version", "2")); + table2 = catalog.createTable(tableId2, schema2, icebergPartitionSpec, 
Collections.singletonMap("format-version", "2")); + table3 = catalog.createTable(tableId3, schema3, icebergPartitionSpec, Collections.singletonMap("format-version", "2")); + table4 = catalog.createTable(tableId4, schema4, icebergPartitionSpec, Collections.singletonMap("format-version", "2")); + } + + @AfterMethod + public void cleanUpEachTest() { + catalog.dropTable(tableId1); + catalog.dropTable(tableId2); + catalog.dropTable(tableId3); + catalog.dropTable(tableId4); + } + + @Test + public void validateSchemaEvolutionFailureTest() { + try { + IcebergPartitionDatasetFinder.validateSchemaEvolution(table1, table2); + Assert.fail("Expected RuntimeException was not thrown"); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Schema evolution not supported")); + } + + try { + IcebergPartitionDatasetFinder.validateSchemaEvolution(table1, table3); + Assert.fail("Expected RuntimeException was not thrown"); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Schema evolution not supported")); + } + + try { + IcebergPartitionDatasetFinder.validateSchemaEvolution(table2, table3); + Assert.fail("Expected RuntimeException was not thrown"); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().contains("Schema evolution not supported")); + } + } + + @Test + public void validateSchemaEvolutionSuccessTest() { + IcebergPartitionDatasetFinder.validateSchemaEvolution(table2, table1); + IcebergPartitionDatasetFinder.validateSchemaEvolution(table3, table1); + IcebergPartitionDatasetFinder.validateSchemaEvolution(table4, table1); + IcebergPartitionDatasetFinder.validateSchemaEvolution(table1, table4); + } +} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 90ba02b8489..d920ecbe19b 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -321,7 +321,7 @@ public void testGetPartitionSpecificDataFiles() throws IOException { /** Verify that overwritePartition replace data files belonging to given partition col and value */ @Test - public void testOverwritePartition() throws IOException { + public void testOverwriteSchemaAndPartition() throws IOException { // Note - any specific file path format is not mandatory to be mapped to specific partition List paths = Arrays.asList( "/path/tableName/data/id=1/file1.orc", @@ -351,7 +351,7 @@ public void testOverwritePartition() throws IOException { List partition2DataFiles = createDataFiles(paths2.stream().collect(Collectors.toMap(Function.identity(), v -> partition2Data))); // here, since partition data with value 2 doesn't exist yet, // we expect it to get added to the table, w/o changing or deleting any other partitions - icebergTable.overwritePartition(partition2DataFiles, "id", "2"); + icebergTable.overwriteSchemaAndPartition(partition2DataFiles, icebergTable.getUnderlyingIcebergTable().schema(), "id", "2"); List expectedPaths2 = new ArrayList<>(paths); expectedPaths2.addAll(paths2); verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); @@ -363,7 +363,7 @@ public void testOverwritePartition() throws IOException { // Reusing same partition data to create data file with 
different paths List partition1NewDataFiles = createDataFiles(paths3.stream().collect(Collectors.toMap(Function.identity(), v -> partition1Data))); // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path - icebergTable.overwritePartition(partition1NewDataFiles, "id", "1"); + icebergTable.overwriteSchemaAndPartition(partition1NewDataFiles, icebergTable.getUnderlyingIcebergTable().schema(), "id", "1"); List expectedPaths3 = new ArrayList<>(paths2); expectedPaths3.addAll(paths3); verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
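A note on the retry wiring exercised by testExecuteWithCustomRetryConfig above: the step builds its Retryer from properties scoped under OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX. A minimal sketch of that configuration from calling code, assuming RETRY_TIMES is the statically imported constant from Gobblin's RetryerFactory (as in the test) and with placeholder table id, schema, partition column, and data files:

    Properties props = new Properties();
    // retry the partition overwrite up to 7 times before propagating the failure
    props.setProperty(
        IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
        Integer.toString(7));
    IcebergOverwritePartitionsStep step =
        new IcebergOverwritePartitionsStep("db.dest_table", srcSchema, "id", "2", props);
    step.setDataFiles(serializedDataFiles);
    step.execute();  // throws IOException once retries are exhausted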
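For readers less familiar with the shaded Avro SchemaBuilder used in IcebergPartitionDatasetFinderTest, the same shapes can be written directly against Iceberg's type API. A rough equivalent of schema1 and schema2 above, with the caveat that the field ids here are illustrative (AvroSchemaUtil.toIceberg assigns its own ids) and the optionality of the defaulted column follows the test's own comments:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    // schema1: id only
    Schema subsetSchema = new Schema(
        Types.NestedField.required(1, "id", Types.StringType.get()));
    // schema2: id plus a column added by evolution
    Schema evolvedSchema = new Schema(
        Types.NestedField.required(1, "id", Types.StringType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));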