Skip to content

Commit 361be4a

Browse files
committed
Exceptions module and temporalite support
1 parent 26d6874 commit 361be4a

File tree

17 files changed

+1793
-136
lines changed

17 files changed

+1793
-136
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ jobs:
2727
- uses: actions/setup-python@v1
2828
with:
2929
python-version: ${{ matrix.python }}
30+
# Needed to tests since they use external server
31+
- uses: actions/setup-go@v2
32+
with:
33+
go-version: "1.17"
3034
- run: python -m pip install --upgrade wheel poetry poethepoet
3135
- run: poetry install
3236
- run: poe lint
3337
- run: poe build
38+
- run: poe test

docs/api.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,9 @@ Converters
1616

1717
.. automodule:: temporalio.converter
1818
:members:
19+
20+
Exceptions
21+
----------
22+
23+
.. automodule:: temporalio.exceptions
24+
:members:

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import os
1414
import sys
1515

16-
sys.path.insert(0, os.path.abspath("../"))
16+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
1717

1818

1919
# -- Project information -----------------------------------------------------

poetry.lock

Lines changed: 46 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ isort = "^5.10.1"
2424
maturin = "^0.12.6"
2525
mypy = "^0.931"
2626
mypy-protobuf = "^3.2.0"
27+
psutil = "^5.9.0"
2728
pydocstyle = "^6.1.1"
2829
pytest = "^6.2.5"
2930
pytest-asyncio = "^0.18.0"

temporalio/bridge/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def details(self) -> bytes:
8383
class Client:
8484
@staticmethod
8585
async def connect(opts: ClientOptions) -> "Client":
86-
return Client(await temporal_sdk_bridge.new_client(opts))
86+
return Client(await temporal_sdk_bridge.connect_client(opts))
8787

8888
_ref: temporal_sdk_bridge.ClientRef
8989

temporalio/bridge/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tonic;
99
fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
1010
m.add("RPCError", py.get_type::<RPCError>())?;
1111
m.add_class::<ClientRef>()?;
12-
m.add_function(wrap_pyfunction!(new_client, m)?)?;
12+
m.add_function(wrap_pyfunction!(connect_client, m)?)?;
1313
Ok(())
1414
}
1515

@@ -55,7 +55,7 @@ pub struct ClientRetryConfig {
5555
}
5656

