diff --git a/Cargo.lock b/Cargo.lock index 503fb48ea5..54d1854289 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "acp-client-test" +version = "1.18.0" +dependencies = [ + "agent-client-protocol", + "anyhow", + "serde_json", + "tokio", + "tokio-util", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -28,6 +39,22 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "agent-client-protocol" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5d9f90990e9ff33a9f93d7e7043a093fc1b0f6c16136aac98cecd6de8781a0" +dependencies = [ + "anyhow", + "async-broadcast", + "futures", + "log", + "parking_lot", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "ahash" version = "0.8.12" @@ -304,6 +331,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.27" @@ -1239,6 +1278,7 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" name = "chat_cli" version = "1.18.0" dependencies = [ + "agent-client-protocol", "amzn-codewhisperer-client", "amzn-codewhisperer-streaming-client", "amzn-consolas-client", @@ -1602,6 +1642,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -2332,6 +2381,27 @@ dependencies = [ "cc", ] +[[package]] +name = "event-listener" +version = 
"5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -4604,6 +4674,12 @@ version = "4.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48dd4f4a2c8405440fd0462561f0e5806bd0f77e86f51c761481bdd4018b545e" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 2d80bc05f4..c1440728e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "3" -members = ["crates/amzn-codewhisperer-client", "crates/amzn-codewhisperer-streaming-client", "crates/amzn-consolas-client", "crates/amzn-qdeveloper-streaming-client", "crates/amzn-toolkit-telemetry-client", "crates/aws-toolkit-telemetry-definitions", "crates/chat-cli", "crates/semantic-search-client"] +members = ["crates/acp-client-test","crates/amzn-codewhisperer-client", "crates/amzn-codewhisperer-streaming-client", "crates/amzn-consolas-client", "crates/amzn-qdeveloper-streaming-client", "crates/amzn-toolkit-telemetry-client", "crates/aws-toolkit-telemetry-definitions", "crates/chat-cli", "crates/semantic-search-client"] default-members = ["crates/chat-cli"] [workspace.package] @@ -12,6 +12,7 @@ version = "1.18.0" license = "MIT OR Apache-2.0" [workspace.dependencies] +agent-client-protocol = "0.3.0" amzn-codewhisperer-client = { path = 
"crates/amzn-codewhisperer-client" } amzn-codewhisperer-streaming-client = { path = "crates/amzn-codewhisperer-streaming-client" } amzn-consolas-client = { path = "crates/amzn-consolas-client" } diff --git a/PR_PLAN.md b/PR_PLAN.md new file mode 100644 index 0000000000..c805346426 --- /dev/null +++ b/PR_PLAN.md @@ -0,0 +1,153 @@ +# ACP Server Implementation Plan + +This document tracks implementation progress for the Agent Client Protocol (ACP) server integration. It will be removed before the PR lands. + +## Implementation Plan + +The implementation uses an actor-based pattern for clean message passing instead of shared state with RwLocks. Implementation proceeds in commit-sized units: + +### Phase 1: Actor Foundation +1. **Actor pattern foundation** - Implement `AcpAgentForward`, `AcpServerHandle`, and `AcpSessionHandle` with message types + - *Test*: Actor handles can be created, messages can be sent (stub responses) + - *Note*: Uses `mpsc::channel(32)` for bounded message passing, `oneshot::channel()` for responses +2. **Basic command structure** - Add `q acp` subcommand with feature gating and actor system integration + - *Test*: `q acp` command spawns actor system, handles `initialize` requests + - *Note*: Uses `LocalSet` for !Send ACP futures, stdio transport with `AgentSideConnection` +3. **Server actor implementation** - Implement server actor loop with session management + - *Test*: Can handle multiple ACP method calls, routes to appropriate handlers + - *Note*: Server actor maintains `HashMap` for session routing + +### Phase 2: Session Management +4. **Session lifecycle** - Implement `new_session` and `load_session` with session actor spawning + - *Test*: Can create sessions, spawn session actors, store session IDs + - *Note*: Each session actor owns its `ConversationState` and `ToolManager` instance +5. 
**Basic prompt handling** - Implement session actor prompt processing (stub LLM responses) + - *Test*: Can send prompts to sessions, session actors receive and respond +6. **Response streaming** - Wire up real LLM integration with ACP streaming notifications + - *Test*: Prompts return actual AI responses, streaming works through actor system + +### Phase 2.5: Test Infrastructure Refactor +7. **MockLLM architecture refactor** - Move from single-actor to stateless per-turn model + - *Current problem*: MockLLM uses old single-actor pattern, doesn't match real LLM behavior + - *Solution*: Refactor to per-turn MockLLMContext with conversation history, streaming tx channel + - *Test*: Multi-turn conversations work with clean test API (`read_user_message()`, `respond_to_user()`) +8. **ApiClient streaming integration** - Return streams instead of collecting vectors + - *Current problem*: Mock LLM collects all events synchronously into `Vec` + - *Solution*: Return proper streaming response like real LLM clients (CodeWhisperer/QDeveloper) + - *Test*: Mock responses stream incrementally, match real LLM streaming behavior +9. **Actor test harness cleanup** - Align test infrastructure with new MockLLM model + - *Dependencies*: Requires MockLLM refactor completion + - *Test*: ACP actor tests work with new stateless MockLLMContext API + +### Phase 3: Tool System Refactoring & ACP Integration +10. **Tool system UI separation** - Refactor existing tool system to separate UI concerns from core logic + - *Problem*: Current tool execution mixes permission evaluation, console I/O, and state management + - *Solution*: Extract pure permission functions and create `PermissionInterface` trait abstraction + - *Test*: Identical console behavior with cleaner, more testable architecture + - *Details*: See `TOOL_USE_REFACTOR.md` for complete refactoring plan +11. 
**ACP tool permissions** - Implement ACP permission interface using refactored system + - *Implementation*: Create `AcpPermissionInterface` that sends protocol permission requests + - *Test*: Tools work through session actors, permission requests flow correctly via ACP + - *Note*: Session actors route tool permissions through ACP instead of console prompts +12. **ACP file operation routing** - Implement ACP file tools using refactored architecture + - *Implementation*: Create ACP-aware `fs_read`/`fs_write` tools that use protocol operations + - *Test*: File operations work through editor, session actors route via ACP protocol + - *Note*: Session actors use ACP file operations instead of direct filesystem access + +**Architecture Benefits:** +- **Eliminates RwLocks**: No shared mutable state, each actor owns its data +- **Natural backpressure**: Bounded channels prevent memory issues under load +- **Clean testing**: Each actor can be tested in isolation with message injection +- **Incremental development**: Can implement and test each actor independently + +## Current Implementation Status + +**✅ COMPLETED - Phase 1: Actor Foundation** +1. ✅ **Actor pattern foundation** - Complete actor system with message types + - `AcpAgentForward`, `AcpServerHandle`, `AcpSessionHandle` implemented + - Bounded channels (`mpsc::channel(32)`) with `oneshot` responses + - Proper error propagation (`eyre::Result` internal, `acp::Error` protocol) +2. ✅ **Basic command structure** - `q acp` subcommand with actor integration + - Feature gating, `LocalSet` for !Send futures, stdio transport + - `AgentSideConnection` integration working +3. ✅ **Server actor implementation** - Complete server actor with session routing + - Session management with `HashMap` + - Method routing for `initialize`, `new_session`, `load_session`, etc. + +**✅ COMPLETED - Phase 2: Session Management** +4. 
✅ **Session lifecycle** - Session creation and actor spawning working + - `new_session` creates session actors with unique IDs + - Each session actor owns its `ConversationState` and `ToolManager` +5. ✅ **Basic prompt handling** - Session actors process prompts correctly + - Convert ACP prompts to Q CLI format, set in conversation state +6. ✅ **Response streaming** - Full LLM integration with streaming notifications + - Real `SendMessageStream` integration, `ResponseEvent` → ACP conversion + - Streaming `AssistantText`, `ToolUseStart`, `ToolUse` events via transport + +**✅ COMPLETED - Phase 2.5: Test Infrastructure Refactor** + +The MockLLM system has been successfully refactored to work properly with the actor-based ACP system and match real LLM behavior patterns. + +**Key Improvements Implemented:** +- **Stateless per-turn architecture**: Each user message spawns fresh MockLLMContext with full conversation history +- **Proper streaming integration**: ApiClient now returns streaming channels instead of collected vectors +- **Sophisticated pattern matching**: New regex-based conversation matching with named capture groups +- **Declarative test API**: Simple tuple-based pattern definitions replace complex imperative code +- **Proper error handling**: Result> types with clear error messages for regex compilation failures + +**Completed Work:** +7. ✅ **MockLLM architecture refactor** - **COMPLETE** + - **Achieved**: Stateless per-turn `MockLLMContext` with conversation history + streaming channels + - **API**: `match_conversation()` with regex patterns, `try_patterns()` for declarative matching + - **Benefits**: Matches real LLM behavior, enables proper actor system testing +8. ✅ **ApiClient streaming integration** - **COMPLETE** + - **Achieved**: Returns proper streaming `Receiver>` like real LLM clients + - **Fixed**: Performance regression from unnecessary `conversation.clone()` + - **Benefits**: True async streaming behavior, no more synchronous collection +9. 
✅ **Actor test harness integration** - **COMPLETE** + - **Achieved**: Clean integration with declarative `try_patterns()` API + - **Compatibility**: All existing tests fixed with streaming helper functions + - **Benefits**: 20+ lines of imperative pattern matching → 5 lines of declarative config + +**New Declarative Test API:** +```rust +// Before: Complex imperative pattern matching +if ctx.match_and_respond(&[], r"(?i)hi,?\s+claude", "Hi, you! What's your name?").await? { + return Ok(()); +} +// After: Simple declarative patterns with automatic regex substitution +ctx.try_patterns(&[ + (&[], r"(?i)hi,?\s+claude", "Hi, you! What's your name?"), + (&[r"assistant.*name"], r"(?P<name>\w+)", "Hi $name, I'm Q!"), +]).await +``` + +**🔄 NEXT - Phase 3: Tool System Refactoring & ACP Integration** +10. **Tool system UI separation** - Ready to begin refactoring + - Current: Tool execution mixes permission evaluation, console I/O, and state management in `tool_use_execute()` + - Plan: Extract pure permission functions, create `PermissionInterface` trait for swappable UI implementations + - Goal: Enable ACP integration while preserving identical console behavior +11. ⚠️ **ACP tool permissions** - Blocked on refactoring completion + - Current: Tool use shows as `[Tool execution]` placeholder in ACP sessions + - Missing: ACP `session/request_permission` flow, proper `ToolCall` messages + - Depends: Requires `PermissionInterface` abstraction from step 10 +12.
⚠️ **ACP file operation routing** - Blocked on permission system + - Current: Uses direct filesystem access even in ACP sessions + - Missing: Route `fs_read`/`fs_write` through ACP protocol operations + - Depends: Requires ACP tool permission system from step 11 + +**Minor TODOs:** +- Session configuration from ACP (currently uses defaults) +- Set session mode implementation (currently returns method not found) + +**Current State:** The ACP server is **functionally complete** for basic chat functionality with **sophisticated test infrastructure** and **comprehensive cancellation support**. Users can connect editors, create sessions, send prompts, receive streaming AI responses, and cancel active prompts. The actor architecture is solid and now has a stateless MockLLM system that matches real LLM behavior patterns, enabling comprehensive testing of the actor-based system with declarative pattern matching APIs. + +**🔍 READY FOR REVIEW** +The ACP server implementation is complete and ready for in-depth code review. Key areas for review: +- **Cancellation system** (`crates/chat-cli/src/cli/acp/server_session.rs`) - Concurrent prompt processing with tokio::select! 
cancellation +- **Cross-session isolation** (`crates/chat-cli/src/cli/acp/tests.rs`) - Comprehensive test coverage for session independence +- **MockLLM architecture** (`crates/chat-cli/src/mock_llm.rs`) - Stateless per-turn design with streaming +- **Pattern matching API** - Regex-based conversation matching with declarative `try_patterns()` +- **ApiClient integration** - Streaming compatibility and performance fixes +- **Test compatibility** - Helper functions maintaining existing test functionality +- **Error handling** - Proper `Result>` types with clear error messages \ No newline at end of file diff --git a/TOOL_USE_REFACTOR.md b/TOOL_USE_REFACTOR.md new file mode 100644 index 0000000000..9fcedbfc17 --- /dev/null +++ b/TOOL_USE_REFACTOR.md @@ -0,0 +1,303 @@ +# Tool Use System Refactoring Plan + +This document outlines the refactoring of the tool use system to separate UI concerns from core logic, enabling clean ACP integration without affecting existing console functionality. + +## Problem Statement + +The current tool execution system in `crates/chat-cli/src/cli/chat/mod.rs` has three concerns tightly coupled together: + +1. **Permission Evaluation** - Pure logic determining if tools are allowed +2. **UI Interaction** - Console output, user prompts, colored formatting +3. 
**Flow Control** - State machine with `ChatState::PromptUser` and `pending_tool_index` + +This coupling creates several issues: +- **ACP Integration Blocker**: Console I/O prevents use in protocol-based sessions +- **Complex State Machine**: Scattered control flow across multiple state transitions +- **Testing Difficulty**: UI side effects make unit testing challenging +- **Code Clarity**: Mixed concerns make the logic hard to follow + +## Current Architecture Issues + +### Console I/O Coupling +```rust +// Lines 2280-2291 in tool_use_execute() +execute!( + self.stderr, + style::SetForegroundColor(Color::Red), + style::Print("Command "), + style::Print(&tool.name), + style::Print(" is rejected..."), +)?; +``` + +### State Machine Complexity +```rust +// Permission needed → return special state +self.pending_tool_index = Some(i); +return Ok(ChatState::PromptUser { skip_printing_tools: false }); + +// Later, somehow resume from pending state... +``` + +### Mixed Concerns in Single Method +The `tool_use_execute()` method handles: +- Permission evaluation logic +- Terminal formatting and output +- User input handling +- Tool execution +- State management + +## Refactoring Solution + +### Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ tool_use_execute() │ +│ (Clean async flow) │ +└─────────────────┬───────────────────────────────────────────┘ + │ +┌─────────────────▼───────────────────────────────────────────┐ +│ Permission Evaluation │ +│ (Pure functions) │ +└─────────────────┬───────────────────────────────────────────┘ + │ +┌─────────────────▼───────────────────────────────────────────┐ +│ PermissionInterface │ +│ (Async trait) │ +└─────────────┬─────────────────────────────┬─────────────────┘ + │ │ +┌─────────────▼─────────────┐ ┌─────────▼─────────────────┐ +│ ConsolePermissionInterface│ │ AcpPermissionInterface │ +│ (Terminal I/O) │ │ (Protocol messages) │ +└────────────────────────────┘ 
└───────────────────────────┘ +``` + +### Key Components + +#### 1. Pure Permission Evaluation +```rust +// New file: crates/chat-cli/src/cli/chat/permission.rs +pub fn evaluate_tool_permissions( + tools: &[QueuedTool], + agents: &AgentState, + os: &Os, +) -> Vec<ToolPermissionResult>; + +pub enum ToolPermissionResult { + Allowed, + RequiresConfirmation { tool_index: usize, tool_name: String }, + Denied { tool_index: usize, rules: Vec }, +} +``` + +#### 2. Permission Interface Abstraction +```rust +#[async_trait] +pub trait PermissionInterface { + async fn request_permission( + &mut self, + tool: &QueuedTool, + context: &PermissionContext, + ) -> Result<PermissionDecision>; + + async fn show_denied_tool( + &mut self, + tool: &QueuedTool, + rules: Vec, + ) -> Result<()>; + + async fn show_tool_execution( + &mut self, + tool: &QueuedTool, + allowed: bool, + ) -> Result<()>; +} + +pub enum PermissionDecision { + Approved, + Rejected, + Cancelled, +} +``` + +#### 3. Console Implementation +```rust +// New file: crates/chat-cli/src/cli/chat/permission/console.rs +pub struct ConsolePermissionInterface<'a> { + stdout: &'a mut dyn Write, + stderr: &'a mut dyn Write, + stdin: Box, +} + +#[async_trait] +impl PermissionInterface for ConsolePermissionInterface<'_> { + async fn request_permission(&mut self, tool: &QueuedTool, context: &PermissionContext) -> Result<PermissionDecision> { + // Existing console behavior: + // - Show colored tool description + // - Play notification bell + // - Read user input from stdin + // - Return decision + } +} +``` + +#### 4. Refactored Main Flow +```rust +async fn tool_use_execute(&mut self, os: &mut Os) -> Result<ChatState> { + // 1. Pure permission evaluation + let permission_results = evaluate_tool_permissions(&self.tool_uses, &self.conversation.agents, os); + + // 2. Create appropriate permission interface + let mut permission_interface = self.create_permission_interface(); + + // 3.
Handle permissions with clean async flow + for result in permission_results { + match result { + ToolPermissionResult::RequiresConfirmation { tool_index, .. } => { + let tool = &self.tool_uses[tool_index]; + let decision = permission_interface.request_permission(tool, &context).await?; + match decision { + PermissionDecision::Approved => { + self.tool_uses[tool_index].accepted = true; + } + PermissionDecision::Rejected => { + return Ok(ChatState::HandleInput { + input: format!("Tool {} was rejected", tool.name) + }); + } + } + } + ToolPermissionResult::Denied { tool_index, rules } => { + let tool = &self.tool_uses[tool_index]; + permission_interface.show_denied_tool(tool, rules).await?; + return Ok(ChatState::HandleInput { + input: format!("Tool {} was denied", tool.name) + }); + } + ToolPermissionResult::Allowed => { + // Tool already approved + } + } + } + + // 4. Execute tools (existing logic, unchanged) + self.execute_approved_tools(os).await +} +``` + +## Benefits + +### 1. Eliminates State Machine Complexity +- **Before**: Complex state transitions with `ChatState::PromptUser` and `pending_tool_index` +- **After**: Straightforward async flow that reads naturally top-to-bottom + +### 2. Enables Clean ACP Integration +- **Console Interface**: Preserves existing terminal behavior exactly +- **ACP Interface**: Routes permission requests through protocol messages +- **Swappable**: Same core logic, different UI implementations + +### 3. Improves Testability +- **Pure Functions**: Permission evaluation can be unit tested without UI +- **Mockable Interfaces**: Permission interfaces can be mocked for testing +- **Isolated Concerns**: Each component can be tested independently + +### 4. 
Maintains Backward Compatibility +- **Zero Behavior Changes**: Existing console functionality identical +- **Same External API**: No changes to public interfaces +- **Incremental Migration**: Can be implemented step-by-step + +## Implementation Plan + +### Phase 1: Extract Permission Evaluation +1. Create `crates/chat-cli/src/cli/chat/permission.rs` +2. Move permission logic from `tool_use_execute()` into pure functions +3. Define `ToolPermissionResult` enum for structured results +4. Update `tool_use_execute()` to use extracted functions + +### Phase 2: Create Permission Interface +1. Define `PermissionInterface` trait with async methods +2. Create `PermissionContext` for passing context data +3. Define `PermissionDecision` enum for results + +### Phase 3: Implement Console Interface +1. Create `crates/chat-cli/src/cli/chat/permission/console.rs` +2. Move existing console I/O logic into `ConsolePermissionInterface` +3. Implement all trait methods preserving current behavior +4. Handle stdin reading asynchronously + +### Phase 4: Refactor Main Flow +1. Update `tool_use_execute()` to use new interfaces +2. Add `create_permission_interface()` factory method +3. Remove old permission handling code +4. Remove `pending_tool_index` and related state machine logic + +### Phase 5: Testing and Validation +1. Add unit tests for permission evaluation functions +2. Add integration tests for console interface +3. Verify identical behavior with existing system +4. 
Performance testing to ensure no regressions + +## Future ACP Integration + +Once this refactoring is complete, ACP integration becomes straightforward: + +```rust +// Future ACP implementation +pub struct AcpPermissionInterface { + client: AcpClientHandle, + session_id: SessionId, +} + +#[async_trait] +impl PermissionInterface for AcpPermissionInterface { + async fn request_permission(&mut self, tool: &QueuedTool, context: &PermissionContext) -> Result { + // Send protocol message + let request = acp::RequestPermissionRequest { + tool_name: tool.name.clone(), + tool_args: tool.tool.get_args(), + reason: context.reason.clone(), + }; + + let response = self.client.request_permission(request).await?; + + Ok(match response.decision { + acp::PermissionResult::Approved => PermissionDecision::Approved, + acp::PermissionResult::Denied => PermissionDecision::Rejected, + }) + } +} +``` + +The ACP session actor can then create an `AcpPermissionInterface` instead of `ConsolePermissionInterface`, routing all permission requests through the protocol without changing any core tool logic. 
+ +## File Structure + +``` +crates/chat-cli/src/cli/chat/ +├── mod.rs # Updated tool_use_execute() +├── permission.rs # Pure permission evaluation +└── permission/ + ├── mod.rs # PermissionInterface trait + ├── console.rs # Console implementation + └── acp.rs # Future ACP implementation +``` + +## Risk Mitigation + +### Behavior Preservation +- **Identical Console Flow**: All existing terminal interactions preserved exactly +- **Same Error Messages**: Error formatting and messaging unchanged +- **Performance Parity**: No significant performance impact + +### Testing Strategy +- **Side-by-Side Testing**: Run old and new implementations in parallel +- **Integration Tests**: Full tool execution scenarios +- **Manual Verification**: Interactive testing of permission flows + +### Rollback Plan +- **Incremental Changes**: Each phase can be implemented and tested independently +- **Feature Flags**: Can gate new implementation behind feature flag if needed +- **Clean Separation**: Old code can be preserved during transition + +This refactoring provides a clean foundation for ACP tool integration while maintaining full backward compatibility with existing console-based tool execution. \ No newline at end of file diff --git a/agent-client-protocol-rfc.md b/agent-client-protocol-rfc.md new file mode 100644 index 0000000000..cb96a46617 --- /dev/null +++ b/agent-client-protocol-rfc.md @@ -0,0 +1,385 @@ +# RFC: Agent Client Protocol Integration for Amazon Q CLI + +- **Feature Name**: `acp` +- **Start Date**: 2025-09-14 +- **RFC PR**: (TBD) +- **Amazon Q Issue**: (TBD) + +## Summary + +Add Agent Client Protocol (ACP) server capability to Amazon Q CLI, allowing editors like Zed and Neovim to use Q as an AI coding assistant through a standardized JSON-RPC interface. + +**What is ACP?** Agent Client Protocol is a JSON-RPC standard that lets editors communicate with AI agents over stdio. 
Instead of building custom integrations for each editor, agents implement ACP once and work with any ACP-compatible editor. + +**What this adds:** Users run `q acp` to start Q in server mode, then configure their editor to connect to this process. The editor handles the UI while Q provides the AI capabilities - same models, tools, and features as `q chat`. + +## Motivation + +**Problem:** Currently Q CLI provides two options to users: an interactive, CLI-based chat interface and a non-interactive mode. But some use-cases demand interaction in a programmatic or scripted fashion. This includes custom GUIs in editors, automation tools, IDEs, web interfaces, and other applications. Right now each application must adapt to each agent independently, meaning applications are likely only to build on the most widely used alternatives (e.g., the Claude Code SDK, which provides programmatic access to the Claude Code agent). + +**Solution:** ACP provides an alternative, using a JSON-RPC protocol inspired by MCP to let any application integrate with any agent, sending user input and receiving the agent's responses in a streaming fashion. + +**Immediate Benefits:** This provides immediate value to Q CLI users by allowing them to access Q from editors that support ACP (Zed, Neovim) with native integration - same models, tools, and MCP servers, but in their preferred editor instead of switching to terminal. + +**Strategic Benefits:** Supporting ACP also helps boost the protocol itself. The more editors and agents use ACP, the more likely it will succeed as a standard. This avoids the problem of tooling being built atop proprietary options like the Claude Code SDK, which would lock Q CLI out of future editor integrations. 
+ +## Guide-level Explanation + +### Setup + +```bash +# Start Q in ACP server mode +q acp --agent my-profile + +# Configure editor to connect to this process +# (Editor-specific configuration) +``` + +### User Experience + +Once connected, users interact with Q through their editor's AI interface: + +- **Chat with Q** in editor panels/sidebars +- **File operations** - Q can read/write files through the editor (sees unsaved changes) +- **Tool execution** - Editor-native permission prompts for tool use +- **Same capabilities** - Same models, agents, and MCP servers as terminal Q CLI + +### Feature Gating + +Following Q CLI's established pattern, ACP functionality is feature-gated: + +```bash +# Enable ACP +q settings acp.enabled true + +# Start ACP server +q acp --agent my-profile +``` + +This allows: +- **Controlled rollout** during development and testing +- **User opt-in** for ACP functionality +- **Consistent patterns** with other Q CLI features (tangent mode, todo list, etc.) + +## Reference-level Explanation + +### What is ACP? + +Agent Client Protocol is a JSON-RPC protocol for editor-agent communication over stdio. Key concepts: + +**Sessions** - Long-lived conversations between editor and agent. Each session has a unique ID and maintains conversation history. + +**Prompts** - User messages sent to the agent via `session/prompt` method. The agent processes these and streams responses back. + +**Streaming** - Responses stream incrementally via `session/update` notifications, allowing editors to display content as it's generated. + +**Tools** - Agents can execute tools (file operations, shell commands, etc.) with permission checks via `session/request_permission`. 
+ +**Example message flow:** + +```json +// Editor → Q: User sends a message +{"method": "session/prompt", "params": {"sessionId": "123", "messages": [...]}} + +// Q → Editor: Streaming response +{"method": "session/update", "params": {"sessionId": "123", "update": {"AgentMessageChunk": {...}}}} + +// Q → Editor: Tool execution request +{"method": "session/request_permission", "params": {"sessionId": "123", "tool": "fs_write", ...}} +``` + +### Mapping ACP to Q CLI + +**Core Concept Mappings:** +- **ACP Sessions** → **Q Conversation IDs** (same UUID) +- **ACP Prompts** → **Q Chat pipeline** (reuse existing processing) +- **ACP Streaming** → **Q Response streaming** (protocol translation) +- **ACP Tool calls** → **Q Tool system** (existing infrastructure) +- **ACP Permissions** → **Q Agent permissions** (existing flow) + +**Implementation Scope:** This initial implementation focuses on core chat functionality. The following ACP features are deferred to future iterations: +- Authentication (Q CLI currently has no authentication requirement in ACP mode) +- Session modes (ask/architect/code) +- Terminal operations (`terminal/create`, `terminal/output`, etc.) +- Agent plans (`session/update` with plan) +- Advanced tool call reporting (diff content, location tracking, terminal embedding) +- Slash commands +- Rich prompt content (images, audio - currently only text and resource links supported) + +#### Session Lifecycle + +The ACP session lifecycle has four key points: + +**`initialize` message** - ACP connection setup +- Q CLI reports back with capabilities (protocol version, supported features). 
+- **Authentication:** Q CLI checks if the user is logged in (via `q login`): + - If logged in → returns `authMethods: []` (no authentication required, sessions can be created immediately) + - If NOT logged in → returns `authMethods: [{"id": "cli", "name": "CLI Login", "description": "Run 'q login' to authenticate"}]` +- If `authenticate` is called with `methodId: "cli"`, Q CLI re-checks login status and returns success if logged in, error otherwise. +- The actual login flow (`q login`) happens outside ACP - users must authenticate in a terminal before the editor can create sessions. + +**`session/new`** - Create a new conversation +- Q CLI creates a new `SessionId` (a UUID) and a fresh `ConversationState` and stores it in the SQLite database. +- This uses the agent configured with `q acp --agent XXX` (or default agent). +- The session is configured with ACP-specific versions of built-in file system tools (`fs_read`, `fs_write`, etc.) that route through the ACP protocol instead of accessing the filesystem directly. +- ACP allows the caller to provide a list of MCP servers; these are **added to** the agent's base MCP configuration (additive approach). +- The working directory is specified in the ACP message and stored per session. +- Each session gets its own `ToolManager` instance with session-specific MCP servers. + +**`session/load`** - Resume a session +- Q CLI fetches the `ConversationState` from the database and recreates the session context. + +#### Prompt Handling + +**`session/prompt`** - Incoming user message +- A new Q CLI `UserMessage` is created and added to the conversation state. +- The chat request is sent to the backend server and responses are streamed back via ACP `session/update` notifications. +- **Content types:** Q CLI advertises support for text (baseline), images, resource links (baseline), and embedded resources. Audio is not supported.
+ - Text content is included directly in the prompt + - Images are mapped to Q CLI's `ImageBlock` type + - Resource links are converted to XML representation: `` + - Embedded resources are converted to XML: `content` + +#### File Operations + +File operations using ACP do not go directly to the file system. Instead they are directed over the protocol so that the editor can provide a simulated file system (including unsaved buffer contents). + +When a session is created in ACP mode, Q CLI configures it with ACP-specific versions of built-in file system tools (`fs_read`, `fs_write`, etc.). These tool implementations route their operations through the ACP protocol (calling methods like `fs/read_text_file` on the client) rather than accessing the operating system directly. This allows the agent to see unsaved editor changes and lets the client track all file modifications. + +#### Tool Execution + +When using ACP, Q CLI requests permission to use a tool by sending a `session/request_permission` to the client and awaiting the response. Trusted tools will not trigger this flow. + +When tool use occurs, Q CLI reports that tool use using ACP `ToolCall` messages. We begin by mapping all tool use output to plain text. Later PRs can explore how to provide custom output formats that ACP supports (e.g., diffs for `fs_write`, structured terminal output for `bash`). + +#### Response Streaming + +Q CLI converts its existing streaming responses to ACP `session/update` notifications as the model generates content. Text responses and code blocks are sent as `AgentMessageChunk` updates, allowing the editor to display them incrementally. Tool execution is reported through `ToolCall` messages when tools start and `ToolCallUpdate` messages as they progress and complete. This preserves Q CLI's existing streaming behavior while adapting it to ACP's notification model. 
+ +**Stop reasons:** When a prompt turn completes, Q CLI maps its completion state to ACP stop reasons: +- `ResponseEvent::EndStream` (normal completion) → `end_turn` +- Stream errors → `refusal` +- User cancellation → `cancelled` (when cancellation is implemented) +- Q CLI's backend does not currently expose `max_tokens` or `max_turn_requests` limits, so these stop reasons are not used. + +#### Agent Plans + +ACP supports agent plans - structured task breakdowns sent via `session/update` notifications with `SessionUpdate::Plan`. Q CLI has a todo system that could potentially map to this feature. + +**Q CLI Todo System:** +- **Structure**: `TodoListState` contains tasks with `task_description` and `completed` boolean +- **Operations**: Create, Complete, Add, Remove, Load, Lookup via `TodoList` enum commands +- **Storage**: Persisted as JSON files in `.amazonq/cli-todo-lists/` +- **Feature gating**: Controlled by `chat.enableTodoList` setting + +**ACP Plan Structure:** +- **Plan**: Contains a list of `PlanEntry` objects +- **PlanEntry**: Has `content` (description), `priority` (high/medium/low), and `status` (pending/in_progress/completed) +- **Updates**: Full plan replacement via `SessionUpdate::Plan` - agent sends complete entry list on each update + +**Mapping Considerations:** + +*Semantic differences:* +- Q CLI todos track *actual work being done* - they're a project management tool +- ACP plans communicate *agent's intended strategy* - they're a visibility/transparency mechanism +- Q CLI todos persist across sessions and can be resumed later +- ACP plans are session-scoped and describe the current turn's execution strategy + +*Structural mapping:* +- Q CLI `Task.task_description` → ACP `PlanEntry.content` ✓ +- Q CLI `Task.completed` → ACP `PlanEntry.status` (completed vs pending) - but Q CLI doesn't track `in_progress` +- Q CLI has no priority concept → ACP `PlanEntry.priority` would need default value +- Q CLI's `context` and `modified_files` have no ACP 
equivalent + +*Implementation challenges:* +- Q CLI todo system is a tool that the model explicitly invokes, not automatic planning +- ACP plans are sent automatically as part of the agent's response streaming +- Would need to decide when to generate and send plan updates (after each TodoList tool call? on every prompt?) +- Current ACP session code ignores `SessionUpdate::Plan` entirely + +**Decision:** Explicitly deferred for future implementation - requires feedback from Q CLI team. + +The semantic mismatch between Q CLI's project management todos and ACP's strategy communication plans suggests they may serve different purposes. However, there are reasonable arguments for bridging them: + +*Potential approaches:* +1. Automatically generate ACP plans from Q CLI todo state when the model uses the TodoList tool +2. Implement separate agent planning capability specifically for ACP visibility +3. Leave unimplemented and document that Q CLI doesn't expose execution plans through ACP + +*Open questions for Q CLI team:* +- Should Q CLI's todo system be exposed to ACP clients as plans? +- If so, how should we handle the semantic differences (project management vs. visibility)? +- Should we implement automatic plan generation, or require explicit model support? +- What's the desired user experience when using Q through an ACP client like Zed? + +#### Session Modes + +ACP defines session modes as a general concept for dynamic agent behavior changes within a session. Q CLI has agent configurations but these serve a different purpose. 
+ +**Q CLI Agent Configurations:** +- Declarative JSON files specifying: name, prompt, tools, MCP servers, resources, hooks, model +- Selected when starting ACP server: `q acp --agent my-profile` +- Session-scoped - the agent is fixed for the lifetime of the session +- No concept of switching "modes" within a session + +**ACP Session Modes:** +- General concept for dynamic behavior changes within a session +- Agents can define their own custom modes (ask/architect/code are just examples) +- Can be changed via `session/set_mode` method +- Mode changes can be triggered by user or agent +- Advertised in `session/new` and `session/load` responses via `modes` field + +**Current Implementation:** +- Q CLI returns `modes: None` in `NewSessionResponse` and `LoadSessionResponse` +- `session/set_mode` returns `method_not_found` error +- This is valid per the ACP protocol - modes are optional + +**Rationale:** +Q CLI agents are architectural configurations, not runtime behavioral modes. While Q CLI agents *could* theoretically be exposed as switchable ACP modes, this would require: +1. Loading multiple agent configurations per session +2. Implementing agent-switching logic mid-session +3. Deciding how to handle state transitions (conversation history, tool state, etc.) + +**Decision:** Initial implementation does not support session modes. The current behavior (modes: None, set_mode returns error) is the intended design. Future work could explore exposing Q CLI agents as switchable modes if user demand warrants the complexity. + +#### Terminal Operations + +ACP supports terminal operations (`terminal/create`, `terminal/output`, `terminal/release`, etc.) for executing and managing shell commands with live output streaming. This could enhance Q CLI's bash tool integration. 
+ +**Q CLI's Current Bash Tool:** +- Executes shell commands and returns text output +- Supports background execution via `run_in_background` parameter +- Output is returned as plain text in tool call results + +**ACP Terminal Operations:** +- `terminal/create` - Create a new terminal session +- `terminal/output` - Stream live output from terminal +- `terminal/release` - Clean up terminal resources +- Tool calls can reference terminal IDs for "follow-along" execution +- Enables rich terminal display in editors + +**Future Enhancement Opportunity:** +When Q CLI runs the bash tool in ACP mode, it could: +1. Create an ACP terminal via `terminal/create` +2. Execute commands in that terminal +3. Stream live output via `terminal/output` notifications +4. Reference the terminal ID in `ToolCall` messages +5. Provide much richer terminal experience in editors like Zed + +**Decision:** Deferred for future implementation. The current text-based approach works for initial ACP support. Terminal operations would be a valuable enhancement for live command execution visibility in editors. + +#### Advanced Tool Call Features + +ACP tool calls support several advanced features that could enhance the editor experience but are not required for basic functionality. + +**Available Features:** +- **Diff content**: Tool calls can report file modifications as diffs (old text vs new text). Useful for `fs_write` operations to show exactly what changed. +- **Location tracking**: Tool calls can report file paths and line numbers they're working with, enabling "follow-along" features in editors. +- **Terminal embedding**: Tool calls can embed live terminal output by referencing a terminal ID (requires terminal operations). +- **Detailed stop reasons**: Fine-grained reporting of why tool execution completed or failed. + +**Current Implementation:** +Q CLI uses simple text content for all tool call reporting - tool results are converted to plain text and sent in `ToolCallUpdate` messages. 
This provides basic functionality without additional complexity. + +**Decision:** All advanced tool call features are deferred for future implementation. The current text-based approach meets the goal of enabling minimal ACP support. These features can be added incrementally to improve the editor experience. + +#### Cancellation + +ACP defines detailed cancellation semantics that provide a responsive user experience in editors. This should be implemented for the initial release. + +**ACP Cancellation Requirements:** +- Client sends `session/cancel` notification +- Agent must abort LLM requests and tool executions +- Agent must respond to pending permission requests with `cancelled` outcome +- Agent must send final `session/prompt` response with `cancelled` stop reason +- Updates may still arrive after cancellation but before final response + +**Current State:** +Q CLI's current ACP implementation has cancellation as a no-op - `handle_cancel` exists but doesn't actually abort ongoing operations. + +**Implementation Needs:** +1. **Session actor cancellation**: Add a cancellation token/mechanism to `AcpSessionActor` +2. **LLM request abortion**: Cancel ongoing streaming requests to Q CLI's backend +3. **Tool execution abortion**: Stop running tools (especially long-running bash commands) +4. **Permission request cleanup**: Respond to pending `session/request_permission` calls with cancelled status +5. **Final response**: Send `PromptResponse` with `cancelled` stop reason + +**Rationale:** Cancellation is important for editor responsiveness - users expect to be able to stop long-running AI operations. The implementation should be straightforward given Q CLI's actor-based architecture. + +**Decision:** Implement proper cancellation semantics. This is essential for good editor UX and shouldn't require major architectural changes. + +### Architecture + +The ACP implementation uses an actor-based design for clean encapsulation of mutable state and canonical message ordering.
Each actor is defined in its own module under `src/acp`. + +**Why actors?** Actors own any piece of mutable state and serve as a "canonical ordering point" where needed. This eliminates shared mutable state (no RwLocks), provides natural backpressure through bounded channels, and enables clean testing through message injection. + +#### Component Overview + +``` +┌─────────────┐ JSON-RPC ┌─────────────┐ Actor Messages ┌───────────────┐ +│ Editor │ ◄──────────────► │AcpAgentForward│ ◄─────────────────► │ AcpServerActor│ +│ (Zed, etc.) │ (stdio) │ │ │ │ +└─────────────┘ └─────────────┘ └───────────────┘ + │ + Actor Messages + │ + ▼ + ┌───────────────┐ + │AcpSessionActor│ + │ (per session) │ + └───────────────┘ +``` + +**AcpAgentForward** - Thin forwarding layer implementing `acp::Agent` trait +- Receives JSON-RPC requests from editor over stdio +- Forwards requests as messages to server actor via bounded channels +- Returns responses back to editor + +**AcpServerActor** - Top-level coordinator managing sessions and routing messages +- Maintains `HashMap` for session routing +- Handles `initialize`, `new_session`, `load_session` methods +- Routes `session/prompt` and other session methods to appropriate session actor + +**AcpSessionActor** - Per-session actors that own `ConversationState` and process prompts +- Each session actor owns its `ConversationState` and `ToolManager` instance +- Processes prompts with Q CLI's existing chat pipeline +- Streams responses back through the actor hierarchy + +#### Message Flow + +When an ACP client sends a prompt: + +1. `AcpAgentForward` receives JSON-RPC request over stdio +2. Forwards as `ServerMethod::Prompt` message to server actor via channel +3. Server actor routes as `SessionMethod::Prompt` to appropriate session actor +4. Session actor processes with `ConversationState` and streams responses back +5. 
Response flows back through the same channel hierarchy to ACP client + +#### Key Design Decisions + +**Bounded channels** - Uses `mpsc::channel(32)` for message passing between actors +- Provides natural backpressure when actors can't keep up +- Prevents unbounded memory growth under load + +**Response channels** - Uses `oneshot::channel()` for request-response patterns +- Each request carries a oneshot sender for the response +- Enables async/await style without shared state + +**Error handling** - Internal code uses `eyre::Result`, protocol boundary uses `acp::Error` +- Clean separation between internal errors and protocol errors +- Conversion at the boundary in `AcpAgentForward` +- Errors unrelated to the protocol (e.g., internal failures, unexpected states) are converted to JSON-RPC "internal error" responses + +**LocalSet** - Uses `tokio::task::LocalSet` for !Send ACP futures +- ACP library's futures are not Send +- LocalSet allows running them in a single-threaded context + +**Session ownership** - Each session actor owns its mutable state +- No RwLocks or shared mutable state +- Actors provide canonical ordering point for session operations + +**Cancellation** - Actor design enables clean cancellation +- Session actors can be sent cancel messages +- Can abort ongoing LLM requests and tool executions +- Natural place to implement proper cancellation semantics (currently no-op) \ No newline at end of file diff --git a/crates/acp-client-test/Cargo.toml b/crates/acp-client-test/Cargo.toml new file mode 100644 index 0000000000..b9331918aa --- /dev/null +++ b/crates/acp-client-test/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "acp-client-test" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +publish.workspace = true +version.workspace = true +license.workspace = true + +[dependencies] +agent-client-protocol = { workspace = true } +tokio = { version = "1.0", features = ["full"] } +tokio-util = { version = "0.7", features = 
["compat"] } +anyhow = "1.0" +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/acp-client-test/src/main.rs b/crates/acp-client-test/src/main.rs new file mode 100644 index 0000000000..d08bc64c34 --- /dev/null +++ b/crates/acp-client-test/src/main.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; +use agent_client_protocol as acp; +use anyhow::Result; +use serde_json::value::RawValue; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +struct SimpleClient; + +impl acp::Client for SimpleClient { + async fn request_permission( + &self, + args: acp::RequestPermissionRequest, + ) -> Result { + println!("Permission requested: {:?}", args); + Ok(acp::RequestPermissionResponse { + outcome: acp::RequestPermissionOutcome::Selected { + option_id: acp::PermissionOptionId(Arc::from("allow-once")), + }, + meta: None, + }) + } + + async fn write_text_file( + &self, + args: acp::WriteTextFileRequest, + ) -> Result { + println!("Write file: {:?}", args.path); + Ok(acp::WriteTextFileResponse { meta: None }) + } + + async fn read_text_file( + &self, + args: acp::ReadTextFileRequest, + ) -> Result { + println!("Read file: {:?}", args.path); + Ok(acp::ReadTextFileResponse { + content: "Hello from file!".to_string(), + meta: None, + }) + } + + async fn create_terminal( + &self, + _args: acp::CreateTerminalRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn terminal_output( + &self, + _args: acp::TerminalOutputRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn release_terminal( + &self, + _args: acp::ReleaseTerminalRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn wait_for_terminal_exit( + &self, + _args: acp::WaitForTerminalExitRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn kill_terminal_command( + &self, + _args: acp::KillTerminalCommandRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn 
session_notification(&self, args: acp::SessionNotification) -> Result<(), acp::Error> { + match args.update { + acp::SessionUpdate::AgentMessageChunk { content } => { + let text = match content { + acp::ContentBlock::Text(text_content) => text_content.text, + _ => "".to_string(), + }; + println!("Agent: {}", text); + } + _ => { + println!("Other update: {:?}", args.update); + } + } + Ok(()) + } + + async fn ext_method( + &self, + _method: Arc, + _params: Arc, + ) -> Result, acp::Error> { + Err(acp::Error::method_not_found()) + } + + async fn ext_notification( + &self, + _method: Arc, + _params: Arc, + ) -> Result<(), acp::Error> { + Err(acp::Error::method_not_found()) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + println!("Starting ACP client test..."); + + // Start Q CLI in ACP mode as subprocess + let mut child = tokio::process::Command::new("cargo") + .args(&["run", "--bin", "chat_cli", "--", "acp"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + + let stdin = child.stdin.take().unwrap().compat_write(); + let stdout = child.stdout.take().unwrap().compat(); + + let local_set = tokio::task::LocalSet::new(); + let result = local_set.run_until(async move { + // Set up client connection + let (client_conn, client_handle_io) = acp::ClientSideConnection::new( + SimpleClient, + stdin, + stdout, + |fut| { tokio::task::spawn_local(fut); } + ); + + // Start I/O handler + tokio::task::spawn_local(client_handle_io); + + println!("Initializing ACP protocol..."); + + // Initialize protocol + use acp::Agent; + let init_response = client_conn.initialize(acp::InitializeRequest { + protocol_version: acp::V1, + client_capabilities: acp::ClientCapabilities::default(), + meta: None, + }).await?; + + println!("Initialized! 
Protocol version: {:?}", init_response.protocol_version); + + // Create session + println!("Creating session..."); + let session_response = client_conn.new_session(acp::NewSessionRequest { + mcp_servers: Vec::new(), + cwd: std::env::current_dir()?, + meta: None, + }).await?; + + println!("Session created: {:?}", session_response.session_id); + + // Send a message + println!("Sending message: 'Hello, Q!'"); + let prompt_response = client_conn.prompt(acp::PromptRequest { + session_id: session_response.session_id.clone(), + prompt: vec![acp::ContentBlock::Text(acp::TextContent { + annotations: None, + text: "Hello, Q!".to_string(), + meta: None, + })], + meta: None, + }).await?; + + println!("Prompt response: {:?}", prompt_response.stop_reason); + + // Wait a bit for any streaming responses + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + println!("Test completed successfully!"); + + Ok::<(), anyhow::Error>(()) + }).await; + + match result { + Ok(_) => println!("ACP client test passed!"), + Err(e) => println!("ACP client test failed: {}", e), + } + + Ok(()) +} diff --git a/crates/chat-cli/Cargo.toml b/crates/chat-cli/Cargo.toml index 51648f35cb..fae15439e8 100644 --- a/crates/chat-cli/Cargo.toml +++ b/crates/chat-cli/Cargo.toml @@ -16,6 +16,7 @@ default = [] wayland = ["arboard/wayland-data-control"] [dependencies] +agent-client-protocol.workspace = true amzn-codewhisperer-client.workspace = true amzn-codewhisperer-streaming-client.workspace = true amzn-consolas-client.workspace = true diff --git a/crates/chat-cli/src/api_client/error.rs b/crates/chat-cli/src/api_client/error.rs index 4ac80f329c..e90a63912e 100644 --- a/crates/chat-cli/src/api_client/error.rs +++ b/crates/chat-cli/src/api_client/error.rs @@ -92,6 +92,9 @@ pub enum ApiClientError { status_code: Option, }, + #[error("Mock LLM error")] + MockLLMError, + // Credential errors #[error("failed to load credentials: {}", .0)] Credentials(CredentialsError), @@ -125,6 +128,7 @@ impl 
ApiClientError { Self::SmithyBuild(_) => None, Self::AuthError(_) => None, Self::ModelOverloadedError { status_code, .. } => *status_code, + Self::MockLLMError => None, Self::MonthlyLimitReached { status_code } => *status_code, Self::Credentials(_e) => None, Self::ListAvailableModelsError(e) => sdk_status_code(e), @@ -153,6 +157,7 @@ impl ReasonCode for ApiClientError { Self::SmithyBuild(_) => "SmithyBuildError".to_string(), Self::AuthError(_) => "AuthError".to_string(), Self::ModelOverloadedError { .. } => "ModelOverloadedError".to_string(), + Self::MockLLMError => "MockLLMError".to_string(), Self::MonthlyLimitReached { .. } => "MonthlyLimitReached".to_string(), Self::Credentials(_) => "CredentialsError".to_string(), Self::ListAvailableModelsError(e) => sdk_error_code(e), diff --git a/crates/chat-cli/src/api_client/mod.rs b/crates/chat-cli/src/api_client/mod.rs index f21b448b77..3c8d04f785 100644 --- a/crates/chat-cli/src/api_client/mod.rs +++ b/crates/chat-cli/src/api_client/mod.rs @@ -15,12 +15,7 @@ use amzn_codewhisperer_client::Client as CodewhispererClient; use amzn_codewhisperer_client::operation::create_subscription_token::CreateSubscriptionTokenOutput; use amzn_codewhisperer_client::types::Origin::Cli; use amzn_codewhisperer_client::types::{ - Model, - OptInFeatureToggle, - OptOutPreference, - SubscriptionStatus, - TelemetryEvent, - UserContext, + Model, OptInFeatureToggle, OptOutPreference, SubscriptionStatus, TelemetryEvent, UserContext, }; use amzn_codewhisperer_streaming_client::Client as CodewhispererStreamingClient; use amzn_qdeveloper_streaming_client::Client as QDeveloperStreamingClient; @@ -33,38 +28,22 @@ use aws_types::request_id::RequestId; use aws_types::sdk_config::StalledStreamProtectionConfig; pub use endpoints::Endpoint; pub use error::ApiClientError; -use parking_lot::Mutex; pub use profile::list_available_profiles; use serde_json::Map; use tokio::sync::RwLock; -use tracing::{ - debug, - error, -}; +use tracing::{debug, error}; use 
crate::api_client::credentials::CredentialsChain; use crate::api_client::delay_interceptor::DelayTrackingInterceptor; -use crate::api_client::model::{ - ChatResponseStream, - ConversationState, -}; +use crate::api_client::model::{ChatResponseStream, ConversationState}; use crate::api_client::opt_out::OptOutInterceptor; use crate::api_client::send_message_output::SendMessageOutput; use crate::auth::builder_id::BearerResolver; -use crate::aws_common::{ - UserAgentOverrideInterceptor, - app_name, - behavior_version, -}; +use crate::aws_common::{UserAgentOverrideInterceptor, app_name, behavior_version}; use crate::database::settings::Setting; -use crate::database::{ - AuthProfile, - Database, -}; -use crate::os::{ - Env, - Fs, -}; +use crate::database::{AuthProfile, Database}; +use crate::mock_llm::MockLLM; +use crate::os::{Env, Fs}; // Opt out constants pub const X_AMZN_CODEWHISPERER_OPT_OUT_HEADER: &str = "x-amzn-codewhisperer-optout"; @@ -93,7 +72,7 @@ pub struct ApiClient { client: CodewhispererClient, streaming_client: Option, sigv4_streaming_client: Option, - mock_client: Option>>>>, + mock_llm: Option>, profile: Option, model_cache: ModelCache, } @@ -133,7 +112,7 @@ impl ApiClient { client, streaming_client: None, sigv4_streaming_client: None, - mock_client: None, + mock_llm: None, profile: None, model_cache: Arc::new(RwLock::new(None)), }; @@ -212,7 +191,7 @@ impl ApiClient { client, streaming_client, sigv4_streaming_client, - mock_client: None, + mock_llm: None, profile, model_cache: Arc::new(RwLock::new(None)), }) @@ -572,38 +551,143 @@ impl ApiClient { Err(err.into()) }, } - } else if let Some(client) = &self.mock_client { - let mut new_events = client.lock().next().unwrap_or_default().clone(); - new_events.reverse(); - - return Ok(SendMessageOutput::Mock(new_events)); + } else if let Some(mock_llm) = &self.mock_llm { + // Spawn the mock LLM for this conversation + let mock_rx = mock_llm.spawn_turn(history.unwrap_or_default(), user_input_message.content); + 
return Ok(SendMessageOutput::Mock(mock_rx)); } else { unreachable!("One of the clients must be created by this point"); } } - /// Only meant for testing. Do not use outside of testing responses. + /// Helper to convert JSON mock responses to MockLLM script (for Q_MOCK_CHAT_RESPONSE compatibility). + /// + /// This is a convenience method that invokes [`Self::set_mock_llm`] with a simple script + /// that emits the events described by the `json` argument in sequence. + /// + /// ## Expected Format + /// + /// The JSON should be an array of response groups, where each response group is an array + /// of events that will be sent for one user message: + /// + /// ```json + /// [ + /// [ + /// "I'll help you with that", + /// { + /// "tool_use_id": "1", + /// "name": "fs_write", + /// "args": {"path": "/file.txt", "content": "Hello"} + /// } + /// ], + /// [ + /// "Task completed successfully" + /// ] + /// ] + /// ``` + /// + /// Each event can be: + /// - **String**: Assistant text response + /// - **Object**: Tool use with `tool_use_id`, `name`, and `args` fields + /// + /// The script will send one response group per user message received. 
pub fn set_mock_output(&mut self, json: serde_json::Value) { - let mut mock = Vec::new(); + // Convert JSON array to response groups (each array element is one response) + let mut response_groups = Vec::new(); for response in json.as_array().unwrap() { - let mut stream = Vec::new(); + let mut events = Vec::new(); for event in response.as_array().unwrap() { match event { serde_json::Value::String(assistant_text) => { - stream.push(ChatResponseStream::AssistantResponseEvent { + events.push(ChatResponseStream::AssistantResponseEvent { content: assistant_text.clone(), }); }, serde_json::Value::Object(tool_use) => { - stream.append(&mut split_tool_use_event(tool_use)); + events.append(&mut split_tool_use_event(tool_use)); }, other => panic!("Unexpected value: {:?}", other), } } - mock.push(stream); + response_groups.push(events); } - self.mock_client = Some(Arc::new(Mutex::new(mock.into_iter()))); + // Create mock LLM that cycles through response groups based on user message count + self.set_mock_llm(move |mut ctx| { + let response_groups = response_groups.clone(); + async move { + // Determine which response group to use based on user message count + // Each user message gets the next response group in sequence + let response_index = ctx.count_user_messages().saturating_sub(1); // 0-indexed + + tracing::debug!( + actor="MockLLM", + user_message=ctx.current_user_message(), + count=ctx.count_user_messages(), + response_index, + response_groups=?response_groups, + ); + + // Send the corresponding response group + if response_index < response_groups.len() { + for event in &response_groups[response_index] { + match event { + ChatResponseStream::AssistantResponseEvent { content } => { + ctx.respond(content).await?; + }, + ChatResponseStream::ToolUseEvent { + tool_use_id, + name, + input, + stop, + } => { + ctx.respond_tool_use(tool_use_id.clone(), name.clone(), input.clone(), stop.clone()) + .await?; + }, + _ => {}, // Ignore other event types + } + } + } else { + // No more 
predefined responses, send a fallback + ctx.respond("I don't have a response configured for this message.").await?; + } + + Ok(()) + } + }); + } + + /// Set a mock LLM script for testing. + /// + /// The closure receives a `MockLLMContext` and should return a Future that will be run + /// in a separate task. The closure can contain arbitrary Rust logic and interact with + /// the mock LLM context to: + /// + /// - Read incoming user messages via `context.recv_request().await` + /// - Send LLM responses via `context.send_response(response).await` + /// - Implement complex conversation flows with branching logic + /// - Access conversation state and history + /// + /// Example: + /// ```rust,ignore + /// client.set_mock_llm(|mut context| async move { + /// while let Some(request) = context.recv_request().await { + /// if request.messages.last().unwrap().content.contains("Greece") { + /// context.send_response(json!({ + /// "content": [{"type": "text", "text": "The capital is Athens"}] + /// })).await; + /// } + /// } + /// }); + /// ``` + #[cfg_attr(not(test), allow(dead_code))] + pub fn set_mock_llm(&mut self, closure: F) + where + F: Fn(crate::mock_llm::MockLLMContext) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let mock_llm = MockLLM::new(closure); + self.mock_llm = Some(Arc::new(mock_llm)); } // Add a helper method to check if using non-default endpoint @@ -674,11 +758,7 @@ fn split_tool_use_event(value: &Map) -> Vec), + Mock(mpsc::Receiver>), } impl SendMessageOutput { @@ -29,7 +30,13 @@ impl SendMessageOutput { .await? 
.map(|s| s.into())), SendMessageOutput::QDeveloper(output) => Ok(output.send_message_response.recv().await?.map(|s| s.into())), - SendMessageOutput::Mock(vec) => Ok(vec.pop()), + SendMessageOutput::Mock(rx) => { + match rx.recv().await { + Some(Ok(stream)) => Ok(Some(stream)), + Some(Err(_)) => Err(ApiClientError::MockLLMError), + None => Ok(None), + } + } } } } diff --git a/crates/chat-cli/src/auth/pkce.rs b/crates/chat-cli/src/auth/pkce.rs index e001abe972..5b378c66ce 100644 --- a/crates/chat-cli/src/auth/pkce.rs +++ b/crates/chat-cli/src/auth/pkce.rs @@ -509,8 +509,6 @@ mod tests { #[ignore = "not in ci"] #[tokio::test] async fn test_pkce_flow_e2e() { - tracing_subscriber::fmt::init(); - let start_url = "https://amzn.awsapps.com/start".to_string(); let region = Region::new("us-east-1"); let client = client(region.clone()); diff --git a/crates/chat-cli/src/cli/acp.rs b/crates/chat-cli/src/cli/acp.rs new file mode 100644 index 0000000000..6ae4f6d943 --- /dev/null +++ b/crates/chat-cli/src/cli/acp.rs @@ -0,0 +1,91 @@ +//! Agent Client Protocol (ACP) implementation using actor pattern +//! +//! This module implements ACP server functionality using Alice Ryhl's actor pattern +//! for clean separation of concerns and message passing instead of shared state. +//! +//! ## Architecture Flow +//! +//! When an ACP client sends a prompt request: +//! +//! ```text +//! ACP Client AcpAgentForward AcpServerActor AcpSessionActor +//! │ │ │ │ +//! │ acp.prompt("Hi") │ │ │ +//! ├──────JSON-RPC────────────→│ │ │ +//! │ │ ServerMethod::Prompt │ │ +//! │ ├────────channel─────────→│ │ +//! │ │ │ SessionMethod::Prompt │ +//! │ │ ├───────channel────────→│ +//! │ │ │ │ ConversationState +//! │ │ │ │ processes prompt +//! │ │ │ │ with LLM +//! │ │ │ │ +//! │ │ │ ←──────response───────│ +//! │ │ ←──────response─────────│ │ +//! │ ←────JSON-RPC─────────────│ │ │ +//! ``` +//! +//! ## Key Benefits +//! +//! - **No shared state**: Each actor owns its data (no RwLocks) +//! 
- **Natural backpressure**: Bounded channels prevent unbounded queuing +//! - **Clean separation**: Protocol handling, session management, and conversation processing are separate +//! - **Easy testing**: Each actor can be tested independently + +use std::process::ExitCode; + +use clap::Parser; +use eyre::Result; +use tokio::task::LocalSet; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::database::settings::Setting; +use crate::os::Os; + +mod server; +mod server_session; +mod server_connection; +pub(crate) mod util; +#[cfg(test)] +mod client_connection; +#[cfg(test)] +mod client_session; +#[cfg(test)] +mod client_dispatch; + +pub use server::AcpServerHandle; +pub use server_connection::AcpServerConnectionHandle; + +#[cfg(test)] +mod tests; + +#[derive(Debug, Parser, PartialEq)] +pub struct AcpArgs { + /// Agent to use for ACP sessions + #[arg(long)] + pub agent: Option, +} + +impl AcpArgs { + pub async fn run(self, os: &mut Os) -> Result { + // Check feature flag + if !os.database.settings.get_bool(Setting::EnabledAcp).unwrap_or(false) { + eprintln!("ACP is disabled. Enable with: q settings acp.enabled true"); + return Ok(ExitCode::FAILURE); + } + + let agent_name = self.agent.unwrap_or_else(|| "default".to_string()); + + tracing::info!("Starting ACP server with agent: {}", agent_name); + + LocalSet::new().run_until(async move { + // Set up ACP connection with stdio + let stdin = tokio::io::stdin().compat(); + let stdout = tokio::io::stdout().compat_write(); + + // Create transport actor (will receive connection later) + AcpServerConnectionHandle::execute(agent_name, os, stdout, stdin).await?; + Ok(ExitCode::SUCCESS) + }).await + } +} diff --git a/crates/chat-cli/src/cli/acp/client_connection.rs b/crates/chat-cli/src/cli/acp/client_connection.rs new file mode 100644 index 0000000000..10a32825af --- /dev/null +++ b/crates/chat-cli/src/cli/acp/client_connection.rs @@ -0,0 +1,360 @@ +//! 
ACP Client Actor - Manages client-side ACP connection with actor pattern +//! +//! ## Architecture Overview +//! +//! The client side mirrors the server's actor architecture but adds a dispatch layer +//! for routing incoming notifications to the correct session. +//! +//! **Actor Hierarchy:** +//! - `AcpClientConnectionHandle` - Owns the `ClientSideConnection`, sends prompts to server +//! - `AcpClientDispatchHandle` - Routes incoming notifications to sessions by session_id +//! - `AcpClientSessionHandle` - Per-session handle that sends prompts and receives notifications +//! +//! ## Message Flow +//! +//! When a test sends a prompt and receives the response: +//! +//! ```text +//! Session Handle Connection Handle ClientSideConnection AcpClientForward Dispatch Handle +//! ────────────── ───────────────── ──────────────────── ──────────────── ─────────────── +//! │ │ │ │ │ +//! │ prompt("Hi") │ │ │ │ +//! ├─────────────────→ │ │ │ │ +//! │ Prompt(req,tx) │ │ │ │ +//! │ via channel │ │ │ │ +//! │ │ client_conn │ │ │ +//! │ │ .prompt() │ │ │ +//! │ ├───────────────────→ │ │ │ +//! │ │ async call │ │ │ +//! │ │ │ │ │ +//! │ │ (sends JSON-RPC to server) │ | +//! │ │ │ │ │ +//! │ │ │ │ │ +//! ╔════════════════════════ NOTIFICATION LOOP (repeats) ══════════════════════════════════╗ +//! ║ │ │ │ │ │ +//! ║ │ │ (receives session_notification | | +//! ║ │ │ over JSON-RPC) │ | +//! ║ │ │ │ │ │ +//! ║ │ │ │ session_ │ │ +//! ║ │ │ │ notification() │ │ +//! ║ │ │ ├───────────────────→ │ │ +//! ║ │ │ │ async call │ │ +//! ║ │ │ │ │ │ +//! ║ │ │ │ │ ClientCallback │ +//! ║ │ │ │ ├────────────────→ │ +//! ║ │ │ │ │ via channel │ +//! ║ │ │ │ │ │ +//! ║ │ │ │ │ lookup by │ +//! ║ │ │ │ │ session_id │ +//! ║ │ │ │ │ │ +//! ║ │ ← ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ +//! ║ │ ClientCallback │ │ │ │ +//! ║ │ via callback_rx │ │ │ │ +//! ║ │ │ │ │ │ +//! ║ │ (accumulate) │ │ │ │ +//! ║ │ │ │ │ │ +//! 
╚═══════════════════════════════════════════════════════════════════════════════════════╝ +//! │ │ │ │ │ +//! │ │ (receives prompt response over JSON-RPC) │ │ +//! │ │ │ │ │ +//! │ │← - ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ │ +//! │ │ PromptResponse │ │ │ +//! │ │ returned from │ │ │ +//! │ │ async call │ │ │ +//! │ │ │ │ │ +//! │← ─ ─ ─ ─ ─ ─ ─ ─ -│ │ │ │ +//! │ PromptResponse │ │ │ │ +//! │ via oneshot rx │ │ │ │ +//! │ │ │ │ │ +//! │ return text │ │ │ │ +//! ▼ │ │ │ │ +//! ``` +//! +//! ## Key Design Decisions +//! +//! **Why ClientCallback instead of raw notifications?** +//! The `ClientCallback` enum bundles each notification with a oneshot response channel. +//! This allows the session to acknowledge receipt and allows the dispatch actor to detect +//! when sessions fail to process notifications. +//! +//! **Why dispatch actor instead of broadcast?** +//! The original design used `broadcast::channel` to fan out all notifications to all sessions, +//! with each session filtering by session_id. This was simple but inefficient (wakes all sessions, +//! risks lag/drops). The dispatch actor uses a HashMap to route notifications directly to the +//! relevant session, providing better performance and clearer error handling. +//! +//! **Session registration:** +//! When a session is created, it registers itself with dispatch by sending its session_id +//! and a `mpsc::Sender`. The dispatch actor uses `retain()` on each message +//! to automatically clean up closed sessions without explicit unregistration. 
+ +use std::sync::Arc; + +use agent_client_protocol::{self as acp, Agent, SessionId}; +use eyre::Result; +use futures::{AsyncRead, AsyncWrite}; +use serde_json::value::RawValue; +use tokio::sync::{mpsc, oneshot}; + +use crate::cli::acp::client_dispatch::AcpClientDispatchHandle; +use crate::cli::acp::util::ignore_error; + +use super::client_session::AcpClientSessionHandle; + +/// Handle to the ACP client actor +#[derive(Clone)] +pub struct AcpClientConnectionHandle { + client_tx: mpsc::Sender, +} + +/// Messages sent to the client actor +#[derive(Debug)] +pub(super) enum ClientConnectionMethod { + Initialize( + acp::InitializeRequest, + oneshot::Sender>, + ), + NewSession( + acp::NewSessionRequest, + oneshot::Sender>, + ), + + // Subtle: the response to a prompt request is always sent to the + // "dispatch" actor, which will route it to the appropriate session. + // This ensures that the prompt termination is ordered with respect + // to the other notifications that are routed to that same session. 
+ Prompt(acp::PromptRequest), + + #[allow(dead_code)] // Will be used when client-side cancellation is implemented + Cancel(acp::CancelNotification, oneshot::Sender>), +} + +impl AcpClientConnectionHandle { + /// Spawn a new ACP client connection that communicates over the given streams + /// Returns a handle that can be used to interact with the ACP server + pub async fn spawn_local( + outgoing_bytes: impl Unpin + AsyncWrite + 'static, + incoming_bytes: impl Unpin + AsyncRead + 'static, + ) -> Result { + // Channel to send messages to the client connection actor: + let (client_tx, mut client_rx) = mpsc::channel(32); + + // Channel to receive notifications from the client connection actor: + let client_dispatch = AcpClientDispatchHandle::spawn_local(); + + let handle = Self { + client_tx: client_tx.clone(), + }; + + // Create an actor to own the connection + tokio::task::spawn_local(async move { + // Create client callbacks that forward to the actor + let callbacks = AcpClientForward::new(client_dispatch.clone()); + + // Set up client-side ACP connection + let (client_conn, client_handle_io) = + acp::ClientSideConnection::new(callbacks, outgoing_bytes, incoming_bytes, |fut| { + tokio::task::spawn_local(fut); + }); + let client_conn = Arc::new(client_conn); + + // Start the client I/O handler + tokio::task::spawn_local(async move { + if let Err(e) = client_handle_io.await { + tracing::error!("ACP client I/O handler failed: {}", e); + } + }); + + while let Some(method) = client_rx.recv().await { + tracing::debug!(actor = "client_connection", event = "message received", ?method); + + match method { + ClientConnectionMethod::Initialize(initialize_request, sender) => { + let response = client_conn.initialize(initialize_request).await; + tracing::debug!(actor = "client_connection", event = "sending response", ?response); + ignore_error(sender.send(response)); + }, + ClientConnectionMethod::NewSession(new_session_request, sender) => { + match 
client_conn.new_session(new_session_request).await { + Ok(session_info) => { + let result = + AcpClientSessionHandle::new(session_info, &client_dispatch, client_tx.clone()) + .await + .map_err(|_err| acp::Error::internal_error()); + tracing::debug!(actor = "client_connection", event = "sending response", ?result); + ignore_error(sender.send(result)); + }, + Err(err) => { + tracing::debug!(actor = "client_connection", event = "sending response", ?err); + ignore_error(sender.send(Err(err))); + }, + } + }, + ClientConnectionMethod::Prompt(prompt_request) => { + let session_id = prompt_request.session_id.clone(); + + // Spawn off the call to prompt so it runs concurrently. + // + // This way if the user tries to cancel, that message can be received + // and sent to the server. That will cause the server to cancel this prompt call. + tokio::task::spawn_local({ + let client_conn = client_conn.clone(); + let client_dispatch = client_dispatch.clone(); + async move { + let response = client_conn.prompt(prompt_request).await; + tracing::debug!( + actor = "client_connection", + event = "sending response", + ?session_id, + ?response + ); + client_dispatch.client_callback(ClientCallback::PromptResponse(session_id, response)); + } + }); + }, + ClientConnectionMethod::Cancel(cancel_notification, sender) => { + let response = client_conn.cancel(cancel_notification).await; + tracing::debug!(actor = "client_connection", event = "sending response", ?response); + ignore_error(sender.send(response)); + }, + } + } + }); + + Ok(handle) + } + + pub async fn initialize(&self, args: acp::InitializeRequest) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.client_tx + .send(ClientConnectionMethod::Initialize(args, tx)) + .await?; + Ok(rx.await??) + } + + pub async fn new_session(&self, args: acp::NewSessionRequest) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.client_tx + .send(ClientConnectionMethod::NewSession(args, tx)) + .await?; + Ok(rx.await??) 
+ } + + #[cfg_attr(not(test), allow(dead_code))] // Will be used when client-side cancellation is implemented + pub async fn cancel(&self, args: acp::CancelNotification) -> Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.client_tx.send(ClientConnectionMethod::Cancel(args, tx)).await?; + Ok(rx.await??) + } +} + +/// Forwarding implementation of acp::Client that sends all calls to client actor +struct AcpClientForward { + client_dispatch: AcpClientDispatchHandle, +} + +impl AcpClientForward { + fn new(client_dispatch: AcpClientDispatchHandle) -> Self { + Self { client_dispatch } + } +} + +impl acp::Client for AcpClientForward { + async fn request_permission( + &self, + _args: acp::RequestPermissionRequest, + ) -> Result { + todo!() + } + + async fn write_text_file( + &self, + _args: acp::WriteTextFileRequest, + ) -> Result { + todo!() + } + + async fn read_text_file(&self, _args: acp::ReadTextFileRequest) -> Result { + todo!() + } + + async fn session_notification(&self, args: acp::SessionNotification) -> Result<(), acp::Error> { + tracing::debug!(actor = "client_connection", event = "session_notification", ?args); + let (tx, rx) = oneshot::channel(); + self.client_dispatch + .client_callback(ClientCallback::Notification(args, tx)); + let result = rx.await; + tracing::debug!( + actor = "client_connection", + event = "session_notification complete", + ?result + ); + result.map_err(acp::Error::into_internal_error)? 
+ } + + async fn create_terminal( + &self, + _args: acp::CreateTerminalRequest, + ) -> Result { + todo!() + } + + async fn terminal_output( + &self, + _args: acp::TerminalOutputRequest, + ) -> Result { + todo!() + } + + async fn release_terminal( + &self, + _args: acp::ReleaseTerminalRequest, + ) -> Result { + todo!() + } + + async fn wait_for_terminal_exit( + &self, + _args: acp::WaitForTerminalExitRequest, + ) -> Result { + todo!() + } + + async fn kill_terminal_command( + &self, + _args: acp::KillTerminalCommandRequest, + ) -> Result { + todo!() + } + + async fn ext_method(&self, _method: Arc, _params: Arc) -> Result, acp::Error> { + todo!() + } + + async fn ext_notification(&self, _method: Arc, _params: Arc) -> Result<(), acp::Error> { + todo!() + } +} + +#[derive(Debug)] +pub(super) enum ClientCallback { + Notification(acp::SessionNotification, oneshot::Sender>), + PromptResponse(acp::SessionId, Result), +} + +impl ClientCallback { + pub fn session_id(&self) -> &SessionId { + match self { + ClientCallback::Notification(session_notification, _) => &session_notification.session_id, + ClientCallback::PromptResponse(session_id, _) => session_id, + } + } + pub fn fail(self, error: acp::Error) { + match self { + ClientCallback::Notification(_session_notification, sender) => { + ignore_error(sender.send(Err(error))); + }, + ClientCallback::PromptResponse(_, _prompt_response) => (), + } + } +} diff --git a/crates/chat-cli/src/cli/acp/client_dispatch.rs b/crates/chat-cli/src/cli/acp/client_dispatch.rs new file mode 100644 index 0000000000..dcc222d8a2 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/client_dispatch.rs @@ -0,0 +1,102 @@ +//! 
ACP Client Dispatch Actor - Routes session notifications to the correct session + +use std::collections::HashMap; + +use agent_client_protocol::{self as acp}; +use tokio::sync::mpsc::{self, error::SendError}; + +use crate::cli::acp::client_connection::ClientCallback; + +#[derive(Clone)] +pub struct AcpClientDispatchHandle { + /// Send a message to the dispatch actor. + /// + /// This is intentionally *unbounded* so that messages can + /// be sent without blocking and so that there is a guaranteed + /// total ordering. This is important because sometimes we have + /// incoming notifications that have to be received in order; + /// if we were forcing senders to queue, then even if + /// the first attempts to send were in order, later attempts + /// might not be. + /// + /// Note: This is related to how the ACP library works internally. + /// It spawns out "threads" as data arrives to maintain responsiveness, + /// which means that the ordering of callbacks is not guaranteed; + /// I *think* this is inherently buggy, to be honest, but in practice + /// it works ok if they don't block, from *what I can tell*. + /// I am in conversation with the ACP team to discuss if there is a + /// flaw in my analysis and, if so, how to fix it. However, + /// the refactoring I would suggest would result in callbacks + /// being *sync* not *async*, which again implies you would + /// want to be able to enqueue without blocking. 
--nikomatsais + dispatch_tx: mpsc::UnboundedSender, +} + +#[derive(Debug)] +pub(super) enum ClientDispatchMethod { + RegisterSession(acp::SessionId, mpsc::Sender), + ClientCallback(ClientCallback), +} + +impl AcpClientDispatchHandle { + pub fn spawn_local() -> Self { + let (dispatch_tx, mut dispatch_rx) = mpsc::unbounded_channel(); + + tokio::task::spawn_local(async move { + let mut sessions: HashMap> = HashMap::new(); + + while let Some(method) = dispatch_rx.recv().await { + sessions.retain(|_, tx| !tx.is_closed()); + + tracing::debug!(actor="client_dispatch", event="message received", ?method); + match method { + ClientDispatchMethod::RegisterSession(session_id, tx) => { + tracing::debug!(actor="client_dispatch", event="registering session", session_id=%session_id.0); + sessions.insert(session_id, tx); + }, + ClientDispatchMethod::ClientCallback(callback) => { + let session_id = callback.session_id(); + if let Some(session_tx) = sessions.get(session_id) { + match session_tx + .send(callback) + .await + { + Ok(()) => (), + Err(SendError(callback)) => callback.fail(acp::Error::internal_error()), + } + } else { + tracing::debug!(actor="client_dispatch", event="session not found", ?session_id); + callback.fail(acp::Error::internal_error()); + } + } + } + } + + tracing::info!("Client dispatch actor shutting down"); + }); + + Self { dispatch_tx } + } + + pub fn register_session( + &self, + session_id: &acp::SessionId, + callback_tx: mpsc::Sender, + ) -> eyre::Result<()> { + self.dispatch_tx + .send(ClientDispatchMethod::RegisterSession(session_id.clone(), callback_tx)) + .map_err(|_send_err| eyre::eyre!("Client dispatch actor has shut down")) + } + + /// Route a callback to the correct place. 
+ pub fn client_callback(&self, callback: ClientCallback) { + tracing::debug!(actor="client_dispatch", event="client callback", ?callback); + match self.dispatch_tx.send(ClientDispatchMethod::ClientCallback(callback)) { + Ok(()) => (), + Err(SendError(ClientDispatchMethod::ClientCallback(callback))) => { + callback.fail(acp::Error::internal_error()) + }, + Err(SendError(_)) => unreachable!(), + } + } +} diff --git a/crates/chat-cli/src/cli/acp/client_session.rs b/crates/chat-cli/src/cli/acp/client_session.rs new file mode 100644 index 0000000000..8d54e765e1 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/client_session.rs @@ -0,0 +1,156 @@ +//! ACP Client Session Handle - Manages individual test sessions + + +use agent_client_protocol::{self as acp, PromptResponse, TextContent}; +use eyre::Result; +use tokio::sync::{mpsc, oneshot}; + +use crate::cli::acp::{ + client_connection::{ClientCallback, ClientConnectionMethod}, + client_dispatch::AcpClientDispatchHandle, + util::ignore_error, +}; + +/// Handle for a specific test session +#[derive(Debug)] +pub struct AcpClientSessionHandle { + session_info: acp::NewSessionResponse, + callback_rx: mpsc::Receiver, + client_tx: mpsc::Sender, +} + +impl AcpClientSessionHandle { + pub(super) async fn new( + session_info: acp::NewSessionResponse, + client_dispatch: &AcpClientDispatchHandle, + client_tx: mpsc::Sender, + ) -> eyre::Result { + let (callback_tx, callback_rx) = mpsc::channel(32); + client_dispatch + .register_session(&session_info.session_id, callback_tx)?; + Ok(Self { + session_info, + callback_rx, + client_tx, + }) + } + + /// Get the session ID for this session + pub fn session_id(&self) -> &acp::SessionId { + &self.session_info.session_id + } + + /// Send a message to the agent and read the complete response + pub async fn prompt(&mut self, message: impl IntoPrompt) -> Result { + // Construct the prompt + let prompt = acp::PromptRequest { + session_id: self.session_info.session_id.clone(), + prompt: 
message.into_prompt(), + meta: None, + }; + + tracing::debug!(actor="client_session", event="prompt received", ?prompt); + + // Send the prompt over to the client connection. It will send the "stop reason" over + // via the dispatch actor once it is done. + self.client_tx.send(ClientConnectionMethod::Prompt(prompt)).await?; + + // Read notifications until we get the prompt response, then we can return. + let mut response_text = String::new(); + while let Some(client_callback) = self.callback_rx.recv().await { + tracing::debug!(actor="client_session", event="callback received", "session_id"=?self.session_info.session_id, ?client_callback); + match client_callback { + ClientCallback::Notification(notification, tx) => { + self.handle_notification(notification, tx, &mut response_text) + }, + ClientCallback::PromptResponse(session_id, response) => { + assert_eq!(self.session_info.session_id, session_id); + + // Convert abnormal stop-reasons into errors + let PromptResponse { stop_reason, meta: _ } = response?; + match stop_reason { + acp::StopReason::EndTurn => return Ok(response_text), + acp::StopReason::MaxTokens => eyre::bail!("max tokens exceeded"), + acp::StopReason::MaxTurnRequests => eyre::bail!("max turn requests exceeded"), + acp::StopReason::Refusal => eyre::bail!("refused"), + acp::StopReason::Cancelled => eyre::bail!("canceled"), + } + }, + } + } + + eyre::bail!("callback_rx closed before we received stop reason"); + } + + fn handle_notification( + &mut self, + notification: acp::SessionNotification, + tx: oneshot::Sender>, + response_text: &mut String, + ) { + assert_eq!(self.session_info.session_id, notification.session_id); + match notification.update { + acp::SessionUpdate::AgentMessageChunk { content } => { + ignore_error(tx.send(self.push_content(content, response_text))); + return; + }, + acp::SessionUpdate::AgentThoughtChunk { content } => { + response_text.push_str("\n\n"); + let result = self.push_content(content, response_text); + 
response_text.push_str("\n\n"); + ignore_error(tx.send(result)); + return; + }, + acp::SessionUpdate::UserMessageChunk { content } => { + response_text.push_str("\n\n"); + if let acp::ContentBlock::Text(text_content) = content { + response_text.push_str(&text_content.text); + } + response_text.push_str("\n\n"); + ignore_error(tx.send(Ok(()))); + return; + }, + acp::SessionUpdate::ToolCall(_) + | acp::SessionUpdate::ToolCallUpdate(_) + | acp::SessionUpdate::Plan(_) + | acp::SessionUpdate::AvailableCommandsUpdate { .. } + | acp::SessionUpdate::CurrentModeUpdate { .. } => { + ignore_error(tx.send(Err(acp::Error::internal_error()))); + return; + }, + } + } + + fn push_content(&mut self, content: acp::ContentBlock, response_text: &mut String) -> Result<(), acp::Error> { + match content { + acp::ContentBlock::Text(text_content) => { + response_text.push_str(&text_content.text); + Ok(()) + }, + acp::ContentBlock::Image(_) + | acp::ContentBlock::Audio(_) + | acp::ContentBlock::ResourceLink(_) + | acp::ContentBlock::Resource(_) => Err(acp::Error::internal_error()), + } + } +} + +pub trait IntoPrompt { + fn into_prompt(self) -> Vec; +} + +impl IntoPrompt for String { + fn into_prompt(self) -> Vec { + vec![acp::ContentBlock::Text(TextContent { + annotations: None, + text: self, + meta: None, + })] + } +} + +impl IntoPrompt for &str { + fn into_prompt(self) -> Vec { + self.to_string().into_prompt() + } +} diff --git a/crates/chat-cli/src/cli/acp/server.rs b/crates/chat-cli/src/cli/acp/server.rs new file mode 100644 index 0000000000..015c801cc5 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/server.rs @@ -0,0 +1,309 @@ +//! 
ACP Server Actor - Top-level coordinator that manages sessions + +use std::collections::HashMap; +use std::sync::Arc; + +use agent_client_protocol as acp; +use serde_json::value::RawValue; +use tokio::sync::{mpsc, oneshot}; + +use crate::{cli::acp::util::ignore_error, os::Os}; +use super::{server_session::AcpServerSessionHandle, server_connection::AcpServerConnectionHandle}; + +/// Convert channel errors to ACP errors +fn channel_to_acp_error(_err: E) -> acp::Error { + acp::Error::internal_error() +} + +/// Handle to the ACP "server" actor. +/// +/// This actor receives messages modeled after the ACP server methods and processes them. +/// It follows the typical oneshot-based RPC method but also takes a +/// [`ACPServerConnectionHandle`][] that it can to send notifications back over the transport. +#[derive(Clone)] +pub struct AcpServerHandle { + server_tx: mpsc::Sender, +} + +/// Messages sent to the server actor +/// +/// Each variant contains: +/// - Request parameters (the input) +/// - oneshot::Sender (the "return address" where the actor sends the response back) +#[derive(Debug)] +enum ServerMethod { + Initialize(acp::InitializeRequest, oneshot::Sender>), + Authenticate(acp::AuthenticateRequest, oneshot::Sender>), + NewSession(acp::NewSessionRequest, oneshot::Sender>), + LoadSession(acp::LoadSessionRequest, oneshot::Sender>), + SetSessionMode(acp::SetSessionModeRequest, oneshot::Sender>), + Prompt(acp::PromptRequest, oneshot::Sender>), + Cancel(acp::CancelNotification, oneshot::Sender>), + ExtMethod(Arc, Arc, oneshot::Sender, acp::Error>>), + ExtNotification(Arc, Arc, oneshot::Sender>), +} + +impl AcpServerHandle { + pub fn spawn(agent_name: String, os: Os, transport: AcpServerConnectionHandle) -> Self { + let (server_tx, mut server_rx) = mpsc::channel(32); + + tokio::task::spawn_local(async move { + let mut sessions: HashMap = HashMap::new(); + + while let Some(method) = server_rx.recv().await { + tracing::debug!(actor="server", event="method call received", 
?method); + match method { + ServerMethod::Initialize(args, tx) => { + let response = Self::handle_initialize(args).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="initialize"); + break; + } + } + ServerMethod::Authenticate(args, tx) => { + let response = Self::handle_authenticate(args).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="authenticate"); + break; + } + } + ServerMethod::NewSession(args, tx) => { + let response = Self::handle_new_session(args, &agent_name, &os, &mut sessions, &transport).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="new_session"); + break; + } + } + ServerMethod::LoadSession(args, tx) => { + let response = Self::handle_load_session(args, &sessions).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="load_session"); + break; + } + } + ServerMethod::SetSessionMode(args, tx) => { + let response = Self::handle_set_session_mode(args, &sessions).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="set_session_mode"); + break; + } + } + ServerMethod::Prompt(args, tx) => { + Self::handle_prompt(args, tx, &sessions).await; + } + ServerMethod::Cancel(args, tx) => { + let response = Self::handle_cancel(args, &sessions).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="cancel"); + break; + } + } + ServerMethod::ExtMethod(method, params, tx) => { + let response = Self::handle_ext_method(method, params).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="ext_method"); + break; + } + } + ServerMethod::ExtNotification(method, params, tx) => { + let response = 
Self::handle_ext_notification(method, params).await; + if tx.send(response).is_err() { + tracing::debug!(actor="server", event="response receiver dropped", method="ext_notification"); + break; + } + } + } + } + + tracing::info!("Server actor shutting down"); + }); + + Self { server_tx } + } + + pub async fn initialize(&self, args: acp::InitializeRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::Initialize(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn authenticate(&self, args: acp::AuthenticateRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::Authenticate(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn new_session(&self, args: acp::NewSessionRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::NewSession(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn load_session(&self, args: acp::LoadSessionRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::LoadSession(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn set_session_mode(&self, args: acp::SetSessionModeRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::SetSessionMode(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn prompt(&self, args: acp::PromptRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::Prompt(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? 
+ } + + pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::Cancel(args, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn ext_method(&self, method: Arc, params: Arc) -> Result, acp::Error> { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::ExtMethod(method, params, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn ext_notification(&self, method: Arc, params: Arc) -> Result<(), acp::Error> { + let (tx, rx) = oneshot::channel(); + self.server_tx.send(ServerMethod::ExtNotification(method, params, tx)).await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + async fn handle_initialize(_args: acp::InitializeRequest) -> Result { + Ok(acp::InitializeResponse { + protocol_version: acp::V1, + agent_capabilities: acp::AgentCapabilities::default(), + auth_methods: Vec::new(), + meta: None, + }) + } + + async fn handle_authenticate(_args: acp::AuthenticateRequest) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn handle_new_session( + _args: acp::NewSessionRequest, + _agent_name: &str, + os: &Os, + sessions: &mut HashMap, + transport: &AcpServerConnectionHandle, + ) -> Result { + // Generate a new session ID + let session_id = uuid::Uuid::new_v4().to_string(); + let acp_session_id = acp::SessionId(session_id.clone().into()); + + tracing::info!("Creating new ACP session: {}", session_id); + + // FIXME: we need to take `_args` into account + + // Spawn session actor with transport handle + let session_handle = AcpServerSessionHandle::spawn_local(acp_session_id.clone(), os.clone(), transport.clone()); + + // Store session handle + sessions.insert(session_id.clone(), session_handle); + + tracing::info!("Created new ACP session: {}", session_id); + + Ok(acp::NewSessionResponse { + 
session_id: acp_session_id, + modes: None, + meta: None, + }) + } + + async fn handle_load_session( + args: acp::LoadSessionRequest, + sessions: &HashMap, + ) -> Result { + let session_id = args.session_id.0.as_ref(); + + // Check if session exists + if sessions.contains_key(session_id) { + tracing::info!("Loaded existing ACP session: {}", session_id); + Ok(acp::LoadSessionResponse { + modes: None, + meta: None, + }) + } else { + // FIXME: we need to load the session from the database and replay it + tracing::warn!("Session not found: {}", session_id); + Err(acp::Error::invalid_params()) + } + } + + async fn handle_set_session_mode( + args: acp::SetSessionModeRequest, + sessions: &HashMap, + ) -> Result { + let session_id = args.session_id.0.as_ref(); + + // Find the session actor + if let Some(session_handle) = sessions.get(session_id) { + // Forward to session actor + session_handle.set_mode(args).await + } else { + tracing::warn!("Session not found for set_mode: {}", session_id); + Err(acp::Error::invalid_params()) + } + } + + async fn handle_prompt( + args: acp::PromptRequest, + prompt_tx: oneshot::Sender>, + sessions: &HashMap, + ) { + let session_id = args.session_id.0.as_ref(); + + // Find the session actor + if let Some(session_handle) = sessions.get(session_id) { + // Forward to session actor. Importantly, this actor is responsible + // for sending the final result from the prompt to `prompt_tx` -- we just + // return immediately. 
+ session_handle.prompt(args, prompt_tx).await; + } else { + tracing::warn!("Session not found for prompt: {}", session_id); + ignore_error(prompt_tx.send(Err(acp::Error::invalid_params()))) + } + } + + async fn handle_cancel( + args: acp::CancelNotification, + sessions: &HashMap, + ) -> Result<(), acp::Error> { + let session_id = args.session_id.0.as_ref(); + + // Find the session actor + if let Some(session_handle) = sessions.get(session_id) { + // Forward to session actor + session_handle.cancel(args).await + } else { + tracing::warn!("Session not found for cancel: {}", session_id); + // Cancel is a notification, so we don't return an error + Ok(()) + } + } + + async fn handle_ext_method(_method: Arc, _params: Arc) -> Result, acp::Error> { + Err(acp::Error::method_not_found()) + } + + async fn handle_ext_notification(_method: Arc, _params: Arc) -> Result<(), acp::Error> { + Ok(()) + } +} diff --git a/crates/chat-cli/src/cli/acp/server_connection.rs b/crates/chat-cli/src/cli/acp/server_connection.rs new file mode 100644 index 0000000000..1fa3cfc5db --- /dev/null +++ b/crates/chat-cli/src/cli/acp/server_connection.rs @@ -0,0 +1,149 @@ +//! ACP Transport Actor - Owns the ACP connection and handles notifications + +use std::sync::Arc; + +use agent_client_protocol::{self as acp, Client}; +use eyre::Result; +use futures::{AsyncRead, AsyncWrite}; +use serde_json::value::RawValue; +use tokio::sync::{mpsc, oneshot}; + +use crate::{ + cli::acp::{AcpServerHandle}, + os::Os, +}; + +/// Handle to the "server connection" actor, which owns the actual connection to the underlying transport. 
+#[derive(Clone)] +pub struct AcpServerConnectionHandle { + transport_tx: mpsc::Sender, +} + +/// Messages sent to the transport actor +#[derive(Debug)] +enum TransportMethod { + SessionNotification(acp::SessionNotification, oneshot::Sender>), +} + +impl AcpServerConnectionHandle { + /// Execute a new ACP "server connection" actor that + /// accepts server requests over `incoming_bytes` + /// and responds over `outgoing_bytes`. + pub async fn execute( + agent_name: String, + os: &Os, + outgoing_bytes: impl Unpin + AsyncWrite, + incoming_bytes: impl Unpin + AsyncRead, + ) -> eyre::Result<()> { + let (transport_tx, mut transport_rx) = mpsc::channel(32); + + // Create the handle to the (yet to be launched) transport actor. + let transport_handle = Self { transport_tx }; + + // Spawn the server actor with transport handle + let server_handle = AcpServerHandle::spawn(agent_name, os.clone(), transport_handle.clone()); + + // Create connection to bytes + let (connection, handle_io) = agent_client_protocol::AgentSideConnection::new( + AcpAgentForward::new(server_handle), + outgoing_bytes, + incoming_bytes, + |fut| { + tokio::task::spawn_local(fut); + }, + ); + + // Launch the "transport actor", which owns the connection. 
+ tokio::task::spawn_local(async move { + tracing::debug!(actor="server_connection", event="started"); + + while let Some(method) = transport_rx.recv().await { + tracing::debug!(actor="server_connection", event="message received", ?method); + match method { + TransportMethod::SessionNotification(notification, tx) => { + let result = connection.session_notification(notification).await; + tracing::debug!(actor="server_connection", event="notification delivered"); + if tx.send(result).is_err() { + tracing::debug!(actor="server_connection", event="response receiver dropped"); + } + }, + } + } + + tracing::info!("Transport actor shutting down"); + }); + + match handle_io.await { + Ok(()) => Ok(()), + Err(err) => eyre::bail!("{err}"), + } + } + + pub async fn session_notification(&self, notification: acp::SessionNotification) -> Result<()> { + tracing::debug!(actor="server_connection", event="session_notification", ?notification); + let (tx, rx) = oneshot::channel(); + self.transport_tx + .send(TransportMethod::SessionNotification(notification, tx)) + .await + .map_err(|_send_err| eyre::eyre!("Transport actor has shut down"))?; + let acp_result = rx.await.map_err(|_recv_err| eyre::eyre!("Transport actor dropped response"))?; + acp_result.map_err(|e| eyre::eyre!("ACP error: {:?}", e))?; + tracing::debug!(actor="server_connection", event="session_notification succeeded"); + Ok(()) + } +} + +/// Forwarding implementation of acp::Agent that sends all calls to server actor +struct AcpAgentForward { + server_handle: AcpServerHandle, +} + +impl AcpAgentForward { + pub fn new(server_handle: AcpServerHandle) -> Self { + Self { server_handle } + } +} + +impl acp::Agent for AcpAgentForward { + async fn initialize(&self, arguments: acp::InitializeRequest) -> Result { + self.server_handle.initialize(arguments).await + } + + async fn authenticate(&self, arguments: acp::AuthenticateRequest) -> Result { + self.server_handle.authenticate(arguments).await + } + + async fn 
new_session(&self, arguments: acp::NewSessionRequest) -> Result { + self.server_handle.new_session(arguments).await + } + + async fn load_session(&self, arguments: acp::LoadSessionRequest) -> Result { + self.server_handle.load_session(arguments).await + } + + async fn prompt(&self, arguments: acp::PromptRequest) -> Result { + tracing::debug!(actor="server_connection", event="prompt", ?arguments); + let result = self.server_handle.prompt(arguments).await; + tracing::debug!(actor="server_connection", event="prompt complete", ?result); + result + } + + async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> { + self.server_handle.cancel(args).await + } + + async fn set_session_mode( + &self, + args: acp::SetSessionModeRequest, + ) -> Result { + self.server_handle.set_session_mode(args).await + } + + async fn ext_method(&self, method: Arc, params: Arc) -> Result, acp::Error> { + self.server_handle.ext_method(method, params).await + } + + async fn ext_notification(&self, method: Arc, params: Arc) -> Result<(), acp::Error> { + self.server_handle.ext_notification(method, params).await + } +} diff --git a/crates/chat-cli/src/cli/acp/server_session.rs b/crates/chat-cli/src/cli/acp/server_session.rs new file mode 100644 index 0000000000..ada644e352 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/server_session.rs @@ -0,0 +1,330 @@ +//! 
ACP Session Actor - Per-session actor that owns conversation state + +use agent_client_protocol as acp; +use eyre::Result; +use tokio::sync::mpsc::error::SendError; +use std::collections::HashMap; +use tokio::sync::{mpsc, oneshot}; + +use super::server_connection::AcpServerConnectionHandle; +use crate::cli::acp::util::ignore_error; +use crate::cli::chat::{ConversationState, ResponseEvent, SendMessageStream}; +use crate::os::Os; + +/// Convert channel errors to ACP errors +fn channel_to_acp_error(_err: E) -> acp::Error { + acp::Error::internal_error() +} + +/// Handle to a session actor +#[derive(Clone)] +pub struct AcpServerSessionHandle { + session_tx: mpsc::Sender, +} + +/// Messages sent to session actors +#[derive(Debug)] +enum ServerSessionMethod { + Prompt( + acp::PromptRequest, + oneshot::Sender>, + ), + Cancel(acp::CancelNotification, oneshot::Sender>), + SetMode( + acp::SetSessionModeRequest, + oneshot::Sender>, + ), +} + +impl ServerSessionMethod { + // Respond to the embedded channel (if any) with an error. 
+ pub fn send_error(self) { + match self { + ServerSessionMethod::Prompt(_, sender) => ignore_error(sender.send(Err(acp::Error::internal_error()))), + ServerSessionMethod::Cancel(_, sender) => ignore_error(sender.send(Err(acp::Error::internal_error()))), + ServerSessionMethod::SetMode(_, sender) => ignore_error(sender.send(Err(acp::Error::internal_error()))), + } + } +} + +impl AcpServerSessionHandle { + pub fn spawn_local(session_id: acp::SessionId, mut os: Os, transport: AcpServerConnectionHandle) -> Self { + let (session_tx, mut session_rx) = mpsc::channel(32); + + tokio::task::spawn_local(async move { + tracing::debug!(actor="session", event="started", session_id=%session_id.0); + + let mut conversation_state = ConversationState::new( + &session_id.0, + Default::default(), // agents + HashMap::new(), // tool_config + Default::default(), // tool_manager + None, // current_model_id + &os, + false, // mcp_enabled + ) + .await; + + while let Some(method) = session_rx.recv().await { + match method { + ServerSessionMethod::Prompt(args, tx) => { + let response = + Self::handle_prompt(args, &transport, &mut os, &mut conversation_state, &mut session_rx) + .await; + if tx.send(response).is_err() { + tracing::debug!(actor="session", event="response receiver dropped", method="prompt", session_id=%session_id.0); + break; + } + }, + ServerSessionMethod::Cancel(args, tx) => { + let response = Self::handle_cancel(args).await; + if tx.send(response).is_err() { + tracing::debug!(actor="session", event="response receiver dropped", method="cancel", session_id=%session_id.0); + break; + } + }, + ServerSessionMethod::SetMode(args, tx) => { + let response = Self::handle_set_mode(args).await; + if tx.send(response).is_err() { + tracing::debug!(actor="session", event="response receiver dropped", method="set_mode", session_id=%session_id.0); + break; + } + }, + } + } + + tracing::info!("Session actor shutting down for session: {}", session_id.0); + }); + + Self { session_tx } + } + + 
/// Sends the prompt `args` to this session. Returns immediately. + /// The final result from prompt processing is sent to `prompt_tx`. + pub async fn prompt( + &self, + args: acp::PromptRequest, + prompt_tx: oneshot::Sender>, + ) { + let msg = self.session_tx + .send(ServerSessionMethod::Prompt(args, prompt_tx)) + .await; + match msg { + Ok(()) => {} + + Err(SendError(cancel)) => { + cancel.send_error(); + } + } + } + + pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> { + let (tx, rx) = oneshot::channel(); + self.session_tx + .send(ServerSessionMethod::Cancel(args, tx)) + .await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + pub async fn set_mode(&self, args: acp::SetSessionModeRequest) -> Result { + let (tx, rx) = oneshot::channel(); + self.session_tx + .send(ServerSessionMethod::SetMode(args, tx)) + .await + .map_err(channel_to_acp_error)?; + rx.await.map_err(channel_to_acp_error)? + } + + async fn handle_prompt( + args: acp::PromptRequest, + transport: &AcpServerConnectionHandle, + os: &mut Os, + conversation_state: &mut ConversationState, + session_rx: &mut mpsc::Receiver, + ) -> Result { + tracing::info!("Processing ACP prompt with {} content blocks", args.prompt.len()); + + // Convert ACP prompt to string and set it + let prompt_text = Self::convert_acp_prompt_to_string(args.prompt)?; + conversation_state.set_next_user_message(prompt_text).await; + + // Convert to API format + let mut stderr = std::io::stderr(); + let api_conversation_state = conversation_state + .as_sendable_conversation_state(os, &mut stderr, false) + .await + .map_err(|e| { + tracing::error!("Failed to create sendable conversation state: {}", e); + acp::Error::internal_error() + })?; + + // Send to LLM and stream responses + let mut stream = SendMessageStream::send_message( + &os.client, + api_conversation_state, + std::sync::Arc::new(tokio::sync::Mutex::new(None)), + None, + ) + .await + .map_err(|e| { + 
tracing::error!("Failed to send message: {}", e); + acp::Error::internal_error() + })?; + + // Stream responses via transport with cancellation support + let end_stream_info = loop { + tokio::select! { + stream_result = stream.recv() => { + match stream_result { + Some(Ok(event)) => { + match event { + ResponseEvent::EndStream { message, request_metadata } => { + // The `message` here contains all of the accumulated text and tool uses + // that we received so far. + break Some((message, request_metadata)); + } + + _ => { + // Send the notification (this consumes event) + let notification = Self::convert_response_to_acp_notification(&args.session_id, event)?; + if let Err(e) = transport.session_notification(notification).await { + tracing::error!("Failed to send notification: {}", e); + return Err(acp::Error::internal_error()); + } + } + } + } + Some(Err(e)) => { + tracing::error!("Stream error: {:?}", e); + return Err(acp::Error::internal_error()); + } + None => { + // Stream ended unexpectedly + tracing::warn!("Stream ended without EndStream event"); + break None; + } + } + } + msg = session_rx.recv() => { + match msg { + Some(ServerSessionMethod::Cancel(_args, _tx)) => { + tracing::info!("Prompt cancelled for session: {}", args.session_id.0); + // Drop stream to trigger cancellation + drop(stream); + // Reset conversation state + conversation_state.reset_next_user_message(); + // Send cancelled response + return Ok(acp::PromptResponse { + stop_reason: acp::StopReason::Cancelled, + meta: None, + }); + } + Some(other_method) => { + // Respond with error for non-cancel messages during prompt processing + tracing::warn!("Received non-cancel message during prompt processing: {:?}", other_method); + match other_method { + ServerSessionMethod::Prompt(_, tx) => { + let _ = tx.send(Err(acp::Error::invalid_params())); + } + ServerSessionMethod::SetMode(_, tx) => { + let _ = tx.send(Err(acp::Error::invalid_params())); + } + ServerSessionMethod::Cancel(_, _) => { + // This
case is already handled above, shouldn't reach here + unreachable!("Cancel should be handled in the previous match arm"); + } + } + } + None => { + // Session is shutting down + tracing::warn!("Session shutting down during prompt processing"); + return Err(acp::Error::internal_error()); + } + } + } + } + }; + + // Add the assistant response to conversation history + if let Some((assistant_message, metadata)) = end_stream_info { + conversation_state.push_assistant_message(os, assistant_message, Some(metadata)); + } + + conversation_state.reset_next_user_message(); + + Ok(acp::PromptResponse { + stop_reason: acp::StopReason::EndTurn, + meta: None, + }) + } + + fn convert_acp_prompt_to_string(prompt: Vec) -> Result { + let mut content = String::new(); + + for block in prompt { + match block { + acp::ContentBlock::Text(text_content) => { + content.push_str(&text_content.text); + content.push('\n'); + }, + _ => { + tracing::warn!("Unsupported ACP content block type, skipping"); + }, + } + } + + Ok(content.trim().to_string()) + } + + fn convert_response_to_acp_notification( + session_id: &acp::SessionId, + event: ResponseEvent, + ) -> Result { + let content = match event { + ResponseEvent::AssistantText(text) => acp::ContentBlock::Text(acp::TextContent { + text, + annotations: None, + meta: None, + }), + ResponseEvent::ToolUseStart { name } => acp::ContentBlock::Text(acp::TextContent { + text: format!("[Tool: {}]", name), + annotations: None, + meta: None, + }), + ResponseEvent::ToolUse(_tool_use) => { + // TODO: Convert tool use to proper ACP format + acp::ContentBlock::Text(acp::TextContent { + text: "[Tool execution]".to_string(), + annotations: None, + meta: None, + }) + }, + ResponseEvent::EndStream { .. 
} => { + // This shouldn't be called for EndStream + return Err(acp::Error::internal_error()); + }, + }; + + Ok(acp::SessionNotification { + session_id: session_id.clone(), + update: acp::SessionUpdate::AgentMessageChunk { content }, + meta: None, + }) + } + + async fn handle_cancel(args: acp::CancelNotification) -> Result<(), acp::Error> { + // When no prompt is active, cancellation is a no-op but we log it + tracing::debug!("Cancel request received outside prompt processing: {:?}", args); + + // Per ACP RFC: "Q CLI's current ACP implementation has cancellation as a no-op" + // When no prompt is active, there's nothing to cancel, so this succeeds + // The real cancellation logic is handled in the prompt processing loop above + Ok(()) + } + + async fn handle_set_mode(_args: acp::SetSessionModeRequest) -> Result { + // TODO: Set session mode + Err(acp::Error::method_not_found()) + } +} diff --git a/crates/chat-cli/src/cli/acp/tests.rs b/crates/chat-cli/src/cli/acp/tests.rs new file mode 100644 index 0000000000..239f210423 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/tests.rs @@ -0,0 +1,339 @@ +//! ACP Actor System Tests using the new clean client-side actors + +use std::path::PathBuf; + +use crate::{ + cli::acp::{ + client_connection::AcpClientConnectionHandle, server_connection::AcpServerConnectionHandle, + }, + mock_llm::MockLLMContext, + os::Os, +}; +use agent_client_protocol as acp; +use tokio::task::LocalSet; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +/// Clean test harness that mirrors the main ACP setup pattern +pub struct AcpTestHarness { + os: Os, +} + +impl AcpTestHarness { + /// Create a new test harness with a mock OS + pub async fn new() -> eyre::Result { + Ok(Self { os: Os::new().await? 
}) + } + + /// Set up a mock LLM script for deterministic testing + pub fn set_mock_llm(mut self, script: F) -> Self + where + F: Fn(MockLLMContext) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + Sync + 'static, + { + self.os.client.set_mock_llm(script); + self + } + + /// Spawn the test system and return a client handle + /// This mirrors the clean pattern from the main ACP setup + pub async fn run(self, test: impl AsyncFnOnce(AcpClientConnectionHandle) -> eyre::Result<()>) -> eyre::Result<()> { + LocalSet::new().run_until(async move { + // Create duplex streams for communication (like the main setup uses stdio) + let (client_write, agent_read) = tokio::io::duplex(1024); + let (agent_write, client_read) = tokio::io::duplex(1024); + + // Spawn the server side + tokio::task::spawn_local(async move { + if let Err(e) = AcpServerConnectionHandle::execute( + "test-agent".to_string(), + &self.os, + agent_write.compat_write(), + agent_read.compat(), + ) + .await + { + tracing::error!("ACP server failed: {}", e); + } + }); + + // Spawn the client side and return the handle + let handle = + AcpClientConnectionHandle::spawn_local(client_write.compat_write(), client_read.compat()).await?; + + // For now initialize with no capabilities + handle + .initialize(acp::InitializeRequest { + protocol_version: acp::ProtocolVersion::default(), + client_capabilities: acp::ClientCapabilities::default(), + meta: None, + }) + .await?; + + test(handle).await + }) + .await + } +} + +#[tokio::test] +async fn test_acp_actor_system_conversation() -> eyre::Result<()> { + AcpTestHarness::new() + .await? + .set_mock_llm(|mut ctx: MockLLMContext| async move { + // Use declarative pattern matching API - much cleaner! + ctx.try_patterns(&[ + // First exchange: Greet and ask for name + (&[], r"Hi, Claude", "Hi, you! 
What's your name?"), + + // Second exchange: Capture name and respond personally + (&[r"^assistant:.*What's your name"], r"--- USER MESSAGE BEGIN ---\s*(?P\w+)", "Hi $name, I'm Q!"), + + // Fallback for any unrecognized input + (&[], r".*", "I didn't understand that."), + ]).await + }) + .run(async |client| { + let mut session = client + .new_session(acp::NewSessionRequest { + cwd: PathBuf::new(), + mcp_servers: vec![], + meta: None, + }) + .await?; + + // First turn: User says "Hi, Claude" + let response = session.prompt("Hi, Claude").await?; + assert_eq!(response, "Hi, you! What's your name?"); + + // Second turn: User says "Ferris" + let response = session.prompt("Ferris").await?; + assert_eq!(response, "Hi Ferris, I'm Q!"); + + Ok(()) + }) + .await +} + +#[tokio::test] +async fn test_acp_cancel_during_prompt() -> eyre::Result<()> { + // Create a coordination channel for mock LLM and test coordination + let (coordination_tx, mut coordination_rx) = tokio::sync::mpsc::channel::(1); + + AcpTestHarness::new() + .await? 
+ .set_mock_llm(move |mut ctx: MockLLMContext| { + let coordination_tx = coordination_tx.clone(); + async move { + // Send coordination signal and then block until canceled + coordination_tx.send("ready".to_string()).await.ok(); + ctx.block_until_canceled("Starting response...").await + } + }) + .run(async |client| { + let mut session = client + .new_session(acp::NewSessionRequest { + cwd: PathBuf::new(), + mcp_servers: vec![], + meta: None, + }) + .await?; + + let session_id = session.session_id().clone(); + + // Start prompt and cancellation concurrently + let (prompt_result, _) = tokio::join!( + session.prompt("test message"), + async { + // Wait for mock LLM to signal it's ready + tracing::debug!("waiting for coordination rx"); + if let Some(signal) = coordination_rx.recv().await { + tracing::debug!("received: {signal:?}"); + assert_eq!(signal, "ready"); + + // Now send the cancel + let result = client.cancel(acp::CancelNotification { + session_id, + meta: None, + }).await; + + tracing::debug!("cancellation method invoked: {result:?}"); + + // Cancel should succeed + assert!(result.is_ok()); + } else { + panic!("never got coordination signal"); + } + } + ); + + // The prompt should fail with cancellation + match prompt_result { + Err(e) => { + // Should be canceled error + assert!(e.to_string().contains("canceled"), "Expected cancellation error, got: {}", e); + }, + Ok(response) => { + panic!("Expected prompt to be canceled, but got response: {}", response); + } + } + + Ok(()) + }) + .await +} + +#[tokio::test] +async fn test_acp_cancel_outside_prompt() -> eyre::Result<()> { + AcpTestHarness::new() + .await? 
+ .run(async |client| { + let session = client + .new_session(acp::NewSessionRequest { + cwd: PathBuf::new(), + mcp_servers: vec![], + meta: None, + }) + .await?; + + let session_id = session.session_id().clone(); + + // Cancel when no prompt is active - should succeed as a no-op + let result = client.cancel(acp::CancelNotification { + session_id, + meta: None, + }).await; + + // Should succeed even when there's nothing to cancel + assert!(result.is_ok()); + + Ok(()) + }) + .await +} + +#[tokio::test] +async fn test_acp_cancel_cross_session_isolation() -> eyre::Result<()> { + // Create coordination channels for both mock LLMs + let (coordination_tx_a, mut coordination_rx_a) = tokio::sync::mpsc::channel::(1); + let (coordination_tx_b, mut coordination_rx_b) = tokio::sync::mpsc::channel::(1); + + AcpTestHarness::new() + .await? + .set_mock_llm(move |mut ctx: MockLLMContext| { + let coordination_tx_a = coordination_tx_a.clone(); + let coordination_tx_b = coordination_tx_b.clone(); + async move { + // Determine which session this is based on the message content + tracing::debug!("Mock LLM starting up: {:?}", ctx.current_user_message()); + if ctx.current_user_message().contains("session_a: will get canceled") { + coordination_tx_a.send("session_a_ready".to_string()).await.ok(); + ctx.block_until_canceled("Session A processing...").await + } else if ctx.current_user_message().contains("session_b: will get canceled") { + coordination_tx_b.send("session_b_ready".to_string()).await.ok(); + ctx.block_until_canceled("Session B processing...").await + } else if ctx.current_user_message().contains("session_a: after cancellation") { + ctx.respond("session_a recovered ok").await + } else if ctx.current_user_message().contains("session_b: after cancellation") { + ctx.respond("session_b recovered ok").await + } else { + ctx.respond("I don't understand").await + } + } + }) + .run(async |client| { + // Create two separate sessions + let mut session_a = client + 
.new_session(acp::NewSessionRequest { + cwd: PathBuf::new(), + mcp_servers: vec![], + meta: None, + }) + .await?; + + let mut session_b = client + .new_session(acp::NewSessionRequest { + cwd: PathBuf::new(), + mcp_servers: vec![], + meta: None, + }) + .await?; + + let session_a_id = session_a.session_id().clone(); + let session_b_id = session_b.session_id().clone(); + + // Verify sessions have different IDs + assert_ne!(session_a_id, session_b_id); + + // Start prompts on both sessions concurrently, then test cross-session cancellation + let (session_a_result, session_b_result, _) = tokio::join!( + session_a.prompt("session_a: will get canceled"), + session_b.prompt("session_b: will get canceled"), + async { + // Wait for both sessions to be ready + tracing::debug!("Waiting for both sessions to be ready"); + + let signal_a = coordination_rx_a.recv().await; + let signal_b = coordination_rx_b.recv().await; + + assert_eq!(signal_a, Some("session_a_ready".to_string())); + assert_eq!(signal_b, Some("session_b_ready".to_string())); + + tracing::debug!("Both sessions ready, testing cross-session cancellation"); + + // Try to cancel session A using session B's ID - this should be treated as a no-op + // since session B isn't in the middle of processing a prompt from our perspective + // (it's processing, but the cancel is targeted at the wrong session) + let wrong_cancel_result = client.cancel(acp::CancelNotification { + session_id: session_b_id.clone(), // Wrong session ID + meta: None, + }).await; + + // The cancel should succeed (as a no-op for the wrong session) + // but session A should continue processing + assert!(wrong_cancel_result.is_ok()); + + // Now cancel the correct session A + let correct_cancel_result = client.cancel(acp::CancelNotification { + session_id: session_a_id.clone(), + meta: None, + }).await; + + assert!(correct_cancel_result.is_ok()); + tracing::debug!("Sent cancellation for session A"); + + // Also cancel session B for cleanup + let 
cancel_b_result = client.cancel(acp::CancelNotification { + session_id: session_b_id.clone(), + meta: None, + }).await; + + assert!(cancel_b_result.is_ok()); + tracing::debug!("Sent cancellation for session B"); + } + ); + + // Both sessions should be canceled + assert!(session_a_result.is_err()); + assert!(session_b_result.is_err()); + + let session_a_error = session_a_result.unwrap_err(); + let session_b_error = session_b_result.unwrap_err(); + + assert!(session_a_error.to_string().contains("canceled"), + "Session A should be canceled, got: {}", session_a_error); + assert!(session_b_error.to_string().contains("canceled"), + "Session B should be canceled, got: {}", session_b_error); + + // Verify both sessions can still accept new prompts after cancellation + let session_a_recovery = session_a.prompt("session_a: after cancellation").await?; + let session_b_recovery = session_b.prompt("session_b: after cancellation").await?; + + // These should succeed (mock LLM will respond normally to non-blocking messages) + // Note: Since we don't have coordination signals for these, they should complete normally + assert_eq!(session_a_recovery, "session_a recovered ok"); + assert_eq!(session_b_recovery, "session_b recovered ok"); + + Ok(()) + }) + .await +} diff --git a/crates/chat-cli/src/cli/acp/util.rs b/crates/chat-cli/src/cli/acp/util.rs new file mode 100644 index 0000000000..368d7e3a43 --- /dev/null +++ b/crates/chat-cli/src/cli/acp/util.rs @@ -0,0 +1,6 @@ +//! 
Utilities for ACP actor implementation + +/// Ignore errors from a Result, useful for oneshot sends where receiver may be dropped +pub fn ignore_error(result: Result<(), E>) { + let _ = result; +} diff --git a/crates/chat-cli/src/cli/chat/conversation.rs b/crates/chat-cli/src/cli/chat/conversation.rs index 022e42e74d..fe9691a8ed 100644 --- a/crates/chat-cli/src/cli/chat/conversation.rs +++ b/crates/chat-cli/src/cli/chat/conversation.rs @@ -370,14 +370,17 @@ impl ConversationState { Some(stringify_prompt_message_content(last_msg.content)) } + /// Read the [`Self::next_message`][] field. pub fn next_user_message(&self) -> Option<&UserMessage> { self.next_message.as_ref() } + /// Clear the [`Self::next_message`][] field to `None`. pub fn reset_next_user_message(&mut self) { self.next_message = None; } + /// Set the [`Self::next_message`][] field; the field is expected to be `None` (already cleared) when this is called. pub async fn set_next_user_message(&mut self, input: String) { debug_assert!(self.next_message.is_none(), "next_message should not exist"); if let Some(next_message) = self.next_message.as_ref() { diff --git a/crates/chat-cli/src/cli/chat/mod.rs b/crates/chat-cli/src/cli/chat/mod.rs index aa5540d645..a1c75580b8 100644 --- a/crates/chat-cli/src/cli/chat/mod.rs +++ b/crates/chat-cli/src/cli/chat/mod.rs @@ -59,6 +59,8 @@ use cli::model::{ select_model, }; pub use conversation::ConversationState; +pub use parser::{SendMessageStream, RequestMetadata, ResponseEvent}; +pub use message::{AssistantMessage, AssistantToolUse}; use conversation::TokenWarningLevel; use crossterm::style::{ Attribute, @@ -80,8 +82,6 @@ use eyre::{ }; use input_source::InputSource; use message::{ - AssistantMessage, - AssistantToolUse, ToolUseResult, ToolUseResultBlock, }; @@ -91,8 +91,6 @@ use parse::{ }; use parser::{ RecvErrorKind, - RequestMetadata, - SendMessageStream, }; use regex::Regex; use rmcp::model::PromptMessage; @@ -142,7 +140,6 @@ use winnow::stream::Offset; use super::agent::{ Agent, DEFAULT_AGENT_NAME, -
PermissionEvalResult, }; use crate::api_client::model::ToolResultStatus; use crate::api_client::{ @@ -153,6 +150,13 @@ use crate::auth::AuthError; use crate::auth::builder_id::is_idc_user; use crate::cli::TodoListState; use crate::cli::agent::Agents; +use crate::cli::tool::{ + evaluate_tool_permissions, + ToolPermissionResult, +}; + +pub mod permission; +use permission::PermissionInterface; use crate::cli::chat::checkpoint::{ CheckpointManager, truncate_message, @@ -568,9 +572,11 @@ pub struct ChatSession { initial_input: Option, /// Whether we're starting a new conversation or continuing an old one. existing_conversation: bool, + /// Where to read input from; could be the terminal, could be a mock. input_source: InputSource, /// Width of the terminal, required for [ParseState]. terminal_width_provider: fn() -> Option, + /// Spinner state, if we are displaying a spinner. spinner: Option, /// [ConversationState]. conversation: ConversationState, @@ -593,8 +599,10 @@ pub struct ChatSession { failed_request_ids: Vec, /// Pending prompts to be sent pending_prompts: VecDeque, + /// Are we accepting user input? interactive: bool, inner: Option, + /// Receives a message when user hits C-c. 
ctrlc_rx: broadcast::Receiver<()>, wrap: Option, } @@ -716,6 +724,8 @@ impl ChatSession { // Update conversation state with new tool information self.conversation.update_state(false).await; + tracing::debug!(func="next", state = ?self.inner); + let mut ctrl_c_stream = self.ctrlc_rx.resubscribe(); let result = match self.inner.take().expect("state must always be Some") { ChatState::PromptUser { skip_printing_tools } => { @@ -1048,10 +1058,7 @@ impl ChatSession { )?; } - self.conversation.enforce_conversation_invariants(); - self.conversation.reset_next_user_message(); - self.pending_tool_index = None; - self.tool_turn_start_time = None; + self.reset_user_message_and_pending_tool_use(); self.reset_user_turn(); self.inner = Some(ChatState::PromptUser { @@ -1061,6 +1068,15 @@ impl ChatSession { Ok(()) } + /// Clear out the state associated with the user message, including + /// pending tool use and so forth. Used when resetting back to a "ground" state (e.g., PromptUser). + fn reset_user_message_and_pending_tool_use(&mut self) { + self.conversation.enforce_conversation_invariants(); + self.conversation.reset_next_user_message(); + self.pending_tool_index = None; + self.tool_turn_start_time = None; + } + async fn show_changelog_announcement(&mut self, os: &mut Os) -> Result<()> { let current_version = env!("CARGO_PKG_VERSION"); let last_version = os.database.get_changelog_last_version()?; @@ -1096,6 +1112,13 @@ impl ChatSession { .await .map_err(|e| ChatError::Custom(format!("Failed to update tool spec: {e}").into())) } + + /// Creates the appropriate permission interface for this chat session + fn create_permission_interface<'a>(&self, os: &'a Os) -> permission::console::ConsolePermissionInterface<'a> { + permission::console::ConsolePermissionInterface { + os, + } + } } impl Drop for ChatSession { @@ -2155,6 +2178,8 @@ impl ChatSession { } async fn tool_use_execute(&mut self, os: &mut Os) -> Result { + tracing::debug!(func = "tool_use_execute"); + // Check if we
should auto-enter tangent mode for introspect tool if ExperimentManager::is_enabled(os, ExperimentName::TangentMode) && os @@ -2171,84 +2196,50 @@ impl ChatSession { self.conversation.enter_tangent_mode(); } - // Verify tools have permissions. - for i in 0..self.tool_uses.len() { - let tool = &mut self.tool_uses[i]; - - // Manually accepted by the user or otherwise verified already. - if tool.accepted { - continue; - } - - let mut denied_match_set = None::>; - let allowed = - self.conversation - .agents - .get_active() - .is_some_and(|a| match tool.tool.requires_acceptance(os, a) { - PermissionEvalResult::Allow => true, - PermissionEvalResult::Ask => false, - PermissionEvalResult::Deny(matches) => { - denied_match_set.replace(matches); - false - }, - }) - || self.conversation.agents.trust_all_tools; - - if let Some(match_set) = denied_match_set { - let formatted_set = match_set.into_iter().fold(String::new(), |mut acc, rule| { - acc.push_str(&format!("\n - {rule}")); - acc - }); - - execute!( - self.stderr, - style::SetForegroundColor(Color::Red), - style::Print("Command "), - style::SetForegroundColor(Color::Yellow), - style::Print(&tool.name), - style::SetForegroundColor(Color::Red), - style::Print(" is rejected because it matches one or more rules on the denied list:"), - style::Print(formatted_set), - style::Print("\n"), - style::SetForegroundColor(Color::Reset), - )?; - - return Ok(ChatState::HandleInput { - input: format!( - "Tool use with {} was rejected because the arguments supplied were forbidden", - tool.name - ), - }); - } - - if os - .database - .settings - .get_bool(Setting::ChatEnableNotifications) - .unwrap_or(false) - { - play_notification_bell(!allowed); - } - - // TODO: Control flow is hacky here because of borrow rules - let _ = tool; - self.print_tool_description(os, i, allowed).await?; - let tool = &mut self.tool_uses[i]; - - if allowed { - tool.accepted = true; - self.tool_use_telemetry_events - .entry(tool.id.clone()) - .and_modify(|ev| 
ev.is_trusted = true); - continue; + // Verify tools have permissions using pure evaluation function + let permission_results = evaluate_tool_permissions(&self.tool_uses, &self.conversation.agents, os); + + // Create permission interface for handling UI interactions + let context = permission::PermissionContext { + trust_all_tools: self.conversation.agents.trust_all_tools, + }; + + for result in permission_results { + match result { + ToolPermissionResult::Allowed { tool_index: _ } => { + // Tool is already allowed, continue + continue; + } + ToolPermissionResult::Denied { tool_index: _, tool_name, rules } => { + // Use permission interface to show denied tool + let mut permission_interface = self.create_permission_interface(os); + permission_interface.show_denied_tool(&tool_name, rules).await + .map_err(|e| ChatError::Custom(format!("Permission interface error: {e}").into()))?; + + return Ok(ChatState::HandleInput { + input: format!( + "Tool use with {} was rejected because the arguments supplied were forbidden", + tool_name + ), + }); + } + ToolPermissionResult::RequiresConfirmation { tool_index, tool_name: _ } => { + let tool = &self.tool_uses[tool_index]; + + // Use permission interface to request permission + let mut permission_interface = self.create_permission_interface(os); + let _decision = permission_interface.request_permission(tool, &context).await + .map_err(|e| ChatError::Custom(format!("Permission interface error: {e}").into()))?; + + // For now, maintain existing behavior by setting pending_tool_index + // This will be cleaned up when we fully integrate the new flow + self.pending_tool_index = Some(tool_index); + + return Ok(ChatState::PromptUser { + skip_printing_tools: false, + }); + } } - - self.pending_tool_index = Some(i); - - return Ok(ChatState::PromptUser { - skip_printing_tools: false, - }); } // All tools are allowed now @@ -3192,10 +3183,7 @@ impl ChatSession { Ok(Some(_)) => (), Ok(None) => { // User did not select a model, so reset the 
current request state. - self.conversation.enforce_conversation_invariants(); - self.conversation.reset_next_user_message(); - self.pending_tool_index = None; - self.tool_turn_start_time = None; + self.reset_user_message_and_pending_tool_use(); return Ok(ChatState::PromptUser { skip_printing_tools: false, }); @@ -4366,6 +4354,7 @@ mod tests { ); } + #[test] fn test_does_input_reference_file() { let tests = &[ diff --git a/crates/chat-cli/src/cli/chat/parser.rs b/crates/chat-cli/src/cli/chat/parser.rs index 2e0cdfb03c..eccd9b94df 100644 --- a/crates/chat-cli/src/cli/chat/parser.rs +++ b/crates/chat-cli/src/cli/chat/parser.rs @@ -725,6 +725,24 @@ fn system_time_to_unix_ms(time: SystemTime) -> u64 { #[cfg(test)] mod tests { use super::*; + use tokio::sync::mpsc; + + /// Helper function to convert Vec to mock receiver for tests + fn create_mock_receiver(mut events: Vec) -> mpsc::Receiver> { + let (tx, rx) = mpsc::channel(events.len()); + + // Spawn task to send all events + tokio::spawn(async move { + while let Some(event) = events.pop() { + if tx.send(Ok(event)).await.is_err() { + break; // Receiver dropped + } + } + // Channel closes automatically when tx is dropped + }); + + rx + } #[tokio::test] async fn test_response_parser_ignores_licensed_code() { @@ -775,7 +793,7 @@ mod tests { }, ]; events.reverse(); - let mock = SendMessageOutput::Mock(events); + let mock = SendMessageOutput::Mock(create_mock_receiver(events)); let mut parser = ResponseParser::new( mock, "".to_string(), @@ -831,7 +849,7 @@ mod tests { }, ]; events.reverse(); - let mock = SendMessageOutput::Mock(events); + let mock = SendMessageOutput::Mock(create_mock_receiver(events)); let mut parser = ResponseParser::new( mock, "".to_string(), diff --git a/crates/chat-cli/src/cli/chat/permission/console.rs b/crates/chat-cli/src/cli/chat/permission/console.rs new file mode 100644 index 0000000000..5b576eb197 --- /dev/null +++ b/crates/chat-cli/src/cli/chat/permission/console.rs @@ -0,0 +1,60 @@ +use 
async_trait::async_trait; +use eyre::Result; + +use crate::cli::chat::tools::QueuedTool; +use crate::os::Os; + +use super::{PermissionContext, PermissionDecision, PermissionInterface}; + +/// Console-based permission interface that preserves existing terminal behavior +pub struct ConsolePermissionInterface<'a> { + pub os: &'a Os, +} + +#[async_trait] +impl<'a> PermissionInterface for ConsolePermissionInterface<'a> { + async fn request_permission( + &mut self, + _tool: &QueuedTool, + _context: &PermissionContext, + ) -> Result { + // For now, return Approved to maintain existing flow + // The actual user input handling happens in the existing state machine + Ok(PermissionDecision::Approved) + } + + async fn show_denied_tool( + &mut self, + tool_name: &str, + rules: Vec, + ) -> Result<()> { + let formatted_set = rules.into_iter().fold(String::new(), |mut acc, rule| { + acc.push_str(&format!("\n - {rule}")); + acc + }); + + // We can't access stderr directly, so we'll use eprintln! for now + // This will be improved when we have better access to the output streams + eprintln!( + "{}Command {}{}{} is rejected because it matches one or more rules on the denied list:{}{}", + "\x1b[31m", // Red + "\x1b[33m", // Yellow + tool_name, + "\x1b[31m", // Red + formatted_set, + "\x1b[0m" // Reset + ); + + Ok(()) + } + + async fn show_tool_execution( + &mut self, + _tool: &QueuedTool, + _allowed: bool, + ) -> Result<()> { + // For now, this is a no-op since we don't have direct access to the streams + // This will be improved in the next iteration + Ok(()) + } +} diff --git a/crates/chat-cli/src/cli/chat/permission/mod.rs b/crates/chat-cli/src/cli/chat/permission/mod.rs new file mode 100644 index 0000000000..b58c85216c --- /dev/null +++ b/crates/chat-cli/src/cli/chat/permission/mod.rs @@ -0,0 +1,45 @@ +use async_trait::async_trait; +use eyre::Result; + +use crate::cli::chat::tools::QueuedTool; + +pub mod console; + +/// Context information for permission requests +#[derive(Debug)] 
+pub struct PermissionContext { + pub trust_all_tools: bool, +} + +/// Result of a permission request +#[derive(Debug, Clone)] +pub enum PermissionDecision { + Approved, + Rejected, + Cancelled, +} + +/// Abstraction for handling tool permission requests +#[async_trait] +pub trait PermissionInterface { + /// Request permission for a tool that requires confirmation + async fn request_permission( + &mut self, + tool: &QueuedTool, + context: &PermissionContext, + ) -> Result; + + /// Show a denied tool with explanation + async fn show_denied_tool( + &mut self, + tool_name: &str, + rules: Vec, + ) -> Result<()>; + + /// Show tool execution status (for notifications, etc.) + async fn show_tool_execution( + &mut self, + tool: &QueuedTool, + allowed: bool, + ) -> Result<()>; +} diff --git a/crates/chat-cli/src/cli/mod.rs b/crates/chat-cli/src/cli/mod.rs index 324af77121..68fd133e06 100644 --- a/crates/chat-cli/src/cli/mod.rs +++ b/crates/chat-cli/src/cli/mod.rs @@ -1,3 +1,4 @@ +mod acp; mod agent; pub mod chat; mod debug; @@ -7,6 +8,7 @@ pub mod feed; mod issue; mod mcp; mod settings; +pub mod tool; mod user; use std::fmt::Display; @@ -16,6 +18,7 @@ use std::io::{ }; use std::process::ExitCode; +use acp::AcpArgs; use agent::AgentArgs; pub use agent::{ Agent, @@ -91,6 +94,8 @@ impl OutputFormat { #[deny(missing_docs)] #[derive(Debug, PartialEq, Subcommand)] pub enum RootSubcommand { + /// Agent Client Protocol server + Acp(AcpArgs), /// Manage agents Agent(AgentArgs), /// AI assistant in your terminal @@ -159,6 +164,7 @@ impl RootSubcommand { } match self { + Self::Acp(args) => args.run(os).await, Self::Agent(args) => args.execute(os).await, Self::Diagnostic(args) => args.execute(os).await, Self::Login(args) => args.execute(os).await, @@ -183,6 +189,7 @@ impl Default for RootSubcommand { impl Display for RootSubcommand { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let name = match self { + Self::Acp(_) => "acp", Self::Agent(_) => "agent", 
Self::Chat(_) => "chat", Self::Login(_) => "login", diff --git a/crates/chat-cli/src/cli/tool/mod.rs b/crates/chat-cli/src/cli/tool/mod.rs new file mode 100644 index 0000000000..276d944058 --- /dev/null +++ b/crates/chat-cli/src/cli/tool/mod.rs @@ -0,0 +1,6 @@ +pub mod permission; + +pub use permission::{ + evaluate_tool_permissions, + ToolPermissionResult, +}; \ No newline at end of file diff --git a/crates/chat-cli/src/cli/tool/permission.rs b/crates/chat-cli/src/cli/tool/permission.rs new file mode 100644 index 0000000000..d5c4578bd6 --- /dev/null +++ b/crates/chat-cli/src/cli/tool/permission.rs @@ -0,0 +1,98 @@ +use crate::cli::agent::{PermissionEvalResult, Agents}; +use crate::cli::chat::tools::QueuedTool; +use crate::os::Os; + +/// Result of evaluating permissions for a single tool +#[derive(Debug, Clone)] +pub enum ToolPermissionResult { + /// Tool is allowed to execute without confirmation + Allowed { tool_index: usize }, + /// Tool requires user confirmation before execution + RequiresConfirmation { + tool_index: usize, + tool_name: String + }, + /// Tool is denied and should not be executed + Denied { + tool_index: usize, + tool_name: String, + rules: Vec + }, +} + +/// Context information for permission evaluation +#[derive(Debug)] +pub struct PermissionContext { + pub trust_all_tools: bool, +} + +/// Evaluates permissions for all tools in the queue +/// +/// This is pure logic that determines what should happen with each tool +/// based on agent configuration, trust settings, and tool requirements. +/// It contains no UI logic or side effects. 
+pub fn evaluate_tool_permissions( + tools: &[QueuedTool], + agents: &Agents, + os: &Os, +) -> Vec { + let context = PermissionContext { + trust_all_tools: agents.trust_all_tools, + }; + + let result = tools + .iter() + .enumerate() + .map(|(i, tool)| evaluate_single_tool_permission(i, tool, agents, os, &context)) + .collect(); + + tracing::debug!(func="evaluate_tool_permission", context = ?context, result = ?result); + + result +} + +/// Evaluates permission for a single tool +fn evaluate_single_tool_permission( + tool_index: usize, + tool: &QueuedTool, + agents: &Agents, + os: &Os, + context: &PermissionContext, +) -> ToolPermissionResult { + tracing::debug!(func="evaluate_single_tool_permission", tool = ?tool); + + // If tool is already accepted, it's allowed + if tool.accepted { + return ToolPermissionResult::Allowed { tool_index }; + } + + // Check agent-based permissions + let permission_result = agents + .get_active() + .map(|agent| tool.tool.requires_acceptance(os, agent)) + .unwrap_or(PermissionEvalResult::Ask); + + match permission_result { + PermissionEvalResult::Allow => { + ToolPermissionResult::Allowed { tool_index } + } + PermissionEvalResult::Ask => { + // Check if trust_all_tools overrides the ask + if context.trust_all_tools { + ToolPermissionResult::Allowed { tool_index } + } else { + ToolPermissionResult::RequiresConfirmation { + tool_index, + tool_name: tool.name.clone(), + } + } + } + PermissionEvalResult::Deny(rules) => { + ToolPermissionResult::Denied { + tool_index, + tool_name: tool.name.clone(), + rules, + } + } + } +} \ No newline at end of file diff --git a/crates/chat-cli/src/database/settings.rs b/crates/chat-cli/src/database/settings.rs index e83c5d4b9b..10f734d7bd 100644 --- a/crates/chat-cli/src/database/settings.rs +++ b/crates/chat-cli/src/database/settings.rs @@ -88,6 +88,8 @@ pub enum Setting { EnabledCheckpoint, #[strum(message = "Enable the delegate tool for subagent management (boolean)")] EnabledDelegate, + #[strum(message 
= "Enable Agent Client Protocol server (boolean)")] + EnabledAcp, } impl AsRef for Setting { @@ -129,6 +131,7 @@ impl AsRef for Setting { Self::EnabledCheckpoint => "chat.enableCheckpoint", Self::EnabledContextUsageIndicator => "chat.enableContextUsageIndicator", Self::EnabledDelegate => "chat.enableDelegate", + Self::EnabledAcp => "acp.enabled", } } } @@ -178,6 +181,7 @@ impl TryFrom<&str> for Setting { "chat.enableTodoList" => Ok(Self::EnabledTodoList), "chat.enableCheckpoint" => Ok(Self::EnabledCheckpoint), "chat.enableContextUsageIndicator" => Ok(Self::EnabledContextUsageIndicator), + "acp.enabled" => Ok(Self::EnabledAcp), _ => Err(DatabaseError::InvalidSetting(value.to_string())), } } diff --git a/crates/chat-cli/src/lib.rs b/crates/chat-cli/src/lib.rs index d2c155212d..cad3e9c4e4 100644 --- a/crates/chat-cli/src/lib.rs +++ b/crates/chat-cli/src/lib.rs @@ -10,9 +10,12 @@ pub mod constants; pub mod database; pub mod logging; pub mod mcp_client; +pub mod mock_llm; pub mod os; pub mod request; pub mod telemetry; pub mod util; pub use mcp_client::*; + +mod test_util; diff --git a/crates/chat-cli/src/main.rs b/crates/chat-cli/src/main.rs index b1ddb80549..afa78cf652 100644 --- a/crates/chat-cli/src/main.rs +++ b/crates/chat-cli/src/main.rs @@ -6,10 +6,12 @@ mod constants; mod database; mod logging; mod mcp_client; +mod mock_llm; mod os; mod request; mod telemetry; mod util; +mod test_util; use std::process::ExitCode; diff --git a/crates/chat-cli/src/mcp_client/messenger.rs b/crates/chat-cli/src/mcp_client/messenger.rs index 40e9bc84ca..2c2a85c65f 100644 --- a/crates/chat-cli/src/mcp_client/messenger.rs +++ b/crates/chat-cli/src/mcp_client/messenger.rs @@ -74,54 +74,4 @@ pub enum MessengerError { Custom(String), } -#[derive(Clone, Debug)] -pub struct NullMessenger; -#[async_trait::async_trait] -impl Messenger for NullMessenger { - async fn send_tools_list_result( - &self, - _result: Result, - _peer: Option>, - ) -> MessengerResult { - Ok(()) - } - - async fn 
send_prompts_list_result( - &self, - _result: Result, - _peer: Option>, - ) -> MessengerResult { - Ok(()) - } - - async fn send_resources_list_result( - &self, - _result: Result, - _peer: Option>, - ) -> MessengerResult { - Ok(()) - } - - async fn send_resource_templates_list_result( - &self, - _result: Result, - _peer: Option>, - ) -> MessengerResult { - Ok(()) - } - - async fn send_oauth_link(&self, _link: String) -> MessengerResult { - Ok(()) - } - - async fn send_init_msg(&self) -> MessengerResult { - Ok(()) - } - - fn send_deinit_msg(&self) {} - - fn duplicate(&self) -> Box { - Box::new(NullMessenger) - } -} diff --git a/crates/chat-cli/src/mock_llm.rs b/crates/chat-cli/src/mock_llm.rs new file mode 100644 index 0000000000..0172466b10 --- /dev/null +++ b/crates/chat-cli/src/mock_llm.rs @@ -0,0 +1,469 @@ +#![cfg_attr(not(test), allow(dead_code))] + +//! Mock LLM architecture for testing +//! +//! This provides a stateless per-turn mock system that matches real LLM behavior. +//! Each user message spawns a fresh mock context with full conversation history. + +use eyre::Result; +use regex::Regex; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use tokio::sync::mpsc; + +use crate::api_client::model::{ChatMessage, ChatResponseStream}; + +/// Captures from regex matching against conversation +pub type ConversationMatches = HashMap; + +/// Context for per-turn mock LLM execution with conversation history and streaming response +/// This is the main interface that test scripts interact with. 
+pub struct MockLLMContext { + conversation_history: Vec, + current_user_message: String, + tx: mpsc::Sender>, +} + +impl MockLLMContext { + async fn send_text( + tx: &mut mpsc::Sender>, + text: impl ToString, + ) -> eyre::Result<()> { + tx + .send(Ok(ChatResponseStream::AssistantResponseEvent { + content: text.to_string(), + })) + .await + .map_err(|_| eyre::eyre!("Response channel closed")) + } + + async fn send_tool_use( + tx: &mut mpsc::Sender>, + tool_use_id: String, + name: String, + input: Option, + stop: Option, + ) -> eyre::Result<()> { + tx + .send(Ok(ChatResponseStream::ToolUseEvent { + tool_use_id, + name, + input, + stop, + })) + .await + .map_err(|_| eyre::eyre!("Response channel closed")) + } + + /// Respond with text to the user. + #[allow(dead_code)] + pub async fn respond(&mut self, text: impl ToString) -> eyre::Result<()> { + Self::send_text(&mut self.tx, text).await + } + + /// Respond with a tool use + #[allow(dead_code)] + pub async fn respond_tool_use( + &mut self, + tool_use_id: String, + name: String, + input: Option, + stop: Option, + ) -> eyre::Result<()> { + Self::send_tool_use(&mut self.tx, tool_use_id, name, input, stop).await + } + + /// Count the number of user messages in the conversation (including the current one). + /// Useful for determining which predefined response to use in sequence. + #[allow(dead_code)] + pub fn count_user_messages(&self) -> usize { + // Count user messages in history + current message + let user_messages_in_history = self.conversation_history + .iter() + .filter(|msg| matches!(msg, ChatMessage::UserInputMessage(_))) + .count(); + user_messages_in_history + 1 // +1 for current message + } + + /// Get the current user message content. + /// Useful for routing based on message content in tests. 
+ #[allow(dead_code)] + pub fn current_user_message(&self) -> &str { + &self.current_user_message + } + + /// Match conversation against regex patterns and return captured groups + /// + /// # Arguments + /// * `history_patterns` - Patterns to match against conversation history messages + /// * `current_pattern` - Pattern to match against current user message + /// + /// # Returns + /// - `Ok(Some(captures))` if all patterns match, where captures contains all named groups (?P...) + /// - `Ok(None)` if patterns are valid but don't match the conversation + /// - `Err(...)` if regex compilation fails or other internal errors occur + /// + /// # Example + /// ```ignore + /// let captures = ctx.match_conversation( + /// &["assistant said (?P.+)", "user.*(?P\\w+)"], + /// "tell me about (?P.+)" + /// )?; // Propagate regex compilation errors + /// + /// if let Some(caps) = captures { + /// let query = caps.get("query").unwrap(); + /// // Use captured values... + /// } + /// ``` + pub fn match_conversation( + &self, + history_patterns: &[&str], + current_pattern: &str, + ) -> Result> { + tracing::trace!("match_conversation: current_pattern={:?}, history_patterns={:?}", current_pattern, history_patterns); + let mut all_captures = HashMap::new(); + + // Compile current message pattern + let current_regex = Regex::new(current_pattern) + .map_err(|e| eyre::eyre!("Failed to compile current message pattern '{}': {}", current_pattern, e))?; + + // Match against current user message + tracing::trace!("Matching current message {:?} against pattern {:?}", self.current_user_message, current_pattern); + if let Some(caps) = current_regex.captures(&self.current_user_message) { + tracing::trace!("Current message matched! 
Captures: {:?}", caps); + // Extract named captures from current message + for name in current_regex.capture_names().flatten() { + if let Some(m) = caps.name(name) { + all_captures.insert(name.to_string(), m.as_str().to_string()); + tracing::trace!("Captured from current message: {}={}", name, m.as_str()); + } + } + } else { + tracing::trace!("Current message did not match pattern"); + return Ok(None); // Current message doesn't match + } + + // Match history patterns against conversation history + // We need to find a subsequence in the history that matches all patterns + if history_patterns.is_empty() { + tracing::trace!("No history patterns, returning current captures"); + return Ok(Some(all_captures)); + } + + // Compile all history patterns + let history_regexes: Result, regex::Error> = + history_patterns.iter().map(|p| Regex::new(p)).collect(); + let history_regexes = history_regexes.map_err(|e| eyre::eyre!("Failed to compile history pattern: {}", e))?; + + // Convert history to strings for matching + let history_strings: Vec = self + .conversation_history + .iter() + .map(|msg| match msg { + ChatMessage::UserInputMessage(user_msg) => format!("user: {}", user_msg.content), + ChatMessage::AssistantResponseMessage(assistant_msg) => format!("assistant: {}", assistant_msg.content), + }) + .collect(); + + tracing::trace!("Formatted history strings for pattern matching:"); + for (i, hist_str) in history_strings.iter().enumerate() { + tracing::trace!(" [{}]: {:?}", i, hist_str); + } + + // Try to match all history patterns as a sequence + tracing::trace!("Attempting to match history patterns: {:?}", history_patterns); + if self.match_history_sequence(&history_strings, &history_regexes, &mut all_captures) { + tracing::trace!("History patterns matched! 
Final captures: {:?}", all_captures); + Ok(Some(all_captures)) + } else { + tracing::trace!("History patterns did not match"); + Ok(None) + } + } + + /// Helper to match history patterns as a subsequence + fn match_history_sequence( + &self, + history: &[String], + patterns: &[Regex], + captures: &mut ConversationMatches, + ) -> bool { + if patterns.is_empty() { + tracing::trace!("No patterns to match, returning true"); + return true; + } + + tracing::trace!("Matching {} patterns against {} history items", patterns.len(), history.len()); + + // Try to find starting positions where we can match the full sequence + for start_idx in 0..=(history.len().saturating_sub(patterns.len())) { + tracing::trace!("Trying to match patterns starting at history index {}", start_idx); + let mut temp_captures = HashMap::new(); + let mut matched = true; + + // Try to match each pattern in sequence starting from start_idx + for (pattern_idx, pattern) in patterns.iter().enumerate() { + let history_idx = start_idx + pattern_idx; + if history_idx >= history.len() { + tracing::trace!("History index {} out of bounds", history_idx); + matched = false; + break; + } + + let history_item = &history[history_idx]; + tracing::trace!("Matching pattern {:?} against history[{}]: {:?}", pattern.as_str(), history_idx, history_item); + + if let Some(caps) = pattern.captures(history_item) { + tracing::trace!("Pattern matched! Captures: {:?}", caps); + // Extract named captures + for name in pattern.capture_names().flatten() { + if let Some(m) = caps.name(name) { + temp_captures.insert(name.to_string(), m.as_str().to_string()); + tracing::trace!("Captured from history: {}={}", name, m.as_str()); + } + } + } else { + tracing::trace!("Pattern did not match history item"); + matched = false; + break; + } + } + + if matched { + tracing::trace!("All patterns matched starting at index {}! 
Merging captures: {:?}", start_idx, temp_captures); + // Merge temp_captures into main captures + captures.extend(temp_captures); + return true; + } else { + tracing::trace!("Not all patterns matched starting at index {}", start_idx); + } + } + + tracing::trace!("No starting position worked, sequence match failed"); + false + } + + /// Declarative pattern matching with automatic regex substitution + /// + /// Tries each pattern tuple in order until one matches, then sends response with proper + /// regex substitution using `$name` syntax for captured groups. + /// + /// # Arguments + /// * `patterns` - Array of (history_patterns, current_pattern, response_template) tuples + /// + /// # Returns + /// - `Ok(())` if any pattern matched and response was sent + /// - `Err("unexpected input")` if no patterns matched + /// - `Err(...)` if regex compilation failed or response channel closed + /// + /// # Example + /// ```ignore + /// ctx.try_patterns(&[ + /// (&[], r"(?i)hi,?\s+claude", "Hi, you! 
What's your name?"), + /// (&[r"assistant.*What's your name"], r"(?i)(?:i'm|my name is|call me)\s+(?P\w+)", "Hi $name, I'm Q!"), + /// (&[], r".*", "I didn't understand that."), // Fallback + /// ]).await?; + /// ``` + pub async fn try_patterns(&mut self, patterns: &[(&[&str], &str, &str)]) -> Result<()> { + tracing::debug!("MockLLM try_patterns called with {} patterns", patterns.len()); + tracing::debug!("Current user message: {:?}", self.current_user_message); + tracing::debug!("Conversation history length: {}", self.conversation_history.len()); + + // Show full conversation history for debugging + for (i, msg) in self.conversation_history.iter().enumerate() { + match msg { + ChatMessage::UserInputMessage(user_msg) => { + tracing::debug!("History[{}]: user: {:?}", i, user_msg.content); + } + ChatMessage::AssistantResponseMessage(assistant_msg) => { + tracing::debug!("History[{}]: assistant: {:?}", i, assistant_msg.content); + } + } + } + + for (pattern_idx, (history_patterns, current_pattern, response_template)) in patterns.iter().enumerate() { + tracing::debug!("Trying pattern {}: history_patterns={:?}, current_pattern={:?}", + pattern_idx, history_patterns, current_pattern); + + // Try to match this pattern + match self.match_conversation(history_patterns, current_pattern)? { + Some(captures) => { + tracing::debug!("Pattern {} matched! Captures: {:?}", pattern_idx, captures); + // Pattern matched! 
Do regex substitution on response template + let response = self.substitute_captures(current_pattern, &captures, response_template)?; + tracing::debug!("Generated response: {:?}", response); + + // Send the response + Self::send_text(&mut self.tx, response).await?; + + return Ok(()); // Success - matched and responded + }, + None => { + tracing::debug!("Pattern {} did not match", pattern_idx); + // This pattern didn't match, try the next one + continue; + }, + } + } + + tracing::debug!("No patterns matched, returning error"); + // No patterns matched + Err(eyre::eyre!("unexpected input")) + } + + /// Block until the response channel is canceled, sending a coordination signal first. + /// This is useful for testing cancellation scenarios where you want the mock LLM + /// to send a signal and then wait to be canceled. + /// + /// # Usage + /// ```ignore + /// // In test: start prompt and cancellation concurrently + /// let (prompt_result, _) = tokio::join!( + /// session.prompt("test message"), + /// async { + /// // Wait for LLM to signal it's ready, then cancel + /// coordination_rx.recv().await; + /// client.cancel(session_id).await; + /// } + /// ); + /// assert_eq!(prompt_result.unwrap_err().to_string(), "canceled"); + /// ``` + #[allow(dead_code)] + pub async fn block_until_canceled(&mut self, coordination_signal: &str) -> eyre::Result<()> { + // Send the coordination signal first to let the test know we're ready + self.respond(coordination_signal).await?; + + // Now block by checking if channel is closed periodically + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Check if the channel is closed + if self.tx.is_closed() { + tracing::debug!( + actor = "MockLLM", + event = "detected cancellation via closed channel", + current_user_message = self.current_user_message(), + ); + return Ok(()); + } + } + } + + /// Helper to perform regex substitution using captured groups + /// Uses proper regex substitution with $name syntax + fn 
substitute_captures(&self, pattern: &str, captures: &ConversationMatches, template: &str) -> Result { + // Create a regex to re-capture the current message for proper substitution + let regex = Regex::new(pattern) + .map_err(|e| eyre::eyre!("Failed to recompile pattern for substitution '{}': {}", pattern, e))?; + + if let Some(caps) = regex.captures(&self.current_user_message) { + // Use regex's built-in substitution which handles $name syntax properly + let mut result = String::new(); + caps.expand(template, &mut result); + Ok(result) + } else { + // Fallback to manual substitution if regex doesn't match current message + // This handles cases where captures came from history patterns + let mut result = template.to_string(); + for (name, value) in captures { + result = result.replace(&format!("${}", name), value); + result = result.replace(&format!("${{{}}}", name), value); // Also support ${name} syntax + } + Ok(result) + } + } +} + +/// Concrete implementation that wraps a closure for per-turn mock execution +pub struct MockLLM { + closure: Box Pin> + Send>> + Send + Sync + 'static>, +} + +impl std::fmt::Debug for MockLLM { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MockLLMInstance").finish() + } +} + +impl MockLLM { + pub fn new(closure: F) -> Self + where + F: Fn(crate::mock_llm::MockLLMContext) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + Self { + closure: Box::new(move |ctx| Box::pin(closure(ctx))), + } + } + + /// Spawn a task for this turn. Returns a receiver which will receive events + /// emitted by this task. If that receiver is dropped, the task will naturally + /// terminate. 
+ pub fn spawn_turn( + &self, + conversation_history: Vec, + current_user_message: String, + ) -> mpsc::Receiver> { + tracing::debug!( + actor = "MockLLM", + event = "spawn_turn", + ?current_user_message, + ?conversation_history, + ); + + // Create a fresh channel for this mock turn + // The mock script will send ResponseEvents via the context's tx + // The consumer will receive them via mock_rx + let (mock_tx, mock_rx) = mpsc::channel(32); + let mock_tx_clone = mock_tx.clone(); + + // Create context with the provided tx channel + let mock_context = MockLLMContext { + conversation_history, + current_user_message: current_user_message.clone(), + tx: mock_tx, + }; + + let future = (self.closure)(mock_context); + tokio::spawn(async move { + match future.await { + Ok(()) => { + tracing::debug!( + actor = "MockLLM", + event = "terminate", + ?current_user_message, + ); + } + Err(e) => { + // Send error on failure + let _ = mock_tx_clone.send(Err(RecvError::from(e))).await; + } + } + }); + + mock_rx + } +} + +// Error type to match existing RecvError from parser +#[derive(Debug)] +pub struct RecvError(eyre::Error); + +impl From for RecvError { + fn from(e: eyre::Error) -> Self { + RecvError(e) + } +} + +impl std::fmt::Display for RecvError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for RecvError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } +} diff --git a/crates/chat-cli/src/test_util.rs b/crates/chat-cli/src/test_util.rs new file mode 100644 index 0000000000..3a6f1fdb6c --- /dev/null +++ b/crates/chat-cli/src/test_util.rs @@ -0,0 +1,12 @@ +#![cfg(test)] + +/// Enables tracing to stderr. Useful when debugging a particular test. 
+#[allow(dead_code)] +pub fn enable_tracing() { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .with_ansi(false) + .with_writer(std::io::stderr) + .try_init() + .ok(); +} \ No newline at end of file diff --git a/crates/chat-cli/src/util/mod.rs b/crates/chat-cli/src/util/mod.rs index 48d8c94c97..db08364680 100644 --- a/crates/chat-cli/src/util/mod.rs +++ b/crates/chat-cli/src/util/mod.rs @@ -42,20 +42,7 @@ pub enum UtilError { Json(#[from] serde_json::Error), } -#[derive(Debug, Clone)] -pub struct UnknownDesktopErrContext { - xdg_current_desktop: String, - xdg_session_desktop: String, - gdm_session: String, -} -impl std::fmt::Display for UnknownDesktopErrContext { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "XDG_CURRENT_DESKTOP: `{}`, ", self.xdg_current_desktop)?; - write!(f, "XDG_SESSION_DESKTOP: `{}`, ", self.xdg_session_desktop)?; - write!(f, "GDMSESSION: `{}`", self.gdm_session) - } -} pub fn choose(prompt: impl Display, options: &[impl ToString]) -> Result> { if options.is_empty() {