Skip to content

Commit 16e1de1

Browse files
authored
Merge pull request #35 from filintod/filinto/deterministic
add deterministic methods and increase test coverage
2 parents b12c044 + 930b909 commit 16e1de1

File tree

8 files changed

+1154
-3
lines changed

8 files changed

+1154
-3
lines changed

Makefile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ test-unit:
77
test-e2e:
88
pytest -m e2e --verbose
99

10+
coverage-clean:
11+
rm -f .coverage .coverage.* coverage.xml
12+
13+
coverage-all: coverage-clean
14+
pytest -m "not e2e" --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml
15+
pytest -m e2e --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml --cov-append
16+
1017
install:
1118
python3 -m pip install .
1219

@@ -18,4 +25,4 @@ gen-proto:
1825
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
1926
rm durabletask/internal/*.proto
2027

21-
.PHONY: init test-unit test-e2e gen-proto install
28+
.PHONY: init test-unit test-e2e coverage-clean coverage-all gen-proto install

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,38 @@ This repo contains a Python client SDK for use with the [Durable Task Framework
1111
> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
1212
1313

14+
## Minimal worker setup
15+
16+
To execute orchestrations and activities you must run a worker that connects to the Dapr Workflow sidecar and dispatches work on background threads:
17+
18+
```python
19+
from durabletask.worker import TaskHubGrpcWorker
20+
21+
worker = TaskHubGrpcWorker(host_address="localhost:4001")
22+
23+
worker.add_orchestrator(say_hello)
24+
worker.add_activity(hello_activity)
25+
26+
try:
27+
worker.start()
28+
# Worker runs in the background and processes work until stopped
29+
finally:
30+
worker.stop()
31+
```
32+
33+
Always stop the worker when you're finished. The worker keeps polling threads alive; if you skip `stop()` they continue running and can prevent your process from shutting down cleanly after failures. You can rely on the context manager form to guarantee cleanup:
34+
35+
```python
36+
from durabletask.worker import TaskHubGrpcWorker
37+
38+
with TaskHubGrpcWorker(host_address="localhost:4001") as worker:
39+
worker.add_orchestrator(say_hello)
40+
worker.add_activity(hello_activity)
41+
worker.start()
42+
# worker.stop() is called automatically on exit
43+
```
44+
45+
1446
## Supported patterns
1547

1648
The following orchestration patterns are currently supported.

durabletask/deterministic.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
"""
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
"""
13+
14+
"""
15+
Deterministic utilities for Durable Task workflows (async and generator).
16+
17+
This module provides deterministic alternatives to non-deterministic Python
18+
functions, ensuring workflow replay consistency across different executions.
19+
It is shared by both the asyncio authoring model and the generator-based model.
20+
"""
21+
22+
import hashlib
23+
import random
24+
import string as _string
25+
import uuid
26+
from collections.abc import Sequence
27+
from dataclasses import dataclass
28+
from datetime import datetime, timedelta
29+
from typing import Optional, TypeVar
30+
31+
32+
@dataclass
33+
class DeterminismSeed:
34+
"""Seed data for deterministic operations."""
35+
36+
instance_id: str
37+
orchestration_unix_ts: int
38+
39+
def to_int(self) -> int:
40+
"""Convert seed to integer for PRNG initialization."""
41+
combined = f"{self.instance_id}:{self.orchestration_unix_ts}"
42+
hash_bytes = hashlib.sha256(combined.encode("utf-8")).digest()
43+
return int.from_bytes(hash_bytes[:8], byteorder="big")
44+
45+
46+
def derive_seed(instance_id: str, orchestration_time: datetime) -> int:
47+
"""
48+
Derive a deterministic seed from instance ID and orchestration time.
49+
"""
50+
ts = int(orchestration_time.timestamp())
51+
return DeterminismSeed(instance_id=instance_id, orchestration_unix_ts=ts).to_int()
52+
53+
54+
def deterministic_random(instance_id: str, orchestration_time: datetime) -> random.Random:
55+
"""
56+
Create a deterministic random number generator.
57+
"""
58+
seed = derive_seed(instance_id, orchestration_time)
59+
return random.Random(seed)
60+
61+
62+
def deterministic_uuid4(rnd: random.Random) -> uuid.UUID:
63+
"""
64+
Generate a deterministic UUID4 using the provided random generator.
65+
66+
Note: This is deprecated in favor of deterministic_uuid_v5 which matches
67+
the .NET implementation for cross-language compatibility.
68+
"""
69+
bytes_ = bytes(rnd.randrange(0, 256) for _ in range(16))
70+
bytes_list = list(bytes_)
71+
bytes_list[6] = (bytes_list[6] & 0x0F) | 0x40 # Version 4
72+
bytes_list[8] = (bytes_list[8] & 0x3F) | 0x80 # Variant bits
73+
return uuid.UUID(bytes=bytes(bytes_list))
74+
75+
76+
def deterministic_uuid_v5(instance_id: str, current_datetime: datetime, counter: int) -> uuid.UUID:
77+
"""
78+
Generate a deterministic UUID v5 matching the .NET implementation.
79+
80+
This implementation matches the durabletask-dotnet NewGuid() method:
81+
https://github.com/microsoft/durabletask-dotnet/blob/main/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
82+
83+
Args:
84+
instance_id: The orchestration instance ID.
85+
current_datetime: The current orchestration datetime (frozen during replay).
86+
counter: The per-call counter (starts at 0 on each replay).
87+
88+
Returns:
89+
A deterministic UUID v5 that will be the same across replays.
90+
"""
91+
# DNS namespace UUID - same as .NET DnsNamespaceValue
92+
namespace = uuid.UUID("9e952958-5e33-4daf-827f-2fa12937b875")
93+
94+
# Build name matching .NET format: instanceId_datetime_counter
95+
# Using isoformat() which produces ISO 8601 format similar to .NET's ToString("o")
96+
name = f"{instance_id}_{current_datetime.isoformat()}_{counter}"
97+
98+
# Generate UUID v5 (SHA-1 based, matching .NET)
99+
return uuid.uuid5(namespace, name)
100+
101+
102+
class DeterministicContextMixin:
103+
"""
104+
Mixin providing deterministic helpers for workflow contexts.
105+
106+
Assumes the inheriting class exposes `instance_id` and `current_utc_datetime` attributes.
107+
108+
This implementation matches the .NET durabletask SDK approach with an explicit
109+
counter for UUID generation that resets on each replay.
110+
"""
111+
112+
def __init__(self, *args, **kwargs):
113+
"""Initialize the mixin with UUID and timestamp counters."""
114+
super().__init__(*args, **kwargs)
115+
# Counter for deterministic UUID generation (matches .NET newGuidCounter)
116+
# This counter resets to 0 on each replay, ensuring determinism
117+
self._uuid_counter: int = 0
118+
# Counter for deterministic timestamp sequencing (resets on replay)
119+
self._timestamp_counter: int = 0
120+
121+
def now(self) -> datetime:
122+
"""Alias for deterministic current_utc_datetime."""
123+
return self.current_utc_datetime # type: ignore[attr-defined]
124+
125+
def random(self) -> random.Random:
126+
"""Return a PRNG seeded deterministically from instance id and orchestration time."""
127+
rnd = deterministic_random(
128+
self.instance_id, # type: ignore[attr-defined]
129+
self.current_utc_datetime, # type: ignore[attr-defined]
130+
)
131+
# Mark as deterministic for asyncio sandbox detector whitelisting of bound methods (randint, random)
132+
try:
133+
rnd._dt_deterministic = True
134+
except Exception:
135+
pass
136+
return rnd
137+
138+
def uuid4(self) -> uuid.UUID:
139+
"""
140+
Return a deterministically generated UUID v5 with explicit counter.
141+
https://www.sohamkamani.com/uuid-versions-explained/#v5-non-random-uuids
142+
143+
This matches the .NET implementation's NewGuid() method which uses:
144+
- Instance ID
145+
- Current UTC datetime (frozen during replay)
146+
- Per-call counter (resets to 0 on each replay)
147+
148+
The counter ensures multiple calls produce different UUIDs while maintaining
149+
determinism across replays.
150+
"""
151+
# Lazily initialize counter if not set by __init__ (for compatibility)
152+
if not hasattr(self, "_uuid_counter"):
153+
self._uuid_counter = 0
154+
155+
result = deterministic_uuid_v5(
156+
self.instance_id, # type: ignore[attr-defined]
157+
self.current_utc_datetime, # type: ignore[attr-defined]
158+
self._uuid_counter,
159+
)
160+
self._uuid_counter += 1
161+
return result
162+
163+
def new_guid(self) -> uuid.UUID:
164+
"""Alias for uuid4 for API parity with other SDKs."""
165+
return self.uuid4()
166+
167+
def random_string(self, length: int, *, alphabet: Optional[str] = None) -> str:
168+
"""Return a deterministically generated random string of the given length."""
169+
if length < 0:
170+
raise ValueError("length must be non-negative")
171+
chars = alphabet if alphabet is not None else (_string.ascii_letters + _string.digits)
172+
if not chars:
173+
raise ValueError("alphabet must not be empty")
174+
rnd = self.random()
175+
size = len(chars)
176+
return "".join(chars[rnd.randrange(0, size)] for _ in range(length))
177+
178+
def random_int(self, min_value: int = 0, max_value: int = 2**31 - 1) -> int:
179+
"""Return a deterministic random integer in the specified range."""
180+
if min_value > max_value:
181+
raise ValueError("min_value must be <= max_value")
182+
rnd = self.random()
183+
return rnd.randint(min_value, max_value)
184+
185+
T = TypeVar("T")
186+
187+
def random_choice(self, sequence: Sequence[T]) -> T:
188+
"""Return a deterministic random element from a non-empty sequence."""
189+
if not sequence:
190+
raise IndexError("Cannot choose from empty sequence")
191+
rnd = self.random()
192+
return rnd.choice(sequence)
193+
194+
def now_with_sequence(self) -> datetime:
195+
"""
196+
Return deterministic timestamp with microsecond increment per call.
197+
198+
Each call returns: current_utc_datetime + (counter * 1 microsecond)
199+
200+
This provides ordered, unique timestamps for tracing/telemetry while maintaining
201+
determinism across replays. The counter resets to 0 on each replay (similar to
202+
_uuid_counter pattern).
203+
204+
Perfect for preserving event ordering within a workflow without requiring activities.
205+
206+
Returns:
207+
datetime: Deterministic timestamp that increments on each call
208+
209+
Example:
210+
```python
211+
def workflow(ctx):
212+
t1 = ctx.now_with_sequence() # 2024-01-01 12:00:00.000000
213+
result = yield ctx.call_activity(some_activity, input="data")
214+
t2 = ctx.now_with_sequence() # 2024-01-01 12:00:00.000001
215+
# t1 < t2, preserving order for telemetry
216+
```
217+
"""
218+
offset = timedelta(microseconds=self._timestamp_counter)
219+
self._timestamp_counter += 1
220+
return self.current_utc_datetime + offset # type: ignore[attr-defined]
221+
222+
def current_utc_datetime_with_sequence(self):
223+
"""Alias for now_with_sequence for API parity with other SDKs."""
224+
return self.now_with_sequence()

durabletask/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import durabletask.internal.orchestrator_service_pb2 as pb
2020
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
2121
import durabletask.internal.shared as shared
22-
from durabletask import task
22+
from durabletask import deterministic, task
2323
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2424

2525
TInput = TypeVar("TInput")
@@ -605,11 +605,14 @@ def _execute_activity(
605605
)
606606

607607

608-
class _RuntimeOrchestrationContext(task.OrchestrationContext):
608+
class _RuntimeOrchestrationContext(
609+
task.OrchestrationContext, deterministic.DeterministicContextMixin
610+
):
609611
_generator: Optional[Generator[task.Task, Any, Any]]
610612
_previous_task: Optional[task.Task]
611613

612614
def __init__(self, instance_id: str):
615+
super().__init__()
613616
self._generator = None
614617
self._is_replaying = True
615618
self._is_complete = False

0 commit comments

Comments
 (0)