5757
#[pyfunction]
58-
fn new_client(py: Python, opts: ClientOptions) -> PyResult<&PyAny> {
58+
fn connect_client(py: Python, opts: ClientOptions) -> PyResult<&PyAny> {
5959
// TODO(cretz): Add metrics_meter?
6060
let opts: temporal_client::ServerGatewayOptions = opts.try_into()?;
6161
pyo3_asyncio::tokio::future_into_py(py, async move {

temporalio/client.py

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import temporalio.api.workflowservice.v1
1818
import temporalio.common
1919
import temporalio.converter
20-
import temporalio.failure
20+
import temporalio.exceptions
2121
import temporalio.workflow_service
2222
from temporalio.workflow_service import RPCError, RPCStatusCode
2323

@@ -396,10 +396,8 @@ async def result(
396396
req.next_page_token = b""
397397
continue
398398
# Ignoring anything after the first response like TypeScript
399-
if not complete_attr.result:
400-
return cast(T, None)
401-
results = await self._client.data_converter.decode(
402-
complete_attr.result.payloads
399+
results = await temporalio.converter.decode_payloads(
400+
complete_attr.result, self._client.data_converter
403401
)
404402
if not results:
405403
return cast(T, None)
@@ -414,38 +412,35 @@ async def result(
414412
req.next_page_token = b""
415413
continue
416414
raise WorkflowFailureError(
417-
cause=await temporalio.failure.FailureError.from_proto(
415+
cause=await temporalio.exceptions.failure_to_error(
418416
fail_attr.failure, self._client.data_converter
419-
)
417+
),
420418
)
421419
elif event.HasField("workflow_execution_canceled_event_attributes"):
422420
cancel_attr = event.workflow_execution_canceled_event_attributes
423-
details = []
424-
if cancel_attr.details and cancel_attr.details.payloads:
425-
details = await self._client.data_converter.decode(
426-
cancel_attr.details.payloads
427-
)
428421
raise WorkflowFailureError(
429-
cause=temporalio.failure.FailureError(
422+
cause=temporalio.exceptions.CancelledError(
430423
"Workflow cancelled",
431-
temporalio.failure.CancelledFailure(*details),
424+
*(
425+
await temporalio.converter.decode_payloads(
426+
cancel_attr.details.payloads,
427+
self._client.data_converter,
428+
)
429+
),
432430
)
433431
)
434432
elif event.HasField("workflow_execution_terminated_event_attributes"):
435433
term_attr = event.workflow_execution_terminated_event_attributes
436-
details = []
437-
if term_attr.details and term_attr.details.payloads:
438-
details = await self._client.data_converter.decode(
439-
term_attr.details.payloads
440-
)
441434
raise WorkflowFailureError(
442-
cause=temporalio.failure.FailureError(
435+
cause=temporalio.exceptions.TerminatedError(
443436
term_attr.reason or "Workflow terminated",
444-
temporalio.failure.TerminatedFailure(
445-
*details,
446-
reason=term_attr.reason or None,
437+
*(
438+
await temporalio.converter.decode_payloads(
439+
term_attr.details.payloads,
440+
self._client.data_converter,
441+
)
447442
),
448-
)
443+
),
449444
)
450445
elif event.HasField("workflow_execution_timed_out_event_attributes"):
451446
time_attr = event.workflow_execution_timed_out_event_attributes
@@ -455,12 +450,10 @@ async def result(
455450
req.next_page_token = b""
456451
continue
457452
raise WorkflowFailureError(
458-
cause=temporalio.failure.FailureError(
453+
cause=temporalio.exceptions.TimeoutError(
459454
"Workflow timed out",
460-
temporalio.failure.TimeoutFailure(
461-
temporalio.failure.TimeoutType.START_TO_CLOSE
462-
),
463-
)
455+
type=temporalio.exceptions.TimeoutType.START_TO_CLOSE,
456+
),
464457
)
465458
elif event.HasField("workflow_execution_continued_as_new_event_attributes"):
466459
cont_attr = event.workflow_execution_continued_as_new_event_attributes
@@ -479,7 +472,7 @@ async def cancel(
479472
self,
480473
*,
481474
run_id: Optional[str] = SELF_RUN_ID,
482-
first_execution_run_id: Optional[None],
475+
first_execution_run_id: Optional[str] = None,
483476
) -> None:
484477
"""Cancel the workflow.
485478
@@ -902,7 +895,7 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
902895
class WorkflowFailureError(Exception):
903896
"""Error that occurs when a workflow is unsuccessful."""
904897

905-
def __init__(self, *, cause: temporalio.failure.FailureError) -> None:
898+
def __init__(self, *, cause: temporalio.exceptions.FailureError) -> None:
906899
"""Create workflow failure error."""
907900
super().__init__("Workflow execution failed")
908901
# TODO(cretz): Confirm setting this __cause__ is acceptable
@@ -917,6 +910,10 @@ def __init__(self, new_execution_run_id: str) -> None:
917910
super().__init__("Workflow continued as new")
918911
self._new_execution_run_id = new_execution_run_id
919912

913+
@property
914+
def new_execution_run_id(self) -> str:
915+
return self._new_execution_run_id
916+
920917

921918
class WorkflowQueryRejectedError(Exception):
922919
"""Error that occurs when a query was rejected."""

temporalio/converter.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,11 @@ def default() -> CompositeDataConverter:
419419
BinaryProtoPayloadConverter(),
420420
JSONPlainPayloadConverter(),
421421
)
422+
423+
424+
async def decode_payloads(
425+
payloads: Optional[temporalio.api.common.v1.Payloads], converter: DataConverter
426+
) -> List[Any]:
427+
if not payloads or not payloads.payloads:
428+
return []
429+
return await converter.decode(payloads.payloads)

0 commit comments

Comments
 (0)