From 3629c6e7cd049d3b5c820367dda0241fe1c23dac Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 26 Mar 2025 15:23:40 -0700 Subject: [PATCH 1/2] Check for grpc status details before indexing & unpacking --- temporalio/client.py | 5 +-- tests/worker/test_update_with_start.py | 49 ++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 320a196e3..c2d54cc92 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6073,7 +6073,9 @@ def on_start( multiop_failure = ( temporalio.api.errordetails.v1.MultiOperationExecutionFailure() ) - if err.grpc_status.details[0].Unpack(multiop_failure): + if err.grpc_status.details and err.grpc_status.details[0].Unpack( + multiop_failure + ): status = next( ( st @@ -6108,7 +6110,6 @@ def on_start( RPCStatusCode(status.code), err.raw_grpc_status, ) - raise err finally: if err and not seen_start: diff --git a/tests/worker/test_update_with_start.py b/tests/worker/test_update_with_start.py index 3f975742c..40d7f39f5 100644 --- a/tests/worker/test_update_with_start.py +++ b/tests/worker/test_update_with_start.py @@ -10,6 +10,7 @@ import pytest +import temporalio.api.common.v1 from temporalio import activity, workflow from temporalio.client import ( Client, @@ -25,6 +26,7 @@ WorkflowIDConflictPolicy, ) from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError +from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment from tests.helpers import ( new_worker, @@ -805,3 +807,50 @@ async def test_update_with_start_two_param(client: Client): assert await wf_handle.result() == WorkflowResult( result="workflow-arg1-workflow-arg2" ) + + +class ErrorClientInterceptor(Interceptor): + def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor: + return EmptyDetailsErrorInterceptor(super().intercept_client(next)) + + +class EmptyDetailsErrorInterceptor(OutboundInterceptor): + def __init__(self, next: OutboundInterceptor) -> None: + super().__init__(next) + + async def start_update_with_start_workflow(self, *args, **kwargs): + empty_details_err = RPCError("empty details", RPCStatusCode.INTERNAL, b"") + # Set grpc_status with empty details + empty_details_err._grpc_status = temporalio.api.common.v1.GrpcStatus(details=[]) + raise empty_details_err + + +# Verify correcting issue #791 +async def test_start_update_with_start_empty_details_interceptor(client: Client): + # Create a client with error interceptor + client_with_interceptor = Client( + client.service_client, + namespace=client.namespace, + interceptors=[ErrorClientInterceptor()], + ) + + async with new_worker( + client, + UpdateWithStartInterceptorWorkflow, + ) as worker: + with pytest.raises(RPCError) as err: + await client_with_interceptor.start_update_with_start_workflow( + UpdateWithStartInterceptorWorkflow.my_update, + "original-update-arg", + start_workflow_operation=WithStartWorkflowOperation( + UpdateWithStartInterceptorWorkflow.run, + "original-workflow-arg", + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.FAIL, + ), + wait_for_stage=WorkflowUpdateStage.ACCEPTED, + ) + assert err.value.status == RPCStatusCode.INTERNAL + assert err.value.message == "empty details" + assert len(err.value.grpc_status.details) == 0 From 3de4acfc5beacee5c9d49a90d87b8f48f731af3d Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 27 Mar 2025 11:10:14 -0700 Subject: [PATCH 2/2] fix test - mock workflow service call instead of using interceptor (interceptor does not call code) --- tests/worker/test_update_with_start.py | 60 +++++++++++++------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/tests/worker/test_update_with_start.py b/tests/worker/test_update_with_start.py index 40d7f39f5..6302ee51e 100644 --- a/tests/worker/test_update_with_start.py +++ b/tests/worker/test_update_with_start.py @@ -5,12 +5,13 @@ from dataclasses import dataclass from datetime import timedelta from enum import Enum -from typing import Any, Iterator +from typing import Any, Iterator, Mapping, Optional from unittest.mock import patch import pytest import temporalio.api.common.v1 +import temporalio.api.workflowservice.v1 from temporalio import activity, workflow from temporalio.client import ( Client, @@ -26,7 +27,7 @@ WorkflowIDConflictPolicy, ) from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError -from temporalio.service import RPCError, RPCStatusCode +from temporalio.service import RPCError, RPCStatusCode, ServiceCall from temporalio.testing import WorkflowEnvironment from tests.helpers import ( new_worker, @@ -809,44 +810,43 @@ async def test_update_with_start_two_param(client: Client): ) -class ErrorClientInterceptor(Interceptor): - def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor: - return EmptyDetailsErrorInterceptor(super().intercept_client(next)) - - -class EmptyDetailsErrorInterceptor(OutboundInterceptor): - def __init__(self, next: OutboundInterceptor) -> None: - super().__init__(next) - - async def start_update_with_start_workflow(self, *args, **kwargs): +# Verify correcting issue #791 +async def test_start_update_with_start_empty_details(client: Client): + class execute_multi_operation( + ServiceCall[ + temporalio.api.workflowservice.v1.ExecuteMultiOperationRequest, + temporalio.api.workflowservice.v1.ExecuteMultiOperationResponse, + ] + ): empty_details_err = RPCError("empty details", RPCStatusCode.INTERNAL, b"") # Set grpc_status with empty details empty_details_err._grpc_status = temporalio.api.common.v1.GrpcStatus(details=[]) - raise empty_details_err - -# Verify correcting issue #791 -async def test_start_update_with_start_empty_details_interceptor(client: Client): - # Create a client with error interceptor - client_with_interceptor = Client( - client.service_client, - namespace=client.namespace, - interceptors=[ErrorClientInterceptor()], - ) - - async with new_worker( - client, - UpdateWithStartInterceptorWorkflow, - ) as worker: + def __init__(self) -> None: + pass + + async def __call__( + self, + req: temporalio.api.workflowservice.v1.ExecuteMultiOperationRequest, + *, + retry: bool = False, + metadata: Mapping[str, str] = {}, + timeout: Optional[timedelta] = None, + ) -> temporalio.api.workflowservice.v1.ExecuteMultiOperationResponse: + raise self.empty_details_err + + with patch.object( + client.workflow_service, "execute_multi_operation", execute_multi_operation() + ): with pytest.raises(RPCError) as err: - await client_with_interceptor.start_update_with_start_workflow( + await client.start_update_with_start_workflow( UpdateWithStartInterceptorWorkflow.my_update, "original-update-arg", start_workflow_operation=WithStartWorkflowOperation( UpdateWithStartInterceptorWorkflow.run, - "original-workflow-arg", + "wf-arg", id=f"wf-{uuid.uuid4()}", - task_queue=worker.task_queue, + task_queue="tq", id_conflict_policy=WorkflowIDConflictPolicy.FAIL, ), wait_for_stage=WorkflowUpdateStage.ACCEPTED,