From 5c6b2cbc8eb50d017a0faefcd919b749c3fbede6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 25 Feb 2026 00:53:39 +0000 Subject: [PATCH] refactor: extract iMessage echo cache and unify suppression guards --- src/agents/pi-embedded-subscribe.tools.ts | 15 ++-- src/auto-reply/reply/dispatch-from-config.ts | 5 +- src/auto-reply/reply/reply-payloads.ts | 4 + src/auto-reply/reply/route-reply.ts | 3 +- src/imessage/monitor/deliver.ts | 7 +- src/imessage/monitor/echo-cache.ts | 85 ++++++++++++++++++ .../monitor-provider.echo-cache.test.ts | 8 +- src/imessage/monitor/monitor-provider.ts | 86 +------------------ src/infra/outbound/payloads.ts | 7 +- 9 files changed, 116 insertions(+), 104 deletions(-) create mode 100644 src/imessage/monitor/echo-cache.ts diff --git a/src/agents/pi-embedded-subscribe.tools.ts b/src/agents/pi-embedded-subscribe.tools.ts index 745c1212709..08a5e5f80c4 100644 --- a/src/agents/pi-embedded-subscribe.tools.ts +++ b/src/agents/pi-embedded-subscribe.tools.ts @@ -286,6 +286,14 @@ export function extractToolErrorMessage(result: unknown): string | undefined { return normalizeToolErrorText(text); } +function resolveMessageToolTarget(args: Record): string | undefined { + const toRaw = typeof args.to === "string" ? args.to : undefined; + if (toRaw) { + return toRaw; + } + return typeof args.target === "string" ? args.target : undefined; +} + export function extractMessagingToolSend( toolName: string, args: Record, @@ -298,12 +306,7 @@ export function extractMessagingToolSend( if (action !== "send" && action !== "thread-reply") { return undefined; } - const toRaw = - typeof args.to === "string" - ? args.to - : typeof args.target === "string" - ? args.target - : undefined; + const toRaw = resolveMessageToolTarget(args); if (!toRaw) { return undefined; } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 96989ff98ea..881b1afe6fe 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -17,6 +17,7 @@ import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js"; import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { shouldSuppressReasoningPayload } from "./reply-payloads.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; const AUDIO_PLACEHOLDER_RE = /^(\s*\([^)]*\))?$/i; @@ -366,7 +367,7 @@ export async function dispatchReplyFromConfig(params: { // Suppress reasoning payloads — channels using this generic dispatch // path (WhatsApp, web, etc.) do not have a dedicated reasoning lane. // Telegram has its own dispatch path that handles reasoning splitting. - if (payload.isReasoning) { + if (shouldSuppressReasoningPayload(payload)) { return; } // Accumulate block text for TTS generation after streaming @@ -404,7 +405,7 @@ export async function dispatchReplyFromConfig(params: { for (const reply of replies) { // Suppress reasoning payloads from channel delivery — channels using this // generic dispatch path do not have a dedicated reasoning lane. - if (reply.isReasoning) { + if (shouldSuppressReasoningPayload(reply)) { continue; } const ttsReply = await maybeApplyTtsToPayload({ diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index 41906f1227f..a408e942a2d 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -68,6 +68,10 @@ export function isRenderablePayload(payload: ReplyPayload): boolean { ); } +export function shouldSuppressReasoningPayload(payload: ReplyPayload): boolean { + return payload.isReasoning === true; +} + export function applyReplyThreading(params: { payloads: ReplyPayload[]; replyToMode: ReplyToMode; diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 462d6a54d9b..081fd58a04a 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -15,6 +15,7 @@ import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/m import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload } from "../types.js"; import { normalizeReplyPayload } from "./normalize-reply.js"; +import { shouldSuppressReasoningPayload } from "./reply-payloads.js"; export type RouteReplyParams = { /** The reply payload to send. */ @@ -56,7 +57,7 @@ export type RouteReplyResult = { */ export async function routeReply(params: RouteReplyParams): Promise { const { payload, channel, to, accountId, threadId, cfg, abortSignal } = params; - if (payload.isReasoning) { + if (shouldSuppressReasoningPayload(payload)) { return { ok: true }; } const normalizedChannel = normalizeMessageChannel(channel); diff --git a/src/imessage/monitor/deliver.ts b/src/imessage/monitor/deliver.ts index 3e8f9391646..71825be8d0b 100644 --- a/src/imessage/monitor/deliver.ts +++ b/src/imessage/monitor/deliver.ts @@ -6,10 +6,7 @@ import { convertMarkdownTables } from "../../markdown/tables.js"; import type { RuntimeEnv } from "../../runtime.js"; import type { createIMessageRpcClient } from "../client.js"; import { sendMessageIMessage } from "../send.js"; - -type SentMessageCache = { - remember: (scope: string, lookup: { text?: string; messageId?: string }) => void; -}; +import type { SentMessageCache } from "./echo-cache.js"; export async function deliverReplies(params: { replies: ReplyPayload[]; @@ -19,7 +16,7 @@ export async function deliverReplies(params: { runtime: RuntimeEnv; maxBytes: number; textLimit: number; - sentMessageCache?: SentMessageCache; + sentMessageCache?: Pick; }) { const { replies, target, client, runtime, maxBytes, textLimit, accountId, sentMessageCache } = params; diff --git a/src/imessage/monitor/echo-cache.ts b/src/imessage/monitor/echo-cache.ts new file mode 100644 index 00000000000..c68ff04b970 --- /dev/null +++ b/src/imessage/monitor/echo-cache.ts @@ -0,0 +1,85 @@ +export type SentMessageLookup = { + text?: string; + messageId?: string; +}; + +export type SentMessageCache = { + remember: (scope: string, lookup: SentMessageLookup) => void; + has: (scope: string, lookup: SentMessageLookup) => boolean; +}; + +const SENT_MESSAGE_TEXT_TTL_MS = 5000; +const SENT_MESSAGE_ID_TTL_MS = 60_000; + +function normalizeEchoTextKey(text: string | undefined): string | null { + if (!text) { + return null; + } + const normalized = text.replace(/\r\n?/g, "\n").trim(); + return normalized ? normalized : null; +} + +function normalizeEchoMessageIdKey(messageId: string | undefined): string | null { + if (!messageId) { + return null; + } + const normalized = messageId.trim(); + if (!normalized || normalized === "ok" || normalized === "unknown") { + return null; + } + return normalized; +} + +class DefaultSentMessageCache implements SentMessageCache { + private textCache = new Map(); + private messageIdCache = new Map(); + + remember(scope: string, lookup: SentMessageLookup): void { + const textKey = normalizeEchoTextKey(lookup.text); + if (textKey) { + this.textCache.set(`${scope}:${textKey}`, Date.now()); + } + const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId); + if (messageIdKey) { + this.messageIdCache.set(`${scope}:${messageIdKey}`, Date.now()); + } + this.cleanup(); + } + + has(scope: string, lookup: SentMessageLookup): boolean { + this.cleanup(); + const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId); + if (messageIdKey) { + const idTimestamp = this.messageIdCache.get(`${scope}:${messageIdKey}`); + if (idTimestamp && Date.now() - idTimestamp <= SENT_MESSAGE_ID_TTL_MS) { + return true; + } + } + const textKey = normalizeEchoTextKey(lookup.text); + if (textKey) { + const textTimestamp = this.textCache.get(`${scope}:${textKey}`); + if (textTimestamp && Date.now() - textTimestamp <= SENT_MESSAGE_TEXT_TTL_MS) { + return true; + } + } + return false; + } + + private cleanup(): void { + const now = Date.now(); + for (const [key, timestamp] of this.textCache.entries()) { + if (now - timestamp > SENT_MESSAGE_TEXT_TTL_MS) { + this.textCache.delete(key); + } + } + for (const [key, timestamp] of this.messageIdCache.entries()) { + if (now - timestamp > SENT_MESSAGE_ID_TTL_MS) { + this.messageIdCache.delete(key); + } + } + } +} + +export function createSentMessageCache(): SentMessageCache { + return new DefaultSentMessageCache(); +} diff --git a/src/imessage/monitor/monitor-provider.echo-cache.test.ts b/src/imessage/monitor/monitor-provider.echo-cache.test.ts index 766b2bf00fb..e67667c0228 100644 --- a/src/imessage/monitor/monitor-provider.echo-cache.test.ts +++ b/src/imessage/monitor/monitor-provider.echo-cache.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { __testing } from "./monitor-provider.js"; +import { createSentMessageCache } from "./echo-cache.js"; describe("iMessage sent-message echo cache", () => { afterEach(() => { @@ -9,7 +9,7 @@ describe("iMessage sent-message echo cache", () => { it("matches recent text within the same scope", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-25T00:00:00Z")); - const cache = __testing.createSentMessageCache(); + const cache = createSentMessageCache(); cache.remember("acct:imessage:+1555", { text: " Reasoning:\r\n_step_ " }); @@ -20,7 +20,7 @@ describe("iMessage sent-message echo cache", () => { it("matches by outbound message id and ignores placeholder ids", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-25T00:00:00Z")); - const cache = __testing.createSentMessageCache(); + const cache = createSentMessageCache(); cache.remember("acct:imessage:+1555", { messageId: "abc-123" }); cache.remember("acct:imessage:+1555", { messageId: "ok" }); @@ -32,7 +32,7 @@ describe("iMessage sent-message echo cache", () => { it("keeps message-id lookups longer than text fallback", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-25T00:00:00Z")); - const cache = __testing.createSentMessageCache(); + const cache = createSentMessageCache(); cache.remember("acct:imessage:+1555", { text: "hello", messageId: "m-1" }); vi.advanceTimersByTime(6000); diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index c585df1f8b3..3bfdc691163 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -44,6 +44,7 @@ import { probeIMessage } from "../probe.js"; import { sendMessageIMessage } from "../send.js"; import { attachIMessageMonitorAbortHandler } from "./abort-handler.js"; import { deliverReplies } from "./deliver.js"; +import { createSentMessageCache } from "./echo-cache.js"; import { buildIMessageInboundContext, resolveIMessageInboundDecision, @@ -80,88 +81,6 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise(); - private messageIdCache = new Map(); - - remember(scope: string, lookup: SentMessageLookup): void { - const textKey = normalizeEchoTextKey(lookup.text); - if (textKey) { - this.textCache.set(`${scope}:${textKey}`, Date.now()); - } - const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId); - if (messageIdKey) { - this.messageIdCache.set(`${scope}:${messageIdKey}`, Date.now()); - } - this.cleanup(); - } - - has(scope: string, lookup: SentMessageLookup): boolean { - this.cleanup(); - const messageIdKey = normalizeEchoMessageIdKey(lookup.messageId); - if (messageIdKey) { - const idTimestamp = this.messageIdCache.get(`${scope}:${messageIdKey}`); - if (idTimestamp && Date.now() - idTimestamp <= SENT_MESSAGE_ID_TTL_MS) { - return true; - } - } - const textKey = normalizeEchoTextKey(lookup.text); - if (textKey) { - const textTimestamp = this.textCache.get(`${scope}:${textKey}`); - if (textTimestamp && Date.now() - textTimestamp <= SENT_MESSAGE_TEXT_TTL_MS) { - return true; - } - } - return false; - } - - private cleanup(): void { - const now = Date.now(); - for (const [key, timestamp] of this.textCache.entries()) { - if (now - timestamp > SENT_MESSAGE_TEXT_TTL_MS) { - this.textCache.delete(key); - } - } - for (const [key, timestamp] of this.messageIdCache.entries()) { - if (now - timestamp > SENT_MESSAGE_ID_TTL_MS) { - this.messageIdCache.delete(key); - } - } - } -} - export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): Promise { const runtime = resolveRuntime(opts); const cfg = opts.config ?? loadConfig(); @@ -177,7 +96,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P DEFAULT_GROUP_HISTORY_LIMIT, ); const groupHistories = new Map(); - const sentMessageCache = new SentMessageCache(); + const sentMessageCache = createSentMessageCache(); const textLimit = resolveTextChunkLimit(cfg, "imessage", accountInfo.accountId); const allowFrom = normalizeAllowList(opts.allowFrom ?? imessageCfg.allowFrom); const groupAllowFrom = normalizeAllowList( @@ -564,5 +483,4 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P export const __testing = { resolveIMessageRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, - createSentMessageCache: () => new SentMessageCache(), }; diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts index 98d67ce90bb..c5c99d0038b 100644 --- a/src/infra/outbound/payloads.ts +++ b/src/infra/outbound/payloads.ts @@ -1,5 +1,8 @@ import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js"; -import { isRenderablePayload } from "../../auto-reply/reply/reply-payloads.js"; +import { + isRenderablePayload, + shouldSuppressReasoningPayload, +} from "../../auto-reply/reply/reply-payloads.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; export type NormalizedOutboundPayload = { @@ -41,7 +44,7 @@ export function normalizeReplyPayloadsForDelivery( payloads: readonly ReplyPayload[], ): ReplyPayload[] { return payloads.flatMap((payload) => { - if (payload.isReasoning) { + if (shouldSuppressReasoningPayload(payload)) { return []; } const parsed = parseReplyDirectives(payload.text ?? "");