Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ build-develop = "python scripts/setup_bridge.py develop"
build-develop-with-release = { cmd = "python scripts/setup_bridge.py develop", env = { TEMPORAL_BUILD_RELEASE = "1" }}
fix-wheel = "python scripts/fix_wheel.py"
format = [{cmd = "black ."}, {cmd = "isort ."}]
gen-docs = "pydoctor"
gen-docs = "python scripts/gen_docs.py"
gen-protos = "python scripts/gen_protos.py"
lint = [
{cmd = "black --check ."},
Expand Down
Binary file added scripts/_img/favicon.ico
Binary file not shown.
17 changes: 17 additions & 0 deletions scripts/gen_docs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import shutil
import subprocess
from pathlib import Path

base_dir = Path(__file__).parent.parent

if __name__ == "__main__":
print("Generating documentation...")

# Run pydoctor
subprocess.check_call("pydoctor")

# Copy favicon
shutil.copyfile(
base_dir / "scripts" / "_img" / "favicon.ico",
base_dir / "build" / "apidocs" / "favicon.ico",
)
18 changes: 16 additions & 2 deletions temporalio/bridge/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use temporal_sdk_core::{
telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions,
TelemetryOptionsBuilder, TraceExporter,
telemetry_init, Logger, MetricTemporality, MetricsExporter, OtelCollectorOptions,
TelemetryOptions, TelemetryOptionsBuilder, TraceExporter,
};
use url::Url;

Expand All @@ -23,6 +23,7 @@ pub struct TelemetryConfig {
log_forwarding_level: Option<String>,
otel_metrics: Option<OtelCollectorConfig>,
prometheus_metrics: Option<PrometheusMetricsConfig>,
metric_temporality: String,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -83,6 +84,19 @@ impl TryFrom<TelemetryConfig> for TelemetryOptions {
})?,
));
}
match conf.metric_temporality.as_str() {
"cumulative" => {
build.metric_temporality(MetricTemporality::Cumulative);
}
"delta" => {
build.metric_temporality(MetricTemporality::Delta);
}
_ => {
return Err(PyValueError::new_err(
"Invalid metric temporality, expected 'cumulative' or 'delta'",
));
}
}
build
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))
Expand Down
3 changes: 3 additions & 0 deletions temporalio/bridge/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dataclasses import dataclass
from typing import Mapping, Optional

from typing_extensions import Literal

import temporalio.bridge.temporal_sdk_bridge


Expand All @@ -22,6 +24,7 @@ class TelemetryConfig:
log_forwarding_level: Optional[str] = None
otel_metrics: Optional[OtelCollectorConfig] = None
prometheus_metrics: Optional[PrometheusMetricsConfig] = None
metric_temporality: Literal["cumulative", "delta"] = "cumulative"


@dataclass
Expand Down
28 changes: 25 additions & 3 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,20 @@ def status(self) -> Optional[WorkflowExecutionStatus]:
return self._status


class WorkflowQueryFailedError(temporalio.exceptions.TemporalError):
"""Error that occurs when a query fails."""

def __init__(self, message: str) -> None:
"""Create workflow query failed error."""
super().__init__(message)
self._message = message

@property
def message(self) -> str:
"""Get query failed message."""
return self._message


class AsyncActivityCancelledError(temporalio.exceptions.TemporalError):
"""Error that occurs when async activity attempted heartbeat but was cancelled."""

