Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class ActivityCancellationDetails:
not_found: bool = False
cancel_requested: bool = False
paused: bool = False
reset: bool = False
timed_out: bool = False
worker_shutdown: bool = False

Expand All @@ -167,6 +168,7 @@ def _from_proto(
paused=proto.is_paused,
timed_out=proto.is_timed_out,
worker_shutdown=proto.is_worker_shutdown,
reset=proto.is_reset,
)


Expand Down
3 changes: 3 additions & 0 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ impl ClientRef {
"request_cancel_workflow_execution" => {
rpc_call!(retry_client, call, request_cancel_workflow_execution)
}
"reset_activity" => {
rpc_call!(retry_client, call, reset_activity)
}
"reset_sticky_task_queue" => {
rpc_call!(retry_client, call, reset_sticky_task_queue)
}
Expand Down
8 changes: 7 additions & 1 deletion temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6363,11 +6363,16 @@ async def heartbeat_async_activity(
metadata=input.rpc_metadata,
timeout=input.rpc_timeout,
)
if resp_by_id.cancel_requested or resp_by_id.activity_paused:
if (
resp_by_id.cancel_requested
or resp_by_id.activity_paused
or resp_by_id.activity_reset
):
raise AsyncActivityCancelledError(
details=ActivityCancellationDetails(
cancel_requested=resp_by_id.cancel_requested,
paused=resp_by_id.activity_paused,
reset=resp_by_id.activity_reset,
)
)

Expand All @@ -6388,6 +6393,7 @@ async def heartbeat_async_activity(
details=ActivityCancellationDetails(
cancel_requested=resp.cancel_requested,
paused=resp.activity_paused,
reset=resp.activity_reset,
)
)

Expand Down
18 changes: 18 additions & 0 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,24 @@ async def _handle_start_activity_task(
),
completion.result.failed.failure,
)
elif (
isinstance(
err,
(asyncio.CancelledError, temporalio.exceptions.CancelledError),
)
and running_activity.cancellation_details.details
and running_activity.cancellation_details.details.reset
):
temporalio.activity.logger.warning(
"Completing as failure due to unhandled cancel error produced by activity reset",
)
await self._data_converter.encode_failure(
temporalio.exceptions.ApplicationError(
type="ActivityReset",
message="Unhandled activity cancel error produced by activity reset",
),
completion.result.failed.failure,
)
elif (
isinstance(
err,
Expand Down
1 change: 1 addition & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,7 @@ def _apply_schedule_command(
command.user_metadata.summary.CopyFrom(
self._instance._payload_converter.to_payload(self._input.summary)
)
print("Activity summary: ", command.user_metadata.summary)
if self._input.priority:
command.schedule_activity.priority.CopyFrom(
self._input.priority._to_proto()
Expand Down
105 changes: 105 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type

import temporalio.api.workflowservice.v1
from temporalio import activity, workflow
from temporalio.client import (
AsyncActivityHandle,
Expand Down Expand Up @@ -1486,3 +1489,105 @@ async def h():
client, worker, heartbeat, retry_max_attempts=2
)
assert result.result == "details: Some detail"


async def test_activity_reset_catch(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Time skipping server doesn't support activity reset")

@activity.defn
async def wait_cancel() -> str:
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
namespace=client.namespace,
execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=activity.info().workflow_id,
run_id=activity.info().workflow_run_id,
),
id=activity.info().activity_id,
)
await client.workflow_service.reset_activity(req)
try:
while True:
await asyncio.sleep(0.3)
activity.heartbeat()
except asyncio.CancelledError:
details = activity.cancellation_details()
assert details is not None
return "Got cancelled error, reset? " + str(details.reset)

@activity.defn
def sync_wait_cancel() -> str:
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
namespace=client.namespace,
execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=activity.info().workflow_id,
run_id=activity.info().workflow_run_id,
),
id=activity.info().activity_id,
)
asyncio.run(client.workflow_service.reset_activity(req))
try:
while True:
sleep(0.3)
activity.heartbeat()
except temporalio.exceptions.CancelledError:
details = activity.cancellation_details()
assert details is not None
return "Got cancelled error, reset? " + str(details.reset)
except Exception as e:
return str(type(e)) + str(e)

result = await _execute_workflow_with_activity(
client,
worker,
wait_cancel,
)
assert result.result == "Got cancelled error, reset? True"

config = WorkerConfig(
activity_executor=ThreadPoolExecutor(max_workers=1),
)
result = await _execute_workflow_with_activity(
client,
worker,
sync_wait_cancel,
worker_config=config,
)
assert result.result == "Got cancelled error, reset? True"


async def test_activity_reset_history(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Time skipping server doesn't support activity reset")

@activity.defn
async def wait_cancel() -> str:
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
namespace=client.namespace,
execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=activity.info().workflow_id,
run_id=activity.info().workflow_run_id,
),
id=activity.info().activity_id,
)
await client.workflow_service.reset_activity(req)
while True:
await asyncio.sleep(0.3)
activity.heartbeat()

with pytest.raises(WorkflowFailureError) as e:
result = await _execute_workflow_with_activity(
client,
worker,
wait_cancel,
)
assert isinstance(e.value.cause, ActivityError)
assert isinstance(e.value.cause.cause, ApplicationError)
assert (
e.value.cause.cause.message
== "Unhandled activity cancel error produced by activity reset"
)
5 changes: 4 additions & 1 deletion tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,10 @@ class SimpleActivityWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
say_hello,
name,
schedule_to_close_timeout=timedelta(seconds=5),
summary="Do a thing",
)


Expand Down
Loading