From c76cdc1177a149c7550cfdf3b2b61bdff57562fc Mon Sep 17 00:00:00 2001 From: dannytodtenhoefer Date: Fri, 28 Nov 2025 09:52:56 +0100 Subject: [PATCH 1/6] fix(workflows): pass engine option through resolveModule resolveModule was not passing the engine option from workflow overrides or module config, causing workflows to fall back to default engine instead of using the specified one. This aligns resolveModule with resolveStep which already handles engine correctly. --- src/workflows/utils/resolvers/module.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workflows/utils/resolvers/module.ts b/src/workflows/utils/resolvers/module.ts index ff48ec01..99698ddb 100644 --- a/src/workflows/utils/resolvers/module.ts +++ b/src/workflows/utils/resolvers/module.ts @@ -42,6 +42,7 @@ export function resolveModule(id: string, overrides: ModuleOverrides = {}): Work const promptPath = overrides.promptPath ?? moduleEntry.promptPath; const model = overrides.model ?? moduleEntry.model; const modelReasoningEffort = overrides.modelReasoningEffort ?? moduleEntry.modelReasoningEffort; + const engine = overrides.engine ?? 
moduleEntry.engine; if (typeof promptPath !== 'string' || !promptPath.trim()) { throw new Error(`Module ${id} is missing a promptPath configuration.`); @@ -56,6 +57,7 @@ export function resolveModule(id: string, overrides: ModuleOverrides = {}): Work promptPath, model, modelReasoningEffort, + engine, module: { id: moduleEntry.id, behavior, From f88cfcc6925fb7e6789d3d353553bdcfcd4302ec Mon Sep 17 00:00:00 2001 From: Sam Tregar Date: Sun, 30 Nov 2025 12:24:33 -0800 Subject: [PATCH 2/6] fix(init): correct branch name typo codemacine -> codemachine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #43 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- prompts/templates/dev-codemachine/main-agents/00-init.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/prompts/templates/dev-codemachine/main-agents/00-init.md b/prompts/templates/dev-codemachine/main-agents/00-init.md index 31c28918..edc53b9d 100644 --- a/prompts/templates/dev-codemachine/main-agents/00-init.md +++ b/prompts/templates/dev-codemachine/main-agents/00-init.md @@ -2,10 +2,10 @@ **Task:** -1. **Ensure the current branch is `codemacine/dev` by following this specific logic:** - * First, check if the *current* branch is already `codemacine/dev`. If it is, this step is complete. - * If not, check if a branch named `codemacine/dev` *already exists*. If it does, switch to it. - * If it does not exist, create it as a new branch and switch to it (e.g., `git checkout -b codemacine/dev`). +1. **Ensure the current branch is `codemachine/dev` by following this specific logic:** + * First, check if the *current* branch is already `codemachine/dev`. If it is, this step is complete. + * If not, check if a branch named `codemachine/dev` *already exists*. If it does, switch to it. + * If it does not exist, create it as a new branch and switch to it (e.g., `git checkout -b codemachine/dev`). 2. 
**Append the following lines to the `.gitignore` file, skipping any that already exist:** ``` From 31b97d619f13bac9cac1269268ca139999f440ce Mon Sep 17 00:00:00 2001 From: moazbuilds <109697407+moazbuilds@users.noreply.github.com> Date: Mon, 1 Dec 2025 02:02:38 +0200 Subject: [PATCH 3/6] Update README.md with updated discord url --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8d80cb2f..4dde2659 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ > **⚠️ Early Development Notice:** This project is in early development and is not yet ready for production use. Features may change, break, or be incomplete.

- + Join our Discord    From 46a271519c692370ec12a3344a8b51e57714f845 Mon Sep 17 00:00:00 2001 From: Nick Roth Date: Thu, 4 Dec 2025 09:52:50 -0600 Subject: [PATCH 4/6] feat: add agent field to workflow steps for OpenCode --agent flag Enables workflow step definitions to specify which OpenCode agent configuration to use via the --agent flag. The infrastructure for --agent was already in place in the OpenCode executor layer, this change plumbs it through from workflows. Changes: - Add agent?: string to ModuleStep (workflow step config) - Add agent?: string to EngineRunOptions (engine interface) - Add agent?: string to ExecuteAgentOptions (runner interface) - Pass step.agent through executeStep() -> executeAgent() -> engine.run() The agent name flows: workflow.js step.agent -> executeStep() -> executeAgent() -> engine.run() -> runOpenCode() -> buildOpenCodeRunCommand() -> 'opencode run --agent ' --- src/agents/runner/runner.ts | 677 ++++++++++++++++--------------- src/infra/engines/core/types.ts | 58 +-- src/workflows/execution/step.ts | 147 +++---- src/workflows/templates/types.ts | 89 ++-- 4 files changed, 515 insertions(+), 456 deletions(-) diff --git a/src/agents/runner/runner.ts b/src/agents/runner/runner.ts index c50bd02b..1fed5516 100644 --- a/src/agents/runner/runner.ts +++ b/src/agents/runner/runner.ts @@ -1,14 +1,17 @@ -import * as path from 'node:path'; - -import type { EngineType } from '../../infra/engines/index.js'; -import { getEngine } from '../../infra/engines/index.js'; -import { MemoryAdapter } from '../../infra/fs/memory-adapter.js'; -import { MemoryStore } from '../index.js'; -import { loadAgentConfig } from './config.js'; -import { AgentMonitorService, AgentLoggerService } from '../monitoring/index.js'; -import type { ParsedTelemetry } from '../../infra/engines/core/types.js'; -import { formatForLogFile } from '../../shared/formatters/logFileFormatter.js'; -import { info, error } from '../../shared/logging/logger.js'; +import * as path from 
"node:path"; + +import type { EngineType } from "../../infra/engines/index.js"; +import { getEngine } from "../../infra/engines/index.js"; +import { MemoryAdapter } from "../../infra/fs/memory-adapter.js"; +import { MemoryStore } from "../index.js"; +import { loadAgentConfig } from "./config.js"; +import { + AgentMonitorService, + AgentLoggerService, +} from "../monitoring/index.js"; +import type { ParsedTelemetry } from "../../infra/engines/core/types.js"; +import { formatForLogFile } from "../../shared/formatters/logFileFormatter.js"; +import { info, error } from "../../shared/logging/logger.js"; /** * Cache for engine authentication status with TTL (shared across all subagents) @@ -16,29 +19,33 @@ import { info, error } from '../../shared/logging/logger.js'; * CRITICAL: This fixes the 5-minute delay bug when spawning multiple subagents */ class EngineAuthCache { - private cache: Map = new Map(); - private ttlMs: number = 5 * 60 * 1000; // 5 minutes TTL - - async isAuthenticated(engineId: string, checkFn: () => Promise): Promise { - const cached = this.cache.get(engineId); - const now = Date.now(); - - // Return cached value if still valid - if (cached && (now - cached.timestamp) < this.ttlMs) { - return cached.isAuthenticated; - } - - // Cache miss or expired - perform actual check - const result = await checkFn(); - - // Cache the result - this.cache.set(engineId, { - isAuthenticated: result, - timestamp: now - }); - - return result; - } + private cache: Map = + new Map(); + private ttlMs: number = 5 * 60 * 1000; // 5 minutes TTL + + async isAuthenticated( + engineId: string, + checkFn: () => Promise, + ): Promise { + const cached = this.cache.get(engineId); + const now = Date.now(); + + // Return cached value if still valid + if (cached && now - cached.timestamp < this.ttlMs) { + return cached.isAuthenticated; + } + + // Cache miss or expired - perform actual check + const result = await checkFn(); + + // Cache the result + this.cache.set(engineId, { + 
isAuthenticated: result, + timestamp: now, + }); + + return result; + } } // Global auth cache instance (shared across all subagent executions) @@ -48,103 +55,108 @@ const authCache = new EngineAuthCache(); * Minimal UI interface for agent execution */ export interface AgentExecutionUI { - registerMonitoringId(uiAgentId: string, monitoringAgentId: number): void; + registerMonitoringId(uiAgentId: string, monitoringAgentId: number): void; } export interface ExecuteAgentOptions { - /** - * Engine to use (overrides agent config) - */ - engine?: EngineType; - - /** - * Model to use (overrides agent config) - */ - model?: string; - - /** - * Working directory for execution - */ - workingDir: string; - - /** - * Project root for config lookup (defaults to workingDir) - */ - projectRoot?: string; - - /** - * Logger for stdout - */ - logger?: (chunk: string) => void; - - /** - * Logger for stderr - */ - stderrLogger?: (chunk: string) => void; - - /** - * Telemetry callback (for UI updates) - */ - onTelemetry?: (telemetry: ParsedTelemetry) => void; - - /** - * Abort signal - */ - abortSignal?: AbortSignal; - - /** - * Timeout in milliseconds - */ - timeout?: number; - - /** - * Parent agent ID (for tracking parent-child relationships) - */ - parentId?: number; - - /** - * Disable monitoring (for special cases where monitoring is not desired) - */ - disableMonitoring?: boolean; - - /** - * UI manager (for registering monitoring IDs) - */ - ui?: AgentExecutionUI; - - /** - * Unique agent ID for UI (for registering monitoring IDs) - */ - uniqueAgentId?: string; - - /** - * Display prompt (for logging/monitoring - shows user's actual request) - * If not provided, uses the full execution prompt - */ - displayPrompt?: string; + /** + * Engine to use (overrides agent config) + */ + engine?: EngineType; + + /** + * Model to use (overrides agent config) + */ + model?: string; + + /** + * Working directory for execution + */ + workingDir: string; + + /** + * Project root for config 
lookup (defaults to workingDir) + */ + projectRoot?: string; + + /** + * Logger for stdout + */ + logger?: (chunk: string) => void; + + /** + * Logger for stderr + */ + stderrLogger?: (chunk: string) => void; + + /** + * Telemetry callback (for UI updates) + */ + onTelemetry?: (telemetry: ParsedTelemetry) => void; + + /** + * Abort signal + */ + abortSignal?: AbortSignal; + + /** + * Timeout in milliseconds + */ + timeout?: number; + + /** + * Parent agent ID (for tracking parent-child relationships) + */ + parentId?: number; + + /** + * Disable monitoring (for special cases where monitoring is not desired) + */ + disableMonitoring?: boolean; + + /** + * UI manager (for registering monitoring IDs) + */ + ui?: AgentExecutionUI; + + /** + * Unique agent ID for UI (for registering monitoring IDs) + */ + uniqueAgentId?: string; + + /** + * Display prompt (for logging/monitoring - shows user's actual request) + * If not provided, uses the full execution prompt + */ + displayPrompt?: string; + + /** + * Agent name for engines that support agent selection (e.g., OpenCode --agent flag) + */ + agent?: string; } /** * Ensures the engine is authenticated */ async function ensureEngineAuth(engineType: EngineType): Promise { - const { registry } = await import('../../infra/engines/index.js'); - const engine = registry.get(engineType); - - if (!engine) { - const availableEngines = registry.getAllIds().join(', '); - throw new Error( - `Unknown engine type: ${engineType}. 
Available engines: ${availableEngines}` - ); - } - - const isAuthed = await engine.auth.isAuthenticated(); - if (!isAuthed) { - console.error(`\n${engine.metadata.name} authentication required`); - console.error(`\nRun the following command to authenticate:\n`); - console.error(` codemachine auth login\n`); - throw new Error(`${engine.metadata.name} authentication required`); - } + const { registry } = await import("../../infra/engines/index.js"); + const engine = registry.get(engineType); + + if (!engine) { + const availableEngines = registry.getAllIds().join(", "); + throw new Error( + `Unknown engine type: ${engineType}. Available engines: ${availableEngines}`, + ); + } + + const isAuthed = await engine.auth.isAuthenticated(); + if (!isAuthed) { + console.error(`\n${engine.metadata.name} authentication required`); + console.error(`\nRun the following command to authenticate:\n`); + console.error(` codemachine auth login\n`); + throw new Error(`${engine.metadata.name} authentication required`); + } } /** @@ -172,204 +184,233 @@ async function ensureEngineAuth(engineType: EngineType): Promise { * - CLI commands (via orchestration) */ export interface AgentExecutionOutput { - output: string; - agentId?: number; + output: string; + agentId?: number; } export async function executeAgent( - agentId: string, - prompt: string, - options: ExecuteAgentOptions, + agentId: string, + prompt: string, + options: ExecuteAgentOptions, ): Promise { - const { workingDir, projectRoot, engine: engineOverride, model: modelOverride, logger, stderrLogger, onTelemetry, abortSignal, timeout, parentId, disableMonitoring, ui, uniqueAgentId, displayPrompt } = options; - - // Load agent config to determine engine and model - const agentConfig = await loadAgentConfig(agentId, projectRoot ?? 
workingDir); - - // Determine engine: CLI override > agent config > first authenticated engine - const { registry } = await import('../../infra/engines/index.js'); - let engineType: EngineType; - - if (engineOverride) { - engineType = engineOverride; - } else if (agentConfig.engine) { - engineType = agentConfig.engine; - } else { - // Fallback: find first authenticated engine by order (WITH CACHING - critical for subagents) - const engines = registry.getAll(); - let foundEngine = null; - - for (const engine of engines) { - // Use cached auth check to avoid 10-30 second delays per subagent - const isAuth = await authCache.isAuthenticated( - engine.metadata.id, - () => engine.auth.isAuthenticated() - ); - if (isAuth) { - foundEngine = engine; - break; - } - } - - if (!foundEngine) { - // If no authenticated engine, use default (first by order) - foundEngine = registry.getDefault(); - } - - if (!foundEngine) { - throw new Error('No engines registered. Please install at least one engine.'); - } - - engineType = foundEngine.metadata.id; - info(`No engine specified for agent '${agentId}', using ${foundEngine.metadata.name} (${engineType})`); - } - - // Ensure authentication - await ensureEngineAuth(engineType); - - // Get engine module for defaults - const engineModule = registry.get(engineType); - if (!engineModule) { - throw new Error(`Engine not found: ${engineType}`); - } - - // Model resolution: CLI override > agent config (legacy) > engine default - const model = modelOverride ?? (agentConfig.model as string | undefined) ?? engineModule.metadata.defaultModel; - const modelReasoningEffort = (agentConfig.modelReasoningEffort as 'low' | 'medium' | 'high' | undefined) ?? engineModule.metadata.defaultModelReasoningEffort; - - // Initialize monitoring with engine/model info (unless explicitly disabled) - const monitor = !disableMonitoring ? AgentMonitorService.getInstance() : null; - const loggerService = !disableMonitoring ? 
AgentLoggerService.getInstance() : null; - let monitoringAgentId: number | undefined; - - if (monitor && loggerService) { - // For registration: use displayPrompt (short user request) if provided, otherwise full prompt - const promptForDisplay = displayPrompt || prompt; - monitoringAgentId = await monitor.register({ - name: agentId, - prompt: promptForDisplay, // This gets truncated in monitor for memory efficiency - parentId, - engine: engineType, - engineProvider: engineType, - modelName: model, - }); - - // Store FULL prompt for debug mode logging (not the display prompt) - // In debug mode, we want to see the complete composite prompt with template + input files - loggerService.storeFullPrompt(monitoringAgentId, prompt); - - // Register monitoring ID with UI immediately so it can load logs - if (ui && uniqueAgentId && monitoringAgentId !== undefined) { - ui.registerMonitoringId(uniqueAgentId, monitoringAgentId); - } - } - - // Set up memory - const memoryDir = path.resolve(workingDir, '.codemachine', 'memory'); - const adapter = new MemoryAdapter(memoryDir); - const store = new MemoryStore(adapter); - - // Get engine and execute - // NOTE: Prompt is already complete - no template loading or building here - const engine = getEngine(engineType); - - let totalStdout = ''; - - try { - const result = await engine.run({ - prompt, // Already complete and ready to use - workingDir, - model, - modelReasoningEffort, - env: { - ...process.env, - // Pass parent agent ID to child processes (for orchestration context) - ...(monitoringAgentId !== undefined && { - CODEMACHINE_PARENT_AGENT_ID: monitoringAgentId.toString() - }) - }, - onData: (chunk) => { - totalStdout += chunk; - - // Dual-stream: write to log file (with status text) AND original logger (with colors) - if (loggerService && monitoringAgentId !== undefined) { - // Transform color markers to status text for log file readability - const logChunk = formatForLogFile(chunk); - loggerService.write(monitoringAgentId, 
logChunk); - } - - // Keep original format with color markers for UI display - if (logger) { - logger(chunk); - } else { - try { - process.stdout.write(chunk); - } catch { - // ignore streaming failures - } - } - }, - onErrorData: (chunk) => { - // Also log stderr to file (with status text transformation) - if (loggerService && monitoringAgentId !== undefined) { - const logChunk = formatForLogFile(chunk); - loggerService.write(monitoringAgentId, `[STDERR] ${logChunk}`); - } - - if (stderrLogger) { - stderrLogger(chunk); - } else { - try { - process.stderr.write(chunk); - } catch { - // ignore streaming failures - } - } - }, - onTelemetry: (telemetry) => { - // Update telemetry in monitoring (fire and forget - don't block streaming) - if (monitor && monitoringAgentId !== undefined) { - monitor.updateTelemetry(monitoringAgentId, telemetry).catch(err => - error(`Failed to update telemetry: ${err}`) - ); - } - - // Forward to caller's telemetry callback (for UI updates) - if (onTelemetry) { - onTelemetry(telemetry); - } - }, - abortSignal, - timeout, - }); - - // Store output in memory - const stdout = result.stdout || totalStdout; - const slice = stdout.slice(-2000); - await store.append({ - agentId, - content: slice, - timestamp: new Date().toISOString(), - }); - - // Mark agent as completed - if (monitor && monitoringAgentId !== undefined) { - await monitor.complete(monitoringAgentId); - // Note: Don't close stream here - workflow may write more messages - // Streams will be closed by cleanup handlers or monitoring service shutdown - } - - return { - output: stdout, - agentId: monitoringAgentId - }; - } catch (error) { - // Mark agent as failed - if (monitor && monitoringAgentId !== undefined) { - await monitor.fail(monitoringAgentId, error as Error); - // Note: Don't close stream here - workflow may write more messages - // Streams will be closed by cleanup handlers or monitoring service shutdown - } - throw error; - } + const { + workingDir, + projectRoot, + 
engine: engineOverride, + model: modelOverride, + logger, + stderrLogger, + onTelemetry, + abortSignal, + timeout, + parentId, + disableMonitoring, + ui, + uniqueAgentId, + displayPrompt, + } = options; + + // Load agent config to determine engine and model + const agentConfig = await loadAgentConfig(agentId, projectRoot ?? workingDir); + + // Determine engine: CLI override > agent config > first authenticated engine + const { registry } = await import("../../infra/engines/index.js"); + let engineType: EngineType; + + if (engineOverride) { + engineType = engineOverride; + } else if (agentConfig.engine) { + engineType = agentConfig.engine; + } else { + // Fallback: find first authenticated engine by order (WITH CACHING - critical for subagents) + const engines = registry.getAll(); + let foundEngine = null; + + for (const engine of engines) { + // Use cached auth check to avoid 10-30 second delays per subagent + const isAuth = await authCache.isAuthenticated(engine.metadata.id, () => + engine.auth.isAuthenticated(), + ); + if (isAuth) { + foundEngine = engine; + break; + } + } + + if (!foundEngine) { + // If no authenticated engine, use default (first by order) + foundEngine = registry.getDefault(); + } + + if (!foundEngine) { + throw new Error( + "No engines registered. Please install at least one engine.", + ); + } + + engineType = foundEngine.metadata.id; + info( + `No engine specified for agent '${agentId}', using ${foundEngine.metadata.name} (${engineType})`, + ); + } + + // Ensure authentication + await ensureEngineAuth(engineType); + + // Get engine module for defaults + const engineModule = registry.get(engineType); + if (!engineModule) { + throw new Error(`Engine not found: ${engineType}`); + } + + // Model resolution: CLI override > agent config (legacy) > engine default + const model = + modelOverride ?? + (agentConfig.model as string | undefined) ?? 
+ engineModule.metadata.defaultModel; + const modelReasoningEffort = + (agentConfig.modelReasoningEffort as + | "low" + | "medium" + | "high" + | undefined) ?? engineModule.metadata.defaultModelReasoningEffort; + + // Initialize monitoring with engine/model info (unless explicitly disabled) + const monitor = !disableMonitoring ? AgentMonitorService.getInstance() : null; + const loggerService = !disableMonitoring + ? AgentLoggerService.getInstance() + : null; + let monitoringAgentId: number | undefined; + + if (monitor && loggerService) { + // For registration: use displayPrompt (short user request) if provided, otherwise full prompt + const promptForDisplay = displayPrompt || prompt; + monitoringAgentId = await monitor.register({ + name: agentId, + prompt: promptForDisplay, // This gets truncated in monitor for memory efficiency + parentId, + engine: engineType, + engineProvider: engineType, + modelName: model, + }); + + // Store FULL prompt for debug mode logging (not the display prompt) + // In debug mode, we want to see the complete composite prompt with template + input files + loggerService.storeFullPrompt(monitoringAgentId, prompt); + + // Register monitoring ID with UI immediately so it can load logs + if (ui && uniqueAgentId && monitoringAgentId !== undefined) { + ui.registerMonitoringId(uniqueAgentId, monitoringAgentId); + } + } + + // Set up memory + const memoryDir = path.resolve(workingDir, ".codemachine", "memory"); + const adapter = new MemoryAdapter(memoryDir); + const store = new MemoryStore(adapter); + + // Get engine and execute + // NOTE: Prompt is already complete - no template loading or building here + const engine = getEngine(engineType); + + let totalStdout = ""; + + try { + const result = await engine.run({ + prompt, // Already complete and ready to use + workingDir, + model, + modelReasoningEffort, + agent: options.agent, // Pass agent name for engines that support it (e.g., OpenCode --agent) + env: { + ...process.env, + // Pass parent 
agent ID to child processes (for orchestration context) + ...(monitoringAgentId !== undefined && { + CODEMACHINE_PARENT_AGENT_ID: monitoringAgentId.toString(), + }), + }, + onData: (chunk) => { + totalStdout += chunk; + + // Dual-stream: write to log file (with status text) AND original logger (with colors) + if (loggerService && monitoringAgentId !== undefined) { + // Transform color markers to status text for log file readability + const logChunk = formatForLogFile(chunk); + loggerService.write(monitoringAgentId, logChunk); + } + + // Keep original format with color markers for UI display + if (logger) { + logger(chunk); + } else { + try { + process.stdout.write(chunk); + } catch { + // ignore streaming failures + } + } + }, + onErrorData: (chunk) => { + // Also log stderr to file (with status text transformation) + if (loggerService && monitoringAgentId !== undefined) { + const logChunk = formatForLogFile(chunk); + loggerService.write(monitoringAgentId, `[STDERR] ${logChunk}`); + } + + if (stderrLogger) { + stderrLogger(chunk); + } else { + try { + process.stderr.write(chunk); + } catch { + // ignore streaming failures + } + } + }, + onTelemetry: (telemetry) => { + // Update telemetry in monitoring (fire and forget - don't block streaming) + if (monitor && monitoringAgentId !== undefined) { + monitor + .updateTelemetry(monitoringAgentId, telemetry) + .catch((err) => error(`Failed to update telemetry: ${err}`)); + } + + // Forward to caller's telemetry callback (for UI updates) + if (onTelemetry) { + onTelemetry(telemetry); + } + }, + abortSignal, + timeout, + }); + + // Store output in memory + const stdout = result.stdout || totalStdout; + const slice = stdout.slice(-2000); + await store.append({ + agentId, + content: slice, + timestamp: new Date().toISOString(), + }); + + // Mark agent as completed + if (monitor && monitoringAgentId !== undefined) { + await monitor.complete(monitoringAgentId); + // Note: Don't close stream here - workflow may write more 
messages + // Streams will be closed by cleanup handlers or monitoring service shutdown + } + + return { + output: stdout, + agentId: monitoringAgentId, + }; + } catch (error) { + // Mark agent as failed + if (monitor && monitoringAgentId !== undefined) { + await monitor.fail(monitoringAgentId, error as Error); + // Note: Don't close stream here - workflow may write more messages + // Streams will be closed by cleanup handlers or monitoring service shutdown + } + throw error; + } } diff --git a/src/infra/engines/core/types.ts b/src/infra/engines/core/types.ts index c7ceafc4..061e052d 100644 --- a/src/infra/engines/core/types.ts +++ b/src/infra/engines/core/types.ts @@ -9,36 +9,40 @@ export type EngineType = string; export interface ParsedTelemetry { - tokensIn: number; - tokensOut: number; - cached?: number; - cost?: number; - duration?: number; - cacheCreationTokens?: number; - cacheReadTokens?: number; + tokensIn: number; + tokensOut: number; + cached?: number; + cost?: number; + duration?: number; + cacheCreationTokens?: number; + cacheReadTokens?: number; } export interface EngineRunOptions { - prompt: string; - workingDir: string; - model?: string; - modelReasoningEffort?: 'low' | 'medium' | 'high'; - env?: NodeJS.ProcessEnv; - onData?: (chunk: string) => void; - onErrorData?: (chunk: string) => void; - onTelemetry?: (telemetry: ParsedTelemetry) => void; - abortSignal?: AbortSignal; - timeout?: number; + prompt: string; + workingDir: string; + model?: string; + modelReasoningEffort?: "low" | "medium" | "high"; + /** + * Agent name for engines that support agent selection (e.g., OpenCode --agent flag) + */ + agent?: string; + env?: NodeJS.ProcessEnv; + onData?: (chunk: string) => void; + onErrorData?: (chunk: string) => void; + onTelemetry?: (telemetry: ParsedTelemetry) => void; + abortSignal?: AbortSignal; + timeout?: number; } export interface EngineRunResult { - stdout: string; - stderr: string; + stdout: string; + stderr: string; } export interface Engine { 
- type: EngineType; - run(options: EngineRunOptions): Promise; + type: EngineType; + run(options: EngineRunOptions): Promise; } /** @@ -46,7 +50,7 @@ export interface Engine { * Basic validation - registry-specific validation should be done at runtime */ export function isValidEngineType(type: string): boolean { - return typeof type === 'string' && type.length > 0; + return typeof type === "string" && type.length > 0; } /** @@ -54,8 +58,10 @@ export function isValidEngineType(type: string): boolean { * Note: This basic validation can be enhanced when registry access is needed */ export function normalizeEngineType(type: string): string { - if (isValidEngineType(type)) { - return type; - } - throw new Error(`Invalid engine type "${type}". Engine type must be a non-empty string.`); + if (isValidEngineType(type)) { + return type; + } + throw new Error( + `Invalid engine type "${type}". Engine type must be a non-empty string.`, + ); } diff --git a/src/workflows/execution/step.ts b/src/workflows/execution/step.ts index 818f7275..b56da18a 100644 --- a/src/workflows/execution/step.ts +++ b/src/workflows/execution/step.ts @@ -1,35 +1,35 @@ -import * as path from 'node:path'; -import { readFile, mkdir } from 'node:fs/promises'; -import type { WorkflowStep } from '../templates/index.js'; -import { isModuleStep } from '../templates/types.js'; -import type { EngineType } from '../../infra/engines/index.js'; -import { processPromptString } from '../../shared/prompts/index.js'; -import { executeAgent } from '../../agents/runner/runner.js'; -import type { WorkflowUIManager } from '../../ui/index.js'; +import * as path from "node:path"; +import { readFile, mkdir } from "node:fs/promises"; +import type { WorkflowStep } from "../templates/index.js"; +import { isModuleStep } from "../templates/types.js"; +import type { EngineType } from "../../infra/engines/index.js"; +import { processPromptString } from "../../shared/prompts/index.js"; +import { executeAgent } from 
"../../agents/runner/runner.js"; +import type { WorkflowUIManager } from "../../ui/index.js"; export interface StepExecutorOptions { - logger: (chunk: string) => void; - stderrLogger: (chunk: string) => void; - timeout?: number; - ui?: WorkflowUIManager; - abortSignal?: AbortSignal; - /** Parent agent ID for tracking relationships */ - parentId?: number; - /** Disable monitoring (for special cases) */ - disableMonitoring?: boolean; - /** Unique agent ID for UI updates (includes step index) */ - uniqueAgentId?: string; + logger: (chunk: string) => void; + stderrLogger: (chunk: string) => void; + timeout?: number; + ui?: WorkflowUIManager; + abortSignal?: AbortSignal; + /** Parent agent ID for tracking relationships */ + parentId?: number; + /** Disable monitoring (for special cases) */ + disableMonitoring?: boolean; + /** Unique agent ID for UI updates (includes step index) */ + uniqueAgentId?: string; } async function ensureProjectScaffold(cwd: string): Promise { - const agentsDir = path.resolve(cwd, '.codemachine', 'agents'); - const planDir = path.resolve(cwd, '.codemachine', 'plan'); - await mkdir(agentsDir, { recursive: true }); - await mkdir(planDir, { recursive: true }); + const agentsDir = path.resolve(cwd, ".codemachine", "agents"); + const planDir = path.resolve(cwd, ".codemachine", "plan"); + await mkdir(agentsDir, { recursive: true }); + await mkdir(planDir, { recursive: true }); } async function runAgentsBuilderStep(cwd: string): Promise { - await ensureProjectScaffold(cwd); + await ensureProjectScaffold(cwd); } /** @@ -39,60 +39,63 @@ async function runAgentsBuilderStep(cwd: string): Promise { * after building the prompt. No duplication with runner.ts anymore. 
*/ export async function executeStep( - step: WorkflowStep, - cwd: string, - options: StepExecutorOptions, + step: WorkflowStep, + cwd: string, + options: StepExecutorOptions, ): Promise { - // Only module steps can be executed - if (!isModuleStep(step)) { - throw new Error('Only module steps can be executed'); - } + // Only module steps can be executed + if (!isModuleStep(step)) { + throw new Error("Only module steps can be executed"); + } - // Load and process the prompt template - const promptPath = path.isAbsolute(step.promptPath) - ? step.promptPath - : path.resolve(cwd, step.promptPath); - const rawPrompt = await readFile(promptPath, 'utf8'); - const prompt = await processPromptString(rawPrompt, cwd); + // Load and process the prompt template + const promptPath = path.isAbsolute(step.promptPath) + ? step.promptPath + : path.resolve(cwd, step.promptPath); + const rawPrompt = await readFile(promptPath, "utf8"); + const prompt = await processPromptString(rawPrompt, cwd); - // Use environment variable or default to 30 minutes (1800000ms) - const timeout = - options.timeout ?? - (process.env.CODEMACHINE_AGENT_TIMEOUT - ? Number.parseInt(process.env.CODEMACHINE_AGENT_TIMEOUT, 10) - : 1800000); + // Use environment variable or default to 30 minutes (1800000ms) + const timeout = + options.timeout ?? + (process.env.CODEMACHINE_AGENT_TIMEOUT + ? 
Number.parseInt(process.env.CODEMACHINE_AGENT_TIMEOUT, 10) + : 1800000); - // Determine engine: step override > default - const engineType: EngineType | undefined = step.engine; + // Determine engine: step override > default + const engineType: EngineType | undefined = step.engine; - // Execute via the unified execution runner - // Runner handles: auth, monitoring, engine execution, memory storage - const result = await executeAgent(step.agentId, prompt, { - workingDir: cwd, - engine: engineType, - model: step.model, - logger: options.logger, - stderrLogger: options.stderrLogger, - onTelemetry: options.ui && options.uniqueAgentId - ? (telemetry) => options.ui!.updateAgentTelemetry(options.uniqueAgentId!, telemetry) - : undefined, - parentId: options.parentId, - disableMonitoring: options.disableMonitoring, - abortSignal: options.abortSignal, - timeout, - ui: options.ui, - uniqueAgentId: options.uniqueAgentId, - }); + // Execute via the unified execution runner + // Runner handles: auth, monitoring, engine execution, memory storage + const result = await executeAgent(step.agentId, prompt, { + workingDir: cwd, + engine: engineType, + model: step.model, + agent: step.agent, // Pass OpenCode agent name for --agent flag + logger: options.logger, + stderrLogger: options.stderrLogger, + onTelemetry: + options.ui && options.uniqueAgentId + ? 
(telemetry) => + options.ui!.updateAgentTelemetry(options.uniqueAgentId!, telemetry) + : undefined, + parentId: options.parentId, + disableMonitoring: options.disableMonitoring, + abortSignal: options.abortSignal, + timeout, + ui: options.ui, + uniqueAgentId: options.uniqueAgentId, + }); - // Run special post-execution steps - const agentName = step.agentName.toLowerCase(); - if (step.agentId === 'agents-builder' || agentName.includes('builder')) { - await runAgentsBuilderStep(cwd); - } + // Run special post-execution steps + const agentName = step.agentName.toLowerCase(); + if (step.agentId === "agents-builder" || agentName.includes("builder")) { + await runAgentsBuilderStep(cwd); + } - // NOTE: Telemetry is already updated via onTelemetry callback during streaming execution. - // DO NOT parse from final output - it would match the FIRST telemetry line (early/wrong values) - // instead of the LAST telemetry line (final/correct values), causing incorrect UI display. + // NOTE: Telemetry is already updated via onTelemetry callback during streaming execution. + // DO NOT parse from final output - it would match the FIRST telemetry line (early/wrong values) + // instead of the LAST telemetry line (final/correct values), causing incorrect UI display. 
- return result.output; + return result.output; } diff --git a/src/workflows/templates/types.ts b/src/workflows/templates/types.ts index 74b68925..30a5468e 100644 --- a/src/workflows/templates/types.ts +++ b/src/workflows/templates/types.ts @@ -1,48 +1,57 @@ export type UnknownRecord = Record; export interface LoopModuleBehavior { - type: 'loop'; - action: 'stepBack'; - steps: number; - trigger?: string; // Optional: behavior now controlled via .codemachine/memory/behavior.json - maxIterations?: number; - skip?: string[]; + type: "loop"; + action: "stepBack"; + steps: number; + trigger?: string; // Optional: behavior now controlled via .codemachine/memory/behavior.json + maxIterations?: number; + skip?: string[]; } export interface TriggerModuleBehavior { - type: 'trigger'; - action: 'mainAgentCall'; - triggerAgentId: string; // Agent ID to trigger + type: "trigger"; + action: "mainAgentCall"; + triggerAgentId: string; // Agent ID to trigger } export interface CheckpointModuleBehavior { - type: 'checkpoint'; - action: 'evaluate'; + type: "checkpoint"; + action: "evaluate"; } -export type ModuleBehavior = LoopModuleBehavior | TriggerModuleBehavior | CheckpointModuleBehavior; +export type ModuleBehavior = + | LoopModuleBehavior + | TriggerModuleBehavior + | CheckpointModuleBehavior; export interface ModuleMetadata { - id: string; - behavior?: ModuleBehavior; + id: string; + behavior?: ModuleBehavior; } export interface ModuleStep { - type: 'module'; - agentId: string; - agentName: string; - promptPath: string; - model?: string; - modelReasoningEffort?: 'low' | 'medium' | 'high'; - engine?: string; // Dynamic engine type from registry - module?: ModuleMetadata; - executeOnce?: boolean; - notCompletedFallback?: string; // Agent ID to run if step is in notCompletedSteps + type: "module"; + agentId: string; + agentName: string; + promptPath: string; + model?: string; + modelReasoningEffort?: "low" | "medium" | "high"; + engine?: string; // Dynamic engine type from 
registry + /** + * OpenCode agent name to use (passed as --agent flag) + * This selects which OpenCode agent configuration to use for execution. + * If not specified, uses the engine's default agent. + */ + agent?: string; + module?: ModuleMetadata; + executeOnce?: boolean; + notCompletedFallback?: string; // Agent ID to run if step is in notCompletedSteps } export interface UIStep { - type: 'ui'; - text: string; + type: "ui"; + text: string; } export type WorkflowStep = ModuleStep | UIStep; @@ -51,28 +60,28 @@ export type WorkflowStep = ModuleStep | UIStep; * Type guard to check if a step is a ModuleStep */ export function isModuleStep(step: WorkflowStep): step is ModuleStep { - return step.type === 'module'; + return step.type === "module"; } export interface WorkflowTemplate { - name: string; - steps: WorkflowStep[]; - subAgentIds?: string[]; + name: string; + steps: WorkflowStep[]; + subAgentIds?: string[]; } -export type ModuleName = ModuleStep['agentId']; +export type ModuleName = ModuleStep["agentId"]; export interface RunWorkflowOptions { - cwd?: string; - templatePath?: string; - specificationPath?: string; + cwd?: string; + templatePath?: string; + specificationPath?: string; } export interface TaskManagerOptions { - cwd: string; - tasksPath?: string; - logsPath?: string; - parallel?: boolean; - abortSignal?: AbortSignal; - execute?: (agentId: string, prompt: string) => Promise; + cwd: string; + tasksPath?: string; + logsPath?: string; + parallel?: boolean; + abortSignal?: AbortSignal; + execute?: (agentId: string, prompt: string) => Promise; } From 812aae99a1579f74f55e7092327ea34475956282 Mon Sep 17 00:00:00 2001 From: Nick Roth Date: Thu, 4 Dec 2025 10:23:15 -0600 Subject: [PATCH 5/6] feat: add OpenCode consolidated server mode for workflow execution Adds server lifecycle management to enable workflow steps to share a single OpenCode server instance, avoiding MCP cold boot times between steps. 
New features: - startOpenCodeServer() - starts server on random port with health polling - attachToExternalServer() - connects to existing external server - checkServerHealth() / listServerAgents() - server introspection - --attach flag for opencode run command - --opencode-server / --opencode-attach CLI flags for codemachine start - Automatic server lifecycle in workflow execution (start, inject URLs, cleanup) Integration tests included for command building, server lifecycle, and external server attachment. --- src/agents/runner/runner.ts | 8 + src/cli/commands/start.command.ts | 195 ++- src/infra/engines/core/types.ts | 6 + .../providers/opencode/execution/commands.ts | 84 +- .../providers/opencode/execution/index.ts | 7 +- .../providers/opencode/execution/runner.ts | 669 ++++----- .../providers/opencode/execution/server.ts | 285 ++++ src/workflows/execution/step.ts | 1 + src/workflows/execution/workflow.ts | 1215 ++++++++++------- src/workflows/templates/types.ts | 19 + tests/integration/opencode/server.test.ts | 189 +++ 11 files changed, 1741 insertions(+), 937 deletions(-) create mode 100644 src/infra/engines/providers/opencode/execution/server.ts create mode 100644 tests/integration/opencode/server.test.ts diff --git a/src/agents/runner/runner.ts b/src/agents/runner/runner.ts index 1fed5516..667a8f75 100644 --- a/src/agents/runner/runner.ts +++ b/src/agents/runner/runner.ts @@ -134,6 +134,13 @@ export interface ExecuteAgentOptions { * Agent name for engines that support agent selection (e.g., OpenCode --agent flag) */ agent?: string; + + /** + * URL of running server to attach to (e.g., http://localhost:4096) + * For OpenCode, uses --attach flag to connect to existing server. + * This avoids MCP server cold boot times on every run. 
+ */ + attach?: string; } /** @@ -322,6 +329,7 @@ export async function executeAgent( model, modelReasoningEffort, agent: options.agent, // Pass agent name for engines that support it (e.g., OpenCode --agent) + attach: options.attach, // Pass server URL for engines that support it (e.g., OpenCode --attach) env: { ...process.env, // Pass parent agent ID to child processes (for orchestration context) diff --git a/src/cli/commands/start.command.ts b/src/cli/commands/start.command.ts index 2aa0e812..519d9815 100644 --- a/src/cli/commands/start.command.ts +++ b/src/cli/commands/start.command.ts @@ -1,84 +1,141 @@ -import * as path from 'node:path'; -import type { Command } from 'commander'; +import * as path from "node:path"; +import type { Command } from "commander"; -import { debug } from '../../shared/logging/logger.js'; -import { clearTerminal } from '../../shared/utils/terminal.js'; +import { debug } from "../../shared/logging/logger.js"; +import { clearTerminal } from "../../shared/utils/terminal.js"; -const DEFAULT_SPEC_PATH = '.codemachine/inputs/specifications.md'; +const DEFAULT_SPEC_PATH = ".codemachine/inputs/specifications.md"; type StartCommandOptions = { - spec?: string; + spec?: string; + opencodeServer?: boolean; + opencodeAttach?: string; }; export function registerStartCommand(program: Command): void { - program - .command('start') - .description('Run the workflow queue until completion (non-interactive)') - .option('--spec ', 'Path to the planning specification file') - .action(async (options: StartCommandOptions, command: Command) => { - const cwd = process.env.CODEMACHINE_CWD || process.cwd(); + program + .command("start") + .description("Run the workflow queue until completion (non-interactive)") + .option("--spec ", "Path to the planning specification file") + .option( + "--opencode-server", + "[Experimental] Start a consolidated OpenCode server for all workflow steps. 
" + + "Reduces MCP cold boot times by reusing a single server instance.", + ) + .option( + "--opencode-attach ", + "Attach to an existing OpenCode server (e.g., http://localhost:4096). " + + "Mutually exclusive with --opencode-server.", + ) + .action(async (options: StartCommandOptions, command: Command) => { + const cwd = process.env.CODEMACHINE_CWD || process.cwd(); - // Use command-specific --spec if provided, otherwise fall back to global --spec, then default - const globalOpts = command.optsWithGlobals ? command.optsWithGlobals() : command.opts(); - const specPath = options.spec ?? globalOpts.spec ?? DEFAULT_SPEC_PATH; - const specificationPath = path.resolve(cwd, specPath); + // Use command-specific --spec if provided, otherwise fall back to global --spec, then default + const globalOpts = command.optsWithGlobals + ? command.optsWithGlobals() + : command.opts(); + const specPath = options.spec ?? globalOpts.spec ?? DEFAULT_SPEC_PATH; + const specificationPath = path.resolve(cwd, specPath); - debug(`Starting workflow (spec: ${specificationPath})`); + // Validate mutually exclusive options + if (options.opencodeServer && options.opencodeAttach) { + console.error( + "Error: --opencode-server and --opencode-attach are mutually exclusive", + ); + process.exit(1); + } - // Comprehensive terminal clearing - clearTerminal(); + debug(`Starting workflow (spec: ${specificationPath})`); + if (options.opencodeServer) { + debug("OpenCode consolidated server mode enabled (experimental)"); + } + if (options.opencodeAttach) { + debug( + `Attaching to external OpenCode server: ${options.opencodeAttach}`, + ); + } - // Determine execution method based on environment: - // - Dev mode: Import and run workflow directly (no SolidJS preload in dev) - // - Production: Spawn workflow binary (prevents JSX conflicts) - const isDev = import.meta.url.includes('/src/') + // Comprehensive terminal clearing + clearTerminal(); - if (isDev) { - // Development mode - directly import and run 
(SolidJS preload not active) - const { runWorkflowQueue } = await import('../../workflows/index.js'); - const { ValidationError } = await import('../../runtime/services/validation.js'); - try { - await runWorkflowQueue({ cwd, specificationPath }); - console.log('\nāœ“ Workflow completed successfully'); - process.exit(0); - } catch (error) { - // Show friendly instructional message for validation errors (no stack trace) - if (error instanceof ValidationError) { - console.log(`\n${error.message}\n`); - process.exit(1); - } - // Show detailed error for other failures - console.error('\nāœ— Workflow failed:', error instanceof Error ? error.message : String(error)); - process.exit(1); - } - } else { - // Production mode - spawn workflow binary to avoid JSX conflicts - // The main binary has SolidJS transform, so we must use separate workflow binary - const { spawnProcess } = await import('../../infra/process/spawn.js'); - const { resolveWorkflowBinary } = await import('../../shared/utils/resolve-workflow-binary.js'); + // Determine execution method based on environment: + // - Dev mode: Import and run workflow directly (no SolidJS preload in dev) + // - Production: Spawn workflow binary (prevents JSX conflicts) + const isDev = import.meta.url.includes("/src/"); - try { - const result = await spawnProcess({ - command: resolveWorkflowBinary(), - args: [cwd, specificationPath], - // Pass CODEMACHINE_INSTALL_DIR from parent process to child - env: process.env.CODEMACHINE_INSTALL_DIR ? 
{ - CODEMACHINE_INSTALL_DIR: process.env.CODEMACHINE_INSTALL_DIR - } : undefined, - stdioMode: 'inherit', // Let workflow take full terminal control - }); + if (isDev) { + // Development mode - directly import and run (SolidJS preload not active) + const { runWorkflowQueue } = await import("../../workflows/index.js"); + const { ValidationError } = await import( + "../../runtime/services/validation.js" + ); + try { + await runWorkflowQueue({ + cwd, + specificationPath, + opencodeServer: options.opencodeServer, + opencodeAttach: options.opencodeAttach, + }); + console.log("\nāœ“ Workflow completed successfully"); + process.exit(0); + } catch (error) { + // Show friendly instructional message for validation errors (no stack trace) + if (error instanceof ValidationError) { + console.log(`\n${error.message}\n`); + process.exit(1); + } + // Show detailed error for other failures + console.error( + "\nāœ— Workflow failed:", + error instanceof Error ? error.message : String(error), + ); + process.exit(1); + } + } else { + // Production mode - spawn workflow binary to avoid JSX conflicts + // The main binary has SolidJS transform, so we must use separate workflow binary + const { spawnProcess } = await import("../../infra/process/spawn.js"); + const { resolveWorkflowBinary } = await import( + "../../shared/utils/resolve-workflow-binary.js" + ); - if (result.exitCode === 0) { - console.log('\nāœ“ Workflow completed successfully'); - process.exit(0); - } else { - console.error(`\nāœ— Workflow failed with exit code ${result.exitCode}`); - process.exit(result.exitCode); - } - } catch (error) { - console.error('\nāœ— Failed to spawn workflow:', error instanceof Error ? 
error.message : String(error)); - process.exit(1); - } - } - }); + try { + // Build environment with optional OpenCode server settings + const spawnEnv: Record = {}; + if (process.env.CODEMACHINE_INSTALL_DIR) { + spawnEnv.CODEMACHINE_INSTALL_DIR = + process.env.CODEMACHINE_INSTALL_DIR; + } + if (options.opencodeServer) { + spawnEnv.CODEMACHINE_OPENCODE_SERVER = "1"; + } + if (options.opencodeAttach) { + spawnEnv.CODEMACHINE_OPENCODE_ATTACH = options.opencodeAttach; + } + + const result = await spawnProcess({ + command: resolveWorkflowBinary(), + args: [cwd, specificationPath], + env: Object.keys(spawnEnv).length > 0 ? spawnEnv : undefined, + stdioMode: "inherit", // Let workflow take full terminal control + }); + + if (result.exitCode === 0) { + console.log("\nāœ“ Workflow completed successfully"); + process.exit(0); + } else { + console.error( + `\nāœ— Workflow failed with exit code ${result.exitCode}`, + ); + process.exit(result.exitCode); + } + } catch (error) { + console.error( + "\nāœ— Failed to spawn workflow:", + error instanceof Error ? error.message : String(error), + ); + process.exit(1); + } + } + }); } diff --git a/src/infra/engines/core/types.ts b/src/infra/engines/core/types.ts index 061e052d..8cdcb4ed 100644 --- a/src/infra/engines/core/types.ts +++ b/src/infra/engines/core/types.ts @@ -27,6 +27,12 @@ export interface EngineRunOptions { * Agent name for engines that support agent selection (e.g., OpenCode --agent flag) */ agent?: string; + /** + * URL of running server to attach to (e.g., http://localhost:4096) + * For OpenCode, uses --attach flag to connect to existing server. + * This avoids MCP server cold boot times on every run. 
+ */ + attach?: string; env?: NodeJS.ProcessEnv; onData?: (chunk: string) => void; onErrorData?: (chunk: string) => void; diff --git a/src/infra/engines/providers/opencode/execution/commands.ts b/src/infra/engines/providers/opencode/execution/commands.ts index 056d7fc4..eaf5660c 100644 --- a/src/infra/engines/providers/opencode/execution/commands.ts +++ b/src/infra/engines/providers/opencode/execution/commands.ts @@ -1,33 +1,71 @@ export interface OpenCodeCommandOptions { - /** - * Provider/model identifier (e.g., anthropic/claude-3.7-sonnet) - */ - model?: string; - /** - * Agent name to run (defaults to 'build') - */ - agent?: string; + /** + * Provider/model identifier (e.g., anthropic/claude-3.7-sonnet) + */ + model?: string; + /** + * Agent name to run (defaults to 'build') + */ + agent?: string; + /** + * URL of running OpenCode server to attach to (e.g., http://localhost:4096) + * When provided, uses --attach flag to connect to existing server instead of spawning new process. + * This avoids MCP server cold boot times on every run. 
+ */ + attach?: string; } export interface OpenCodeCommand { - command: string; - args: string[]; + command: string; + args: string[]; } -export function buildOpenCodeRunCommand(options: OpenCodeCommandOptions = {}): OpenCodeCommand { - const args: string[] = ['run', '--format', 'json']; +/** + * Build command for `opencode run` + */ +export function buildOpenCodeRunCommand( + options: OpenCodeCommandOptions = {}, +): OpenCodeCommand { + const args: string[] = ["run", "--format", "json"]; - const agentName = options.agent?.trim() || 'build'; - if (agentName) { - args.push('--agent', agentName); - } + // Attach to existing server if URL provided + if (options.attach?.trim()) { + args.push("--attach", options.attach.trim()); + } - if (options.model?.trim()) { - args.push('--model', options.model.trim()); - } + const agentName = options.agent?.trim() || "build"; + if (agentName) { + args.push("--agent", agentName); + } - return { - command: 'opencode', - args, - }; + if (options.model?.trim()) { + args.push("--model", options.model.trim()); + } + + return { + command: "opencode", + args, + }; +} + +/** + * Build command for `opencode serve` + */ +export function buildOpenCodeServeCommand( + options: { port?: number; hostname?: string } = {}, +): OpenCodeCommand { + const args: string[] = ["serve"]; + + if (options.port) { + args.push("--port", options.port.toString()); + } + + if (options.hostname?.trim()) { + args.push("--hostname", options.hostname.trim()); + } + + return { + command: "opencode", + args, + }; } diff --git a/src/infra/engines/providers/opencode/execution/index.ts b/src/infra/engines/providers/opencode/execution/index.ts index c6d3990c..7bb32581 100644 --- a/src/infra/engines/providers/opencode/execution/index.ts +++ b/src/infra/engines/providers/opencode/execution/index.ts @@ -1,3 +1,4 @@ -export * from './commands.js'; -export * from './executor.js'; -export * from './runner.js'; +export * from "./commands.js"; +export * from "./executor.js"; 
+export * from "./runner.js"; +export * from "./server.js"; diff --git a/src/infra/engines/providers/opencode/execution/runner.ts b/src/infra/engines/providers/opencode/execution/runner.ts index adc024d3..38e49fe0 100644 --- a/src/infra/engines/providers/opencode/execution/runner.ts +++ b/src/infra/engines/providers/opencode/execution/runner.ts @@ -1,346 +1,389 @@ -import * as path from 'node:path'; - -import { spawnProcess } from '../../../../process/spawn.js'; -import { buildOpenCodeRunCommand } from './commands.js'; -import { metadata } from '../metadata.js'; -import { resolveOpenCodeHome } from '../auth.js'; -import { formatCommand, formatResult, formatStatus, formatMessage } from '../../../../../shared/formatters/outputMarkers.js'; -import { logger } from '../../../../../shared/logging/index.js'; -import { createTelemetryCapture } from '../../../../../shared/telemetry/index.js'; -import type { ParsedTelemetry } from '../../../core/types.js'; +import * as path from "node:path"; + +import { spawnProcess } from "../../../../process/spawn.js"; +import { buildOpenCodeRunCommand } from "./commands.js"; +import { metadata } from "../metadata.js"; +import { resolveOpenCodeHome } from "../auth.js"; +import { + formatCommand, + formatResult, + formatStatus, + formatMessage, +} from "../../../../../shared/formatters/outputMarkers.js"; +import { logger } from "../../../../../shared/logging/index.js"; +import { createTelemetryCapture } from "../../../../../shared/telemetry/index.js"; +import type { ParsedTelemetry } from "../../../core/types.js"; export interface RunOpenCodeOptions { - prompt: string; - workingDir: string; - model?: string; - agent?: string; - env?: NodeJS.ProcessEnv; - onData?: (chunk: string) => void; - onErrorData?: (chunk: string) => void; - onTelemetry?: (telemetry: ParsedTelemetry) => void; - abortSignal?: AbortSignal; - timeout?: number; // Timeout in milliseconds (default: 1800000ms = 30 minutes) + prompt: string; + workingDir: string; + model?: 
string; + agent?: string; + /** + * URL of running OpenCode server to attach to (e.g., http://localhost:4096) + * When provided, uses --attach flag to connect to existing server instead of spawning new process. + * This avoids MCP server cold boot times on every run. + */ + attach?: string; + env?: NodeJS.ProcessEnv; + onData?: (chunk: string) => void; + onErrorData?: (chunk: string) => void; + onTelemetry?: (telemetry: ParsedTelemetry) => void; + abortSignal?: AbortSignal; + timeout?: number; // Timeout in milliseconds (default: 1800000ms = 30 minutes) } export interface RunOpenCodeResult { - stdout: string; - stderr: string; + stdout: string; + stderr: string; } -const ANSI_ESCAPE_SEQUENCE = new RegExp(String.raw`\u001B\[[0-9;?]*[ -/]*[@-~]`, 'g'); +const ANSI_ESCAPE_SEQUENCE = new RegExp( + String.raw`\u001B\[[0-9;?]*[ -/]*[@-~]`, + "g", +); -function shouldApplyDefault(key: string, overrides?: NodeJS.ProcessEnv): boolean { - return overrides?.[key] === undefined && process.env[key] === undefined; +function shouldApplyDefault( + key: string, + overrides?: NodeJS.ProcessEnv, +): boolean { + return overrides?.[key] === undefined && process.env[key] === undefined; } function resolveRunnerEnv(env?: NodeJS.ProcessEnv): NodeJS.ProcessEnv { - const runnerEnv: NodeJS.ProcessEnv = { ...process.env, ...(env ?? {}) }; + const runnerEnv: NodeJS.ProcessEnv = { ...process.env, ...(env ?? 
{}) }; - // Set all three XDG environment variables to subdirectories under OPENCODE_HOME - // This centralizes all OpenCode data under ~/.codemachine/opencode by default - const opencodeHome = resolveOpenCodeHome(runnerEnv.OPENCODE_HOME); + // Set all three XDG environment variables to subdirectories under OPENCODE_HOME + // This centralizes all OpenCode data under ~/.codemachine/opencode by default + const opencodeHome = resolveOpenCodeHome(runnerEnv.OPENCODE_HOME); - if (shouldApplyDefault('XDG_CONFIG_HOME', env)) { - runnerEnv.XDG_CONFIG_HOME = path.join(opencodeHome, 'config'); - } + if (shouldApplyDefault("XDG_CONFIG_HOME", env)) { + runnerEnv.XDG_CONFIG_HOME = path.join(opencodeHome, "config"); + } - if (shouldApplyDefault('XDG_CACHE_HOME', env)) { - runnerEnv.XDG_CACHE_HOME = path.join(opencodeHome, 'cache'); - } + if (shouldApplyDefault("XDG_CACHE_HOME", env)) { + runnerEnv.XDG_CACHE_HOME = path.join(opencodeHome, "cache"); + } - if (shouldApplyDefault('XDG_DATA_HOME', env)) { - runnerEnv.XDG_DATA_HOME = path.join(opencodeHome, 'data'); - } + if (shouldApplyDefault("XDG_DATA_HOME", env)) { + runnerEnv.XDG_DATA_HOME = path.join(opencodeHome, "data"); + } - return runnerEnv; + return runnerEnv; } const truncate = (value: string, length = 100): string => - value.length > length ? `${value.slice(0, length)}...` : value; + value.length > length ? `${value.slice(0, length)}...` : value; function cleanAnsi(text: string, plainLogs: boolean): string { - if (!plainLogs) return text; - return text.replace(ANSI_ESCAPE_SEQUENCE, ''); + if (!plainLogs) return text; + return text.replace(ANSI_ESCAPE_SEQUENCE, ""); } function formatToolUse(part: unknown, plainLogs: boolean): string { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const partObj = (typeof part === 'object' && part !== null ? part : {}) as Record; - const tool = partObj?.tool ?? 'tool'; - const base = formatCommand(tool, 'success'); - const state = partObj?.state ?? 
{}; - - if (tool === 'bash') { - const outputRaw = - typeof state?.output === 'string' - ? state.output - : state?.output - ? JSON.stringify(state.output) - : ''; - const output = cleanAnsi(outputRaw?.trim() ?? '', plainLogs); - if (output) { - return `${base}\n${formatResult(output, false)}`; - } - return base; - } - - const previewSource = - (typeof state?.title === 'string' && state.title.trim()) || - (typeof state?.output === 'string' && state.output.trim()) || - (state?.input && Object.keys(state.input).length > 0 ? JSON.stringify(state.input) : ''); - - if (previewSource) { - const preview = cleanAnsi(previewSource.trim(), plainLogs); - return `${base}\n${formatResult(truncate(preview), false)}`; - } - - return base; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const partObj = ( + typeof part === "object" && part !== null ? part : {} + ) as Record; + const tool = partObj?.tool ?? "tool"; + const base = formatCommand(tool, "success"); + const state = partObj?.state ?? {}; + + if (tool === "bash") { + const outputRaw = + typeof state?.output === "string" + ? state.output + : state?.output + ? JSON.stringify(state.output) + : ""; + const output = cleanAnsi(outputRaw?.trim() ?? "", plainLogs); + if (output) { + return `${base}\n${formatResult(output, false)}`; + } + return base; + } + + const previewSource = + (typeof state?.title === "string" && state.title.trim()) || + (typeof state?.output === "string" && state.output.trim()) || + (state?.input && Object.keys(state.input).length > 0 + ? JSON.stringify(state.input) + : ""); + + if (previewSource) { + const preview = cleanAnsi(previewSource.trim(), plainLogs); + return `${base}\n${formatResult(truncate(preview), false)}`; + } + + return base; } function formatStepEvent(type: string, part: unknown): string | null { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const partObj = (typeof part === 'object' && part !== null ? 
part : {}) as Record; - const reason = typeof partObj?.reason === 'string' ? partObj.reason : undefined; - - // Only show final step (reason: 'stop'), skip intermediate steps (reason: 'tool-calls') - if (reason !== 'stop') { - return null; - } - - const tokens = partObj?.tokens; - if (!tokens) { - return null; - } - - const cache = (tokens.cache?.read ?? 0) + (tokens.cache?.write ?? 0); - const totalIn = (tokens.input ?? 0) + cache; - const tokenSummary = `ā±ļø Tokens: ${totalIn}in/${tokens.output ?? 0}out${cache > 0 ? ` (${cache} cached)` : ''}`; - - return tokenSummary; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const partObj = ( + typeof part === "object" && part !== null ? part : {} + ) as Record; + const reason = + typeof partObj?.reason === "string" ? partObj.reason : undefined; + + // Only show final step (reason: 'stop'), skip intermediate steps (reason: 'tool-calls') + if (reason !== "stop") { + return null; + } + + const tokens = partObj?.tokens; + if (!tokens) { + return null; + } + + const cache = (tokens.cache?.read ?? 0) + (tokens.cache?.write ?? 0); + const totalIn = (tokens.input ?? 0) + cache; + const tokenSummary = `ā±ļø Tokens: ${totalIn}in/${tokens.output ?? 0}out${cache > 0 ? ` (${cache} cached)` : ""}`; + + return tokenSummary; } function formatErrorEvent(error: unknown, plainLogs: boolean): string { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const errorObj = (typeof error === 'object' && error !== null ? error : {}) as Record; - const dataMessage = - typeof errorObj?.data?.message === 'string' - ? errorObj.data.message - : typeof errorObj?.message === 'string' - ? errorObj.message - : typeof errorObj?.name === 'string' - ? 
errorObj.name - : 'OpenCode reported an unknown error'; - - const cleaned = cleanAnsi(dataMessage, plainLogs); - return `${formatCommand('OpenCode Error', 'error')}\n${formatResult(cleaned, true)}`; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const errorObj = ( + typeof error === "object" && error !== null ? error : {} + ) as Record; + const dataMessage = + typeof errorObj?.data?.message === "string" + ? errorObj.data.message + : typeof errorObj?.message === "string" + ? errorObj.message + : typeof errorObj?.name === "string" + ? errorObj.name + : "OpenCode reported an unknown error"; + + const cleaned = cleanAnsi(dataMessage, plainLogs); + return `${formatCommand("OpenCode Error", "error")}\n${formatResult(cleaned, true)}`; } -export async function runOpenCode(options: RunOpenCodeOptions): Promise { - const { - prompt, - workingDir, - model, - agent, - env, - onData, - onErrorData, - onTelemetry, - abortSignal, - timeout = 1800000, - } = options; - - if (!prompt) { - throw new Error('runOpenCode requires a prompt.'); - } - - if (!workingDir) { - throw new Error('runOpenCode requires a working directory.'); - } - - const runnerEnv = resolveRunnerEnv(env); - const plainLogs = - (env?.CODEMACHINE_PLAIN_LOGS ?? process.env.CODEMACHINE_PLAIN_LOGS ?? '').toString() === '1'; - const { command, args } = buildOpenCodeRunCommand({ model, agent }); - - logger.debug( - `OpenCode runner - prompt length: ${prompt.length}, lines: ${prompt.split('\n').length}, agent: ${ - agent ?? 'build' - }, model: ${model ?? 
'default'}`, - ); - - const telemetryCapture = createTelemetryCapture('opencode', model, prompt, workingDir); - let jsonBuffer = ''; - let isFirstStep = true; - - const processLine = (line: string): void => { - if (!line.trim()) { - return; - } - - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - return; - } - - telemetryCapture.captureFromStreamJson(line); - - if (onTelemetry) { - const captured = telemetryCapture.getCaptured(); - if (captured?.tokens) { - const totalIn = - (captured.tokens.input ?? 0) + (captured.tokens.cached ?? 0); - onTelemetry({ - tokensIn: totalIn, - tokensOut: captured.tokens.output ?? 0, - cached: captured.tokens.cached, - cost: captured.cost, - duration: captured.duration, - }); - } - } - - // Type guard for parsed JSON - if (typeof parsed !== 'object' || parsed === null) { - return; - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const parsedObj = parsed as Record; - - let formatted: string | null = null; - switch (parsedObj.type) { - case 'tool_use': - formatted = formatToolUse(parsedObj.part, plainLogs); - break; - case 'step_start': - if (isFirstStep) { - isFirstStep = false; - formatted = formatStatus('OpenCode is analyzing your request...'); - } - // Subsequent step_start events are silent - break; - case 'step_finish': - formatted = formatStepEvent(parsedObj.type, parsedObj.part); - break; - case 'text': { - const textPart = parsedObj.part; - const textValue = - typeof textPart?.text === 'string' - ? cleanAnsi(textPart.text, plainLogs) - : ''; - formatted = textValue ? formatMessage(textValue) : null; - break; - } - case 'error': - formatted = formatErrorEvent(parsedObj.error, plainLogs); - break; - default: - break; - } - - if (formatted) { - const suffix = formatted.endsWith('\n') ? 
'' : '\n'; - onData?.(formatted + suffix); - } - }; - - const normalizeChunk = (chunk: string): string => { - let result = chunk; - - // Convert line endings to \n - result = result.replace(/\r\n/g, '\n').replace(/\r/g, '\n'); - - // Handle carriage returns that cause line overwrites - result = result.replace(/^.*\r([^\r\n]*)/gm, '$1'); - - // Strip ANSI sequences in plain mode - if (plainLogs) { - result = result.replace(ANSI_ESCAPE_SEQUENCE, ''); - } - - // Collapse excessive newlines - result = result.replace(/\n{3,}/g, '\n\n'); - - return result; - }; - - let result; - try { - result = await spawnProcess({ - command, - args, - cwd: workingDir, - env: runnerEnv, - stdinInput: prompt, - stdioMode: 'pipe', - onStdout: (chunk) => { - const normalized = normalizeChunk(chunk); - jsonBuffer += normalized; - - const lines = jsonBuffer.split('\n'); - jsonBuffer = lines.pop() ?? ''; - - for (const line of lines) { - processLine(line); - } - }, - onStderr: (chunk) => { - const normalized = normalizeChunk(chunk); - const cleaned = cleanAnsi(normalized, plainLogs); - onErrorData?.(cleaned); - }, - signal: abortSignal, - timeout, - }); - } catch (error) { - const err = error as { code?: string; message?: string }; - const message = err?.message ?? 
''; - const notFound = - err?.code === 'ENOENT' || - /not recognized as an internal or external command/i.test(message) || - /command not found/i.test(message); - - if (notFound) { - const installMessage = [ - `'${command}' is not available on this system.`, - 'Install OpenCode via:', - ' npm i -g opencode-ai@latest', - ' brew install opencode', - ' scoop bucket add extras && scoop install extras/opencode', - ' choco install opencode', - 'Docs: https://opencode.ai/docs', - ].join('\n'); - logger.error(`${metadata.name} CLI not found when executing: ${command} ${args.join(' ')}`); - throw new Error(installMessage); - } - - throw error; - } - - if (jsonBuffer.trim()) { - processLine(jsonBuffer); - jsonBuffer = ''; - } - - if (result.exitCode !== 0) { - const stderr = result.stderr.trim(); - const stdout = result.stdout.trim(); - const sample = (stderr || stdout || 'no error output').split('\n').slice(0, 10).join('\n'); - - logger.error('OpenCode CLI execution failed', { - exitCode: result.exitCode, - sample, - command: `${command} ${args.join(' ')}`, - }); - - throw new Error(`OpenCode CLI exited with code ${result.exitCode}`); - } - - telemetryCapture.logCapturedTelemetry(result.exitCode); - - return { - stdout: result.stdout, - stderr: result.stderr, - }; +export async function runOpenCode( + options: RunOpenCodeOptions, +): Promise { + const { + prompt, + workingDir, + model, + agent, + attach, + env, + onData, + onErrorData, + onTelemetry, + abortSignal, + timeout = 1800000, + } = options; + + if (!prompt) { + throw new Error("runOpenCode requires a prompt."); + } + + if (!workingDir) { + throw new Error("runOpenCode requires a working directory."); + } + + const runnerEnv = resolveRunnerEnv(env); + const plainLogs = + ( + env?.CODEMACHINE_PLAIN_LOGS ?? + process.env.CODEMACHINE_PLAIN_LOGS ?? 
+ "" + ).toString() === "1"; + const { command, args } = buildOpenCodeRunCommand({ model, agent, attach }); + + logger.debug( + `OpenCode runner - prompt length: ${prompt.length}, lines: ${prompt.split("\n").length}, agent: ${ + agent ?? "build" + }, model: ${model ?? "default"}${attach ? `, attach: ${attach}` : ""}`, + ); + + const telemetryCapture = createTelemetryCapture( + "opencode", + model, + prompt, + workingDir, + ); + let jsonBuffer = ""; + let isFirstStep = true; + + const processLine = (line: string): void => { + if (!line.trim()) { + return; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return; + } + + telemetryCapture.captureFromStreamJson(line); + + if (onTelemetry) { + const captured = telemetryCapture.getCaptured(); + if (captured?.tokens) { + const totalIn = + (captured.tokens.input ?? 0) + (captured.tokens.cached ?? 0); + onTelemetry({ + tokensIn: totalIn, + tokensOut: captured.tokens.output ?? 0, + cached: captured.tokens.cached, + cost: captured.cost, + duration: captured.duration, + }); + } + } + + // Type guard for parsed JSON + if (typeof parsed !== "object" || parsed === null) { + return; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const parsedObj = parsed as Record; + + let formatted: string | null = null; + switch (parsedObj.type) { + case "tool_use": + formatted = formatToolUse(parsedObj.part, plainLogs); + break; + case "step_start": + if (isFirstStep) { + isFirstStep = false; + formatted = formatStatus("OpenCode is analyzing your request..."); + } + // Subsequent step_start events are silent + break; + case "step_finish": + formatted = formatStepEvent(parsedObj.type, parsedObj.part); + break; + case "text": { + const textPart = parsedObj.part; + const textValue = + typeof textPart?.text === "string" + ? cleanAnsi(textPart.text, plainLogs) + : ""; + formatted = textValue ? 
formatMessage(textValue) : null; + break; + } + case "error": + formatted = formatErrorEvent(parsedObj.error, plainLogs); + break; + default: + break; + } + + if (formatted) { + const suffix = formatted.endsWith("\n") ? "" : "\n"; + onData?.(formatted + suffix); + } + }; + + const normalizeChunk = (chunk: string): string => { + let result = chunk; + + // Convert line endings to \n + result = result.replace(/\r\n/g, "\n").replace(/\r/g, "\n"); + + // Handle carriage returns that cause line overwrites + result = result.replace(/^.*\r([^\r\n]*)/gm, "$1"); + + // Strip ANSI sequences in plain mode + if (plainLogs) { + result = result.replace(ANSI_ESCAPE_SEQUENCE, ""); + } + + // Collapse excessive newlines + result = result.replace(/\n{3,}/g, "\n\n"); + + return result; + }; + + let result; + try { + result = await spawnProcess({ + command, + args, + cwd: workingDir, + env: runnerEnv, + stdinInput: prompt, + stdioMode: "pipe", + onStdout: (chunk) => { + const normalized = normalizeChunk(chunk); + jsonBuffer += normalized; + + const lines = jsonBuffer.split("\n"); + jsonBuffer = lines.pop() ?? ""; + + for (const line of lines) { + processLine(line); + } + }, + onStderr: (chunk) => { + const normalized = normalizeChunk(chunk); + const cleaned = cleanAnsi(normalized, plainLogs); + onErrorData?.(cleaned); + }, + signal: abortSignal, + timeout, + }); + } catch (error) { + const err = error as { code?: string; message?: string }; + const message = err?.message ?? 
""; + const notFound = + err?.code === "ENOENT" || + /not recognized as an internal or external command/i.test(message) || + /command not found/i.test(message); + + if (notFound) { + const installMessage = [ + `'${command}' is not available on this system.`, + "Install OpenCode via:", + " npm i -g opencode-ai@latest", + " brew install opencode", + " scoop bucket add extras && scoop install extras/opencode", + " choco install opencode", + "Docs: https://opencode.ai/docs", + ].join("\n"); + logger.error( + `${metadata.name} CLI not found when executing: ${command} ${args.join(" ")}`, + ); + throw new Error(installMessage); + } + + throw error; + } + + if (jsonBuffer.trim()) { + processLine(jsonBuffer); + jsonBuffer = ""; + } + + if (result.exitCode !== 0) { + const stderr = result.stderr.trim(); + const stdout = result.stdout.trim(); + const sample = (stderr || stdout || "no error output") + .split("\n") + .slice(0, 10) + .join("\n"); + + logger.error("OpenCode CLI execution failed", { + exitCode: result.exitCode, + sample, + command: `${command} ${args.join(" ")}`, + }); + + throw new Error(`OpenCode CLI exited with code ${result.exitCode}`); + } + + telemetryCapture.logCapturedTelemetry(result.exitCode); + + return { + stdout: result.stdout, + stderr: result.stderr, + }; } diff --git a/src/infra/engines/providers/opencode/execution/server.ts b/src/infra/engines/providers/opencode/execution/server.ts new file mode 100644 index 00000000..0f0812e7 --- /dev/null +++ b/src/infra/engines/providers/opencode/execution/server.ts @@ -0,0 +1,285 @@ +import { spawn, type ChildProcess } from "node:child_process"; +import { createServer } from "node:net"; +import { buildOpenCodeServeCommand } from "./commands.js"; +import { + info, + error as logError, + debug, +} from "../../../../../shared/logging/logger.js"; + +/** + * Agent information returned by OpenCode server + */ +export interface OpenCodeAgent { + id: string; + name?: string; + description?: string; +} + +export 
interface OpenCodeServerOptions { + /** + * Port to listen on. If not provided, finds an available port. + */ + port?: number; + /** + * Hostname to listen on. Defaults to 127.0.0.1. + */ + hostname?: string; + /** + * Working directory for the server process. + */ + workingDir?: string; + /** + * Timeout in ms to wait for server to be ready. Defaults to 30000 (30s). + */ + startupTimeout?: number; +} + +export interface OpenCodeServer { + /** + * URL to connect to (e.g., http://127.0.0.1:4096) + */ + url: string; + /** + * Port the server is listening on + */ + port: number; + /** + * Hostname the server is listening on + */ + hostname: string; + /** + * Stop the server + */ + stop: () => Promise; + /** + * Check if server is healthy + */ + isHealthy: () => Promise; + /** + * List available agents from the server + */ + listAgents: () => Promise; +} + +/** + * Find an available port by attempting to bind to port 0 + */ +async function findAvailablePort(): Promise { + return new Promise((resolve, reject) => { + const server = createServer(); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (address && typeof address === "object") { + const port = address.port; + server.close(() => resolve(port)); + } else { + server.close(() => reject(new Error("Could not determine port"))); + } + }); + server.on("error", reject); + }); +} + +/** + * Wait for the server to be ready by polling the /app endpoint + */ +async function waitForServer(url: string, timeoutMs: number): Promise { + const startTime = Date.now(); + const pollInterval = 500; + + while (Date.now() - startTime < timeoutMs) { + try { + const response = await fetch(`${url}/app`, { + method: "GET", + signal: AbortSignal.timeout(2000), + }); + if (response.ok) { + return true; + } + } catch { + // Server not ready yet, continue polling + } + await new Promise((resolve) => setTimeout(resolve, pollInterval)); + } + return false; +} + +/** + * Check if an OpenCode server is healthy at the 
given URL + */ +export async function checkServerHealth(url: string): Promise { + try { + const response = await fetch(`${url}/app`, { + method: "GET", + signal: AbortSignal.timeout(5000), + }); + return response.ok; + } catch { + return false; + } +} + +/** + * List available agents from an OpenCode server + */ +export async function listServerAgents(url: string): Promise { + try { + const response = await fetch(`${url}/agent`, { + method: "GET", + signal: AbortSignal.timeout(10000), + }); + if (!response.ok) { + throw new Error( + `Failed to list agents: ${response.status} ${response.statusText}`, + ); + } + const data = await response.json(); + // The API returns an array of agents + return Array.isArray(data) ? data : []; + } catch (err) { + debug(`Failed to list agents from ${url}: ${err}`); + return []; + } +} + +/** + * Start an OpenCode server process + * + * @example + * ```typescript + * const server = await startOpenCodeServer({ port: 4096 }); + * console.log(`Server running at ${server.url}`); + * + * // Use server.url with --attach flag + * // opencode run --attach http://127.0.0.1:4096 "prompt" + * + * // When done + * await server.stop(); + * ``` + */ +export async function startOpenCodeServer( + options: OpenCodeServerOptions = {}, +): Promise { + const hostname = options.hostname ?? "127.0.0.1"; + const port = options.port ?? (await findAvailablePort()); + const startupTimeout = options.startupTimeout ?? 30000; + + const { command, args } = buildOpenCodeServeCommand({ port, hostname }); + + debug(`Starting OpenCode server: ${command} ${args.join(" ")}`); + + let serverProcess: ChildProcess | null = null; + + try { + serverProcess = spawn(command, args, { + cwd: options.workingDir ?? 
process.cwd(), + stdio: ["ignore", "pipe", "pipe"], + detached: false, + env: { + ...process.env, + // Ensure opencode doesn't try to use a TTY + NO_COLOR: "1", + }, + }); + + // Capture stderr for debugging + let stderrOutput = ""; + serverProcess.stderr?.on("data", (chunk) => { + stderrOutput += chunk.toString(); + debug(`[opencode serve stderr] ${chunk.toString().trim()}`); + }); + + // Handle process exit + const exitPromise = new Promise((_, reject) => { + serverProcess?.on("exit", (code, signal) => { + reject( + new Error( + `OpenCode server exited unexpectedly (code=${code}, signal=${signal}). stderr: ${stderrOutput}`, + ), + ); + }); + serverProcess?.on("error", (err) => { + reject(new Error(`Failed to start OpenCode server: ${err.message}`)); + }); + }); + + const url = `http://${hostname}:${port}`; + + // Wait for server to be ready or exit + const isReady = await Promise.race([ + waitForServer(url, startupTimeout), + exitPromise, + ]); + + if (!isReady) { + serverProcess.kill("SIGTERM"); + throw new Error( + `OpenCode server failed to start within ${startupTimeout}ms`, + ); + } + + info(`OpenCode server started at ${url}`); + + const server: OpenCodeServer = { + url, + port, + hostname, + stop: async () => { + if (serverProcess && !serverProcess.killed) { + debug(`Stopping OpenCode server at ${url}`); + serverProcess.kill("SIGTERM"); + + // Wait for graceful shutdown + await new Promise((resolve) => { + const timeout = setTimeout(() => { + if (serverProcess && !serverProcess.killed) { + serverProcess.kill("SIGKILL"); + } + resolve(); + }, 5000); + + serverProcess?.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + + info(`OpenCode server stopped`); + } + }, + isHealthy: async () => checkServerHealth(url), + listAgents: async () => listServerAgents(url), + }; + + return server; + } catch (err) { + // Clean up on error + if (serverProcess && !serverProcess.killed) { + serverProcess.kill("SIGKILL"); + } + throw err; + } +} + +/** + * 
Create a server handle for an existing external server (no lifecycle management) + */ +export function attachToExternalServer(url: string): OpenCodeServer { + // Parse URL to extract hostname and port + const parsed = new URL(url); + const hostname = parsed.hostname; + const port = parseInt(parsed.port || "4096", 10); + + return { + url: url.replace(/\/$/, ""), // Remove trailing slash + port, + hostname, + stop: async () => { + // External server - we don't manage its lifecycle + debug(`Not stopping external OpenCode server at ${url}`); + }, + isHealthy: async () => checkServerHealth(url), + listAgents: async () => listServerAgents(url), + }; +} diff --git a/src/workflows/execution/step.ts b/src/workflows/execution/step.ts index b56da18a..e35dec36 100644 --- a/src/workflows/execution/step.ts +++ b/src/workflows/execution/step.ts @@ -72,6 +72,7 @@ export async function executeStep( engine: engineType, model: step.model, agent: step.agent, // Pass OpenCode agent name for --agent flag + attach: step.attach, // Pass server URL for --attach flag (consolidated server mode) logger: options.logger, stderrLogger: options.stderrLogger, onTelemetry: diff --git a/src/workflows/execution/workflow.ts b/src/workflows/execution/workflow.ts index c0f4b6ff..729b372d 100644 --- a/src/workflows/execution/workflow.ts +++ b/src/workflows/execution/workflow.ts @@ -1,543 +1,700 @@ -import * as path from 'node:path'; -import * as fs from 'node:fs'; +import * as path from "node:path"; +import * as fs from "node:fs"; -import type { RunWorkflowOptions } from '../templates/index.js'; -import { loadTemplateWithPath } from '../templates/index.js'; -import { formatAgentLog } from '../../shared/logging/index.js'; -import { debug, setDebugLogFile } from '../../shared/logging/logger.js'; +import type { RunWorkflowOptions } from "../templates/index.js"; +import { loadTemplateWithPath } from "../templates/index.js"; +import { formatAgentLog } from "../../shared/logging/index.js"; +import { debug, 
info, setDebugLogFile } from "../../shared/logging/logger.js"; import { - getTemplatePathFromTracking, - getCompletedSteps, - getNotCompletedSteps, - markStepCompleted, - markStepStarted, - removeFromNotCompleted, - getResumeStartIndex, -} from '../../shared/workflows/index.js'; -import { registry } from '../../infra/engines/index.js'; -import { shouldSkipStep, logSkipDebug, type ActiveLoop } from '../behaviors/skip.js'; -import { handleLoopLogic, createActiveLoop } from '../behaviors/loop/controller.js'; -import { handleTriggerLogic } from '../behaviors/trigger/controller.js'; -import { handleCheckpointLogic } from '../behaviors/checkpoint/controller.js'; -import { executeStep } from './step.js'; -import { executeTriggerAgent } from './trigger.js'; -import { shouldExecuteFallback, executeFallbackStep } from './fallback.js'; -import { WorkflowUIManager } from '../../ui/index.js'; -import { MonitoringCleanup } from '../../agents/monitoring/index.js'; + getTemplatePathFromTracking, + getCompletedSteps, + getNotCompletedSteps, + markStepCompleted, + markStepStarted, + removeFromNotCompleted, + getResumeStartIndex, +} from "../../shared/workflows/index.js"; +import { registry } from "../../infra/engines/index.js"; +import { + shouldSkipStep, + logSkipDebug, + type ActiveLoop, +} from "../behaviors/skip.js"; +import { + handleLoopLogic, + createActiveLoop, +} from "../behaviors/loop/controller.js"; +import { handleTriggerLogic } from "../behaviors/trigger/controller.js"; +import { handleCheckpointLogic } from "../behaviors/checkpoint/controller.js"; +import { executeStep } from "./step.js"; +import { executeTriggerAgent } from "./trigger.js"; +import { shouldExecuteFallback, executeFallbackStep } from "./fallback.js"; +import { WorkflowUIManager } from "../../ui/index.js"; +import { MonitoringCleanup } from "../../agents/monitoring/index.js"; +import { + type OpenCodeServer, + startOpenCodeServer, + attachToExternalServer, + checkServerHealth, +} from 
"../../infra/engines/providers/opencode/execution/server.js"; /** * Cache for engine authentication status with TTL * Prevents repeated auth checks (which can take 10-30 seconds) */ class EngineAuthCache { - private cache: Map = new Map(); - private ttlMs: number = 5 * 60 * 1000; // 5 minutes TTL - - /** - * Check if engine is authenticated (with caching) - */ - async isAuthenticated(engineId: string, checkFn: () => Promise): Promise { - const cached = this.cache.get(engineId); - const now = Date.now(); - - // Return cached value if still valid - if (cached && (now - cached.timestamp) < this.ttlMs) { - return cached.isAuthenticated; - } - - // Cache miss or expired - perform actual check - const result = await checkFn(); - - // Cache the result - this.cache.set(engineId, { - isAuthenticated: result, - timestamp: now - }); - - return result; - } - - /** - * Invalidate cache for specific engine - */ - invalidate(engineId: string): void { - this.cache.delete(engineId); - } - - /** - * Clear entire cache - */ - clear(): void { - this.cache.clear(); - } + private cache: Map = + new Map(); + private ttlMs: number = 5 * 60 * 1000; // 5 minutes TTL + + /** + * Check if engine is authenticated (with caching) + */ + async isAuthenticated( + engineId: string, + checkFn: () => Promise, + ): Promise { + const cached = this.cache.get(engineId); + const now = Date.now(); + + // Return cached value if still valid + if (cached && now - cached.timestamp < this.ttlMs) { + return cached.isAuthenticated; + } + + // Cache miss or expired - perform actual check + const result = await checkFn(); + + // Cache the result + this.cache.set(engineId, { + isAuthenticated: result, + timestamp: now, + }); + + return result; + } + + /** + * Invalidate cache for specific engine + */ + invalidate(engineId: string): void { + this.cache.delete(engineId); + } + + /** + * Clear entire cache + */ + clear(): void { + this.cache.clear(); + } } // Global auth cache instance const authCache = new 
EngineAuthCache(); -export async function runWorkflow(options: RunWorkflowOptions = {}): Promise { - const cwd = options.cwd ? path.resolve(options.cwd) : process.cwd(); - - // Redirect debug logs to file whenever LOG_LEVEL=debug (or DEBUG env is truthy) so they don't break Ink layout - const rawLogLevel = (process.env.LOG_LEVEL || '').trim().toLowerCase(); - const debugFlag = (process.env.DEBUG || '').trim().toLowerCase(); - const debugEnabled = rawLogLevel === 'debug' || (debugFlag !== '' && debugFlag !== '0' && debugFlag !== 'false'); - const isDebugLogLevel = debugEnabled; - const debugLogPath = isDebugLogLevel ? path.join(cwd, '.codemachine', 'logs', 'workflow-debug.log') : null; - setDebugLogFile(debugLogPath); - - // Set up cleanup handlers for graceful shutdown - MonitoringCleanup.setup(); - - // Load template from .codemachine/template.json or use provided path - const cmRoot = path.join(cwd, '.codemachine'); - const templatePath = options.templatePath || (await getTemplatePathFromTracking(cmRoot)); - - const { template } = await loadTemplateWithPath(cwd, templatePath); - - debug(`Using workflow template: ${template.name}`); - - // Sync agent configurations before running the workflow - const workflowAgents = Array.from( - template.steps - .filter((step) => step.type === 'module') - .reduce((acc, step) => { - const id = step.agentId?.trim(); - if (!id) return acc; - const existing = acc.get(id) ?? { id }; - acc.set(id, { - ...existing, - id, - model: step.model ?? existing.model, - modelReasoningEffort: step.modelReasoningEffort ?? 
existing.modelReasoningEffort, - }); - return acc; - }, new Map()).values(), - ); - - // Sync agent configurations for engines that need it - if (workflowAgents.length > 0) { - const engines = registry.getAll(); - for (const engine of engines) { - if (engine.syncConfig) { - await engine.syncConfig({ additionalAgents: workflowAgents }); - } - } - } - - // Load completed steps for executeOnce tracking - const completedSteps = await getCompletedSteps(cmRoot); - - // Load not completed steps for fallback tracking - const notCompletedSteps = await getNotCompletedSteps(cmRoot); - - const loopCounters = new Map(); - let activeLoop: ActiveLoop | null = null; - const workflowStartTime = Date.now(); - - // Initialize Workflow UI Manager - const ui = new WorkflowUIManager(template.name); - if (debugLogPath) { - ui.setDebugLogPath(debugLogPath); - } - - // Pre-populate timeline with all workflow steps BEFORE starting UI - // This prevents duplicate renders at startup - // Set initial status based on completion tracking - template.steps.forEach((step, stepIndex) => { - if (step.type === 'module') { - const defaultEngine = registry.getDefault(); - const engineType = step.engine ?? defaultEngine?.metadata.id ?? 'unknown'; - const engineName = engineType; // preserve original engine type, even if unknown - - // Create a unique identifier for each step instance (agentId + stepIndex) - // This allows multiple instances of the same agent to appear separately in the UI - const uniqueAgentId = `${step.agentId}-step-${stepIndex}`; - - // Determine initial status based on completion tracking - let initialStatus: 'pending' | 'completed' = 'pending'; - if (completedSteps.includes(stepIndex)) { - initialStatus = 'completed'; - } - - const agentId = ui.addMainAgent(step.agentName ?? 
step.agentId, engineName, stepIndex, initialStatus, uniqueAgentId); - - // Update agent with step information - const state = ui.getState(); - const agent = state.agents.find(a => a.id === agentId); - if (agent) { - agent.stepIndex = stepIndex; - agent.totalSteps = template.steps.filter(s => s.type === 'module').length; - } - } else if (step.type === 'ui') { - // Pre-populate UI elements - ui.addUIElement(step.text, stepIndex); - } - }); - - // Start UI after all agents are pre-populated (single clean render) - ui.start(); - - // Get the starting index based on resume configuration - const startIndex = await getResumeStartIndex(cmRoot); - - if (startIndex > 0) { - console.log(`Resuming workflow from step ${startIndex}...`); - } - - // Workflow stop flag for Ctrl+C handling - let workflowShouldStop = false; - let stoppedByCheckpointQuit = false; - const stopListener = () => { - workflowShouldStop = true; - }; - process.on('workflow:stop', stopListener); - - try { - for (let index = startIndex; index < template.steps.length; index += 1) { - // Check if workflow should stop (Ctrl+C pressed) - if (workflowShouldStop) { - console.log(formatAgentLog('workflow', 'Workflow stopped by user.')); - break; - } - - const step = template.steps[index]; - - // UI elements are pre-populated and don't need execution - if (step.type === 'ui') { - continue; - } - - if (step.type !== 'module') { - continue; - } - - // Create unique agent ID for this step instance (matches UI pre-population) - const uniqueAgentId = `${step.agentId}-step-${index}`; - - const skipResult = shouldSkipStep(step, index, completedSteps, activeLoop, ui, uniqueAgentId); - if (skipResult.skip) { - ui.logMessage(uniqueAgentId, skipResult.reason!); - continue; - } - - logSkipDebug(step, activeLoop); - - // Update UI status to running (this clears the output buffer) - ui.updateAgentStatus(uniqueAgentId, 'running'); - - // Log start message AFTER clearing buffer - ui.logMessage(uniqueAgentId, '═'.repeat(80)); - 
ui.logMessage(uniqueAgentId, `${step.agentName} started to work.`); - - // Reset behavior file to default "continue" before each agent run - const behaviorFile = path.join(cwd, '.codemachine/memory/behavior.json'); - const behaviorDir = path.dirname(behaviorFile); - if (!fs.existsSync(behaviorDir)) { - fs.mkdirSync(behaviorDir, { recursive: true }); - } - fs.writeFileSync(behaviorFile, JSON.stringify({ action: 'continue' }, null, 2)); - - // Mark step as started (adds to notCompletedSteps) - await markStepStarted(cmRoot, index); - - // Determine engine: step override > first authenticated engine - let engineType: string; - if (step.engine) { - engineType = step.engine; - - // If an override is provided but not authenticated, log and fall back - const overrideEngine = registry.get(engineType); - const isOverrideAuthed = overrideEngine - ? await authCache.isAuthenticated(overrideEngine.metadata.id, () => overrideEngine.auth.isAuthenticated()) - : false; - if (!isOverrideAuthed) { - const pretty = overrideEngine?.metadata.name ?? engineType; - ui.logMessage( - uniqueAgentId, - `${pretty} override is not authenticated; falling back to first authenticated engine by order. Run 'codemachine auth login' to use ${pretty}.` - ); - - // Find first authenticated engine by order (with caching) - const engines = registry.getAll(); - let fallbackEngine = null as typeof overrideEngine | null; - for (const eng of engines) { - const isAuth = await authCache.isAuthenticated( - eng.metadata.id, - () => eng.auth.isAuthenticated() - ); - if (isAuth) { - fallbackEngine = eng; - break; - } - } - - // If none authenticated, fall back to registry default (may still require auth) - if (!fallbackEngine) { - fallbackEngine = registry.getDefault() ?? 
null; - } - - if (fallbackEngine) { - engineType = fallbackEngine.metadata.id; - ui.logMessage( - uniqueAgentId, - `Falling back to ${fallbackEngine.metadata.name} (${engineType})` - ); - } - } - } else { - // Fallback: find first authenticated engine by order (with caching) - const engines = registry.getAll(); - let foundEngine = null; - - for (const engine of engines) { - const isAuth = await authCache.isAuthenticated( - engine.metadata.id, - () => engine.auth.isAuthenticated() - ); - if (isAuth) { - foundEngine = engine; - break; - } - } - - if (!foundEngine) { - // If no authenticated engine, use default (first by order) - foundEngine = registry.getDefault(); - } - - if (!foundEngine) { - throw new Error('No engines registered. Please install at least one engine.'); - } - - engineType = foundEngine.metadata.id; - ui.logMessage(uniqueAgentId, `No engine specified, using ${foundEngine.metadata.name} (${engineType})`); - } - - // Ensure the selected engine is used during execution - // (executeStep falls back to default engine if step.engine is unset) - // Mutate current step to carry the chosen engine forward - step.engine = engineType; - - // Set up skip listener and abort controller for this step (covers fallback + main + triggers) - const abortController = new AbortController(); - let skipRequested = false; // Prevent duplicate skip requests during async abort handling - const skipListener = () => { - if (skipRequested) { - // Ignore duplicate skip events (user pressing Ctrl+S rapidly) - // This prevents multiple "Skip requested" messages during Bun.spawn's async termination - return; - } - skipRequested = true; - ui.logMessage(uniqueAgentId, 'ā­ļø Skip requested by user...'); - abortController.abort(); - }; - process.once('workflow:skip', skipListener); - - try { - // Check if fallback should be executed before the original step - if (shouldExecuteFallback(step, index, notCompletedSteps)) { - ui.logMessage(uniqueAgentId, `Detected incomplete step. 
Running fallback agent first.`); - try { - await executeFallbackStep(step, cwd, workflowStartTime, engineType, ui, uniqueAgentId, abortController.signal); - } catch (error) { - // Fallback failed, step remains in notCompletedSteps - ui.logMessage(uniqueAgentId, `Fallback failed. Skipping original step retry.`); - // Don't update status to failed - just let it stay as running or retrying - throw error; - } - } - - const output = await executeStep(step, cwd, { - logger: () => {}, // No-op: UI reads from log files - stderrLogger: () => {}, // No-op: UI reads from log files - ui, - abortSignal: abortController.signal, - uniqueAgentId, - }); - - // Check for trigger behavior first - const triggerResult = await handleTriggerLogic(step, output, cwd, ui); - if (triggerResult?.shouldTrigger && triggerResult.triggerAgentId) { - const triggeredAgentId = triggerResult.triggerAgentId; // Capture for use in callbacks - try { - await executeTriggerAgent({ - triggerAgentId: triggeredAgentId, - cwd, - engineType, - logger: () => {}, // No-op: UI reads from log files - stderrLogger: () => {}, // No-op: UI reads from log files - sourceAgentId: uniqueAgentId, - ui, - abortSignal: abortController.signal, - }); - } catch (triggerError) { - // Check if this was a user-requested skip (abort) - if (triggerError instanceof Error && triggerError.name === 'AbortError') { - ui.updateAgentStatus(triggeredAgentId, 'skipped'); - ui.logMessage(triggeredAgentId, `Triggered agent was skipped by user.`); - } - // Continue with workflow even if triggered agent fails or is skipped - } - } - - // Remove from notCompletedSteps immediately after successful execution - // This must happen BEFORE loop logic to ensure cleanup even when loops trigger - await removeFromNotCompleted(cmRoot, index); - - // Mark step as completed if executeOnce is true - if (step.executeOnce) { - await markStepCompleted(cmRoot, index); - } - - // Update UI status to completed - // This must happen BEFORE loop logic to ensure UI 
updates even when loops trigger - ui.updateAgentStatus(uniqueAgentId, 'completed'); - - // Log completion messages BEFORE loop check (so they're part of current agent's output) - ui.logMessage(uniqueAgentId, `${step.agentName} has completed their work.`); - ui.logMessage(uniqueAgentId, '\n' + '═'.repeat(80) + '\n'); - - // Check for checkpoint behavior first (to pause workflow for manual review) - const checkpointResult = await handleCheckpointLogic(step, output, cwd, ui); - if (checkpointResult?.shouldStopWorkflow) { - // Wait for user action via events (Continue or Quit) - await new Promise((resolve) => { - const continueHandler = () => { - cleanup(); - resolve(); - }; - const quitHandler = () => { - cleanup(); - workflowShouldStop = true; - stoppedByCheckpointQuit = true; - resolve(); - }; - const cleanup = () => { - process.removeListener('checkpoint:continue', continueHandler); - process.removeListener('checkpoint:quit', quitHandler); - }; - - process.once('checkpoint:continue', continueHandler); - process.once('checkpoint:quit', quitHandler); - }); - - // Clear checkpoint state and resume - ui.clearCheckpointState(); - - if (workflowShouldStop) { - // User chose to quit from checkpoint - set status to stopped - ui.setWorkflowStatus('stopped'); - break; // User chose to quit - } - // Otherwise continue to next step (current step already marked complete via executeOnce) - } - - const loopResult = await handleLoopLogic(step, index, output, loopCounters, cwd, ui); - - if (loopResult.decision?.shouldRepeat) { - // Set active loop with skip list - activeLoop = createActiveLoop(loopResult.decision); - - // Update UI loop state - const loopKey = `${step.module?.id ?? step.agentId}:${index}`; - const iteration = (loopCounters.get(loopKey) || 0) + 1; - ui.setLoopState({ - active: true, - sourceAgent: uniqueAgentId, - backSteps: loopResult.decision.stepsBack, - iteration, - maxIterations: step.module?.behavior?.type === 'loop' ? step.module.behavior.maxIterations ?? 
Infinity : Infinity, - skipList: loopResult.decision.skipList || [], - reason: loopResult.decision.reason, - }); - - // Reset all agents that will be re-executed in the loop - // Clear their UI data (telemetry, tool counts, subagents) and monitoring registry data - // Save their current state to execution history with cycle number - for (let resetIndex = loopResult.newIndex; resetIndex <= index; resetIndex += 1) { - const resetStep = template.steps[resetIndex]; - if (resetStep && resetStep.type === 'module') { - const resetUniqueAgentId = `${resetStep.agentId}-step-${resetIndex}`; - await ui.resetAgentForLoop(resetUniqueAgentId, iteration); - } - } - - index = loopResult.newIndex; - continue; - } - - // Clear active loop only when a loop step explicitly terminates - const newActiveLoop = createActiveLoop(loopResult.decision); - if (newActiveLoop !== (undefined as unknown as ActiveLoop | null)) { - activeLoop = newActiveLoop; - if (!newActiveLoop) { - ui.setLoopState(null); - ui.clearLoopRound(uniqueAgentId); - } - } - } catch (error) { - // Check if this was a user-requested skip (abort) - if (error instanceof Error && error.name === 'AbortError') { - ui.updateAgentStatus(uniqueAgentId, 'skipped'); - ui.logMessage(uniqueAgentId, `${step.agentName} was skipped by user.`); - ui.logMessage(uniqueAgentId, '\n' + '═'.repeat(80) + '\n'); - // Continue to next step - don't throw - } else { - // Don't update status to failed - let it stay as running/retrying - ui.logMessage( - uniqueAgentId, - `${step.agentName} failed: ${error instanceof Error ? 
error.message : String(error)}` - ); - throw error; - } - } finally { - // Always clean up the skip listener - process.removeListener('workflow:skip', skipListener); - } - } - - // Check if workflow was stopped by user (Ctrl+C or checkpoint quit) - if (workflowShouldStop) { - if (stoppedByCheckpointQuit) { - // Workflow was stopped by checkpoint quit - status already set to 'stopped' - // UI will stay running showing the stopped status - // Wait indefinitely - user can press Ctrl+C to exit - await new Promise(() => { - // Never resolves - keeps event loop alive until Ctrl+C exits process - }); - } else { - // Workflow was stopped by Ctrl+C - status already updated by MonitoringCleanup handler - // Keep UI alive to show "Press Ctrl+C again to exit" message - // The second Ctrl+C will be handled by MonitoringCleanup's SIGINT handler - // Wait indefinitely - the SIGINT handler will call process.exit() - await new Promise(() => { - // Never resolves - keeps event loop alive until second Ctrl+C exits process - }); - } - } - - // Workflow completed successfully - MonitoringCleanup.clearWorkflowHandlers(); - - // Set status to completed and keep UI alive - ui.setWorkflowStatus('completed'); - // UI will stay running - user presses Ctrl+C to exit with two-stage behavior - // Wait indefinitely - the SIGINT handler will call process.exit() - await new Promise(() => { - // Never resolves - keeps event loop alive until Ctrl+C exits process - }); - } catch (error) { - // On workflow error, set status, stop UI, then exit - ui.setWorkflowStatus('stopped'); - - // Stop UI to restore console before logging error - ui.stop(); - - // Re-throw error to be handled by caller (will now print after UI is stopped) - throw error; - } finally { - // Clean up workflow stop listener - process.removeListener('workflow:stop', stopListener); - } +export async function runWorkflow( + options: RunWorkflowOptions = {}, +): Promise { + const cwd = options.cwd ? 
path.resolve(options.cwd) : process.cwd(); + + // Redirect debug logs to file whenever LOG_LEVEL=debug (or DEBUG env is truthy) so they don't break Ink layout + const rawLogLevel = (process.env.LOG_LEVEL || "").trim().toLowerCase(); + const debugFlag = (process.env.DEBUG || "").trim().toLowerCase(); + const debugEnabled = + rawLogLevel === "debug" || + (debugFlag !== "" && debugFlag !== "0" && debugFlag !== "false"); + const isDebugLogLevel = debugEnabled; + const debugLogPath = isDebugLogLevel + ? path.join(cwd, ".codemachine", "logs", "workflow-debug.log") + : null; + setDebugLogFile(debugLogPath); + + // Set up cleanup handlers for graceful shutdown + MonitoringCleanup.setup(); + + // Load template from .codemachine/template.json or use provided path + const cmRoot = path.join(cwd, ".codemachine"); + const templatePath = + options.templatePath || (await getTemplatePathFromTracking(cmRoot)); + + const { template } = await loadTemplateWithPath(cwd, templatePath); + + debug(`Using workflow template: ${template.name}`); + + // Check for OpenCode server options (from CLI flags or environment variables) + const useOpencodeServer = + options.opencodeServer || process.env.CODEMACHINE_OPENCODE_SERVER === "1"; + const opencodeAttachUrl = + options.opencodeAttach || process.env.CODEMACHINE_OPENCODE_ATTACH; + + // Detect if any steps use OpenCode engine + const hasOpenCodeSteps = template.steps.some( + (step) => step.type === "module" && step.engine === "opencode", + ); + + // Set up OpenCode server if requested and workflow has OpenCode steps + let opencodeServer: OpenCodeServer | null = null; + if (hasOpenCodeSteps && (useOpencodeServer || opencodeAttachUrl)) { + if (opencodeAttachUrl) { + // Attach to existing external server + info(`Attaching to external OpenCode server: ${opencodeAttachUrl}`); + opencodeServer = attachToExternalServer(opencodeAttachUrl); + + // Verify server is healthy + const healthy = await checkServerHealth(opencodeAttachUrl); + if (!healthy) { + 
throw new Error( + `Cannot connect to OpenCode server at ${opencodeAttachUrl}. Is it running?`, + ); + } + info(`Connected to OpenCode server at ${opencodeServer.url}`); + } else if (useOpencodeServer) { + // Start consolidated server (experimental) + info( + "[Experimental] Starting consolidated OpenCode server for workflow...", + ); + try { + opencodeServer = await startOpenCodeServer({ workingDir: cwd }); + info(`OpenCode server started at ${opencodeServer.url}`); + } catch (err) { + throw new Error( + `Failed to start OpenCode server: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + // Inject attach URL into all OpenCode steps + if (opencodeServer) { + for (const step of template.steps) { + if (step.type === "module" && step.engine === "opencode") { + step.attach = opencodeServer.url; + } + } + } + } + + // Sync agent configurations before running the workflow + const workflowAgents = Array.from( + template.steps + .filter((step) => step.type === "module") + .reduce((acc, step) => { + const id = step.agentId?.trim(); + if (!id) return acc; + const existing = acc.get(id) ?? { id }; + acc.set(id, { + ...existing, + id, + model: step.model ?? existing.model, + modelReasoningEffort: + step.modelReasoningEffort ?? 
existing.modelReasoningEffort, + }); + return acc; + }, new Map< + string, + { id: string; model?: unknown; modelReasoningEffort?: unknown } + >()) + .values(), + ); + + // Sync agent configurations for engines that need it + if (workflowAgents.length > 0) { + const engines = registry.getAll(); + for (const engine of engines) { + if (engine.syncConfig) { + await engine.syncConfig({ additionalAgents: workflowAgents }); + } + } + } + + // Load completed steps for executeOnce tracking + const completedSteps = await getCompletedSteps(cmRoot); + + // Load not completed steps for fallback tracking + const notCompletedSteps = await getNotCompletedSteps(cmRoot); + + const loopCounters = new Map(); + let activeLoop: ActiveLoop | null = null; + const workflowStartTime = Date.now(); + + // Initialize Workflow UI Manager + const ui = new WorkflowUIManager(template.name); + if (debugLogPath) { + ui.setDebugLogPath(debugLogPath); + } + + // Pre-populate timeline with all workflow steps BEFORE starting UI + // This prevents duplicate renders at startup + // Set initial status based on completion tracking + template.steps.forEach((step, stepIndex) => { + if (step.type === "module") { + const defaultEngine = registry.getDefault(); + const engineType = step.engine ?? defaultEngine?.metadata.id ?? "unknown"; + const engineName = engineType; // preserve original engine type, even if unknown + + // Create a unique identifier for each step instance (agentId + stepIndex) + // This allows multiple instances of the same agent to appear separately in the UI + const uniqueAgentId = `${step.agentId}-step-${stepIndex}`; + + // Determine initial status based on completion tracking + let initialStatus: "pending" | "completed" = "pending"; + if (completedSteps.includes(stepIndex)) { + initialStatus = "completed"; + } + + const agentId = ui.addMainAgent( + step.agentName ?? 
step.agentId, + engineName, + stepIndex, + initialStatus, + uniqueAgentId, + ); + + // Update agent with step information + const state = ui.getState(); + const agent = state.agents.find((a) => a.id === agentId); + if (agent) { + agent.stepIndex = stepIndex; + agent.totalSteps = template.steps.filter( + (s) => s.type === "module", + ).length; + } + } else if (step.type === "ui") { + // Pre-populate UI elements + ui.addUIElement(step.text, stepIndex); + } + }); + + // Start UI after all agents are pre-populated (single clean render) + ui.start(); + + // Get the starting index based on resume configuration + const startIndex = await getResumeStartIndex(cmRoot); + + if (startIndex > 0) { + console.log(`Resuming workflow from step ${startIndex}...`); + } + + // Workflow stop flag for Ctrl+C handling + let workflowShouldStop = false; + let stoppedByCheckpointQuit = false; + const stopListener = () => { + workflowShouldStop = true; + }; + process.on("workflow:stop", stopListener); + + try { + for (let index = startIndex; index < template.steps.length; index += 1) { + // Check if workflow should stop (Ctrl+C pressed) + if (workflowShouldStop) { + console.log(formatAgentLog("workflow", "Workflow stopped by user.")); + break; + } + + const step = template.steps[index]; + + // UI elements are pre-populated and don't need execution + if (step.type === "ui") { + continue; + } + + if (step.type !== "module") { + continue; + } + + // Create unique agent ID for this step instance (matches UI pre-population) + const uniqueAgentId = `${step.agentId}-step-${index}`; + + const skipResult = shouldSkipStep( + step, + index, + completedSteps, + activeLoop, + ui, + uniqueAgentId, + ); + if (skipResult.skip) { + ui.logMessage(uniqueAgentId, skipResult.reason!); + continue; + } + + logSkipDebug(step, activeLoop); + + // Update UI status to running (this clears the output buffer) + ui.updateAgentStatus(uniqueAgentId, "running"); + + // Log start message AFTER clearing buffer + 
ui.logMessage(uniqueAgentId, "═".repeat(80)); + ui.logMessage(uniqueAgentId, `${step.agentName} started to work.`); + + // Reset behavior file to default "continue" before each agent run + const behaviorFile = path.join(cwd, ".codemachine/memory/behavior.json"); + const behaviorDir = path.dirname(behaviorFile); + if (!fs.existsSync(behaviorDir)) { + fs.mkdirSync(behaviorDir, { recursive: true }); + } + fs.writeFileSync( + behaviorFile, + JSON.stringify({ action: "continue" }, null, 2), + ); + + // Mark step as started (adds to notCompletedSteps) + await markStepStarted(cmRoot, index); + + // Determine engine: step override > first authenticated engine + let engineType: string; + if (step.engine) { + engineType = step.engine; + + // If an override is provided but not authenticated, log and fall back + const overrideEngine = registry.get(engineType); + const isOverrideAuthed = overrideEngine + ? await authCache.isAuthenticated(overrideEngine.metadata.id, () => + overrideEngine.auth.isAuthenticated(), + ) + : false; + if (!isOverrideAuthed) { + const pretty = overrideEngine?.metadata.name ?? engineType; + ui.logMessage( + uniqueAgentId, + `${pretty} override is not authenticated; falling back to first authenticated engine by order. Run 'codemachine auth login' to use ${pretty}.`, + ); + + // Find first authenticated engine by order (with caching) + const engines = registry.getAll(); + let fallbackEngine = null as typeof overrideEngine | null; + for (const eng of engines) { + const isAuth = await authCache.isAuthenticated( + eng.metadata.id, + () => eng.auth.isAuthenticated(), + ); + if (isAuth) { + fallbackEngine = eng; + break; + } + } + + // If none authenticated, fall back to registry default (may still require auth) + if (!fallbackEngine) { + fallbackEngine = registry.getDefault() ?? 
null; + } + + if (fallbackEngine) { + engineType = fallbackEngine.metadata.id; + ui.logMessage( + uniqueAgentId, + `Falling back to ${fallbackEngine.metadata.name} (${engineType})`, + ); + } + } + } else { + // Fallback: find first authenticated engine by order (with caching) + const engines = registry.getAll(); + let foundEngine = null; + + for (const engine of engines) { + const isAuth = await authCache.isAuthenticated( + engine.metadata.id, + () => engine.auth.isAuthenticated(), + ); + if (isAuth) { + foundEngine = engine; + break; + } + } + + if (!foundEngine) { + // If no authenticated engine, use default (first by order) + foundEngine = registry.getDefault(); + } + + if (!foundEngine) { + throw new Error( + "No engines registered. Please install at least one engine.", + ); + } + + engineType = foundEngine.metadata.id; + ui.logMessage( + uniqueAgentId, + `No engine specified, using ${foundEngine.metadata.name} (${engineType})`, + ); + } + + // Ensure the selected engine is used during execution + // (executeStep falls back to default engine if step.engine is unset) + // Mutate current step to carry the chosen engine forward + step.engine = engineType; + + // Set up skip listener and abort controller for this step (covers fallback + main + triggers) + const abortController = new AbortController(); + let skipRequested = false; // Prevent duplicate skip requests during async abort handling + const skipListener = () => { + if (skipRequested) { + // Ignore duplicate skip events (user pressing Ctrl+S rapidly) + // This prevents multiple "Skip requested" messages during Bun.spawn's async termination + return; + } + skipRequested = true; + ui.logMessage(uniqueAgentId, "ā­ļø Skip requested by user..."); + abortController.abort(); + }; + process.once("workflow:skip", skipListener); + + try { + // Check if fallback should be executed before the original step + if (shouldExecuteFallback(step, index, notCompletedSteps)) { + ui.logMessage( + uniqueAgentId, + `Detected 
incomplete step. Running fallback agent first.`, + ); + try { + await executeFallbackStep( + step, + cwd, + workflowStartTime, + engineType, + ui, + uniqueAgentId, + abortController.signal, + ); + } catch (error) { + // Fallback failed, step remains in notCompletedSteps + ui.logMessage( + uniqueAgentId, + `Fallback failed. Skipping original step retry.`, + ); + // Don't update status to failed - just let it stay as running or retrying + throw error; + } + } + + const output = await executeStep(step, cwd, { + logger: () => {}, // No-op: UI reads from log files + stderrLogger: () => {}, // No-op: UI reads from log files + ui, + abortSignal: abortController.signal, + uniqueAgentId, + }); + + // Check for trigger behavior first + const triggerResult = await handleTriggerLogic(step, output, cwd, ui); + if (triggerResult?.shouldTrigger && triggerResult.triggerAgentId) { + const triggeredAgentId = triggerResult.triggerAgentId; // Capture for use in callbacks + try { + await executeTriggerAgent({ + triggerAgentId: triggeredAgentId, + cwd, + engineType, + logger: () => {}, // No-op: UI reads from log files + stderrLogger: () => {}, // No-op: UI reads from log files + sourceAgentId: uniqueAgentId, + ui, + abortSignal: abortController.signal, + }); + } catch (triggerError) { + // Check if this was a user-requested skip (abort) + if ( + triggerError instanceof Error && + triggerError.name === "AbortError" + ) { + ui.updateAgentStatus(triggeredAgentId, "skipped"); + ui.logMessage( + triggeredAgentId, + `Triggered agent was skipped by user.`, + ); + } + // Continue with workflow even if triggered agent fails or is skipped + } + } + + // Remove from notCompletedSteps immediately after successful execution + // This must happen BEFORE loop logic to ensure cleanup even when loops trigger + await removeFromNotCompleted(cmRoot, index); + + // Mark step as completed if executeOnce is true + if (step.executeOnce) { + await markStepCompleted(cmRoot, index); + } + + // Update UI status 
to completed + // This must happen BEFORE loop logic to ensure UI updates even when loops trigger + ui.updateAgentStatus(uniqueAgentId, "completed"); + + // Log completion messages BEFORE loop check (so they're part of current agent's output) + ui.logMessage( + uniqueAgentId, + `${step.agentName} has completed their work.`, + ); + ui.logMessage(uniqueAgentId, "\n" + "═".repeat(80) + "\n"); + + // Check for checkpoint behavior first (to pause workflow for manual review) + const checkpointResult = await handleCheckpointLogic( + step, + output, + cwd, + ui, + ); + if (checkpointResult?.shouldStopWorkflow) { + // Wait for user action via events (Continue or Quit) + await new Promise((resolve) => { + const continueHandler = () => { + cleanup(); + resolve(); + }; + const quitHandler = () => { + cleanup(); + workflowShouldStop = true; + stoppedByCheckpointQuit = true; + resolve(); + }; + const cleanup = () => { + process.removeListener("checkpoint:continue", continueHandler); + process.removeListener("checkpoint:quit", quitHandler); + }; + + process.once("checkpoint:continue", continueHandler); + process.once("checkpoint:quit", quitHandler); + }); + + // Clear checkpoint state and resume + ui.clearCheckpointState(); + + if (workflowShouldStop) { + // User chose to quit from checkpoint - set status to stopped + ui.setWorkflowStatus("stopped"); + break; // User chose to quit + } + // Otherwise continue to next step (current step already marked complete via executeOnce) + } + + const loopResult = await handleLoopLogic( + step, + index, + output, + loopCounters, + cwd, + ui, + ); + + if (loopResult.decision?.shouldRepeat) { + // Set active loop with skip list + activeLoop = createActiveLoop(loopResult.decision); + + // Update UI loop state + const loopKey = `${step.module?.id ?? 
step.agentId}:${index}`; + const iteration = (loopCounters.get(loopKey) || 0) + 1; + ui.setLoopState({ + active: true, + sourceAgent: uniqueAgentId, + backSteps: loopResult.decision.stepsBack, + iteration, + maxIterations: + step.module?.behavior?.type === "loop" + ? (step.module.behavior.maxIterations ?? Infinity) + : Infinity, + skipList: loopResult.decision.skipList || [], + reason: loopResult.decision.reason, + }); + + // Reset all agents that will be re-executed in the loop + // Clear their UI data (telemetry, tool counts, subagents) and monitoring registry data + // Save their current state to execution history with cycle number + for ( + let resetIndex = loopResult.newIndex; + resetIndex <= index; + resetIndex += 1 + ) { + const resetStep = template.steps[resetIndex]; + if (resetStep && resetStep.type === "module") { + const resetUniqueAgentId = `${resetStep.agentId}-step-${resetIndex}`; + await ui.resetAgentForLoop(resetUniqueAgentId, iteration); + } + } + + index = loopResult.newIndex; + continue; + } + + // Clear active loop only when a loop step explicitly terminates + const newActiveLoop = createActiveLoop(loopResult.decision); + if (newActiveLoop !== (undefined as unknown as ActiveLoop | null)) { + activeLoop = newActiveLoop; + if (!newActiveLoop) { + ui.setLoopState(null); + ui.clearLoopRound(uniqueAgentId); + } + } + } catch (error) { + // Check if this was a user-requested skip (abort) + if (error instanceof Error && error.name === "AbortError") { + ui.updateAgentStatus(uniqueAgentId, "skipped"); + ui.logMessage( + uniqueAgentId, + `${step.agentName} was skipped by user.`, + ); + ui.logMessage(uniqueAgentId, "\n" + "═".repeat(80) + "\n"); + // Continue to next step - don't throw + } else { + // Don't update status to failed - let it stay as running/retrying + ui.logMessage( + uniqueAgentId, + `${step.agentName} failed: ${error instanceof Error ? 
error.message : String(error)}`, + ); + throw error; + } + } finally { + // Always clean up the skip listener + process.removeListener("workflow:skip", skipListener); + } + } + + // Check if workflow was stopped by user (Ctrl+C or checkpoint quit) + if (workflowShouldStop) { + if (stoppedByCheckpointQuit) { + // Workflow was stopped by checkpoint quit - status already set to 'stopped' + // UI will stay running showing the stopped status + // Wait indefinitely - user can press Ctrl+C to exit + await new Promise(() => { + // Never resolves - keeps event loop alive until Ctrl+C exits process + }); + } else { + // Workflow was stopped by Ctrl+C - status already updated by MonitoringCleanup handler + // Keep UI alive to show "Press Ctrl+C again to exit" message + // The second Ctrl+C will be handled by MonitoringCleanup's SIGINT handler + // Wait indefinitely - the SIGINT handler will call process.exit() + await new Promise(() => { + // Never resolves - keeps event loop alive until second Ctrl+C exits process + }); + } + } + + // Workflow completed successfully + MonitoringCleanup.clearWorkflowHandlers(); + + // Set status to completed and keep UI alive + ui.setWorkflowStatus("completed"); + // UI will stay running - user presses Ctrl+C to exit with two-stage behavior + // Wait indefinitely - the SIGINT handler will call process.exit() + await new Promise(() => { + // Never resolves - keeps event loop alive until Ctrl+C exits process + }); + } catch (error) { + // On workflow error, set status, stop UI, then exit + ui.setWorkflowStatus("stopped"); + + // Stop UI to restore console before logging error + ui.stop(); + + // Re-throw error to be handled by caller (will now print after UI is stopped) + throw error; + } finally { + // Clean up workflow stop listener + process.removeListener("workflow:stop", stopListener); + + // Stop OpenCode server if we started one + if (opencodeServer) { + debug("Stopping OpenCode server..."); + await opencodeServer.stop(); + } + } } diff 
--git a/src/workflows/templates/types.ts b/src/workflows/templates/types.ts index 30a5468e..84e8e7ed 100644 --- a/src/workflows/templates/types.ts +++ b/src/workflows/templates/types.ts @@ -44,6 +44,13 @@ export interface ModuleStep { * If not specified, uses the engine's default agent. */ agent?: string; + /** + * URL of running OpenCode server to attach to (e.g., http://localhost:4096) + * When provided, uses --attach flag to connect to existing server. + * This is typically set automatically by the workflow runner when using + * consolidated server mode (--opencode-server flag). + */ + attach?: string; module?: ModuleMetadata; executeOnce?: boolean; notCompletedFallback?: string; // Agent ID to run if step is in notCompletedSteps @@ -75,6 +82,18 @@ export interface RunWorkflowOptions { cwd?: string; templatePath?: string; specificationPath?: string; + /** + * [Experimental] Start a consolidated OpenCode server for all workflow steps. + * Reduces MCP cold boot times by reusing a single server instance. + * The server is started before the first OpenCode step and stopped when the workflow completes. + */ + opencodeServer?: boolean; + /** + * Attach to an existing OpenCode server instead of starting a new one. + * Mutually exclusive with opencodeServer. + * Example: "http://localhost:4096" + */ + opencodeAttach?: string; } export interface TaskManagerOptions { diff --git a/tests/integration/opencode/server.test.ts b/tests/integration/opencode/server.test.ts new file mode 100644 index 00000000..08dedb87 --- /dev/null +++ b/tests/integration/opencode/server.test.ts @@ -0,0 +1,189 @@ +/** + * OpenCode Server Integration Tests + * + * Tests the OpenCode server lifecycle management, including: + * - Starting a server on a random port + * - Health checks + * - Listing available agents + * - Attaching to an external server + * - Server cleanup + * + * Note: These tests require OpenCode to be installed (`opencode` CLI available). 
+ * Tests are skipped if OpenCode is not installed. + */ + +import { describe, test, expect } from "bun:test"; +import { execSync } from "node:child_process"; +import { + startOpenCodeServer, + attachToExternalServer, + checkServerHealth, + type OpenCodeServer, +} from "../../../src/infra/engines/providers/opencode/execution/server.js"; +import { + buildOpenCodeRunCommand, + buildOpenCodeServeCommand, +} from "../../../src/infra/engines/providers/opencode/execution/commands.js"; + +// Check if opencode is installed +let opencodeInstalled = false; +try { + execSync("opencode --version", { stdio: "ignore" }); + opencodeInstalled = true; +} catch { + opencodeInstalled = false; +} + +const describeIfOpencode = opencodeInstalled ? describe : describe.skip; + +describeIfOpencode("OpenCode Server", () => { + describe("Command Building", () => { + test("buildOpenCodeRunCommand includes --attach when provided", () => { + const cmd = buildOpenCodeRunCommand({ + model: "anthropic/claude-3-5-sonnet", + agent: "task-implementer", + attach: "http://localhost:4096", + }); + + expect(cmd.command).toBe("opencode"); + expect(cmd.args).toContain("run"); + expect(cmd.args).toContain("--format"); + expect(cmd.args).toContain("json"); + expect(cmd.args).toContain("--attach"); + expect(cmd.args).toContain("http://localhost:4096"); + expect(cmd.args).toContain("--agent"); + expect(cmd.args).toContain("task-implementer"); + expect(cmd.args).toContain("--model"); + expect(cmd.args).toContain("anthropic/claude-3-5-sonnet"); + }); + + test("buildOpenCodeRunCommand omits --attach when not provided", () => { + const cmd = buildOpenCodeRunCommand({ + model: "anthropic/claude-3-5-sonnet", + agent: "build", + }); + + expect(cmd.args).not.toContain("--attach"); + }); + + test("buildOpenCodeServeCommand with port and hostname", () => { + const cmd = buildOpenCodeServeCommand({ + port: 5000, + hostname: "0.0.0.0", + }); + + expect(cmd.command).toBe("opencode"); + expect(cmd.args).toContain("serve"); 
+ expect(cmd.args).toContain("--port"); + expect(cmd.args).toContain("5000"); + expect(cmd.args).toContain("--hostname"); + expect(cmd.args).toContain("0.0.0.0"); + }); + }); + + describe("Server Lifecycle", () => { + let server: OpenCodeServer | null = null; + + test("startOpenCodeServer starts a server on random port", async () => { + server = await startOpenCodeServer({ + startupTimeout: 60000, // 60s timeout for CI + }); + + expect(server).toBeDefined(); + expect(server.url).toMatch(/^http:\/\/127\.0\.0\.1:\d+$/); + expect(server.port).toBeGreaterThan(0); + expect(server.hostname).toBe("127.0.0.1"); + expect(typeof server.stop).toBe("function"); + expect(typeof server.isHealthy).toBe("function"); + expect(typeof server.listAgents).toBe("function"); + }, 90000); // 90s test timeout + + test("server health check returns true for running server", async () => { + if (!server) { + throw new Error("Server not started"); + } + + const healthy = await server.isHealthy(); + expect(healthy).toBe(true); + }); + + test("listAgents returns array of agents", async () => { + if (!server) { + throw new Error("Server not started"); + } + + const agents = await server.listAgents(); + expect(Array.isArray(agents)).toBe(true); + + // OpenCode should have at least the default 'build' agent + // The exact agents depend on the project config + if (agents.length > 0) { + expect(agents[0]).toHaveProperty("id"); + } + }); + + test("server stops cleanly", async () => { + if (!server) { + throw new Error("Server not started"); + } + + await server.stop(); + + // Health check should fail after stop + const healthy = await checkServerHealth(server.url); + expect(healthy).toBe(false); + + server = null; + }); + }); + + describe("External Server", () => { + test("attachToExternalServer creates handle without lifecycle management", () => { + const server = attachToExternalServer("http://localhost:4096"); + + expect(server.url).toBe("http://localhost:4096"); + expect(server.port).toBe(4096); + 
expect(server.hostname).toBe("localhost"); + expect(typeof server.stop).toBe("function"); + expect(typeof server.isHealthy).toBe("function"); + expect(typeof server.listAgents).toBe("function"); + }); + + test("attachToExternalServer handles URL with trailing slash", () => { + const server = attachToExternalServer("http://localhost:4096/"); + + expect(server.url).toBe("http://localhost:4096"); + }); + + test("attachToExternalServer parses custom port", () => { + const server = attachToExternalServer("http://127.0.0.1:5555"); + + expect(server.port).toBe(5555); + expect(server.hostname).toBe("127.0.0.1"); + }); + }); + + describe("Health Check", () => { + test("checkServerHealth returns false for non-existent server", async () => { + const healthy = await checkServerHealth("http://localhost:59999"); + expect(healthy).toBe(false); + }); + }); +}); + +// Tests that run regardless of OpenCode installation +describe("OpenCode Commands (no runtime)", () => { + test("buildOpenCodeRunCommand defaults to build agent", () => { + const cmd = buildOpenCodeRunCommand({}); + + expect(cmd.args).toContain("--agent"); + expect(cmd.args).toContain("build"); + }); + + test("buildOpenCodeRunCommand includes --format json", () => { + const cmd = buildOpenCodeRunCommand({}); + + expect(cmd.args).toContain("--format"); + expect(cmd.args).toContain("json"); + }); +}); From 6b66197ebd22071aacd839886ed17c5aef01b641 Mon Sep 17 00:00:00 2001 From: Nick Roth Date: Thu, 4 Dec 2025 11:59:43 -0600 Subject: [PATCH 6/6] feat: support project-local workflow templates and inline prompts - Add project-first template resolution: check .codemachine/ before package templates - Support inline prompt field as alternative to promptPath in workflow steps - Fix empty memory content crashes in runner and trigger modules - Update ModuleStep type to make promptPath optional when prompt is provided - Update validator to accept either promptPath or prompt field --- src/agents/runner/runner.ts | 16 +- 
src/shared/workflows/template.ts | 155 ++++++----- src/workflows/execution/step.ts | 21 +- src/workflows/execution/trigger.ts | 398 +++++++++++++++------------ src/workflows/templates/loader.ts | 194 ++++++++----- src/workflows/templates/types.ts | 5 +- src/workflows/templates/validator.ts | 253 ++++++++++------- 7 files changed, 607 insertions(+), 435 deletions(-) diff --git a/src/agents/runner/runner.ts b/src/agents/runner/runner.ts index 667a8f75..87cd7db6 100644 --- a/src/agents/runner/runner.ts +++ b/src/agents/runner/runner.ts @@ -392,14 +392,16 @@ export async function executeAgent( timeout, }); - // Store output in memory + // Store output in memory (skip if empty to avoid validation error) const stdout = result.stdout || totalStdout; - const slice = stdout.slice(-2000); - await store.append({ - agentId, - content: slice, - timestamp: new Date().toISOString(), - }); + const slice = stdout.slice(-2000).trim(); + if (slice) { + await store.append({ + agentId, + content: slice, + timestamp: new Date().toISOString(), + }); + } // Mark agent as completed if (monitor && monitoringAgentId !== undefined) { diff --git a/src/shared/workflows/template.ts b/src/shared/workflows/template.ts index ea2864f9..cb4aa945 100644 --- a/src/shared/workflows/template.ts +++ b/src/shared/workflows/template.ts @@ -1,62 +1,72 @@ -import { readFile, writeFile } from 'node:fs/promises'; -import { existsSync } from 'node:fs'; -import * as path from 'node:path'; -import { resolvePackageRoot } from '../runtime/pkg.js'; +import { readFile, writeFile } from "node:fs/promises"; +import { existsSync } from "node:fs"; +import * as path from "node:path"; +import { resolvePackageRoot } from "../runtime/pkg.js"; -const TEMPLATE_TRACKING_FILE = 'template.json'; +const TEMPLATE_TRACKING_FILE = "template.json"; -const packageRoot = resolvePackageRoot(import.meta.url, 'workflows template tracking'); +const packageRoot = resolvePackageRoot( + import.meta.url, + "workflows template tracking", +); 
-const templatesDir = path.resolve(packageRoot, 'templates', 'workflows'); +const templatesDir = path.resolve(packageRoot, "templates", "workflows"); interface TemplateTracking { - activeTemplate: string; - /** - * Timestamp in ISO 8601 format with UTC timezone (e.g., "2025-10-13T14:40:14.123Z"). - * The "Z" suffix explicitly indicates UTC timezone. - * To convert to local time in JavaScript: new Date(lastUpdated).toLocaleString() - */ - lastUpdated: string; - completedSteps?: number[]; - notCompletedSteps?: number[]; - resumeFromLastStep?: boolean; + activeTemplate: string; + /** + * Timestamp in ISO 8601 format with UTC timezone (e.g., "2025-10-13T14:40:14.123Z"). + * The "Z" suffix explicitly indicates UTC timezone. + * To convert to local time in JavaScript: new Date(lastUpdated).toLocaleString() + */ + lastUpdated: string; + completedSteps?: number[]; + notCompletedSteps?: number[]; + resumeFromLastStep?: boolean; } /** * Gets the currently active template name from the tracking file. */ -export async function getActiveTemplate(cmRoot: string): Promise { - const trackingPath = path.join(cmRoot, TEMPLATE_TRACKING_FILE); - - if (!existsSync(trackingPath)) { - return null; - } - - try { - const content = await readFile(trackingPath, 'utf8'); - const data = JSON.parse(content) as TemplateTracking; - return data.activeTemplate ?? null; - } catch (error) { - console.warn(`Failed to read active template from tracking file: ${error instanceof Error ? error.message : String(error)}`); - return null; - } +export async function getActiveTemplate( + cmRoot: string, +): Promise { + const trackingPath = path.join(cmRoot, TEMPLATE_TRACKING_FILE); + + if (!existsSync(trackingPath)) { + return null; + } + + try { + const content = await readFile(trackingPath, "utf8"); + const data = JSON.parse(content) as TemplateTracking; + return data.activeTemplate ?? null; + } catch (error) { + console.warn( + `Failed to read active template from tracking file: ${error instanceof Error ? 
error.message : String(error)}`, + ); + return null; + } } /** * Sets the active template name in the tracking file. */ -export async function setActiveTemplate(cmRoot: string, templateName: string): Promise { - const trackingPath = path.join(cmRoot, TEMPLATE_TRACKING_FILE); - - const data: TemplateTracking = { - activeTemplate: templateName, - lastUpdated: new Date().toISOString(), // ISO 8601 UTC format (e.g., "2025-10-13T14:40:14.123Z") - completedSteps: [], - notCompletedSteps: [], - resumeFromLastStep: true, - }; - - await writeFile(trackingPath, JSON.stringify(data, null, 2), 'utf8'); +export async function setActiveTemplate( + cmRoot: string, + templateName: string, +): Promise { + const trackingPath = path.join(cmRoot, TEMPLATE_TRACKING_FILE); + + const data: TemplateTracking = { + activeTemplate: templateName, + lastUpdated: new Date().toISOString(), // ISO 8601 UTC format (e.g., "2025-10-13T14:40:14.123Z") + completedSteps: [], + notCompletedSteps: [], + resumeFromLastStep: true, + }; + + await writeFile(trackingPath, JSON.stringify(data, null, 2), "utf8"); } /** @@ -65,30 +75,47 @@ export async function setActiveTemplate(cmRoot: string, templateName: string): P * - There is an active template and it's different from the provided one * - There is no active template but we have a new one (first run with template) */ -export async function hasTemplateChanged(cmRoot: string, templateName: string): Promise { - const activeTemplate = await getActiveTemplate(cmRoot); - - // If no active template, treat it as changed (first run with template should regenerate) - if (activeTemplate === null) { - return true; - } - - // Check if the template is different - return activeTemplate !== templateName; +export async function hasTemplateChanged( + cmRoot: string, + templateName: string, +): Promise { + const activeTemplate = await getActiveTemplate(cmRoot); + + // If no active template, treat it as changed (first run with template should regenerate) + if (activeTemplate 
=== null) { + return true; + } + + // Check if the template is different + return activeTemplate !== templateName; } /** * Gets the full template path from the tracking file. * Returns the default template if no template is tracked. + * Supports both absolute paths and relative paths (relative to CodeMachine templates dir). */ -export async function getTemplatePathFromTracking(cmRoot: string): Promise { - const activeTemplate = await getActiveTemplate(cmRoot); - - if (!activeTemplate) { - // No template tracked, return default - return path.join(templatesDir, 'default.workflow.js'); - } - - // Return full path from template name - return path.join(templatesDir, activeTemplate); +export async function getTemplatePathFromTracking( + cmRoot: string, +): Promise { + const activeTemplate = await getActiveTemplate(cmRoot); + + if (!activeTemplate) { + // No template tracked, return default + return path.join(templatesDir, "default.workflow.js"); + } + + // If absolute path, use it directly + if (path.isAbsolute(activeTemplate)) { + return activeTemplate; + } + + // Check if template exists in project's .codemachine directory first + const projectTemplatePath = path.join(cmRoot, activeTemplate); + if (existsSync(projectTemplatePath)) { + return projectTemplatePath; + } + + // Fall back to CodeMachine's templates directory + return path.join(templatesDir, activeTemplate); } diff --git a/src/workflows/execution/step.ts b/src/workflows/execution/step.ts index e35dec36..82b41292 100644 --- a/src/workflows/execution/step.ts +++ b/src/workflows/execution/step.ts @@ -49,10 +49,23 @@ export async function executeStep( } // Load and process the prompt template - const promptPath = path.isAbsolute(step.promptPath) - ? 
step.promptPath - : path.resolve(cwd, step.promptPath); - const rawPrompt = await readFile(promptPath, "utf8"); + // Support either promptPath (file reference) or prompt (inline string) + let rawPrompt: string; + const stepWithPrompt = step as typeof step & { prompt?: string }; + if (stepWithPrompt.prompt) { + // Use inline prompt directly + rawPrompt = stepWithPrompt.prompt; + } else if (step.promptPath) { + // Load from file + const promptPath = path.isAbsolute(step.promptPath) + ? step.promptPath + : path.resolve(cwd, step.promptPath); + rawPrompt = await readFile(promptPath, "utf8"); + } else { + throw new Error( + `Step ${step.agentId} must have either promptPath or prompt`, + ); + } const prompt = await processPromptString(rawPrompt, cwd); // Use environment variable or default to 30 minutes (1800000ms) diff --git a/src/workflows/execution/trigger.ts b/src/workflows/execution/trigger.ts index 407ca43b..36dcf0b7 100644 --- a/src/workflows/execution/trigger.ts +++ b/src/workflows/execution/trigger.ts @@ -1,188 +1,228 @@ -import * as path from 'node:path'; -import type { EngineType } from '../../infra/engines/index.js'; -import { getEngine, registry } from '../../infra/engines/index.js'; -import { loadAgentConfig, loadAgentTemplate } from '../../agents/runner/index.js'; -import { MemoryAdapter } from '../../infra/fs/memory-adapter.js'; -import { MemoryStore } from '../../agents/index.js'; -import { processPromptString } from '../../shared/prompts/index.js'; -import type { WorkflowUIManager } from '../../ui/index.js'; -import { AgentMonitorService, AgentLoggerService } from '../../agents/monitoring/index.js'; +import * as path from "node:path"; +import type { EngineType } from "../../infra/engines/index.js"; +import { getEngine, registry } from "../../infra/engines/index.js"; +import { + loadAgentConfig, + loadAgentTemplate, +} from "../../agents/runner/index.js"; +import { MemoryAdapter } from "../../infra/fs/memory-adapter.js"; +import { MemoryStore } from 
"../../agents/index.js"; +import { processPromptString } from "../../shared/prompts/index.js"; +import type { WorkflowUIManager } from "../../ui/index.js"; +import { + AgentMonitorService, + AgentLoggerService, +} from "../../agents/monitoring/index.js"; export interface TriggerExecutionOptions { - triggerAgentId: string; - cwd: string; - engineType: EngineType; - logger: (chunk: string) => void; - stderrLogger: (chunk: string) => void; - sourceAgentId: string; // The agent that triggered this execution - ui?: WorkflowUIManager; - abortSignal?: AbortSignal; - /** Disable monitoring (for special cases) */ - disableMonitoring?: boolean; + triggerAgentId: string; + cwd: string; + engineType: EngineType; + logger: (chunk: string) => void; + stderrLogger: (chunk: string) => void; + sourceAgentId: string; // The agent that triggered this execution + ui?: WorkflowUIManager; + abortSignal?: AbortSignal; + /** Disable monitoring (for special cases) */ + disableMonitoring?: boolean; } /** * Executes a triggered agent by loading it from config/main.agents.js * This bypasses the workflow and allows triggering any agent, even outside the workflow */ -export async function executeTriggerAgent(options: TriggerExecutionOptions): Promise { - const { triggerAgentId, cwd, engineType, logger: _logger, stderrLogger: _stderrLogger, sourceAgentId, ui, abortSignal, disableMonitoring } = options; - - // Initialize monitoring (unless explicitly disabled) - declare outside try for catch block access - const monitor = !disableMonitoring ? AgentMonitorService.getInstance() : null; - const loggerService = !disableMonitoring ? 
AgentLoggerService.getInstance() : null; - let monitoringAgentId: number | undefined; - - try { - // Load agent config and template from config/main.agents.js - const triggeredAgentConfig = await loadAgentConfig(triggerAgentId, cwd); - const rawTemplate = await loadAgentTemplate(triggerAgentId, cwd); - const triggeredAgentTemplate = await processPromptString(rawTemplate, cwd); - - // Get engine and resolve model/reasoning - const engine = getEngine(engineType); - const engineModule = registry.get(engineType); - const triggeredModel = (triggeredAgentConfig.model as string | undefined) ?? engineModule?.metadata.defaultModel; - const triggeredReasoning = (triggeredAgentConfig.modelReasoningEffort as 'low' | 'medium' | 'high' | undefined) ?? engineModule?.metadata.defaultModelReasoningEffort; - - // Find parent agent in monitoring system by sourceAgentId - let parentMonitoringId: number | undefined; - if (monitor) { - const parentAgents = monitor.queryAgents({ name: sourceAgentId }); - if (parentAgents.length > 0) { - // Get the most recent one (highest ID) - parentMonitoringId = parentAgents.sort((a, b) => b.id - a.id)[0].id; - } - - // Register triggered agent with parent relationship and engine/model info - const promptText = `Triggered by ${sourceAgentId}`; - monitoringAgentId = await monitor.register({ - name: triggerAgentId, - prompt: promptText, - parentId: parentMonitoringId, - engineProvider: engineType, - modelName: triggeredModel, - }); - - // Store full prompt for debug mode logging - if (loggerService && monitoringAgentId !== undefined) { - loggerService.storeFullPrompt(monitoringAgentId, promptText); - } - - // Register monitoring ID with UI immediately so it can load logs - if (ui && monitoringAgentId !== undefined) { - ui.registerMonitoringId(triggerAgentId, monitoringAgentId); - } - } - - // Add triggered agent to UI - if (ui) { - const engineName = engineType; // preserve original engine type, even if unknown - ui.addTriggeredAgent(sourceAgentId, { - 
id: triggerAgentId, - name: triggeredAgentConfig.name ?? triggerAgentId, - engine: engineName, - status: 'running', - triggeredBy: sourceAgentId, - startTime: Date.now(), - telemetry: { tokensIn: 0, tokensOut: 0 }, - toolCount: 0, - thinkingCount: 0, - }); - } - - if (ui) { - ui.logMessage(sourceAgentId, `Executing triggered agent: ${triggeredAgentConfig.name ?? triggerAgentId}`); - ui.logMessage(triggerAgentId, '═'.repeat(80)); - ui.logMessage(triggerAgentId, `${triggeredAgentConfig.name ?? triggerAgentId} started to work (triggered).`); - } - - // Build prompt for triggered agent (memory write-only, no read) - const memoryDir = path.resolve(cwd, '.codemachine', 'memory'); - const adapter = new MemoryAdapter(memoryDir); - const store = new MemoryStore(adapter); - const compositePrompt = triggeredAgentTemplate; - - // Execute triggered agent - let totalTriggeredStdout = ''; - const triggeredResult = await engine.run({ - prompt: compositePrompt, - workingDir: cwd, - model: triggeredModel, - modelReasoningEffort: triggeredReasoning, - onData: (chunk) => { - totalTriggeredStdout += chunk; - - // Write to log file only (UI reads from log file) - if (loggerService && monitoringAgentId !== undefined) { - loggerService.write(monitoringAgentId, chunk); - } - }, - onErrorData: (chunk) => { - // Write stderr to log file only (UI reads from log file) - if (loggerService && monitoringAgentId !== undefined) { - loggerService.write(monitoringAgentId, `[STDERR] ${chunk}`); - } - }, - onTelemetry: (telemetry) => { - ui?.updateAgentTelemetry(triggerAgentId, telemetry); - - // Update telemetry in monitoring (fire and forget - don't block streaming) - if (monitor && monitoringAgentId !== undefined) { - monitor.updateTelemetry(monitoringAgentId, telemetry).catch(err => - console.error(`Failed to update telemetry: ${err}`) - ); - } - }, - abortSignal, - }); - - // NOTE: Telemetry is already updated via onTelemetry callback during streaming execution. 
- // DO NOT parse from final output - it would match the FIRST telemetry line (early/wrong values) - // instead of the LAST telemetry line (final/correct values), causing incorrect UI display. - - // Store output in memory - const triggeredStdout = triggeredResult.stdout || totalTriggeredStdout; - const triggeredSlice = triggeredStdout.slice(-2000); - await store.append({ - agentId: triggerAgentId, - content: triggeredSlice, - timestamp: new Date().toISOString(), - }); - - // Update UI status on completion - if (ui) { - // Always mark as completed (no failed status) - ui.updateAgentStatus(triggerAgentId, 'completed'); - } - - if (ui) { - ui.logMessage(triggerAgentId, `${triggeredAgentConfig.name ?? triggerAgentId} (triggered) has completed their work.`); - ui.logMessage(triggerAgentId, '═'.repeat(80)); - } - - // Mark agent as completed in monitoring - if (monitor && monitoringAgentId !== undefined) { - await monitor.complete(monitoringAgentId); - // Note: Don't close stream here - workflow may write more messages - // Streams will be closed by cleanup handlers or monitoring service shutdown - } - } catch (triggerError) { - // Mark agent as failed in monitoring - if (monitor && monitoringAgentId !== undefined) { - await monitor.fail(monitoringAgentId, triggerError as Error); - // Note: Don't close stream here - workflow may write more messages - // Streams will be closed by cleanup handlers or monitoring service shutdown - } - - // Don't update status to failed - let it stay as running/retrying - if (ui) { - ui.logMessage( - sourceAgentId, - `Triggered agent '${triggerAgentId}' failed: ${triggerError instanceof Error ? 
triggerError.message : String(triggerError)}` - ); - } - // Continue with workflow even if triggered agent fails - throw triggerError; - } +export async function executeTriggerAgent( + options: TriggerExecutionOptions, +): Promise { + const { + triggerAgentId, + cwd, + engineType, + logger: _logger, + stderrLogger: _stderrLogger, + sourceAgentId, + ui, + abortSignal, + disableMonitoring, + } = options; + + // Initialize monitoring (unless explicitly disabled) - declare outside try for catch block access + const monitor = !disableMonitoring ? AgentMonitorService.getInstance() : null; + const loggerService = !disableMonitoring + ? AgentLoggerService.getInstance() + : null; + let monitoringAgentId: number | undefined; + + try { + // Load agent config and template from config/main.agents.js + const triggeredAgentConfig = await loadAgentConfig(triggerAgentId, cwd); + const rawTemplate = await loadAgentTemplate(triggerAgentId, cwd); + const triggeredAgentTemplate = await processPromptString(rawTemplate, cwd); + + // Get engine and resolve model/reasoning + const engine = getEngine(engineType); + const engineModule = registry.get(engineType); + const triggeredModel = + (triggeredAgentConfig.model as string | undefined) ?? + engineModule?.metadata.defaultModel; + const triggeredReasoning = + (triggeredAgentConfig.modelReasoningEffort as + | "low" + | "medium" + | "high" + | undefined) ?? 
engineModule?.metadata.defaultModelReasoningEffort; + + // Find parent agent in monitoring system by sourceAgentId + let parentMonitoringId: number | undefined; + if (monitor) { + const parentAgents = monitor.queryAgents({ name: sourceAgentId }); + if (parentAgents.length > 0) { + // Get the most recent one (highest ID) + parentMonitoringId = parentAgents.sort((a, b) => b.id - a.id)[0].id; + } + + // Register triggered agent with parent relationship and engine/model info + const promptText = `Triggered by ${sourceAgentId}`; + monitoringAgentId = await monitor.register({ + name: triggerAgentId, + prompt: promptText, + parentId: parentMonitoringId, + engineProvider: engineType, + modelName: triggeredModel, + }); + + // Store full prompt for debug mode logging + if (loggerService && monitoringAgentId !== undefined) { + loggerService.storeFullPrompt(monitoringAgentId, promptText); + } + + // Register monitoring ID with UI immediately so it can load logs + if (ui && monitoringAgentId !== undefined) { + ui.registerMonitoringId(triggerAgentId, monitoringAgentId); + } + } + + // Add triggered agent to UI + if (ui) { + const engineName = engineType; // preserve original engine type, even if unknown + ui.addTriggeredAgent(sourceAgentId, { + id: triggerAgentId, + name: triggeredAgentConfig.name ?? triggerAgentId, + engine: engineName, + status: "running", + triggeredBy: sourceAgentId, + startTime: Date.now(), + telemetry: { tokensIn: 0, tokensOut: 0 }, + toolCount: 0, + thinkingCount: 0, + }); + } + + if (ui) { + ui.logMessage( + sourceAgentId, + `Executing triggered agent: ${triggeredAgentConfig.name ?? triggerAgentId}`, + ); + ui.logMessage(triggerAgentId, "═".repeat(80)); + ui.logMessage( + triggerAgentId, + `${triggeredAgentConfig.name ?? 
triggerAgentId} started to work (triggered).`, + ); + } + + // Build prompt for triggered agent (memory write-only, no read) + const memoryDir = path.resolve(cwd, ".codemachine", "memory"); + const adapter = new MemoryAdapter(memoryDir); + const store = new MemoryStore(adapter); + const compositePrompt = triggeredAgentTemplate; + + // Execute triggered agent + let totalTriggeredStdout = ""; + const triggeredResult = await engine.run({ + prompt: compositePrompt, + workingDir: cwd, + model: triggeredModel, + modelReasoningEffort: triggeredReasoning, + onData: (chunk) => { + totalTriggeredStdout += chunk; + + // Write to log file only (UI reads from log file) + if (loggerService && monitoringAgentId !== undefined) { + loggerService.write(monitoringAgentId, chunk); + } + }, + onErrorData: (chunk) => { + // Write stderr to log file only (UI reads from log file) + if (loggerService && monitoringAgentId !== undefined) { + loggerService.write(monitoringAgentId, `[STDERR] ${chunk}`); + } + }, + onTelemetry: (telemetry) => { + ui?.updateAgentTelemetry(triggerAgentId, telemetry); + + // Update telemetry in monitoring (fire and forget - don't block streaming) + if (monitor && monitoringAgentId !== undefined) { + monitor + .updateTelemetry(monitoringAgentId, telemetry) + .catch((err) => + console.error(`Failed to update telemetry: ${err}`), + ); + } + }, + abortSignal, + }); + + // NOTE: Telemetry is already updated via onTelemetry callback during streaming execution. + // DO NOT parse from final output - it would match the FIRST telemetry line (early/wrong values) + // instead of the LAST telemetry line (final/correct values), causing incorrect UI display. 
+ + // Store output in memory (skip if empty to avoid validation error) + const triggeredStdout = triggeredResult.stdout || totalTriggeredStdout; + const triggeredSlice = triggeredStdout.slice(-2000).trim(); + if (triggeredSlice) { + await store.append({ + agentId: triggerAgentId, + content: triggeredSlice, + timestamp: new Date().toISOString(), + }); + } + + // Update UI status on completion + if (ui) { + // Always mark as completed (no failed status) + ui.updateAgentStatus(triggerAgentId, "completed"); + } + + if (ui) { + ui.logMessage( + triggerAgentId, + `${triggeredAgentConfig.name ?? triggerAgentId} (triggered) has completed their work.`, + ); + ui.logMessage(triggerAgentId, "═".repeat(80)); + } + + // Mark agent as completed in monitoring + if (monitor && monitoringAgentId !== undefined) { + await monitor.complete(monitoringAgentId); + // Note: Don't close stream here - workflow may write more messages + // Streams will be closed by cleanup handlers or monitoring service shutdown + } + } catch (triggerError) { + // Mark agent as failed in monitoring + if (monitor && monitoringAgentId !== undefined) { + await monitor.fail(monitoringAgentId, triggerError as Error); + // Note: Don't close stream here - workflow may write more messages + // Streams will be closed by cleanup handlers or monitoring service shutdown + } + + // Don't update status to failed - let it stay as running/retrying + if (ui) { + ui.logMessage( + sourceAgentId, + `Triggered agent '${triggerAgentId}' failed: ${triggerError instanceof Error ? 
triggerError.message : String(triggerError)}`, + ); + } + // Continue with workflow even if triggered agent fails + throw triggerError; + } } diff --git a/src/workflows/templates/loader.ts b/src/workflows/templates/loader.ts index 44ce7d88..b348d972 100644 --- a/src/workflows/templates/loader.ts +++ b/src/workflows/templates/loader.ts @@ -1,88 +1,136 @@ -import * as path from 'node:path'; -import { createRequire } from 'node:module'; -import { pathToFileURL } from 'node:url'; -import type { WorkflowTemplate } from './types.js'; -import { validateWorkflowTemplate } from './validator.js'; -import { ensureTemplateGlobals } from './globals.js'; -import { resolvePackageRoot } from '../../shared/runtime/pkg.js'; +import * as path from "node:path"; +import { createRequire } from "node:module"; +import { pathToFileURL } from "node:url"; +import { existsSync } from "node:fs"; +import type { WorkflowTemplate } from "./types.js"; +import { validateWorkflowTemplate } from "./validator.js"; +import { ensureTemplateGlobals } from "./globals.js"; +import { resolvePackageRoot } from "../../shared/runtime/pkg.js"; // Package root resolution -export const packageRoot = resolvePackageRoot(import.meta.url, 'workflow templates loader'); +export const packageRoot = resolvePackageRoot( + import.meta.url, + "workflow templates loader", +); -export const templatesDir = path.resolve(packageRoot, 'templates', 'workflows'); +export const templatesDir = path.resolve(packageRoot, "templates", "workflows"); // Module loading export async function loadWorkflowModule(modPath: string): Promise { - ensureTemplateGlobals(); - const ext = path.extname(modPath).toLowerCase(); - if (ext === '.cjs' || ext === '.cts') { - const require = createRequire(import.meta.url); - try { - delete require.cache[require.resolve(modPath)]; - } catch { - // Ignore cache deletion errors - } - return require(modPath); - } + ensureTemplateGlobals(); + const ext = path.extname(modPath).toLowerCase(); + if (ext === ".cjs" || 
ext === ".cts") { + const require = createRequire(import.meta.url); + try { + delete require.cache[require.resolve(modPath)]; + } catch { + // Ignore cache deletion errors + } + return require(modPath); + } - const fileUrl = pathToFileURL(modPath); - const cacheBustingUrl = new URL(fileUrl.href); - cacheBustingUrl.searchParams.set('ts', Date.now().toString()); - const mod = await import(cacheBustingUrl.href); - return mod?.default ?? mod; + const fileUrl = pathToFileURL(modPath); + const cacheBustingUrl = new URL(fileUrl.href); + cacheBustingUrl.searchParams.set("ts", Date.now().toString()); + const mod = await import(cacheBustingUrl.href); + return mod?.default ?? mod; } // Template loading -export async function loadTemplate(cwd: string, templatePath?: string): Promise { - const resolvedTemplateOverride = templatePath - ? path.isAbsolute(templatePath) - ? templatePath - : path.resolve(packageRoot, templatePath) - : undefined; - const codemachineTemplate = path.resolve(templatesDir, 'codemachine.workflow.js'); - const candidates = [resolvedTemplateOverride, codemachineTemplate].filter(Boolean) as string[]; +export async function loadTemplate( + cwd: string, + templatePath?: string, +): Promise { + const resolvedTemplateOverride = templatePath + ? resolveTemplatePath(cwd, templatePath) + : undefined; + const codemachineTemplate = path.resolve( + templatesDir, + "codemachine.workflow.js", + ); + const candidates = [resolvedTemplateOverride, codemachineTemplate].filter( + Boolean, + ) as string[]; - const errors: string[] = []; - for (const modPath of candidates) { - try { - const tpl = (await loadWorkflowModule(modPath)) as unknown; - const result = validateWorkflowTemplate(tpl); - if (result.valid) return tpl as WorkflowTemplate; - const rel = path.relative(cwd, modPath); - errors.push(`${rel}: ${result.errors.join('; ')}`); - } catch (e) { - const rel = path.relative(cwd, modPath); - errors.push(`${rel}: ${e instanceof Error ? 
e.message : String(e)}`); - } - } - const looked = candidates.map((p) => path.relative(cwd, p)).join(', '); - const details = errors.length ? `\nValidation errors:\n- ${errors.join('\n- ')}` : ''; - throw new Error(`No workflow template found. Looked for: ${looked}${details}`); + const errors: string[] = []; + for (const modPath of candidates) { + try { + const tpl = (await loadWorkflowModule(modPath)) as unknown; + const result = validateWorkflowTemplate(tpl); + if (result.valid) return tpl as WorkflowTemplate; + const rel = path.relative(cwd, modPath); + errors.push(`${rel}: ${result.errors.join("; ")}`); + } catch (e) { + const rel = path.relative(cwd, modPath); + errors.push(`${rel}: ${e instanceof Error ? e.message : String(e)}`); + } + } + const looked = candidates.map((p) => path.relative(cwd, p)).join(", "); + const details = errors.length + ? `\nValidation errors:\n- ${errors.join("\n- ")}` + : ""; + throw new Error( + `No workflow template found. Looked for: ${looked}${details}`, + ); } -export async function loadTemplateWithPath(cwd: string, templatePath?: string): Promise<{ template: WorkflowTemplate; resolvedPath: string }> { - const resolvedTemplateOverride = templatePath - ? path.isAbsolute(templatePath) - ? templatePath - : path.resolve(packageRoot, templatePath) - : undefined; - const codemachineTemplate = path.resolve(templatesDir, 'codemachine.workflow.js'); - const candidates = [resolvedTemplateOverride, codemachineTemplate].filter(Boolean) as string[]; +/** + * Resolves a template path, checking the project's .codemachine directory first. + * For relative paths like "templates/workflows/foo.workflow.js": + * 1. First check if it exists at {cwd}/.codemachine/{templatePath} + * 2. Fall back to CodeMachine's package templates directory + * For absolute paths, use them directly. 
+ */ +function resolveTemplatePath(cwd: string, templatePath: string): string { + // Absolute paths are used directly + if (path.isAbsolute(templatePath)) { + return templatePath; + } - const errors: string[] = []; - for (const modPath of candidates) { - try { - const tpl = (await loadWorkflowModule(modPath)) as unknown; - const result = validateWorkflowTemplate(tpl); - if (result.valid) return { template: tpl as WorkflowTemplate, resolvedPath: modPath }; - const rel = path.relative(cwd, modPath); - errors.push(`${rel}: ${result.errors.join('; ')}`); - } catch (e) { - const rel = path.relative(cwd, modPath); - errors.push(`${rel}: ${e instanceof Error ? e.message : String(e)}`); - } - } - const looked = candidates.map((p) => path.relative(cwd, p)).join(', '); - const details = errors.length ? `\nValidation errors:\n- ${errors.join('\n- ')}` : ''; - throw new Error(`No workflow template found. Looked for: ${looked}${details}`); + // Check project's .codemachine directory first + const projectPath = path.join(cwd, ".codemachine", templatePath); + if (existsSync(projectPath)) { + return projectPath; + } + + // Fall back to CodeMachine's templates directory + return path.resolve(packageRoot, templatePath); +} + +export async function loadTemplateWithPath( + cwd: string, + templatePath?: string, +): Promise<{ template: WorkflowTemplate; resolvedPath: string }> { + const resolvedTemplateOverride = templatePath + ? 
resolveTemplatePath(cwd, templatePath) + : undefined; + const codemachineTemplate = path.resolve( + templatesDir, + "codemachine.workflow.js", + ); + const candidates = [resolvedTemplateOverride, codemachineTemplate].filter( + Boolean, + ) as string[]; + + const errors: string[] = []; + for (const modPath of candidates) { + try { + const tpl = (await loadWorkflowModule(modPath)) as unknown; + const result = validateWorkflowTemplate(tpl); + if (result.valid) + return { template: tpl as WorkflowTemplate, resolvedPath: modPath }; + const rel = path.relative(cwd, modPath); + errors.push(`${rel}: ${result.errors.join("; ")}`); + } catch (e) { + const rel = path.relative(cwd, modPath); + errors.push(`${rel}: ${e instanceof Error ? e.message : String(e)}`); + } + } + const looked = candidates.map((p) => path.relative(cwd, p)).join(", "); + const details = errors.length + ? `\nValidation errors:\n- ${errors.join("\n- ")}` + : ""; + throw new Error( + `No workflow template found. Looked for: ${looked}${details}`, + ); } diff --git a/src/workflows/templates/types.ts b/src/workflows/templates/types.ts index 84e8e7ed..3066fca2 100644 --- a/src/workflows/templates/types.ts +++ b/src/workflows/templates/types.ts @@ -34,7 +34,10 @@ export interface ModuleStep { type: "module"; agentId: string; agentName: string; - promptPath: string; + /** Path to prompt file (either promptPath or prompt is required) */ + promptPath?: string; + /** Inline prompt string (either promptPath or prompt is required) */ + prompt?: string; model?: string; modelReasoningEffort?: "low" | "medium" | "high"; engine?: string; // Dynamic engine type from registry diff --git a/src/workflows/templates/validator.ts b/src/workflows/templates/validator.ts index 5bc282dc..62d50bb7 100644 --- a/src/workflows/templates/validator.ts +++ b/src/workflows/templates/validator.ts @@ -1,125 +1,164 @@ -import type { WorkflowTemplate } from './types.js'; +import type { WorkflowTemplate } from "./types.js"; export interface 
ValidationResult { - valid: boolean; - errors: string[]; + valid: boolean; + errors: string[]; } export function validateWorkflowTemplate(value: unknown): ValidationResult { - const errors: string[] = []; - if (!value || typeof value !== 'object') { - return { valid: false, errors: ['Template is not an object'] }; - } + const errors: string[] = []; + if (!value || typeof value !== "object") { + return { valid: false, errors: ["Template is not an object"] }; + } - const obj = value as { name?: unknown; steps?: unknown }; - if (typeof obj.name !== 'string' || obj.name.trim().length === 0) { - errors.push('Template.name must be a non-empty string'); - } - if (!Array.isArray(obj.steps)) { - errors.push('Template.steps must be an array'); - } else { - obj.steps.forEach((step, index) => { - if (!step || typeof step !== 'object') { - errors.push(`Step[${index}] must be an object`); - return; - } - const candidate = step as { - type?: unknown; - agentId?: unknown; - agentName?: unknown; - promptPath?: unknown; - model?: unknown; - modelReasoningEffort?: unknown; - module?: unknown; - executeOnce?: unknown; - text?: unknown; - }; + const obj = value as { name?: unknown; steps?: unknown }; + if (typeof obj.name !== "string" || obj.name.trim().length === 0) { + errors.push("Template.name must be a non-empty string"); + } + if (!Array.isArray(obj.steps)) { + errors.push("Template.steps must be an array"); + } else { + obj.steps.forEach((step, index) => { + if (!step || typeof step !== "object") { + errors.push(`Step[${index}] must be an object`); + return; + } + const candidate = step as { + type?: unknown; + agentId?: unknown; + agentName?: unknown; + promptPath?: unknown; + prompt?: unknown; + model?: unknown; + modelReasoningEffort?: unknown; + module?: unknown; + executeOnce?: unknown; + text?: unknown; + }; - // Validate step type - if (candidate.type !== 'module' && candidate.type !== 'ui') { - errors.push(`Step[${index}].type must be 'module' or 'ui'`); - } + // 
Validate step type + if (candidate.type !== "module" && candidate.type !== "ui") { + errors.push(`Step[${index}].type must be 'module' or 'ui'`); + } - // Validate UI step - if (candidate.type === 'ui') { - if (typeof candidate.text !== 'string' || (candidate.text as string).trim().length === 0) { - errors.push(`Step[${index}].text must be a non-empty string`); - } - // UI steps don't need other validation - return; - } + // Validate UI step + if (candidate.type === "ui") { + if ( + typeof candidate.text !== "string" || + (candidate.text as string).trim().length === 0 + ) { + errors.push(`Step[${index}].text must be a non-empty string`); + } + // UI steps don't need other validation + return; + } - // Validate module step - if (candidate.type === 'module') { - if (typeof candidate.agentId !== 'string') { - errors.push(`Step[${index}].agentId must be a string`); - } - if (typeof candidate.agentName !== 'string') { - errors.push(`Step[${index}].agentName must be a string`); - } - if (typeof candidate.promptPath !== 'string') { - errors.push(`Step[${index}].promptPath must be a string`); - } + // Validate module step + if (candidate.type === "module") { + if (typeof candidate.agentId !== "string") { + errors.push(`Step[${index}].agentId must be a string`); + } + if (typeof candidate.agentName !== "string") { + errors.push(`Step[${index}].agentName must be a string`); + } + // Allow either promptPath (file reference) or prompt (inline string) + const hasPromptPath = typeof candidate.promptPath === "string"; + const hasInlinePrompt = + typeof (candidate as { prompt?: unknown }).prompt === "string"; + if (!hasPromptPath && !hasInlinePrompt) { + errors.push( + `Step[${index}] must have either promptPath or prompt as a string`, + ); + } - if (candidate.model !== undefined && typeof candidate.model !== 'string') { - errors.push(`Step[${index}].model must be a string`); - } + if ( + candidate.model !== undefined && + typeof candidate.model !== "string" + ) { + 
errors.push(`Step[${index}].model must be a string`); + } - if (candidate.modelReasoningEffort !== undefined) { - const mre = candidate.modelReasoningEffort; - if (mre !== 'low' && mre !== 'medium' && mre !== 'high') { - errors.push( - `Step[${index}].modelReasoningEffort must be one of 'low'|'medium'|'high' (got '${String(mre)}')`, - ); - } - } + if (candidate.modelReasoningEffort !== undefined) { + const mre = candidate.modelReasoningEffort; + if (mre !== "low" && mre !== "medium" && mre !== "high") { + errors.push( + `Step[${index}].modelReasoningEffort must be one of 'low'|'medium'|'high' (got '${String(mre)}')`, + ); + } + } - if (candidate.executeOnce !== undefined && typeof candidate.executeOnce !== 'boolean') { - errors.push(`Step[${index}].executeOnce must be a boolean`); - } + if ( + candidate.executeOnce !== undefined && + typeof candidate.executeOnce !== "boolean" + ) { + errors.push(`Step[${index}].executeOnce must be a boolean`); + } - if (candidate.module !== undefined) { - if (!candidate.module || typeof candidate.module !== 'object') { - errors.push(`Step[${index}].module must be an object`); - } else { - const moduleMeta = candidate.module as { id?: unknown; behavior?: unknown }; - if (typeof moduleMeta.id !== 'string') { - errors.push(`Step[${index}].module.id must be a string`); - } - if (moduleMeta.behavior !== undefined) { - if (!moduleMeta.behavior || typeof moduleMeta.behavior !== 'object') { - errors.push(`Step[${index}].module.behavior must be an object`); - } else { - const behavior = moduleMeta.behavior as { - type?: unknown; - action?: unknown; - steps?: unknown; - trigger?: unknown; - maxIterations?: unknown; - }; - if (behavior.type !== 'loop' || behavior.action !== 'stepBack') { - errors.push(`Step[${index}].module.behavior must be { type: 'loop', action: 'stepBack', ... 
}`); - } - if (typeof behavior.steps !== 'number' || behavior.steps <= 0) { - errors.push(`Step[${index}].module.behavior.steps must be a positive number`); - } - if (behavior.trigger !== undefined && typeof behavior.trigger !== 'string') { - errors.push(`Step[${index}].module.behavior.trigger must be a string if provided`); - } - if (behavior.maxIterations !== undefined && typeof behavior.maxIterations !== 'number') { - errors.push(`Step[${index}].module.behavior.maxIterations must be a number`); - } - } - } - } - } - } - }); - } + if (candidate.module !== undefined) { + if (!candidate.module || typeof candidate.module !== "object") { + errors.push(`Step[${index}].module must be an object`); + } else { + const moduleMeta = candidate.module as { + id?: unknown; + behavior?: unknown; + }; + if (typeof moduleMeta.id !== "string") { + errors.push(`Step[${index}].module.id must be a string`); + } + if (moduleMeta.behavior !== undefined) { + if ( + !moduleMeta.behavior || + typeof moduleMeta.behavior !== "object" + ) { + errors.push(`Step[${index}].module.behavior must be an object`); + } else { + const behavior = moduleMeta.behavior as { + type?: unknown; + action?: unknown; + steps?: unknown; + trigger?: unknown; + maxIterations?: unknown; + }; + if ( + behavior.type !== "loop" || + behavior.action !== "stepBack" + ) { + errors.push( + `Step[${index}].module.behavior must be { type: 'loop', action: 'stepBack', ... 
}`, + ); + } + if (typeof behavior.steps !== "number" || behavior.steps <= 0) { + errors.push( + `Step[${index}].module.behavior.steps must be a positive number`, + ); + } + if ( + behavior.trigger !== undefined && + typeof behavior.trigger !== "string" + ) { + errors.push( + `Step[${index}].module.behavior.trigger must be a string if provided`, + ); + } + if ( + behavior.maxIterations !== undefined && + typeof behavior.maxIterations !== "number" + ) { + errors.push( + `Step[${index}].module.behavior.maxIterations must be a number`, + ); + } + } + } + } + } + } + }); + } - return { valid: errors.length === 0, errors }; + return { valid: errors.length === 0, errors }; } export function isWorkflowTemplate(value: unknown): value is WorkflowTemplate { - return validateWorkflowTemplate(value).valid; + return validateWorkflowTemplate(value).valid; }