@@ -26,6 +26,7 @@
 import java.util.concurrent.ExecutionException;
 
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 import com.github.rholder.retry.Attempt;
@@ -57,6 +58,7 @@
 @Slf4j
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
+  private final Schema updatedSchema;
   private final Properties properties;
   // data files are populated once all the copy tasks are done; each IcebergPartitionCopyableFile has a serialized data file
   @Setter private List<DataFile> dataFiles;
@@ -75,8 +77,10 @@ public class IcebergOverwritePartitionsStep implements CommitStep {
    * @param destTableIdStr the identifier of the destination table as a string
+   * @param updatedSchema the source table schema to union into the destination schema before the overwrite
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, Schema updatedSchema, String partitionColName, String partitionValue, Properties properties) {
     this.destTableIdStr = destTableIdStr;
+    this.updatedSchema = updatedSchema;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
     this.properties = properties;
@@ -110,7 +114,7 @@ public void execute() throws IOException {
     );
     Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
     overwritePartitionsRetryer.call(() -> {
-      destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
+      destTable.overwriteSchemaAndPartition(dataFiles, updatedSchema, this.partitionColName, this.partitionValue);
       return null;
     });
     log.info("~{}~ Successful partition overwrite - partition: {}; value: {}",
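For reference alongside the retry loop above, here is a minimal sketch of a guava-retrying `Retryer` like the one `createOverwritePartitionsRetryer()` presumably builds. The class name `OverwriteRetrySketch` and the fixed three-attempt stop strategy are assumptions for illustration; the real step derives its stop strategy from `Properties` (see the RETRY_TIMES test near the end of this PR).

```java
import java.util.concurrent.Callable;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;

public class OverwriteRetrySketch {
  public static void main(String[] args) throws Exception {
    // Retry on any exception; give up after three attempts (the default the tests below expect).
    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
        .retryIfException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
        .build();
    // Stand-in for destTable.overwriteSchemaAndPartition(dataFiles, updatedSchema, col, value)
    Callable<Void> overwrite = () -> {
      System.out.println("attempting overwrite");
      return null;
    };
    try {
      retryer.call(overwrite);
    } catch (RetryException e) {
      // All attempts exhausted; surface the cause of the final attempt.
      throw new RuntimeException(e.getLastFailedAttempt().getExceptionCause());
    }
  }
}
```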
@@ -194,6 +194,7 @@ private Path addUUIDToPath(String filePathStr) {
   private PostPublishStep createOverwritePostPublishStep() {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
+        this.getSrcIcebergTable().getUnderlyingIcebergTable().schema(),
         this.partitionColumnName,
         this.partitionColValue,
         this.properties
@@ -22,6 +22,9 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateSchema;
 
 import com.google.common.base.Preconditions;
 
@@ -53,6 +56,8 @@ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, Ice
     boolean validateStrictPartitionEquality = Boolean.parseBoolean(properties.getProperty(ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY,
         DEFAULT_ICEBERG_DATASET_PARTITION_VALIDATE_STRICT_EQUALITY));
 
+    validateSchemaEvolution(srcIcebergTable.getUnderlyingIcebergTable(), destIcebergTable.getUnderlyingIcebergTable());
+
     IcebergTableMetadataValidatorUtils.failUnlessCompatibleStructure(
         srcIcebergTable.accessTableMetadata(), destIcebergTable.accessTableMetadata(), validateStrictPartitionEquality);
 
@@ -69,4 +74,28 @@ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, Ice
     return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs,
         getConfigShouldCopyMetadataPath(properties), partitionColumnName, partitionColumnValue);
   }
+
+  protected static void validateSchemaEvolution(Table tableA, Table tableB) {
+    Schema schemaA = tableA.schema();
+    Schema schemaB = tableB.schema();
+
+    if (schemaA.sameSchema(schemaB)) {
+      log.info("Schemas are identical for source & destination tables");
+      return;
+    }
+    // Union all fields of schemaA into schemaB; if the result equals schemaA, evolution is possible,
+    // since schemaB may be a subset of schemaA (a destination lagging behind an evolved source). See the sketch after this diff.
+    // TODO: unionByNameWith does not handle addition of a new required column in schemaA.
+    UpdateSchema updateSchemaTxn = tableB.updateSchema().unionByNameWith(schemaA);
+    Schema updatedSchema = updateSchemaTxn.apply();
+
+    if (updatedSchema.sameSchema(schemaA)) {
+      log.info("Schema evolution is supported between source & destination tables. Schemas: {} and {}",
+          schemaA, schemaB);
+    } else {
+      String errMsg = String.format("Schema evolution not supported between schemas: %s and %s",
+          schemaA, schemaB);
+      throw new RuntimeException(errMsg);
+    }
+  }
 }
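To make the union-by-name check above concrete, here is a self-contained sketch of the same probe. The helper name `canEvolveTo` is hypothetical; the Iceberg calls (`Table#updateSchema`, `UpdateSchema#unionByNameWith`, `UpdateSchema#apply`, `Schema#sameSchema`) are the ones the new method uses.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

public class SchemaEvolutionProbeSketch {

  /** True when destTable's schema equals srcSchema, or is a subset evolvable to it by union-by-name. */
  static boolean canEvolveTo(Table destTable, Schema srcSchema) {
    Schema destSchema = destTable.schema();
    if (destSchema.sameSchema(srcSchema)) {
      return true; // identical, nothing to evolve
    }
    // apply() only computes the merged schema; nothing is committed to the table here.
    Schema merged = destTable.updateSchema().unionByNameWith(srcSchema).apply();
    // If merging the source into the destination reproduces the source exactly, the
    // destination is a strict subset and can be evolved. Per the TODO above, a new
    // *required* column on the source side is not handled by unionByNameWith.
    return merged.sameSchema(srcSchema);
  }
}
```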
@@ -36,11 +36,14 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
@@ -102,6 +105,10 @@ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
     return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current));
   }
 
+  public Table getUnderlyingIcebergTable() {
+    return table;
+  }
+
   /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */
   public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException {
     TableMetadata current = accessTableMetadata();
@@ -292,7 +299,8 @@ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> iceber
+   * @param updatedSchema the schema to union into the destination table's schema within the same transaction
    * @param partitionColName the partition column name whose data files are to be replaced
    * @param partitionValue the partition column value on which data files will be replaced
    */
-  protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+  protected void overwriteSchemaAndPartition(List<DataFile> dataFiles, Schema updatedSchema, String partitionColName, String partitionValue)
       throws TableNotFoundException {
     if (dataFiles.isEmpty()) {
       return;
@@ -304,15 +312,39 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
     } else {
       log.warn("~{}~ No current snapshot found before overwrite", tableId);
     }
-    OverwriteFiles overwriteFiles = this.table.newOverwrite();
-    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
-    dataFiles.forEach(overwriteFiles::addFile);
-    overwriteFiles.commit();
+    Transaction transaction = this.table.newTransaction();
+    updateSchema(transaction, updatedSchema);
+    overwritePartition(transaction, dataFiles, partitionColName, partitionValue);
+    transaction.commitTransaction();
     this.tableOps.refresh();
     // Note: this would only arise in a high-frequency commit scenario; there is no guarantee that the current
     // snapshot is necessarily the one from the commit just before, since another writer could have raced to
     // commit in between.
     log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
   }
 
+  private void updateSchema(Transaction transaction, Schema updatedSchema) {
+    if (updatedSchema.sameSchema(table.schema())) {
+      log.info("~{}~ No schema update needed; existing schema is identical", tableId);
+      return;
+    }
+    UpdateSchema updateSchema = transaction.updateSchema().unionByNameWith(updatedSchema);
+    Schema modifiedSchema = updateSchema.apply();
+    if (modifiedSchema.sameSchema(updatedSchema)) {
+      updateSchema.commit(); // stage the schema change in the transaction
+      log.info("~{}~ Updated schema to: {}", tableId, modifiedSchema);
+    } else {
+      log.warn("~{}~ Post-update schema differs from requested update; existing: {}, requested: {}, actual: {}",
+          tableId, table.schema(), updatedSchema, modifiedSchema);
+      throw new RuntimeException("Failed to update schema as requested");
+    }
+  }
+
+  private void overwritePartition(Transaction transaction, List<DataFile> dataFiles, String partitionColName,
+      String partitionValue) {
+    OverwriteFiles overwriteFiles = transaction.newOverwrite();
+    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
+    dataFiles.forEach(overwriteFiles::addFile);
+    overwriteFiles.commit(); // stage the data-file swap; commitTransaction() publishes both atomically
+  }
 }
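The refactor above leans on Iceberg's `Transaction` to publish the schema update and the data-file swap as a single snapshot commit. A minimal sketch of those semantics follows; the method name and the partition column/value literals are made up. Note that each pending update must be staged with its own `commit()`, and nothing becomes visible to readers until `commitTransaction()`:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expressions;

public class AtomicOverwriteSketch {

  static void atomicSchemaAndDataSwap(Table table, Schema mergedSchema) {
    Transaction txn = table.newTransaction();
    txn.updateSchema()
        .unionByNameWith(mergedSchema)
        .commit();               // staged in the transaction, not yet published
    txn.newOverwrite()
        .overwriteByRowFilter(Expressions.equal("datepartition", "2024-01-01"))
        .commit();               // also staged (real code also addFile()s each replacement DataFile)
    txn.commitTransaction();     // one atomic commit: schema change and data swap together
  }
}
```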
@@ -20,7 +20,6 @@
 import java.io.IOException;
 
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
 
 import lombok.extern.slf4j.Slf4j;
@@ -36,11 +35,7 @@ private IcebergTableMetadataValidatorUtils() {
   }
 
   /**
-   * Compares the metadata of the given two iceberg tables.
-   * <ul>
-   * <li>First compares the schema of the metadata.</li>
-   * <li>Then compares the partition spec of the metadata.</li>
-   * </ul>
+   * Compares the partition specs of the given two iceberg tables' metadata; schema compatibility is now validated separately, before this check runs.
    * @param tableMetadataA the metadata of the first table
    * @param tableMetadataB the metadata of the second table
    * @param validateStrictPartitionEquality boolean value to control strictness of partition spec comparison
@@ -52,27 +47,6 @@ public static void failUnlessCompatibleStructure(TableMetadata tableMetadataA,
         tableMetadataA.metadataFileLocation(),
         tableMetadataB.metadataFileLocation());
 
-    Schema schemaA = tableMetadataA.schema();
-    Schema schemaB = tableMetadataB.schema();
-    // TODO: Need to add support for schema evolution
-    // This check needs to be broken down into multiple checks to support schema evolution
-    // Possible cases - schemaA == schemaB,
-    //                - schemaA is subset of schemaB [ schemaB Evolved ],
-    //                - schemaA is superset of schemaB [ schemaA Evolved ],
-    //                - Other cases?
-    // Also consider using Strategy or any other design pattern for this to make it a better solution
-    if (!schemaA.sameSchema(schemaB)) {
-      String errMsg = String.format(
-          "Schema Mismatch between Metadata{%s} - SchemaId{%d} and Metadata{%s} - SchemaId{%d}",
-          tableMetadataA.metadataFileLocation(),
-          schemaA.schemaId(),
-          tableMetadataB.metadataFileLocation(),
-          schemaB.schemaId()
-      );
-      log.error(errMsg);
-      throw new IOException(errMsg);
-    }
-
     PartitionSpec partitionSpecA = tableMetadataA.spec();
     PartitionSpec partitionSpecB = tableMetadataB.spec();
     // .compatibleWith() ignores the partition spec's specId and the partition fields' fieldIds, while .equals() compares them; see the sketch after this diff
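The comment above distinguishes `PartitionSpec.compatibleWith()` (lenient: ignores spec and field IDs) from `equals()` (strict). A small demonstration, with made-up field and spec IDs:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class SpecComparisonSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "datepartition", Types.StringType.get()));
    PartitionSpec specA = PartitionSpec.builderFor(schema)
        .identity("datepartition").withSpecId(0).build();
    PartitionSpec specB = PartitionSpec.builderFor(schema)
        .identity("datepartition").withSpecId(1).build();
    System.out.println(specA.compatibleWith(specB)); // true: same source columns and transforms
    System.out.println(specA.equals(specB));         // false: spec IDs differ
  }
}
```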
@@ -24,6 +24,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -40,17 +41,19 @@ public class IcebergOverwritePartitionsStepTest {
   private final String testPartitionColName = "testPartition";
   private final String testPartitionColValue = "testValue";
   private IcebergTable mockIcebergTable;
+  private Schema mockSchema;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
 
   @BeforeMethod
   public void setUp() throws IOException {
     mockIcebergTable = Mockito.mock(IcebergTable.class);
+    mockSchema = Mockito.mock(Schema.class);
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -67,10 +70,10 @@ public void testNeverIsCompleted() {
   @Test
   public void testExecute() {
     try {
-      Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(),
+      Mockito.doNothing().when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class), Mockito.anyString(),
           Mockito.anyString());
       spyIcebergOverwritePartitionsStep.execute();
-      Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(),
+      Mockito.verify(mockIcebergTable, Mockito.times(1)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
     } catch (IOException e) {
       Assert.fail(String.format("Unexpected IOException : %s", e));
@@ -81,10 +84,10 @@ public void testExecute() {
   public void testExecuteWithRetry() {
     try {
       // the first call throws an exception and is retried; the second call does nothing and succeeds
-      Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+      Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
       spyIcebergOverwritePartitionsStep.execute();
-      Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(),
+      Mockito.verify(mockIcebergTable, Mockito.times(2)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
     } catch (IOException e) {
       Assert.fail(String.format("Unexpected IOException : %s", e));
@@ -95,11 +98,11 @@ public void testExecuteWithRetry() {
   public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException {
     try {
       // always throws an exception
-      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
       spyIcebergOverwritePartitionsStep.execute();
     } catch (RuntimeException e) {
-      Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(),
+      Mockito.verify(mockIcebergTable, Mockito.times(3)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
       assertRetryTimes(e, 3);
     } catch (IOException e) {
@@ -112,7 +115,7 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
     int retryCount = 7;
     mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
         Integer.toString(retryCount));
-    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+    spyIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, mockSchema,
         testPartitionColName, testPartitionColValue, mockProperties));
 
     spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
@@ -121,11 +124,11 @@ public void testExecuteWithCustomRetryConfig() throws IOException {
     Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
     try {
       // always throws an exception
-      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
       spyIcebergOverwritePartitionsStep.execute();
     } catch (RuntimeException e) {
-      Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(),
+      Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwriteSchemaAndPartition(Mockito.anyList(), Mockito.any(Schema.class),
           Mockito.anyString(), Mockito.anyString());
       assertRetryTimes(e, retryCount);
     } catch (IOException e) {