Skip to content

Commit 44412ae

Browse files
authored
Expose Last Completion Result and Previous Run Failure (#1067)
* WIP Last completion result * Fix test and merge artifact * Some cleanup, reset core * Linting * Add previous run failure * Rename * PR feedback
1 parent 5e9b2ba commit 44412ae

File tree

6 files changed

+171
-1
lines changed

6 files changed

+171
-1
lines changed

temporalio/worker/_workflow.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,10 @@ def _create_workflow_instance(
552552
priority=temporalio.common.Priority._from_proto(init.priority),
553553
)
554554

555+
last_failure = (
556+
init.continued_failure if init.HasField("continued_failure") else None
557+
)
558+
555559
# Create instance from details
556560
det = WorkflowInstanceDetails(
557561
payload_converter_class=self._data_converter.payload_converter_class,
@@ -563,6 +567,8 @@ def _create_workflow_instance(
563567
extern_functions=self._extern_functions,
564568
disable_eager_activity_execution=self._disable_eager_activity_execution,
565569
worker_level_failure_exception_types=self._workflow_failure_exception_types,
570+
last_completion_result=init.last_completion_result,
571+
last_failure=last_failure,
566572
)
567573
if defn.sandboxed:
568574
return self._workflow_runner.create_instance(det)

temporalio/worker/_workflow_instance.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

67+
from ..api.failure.v1.message_pb2 import Failure
6768
from ._interceptor import (
6869
ContinueAsNewInput,
6970
ExecuteWorkflowInput,
@@ -143,6 +144,8 @@ class WorkflowInstanceDetails:
143144
extern_functions: Mapping[str, Callable]
144145
disable_eager_activity_execution: bool
145146
worker_level_failure_exception_types: Sequence[Type[BaseException]]
147+
last_completion_result: temporalio.api.common.v1.Payloads
148+
last_failure: Optional[Failure]
146149

147150

148151
class WorkflowInstance(ABC):
@@ -320,6 +323,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
320323
# metadata query
321324
self._current_details = ""
322325

326+
self._last_completion_result = det.last_completion_result
327+
self._last_failure = det.last_failure
328+
323329
# The versioning behavior of this workflow, as established by annotation or by the dynamic
324330
# config function. Is only set once upon initialization.
325331
self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
@@ -1703,6 +1709,37 @@ def workflow_is_failure_exception(self, err: BaseException) -> bool:
17031709
)
17041710
)
17051711

1712+
def workflow_has_last_completion_result(self) -> bool:
1713+
return len(self._last_completion_result.payloads) > 0
1714+
1715+
def workflow_last_completion_result(
1716+
self, type_hint: Optional[Type]
1717+
) -> Optional[Any]:
1718+
if len(self._last_completion_result.payloads) == 0:
1719+
return None
1720+
elif len(self._last_completion_result.payloads) > 1:
1721+
warnings.warn(
1722+
f"Expected single last completion result, got {len(self._last_completion_result.payloads)}"
1723+
)
1724+
return None
1725+
1726+
if type_hint is None:
1727+
return self._payload_converter.from_payload(
1728+
self._last_completion_result.payloads[0]
1729+
)
1730+
else:
1731+
return self._payload_converter.from_payload(
1732+
self._last_completion_result.payloads[0], type_hint
1733+
)
1734+
1735+
def workflow_last_failure(self) -> Optional[BaseException]:
1736+
if self._last_failure:
1737+
return self._failure_converter.from_failure(
1738+
self._last_failure, self._payload_converter
1739+
)
1740+
1741+
return None
1742+
17061743
#### Calls from outbound impl ####
17071744
# These are in alphabetical order and all start with "_outbound_".
17081745

@@ -2766,6 +2803,7 @@ def _apply_schedule_command(
27662803
v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout)
27672804
if self._input.retry_policy:
27682805
self._input.retry_policy.apply_to_proto(v.retry_policy)
2806+
27692807
v.cancellation_type = cast(
27702808
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27712809
int(self._input.cancellation_type),

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import temporalio.worker._workflow_instance
1919
import temporalio.workflow
2020

21+
from ...api.common.v1.message_pb2 import Payloads
22+
from ...api.failure.v1.message_pb2 import Failure
23+
2124
# Workflow instance has to be relative import
2225
from .._workflow_instance import (
2326
UnsandboxedWorkflowRunner,
@@ -84,6 +87,8 @@ def prepare_workflow(self, defn: temporalio.workflow._Definition) -> None:
8487
extern_functions={},
8588
disable_eager_activity_execution=False,
8689
worker_level_failure_exception_types=self._worker_level_failure_exception_types,
90+
last_completion_result=Payloads(),
91+
last_failure=Failure(),
8792
),
8893
)
8994

temporalio/workflow.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import temporalio.workflow
6262
from temporalio.nexus._util import ServiceHandlerT
6363

64+
from .api.failure.v1.message_pb2 import Failure
6465
from .types import (
6566
AnyType,
6667
CallableAsyncNoParam,
@@ -900,6 +901,17 @@ def workflow_set_current_details(self, details: str): ...
900901
@abstractmethod
901902
def workflow_is_failure_exception(self, err: BaseException) -> bool: ...
902903

904+
@abstractmethod
905+
def workflow_has_last_completion_result(self) -> bool: ...
906+
907+
@abstractmethod
908+
def workflow_last_completion_result(
909+
self, type_hint: Optional[Type]
910+
) -> Optional[Any]: ...
911+
912+
@abstractmethod
913+
def workflow_last_failure(self) -> Optional[BaseException]: ...
914+
903915

904916
_current_update_info: contextvars.ContextVar[UpdateInfo] = contextvars.ContextVar(
905917
"__temporal_current_update_info"
@@ -1051,6 +1063,32 @@ def get_current_details() -> str:
10511063
return _Runtime.current().workflow_get_current_details()
10521064

10531065

1066+
def has_last_completion_result() -> bool:
1067+
"""Gets whether there is a last completion result of the workflow."""
1068+
return _Runtime.current().workflow_has_last_completion_result()
1069+
1070+
1071+
@overload
1072+
def get_last_completion_result() -> Optional[Any]: ...
1073+
1074+
1075+
@overload
1076+
def get_last_completion_result(type_hint: Type[ParamType]) -> Optional[ParamType]: ...
1077+
1078+
1079+
def get_last_completion_result(type_hint: Optional[Type] = None) -> Optional[Any]:
1080+
"""Get the result of the last run of the workflow. This will be None if there was
1081+
no previous completion or the result was None. has_last_completion_result()
1082+
can be used to differentiate.
1083+
"""
1084+
return _Runtime.current().workflow_last_completion_result(type_hint)
1085+
1086+
1087+
def get_last_failure() -> Optional[BaseException]:
1088+
"""Get the last failure of the workflow if it has run previously."""
1089+
return _Runtime.current().workflow_last_failure()
1090+
1091+
10541092
def set_current_details(description: str) -> None:
10551093
"""Set the current details of the workflow which may appear in the UI/CLI.
10561094
Unlike static details set at start, this value can be updated throughout

tests/test_client.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import asyncio
12
import dataclasses
23
import json
34
import os
45
import uuid
56
from datetime import datetime, timedelta, timezone
6-
from typing import Any, List, Mapping, Optional, cast
7+
from typing import Any, List, Mapping, Optional, Tuple, cast
78
from unittest import mock
89

910
import google.protobuf.any_pb2
@@ -91,6 +92,7 @@
9192
from temporalio.testing import WorkflowEnvironment
9293
from tests.helpers import (
9394
assert_eq_eventually,
95+
assert_eventually,
9496
ensure_search_attributes_present,
9597
new_worker,
9698
worker_versioning_enabled,
@@ -1501,3 +1503,58 @@ async def test_cloud_client_simple():
15011503
GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"])
15021504
)
15031505
assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace
1506+
1507+
1508+
@workflow.defn
1509+
class LastCompletionResultWorkflow:
1510+
@workflow.run
1511+
async def run(self) -> str:
1512+
last_result = workflow.get_last_completion_result(type_hint=str)
1513+
if last_result is not None:
1514+
return "From last completion: " + last_result
1515+
else:
1516+
return "My First Result"
1517+
1518+
1519+
async def test_schedule_last_completion_result(
1520+
client: Client, env: WorkflowEnvironment
1521+
):
1522+
if env.supports_time_skipping:
1523+
pytest.skip("Java test server doesn't support schedules")
1524+
1525+
async with new_worker(client, LastCompletionResultWorkflow) as worker:
1526+
handle = await client.create_schedule(
1527+
f"schedule-{uuid.uuid4()}",
1528+
Schedule(
1529+
action=ScheduleActionStartWorkflow(
1530+
"LastCompletionResultWorkflow",
1531+
id=f"workflow-{uuid.uuid4()}",
1532+
task_queue=worker.task_queue,
1533+
),
1534+
spec=ScheduleSpec(),
1535+
),
1536+
)
1537+
await handle.trigger()
1538+
1539+
async def get_schedule_result() -> Tuple[int, Optional[str]]:
1540+
desc = await handle.describe()
1541+
length = len(desc.info.recent_actions)
1542+
if length == 0:
1543+
return length, None
1544+
else:
1545+
workflow_id = cast(
1546+
ScheduleActionExecutionStartWorkflow,
1547+
desc.info.recent_actions[-1].action,
1548+
).workflow_id
1549+
workflow_handle = client.get_workflow_handle(workflow_id)
1550+
result = await workflow_handle.result()
1551+
return length, result
1552+
1553+
assert await get_schedule_result() == (1, "My First Result")
1554+
await handle.trigger()
1555+
assert await get_schedule_result() == (
1556+
2,
1557+
"From last completion: My First Result",
1558+
)
1559+
1560+
await handle.delete()

tests/worker/test_workflow.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8330,3 +8330,29 @@ async def test_workflow_headers_with_codec(
83308330
assert headers["foo"].data == b"bar"
83318331
else:
83328332
assert headers["foo"].data != b"bar"
8333+
8334+
8335+
@workflow.defn
8336+
class PreviousRunFailureWorkflow:
8337+
@workflow.run
8338+
async def run(self) -> str:
8339+
if workflow.info().attempt != 1:
8340+
previous_failure = workflow.get_last_failure()
8341+
assert isinstance(previous_failure, ApplicationError)
8342+
assert previous_failure.message == "Intentional Failure"
8343+
return "Done"
8344+
raise ApplicationError("Intentional Failure")
8345+
8346+
8347+
async def test_previous_run_failure(client: Client):
8348+
async with new_worker(client, PreviousRunFailureWorkflow) as worker:
8349+
handle = await client.start_workflow(
8350+
PreviousRunFailureWorkflow.run,
8351+
id=f"previous-run-failure-workflow-{uuid.uuid4()}",
8352+
task_queue=worker.task_queue,
8353+
retry_policy=RetryPolicy(
8354+
initial_interval=timedelta(milliseconds=10),
8355+
),
8356+
)
8357+
result = await handle.result()
8358+
assert result == "Done"

0 commit comments

Comments
 (0)