Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 191 additions & 19 deletions lib/crewai/src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
Expand All @@ -90,6 +91,14 @@
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.printer import PrinterColor
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
create_chunk_generator,
create_streaming_state,
signal_end,
signal_error,
)
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler

Expand Down Expand Up @@ -225,6 +234,10 @@ class Crew(FlowTrackable, BaseModel):
"It may be used to adjust the output of the crew."
),
)
stream: bool = Field(
default=False,
description="Whether to stream output from the crew execution.",
)
max_rpm: int | None = Field(
default=None,
description=(
Expand Down Expand Up @@ -660,7 +673,43 @@ def train(
def kickoff(
self,
inputs: dict[str, Any] | None = None,
) -> CrewOutput:
) -> CrewOutput | CrewStreamingOutput:
if self.stream:
for agent in self.agents:
if agent.llm is not None:
agent.llm.stream = True

result_holder: list[CrewOutput] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}

state = create_streaming_state(current_task_info, result_holder)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []

def run_crew() -> None:
"""Execute the crew and capture the result."""
try:
self.stream = False
crew_result = self.kickoff(inputs=inputs)
if isinstance(crew_result, CrewOutput):
result_holder.append(crew_result)
except Exception as exc:
signal_error(state, exc)
finally:
self.stream = True
signal_end(state)

streaming_output = CrewStreamingOutput(
sync_iterator=create_chunk_generator(state, run_crew, output_holder)
)
output_holder.append(streaming_output)
return streaming_output

ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key)
)
Expand Down Expand Up @@ -726,55 +775,178 @@ def kickoff(
finally:
detach(token)

def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""Executes the Crew's workflow for each input and aggregates results."""
results: list[CrewOutput] = []
def kickoff_for_each(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput | CrewStreamingOutput]:
"""Executes the Crew's workflow for each input and aggregates results.

If stream=True, returns a list of CrewStreamingOutput objects that must
each be iterated to get stream chunks and access results.
"""
results: list[CrewOutput | CrewStreamingOutput] = []

# Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics()

for input_data in inputs:
crew = self.copy()

output = crew.kickoff(inputs=input_data)

if crew.usage_metrics:
if not self.stream and crew.usage_metrics:
total_usage_metrics.add_usage_metrics(crew.usage_metrics)

results.append(output)

self.usage_metrics = total_usage_metrics
if not self.stream:
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results

async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
async def kickoff_async(
self, inputs: dict[str, Any] | None = None
) -> CrewOutput | CrewStreamingOutput:
"""Asynchronous kickoff method to start the crew execution.

If stream=True, returns a CrewStreamingOutput that can be async-iterated
to get stream chunks. After iteration completes, access the final result
via .result.
"""
inputs = inputs or {}

if self.stream:
for agent in self.agents:
if agent.llm is not None:
agent.llm.stream = True

result_holder: list[CrewOutput] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}

state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []

async def run_crew() -> None:
try:
self.stream = False
result = await asyncio.to_thread(self.kickoff, inputs)
if isinstance(result, CrewOutput):
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)

streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_crew, output_holder
)
)
output_holder.append(streaming_output)

return streaming_output

return await asyncio.to_thread(self.kickoff, inputs)

async def kickoff_for_each_async(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput]:
) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput:
"""Executes the Crew's workflow for each input asynchronously.

If stream=True, returns a single CrewStreamingOutput that yields chunks
from all crews as they arrive. After iteration, access results via .results
(list of CrewOutput).
"""
crew_copies = [self.copy() for _ in inputs]

async def run_crew(crew: Self, input_data: Any) -> CrewOutput:
return await crew.kickoff_async(inputs=input_data)
if self.stream:
result_holder: list[list[CrewOutput]] = [[]]
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}

state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []

async def run_all_crews() -> None:
"""Run all crew copies and aggregate their streaming outputs."""
try:
streaming_outputs: list[CrewStreamingOutput] = []
for i, crew in enumerate(crew_copies):
streaming = await crew.kickoff_async(inputs=inputs[i])
if isinstance(streaming, CrewStreamingOutput):
streaming_outputs.append(streaming)

async def consume_stream(
stream_output: CrewStreamingOutput,
) -> CrewOutput:
"""Consume stream chunks and forward to parent queue.

Args:
stream_output: The streaming output to consume.

Returns:
The final CrewOutput result.
"""
async for chunk in stream_output:
if state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(
state.async_queue.put_nowait, chunk
)
return stream_output.result

crew_results = await asyncio.gather(
*[consume_stream(s) for s in streaming_outputs]
)
result_holder[0] = list(crew_results)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
signal_end(state, is_async=True)

streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_all_crews, output_holder
)
)

def set_results_wrapper(result: Any) -> None:
"""Wrap _set_results to match _set_result signature."""
streaming_output._set_results(result)

streaming_output._set_result = set_results_wrapper # type: ignore[method-assign]
output_holder.append(streaming_output)

return streaming_output

tasks = [
asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
for i in range(len(inputs))
asyncio.create_task(crew_copy.kickoff_async(inputs=input_data))
for crew_copy, input_data in zip(crew_copies, inputs, strict=True)
]

results = await asyncio.gather(*tasks)

total_usage_metrics = UsageMetrics()
for crew in crew_copies:
if crew.usage_metrics:
total_usage_metrics.add_usage_metrics(crew.usage_metrics)

for crew_copy in crew_copies:
if crew_copy.usage_metrics:
total_usage_metrics.add_usage_metrics(crew_copy.usage_metrics)
self.usage_metrics = total_usage_metrics

self._task_output_handler.reset()
return results
return list(results)

def _handle_crew_planning(self) -> None:
"""Handles the Crew planning."""
Expand Down
Loading
Loading