Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ Some things to note about the above code:
capabilities are needed.
* Local activities work very similarly except the functions are `workflow.start_local_activity()` and
`workflow.execute_local_activity()`
* Activities that are defined on a callable class (i.e. with `__call__`) or a class method, have to use that callable
as the first parameter. That means a do-nothing class may need to be instantiated to have access to the callable.

#### Invoking Child Workflows

Expand Down Expand Up @@ -482,6 +484,13 @@ Some things to note about activity definitions:
* Long running activities should regularly heartbeat and handle cancellation
* Activities can only have positional arguments. Best practice is to only take a single argument that is an
object/dataclass of fields that can be added to as needed.
* Callable classes (i.e. with `__call__` defined) may be defined as activities, but then an _instance_ of the class must
be what is registered with the worker which will be used during invocation. Similarly, an _instance_ of the class must
be what is used to call the activity from the workflow even though that instance is a dummy instance and is not the
one used for invocation.
* Methods on classes may be defined as activities. Similar to callable classes above, it is the method reference that
must be used during worker registration and during workflow invocation. The latter means that there will likely need
to be a dummy instance during workflow to have access to the method reference.

#### Types of Activities

Expand Down
13 changes: 8 additions & 5 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,19 @@ def _apply_to_callable(fn: Callable, activity_name: str) -> None:
raise ValueError("Function already contains activity definition")
elif not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
# We do not allow keyword only arguments in activities
sig = inspect.signature(fn)
for param in sig.parameters.values():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
raise TypeError("Activity cannot have keyword-only arguments")
setattr(
fn,
"__temporal_activity_definition",
_Definition(
name=activity_name,
fn=fn,
is_async=inspect.iscoroutinefunction(fn),
# iscoroutinefunction does not return true for async __call__
# TODO(cretz): Why can't MyPy handle this?
is_async=inspect.iscoroutinefunction(fn) or inspect.iscoroutinefunction(fn.__call__), # type: ignore
),
)
17 changes: 12 additions & 5 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,18 @@ def get_type_hints(self, fn: Any) -> Tuple[Optional[List[Type]], Optional[Type]]
# Due to MyPy issues, we cannot type "fn" as callable
if not callable(fn):
return (None, None)
ret = self._cache.get(fn.__qualname__)
if not ret:
# TODO(cretz): Do we even need to cache?
ret = _type_hints_from_func(fn, eval_str=self._type_hint_eval_str)
self._cache[fn.__qualname__] = ret
# We base the cache key on the qualified name of the function. However,
# since some callables are not functions, we assume we can never cache
# these just in case the type hints are dynamic for some strange reason.
cache_key = getattr(fn, "__qualname__", None)
if cache_key:
ret = self._cache.get(cache_key)
if ret:
return ret
# TODO(cretz): Do we even need to cache?
ret = _type_hints_from_func(fn, eval_str=self._type_hint_eval_str)
if cache_key:
self._cache[cache_key] = ret
return ret


Expand Down
5 changes: 3 additions & 2 deletions temporalio/worker/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async def _run_activity(
)

# Setup events
if not inspect.iscoroutinefunction(activity_def.fn):
if not activity_def.is_async:
running_activity.sync = True
# If we're in a thread-pool executor we can use threading events
# otherwise we must use manager events
Expand Down Expand Up @@ -521,7 +521,8 @@ def init(self, outbound: ActivityOutboundInterceptor) -> None:

async def execute_activity(self, input: ExecuteActivityInput) -> Any:
# Handle synchronous activity
if not inspect.iscoroutinefunction(input.fn):
is_async = inspect.iscoroutinefunction(input.fn) or inspect.iscoroutinefunction(input.fn.__call__) # type: ignore
if not is_async:
# We execute a top-level function via the executor. It is top-level
# because it needs to be picklable. Also, by default Python does not
# propagate contextvars into executor futures so we don't either
Expand Down
66 changes: 66 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,72 @@ async def test_workflow_uuid(client: Client):
assert handle2_query_result == await handle2.query(UUIDWorkflow.result)


@activity.defn
class CallableClassActivity:
def __init__(self, orig: MyDataClass) -> None:
self.orig = orig

async def __call__(self, to_add: MyDataClass) -> MyDataClass:
return MyDataClass(field1=self.orig.field1 + to_add.field1)


@workflow.defn
class ActivityCallableClassWorkflow:
@workflow.run
async def run(self, to_add: MyDataClass) -> MyDataClass:
activity_instance = CallableClassActivity(MyDataClass(field1="in workflow"))
return await workflow.execute_activity(
activity_instance, to_add, start_to_close_timeout=timedelta(seconds=30)
)


async def test_workflow_activity_callable_class(client: Client):
activity_instance = CallableClassActivity(MyDataClass(field1="in worker"))
async with new_worker(
client, ActivityCallableClassWorkflow, activities=[activity_instance]
) as worker:
result = await client.execute_workflow(
ActivityCallableClassWorkflow.run,
MyDataClass(field1=", workflow param"),
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert result == MyDataClass(field1="in worker, workflow param")


class MethodActivity:
def __init__(self, orig: MyDataClass) -> None:
self.orig = orig

@activity.defn
async def add(self, to_add: MyDataClass) -> MyDataClass:
return MyDataClass(field1=self.orig.field1 + to_add.field1)


@workflow.defn
class ActivityMethodWorkflow:
@workflow.run
async def run(self, to_add: MyDataClass) -> MyDataClass:
activity_instance = MethodActivity(MyDataClass(field1="in workflow"))
return await workflow.execute_activity(
activity_instance.add, to_add, start_to_close_timeout=timedelta(seconds=30)
)


async def test_workflow_activity_method(client: Client):
activity_instance = MethodActivity(MyDataClass(field1="in worker"))
async with new_worker(
client, ActivityMethodWorkflow, activities=[activity_instance.add]
) as worker:
result = await client.execute_workflow(
ActivityMethodWorkflow.run,
MyDataClass(field1=", workflow param"),
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert result == MyDataClass(field1="in worker, workflow param")


def new_worker(
client: Client,
*workflows: Type,
Expand Down