diff --git a/pyproject.toml b/pyproject.toml index 0f78e979c..b15c54be8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,7 @@ build-develop = "python scripts/setup_bridge.py develop" build-develop-with-release = { cmd = "python scripts/setup_bridge.py develop", env = { TEMPORAL_BUILD_RELEASE = "1" }} fix-wheel = "python scripts/fix_wheel.py" format = [{cmd = "black ."}, {cmd = "isort ."}] -gen-docs = "pydoctor" +gen-docs = "python scripts/gen_docs.py" gen-protos = "python scripts/gen_protos.py" lint = [ {cmd = "black --check ."}, diff --git a/scripts/_img/favicon.ico b/scripts/_img/favicon.ico new file mode 100644 index 000000000..a17268cbb Binary files /dev/null and b/scripts/_img/favicon.ico differ diff --git a/scripts/gen_docs.py b/scripts/gen_docs.py new file mode 100644 index 000000000..2a6955a15 --- /dev/null +++ b/scripts/gen_docs.py @@ -0,0 +1,17 @@ +import shutil +import subprocess +from pathlib import Path + +base_dir = Path(__file__).parent.parent + +if __name__ == "__main__": + print("Generating documentation...") + + # Run pydoctor + subprocess.check_call("pydoctor") + + # Copy favicon + shutil.copyfile( + base_dir / "scripts" / "_img" / "favicon.ico", + base_dir / "build" / "apidocs" / "favicon.ico", + ) diff --git a/temporalio/bridge/src/telemetry.rs b/temporalio/bridge/src/telemetry.rs index 8ba4f856d..b0fcf6322 100644 --- a/temporalio/bridge/src/telemetry.rs +++ b/temporalio/bridge/src/telemetry.rs @@ -4,8 +4,8 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; use temporal_sdk_core::{ - telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions, - TelemetryOptionsBuilder, TraceExporter, + telemetry_init, Logger, MetricTemporality, MetricsExporter, OtelCollectorOptions, + TelemetryOptions, TelemetryOptionsBuilder, TraceExporter, }; use url::Url; @@ -23,6 +23,7 @@ pub struct TelemetryConfig { log_forwarding_level: Option, otel_metrics: Option, prometheus_metrics: Option, + metric_temporality: String, } #[derive(FromPyObject)] @@ -83,6 +84,19 @@ impl TryFrom for TelemetryOptions { })?, )); } + match conf.metric_temporality.as_str() { + "cumulative" => { + build.metric_temporality(MetricTemporality::Cumulative); + } + "delta" => { + build.metric_temporality(MetricTemporality::Delta); + } + _ => { + return Err(PyValueError::new_err( + "Invalid metric temporality, expected 'cumulative' or 'delta'", + )); + } + } build .build() .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err))) diff --git a/temporalio/bridge/telemetry.py b/temporalio/bridge/telemetry.py index c8a218988..5b19f300e 100644 --- a/temporalio/bridge/telemetry.py +++ b/temporalio/bridge/telemetry.py @@ -9,6 +9,8 @@ from dataclasses import dataclass from typing import Mapping, Optional +from typing_extensions import Literal + import temporalio.bridge.temporal_sdk_bridge @@ -22,6 +24,7 @@ class TelemetryConfig: log_forwarding_level: Optional[str] = None otel_metrics: Optional[OtelCollectorConfig] = None prometheus_metrics: Optional[PrometheusMetricsConfig] = None + metric_temporality: Literal["cumulative", "delta"] = "cumulative" @dataclass diff --git a/temporalio/client.py b/temporalio/client.py index 39ef41e8a..195739979 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1990,6 +1990,20 @@ def status(self) -> Optional[WorkflowExecutionStatus]: return self._status +class WorkflowQueryFailedError(temporalio.exceptions.TemporalError): + """Error that occurs when a query fails.""" + + def __init__(self, message: str) -> None: + """Create workflow query failed error.""" + super().__init__(message) + self._message = message + + @property + def message(self) -> str: + """Get query failed message.""" + return self._message + + class AsyncActivityCancelledError(temporalio.exceptions.TemporalError): """Error that occurs when async activity attempted heartbeat but was cancelled.""" @@ -2410,9 +2424,17 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any: ) if input.headers is not None: temporalio.common._apply_headers(input.headers, req.query.header.fields) - resp = await self._client.workflow_service.query_workflow( - req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout - ) + try: + resp = await self._client.workflow_service.query_workflow( + req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout + ) + except RPCError as err: + # If the status is INVALID_ARGUMENT, we can assume it's a query + # failed error + if err.status == RPCStatusCode.INVALID_ARGUMENT: + raise WorkflowQueryFailedError(err.message) + else: + raise if resp.HasField("query_rejected"): raise WorkflowQueryRejectedError( WorkflowExecutionStatus(resp.query_rejected.status) diff --git a/temporalio/service.py b/temporalio/service.py index 739a044eb..9d06d9ce2 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -662,7 +662,6 @@ async def __call__( class _BridgeServiceClient(ServiceClient): @staticmethod async def connect(config: ConnectConfig) -> _BridgeServiceClient: - # TODO(cretz): Expose telemetry init config temporalio.bridge.telemetry.init_telemetry( temporalio.bridge.telemetry.TelemetryConfig(), warn_if_already_inited=False, @@ -754,9 +753,15 @@ class RPCError(temporalio.exceptions.TemporalError): def __init__(self, message: str, status: RPCStatusCode, details: bytes) -> None: """Initialize RPC error.""" super().__init__(message) + self._message = message self._status = status self._details = details + @property + def message(self) -> str: + """Message for the error.""" + return self._message + @property def status(self) -> RPCStatusCode: """Status code for the error.""" diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 6626e60a8..3af84678b 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1202,45 +1202,53 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: except _ContinueAsNewError as err: logger.debug("Workflow requested continue as new") err._apply_command(self._add_command()) - except temporalio.exceptions.FailureError as err: + + # Note in some Python versions, cancelled error does not extend + # exception + # TODO(cretz): Should I fail the task on BaseException too (e.g. + # KeyboardInterrupt)? + except (Exception, asyncio.CancelledError) as err: logger.debug( f"Workflow raised failure with run ID {self._info.run_id}", exc_info=True, ) - # If a cancel was requested, and the failure is from an activity or - # child, and its cause was a cancellation, we want to use that cause - # instead because it means a cancel bubbled up while waiting on an - # activity or child. - if ( - self._cancel_requested - and ( - isinstance(err, temporalio.exceptions.ActivityError) - or isinstance(err, temporalio.exceptions.ChildWorkflowError) - ) - and isinstance(err.cause, temporalio.exceptions.CancelledError) - ): - err = err.cause - command = self._add_command() - command.fail_workflow_execution.failure.SetInParent() - try: - self._failure_converter.to_failure( - err, - self._payload_converter, - command.fail_workflow_execution.failure, + # All asyncio cancelled errors become Temporal cancelled errors + if isinstance(err, asyncio.CancelledError): + err = temporalio.exceptions.CancelledError(str(err)) + + # If a cancel was ever requested and this is a cancellation, or an + # activity/child cancellation, we add a cancel command. Technically + # this means that a swallowed cancel followed by, say, an activity + # cancel later on will show the workflow as cancelled. But this is + # a Temporal limitation in that cancellation is a state not an + # event. + if self._cancel_requested and ( + isinstance(err, temporalio.exceptions.CancelledError) + or ( + ( + isinstance(err, temporalio.exceptions.ActivityError) + or isinstance(err, temporalio.exceptions.ChildWorkflowError) + ) + and isinstance(err.cause, temporalio.exceptions.CancelledError) ) - except Exception as inner_err: - raise ValueError("Failed converting workflow exception") from inner_err - except asyncio.CancelledError as err: - command = self._add_command() - command.fail_workflow_execution.failure.SetInParent() - self._failure_converter.to_failure( - temporalio.exceptions.CancelledError(str(err)), - self._payload_converter, - command.fail_workflow_execution.failure, - ) - except Exception as err: - self._current_activation_error = err + ): + self._add_command().cancel_workflow_execution.SetInParent() + elif isinstance(err, temporalio.exceptions.FailureError): + # All other failure errors fail the workflow + failure = self._add_command().fail_workflow_execution.failure + failure.SetInParent() + try: + self._failure_converter.to_failure( + err, self._payload_converter, failure + ) + except Exception as inner_err: + raise ValueError( + "Failed converting workflow exception" + ) from inner_err + else: + # All other exceptions fail the task + self._current_activation_error = err async def _signal_external_workflow( self, @@ -1471,7 +1479,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any: # an # interceptor could have changed the name if not handler: raise RuntimeError( - f"Query handler for {input.query} expected but not found" + f"Query handler for '{input.query}' expected but not found" ) # Put name first if dynamic args = list(input.args) if not dynamic else [input.query] + list(input.args) diff --git a/tests/test_client.py b/tests/test_client.py index 01d95222b..eb7e6c58a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -40,6 +40,7 @@ WorkflowExecutionStatus, WorkflowFailureError, WorkflowHandle, + WorkflowQueryFailedError, WorkflowQueryRejectedError, _history_from_json, ) @@ -246,10 +247,8 @@ async def test_query(client: Client, worker: ExternalWorker): await handle.result() assert "some query arg" == await handle.query("some query", "some query arg") # Try a query not on the workflow - with pytest.raises(RPCError) as err: + with pytest.raises(WorkflowQueryFailedError) as err: await handle.query("does not exist") - # TODO(cretz): Is this the status we expect all SDKs to report? - assert err.value.status == RPCStatusCode.INVALID_ARGUMENT async def test_query_rejected(client: Client, worker: ExternalWorker): diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 1fa98c7f9..c7eef2762 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -42,8 +42,10 @@ Client, RPCError, RPCStatusCode, + WorkflowExecutionStatus, WorkflowFailureError, WorkflowHandle, + WorkflowQueryFailedError, ) from temporalio.common import RetryPolicy, SearchAttributes from temporalio.converter import ( @@ -366,10 +368,16 @@ async def test_workflow_signal_and_query_errors(client: Client): assert isinstance(err.value.cause, ApplicationError) assert list(err.value.cause.details) == [123] # Fail query (no details on query failure) - with pytest.raises(RPCError) as rpc_err: + with pytest.raises(WorkflowQueryFailedError) as rpc_err: await handle.query(SignalAndQueryErrorsWorkflow.bad_query) - assert rpc_err.value.status is RPCStatusCode.INVALID_ARGUMENT assert str(rpc_err.value) == "query fail" + # Unrecognized query + with pytest.raises(WorkflowQueryFailedError) as rpc_err: + await handle.query("non-existent query") + assert ( + str(rpc_err.value) + == "Query handler for 'non-existent query' expected but not found" + ) @workflow.defn @@ -724,6 +732,7 @@ async def started() -> bool: with pytest.raises(WorkflowFailureError) as err: await handle.result() assert isinstance(err.value.cause, CancelledError) + assert (await handle.describe()).status == WorkflowExecutionStatus.CANCELED @workflow.defn