Merged
Changes from all commits
89 commits
18650af
added function for populating shard
daniel-sanche Jul 24, 2023
df05171
Merge branch 'v3' into benchmarks_prod
daniel-sanche Jul 24, 2023
4db25b3
got table population working
daniel-sanche Jul 24, 2023
28cc1f4
finished table population
daniel-sanche Jul 24, 2023
9e97ac0
pulled out table setup fixtures from system tests
daniel-sanche Jul 25, 2023
52550cd
get system tests working
daniel-sanche Jul 25, 2023
a191236
added some basic throughput code
daniel-sanche Jul 25, 2023
31dbbd9
count operations
daniel-sanche Jul 25, 2023
eac206a
added splits to table
daniel-sanche Jul 25, 2023
ea771de
extracted cluster config
daniel-sanche Jul 25, 2023
2eda0ac
changed test name
daniel-sanche Jul 25, 2023
f0649fd
improved benchmark output
daniel-sanche Jul 25, 2023
fbf2636
fixed population to use new format
daniel-sanche Jul 25, 2023
21df0c2
added sharded scan benchmark
daniel-sanche Jul 25, 2023
91ef008
more accurate time tracking
daniel-sanche Jul 25, 2023
f91750f
added script to deploy to GCE
daniel-sanche Jul 26, 2023
0236f85
limit to one qualifier
daniel-sanche Jul 26, 2023
fd94350
added separate test for point reads
daniel-sanche Jul 26, 2023
82ecaf2
made row size configurable
daniel-sanche Jul 26, 2023
a35f985
don't timeout benchmarks at deadline
daniel-sanche Jul 26, 2023
a440d76
added fastapi benchmark
daniel-sanche Jul 26, 2023
d0fcdb9
added driver for fastapi class
daniel-sanche Jul 26, 2023
0bb70ef
delete VM on complete; improved formatting
daniel-sanche Jul 27, 2023
3f3f59a
improved fastapi benchmark
daniel-sanche Jul 27, 2023
5a6ffad
run tests concurrently
daniel-sanche Jul 27, 2023
604071f
saved test_system changes
daniel-sanche Jul 27, 2023
3686ef6
added profiling to point reads
daniel-sanche Jul 27, 2023
c42f596
add duration to run script
daniel-sanche Jul 27, 2023
e2ac258
build async generator manually
daniel-sanche Jul 28, 2023
1228c8a
lazy parse cell data
daniel-sanche Jul 28, 2023
1a3a8e7
added profiling to scans
daniel-sanche Jul 28, 2023
0659161
removed iscoroutine check
daniel-sanche Jul 28, 2023
ce2b733
added legacy benchmark
daniel-sanche Jul 28, 2023
4714e8a
added thread to legacy tests
daniel-sanche Jul 28, 2023
3111543
cut out row adapter
daniel-sanche Jul 28, 2023
c717a0b
lazy load rows
daniel-sanche Jul 28, 2023
b5cb253
simplify generator layers for non-streaming
daniel-sanche Jul 29, 2023
aa4bed4
change pool size
daniel-sanche Jul 29, 2023
613b032
disabled lazy loading (75k rps)
daniel-sanche Jul 29, 2023
8e37967
only check for last_scanned when there are no chunks (81k rps)
daniel-sanche Jul 30, 2023
6775e2b
refactored chunk parsing (80k rps)
daniel-sanche Jul 31, 2023
537f4ff
removed subclass for row
daniel-sanche Jul 31, 2023
58b3fbb
added igor's row merger
daniel-sanche Jul 31, 2023
a7478ab
use stream by default
daniel-sanche Aug 7, 2023
689f072
use our models
daniel-sanche Aug 7, 2023
107b8c5
wrapped in object
daniel-sanche Aug 7, 2023
d76a7dc
updated row merger for acceptance tests
daniel-sanche Aug 8, 2023
1d9a475
improve performance
daniel-sanche Aug 8, 2023
37bc3a0
removed HasField from family
daniel-sanche Aug 8, 2023
ea95dcc
added request revision logic
daniel-sanche Aug 8, 2023
9b60693
added back split cell verifications
daniel-sanche Aug 8, 2023
24de29e
got all acceptance tests passing
daniel-sanche Aug 8, 2023
9c9ba5e
track last yielded key
daniel-sanche Aug 8, 2023
5d49ad8
track remaining count
daniel-sanche Aug 8, 2023
5ecfa3e
added retries
daniel-sanche Aug 8, 2023
13603ea
added exception conversion
daniel-sanche Aug 8, 2023
b40e1cb
moved into single function
daniel-sanche Aug 8, 2023
0f58066
Fixed style issues
daniel-sanche Aug 9, 2023
07341da
Merge branch 'v3' into optimize_read_rows_2
daniel-sanche Aug 9, 2023
90b85b3
fixed row and helpers tests
daniel-sanche Aug 9, 2023
4298e38
fixed some read rows tests
daniel-sanche Aug 9, 2023
62f1bf7
comments and cleanup
daniel-sanche Aug 9, 2023
98510c3
fixed issues with conformance tests
daniel-sanche Aug 9, 2023
4cf4aab
improved proto conversion
daniel-sanche Aug 9, 2023
0f0ccee
fixed read_rows tests
daniel-sanche Aug 10, 2023
42f44da
fixed tests
daniel-sanche Aug 10, 2023
3d1804c
optimizing query class
daniel-sanche Aug 10, 2023
b0882af
keep filter and limit out of proto storage
daniel-sanche Aug 10, 2023
2f2286d
fixed tests
daniel-sanche Aug 10, 2023
fbe298e
skip check when not necessary
daniel-sanche Aug 10, 2023
2a82343
experiment: yield in row batches
daniel-sanche Aug 10, 2023
4e6dd6f
Revert "experiment: yield in row batches"
daniel-sanche Aug 10, 2023
daa7a59
removed benchmarks dir
daniel-sanche Aug 10, 2023
e91d693
fixed type annotation
daniel-sanche Aug 10, 2023
cb28451
added slots to query
daniel-sanche Aug 10, 2023
1dd4d0c
use separate instances for each run
daniel-sanche Aug 14, 2023
6d2dab6
made _ReadRowOperation into slots
daniel-sanche Aug 14, 2023
309405a
clean up style issues
daniel-sanche Aug 14, 2023
e36d70b
removed commented test
daniel-sanche Aug 14, 2023
eb4bcfa
use optimizations from retries
daniel-sanche Aug 15, 2023
f6e8cd2
removed layer of wrapping
daniel-sanche Aug 15, 2023
5037891
fixed tests
daniel-sanche Aug 15, 2023
ea5b4f9
updated retryable stream submodule
daniel-sanche Aug 15, 2023
9456196
updated dependency version
daniel-sanche Aug 16, 2023
b9f8cdf
added todo
daniel-sanche Aug 16, 2023
fa0734c
Merge branch 'v3' into optimize_retries
daniel-sanche Aug 17, 2023
3323a66
moved exception builder into own method
daniel-sanche Aug 17, 2023
76d6df7
updated constraints
daniel-sanche Aug 17, 2023
bf669ad
fixed lint issues
daniel-sanche Aug 17, 2023
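The diff below spans five files. The central change swaps api-core's AsyncRetryableGenerator wrapper for retry_target_stream, moving terminal-error construction into an exception_factory callback. The following is a minimal, dependency-free model of that contract — a sketch with simplified names and timeout handling, not the actual google-api-core implementation:

import asyncio
from typing import AsyncGenerator, Callable, Iterable, List, Optional, Tuple

async def retry_stream(
    target: Callable[[], AsyncGenerator],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    exception_factory: Callable[
        [List[Exception], bool, Optional[float]],
        Tuple[Exception, Optional[Exception]],
    ],
    operation_timeout: Optional[float] = None,
) -> AsyncGenerator:
    """Re-invoke target on retryable errors; on give-up, delegate terminal
    error construction to exception_factory(exc_list, is_timeout, timeout_val)."""
    errors: List[Exception] = []
    for sleep in sleep_generator:
        try:
            # each retry re-invokes target; the attempt function is responsible
            # for resuming where it left off (as _read_rows_attempt does by
            # revising its row set)
            async for item in target():
                yield item
            return
        except Exception as exc:
            errors.append(exc)
            if not predicate(exc):
                # non-retryable: surface it immediately as the primary exception
                primary, cause = exception_factory(errors, False, operation_timeout)
                raise primary from cause
        await asyncio.sleep(sleep)
    # retry budget exhausted: report a timeout (the real implementation tracks
    # operation_timeout against a wall-clock deadline instead)
    primary, cause = exception_factory(errors, True, operation_timeout)
    raise primary from cause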
71 changes: 36 additions & 35 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -31,7 +31,7 @@
 from google.cloud.bigtable.data._helpers import _make_metadata
 
 from google.api_core import retry_async as retries
-from google.api_core.retry_streaming_async import AsyncRetryableGenerator
+from google.api_core.retry_streaming_async import retry_target_stream
 from google.api_core.retry import exponential_sleep_generator
 from google.api_core import exceptions as core_exceptions
 
@@ -100,35 +100,17 @@ def __init__(
         self._last_yielded_row_key: bytes | None = None
         self._remaining_count: int | None = self.request.rows_limit or None
 
-    async def start_operation(self) -> AsyncGenerator[Row, None]:
+    def start_operation(self) -> AsyncGenerator[Row, None]:
         """
         Start the read_rows operation, retrying on retryable errors.
         """
-        transient_errors = []
-
-        def on_error_fn(exc):
-            if self._predicate(exc):
-                transient_errors.append(exc)
-
-        retry_gen = AsyncRetryableGenerator(
+        return retry_target_stream(
             self._read_rows_attempt,
             self._predicate,
             exponential_sleep_generator(0.01, 60, multiplier=2),
             self.operation_timeout,
-            on_error_fn,
+            exception_factory=self._build_exception,
         )
-        try:
-            async for row in retry_gen:
-                yield row
-                if self._remaining_count is not None:
-                    self._remaining_count -= 1
-                    if self._remaining_count < 0:
-                        raise RuntimeError("emit count exceeds row limit")
-        except core_exceptions.RetryError:
-            self._raise_retry_error(transient_errors)
-        except GeneratorExit:
-            # propagate close to wrapped generator
-            await retry_gen.aclose()
 
     def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
         """
@@ -202,6 +184,10 @@ async def chunk_stream(
                 elif c.commit_row:
                     # update row state after each commit
                     self._last_yielded_row_key = current_key
+                    if self._remaining_count is not None:
+                        self._remaining_count -= 1
+                        if self._remaining_count < 0:
+                            raise InvalidChunk("emit count exceeds row limit")
                     current_key = None
 
     @staticmethod
@@ -354,19 +340,34 @@ def _revise_request_rowset(
             raise _RowSetComplete()
         return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges)
 
-    def _raise_retry_error(self, transient_errors: list[Exception]) -> None:
+    @staticmethod
+    def _build_exception(
+        exc_list: list[Exception], is_timeout: bool, timeout_val: float
+    ) -> tuple[Exception, Exception | None]:
         """
-        If the retryable deadline is hit, wrap the raised exception
-        in a RetryExceptionGroup
+        Build retry error based on exceptions encountered during operation
+
+        Args:
+          - exc_list: list of exceptions encountered during operation
+          - is_timeout: whether the operation failed due to timeout
+          - timeout_val: the operation timeout value in seconds, for constructing
+              the error message
+        Returns:
+          - tuple of the exception to raise, and a cause exception if applicable
         """
-        timeout_value = self.operation_timeout
-        timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else ""
-        error_str = f"operation_timeout{timeout_str} exceeded"
-        new_exc = core_exceptions.DeadlineExceeded(
-            error_str,
+        if is_timeout:
+            # if failed due to timeout, raise deadline exceeded as primary exception
+            source_exc: Exception = core_exceptions.DeadlineExceeded(
+                f"operation_timeout of {timeout_val} exceeded"
             )
-        source_exc = None
-        if transient_errors:
-            source_exc = RetryExceptionGroup(transient_errors)
-        new_exc.__cause__ = source_exc
-        raise new_exc from source_exc
+        elif exc_list:
+            # otherwise, raise non-retryable error as primary exception
+            source_exc = exc_list.pop()
+        else:
+            source_exc = RuntimeError("failed with unspecified exception")
+        # use the retry exception group as the cause of the exception
+        cause_exc: Exception | None = (
+            RetryExceptionGroup(exc_list) if exc_list else None
+        )
+        source_exc.__cause__ = cause_exc
+        return source_exc, cause_exc
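The new _build_exception keeps the terminal error as the primary exception — DeadlineExceeded on timeout, otherwise the last non-retryable error — and attaches any remaining transient failures as a RetryExceptionGroup cause. Here is a runnable sketch of that chaining, using stand-ins for the google.api_core and Bigtable exception types:

# Stand-ins for the real exception types, so the chaining can be shown
# without dependencies; the logic mirrors _build_exception above.
class DeadlineExceeded(Exception):
    pass

class RetryExceptionGroup(Exception):
    def __init__(self, excs):
        super().__init__(f"{len(excs)} failed attempts")
        self.exceptions = tuple(excs)

def build_exception(exc_list, is_timeout, timeout_val):
    if is_timeout:
        # timeout wins: DeadlineExceeded becomes the primary exception
        source_exc = DeadlineExceeded(f"operation_timeout of {timeout_val} exceeded")
    elif exc_list:
        # otherwise the last (non-retryable) error is primary
        source_exc = exc_list.pop()
    else:
        source_exc = RuntimeError("failed with unspecified exception")
    # earlier transient errors survive as the __cause__ group
    cause_exc = RetryExceptionGroup(exc_list) if exc_list else None
    source_exc.__cause__ = cause_exc
    return source_exc, cause_exc

# two transient errors followed by a timeout: DeadlineExceeded is primary,
# and both transient errors remain visible in the cause group
primary, cause = build_exception([ValueError("a"), ValueError("b")], True, 10.0)
assert isinstance(primary, DeadlineExceeded)
assert len(cause.exceptions) == 2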
3 changes: 3 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
@@ -62,6 +62,9 @@ def _attempt_timeout_generator(
         yield max(0, min(per_request_timeout, deadline - time.monotonic()))
 
 
+# TODO:replace this function with an exception_factory passed into the retry when
+# feature is merged:
+# https://github.com/googleapis/python-bigtable/blob/ea5b4f923e42516729c57113ddbe28096841b952/google/cloud/bigtable/data/_async/_read_rows.py#L130
 def _convert_retry_deadline(
     func: Callable[..., Any],
     timeout_value: float | None = None,
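For reference, _attempt_timeout_generator caps each attempt's timeout at whatever remains of the overall operation deadline. A sketch reconstructed from the single context line visible above — the signature and parameter names here are assumptions:

import time
from typing import Generator

def attempt_timeout_generator(
    per_request_timeout: float, operation_timeout: float
) -> Generator[float, None, None]:
    # each attempt gets at most per_request_timeout seconds, shrinking to
    # whatever remains of the overall operation budget (floored at 0)
    deadline = time.monotonic() + operation_timeout
    while True:
        yield max(0, min(per_request_timeout, deadline - time.monotonic()))

# usage: a 5s per-attempt budget inside a 10s operation; later attempts shrink
timeouts = attempt_timeout_generator(5.0, 10.0)
first = next(timeouts)  # ~5.0 immediately after creation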
2 changes: 1 addition & 1 deletion setup.py
@@ -37,7 +37,7 @@
 # 'Development Status :: 5 - Production/Stable'
 release_status = "Development Status :: 5 - Production/Stable"
 dependencies = [
-    "google-api-core[grpc] == 2.12.0.dev0", # TODO: change to >= after streaming retries is merged
+    "google-api-core[grpc] == 2.12.0.dev1", # TODO: change to >= after streaming retries is merged
     "google-cloud-core >= 1.4.1, <3.0.0dev",
     "grpc-google-iam-v1 >= 0.12.4, <1.0.0dev",
     "proto-plus >= 1.22.0, <2.0.0dev",
2 changes: 1 addition & 1 deletion testing/constraints-3.7.txt
@@ -5,7 +5,7 @@
 #
 # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
 # Then this file should have foo==1.14.0
-google-api-core==2.12.0.dev0
+google-api-core==2.12.0.dev1
 google-cloud-core==2.3.2
 grpc-google-iam-v1==0.12.4
 proto-plus==1.22.0
74 changes: 48 additions & 26 deletions tests/unit/data/_async/test__read_rows.py
@@ -226,24 +226,34 @@ async def test_revise_limit(self, start_limit, emit_num, expected_limit):
         should be raised (tested in test_revise_limit_over_limit)
         """
         from google.cloud.bigtable.data import ReadRowsQuery
+        from google.cloud.bigtable_v2.types import ReadRowsResponse
 
-        async def mock_stream():
-            for i in range(emit_num):
-                yield i
+        async def awaitable_stream():
+            async def mock_stream():
+                for i in range(emit_num):
+                    yield ReadRowsResponse(
+                        chunks=[
+                            ReadRowsResponse.CellChunk(
+                                row_key=str(i).encode(),
+                                family_name="b",
+                                qualifier=b"c",
+                                value=b"d",
+                                commit_row=True,
+                            )
+                        ]
+                    )
+
+            return mock_stream()
 
         query = ReadRowsQuery(limit=start_limit)
         table = mock.Mock()
         table.table_name = "table_name"
         table.app_profile_id = "app_profile_id"
-        with mock.patch.object(
-            _ReadRowsOperationAsync, "_read_rows_attempt"
-        ) as mock_attempt:
-            mock_attempt.return_value = mock_stream()
-            instance = self._make_one(query, table, 10, 10)
-            assert instance._remaining_count == start_limit
-            # read emit_num rows
-            async for val in instance.start_operation():
-                pass
+        instance = self._make_one(query, table, 10, 10)
+        assert instance._remaining_count == start_limit
+        # read emit_num rows
+        async for val in instance.chunk_stream(awaitable_stream()):
+            pass
         assert instance._remaining_count == expected_limit
 
     @pytest.mark.parametrize("start_limit,emit_num", [(5, 10), (3, 9), (1, 10)])
@@ -254,26 +264,37 @@ async def test_revise_limit_over_limit(self, start_limit, emit_num):
         (unless start_num == 0, which represents unlimited)
         """
         from google.cloud.bigtable.data import ReadRowsQuery
+        from google.cloud.bigtable_v2.types import ReadRowsResponse
+        from google.cloud.bigtable.data.exceptions import InvalidChunk
 
-        async def mock_stream():
-            for i in range(emit_num):
-                yield i
+        async def awaitable_stream():
+            async def mock_stream():
+                for i in range(emit_num):
+                    yield ReadRowsResponse(
+                        chunks=[
+                            ReadRowsResponse.CellChunk(
+                                row_key=str(i).encode(),
+                                family_name="b",
+                                qualifier=b"c",
+                                value=b"d",
+                                commit_row=True,
+                            )
+                        ]
+                    )
+
+            return mock_stream()
 
         query = ReadRowsQuery(limit=start_limit)
         table = mock.Mock()
         table.table_name = "table_name"
         table.app_profile_id = "app_profile_id"
-        with mock.patch.object(
-            _ReadRowsOperationAsync, "_read_rows_attempt"
-        ) as mock_attempt:
-            mock_attempt.return_value = mock_stream()
-            instance = self._make_one(query, table, 10, 10)
-            assert instance._remaining_count == start_limit
-            with pytest.raises(RuntimeError) as e:
-                # read emit_num rows
-                async for val in instance.start_operation():
-                    pass
-            assert "emit count exceeds row limit" in str(e.value)
+        instance = self._make_one(query, table, 10, 10)
+        assert instance._remaining_count == start_limit
+        with pytest.raises(InvalidChunk) as e:
+            # read emit_num rows
+            async for val in instance.chunk_stream(awaitable_stream()):
+                pass
+        assert "emit count exceeds row limit" in str(e.value)
 
     @pytest.mark.asyncio
     async def test_aclose(self):
@@ -333,6 +354,7 @@ async def mock_stream():
 
         instance = mock.Mock()
         instance._last_yielded_row_key = None
+        instance._remaining_count = None
         stream = _ReadRowsOperationAsync.chunk_stream(instance, mock_awaitable_stream())
         await stream.__anext__()
         with pytest.raises(InvalidChunk) as exc:
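The reworked tests drive chunk_stream directly with fabricated ReadRowsResponse chunks and assert the row-budget accounting this PR moved into the chunk parser. The invariant they check is simple; a minimal model of it, simplified away from the real chunk_stream:

from typing import Optional

class InvalidChunk(Exception):
    pass

def apply_commit(remaining: Optional[int]) -> Optional[int]:
    """Decrement the remaining-row budget on each committed row; a negative
    budget means the server emitted more rows than the query's limit."""
    if remaining is None:  # None represents an unlimited query
        return None
    remaining -= 1
    if remaining < 0:
        raise InvalidChunk("emit count exceeds row limit")
    return remaining

# a 3-row limit absorbs exactly 3 commits; a 4th would raise InvalidChunk
budget = 3
for _ in range(3):
    budget = apply_commit(budget)
assert budget == 0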