Skip to content

Commit cc175c1

Browse files
committed
Refactor stream handling in workflow.ts and stream.ts to utilize WorkflowStream type. Update processWorkflowStream function for improved event processing and clarity. Enhance imports from @llamaindex/workflow.
1 parent 1e7617c commit cc175c1

File tree

2 files changed

+48
-44
lines changed

2 files changed

+48
-44
lines changed

packages/server/src/utils/stream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { agentStreamEvent, type WorkflowEventData } from "@llamaindex/workflow";
1+
import {
2+
agentStreamEvent,
3+
type WorkflowEventData,
4+
type WorkflowStream,
5+
} from "@llamaindex/workflow";
26
// DataStream is deprecated, converting to new API
37
import {
48
createDataStreamResponse,
@@ -14,7 +18,7 @@ import {
1418
* @returns A Response object with the streamed data.
1519
*/
1620
export function toDataStreamResponse(
17-
stream: AsyncIterable<WorkflowEventData<unknown>>,
21+
stream: WorkflowStream<WorkflowEventData<unknown>>,
1822
options: {
1923
init?: ResponseInit;
2024
callbacks?: {

packages/server/src/utils/workflow.ts

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import {
33
agentToolCallResultEvent,
44
run,
55
startAgentEvent,
6+
stopAgentEvent,
7+
WorkflowStream,
68
type AgentInputData,
79
type WorkflowEventData,
810
} from "@llamaindex/workflow";
@@ -49,51 +51,49 @@ export async function runWorkflow(
4951
});
5052
}
5153

52-
// Process the workflow stream to handle non-stream events as annotations
53-
async function* processWorkflowStream(
54-
stream: AsyncIterable<WorkflowEventData<unknown>>,
55-
): AsyncIterable<WorkflowEventData<unknown>> {
56-
for await (const event of stream) {
57-
const transformedEvent = transformWorkflowEvent(event);
54+
function processWorkflowStream(
55+
stream: WorkflowStream<WorkflowEventData<unknown>>,
56+
) {
57+
return stream.until(stopAgentEvent).pipeThrough(
58+
new TransformStream<WorkflowEventData<unknown>, WorkflowEventData<unknown>>(
59+
{
60+
async transform(event, controller) {
61+
let transformedEvent = event;
5862

59-
if (sourceEvent.include(transformedEvent)) {
60-
const sourceNodes = transformedEvent.data.data.nodes;
61-
downloadLlamaCloudFilesFromNodes(sourceNodes); // download files in background
62-
}
63-
yield transformedEvent;
64-
}
65-
}
63+
// Handle agent events from AgentToolCall
64+
if (agentToolCallEvent.include(event)) {
65+
const inputString = JSON.stringify(event.data.toolKwargs);
66+
transformedEvent = toAgentRunEvent({
67+
agent: event.data.agentName,
68+
text: `Using tool: '${event.data.toolName}' with inputs: '${inputString}'`,
69+
type: "text",
70+
});
71+
}
72+
// Handle source nodes from AgentToolCallResult
73+
else if (agentToolCallResultEvent.include(event)) {
74+
const rawOutput = event.data.raw;
75+
if (
76+
rawOutput &&
77+
typeof rawOutput === "object" &&
78+
"sourceNodes" in rawOutput // TODO: better use Zod to validate and extract sourceNodes from toolCallResult
79+
) {
80+
const sourceNodes =
81+
rawOutput.sourceNodes as unknown as NodeWithScore<Metadata>[];
82+
transformedEvent = toSourceEvent(sourceNodes);
83+
}
84+
}
6685

67-
// transform WorkflowEvent to another WorkflowEvent for annotations display purpose
68-
// this useful for handling AgentWorkflow events, because we cannot easily append custom events like custom workflows
69-
function transformWorkflowEvent(
70-
event: WorkflowEventData<unknown>,
71-
): WorkflowEventData<unknown> {
72-
// convert AgentToolCall event to AgentRunEvent
73-
if (agentToolCallEvent.include(event)) {
74-
const inputString = JSON.stringify(event.data.toolKwargs);
75-
return toAgentRunEvent({
76-
agent: event.data.agentName,
77-
text: `Using tool: '${event.data.toolName}' with inputs: '${inputString}'`,
78-
type: "text",
79-
});
80-
}
86+
// Post-process for llama-cloud files
87+
if (sourceEvent.include(transformedEvent)) {
88+
const sourceNodesForDownload = transformedEvent.data.data.nodes; // These are SourceEventNode[]
89+
downloadLlamaCloudFilesFromNodes(sourceNodesForDownload); // download files in background
90+
}
8191

82-
// if AgentToolCallResult contains sourceNodes, convert it to SourceEvent
83-
if (agentToolCallResultEvent.include(event)) {
84-
const rawOutput = event.data.raw;
85-
if (
86-
rawOutput &&
87-
typeof rawOutput === "object" &&
88-
"sourceNodes" in rawOutput // TODO: better use Zod to validate and extract sourceNodes from toolCallResult
89-
) {
90-
return toSourceEvent(
91-
rawOutput.sourceNodes as unknown as NodeWithScore<Metadata>[],
92-
);
93-
}
94-
}
95-
96-
return event;
92+
controller.enqueue(transformedEvent);
93+
},
94+
},
95+
),
96+
);
9797
}
9898

9999
async function downloadLlamaCloudFilesFromNodes(nodes: SourceEventNode[]) {

0 commit comments

Comments
 (0)