-
Couldn't load subscription status.
- Fork 62
feat: implement mutate rows #769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
73 commits
Select commit
Hold shift + click to select a range
1d02154
added initial implementation of mutate_rows
daniel-sanche ab63cba
implemented mutation models
daniel-sanche cf9daa5
added retries to mutate_row
daniel-sanche 1247da4
return exception group if possible
daniel-sanche 3b3ed8c
check for idempotence
daniel-sanche 5d20037
initial implementation for bulk_mutations
daniel-sanche 3d322a1
include successes in bulk mutation error message
daniel-sanche a31232b
fixed style checks
daniel-sanche 8da2d65
added basic system tests
daniel-sanche 2b89d9c
added unit tests for mutate_row
daniel-sanche 47c5985
ran blacken
daniel-sanche 38fdcd7
improved exceptions
daniel-sanche 504d2d8
added bulk_mutate_rows unit tests
daniel-sanche b16067f
ran blacken
daniel-sanche 3ab1405
support __new___ for exceptions for python3.11+
daniel-sanche 0a6c0c6
added exception unit tests
daniel-sanche ec043cf
makde exceptions tuple
daniel-sanche 518530e
got exceptions to print consistently across versions
daniel-sanche 9624729
added test for 311 rich traceback
daniel-sanche 3087081
moved retryable row mutations to new file
daniel-sanche 9df588f
use index map
daniel-sanche 7ed8be3
added docstring
daniel-sanche 2536cc4
added predicate check to failed mutations
daniel-sanche 1f6875c
added _mutate_rows tests
daniel-sanche 1ea24e6
improved client tests
daniel-sanche 25ca2d2
refactored to loop by raising exception
daniel-sanche c0787db
refactored retry deadline logic into shared wrapper
daniel-sanche 3ed5c3d
ran black
daniel-sanche a91fbcb
pulled in table default timeouts
daniel-sanche df8a058
added tests for shared deadline parsing function
daniel-sanche b866b57
added tests for mutation models
daniel-sanche 54a4d43
fixed linter errors
daniel-sanche bd51dc4
added tests for BulkMutationsEntry
daniel-sanche 921b05a
improved mutations documentation
daniel-sanche 82ea61f
refactored mutate_rows logic into helper function
daniel-sanche fa42b86
implemented callbacks for mutate_rows
daniel-sanche 01a16f3
made exceptions into a tuple
daniel-sanche 6140acb
remove aborted from retryable errors
daniel-sanche 36ba2b6
improved SetCell mutation
daniel-sanche b3c9017
fixed mutations tests
daniel-sanche cac9e2d
SetCell timestamps use millisecond precision
daniel-sanche 34b051f
renamed BulkMutationsEntry to RowMutationEntry
daniel-sanche 63ac35c
Merge branch 'v3' into mutate_rows
daniel-sanche a51201c
added metadata to mutate rows and bulk mutate rows
daniel-sanche a21bebf
moved _convert_retry_deadline wrapper from exceptions into _helpers
daniel-sanche 4ca89d9
fixed system tests
daniel-sanche b240ee1
only handle precision adjustment when creating timestamp
daniel-sanche cb0e951
added _from_dict for mutation models
daniel-sanche a9cf385
rpc timeouts adjust when approaching operation_timeout
daniel-sanche eddc1c9
pass table instead of request dict
daniel-sanche f8b26aa
refactoring mutate rows
daniel-sanche 5b80dc5
made on_terminal_state into coroutine
daniel-sanche 9e5b80a
fixed style issues
daniel-sanche f7539f6
moved callback rewriting into retryable attempt
daniel-sanche e77a4fa
fixed tests
daniel-sanche 4e19ed0
pop successful mutations from error dict
daniel-sanche 920e4b7
removed unneeded check
daniel-sanche 725f5ff
refactoring
daniel-sanche 1054bc4
pass list of exceptions in callback
daniel-sanche f39a891
raise error in unexpected state
daniel-sanche 1d97135
removed callback
daniel-sanche 88e2bf5
refactoring mutation attempt into class
daniel-sanche a3c0166
use partial function
daniel-sanche 70c35ef
renamed class
daniel-sanche e00f592
added comments
daniel-sanche 18af78a
added tests
daniel-sanche 23e84f5
improved helpers
daniel-sanche 56fdf7c
refactored operation into class only
daniel-sanche aca31f0
restructured how remaining indices are tracked
daniel-sanche 5a5d541
fixed tests
daniel-sanche afed731
added docstrings
daniel-sanche 2396ec8
moved index deletion to end of block
daniel-sanche 3d441a2
added comment to exception types
daniel-sanche File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Callable, Any | ||
| from inspect import iscoroutinefunction | ||
| import time | ||
|
|
||
| from google.api_core import exceptions as core_exceptions | ||
| from google.cloud.bigtable.exceptions import RetryExceptionGroup | ||
|
|
||
| """ | ||
| Helper functions used in various places in the library. | ||
| """ | ||
|
|
||
|
|
||
| def _make_metadata( | ||
| table_name: str, app_profile_id: str | None | ||
| ) -> list[tuple[str, str]]: | ||
| """ | ||
| Create properly formatted gRPC metadata for requests. | ||
| """ | ||
| params = [] | ||
| params.append(f"table_name={table_name}") | ||
| if app_profile_id is not None: | ||
| params.append(f"app_profile_id={app_profile_id}") | ||
| params_str = ",".join(params) | ||
| return [("x-goog-request-params", params_str)] | ||
|
|
||
|
|
||
| def _attempt_timeout_generator( | ||
| per_request_timeout: float | None, operation_timeout: float | ||
| ): | ||
| """ | ||
| Generator that yields the timeout value for each attempt of a retry loop. | ||
|
|
||
| Will return per_request_timeout until the operation_timeout is approached, | ||
| at which point it will return the remaining time in the operation_timeout. | ||
|
|
||
| Args: | ||
| - per_request_timeout: The timeout value to use for each request, in seconds. | ||
| If None, the operation_timeout will be used for each request. | ||
| - operation_timeout: The timeout value to use for the entire operationm in seconds. | ||
| Yields: | ||
| - The timeout value to use for the next request, in seonds | ||
| """ | ||
| per_request_timeout = ( | ||
| per_request_timeout if per_request_timeout is not None else operation_timeout | ||
| ) | ||
| deadline = operation_timeout + time.monotonic() | ||
| while True: | ||
| yield max(0, min(per_request_timeout, deadline - time.monotonic())) | ||
|
|
||
|
|
||
| def _convert_retry_deadline( | ||
| func: Callable[..., Any], | ||
| timeout_value: float | None = None, | ||
| retry_errors: list[Exception] | None = None, | ||
| ): | ||
| """ | ||
| Decorator to convert RetryErrors raised by api_core.retry into | ||
| DeadlineExceeded exceptions, indicating that the underlying retries have | ||
| exhaused the timeout value. | ||
| Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__, | ||
| detailing the failed exceptions associated with each retry. | ||
|
|
||
| Supports both sync and async function wrapping. | ||
|
|
||
| Args: | ||
| - func: The function to decorate | ||
| - timeout_value: The timeout value to display in the DeadlineExceeded error message | ||
| - retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__ | ||
| """ | ||
| timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else "" | ||
| error_str = f"operation_timeout{timeout_str} exceeded" | ||
|
|
||
| def handle_error(): | ||
| new_exc = core_exceptions.DeadlineExceeded( | ||
| error_str, | ||
| ) | ||
| source_exc = None | ||
| if retry_errors: | ||
| source_exc = RetryExceptionGroup(retry_errors) | ||
| new_exc.__cause__ = source_exc | ||
| raise new_exc from source_exc | ||
|
|
||
| # separate wrappers for async and sync functions | ||
| async def wrapper_async(*args, **kwargs): | ||
| try: | ||
| return await func(*args, **kwargs) | ||
| except core_exceptions.RetryError: | ||
| handle_error() | ||
|
|
||
| def wrapper(*args, **kwargs): | ||
| try: | ||
| return func(*args, **kwargs) | ||
| except core_exceptions.RetryError: | ||
| handle_error() | ||
|
|
||
| return wrapper_async if iscoroutinefunction(func) else wrapper | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,210 @@ | ||
| # Copyright 2023 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
| import functools | ||
|
|
||
| from google.api_core import exceptions as core_exceptions | ||
| from google.api_core import retry_async as retries | ||
| import google.cloud.bigtable.exceptions as bt_exceptions | ||
| from google.cloud.bigtable._helpers import _make_metadata | ||
| from google.cloud.bigtable._helpers import _convert_retry_deadline | ||
| from google.cloud.bigtable._helpers import _attempt_timeout_generator | ||
|
|
||
| if TYPE_CHECKING: | ||
| from google.cloud.bigtable_v2.services.bigtable.async_client import ( | ||
| BigtableAsyncClient, | ||
| ) | ||
| from google.cloud.bigtable.client import Table | ||
| from google.cloud.bigtable.mutations import RowMutationEntry | ||
|
|
||
|
|
||
| class _MutateRowsIncomplete(RuntimeError): | ||
| """ | ||
| Exception raised when a mutate_rows call has unfinished work. | ||
| """ | ||
|
|
||
| pass | ||
|
|
||
|
|
||
| class _MutateRowsOperation: | ||
| """ | ||
| MutateRowsOperation manages the logic of sending a set of row mutations, | ||
| and retrying on failed entries. It manages this using the _run_attempt | ||
| function, which attempts to mutate all outstanding entries, and raises | ||
| _MutateRowsIncomplete if any retryable errors are encountered. | ||
|
|
||
| Errors are exposed as a MutationsExceptionGroup, which contains a list of | ||
| exceptions organized by the related failed mutation entries. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| gapic_client: "BigtableAsyncClient", | ||
| table: "Table", | ||
| mutation_entries: list["RowMutationEntry"], | ||
| operation_timeout: float, | ||
| per_request_timeout: float | None, | ||
| ): | ||
| """ | ||
| Args: | ||
| - gapic_client: the client to use for the mutate_rows call | ||
| - table: the table associated with the request | ||
| - mutation_entries: a list of RowMutationEntry objects to send to the server | ||
| - operation_timeout: the timeout t o use for the entire operation, in seconds. | ||
| - per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds. | ||
| If not specified, the request will run until operation_timeout is reached. | ||
| """ | ||
| # create partial function to pass to trigger rpc call | ||
| metadata = _make_metadata(table.table_name, table.app_profile_id) | ||
| self._gapic_fn = functools.partial( | ||
| gapic_client.mutate_rows, | ||
| table_name=table.table_name, | ||
| app_profile_id=table.app_profile_id, | ||
| metadata=metadata, | ||
| ) | ||
| # create predicate for determining which errors are retryable | ||
| self.is_retryable = retries.if_exception_type( | ||
| # RPC level errors | ||
| core_exceptions.DeadlineExceeded, | ||
| core_exceptions.ServiceUnavailable, | ||
| # Entry level errors | ||
| _MutateRowsIncomplete, | ||
| ) | ||
| # build retryable operation | ||
| retry = retries.AsyncRetry( | ||
| predicate=self.is_retryable, | ||
| timeout=operation_timeout, | ||
| initial=0.01, | ||
| multiplier=2, | ||
| maximum=60, | ||
| ) | ||
| retry_wrapped = retry(self._run_attempt) | ||
| self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout) | ||
| # initialize state | ||
| self.timeout_generator = _attempt_timeout_generator( | ||
| per_request_timeout, operation_timeout | ||
| ) | ||
| self.mutations = mutation_entries | ||
| self.remaining_indices = list(range(len(self.mutations))) | ||
| self.errors: dict[int, list[Exception]] = {} | ||
|
|
||
| async def start(self): | ||
| """ | ||
| Start the operation, and run until completion | ||
|
|
||
| Raises: | ||
| - MutationsExceptionGroup: if any mutations failed | ||
| """ | ||
| try: | ||
| # trigger mutate_rows | ||
| await self._operation() | ||
| except Exception as exc: | ||
| # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations | ||
| incomplete_indices = self.remaining_indices.copy() | ||
| for idx in incomplete_indices: | ||
| self._handle_entry_error(idx, exc) | ||
| finally: | ||
| # raise exception detailing incomplete mutations | ||
| all_errors = [] | ||
| for idx, exc_list in self.errors.items(): | ||
| if len(exc_list) == 0: | ||
| raise core_exceptions.ClientError( | ||
| f"Mutation {idx} failed with no associated errors" | ||
| ) | ||
| elif len(exc_list) == 1: | ||
| cause_exc = exc_list[0] | ||
| else: | ||
| cause_exc = bt_exceptions.RetryExceptionGroup(exc_list) | ||
| entry = self.mutations[idx] | ||
| all_errors.append( | ||
| bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc) | ||
| ) | ||
| if all_errors: | ||
| raise bt_exceptions.MutationsExceptionGroup( | ||
| all_errors, len(self.mutations) | ||
| ) | ||
|
|
||
| async def _run_attempt(self): | ||
| """ | ||
| Run a single attempt of the mutate_rows rpc. | ||
|
|
||
| Raises: | ||
| - _MutateRowsIncomplete: if there are failed mutations eligible for | ||
| retry after the attempt is complete | ||
| - GoogleAPICallError: if the gapic rpc fails | ||
| """ | ||
| request_entries = [ | ||
| self.mutations[idx]._to_dict() for idx in self.remaining_indices | ||
| ] | ||
| # track mutations in this request that have not been finalized yet | ||
| active_request_indices = { | ||
| req_idx: orig_idx for req_idx, orig_idx in enumerate(self.remaining_indices) | ||
| } | ||
| self.remaining_indices = [] | ||
| if not request_entries: | ||
| # no more mutations. return early | ||
| return | ||
| # make gapic request | ||
| try: | ||
| result_generator = await self._gapic_fn( | ||
| timeout=next(self.timeout_generator), | ||
| entries=request_entries, | ||
| ) | ||
| async for result_list in result_generator: | ||
| for result in result_list.entries: | ||
| # convert sub-request index to global index | ||
| orig_idx = active_request_indices[result.index] | ||
| entry_error = core_exceptions.from_grpc_status( | ||
| result.status.code, | ||
| result.status.message, | ||
| details=result.status.details, | ||
| ) | ||
| if result.status.code != 0: | ||
| # mutation failed; update error list (and remaining_indices if retryable) | ||
| self._handle_entry_error(orig_idx, entry_error) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment that this will update remaining_indices |
||
| # remove processed entry from active list | ||
| del active_request_indices[result.index] | ||
| except Exception as exc: | ||
| # add this exception to list for each mutation that wasn't | ||
| # already handled, and update remaining_indices if mutation is retryable | ||
| for idx in active_request_indices.values(): | ||
| self._handle_entry_error(idx, exc) | ||
| # bubble up exception to be handled by retry wrapper | ||
| raise | ||
| # check if attempt succeeded, or needs to be retried | ||
| if self.remaining_indices: | ||
| # unfinished work; raise exception to trigger retry | ||
| raise _MutateRowsIncomplete | ||
|
|
||
| def _handle_entry_error(self, idx: int, exc: Exception): | ||
| """ | ||
| Add an exception to the list of exceptions for a given mutation index, | ||
| and add the index to the list of remaining indices if the exception is | ||
| retryable. | ||
|
|
||
| Args: | ||
| - idx: the index of the mutation that failed | ||
| - exc: the exception to add to the list | ||
| """ | ||
| entry = self.mutations[idx] | ||
| self.errors.setdefault(idx, []).append(exc) | ||
| if ( | ||
| entry.is_idempotent() | ||
| and self.is_retryable(exc) | ||
| and idx not in self.remaining_indices | ||
| ): | ||
| self.remaining_indices.append(idx) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.