Skip to content

Commit e9261cd

Browse files
committed
Multiple replay working
1 parent 3a81369 commit e9261cd

File tree

7 files changed

+267
-101
lines changed

7 files changed

+267
-101
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ pub struct WorkerRef {
2525

2626
#[pyclass]
2727
pub struct ReplayWorker {
28-
// #[pyo3(get)]
2928
pub worker: WorkerRef,
30-
// #[pyo3(get)]
3129
pub history_pusher: HistoryPusher,
3230
}
3331

temporalio/bridge/worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
from dataclasses import dataclass
9-
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence
9+
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence, Tuple
1010

1111
import google.protobuf.internal.containers
1212
from typing_extensions import TypeAlias
@@ -60,8 +60,14 @@ def create(client: temporalio.bridge.client.Client, config: WorkerConfig) -> Wor
6060
@staticmethod
6161
def for_replay(
6262
config: WorkerConfig,
63-
) -> (Worker, temporalio.bridge.temporal_sdk_bridge.HistoryPusher):
63+
) -> Tuple[Worker, temporalio.bridge.temporal_sdk_bridge.HistoryPusher]:
6464
"""Create a bridge replay worker from history."""
65+
cfg = temporalio.bridge.telemetry.TelemetryConfig()
66+
cfg.tracing_filter = "info,temporal_sdk_core=DEBUG,temporal_sdk=DEBUG"
67+
temporalio.bridge.telemetry.init_telemetry(
68+
cfg,
69+
warn_if_already_inited=False,
70+
)
6571
[
6672
replay_worker,
6773
pusher,

temporalio/worker/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
WorkflowInterceptorClassInput,
2020
WorkflowOutboundInterceptor,
2121
)
22-
from .replayer import Replayer, ReplayerConfig
22+
from .replayer import Replayer, ReplayerConfig, WorkflowHistory
2323
from .worker import Worker, WorkerConfig
2424
from .workflow_instance import (
2525
UnsandboxedWorkflowRunner,

temporalio/worker/replayer.py

Lines changed: 164 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,35 @@
22

33
from __future__ import annotations
44

5+
import asyncio
56
import concurrent.futures
67
import copy
78
import json
89
import logging
910
import re
10-
from typing import Any, Dict, Iterable, Optional, Sequence, Type, Union
11+
from dataclasses import dataclass
12+
from typing import (
13+
Any,
14+
AsyncIterator,
15+
Callable,
16+
Dict,
17+
Iterable,
18+
Mapping,
19+
MutableMapping,
20+
Optional,
21+
Sequence,
22+
Type,
23+
Union,
24+
)
1125

1226
import google.protobuf.json_format
1327
from typing_extensions import TypedDict
1428

1529
import temporalio.api.history.v1
30+
import temporalio.bridge.proto.workflow_activation
1631
import temporalio.bridge.worker
1732
import temporalio.converter
33+
import temporalio.workflow
1834

1935
from .interceptor import Interceptor
2036
from .worker import load_default_build_id
@@ -39,6 +55,7 @@ def __init__(
3955
build_id: Optional[str] = None,
4056
identity: Optional[str] = None,
4157
debug_mode: bool = False,
58+
fail_fast: bool = True,
4259
) -> None:
4360
"""Create a replayer to replay workflows from history.
4461
@@ -58,51 +75,18 @@ def __init__(
5875
build_id=build_id,
5976
identity=identity,
6077
debug_mode=debug_mode,
78+
fail_fast=fail_fast,
6179
)
62-
63-
def config(self) -> ReplayerConfig:
64-
"""Config, as a dictionary, used to create this replayer.
65-
66-
Returns:
67-
Configuration, shallow-copied.
68-
"""
69-
config = self._config.copy()
70-
config["workflows"] = list(config["workflows"])
71-
return config
72-
73-
async def replay_workflow(
74-
self,
75-
history: Union[temporalio.api.history.v1.History, str, Dict[str, Any]],
76-
) -> None:
77-
"""Replay a workflow for the given history.
78-
79-
Args:
80-
history: The history to replay. Can be a proto history object or
81-
JSON history as exported via web/tctl. If JSON history, can be a
82-
JSON string or a JSON dictionary as returned by
83-
:py:func:`json.load`.
84-
"""
85-
# Convert history if JSON or dict
86-
if not isinstance(history, temporalio.api.history.v1.History):
87-
history = _history_from_json(history)
88-
89-
# Extract workflow started event
90-
started_event = next(
91-
(
92-
e
93-
for e in history.events
94-
if e.HasField("workflow_execution_started_event_attributes")
95-
),
96-
None,
97-
)
98-
if not started_event:
99-
raise ValueError("Started event not found")
80+
tq = f"replay-{build_id}"
10081

10182
# Create bridge worker and workflow worker
102-
(bridge_worker, pusher) = temporalio.bridge.worker.Worker.for_replay(
83+
(
84+
self._bridge_worker,
85+
self._pusher,
86+
) = temporalio.bridge.worker.Worker.for_replay(
10387
temporalio.bridge.worker.WorkerConfig(
10488
namespace=self._config["namespace"],
105-
task_queue=started_event.workflow_execution_started_event_attributes.task_queue.name,
89+
task_queue=tq,
10690
build_id=self._config["build_id"] or load_default_build_id(),
10791
identity_override=self._config["identity"],
10892
# All values below are ignored but required by Core
@@ -121,31 +105,127 @@ async def replay_workflow(
121105
max_task_queue_activities_per_second=None,
122106
),
123107
)
124-
workflow_worker = _WorkflowWorker(
125-
bridge_worker=lambda: bridge_worker,
108+
self._workflow_worker = _WorkflowWorker(
109+
bridge_worker=lambda: self._bridge_worker,
126110
namespace=self._config["namespace"],
127-
task_queue=started_event.workflow_execution_started_event_attributes.task_queue.name,
111+
task_queue=tq,
128112
workflows=self._config["workflows"],
129113
workflow_task_executor=self._config["workflow_task_executor"],
130114
workflow_runner=self._config["workflow_runner"],
131115
data_converter=self._config["data_converter"],
132116
interceptors=self._config["interceptors"],
133117
debug_mode=self._config["debug_mode"],
134-
fail_on_eviction=True,
118+
on_eviction_hook=self._replayer_eviction_hook(
119+
fail_fast=self._config["fail_fast"]
120+
),
135121
)
122+
self._current_run_results = WorkflowReplayResults(False, dict())
136123

137-
await pusher.push_history("fake", history.SerializeToString())
138-
pusher.close()
124+
def config(self) -> ReplayerConfig:
125+
"""Config, as a dictionary, used to create this replayer.
139126
140-
# Run it
141-
try:
142-
await workflow_worker.run()
143-
finally:
144-
# We must finalize shutdown here
127+
Returns:
128+
Configuration, shallow-copied.
129+
"""
130+
config = self._config.copy()
131+
config["workflows"] = list(config["workflows"])
132+
return config
133+
134+
async def replay_workflow(
135+
self,
136+
history: WorkflowHistory,
137+
) -> None:
138+
"""Replay a workflow for the given history.
139+
140+
Args:
141+
history: The history to replay. Can be fetched directly, or use
142+
:py:meth:`WorkflowHistory.from_json` to parse a history downloaded via `tctl` or the
143+
web ui.
144+
"""
145+
146+
async def gen_hist():
147+
yield history
148+
149+
await self.replay_workflows(gen_hist())
150+
151+
async def replay_workflows(
152+
self, histories: AsyncIterator[WorkflowHistory]
153+
) -> WorkflowReplayResults:
154+
"""Replay a workflow for the given history.
155+
156+
Args:
157+
histories: The histories to replay, from an async iterator.
158+
"""
159+
self._current_run_results = WorkflowReplayResults(False, dict())
160+
161+
async def history_feeder():
162+
try:
163+
async for history in histories:
164+
# Extract workflow started event
165+
started_event = next(
166+
(
167+
e
168+
for e in history.events
169+
if e.HasField("workflow_execution_started_event_attributes")
170+
),
171+
None,
172+
)
173+
if not started_event:
174+
raise ValueError("Started event not found")
175+
176+
as_history_proto = temporalio.api.history.v1.History(
177+
events=history.events
178+
)
179+
await self._pusher.push_history(
180+
history.workflow_id, as_history_proto.SerializeToString()
181+
)
182+
finally:
183+
self._pusher.close()
184+
185+
async def runner():
186+
# Run it
145187
try:
146-
await bridge_worker.finalize_shutdown()
147-
except Exception:
148-
logger.warning("Failed to finalize shutdown", exc_info=True)
188+
await self._workflow_worker.run()
189+
finally:
190+
# We must finalize shutdown here
191+
try:
192+
await self._bridge_worker.finalize_shutdown()
193+
except Exception:
194+
logger.warning("Failed to finalize shutdown", exc_info=True)
195+
196+
await asyncio.gather(history_feeder(), runner())
197+
return self._current_run_results
198+
199+
def _replayer_eviction_hook(
200+
self, fail_fast: bool
201+
) -> Callable[
202+
[str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], bool
203+
]:
204+
def retfn(run_id, remove_job):
205+
ex = None
206+
if (
207+
remove_job.reason
208+
== temporalio.bridge.proto.workflow_activation.RemoveFromCache.EvictionReason.CACHE_FULL
209+
):
210+
# Cache being full doesn't count as a failure-inducing eviction
211+
pass
212+
elif (
213+
remove_job.reason
214+
== temporalio.bridge.proto.workflow_activation.RemoveFromCache.EvictionReason.NONDETERMINISM
215+
):
216+
ex = temporalio.workflow.NondeterminismError(remove_job.message)
217+
else:
218+
ex = RuntimeError(f"{remove_job.reason}: {remove_job.message}")
219+
220+
if ex is not None:
221+
if fail_fast:
222+
self._pusher.close()
223+
raise ex
224+
else:
225+
self._current_run_results.had_any_failure = True
226+
self._current_run_results.failure_details[run_id] = ex
227+
228+
return retfn
149229

150230

151231
class ReplayerConfig(TypedDict, total=False):
@@ -160,6 +240,31 @@ class ReplayerConfig(TypedDict, total=False):
160240
build_id: Optional[str]
161241
identity: Optional[str]
162242
debug_mode: bool
243+
fail_fast: bool
244+
245+
246+
@dataclass
247+
class WorkflowHistory:
248+
"""A workflow's id and history"""
249+
250+
workflow_id: str
251+
events: Sequence[temporalio.api.history.v1.HistoryEvent]
252+
253+
@classmethod
254+
def from_json(
255+
cls, workflow_id: str, history: Union[str, Dict[str, Any]]
256+
) -> WorkflowHistory:
257+
"""Construct a WorkflowHistory from an id and a json dump of history"""
258+
parsed = _history_from_json(history)
259+
return cls(workflow_id, parsed.events)
260+
261+
262+
@dataclass
263+
class WorkflowReplayResults:
264+
"""Results of replaying multiple workflows"""
265+
266+
had_any_failure: bool
267+
failure_details: MutableMapping[str, Exception]
163268

164269

165270
def _history_from_json(
@@ -207,7 +312,10 @@ def _history_from_json(
207312
)
208313
_fix_history_enum("TASK_QUEUE_KIND", event, "*", "taskQueue", "kind")
209314
_fix_history_enum(
210-
"TIMEOUT_TYPE", event, "workflowTaskTimedOutEventAttributes", "timeoutType"
315+
"TIMEOUT_TYPE",
316+
event,
317+
"workflowTaskTimedOutEventAttributes",
318+
"timeoutType",
211319
)
212320
_fix_history_enum(
213321
"WORKFLOW_ID_REUSE_POLICY",

0 commit comments

Comments
 (0)