"use strict";Object.defineProperty(exports, "__esModule", { value: true });exports.agentLoop = agentLoop;exports.agentLoopContinue = agentLoopContinue; var _piAi = require("@mariozechner/pi-ai"); /** * Agent loop that works with AgentMessage throughout. * Transforms to Message[] only at the LLM call boundary. */ /** * Start an agent loop with a new prompt message. * The prompt is added to the context and events are emitted for it. */function agentLoop(prompts, context, config, signal, streamFn) {const stream = createAgentStream(); (async () => { const newMessages = [...prompts]; const currentContext = { ...context, messages: [...context.messages, ...prompts] }; stream.push({ type: "agent_start" }); stream.push({ type: "turn_start" }); for (const prompt of prompts) { stream.push({ type: "message_start", message: prompt }); stream.push({ type: "message_end", message: prompt }); } await runLoop(currentContext, newMessages, config, signal, stream, streamFn); })(); return stream; } /** * Continue an agent loop from the current context without adding a new message. * Used for retries - context already has user message or tool results. * * **Important:** The last message in context must convert to a `user` or `toolResult` message * via `convertToLlm`. If it doesn't, the LLM provider will reject the request. * This cannot be validated here since `convertToLlm` is only called once per turn. */ function agentLoopContinue(context, config, signal, streamFn) { if (context.messages.length === 0) { throw new Error("Cannot continue: no messages in context"); } if (context.messages[context.messages.length - 1].role === "assistant") { throw new Error("Cannot continue from message role: assistant"); } const stream = createAgentStream(); (async () => { const newMessages = []; const currentContext = { ...context }; stream.push({ type: "agent_start" }); stream.push({ type: "turn_start" }); await runLoop(currentContext, newMessages, config, signal, stream, streamFn); })(); return stream; } function createAgentStream() { return new _piAi.EventStream((event) => event.type === "agent_end", (event) => event.type === "agent_end" ? event.messages : []); } /** * Main loop logic shared by agentLoop and agentLoopContinue. */ async function runLoop(currentContext, newMessages, config, signal, stream, streamFn) { let firstTurn = true; // Check for steering messages at start (user may have typed while waiting) let pendingMessages = (await config.getSteeringMessages?.()) || []; // Outer loop: continues when queued follow-up messages arrive after agent would stop while (true) { let hasMoreToolCalls = true; let steeringAfterTools = null; // Inner loop: process tool calls and steering messages while (hasMoreToolCalls || pendingMessages.length > 0) { if (!firstTurn) { stream.push({ type: "turn_start" }); } else { firstTurn = false; } // Process pending messages (inject before next assistant response) if (pendingMessages.length > 0) { for (const message of pendingMessages) { stream.push({ type: "message_start", message }); stream.push({ type: "message_end", message }); currentContext.messages.push(message); newMessages.push(message); } pendingMessages = []; } // Stream assistant response const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn); newMessages.push(message); if (message.stopReason === "error" || message.stopReason === "aborted") { stream.push({ type: "turn_end", message, toolResults: [] }); stream.push({ type: "agent_end", messages: newMessages }); stream.end(newMessages); return; } // Check for tool calls const toolCalls = message.content.filter((c) => c.type === "toolCall"); hasMoreToolCalls = toolCalls.length > 0; const toolResults = []; if (hasMoreToolCalls) { const toolExecution = await executeToolCalls(currentContext.tools, message, signal, stream, config.getSteeringMessages); toolResults.push(...toolExecution.toolResults); steeringAfterTools = toolExecution.steeringMessages ?? null; for (const result of toolResults) { currentContext.messages.push(result); newMessages.push(result); } } stream.push({ type: "turn_end", message, toolResults }); // Get steering messages after turn completes if (steeringAfterTools && steeringAfterTools.length > 0) { pendingMessages = steeringAfterTools; steeringAfterTools = null; } else { pendingMessages = (await config.getSteeringMessages?.()) || []; } } // Agent would stop here. Check for follow-up messages. const followUpMessages = (await config.getFollowUpMessages?.()) || []; if (followUpMessages.length > 0) { // Set as pending so inner loop processes them pendingMessages = followUpMessages; continue; } // No more messages, exit break; } stream.push({ type: "agent_end", messages: newMessages }); stream.end(newMessages); } /** * Stream an assistant response from the LLM. * This is where AgentMessage[] gets transformed to Message[] for the LLM. */ async function streamAssistantResponse(context, config, signal, stream, streamFn) { // Apply context transform if configured (AgentMessage[] → AgentMessage[]) let messages = context.messages; if (config.transformContext) { messages = await config.transformContext(messages, signal); } // Convert to LLM-compatible messages (AgentMessage[] → Message[]) const llmMessages = await config.convertToLlm(messages); // Build LLM context const llmContext = { systemPrompt: context.systemPrompt, messages: llmMessages, tools: context.tools }; const streamFunction = streamFn || _piAi.streamSimple; // Resolve API key (important for expiring tokens) const resolvedApiKey = (config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || config.apiKey; const response = await streamFunction(config.model, llmContext, { ...config, apiKey: resolvedApiKey, signal }); let partialMessage = null; let addedPartial = false; for await (const event of response) { switch (event.type) { case "start": partialMessage = event.partial; context.messages.push(partialMessage); addedPartial = true; stream.push({ type: "message_start", message: { ...partialMessage } }); break; case "text_start": case "text_delta": case "text_end": case "thinking_start": case "thinking_delta": case "thinking_end": case "toolcall_start": case "toolcall_delta": case "toolcall_end": if (partialMessage) { partialMessage = event.partial; context.messages[context.messages.length - 1] = partialMessage; stream.push({ type: "message_update", assistantMessageEvent: event, message: { ...partialMessage } }); } break; case "done": case "error":{ const finalMessage = await response.result(); if (addedPartial) { context.messages[context.messages.length - 1] = finalMessage; } else { context.messages.push(finalMessage); } if (!addedPartial) { stream.push({ type: "message_start", message: { ...finalMessage } }); } stream.push({ type: "message_end", message: finalMessage }); return finalMessage; } } } return await response.result(); } /** * Execute tool calls from an assistant message. */ async function executeToolCalls(tools, assistantMessage, signal, stream, getSteeringMessages) { const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall"); const results = []; let steeringMessages; for (let index = 0; index < toolCalls.length; index++) { const toolCall = toolCalls[index]; const tool = tools?.find((t) => t.name === toolCall.name); stream.push({ type: "tool_execution_start", toolCallId: toolCall.id, toolName: toolCall.name, args: toolCall.arguments }); let result; let isError = false; try { if (!tool) throw new Error(`Tool ${toolCall.name} not found`); const validatedArgs = (0, _piAi.validateToolArguments)(tool, toolCall); result = await tool.execute(toolCall.id, validatedArgs, signal, (partialResult) => { stream.push({ type: "tool_execution_update", toolCallId: toolCall.id, toolName: toolCall.name, args: toolCall.arguments, partialResult }); }); } catch (e) { result = { content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }], details: {} }; isError = true; } stream.push({ type: "tool_execution_end", toolCallId: toolCall.id, toolName: toolCall.name, result, isError }); const toolResultMessage = { role: "toolResult", toolCallId: toolCall.id, toolName: toolCall.name, content: result.content, details: result.details, isError, timestamp: Date.now() }; results.push(toolResultMessage); stream.push({ type: "message_start", message: toolResultMessage }); stream.push({ type: "message_end", message: toolResultMessage }); // Check for steering messages - skip remaining tools if user interrupted if (getSteeringMessages) { const steering = await getSteeringMessages(); if (steering.length > 0) { steeringMessages = steering; const remainingCalls = toolCalls.slice(index + 1); for (const skipped of remainingCalls) { results.push(skipToolCall(skipped, stream)); } break; } } } return { toolResults: results, steeringMessages }; } function skipToolCall(toolCall, stream) { const result = { content: [{ type: "text", text: "Skipped due to queued user message." }], details: {} }; stream.push({ type: "tool_execution_start", toolCallId: toolCall.id, toolName: toolCall.name, args: toolCall.arguments }); stream.push({ type: "tool_execution_end", toolCallId: toolCall.id, toolName: toolCall.name, result, isError: true }); const toolResultMessage = { role: "toolResult", toolCallId: toolCall.id, toolName: toolCall.name, content: result.content, details: {}, isError: true, timestamp: Date.now() }; stream.push({ type: "message_start", message: toolResultMessage }); stream.push({ type: "message_end", message: toolResultMessage }); return toolResultMessage; } /* v9-fb3330d522687889 */