diff --git a/README.md b/README.md index ec15d4408..58cdeb055 100644 --- a/README.md +++ b/README.md @@ -1489,8 +1489,8 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus). ### Plugins -Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of -responsibility pattern. They allow you to intercept and modify client creation, service connections, worker +Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of +responsibility pattern. They allow you to intercept and modify client creation, service connections, worker configuration, and worker execution. Common customizations may include but are not limited to: 1. DataConverter @@ -1498,12 +1498,12 @@ configuration, and worker execution. Common customizations may include but are n 3. Workflows 4. Interceptors -A single plugin class can implement both client and worker plugin interfaces to share common logic between both +A single plugin class can implement both client and worker plugin interfaces to share common logic between both contexts. When used with a client, it will automatically be propagated to any workers created with that client. #### Client Plugins -Client plugins can intercept and modify client configuration and service connections. They are useful for adding +Client plugins can intercept and modify client configuration and service connections. They are useful for adding authentication, modifying connection parameters, or adding custom behavior during client creation. Here's an example of a client plugin that adds custom authentication: @@ -1515,7 +1515,7 @@ import temporalio.service class AuthenticationPlugin(Plugin): def __init__(self, api_key: str): self.api_key = api_key - + def init_client_plugin(self, next: Plugin) -> None: self.next_client_plugin = next @@ -1540,10 +1540,10 @@ client = await Client.connect( #### Worker Plugins -Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring, -custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay. -They should do this in the case that they modified the worker in a way which would also need to be present -for replay to function. For instance, changing the data converter or adding workflows. +Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring, +custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay. +They should do this in the case that they modified the worker in a way which would also need to be present +for replay to function. For instance, changing the data converter or adding workflows. Here's an example of a worker plugin that adds custom monitoring: @@ -1560,7 +1560,7 @@ class MonitoringPlugin(Plugin): def init_worker_plugin(self, next: Plugin) -> None: self.next_worker_plugin = next - + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Modify worker configuration original_task_queue = config["task_queue"] @@ -1574,22 +1574,22 @@ class MonitoringPlugin(Plugin): await self.next_worker_plugin.run_worker(worker) finally: self.logger.info("Worker execution completed") - - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - return self.next_worker_plugin.configure_replayer(config) - - @asynccontextmanager - async def run_replayer( - self, - replayer: Replayer, - histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: - self.logger.info("Starting replay execution") - try: - async with self.next_worker_plugin.run_replayer(replayer, histories) as results: - yield results - finally: - self.logger.info("Replay execution completed") + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + return self.next_worker_plugin.configure_replayer(config) + + @asynccontextmanager + async def run_replayer( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + self.logger.info("Starting replay execution") + try: + async with self.next_worker_plugin.run_replayer(replayer, histories) as results: + yield results + finally: + self.logger.info("Replay execution completed") # Use the plugin when creating a worker worker = Worker( @@ -1617,38 +1617,38 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin): def init_worker_plugin(self, next: WorkerPlugin) -> None: self.next_worker_plugin = next - + def configure_client(self, config: ClientConfig) -> ClientConfig: # Client-side customization config["data_converter"] = pydantic_data_converter return self.next_client_plugin.configure_client(config) - + async def connect_service_client( self, config: temporalio.service.ConnectConfig ) -> temporalio.service.ServiceClient: # Add authentication to the connection config.api_key = self.api_key return await self.next_client_plugin.connect_service_client(config) - + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Worker-side customization return self.next_worker_plugin.configure_worker(config) - + async def run_worker(self, worker: Worker) -> None: print("Starting unified worker") await self.next_worker_plugin.run_worker(worker) - + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: config["data_converter"] = pydantic_data_converter return self.next_worker_plugin.configure_replayer(config) - - def run_replayer( + + async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: return self.next_worker_plugin.run_replayer(replayer, histories) - + # Create client with the unified plugin client = await Client.connect( "localhost:7233",