Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 99 additions & 30 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
import warnings
from dataclasses import dataclass
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from typing import (
Any,
Expand Down Expand Up @@ -867,7 +867,7 @@ async def cancel(self) -> None:

async def describe(
self,
) -> WorkflowDescription:
) -> WorkflowExecutionDescription:
"""Get workflow details.

This will get details for :py:attr:`run_id` if present. To use a
Expand Down Expand Up @@ -1162,29 +1162,95 @@ async def report_cancellation(self, *details: Any) -> None:
)


class WorkflowDescription:
"""Description for a workflow."""
@dataclass
class WorkflowExecutionDescription:
"""Description for a single workflow execution run."""

def __init__(
self,
raw_message: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse,
):
"""Create a workflow description from a describe response."""
self._raw_message = raw_message
status = raw_message.workflow_execution_info.status
self._status = WorkflowExecutionStatus(status) if status else None
close_time: Optional[datetime]
"""When the workflow was closed if closed."""

@property
def raw_message(
self,
) -> temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse:
"""Underlying workflow description response."""
return self._raw_message
execution_time: Optional[datetime]
"""When this workflow run started or should start."""

@property
def status(self) -> Optional[WorkflowExecutionStatus]:
"""Status of the workflow."""
return self._status
history_length: int
"""Number of events in the history."""

id: str
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be workflow_id. It's that way in TS and you have to infer what it is by looking at run id and knowing there are two types of IDs, which seems less than ideal.

Copy link
Member Author

@cretz cretz Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python we have used id where it's obviously a workflow ID to avoid stuttering and unnecessary typing. So we have WorkflowHandle.id, start_workflow(id="myid") etc. This was how it was originally proposed/designed, but if we want to change all of these we can (I'd do it in a separate PR). I will bring it up.

"""ID for the workflow."""

memo: Mapping[str, Any]
"""Memo values on the workflow if any."""

parent_id: Optional[str]
"""ID for the parent workflow if this was started as a child."""

parent_run_id: Optional[str]
"""Run ID for the parent workflow if this was started as a child."""

raw: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse
"""Underlying API describe response."""

run_id: str
"""Run ID for this workflow run."""

search_attributes: temporalio.common.SearchAttributes
"""Current set of search attributes if any."""

start_time: datetime
"""When the workflow was created."""

status: Optional[WorkflowExecutionStatus]
"""Status for the workflow."""

task_queue: str
"""Task queue for the workflow."""

workflow_type: str
"""Type name for the workflow."""

@staticmethod
async def from_raw(
raw: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse,
converter: temporalio.converter.DataConverter,
) -> WorkflowExecutionDescription:
"""Create a description from a raw description response."""
return WorkflowExecutionDescription(
close_time=raw.workflow_execution_info.close_time.ToDatetime().replace(
tzinfo=timezone.utc
)
if raw.workflow_execution_info.HasField("close_time")
else None,
execution_time=raw.workflow_execution_info.execution_time.ToDatetime().replace(
tzinfo=timezone.utc
)
if raw.workflow_execution_info.HasField("execution_time")
else None,
history_length=raw.workflow_execution_info.history_length,
id=raw.workflow_execution_info.execution.workflow_id,
memo={
k: (await converter.decode([v]))[0]
for k, v in raw.workflow_execution_info.memo.fields.items()
},
parent_id=raw.workflow_execution_info.parent_execution.workflow_id
if raw.workflow_execution_info.HasField("parent_execution")
else None,
parent_run_id=raw.workflow_execution_info.parent_execution.run_id
if raw.workflow_execution_info.HasField("parent_execution")
else None,
raw=raw,
run_id=raw.workflow_execution_info.execution.run_id,
search_attributes=temporalio.converter.decode_search_attributes(
raw.workflow_execution_info.search_attributes
),
start_time=raw.workflow_execution_info.start_time.ToDatetime().replace(
tzinfo=timezone.utc
),
status=WorkflowExecutionStatus(raw.workflow_execution_info.status)
if raw.workflow_execution_info.status
else None,
task_queue=raw.workflow_execution_info.task_queue,
workflow_type=raw.workflow_execution_info.type.name,
)


class WorkflowExecutionStatus(IntEnum):
Expand Down Expand Up @@ -1434,7 +1500,7 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:

async def describe_workflow(
self, input: DescribeWorkflowInput
) -> WorkflowDescription:
) -> WorkflowExecutionDescription:
"""Called for every :py:meth:`WorkflowHandle.describe` call."""

async def query_workflow(self, input: QueryWorkflowInput) -> Any:
Expand Down Expand Up @@ -1522,16 +1588,18 @@ async def start_workflow(
req.cron_schedule = input.cron_schedule
if input.memo is not None:
for k, v in input.memo.items():
req.memo.fields[k] = (await self._client.data_converter.encode([v]))[0]
req.memo.fields[k].CopyFrom(
(await self._client.data_converter.encode([v]))[0]
)
if input.search_attributes is not None:
temporalio.converter.encode_search_attributes(
input.search_attributes, req.search_attributes
)
if input.header is not None:
for k, v in input.header.items():
req.header.fields[k] = (await self._client.data_converter.encode([v]))[
0
]
req.header.fields[k].CopyFrom(
(await self._client.data_converter.encode([v]))[0]
)

# Start with signal or just normal start
resp: Union[
Expand Down Expand Up @@ -1574,8 +1642,8 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:

async def describe_workflow(
self, input: DescribeWorkflowInput
) -> WorkflowDescription:
return WorkflowDescription(
) -> WorkflowExecutionDescription:
return await WorkflowExecutionDescription.from_raw(
await self._client.service.describe_workflow_execution(
temporalio.api.workflowservice.v1.DescribeWorkflowExecutionRequest(
namespace=self._client.namespace,
Expand All @@ -1585,7 +1653,8 @@ async def describe_workflow(
),
),
retry=True,
)
),
self._client.data_converter,
)

async def query_workflow(self, input: QueryWorkflowInput) -> Any:
Expand Down
21 changes: 16 additions & 5 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, List, Optional, Tuple

import pytest
Expand Down Expand Up @@ -172,14 +172,25 @@ async def test_describe(client: Client, worker: ExternalWorker):
KSWorkflowParams(actions=[KSAction(result=KSResultAction(value="some value"))]),
id=str(uuid.uuid4()),
task_queue=worker.task_queue,
memo={"foo": "bar"},
)
assert "some value" == await handle.result()
desc = await handle.describe()
assert desc.close_time and abs(
desc.close_time - datetime.now(timezone.utc)
) < timedelta(seconds=20)
assert desc.execution_time and abs(
desc.execution_time - datetime.now(timezone.utc)
) < timedelta(seconds=20)
assert desc.id == handle.id
assert desc.memo == {"foo": "bar"}
assert not desc.parent_id
assert not desc.parent_run_id
assert desc.run_id == handle.first_execution_run_id
assert abs(desc.start_time - datetime.now(timezone.utc)) < timedelta(seconds=20)
assert desc.status == WorkflowExecutionStatus.COMPLETED
assert (
desc.raw_message.workflow_execution_info.status
== temporalio.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
)
assert desc.task_queue == worker.task_queue
assert desc.workflow_type == "kitchen_sink"


async def test_query(client: Client, worker: ExternalWorker):
Expand Down
7 changes: 3 additions & 4 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1291,11 +1291,10 @@ async def search_attributes_present() -> bool:

# Also confirm it matches describe from the server
desc = await handle.describe()
attrs = decode_search_attributes(
desc.raw_message.workflow_execution_info.search_attributes
)
# Remove attrs without our prefix
attrs = {k: v for k, v in attrs.items() if k.startswith(sa_prefix)}
attrs = {
k: v for k, v in desc.search_attributes.items() if k.startswith(sa_prefix)
}
assert expected == search_attrs_to_dict_with_type(attrs)


Expand Down