"use strict";Object.defineProperty(exports, "__esModule", { value: true });exports.getWsClient = getWsClient;exports.startWsClient = startWsClient;exports.stopWsClient = stopWsClient;exports.waitForWsConnection = waitForWsConnection; var _aibotNodeSdk = require("@wecom/aibot-node-sdk"); var _monitor = require("./monitor.js"); var _state = require("./monitor/state.js"); var _runtime = require("./runtime.js"); /** * WeCom WebSocket 长链接模式适配器 * * 职责:管理 WSClient 生命周期,将 SDK 事件桥接到现有 monitor.ts 消息管线。 * * SDK WsFrame 事件 * ↓ * ws-adapter 转换为 WecomBotInboundMessage 格式 * ↓ * 复用 monitor.ts 中的 shouldProcessBotInboundMessage → buildInboundBody * → streamStore.addPendingMessage → flushPending 管线 */ // ─── WSClient Instance Registry ──────────────────────────────────────── const wsClients = new Map(); /** * 获取指定账号的 WSClient 实例 */function getWsClient(accountId) {return wsClients.get(accountId);} /** * 等待 WSClient 连接就绪,最多等待 timeoutMs 毫秒(默认 30 秒)。 * 如果已连接则立即返回;如果 client 尚未创建,会轮询等待创建后再监听连接事件。 */async function waitForWsConnection(accountId, timeoutMs = 30_000) { const deadline = Date.now() + timeoutMs; // 等待 client 实例出现(gateway 重启时 client 可能还没注册) while (!wsClients.has(accountId)) { if (Date.now() >= deadline) return false; await new Promise((r) => setTimeout(r, 500)); } const client = wsClients.get(accountId); if (client.isConnected) return true; const remaining = deadline - Date.now(); if (remaining <= 0) return false; return new Promise((resolve) => { const timer = setTimeout(() => { cleanup(); resolve(false); }, remaining); const onConnected = () => { cleanup(); resolve(true); }; const cleanup = () => { clearTimeout(timer); client.off("connected", onConnected); }; client.on("connected", onConnected); // 再检查一次,防止在注册监听器之前已连上 if (client.isConnected) { cleanup(); resolve(true); } }); } // ─── Stream Reply Watcher ────────────────────────────────────────────── /** * 流式回复监听器:轮询 StreamState 变化并通过 WSClient 推送回复 */ function watchStreamReply(params) { const { wsClient, frame, streamId, log, error } = params; const streamStore = _state.monitorState.streamStore; let lastSentContent = ""; let finished = false; const POLL_INTERVAL_MS = 200; const tick = async () => { if (finished) return; const state = streamStore.getStream(streamId); if (!state) { finished = true; return; } const content = state.content ?? ""; const isFinished = state.finished ?? false; // 有新内容或流结束时发送 if (content !== lastSentContent || isFinished) { try { // 构建图片附件(仅在结束时) let msgItems; if (isFinished && state.images?.length) { msgItems = state.images.map((img) => ({ msgtype: "image", image: { base64: img.base64, md5: img.md5 } })); } await wsClient.replyStream( frame, streamId, content, isFinished, msgItems ); lastSentContent = content; log?.(`ws-reply: streamId=${streamId} len=${content.length} finish=${isFinished}`); } catch (err) { error?.(`ws-reply: replyStream failed streamId=${streamId}: ${String(err)}`); } } if (isFinished) { finished = true; return; } setTimeout(tick, POLL_INTERVAL_MS); }; // 初次延迟启动,等待 agent 开始生产内容 setTimeout(tick, POLL_INTERVAL_MS); } // ─── SDK Message → WecomBotInboundMessage Conversion ─────────────────── /** * 将 SDK 的 WsFrame 转换为现有的 WecomBotInboundMessage 格式 */ function convertSdkMessageToInbound(body) { const base = { msgid: body.msgid, aibotid: body.aibotid, chattype: body.chattype, chatid: body.chatid, response_url: body.response_url, from: body.from ? { userid: body.from.userid } : undefined, msgtype: body.msgtype }; const msgtype = String(body.msgtype ?? "").toLowerCase(); if (msgtype === "text") { const textBody = body; return { ...base, msgtype: "text", text: textBody.text, quote: textBody.quote }; } if (msgtype === "voice") { const voiceBody = body; return { ...base, msgtype: "voice", voice: voiceBody.voice, quote: voiceBody.quote }; } if (msgtype === "image") { const imageBody = body; return { ...base, msgtype: "image", image: imageBody.image, quote: imageBody.quote }; } if (msgtype === "file") { const fileBody = body; return { ...base, msgtype: "file", file: fileBody.file, quote: fileBody.quote }; } if (msgtype === "mixed") { const mixedBody = body; return { ...base, msgtype: "mixed", mixed: mixedBody.mixed, quote: mixedBody.quote }; } // Fallback: pass through as-is return { ...base, ...body }; } // ─── WS Event Handlers ──────────────────────────────────────────────── function setupMessageHandler(params) { const { wsClient, accountId, target } = params; const streamStore = _state.monitorState.streamStore; // 监听所有消息类型 wsClient.on("message", (frame) => { const body = frame.body; if (!body) return; const msgtype = String(body.msgtype ?? "").toLowerCase(); // event 类型由专门的 event handler 处理 if (msgtype === "event") return; const msg = convertSdkMessageToInbound(body); const decision = (0, _monitor.shouldProcessBotInboundMessage)(msg); if (!decision.shouldProcess) { target.runtime.log?.( `[${accountId}] ws-inbound: skipped msgtype=${msgtype} reason=${decision.reason}` ); return; } const userid = decision.senderUserId; const chatId = decision.chatId ?? userid; const conversationKey = `wecom:${accountId}:${userid}:${chatId}`; const msgContent = (0, _monitor.buildInboundBody)(msg); target.runtime.log?.( `[${accountId}] ws-inbound: msgtype=${msgtype} chattype=${String(msg.chattype ?? "")} ` + `from=${userid} msgid=${String(msg.msgid ?? "")}` ); // 消息去重 if (msg.msgid) { const existingStreamId = streamStore.getStreamByMsgId(String(msg.msgid)); if (existingStreamId) { target.runtime.log?.( `[${accountId}] ws-inbound: duplicate msgid=${msg.msgid}, skipping` ); return; } } // 加入 Pending 队列(复用现有防抖/聚合逻辑) const { streamId } = streamStore.addPendingMessage({ conversationKey, target, msg, msgContent, nonce: "", timestamp: String(Date.now()), debounceMs: target.account.config.debounceMs }); // 标记 wsMode streamStore.updateStream(streamId, (s) => { s.wsMode = true; }); // 注册流式回复监听器 watchStreamReply({ wsClient, frame, streamId, log: (msg) => target.runtime.log?.(`[${accountId}] ${msg}`), error: (msg) => target.runtime.error?.(`[${accountId}] ${msg}`) }); target.statusSink?.({ lastInboundAt: Date.now() }); }); } function setupEventHandler(params) { const { wsClient, accountId, target, welcomeText } = params; const streamStore = _state.monitorState.streamStore; // 进入会话事件 → 欢迎语 wsClient.on("event.enter_chat", async (frame) => { const text = welcomeText?.trim(); if (!text) return; try { await wsClient.replyWelcome(frame, { msgtype: "text", text: { content: text } }); target.runtime.log?.(`[${accountId}] ws-event: sent welcome text`); } catch (err) { target.runtime.error?.(`[${accountId}] ws-event: replyWelcome failed: ${String(err)}`); } }); // 模板卡片交互事件 → 转换为文本消息注入管线 wsClient.on("event.template_card_event", (frame) => { const body = frame.body; if (!body) return; const eventData = body.event; let interactionDesc = `[卡片交互] 按钮: ${eventData?.event_key || "unknown"}`; if (eventData?.task_id) interactionDesc += ` (任务ID: ${eventData.task_id})`; const msgid = body.msgid ? String(body.msgid) : undefined; // 去重 if (msgid && streamStore.getStreamByMsgId(msgid)) { target.runtime.log?.(`[${accountId}] ws-event: template_card_event already processed msgid=${msgid}`); return; } const streamId = streamStore.createStream({ msgid }); streamStore.markStarted(streamId); streamStore.updateStream(streamId, (s) => { s.wsMode = true; }); const syntheticMsg = { msgid, aibotid: body.aibotid, chattype: body.chattype, chatid: body.chatid, from: body.from ? { userid: body.from.userid } : undefined, msgtype: "text", text: { content: interactionDesc } }; let core; try { core = (0, _runtime.getWecomRuntime)(); } catch { target.runtime.error?.(`[${accountId}] ws-event: runtime not ready for template_card_event`); streamStore.markFinished(streamId); return; } // 由于卡片事件没有经过防抖队列,直接触发 flushPending 的等效操作 // 需要通过 addPendingMessage 注入,让现有管线处理 const userid = body.from?.userid ?? "unknown"; const chatId = body.chatid ?? userid; const conversationKey = `wecom:${accountId}:${userid}:${chatId}`; // 先清除之前创建的 stream(addPendingMessage 会创建新的) // 直接用 addPendingMessage 复用完整管线 const enrichedTarget = { ...target, core }; const { streamId: actualStreamId } = streamStore.addPendingMessage({ conversationKey, target: enrichedTarget, msg: syntheticMsg, msgContent: interactionDesc, nonce: "", timestamp: String(Date.now()), debounceMs: 0 // 卡片事件不防抖 }); streamStore.updateStream(actualStreamId, (s) => { s.wsMode = true; }); watchStreamReply({ wsClient, frame, streamId: actualStreamId, log: (msg) => target.runtime.log?.(`[${accountId}] ${msg}`), error: (msg) => target.runtime.error?.(`[${accountId}] ${msg}`) }); }); // 反馈事件 → 仅记录日志 wsClient.on("event.feedback_event", (frame) => { target.runtime.log?.( `[${accountId}] ws-event: feedback_event received (logged only)` ); }); } // ─── WSClient Lifecycle ──────────────────────────────────────────────── /** * 启动 WebSocket 长链接客户端 * @returns cleanup 函数(用于注销) */ function startWsClient(params) { const { accountId, botId, secret, account, config, runtime, core, statusSink, welcomeText } = params; // 如果已有实例,先停止 stopWsClient(accountId); const wsClient = new _aibotNodeSdk.WSClient({ botId, secret, maxReconnectAttempts: -1, // 无限重连 logger: { debug: (msg) => runtime.log?.(`[${accountId}][ws-sdk] ${msg}`), info: (msg) => runtime.log?.(`[${accountId}][ws-sdk] ${msg}`), warn: (msg) => runtime.log?.(`[${accountId}][ws-sdk] WARN: ${msg}`), error: (msg) => runtime.error?.(`[${accountId}][ws-sdk] ERROR: ${msg}`) } }); wsClients.set(accountId, wsClient); // 构建 WecomWebhookTarget 以复用 monitor 管线 const target = { account, config, runtime, core, path: `ws://${accountId}`, statusSink }; // 设置消息和事件处理 setupMessageHandler({ wsClient, accountId, target }); setupEventHandler({ wsClient, accountId, target, welcomeText }); // 连接状态日志 wsClient.on("connected", () => { runtime.log?.(`[${accountId}] ws: connected`); }); wsClient.on("authenticated", () => { runtime.log?.(`[${accountId}] ws: authenticated successfully`); }); wsClient.on("disconnected", (reason) => { runtime.log?.(`[${accountId}] ws: disconnected - ${reason}`); }); wsClient.on("reconnecting", (attempt) => { runtime.log?.(`[${accountId}] ws: reconnecting attempt=${attempt}`); }); wsClient.on("error", (err) => { runtime.error?.(`[${accountId}] ws: error - ${err.message}`); }); // 建立连接 wsClient.connect(); runtime.log?.(`[${accountId}] ws: starting connection (botId=${botId})`); // 返回清理函数 return () => { stopWsClient(accountId); }; } /** * 停止指定账号的 WSClient */ function stopWsClient(accountId) { const existing = wsClients.get(accountId); if (existing) { existing.disconnect(); wsClients.delete(accountId); } } /* v9-3dcdd93221630442 */