From 8fde7a12eaa508ce5b21dbd4c4882d71af94e699 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 3 Jan 2023 16:10:11 -0600 Subject: [PATCH 1/5] Fix optional params Fixes #236 --- temporalio/client.py | 13 +++++++--- temporalio/common.py | 12 ++++----- tests/worker/test_workflow.py | 46 +++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 010a46abf..6b69d2b5b 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -320,6 +320,7 @@ async def start_workflow( args: Sequence[Any] = [], id: str, task_queue: str, + result_type: Optional[Type] = None, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, @@ -343,6 +344,7 @@ async def start_workflow( args: Sequence[Any] = [], id: str, task_queue: str, + result_type: Optional[Type] = None, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, @@ -365,6 +367,8 @@ async def start_workflow( args: Multiple arguments to the workflow. Cannot be set if arg is. id: Unique identifier for the workflow execution. task_queue: Task queue to run the workflow on. + result_type: For string workflows, this can set the specific result + type hint to deserialize into. execution_timeout: Total workflow execution timeout including retries and continue as new. run_timeout: Timeout of a single workflow run. @@ -390,13 +394,13 @@ async def start_workflow( """ # Use definition if callable name: str - ret_type: Optional[Type] = None if isinstance(workflow, str): name = workflow elif callable(workflow): defn = temporalio.workflow._Definition.must_from_run_fn(workflow) name = defn.name - ret_type = defn.ret_type + if result_type is None: + result_type = defn.ret_type else: raise TypeError("Workflow must be a string or callable") @@ -417,7 +421,7 @@ async def start_workflow( headers={}, start_signal=start_signal, start_signal_args=start_signal_args, - ret_type=ret_type, + ret_type=result_type, rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout, ) @@ -506,6 +510,7 @@ async def execute_workflow( args: Sequence[Any] = [], id: str, task_queue: str, + result_type: Optional[Type] = None, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, @@ -529,6 +534,7 @@ async def execute_workflow( args: Sequence[Any] = [], id: str, task_queue: str, + result_type: Optional[Type] = None, execution_timeout: Optional[timedelta] = None, run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, @@ -555,6 +561,7 @@ async def execute_workflow( arg, args=args, task_queue=task_queue, + result_type=result_type, id=id, execution_timeout=execution_timeout, run_timeout=run_timeout, diff --git a/temporalio/common.py b/temporalio/common.py index 4e6465316..eb3a21861 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -222,11 +222,7 @@ def _type_hints_from_func( sig = inspect.signature(func) hints = get_type_hints(func) ret_hint = hints.get("return") - ret = ( - ret_hint - if inspect.isclass(ret_hint) and ret_hint is not inspect.Signature.empty - else None - ) + ret = ret_hint if ret_hint is not inspect.Signature.empty else None args: List[Type] = [] for index, value in enumerate(sig.parameters.values()): # Ignore self on methods @@ -244,7 +240,9 @@ def _type_hints_from_func( return (None, ret) # All params must have annotations or we consider none to have them arg_hint = hints.get(value.name) - if not inspect.isclass(arg_hint) or arg_hint is inspect.Parameter.empty: + if arg_hint is inspect.Parameter.empty: return (None, ret) - args.append(arg_hint) + # Ignoring type here because union/optional isn't really a type + # necessarily + args.append(arg_hint) # type: ignore return args, ret diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 48ad38c7c..59b852f42 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2646,3 +2646,49 @@ async def test_workflow_custom_failure_converter(client: Client): if not failure.HasField("cause"): break failure = failure.cause + + +@dataclass +class OptionalParam: + some_string: str + + +@workflow.defn +class OptionalParamWorkflow: + @workflow.run + async def run( + self, some_param: Optional[OptionalParam] = OptionalParam(some_string="default") + ) -> Optional[OptionalParam]: + assert some_param is None or ( + isinstance(some_param, OptionalParam) + and some_param.some_string in ["default", "foo"] + ) + return some_param + + +async def test_workflow_optional_param(client: Client): + async with new_worker(client, OptionalParamWorkflow) as worker: + # Don't send a parameter and confirm it is defaulted + result1 = await client.execute_workflow( + "OptionalParamWorkflow", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + result_type=OptionalParam, + ) + assert result1 == OptionalParam(some_string="default") + # Send None explicitly + result2 = await client.execute_workflow( + OptionalParamWorkflow.run, + None, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert result2 is None + # Send param explicitly + result3 = await client.execute_workflow( + OptionalParamWorkflow.run, + OptionalParam(some_string="foo"), + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert result3 == OptionalParam(some_string="foo") From 70b5b3e5c3fa74259e750034134951e958001560 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 3 Jan 2023 17:40:19 -0600 Subject: [PATCH 2/5] Fix activity cancellation reporting on worker shutdown Fixes #237 --- temporalio/worker/_activity.py | 15 ++++++++ tests/worker/test_activity.py | 65 +++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index c30d29ca2..c1c45a827 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -447,6 +447,21 @@ async def _run_activity( temporalio.activity.logger.warning( "Completing activity as failed", exc_info=True ) + # In some cases, like worker shutdown of an sync activity, + # this results in a CancelledError, but the server will fail + # if you send a cancelled error outside of a requested + # cancellation. So we wrap as a retryable application error. + if isinstance( + err, + (asyncio.CancelledError, temporalio.exceptions.CancelledError), + ): + new_err = temporalio.exceptions.ApplicationError( + "Cancelled without request, possibly due to worker shutdown", + type="CancelledError", + ) + new_err.__traceback__ = err.__traceback__ + new_err.__cause__ = err.__cause__ + err = new_err await self._data_converter.encode_failure( err, completion.result.failed.failure ) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index e2b97bb75..4905963ae 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -4,6 +4,7 @@ import logging.handlers import multiprocessing import queue +import threading import time import uuid from dataclasses import dataclass @@ -12,13 +13,14 @@ import pytest -from temporalio import activity +from temporalio import activity, workflow from temporalio.client import ( AsyncActivityHandle, Client, WorkflowFailureError, WorkflowHandle, ) +from temporalio.common import RetryPolicy from temporalio.exceptions import ( ActivityError, ApplicationError, @@ -375,6 +377,67 @@ def wait_cancel() -> None: assert events == ["pre1", "pre2", "pre3", "post3", "post2"] +sync_activity_waiting_cancel = threading.Event() + + +@activity.defn +def sync_activity_wait_cancel(): + sync_activity_waiting_cancel.set() + while True: + time.sleep(1) + activity.heartbeat() + + +# We don't sandbox because Python logging uses multiprocessing if it's present +# which we don't want to get warnings about +@workflow.defn(sandboxed=False) +class CancelOnWorkerShutdownWorkflow: + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + sync_activity_wait_cancel, + start_to_close_timeout=timedelta(hours=1), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + +# This test used to fail because we were sending a cancelled error and the +# server doesn't allow that +async def test_sync_activity_thread_cancel_on_worker_shutdown(client: Client): + task_queue = f"tq-{uuid.uuid4()}" + + def new_worker() -> Worker: + return Worker( + client, + task_queue=task_queue, + activities=[sync_activity_wait_cancel], + workflows=[CancelOnWorkerShutdownWorkflow], + activity_executor=executor, + max_cached_workflows=0, + ) + + with concurrent.futures.ThreadPoolExecutor() as executor: + async with new_worker(): + # Start the workflow + handle = await client.start_workflow( + CancelOnWorkerShutdownWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=task_queue, + ) + # Wait for activity to start + assert await asyncio.get_running_loop().run_in_executor( + executor, lambda: sync_activity_waiting_cancel.wait(20) + ) + # Shut down the worker + # Start the worker again and wait for result + with pytest.raises(WorkflowFailureError) as err: + async with new_worker(): + await handle.result() + assert isinstance(err.value.cause, ActivityError) + assert isinstance(err.value.cause.cause, ApplicationError) + assert "due to worker shutdown" in err.value.cause.cause.message + + @activity.defn def picklable_activity_wait_cancel() -> str: while not activity.is_cancelled(): From c0ba115e6c9466e444d754971ec630c906797578 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 4 Jan 2023 09:38:07 -0600 Subject: [PATCH 3/5] Added README note about generic types Fixes #234 --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 2eb50b1a0..c1e295324 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,9 @@ The default data converter supports converting multiple types including: This notably doesn't include any `date`, `time`, or `datetime` objects as they may not work across SDKs. +Classes with generics may not have the generics properly resolved. The current implementation, similar to Pydantic, does +not have generic type resolution. Users should use concrete types. + For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has been taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`, etc in addition to the regular JSON values mentioned before. From a1f90eea0d9b4f38b7d759b6ee3a71be8f79ee06 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 4 Jan 2023 09:47:48 -0600 Subject: [PATCH 4/5] Use create_task for 3.11 compat in README example Fixes #232 --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c1e295324..8375b9ef4 100644 --- a/README.md +++ b/README.md @@ -360,7 +360,10 @@ class GreetingWorkflow: # Wait for salutation update or complete signal (this can be # cancelled) await asyncio.wait( - [self._greeting_info_update.wait(), self._complete.wait()], + [ + asyncio.create_task(self._greeting_info_update.wait()), + asyncio.create_task(self._complete.wait()), + ], return_when=asyncio.FIRST_COMPLETED, ) if self._complete.is_set(): From 7c83c98306a2040f29eb63aa4232d0426ec2a0a5 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 4 Jan 2023 10:20:49 -0600 Subject: [PATCH 5/5] Increase assert eventually timeout for slower CI runs --- tests/helpers/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index d9f9327db..b8140d244 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -34,7 +34,7 @@ async def assert_eq_eventually( expected: T, fn: Callable[[], Awaitable[T]], *, - timeout: timedelta = timedelta(seconds=3), + timeout: timedelta = timedelta(seconds=10), interval: timedelta = timedelta(milliseconds=200), ) -> None: start_sec = time.monotonic()