From 807ae39545fdd68989cf7e2194cf0d3de070f849 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 30 Sep 2025 14:27:49 +0000 Subject: [PATCH 01/12] tests: System/compatibility tests for testing shim compatiblity --- google/cloud/bigtable/row_data.py | 2 +- tests/system/v2_client/test_data_api.py | 712 +++++++++++++++++++++++- 2 files changed, 710 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index e11379108..b333d9c6a 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -227,7 +227,7 @@ def _on_error(self, exc): retry_request = self._create_retry_request() self._row_merger = _RowMerger(self._row_merger.last_seen_row_key) - self.response_iterator = self.read_method(retry_request) + self.response_iterator = self.read_method(retry_request, retry=self.retry) def _read_next(self): """Helper for :meth:`__iter__`.""" diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 579837e34..588098ecd 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -14,9 +14,17 @@ import datetime import operator +import struct import pytest +from google.cloud.bigtable import row_filters + +from grpc import UnaryStreamClientInterceptor +from grpc import RpcError +from grpc import StatusCode +from grpc import intercept_channel + COLUMN_FAMILY_ID1 = "col-fam-id1" COLUMN_FAMILY_ID2 = "col-fam-id2" COL_NAME1 = b"col-name1" @@ -26,9 +34,98 @@ CELL_VAL2 = b"cell-val-newer" CELL_VAL3 = b"altcol-cell-val" CELL_VAL4 = b"foo" +CELL_VAL_READ_ROWS_RETRY = b"1" * 512 ROW_KEY = b"row-key" ROW_KEY_ALT = b"row-key-alt" +CELL_VAL_TRUE = b"true" +CELL_VAL_FALSE = b"false" +INT_COL_NAME = 67890 +INT_CELL_VAL = 12345 +OVERFLOW_INT_CELL_VAL = 10 ** 100 +OVERFLOW_INT_CELL_VAL2 = -(10 ** 100) +FLOAT_CELL_VAL = 1.4 +FLOAT_CELL_VAL2 = -1.4 + +INITIAL_ROW_SPLITS = [ + b"row_split_1", + b"row_split_2", + b"row_split_3" +] +JOY_EMOJI = "\N{FACE WITH TEARS OF JOY}" + +PASS_ALL_FILTER = row_filters.PassAllFilter(True) +BLOCK_ALL_FILTER = row_filters.BlockAllFilter(True) + +READ_ROWS_METHOD_NAME = "/google.bigtable.v2.Bigtable/ReadRows" + +class SelectiveMethodsErrorInjector(UnaryStreamClientInterceptor): + def __init__(self): + # As long as there are more items on this list, the items on the list + # are as follows: + # + # 1. None = behave as normal + # 2. errors are raised. + # + # This is to inject errors mid stream after a bunch of normal behavior. + self.errors_to_inject = [] + + @staticmethod + def make_exception(status_code, message=None, fail_mid_stream=False, successes_before_fail=0): + # successes_before_fail allows us to test injecting failures mid-iterator iteration. 
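+        # e.g. make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True,
+        #                     successes_before_fail=5) builds a fake error that
+        # lets five responses through before the wrapped iterator raises, so the
+        # tests can exercise stream resumption rather than just stream creation.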
+ exc = RpcError(status_code) + exc.code = lambda: status_code + + _, status_message = status_code.value + exc.details = lambda: message if message else status_message + + exc.initial_metadata = lambda: [] + exc.trailing_metadata = lambda: [] + exc.fail_mid_stream = fail_mid_stream + exc.successes_before_fail = successes_before_fail + + def _result(): + raise exc + + exc.result = _result + return exc + + def intercept_unary_stream(self, continuation, client_call_details, request): + if client_call_details.method == READ_ROWS_METHOD_NAME and self.errors_to_inject: + error = self.errors_to_inject.pop(0) + if not error.fail_mid_stream: + raise error + + response = continuation(client_call_details, request) + if error.fail_mid_stream: + class CallWrapper: + def __init__(self, call, exc_to_raise): + self._call = call + self._exc = exc_to_raise + self._successes_remaining = exc_to_raise.successes_before_fail + self._raised = False + + def __iter__(self): + return self + + def __next__(self): + if not self._raised and self._successes_remaining == 0: + self._raised = True + if self._exc: + raise self._exc + + else: + if self._successes_remaining > 0: + self._successes_remaining -= 1 + return self._call.__next__() + + def __getattr__(self, name): + return getattr(self._call, name) + + return CallWrapper(response, error) + else: + return continuation(client_call_details, request) + @pytest.fixture(scope="module") def data_table_id(): @@ -38,7 +135,7 @@ def data_table_id(): @pytest.fixture(scope="module") def data_table(data_instance_populated, data_table_id): table = data_instance_populated.table(data_table_id) - table.create() + table.create(initial_split_keys=INITIAL_ROW_SPLITS) table.column_family(COLUMN_FAMILY_ID1).create() table.column_family(COLUMN_FAMILY_ID2).create() @@ -59,6 +156,36 @@ def rows_to_delete(): row.commit() +@pytest.fixture(scope="function") +def data_table_read_rows_retry_tests(data_table, rows_to_delete): + row_keys = [f"row_key_{i}".encode() for i in range(0, 32)] + columns = [f"col_{i}".encode() for i in range(0, 32)] + + _populate_table(data_table, rows_to_delete, row_keys, columns, CELL_VAL_READ_ROWS_RETRY) + + yield data_table + + +@pytest.fixture(scope="function") +def data_table_read_rows_with_error_injector(data_table_read_rows_retry_tests): + data_client = data_table_read_rows_retry_tests._instance._client.table_data_client + error_injector = SelectiveMethodsErrorInjector() + old_logged_channel = data_client.transport._logged_channel + data_client.transport._logged_channel = intercept_channel( + old_logged_channel, error_injector + ) + data_table_read_rows_retry_tests.error_injector = error_injector + data_client.transport._stubs = {} + data_client.transport._prep_wrapped_messages(None) + + yield data_table_read_rows_retry_tests + + del data_table_read_rows_retry_tests.error_injector + data_client.transport._logged_channel = old_logged_channel + data_client.transport._stubs = {} + data_client.transport._prep_wrapped_messages(None) + + def test_table_read_rows_filter_millis(data_table): from google.cloud.bigtable import row_filters @@ -99,10 +226,135 @@ def test_table_mutate_rows(data_table, rows_to_delete): assert row2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL4 -def _populate_table(data_table, rows_to_delete, row_keys): +def _add_test_error_handler(retry): + import time + + curr_time = time.monotonic() + times_triggered = 0 + + # Assert that the retry handler works properly. 
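+    # api_core sleeps for a uniformly random interval in
+    # [0, min(initial * multiplier**n, maximum)] before the n-th retry, so each
+    # observed gap between attempts must fall at or below that ceiling.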
+ def test_error_handler(exc): + nonlocal curr_time, times_triggered, retry + next_time = time.monotonic() + if times_triggered >= 1: + gap = next_time - curr_time + + # Exponential backoff = uniform randomness from 0 to max_gap + max_gap = min( + retry._initial * retry._multiplier**times_triggered, + retry._maximum, + ) + assert gap <= max_gap + times_triggered += 1 + curr_time = next_time + + retry._on_error = test_error_handler + + +def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): + import mock + import copy + from google.api_core import retry as retries + from google.api_core.exceptions import InvalidArgument + from google.cloud.bigtable_v2 import MutateRowsResponse + from google.cloud.bigtable.table import DEFAULT_RETRY, _BigtableRetryableError + from google.rpc.code_pb2 import Code + from google.rpc.status_pb2 import Status + + # Simulate a server error on row 2, and a normal response on row 1, followed by a bunch of error + # responses on row 2 + initial_error_response = [ + MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry(), + MutateRowsResponse.Entry( + index=1, + status=Status( + code=Code.INTERNAL, + message="Test error", + ), + ), + ] + ) + ] + + followup_error_response = [ + MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + status=Status( + code=Code.INTERNAL, + message="Test error", + ) + ) + ] + ) + ] + + final_success_response = [MutateRowsResponse(entries=[MutateRowsResponse.Entry()])] + + with mock.patch.object(data_table._instance._client.table_data_client, "mutate_rows") as mutate_mock: + mutate_mock.side_effect = [ + initial_error_response, + followup_error_response, + followup_error_response, + final_success_response, + ] + + row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + row_2 = data_table.direct_row(ROW_KEY_ALT) + rows_to_delete.append(row_2) + row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + # Testing the default retry + default_retry_copy = copy.copy(DEFAULT_RETRY) + _add_test_error_handler(default_retry_copy) + statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy) + assert statuses[0].code == Code.OK + assert statuses[1].code == Code.OK + + # Simulate only server failures for row 2. + with mock.patch.object(data_table._instance._client.table_data_client, "mutate_rows") as mutate_mock: + mutate_mock.side_effect = [initial_error_response] + [followup_error_response] * 1000000 + + row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + row_2 = data_table.direct_row(ROW_KEY_ALT) + rows_to_delete.append(row_2) + row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + # Testing the default retry + default_retry_copy = copy.copy(DEFAULT_RETRY) + _add_test_error_handler(default_retry_copy) + statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy) + assert statuses[0].code == Code.OK + assert statuses[1].code == Code.INTERNAL + + # Because of the way the retriable mutate worker class works, unusual things can happen + # when passing in custom retry predicates. 
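+    # Here neither row carries a mutation, so the service rejects the request
+    # with InvalidArgument; since the predicate below also treats InvalidArgument
+    # as retryable, the worker retries until the deadline, swallows the resulting
+    # RetryError, and returns its per-row statuses still unset, hence None.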
+ row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + + row_2 = data_table.direct_row(ROW_KEY_ALT) + rows_to_delete.append(row_2) + + retry = DEFAULT_RETRY.with_predicate(retries.if_exception_type(_BigtableRetryableError, InvalidArgument)) + _add_test_error_handler(retry) + statuses = data_table.mutate_rows([row, row_2], retry=retry) + assert statuses[0] is None + assert statuses[1] is None + + +def _populate_table(data_table, rows_to_delete, row_keys, columns=[COL_NAME1], cell_value=CELL_VAL1): for row_key in row_keys: row = data_table.direct_row(row_key) - row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + for column in columns: + row.set_cell(COLUMN_FAMILY_ID1, column, cell_value) row.commit() rows_to_delete.append(row) @@ -157,6 +409,47 @@ def test_table_drop_by_prefix(data_table, rows_to_delete): assert expected_rows_count == found_rows_count +def test_table_mutate_rows_integers(data_table, rows_to_delete): + row = data_table.direct_row(ROW_KEY) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + row.commit() + rows_to_delete.append(row) + + # Change the contents to an integer + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, INT_CELL_VAL) + statuses = data_table.mutate_rows([row]) + assert len(statuses) == 1 + for status in statuses: + assert status.code == 0 + + # Check the contents + row1_data = data_table.read_row(ROW_KEY) + assert int.from_bytes(row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, byteorder="big") == INT_CELL_VAL + + +def test_table_mutate_rows_input_errors(data_table, rows_to_delete): + from google.api_core.exceptions import InvalidArgument + from google.cloud.bigtable.table import TooManyMutationsError, _MAX_BULK_MUTATIONS + + row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + + # Mutate row with 0 mutations gives an API error from the service, not + # from the client library. + with pytest.raises(InvalidArgument): + data_table.mutate_rows([row]) + + row.clear() + + # Mutate row with >100k mutations gives a TooManyMutationsError from the + # client library. 
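+    # _MAX_BULK_MUTATIONS is 100,000; the check happens inside
+    # Table.mutate_rows itself, before any RPC is attempted.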
+ for _ in range(0, _MAX_BULK_MUTATIONS + 1): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + with pytest.raises(TooManyMutationsError): + data_table.mutate_rows([row]) + + def test_table_read_rows_w_row_set(data_table, rows_to_delete): from google.cloud.bigtable.row_set import RowSet from google.cloud.bigtable.row_set import RowRange @@ -377,6 +670,419 @@ def test_read_with_label_applied(data_table, rows_to_delete, skip_on_emulator): assert cell3_new.labels == [label2] +def _assert_data_table_read_rows_retry_correct(rows_data): + for row_num in range(0, 32): + row = rows_data.rows[f"row_key_{row_num}".encode()] + for col_num in range(0, 32): + assert row.cells[COLUMN_FAMILY_ID1][f"col_{col_num}".encode()][0].value == CELL_VAL_READ_ROWS_RETRY + + +def test_table_read_rows_retry_unretriable_error_establishing_stream(data_table_read_rows_with_error_injector): + from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False) + ] + + with pytest.raises(exceptions.Aborted): + data_table_read_rows_with_error_injector.read_rows() + + +def test_table_read_rows_retry_retriable_error_establishing_stream(data_table_read_rows_with_error_injector): + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False) + ] * 3 + + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() + + _assert_data_table_read_rows_retry_correct(rows_data) + + +def test_table_read_rows_retry_unretriable_error_mid_stream(data_table_read_rows_with_error_injector): + from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5) + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + with pytest.raises(exceptions.DataLoss): + rows_data.consume_all() + + +def test_table_read_rows_retry_retriable_errors_mid_stream(data_table_read_rows_with_error_injector): + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0), + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() + + _assert_data_table_read_rows_retry_correct(rows_data) + + +def test_table_read_rows_retry_retriable_internal_errors_mid_stream(data_table_read_rows_with_error_injector): + from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=2), + error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], fail_mid_stream=True, successes_before_fail=1), + error_injector.make_exception(StatusCode.INTERNAL, 
message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], fail_mid_stream=True, successes_before_fail=0), + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() + + _assert_data_table_read_rows_retry_correct(rows_data) + + +def test_table_read_rows_retry_unretriable_internal_errors_mid_stream(data_table_read_rows_with_error_injector): + from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.INTERNAL, message="Don't retry this at home!", fail_mid_stream=True, successes_before_fail=2), + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + with pytest.raises(exceptions.InternalServerError): + rows_data.consume_all() + + +def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream(data_table_read_rows_with_error_injector): + # Simulate a connection failure mid-stream into an unretriable error when trying to reconnect. + from google.api_core import exceptions + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5), + error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False), + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + + with pytest.raises(exceptions.Aborted): + rows_data.consume_all() + + +def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream(data_table_read_rows_with_error_injector): + # Simulate a connection failure mid-stream into retriable errors when trying to reconnect. + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + ] + + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() + + _assert_data_table_read_rows_retry_correct(rows_data) + + +def test_table_read_rows_retry_timeout_mid_stream(data_table_read_rows_with_error_injector): + # Simulate a read timeout mid stream. + + from google.api_core import exceptions + from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS, RETRYABLE_INTERNAL_ERROR_MESSAGES + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=5), + ] + [ + error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=0), + ] * 20 + + # Shorten the deadline so the timeout test is shorter. + rows_data = data_table_read_rows_with_error_injector.read_rows(retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)) + with pytest.raises(exceptions.RetryError): + rows_data.consume_all() + + +def test_table_read_rows_retry_timeout_establishing_stream(data_table_read_rows_with_error_injector): + # Simulate a read timeout when creating the stream. 
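+    # Every attempt here fails before the first response arrives, so the retry
+    # wrapper backs off until the overall deadline lapses and raises RetryError
+    # (wrapping the last DEADLINE_EXCEEDED) from the read_rows call itself.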
+ + from google.api_core import exceptions + from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False), + ] + [ + error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False), + ] * 20 + + # Shorten the deadline so the timeout test is shorter. + with pytest.raises(exceptions.RetryError): + data_table_read_rows_with_error_injector.read_rows(retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)) + + +def test_table_check_and_mutate_rows(data_table, rows_to_delete): + true_mutation_row_key = b"true_row" + false_mutation_row_key = b"false_row" + + columns = [ + b"col_1", + b"col_2", + b"col_3", + b"col_4", + b"col_pr_1", + b"col_pr_2", + ] + _populate_table(data_table, rows_to_delete, [true_mutation_row_key, false_mutation_row_key], columns=columns) + true_row = data_table.conditional_row(true_mutation_row_key, PASS_ALL_FILTER) + for column in columns: + true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True) + true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_FALSE, state=False) + matched = true_row.commit() + assert matched == True + + false_row = data_table.conditional_row(false_mutation_row_key, BLOCK_ALL_FILTER) + for column in columns: + false_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True) + false_row.delete_cell(COLUMN_FAMILY_ID1, column, state=False) + matched = false_row.commit() + assert matched == False + + row1_data = data_table.read_row(true_mutation_row_key) + for column in columns: + assert row1_data.cells[COLUMN_FAMILY_ID1][column][0].value == CELL_VAL_TRUE + + row2_data = data_table.read_row(false_mutation_row_key) + assert row2_data is None # all cells should be deleted + + +def test_table_append_row(data_table, rows_to_delete): + row = data_table.append_row(ROW_KEY) + rows_to_delete.append(data_table.direct_row(ROW_KEY)) + + int_col_name = b"int_col" + str_col_name = b"str_col" + num_increments = 100 + + row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name, b"foo") + row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name, b"bar") + + # Column names are convertible to byte strings provided it's a valid ascii string. 
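+    # i.e. "str_col" (str) and b"str_col" (bytes) address the same column, which
+    # is why the decoded name below lands in the same cell and the final value
+    # reads b"foobarbaz".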
+ row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name.decode("ascii"), b"baz") + + for _ in range(0, num_increments): + row.increment_cell_value(COLUMN_FAMILY_ID1, int_col_name, 1) + + row.commit() + + row_data = data_table.read_row(ROW_KEY) + assert row_data.cells[COLUMN_FAMILY_ID1][int_col_name][0].value == num_increments.to_bytes(8, byteorder="big", signed=True) + assert row_data.cells[COLUMN_FAMILY_ID1][str_col_name][0].value == b"foobarbaz" + + +def test_table_sample_row_keys(data_table): + # sample_row_keys returns a generator + response = list(data_table.sample_row_keys()) + previous_offset_bytes = 0 + for idx in range(len(INITIAL_ROW_SPLITS)): + assert response[idx].row_key == INITIAL_ROW_SPLITS[idx] + + offset_bytes = response[idx].offset_bytes + assert isinstance(offset_bytes, int) + assert offset_bytes >= previous_offset_bytes + + previous_offset_bytes = offset_bytes + assert response[-1].row_key == b"" + assert isinstance(response[-1].offset_bytes, int) + assert response[-1].offset_bytes >= previous_offset_bytes + + +def test_table_direct_row_input_errors(data_table, rows_to_delete): + from google.api_core.exceptions import InvalidArgument + from google.cloud.bigtable.row import MAX_MUTATIONS + + row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + + # Column names are converted to bytes successfully if they're ASCII strings + # or bytes already. + with pytest.raises(TypeError): + row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) + + with pytest.raises(TypeError): + row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME) + + # Unicode for column name and value does not get converted to bytes because + # internally we use to_bytes in ascii mode. + with pytest.raises(UnicodeEncodeError): + row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1) + + with pytest.raises(UnicodeEncodeError): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) + + with pytest.raises(UnicodeEncodeError): + row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI) + + # Various non int64s, we use struct to pack a Python int to bytes. + with pytest.raises(struct.error): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL) + + with pytest.raises(struct.error): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL2) + + # Since floats aren't ints, they aren't converted to bytes via struct.pack, + # but via _to_bytes, so you get a TypeError instead. + with pytest.raises(TypeError): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL) + + # Can't have more than MAX_MUTATIONS mutations, but only enforced after + # a row.commit + row.clear() + for _ in range(0, MAX_MUTATIONS + 1): + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + with pytest.raises(ValueError): + row.commit() + + # Not having any mutations gives a server error (InvalidArgument), not + # enforced on the client side. + row.clear() + with pytest.raises(InvalidArgument): + row.commit() + + +def test_table_conditional_row_input_errors(data_table, rows_to_delete): + from google.cloud.bigtable.row import MAX_MUTATIONS + + rows = [ROW_KEY, ROW_KEY_ALT] + columns = [COL_NAME1] + + _populate_table(data_table, rows_to_delete, rows, columns=columns) + + true_row = data_table.conditional_row(ROW_KEY, PASS_ALL_FILTER) + false_row = data_table.conditional_row(ROW_KEY_ALT, BLOCK_ALL_FILTER) + rows_to_delete.append(true_row) + rows_to_delete.append(false_row) + + # Column names are converted to bytes successfully if they're ASCII strings + # or bytes already. 
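+    # Anything else (an int here) makes the client's _to_bytes helper raise
+    # TypeError before a request is even built.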
+ with pytest.raises(TypeError): + true_row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) + + with pytest.raises(TypeError): + true_row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME) + + # Unicode for column name and value does not get converted to bytes because + # internally we use to_bytes in ascii mode. + with pytest.raises(UnicodeEncodeError): + true_row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1) + + with pytest.raises(UnicodeEncodeError): + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) + + with pytest.raises(UnicodeEncodeError): + true_row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI) + + # Various non int64s, we use struct to pack a Python int to bytes. + with pytest.raises(struct.error): + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL) + + with pytest.raises(struct.error): + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL2) + + # Since floats aren't ints, they aren't converted to bytes via struct.pack, + # but via _to_bytes, so you get a TypeError instead. + with pytest.raises(TypeError): + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL) + + # Can't have more than MAX_MUTATIONS mutations, but only enforced after + # a row.commit + true_row.clear() + for _ in range(0, MAX_MUTATIONS + 1): + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + with pytest.raises(ValueError): + true_row.commit() + + true_row.clear() + + # State could be anything, but it is evaluated to a boolean later. + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1, state=0) + true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state=1) + true_row.commit() + + false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1, state="") + false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state="true_state") + false_row.commit() + + true_row_data = data_table.read_row(true_row.row_key) + assert true_row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL2 + + false_row_data = data_table.read_row(false_row.row_key) + assert false_row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL1 + + # Not having any mutations is enforced client-side for conditional row; nothing happens. + true_row.clear() + true_row.commit() + + false_row.clear() + false_row.commit() + + +def test_table_append_row_input_errors(data_table, rows_to_delete): + from google.api_core.exceptions import InvalidArgument + from google.cloud.bigtable.row import MAX_MUTATIONS + + row = data_table.append_row(ROW_KEY) + rows_to_delete.append(data_table.direct_row(ROW_KEY)) + + # Column names should be convertible to bytes (str or bytes) + with pytest.raises(TypeError): + row.append_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) + + with pytest.raises(TypeError): + row.increment_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, 1) + + # Unicode for column name and value + with pytest.raises(UnicodeEncodeError): + row.append_cell_value(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1) + + with pytest.raises(UnicodeEncodeError): + row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) + + with pytest.raises(UnicodeEncodeError): + row.increment_cell_value(COLUMN_FAMILY_ID1, JOY_EMOJI, 1) + + # Non-integer cell values for increment_cell_value + with pytest.raises(ValueError): + row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL) + + # increment_cell_value does not do input validation on the int_value, instead using + # proto-plus to do validation. 
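+    # The float evidently survives into the ReadModifyWriteRow request and comes
+    # back truncated toward zero (1.4 -> 1, -1.4 -> -1), which the read-back
+    # assertions below verify.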
+ row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL) + row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME2, FLOAT_CELL_VAL2) + row.commit() + + row_data = data_table.read_row(ROW_KEY) + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == int(FLOAT_CELL_VAL).to_bytes(8, byteorder="big", signed=True) + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == int(FLOAT_CELL_VAL2).to_bytes(8, byteorder="big", signed=True) + + # Can't have more than MAX_MUTATIONS mutations, but only enforced after + # a row.commit + row.clear() + for _ in range(0, MAX_MUTATIONS + 1): + row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + with pytest.raises(ValueError): + row.commit() + + # Not having any mutations gives a response of empty dict. + row.clear() + response = row.commit() + assert response == {} + + def test_access_with_non_admin_client(data_client, data_instance_id, data_table_id): instance = data_client.instance(data_instance_id) table = instance.table(data_table_id) From e86899a2afae0b4ac223fb8bbd412bff45a32d5a Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 14 Oct 2025 18:52:29 +0000 Subject: [PATCH 02/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- tests/system/v2_client/test_data_api.py | 275 +++++++++++++++++------- 1 file changed, 194 insertions(+), 81 deletions(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 588098ecd..d1615681b 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -42,16 +42,12 @@ CELL_VAL_FALSE = b"false" INT_COL_NAME = 67890 INT_CELL_VAL = 12345 -OVERFLOW_INT_CELL_VAL = 10 ** 100 -OVERFLOW_INT_CELL_VAL2 = -(10 ** 100) +OVERFLOW_INT_CELL_VAL = 10**100 +OVERFLOW_INT_CELL_VAL2 = -(10**100) FLOAT_CELL_VAL = 1.4 FLOAT_CELL_VAL2 = -1.4 -INITIAL_ROW_SPLITS = [ - b"row_split_1", - b"row_split_2", - b"row_split_3" -] +INITIAL_ROW_SPLITS = [b"row_split_1", b"row_split_2", b"row_split_3"] JOY_EMOJI = "\N{FACE WITH TEARS OF JOY}" PASS_ALL_FILTER = row_filters.PassAllFilter(True) @@ -59,6 +55,7 @@ READ_ROWS_METHOD_NAME = "/google.bigtable.v2.Bigtable/ReadRows" + class SelectiveMethodsErrorInjector(UnaryStreamClientInterceptor): def __init__(self): # As long as there are more items on this list, the items on the list @@ -71,7 +68,9 @@ def __init__(self): self.errors_to_inject = [] @staticmethod - def make_exception(status_code, message=None, fail_mid_stream=False, successes_before_fail=0): + def make_exception( + status_code, message=None, fail_mid_stream=False, successes_before_fail=0 + ): # successes_before_fail allows us to test injecting failures mid-iterator iteration. 
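        # Patching code() and details() onto the bare RpcError makes it look like
        # an "informative" gRPC error to api_core's translator, which then maps it
        # to the typed exception (Aborted, DataLoss, ...) the tests assert on.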
exc = RpcError(status_code) exc.code = lambda: status_code @@ -86,25 +85,29 @@ def make_exception(status_code, message=None, fail_mid_stream=False, successes_b def _result(): raise exc - + exc.result = _result return exc def intercept_unary_stream(self, continuation, client_call_details, request): - if client_call_details.method == READ_ROWS_METHOD_NAME and self.errors_to_inject: + if ( + client_call_details.method == READ_ROWS_METHOD_NAME + and self.errors_to_inject + ): error = self.errors_to_inject.pop(0) if not error.fail_mid_stream: raise error - + response = continuation(client_call_details, request) if error.fail_mid_stream: + class CallWrapper: def __init__(self, call, exc_to_raise): self._call = call self._exc = exc_to_raise self._successes_remaining = exc_to_raise.successes_before_fail self._raised = False - + def __iter__(self): return self @@ -113,15 +116,15 @@ def __next__(self): self._raised = True if self._exc: raise self._exc - + else: if self._successes_remaining > 0: self._successes_remaining -= 1 return self._call.__next__() - + def __getattr__(self, name): return getattr(self._call, name) - + return CallWrapper(response, error) else: return continuation(client_call_details, request) @@ -161,7 +164,9 @@ def data_table_read_rows_retry_tests(data_table, rows_to_delete): row_keys = [f"row_key_{i}".encode() for i in range(0, 32)] columns = [f"col_{i}".encode() for i in range(0, 32)] - _populate_table(data_table, rows_to_delete, row_keys, columns, CELL_VAL_READ_ROWS_RETRY) + _populate_table( + data_table, rows_to_delete, row_keys, columns, CELL_VAL_READ_ROWS_RETRY + ) yield data_table @@ -293,7 +298,9 @@ def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): final_success_response = [MutateRowsResponse(entries=[MutateRowsResponse.Entry()])] - with mock.patch.object(data_table._instance._client.table_data_client, "mutate_rows") as mutate_mock: + with mock.patch.object( + data_table._instance._client.table_data_client, "mutate_rows" + ) as mutate_mock: mutate_mock.side_effect = [ initial_error_response, followup_error_response, @@ -317,8 +324,12 @@ def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): assert statuses[1].code == Code.OK # Simulate only server failures for row 2. 
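    # Row 1 succeeds on the first attempt and is not re-sent; row 2 keeps failing
    # until the retry deadline, so the last status the server reported for it
    # (INTERNAL) is what the assertions below observe.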
- with mock.patch.object(data_table._instance._client.table_data_client, "mutate_rows") as mutate_mock: - mutate_mock.side_effect = [initial_error_response] + [followup_error_response] * 1000000 + with mock.patch.object( + data_table._instance._client.table_data_client, "mutate_rows" + ) as mutate_mock: + mutate_mock.side_effect = [initial_error_response] + [ + followup_error_response + ] * 1000000 row = data_table.direct_row(ROW_KEY) rows_to_delete.append(row) @@ -343,14 +354,18 @@ def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): row_2 = data_table.direct_row(ROW_KEY_ALT) rows_to_delete.append(row_2) - retry = DEFAULT_RETRY.with_predicate(retries.if_exception_type(_BigtableRetryableError, InvalidArgument)) + retry = DEFAULT_RETRY.with_predicate( + retries.if_exception_type(_BigtableRetryableError, InvalidArgument) + ) _add_test_error_handler(retry) statuses = data_table.mutate_rows([row, row_2], retry=retry) assert statuses[0] is None assert statuses[1] is None -def _populate_table(data_table, rows_to_delete, row_keys, columns=[COL_NAME1], cell_value=CELL_VAL1): +def _populate_table( + data_table, rows_to_delete, row_keys, columns=[COL_NAME1], cell_value=CELL_VAL1 +): for row_key in row_keys: row = data_table.direct_row(row_key) for column in columns: @@ -424,7 +439,12 @@ def test_table_mutate_rows_integers(data_table, rows_to_delete): # Check the contents row1_data = data_table.read_row(ROW_KEY) - assert int.from_bytes(row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, byteorder="big") == INT_CELL_VAL + assert ( + int.from_bytes( + row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, byteorder="big" + ) + == INT_CELL_VAL + ) def test_table_mutate_rows_input_errors(data_table, rows_to_delete): @@ -445,7 +465,7 @@ def test_table_mutate_rows_input_errors(data_table, rows_to_delete): # client library. 
for _ in range(0, _MAX_BULK_MUTATIONS + 1): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - + with pytest.raises(TooManyMutationsError): data_table.mutate_rows([row]) @@ -674,11 +694,17 @@ def _assert_data_table_read_rows_retry_correct(rows_data): for row_num in range(0, 32): row = rows_data.rows[f"row_key_{row_num}".encode()] for col_num in range(0, 32): - assert row.cells[COLUMN_FAMILY_ID1][f"col_{col_num}".encode()][0].value == CELL_VAL_READ_ROWS_RETRY + assert ( + row.cells[COLUMN_FAMILY_ID1][f"col_{col_num}".encode()][0].value + == CELL_VAL_READ_ROWS_RETRY + ) -def test_table_read_rows_retry_unretriable_error_establishing_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_unretriable_error_establishing_stream( + data_table_read_rows_with_error_injector, +): from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False) @@ -688,10 +714,14 @@ def test_table_read_rows_retry_unretriable_error_establishing_stream(data_table_ data_table_read_rows_with_error_injector.read_rows() -def test_table_read_rows_retry_retriable_error_establishing_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_retriable_error_establishing_stream( + data_table_read_rows_with_error_injector, +): error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False) + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ) ] * 3 rows_data = data_table_read_rows_with_error_injector.read_rows() @@ -700,11 +730,16 @@ def test_table_read_rows_retry_retriable_error_establishing_stream(data_table_re _assert_data_table_read_rows_retry_correct(rows_data) -def test_table_read_rows_retry_unretriable_error_mid_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_unretriable_error_mid_stream( + data_table_read_rows_with_error_injector, +): from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5) + error_injector.make_exception( + StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5 + ) ] rows_data = data_table_read_rows_with_error_injector.read_rows() @@ -712,12 +747,20 @@ def test_table_read_rows_retry_unretriable_error_mid_stream(data_table_read_rows rows_data.consume_all() -def test_table_read_rows_retry_retriable_errors_mid_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_retriable_errors_mid_stream( + data_table_read_rows_with_error_injector, +): error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4), - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0), - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4 + ), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 + ), + 
error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 + ), ] rows_data = data_table_read_rows_with_error_injector.read_rows() @@ -726,13 +769,31 @@ def test_table_read_rows_retry_retriable_errors_mid_stream(data_table_read_rows_ _assert_data_table_read_rows_retry_correct(rows_data) -def test_table_read_rows_retry_retriable_internal_errors_mid_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_retriable_internal_errors_mid_stream( + data_table_read_rows_with_error_injector, +): from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES + error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=2), - error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], fail_mid_stream=True, successes_before_fail=1), - error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], fail_mid_stream=True, successes_before_fail=0), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=2, + ), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], + fail_mid_stream=True, + successes_before_fail=1, + ), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], + fail_mid_stream=True, + successes_before_fail=0, + ), ] rows_data = data_table_read_rows_with_error_injector.read_rows() @@ -741,11 +802,19 @@ def test_table_read_rows_retry_retriable_internal_errors_mid_stream(data_table_r _assert_data_table_read_rows_retry_correct(rows_data) -def test_table_read_rows_retry_unretriable_internal_errors_mid_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_unretriable_internal_errors_mid_stream( + data_table_read_rows_with_error_injector, +): from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.INTERNAL, message="Don't retry this at home!", fail_mid_stream=True, successes_before_fail=2), + error_injector.make_exception( + StatusCode.INTERNAL, + message="Don't retry this at home!", + fail_mid_stream=True, + successes_before_fail=2, + ), ] rows_data = data_table_read_rows_with_error_injector.read_rows() @@ -753,13 +822,17 @@ def test_table_read_rows_retry_unretriable_internal_errors_mid_stream(data_table rows_data.consume_all() -def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream( + data_table_read_rows_with_error_injector, +): # Simulate a connection failure mid-stream into an unretriable error when trying to reconnect. 
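    # On reconnect the client re-issues the request starting after
    # last_seen_row_key (see _on_error in row_data.py); the ABORTED raised while
    # re-opening the stream is not retriable, so it propagates to the caller.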
from google.api_core import exceptions error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 + ), error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False), ] @@ -769,11 +842,15 @@ def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_rees rows_data.consume_all() -def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream( + data_table_read_rows_with_error_injector, +): # Simulate a connection failure mid-stream into retriable errors when trying to reconnect. error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 + ), error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), @@ -785,26 +862,45 @@ def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reesta _assert_data_table_read_rows_retry_correct(rows_data) -def test_table_read_rows_retry_timeout_mid_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_timeout_mid_stream( + data_table_read_rows_with_error_injector, +): # Simulate a read timeout mid stream. from google.api_core import exceptions - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS, RETRYABLE_INTERNAL_ERROR_MESSAGES + from google.cloud.bigtable.row_data import ( + DEFAULT_RETRY_READ_ROWS, + RETRYABLE_INTERNAL_ERROR_MESSAGES, + ) error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=5), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=5, + ), ] + [ - error_injector.make_exception(StatusCode.INTERNAL, message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=0), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=0, + ), ] * 20 # Shorten the deadline so the timeout test is shorter. - rows_data = data_table_read_rows_with_error_injector.read_rows(retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)) + rows_data = data_table_read_rows_with_error_injector.read_rows( + retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) + ) with pytest.raises(exceptions.RetryError): rows_data.consume_all() -def test_table_read_rows_retry_timeout_establishing_stream(data_table_read_rows_with_error_injector): +def test_table_read_rows_retry_timeout_establishing_stream( + data_table_read_rows_with_error_injector, +): # Simulate a read timeout when creating the stream. 
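    # Unlike the mid-stream timeout above, no row is ever received here, so the
    # RetryError surfaces from the read_rows() call itself rather than from
    # consume_all().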
from google.api_core import exceptions @@ -812,14 +908,20 @@ def test_table_read_rows_retry_timeout_establishing_stream(data_table_read_rows_ error_injector = data_table_read_rows_with_error_injector.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False), + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ), ] + [ - error_injector.make_exception(StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False), + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ), ] * 20 # Shorten the deadline so the timeout test is shorter. with pytest.raises(exceptions.RetryError): - data_table_read_rows_with_error_injector.read_rows(retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)) + data_table_read_rows_with_error_injector.read_rows( + retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) + ) def test_table_check_and_mutate_rows(data_table, rows_to_delete): @@ -834,7 +936,12 @@ def test_table_check_and_mutate_rows(data_table, rows_to_delete): b"col_pr_1", b"col_pr_2", ] - _populate_table(data_table, rows_to_delete, [true_mutation_row_key, false_mutation_row_key], columns=columns) + _populate_table( + data_table, + rows_to_delete, + [true_mutation_row_key, false_mutation_row_key], + columns=columns, + ) true_row = data_table.conditional_row(true_mutation_row_key, PASS_ALL_FILTER) for column in columns: true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True) @@ -852,12 +959,12 @@ def test_table_check_and_mutate_rows(data_table, rows_to_delete): row1_data = data_table.read_row(true_mutation_row_key) for column in columns: assert row1_data.cells[COLUMN_FAMILY_ID1][column][0].value == CELL_VAL_TRUE - + row2_data = data_table.read_row(false_mutation_row_key) assert row2_data is None # all cells should be deleted -def test_table_append_row(data_table, rows_to_delete): +def test_table_append_row(data_table, rows_to_delete): row = data_table.append_row(ROW_KEY) rows_to_delete.append(data_table.direct_row(ROW_KEY)) @@ -873,11 +980,13 @@ def test_table_append_row(data_table, rows_to_delete): for _ in range(0, num_increments): row.increment_cell_value(COLUMN_FAMILY_ID1, int_col_name, 1) - + row.commit() - + row_data = data_table.read_row(ROW_KEY) - assert row_data.cells[COLUMN_FAMILY_ID1][int_col_name][0].value == num_increments.to_bytes(8, byteorder="big", signed=True) + assert row_data.cells[COLUMN_FAMILY_ID1][int_col_name][ + 0 + ].value == num_increments.to_bytes(8, byteorder="big", signed=True) assert row_data.cells[COLUMN_FAMILY_ID1][str_col_name][0].value == b"foobarbaz" @@ -909,42 +1018,42 @@ def test_table_direct_row_input_errors(data_table, rows_to_delete): # or bytes already. with pytest.raises(TypeError): row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) - + with pytest.raises(TypeError): row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME) - + # Unicode for column name and value does not get converted to bytes because # internally we use to_bytes in ascii mode. with pytest.raises(UnicodeEncodeError): row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1) - + with pytest.raises(UnicodeEncodeError): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) - + with pytest.raises(UnicodeEncodeError): row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI) # Various non int64s, we use struct to pack a Python int to bytes. 
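    # The client packs int cell values with struct (">q", big-endian signed
    # 64-bit), so anything outside that range fails to pack.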
with pytest.raises(struct.error): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL) - + with pytest.raises(struct.error): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL2) - + # Since floats aren't ints, they aren't converted to bytes via struct.pack, # but via _to_bytes, so you get a TypeError instead. with pytest.raises(TypeError): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL) - + # Can't have more than MAX_MUTATIONS mutations, but only enforced after # a row.commit row.clear() for _ in range(0, MAX_MUTATIONS + 1): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - + with pytest.raises(ValueError): row.commit() - + # Not having any mutations gives a server error (InvalidArgument), not # enforced on the client side. row.clear() @@ -969,18 +1078,18 @@ def test_table_conditional_row_input_errors(data_table, rows_to_delete): # or bytes already. with pytest.raises(TypeError): true_row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) - + with pytest.raises(TypeError): true_row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME) - + # Unicode for column name and value does not get converted to bytes because # internally we use to_bytes in ascii mode. with pytest.raises(UnicodeEncodeError): true_row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1) - + with pytest.raises(UnicodeEncodeError): true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) - + with pytest.raises(UnicodeEncodeError): true_row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI) @@ -1004,7 +1113,7 @@ def test_table_conditional_row_input_errors(data_table, rows_to_delete): with pytest.raises(ValueError): true_row.commit() - + true_row.clear() # State could be anything, but it is evaluated to a boolean later. @@ -1013,7 +1122,7 @@ def test_table_conditional_row_input_errors(data_table, rows_to_delete): true_row.commit() false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1, state="") - false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state="true_state") + false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state="true_state") false_row.commit() true_row_data = data_table.read_row(true_row.row_key) @@ -1022,7 +1131,7 @@ def test_table_conditional_row_input_errors(data_table, rows_to_delete): false_row_data = data_table.read_row(false_row.row_key) assert false_row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL1 - # Not having any mutations is enforced client-side for conditional row; nothing happens. + # Not having any mutations is enforced client-side for conditional row; nothing happens. 
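    # commit() simply returns without sending a CheckAndMutateRow request when
    # both the true and false mutation lists are empty.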
true_row.clear() true_row.commit() @@ -1040,7 +1149,7 @@ def test_table_append_row_input_errors(data_table, rows_to_delete): # Column names should be convertible to bytes (str or bytes) with pytest.raises(TypeError): row.append_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1) - + with pytest.raises(TypeError): row.increment_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, 1) @@ -1050,10 +1159,10 @@ def test_table_append_row_input_errors(data_table, rows_to_delete): with pytest.raises(UnicodeEncodeError): row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI) - + with pytest.raises(UnicodeEncodeError): row.increment_cell_value(COLUMN_FAMILY_ID1, JOY_EMOJI, 1) - + # Non-integer cell values for increment_cell_value with pytest.raises(ValueError): row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL) @@ -1065,18 +1174,22 @@ def test_table_append_row_input_errors(data_table, rows_to_delete): row.commit() row_data = data_table.read_row(ROW_KEY) - assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == int(FLOAT_CELL_VAL).to_bytes(8, byteorder="big", signed=True) - assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == int(FLOAT_CELL_VAL2).to_bytes(8, byteorder="big", signed=True) + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == int( + FLOAT_CELL_VAL + ).to_bytes(8, byteorder="big", signed=True) + assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == int( + FLOAT_CELL_VAL2 + ).to_bytes(8, byteorder="big", signed=True) # Can't have more than MAX_MUTATIONS mutations, but only enforced after # a row.commit row.clear() for _ in range(0, MAX_MUTATIONS + 1): row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - + with pytest.raises(ValueError): row.commit() - + # Not having any mutations gives a response of empty dict. row.clear() response = row.commit() From 0426ce2c606a88c02582b0646f41a6f5244e14c5 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 14 Oct 2025 20:40:16 +0000 Subject: [PATCH 03/12] Use 3.12 instead of 3.8 for system tests --- .kokoro/presubmit/{system-3.8.cfg => system-3.12.cfg} | 2 +- noxfile.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) rename .kokoro/presubmit/{system-3.8.cfg => system-3.12.cfg} (82%) diff --git a/.kokoro/presubmit/system-3.8.cfg b/.kokoro/presubmit/system-3.12.cfg similarity index 82% rename from .kokoro/presubmit/system-3.8.cfg rename to .kokoro/presubmit/system-3.12.cfg index f4bcee3db..78cdc5e85 100644 --- a/.kokoro/presubmit/system-3.8.cfg +++ b/.kokoro/presubmit/system-3.12.cfg @@ -3,5 +3,5 @@ # Only run this nox session. env_vars: { key: "NOX_SESSION" - value: "system-3.8" + value: "system-3.12" } \ No newline at end of file diff --git a/noxfile.py b/noxfile.py index 548bfd0ec..173e5d3ec 100644 --- a/noxfile.py +++ b/noxfile.py @@ -58,7 +58,7 @@ UNIT_TEST_EXTRAS: List[str] = [] UNIT_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {} -SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.8", "3.12"] +SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.12"] SYSTEM_TEST_STANDARD_DEPENDENCIES: List[str] = [ "mock", "pytest", @@ -206,7 +206,6 @@ def install_unittest_dependencies(session, *constraints): ) def unit(session, protobuf_implementation): # Install all test dependencies, then install this package in-place. 
- if protobuf_implementation == "cpp" and session.python in ("3.11", "3.12", "3.13"): session.skip("cpp implementation is not supported in python 3.11+") From cca6ccdf47ffeb77aa60f16a7eaa778dd99bb8d5 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 14 Oct 2025 20:41:13 +0000 Subject: [PATCH 04/12] linting --- tests/system/v2_client/test_data_api.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index d1615681b..496682761 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -947,14 +947,14 @@ def test_table_check_and_mutate_rows(data_table, rows_to_delete): true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True) true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_FALSE, state=False) matched = true_row.commit() - assert matched == True + assert matched false_row = data_table.conditional_row(false_mutation_row_key, BLOCK_ALL_FILTER) for column in columns: false_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True) false_row.delete_cell(COLUMN_FAMILY_ID1, column, state=False) matched = false_row.commit() - assert matched == False + assert not matched row1_data = data_table.read_row(true_mutation_row_key) for column in columns: @@ -1140,7 +1140,6 @@ def test_table_conditional_row_input_errors(data_table, rows_to_delete): def test_table_append_row_input_errors(data_table, rows_to_delete): - from google.api_core.exceptions import InvalidArgument from google.cloud.bigtable.row import MAX_MUTATIONS row = data_table.append_row(ROW_KEY) From 1bd02301cde295b5a8ab4e7457130bcca1b3e72f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 14 Oct 2025 20:44:24 +0000 Subject: [PATCH 05/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .kokoro/presubmit/system-3.8.cfg | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .kokoro/presubmit/system-3.8.cfg diff --git a/.kokoro/presubmit/system-3.8.cfg b/.kokoro/presubmit/system-3.8.cfg new file mode 100644 index 000000000..f4bcee3db --- /dev/null +++ b/.kokoro/presubmit/system-3.8.cfg @@ -0,0 +1,7 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +# Only run this nox session. +env_vars: { + key: "NOX_SESSION" + value: "system-3.8" +} \ No newline at end of file From b3ce1cdde6ccd5297c7862647eff048394b8a677 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Wed, 15 Oct 2025 14:39:59 +0000 Subject: [PATCH 06/12] Skipped sample_row_keys test on emulator --- tests/system/v2_client/test_data_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 496682761..15f14d918 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -990,7 +990,9 @@ def test_table_append_row(data_table, rows_to_delete): assert row_data.cells[COLUMN_FAMILY_ID1][str_col_name][0].value == b"foobarbaz" -def test_table_sample_row_keys(data_table): +def test_table_sample_row_keys(data_table, skip_on_emulator): + # Skip on emulator because it gives a random response. 
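+    # The emulator's sample keys bear no relation to the table's split points,
+    # so the split-key and offset assertions below only hold against the real
+    # service.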
+ # sample_row_keys returns a generator response = list(data_table.sample_row_keys()) previous_offset_bytes = 0 From 4499e4b1b1502e89ed01609f3f17d94917c1c39b Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 14 Oct 2025 20:40:16 +0000 Subject: [PATCH 07/12] Use 3.12 instead of 3.8 for system tests --- bigtable_delete.sh | 26 ++++++++++++++++++++++++++ test.out | 5 +++++ 2 files changed, 31 insertions(+) create mode 100644 bigtable_delete.sh create mode 100644 test.out diff --git a/bigtable_delete.sh b/bigtable_delete.sh new file mode 100644 index 000000000..ad460098b --- /dev/null +++ b/bigtable_delete.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +function filter_instances() { + gcloud bigtable instances list | grep -v "NAME" | awk '{print $1}' | grep $1 +} + +# function filter_backups() { + +# } + +function delete_instances() { + for instance in $(filter_instances $1); do + # if [ -z "$2" ]; then + # for backup in $() + # fi + gcloud bigtable instances delete $instance + done +} + +current_project=$(gcloud config get project) + +gcloud config set project precise-truck-742 +delete_instances g-c-p-* +delete_instances python-bigtable-tests-* +delete_instances admin-overlay-instance-* +gcloud config set project $current_project diff --git a/test.out b/test.out new file mode 100644 index 000000000..3b21df3a4 --- /dev/null +++ b/test.out @@ -0,0 +1,5 @@ +nox > Running session system-3.8 +nox > Creating virtual environment (virtualenv) using python3.8 in .nox/system-3-8 +nox > python -m pip install --pre 'grpcio!=1.52.0rc1' +nox > Interrupted... +nox > Session system-3.8 interrupted. From a895bc20302b86cc165fc1a0a95f650db368a74d Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 21 Oct 2025 15:46:36 +0000 Subject: [PATCH 08/12] Addressed review feedback --- tests/system/v2_client/test_data_api.py | 437 ++++++++++++------------ 1 file changed, 223 insertions(+), 214 deletions(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 15f14d918..bb709ef51 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -56,15 +56,14 @@ READ_ROWS_METHOD_NAME = "/google.bigtable.v2.Bigtable/ReadRows" -class SelectiveMethodsErrorInjector(UnaryStreamClientInterceptor): +class ReadRowsErrorInjector(UnaryStreamClientInterceptor): + """An error injector that can be configured to raise errors for the ReadRows method. + + The error injector is configured to inject errors off the self.errors_to_inject queue. + Exceptions can be configured to arise either during stream initialization or in the middle + of a stream. + """ def __init__(self): - # As long as there are more items on this list, the items on the list - # are as follows: - # - # 1. None = behave as normal - # 2. errors are raised. - # - # This is to inject errors mid stream after a bunch of normal behavior. self.errors_to_inject = [] @staticmethod @@ -159,36 +158,25 @@ def rows_to_delete(): row.commit() -@pytest.fixture(scope="function") -def data_table_read_rows_retry_tests(data_table, rows_to_delete): +@pytest.fixture(scope="class") +def data_table_read_rows_retry_tests(data_table): row_keys = [f"row_key_{i}".encode() for i in range(0, 32)] columns = [f"col_{i}".encode() for i in range(0, 32)] + # Need to add this here as a class level teardown since rows_to_delete + # is a function level fixture. 
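+    # The rows populated below are therefore tracked in this local list and
+    # deleted once per class, after all of the retry tests have run.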
+ rows_to_delete = [] + _populate_table( data_table, rows_to_delete, row_keys, columns, CELL_VAL_READ_ROWS_RETRY ) yield data_table - -@pytest.fixture(scope="function") -def data_table_read_rows_with_error_injector(data_table_read_rows_retry_tests): - data_client = data_table_read_rows_retry_tests._instance._client.table_data_client - error_injector = SelectiveMethodsErrorInjector() - old_logged_channel = data_client.transport._logged_channel - data_client.transport._logged_channel = intercept_channel( - old_logged_channel, error_injector - ) - data_table_read_rows_retry_tests.error_injector = error_injector - data_client.transport._stubs = {} - data_client.transport._prep_wrapped_messages(None) - - yield data_table_read_rows_retry_tests - - del data_table_read_rows_retry_tests.error_injector - data_client.transport._logged_channel = old_logged_channel - data_client.transport._stubs = {} - data_client.transport._prep_wrapped_messages(None) + for row in rows_to_delete: + row.clear() + row.delete() + row.commit() def test_table_read_rows_filter_millis(data_table): @@ -232,6 +220,7 @@ def test_table_mutate_rows(data_table, rows_to_delete): def _add_test_error_handler(retry): + """Overwrites the current on_error function to assert that backoff values are within expected bounds.""" import time curr_time = time.monotonic() @@ -699,229 +688,249 @@ def _assert_data_table_read_rows_retry_correct(rows_data): == CELL_VAL_READ_ROWS_RETRY ) +@pytest.mark.usefixtures("data_table_read_rows_retry_tests") +class TestTableReadRowsWithRetry: + @pytest.fixture(scope="function") + def data_table_read_rows_with_error_injector(self, data_table_read_rows_retry_tests): + data_client = data_table_read_rows_retry_tests._instance._client.table_data_client + error_injector = ReadRowsErrorInjector() + old_logged_channel = data_client.transport._logged_channel + data_client.transport._logged_channel = intercept_channel( + old_logged_channel, error_injector + ) + data_table_read_rows_retry_tests.error_injector = error_injector + data_client.transport._stubs = {} + data_client.transport._prep_wrapped_messages(None) -def test_table_read_rows_retry_unretriable_error_establishing_stream( - data_table_read_rows_with_error_injector, -): - from google.api_core import exceptions + yield data_table_read_rows_retry_tests - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False) - ] + del data_table_read_rows_retry_tests.error_injector + data_client.transport._logged_channel = old_logged_channel + data_client.transport._stubs = {} + data_client.transport._prep_wrapped_messages(None) - with pytest.raises(exceptions.Aborted): - data_table_read_rows_with_error_injector.read_rows() + def test_table_read_rows_retry_unretriable_error_establishing_stream( + self, data_table_read_rows_with_error_injector, + ): + from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False) + ] -def test_table_read_rows_retry_retriable_error_establishing_stream( - data_table_read_rows_with_error_injector, -): - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False - ) - ] * 3 + with pytest.raises(exceptions.Aborted): + 
data_table_read_rows_with_error_injector.read_rows() - rows_data = data_table_read_rows_with_error_injector.read_rows() - rows_data.consume_all() - _assert_data_table_read_rows_retry_correct(rows_data) + def test_table_read_rows_retry_retriable_error_establishing_stream( + self, data_table_read_rows_with_error_injector, + ): + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ) + ] * 3 + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() -def test_table_read_rows_retry_unretriable_error_mid_stream( - data_table_read_rows_with_error_injector, -): - from google.api_core import exceptions + _assert_data_table_read_rows_retry_correct(rows_data) - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5 - ) - ] - rows_data = data_table_read_rows_with_error_injector.read_rows() - with pytest.raises(exceptions.DataLoss): - rows_data.consume_all() + def test_table_read_rows_retry_unretriable_error_mid_stream( + self, data_table_read_rows_with_error_injector, + ): + from google.api_core import exceptions + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5 + ) + ] -def test_table_read_rows_retry_retriable_errors_mid_stream( - data_table_read_rows_with_error_injector, -): - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4 - ), - error_injector.make_exception( - StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 - ), - error_injector.make_exception( - StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 - ), - ] + rows_data = data_table_read_rows_with_error_injector.read_rows() + with pytest.raises(exceptions.DataLoss): + rows_data.consume_all() - rows_data = data_table_read_rows_with_error_injector.read_rows() - rows_data.consume_all() - _assert_data_table_read_rows_retry_correct(rows_data) + def test_table_read_rows_retry_retriable_errors_mid_stream( + self, data_table_read_rows_with_error_injector, + ): + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4 + ), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 + ), + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0 + ), + ] + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() -def test_table_read_rows_retry_retriable_internal_errors_mid_stream( - data_table_read_rows_with_error_injector, -): - from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES - - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], - fail_mid_stream=True, - successes_before_fail=2, - ), - 
error_injector.make_exception( - StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], - fail_mid_stream=True, - successes_before_fail=1, - ), - error_injector.make_exception( - StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], - fail_mid_stream=True, - successes_before_fail=0, - ), - ] + _assert_data_table_read_rows_retry_correct(rows_data) - rows_data = data_table_read_rows_with_error_injector.read_rows() - rows_data.consume_all() - _assert_data_table_read_rows_retry_correct(rows_data) + def test_table_read_rows_retry_retriable_internal_errors_mid_stream( + self, data_table_read_rows_with_error_injector, + ): + from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=2, + ), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], + fail_mid_stream=True, + successes_before_fail=1, + ), + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], + fail_mid_stream=True, + successes_before_fail=0, + ), + ] + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() -def test_table_read_rows_retry_unretriable_internal_errors_mid_stream( - data_table_read_rows_with_error_injector, -): - from google.api_core import exceptions - - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.INTERNAL, - message="Don't retry this at home!", - fail_mid_stream=True, - successes_before_fail=2, - ), - ] + _assert_data_table_read_rows_retry_correct(rows_data) - rows_data = data_table_read_rows_with_error_injector.read_rows() - with pytest.raises(exceptions.InternalServerError): - rows_data.consume_all() + def test_table_read_rows_retry_unretriable_internal_errors_mid_stream( + self, data_table_read_rows_with_error_injector, + ): + from google.api_core import exceptions + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.INTERNAL, + message="Don't retry this at home!", + fail_mid_stream=True, + successes_before_fail=2, + ), + ] -def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream( - data_table_read_rows_with_error_injector, -): - # Simulate a connection failure mid-stream into an unretriable error when trying to reconnect. 
- from google.api_core import exceptions - - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 - ), - error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False), - ] + rows_data = data_table_read_rows_with_error_injector.read_rows() + with pytest.raises(exceptions.InternalServerError): + rows_data.consume_all() - rows_data = data_table_read_rows_with_error_injector.read_rows() - with pytest.raises(exceptions.Aborted): - rows_data.consume_all() + def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream( + self, data_table_read_rows_with_error_injector, + ): + # Simulate a connection failure mid-stream into an unretriable error when trying to reconnect. + from google.api_core import exceptions + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 + ), + error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False), + ] + rows_data = data_table_read_rows_with_error_injector.read_rows() -def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream( - data_table_read_rows_with_error_injector, -): - # Simulate a connection failure mid-stream into retriable errors when trying to reconnect. - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 - ), - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), - error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), - ] + with pytest.raises(exceptions.Aborted): + rows_data.consume_all() - rows_data = data_table_read_rows_with_error_injector.read_rows() - rows_data.consume_all() - _assert_data_table_read_rows_retry_correct(rows_data) + def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream( + self, data_table_read_rows_with_error_injector, + ): + # Simulate a connection failure mid-stream into retriable errors when trying to reconnect. + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 + ), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False), + ] + rows_data = data_table_read_rows_with_error_injector.read_rows() + rows_data.consume_all() -def test_table_read_rows_retry_timeout_mid_stream( - data_table_read_rows_with_error_injector, -): - # Simulate a read timeout mid stream. 
+ _assert_data_table_read_rows_retry_correct(rows_data) - from google.api_core import exceptions - from google.cloud.bigtable.row_data import ( - DEFAULT_RETRY_READ_ROWS, - RETRYABLE_INTERNAL_ERROR_MESSAGES, - ) - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], - fail_mid_stream=True, - successes_before_fail=5, - ), - ] + [ - error_injector.make_exception( - StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], - fail_mid_stream=True, - successes_before_fail=0, - ), - ] * 20 - - # Shorten the deadline so the timeout test is shorter. - rows_data = data_table_read_rows_with_error_injector.read_rows( - retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) - ) - with pytest.raises(exceptions.RetryError): - rows_data.consume_all() + def test_table_read_rows_retry_timeout_mid_stream( + self, data_table_read_rows_with_error_injector, + ): + # Simulate a read timeout mid stream. + from google.api_core import exceptions + from google.cloud.bigtable.row_data import ( + DEFAULT_RETRY_READ_ROWS, + RETRYABLE_INTERNAL_ERROR_MESSAGES, + ) -def test_table_read_rows_retry_timeout_establishing_stream( - data_table_read_rows_with_error_injector, -): - # Simulate a read timeout when creating the stream. - - from google.api_core import exceptions - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS - - error_injector = data_table_read_rows_with_error_injector.error_injector - error_injector.errors_to_inject = [ - error_injector.make_exception( - StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False - ), - ] + [ - error_injector.make_exception( - StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False - ), - ] * 20 - - # Shorten the deadline so the timeout test is shorter. - with pytest.raises(exceptions.RetryError): - data_table_read_rows_with_error_injector.read_rows( + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=5, + ), + ] + [ + error_injector.make_exception( + StatusCode.INTERNAL, + message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + fail_mid_stream=True, + successes_before_fail=0, + ), + ] * 20 + + # Shorten the deadline so the timeout test is shorter. + rows_data = data_table_read_rows_with_error_injector.read_rows( retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) ) + with pytest.raises(exceptions.RetryError): + rows_data.consume_all() + + + def test_table_read_rows_retry_timeout_establishing_stream( + self, data_table_read_rows_with_error_injector, + ): + # Simulate a read timeout when creating the stream. + + from google.api_core import exceptions + from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS + + error_injector = data_table_read_rows_with_error_injector.error_injector + error_injector.errors_to_inject = [ + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ), + ] + [ + error_injector.make_exception( + StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False + ), + ] * 20 + + # Shorten the deadline so the timeout test is shorter. 
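+        # (The 21 queued DEADLINE_EXCEEDED errors outlast the 10-second retry
+        # deadline, so the read_rows call below is expected to raise RetryError.)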
+ with pytest.raises(exceptions.RetryError): + data_table_read_rows_with_error_injector.read_rows( + retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) + ) def test_table_check_and_mutate_rows(data_table, rows_to_delete): From a16360c1fd36635fb33b7c1ce1b14628fd7749db Mon Sep 17 00:00:00 2001 From: Kevin Zheng <147537668+gkevinzheng@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:48:06 -0400 Subject: [PATCH 09/12] Update test_data_api.py --- tests/system/v2_client/test_data_api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index bb709ef51..ac2573908 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -61,7 +61,8 @@ class ReadRowsErrorInjector(UnaryStreamClientInterceptor): The error injector is configured to inject errors off the self.errors_to_inject queue. Exceptions can be configured to arise either during stream initialization or in the middle - of a stream. + of a stream. If no errors are in the error injection queue, the ReadRows RPC call will behave + normally. """ def __init__(self): self.errors_to_inject = [] From 533dba1c2bb9b03f9395e20fdcb6230462612f5f Mon Sep 17 00:00:00 2001 From: Kevin Zheng <147537668+gkevinzheng@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:46:30 -0400 Subject: [PATCH 10/12] Delete test.out --- test.out | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 test.out diff --git a/test.out b/test.out deleted file mode 100644 index 3b21df3a4..000000000 --- a/test.out +++ /dev/null @@ -1,5 +0,0 @@ -nox > Running session system-3.8 -nox > Creating virtual environment (virtualenv) using python3.8 in .nox/system-3-8 -nox > python -m pip install --pre 'grpcio!=1.52.0rc1' -nox > Interrupted... -nox > Session system-3.8 interrupted. From 68a91bec1ae5f8620aaef63d233c590f77a97467 Mon Sep 17 00:00:00 2001 From: Kevin Zheng <147537668+gkevinzheng@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:49:49 -0400 Subject: [PATCH 11/12] Delete .kokoro/presubmit/system-3.12.cfg --- .kokoro/presubmit/system-3.12.cfg | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .kokoro/presubmit/system-3.12.cfg diff --git a/.kokoro/presubmit/system-3.12.cfg b/.kokoro/presubmit/system-3.12.cfg deleted file mode 100644 index 78cdc5e85..000000000 --- a/.kokoro/presubmit/system-3.12.cfg +++ /dev/null @@ -1,7 +0,0 @@ -# Format: //devtools/kokoro/config/proto/build.proto - -# Only run this nox session. 
-env_vars: { - key: "NOX_SESSION" - value: "system-3.12" -} \ No newline at end of file From 72cc5bb7baedd75b1a6653fb7676dda96e64dc50 Mon Sep 17 00:00:00 2001 From: Kevin Zheng <147537668+gkevinzheng@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:50:10 -0400 Subject: [PATCH 12/12] Delete bigtable_delete.sh --- bigtable_delete.sh | 26 -------------------------- 1 file changed, 26 deletions(-) delete mode 100644 bigtable_delete.sh diff --git a/bigtable_delete.sh b/bigtable_delete.sh deleted file mode 100644 index ad460098b..000000000 --- a/bigtable_delete.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash - -function filter_instances() { - gcloud bigtable instances list | grep -v "NAME" | awk '{print $1}' | grep $1 -} - -# function filter_backups() { - -# } - -function delete_instances() { - for instance in $(filter_instances $1); do - # if [ -z "$2" ]; then - # for backup in $() - # fi - gcloud bigtable instances delete $instance - done -} - -current_project=$(gcloud config get project) - -gcloud config set project precise-truck-742 -delete_instances g-c-p-* -delete_instances python-bigtable-tests-* -delete_instances admin-overlay-instance-* -gcloud config set project $current_project