Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ Some things to note about the above code:
capabilities are needed.
* Local activities work very similarly except the functions are `workflow.start_local_activity()` and
`workflow.execute_local_activity()`
* Activities can be methods of a class. Invokers should use `workflow.start_activity_method()`,
`workflow.execute_activity_method()`, `workflow.start_local_activity_method()`, and
`workflow.execute_local_activity_method()` instead.
* Activities can callable classes (i.e. that define `__call__`). Invokers should use `workflow.start_activity_class()`,
`workflow.execute_activity_class()`, `workflow.start_local_activity_class()`, and
`workflow.execute_local_activity_class()` instead.

#### Invoking Child Workflows

Expand Down Expand Up @@ -465,7 +471,7 @@ While running in a workflow, in addition to features documented elsewhere, the f

#### Definition

Activities are functions decorated with `@activity.defn` like so:
Activities are decorated with `@activity.defn` like so:

```python
from temporalio import activity
Expand All @@ -482,6 +488,10 @@ Some things to note about activity definitions:
* Long running activities should regularly heartbeat and handle cancellation
* Activities can only have positional arguments. Best practice is to only take a single argument that is an
object/dataclass of fields that can be added to as needed.
* Activities can be defined on methods instead of top-level functions. This allows the instance to carry state that an
activity may need (e.g. a DB connection). The instance method should be what is registered with the worker.
* Activities can also be defined on callable classes (i.e. classes with `__call__`). An instance of the class should be
what is registered with the worker.

#### Types of Activities

Expand Down Expand Up @@ -721,6 +731,6 @@ poe test
* We use [Black](https://github.com/psf/black) for formatting, so that takes precedence
* In tests and example code, can import individual classes/functions to make it more readable. Can also do this for
rarely in library code for some Python common items (e.g. `dataclass` or `partial`), but not allowed to do this for
any `temporalio` packages or any classes/functions that aren't clear when unqualified.
any `temporalio` packages (except `temporalio.types`) or any classes/functions that aren't clear when unqualified.
* We allow relative imports for private packages
* We allow `@staticmethod`
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ html-output = "build/apidocs"
intersphinx = ["https://docs.python.org/3/objects.inv", "https://googleapis.dev/python/protobuf/latest/objects.inv"]
privacy = [
"PRIVATE:temporalio.bridge",
"PRIVATE:temporalio.types",
"HIDDEN:temporalio.worker.activity",
"HIDDEN:temporalio.worker.interceptor",
"HIDDEN:temporalio.worker.worker",
Expand Down
24 changes: 13 additions & 11 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,27 @@
NoReturn,
Optional,
Tuple,
TypeVar,
overload,
)

import temporalio.api.common.v1
import temporalio.common
import temporalio.exceptions

ActivityFunc = TypeVar("ActivityFunc", bound=Callable[..., Any])
from .types import CallableType


@overload
def defn(fn: ActivityFunc) -> ActivityFunc:
def defn(fn: CallableType) -> CallableType:
...


@overload
def defn(*, name: str) -> Callable[[ActivityFunc], ActivityFunc]:
def defn(*, name: str) -> Callable[[CallableType], CallableType]:
...


def defn(fn: Optional[ActivityFunc] = None, *, name: Optional[str] = None):
def defn(fn: Optional[CallableType] = None, *, name: Optional[str] = None):
"""Decorator for activity functions.

Activities can be async or non-async.
Expand All @@ -58,7 +57,7 @@ def defn(fn: Optional[ActivityFunc] = None, *, name: Optional[str] = None):
name: Name to use for the activity. Defaults to function ``__name__``.
"""

def with_name(name: str, fn: ActivityFunc) -> ActivityFunc:
def with_name(name: str, fn: CallableType) -> CallableType:
# This performs validation
_Definition._apply_to_callable(fn, name)
return fn
Expand Down Expand Up @@ -371,16 +370,19 @@ def _apply_to_callable(fn: Callable, activity_name: str) -> None:
raise ValueError("Function already contains activity definition")
elif not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
# We do not allow keyword only arguments in activities
sig = inspect.signature(fn)
for param in sig.parameters.values():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
raise TypeError("Activity cannot have keyword-only arguments")
setattr(
fn,
"__temporal_activity_definition",
_Definition(
name=activity_name,
fn=fn,
is_async=inspect.iscoroutinefunction(fn),
# iscoroutinefunction does not return true for async __call__
# TODO(cretz): Why can't MyPy handle this?
is_async=inspect.iscoroutinefunction(fn) or inspect.iscoroutinefunction(fn.__call__), # type: ignore
),
)
83 changes: 40 additions & 43 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@
import temporalio.workflow_service
from temporalio.workflow_service import RetryConfig, RPCError, RPCStatusCode, TLSConfig

LocalParamType = TypeVar("LocalParamType")
LocalReturnType = TypeVar("LocalReturnType")
WorkflowClass = TypeVar("WorkflowClass")
WorkflowReturnType = TypeVar("WorkflowReturnType")
MultiParamSpec = ParamSpec("MultiParamSpec")
from .types import (
LocalReturnType,
MethodAsyncNoParam,
MethodAsyncSingleParam,
MethodSyncOrAsyncNoParam,
MethodSyncOrAsyncSingleParam,
MultiParamSpec,
ParamType,
ReturnType,
SelfType,
)


class Client:
Expand Down Expand Up @@ -198,7 +204,7 @@ def data_converter(self) -> temporalio.converter.DataConverter:
@overload
async def start_workflow(
self,
workflow: Callable[[WorkflowClass], Awaitable[WorkflowReturnType]],
workflow: MethodAsyncNoParam[SelfType, ReturnType],
*,
id: str,
task_queue: str,
Expand All @@ -213,17 +219,15 @@ async def start_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
) -> WorkflowHandle[SelfType, ReturnType]:
...

# Overload for single-param workflow
@overload
async def start_workflow(
self,
workflow: Callable[
[WorkflowClass, LocalParamType], Awaitable[WorkflowReturnType]
],
arg: LocalParamType,
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
arg: ParamType,
*,
id: str,
task_queue: str,
Expand All @@ -238,15 +242,15 @@ async def start_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
) -> WorkflowHandle[SelfType, ReturnType]:
...

# Overload for multi-param workflow
@overload
async def start_workflow(
self,
workflow: Callable[
Concatenate[WorkflowClass, MultiParamSpec], Awaitable[WorkflowReturnType]
Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]
],
*,
args: Iterable[Any],
Expand All @@ -263,7 +267,7 @@ async def start_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
) -> WorkflowHandle[SelfType, ReturnType]:
...

# Overload for string-name workflow
Expand Down Expand Up @@ -377,7 +381,7 @@ async def start_workflow(
@overload
async def execute_workflow(
self,
workflow: Callable[[WorkflowClass], Awaitable[WorkflowReturnType]],
workflow: MethodAsyncNoParam[SelfType, ReturnType],
*,
id: str,
task_queue: str,
Expand All @@ -392,17 +396,15 @@ async def execute_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
) -> ReturnType:
...

# Overload for single-param workflow
@overload
async def execute_workflow(
self,
workflow: Callable[
[WorkflowClass, LocalParamType], Awaitable[WorkflowReturnType]
],
arg: LocalParamType,
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
arg: ParamType,
*,
id: str,
task_queue: str,
Expand All @@ -417,15 +419,15 @@ async def execute_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
) -> ReturnType:
...

# Overload for multi-param workflow
@overload
async def execute_workflow(
self,
workflow: Callable[
Concatenate[WorkflowClass, MultiParamSpec], Awaitable[WorkflowReturnType]
Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]
],
*,
args: Iterable[Any],
Expand All @@ -442,7 +444,7 @@ async def execute_workflow(
header: Optional[Mapping[str, Any]] = None,
start_signal: Optional[str] = None,
start_signal_args: Iterable[Any] = [],
) -> WorkflowReturnType:
) -> ReturnType:
...

# Overload for string-name workflow
Expand Down Expand Up @@ -546,14 +548,14 @@ def get_workflow_handle(
def get_workflow_handle_for(
self,
workflow: Union[
Callable[[WorkflowClass, LocalParamType], Awaitable[WorkflowReturnType]],
Callable[[WorkflowClass], Awaitable[WorkflowReturnType]],
MethodAsyncNoParam[SelfType, ReturnType],
MethodAsyncSingleParam[SelfType, Any, ReturnType],
],
workflow_id: str,
*,
run_id: Optional[str] = None,
first_execution_run_id: Optional[str] = None,
) -> WorkflowHandle[WorkflowClass, WorkflowReturnType]:
) -> WorkflowHandle[SelfType, ReturnType]:
"""Get a typed workflow handle to an existing workflow by its ID.

This is the same as :py:meth:`get_workflow_handle` but typed. Note, the
Expand Down Expand Up @@ -641,7 +643,7 @@ class ClientConfig(TypedDict, total=False):
type_hint_eval_str: bool


class WorkflowHandle(Generic[WorkflowClass, WorkflowReturnType]):
class WorkflowHandle(Generic[SelfType, ReturnType]):
"""Handle for interacting with a workflow.

This is usually created via :py:meth:`Client.get_workflow_handle` or
Expand Down Expand Up @@ -714,7 +716,7 @@ def first_execution_run_id(self) -> Optional[str]:
"""
return self._first_execution_run_id

async def result(self, *, follow_runs: bool = True) -> WorkflowReturnType:
async def result(self, *, follow_runs: bool = True) -> ReturnType:
"""Wait for result of the workflow.

This will use :py:attr:`result_run_id` if present to base the result on.
Expand Down Expand Up @@ -772,10 +774,10 @@ async def result(self, *, follow_runs: bool = True) -> WorkflowReturnType:
type_hints,
)
if not results:
return cast(WorkflowReturnType, None)
return cast(ReturnType, None)
elif len(results) > 1:
warnings.warn(f"Expected single result, got {len(results)}")
return cast(WorkflowReturnType, results[0])
return cast(ReturnType, results[0])
elif event.HasField("workflow_execution_failed_event_attributes"):
fail_attr = event.workflow_execution_failed_event_attributes
# Follow execution
Expand Down Expand Up @@ -891,9 +893,7 @@ async def describe(
@overload
async def query(
self,
query: Callable[
[WorkflowClass], Union[Awaitable[LocalReturnType], LocalReturnType]
],
query: MethodSyncOrAsyncNoParam[SelfType, LocalReturnType],
*,
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
) -> LocalReturnType:
Expand All @@ -903,11 +903,8 @@ async def query(
@overload
async def query(
self,
query: Callable[
[WorkflowClass, LocalParamType],
Union[Awaitable[LocalReturnType], LocalReturnType],
],
arg: LocalParamType,
query: MethodSyncOrAsyncSingleParam[SelfType, ParamType, LocalReturnType],
arg: ParamType,
*,
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
) -> LocalReturnType:
Expand All @@ -918,7 +915,7 @@ async def query(
async def query(
self,
query: Callable[
Concatenate[WorkflowClass, MultiParamSpec],
Concatenate[SelfType, MultiParamSpec],
Union[Awaitable[LocalReturnType], LocalReturnType],
],
*,
Expand Down Expand Up @@ -1005,16 +1002,16 @@ async def query(
@overload
async def signal(
self,
signal: Callable[[WorkflowClass], Union[Awaitable[None], None]],
signal: MethodSyncOrAsyncNoParam[SelfType, None],
) -> None:
...

# Overload for single-param signal
@overload
async def signal(
self,
signal: Callable[[WorkflowClass, LocalParamType], Union[Awaitable[None], None]],
arg: LocalParamType,
signal: MethodSyncOrAsyncSingleParam[SelfType, ParamType, None],
arg: ParamType,
) -> None:
...

Expand All @@ -1023,7 +1020,7 @@ async def signal(
async def signal(
self,
signal: Callable[
Concatenate[WorkflowClass, MultiParamSpec], Union[Awaitable[None], None]
Concatenate[SelfType, MultiParamSpec], Union[Awaitable[None], None]
],
*,
args: Iterable[Any],
Expand Down
17 changes: 12 additions & 5 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,18 @@ def get_type_hints(self, fn: Any) -> Tuple[Optional[List[Type]], Optional[Type]]
# Due to MyPy issues, we cannot type "fn" as callable
if not callable(fn):
return (None, None)
ret = self._cache.get(fn.__qualname__)
if not ret:
# TODO(cretz): Do we even need to cache?
ret = _type_hints_from_func(fn, eval_str=self._type_hint_eval_str)
self._cache[fn.__qualname__] = ret
# We base the cache key on the qualified name of the function. However,
# since some callables are not functions, we assume we can never cache
# these just in case the type hints are dynamic for some strange reason.
cache_key = getattr(fn, "__qualname__", None)
if cache_key:
ret = self._cache.get(cache_key)
if ret:
return ret
# TODO(cretz): Do we even need to cache?
ret = _type_hints_from_func(fn, eval_str=self._type_hint_eval_str)
if cache_key:
self._cache[cache_key] = ret
return ret


Expand Down
Loading