Expand Down Expand Up @@ -2410,9 +2424,17 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
)
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.query.header.fields)
resp = await self._client.workflow_service.query_workflow(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
try:
resp = await self._client.workflow_service.query_workflow(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
except RPCError as err:
# If the status is INVALID_ARGUMENT, we can assume it's a query
# failed error
if err.status == RPCStatusCode.INVALID_ARGUMENT:
raise WorkflowQueryFailedError(err.message)
else:
raise
if resp.HasField("query_rejected"):
raise WorkflowQueryRejectedError(
WorkflowExecutionStatus(resp.query_rejected.status)
Expand Down
7 changes: 6 additions & 1 deletion temporalio/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ async def __call__(
class _BridgeServiceClient(ServiceClient):
@staticmethod
async def connect(config: ConnectConfig) -> _BridgeServiceClient:
# TODO(cretz): Expose telemetry init config
temporalio.bridge.telemetry.init_telemetry(
temporalio.bridge.telemetry.TelemetryConfig(),
warn_if_already_inited=False,
Expand Down Expand Up @@ -754,9 +753,15 @@ class RPCError(temporalio.exceptions.TemporalError):
def __init__(self, message: str, status: RPCStatusCode, details: bytes) -> None:
"""Initialize RPC error."""
super().__init__(message)
self._message = message
self._status = status
self._details = details

@property
def message(self) -> str:
"""Message for the error."""
return self._message

@property
def status(self) -> RPCStatusCode:
"""Status code for the error."""
Expand Down
76 changes: 42 additions & 34 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,45 +1202,53 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
except _ContinueAsNewError as err:
logger.debug("Workflow requested continue as new")
err._apply_command(self._add_command())
except temporalio.exceptions.FailureError as err:

# Note in some Python versions, cancelled error does not extend
# exception
# TODO(cretz): Should I fail the task on BaseException too (e.g.
# KeyboardInterrupt)?
except (Exception, asyncio.CancelledError) as err:
logger.debug(
f"Workflow raised failure with run ID {self._info.run_id}",
exc_info=True,
)
# If a cancel was requested, and the failure is from an activity or
# child, and its cause was a cancellation, we want to use that cause
# instead because it means a cancel bubbled up while waiting on an
# activity or child.
if (
self._cancel_requested
and (
isinstance(err, temporalio.exceptions.ActivityError)
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
)
and isinstance(err.cause, temporalio.exceptions.CancelledError)
):
err = err.cause

command = self._add_command()
command.fail_workflow_execution.failure.SetInParent()
try:
self._failure_converter.to_failure(
err,
self._payload_converter,
command.fail_workflow_execution.failure,
# All asyncio cancelled errors become Temporal cancelled errors
if isinstance(err, asyncio.CancelledError):
err = temporalio.exceptions.CancelledError(str(err))

# If a cancel was ever requested and this is a cancellation, or an
# activity/child cancellation, we add a cancel command. Technically
# this means that a swallowed cancel followed by, say, an activity
# cancel later on will show the workflow as cancelled. But this is
# a Temporal limitation in that cancellation is a state not an
# event.
if self._cancel_requested and (
isinstance(err, temporalio.exceptions.CancelledError)
or (
(
isinstance(err, temporalio.exceptions.ActivityError)
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
)
and isinstance(err.cause, temporalio.exceptions.CancelledError)
)
except Exception as inner_err:
raise ValueError("Failed converting workflow exception") from inner_err
except asyncio.CancelledError as err:
command = self._add_command()
command.fail_workflow_execution.failure.SetInParent()
self._failure_converter.to_failure(
temporalio.exceptions.CancelledError(str(err)),
self._payload_converter,
command.fail_workflow_execution.failure,
)
except Exception as err:
self._current_activation_error = err
):
self._add_command().cancel_workflow_execution.SetInParent()
elif isinstance(err, temporalio.exceptions.FailureError):
# All other failure errors fail the workflow
failure = self._add_command().fail_workflow_execution.failure
failure.SetInParent()
try:
self._failure_converter.to_failure(
err, self._payload_converter, failure
)
except Exception as inner_err:
raise ValueError(
"Failed converting workflow exception"
) from inner_err
else:
# All other exceptions fail the task
self._current_activation_error = err

async def _signal_external_workflow(
self,
Expand Down Expand Up @@ -1471,7 +1479,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any:
# an # interceptor could have changed the name
if not handler:
raise RuntimeError(
f"Query handler for {input.query} expected but not found"
f"Query handler for '{input.query}' expected but not found"
)
# Put name first if dynamic
args = list(input.args) if not dynamic else [input.query] + list(input.args)
Expand Down
5 changes: 2 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
WorkflowExecutionStatus,
WorkflowFailureError,
WorkflowHandle,
WorkflowQueryFailedError,
WorkflowQueryRejectedError,
_history_from_json,
)
Expand Down Expand Up @@ -246,10 +247,8 @@ async def test_query(client: Client, worker: ExternalWorker):
await handle.result()
assert "some query arg" == await handle.query("some query", "some query arg")
# Try a query not on the workflow
with pytest.raises(RPCError) as err:
with pytest.raises(WorkflowQueryFailedError) as err:
await handle.query("does not exist")
# TODO(cretz): Is this the status we expect all SDKs to report?
assert err.value.status == RPCStatusCode.INVALID_ARGUMENT


async def test_query_rejected(client: Client, worker: ExternalWorker):
Expand Down
13 changes: 11 additions & 2 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
Client,
RPCError,
RPCStatusCode,
WorkflowExecutionStatus,
WorkflowFailureError,
WorkflowHandle,
WorkflowQueryFailedError,
)
from temporalio.common import RetryPolicy, SearchAttributes
from temporalio.converter import (
Expand Down Expand Up @@ -366,10 +368,16 @@ async def test_workflow_signal_and_query_errors(client: Client):
assert isinstance(err.value.cause, ApplicationError)
assert list(err.value.cause.details) == [123]
# Fail query (no details on query failure)
with pytest.raises(RPCError) as rpc_err:
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
await handle.query(SignalAndQueryErrorsWorkflow.bad_query)
assert rpc_err.value.status is RPCStatusCode.INVALID_ARGUMENT
assert str(rpc_err.value) == "query fail"
# Unrecognized query
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
await handle.query("non-existent query")
assert (
str(rpc_err.value)
== "Query handler for 'non-existent query' expected but not found"
)


@workflow.defn
Expand Down Expand Up @@ -724,6 +732,7 @@ async def started() -> bool:
with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert isinstance(err.value.cause, CancelledError)
assert (await handle.describe()).status == WorkflowExecutionStatus.CANCELED


@workflow.defn
Expand Down