Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1489,21 +1489,21 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).

### Plugins

Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
configuration, and worker execution. Common customizations may include but are not limited to:

1. DataConverter
2. Activities
3. Workflows
4. Interceptors

A single plugin class can implement both client and worker plugin interfaces to share common logic between both
A single plugin class can implement both client and worker plugin interfaces to share common logic between both
contexts. When used with a client, it will automatically be propagated to any workers created with that client.

#### Client Plugins

Client plugins can intercept and modify client configuration and service connections. They are useful for adding
Client plugins can intercept and modify client configuration and service connections. They are useful for adding
authentication, modifying connection parameters, or adding custom behavior during client creation.

Here's an example of a client plugin that adds custom authentication:
Expand All @@ -1515,7 +1515,7 @@ import temporalio.service
class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> None:
self.next_client_plugin = next

Expand All @@ -1540,10 +1540,10 @@ client = await Client.connect(

#### Worker Plugins

Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
They should do this in the case that they modified the worker in a way which would also need to be present
for replay to function. For instance, changing the data converter or adding workflows.
Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
They should do this in the case that they modified the worker in a way which would also need to be present
for replay to function. For instance, changing the data converter or adding workflows.

Here's an example of a worker plugin that adds custom monitoring:

Expand All @@ -1560,7 +1560,7 @@ class MonitoringPlugin(Plugin):

def init_worker_plugin(self, next: Plugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
Expand All @@ -1574,22 +1574,22 @@ class MonitoringPlugin(Plugin):
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)
@asynccontextmanager
async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)

@asynccontextmanager
async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

# Use the plugin when creating a worker
worker = Worker(
Expand Down Expand Up @@ -1617,38 +1617,38 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin):

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return self.next_worker_plugin.configure_replayer(config)
def run_replayer(

async def run_replayer(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Async Method Returns Coroutine Instead of Context Manager

The UnifiedPlugin.run_replayer method was changed from def to async def. This makes the method return a coroutine, but it should directly return an AbstractAsyncContextManager by delegating to the next plugin. This change breaks the API contract, as callers would now need to await the method before using the context manager.

Fix in Cursor Fix in Web

self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)

# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
Expand Down
Loading