Skip to content

Commit f0d75de

Browse files
fix: bulk mutation eventual success (#909)
1 parent cc79d8c commit f0d75de

File tree

3 files changed

+49
-0
lines changed

3 files changed

+49
-0
lines changed

google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ async def _run_attempt(self):
189189
if result.status.code != 0:
190190
# mutation failed; update error list (and remaining_indices if retryable)
191191
self._handle_entry_error(orig_idx, entry_error)
192+
elif orig_idx in self.errors:
193+
# mutation succeeded; remove from error list
194+
del self.errors[orig_idx]
192195
# remove processed entry from active list
193196
del active_request_indices[result.index]
194197
except Exception as exc:

tests/system/data/test_system.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,28 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows):
238238
assert (await _retrieve_cell_value(table, row_key)) == new_value
239239

240240

241+
@pytest.mark.asyncio
242+
async def test_bulk_mutations_raise_exception(client, table):
243+
"""
244+
If an invalid mutation is passed, an exception should be raised
245+
"""
246+
from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell
247+
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
248+
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
249+
250+
row_key = uuid.uuid4().hex.encode()
251+
mutation = SetCell(family="nonexistent", qualifier=b"test-qualifier", new_value=b"")
252+
bulk_mutation = RowMutationEntry(row_key, [mutation])
253+
254+
with pytest.raises(MutationsExceptionGroup) as exc:
255+
await table.bulk_mutate_rows([bulk_mutation])
256+
assert len(exc.value.exceptions) == 1
257+
entry_error = exc.value.exceptions[0]
258+
assert isinstance(entry_error, FailedMutationEntryError)
259+
assert entry_error.index == 0
260+
assert entry_error.entry == bulk_mutation
261+
262+
241263
@pytest.mark.usefixtures("client")
242264
@pytest.mark.usefixtures("table")
243265
@retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5)

tests/unit/data/_async/test_client.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2659,6 +2659,30 @@ async def test_bulk_mutate_error_index(self):
26592659
assert isinstance(cause.exceptions[1], DeadlineExceeded)
26602660
assert isinstance(cause.exceptions[2], FailedPrecondition)
26612661

2662+
@pytest.mark.asyncio
2663+
async def test_bulk_mutate_error_recovery(self):
2664+
"""
2665+
If an error occurs, then resolves, no exception should be raised
2666+
"""
2667+
from google.api_core.exceptions import DeadlineExceeded
2668+
2669+
async with self._make_client(project="project") as client:
2670+
table = client.get_table("instance", "table")
2671+
with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic:
2672+
# fail with a retryable error, then a non-retryable one
2673+
mock_gapic.side_effect = [
2674+
self._mock_response([DeadlineExceeded("mock")]),
2675+
self._mock_response([None]),
2676+
]
2677+
mutation = mutations.SetCell(
2678+
"family", b"qualifier", b"value", timestamp_micros=123
2679+
)
2680+
entries = [
2681+
mutations.RowMutationEntry((f"row_key_{i}").encode(), [mutation])
2682+
for i in range(3)
2683+
]
2684+
await table.bulk_mutate_rows(entries, operation_timeout=1000)
2685+
26622686

26632687
class TestCheckAndMutateRow:
26642688
def _make_client(self, *args, **kwargs):

0 commit comments

Comments
 (0)