Skip to content

Commit b191451

Browse files
chore: optimize gapic calls (#863)
1 parent 3ac80a9 commit b191451

File tree

3 files changed

+105
-57
lines changed

3 files changed

+105
-57
lines changed

google/cloud/bigtable_v2/services/bigtable/async_client.py

Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16+
import functools
1617
from collections import OrderedDict
1718
import functools
1819
import re
@@ -272,7 +273,8 @@ def read_rows(
272273
"the individual field arguments should be set."
273274
)
274275

275-
request = bigtable.ReadRowsRequest(request)
276+
if not isinstance(request, bigtable.ReadRowsRequest):
277+
request = bigtable.ReadRowsRequest(request)
276278

277279
# If we have keyword arguments corresponding to fields on the
278280
# request, apply these.
@@ -283,12 +285,9 @@ def read_rows(
283285

284286
# Wrap the RPC method; this adds retry and timeout information,
285287
# and friendly error handling.
286-
rpc = gapic_v1.method_async.wrap_method(
287-
self._client._transport.read_rows,
288-
default_timeout=43200.0,
289-
client_info=DEFAULT_CLIENT_INFO,
290-
)
291-
288+
rpc = self._client._transport._wrapped_methods[
289+
self._client._transport.read_rows
290+
]
292291
# Certain fields should be provided within the metadata header;
293292
# add these here.
294293
metadata = tuple(metadata) + (
@@ -367,7 +366,8 @@ def sample_row_keys(
367366
"the individual field arguments should be set."
368367
)
369368

370-
request = bigtable.SampleRowKeysRequest(request)
369+
if not isinstance(request, bigtable.SampleRowKeysRequest):
370+
request = bigtable.SampleRowKeysRequest(request)
371371

372372
# If we have keyword arguments corresponding to fields on the
373373
# request, apply these.
@@ -378,12 +378,9 @@ def sample_row_keys(
378378

379379
# Wrap the RPC method; this adds retry and timeout information,
380380
# and friendly error handling.
381-
rpc = gapic_v1.method_async.wrap_method(
382-
self._client._transport.sample_row_keys,
383-
default_timeout=60.0,
384-
client_info=DEFAULT_CLIENT_INFO,
385-
)
386-
381+
rpc = self._client._transport._wrapped_methods[
382+
self._client._transport.sample_row_keys
383+
]
387384
# Certain fields should be provided within the metadata header;
388385
# add these here.
389386
metadata = tuple(metadata) + (
@@ -479,7 +476,8 @@ async def mutate_row(
479476
"the individual field arguments should be set."
480477
)
481478

482-
request = bigtable.MutateRowRequest(request)
479+
if not isinstance(request, bigtable.MutateRowRequest):
480+
request = bigtable.MutateRowRequest(request)
483481

484482
# If we have keyword arguments corresponding to fields on the
485483
# request, apply these.
@@ -494,21 +492,9 @@ async def mutate_row(
494492

495493
# Wrap the RPC method; this adds retry and timeout information,
496494
# and friendly error handling.
497-
rpc = gapic_v1.method_async.wrap_method(
498-
self._client._transport.mutate_row,
499-
default_retry=retries.Retry(
500-
initial=0.01,
501-
maximum=60.0,
502-
multiplier=2,
503-
predicate=retries.if_exception_type(
504-
core_exceptions.DeadlineExceeded,
505-
core_exceptions.ServiceUnavailable,
506-
),
507-
deadline=60.0,
508-
),
509-
default_timeout=60.0,
510-
client_info=DEFAULT_CLIENT_INFO,
511-
)
495+
rpc = self._client._transport._wrapped_methods[
496+
self._client._transport.mutate_row
497+
]
512498

513499
# Certain fields should be provided within the metadata header;
514500
# add these here.
@@ -601,7 +587,8 @@ def mutate_rows(
601587
"the individual field arguments should be set."
602588
)
603589

604-
request = bigtable.MutateRowsRequest(request)
590+
if not isinstance(request, bigtable.MutateRowsRequest):
591+
request = bigtable.MutateRowsRequest(request)
605592

606593
# If we have keyword arguments corresponding to fields on the
607594
# request, apply these.
@@ -614,11 +601,9 @@ def mutate_rows(
614601

615602
# Wrap the RPC method; this adds retry and timeout information,
616603
# and friendly error handling.
617-
rpc = gapic_v1.method_async.wrap_method(
618-
self._client._transport.mutate_rows,
619-
default_timeout=600.0,
620-
client_info=DEFAULT_CLIENT_INFO,
621-
)
604+
rpc = self._client._transport._wrapped_methods[
605+
self._client._transport.mutate_rows
606+
]
622607

623608
# Certain fields should be provided within the metadata header;
624609
# add these here.
@@ -749,7 +734,8 @@ async def check_and_mutate_row(
749734
"the individual field arguments should be set."
750735
)
751736

752-
request = bigtable.CheckAndMutateRowRequest(request)
737+
if not isinstance(request, bigtable.CheckAndMutateRowRequest):
738+
request = bigtable.CheckAndMutateRowRequest(request)
753739

754740
# If we have keyword arguments corresponding to fields on the
755741
# request, apply these.
@@ -768,11 +754,9 @@ async def check_and_mutate_row(
768754

769755
# Wrap the RPC method; this adds retry and timeout information,
770756
# and friendly error handling.
771-
rpc = gapic_v1.method_async.wrap_method(
772-
self._client._transport.check_and_mutate_row,
773-
default_timeout=20.0,
774-
client_info=DEFAULT_CLIENT_INFO,
775-
)
757+
rpc = self._client._transport._wrapped_methods[
758+
self._client._transport.check_and_mutate_row
759+
]
776760

777761
# Certain fields should be provided within the metadata header;
778762
# add these here.
@@ -851,7 +835,8 @@ async def ping_and_warm(
851835
"the individual field arguments should be set."
852836
)
853837

854-
request = bigtable.PingAndWarmRequest(request)
838+
if not isinstance(request, bigtable.PingAndWarmRequest):
839+
request = bigtable.PingAndWarmRequest(request)
855840

856841
# If we have keyword arguments corresponding to fields on the
857842
# request, apply these.
@@ -862,11 +847,9 @@ async def ping_and_warm(
862847

863848
# Wrap the RPC method; this adds retry and timeout information,
864849
# and friendly error handling.
865-
rpc = gapic_v1.method_async.wrap_method(
866-
self._client._transport.ping_and_warm,
867-
default_timeout=None,
868-
client_info=DEFAULT_CLIENT_INFO,
869-
)
850+
rpc = self._client._transport._wrapped_methods[
851+
self._client._transport.ping_and_warm
852+
]
870853

871854
# Certain fields should be provided within the metadata header;
872855
# add these here.
@@ -968,7 +951,8 @@ async def read_modify_write_row(
968951
"the individual field arguments should be set."
969952
)
970953

971-
request = bigtable.ReadModifyWriteRowRequest(request)
954+
if not isinstance(request, bigtable.ReadModifyWriteRowRequest):
955+
request = bigtable.ReadModifyWriteRowRequest(request)
972956

973957
# If we have keyword arguments corresponding to fields on the
974958
# request, apply these.
@@ -983,11 +967,9 @@ async def read_modify_write_row(
983967

984968
# Wrap the RPC method; this adds retry and timeout information,
985969
# and friendly error handling.
986-
rpc = gapic_v1.method_async.wrap_method(
987-
self._client._transport.read_modify_write_row,
988-
default_timeout=20.0,
989-
client_info=DEFAULT_CLIENT_INFO,
990-
)
970+
rpc = self._client._transport._wrapped_methods[
971+
self._client._transport.read_modify_write_row
972+
]
991973

992974
# Certain fields should be provided within the metadata header;
993975
# add these here.
@@ -1076,7 +1058,10 @@ def generate_initial_change_stream_partitions(
10761058
"the individual field arguments should be set."
10771059
)
10781060

1079-
request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request)
1061+
if not isinstance(
1062+
request, bigtable.GenerateInitialChangeStreamPartitionsRequest
1063+
):
1064+
request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request)
10801065

10811066
# If we have keyword arguments corresponding to fields on the
10821067
# request, apply these.
@@ -1174,7 +1159,8 @@ def read_change_stream(
11741159
"the individual field arguments should be set."
11751160
)
11761161

1177-
request = bigtable.ReadChangeStreamRequest(request)
1162+
if not isinstance(request, bigtable.ReadChangeStreamRequest):
1163+
request = bigtable.ReadChangeStreamRequest(request)
11781164

11791165
# If we have keyword arguments corresponding to fields on the
11801166
# request, apply these.

google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
from google.api_core import gapic_v1
2020
from google.api_core import grpc_helpers_async
21+
from google.api_core import exceptions as core_exceptions
22+
from google.api_core import retry as retries
2123
from google.auth import credentials as ga_credentials # type: ignore
2224
from google.auth.transport.grpc import SslCredentials # type: ignore
2325

@@ -512,6 +514,66 @@ def read_change_stream(
512514
)
513515
return self._stubs["read_change_stream"]
514516

517+
def _prep_wrapped_messages(self, client_info):
518+
# Precompute the wrapped methods.
519+
self._wrapped_methods = {
520+
self.read_rows: gapic_v1.method_async.wrap_method(
521+
self.read_rows,
522+
default_timeout=43200.0,
523+
client_info=client_info,
524+
),
525+
self.sample_row_keys: gapic_v1.method_async.wrap_method(
526+
self.sample_row_keys,
527+
default_timeout=60.0,
528+
client_info=client_info,
529+
),
530+
self.mutate_row: gapic_v1.method_async.wrap_method(
531+
self.mutate_row,
532+
default_retry=retries.Retry(
533+
initial=0.01,
534+
maximum=60.0,
535+
multiplier=2,
536+
predicate=retries.if_exception_type(
537+
core_exceptions.DeadlineExceeded,
538+
core_exceptions.ServiceUnavailable,
539+
),
540+
deadline=60.0,
541+
),
542+
default_timeout=60.0,
543+
client_info=client_info,
544+
),
545+
self.mutate_rows: gapic_v1.method_async.wrap_method(
546+
self.mutate_rows,
547+
default_timeout=600.0,
548+
client_info=client_info,
549+
),
550+
self.check_and_mutate_row: gapic_v1.method_async.wrap_method(
551+
self.check_and_mutate_row,
552+
default_timeout=20.0,
553+
client_info=client_info,
554+
),
555+
self.ping_and_warm: gapic_v1.method_async.wrap_method(
556+
self.ping_and_warm,
557+
default_timeout=None,
558+
client_info=client_info,
559+
),
560+
self.read_modify_write_row: gapic_v1.method_async.wrap_method(
561+
self.read_modify_write_row,
562+
default_timeout=20.0,
563+
client_info=client_info,
564+
),
565+
self.generate_initial_change_stream_partitions: gapic_v1.method_async.wrap_method(
566+
self.generate_initial_change_stream_partitions,
567+
default_timeout=60.0,
568+
client_info=client_info,
569+
),
570+
self.read_change_stream: gapic_v1.method_async.wrap_method(
571+
self.read_change_stream,
572+
default_timeout=43200.0,
573+
client_info=client_info,
574+
),
575+
}
576+
515577
def close(self):
516578
return self.grpc_channel.close()
517579

tests/unit/data/_async/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ async def test_ctor_dict_options(self):
141141
async def test_veneer_grpc_headers(self):
142142
# client_info should be populated with headers to
143143
# detect as a veneer client
144-
patch = mock.patch("google.api_core.gapic_v1.method.wrap_method")
144+
patch = mock.patch("google.api_core.gapic_v1.method_async.wrap_method")
145145
with patch as gapic_mock:
146146
client = self._make_one(project="project-id")
147147
wrapped_call_list = gapic_mock.call_args_list

0 commit comments

Comments
 (0)