From d4ca6535a3cd978cd3b129ab5fcd1b6bc5a99999 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 10:46:37 -0800 Subject: [PATCH 01/27] s/"main"/MAIN_BRANCH (cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4) --- pyiceberg/table/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b43dc3206b..08f554bfaa 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -397,7 +397,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -3205,10 +3205,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), ) @property From 0b7aaaf928f57941da4f28694d46fdf98884245a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:17:28 -0800 Subject: [PATCH 02/27] replace string literals (cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0) --- pyiceberg/cli/console.py | 6 +++--- pyiceberg/table/__init__.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index d1833df081..213883bc70 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -32,7 +32,7 @@ from pyiceberg.catalog import Catalog, load_catalog from 
pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000 @@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: refs = table.refs() if type: type = type.lower() - if type not in {"branch", "tag"}: + if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}: raise ValueError(f"Type must be either branch or tag, got: {type}") relevant_refs = [ @@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: retention_properties = {} - if ref.snapshot_ref_type == "branch": + if ref.snapshot_ref_type == SnapshotRefType.BRANCH: default_min_snapshots_to_keep = table_properties.get( "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 08f554bfaa..395b8fa688 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ NameMapping, update_mapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -813,7 +813,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] 
max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] @@ -3205,7 +3205,7 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), From 23f04ec039f223c3995ac27dd75e72e8f21b178f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:20:48 -0800 Subject: [PATCH 03/27] default writes to main branch (cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e) --- pyiceberg/table/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 395b8fa688..5aac334764 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -397,7 +397,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -1540,7 +1540,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. 
@@ -3044,14 +3044,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -3059,6 +3061,7 @@ def __init__( self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -3205,10 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property From af6ff9aebad07cde3fdb746cfb1d3c73ddcedc83 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 18 Jul 2024 22:33:17 +0530 Subject: [PATCH 04/27] Added some more methods for branches --- pyiceberg/table/__init__.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5aac334764..1cd0a2c04c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1549,13 
+1549,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties) + tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. @@ -1573,7 +1574,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT @@ -3053,14 +3054,13 @@ def __init__( io: FileIO, branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None @@ -3208,7 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, + 
parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), @@ -3260,10 +3263,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__( + operation=operation, transaction=transaction, io=io, branch=branch, commit_uuid=commit_uuid, snapshot_properties=snapshot_properties + ) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -3419,10 +3425,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = PropertyUtil.property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -3538,21 +3545,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> FastAppendFiles: return FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, 
snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def merge_append(self) -> MergeAppendFiles: return MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: @@ -3564,6 +3573,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) def delete(self) -> DeleteFiles: @@ -3572,6 +3582,7 @@ def delete(self) -> DeleteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) From 6fbf3f18c994b4c2f8d3484ee5c43cf4c3d9991f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 10:46:37 -0800 Subject: [PATCH 05/27] s/"main"/MAIN_BRANCH (cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4) --- pyiceberg/table/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 34e9d2c53b..cb6e4efde4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -402,7 +402,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -3227,10 +3227,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, 
parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), ) @property From 8ce1509c6ca5e6aa38d615c99bcdb25051bcb4ad Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:17:28 -0800 Subject: [PATCH 06/27] replace string literals (cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0) --- pyiceberg/cli/console.py | 6 +++--- pyiceberg/table/__init__.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index d1833df081..213883bc70 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -32,7 +32,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000 @@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: refs = table.refs() if type: type = type.lower() - if type not in {"branch", "tag"}: + if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}: raise ValueError(f"Type must be either branch or tag, got: {type}") relevant_refs = [ @@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: retention_properties = {} - if ref.snapshot_ref_type == "branch": + if ref.snapshot_ref_type == 
SnapshotRefType.BRANCH: default_min_snapshots_to_keep = table_properties.get( "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cb6e4efde4..b8362338f4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -115,7 +115,7 @@ NameMapping, update_mapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -828,7 +828,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] @@ -3227,7 +3227,7 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch" + snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), From 6daf29e4459c9e64403e56eab07f07457bd392a9 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 27 Jan 2024 11:20:48 -0800 Subject: [PATCH 07/27] default writes to main branch (cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e) --- pyiceberg/table/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py 
b/pyiceberg/table/__init__.py index b8362338f4..595adb701e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -402,7 +402,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -1562,7 +1562,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -3066,14 +3066,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT, + snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -3081,6 +3083,7 @@ def __init__( self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -3227,10 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH + 
snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property From 09321cdbe3ee36bff32cc389c5679308a2e6e83e Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 18 Jul 2024 22:33:17 +0530 Subject: [PATCH 08/27] Added some more methods for branches --- pyiceberg/table/__init__.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 595adb701e..d2a94b2e35 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1571,13 +1571,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties) + tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) def overwrite( self, df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. 
@@ -1595,7 +1596,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT @@ -3075,14 +3076,13 @@ def __init__( io: FileIO, branch: str, commit_uuid: Optional[uuid.UUID] = None, - snapshot_properties: Dict[str, str] = EMPTY_DICT + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None @@ -3230,7 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), @@ -3282,10 +3285,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__( + operation=operation, transaction=transaction, io=io, branch=branch, 
commit_uuid=commit_uuid, snapshot_properties=snapshot_properties + ) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -3441,10 +3447,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = PropertyUtil.property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -3560,21 +3567,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> FastAppendFiles: return FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def merge_append(self) -> MergeAppendFiles: return MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: @@ -3586,6 
+3595,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) def delete(self) -> DeleteFiles: @@ -3594,6 +3604,7 @@ def delete(self) -> DeleteFiles: transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties, + branch=self._branch ) From 45b01a68e1d4572494082a09b49883ecdf00b329 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Sun, 13 Oct 2024 04:50:17 +0530 Subject: [PATCH 09/27] Updated entries for branches --- pyiceberg/table/__init__.py | 32 +++++++++++++++---------- pyiceberg/table/update/__init__.py | 4 ++-- pyiceberg/table/update/snapshot.py | 38 +++++++++++++++++++++++------- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 52dca937f8..5f4f24f40a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -78,7 +78,7 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, SnapshotLogEntry, @@ -402,21 +402,22 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.table_metadata.name_mapping(), ) - def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: + def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table.
Returns: A new UpdateSnapshot """ - return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) + return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties) - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the overwrite operation """ try: import pyarrow as pa @@ -444,7 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) - update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties) + update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties) append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append with append_method() as append_files: @@ -461,6 +462,7 @@ def overwrite( df: pa.Table, overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for adding a table overwrite with a PyArrow table to the transaction. 
@@ -476,6 +478,7 @@ def overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the overwrite operation """ try: import pyarrow as pa @@ -500,7 +503,7 @@ def overwrite( self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) - with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -509,7 +512,12 @@ def overwrite( for data_file in data_files: update_snapshot.append_data_file(data_file) - def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def delete( + self, + delete_filter: Union[str, BooleanExpression], + snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, + ) -> None: """ Shorthand for deleting record from a table. 
@@ -537,7 +545,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) - with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot: delete_snapshot.delete_by_predicate(delete_filter) # Check if there are any files that require an actual rewrite of a data file @@ -585,7 +593,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti )) if len(replaced_files) > 0: - with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite( + with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite( commit_uuid=commit_uuid ) as overwrite_snapshot: for original_data_file, replaced_data_files in replaced_files: @@ -1003,7 +1011,7 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to the table. 
@@ -1012,7 +1020,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch) + tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) def overwrite( self, @@ -1037,7 +1045,7 @@ def overwrite( snapshot_properties: Custom properties to be added to the snapshot summary """ with self.transaction() as tx: - tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) + tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) def delete( self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 6e14046f9a..d02fdc39b6 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -30,7 +30,7 @@ from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Snapshot, @@ -136,7 +136,7 @@ class AddSnapshotUpdate(IcebergBaseModel): class SetSnapshotRefUpdate(IcebergBaseModel): action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref") ref_name: str = Field(alias="ref-name") - type: Literal["tag", "branch"] + type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH] snapshot_id: int = Field(alias="snapshot-id") max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)] max_snapshot_age_ms: 
Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)] diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 47e5fc55e3..3af3ba9a23 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -55,6 +55,7 @@ from pyiceberg.partitioning import ( PartitionSpec, ) +from pyiceberg.table.refs import SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -103,12 +104,14 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: @@ -117,7 +120,7 @@ def __init__( self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - # Since we only support the main branch for now + self._branch = branch self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) @@ -272,10 +275,13 @@ def _commit(self) -> UpdatesAndRequirements: ( AddSnapshotUpdate(snapshot=snapshot), SetSnapshotRefUpdate( - snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch" + snapshot_id=self._snapshot_id, + parent_snapshot_id=self._parent_snapshot_id, + ref_name=self._branch, + type=SnapshotRefType.BRANCH, ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), + (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), ) @property @@ -324,10 +330,11 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: 
Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._predicate = AlwaysFalse() def _commit(self) -> UpdatesAndRequirements: @@ -482,12 +489,13 @@ def __init__( operation: Operation, transaction: Transaction, io: FileIO, + branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: from pyiceberg.table import TableProperties - super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) self._target_size_bytes = property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -603,21 +611,33 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _branch: str _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def __init__( + self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties + self._branch = branch def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, + transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, ) def merge_append(self) -> _MergeAppendFiles: return _MergeAppendFiles( - operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + operation=Operation.APPEND, + 
transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, ) def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: @@ -628,6 +648,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: else Operation.APPEND, transaction=self._transaction, io=self._io, + branch=self._branch, snapshot_properties=self._snapshot_properties, ) @@ -636,6 +657,7 @@ def delete(self) -> _DeleteFiles: operation=Operation.DELETE, transaction=self._transaction, io=self._io, + branch=self._branch, snapshot_properties=self._snapshot_properties, ) From 917b044b2a3e244a7a25919325cc619fd8992500 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 15 Oct 2024 05:23:32 +0530 Subject: [PATCH 10/27] Fixed some bugs --- pyiceberg/table/__init__.py | 3 ++- pyiceberg/table/update/__init__.py | 2 ++ pyiceberg/table/update/snapshot.py | 9 ++++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5f4f24f40a..2260cd150b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -417,7 +417,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the overwrite operation + branch: Branch Reference to run the append operation """ try: import pyarrow as pa @@ -529,6 +529,7 @@ def delete( Args: delete_filter: A boolean expression to delete rows from a table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ from pyiceberg.io.pyarrow import ( ArrowScan, diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index d02fdc39b6..03b9855307 100644 --- 
a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -597,6 +597,8 @@ class AssertRefSnapshotId(ValidatableTableRequirement): def validate(self, base_metadata: Optional[TableMetadata]) -> None: if base_metadata is None: raise CommitFailedException("Requirement failed: current table metadata is missing") + elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH: + raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}") elif snapshot_ref := base_metadata.refs.get(self.ref): ref_type = snapshot_ref.snapshot_ref_type if self.snapshot_id is None: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3af3ba9a23..902141fa5c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -281,7 +281,14 @@ def _commit(self) -> UpdatesAndRequirements: type=SnapshotRefType.BRANCH, ), ), - (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),), + ( + AssertRefSnapshotId( + snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id + if self._branch in self._transaction.table_metadata.refs + else self._transaction.table_metadata.current_snapshot_id, + ref=self._branch, + ), + ), ) @property From 398f6c0ee33cee4ba8564d62ede143de52c1243f Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 05:29:23 +0530 Subject: [PATCH 11/27] Fixed bugs in delete and overwrite --- pyiceberg/table/__init__.py | 14 ++++++++++---- pyiceberg/table/update/snapshot.py | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2260cd150b..662a9795d2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -501,7 +501,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - 
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) + self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty @@ -554,7 +554,7 @@ def delete( bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) - files = self._scan(row_filter=delete_filter).plan_files() + files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files() commit_uuid = uuid.uuid4() counter = itertools.count(0) @@ -1019,6 +1019,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) @@ -1044,12 +1045,16 @@ def overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) def delete( - self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, + delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: """ Shorthand for deleting rows from the table. 
@@ -1057,9 +1062,10 @@ def delete( Args: delete_filter: The predicate that used to remove rows snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the delete operation """ with self.transaction() as tx: - tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) def add_files( self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 902141fa5c..135e4f5293 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -122,7 +122,7 @@ def __init__( self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() self._branch = branch self._parent_snapshot_id = ( - snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None + snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None ) self._added_data_files = [] self._deleted_data_files = set() @@ -548,7 +548,7 @@ def _existing_manifests(self) -> List[ManifestFile]: """Determine if there are any existing manifest files.""" existing_files = [] - if snapshot := self._transaction.table_metadata.current_snapshot(): + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch): for manifest_file in snapshot.manifests(io=self._io): entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] From b7b8ba0b945f592d771aca936d1ceafb78a478c5 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 12:23:35 +0530 Subject: [PATCH 12/27] Added tests and some refactoring --- pyiceberg/table/__init__.py | 2 +- 
pyiceberg/table/update/snapshot.py | 4 +- tests/integration/test_writes/test_writes.py | 80 +++++++++++++++++++- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 662a9795d2..d34f875b29 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -501,7 +501,7 @@ def overwrite( self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch) + self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 135e4f5293..b3ef552c55 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -55,7 +55,7 @@ from pyiceberg.partitioning import ( PartitionSpec, ) -from pyiceberg.table.refs import SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType from pyiceberg.table.snapshots import ( Operation, Snapshot, @@ -622,7 +622,7 @@ class UpdateSnapshot: _snapshot_properties: Dict[str, str] def __init__( - self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT + self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH ) -> None: self._transaction = transaction self._io = io diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index fc2746c614..39e7758461 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -39,7 +39,7 @@ from pyiceberg.catalog.hive import HiveCatalog 
from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -1015,7 +1015,8 @@ def test_table_write_schema_with_valid_nullability_diff( NestedField(field_id=1, name="long", field_type=LongType(), required=False), ) other_schema = pa.schema(( - pa.field("long", pa.int64(), nullable=False), # can support writing required pyarrow field to optional Iceberg field + pa.field("long", pa.int64(), nullable=False), + # can support writing required pyarrow field to optional Iceberg field )) arrow_table = pa.Table.from_pydict( { @@ -1062,7 +1063,8 @@ def test_table_write_schema_with_valid_upcast( pa.field("list", pa.large_list(pa.int64()), nullable=False), pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False), pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double - pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16 + pa.field("uuid", pa.binary(length=16), nullable=True), + # can UUID is read as fixed length binary of length 16 )) ) lhs = spark.table(f"{identifier}").toPandas() @@ -1448,3 +1450,75 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> EqualTo("category", "A"), ), ) + + +@pytest.mark.integration +def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_non_existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) + with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"): + tbl.append(arrow_table_with_null, 
branch="non_existing_branch") + + +@pytest.mark.integration +def test_append_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_existing_branch_append" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.append(arrow_table_with_null, branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 6 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + + +@pytest.mark.integration +def test_delete_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_existing_branch_delete" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.delete(delete_filter="int = 9", branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 2 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + + +@pytest.mark.integration +def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = 
"default.test_existing_branch_overwrite" + branch = "existing_branch" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + tbl.overwrite(arrow_table_with_null, branch=branch) + + assert len(tbl.scan().use_ref(branch).to_arrow()) == 3 + assert len(tbl.scan().to_arrow()) == 3 + branch_snapshot = tbl.metadata.snapshot_by_name(branch) + assert branch_snapshot is not None and branch_snapshot.parent_snapshot_id is not None + delete_snapshot = tbl.metadata.snapshot_by_id(branch_snapshot.parent_snapshot_id) + assert delete_snapshot is not None + main_snapshot = tbl.metadata.snapshot_by_name("main") + assert main_snapshot is not None + assert ( + delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id + ) # Currently overwrite is a delete followed by an append operation From ee591b476becc77193b8bb28b67d3d237da81131 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Wed, 16 Oct 2024 13:44:38 +0530 Subject: [PATCH 13/27] Added another integration test --- tests/integration/test_writes/test_writes.py | 25 ++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 39e7758461..c6b4b5257b 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1522,3 +1522,28 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with assert ( delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id ) # Currently overwrite is a delete followed by an append operation + + +@pytest.mark.integration +def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_concurrent_branch_operations" + branch1 = 
"existing_branch_1" + branch2 = "existing_branch_2" + + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + + assert tbl.metadata.current_snapshot_id is not None + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch1).commit() + + tbl.delete("int = 9", branch=branch1) + + tbl.append(arrow_table_with_null) + + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch2).commit() + + tbl.overwrite(arrow_table_with_null, branch=branch2) + + assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2 + assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3 + assert len(tbl.scan().to_arrow()) == 6 From e81907d296f8fc2f2f9cc8917e67933120506b11 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 17 Oct 2024 01:59:16 +0530 Subject: [PATCH 14/27] Fixed bug: concurrent same name branch and tag writes --- pyiceberg/table/__init__.py | 3 ++- pyiceberg/table/update/__init__.py | 3 +++ pyiceberg/table/update/snapshot.py | 1 + tests/table/test_init.py | 29 ++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d34f875b29..3c14c565d7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -349,7 +349,7 @@ def set_ref_snapshot( ), ) - requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),) + requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name, ref_type=type),) return self._apply(updates, requirements) def _set_ref_snapshot( @@ -380,6 +380,7 @@ def _set_ref_snapshot( AssertRefSnapshotId( snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None, ref=ref_name, + ref_type=type, ), ) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 03b9855307..8c511b9763 100644 --- 
a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -592,6 +592,7 @@ class AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) + ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: @@ -607,6 +608,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) + elif ref_type != self.ref_type: + raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index b3ef552c55..638f3d4d7b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -287,6 +287,7 @@ def _commit(self) -> UpdatesAndRequirements: if self._branch in self._transaction.table_metadata.refs else self._transaction.table_metadata.current_snapshot_id, ref=self._branch, + ref_type=SnapshotRefType.BRANCH, ), ), ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 1c4029a292..d0ae5e8da4 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,28 +982,43 @@ def test_assert_table_uuid(table_v2: 
Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate( + base_metadata + ) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) + AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None) with pytest.raises( CommitFailedException, match="Requirement failed: branch main was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + + with pytest.raises( + CommitFailedException, + match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", + ): + AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + + with pytest.raises( + CommitFailedException, + match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", + ): + AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata) + # existing Tag in metadata: test with pytest.raises( CommitFailedException, - match="Requirement failed: branch or tag not_exist is missing, expected 1", + match="Requirement failed: tag test can't be changed to type branch", ): - 
AssertRefSnapshotId(ref="not_exist", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From bc6fb6883c1da195d40acded2c9d1684d2444975 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Thu, 14 Nov 2024 06:28:13 +0530 Subject: [PATCH 15/27] Added integration tests with spark --- tests/integration/test_writes/test_writes.py | 60 +++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index bdfc5f124f..35e2ccf668 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1648,7 +1648,7 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with @pytest.mark.integration def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - identifier = "default.test_concurrent_branch_operations" + identifier = "default.test_intertwined_branch_operations" branch1 = "existing_branch_1" branch2 = "existing_branch_2" @@ -1669,3 +1669,61 @@ def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_nu assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2 assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3 assert len(tbl.scan().to_arrow()) == 6 + + +@pytest.mark.integration +def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: + # Intialize table with branch + identifier = "default.test_branch_spark_write_py_read" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + branch = "existing_spark_branch" + + # Create branch in Spark + spark.sql(f"ALTER TABLE {identifier} CREATE BRANCH {branch}") + + # Spark Write + spark.sql( + f""" + 
DELETE FROM {identifier}.branch_{branch} + WHERE int = 9 + """ + ) + + # Refresh table to get new refs + tbl.refresh() + + # Python Read + assert len(tbl.scan().to_arrow()) == 3 + assert len(tbl.scan().use_ref(branch).to_arrow()) == 2 + + +@pytest.mark.integration +def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: + # Intialize table with branch + identifier = "default.test_branch_py_write_spark_read" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + branch = "existing_py_branch" + + assert tbl.metadata.current_snapshot_id is not None + + # Create branch + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + # Python Write + tbl.delete("int = 9", branch=branch) + + # Spark Read + main_df = spark.sql( + f""" + SELECT * + FROM {identifier} + """ + ) + branch_df = spark.sql( + f""" + SELECT * + FROM {identifier}.branch_{branch} + """ + ) + assert main_df.count() == 3 + assert branch_df.count() == 2 From 82e65e155aeda7f49395d381546512032e3f6a35 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 01:47:55 +0530 Subject: [PATCH 16/27] Fixed comments for AssertSnapshotRef --- pyiceberg/table/__init__.py | 1 - pyiceberg/table/update/__init__.py | 10 ++++++---- pyiceberg/table/update/snapshot.py | 1 - tests/integration/test_writes/test_writes.py | 4 ++-- tests/table/test_init.py | 20 +++++++++----------- 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4913ac0baf..a0f420c2eb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -341,7 +341,6 @@ def _set_ref_snapshot( AssertRefSnapshotId( snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None, ref=ref_name, - ref_type=type, ), ) diff --git 
a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index de54eded3e..9dfe28acb1 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -609,14 +609,16 @@ class AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) - ref_type: SnapshotRefType = Field(...) + # ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: if base_metadata is None: raise CommitFailedException("Requirement failed: current table metadata is missing") elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH: - raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}") + raise CommitFailedException( + f"Requirement failed: Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH." 
+ ) elif snapshot_ref := base_metadata.refs.get(self.ref): ref_type = snapshot_ref.snapshot_ref_type if self.snapshot_id is None: @@ -625,8 +627,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) - elif ref_type != self.ref_type: - raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}") + elif ref_type == SnapshotRefType.TAG: + raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 638f3d4d7b..b3ef552c55 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -287,7 +287,6 @@ def _commit(self) -> UpdatesAndRequirements: if self._branch in self._transaction.table_metadata.refs else self._transaction.table_metadata.current_snapshot_id, ref=self._branch, - ref_type=SnapshotRefType.BRANCH, ), ), ) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 35e2ccf668..2624fcd6bb 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,7 +44,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties +from pyiceberg.table import TableProperties, MAIN_BRANCH from pyiceberg.table.sorting import SortDirection, SortField, SortOrder from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform from pyiceberg.types import ( @@ -1578,7 +1578,7 @@ def 
test_abort_table_transaction_on_exception( def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.test_non_existing_branch" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) - with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"): + with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."): tbl.append(arrow_table_with_null, branch="non_existing_branch") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index d0ae5e8da4..78b3d31aac 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,43 +982,41 @@ def test_assert_table_uuid(table_v2: Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate( - base_metadata - ) + AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None) + AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) with pytest.raises( CommitFailedException, match="Requirement failed: branch main was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + 
AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata) with pytest.raises( CommitFailedException, match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata) + AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test with pytest.raises( CommitFailedException, - match="Requirement failed: tag test can't be changed to type branch", + match="Requirement failed: TAG test can't be updated once created", ): - AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From 82e5b906206ab466775cbd44e5c2a7be66c922d1 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:08:22 +0530 Subject: [PATCH 17/27] Fixed comments and linter issues --- tests/integration/test_writes/test_writes.py | 7 +++++-- tests/table/test_init.py | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 
2624fcd6bb..0af5b1e8a4 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,7 +44,8 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties, MAIN_BRANCH +from pyiceberg.table import TableProperties +from pyiceberg.table.refs import MAIN_BRANCH from pyiceberg.table.sorting import SortDirection, SortField, SortOrder from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform from pyiceberg.types import ( @@ -1578,7 +1579,9 @@ def test_abort_table_transaction_on_exception( def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.test_non_existing_branch" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, []) - with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."): + with pytest.raises( + CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH." 
+ ): tbl.append(arrow_table_with_null, branch="non_existing_branch") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 78b3d31aac..7a416ac83c 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -1012,6 +1012,10 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test + ref_tag = table_v2.refs().get("test") + assert ref_tag is not None + assert ref_tag.snapshot_ref_type == SnapshotRefType.TAG, "TAG test should be present in table to be tested" + with pytest.raises( CommitFailedException, match="Requirement failed: TAG test can't be updated once created", From 84d0971a100e66a6fd8118806498ab21e397dc6c Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:11:55 +0530 Subject: [PATCH 18/27] Fixed comments --- pyiceberg/table/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a0f420c2eb..b90936c532 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -988,7 +988,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, Args: df: The Arrow dataframe that will be appended to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the delete operation + branch: Branch Reference to run the append operation """ with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) @@ -1014,7 +1014,7 @@ def 
overwrite( overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the delete operation + branch: Branch Reference to run the overwrite operation """ with self.transaction() as tx: tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch) From 3efe53cc35fe761c2e329599823a8b3accd4b327 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 02:22:28 +0530 Subject: [PATCH 19/27] Fixed comments --- tests/table/test_init.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7a416ac83c..42c066ef01 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -49,7 +49,7 @@ _match_deletes_to_data_file, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( MetadataLogEntry, Operation, @@ -982,34 +982,31 @@ def test_assert_table_uuid(table_v2: Table) -> None: def test_assert_ref_snapshot_id(table_v2: Table) -> None: base_metadata = table_v2.metadata - AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(None) with pytest.raises( CommitFailedException, - match="Requirement failed: branch main was created 
concurrently", + match=f"Requirement failed: branch {MAIN_BRANCH} was created concurrently", ): - AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=None).validate(base_metadata) with pytest.raises( CommitFailedException, - match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", + match=f"Requirement failed: branch {MAIN_BRANCH} has changed: expected id 1, found 3055729675574597004", ): - AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(base_metadata) - with pytest.raises( - CommitFailedException, - match="Requirement failed: branch or tag not_exist_branch is missing, expected 1", - ): - AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata) + non_existing_ref = "not_exist_branch_or_tag" + assert table_v2.refs().get("not_exist_branch_or_tag") is None with pytest.raises( CommitFailedException, - match="Requirement failed: branch or tag not_exist_tag is missing, expected 1", + match=f"Requirement failed: branch or tag {non_existing_ref} is missing, expected 1", ): - AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata) + AssertRefSnapshotId(ref=non_existing_ref, snapshot_id=1).validate(base_metadata) # existing Tag in metadata: test ref_tag = table_v2.refs().get("test") From dfedc63c04328d89fb61b0dcb5799ad39a94c4ff Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 05:42:32 +0530 Subject: [PATCH 20/27] Fixed a bug in tests --- pyiceberg/table/update/__init__.py | 5 ++--- tests/table/test_init.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 9dfe28acb1..671b3b9e94 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -609,7 +609,6 @@ class 
AssertRefSnapshotId(ValidatableTableRequirement): type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str = Field(...) - # ref_type: SnapshotRefType = Field(...) snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id") def validate(self, base_metadata: Optional[TableMetadata]) -> None: @@ -624,11 +623,11 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: if self.snapshot_id is None: raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently") elif self.snapshot_id != snapshot_ref.snapshot_id: + if ref_type == SnapshotRefType.TAG: + raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) - elif ref_type == SnapshotRefType.TAG: - raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") elif self.snapshot_id is not None: raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}") diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 42c066ef01..89e0577ae8 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1017,7 +1017,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: CommitFailedException, match="Requirement failed: TAG test can't be updated once created", ): - AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata) + AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata) def test_assert_last_assigned_field_id(table_v2: Table) -> None: From 076a6d5c040ca19cdc58949e7c533418211dc027 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 24 Feb 2025 05:55:59 +0530 Subject: [PATCH 21/27] Fixed some more tests --- pyiceberg/table/update/__init__.py | 2 -- 
tests/table/test_init.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 671b3b9e94..249de747de 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -623,8 +623,6 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None: if self.snapshot_id is None: raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently") elif self.snapshot_id != snapshot_ref.snapshot_id: - if ref_type == SnapshotRefType.TAG: - raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created") raise CommitFailedException( f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}" ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 89e0577ae8..15f27aba61 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1015,7 +1015,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None: with pytest.raises( CommitFailedException, - match="Requirement failed: TAG test can't be updated once created", + match="Requirement failed: tag test has changed: expected id 3055729675574597004, found 3051729675574597004", ): AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata) From e4463df70a6f524dadc32ac006914682c84ab400 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 26 May 2025 05:07:10 +0530 Subject: [PATCH 22/27] Fixed linter and code errors --- pyiceberg/table/__init__.py | 45 ++++++++++++++------ tests/integration/test_writes/test_writes.py | 4 +- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a4d763d6c4..b8bfdde2fc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -398,7 +398,7 @@ def _build_partition_predicate(self, partition_records: 
Set[Record]) -> BooleanE expr = Or(expr, match_partition_expression) return expr - def _append_snapshot_producer(self, snapshot_properties: Dict[str, str],branch:str) -> _FastAppendFiles: + def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: str) -> _FastAppendFiles: """Determine the append type based on table properties. Args: @@ -411,7 +411,7 @@ def _append_snapshot_producer(self, snapshot_properties: Dict[str, str],branch:s TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) - update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch) + update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch) return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append() def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: @@ -478,7 +478,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) - with self._append_snapshot_producer(snapshot_properties,branch=branch) as append_files: + with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = list( @@ -489,7 +489,9 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, for data_file in data_files: append_files.append_data_file(data_file) - def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def dynamic_partition_overwrite( + self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH + ) -> None: """ Shorthand for overwriting existing partitions with a PyArrow table. 
@@ -500,6 +502,7 @@ def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[st Args: df: The Arrow dataframe that will be used to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the dynamic partition overwrite operation """ try: import pyarrow as pa @@ -540,7 +543,7 @@ def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[st delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite) self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) - with self._append_snapshot_producer(snapshot_properties) as append_files: + with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid for data_file in data_files: append_files.append_data_file(data_file) @@ -593,9 +596,14 @@ def overwrite( if overwrite_filter != AlwaysFalse(): # Only delete when the filter is != AlwaysFalse - self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch) + self.delete( + delete_filter=overwrite_filter, + case_sensitive=case_sensitive, + snapshot_properties=snapshot_properties, + branch=branch, + ) - with self._append_snapshot_producer(snapshot_properties,branch=branch) as append_files: + with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -640,7 +648,7 @@ def delete( if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) - with self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch).delete() as delete_snapshot: + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot: 
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive) # Check if there are any files that require an actual rewrite of a data file @@ -690,7 +698,9 @@ def delete( ) if len(replaced_files) > 0: - with self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch).overwrite() as overwrite_snapshot: + with self.update_snapshot( + snapshot_properties=snapshot_properties, branch=branch + ).overwrite() as overwrite_snapshot: overwrite_snapshot.commit_uuid = commit_uuid for original_data_file, replaced_data_files in replaced_files: overwrite_snapshot.delete_data_file(original_data_file) @@ -1301,16 +1311,19 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) - def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + def dynamic_partition_overwrite( + self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH + ) -> None: """Shorthand for dynamic overwriting the table with a PyArrow table. Old partitions are auto detected and replaced with data files created for input arrow table. 
Args: df: The Arrow dataframe that will be used to overwrite the table snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the dynamic partition overwrite operation """ with self.transaction() as tx: - tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties) + tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties, branch=branch) def overwrite( self, @@ -1339,7 +1352,11 @@ def overwrite( """ with self.transaction() as tx: tx.overwrite( - df=df, overwrite_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch + df=df, + overwrite_filter=overwrite_filter, + case_sensitive=case_sensitive, + snapshot_properties=snapshot_properties, + branch=branch, ) def delete( @@ -1359,7 +1376,9 @@ def delete( branch: Branch Reference to run the delete operation """ with self.transaction() as tx: - tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch) + tx.delete( + delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch + ) def add_files( self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 0643ec8571..ee27791114 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1945,7 +1945,7 @@ def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_nu @pytest.mark.integration def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: - # Intialize table with branch + # Initialize table with branch identifier = "default.test_branch_spark_write_py_read" tbl = _create_table(session_catalog, 
identifier, {"format-version": "2"}, [arrow_table_with_null]) branch = "existing_spark_branch" @@ -1971,7 +1971,7 @@ def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSessio @pytest.mark.integration def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None: - # Intialize table with branch + # Initialize table with branch identifier = "default.test_branch_py_write_spark_read" tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) branch = "existing_py_branch" From 49f75b4ec021e5e2cdcb058568c509a6089cd524 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Mon, 26 May 2025 09:56:13 +0530 Subject: [PATCH 23/27] Fixed bug for empty tables --- pyiceberg/table/__init__.py | 60 ++++++++++++++++++++---------- pyiceberg/table/update/snapshot.py | 43 +++++++++++++-------- pyiceberg/utils/concurrent.py | 10 ++--- 3 files changed, 73 insertions(+), 40 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b8bfdde2fc..c8316926a9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -87,7 +87,7 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) -from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef +from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, SnapshotLogEntry, @@ -398,7 +398,7 @@ def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanE expr = Or(expr, match_partition_expression) return expr - def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: str) -> _FastAppendFiles: + def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: Optional[str]) -> _FastAppendFiles: """Determine the append type based on table properties. 
Args: @@ -431,7 +431,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.table_metadata.name_mapping(), ) - def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot: + def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. Returns: @@ -448,7 +448,7 @@ def update_statistics(self) -> UpdateStatistics: """ return UpdateStatistics(transaction=self) - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. @@ -490,7 +490,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, append_files.append_data_file(data_file) def dynamic_partition_overwrite( - self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH + self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None ) -> None: """ Shorthand for overwriting existing partitions with a PyArrow table. @@ -554,7 +554,7 @@ def overwrite( overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, - branch: str = MAIN_BRANCH, + branch: Optional[str] = None, ) -> None: """ Shorthand for adding a table overwrite with a PyArrow table to the transaction. @@ -617,7 +617,7 @@ def delete( delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, - branch: str = MAIN_BRANCH, + branch: Optional[str] = None, ) -> None: """ Shorthand for deleting record from a table. 
@@ -656,7 +656,10 @@ def delete( bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) - files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files() + if branch is None: + files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).plan_files() + else: + files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files() commit_uuid = uuid.uuid4() counter = itertools.count(0) @@ -717,6 +720,7 @@ def upsert( when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True, case_sensitive: bool = True, + branch: Optional[str] = None, ) -> UpsertResult: """Shorthand API for performing an upsert to an iceberg table. @@ -727,6 +731,7 @@ def upsert( when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table case_sensitive: Bool indicating if the match should be case-sensitive + branch: Branch Reference to run the upsert operation To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids @@ -789,12 +794,24 @@ def upsert( matched_predicate = upsert_util.create_match_filter(df, join_cols) # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. 
- matched_iceberg_table = DataScan( - table_metadata=self.table_metadata, - io=self._table.io, - row_filter=matched_predicate, - case_sensitive=case_sensitive, - ).to_arrow() + if branch is None: + matched_iceberg_table = DataScan( + table_metadata=self.table_metadata, + io=self._table.io, + row_filter=matched_predicate, + case_sensitive=case_sensitive, + ).to_arrow() + else: + matched_iceberg_table = ( + DataScan( + table_metadata=self.table_metadata, + io=self._table.io, + row_filter=matched_predicate, + case_sensitive=case_sensitive, + ) + .use_ref(branch) + .to_arrow() + ) update_row_cnt = 0 insert_row_cnt = 0 @@ -811,7 +828,7 @@ def upsert( # build the match predicate filter overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols) - self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate) + self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate, branch=branch) if when_not_matched_insert_all: expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols) @@ -822,7 +839,7 @@ def upsert( insert_row_cnt = len(rows_to_insert) if insert_row_cnt > 0: - self.append(rows_to_insert) + self.append(rows_to_insert, branch=branch) return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt) @@ -1255,6 +1272,7 @@ def upsert( when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True, case_sensitive: bool = True, + branch: Optional[str] = None, ) -> UpsertResult: """Shorthand API for performing an upsert to an iceberg table. 
@@ -1265,6 +1283,7 @@ def upsert( when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table case_sensitive: Bool indicating if the match should be case-sensitive + branch: Branch Reference to run the upsert operation To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids @@ -1297,9 +1316,10 @@ def upsert( when_matched_update_all=when_matched_update_all, when_not_matched_insert_all=when_not_matched_insert_all, case_sensitive=case_sensitive, + branch=branch, ) - def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None: """ Shorthand API for appending a PyArrow table to the table. @@ -1312,7 +1332,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) def dynamic_partition_overwrite( - self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH + self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None ) -> None: """Shorthand for dynamic overwriting the table with a PyArrow table. @@ -1331,7 +1351,7 @@ def overwrite( overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, - branch: str = MAIN_BRANCH, + branch: Optional[str] = None, ) -> None: """ Shorthand for overwriting the table with a PyArrow table. 
@@ -1364,7 +1384,7 @@ def delete( delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT, case_sensitive: bool = True, - branch: str = MAIN_BRANCH, + branch: Optional[str] = None, ) -> None: """ Shorthand for deleting rows from the table. diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index cc219ace69..463f1f8257 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -105,30 +105,39 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] - _branch: str def __init__( self, operation: Operation, transaction: Transaction, io: FileIO, - branch: str, commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: str = MAIN_BRANCH, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() self._io = io self._operation = operation self._snapshot_id = self._transaction.table_metadata.new_snapshot_id() - self._branch = branch - self._parent_snapshot_id = ( - snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None - ) self._added_data_files = [] self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) + self._set_target_branch(branch=branch) + self._parent_snapshot_id = ( + snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None + ) + + def _set_target_branch(self, branch: str) -> None: + # Default is already set to MAIN_BRANCH. So branch name can't be None. 
+ assert branch is not None, ValueError("Invalid branch name: null") + if branch in self._transaction.table_metadata.refs: + ref = self._transaction.table_metadata.refs[branch] + assert ref.snapshot_ref_type == SnapshotRefType.BRANCH, ValueError( + f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots" + ) + self._target_branch = branch def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) @@ -276,16 +285,16 @@ def _commit(self) -> UpdatesAndRequirements: SetSnapshotRefUpdate( snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, - ref_name=self._branch, + ref_name=self._target_branch, type=SnapshotRefType.BRANCH, ), ), ( AssertRefSnapshotId( - snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id - if self._branch in self._transaction.table_metadata.refs + snapshot_id=self._transaction.table_metadata.refs[self._target_branch].snapshot_id + if self._target_branch in self._transaction.table_metadata.refs else self._transaction.table_metadata.current_snapshot_id, - ref=self._branch, + ref=self._target_branch, ), ), ) @@ -338,7 +347,7 @@ def __init__( commit_uuid: Optional[uuid.UUID] = None, snapshot_properties: Dict[str, str] = EMPTY_DICT, ): - super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) self._predicate = AlwaysFalse() self._case_sensitive = True @@ -503,7 +512,7 @@ def __init__( ) -> None: from pyiceberg.table import TableProperties - super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties) + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) self._target_size_bytes = property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -549,7 +558,7 @@ def _existing_manifests(self) -> 
List[ManifestFile]: """Determine if there are any existing manifest files.""" existing_files = [] - if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch): + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): for manifest_file in snapshot.manifests(io=self._io): entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] @@ -623,12 +632,16 @@ class UpdateSnapshot: _snapshot_properties: Dict[str, str] def __init__( - self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH + self, + transaction: Transaction, + io: FileIO, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + branch: Optional[str] = MAIN_BRANCH, ) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties - self._branch = branch + self._branch = branch if branch is not None else MAIN_BRANCH def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py index 805599bf41..751cbd9bbb 100644 --- a/pyiceberg/utils/concurrent.py +++ b/pyiceberg/utils/concurrent.py @@ -25,6 +25,11 @@ class ExecutorFactory: _instance: Optional[Executor] = None + @staticmethod + def max_workers() -> Optional[int]: + """Return the max number of workers configured.""" + return Config().get_int("max-workers") + @staticmethod def get_or_create() -> Executor: """Return the same executor in each call.""" @@ -33,8 +38,3 @@ def get_or_create() -> Executor: ExecutorFactory._instance = ThreadPoolExecutor(max_workers=max_workers) return ExecutorFactory._instance - - @staticmethod - def max_workers() -> Optional[int]: - """Return the max number of workers configured.""" - return Config().get_int("max-workers") From 4ed060737949cb13300cf11c2cf127cdf689f9ff 
Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 27 May 2025 13:35:30 +0530 Subject: [PATCH 24/27] Fixed bugs and added more tests --- pyiceberg/table/__init__.py | 2 +- pyiceberg/table/update/snapshot.py | 116 +++++++++++++++-------------- tests/integration/test_deletes.py | 29 ++++++++ 3 files changed, 90 insertions(+), 57 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c8316926a9..c9ea2412a7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -541,7 +541,7 @@ def dynamic_partition_overwrite( partitions_to_overwrite = {data_file.partition for data_file in data_files} delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite) - self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties) + self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch) with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files: append_files.commit_uuid = append_snapshot_commit_uuid diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 463f1f8257..236d2a3816 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -131,12 +131,12 @@ def __init__( def _set_target_branch(self, branch: str) -> None: # Default is already set to MAIN_BRANCH. So branch name can't be None. - assert branch is not None, ValueError("Invalid branch name: null") + assert branch is not None, "Invalid branch name: null" if branch in self._transaction.table_metadata.refs: ref = self._transaction.table_metadata.refs[branch] - assert ref.snapshot_ref_type == SnapshotRefType.BRANCH, ValueError( - f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots" - ) + assert ( + ref.snapshot_ref_type == SnapshotRefType.BRANCH + ), f"{branch} is a tag, not a branch. 
Tags cannot be targets for producing snapshots" self._target_branch = branch def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: @@ -293,7 +293,7 @@ def _commit(self) -> UpdatesAndRequirements: AssertRefSnapshotId( snapshot_id=self._transaction.table_metadata.refs[self._target_branch].snapshot_id if self._target_branch in self._transaction.table_metadata.refs - else self._transaction.table_metadata.current_snapshot_id, + else None, ref=self._target_branch, ), ), @@ -407,46 +407,52 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> total_deleted_entries = [] partial_rewrites_needed = False self._deleted_data_files = set() - if snapshot := self._transaction.table_metadata.current_snapshot(): - for manifest_file in snapshot.manifests(io=self._io): - if manifest_file.content == ManifestContent.DATA: - if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): - # If the manifest isn't relevant, we can just keep it in the manifest-list - existing_manifests.append(manifest_file) - else: - # It is relevant, let's check out the content - deleted_entries = [] - existing_entries = [] - for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): - if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: - # Based on the metadata, it can be dropped right away - deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) - self._deleted_data_files.add(entry.data_file) - else: - # Based on the metadata, we cannot determine if it can be deleted - existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) - if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH: - partial_rewrites_needed = True - - if len(deleted_entries) > 0: - total_deleted_entries += deleted_entries - - # Rewrite the manifest - if len(existing_entries) > 0: - with write_manifest( - format_version=self._transaction.table_metadata.format_version, - 
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - ) as writer: - for existing_entry in existing_entries: - writer.add_entry(existing_entry) - existing_manifests.append(writer.to_manifest_file()) - else: + + # Determine the snapshot to read manifests from for deletion + # Should be the current tip of the _target_branch + parent_snapshot_id_for_delete_source = self._parent_snapshot_id + if parent_snapshot_id_for_delete_source is not None: + snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) + if snapshot: # Ensure snapshot is found + for manifest_file in snapshot.manifests(io=self._io): + if manifest_file.content == ManifestContent.DATA: + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + # If the manifest isn't relevant, we can just keep it in the manifest-list existing_manifests.append(manifest_file) - else: - existing_manifests.append(manifest_file) + else: + # It is relevant, let's check out the content + deleted_entries = [] + existing_entries = [] + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: + # Based on the metadata, it can be dropped right away + deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) + self._deleted_data_files.add(entry.data_file) + else: + # Based on the metadata, we cannot determine if it can be deleted + existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH: + partial_rewrites_needed = True + + if len(deleted_entries) > 0: + total_deleted_entries += deleted_entries + + # Rewrite the manifest + if len(existing_entries) > 0: + with write_manifest( + 
format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, + ) as writer: + for existing_entry in existing_entries: + writer.add_entry(existing_entry) + existing_manifests.append(writer.to_manifest_file()) + else: + existing_manifests.append(manifest_file) + else: + existing_manifests.append(manifest_file) return existing_manifests, total_deleted_entries, partial_rewrites_needed @@ -575,19 +581,17 @@ def _existing_manifests(self) -> List[ManifestFile]: output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: - [ - writer.add_entry( - ManifestEntry.from_args( - status=ManifestEntryStatus.EXISTING, - snapshot_id=entry.snapshot_id, - sequence_number=entry.sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, + for entry in entries: + if entry.data_file not in found_deleted_data_files: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) ) - ) - for entry in entries - if entry.data_file not in found_deleted_data_files - ] existing_files.append(writer.to_manifest_file()) return existing_files diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 527f659640..abf8502ac7 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -894,3 +894,32 @@ def test_overwrite_with_filter_case_insensitive(test_table: Table) -> None: test_table.overwrite(df=new_table, overwrite_filter=f"Idx == {record_to_overwrite['idx']}", case_sensitive=False) assert record_to_overwrite not in test_table.scan().to_arrow().to_pylist() assert new_record_to_insert in 
test_table.scan().to_arrow().to_pylist() + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.filterwarnings("ignore:Delete operation did not match any records") +def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: + identifier = f"default.test_delete_on_empty_table_{format_version}" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + volume int + ) + USING iceberg + TBLPROPERTIES('format-version' = {format_version}) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + + # Perform a delete operation on the empty table + tbl.delete(AlwaysTrue()) + + # Assert that no new snapshot was created because no rows were deleted + assert len(tbl.snapshots()) == 0 From 958aac478ebdc1cdfaccea5bbf8ea8023bb808a6 Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 27 May 2025 19:58:09 +0530 Subject: [PATCH 25/27] changed design context for branch writes --- pyiceberg/table/__init__.py | 5 ++++- pyiceberg/table/update/snapshot.py | 19 ++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c9ea2412a7..ef395865b7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -87,7 +87,7 @@ from pyiceberg.table.name_mapping import ( NameMapping, ) -from pyiceberg.table.refs import SnapshotRef +from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import ( Snapshot, SnapshotLogEntry, @@ -437,6 +437,9 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, bran Returns: A new UpdateSnapshot """ + if branch is None: + branch = MAIN_BRANCH + return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties) def update_statistics(self) -> UpdateStatistics: diff --git a/pyiceberg/table/update/snapshot.py 
b/pyiceberg/table/update/snapshot.py index 236d2a3816..8c4d26e20d 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -105,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _target_branch = MAIN_BRANCH def __init__( self, @@ -124,20 +125,20 @@ def __init__( self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) - self._set_target_branch(branch=branch) + self._target_branch = self._validate_target_branch(branch=branch) self._parent_snapshot_id = ( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None ) - def _set_target_branch(self, branch: str) -> None: + def _validate_target_branch(self, branch: str) -> str: # Default is already set to MAIN_BRANCH. So branch name can't be None. - assert branch is not None, "Invalid branch name: null" + if branch is None: + raise ValueError("Invalid branch name: null") if branch in self._transaction.table_metadata.refs: ref = self._transaction.table_metadata.refs[branch] - assert ( - ref.snapshot_ref_type == SnapshotRefType.BRANCH - ), f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots" - self._target_branch = branch + if ref.snapshot_ref_type != SnapshotRefType.BRANCH: + raise ValueError(f"{branch} is a tag, not a branch. 
Tags cannot be targets for producing snapshots") + return branch def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) @@ -639,13 +640,13 @@ def __init__( self, transaction: Transaction, io: FileIO, + branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT, - branch: Optional[str] = MAIN_BRANCH, ) -> None: self._transaction = transaction self._io = io self._snapshot_properties = snapshot_properties - self._branch = branch if branch is not None else MAIN_BRANCH + self._branch = branch def fast_append(self) -> _FastAppendFiles: return _FastAppendFiles( From 079802af8bc6ddc55f400306902da12e800a65ee Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 24 Jun 2025 07:22:29 +0530 Subject: [PATCH 26/27] Fixed linter, comments and other bugs --- pyiceberg/table/__init__.py | 34 ++++++++++++------------------ pyiceberg/table/update/snapshot.py | 2 +- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 58ffe01bc0..0537d45841 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -796,24 +796,18 @@ def upsert( matched_predicate = upsert_util.create_match_filter(df, join_cols) # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes. 
- if branch is None: - matched_iceberg_record_batches = DataScan( - table_metadata=self.table_metadata, - io=self._table.io, - row_filter=matched_predicate, - case_sensitive=case_sensitive, - ).to_arrow_batch_reader() - else: - matched_iceberg_record_batches = ( - DataScan( - table_metadata=self.table_metadata, - io=self._table.io, - row_filter=matched_predicate, - case_sensitive=case_sensitive, - ) - .use_ref(branch) - .to_arrow() - ) + + matched_iceberg_record_batches_scan = DataScan( + table_metadata=self.table_metadata, + io=self._table.io, + row_filter=matched_predicate, + case_sensitive=case_sensitive, + ) + + if branch is not None: + matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch) + + matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader() batches_to_overwrite = [] overwrite_predicates = [] @@ -852,13 +846,13 @@ def upsert( self.overwrite( rows_to_update, overwrite_filter=Or(*overwrite_predicates) if len(overwrite_predicates) > 1 else overwrite_predicates[0], - branch=branch + branch=branch, ) if when_not_matched_insert_all: insert_row_cnt = len(rows_to_insert) if rows_to_insert: - self.append(rows_to_insert,branch=branch) + self.append(rows_to_insert, branch=branch) return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 5c99e433ed..3ffb275ded 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -685,7 +685,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: return _OverwriteFiles( commit_uuid=commit_uuid, operation=Operation.OVERWRITE - if self._transaction.table_metadata.current_snapshot() is not None + if self._transaction.table_metadata.snapshot_by_name(name=self._branch) is not None else Operation.APPEND, transaction=self._transaction, io=self._io, From 
f45df8b66da0735421655a8954aabe1df1c6eeef Mon Sep 17 00:00:00 2001 From: Vinayak Jaiswal Date: Tue, 24 Jun 2025 08:54:52 +0530 Subject: [PATCH 27/27] Usage of builder pattern --- pyiceberg/table/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0537d45841..07602c9ee5 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -658,10 +658,10 @@ def delete( bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) - if branch is None: - files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).plan_files() - else: - files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files() + file_scan = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive) + if branch is not None: + file_scan = file_scan.use_ref(branch) + files = file_scan.plan_files() commit_uuid = uuid.uuid4() counter = itertools.count(0)