
Commit a8df020 (parent 411526a)

feat: Support of branches in add_files (#2485)
Closes #2428

# Rationale for this change

Support for adding files to Iceberg branches; currently, files can only be added to the main branch.

## Are these changes tested?

Yes

## Are there any user-facing changes?

A new optional `branch` parameter in the `add_files` method of the Table API.
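As a quick illustration of the new parameter, a minimal usage sketch (the catalog, table, path, and branch names are hypothetical, and the target branch must already exist, as in the integration test below):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical names: a configured catalog "default" holding an existing
# table "default.my_table" that already has a branch named "audit".
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# New in this commit: `branch` targets a named branch instead of main.
# Omitting it keeps the previous behaviour (append to the main branch).
tbl.add_files(
    file_paths=["s3://warehouse/default/data/file-0.parquet"],
    branch="audit",
)
```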

2 files changed: +49, -5 lines

pyiceberg/table/__init__.py

Lines changed: 16 additions & 5 deletions
```diff
@@ -859,7 +859,11 @@ def upsert(
         return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
 
     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self,
+        file_paths: List[str],
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        check_duplicate_files: bool = True,
+        branch: Optional[str] = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
@@ -888,12 +892,12 @@ def add_files(
             self.set_properties(
                 **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
             )
-        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
             data_files = _parquet_files_to_data_files(
                 table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
             )
             for data_file in data_files:
-                update_snapshot.append_data_file(data_file)
+                append_files.append_data_file(data_file)
 
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.
@@ -1431,7 +1435,11 @@ def delete(
         )
 
     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self,
+        file_paths: List[str],
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        check_duplicate_files: bool = True,
+        branch: Optional[str] = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table.
@@ -1444,7 +1452,10 @@ def add_files(
         """
         with self.transaction() as tx:
             tx.add_files(
-                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
+                file_paths=file_paths,
+                snapshot_properties=snapshot_properties,
+                check_duplicate_files=check_duplicate_files,
+                branch=branch,
             )
 
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
```
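Since `Table.add_files` simply delegates inside a transaction, the parameter is equally available on the transaction API. A short sketch under the same assumptions as above (hypothetical paths, `tbl` already loaded):

```python
# Hypothetical Parquet files already written to the warehouse.
paths = [f"s3://warehouse/default/data/file-{i}.parquet" for i in range(3)]

# Transaction.add_files forwards `branch` to the append snapshot producer,
# so the fast-append commit lands on the named branch rather than main.
with tbl.transaction() as tx:
    tx.add_files(file_paths=paths, branch="audit")
```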

tests/integration/test_add_files.py

Lines changed: 33 additions & 0 deletions
```diff
@@ -926,3 +926,36 @@ def test_add_files_hour_transform(session_catalog: Catalog) -> None:
             writer.write_table(arrow_table)
 
     tbl.add_files(file_paths=[file_path])
+
+
+@pytest.mark.integration
+def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.test_add_files_branch_v{format_version}"
+    branch = "branch1"
+
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/addfile/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # Dummy write to avoid failures on creating branch in empty table
+    tbl.append(ARROW_TABLE)
+    assert tbl.metadata.current_snapshot_id is not None
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths, branch=branch)
+
+    df = spark.table(identifier)
+    assert df.count() == 1, "Expected 1 row in main table"
+
+    branch_df = spark.table(f"{identifier}.branch_{branch}")
+    assert branch_df.count() == 6, "Expected 6 rows in branch (1 appended + 5 added files)"
+
+    for col in branch_df.columns:
+        assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null"
```
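The test reads the branch back through Spark's `<table>.branch_<name>` convention; for completeness, a hedged sketch of the same check done in pyiceberg itself, assuming `Table.snapshot_by_name` resolves a branch ref to its snapshot:

```python
# Resolve the branch ref to its snapshot and scan the table at it.
branch_snapshot = tbl.snapshot_by_name("branch1")
assert branch_snapshot is not None

branch_rows = tbl.scan(snapshot_id=branch_snapshot.snapshot_id).to_arrow()
assert branch_rows.num_rows == 6  # 1 appended row + 5 added files

# The main branch is untouched by the branch append.
assert tbl.scan().to_arrow().num_rows == 1
```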
