Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion packages/cloud/src/bridge/BridgeOrchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export interface BridgeOrchestratorOptions {
export class BridgeOrchestrator {
private static instance: BridgeOrchestrator | null = null

private static pendingTask: TaskLike | null = null

// Core
private readonly userId: string
private readonly socketBridgeUrl: string
Expand Down Expand Up @@ -116,6 +118,22 @@ export class BridgeOrchestrator {
}
}

/**
* @TODO: What if subtasks also get spawned? We'd probably want deferred
* subscriptions for those too.
*/
public static async subscribeToTask(task: TaskLike): Promise<void> {
const instance = BridgeOrchestrator.instance

if (instance && instance.socketTransport.isConnected()) {
console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
await instance.subscribeToTask(task)
} else {
console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
BridgeOrchestrator.pendingTask = task
}
}

private constructor(options: BridgeOrchestratorOptions) {
this.userId = options.userId
this.socketBridgeUrl = options.socketBridgeUrl
Expand Down Expand Up @@ -180,12 +198,27 @@ export class BridgeOrchestrator {
const socket = this.socketTransport.getSocket()

if (!socket) {
console.error("[BridgeOrchestrator] Socket not available after connect")
console.error("[BridgeOrchestrator#handleConnect] Socket not available")
return
}

await this.extensionChannel.onConnect(socket)
await this.taskChannel.onConnect(socket)

if (BridgeOrchestrator.pendingTask) {
console.log(
`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
)

try {
await this.subscribeToTask(BridgeOrchestrator.pendingTask)
BridgeOrchestrator.pendingTask = null
} catch (error) {
console.error(
`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
)
}
}
}

private handleDisconnect() {
Expand Down Expand Up @@ -261,6 +294,7 @@ export class BridgeOrchestrator {
await this.taskChannel.cleanup(this.socketTransport.getSocket())
await this.socketTransport.disconnect()
BridgeOrchestrator.instance = null
BridgeOrchestrator.pendingTask = null
}

public async reconnect(): Promise<void> {
Expand Down
14 changes: 8 additions & 6 deletions packages/cloud/src/bridge/TaskChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,27 @@ export class TaskChannel extends BaseChannel<
public async unsubscribeFromTask(taskId: string, _socket: Socket): Promise<void> {
const task = this.subscribedTasks.get(taskId)

if (!task) {
return
}

await this.publish(TaskSocketEvents.LEAVE, { taskId }, (response: LeaveResponse) => {
if (response.success) {
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`, response)
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`)
} else {
console.error(`[TaskChannel#unsubscribeFromTask] failed to unsubscribe from ${taskId}`)
}

// If we failed to unsubscribe then something is probably wrong and
// we should still discard this task from `subscribedTasks`.
if (task) {
this.removeTaskListeners(task)
this.subscribedTasks.delete(taskId)
}
this.removeTaskListeners(task)
this.subscribedTasks.delete(taskId)
})
}

private setupTaskListeners(task: TaskLike): void {
if (this.taskListeners.has(task.taskId)) {
console.warn("[TaskChannel] Listeners already exist for task, removing old listeners:", task.taskId)
console.warn(`[TaskChannel] Listeners already exist for task, removing old listeners for ${task.taskId}`)
this.removeTaskListeners(task)
}

Expand Down
3 changes: 1 addition & 2 deletions packages/cloud/src/bridge/__tests__/TaskChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ describe("TaskChannel", () => {

// Verify warning was logged
expect(warnSpy).toHaveBeenCalledWith(
"[TaskChannel] Listeners already exist for task, removing old listeners:",
taskId,
`[TaskChannel] Listeners already exist for task, removing old listeners for ${taskId}`,
)

// Verify only one set of listeners exists
Expand Down
2 changes: 1 addition & 1 deletion packages/types/npm/package.metadata.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@roo-code/types",
"version": "1.64.0",
"version": "1.65.0",
"description": "TypeScript type definitions for Roo Code.",
"publishConfig": {
"access": "public",
Expand Down
37 changes: 14 additions & 23 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {

// Task Bridge
enableBridge: boolean
bridge: BridgeOrchestrator | null = null

// Streaming
isWaitingForFirstChunk = false
Expand Down Expand Up @@ -1084,14 +1083,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
private async startTask(task?: string, images?: string[]): Promise<void> {
if (this.enableBridge) {
try {
this.bridge = this.bridge || BridgeOrchestrator.getInstance()

if (this.bridge) {
await this.bridge.subscribeToTask(this)
}
await BridgeOrchestrator.subscribeToTask(this)
} catch (error) {
console.error(
`[Task#startTask] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
`[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
)
}
}
Expand Down Expand Up @@ -1156,14 +1151,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
private async resumeTaskFromHistory() {
if (this.enableBridge) {
try {
this.bridge = this.bridge || BridgeOrchestrator.getInstance()

if (this.bridge) {
await this.bridge.subscribeToTask(this)
}
await BridgeOrchestrator.subscribeToTask(this)
} catch (error) {
console.error(
`[Task#resumeTaskFromHistory] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
`[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
)
}
}
Expand Down Expand Up @@ -1417,10 +1408,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}

public dispose(): void {
// Disposing task
console.log(`[Task] disposing task ${this.taskId}.${this.instanceId}`)
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)

// Remove all event listeners to prevent memory leaks
// Remove all event listeners to prevent memory leaks.
try {
this.removeAllListeners()
} catch (error) {
Expand All @@ -1433,13 +1423,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
this.pauseInterval = undefined
}

// Unsubscribe from TaskBridge service.
if (this.bridge) {
this.bridge
.unsubscribeFromTask(this.taskId)
.catch((error: unknown) => console.error("Error unsubscribing from task bridge:", error))

this.bridge = null
if (this.enableBridge) {
BridgeOrchestrator.getInstance()
?.unsubscribeFromTask(this.taskId)
.catch((error) =>
console.error(
`[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
),
)
}

// Release any terminals associated with this task.
Expand Down
2 changes: 1 addition & 1 deletion src/core/task/__tests__/Task.dispose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe("Task dispose method", () => {

// Verify dispose was called and logged
expect(consoleLogSpy).toHaveBeenCalledWith(
expect.stringContaining(`[Task] disposing task ${task.taskId}.${task.instanceId}`),
expect.stringContaining(`[Task#dispose] disposing task ${task.taskId}.${task.instanceId}`),
)

// Verify removeAllListeners was called first (before other cleanup)
Expand Down
40 changes: 23 additions & 17 deletions src/core/webview/ClineProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ export class ClineProvider

this.marketplaceManager = new MarketplaceManager(this.context, this.customModesManager)

// Forward <most> task events to the provider.
// We do something fairly similar for the IPC-based API.
this.taskCreationCallback = (instance: Task) => {
this.emit(RooCodeEventName.TaskCreated, instance)

Expand Down Expand Up @@ -346,18 +348,18 @@ export class ClineProvider
let task = this.clineStack.pop()

if (task) {
task.emit(RooCodeEventName.TaskUnfocused)

try {
// Abort the running task and set isAbandoned to true so
// all running promises will exit as well.
await task.abortTask(true)
} catch (e) {
this.log(
`[removeClineFromStack] encountered error while aborting task ${task.taskId}.${task.instanceId}: ${e.message}`,
`[ClineProvider#removeClineFromStack] abortTask() failed ${task.taskId}.${task.instanceId}: ${e.message}`,
)
}

task.emit(RooCodeEventName.TaskUnfocused)

// Remove event listeners before clearing the reference.
const cleanupFunctions = this.taskEventListeners.get(task)

Expand Down Expand Up @@ -405,12 +407,6 @@ export class ClineProvider
await this.getCurrentTask()?.resumePausedTask(lastMessage)
}

// Clear the current task without treating it as a subtask.
// This is used when the user cancels a task that is not a subtask.
async clearTask() {
await this.removeClineFromStack()
}

resumeTask(taskId: string): void {
// Use the existing showTaskWithId method which handles both current and historical tasks
this.showTaskWithId(taskId).catch((error) => {
Expand Down Expand Up @@ -1307,6 +1303,16 @@ export class ClineProvider
await this.createTaskWithHistoryItem({ ...historyItem, rootTask, parentTask })
}

// Clear the current task without treating it as a subtask.
// This is used when the user cancels a task that is not a subtask.
async clearTask() {
if (this.clineStack.length > 0) {
const task = this.clineStack[this.clineStack.length - 1]
console.log(`[clearTask] clearing task ${task.taskId}.${task.instanceId}`)
await this.removeClineFromStack()
}
}

async updateCustomInstructions(instructions?: string) {
// User may be clearing the field.
await this.updateGlobalState("customInstructions", instructions || undefined)
Expand Down Expand Up @@ -1585,6 +1591,7 @@ export class ClineProvider
})
} catch (error) {
console.error("Failed to fetch marketplace data:", error)

// Send empty data on error to prevent UI from hanging
this.postMessageToWebview({
type: "marketplaceData",
Expand Down Expand Up @@ -2213,24 +2220,23 @@ export class ClineProvider
if (bridge) {
const currentTask = this.getCurrentTask()

if (currentTask && !currentTask.bridge) {
if (currentTask && !currentTask.enableBridge) {
try {
currentTask.bridge = bridge
await currentTask.bridge.subscribeToTask(currentTask)
currentTask.enableBridge = true
await BridgeOrchestrator.subscribeToTask(currentTask)
} catch (error) {
const message = `[ClineProvider#remoteControlEnabled] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`
this.log(message)
console.error(message)
}
}
} else {
for (const task of this.clineStack) {
if (task.bridge) {
if (task.enableBridge) {
try {
await task.bridge.unsubscribeFromTask(task.taskId)
task.bridge = null
await BridgeOrchestrator.getInstance()?.unsubscribeFromTask(task.taskId)
} catch (error) {
const message = `[ClineProvider#remoteControlEnabled] unsubscribeFromTask failed - ${error instanceof Error ? error.message : String(error)}`
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`
this.log(message)
console.error(message)
}
Expand Down
28 changes: 23 additions & 5 deletions src/extension/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,29 @@ export class API extends EventEmitter<RooCodeEvents> implements RooCodeAPI {
this.taskMap.delete(task.taskId)
})

// Optional:
// RooCodeEventName.TaskFocused
// RooCodeEventName.TaskUnfocused
// RooCodeEventName.TaskActive
// RooCodeEventName.TaskIdle
task.on(RooCodeEventName.TaskFocused, () => {
this.emit(RooCodeEventName.TaskFocused, task.taskId)
})

task.on(RooCodeEventName.TaskUnfocused, () => {
this.emit(RooCodeEventName.TaskUnfocused, task.taskId)
})

task.on(RooCodeEventName.TaskActive, () => {
this.emit(RooCodeEventName.TaskActive, task.taskId)
})

task.on(RooCodeEventName.TaskInteractive, () => {
this.emit(RooCodeEventName.TaskInteractive, task.taskId)
})

task.on(RooCodeEventName.TaskResumable, () => {
this.emit(RooCodeEventName.TaskResumable, task.taskId)
})

task.on(RooCodeEventName.TaskIdle, () => {
this.emit(RooCodeEventName.TaskIdle, task.taskId)
})

// Subtask Lifecycle

Expand Down
Loading