diff --git a/src/config/redact-snapshot.raw.ts b/src/config/redact-snapshot.raw.ts new file mode 100644 index 00000000000..9f6f78a6724 --- /dev/null +++ b/src/config/redact-snapshot.raw.ts @@ -0,0 +1,32 @@ +import { isDeepStrictEqual } from "node:util"; +import JSON5 from "json5"; + +export function replaceSensitiveValuesInRaw(params: { + raw: string; + sensitiveValues: string[]; + redactedSentinel: string; +}): string { + const values = [...params.sensitiveValues].toSorted((a, b) => b.length - a.length); + let result = params.raw; + for (const value of values) { + result = result.replaceAll(value, params.redactedSentinel); + } + return result; +} + +export function shouldFallbackToStructuredRawRedaction(params: { + redactedRaw: string; + originalConfig: unknown; + restoreParsed: (parsed: unknown) => { ok: boolean; result?: unknown }; +}): boolean { + try { + const parsed = JSON5.parse(params.redactedRaw); + const restored = params.restoreParsed(parsed); + if (!restored.ok) { + return true; + } + return !isDeepStrictEqual(restored.result, params.originalConfig); + } catch { + return true; + } +} diff --git a/src/config/redact-snapshot.secret-ref.ts b/src/config/redact-snapshot.secret-ref.ts new file mode 100644 index 00000000000..20af40c6f19 --- /dev/null +++ b/src/config/redact-snapshot.secret-ref.ts @@ -0,0 +1,20 @@ +export function isSecretRefShape( + value: Record, +): value is Record & { source: string; id: string } { + return typeof value.source === "string" && typeof value.id === "string"; +} + +export function redactSecretRefId(params: { + value: Record & { source: string; id: string }; + values: string[]; + redactedSentinel: string; + isEnvVarPlaceholder: (value: string) => boolean; +}): Record { + const { value, values, redactedSentinel, isEnvVarPlaceholder } = params; + const redacted: Record = { ...value }; + if (!isEnvVarPlaceholder(value.id)) { + values.push(value.id); + redacted.id = redactedSentinel; + } + return redacted; +} diff --git a/src/config/redact-snapshot.ts b/src/config/redact-snapshot.ts index d600327cb70..a80d1debb03 100644 --- a/src/config/redact-snapshot.ts +++ b/src/config/redact-snapshot.ts @@ -1,6 +1,10 @@ -import { isDeepStrictEqual } from "node:util"; import JSON5 from "json5"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { + replaceSensitiveValuesInRaw, + shouldFallbackToStructuredRawRedaction, +} from "./redact-snapshot.raw.js"; +import { isSecretRefShape, redactSecretRefId } from "./redact-snapshot.secret-ref.js"; import { isSensitiveConfigPath, type ConfigUiHints } from "./schema.hints.js"; import type { ConfigFileSnapshot } from "./types.openclaw.js"; @@ -24,24 +28,6 @@ function isWholeObjectSensitivePath(path: string): boolean { return lowered.endsWith("serviceaccount") || lowered.endsWith("serviceaccountref"); } -function isSecretRefShape( - value: Record, -): value is Record & { source: string; id: string } { - return typeof value.source === "string" && typeof value.id === "string"; -} - -function redactSecretRef( - value: Record & { source: string; id: string }, - values: string[], -): Record { - const redacted: Record = { ...value }; - if (!isEnvVarPlaceholder(value.id)) { - values.push(value.id); - redacted.id = REDACTED_SENTINEL; - } - return redacted; -} - function collectSensitiveStrings(value: unknown, values: string[]): void { if (typeof value === "string") { if (!isEnvVarPlaceholder(value)) { @@ -206,7 +192,12 @@ function redactObjectWithLookup( if (hints[candidate]?.sensitive === true && !Array.isArray(value)) { const objectValue = value as Record; if (isSecretRefShape(objectValue)) { - result[key] = redactSecretRef(objectValue, values); + result[key] = redactSecretRefId({ + value: objectValue, + values, + redactedSentinel: REDACTED_SENTINEL, + isEnvVarPlaceholder, + }); } else { collectSensitiveStrings(objectValue, values); result[key] = REDACTED_SENTINEL; @@ -320,12 +311,11 @@ function redactObjectGuessing( */ function redactRawText(raw: string, config: unknown, hints?: ConfigUiHints): string { const sensitiveValues = collectSensitiveValues(config, hints); - sensitiveValues.sort((a, b) => b.length - a.length); - let result = raw; - for (const value of sensitiveValues) { - result = result.replaceAll(value, REDACTED_SENTINEL); - } - return result; + return replaceSensitiveValuesInRaw({ + raw, + sensitiveValues, + redactedSentinel: REDACTED_SENTINEL, + }); } let suppressRestoreWarnings = false; @@ -340,25 +330,6 @@ function withRestoreWarningsSuppressed(fn: () => T): T { } } -function shouldFallbackToStructuredRawRedaction(params: { - redactedRaw: string; - originalConfig: unknown; - hints?: ConfigUiHints; -}): boolean { - try { - const parsed = JSON5.parse(params.redactedRaw); - const restored = withRestoreWarningsSuppressed(() => - restoreRedactedValues(parsed, params.originalConfig, params.hints), - ); - if (!restored.ok) { - return true; - } - return !isDeepStrictEqual(restored.result, params.originalConfig); - } catch { - return true; - } -} - /** * Returns a copy of the config snapshot with all sensitive fields * replaced by {@link REDACTED_SENTINEL}. The `hash` is preserved @@ -410,7 +381,10 @@ export function redactConfigSnapshot( shouldFallbackToStructuredRawRedaction({ redactedRaw, originalConfig: snapshot.config, - hints: uiHints, + restoreParsed: (parsed) => + withRestoreWarningsSuppressed(() => + restoreRedactedValues(parsed, snapshot.config, uiHints), + ), }) ) { redactedRaw = JSON5.stringify(redactedParsed ?? redactedConfig, null, 2); diff --git a/src/cron/heartbeat-policy.test.ts b/src/cron/heartbeat-policy.test.ts new file mode 100644 index 00000000000..6ad061217e7 --- /dev/null +++ b/src/cron/heartbeat-policy.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it } from "vitest"; +import { + shouldEnqueueCronMainSummary, + shouldSkipHeartbeatOnlyDelivery, +} from "./heartbeat-policy.js"; + +describe("shouldSkipHeartbeatOnlyDelivery", () => { + it("suppresses empty payloads", () => { + expect(shouldSkipHeartbeatOnlyDelivery([], 300)).toBe(true); + }); + + it("suppresses when any payload is a heartbeat ack and no media is present", () => { + expect( + shouldSkipHeartbeatOnlyDelivery( + [{ text: "Checked inbox and calendar." }, { text: "HEARTBEAT_OK" }], + 300, + ), + ).toBe(true); + }); + + it("does not suppress when media is present", () => { + expect( + shouldSkipHeartbeatOnlyDelivery( + [{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/image.png" }], + 300, + ), + ).toBe(false); + }); +}); + +describe("shouldEnqueueCronMainSummary", () => { + const isSystemEvent = (text: string) => text.includes("HEARTBEAT_OK"); + + it("enqueues only when delivery was requested but did not run", () => { + expect( + shouldEnqueueCronMainSummary({ + summaryText: "HEARTBEAT_OK", + deliveryRequested: true, + delivered: false, + deliveryAttempted: false, + suppressMainSummary: false, + isCronSystemEvent: isSystemEvent, + }), + ).toBe(true); + }); + + it("does not enqueue after attempted outbound delivery", () => { + expect( + shouldEnqueueCronMainSummary({ + summaryText: "HEARTBEAT_OK", + deliveryRequested: true, + delivered: false, + deliveryAttempted: true, + suppressMainSummary: false, + isCronSystemEvent: isSystemEvent, + }), + ).toBe(false); + }); +}); diff --git a/src/cron/heartbeat-policy.ts b/src/cron/heartbeat-policy.ts new file mode 100644 index 00000000000..61edfa0701f --- /dev/null +++ b/src/cron/heartbeat-policy.ts @@ -0,0 +1,48 @@ +import { stripHeartbeatToken } from "../auto-reply/heartbeat.js"; + +export type HeartbeatDeliveryPayload = { + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; +}; + +export function shouldSkipHeartbeatOnlyDelivery( + payloads: HeartbeatDeliveryPayload[], + ackMaxChars: number, +): boolean { + if (payloads.length === 0) { + return true; + } + const hasAnyMedia = payloads.some( + (payload) => (payload.mediaUrls?.length ?? 0) > 0 || Boolean(payload.mediaUrl), + ); + if (hasAnyMedia) { + return false; + } + return payloads.some((payload) => { + const result = stripHeartbeatToken(payload.text, { + mode: "heartbeat", + maxAckChars: ackMaxChars, + }); + return result.shouldSkip; + }); +} + +export function shouldEnqueueCronMainSummary(params: { + summaryText: string | undefined; + deliveryRequested: boolean; + delivered: boolean | undefined; + deliveryAttempted: boolean | undefined; + suppressMainSummary: boolean; + isCronSystemEvent: (text: string) => boolean; +}): boolean { + const summaryText = params.summaryText?.trim(); + return Boolean( + summaryText && + params.isCronSystemEvent(summaryText) && + params.deliveryRequested && + !params.delivered && + params.deliveryAttempted !== true && + !params.suppressMainSummary, + ); +} diff --git a/src/cron/isolated-agent/helpers.ts b/src/cron/isolated-agent/helpers.ts index 0e0184a5401..3792a3a7abd 100644 --- a/src/cron/isolated-agent/helpers.ts +++ b/src/cron/isolated-agent/helpers.ts @@ -1,8 +1,6 @@ -import { - DEFAULT_HEARTBEAT_ACK_MAX_CHARS, - stripHeartbeatToken, -} from "../../auto-reply/heartbeat.js"; +import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS } from "../../auto-reply/heartbeat.js"; import { truncateUtf16Safe } from "../../utils.js"; +import { shouldSkipHeartbeatOnlyDelivery } from "../heartbeat-policy.js"; type DeliveryPayload = { text?: string; @@ -91,27 +89,7 @@ export function pickLastDeliverablePayload(payloads: DeliveryPayload[]) { * Returns true when any payload is a heartbeat ack token and no payload contains media. */ export function isHeartbeatOnlyResponse(payloads: DeliveryPayload[], ackMaxChars: number) { - if (payloads.length === 0) { - return true; - } - // If any payload has media, deliver regardless — there's real content. - const hasAnyMedia = payloads.some( - (payload) => (payload.mediaUrls?.length ?? 0) > 0 || Boolean(payload.mediaUrl), - ); - if (hasAnyMedia) { - return false; - } - // An agent may emit multiple text payloads (narration, tool summaries) - // before a final HEARTBEAT_OK. If *any* payload is a heartbeat ack token, - // the agent is signaling "nothing needs attention" — the preceding text - // payloads are just internal narration and should not be delivered. - return payloads.some((payload) => { - const result = stripHeartbeatToken(payload.text, { - mode: "heartbeat", - maxAckChars: ackMaxChars, - }); - return result.shouldSkip; - }); + return shouldSkipHeartbeatOnlyDelivery(payloads, ackMaxChars); } export function resolveHeartbeatAckMaxChars(agentCfg?: { heartbeat?: { ackMaxChars?: number } }) { diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 99f4ea7e72f..ec9d919ec2c 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -3,6 +3,7 @@ import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; +import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus, @@ -995,12 +996,14 @@ export async function executeJobCore( const suppressMainSummary = res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested; if ( - summaryText && - isCronSystemEvent(summaryText) && - deliveryPlan.requested && - !res.delivered && - res.deliveryAttempted !== true && - !suppressMainSummary + shouldEnqueueCronMainSummary({ + summaryText, + deliveryRequested: deliveryPlan.requested, + delivered: res.delivered, + deliveryAttempted: res.deliveryAttempted, + suppressMainSummary, + isCronSystemEvent, + }) ) { const prefix = "Cron"; const label = diff --git a/src/daemon/service-env.test.ts b/src/daemon/service-env.test.ts index d13973e8c83..4080cd88fcf 100644 --- a/src/daemon/service-env.test.ts +++ b/src/daemon/service-env.test.ts @@ -329,58 +329,6 @@ describe("buildServiceEnvironment", () => { expect(env.http_proxy).toBe("http://proxy.local:7890"); expect(env.all_proxy).toBe("socks5://proxy.local:1080"); }); - it("defaults NODE_EXTRA_CA_CERTS to system cert bundle on macOS", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user" }, - port: 18789, - platform: "darwin", - }); - expect(env.NODE_EXTRA_CA_CERTS).toBe("/etc/ssl/cert.pem"); - }); - - it("does not default NODE_EXTRA_CA_CERTS on non-macOS", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user" }, - port: 18789, - platform: "linux", - }); - expect(env.NODE_EXTRA_CA_CERTS).toBeUndefined(); - }); - - it("respects user-provided NODE_EXTRA_CA_CERTS over the default", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" }, - port: 18789, - }); - expect(env.NODE_EXTRA_CA_CERTS).toBe("/custom/certs/ca.pem"); - }); - - it("defaults NODE_USE_SYSTEM_CA=1 on macOS", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user" }, - port: 18789, - platform: "darwin", - }); - expect(env.NODE_USE_SYSTEM_CA).toBe("1"); - }); - - it("does not default NODE_USE_SYSTEM_CA on non-macOS", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user" }, - port: 18789, - platform: "linux", - }); - expect(env.NODE_USE_SYSTEM_CA).toBeUndefined(); - }); - - it("respects user-provided NODE_USE_SYSTEM_CA over the default", () => { - const env = buildServiceEnvironment({ - env: { HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" }, - port: 18789, - platform: "darwin", - }); - expect(env.NODE_USE_SYSTEM_CA).toBe("0"); - }); }); describe("buildNodeServiceEnvironment", () => { @@ -453,51 +401,49 @@ describe("buildNodeServiceEnvironment", () => { }); expect(env.TMPDIR).toBe(os.tmpdir()); }); +}); - it("defaults NODE_EXTRA_CA_CERTS to system cert bundle on macOS for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user" }, - platform: "darwin", - }); +describe("shared Node TLS env defaults", () => { + const builders = [ + { + name: "gateway service env", + build: (env: Record, platform?: NodeJS.Platform) => + buildServiceEnvironment({ env, port: 18789, platform }), + }, + { + name: "node service env", + build: (env: Record, platform?: NodeJS.Platform) => + buildNodeServiceEnvironment({ env, platform }), + }, + ] as const; + + it.each(builders)("$name defaults NODE_EXTRA_CA_CERTS on macOS", ({ build }) => { + const env = build({ HOME: "/home/user" }, "darwin"); expect(env.NODE_EXTRA_CA_CERTS).toBe("/etc/ssl/cert.pem"); }); - it("does not default NODE_EXTRA_CA_CERTS on non-macOS for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user" }, - platform: "linux", - }); + it.each(builders)("$name does not default NODE_EXTRA_CA_CERTS on non-macOS", ({ build }) => { + const env = build({ HOME: "/home/user" }, "linux"); expect(env.NODE_EXTRA_CA_CERTS).toBeUndefined(); }); - it("respects user-provided NODE_EXTRA_CA_CERTS for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" }, - }); + it.each(builders)("$name respects user-provided NODE_EXTRA_CA_CERTS", ({ build }) => { + const env = build({ HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" }); expect(env.NODE_EXTRA_CA_CERTS).toBe("/custom/certs/ca.pem"); }); - it("defaults NODE_USE_SYSTEM_CA=1 on macOS for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user" }, - platform: "darwin", - }); + it.each(builders)("$name defaults NODE_USE_SYSTEM_CA=1 on macOS", ({ build }) => { + const env = build({ HOME: "/home/user" }, "darwin"); expect(env.NODE_USE_SYSTEM_CA).toBe("1"); }); - it("does not default NODE_USE_SYSTEM_CA on non-macOS for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user" }, - platform: "linux", - }); + it.each(builders)("$name does not default NODE_USE_SYSTEM_CA on non-macOS", ({ build }) => { + const env = build({ HOME: "/home/user" }, "linux"); expect(env.NODE_USE_SYSTEM_CA).toBeUndefined(); }); - it("respects user-provided NODE_USE_SYSTEM_CA for node services", () => { - const env = buildNodeServiceEnvironment({ - env: { HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" }, - platform: "darwin", - }); + it.each(builders)("$name respects user-provided NODE_USE_SYSTEM_CA", ({ build }) => { + const env = build({ HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" }, "darwin"); expect(env.NODE_USE_SYSTEM_CA).toBe("0"); }); }); diff --git a/src/daemon/service-env.ts b/src/daemon/service-env.ts index 45bb5099495..f0534746aa7 100644 --- a/src/daemon/service-env.ts +++ b/src/daemon/service-env.ts @@ -25,6 +25,16 @@ type BuildServicePathOptions = MinimalServicePathOptions & { env?: Record; }; +type SharedServiceEnvironmentFields = { + stateDir: string | undefined; + configPath: string | undefined; + tmpDir: string; + minimalPath: string; + proxyEnv: Record; + nodeCaCerts: string | undefined; + nodeUseSystemCa: string | undefined; +}; + const SERVICE_PROXY_ENV_KEYS = [ "HTTP_PROXY", "HTTPS_PROXY", @@ -246,15 +256,8 @@ export function buildServiceEnvironment(params: { launchdLabel || (platform === "darwin" ? resolveGatewayLaunchAgentLabel(profile) : undefined); const systemdUnit = `${resolveGatewaySystemdServiceName(profile)}.service`; return { - HOME: env.HOME, - TMPDIR: sharedEnv.tmpDir, - PATH: sharedEnv.minimalPath, - ...sharedEnv.proxyEnv, - NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts, - NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa, + ...buildCommonServiceEnvironment(env, sharedEnv), OPENCLAW_PROFILE: profile, - OPENCLAW_STATE_DIR: sharedEnv.stateDir, - OPENCLAW_CONFIG_PATH: sharedEnv.configPath, OPENCLAW_GATEWAY_PORT: String(port), OPENCLAW_GATEWAY_TOKEN: token, OPENCLAW_LAUNCHD_LABEL: resolvedLaunchdLabel, @@ -275,14 +278,7 @@ export function buildNodeServiceEnvironment(params: { const gatewayToken = env.OPENCLAW_GATEWAY_TOKEN?.trim() || env.CLAWDBOT_GATEWAY_TOKEN?.trim() || undefined; return { - HOME: env.HOME, - TMPDIR: sharedEnv.tmpDir, - PATH: sharedEnv.minimalPath, - ...sharedEnv.proxyEnv, - NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts, - NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa, - OPENCLAW_STATE_DIR: sharedEnv.stateDir, - OPENCLAW_CONFIG_PATH: sharedEnv.configPath, + ...buildCommonServiceEnvironment(env, sharedEnv), OPENCLAW_GATEWAY_TOKEN: gatewayToken, OPENCLAW_LAUNCHD_LABEL: resolveNodeLaunchAgentLabel(), OPENCLAW_SYSTEMD_UNIT: resolveNodeSystemdServiceName(), @@ -295,18 +291,26 @@ export function buildNodeServiceEnvironment(params: { }; } +function buildCommonServiceEnvironment( + env: Record, + sharedEnv: SharedServiceEnvironmentFields, +): Record { + return { + HOME: env.HOME, + TMPDIR: sharedEnv.tmpDir, + PATH: sharedEnv.minimalPath, + ...sharedEnv.proxyEnv, + NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts, + NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa, + OPENCLAW_STATE_DIR: sharedEnv.stateDir, + OPENCLAW_CONFIG_PATH: sharedEnv.configPath, + }; +} + function resolveSharedServiceEnvironmentFields( env: Record, platform: NodeJS.Platform, -): { - stateDir: string | undefined; - configPath: string | undefined; - tmpDir: string; - minimalPath: string; - proxyEnv: Record; - nodeCaCerts: string | undefined; - nodeUseSystemCa: string | undefined; -} { +): SharedServiceEnvironmentFields { const stateDir = env.OPENCLAW_STATE_DIR; const configPath = env.OPENCLAW_CONFIG_PATH; // Keep a usable temp directory for supervised services even when the host env omits TMPDIR. diff --git a/src/slack/monitor/message-handler/prepare-content.ts b/src/slack/monitor/message-handler/prepare-content.ts new file mode 100644 index 00000000000..2f3ad1a4e06 --- /dev/null +++ b/src/slack/monitor/message-handler/prepare-content.ts @@ -0,0 +1,106 @@ +import { logVerbose } from "../../../globals.js"; +import type { SlackFile, SlackMessageEvent } from "../../types.js"; +import { + MAX_SLACK_MEDIA_FILES, + resolveSlackAttachmentContent, + resolveSlackMedia, + type SlackMediaResult, + type SlackThreadStarter, +} from "../media.js"; + +export type SlackResolvedMessageContent = { + rawBody: string; + effectiveDirectMedia: SlackMediaResult[] | null; +}; + +function filterInheritedParentFiles(params: { + files: SlackFile[] | undefined; + isThreadReply: boolean; + threadStarter: SlackThreadStarter | null; +}): SlackFile[] | undefined { + const { files, isThreadReply, threadStarter } = params; + if (!isThreadReply || !files?.length) { + return files; + } + if (!threadStarter?.files?.length) { + return files; + } + const starterFileIds = new Set(threadStarter.files.map((file) => file.id)); + const filtered = files.filter((file) => !file.id || !starterFileIds.has(file.id)); + if (filtered.length < files.length) { + logVerbose( + `slack: filtered ${files.length - filtered.length} inherited parent file(s) from thread reply`, + ); + } + return filtered.length > 0 ? filtered : undefined; +} + +export async function resolveSlackMessageContent(params: { + message: SlackMessageEvent; + isThreadReply: boolean; + threadStarter: SlackThreadStarter | null; + isBotMessage: boolean; + botToken: string; + mediaMaxBytes: number; +}): Promise { + const ownFiles = filterInheritedParentFiles({ + files: params.message.files, + isThreadReply: params.isThreadReply, + threadStarter: params.threadStarter, + }); + + const media = await resolveSlackMedia({ + files: ownFiles, + token: params.botToken, + maxBytes: params.mediaMaxBytes, + }); + + const attachmentContent = await resolveSlackAttachmentContent({ + attachments: params.message.attachments, + token: params.botToken, + maxBytes: params.mediaMaxBytes, + }); + + const mergedMedia = [...(media ?? []), ...(attachmentContent?.media ?? [])]; + const effectiveDirectMedia = mergedMedia.length > 0 ? mergedMedia : null; + const mediaPlaceholder = effectiveDirectMedia + ? effectiveDirectMedia.map((item) => item.placeholder).join(" ") + : undefined; + + const fallbackFiles = ownFiles ?? []; + const fileOnlyFallback = + !mediaPlaceholder && fallbackFiles.length > 0 + ? fallbackFiles + .slice(0, MAX_SLACK_MEDIA_FILES) + .map((file) => file.name?.trim() || "file") + .join(", ") + : undefined; + const fileOnlyPlaceholder = fileOnlyFallback ? `[Slack file: ${fileOnlyFallback}]` : undefined; + + const botAttachmentText = + params.isBotMessage && !attachmentContent?.text + ? (params.message.attachments ?? []) + .map((attachment) => attachment.text?.trim() || attachment.fallback?.trim()) + .filter(Boolean) + .join("\n") + : undefined; + + const rawBody = + [ + (params.message.text ?? "").trim(), + attachmentContent?.text, + botAttachmentText, + mediaPlaceholder, + fileOnlyPlaceholder, + ] + .filter(Boolean) + .join("\n") || ""; + if (!rawBody) { + return null; + } + + return { + rawBody, + effectiveDirectMedia, + }; +} diff --git a/src/slack/monitor/message-handler/prepare-thread-context.ts b/src/slack/monitor/message-handler/prepare-thread-context.ts new file mode 100644 index 00000000000..f25aa881629 --- /dev/null +++ b/src/slack/monitor/message-handler/prepare-thread-context.ts @@ -0,0 +1,137 @@ +import { formatInboundEnvelope } from "../../../auto-reply/envelope.js"; +import { readSessionUpdatedAt } from "../../../config/sessions.js"; +import { logVerbose } from "../../../globals.js"; +import type { ResolvedSlackAccount } from "../../accounts.js"; +import type { SlackMessageEvent } from "../../types.js"; +import type { SlackMonitorContext } from "../context.js"; +import { + resolveSlackMedia, + resolveSlackThreadHistory, + type SlackMediaResult, + type SlackThreadStarter, +} from "../media.js"; + +export type SlackThreadContextData = { + threadStarterBody: string | undefined; + threadHistoryBody: string | undefined; + threadSessionPreviousTimestamp: number | undefined; + threadLabel: string | undefined; + threadStarterMedia: SlackMediaResult[] | null; +}; + +export async function resolveSlackThreadContextData(params: { + ctx: SlackMonitorContext; + account: ResolvedSlackAccount; + message: SlackMessageEvent; + isThreadReply: boolean; + threadTs: string | undefined; + threadStarter: SlackThreadStarter | null; + roomLabel: string; + storePath: string; + sessionKey: string; + envelopeOptions: ReturnType< + typeof import("../../../auto-reply/envelope.js").resolveEnvelopeFormatOptions + >; + effectiveDirectMedia: SlackMediaResult[] | null; +}): Promise { + let threadStarterBody: string | undefined; + let threadHistoryBody: string | undefined; + let threadSessionPreviousTimestamp: number | undefined; + let threadLabel: string | undefined; + let threadStarterMedia: SlackMediaResult[] | null = null; + + if (!params.isThreadReply || !params.threadTs) { + return { + threadStarterBody, + threadHistoryBody, + threadSessionPreviousTimestamp, + threadLabel, + threadStarterMedia, + }; + } + + const starter = params.threadStarter; + if (starter?.text) { + threadStarterBody = starter.text; + const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80); + threadLabel = `Slack thread ${params.roomLabel}${snippet ? `: ${snippet}` : ""}`; + if (!params.effectiveDirectMedia && starter.files && starter.files.length > 0) { + threadStarterMedia = await resolveSlackMedia({ + files: starter.files, + token: params.ctx.botToken, + maxBytes: params.ctx.mediaMaxBytes, + }); + if (threadStarterMedia) { + const starterPlaceholders = threadStarterMedia.map((item) => item.placeholder).join(", "); + logVerbose(`slack: hydrated thread starter file ${starterPlaceholders} from root message`); + } + } + } else { + threadLabel = `Slack thread ${params.roomLabel}`; + } + + const threadInitialHistoryLimit = params.account.config?.thread?.initialHistoryLimit ?? 20; + threadSessionPreviousTimestamp = readSessionUpdatedAt({ + storePath: params.storePath, + sessionKey: params.sessionKey, + }); + + if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) { + const threadHistory = await resolveSlackThreadHistory({ + channelId: params.message.channel, + threadTs: params.threadTs, + client: params.ctx.app.client, + currentMessageTs: params.message.ts, + limit: threadInitialHistoryLimit, + }); + + if (threadHistory.length > 0) { + const uniqueUserIds = [ + ...new Set( + threadHistory.map((item) => item.userId).filter((id): id is string => Boolean(id)), + ), + ]; + const userMap = new Map(); + await Promise.all( + uniqueUserIds.map(async (id) => { + const user = await params.ctx.resolveUserName(id); + if (user) { + userMap.set(id, user); + } + }), + ); + + const historyParts: string[] = []; + for (const historyMsg of threadHistory) { + const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null; + const msgSenderName = + msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown"); + const isBot = Boolean(historyMsg.botId); + const role = isBot ? "assistant" : "user"; + const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${params.message.channel}]`; + historyParts.push( + formatInboundEnvelope({ + channel: "Slack", + from: `${msgSenderName} (${role})`, + timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined, + body: msgWithId, + chatType: "channel", + envelope: params.envelopeOptions, + }), + ); + } + threadHistoryBody = historyParts.join("\n\n"); + logVerbose( + `slack: populated thread history with ${threadHistory.length} messages for new session`, + ); + } + } + + return { + threadStarterBody, + threadHistoryBody, + threadSessionPreviousTimestamp, + threadLabel, + threadStarterMedia, + }; +} diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index b2616b63927..f58deecb3f4 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -46,14 +46,10 @@ import { resolveSlackChannelConfig } from "../channel-config.js"; import { stripSlackMentionsForCommandDetection } from "../commands.js"; import { normalizeSlackChannelType, type SlackMonitorContext } from "../context.js"; import { authorizeSlackDirectMessage } from "../dm-auth.js"; -import { - resolveSlackAttachmentContent, - MAX_SLACK_MEDIA_FILES, - resolveSlackMedia, - resolveSlackThreadHistory, - resolveSlackThreadStarter, -} from "../media.js"; +import { resolveSlackThreadStarter } from "../media.js"; import { resolveSlackRoomContextHints } from "../room-context.js"; +import { resolveSlackMessageContent } from "./prepare-content.js"; +import { resolveSlackThreadContextData } from "./prepare-thread-context.js"; import type { PreparedSlackMessage } from "./types.js"; const mentionRegexCache = new WeakMap>(); @@ -515,87 +511,26 @@ export async function prepareSlackMessage(params: { return null; } - // When processing a thread reply, filter out files that belong to the thread - // starter (parent message). Slack's Events API includes the parent's `files` - // array in every thread reply payload, which causes ghost media attachments - // on text-only replies. We eagerly resolve the thread starter here (the result - // is cached) and exclude any file IDs that match the parent. (#32203) - let ownFiles = message.files; - if (isThreadReply && threadTs && message.files?.length) { - const starter = await resolveSlackThreadStarter({ - channelId: message.channel, - threadTs, - client: ctx.app.client, - }); - if (starter?.files?.length) { - const starterFileIds = new Set(starter.files.map((f) => f.id)); - const filtered = message.files.filter((f) => !f.id || !starterFileIds.has(f.id)); - if (filtered.length < message.files.length) { - logVerbose( - `slack: filtered ${message.files.length - filtered.length} inherited parent file(s) from thread reply`, - ); - } - ownFiles = filtered.length > 0 ? filtered : undefined; - } - } - - const media = await resolveSlackMedia({ - files: ownFiles, - token: ctx.botToken, - maxBytes: ctx.mediaMaxBytes, + const threadStarter = + isThreadReply && threadTs + ? await resolveSlackThreadStarter({ + channelId: message.channel, + threadTs, + client: ctx.app.client, + }) + : null; + const resolvedMessageContent = await resolveSlackMessageContent({ + message, + isThreadReply, + threadStarter, + isBotMessage, + botToken: ctx.botToken, + mediaMaxBytes: ctx.mediaMaxBytes, }); - - // Resolve forwarded message content (text + media) from Slack attachments - const attachmentContent = await resolveSlackAttachmentContent({ - attachments: message.attachments, - token: ctx.botToken, - maxBytes: ctx.mediaMaxBytes, - }); - - // Merge forwarded media into the message's media array - const mergedMedia = [...(media ?? []), ...(attachmentContent?.media ?? [])]; - const effectiveDirectMedia = mergedMedia.length > 0 ? mergedMedia : null; - - const mediaPlaceholder = effectiveDirectMedia - ? effectiveDirectMedia.map((m) => m.placeholder).join(" ") - : undefined; - - // When files were attached but all downloads failed, create a fallback - // placeholder so the message is still delivered to the agent instead of - // being silently dropped (#25064). - const fileOnlyFallback = - !mediaPlaceholder && (message.files?.length ?? 0) > 0 - ? message - .files!.slice(0, MAX_SLACK_MEDIA_FILES) - .map((f) => f.name?.trim() || "file") - .join(", ") - : undefined; - const fileOnlyPlaceholder = fileOnlyFallback ? `[Slack file: ${fileOnlyFallback}]` : undefined; - - // Bot messages (e.g. Prometheus, Gatus webhooks) often carry content only in - // non-forwarded attachments (is_share !== true). Extract their text/fallback - // so the message isn't silently dropped when `allowBots: true` (#27616). - const botAttachmentText = - isBotMessage && !attachmentContent?.text - ? (message.attachments ?? []) - .map((a) => a.text?.trim() || a.fallback?.trim()) - .filter(Boolean) - .join("\n") - : undefined; - - const rawBody = - [ - (message.text ?? "").trim(), - attachmentContent?.text, - botAttachmentText, - mediaPlaceholder, - fileOnlyPlaceholder, - ] - .filter(Boolean) - .join("\n") || ""; - if (!rawBody) { + if (!resolvedMessageContent) { return null; } + const { rawBody, effectiveDirectMedia } = resolvedMessageContent; const ackReaction = resolveAckReaction(cfg, route.agentId, { channel: "slack", @@ -711,99 +646,25 @@ export async function prepareSlackMessage(params: { channelConfig, }); - let threadStarterBody: string | undefined; - let threadHistoryBody: string | undefined; - let threadSessionPreviousTimestamp: number | undefined; - let threadLabel: string | undefined; - let threadStarterMedia: Awaited> = null; - if (isThreadReply && threadTs) { - const starter = await resolveSlackThreadStarter({ - channelId: message.channel, - threadTs, - client: ctx.app.client, - }); - if (starter?.text) { - // Keep thread starter as raw text; metadata is provided out-of-band in the system prompt. - threadStarterBody = starter.text; - const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80); - threadLabel = `Slack thread ${roomLabel}${snippet ? `: ${snippet}` : ""}`; - // If current message has no files but thread starter does, fetch starter's files - if (!effectiveDirectMedia && starter.files && starter.files.length > 0) { - threadStarterMedia = await resolveSlackMedia({ - files: starter.files, - token: ctx.botToken, - maxBytes: ctx.mediaMaxBytes, - }); - if (threadStarterMedia) { - const starterPlaceholders = threadStarterMedia.map((m) => m.placeholder).join(", "); - logVerbose( - `slack: hydrated thread starter file ${starterPlaceholders} from root message`, - ); - } - } - } else { - threadLabel = `Slack thread ${roomLabel}`; - } - - // Fetch full thread history for new thread sessions - // This provides context of previous messages (including bot replies) in the thread - // Use the thread session key (not base session key) to determine if this is a new session - const threadInitialHistoryLimit = account.config?.thread?.initialHistoryLimit ?? 20; - threadSessionPreviousTimestamp = readSessionUpdatedAt({ - storePath, - sessionKey, // Thread-specific session key - }); - // Only fetch thread history for NEW sessions (existing sessions already have this context in their transcript) - if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) { - const threadHistory = await resolveSlackThreadHistory({ - channelId: message.channel, - threadTs, - client: ctx.app.client, - currentMessageTs: message.ts, - limit: threadInitialHistoryLimit, - }); - - if (threadHistory.length > 0) { - // Batch resolve user names to avoid N sequential API calls - const uniqueUserIds = [ - ...new Set(threadHistory.map((m) => m.userId).filter((id): id is string => Boolean(id))), - ]; - const userMap = new Map(); - await Promise.all( - uniqueUserIds.map(async (id) => { - const user = await ctx.resolveUserName(id); - if (user) { - userMap.set(id, user); - } - }), - ); - - const historyParts: string[] = []; - for (const historyMsg of threadHistory) { - const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null; - const msgSenderName = - msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown"); - const isBot = Boolean(historyMsg.botId); - const role = isBot ? "assistant" : "user"; - const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${message.channel}]`; - historyParts.push( - formatInboundEnvelope({ - channel: "Slack", - from: `${msgSenderName} (${role})`, - timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined, - body: msgWithId, - chatType: "channel", - envelope: envelopeOptions, - }), - ); - } - threadHistoryBody = historyParts.join("\n\n"); - logVerbose( - `slack: populated thread history with ${threadHistory.length} messages for new session`, - ); - } - } - } + const { + threadStarterBody, + threadHistoryBody, + threadSessionPreviousTimestamp, + threadLabel, + threadStarterMedia, + } = await resolveSlackThreadContextData({ + ctx, + account, + message, + isThreadReply, + threadTs, + threadStarter, + roomLabel, + storePath, + sessionKey, + envelopeOptions, + effectiveDirectMedia, + }); // Use direct media (including forwarded attachment media) if available, else thread starter media const effectiveMedia = effectiveDirectMedia ?? threadStarterMedia;