diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py
index e11379108..b333d9c6a 100644
--- a/google/cloud/bigtable/row_data.py
+++ b/google/cloud/bigtable/row_data.py
@@ -227,7 +227,7 @@ def _on_error(self, exc):
         retry_request = self._create_retry_request()
 
         self._row_merger = _RowMerger(self._row_merger.last_seen_row_key)
-        self.response_iterator = self.read_method(retry_request)
+        self.response_iterator = self.read_method(retry_request, retry=self.retry)
 
     def _read_next(self):
         """Helper for :meth:`__iter__`."""
diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py
index 579837e34..f41aa8377 100644
--- a/tests/system/v2_client/test_data_api.py
+++ b/tests/system/v2_client/test_data_api.py
@@ -14,9 +14,17 @@
 import datetime
 import operator
+import struct
 
 import pytest
 
+from google.cloud.bigtable import row_filters
+
+from grpc import UnaryStreamClientInterceptor
+from grpc import RpcError
+from grpc import StatusCode
+from grpc import intercept_channel
+
 COLUMN_FAMILY_ID1 = "col-fam-id1"
 COLUMN_FAMILY_ID2 = "col-fam-id2"
 COL_NAME1 = b"col-name1"
@@ -26,9 +34,105 @@
 CELL_VAL2 = b"cell-val-newer"
 CELL_VAL3 = b"altcol-cell-val"
 CELL_VAL4 = b"foo"
+CELL_VAL_READ_ROWS_RETRY = b"1" * 512
 ROW_KEY = b"row-key"
 ROW_KEY_ALT = b"row-key-alt"
 
+CELL_VAL_TRUE = b"true"
+CELL_VAL_FALSE = b"false"
+INT_COL_NAME = 67890
+INT_CELL_VAL = 12345
+OVERFLOW_INT_CELL_VAL = 10**100
+OVERFLOW_INT_CELL_VAL2 = -(10**100)
+FLOAT_CELL_VAL = 1.4
+FLOAT_CELL_VAL2 = -1.4
+
+INITIAL_ROW_SPLITS = [b"row_split_1", b"row_split_2", b"row_split_3"]
+JOY_EMOJI = "\N{FACE WITH TEARS OF JOY}"
+
+PASS_ALL_FILTER = row_filters.PassAllFilter(True)
+BLOCK_ALL_FILTER = row_filters.BlockAllFilter(True)
+
+READ_ROWS_METHOD_NAME = "/google.bigtable.v2.Bigtable/ReadRows"
+
+
+class ReadRowsErrorInjector(UnaryStreamClientInterceptor):
+    """An error injector that can raise errors for the ReadRows method.
+
+    Errors are injected off the ``errors_to_inject`` queue and can be
+    configured to arise either during stream initialization or in the middle
+    of a stream. If the queue is empty, the ReadRows RPC behaves normally.
+    """
+
+    def __init__(self):
+        self.errors_to_inject = []
+
+    def clear(self):
+        self.errors_to_inject.clear()
+
+    @staticmethod
+    def make_exception(
+        status_code, message=None, fail_mid_stream=False, successes_before_fail=0
+    ):
+        # successes_before_fail lets us inject a failure partway through
+        # iterating the response stream.
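+        # Build a bare grpc.RpcError and attach the callable surface that
+        # google-api-core's exception mapping expects: code(), details(),
+        # initial_metadata(), trailing_metadata() and result().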
+        exc = RpcError(status_code)
+        exc.code = lambda: status_code
+
+        _, status_message = status_code.value
+        exc.details = lambda: message if message else status_message
+
+        exc.initial_metadata = lambda: []
+        exc.trailing_metadata = lambda: []
+        exc.fail_mid_stream = fail_mid_stream
+        exc.successes_before_fail = successes_before_fail
+
+        def _result():
+            raise exc
+
+        exc.result = _result
+        return exc
+
+    def intercept_unary_stream(self, continuation, client_call_details, request):
+        if (
+            client_call_details.method == READ_ROWS_METHOD_NAME
+            and self.errors_to_inject
+        ):
+            error = self.errors_to_inject.pop(0)
+            if not error.fail_mid_stream:
+                raise error
+
+            response = continuation(client_call_details, request)
+            if error.fail_mid_stream:
+
+                class CallWrapper:
+                    def __init__(self, call, exc_to_raise):
+                        self._call = call
+                        self._exc = exc_to_raise
+                        self._successes_remaining = exc_to_raise.successes_before_fail
+                        self._raised = False
+
+                    def __iter__(self):
+                        return self
+
+                    def __next__(self):
+                        if not self._raised and self._successes_remaining == 0:
+                            self._raised = True
+                            if self._exc:
+                                raise self._exc
+                        else:
+                            if self._successes_remaining > 0:
+                                self._successes_remaining -= 1
+                        return self._call.__next__()
+
+                    def __getattr__(self, name):
+                        return getattr(self._call, name)
+
+                return CallWrapper(response, error)
+        else:
+            return continuation(client_call_details, request)
+
 
 @pytest.fixture(scope="module")
 def data_table_id():
@@ -38,7 +142,7 @@ def data_table_id():
 @pytest.fixture(scope="module")
 def data_table(data_instance_populated, data_table_id):
     table = data_instance_populated.table(data_table_id)
-    table.create()
+    table.create(initial_split_keys=INITIAL_ROW_SPLITS)
 
     table.column_family(COLUMN_FAMILY_ID1).create()
     table.column_family(COLUMN_FAMILY_ID2).create()
@@ -59,6 +163,50 @@ def rows_to_delete():
         row.commit()
 
 
+@pytest.fixture(scope="module")
+def data_table_read_rows_retry_tests_setup(data_table):
+    row_keys = [f"row_key_{i}".encode() for i in range(0, 32)]
+    columns = [f"col_{i}".encode() for i in range(0, 32)]
+
+    # Set up the error injector on the data client's gRPC channel.
+    data_client = data_table._instance._client.table_data_client
+    error_injector = ReadRowsErrorInjector()
+    old_logged_channel = data_client.transport._logged_channel
+    data_client.transport._logged_channel = intercept_channel(
+        old_logged_channel, error_injector
+    )
+    data_table.error_injector = error_injector
+    data_client.transport._stubs = {}
+    data_client.transport._prep_wrapped_messages(None)
+
+    # Keep our own list of rows to delete as a module-level teardown, since
+    # the rows_to_delete fixture is function-scoped.
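+    # Populate once for the whole module; the finally block below restores
+    # the original channel and stubs even if population fails.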
+    rows_to_delete = []
+
+    try:
+        _populate_table(
+            data_table, rows_to_delete, row_keys, columns, CELL_VAL_READ_ROWS_RETRY
+        )
+        yield data_table
+    finally:
+        del data_table.error_injector
+        data_client.transport._logged_channel = old_logged_channel
+        data_client.transport._stubs = {}
+        data_client.transport._prep_wrapped_messages(None)
+
+        for row in rows_to_delete:
+            row.clear()
+            row.delete()
+            row.commit()
+
+
+@pytest.fixture(scope="function")
+def data_table_read_rows_retry_tests(data_table_read_rows_retry_tests_setup):
+    yield data_table_read_rows_retry_tests_setup
+
+    data_table_read_rows_retry_tests_setup.error_injector.clear()
+
+
 def test_table_read_rows_filter_millis(data_table):
     from google.cloud.bigtable import row_filters
 
@@ -99,10 +247,146 @@ def test_table_mutate_rows(data_table, rows_to_delete):
     assert row2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL4
 
 
-def _populate_table(data_table, rows_to_delete, row_keys):
+def _add_test_error_handler(retry):
+    """Overwrites the retry's on_error hook to assert that backoff gaps stay within expected bounds."""
+    import time
+
+    curr_time = time.monotonic()
+    times_triggered = 0
+
+    # Check every observed gap between retries against the backoff ceiling.
+    def test_error_handler(exc):
+        nonlocal curr_time, times_triggered, retry
+        next_time = time.monotonic()
+        if times_triggered >= 1:
+            gap = next_time - curr_time
+
+            # Exponential backoff sleeps a uniformly random time in [0, max_gap].
+            max_gap = min(
+                retry._initial * retry._multiplier**times_triggered,
+                retry._maximum,
+            )
+            assert gap <= max_gap
+        times_triggered += 1
+        curr_time = next_time
+
+    retry._on_error = test_error_handler
+
+
+def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete):
+    import mock
+    import copy
+    from google.api_core import retry as retries
+    from google.api_core.exceptions import InvalidArgument
+    from google.cloud.bigtable_v2 import MutateRowsResponse
+    from google.cloud.bigtable.table import DEFAULT_RETRY, _BigtableRetryableError
+    from google.rpc.code_pb2 import Code
+    from google.rpc.status_pb2 import Status
+
+    # Simulate a normal response for row 1 and a server error for row 2,
+    # followed by more error responses for row 2 and a final success.
+    initial_error_response = [
+        MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(),
+                MutateRowsResponse.Entry(
+                    index=1,
+                    status=Status(
+                        code=Code.INTERNAL,
+                        message="Test error",
+                    ),
+                ),
+            ]
+        )
+    ]
+
+    followup_error_response = [
+        MutateRowsResponse(
+            entries=[
+                MutateRowsResponse.Entry(
+                    status=Status(
+                        code=Code.INTERNAL,
+                        message="Test error",
+                    )
+                )
+            ]
+        )
+    ]
+
+    final_success_response = [MutateRowsResponse(entries=[MutateRowsResponse.Entry()])]
+
+    with mock.patch.object(
+        data_table._instance._client.table_data_client, "mutate_rows"
+    ) as mutate_mock:
+        mutate_mock.side_effect = [
+            initial_error_response,
+            followup_error_response,
+            followup_error_response,
+            final_success_response,
+        ]
+
+        row = data_table.direct_row(ROW_KEY)
+        rows_to_delete.append(row)
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+        row_2 = data_table.direct_row(ROW_KEY_ALT)
+        rows_to_delete.append(row_2)
+        row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+        # Test the default retry: both rows eventually succeed.
+        default_retry_copy = copy.copy(DEFAULT_RETRY)
+        _add_test_error_handler(default_retry_copy)
+        statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy)
+        assert statuses[0].code == Code.OK
+        assert statuses[1].code == Code.OK
+
+    # Now simulate server failures for row 2 that never resolve.
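+    # A million queued error responses comfortably outlast the default
+    # retry's deadline, so the retry eventually gives up on row 2.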
+    with mock.patch.object(
+        data_table._instance._client.table_data_client, "mutate_rows"
+    ) as mutate_mock:
+        mutate_mock.side_effect = [initial_error_response] + [
+            followup_error_response
+        ] * 1000000
+
+        row = data_table.direct_row(ROW_KEY)
+        rows_to_delete.append(row)
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+        row_2 = data_table.direct_row(ROW_KEY_ALT)
+        rows_to_delete.append(row_2)
+        row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+        # Test the default retry: row 2 never succeeds, so it keeps its
+        # INTERNAL status once the retry deadline is exhausted.
+        default_retry_copy = copy.copy(DEFAULT_RETRY)
+        _add_test_error_handler(default_retry_copy)
+        statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy)
+        assert statuses[0].code == Code.OK
+        assert statuses[1].code == Code.INTERNAL
+
+    # Because of the way the retryable mutate worker class works, unusual
+    # things can happen when passing in custom retry predicates: the rows
+    # below carry no mutations, so the service rejects every attempt with
+    # InvalidArgument, the custom predicate keeps retrying it, and the
+    # worker finally returns with both entry statuses unresolved (None).
+    row = data_table.direct_row(ROW_KEY)
+    rows_to_delete.append(row)
+
+    row_2 = data_table.direct_row(ROW_KEY_ALT)
+    rows_to_delete.append(row_2)
+
+    retry = DEFAULT_RETRY.with_predicate(
+        retries.if_exception_type(_BigtableRetryableError, InvalidArgument)
+    )
+    _add_test_error_handler(retry)
+    statuses = data_table.mutate_rows([row, row_2], retry=retry)
+    assert statuses[0] is None
+    assert statuses[1] is None
+
+
+def _populate_table(
+    data_table, rows_to_delete, row_keys, columns=[COL_NAME1], cell_value=CELL_VAL1
+):
     for row_key in row_keys:
         row = data_table.direct_row(row_key)
-        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+        for column in columns:
+            row.set_cell(COLUMN_FAMILY_ID1, column, cell_value)
         row.commit()
         rows_to_delete.append(row)
 
@@ -157,6 +441,52 @@ def test_table_drop_by_prefix(data_table, rows_to_delete):
     assert expected_rows_count == found_rows_count
 
 
+def test_table_mutate_rows_integers(data_table, rows_to_delete):
+    row = data_table.direct_row(ROW_KEY)
+    row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+    row.commit()
+    rows_to_delete.append(row)
+
+    # Change the contents to an integer.
+    row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, INT_CELL_VAL)
+    statuses = data_table.mutate_rows([row])
+    assert len(statuses) == 1
+    for status in statuses:
+        assert status.code == 0
+
+    # Check the contents.
+    row1_data = data_table.read_row(ROW_KEY)
+    assert (
+        int.from_bytes(
+            row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, byteorder="big"
+        )
+        == INT_CELL_VAL
+    )
+
+
+def test_table_mutate_rows_input_errors(data_table, rows_to_delete):
+    from google.api_core.exceptions import InvalidArgument
+    from google.cloud.bigtable.table import TooManyMutationsError, _MAX_BULK_MUTATIONS
+
+    row = data_table.direct_row(ROW_KEY)
+    rows_to_delete.append(row)
+
+    # Mutating a row with 0 mutations gives an API error from the service,
+    # not from the client library.
+    with pytest.raises(InvalidArgument):
+        data_table.mutate_rows([row])
+
+    row.clear()
+
+    # Mutating a row with >100k mutations gives a TooManyMutationsError from
+    # the client library.
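+    # _MAX_BULK_MUTATIONS is 100,000, so a single extra mutation is enough
+    # to trip the client-side check.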
+    for _ in range(0, _MAX_BULK_MUTATIONS + 1):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+    with pytest.raises(TooManyMutationsError):
+        data_table.mutate_rows([row])
+
+
 def test_table_read_rows_w_row_set(data_table, rows_to_delete):
     from google.cloud.bigtable.row_set import RowSet
     from google.cloud.bigtable.row_set import RowRange
@@ -377,6 +707,513 @@ def test_read_with_label_applied(data_table, rows_to_delete, skip_on_emulator):
     assert cell3_new.labels == [label2]
 
 
+def _assert_data_table_read_rows_retry_correct(rows_data):
+    for row_num in range(0, 32):
+        row = rows_data.rows[f"row_key_{row_num}".encode()]
+        for col_num in range(0, 32):
+            assert (
+                row.cells[COLUMN_FAMILY_ID1][f"col_{col_num}".encode()][0].value
+                == CELL_VAL_READ_ROWS_RETRY
+            )
+
+
+def test_table_read_rows_retry_unretriable_error_establishing_stream(
+    data_table_read_rows_retry_tests,
+):
+    from google.api_core import exceptions
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False)
+    ]
+
+    with pytest.raises(exceptions.Aborted):
+        data_table_read_rows_retry_tests.read_rows()
+
+
+def test_table_read_rows_retry_retriable_error_establishing_stream(
+    data_table_read_rows_retry_tests,
+):
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False
+        )
+    ] * 3
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    rows_data.consume_all()
+
+    _assert_data_table_read_rows_retry_correct(rows_data)
+
+
+def test_table_read_rows_retry_unretriable_error_mid_stream(
+    data_table_read_rows_retry_tests,
+):
+    from google.api_core import exceptions
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.DATA_LOSS, fail_mid_stream=True, successes_before_fail=5
+        )
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    with pytest.raises(exceptions.DataLoss):
+        rows_data.consume_all()
+
+
+def test_table_read_rows_retry_retriable_errors_mid_stream(
+    data_table_read_rows_retry_tests,
+):
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=4
+        ),
+        error_injector.make_exception(
+            StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0
+        ),
+        error_injector.make_exception(
+            StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=0
+        ),
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    rows_data.consume_all()
+
+    _assert_data_table_read_rows_retry_correct(rows_data)
+
+
+def test_table_read_rows_retry_retriable_internal_errors_mid_stream(
+    data_table_read_rows_retry_tests,
+):
+    from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0],
+            fail_mid_stream=True,
+            successes_before_fail=2,
+        ),
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1],
+            fail_mid_stream=True,
+            successes_before_fail=1,
+        ),
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2],
+            fail_mid_stream=True,
+            successes_before_fail=0,
+        ),
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    rows_data.consume_all()
+
+    _assert_data_table_read_rows_retry_correct(rows_data)
+
+
+def test_table_read_rows_retry_unretriable_internal_errors_mid_stream(
+    data_table_read_rows_retry_tests,
+):
+    from google.api_core import exceptions
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message="Don't retry this at home!",
+            fail_mid_stream=True,
+            successes_before_fail=2,
+        ),
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    with pytest.raises(exceptions.InternalServerError):
+        rows_data.consume_all()
+
+
+def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_reestablishing_stream(
+    data_table_read_rows_retry_tests,
+):
+    # Simulate a connection failure mid-stream, followed by an unretriable
+    # error when trying to reconnect.
+    from google.api_core import exceptions
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5
+        ),
+        error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False),
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+
+    with pytest.raises(exceptions.Aborted):
+        rows_data.consume_all()
+
+
+def test_table_read_rows_retry_retriable_error_mid_stream_retriable_error_reestablishing_stream(
+    data_table_read_rows_retry_tests,
+):
+    # Simulate a connection failure mid-stream, followed by retriable errors
+    # when trying to reconnect.
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5
+        ),
+        error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False),
+        error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False),
+        error_injector.make_exception(StatusCode.UNAVAILABLE, fail_mid_stream=False),
+    ]
+
+    rows_data = data_table_read_rows_retry_tests.read_rows()
+    rows_data.consume_all()
+
+    _assert_data_table_read_rows_retry_correct(rows_data)
+
+
+def test_table_read_rows_retry_timeout_mid_stream(
+    data_table_read_rows_retry_tests,
+):
+    # Simulate a read timeout mid-stream.
+
+    from google.api_core import exceptions
+    from google.cloud.bigtable.row_data import (
+        DEFAULT_RETRY_READ_ROWS,
+        RETRYABLE_INTERNAL_ERROR_MESSAGES,
+    )
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0],
+            fail_mid_stream=True,
+            successes_before_fail=5,
+        ),
+    ] + [
+        error_injector.make_exception(
+            StatusCode.INTERNAL,
+            message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0],
+            fail_mid_stream=True,
+            successes_before_fail=0,
+        ),
+    ] * 20
+
+    # Shorten the retry deadline so the test finishes quickly.
+    rows_data = data_table_read_rows_retry_tests.read_rows(
+        retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)
+    )
+    with pytest.raises(exceptions.RetryError):
+        rows_data.consume_all()
+
+
+def test_table_read_rows_retry_timeout_establishing_stream(
+    data_table_read_rows_retry_tests,
+):
+    # Simulate a read timeout when creating the stream.
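+    # Unlike the mid-stream variant above, every injected error here fires
+    # before any response is yielded, so the retry deadline is spent
+    # re-establishing the stream.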
+
+    from google.api_core import exceptions
+    from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
+
+    error_injector = data_table_read_rows_retry_tests.error_injector
+    error_injector.errors_to_inject = [
+        error_injector.make_exception(
+            StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False
+        ),
+    ] + [
+        error_injector.make_exception(
+            StatusCode.DEADLINE_EXCEEDED, fail_mid_stream=False
+        ),
+    ] * 20
+
+    # Shorten the retry deadline so the test finishes quickly.
+    with pytest.raises(exceptions.RetryError):
+        data_table_read_rows_retry_tests.read_rows(
+            retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0)
+        )
+
+
+def test_table_check_and_mutate_rows(data_table, rows_to_delete):
+    true_mutation_row_key = b"true_row"
+    false_mutation_row_key = b"false_row"
+
+    columns = [
+        b"col_1",
+        b"col_2",
+        b"col_3",
+        b"col_4",
+        b"col_pr_1",
+        b"col_pr_2",
+    ]
+    _populate_table(
+        data_table,
+        rows_to_delete,
+        [true_mutation_row_key, false_mutation_row_key],
+        columns=columns,
+    )
+    true_row = data_table.conditional_row(true_mutation_row_key, PASS_ALL_FILTER)
+    for column in columns:
+        true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True)
+        true_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_FALSE, state=False)
+    matched = true_row.commit()
+    assert matched
+
+    false_row = data_table.conditional_row(false_mutation_row_key, BLOCK_ALL_FILTER)
+    for column in columns:
+        false_row.set_cell(COLUMN_FAMILY_ID1, column, CELL_VAL_TRUE, state=True)
+        false_row.delete_cell(COLUMN_FAMILY_ID1, column, state=False)
+    matched = false_row.commit()
+    assert not matched
+
+    row1_data = data_table.read_row(true_mutation_row_key)
+    for column in columns:
+        assert row1_data.cells[COLUMN_FAMILY_ID1][column][0].value == CELL_VAL_TRUE
+
+    row2_data = data_table.read_row(false_mutation_row_key)
+    assert row2_data is None  # all cells should be deleted
+
+
+def test_table_append_row(data_table, rows_to_delete):
+    row = data_table.append_row(ROW_KEY)
+    rows_to_delete.append(data_table.direct_row(ROW_KEY))
+
+    int_col_name = b"int_col"
+    str_col_name = b"str_col"
+    num_increments = 100
+
+    row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name, b"foo")
+    row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name, b"bar")
+
+    # Column names are convertible to byte strings provided they are valid
+    # ASCII strings.
+    row.append_cell_value(COLUMN_FAMILY_ID1, str_col_name.decode("ascii"), b"baz")
+
+    for _ in range(0, num_increments):
+        row.increment_cell_value(COLUMN_FAMILY_ID1, int_col_name, 1)
+
+    row.commit()
+
+    row_data = data_table.read_row(ROW_KEY)
+    assert row_data.cells[COLUMN_FAMILY_ID1][int_col_name][
+        0
+    ].value == num_increments.to_bytes(8, byteorder="big", signed=True)
+    assert row_data.cells[COLUMN_FAMILY_ID1][str_col_name][0].value == b"foobarbaz"
+
+
+def test_table_sample_row_keys(data_table, skip_on_emulator):
+    # Skip on emulator because it gives a random response.
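+    # The table was created with INITIAL_ROW_SPLITS, so the first samples
+    # should land on those split keys, with offsets non-decreasing.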
+
+    # sample_row_keys returns a generator.
+    response = list(data_table.sample_row_keys())
+    previous_offset_bytes = 0
+    for idx in range(len(INITIAL_ROW_SPLITS)):
+        assert response[idx].row_key == INITIAL_ROW_SPLITS[idx]
+
+        offset_bytes = response[idx].offset_bytes
+        assert isinstance(offset_bytes, int)
+        assert offset_bytes >= previous_offset_bytes
+
+        previous_offset_bytes = offset_bytes
+
+    assert response[-1].row_key == b""
+    assert isinstance(response[-1].offset_bytes, int)
+    assert response[-1].offset_bytes >= previous_offset_bytes
+
+
+def test_table_direct_row_input_errors(data_table, rows_to_delete):
+    from google.api_core.exceptions import InvalidArgument
+    from google.cloud.bigtable.row import MAX_MUTATIONS
+
+    row = data_table.direct_row(ROW_KEY)
+    rows_to_delete.append(row)
+
+    # Column names are converted to bytes only if they are ASCII strings or
+    # bytes already; anything else (an int here) raises TypeError.
+    with pytest.raises(TypeError):
+        row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1)
+
+    with pytest.raises(TypeError):
+        row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME)
+
+    # Non-ASCII unicode column names and values are not converted to bytes,
+    # because internally we use _to_bytes in ASCII mode.
+    with pytest.raises(UnicodeEncodeError):
+        row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1)
+
+    with pytest.raises(UnicodeEncodeError):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI)
+
+    with pytest.raises(UnicodeEncodeError):
+        row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI)
+
+    # Integers that do not fit in an int64: struct fails to pack the Python
+    # int to bytes.
+    with pytest.raises(struct.error):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL)
+
+    with pytest.raises(struct.error):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL2)
+
+    # Since floats aren't ints, they aren't converted to bytes via
+    # struct.pack, but via _to_bytes, so you get a TypeError instead.
+    with pytest.raises(TypeError):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL)
+
+    # Can't have more than MAX_MUTATIONS mutations; this is only enforced at
+    # row.commit().
+    row.clear()
+    for _ in range(0, MAX_MUTATIONS + 1):
+        row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+    with pytest.raises(ValueError):
+        row.commit()
+
+    # Not having any mutations gives a server error (InvalidArgument); this
+    # is not enforced on the client side.
+    row.clear()
+    with pytest.raises(InvalidArgument):
+        row.commit()
+
+
+def test_table_conditional_row_input_errors(data_table, rows_to_delete):
+    from google.cloud.bigtable.row import MAX_MUTATIONS
+
+    rows = [ROW_KEY, ROW_KEY_ALT]
+    columns = [COL_NAME1]
+
+    _populate_table(data_table, rows_to_delete, rows, columns=columns)
+
+    true_row = data_table.conditional_row(ROW_KEY, PASS_ALL_FILTER)
+    false_row = data_table.conditional_row(ROW_KEY_ALT, BLOCK_ALL_FILTER)
+    rows_to_delete.append(true_row)
+    rows_to_delete.append(false_row)
+
+    # Column names are converted to bytes only if they are ASCII strings or
+    # bytes already; anything else (an int here) raises TypeError.
+    with pytest.raises(TypeError):
+        true_row.set_cell(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1)
+
+    with pytest.raises(TypeError):
+        true_row.delete_cell(COLUMN_FAMILY_ID1, INT_COL_NAME)
+
+    # Non-ASCII unicode column names and values are not converted to bytes,
+    # because internally we use _to_bytes in ASCII mode.
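+    # JOY_EMOJI is outside the ASCII range, so encoding fails for both
+    # column names and cell values.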
+    with pytest.raises(UnicodeEncodeError):
+        true_row.set_cell(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1)
+
+    with pytest.raises(UnicodeEncodeError):
+        true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI)
+
+    with pytest.raises(UnicodeEncodeError):
+        true_row.delete_cell(COLUMN_FAMILY_ID1, JOY_EMOJI)
+
+    # Integers that do not fit in an int64: struct fails to pack the Python
+    # int to bytes.
+    with pytest.raises(struct.error):
+        true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL)
+
+    with pytest.raises(struct.error):
+        true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL2)
+
+    # Since floats aren't ints, they aren't converted to bytes via
+    # struct.pack, but via _to_bytes, so you get a TypeError instead.
+    with pytest.raises(TypeError):
+        true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL)
+
+    # Can't have more than MAX_MUTATIONS mutations; this is only enforced at
+    # row.commit().
+    true_row.clear()
+    for _ in range(0, MAX_MUTATIONS + 1):
+        true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+    with pytest.raises(ValueError):
+        true_row.commit()
+
+    true_row.clear()
+
+    # state can be any value; it is evaluated as a boolean later.
+    true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1, state=0)
+    true_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state=1)
+    true_row.commit()
+
+    false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1, state="")
+    false_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2, state="true_state")
+    false_row.commit()
+
+    true_row_data = data_table.read_row(true_row.row_key)
+    assert true_row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL2
+
+    false_row_data = data_table.read_row(false_row.row_key)
+    assert false_row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL1
+
+    # Not having any mutations is enforced client-side for a conditional row;
+    # committing is a no-op.
+    true_row.clear()
+    true_row.commit()
+
+    false_row.clear()
+    false_row.commit()
+
+
+def test_table_append_row_input_errors(data_table, rows_to_delete):
+    from google.cloud.bigtable.row import MAX_MUTATIONS
+
+    row = data_table.append_row(ROW_KEY)
+    rows_to_delete.append(data_table.direct_row(ROW_KEY))
+
+    # Column names should be convertible to bytes (str or bytes).
+    with pytest.raises(TypeError):
+        row.append_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, CELL_VAL1)
+
+    with pytest.raises(TypeError):
+        row.increment_cell_value(COLUMN_FAMILY_ID1, INT_COL_NAME, 1)
+
+    # Non-ASCII unicode column names and values fail to encode.
+    with pytest.raises(UnicodeEncodeError):
+        row.append_cell_value(COLUMN_FAMILY_ID1, JOY_EMOJI, CELL_VAL1)
+
+    with pytest.raises(UnicodeEncodeError):
+        row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, JOY_EMOJI)
+
+    with pytest.raises(UnicodeEncodeError):
+        row.increment_cell_value(COLUMN_FAMILY_ID1, JOY_EMOJI, 1)
+
+    # Integers that do not fit in an int64 raise ValueError.
+    with pytest.raises(ValueError):
+        row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, OVERFLOW_INT_CELL_VAL)
+
+    # increment_cell_value does not validate int_value itself; validation is
+    # left to proto-plus.
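+    # Floats are accepted and truncated toward zero: int(1.4) == 1 and
+    # int(-1.4) == -1, matching the assertions below.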
+    row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, FLOAT_CELL_VAL)
+    row.increment_cell_value(COLUMN_FAMILY_ID1, COL_NAME2, FLOAT_CELL_VAL2)
+    row.commit()
+
+    row_data = data_table.read_row(ROW_KEY)
+    assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == int(
+        FLOAT_CELL_VAL
+    ).to_bytes(8, byteorder="big", signed=True)
+    assert row_data.cells[COLUMN_FAMILY_ID1][COL_NAME2][0].value == int(
+        FLOAT_CELL_VAL2
+    ).to_bytes(8, byteorder="big", signed=True)
+
+    # Can't have more than MAX_MUTATIONS mutations; this is only enforced at
+    # row.commit().
+    row.clear()
+    for _ in range(0, MAX_MUTATIONS + 1):
+        row.append_cell_value(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
+
+    with pytest.raises(ValueError):
+        row.commit()
+
+    # Not having any mutations gives an empty dict response.
+    row.clear()
+    response = row.commit()
+    assert response == {}
+
+
 def test_access_with_non_admin_client(data_client, data_instance_id, data_table_id):
     instance = data_client.instance(data_instance_id)
     table = instance.table(data_table_id)