From 2f5fa5416b9a670061614daf10762d2756d0ae28 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Wed, 1 Feb 2023 09:36:39 +0000 Subject: [PATCH 01/12] proto changes --- google/cloud/spanner_v1/types/spanner.py | 20 ++++++++++++++++++++ scripts/fixup_spanner_v1_keywords.py | 8 ++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index b8b960c198..aaab98930f 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -474,6 +474,12 @@ class ExecuteSqlRequest(proto.Message): given query. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. + data_boost_enabled (bool): + If this is for a partitioned query and this field is set to + ``true``, the request will be executed via offline access. + If the field is set to ``true`` but the request does not set + ``partition_token``, the API will return an + ``INVALID_ARGUMENT`` error. """ class QueryMode(proto.Enum): @@ -613,6 +619,10 @@ class QueryOptions(proto.Message): number=11, message="RequestOptions", ) + data_boost_enabled: bool = proto.Field( + proto.BOOL, + number=14, + ) class ExecuteBatchDmlRequest(proto.Message): @@ -1123,6 +1133,12 @@ class ReadRequest(proto.Message): create this partition_token. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. + data_boost_enabled (bool): + If this is for a partitioned read and this field is set to + ``true``, the request will be executed via offline access. + If the field is set to ``true`` but the request does not set + ``partition_token``, the API will return an + ``INVALID_ARGUMENT`` error. """ session: str = proto.Field( @@ -1168,6 +1184,10 @@ class ReadRequest(proto.Message): number=11, message="RequestOptions", ) + data_boost_enabled: bool = proto.Field( + proto.BOOL, + number=13, + ) class BeginTransactionRequest(proto.Message): diff --git a/scripts/fixup_spanner_v1_keywords.py b/scripts/fixup_spanner_v1_keywords.py index ed532c0d8f..b897807106 100644 --- a/scripts/fixup_spanner_v1_keywords.py +++ b/scripts/fixup_spanner_v1_keywords.py @@ -45,15 +45,15 @@ class spannerCallTransformer(cst.CSTTransformer): 'create_session': ('database', 'session', ), 'delete_session': ('name', ), 'execute_batch_dml': ('session', 'transaction', 'statements', 'seqno', 'request_options', ), - 'execute_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', ), - 'execute_streaming_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', ), + 'execute_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'data_boost_enabled', ), + 'execute_streaming_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'data_boost_enabled', ), 'get_session': ('name', ), 'list_sessions': ('database', 'page_size', 'page_token', 'filter', ), 'partition_query': ('session', 'sql', 'transaction', 'params', 'param_types', 'partition_options', ), 'partition_read': ('session', 'table', 'key_set', 'transaction', 'index', 'columns', 'partition_options', ), - 'read': ('session', 'table', 'columns', 'key_set', 
'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', ), + 'read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'data_boost_enabled', ), 'rollback': ('session', 'transaction_id', ), - 'streaming_read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', ), + 'streaming_read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'data_boost_enabled', ), } def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode: From bb28dac6a542f946b41260ab78a23459e79618ea Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Thu, 2 Feb 2023 13:45:06 +0530 Subject: [PATCH 02/12] changes --- google/cloud/spanner_v1/database.py | 8 ++- google/cloud/spanner_v1/snapshot.py | 4 ++ google/cloud/spanner_v1/types/spanner.py | 8 +-- tests/system/test_session_api.py | 51 ++++++++++++++++ tests/unit/test_database.py | 77 +++++++++++++++++++++++- 5 files changed, 142 insertions(+), 6 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index f919fa2c5e..37f8d30819 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1101,6 +1101,7 @@ def generate_read_batches( index="", partition_size_bytes=None, max_partitions=None, + databoost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1162,6 +1163,7 @@ def generate_read_batches( "columns": columns, "keyset": keyset._to_dict(), "index": index, + "databoost_enabled": databoost_enabled, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1205,6 +1207,7 @@ def generate_query_batches( partition_size_bytes=None, max_partitions=None, query_options=None, + databoost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1272,7 +1275,10 @@ def generate_query_batches( timeout=timeout, ) - query_info = {"sql": sql} + query_info = { + "sql": sql, + "databoost_enabled": databoost_enabled, + } if params: query_info["params"] = params query_info["param_types"] = param_types diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index f1fff8b533..94823293ea 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -167,6 +167,7 @@ def read( limit=0, partition=None, request_options=None, + databoost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -247,6 +248,7 @@ def read( limit=limit, partition_token=partition, request_options=request_options, + serverless_analytics_enabled=databoost_enabled, ) restart = functools.partial( api.streaming_read, @@ -302,6 +304,7 @@ def execute_sql( partition=None, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + databoost_enabled=False, ): """Perform an ``ExecuteStreamingSql`` API request. 
@@ -400,6 +403,7 @@ def execute_sql( seqno=self._execute_sql_count, query_options=query_options, request_options=request_options, + serverless_analytics_enabled=databoost_enabled, ) restart = functools.partial( api.execute_streaming_sql, diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index aaab98930f..c44995e134 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -474,7 +474,7 @@ class ExecuteSqlRequest(proto.Message): given query. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. - data_boost_enabled (bool): + databoost_enabled (bool): If this is for a partitioned query and this field is set to ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set @@ -619,7 +619,7 @@ class QueryOptions(proto.Message): number=11, message="RequestOptions", ) - data_boost_enabled: bool = proto.Field( + serverless_analytics_enabled: bool = proto.Field( proto.BOOL, number=14, ) @@ -1133,7 +1133,7 @@ class ReadRequest(proto.Message): create this partition_token. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. - data_boost_enabled (bool): + databoost_enabled (bool): If this is for a partitioned read and this field is set to ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set @@ -1184,7 +1184,7 @@ class ReadRequest(proto.Message): number=11, message="RequestOptions", ) - data_boost_enabled: bool = proto.Field( + serverless_analytics_enabled: bool = proto.Field( proto.BOOL, number=13, ) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 6b7afbe525..47a0707aaa 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -1894,6 +1894,38 @@ def test_partition_read_w_index(sessions_database): assert union == expected batch_txn.close() +import pdb + +def test_partition_read_w_databoost_enabled(sessions_database): + pdb.set_trace() + sd = _sample_data + row_count = 10 + columns = sd.COLUMNS[1], sd.COLUMNS[2] + committed = _set_up_table(sessions_database, row_count) + + expected = [[row[1], row[2]] for row in _row_data(row_count)] + union = [] + + batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) + batches = batch_txn.generate_read_batches( + sd.TABLE, columns, spanner_v1.KeySet(all_=True), databoost_enabled=True + ) + for batch in batches: + p_results_iter = batch_txn.process(batch) + union.extend(list(p_results_iter)) + + assert union == expected + batch_txn.close() + + +def test_execute_sql_invalid_arguement_error_w_databoost_enabled(sessions_database): + pdb.set_trace() + sd = _sample_data + row_count = 40 + _set_up_table(sessions_database, row_count) + with pytest.raises(exceptions.InvalidArgument): + with sessions_database.snapshot() as snapshot: + list(snapshot.execute_sql(sd.SQL, databoost_enabled=True)) def test_execute_sql_w_manual_consume(sessions_database): @@ -2513,6 +2545,25 @@ def test_partition_query(sessions_database): batch_txn.close() +def test_partition_query_w_databoost_enabled(sessions_database): + pdb.set_trace() + row_count = 40 + sql = f"SELECT * FROM {_sample_data.TABLE}" + + # Paritioned query does not support ORDER BY + all_data_rows = set(_row_data(row_count)) + union = set() + batch_txn = sessions_database.batch_snapshot() + for batch in batch_txn.generate_query_batches(sql, 
databoost_enabled=True): + p_results_iter = batch_txn.process(batch) + # Lists aren't hashable so the results need to be converted + rows = [tuple(result) for result in p_results_iter] + union.update(set(rows)) + + assert union == all_data_rows + batch_txn.close() + + class FauxCall: def __init__(self, code, details="FauxCall"): self._code = code diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index bff89320c7..e8b5a7baec 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -2114,6 +2114,7 @@ def test_generate_read_batches_w_max_partitions(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", + "databoost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2155,6 +2156,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", + "databoost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2195,6 +2197,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": self.INDEX, + "databoost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2212,6 +2215,47 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): timeout=gapic_v1.method.DEFAULT, ) + def test_generate_read_batches_w_databoost_enabled(self): + databoost_enabled = True + keyset = self._make_keyset() + database = self._make_database() + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + snapshot.partition_read.return_value = self.TOKENS + + batches = list( + batch_txn.generate_read_batches( + self.TABLE, + self.COLUMNS, + keyset, + index=self.INDEX, + databoost_enabled=databoost_enabled, + ) + ) + + expected_read = { + "table": self.TABLE, + "columns": self.COLUMNS, + "keyset": {"all": True}, + "index": self.INDEX, + "databoost_enabled": True, + } + self.assertEqual(len(batches), len(self.TOKENS)) + for batch, token in zip(batches, self.TOKENS): + self.assertEqual(batch["partition"], token) + self.assertEqual(batch["read"], expected_read) + + snapshot.partition_read.assert_called_once_with( + table=self.TABLE, + columns=self.COLUMNS, + keyset=keyset, + index=self.INDEX, + partition_size_bytes=None, + max_partitions=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ) + def test_process_read_batch(self): keyset = self._make_keyset() token = b"TOKEN" @@ -2288,7 +2332,7 @@ def test_generate_query_batches_w_max_partitions(self): batch_txn.generate_query_batches(sql, max_partitions=max_partitions) ) - expected_query = {"sql": sql, "query_options": client._query_options} + expected_query = {"sql": sql, "databoost_enabled": False, "query_options": client._query_options} self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): self.assertEqual(batch["partition"], token) @@ -2326,6 +2370,7 @@ def test_generate_query_batches_w_params_w_partition_size_bytes(self): expected_query = { "sql": sql, + "databoost_enabled": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2372,6 +2417,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): expected_query = { "sql": sql, + "databoost_enabled": False, "params": params, "param_types": param_types, "query_options": 
client._query_options, @@ -2391,6 +2437,35 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): timeout=2.0, ) + def test_generate_query_batches_w_databoost_enabled(self): + sql = "SELECT COUNT(*) FROM table_name" + client = _Client(self.PROJECT_ID) + instance = _Instance(self.INSTANCE_NAME, client=client) + database = _Database(self.DATABASE_NAME, instance=instance) + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + snapshot.partition_query.return_value = self.TOKENS + + batches = list( + batch_txn.generate_query_batches(sql, databoost_enabled=True) + ) + + expected_query = {"sql": sql, "databoost_enabled": True, "query_options": client._query_options} + self.assertEqual(len(batches), len(self.TOKENS)) + for batch, token in zip(batches, self.TOKENS): + self.assertEqual(batch["partition"], token) + self.assertEqual(batch["query"], expected_query) + + snapshot.partition_query.assert_called_once_with( + sql=sql, + params=None, + param_types=None, + partition_size_bytes=None, + max_partitions=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ) + def test_process_query_batch(self): sql = ( "SELECT first_name, last_name, email FROM citizens " "WHERE age <= @max_age" From d026e8a4673bbec59b70ad5aafc1d2a56748e755 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Fri, 3 Feb 2023 09:39:13 +0530 Subject: [PATCH 03/12] changes --- google/cloud/spanner_v1/database.py | 32 +++++++++++++-- google/cloud/spanner_v1/instance.py | 2 + google/cloud/spanner_v1/snapshot.py | 22 +++++++++++ samples/samples/batch_sample.py | 60 ++++++++++++++++++++++++++++- tests/system/test_session_api.py | 13 +------ tests/unit/test_database.py | 49 ++++++++++++++++------- tests/unit/test_snapshot.py | 21 ++++++++++ 7 files changed, 169 insertions(+), 30 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 37f8d30819..08a13c7f20 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -124,6 +124,9 @@ class Database(object): (Optional) database dialect for the database :type database_role: str or None :param database_role: (Optional) user-assigned database_role for the session. + :type databoost_enabled: bool + :param databoost_enabled: (Optional) for batch partitioned query if this field is + set ``true``, the request will be executed via offline access. """ _spanner_api = None @@ -138,6 +141,7 @@ def __init__( encryption_config=None, database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED, database_role=None, + databoost_enabled=False, ): self.database_id = database_id self._instance = instance @@ -155,6 +159,7 @@ def __init__( self._encryption_config = encryption_config self._database_dialect = database_dialect self._database_role = database_role + self._databoost_enabled = databoost_enabled if pool is None: pool = BurstyPool(database_role=database_role) @@ -328,6 +333,15 @@ def database_role(self): """ return self._database_role + @property + def databoost_enabled(self): + """(Optional) For batch partitioned query if this field is + set ``true``, the request will be executed via offline access. + :rtype: bool + :returns: a bool with the value if databoost is enabled. + """ + return self._databoost_enabled + @property def logger(self): """Logger used by the database. 
@@ -1101,7 +1115,7 @@ def generate_read_batches( index="", partition_size_bytes=None, max_partitions=None, - databoost_enabled=False, + databoost_enabled=None, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1136,6 +1150,11 @@ def generate_read_batches( service uses this as a hint, the actual number of partitions may differ. + :type databoost_enabled: + :param databoost_enabled: + (Optional) If this is for a partitioned query and this field is + set ``true``, the request will be executed via offline access. + :type retry: :class:`~google.api_core.retry.Retry` :param retry: (Optional) The retry settings for this request. @@ -1163,7 +1182,7 @@ def generate_read_batches( "columns": columns, "keyset": keyset._to_dict(), "index": index, - "databoost_enabled": databoost_enabled, + "databoost_enabled": databoost_enabled if databoost_enabled != None else self._database.databoost_enabled, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1207,7 +1226,7 @@ def generate_query_batches( partition_size_bytes=None, max_partitions=None, query_options=None, - databoost_enabled=False, + databoost_enabled=None, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1253,6 +1272,11 @@ def generate_query_batches( (Optional) Query optimizer configuration to use for the given query. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.QueryOptions` + + :type databoost_enabled: + :param databoost_enabled: + (Optional) If this is for a partitioned query and this field is + set ``true``, the request will be executed via offline access. :type retry: :class:`~google.api_core.retry.Retry` :param retry: (Optional) The retry settings for this request. @@ -1277,7 +1301,7 @@ def generate_query_batches( query_info = { "sql": sql, - "databoost_enabled": databoost_enabled, + "databoost_enabled": databoost_enabled if databoost_enabled != None else self._database.databoost_enabled, } if params: query_info["params"] = params diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index f972f817b3..981ed5328b 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -432,6 +432,7 @@ def database( encryption_config=None, database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED, database_role=None, + databoost_enabled=False ): """Factory to create a database within this instance. @@ -479,6 +480,7 @@ def database( encryption_config=encryption_config, database_dialect=database_dialect, database_role=database_role, + databoost_enabled=databoost_enabled, ) def list_databases(self, page_size=None): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 94823293ea..76f8e33a87 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -211,6 +211,14 @@ def read( :type timeout: float :param timeout: (Optional) The timeout for this request. + :type databoost_enabled: + :param databoost_enabled: + (Optional) If this is for a partitioned query and this field is + set ``true``, the request will be executed via offline access. + If the field is set to ``true`` but the request does not set + ``partition_token``, the API will return an + ``INVALID_ARGUMENT`` error. + :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet` :returns: a result set instance which can be used to consume rows. 
@@ -224,6 +232,9 @@ def read( if self._transaction_id is None and self._read_only: raise ValueError("Transaction ID pending.") + if not partition and databoost_enabled: + raise InvalidArgument("'databoost_enable' should only be set for batch queries") + database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) @@ -354,6 +365,14 @@ def execute_sql( :type timeout: float :param timeout: (Optional) The timeout for this request. + :type databoost_enabled: + :param databoost_enabled: + (Optional) If this is for a partitioned query and this field is + set ``true``, the request will be executed via offline access. + If the field is set to ``true`` but the request does not set + ``partition_token``, the API will return an + ``INVALID_ARGUMENT`` error. + :raises ValueError: for reuse of single-use snapshots, or if a transaction ID is already pending for multiple-use snapshots. @@ -364,6 +383,9 @@ def execute_sql( if self._transaction_id is None and self._read_only: raise ValueError("Transaction ID pending.") + if not partition and databoost_enabled: + raise InvalidArgument("'databoost_enable' should only be set for batch queries") + if params is not None: if param_types is None: raise ValueError("Specify 'param_types' when passing 'params'.") diff --git a/samples/samples/batch_sample.py b/samples/samples/batch_sample.py index 73d9f5667e..6b164eff2e 100644 --- a/samples/samples/batch_sample.py +++ b/samples/samples/batch_sample.py @@ -76,6 +76,61 @@ def process(snapshot, partition): # [END spanner_batch_client] +# [START spanner_databoost_enabled_batch_client] +def run_databoost_enabled_batch_query(instance_id, database_id): + """Runs an example batch query with enabled databoost_enabled option.""" + + # Expected Table Format: + # CREATE TABLE Singers ( + # SingerId INT64 NOT NULL, + # FirstName STRING(1024), + # LastName STRING(1024), + # SingerInfo BYTES(MAX), + # ) PRIMARY KEY (SingerId); + + spanner_client = spanner.Client() + instance = spanner_client.instance(instance_id) + # Databoost Enabled option can be set for all of batch queries + # for that database. If this is for a partitioned query and this field + # is set ``true``, the request will be executed via offline access. + database = instance.database(database_id, databoost_enabled=True) + + # When generating the partitions, databoost_enabled option can be passed. 
+ snapshot = database.batch_snapshot() + partitions = snapshot.generate_read_batches( + table="Singers", + columns=("SingerId", "FirstName", "LastName"), + keyset=spanner.KeySet(all_=True), + databoost_enabled=True + ) + + # Create a pool of workers for the tasks + start = time.time() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(process, snapshot, p) for p in partitions] + + for future in concurrent.futures.as_completed(futures, timeout=3600): + finish, row_ct = future.result() + elapsed = finish - start + print("Completed {} rows in {} seconds".format(row_ct, elapsed)) + + # Clean up + snapshot.close() + + +def process(snapshot, partition): + """Processes the requests of a query in an separate process.""" + print("Started processing partition.") + row_ct = 0 + for row in snapshot.process_read_batch(partition): + print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) + row_ct += 1 + return time.time(), row_ct + + +# [END spanner_databoost_enabled_batch_client] + + if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -87,4 +142,7 @@ def process(snapshot, partition): args = parser.parse_args() - run_batch_query(args.instance_id, args.database_id) + if args.command == "run_batch_query": + run_batch_query(args.instance_id, args.database_id) + elif args.command == "databoost_enabled_batch_query": + run_databoost_enabled_batch_query(args.instance_id, args.database_id) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 47a0707aaa..2b2ded6a3b 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -1894,10 +1894,9 @@ def test_partition_read_w_index(sessions_database): assert union == expected batch_txn.close() -import pdb + def test_partition_read_w_databoost_enabled(sessions_database): - pdb.set_trace() sd = _sample_data row_count = 10 columns = sd.COLUMNS[1], sd.COLUMNS[2] @@ -1918,16 +1917,6 @@ def test_partition_read_w_databoost_enabled(sessions_database): batch_txn.close() -def test_execute_sql_invalid_arguement_error_w_databoost_enabled(sessions_database): - pdb.set_trace() - sd = _sample_data - row_count = 40 - _set_up_table(sessions_database, row_count) - with pytest.raises(exceptions.InvalidArgument): - with sessions_database.snapshot() as snapshot: - list(snapshot.execute_sql(sd.SQL, databoost_enabled=True)) - - def test_execute_sql_w_manual_consume(sessions_database): sd = _sample_data row_count = 3000 diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index e8b5a7baec..d96497f422 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -114,6 +114,7 @@ def test_ctor_defaults(self): # BurstyPool does not create sessions during 'bind()'. 
self.assertTrue(database._pool._sessions.empty()) self.assertIsNone(database.database_role) + self.assertFalse(database.databoost_enabled) def test_ctor_w_explicit_pool(self): instance = _Instance(self.INSTANCE_NAME) @@ -134,6 +135,15 @@ def test_ctor_w_database_role(self): self.assertIs(database._instance, instance) self.assertIs(database.database_role, self.DATABASE_ROLE) + def test_ctor_w_databoost_enabled(self): + instance = _Instance(self.INSTANCE_NAME) + database = self._make_one( + self.DATABASE_ID, instance, databoost_enabled=True + ) + self.assertEqual(database.database_id, self.DATABASE_ID) + self.assertIs(database._instance, instance) + self.assertTrue(database.databoost_enabled) + def test_ctor_w_ddl_statements_non_string(self): with self.assertRaises(ValueError): @@ -1923,7 +1933,7 @@ def _get_target_class(self): def _make_database(**kwargs): from google.cloud.spanner_v1.database import Database - return mock.create_autospec(Database, instance=True, **kwargs) + return mock.create_autospec(Database, instance=True, **kwargs) @staticmethod def _make_session(**kwargs): @@ -1975,7 +1985,6 @@ def test_ctor_w_exact_staleness(self): duration = self._make_duration() batch_txn = self._make_one(database, exact_staleness=duration) - self.assertIs(batch_txn._database, database) self.assertIsNone(batch_txn._session) self.assertIsNone(batch_txn._snapshot) @@ -2101,6 +2110,7 @@ def test_generate_read_batches_w_max_partitions(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() + batch_txn._database.databoost_enabled=False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2139,6 +2149,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_read.return_value = self.TOKENS + batch_txn._database.databoost_enabled=False retry = Retry(deadline=60) batches = list( batch_txn.generate_read_batches( @@ -2180,6 +2191,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() + batch_txn._database.databoost_enabled=False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2221,6 +2233,7 @@ def test_generate_read_batches_w_databoost_enabled(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() + batch_txn._database.databoost_enabled=False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2441,13 +2454,13 @@ def test_generate_query_batches_w_databoost_enabled(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) - database = _Database(self.DATABASE_NAME, instance=instance) + database = _Database(self.DATABASE_NAME, instance=instance, databoost_enabled=True) batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS batches = list( - batch_txn.generate_query_batches(sql, databoost_enabled=True) + batch_txn.generate_query_batches(sql) ) expected_query = {"sql": sql, "databoost_enabled": True, "query_options": client._query_options} @@ -2456,16 +2469,25 @@ def test_generate_query_batches_w_databoost_enabled(self): self.assertEqual(batch["partition"], token) 
self.assertEqual(batch["query"], expected_query) - snapshot.partition_query.assert_called_once_with( - sql=sql, - params=None, - param_types=None, - partition_size_bytes=None, - max_partitions=None, - retry=gapic_v1.method.DEFAULT, - timeout=gapic_v1.method.DEFAULT, + def test_generate_query_batches_w_databoost_enabled_at_database(self): + sql = "SELECT COUNT(*) FROM table_name" + client = _Client(self.PROJECT_ID) + instance = _Instance(self.INSTANCE_NAME, client=client) + database = _Database(self.DATABASE_NAME, instance=instance, databoost_enabled=True) + batch_txn = self._make_one(database) + snapshot = batch_txn._snapshot = self._make_snapshot() + snapshot.partition_query.return_value = self.TOKENS + + batches = list( + batch_txn.generate_query_batches(sql) ) + expected_query = {"sql": sql, "databoost_enabled": True, "query_options": client._query_options} + self.assertEqual(len(batches), len(self.TOKENS)) + for batch, token in zip(batches, self.TOKENS): + self.assertEqual(batch["partition"], token) + self.assertEqual(batch["query"], expected_query) + def test_process_query_batch(self): sql = ( "SELECT first_name, last_name, email FROM citizens " "WHERE age <= @max_age" @@ -2644,10 +2666,11 @@ def __init__(self, name): class _Database(object): log_commit_stats = False - def __init__(self, name, instance=None): + def __init__(self, name, instance=None, databoost_enabled=False): self.name = name self.database_id = name.rsplit("/", 1)[1] self._instance = instance + self.databoost_enabled = databoost_enabled from logging import Logger self.logger = mock.create_autospec(Logger, instance=True) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index c3ea162f11..a42fb2a652 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -594,6 +594,18 @@ def test_read_other_error(self): ), ) + def test_read_w_databoost_enabled_invalid_error(self): + from google.cloud.spanner_v1.keyset import KeySet + from google.api_core.exceptions import InvalidArgument + + keyset = KeySet(all_=True) + database = _Database() + session = _Session(database) + derived = self._makeDerived(session) + + with self.assertRaises(InvalidArgument): + derived.read(TABLE_NAME, COLUMNS, keyset, databoost_enabled=True) + def _read_helper( self, multi_use, @@ -815,6 +827,15 @@ def test_execute_sql_other_error(self): attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), ) + def test_execute_sql_w_databoost_enabled_invalid_error(self): + from google.api_core.exceptions import InvalidArgument + database = _Database() + session = _Session(database) + derived = self._makeDerived(session) + + with self.assertRaises(InvalidArgument): + derived.execute_sql(SQL_QUERY, databoost_enabled=True) + def test_execute_sql_w_params_wo_param_types(self): database = _Database() session = _Session(database) From b9ddafd39897213226fab5de880c1f9d97c15fbe Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Fri, 3 Feb 2023 09:44:50 +0530 Subject: [PATCH 04/12] linting --- google/cloud/spanner_v1/database.py | 12 ++++--- google/cloud/spanner_v1/instance.py | 2 +- google/cloud/spanner_v1/snapshot.py | 8 +++-- tests/unit/test_database.py | 53 +++++++++++++++++------------ tests/unit/test_snapshot.py | 1 + 5 files changed, 48 insertions(+), 28 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 08a13c7f20..714c0a33e8 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1182,7 +1182,9 @@ def generate_read_batches( 
"columns": columns, "keyset": keyset._to_dict(), "index": index, - "databoost_enabled": databoost_enabled if databoost_enabled != None else self._database.databoost_enabled, + "databoost_enabled": databoost_enabled + if databoost_enabled != None + else self._database.databoost_enabled, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1272,7 +1274,7 @@ def generate_query_batches( (Optional) Query optimizer configuration to use for the given query. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.QueryOptions` - + :type databoost_enabled: :param databoost_enabled: (Optional) If this is for a partitioned query and this field is @@ -1301,8 +1303,10 @@ def generate_query_batches( query_info = { "sql": sql, - "databoost_enabled": databoost_enabled if databoost_enabled != None else self._database.databoost_enabled, - } + "databoost_enabled": databoost_enabled + if databoost_enabled != None + else self._database.databoost_enabled, + } if params: query_info["params"] = params query_info["param_types"] = param_types diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index 981ed5328b..e9b608998f 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -432,7 +432,7 @@ def database( encryption_config=None, database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED, database_role=None, - databoost_enabled=False + databoost_enabled=False, ): """Factory to create a database within this instance. diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 76f8e33a87..c915b61984 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -233,7 +233,9 @@ def read( raise ValueError("Transaction ID pending.") if not partition and databoost_enabled: - raise InvalidArgument("'databoost_enable' should only be set for batch queries") + raise InvalidArgument( + "'databoost_enable' should only be set for batch queries" + ) database = self._session._database api = database.spanner_api @@ -384,7 +386,9 @@ def execute_sql( raise ValueError("Transaction ID pending.") if not partition and databoost_enabled: - raise InvalidArgument("'databoost_enable' should only be set for batch queries") + raise InvalidArgument( + "'databoost_enable' should only be set for batch queries" + ) if params is not None: if param_types is None: diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index d96497f422..c8c293b6e4 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -137,9 +137,7 @@ def test_ctor_w_database_role(self): def test_ctor_w_databoost_enabled(self): instance = _Instance(self.INSTANCE_NAME) - database = self._make_one( - self.DATABASE_ID, instance, databoost_enabled=True - ) + database = self._make_one(self.DATABASE_ID, instance, databoost_enabled=True) self.assertEqual(database.database_id, self.DATABASE_ID) self.assertIs(database._instance, instance) self.assertTrue(database.databoost_enabled) @@ -1933,7 +1931,7 @@ def _get_target_class(self): def _make_database(**kwargs): from google.cloud.spanner_v1.database import Database - return mock.create_autospec(Database, instance=True, **kwargs) + return mock.create_autospec(Database, instance=True, **kwargs) @staticmethod def _make_session(**kwargs): @@ -1985,6 +1983,7 @@ def test_ctor_w_exact_staleness(self): duration = self._make_duration() batch_txn = self._make_one(database, 
exact_staleness=duration) + self.assertIs(batch_txn._database, database) self.assertIsNone(batch_txn._session) self.assertIsNone(batch_txn._snapshot) @@ -2110,7 +2109,7 @@ def test_generate_read_batches_w_max_partitions(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled=False + batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2149,7 +2148,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_read.return_value = self.TOKENS - batch_txn._database.databoost_enabled=False + batch_txn._database.databoost_enabled = False retry = Retry(deadline=60) batches = list( batch_txn.generate_read_batches( @@ -2191,7 +2190,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled=False + batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2233,7 +2232,7 @@ def test_generate_read_batches_w_databoost_enabled(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled=False + batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2345,7 +2344,11 @@ def test_generate_query_batches_w_max_partitions(self): batch_txn.generate_query_batches(sql, max_partitions=max_partitions) ) - expected_query = {"sql": sql, "databoost_enabled": False, "query_options": client._query_options} + expected_query = { + "sql": sql, + "databoost_enabled": False, + "query_options": client._query_options, + } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): self.assertEqual(batch["partition"], token) @@ -2383,7 +2386,7 @@ def test_generate_query_batches_w_params_w_partition_size_bytes(self): expected_query = { "sql": sql, - "databoost_enabled": False, + "databoost_enabled": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2430,7 +2433,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): expected_query = { "sql": sql, - "databoost_enabled": False, + "databoost_enabled": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2454,16 +2457,20 @@ def test_generate_query_batches_w_databoost_enabled(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) - database = _Database(self.DATABASE_NAME, instance=instance, databoost_enabled=True) + database = _Database( + self.DATABASE_NAME, instance=instance, databoost_enabled=True + ) batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS - batches = list( - batch_txn.generate_query_batches(sql) - ) + batches = list(batch_txn.generate_query_batches(sql)) - expected_query = {"sql": sql, "databoost_enabled": True, "query_options": client._query_options} + expected_query = { + "sql": sql, + "databoost_enabled": True, + "query_options": client._query_options, + } 
self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): self.assertEqual(batch["partition"], token) @@ -2473,16 +2480,20 @@ def test_generate_query_batches_w_databoost_enabled_at_database(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) - database = _Database(self.DATABASE_NAME, instance=instance, databoost_enabled=True) + database = _Database( + self.DATABASE_NAME, instance=instance, databoost_enabled=True + ) batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS - batches = list( - batch_txn.generate_query_batches(sql) - ) + batches = list(batch_txn.generate_query_batches(sql)) - expected_query = {"sql": sql, "databoost_enabled": True, "query_options": client._query_options} + expected_query = { + "sql": sql, + "databoost_enabled": True, + "query_options": client._query_options, + } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): self.assertEqual(batch["partition"], token) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index a42fb2a652..a46b9640c0 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -829,6 +829,7 @@ def test_execute_sql_other_error(self): def test_execute_sql_w_databoost_enabled_invalid_error(self): from google.api_core.exceptions import InvalidArgument + database = _Database() session = _Session(database) derived = self._makeDerived(session) From 75a74974fe091640ce8fa1ddd07b43c657be77df Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Wed, 8 Feb 2023 20:57:53 +0530 Subject: [PATCH 05/12] changes --- google/cloud/spanner_v1/database.py | 26 ++----------- google/cloud/spanner_v1/instance.py | 2 - google/cloud/spanner_v1/snapshot.py | 10 ----- samples/samples/batch_sample.py | 60 ++--------------------------- tests/system/test_session_api.py | 53 +++++++++---------------- tests/unit/test_database.py | 50 ++++++------------------ tests/unit/test_snapshot.py | 22 ----------- 7 files changed, 37 insertions(+), 186 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 714c0a33e8..031cadd65a 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -124,9 +124,6 @@ class Database(object): (Optional) database dialect for the database :type database_role: str or None :param database_role: (Optional) user-assigned database_role for the session. - :type databoost_enabled: bool - :param databoost_enabled: (Optional) for batch partitioned query if this field is - set ``true``, the request will be executed via offline access. """ _spanner_api = None @@ -141,7 +138,6 @@ def __init__( encryption_config=None, database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED, database_role=None, - databoost_enabled=False, ): self.database_id = database_id self._instance = instance @@ -159,7 +155,6 @@ def __init__( self._encryption_config = encryption_config self._database_dialect = database_dialect self._database_role = database_role - self._databoost_enabled = databoost_enabled if pool is None: pool = BurstyPool(database_role=database_role) @@ -333,15 +328,6 @@ def database_role(self): """ return self._database_role - @property - def databoost_enabled(self): - """(Optional) For batch partitioned query if this field is - set ``true``, the request will be executed via offline access. 
- :rtype: bool - :returns: a bool with the value if databoost is enabled. - """ - return self._databoost_enabled - @property def logger(self): """Logger used by the database. @@ -1115,7 +1101,7 @@ def generate_read_batches( index="", partition_size_bytes=None, max_partitions=None, - databoost_enabled=None, + databoost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1182,9 +1168,7 @@ def generate_read_batches( "columns": columns, "keyset": keyset._to_dict(), "index": index, - "databoost_enabled": databoost_enabled - if databoost_enabled != None - else self._database.databoost_enabled, + "databoost_enabled": databoost_enabled, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1228,7 +1212,7 @@ def generate_query_batches( partition_size_bytes=None, max_partitions=None, query_options=None, - databoost_enabled=None, + databoost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1303,9 +1287,7 @@ def generate_query_batches( query_info = { "sql": sql, - "databoost_enabled": databoost_enabled - if databoost_enabled != None - else self._database.databoost_enabled, + "databoost_enabled": databoost_enabled, } if params: query_info["params"] = params diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index e9b608998f..f972f817b3 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -432,7 +432,6 @@ def database( encryption_config=None, database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED, database_role=None, - databoost_enabled=False, ): """Factory to create a database within this instance. @@ -480,7 +479,6 @@ def database( encryption_config=encryption_config, database_dialect=database_dialect, database_role=database_role, - databoost_enabled=databoost_enabled, ) def list_databases(self, page_size=None): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index c915b61984..2d23e8d8d2 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -232,11 +232,6 @@ def read( if self._transaction_id is None and self._read_only: raise ValueError("Transaction ID pending.") - if not partition and databoost_enabled: - raise InvalidArgument( - "'databoost_enable' should only be set for batch queries" - ) - database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) @@ -385,11 +380,6 @@ def execute_sql( if self._transaction_id is None and self._read_only: raise ValueError("Transaction ID pending.") - if not partition and databoost_enabled: - raise InvalidArgument( - "'databoost_enable' should only be set for batch queries" - ) - if params is not None: if param_types is None: raise ValueError("Specify 'param_types' when passing 'params'.") diff --git a/samples/samples/batch_sample.py b/samples/samples/batch_sample.py index 6b164eff2e..06434e8759 100644 --- a/samples/samples/batch_sample.py +++ b/samples/samples/batch_sample.py @@ -47,6 +47,9 @@ def run_batch_query(instance_id, database_id): table="Singers", columns=("SingerId", "FirstName", "LastName"), keyset=spanner.KeySet(all_=True), + # If this is for a partitioned query and this field is + # set ``true``, the request will be executed via offline access. 
+ databoost_enabled=True, ) # Create a pool of workers for the tasks @@ -76,61 +79,6 @@ def process(snapshot, partition): # [END spanner_batch_client] -# [START spanner_databoost_enabled_batch_client] -def run_databoost_enabled_batch_query(instance_id, database_id): - """Runs an example batch query with enabled databoost_enabled option.""" - - # Expected Table Format: - # CREATE TABLE Singers ( - # SingerId INT64 NOT NULL, - # FirstName STRING(1024), - # LastName STRING(1024), - # SingerInfo BYTES(MAX), - # ) PRIMARY KEY (SingerId); - - spanner_client = spanner.Client() - instance = spanner_client.instance(instance_id) - # Databoost Enabled option can be set for all of batch queries - # for that database. If this is for a partitioned query and this field - # is set ``true``, the request will be executed via offline access. - database = instance.database(database_id, databoost_enabled=True) - - # When generating the partitions, databoost_enabled option can be passed. - snapshot = database.batch_snapshot() - partitions = snapshot.generate_read_batches( - table="Singers", - columns=("SingerId", "FirstName", "LastName"), - keyset=spanner.KeySet(all_=True), - databoost_enabled=True - ) - - # Create a pool of workers for the tasks - start = time.time() - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [executor.submit(process, snapshot, p) for p in partitions] - - for future in concurrent.futures.as_completed(futures, timeout=3600): - finish, row_ct = future.result() - elapsed = finish - start - print("Completed {} rows in {} seconds".format(row_ct, elapsed)) - - # Clean up - snapshot.close() - - -def process(snapshot, partition): - """Processes the requests of a query in an separate process.""" - print("Started processing partition.") - row_ct = 0 - for row in snapshot.process_read_batch(partition): - print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) - row_ct += 1 - return time.time(), row_ct - - -# [END spanner_databoost_enabled_batch_client] - - if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -144,5 +92,3 @@ def process(snapshot, partition): if args.command == "run_batch_query": run_batch_query(args.instance_id, args.database_id) - elif args.command == "databoost_enabled_batch_query": - run_databoost_enabled_batch_query(args.instance_id, args.database_id) diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 2b2ded6a3b..13459394aa 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -1886,7 +1886,11 @@ def test_partition_read_w_index(sessions_database): batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) batches = batch_txn.generate_read_batches( - sd.TABLE, columns, spanner_v1.KeySet(all_=True), index="name" + sd.TABLE, + columns, + spanner_v1.KeySet(all_=True), + index="name", + databoost_enabled=True, ) for batch in batches: p_results_iter = batch_txn.process(batch) @@ -1896,25 +1900,15 @@ def test_partition_read_w_index(sessions_database): batch_txn.close() -def test_partition_read_w_databoost_enabled(sessions_database): +def test_read_invalid_arguement_error_w_databoost_enabled(sessions_database): sd = _sample_data - row_count = 10 - columns = sd.COLUMNS[1], sd.COLUMNS[2] - committed = _set_up_table(sessions_database, row_count) + row_count = 40 + keyset = spanner_v1.KeySet(all_=True) + _set_up_table(sessions_database, row_count) - expected = [[row[1], row[2]] for row in 
_row_data(row_count)] - union = [] - - batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) - batches = batch_txn.generate_read_batches( - sd.TABLE, columns, spanner_v1.KeySet(all_=True), databoost_enabled=True - ) - for batch in batches: - p_results_iter = batch_txn.process(batch) - union.extend(list(p_results_iter)) - - assert union == expected - batch_txn.close() + with pytest.raises(exceptions.InvalidArgument): + with sessions_database.snapshot() as snapshot: + list(snapshot.read(sd.TABLE, sd.COLUMNS, keyset)) def test_execute_sql_w_manual_consume(sessions_database): @@ -2524,7 +2518,7 @@ def test_partition_query(sessions_database): all_data_rows = set(_row_data(row_count)) union = set() batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) - for batch in batch_txn.generate_query_batches(sql): + for batch in batch_txn.generate_query_batches(sql, databoost_enabled=True): p_results_iter = batch_txn.process(batch) # Lists aren't hashable so the results need to be converted rows = [tuple(result) for result in p_results_iter] @@ -2534,23 +2528,14 @@ def test_partition_query(sessions_database): batch_txn.close() -def test_partition_query_w_databoost_enabled(sessions_database): - pdb.set_trace() +def test_execute_sql_invalid_arguement_error_w_databoost_enabled(sessions_database): + sd = _sample_data row_count = 40 - sql = f"SELECT * FROM {_sample_data.TABLE}" - - # Paritioned query does not support ORDER BY - all_data_rows = set(_row_data(row_count)) - union = set() - batch_txn = sessions_database.batch_snapshot() - for batch in batch_txn.generate_query_batches(sql, databoost_enabled=True): - p_results_iter = batch_txn.process(batch) - # Lists aren't hashable so the results need to be converted - rows = [tuple(result) for result in p_results_iter] - union.update(set(rows)) + _set_up_table(sessions_database, row_count) - assert union == all_data_rows - batch_txn.close() + with pytest.raises(exceptions.InvalidArgument): + with sessions_database.snapshot() as snapshot: + list(snapshot.execute_sql(sd.SQL, databoost_enabled=True)) class FauxCall: diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index c8c293b6e4..20f39751cc 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -114,7 +114,6 @@ def test_ctor_defaults(self): # BurstyPool does not create sessions during 'bind()'. 
self.assertTrue(database._pool._sessions.empty()) self.assertIsNone(database.database_role) - self.assertFalse(database.databoost_enabled) def test_ctor_w_explicit_pool(self): instance = _Instance(self.INSTANCE_NAME) @@ -135,13 +134,6 @@ def test_ctor_w_database_role(self): self.assertIs(database._instance, instance) self.assertIs(database.database_role, self.DATABASE_ROLE) - def test_ctor_w_databoost_enabled(self): - instance = _Instance(self.INSTANCE_NAME) - database = self._make_one(self.DATABASE_ID, instance, databoost_enabled=True) - self.assertEqual(database.database_id, self.DATABASE_ID) - self.assertIs(database._instance, instance) - self.assertTrue(database.databoost_enabled) - def test_ctor_w_ddl_statements_non_string(self): with self.assertRaises(ValueError): @@ -2109,7 +2101,6 @@ def test_generate_read_batches_w_max_partitions(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2148,7 +2139,6 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_read.return_value = self.TOKENS - batch_txn._database.databoost_enabled = False retry = Retry(deadline=60) batches = list( batch_txn.generate_read_batches( @@ -2190,7 +2180,6 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2232,7 +2221,6 @@ def test_generate_read_batches_w_databoost_enabled(self): database = self._make_database() batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() - batch_txn._database.databoost_enabled = False snapshot.partition_read.return_value = self.TOKENS batches = list( @@ -2457,14 +2445,12 @@ def test_generate_query_batches_w_databoost_enabled(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) - database = _Database( - self.DATABASE_NAME, instance=instance, databoost_enabled=True - ) + database = _Database(self.DATABASE_NAME, instance=instance) batch_txn = self._make_one(database) snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS - batches = list(batch_txn.generate_query_batches(sql)) + batches = list(batch_txn.generate_query_batches(sql, databoost_enabled=True)) expected_query = { "sql": sql, @@ -2476,28 +2462,15 @@ def test_generate_query_batches_w_databoost_enabled(self): self.assertEqual(batch["partition"], token) self.assertEqual(batch["query"], expected_query) - def test_generate_query_batches_w_databoost_enabled_at_database(self): - sql = "SELECT COUNT(*) FROM table_name" - client = _Client(self.PROJECT_ID) - instance = _Instance(self.INSTANCE_NAME, client=client) - database = _Database( - self.DATABASE_NAME, instance=instance, databoost_enabled=True + snapshot.partition_query.assert_called_once_with( + sql=sql, + params=None, + param_types=None, + partition_size_bytes=None, + max_partitions=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, ) - batch_txn = self._make_one(database) - snapshot = batch_txn._snapshot = 
self._make_snapshot() - snapshot.partition_query.return_value = self.TOKENS - - batches = list(batch_txn.generate_query_batches(sql)) - - expected_query = { - "sql": sql, - "databoost_enabled": True, - "query_options": client._query_options, - } - self.assertEqual(len(batches), len(self.TOKENS)) - for batch, token in zip(batches, self.TOKENS): - self.assertEqual(batch["partition"], token) - self.assertEqual(batch["query"], expected_query) def test_process_query_batch(self): sql = ( @@ -2677,11 +2650,10 @@ def __init__(self, name): class _Database(object): log_commit_stats = False - def __init__(self, name, instance=None, databoost_enabled=False): + def __init__(self, name, instance=None): self.name = name self.database_id = name.rsplit("/", 1)[1] self._instance = instance - self.databoost_enabled = databoost_enabled from logging import Logger self.logger = mock.create_autospec(Logger, instance=True) diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index a46b9640c0..c3ea162f11 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -594,18 +594,6 @@ def test_read_other_error(self): ), ) - def test_read_w_databoost_enabled_invalid_error(self): - from google.cloud.spanner_v1.keyset import KeySet - from google.api_core.exceptions import InvalidArgument - - keyset = KeySet(all_=True) - database = _Database() - session = _Session(database) - derived = self._makeDerived(session) - - with self.assertRaises(InvalidArgument): - derived.read(TABLE_NAME, COLUMNS, keyset, databoost_enabled=True) - def _read_helper( self, multi_use, @@ -827,16 +815,6 @@ def test_execute_sql_other_error(self): attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), ) - def test_execute_sql_w_databoost_enabled_invalid_error(self): - from google.api_core.exceptions import InvalidArgument - - database = _Database() - session = _Session(database) - derived = self._makeDerived(session) - - with self.assertRaises(InvalidArgument): - derived.execute_sql(SQL_QUERY, databoost_enabled=True) - def test_execute_sql_w_params_wo_param_types(self): database = _Database() session = _Session(database) From a386e88b2f1647ff76fd76bd38f9056ee0503818 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Fri, 24 Mar 2023 14:13:26 +0530 Subject: [PATCH 06/12] changes --- google/cloud/spanner_v1/types/spanner.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index c44995e134..d829df618f 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import annotations + from typing import MutableMapping, MutableSequence import proto # type: ignore @@ -474,9 +476,11 @@ class ExecuteSqlRequest(proto.Message): given query. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. - databoost_enabled (bool): + data_boost_enabled (bool): If this is for a partitioned query and this field is set to - ``true``, the request will be executed via offline access. + ``true``, the request will be executed via Spanner + independent compute resources. + If the field is set to ``true`` but the request does not set ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. 
@@ -619,9 +623,9 @@ class QueryOptions(proto.Message): number=11, message="RequestOptions", ) - serverless_analytics_enabled: bool = proto.Field( + data_boost_enabled: bool = proto.Field( proto.BOOL, - number=14, + number=16, ) @@ -1133,9 +1137,11 @@ class ReadRequest(proto.Message): create this partition_token. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. - databoost_enabled (bool): + data_boost_enabled (bool): If this is for a partitioned read and this field is set to - ``true``, the request will be executed via offline access. + ``true``, the request will be executed via Spanner + independent compute resources. + If the field is set to ``true`` but the request does not set ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. @@ -1184,9 +1190,9 @@ class ReadRequest(proto.Message): number=11, message="RequestOptions", ) - serverless_analytics_enabled: bool = proto.Field( + data_boost_enabled: bool = proto.Field( proto.BOOL, - number=13, + number=15, ) From da5d7959f54717be6cd3b38116f8076e03f01609 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Fri, 24 Mar 2023 14:37:06 +0530 Subject: [PATCH 07/12] changes --- google/cloud/spanner_v1/database.py | 16 ++++++++-------- google/cloud/spanner_v1/snapshot.py | 16 ++++++++-------- samples/samples/batch_sample.py | 7 ++++--- tests/system/test_session_api.py | 29 ++++------------------------- tests/unit/test_database.py | 26 +++++++++++++------------- 5 files changed, 37 insertions(+), 57 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 031cadd65a..721f357b1a 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1101,7 +1101,7 @@ def generate_read_batches( index="", partition_size_bytes=None, max_partitions=None, - databoost_enabled=False, + data_boost_enable=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1136,8 +1136,8 @@ def generate_read_batches( service uses this as a hint, the actual number of partitions may differ. - :type databoost_enabled: - :param databoost_enabled: + :type data_boost_enable: + :param data_boost_enable: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. @@ -1168,7 +1168,7 @@ def generate_read_batches( "columns": columns, "keyset": keyset._to_dict(), "index": index, - "databoost_enabled": databoost_enabled, + "data_boost_enable": data_boost_enable, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1212,7 +1212,7 @@ def generate_query_batches( partition_size_bytes=None, max_partitions=None, query_options=None, - databoost_enabled=False, + data_boost_enable=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1259,8 +1259,8 @@ def generate_query_batches( If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.QueryOptions` - :type databoost_enabled: - :param databoost_enabled: + :type data_boost_enable: + :param data_boost_enable: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. 
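As the hunk below shows, the flag is simply copied into each partition's query payload, so a caller opts in once per batch operation. A usage sketch against this commit's surface, mirroring the pattern the system tests in this series use (the SQL string is a placeholder and an existing Database object is assumed; note the parameter is still spelled data_boost_enable at this point and a later commit in the series renames it to data_boost_enabled):

# `database` is an existing google.cloud.spanner_v1.database.Database.
batch_txn = database.batch_snapshot()
for batch in batch_txn.generate_query_batches(
    "SELECT COUNT(*) FROM Singers",  # placeholder query
    data_boost_enable=True,  # renamed to data_boost_enabled later in this series
):
    for row in batch_txn.process(batch):
        print(row)
batch_txn.close()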
@@ -1287,7 +1287,7 @@ def generate_query_batches( query_info = { "sql": sql, - "databoost_enabled": databoost_enabled, + "data_boost_enable": data_boost_enable, } if params: query_info["params"] = params diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 2d23e8d8d2..7c7a8e465c 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -167,7 +167,7 @@ def read( limit=0, partition=None, request_options=None, - databoost_enabled=False, + data_boost_enable=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -211,8 +211,8 @@ def read( :type timeout: float :param timeout: (Optional) The timeout for this request. - :type databoost_enabled: - :param databoost_enabled: + :type data_boost_enable: + :param data_boost_enable: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set @@ -256,7 +256,7 @@ def read( limit=limit, partition_token=partition, request_options=request_options, - serverless_analytics_enabled=databoost_enabled, + data_boost_enable=data_boost_enable, ) restart = functools.partial( api.streaming_read, @@ -312,7 +312,7 @@ def execute_sql( partition=None, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - databoost_enabled=False, + data_boost_enable=False, ): """Perform an ``ExecuteStreamingSql`` API request. @@ -362,8 +362,8 @@ def execute_sql( :type timeout: float :param timeout: (Optional) The timeout for this request. - :type databoost_enabled: - :param databoost_enabled: + :type data_boost_enable: + :param data_boost_enable: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set @@ -419,7 +419,7 @@ def execute_sql( seqno=self._execute_sql_count, query_options=query_options, request_options=request_options, - serverless_analytics_enabled=databoost_enabled, + data_boost_enable=data_boost_enable, ) restart = functools.partial( api.execute_streaming_sql, diff --git a/samples/samples/batch_sample.py b/samples/samples/batch_sample.py index 06434e8759..f6e8b8c6ba 100644 --- a/samples/samples/batch_sample.py +++ b/samples/samples/batch_sample.py @@ -47,9 +47,10 @@ def run_batch_query(instance_id, database_id): table="Singers", columns=("SingerId", "FirstName", "LastName"), keyset=spanner.KeySet(all_=True), - # If this is for a partitioned query and this field is - # set ``true``, the request will be executed via offline access. - databoost_enabled=True, + # A Partition object is serializable and can be used from a different process. + # Data Boost is an optional parameter that can also be set on partitioned read + # and query requests to execute them via Spanner independent compute resources.
+ data_boost_enable=True, ) # Create a pool of workers for the tasks diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 13459394aa..30bcac1c2d 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -1875,7 +1875,7 @@ def test_read_with_range_keys_and_index_open_open(sessions_database): assert rows == expected -def test_partition_read_w_index(sessions_database): +def test_partition_read_w_index(sessions_database, not_emulator): sd = _sample_data row_count = 10 columns = sd.COLUMNS[1], sd.COLUMNS[2] @@ -1890,7 +1890,7 @@ def test_partition_read_w_index(sessions_database): columns, spanner_v1.KeySet(all_=True), index="name", - databoost_enabled=True, + data_boost_enable=True, ) for batch in batches: p_results_iter = batch_txn.process(batch) @@ -1900,17 +1900,6 @@ def test_partition_read_w_index(sessions_database): batch_txn.close() -def test_read_invalid_arguement_error_w_databoost_enabled(sessions_database): - sd = _sample_data - row_count = 40 - keyset = spanner_v1.KeySet(all_=True) - _set_up_table(sessions_database, row_count) - - with pytest.raises(exceptions.InvalidArgument): - with sessions_database.snapshot() as snapshot: - list(snapshot.read(sd.TABLE, sd.COLUMNS, keyset)) - - def test_execute_sql_w_manual_consume(sessions_database): sd = _sample_data row_count = 3000 @@ -2509,7 +2498,7 @@ def test_execute_sql_returning_transfinite_floats(sessions_database, not_postgre assert math.isnan(float_array[2]) -def test_partition_query(sessions_database): +def test_partition_query(sessions_database, not_emulator): row_count = 40 sql = f"SELECT * FROM {_sample_data.TABLE}" committed = _set_up_table(sessions_database, row_count) @@ -2518,7 +2507,7 @@ def test_partition_query(sessions_database): all_data_rows = set(_row_data(row_count)) union = set() batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) - for batch in batch_txn.generate_query_batches(sql, databoost_enabled=True): + for batch in batch_txn.generate_query_batches(sql, data_boost_enable=True): p_results_iter = batch_txn.process(batch) # Lists aren't hashable so the results need to be converted rows = [tuple(result) for result in p_results_iter] @@ -2528,16 +2517,6 @@ def test_partition_query(sessions_database): batch_txn.close() -def test_execute_sql_invalid_arguement_error_w_databoost_enabled(sessions_database): - sd = _sample_data - row_count = 40 - _set_up_table(sessions_database, row_count) - - with pytest.raises(exceptions.InvalidArgument): - with sessions_database.snapshot() as snapshot: - list(snapshot.execute_sql(sd.SQL, databoost_enabled=True)) - - class FauxCall: def __init__(self, code, details="FauxCall"): self._code = code diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 20f39751cc..ff5da097f2 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -2114,7 +2114,7 @@ def test_generate_read_batches_w_max_partitions(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", - "databoost_enabled": False, + "data_boost_enable": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2156,7 +2156,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", - "databoost_enabled": False, + "data_boost_enable": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2197,7 +2197,7 @@ def 
test_generate_read_batches_w_index_w_partition_size_bytes(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": self.INDEX, - "databoost_enabled": False, + "data_boost_enable": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2215,8 +2215,8 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): timeout=gapic_v1.method.DEFAULT, ) - def test_generate_read_batches_w_databoost_enabled(self): - databoost_enabled = True + def test_generate_read_batches_w_data_boost_enable(self): + data_boost_enable = True keyset = self._make_keyset() database = self._make_database() batch_txn = self._make_one(database) @@ -2229,7 +2229,7 @@ def test_generate_read_batches_w_databoost_enabled(self): self.COLUMNS, keyset, index=self.INDEX, - databoost_enabled=databoost_enabled, + data_boost_enable=data_boost_enable, ) ) @@ -2238,7 +2238,7 @@ def test_generate_read_batches_w_databoost_enabled(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": self.INDEX, - "databoost_enabled": True, + "data_boost_enable": True, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2334,7 +2334,7 @@ def test_generate_query_batches_w_max_partitions(self): expected_query = { "sql": sql, - "databoost_enabled": False, + "data_boost_enable": False, "query_options": client._query_options, } self.assertEqual(len(batches), len(self.TOKENS)) @@ -2374,7 +2374,7 @@ def test_generate_query_batches_w_params_w_partition_size_bytes(self): expected_query = { "sql": sql, - "databoost_enabled": False, + "data_boost_enable": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2421,7 +2421,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): expected_query = { "sql": sql, - "databoost_enabled": False, + "data_boost_enable": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2441,7 +2441,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): timeout=2.0, ) - def test_generate_query_batches_w_databoost_enabled(self): + def test_generate_query_batches_w_data_boost_enable(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) @@ -2450,11 +2450,11 @@ def test_generate_query_batches_w_databoost_enabled(self): snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS - batches = list(batch_txn.generate_query_batches(sql, databoost_enabled=True)) + batches = list(batch_txn.generate_query_batches(sql, data_boost_enable=True)) expected_query = { "sql": sql, - "databoost_enabled": True, + "data_boost_enable": True, "query_options": client._query_options, } self.assertEqual(len(batches), len(self.TOKENS)) From e9388ed5b6f1e9947be4a5a52a391f5022405226 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Fri, 31 Mar 2023 18:45:19 +0530 Subject: [PATCH 08/12] changes --- google/cloud/spanner_v1/database.py | 16 ++++++++-------- google/cloud/spanner_v1/snapshot.py | 16 ++++++++-------- samples/samples/batch_sample.py | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 721f357b1a..0c51cbf09c 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1101,7 +1101,7 @@ def generate_read_batches( index="", partition_size_bytes=None, 
max_partitions=None, - data_boost_enable=False, + data_boost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1136,8 +1136,8 @@ def generate_read_batches( service uses this as a hint, the actual number of partitions may differ. - :type data_boost_enable: - :param data_boost_enable: + :type data_boost_enabled: + :param data_boost_enabled: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. @@ -1168,7 +1168,7 @@ def generate_read_batches( "columns": columns, "keyset": keyset._to_dict(), "index": index, - "data_boost_enable": data_boost_enable, + "data_boost_enabled": data_boost_enabled, } for partition in partitions: yield {"partition": partition, "read": read_info.copy()} @@ -1212,7 +1212,7 @@ def generate_query_batches( partition_size_bytes=None, max_partitions=None, query_options=None, - data_boost_enable=False, + data_boost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -1259,8 +1259,8 @@ def generate_query_batches( If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.QueryOptions` - :type data_boost_enable: - :param data_boost_enable: + :type data_boost_enabled: + :param data_boost_enabled: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. @@ -1287,7 +1287,7 @@ def generate_query_batches( query_info = { "sql": sql, - "data_boost_enable": data_boost_enable, + "data_boost_enabled": data_boost_enabled, } if params: query_info["params"] = params diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 7c7a8e465c..39e5ef4bda 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -167,7 +167,7 @@ def read( limit=0, partition=None, request_options=None, - data_boost_enable=False, + data_boost_enabled=False, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -211,8 +211,8 @@ def read( :type timeout: float :param timeout: (Optional) The timeout for this request. - :type data_boost_enable: - :param data_boost_enable: + :type data_boost_enabled: + :param data_boost_enabled: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set @@ -256,7 +256,7 @@ def read( limit=limit, partition_token=partition, request_options=request_options, - data_boost_enable=data_boost_enable, + data_boost_enabled=data_boost_enabled, ) restart = functools.partial( api.streaming_read, @@ -312,7 +312,7 @@ def execute_sql( partition=None, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - data_boost_enable=False, + data_boost_enabled=False, ): """Perform an ``ExecuteStreamingSql`` API request. @@ -362,8 +362,8 @@ def execute_sql( :type timeout: float :param timeout: (Optional) The timeout for this request. - :type data_boost_enable: - :param data_boost_enable: + :type data_boost_enabled: + :param data_boost_enabled: (Optional) If this is for a partitioned query and this field is set ``true``, the request will be executed via offline access. 
If the field is set to ``true`` but the request does not set @@ -419,7 +419,7 @@ def execute_sql( seqno=self._execute_sql_count, query_options=query_options, request_options=request_options, - data_boost_enable=data_boost_enable, + data_boost_enabled=data_boost_enabled, ) restart = functools.partial( api.execute_streaming_sql, diff --git a/samples/samples/batch_sample.py b/samples/samples/batch_sample.py index f6e8b8c6ba..69913ac4b3 100644 --- a/samples/samples/batch_sample.py +++ b/samples/samples/batch_sample.py @@ -50,7 +50,7 @@ def run_batch_query(instance_id, database_id): # A Partition object is serializable and can be used from a different process. # Data Boost is an optional parameter that can also be set on partitioned read # and query requests to execute them via Spanner independent compute resources. - data_boost_enable=True, + data_boost_enabled=True, ) # Create a pool of workers for the tasks From 96cba7422b512073c8e7257186104faef92d7c7d Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Mon, 3 Apr 2023 01:34:19 +0530 Subject: [PATCH 09/12] changes --- tests/system/test_session_api.py | 4 ++-- tests/unit/test_database.py | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/system/test_session_api.py index 30bcac1c2d..7d58324b04 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -1890,7 +1890,7 @@ def test_partition_read_w_index(sessions_database, not_emulator): columns, spanner_v1.KeySet(all_=True), index="name", - data_boost_enable=True, + data_boost_enabled=True, ) for batch in batches: p_results_iter = batch_txn.process(batch) @@ -2507,7 +2507,7 @@ def test_partition_query(sessions_database, not_emulator): all_data_rows = set(_row_data(row_count)) union = set() batch_txn = sessions_database.batch_snapshot(read_timestamp=committed) - for batch in batch_txn.generate_query_batches(sql, data_boost_enable=True): + for batch in batch_txn.generate_query_batches(sql, data_boost_enabled=True): p_results_iter = batch_txn.process(batch) # Lists aren't hashable so the results need to be converted rows = [tuple(result) for result in p_results_iter] diff --git a/tests/unit/test_database.py index ff5da097f2..9d70d5567f 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -2114,7 +2114,7 @@ def test_generate_read_batches_w_max_partitions(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", - "data_boost_enable": False, + "data_boost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2156,7 +2156,7 @@ def test_generate_read_batches_w_retry_and_timeout_params(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": "", - "data_boost_enable": False, + "data_boost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2197,7 +2197,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": self.INDEX, - "data_boost_enable": False, + "data_boost_enabled": False, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2215,8 +2215,8 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): timeout=gapic_v1.method.DEFAULT, ) - def test_generate_read_batches_w_data_boost_enable(self): - data_boost_enable = True + def 
test_generate_read_batches_w_data_boost_enabled(self): + data_boost_enabled= True keyset = self._make_keyset() database = self._make_database() batch_txn = self._make_one(database) @@ -2229,7 +2229,7 @@ def test_generate_read_batches_w_data_boost_enable(self): self.COLUMNS, keyset, index=self.INDEX, - data_boost_enable=data_boost_enable, + data_boost_enabled=data_boost_enabled, ) ) @@ -2238,7 +2238,7 @@ def test_generate_read_batches_w_data_boost_enable(self): "columns": self.COLUMNS, "keyset": {"all": True}, "index": self.INDEX, - "data_boost_enable": True, + "data_boost_enabled": True, } self.assertEqual(len(batches), len(self.TOKENS)) for batch, token in zip(batches, self.TOKENS): @@ -2334,7 +2334,7 @@ def test_generate_query_batches_w_max_partitions(self): expected_query = { "sql": sql, - "data_boost_enable": False, + "data_boost_enabled": False, "query_options": client._query_options, } self.assertEqual(len(batches), len(self.TOKENS)) @@ -2374,7 +2374,7 @@ def test_generate_query_batches_w_params_w_partition_size_bytes(self): expected_query = { "sql": sql, - "data_boost_enable": False, + "data_boost_enabled": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2421,7 +2421,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): expected_query = { "sql": sql, - "data_boost_enable": False, + "data_boost_enabled": False, "params": params, "param_types": param_types, "query_options": client._query_options, @@ -2441,7 +2441,7 @@ def test_generate_query_batches_w_retry_and_timeout_params(self): timeout=2.0, ) - def test_generate_query_batches_w_data_boost_enable(self): + def test_generate_query_batches_w_data_boost_enabled(self): sql = "SELECT COUNT(*) FROM table_name" client = _Client(self.PROJECT_ID) instance = _Instance(self.INSTANCE_NAME, client=client) @@ -2450,11 +2450,11 @@ def test_generate_query_batches_w_data_boost_enable(self): snapshot = batch_txn._snapshot = self._make_snapshot() snapshot.partition_query.return_value = self.TOKENS - batches = list(batch_txn.generate_query_batches(sql, data_boost_enable=True)) + batches = list(batch_txn.generate_query_batches(sql, data_boost_enabled=True)) expected_query = { "sql": sql, - "data_boost_enable": True, + "data_boost_enabled": True, "query_options": client._query_options, } self.assertEqual(len(batches), len(self.TOKENS)) From e38755f5521c4893899c3fba4a8bcfea1b0f5e39 Mon Sep 17 00:00:00 2001 From: Astha Mohta Date: Mon, 3 Apr 2023 10:03:45 +0530 Subject: [PATCH 10/12] Changes --- tests/unit/test_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 9d70d5567f..030cf5512b 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -2216,7 +2216,7 @@ def test_generate_read_batches_w_index_w_partition_size_bytes(self): ) def test_generate_read_batches_w_data_boost_enabled(self): - data_boost_enabled= True + data_boost_enabled = True keyset = self._make_keyset() database = self._make_database() batch_txn = self._make_one(database) From ec7d66e4976b327f8e32891dd93a2bb6698a558a Mon Sep 17 00:00:00 2001 From: Astha Mohta <35952883+asthamohta@users.noreply.github.com> Date: Thu, 6 Apr 2023 16:10:36 +0530 Subject: [PATCH 11/12] Update google/cloud/spanner_v1/snapshot.py Co-authored-by: Rajat Bhatta <93644539+rajatbhatta@users.noreply.github.com> --- google/cloud/spanner_v1/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 39e5ef4bda..362e5dd1bc 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -213,7 +213,7 @@ def read( :type data_boost_enabled: :param data_boost_enabled: - (Optional) If this is for a partitioned query and this field is + (Optional) If this is for a partitioned read and this field is set ``true``, the request will be executed via offline access. If the field is set to ``true`` but the request does not set ``partition_token``, the API will return an From 3db4f6932ef09050d32ef8b980e7ea7c2e356a87 Mon Sep 17 00:00:00 2001 From: Astha Mohta <35952883+asthamohta@users.noreply.github.com> Date: Thu, 6 Apr 2023 16:10:44 +0530 Subject: [PATCH 12/12] Update google/cloud/spanner_v1/database.py Co-authored-by: Rajat Bhatta <93644539+rajatbhatta@users.noreply.github.com> --- google/cloud/spanner_v1/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 0c51cbf09c..8e72d6cf8f 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1138,7 +1138,7 @@ def generate_read_batches( :type data_boost_enabled: :param data_boost_enabled: - (Optional) If this is for a partitioned query and this field is + (Optional) If this is for a partitioned read and this field is set ``true``, the request will be executed via offline access. :type retry: :class:`~google.api_core.retry.Retry`
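The system tests earlier in the series already exercise this path end to end; a condensed sketch of what client code looks like against the post-series surface (the instance and database IDs below are placeholders):

from google.cloud import spanner

client = spanner.Client()
database = client.instance("my-instance").database("my-database")  # placeholder IDs

batch_txn = database.batch_snapshot()
for batch in batch_txn.generate_read_batches(
    table="Singers",
    columns=("SingerId", "FirstName", "LastName"),
    keyset=spanner.KeySet(all_=True),
    data_boost_enabled=True,  # run partitions on Spanner independent compute resources
):
    for row in batch_txn.process(batch):
        print(row)
batch_txn.close()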