Skip to content

Commit 6fcda0f

Browse files
authored
Convert class to module (#958)
1 parent e73d6b5 commit 6fcda0f

File tree

3 files changed

+219
-216
lines changed

3 files changed

+219
-216
lines changed

temporalio/contrib/openai_agents/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
TestModel,
1818
TestModelProvider,
1919
set_open_ai_agent_temporal_overrides,
20-
workflow,
2120
)
2221

22+
from . import workflow
23+
2324
__all__ = [
2425
"ModelActivity",
2526
"ModelActivityParameters",
Lines changed: 1 addition & 215 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
"""Initialize Temporal OpenAI Agents overrides."""
22

3-
import json
43
from contextlib import contextmanager
5-
from datetime import timedelta
6-
from typing import Any, AsyncIterator, Callable, Optional, Type, Union
4+
from typing import AsyncIterator, Callable, Optional, Union
75

8-
import nexusrpc
96
from agents import (
107
AgentOutputSchemaBase,
118
Handoff,
@@ -14,31 +11,21 @@
1411
ModelResponse,
1512
ModelSettings,
1613
ModelTracing,
17-
RunContextWrapper,
1814
Tool,
1915
TResponseInputItem,
2016
set_trace_provider,
2117
)
22-
from agents.function_schema import function_schema
2318
from agents.items import TResponseStreamEvent
2419
from agents.run import get_default_agent_runner, set_default_agent_runner
25-
from agents.tool import (
26-
FunctionTool,
27-
)
2820
from agents.tracing import get_trace_provider
2921
from agents.tracing.provider import DefaultTraceProvider
3022
from openai.types.responses import ResponsePromptParam
3123

32-
from temporalio import activity
33-
from temporalio import workflow as temporal_workflow
34-
from temporalio.common import Priority, RetryPolicy
3524
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
3625
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
3726
from temporalio.contrib.openai_agents._temporal_trace_provider import (
3827
TemporalTraceProvider,
3928
)
40-
from temporalio.exceptions import ApplicationError, TemporalError
41-
from temporalio.workflow import ActivityCancellationType, VersioningIntent
4229

4330

4431
@contextmanager
@@ -146,204 +133,3 @@ def stream_response(
146133
) -> AsyncIterator[TResponseStreamEvent]:
147134
"""Get a streamed response from the model. Unimplemented."""
148135
raise NotImplementedError()
149-
150-
151-
class ToolSerializationError(TemporalError):
152-
"""Error that occurs when a tool output could not be serialized."""
153-
154-
155-
class workflow:
156-
"""Encapsulates workflow specific primitives for working with the OpenAI Agents SDK in a workflow context"""
157-
158-
@classmethod
159-
def activity_as_tool(
160-
cls,
161-
fn: Callable,
162-
*,
163-
task_queue: Optional[str] = None,
164-
schedule_to_close_timeout: Optional[timedelta] = None,
165-
schedule_to_start_timeout: Optional[timedelta] = None,
166-
start_to_close_timeout: Optional[timedelta] = None,
167-
heartbeat_timeout: Optional[timedelta] = None,
168-
retry_policy: Optional[RetryPolicy] = None,
169-
cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL,
170-
activity_id: Optional[str] = None,
171-
versioning_intent: Optional[VersioningIntent] = None,
172-
summary: Optional[str] = None,
173-
priority: Priority = Priority.default,
174-
) -> Tool:
175-
"""Convert a single Temporal activity function to an OpenAI agent tool.
176-
177-
.. warning::
178-
This API is experimental and may change in future versions.
179-
Use with caution in production environments.
180-
181-
This function takes a Temporal activity function and converts it into an
182-
OpenAI agent tool that can be used by the agent to execute the activity
183-
during workflow execution. The tool will automatically handle the conversion
184-
of inputs and outputs between the agent and the activity. Note that if you take a context,
185-
mutation will not be persisted, as the activity may not be running in the same location.
186-
187-
Args:
188-
fn: A Temporal activity function to convert to a tool.
189-
For other arguments, refer to :py:mod:`workflow` :py:meth:`start_activity`
190-
191-
Returns:
192-
An OpenAI agent tool that wraps the provided activity.
193-
194-
Raises:
195-
ApplicationError: If the function is not properly decorated as a Temporal activity.
196-
197-
Example:
198-
>>> @activity.defn
199-
>>> def process_data(input: str) -> str:
200-
... return f"Processed: {input}"
201-
>>>
202-
>>> # Create tool with custom activity options
203-
>>> tool = activity_as_tool(
204-
... process_data,
205-
... start_to_close_timeout=timedelta(seconds=30),
206-
... retry_policy=RetryPolicy(maximum_attempts=3),
207-
... heartbeat_timeout=timedelta(seconds=10)
208-
... )
209-
>>> # Use tool with an OpenAI agent
210-
"""
211-
ret = activity._Definition.from_callable(fn)
212-
if not ret:
213-
raise ApplicationError(
214-
"Bare function without tool and activity decorators is not supported",
215-
"invalid_tool",
216-
)
217-
schema = function_schema(fn)
218-
219-
async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
220-
try:
221-
json_data = json.loads(input)
222-
except Exception as e:
223-
raise ApplicationError(
224-
f"Invalid JSON input for tool {schema.name}: {input}"
225-
) from e
226-
227-
# Activities don't support keyword only arguments, so we can ignore the kwargs_dict return
228-
args, _ = schema.to_call_args(schema.params_pydantic_model(**json_data))
229-
230-
# Add the context to the arguments if it takes that
231-
if schema.takes_context:
232-
args = [ctx] + args
233-
234-
result = await temporal_workflow.execute_activity(
235-
fn,
236-
args=args,
237-
task_queue=task_queue,
238-
schedule_to_close_timeout=schedule_to_close_timeout,
239-
schedule_to_start_timeout=schedule_to_start_timeout,
240-
start_to_close_timeout=start_to_close_timeout,
241-
heartbeat_timeout=heartbeat_timeout,
242-
retry_policy=retry_policy,
243-
cancellation_type=cancellation_type,
244-
activity_id=activity_id,
245-
versioning_intent=versioning_intent,
246-
summary=summary,
247-
priority=priority,
248-
)
249-
try:
250-
return str(result)
251-
except Exception as e:
252-
raise ToolSerializationError(
253-
"You must return a string representation of the tool output, or something we can call str() on"
254-
) from e
255-
256-
return FunctionTool(
257-
name=schema.name,
258-
description=schema.description or "",
259-
params_json_schema=schema.params_json_schema,
260-
on_invoke_tool=run_activity,
261-
strict_json_schema=True,
262-
)
263-
264-
@classmethod
265-
def nexus_operation_as_tool(
266-
cls,
267-
operation: nexusrpc.Operation[Any, Any],
268-
*,
269-
service: Type[Any],
270-
endpoint: str,
271-
schedule_to_close_timeout: Optional[timedelta] = None,
272-
) -> Tool:
273-
"""Convert a Nexus operation into an OpenAI agent tool.
274-
275-
.. warning::
276-
This API is experimental and may change in future versions.
277-
Use with caution in production environments.
278-
279-
This function takes a Nexus operation and converts it into an
280-
OpenAI agent tool that can be used by the agent to execute the operation
281-
during workflow execution. The tool will automatically handle the conversion
282-
of inputs and outputs between the agent and the operation.
283-
284-
Args:
285-
fn: A Nexus operation to convert into a tool.
286-
service: The Nexus service class that contains the operation.
287-
endpoint: The Nexus endpoint to use for the operation.
288-
289-
Returns:
290-
An OpenAI agent tool that wraps the provided operation.
291-
292-
Example:
293-
>>> @nexusrpc.service
294-
... class WeatherService:
295-
... get_weather_object_nexus_operation: nexusrpc.Operation[WeatherInput, Weather]
296-
>>>
297-
>>> # Create tool with custom activity options
298-
>>> tool = nexus_operation_as_tool(
299-
... WeatherService.get_weather_object_nexus_operation,
300-
... service=WeatherService,
301-
... endpoint="weather-service",
302-
... )
303-
>>> # Use tool with an OpenAI agent
304-
"""
305-
306-
def operation_callable(input):
307-
raise NotImplementedError("This function definition is used as a type only")
308-
309-
operation_callable.__annotations__ = {
310-
"input": operation.input_type,
311-
"return": operation.output_type,
312-
}
313-
operation_callable.__name__ = operation.name
314-
315-
schema = function_schema(operation_callable)
316-
317-
async def run_operation(ctx: RunContextWrapper[Any], input: str) -> Any:
318-
try:
319-
json_data = json.loads(input)
320-
except Exception as e:
321-
raise ApplicationError(
322-
f"Invalid JSON input for tool {schema.name}: {input}"
323-
) from e
324-
325-
nexus_client = temporal_workflow.create_nexus_client(
326-
service=service, endpoint=endpoint
327-
)
328-
args, _ = schema.to_call_args(schema.params_pydantic_model(**json_data))
329-
assert len(args) == 1, "Nexus operations must have exactly one argument"
330-
[arg] = args
331-
result = await nexus_client.execute_operation(
332-
operation,
333-
arg,
334-
schedule_to_close_timeout=schedule_to_close_timeout,
335-
)
336-
try:
337-
return str(result)
338-
except Exception as e:
339-
raise ToolSerializationError(
340-
"You must return a string representation of the tool output, or something we can call str() on"
341-
) from e
342-
343-
return FunctionTool(
344-
name=schema.name,
345-
description=schema.description or "",
346-
params_json_schema=schema.params_json_schema,
347-
on_invoke_tool=run_operation,
348-
strict_json_schema=True,
349-
)

0 commit comments

Comments
 (0)