From 7eda6323249779689ec680c715cfc37434f796fd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 23:07:12 +0000 Subject: [PATCH] refactor: split slack/discord/session maintenance helpers --- src/config/sessions/store-cache.ts | 81 ++++ src/config/sessions/store-maintenance.ts | 327 +++++++++++++ src/config/sessions/store-migrations.ts | 27 ++ src/config/sessions/store.ts | 435 ++---------------- src/daemon/runtime-paths.ts | 40 +- .../monitor/message-handler.preflight.ts | 55 +-- src/discord/monitor/preflight-audio.ts | 72 +++ src/infra/stable-node-path.ts | 39 ++ src/infra/update-runner.test.ts | 18 +- src/infra/update-runner.ts | 4 +- src/slack/monitor/channel-type.ts | 41 ++ src/slack/monitor/context.ts | 41 +- .../monitor/events/interactions.modal.ts | 259 +++++++++++ src/slack/monitor/events/interactions.ts | 238 +--------- src/slack/monitor/events/messages.test.ts | 52 +++ src/slack/monitor/events/messages.ts | 3 +- 16 files changed, 983 insertions(+), 749 deletions(-) create mode 100644 src/config/sessions/store-cache.ts create mode 100644 src/config/sessions/store-maintenance.ts create mode 100644 src/config/sessions/store-migrations.ts create mode 100644 src/discord/monitor/preflight-audio.ts create mode 100644 src/infra/stable-node-path.ts create mode 100644 src/slack/monitor/channel-type.ts create mode 100644 src/slack/monitor/events/interactions.modal.ts diff --git a/src/config/sessions/store-cache.ts b/src/config/sessions/store-cache.ts new file mode 100644 index 00000000000..994fe242985 --- /dev/null +++ b/src/config/sessions/store-cache.ts @@ -0,0 +1,81 @@ +import type { SessionEntry } from "./types.js"; + +type SessionStoreCacheEntry = { + store: Record; + loadedAt: number; + storePath: string; + mtimeMs?: number; + sizeBytes?: number; + serialized?: string; +}; + +const SESSION_STORE_CACHE = new Map(); +const SESSION_STORE_SERIALIZED_CACHE = new Map(); + +export function clearSessionStoreCaches(): void { + SESSION_STORE_CACHE.clear(); + SESSION_STORE_SERIALIZED_CACHE.clear(); +} + +export function invalidateSessionStoreCache(storePath: string): void { + SESSION_STORE_CACHE.delete(storePath); + SESSION_STORE_SERIALIZED_CACHE.delete(storePath); +} + +export function getSerializedSessionStore(storePath: string): string | undefined { + return SESSION_STORE_SERIALIZED_CACHE.get(storePath); +} + +export function setSerializedSessionStore(storePath: string, serialized?: string): void { + if (serialized === undefined) { + SESSION_STORE_SERIALIZED_CACHE.delete(storePath); + return; + } + SESSION_STORE_SERIALIZED_CACHE.set(storePath, serialized); +} + +export function dropSessionStoreObjectCache(storePath: string): void { + SESSION_STORE_CACHE.delete(storePath); +} + +export function readSessionStoreCache(params: { + storePath: string; + ttlMs: number; + mtimeMs?: number; + sizeBytes?: number; +}): Record | null { + const cached = SESSION_STORE_CACHE.get(params.storePath); + if (!cached) { + return null; + } + const now = Date.now(); + if (now - cached.loadedAt > params.ttlMs) { + invalidateSessionStoreCache(params.storePath); + return null; + } + if (params.mtimeMs !== cached.mtimeMs || params.sizeBytes !== cached.sizeBytes) { + invalidateSessionStoreCache(params.storePath); + return null; + } + return structuredClone(cached.store); +} + +export function writeSessionStoreCache(params: { + storePath: string; + store: Record; + mtimeMs?: number; + sizeBytes?: number; + serialized?: string; +}): void { + SESSION_STORE_CACHE.set(params.storePath, { + store: structuredClone(params.store), + loadedAt: Date.now(), + storePath: params.storePath, + mtimeMs: params.mtimeMs, + sizeBytes: params.sizeBytes, + serialized: params.serialized, + }); + if (params.serialized !== undefined) { + SESSION_STORE_SERIALIZED_CACHE.set(params.storePath, params.serialized); + } +} diff --git a/src/config/sessions/store-maintenance.ts b/src/config/sessions/store-maintenance.ts new file mode 100644 index 00000000000..410fcbc00f0 --- /dev/null +++ b/src/config/sessions/store-maintenance.ts @@ -0,0 +1,327 @@ +import fs from "node:fs"; +import path from "node:path"; +import { parseByteSize } from "../../cli/parse-bytes.js"; +import { parseDurationMs } from "../../cli/parse-duration.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { loadConfig } from "../config.js"; +import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; +import type { SessionEntry } from "./types.js"; + +const log = createSubsystemLogger("sessions/store"); + +const DEFAULT_SESSION_PRUNE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; +const DEFAULT_SESSION_MAX_ENTRIES = 500; +const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB +const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "warn"; +const DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO = 0.8; + +export type SessionMaintenanceWarning = { + activeSessionKey: string; + activeUpdatedAt?: number; + totalEntries: number; + pruneAfterMs: number; + maxEntries: number; + wouldPrune: boolean; + wouldCap: boolean; +}; + +export type ResolvedSessionMaintenanceConfig = { + mode: SessionMaintenanceMode; + pruneAfterMs: number; + maxEntries: number; + rotateBytes: number; + resetArchiveRetentionMs: number | null; + maxDiskBytes: number | null; + highWaterBytes: number | null; +}; + +function resolvePruneAfterMs(maintenance?: SessionMaintenanceConfig): number { + const raw = maintenance?.pruneAfter ?? maintenance?.pruneDays; + if (raw === undefined || raw === null || raw === "") { + return DEFAULT_SESSION_PRUNE_AFTER_MS; + } + try { + return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); + } catch { + return DEFAULT_SESSION_PRUNE_AFTER_MS; + } +} + +function resolveRotateBytes(maintenance?: SessionMaintenanceConfig): number { + const raw = maintenance?.rotateBytes; + if (raw === undefined || raw === null || raw === "") { + return DEFAULT_SESSION_ROTATE_BYTES; + } + try { + return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + } catch { + return DEFAULT_SESSION_ROTATE_BYTES; + } +} + +function resolveResetArchiveRetentionMs( + maintenance: SessionMaintenanceConfig | undefined, + pruneAfterMs: number, +): number | null { + const raw = maintenance?.resetArchiveRetention; + if (raw === false) { + return null; + } + if (raw === undefined || raw === null || raw === "") { + return pruneAfterMs; + } + try { + return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); + } catch { + return pruneAfterMs; + } +} + +function resolveMaxDiskBytes(maintenance?: SessionMaintenanceConfig): number | null { + const raw = maintenance?.maxDiskBytes; + if (raw === undefined || raw === null || raw === "") { + return null; + } + try { + return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + } catch { + return null; + } +} + +function resolveHighWaterBytes( + maintenance: SessionMaintenanceConfig | undefined, + maxDiskBytes: number | null, +): number | null { + const computeDefault = () => { + if (maxDiskBytes == null) { + return null; + } + if (maxDiskBytes <= 0) { + return 0; + } + return Math.max( + 1, + Math.min( + maxDiskBytes, + Math.floor(maxDiskBytes * DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO), + ), + ); + }; + if (maxDiskBytes == null) { + return null; + } + const raw = maintenance?.highWaterBytes; + if (raw === undefined || raw === null || raw === "") { + return computeDefault(); + } + try { + const parsed = parseByteSize(String(raw).trim(), { defaultUnit: "b" }); + return Math.min(parsed, maxDiskBytes); + } catch { + return computeDefault(); + } +} + +/** + * Resolve maintenance settings from openclaw.json (`session.maintenance`). + * Falls back to built-in defaults when config is missing or unset. + */ +export function resolveMaintenanceConfig(): ResolvedSessionMaintenanceConfig { + let maintenance: SessionMaintenanceConfig | undefined; + try { + maintenance = loadConfig().session?.maintenance; + } catch { + // Config may not be available (e.g. in tests). Use defaults. + } + const pruneAfterMs = resolvePruneAfterMs(maintenance); + const maxDiskBytes = resolveMaxDiskBytes(maintenance); + return { + mode: maintenance?.mode ?? DEFAULT_SESSION_MAINTENANCE_MODE, + pruneAfterMs, + maxEntries: maintenance?.maxEntries ?? DEFAULT_SESSION_MAX_ENTRIES, + rotateBytes: resolveRotateBytes(maintenance), + resetArchiveRetentionMs: resolveResetArchiveRetentionMs(maintenance, pruneAfterMs), + maxDiskBytes, + highWaterBytes: resolveHighWaterBytes(maintenance, maxDiskBytes), + }; +} + +/** + * Remove entries whose `updatedAt` is older than the configured threshold. + * Entries without `updatedAt` are kept (cannot determine staleness). + * Mutates `store` in-place. + */ +export function pruneStaleEntries( + store: Record, + overrideMaxAgeMs?: number, + opts: { log?: boolean; onPruned?: (params: { key: string; entry: SessionEntry }) => void } = {}, +): number { + const maxAgeMs = overrideMaxAgeMs ?? resolveMaintenanceConfig().pruneAfterMs; + const cutoffMs = Date.now() - maxAgeMs; + let pruned = 0; + for (const [key, entry] of Object.entries(store)) { + if (entry?.updatedAt != null && entry.updatedAt < cutoffMs) { + opts.onPruned?.({ key, entry }); + delete store[key]; + pruned++; + } + } + if (pruned > 0 && opts.log !== false) { + log.info("pruned stale session entries", { pruned, maxAgeMs }); + } + return pruned; +} + +function getEntryUpdatedAt(entry?: SessionEntry): number { + return entry?.updatedAt ?? Number.NEGATIVE_INFINITY; +} + +export function getActiveSessionMaintenanceWarning(params: { + store: Record; + activeSessionKey: string; + pruneAfterMs: number; + maxEntries: number; + nowMs?: number; +}): SessionMaintenanceWarning | null { + const activeSessionKey = params.activeSessionKey.trim(); + if (!activeSessionKey) { + return null; + } + const activeEntry = params.store[activeSessionKey]; + if (!activeEntry) { + return null; + } + const now = params.nowMs ?? Date.now(); + const cutoffMs = now - params.pruneAfterMs; + const wouldPrune = activeEntry.updatedAt != null ? activeEntry.updatedAt < cutoffMs : false; + const keys = Object.keys(params.store); + const wouldCap = + keys.length > params.maxEntries && + keys + .toSorted((a, b) => getEntryUpdatedAt(params.store[b]) - getEntryUpdatedAt(params.store[a])) + .slice(params.maxEntries) + .includes(activeSessionKey); + + if (!wouldPrune && !wouldCap) { + return null; + } + + return { + activeSessionKey, + activeUpdatedAt: activeEntry.updatedAt, + totalEntries: keys.length, + pruneAfterMs: params.pruneAfterMs, + maxEntries: params.maxEntries, + wouldPrune, + wouldCap, + }; +} + +/** + * Cap the store to the N most recently updated entries. + * Entries without `updatedAt` are sorted last (removed first when over limit). + * Mutates `store` in-place. + */ +export function capEntryCount( + store: Record, + overrideMax?: number, + opts: { + log?: boolean; + onCapped?: (params: { key: string; entry: SessionEntry }) => void; + } = {}, +): number { + const maxEntries = overrideMax ?? resolveMaintenanceConfig().maxEntries; + const keys = Object.keys(store); + if (keys.length <= maxEntries) { + return 0; + } + + // Sort by updatedAt descending; entries without updatedAt go to the end (removed first). + const sorted = keys.toSorted((a, b) => { + const aTime = getEntryUpdatedAt(store[a]); + const bTime = getEntryUpdatedAt(store[b]); + return bTime - aTime; + }); + + const toRemove = sorted.slice(maxEntries); + for (const key of toRemove) { + const entry = store[key]; + if (entry) { + opts.onCapped?.({ key, entry }); + } + delete store[key]; + } + if (opts.log !== false) { + log.info("capped session entry count", { removed: toRemove.length, maxEntries }); + } + return toRemove.length; +} + +async function getSessionFileSize(storePath: string): Promise { + try { + const stat = await fs.promises.stat(storePath); + return stat.size; + } catch { + return null; + } +} + +/** + * Rotate the sessions file if it exceeds the configured size threshold. + * Renames the current file to `sessions.json.bak.{timestamp}` and cleans up + * old rotation backups, keeping only the 3 most recent `.bak.*` files. + */ +export async function rotateSessionFile( + storePath: string, + overrideBytes?: number, +): Promise { + const maxBytes = overrideBytes ?? resolveMaintenanceConfig().rotateBytes; + + // Check current file size (file may not exist yet). + const fileSize = await getSessionFileSize(storePath); + if (fileSize == null) { + return false; + } + + if (fileSize <= maxBytes) { + return false; + } + + // Rotate: rename current file to .bak.{timestamp} + const backupPath = `${storePath}.bak.${Date.now()}`; + try { + await fs.promises.rename(storePath, backupPath); + log.info("rotated session store file", { + backupPath: path.basename(backupPath), + sizeBytes: fileSize, + }); + } catch { + // If rename fails (e.g. file disappeared), skip rotation. + return false; + } + + // Clean up old backups — keep only the 3 most recent .bak.* files. + try { + const dir = path.dirname(storePath); + const baseName = path.basename(storePath); + const files = await fs.promises.readdir(dir); + const backups = files + .filter((f) => f.startsWith(`${baseName}.bak.`)) + .toSorted() + .toReversed(); + + const maxBackups = 3; + if (backups.length > maxBackups) { + const toDelete = backups.slice(maxBackups); + for (const old of toDelete) { + await fs.promises.unlink(path.join(dir, old)).catch(() => undefined); + } + log.info("cleaned up old session store backups", { deleted: toDelete.length }); + } + } catch { + // Best-effort cleanup; don't fail the write. + } + + return true; +} diff --git a/src/config/sessions/store-migrations.ts b/src/config/sessions/store-migrations.ts new file mode 100644 index 00000000000..0d161f734d6 --- /dev/null +++ b/src/config/sessions/store-migrations.ts @@ -0,0 +1,27 @@ +import type { SessionEntry } from "./types.js"; + +export function applySessionStoreMigrations(store: Record): void { + // Best-effort migration: message provider → channel naming. + for (const entry of Object.values(store)) { + if (!entry || typeof entry !== "object") { + continue; + } + const rec = entry as unknown as Record; + if (typeof rec.channel !== "string" && typeof rec.provider === "string") { + rec.channel = rec.provider; + delete rec.provider; + } + if (typeof rec.lastChannel !== "string" && typeof rec.lastProvider === "string") { + rec.lastChannel = rec.lastProvider; + delete rec.lastProvider; + } + + // Best-effort migration: legacy `room` field → `groupChannel` (keep value, prune old key). + if (typeof rec.groupChannel !== "string" && typeof rec.room === "string") { + rec.groupChannel = rec.room; + delete rec.room; + } else if ("room" in rec) { + delete rec.room; + } + } +} diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 0790bed566c..501a2063f9b 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -2,8 +2,6 @@ import fs from "node:fs"; import path from "node:path"; import { acquireSessionWriteLock } from "../../agents/session-write-lock.js"; import type { MsgContext } from "../../auto-reply/templating.js"; -import { parseByteSize } from "../../cli/parse-bytes.js"; -import { parseDurationMs } from "../../cli/parse-duration.js"; import { archiveSessionTranscripts, cleanupArchivedSessionTranscripts, @@ -18,10 +16,26 @@ import { type DeliveryContext, } from "../../utils/delivery-context.js"; import { getFileStatSnapshot, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js"; -import { loadConfig } from "../config.js"; -import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js"; import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js"; import { deriveSessionMetaPatch } from "./metadata.js"; +import { + clearSessionStoreCaches, + dropSessionStoreObjectCache, + getSerializedSessionStore, + readSessionStoreCache, + setSerializedSessionStore, + writeSessionStoreCache, +} from "./store-cache.js"; +import { + capEntryCount, + getActiveSessionMaintenanceWarning, + pruneStaleEntries, + resolveMaintenanceConfig, + rotateSessionFile, + type ResolvedSessionMaintenanceConfig, + type SessionMaintenanceWarning, +} from "./store-maintenance.js"; +import { applySessionStoreMigrations } from "./store-migrations.js"; import { mergeSessionEntry, normalizeSessionRuntimeModelFields, @@ -34,17 +48,6 @@ const log = createSubsystemLogger("sessions/store"); // Session Store Cache with TTL Support // ============================================================================ -type SessionStoreCacheEntry = { - store: Record; - loadedAt: number; - storePath: string; - mtimeMs?: number; - sizeBytes?: number; - serialized?: string; -}; - -const SESSION_STORE_CACHE = new Map(); -const SESSION_STORE_SERIALIZED_CACHE = new Map(); const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s) function isSessionStoreRecord(value: unknown): value is Record { @@ -62,17 +65,6 @@ function isSessionStoreCacheEnabled(): boolean { return isCacheEnabled(getSessionStoreTtl()); } -function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { - const now = Date.now(); - const ttl = getSessionStoreTtl(); - return now - entry.loadedAt <= ttl; -} - -function invalidateSessionStoreCache(storePath: string): void { - SESSION_STORE_CACHE.delete(storePath); - SESSION_STORE_SERIALIZED_CACHE.delete(storePath); -} - function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { const normalized = normalizeSessionDeliveryFields({ channel: entry.channel, @@ -173,8 +165,7 @@ function normalizeSessionStore(store: Record): void { } export function clearSessionStoreCacheForTest(): void { - SESSION_STORE_CACHE.clear(); - SESSION_STORE_SERIALIZED_CACHE.clear(); + clearSessionStoreCaches(); for (const queue of LOCK_QUEUES.values()) { for (const task of queue.pending) { task.reject(new Error("session store queue cleared for test")); @@ -206,17 +197,15 @@ export function loadSessionStore( ): Record { // Check cache first if enabled if (!opts.skipCache && isSessionStoreCacheEnabled()) { - const cached = SESSION_STORE_CACHE.get(storePath); - if (cached && isSessionStoreCacheValid(cached)) { - const currentFileStat = getFileStatSnapshot(storePath); - if ( - currentFileStat?.mtimeMs === cached.mtimeMs && - currentFileStat?.sizeBytes === cached.sizeBytes - ) { - // Return a deep copy to prevent external mutations affecting cache - return structuredClone(cached.store); - } - invalidateSessionStoreCache(storePath); + const currentFileStat = getFileStatSnapshot(storePath); + const cached = readSessionStoreCache({ + storePath, + ttlMs: getSessionStoreTtl(), + mtimeMs: currentFileStat?.mtimeMs, + sizeBytes: currentFileStat?.sizeBytes, + }); + if (cached) { + return cached; } } @@ -258,41 +247,18 @@ export function loadSessionStore( } } if (serializedFromDisk !== undefined) { - SESSION_STORE_SERIALIZED_CACHE.set(storePath, serializedFromDisk); + setSerializedSessionStore(storePath, serializedFromDisk); } else { - SESSION_STORE_SERIALIZED_CACHE.delete(storePath); + setSerializedSessionStore(storePath, undefined); } - // Best-effort migration: message provider → channel naming. - for (const entry of Object.values(store)) { - if (!entry || typeof entry !== "object") { - continue; - } - const rec = entry as unknown as Record; - if (typeof rec.channel !== "string" && typeof rec.provider === "string") { - rec.channel = rec.provider; - delete rec.provider; - } - if (typeof rec.lastChannel !== "string" && typeof rec.lastProvider === "string") { - rec.lastChannel = rec.lastProvider; - delete rec.lastProvider; - } - - // Best-effort migration: legacy `room` field → `groupChannel` (keep value, prune old key). - if (typeof rec.groupChannel !== "string" && typeof rec.room === "string") { - rec.groupChannel = rec.room; - delete rec.room; - } else if ("room" in rec) { - delete rec.room; - } - } + applySessionStoreMigrations(store); // Cache the result if caching is enabled if (!opts.skipCache && isSessionStoreCacheEnabled()) { - SESSION_STORE_CACHE.set(storePath, { - store: structuredClone(store), // Store a copy to prevent external mutations - loadedAt: Date.now(), + writeSessionStoreCache({ storePath, + store, mtimeMs, sizeBytes: fileStat?.sizeBytes, serialized: serializedFromDisk, @@ -319,24 +285,8 @@ export function readSessionUpdatedAt(params: { // Session Store Pruning, Capping & File Rotation // ============================================================================ -const DEFAULT_SESSION_PRUNE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; -const DEFAULT_SESSION_MAX_ENTRIES = 500; -const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB -const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "warn"; -const DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO = 0.8; - -export type SessionMaintenanceWarning = { - activeSessionKey: string; - activeUpdatedAt?: number; - totalEntries: number; - pruneAfterMs: number; - maxEntries: number; - wouldPrune: boolean; - wouldCap: boolean; -}; - export type SessionMaintenanceApplyReport = { - mode: SessionMaintenanceMode; + mode: ResolvedSessionMaintenanceConfig["mode"]; beforeCount: number; afterCount: number; pruned: number; @@ -344,306 +294,14 @@ export type SessionMaintenanceApplyReport = { diskBudget: SessionDiskBudgetSweepResult | null; }; -type ResolvedSessionMaintenanceConfig = { - mode: SessionMaintenanceMode; - pruneAfterMs: number; - maxEntries: number; - rotateBytes: number; - resetArchiveRetentionMs: number | null; - maxDiskBytes: number | null; - highWaterBytes: number | null; +export { + capEntryCount, + getActiveSessionMaintenanceWarning, + pruneStaleEntries, + resolveMaintenanceConfig, + rotateSessionFile, }; - -function resolvePruneAfterMs(maintenance?: SessionMaintenanceConfig): number { - const raw = maintenance?.pruneAfter ?? maintenance?.pruneDays; - if (raw === undefined || raw === null || raw === "") { - return DEFAULT_SESSION_PRUNE_AFTER_MS; - } - try { - return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); - } catch { - return DEFAULT_SESSION_PRUNE_AFTER_MS; - } -} - -function resolveRotateBytes(maintenance?: SessionMaintenanceConfig): number { - const raw = maintenance?.rotateBytes; - if (raw === undefined || raw === null || raw === "") { - return DEFAULT_SESSION_ROTATE_BYTES; - } - try { - return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); - } catch { - return DEFAULT_SESSION_ROTATE_BYTES; - } -} - -function resolveResetArchiveRetentionMs( - maintenance: SessionMaintenanceConfig | undefined, - pruneAfterMs: number, -): number | null { - const raw = maintenance?.resetArchiveRetention; - if (raw === false) { - return null; - } - if (raw === undefined || raw === null || raw === "") { - return pruneAfterMs; - } - try { - return parseDurationMs(String(raw).trim(), { defaultUnit: "d" }); - } catch { - return pruneAfterMs; - } -} - -function resolveMaxDiskBytes(maintenance?: SessionMaintenanceConfig): number | null { - const raw = maintenance?.maxDiskBytes; - if (raw === undefined || raw === null || raw === "") { - return null; - } - try { - return parseByteSize(String(raw).trim(), { defaultUnit: "b" }); - } catch { - return null; - } -} - -function resolveHighWaterBytes( - maintenance: SessionMaintenanceConfig | undefined, - maxDiskBytes: number | null, -): number | null { - const computeDefault = () => { - if (maxDiskBytes == null) { - return null; - } - if (maxDiskBytes <= 0) { - return 0; - } - return Math.max( - 1, - Math.min( - maxDiskBytes, - Math.floor(maxDiskBytes * DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO), - ), - ); - }; - if (maxDiskBytes == null) { - return null; - } - const raw = maintenance?.highWaterBytes; - if (raw === undefined || raw === null || raw === "") { - return computeDefault(); - } - try { - const parsed = parseByteSize(String(raw).trim(), { defaultUnit: "b" }); - return Math.min(parsed, maxDiskBytes); - } catch { - return computeDefault(); - } -} - -/** - * Resolve maintenance settings from openclaw.json (`session.maintenance`). - * Falls back to built-in defaults when config is missing or unset. - */ -export function resolveMaintenanceConfig(): ResolvedSessionMaintenanceConfig { - let maintenance: SessionMaintenanceConfig | undefined; - try { - maintenance = loadConfig().session?.maintenance; - } catch { - // Config may not be available (e.g. in tests). Use defaults. - } - const pruneAfterMs = resolvePruneAfterMs(maintenance); - const maxDiskBytes = resolveMaxDiskBytes(maintenance); - return { - mode: maintenance?.mode ?? DEFAULT_SESSION_MAINTENANCE_MODE, - pruneAfterMs, - maxEntries: maintenance?.maxEntries ?? DEFAULT_SESSION_MAX_ENTRIES, - rotateBytes: resolveRotateBytes(maintenance), - resetArchiveRetentionMs: resolveResetArchiveRetentionMs(maintenance, pruneAfterMs), - maxDiskBytes, - highWaterBytes: resolveHighWaterBytes(maintenance, maxDiskBytes), - }; -} - -/** - * Remove entries whose `updatedAt` is older than the configured threshold. - * Entries without `updatedAt` are kept (cannot determine staleness). - * Mutates `store` in-place. - */ -export function pruneStaleEntries( - store: Record, - overrideMaxAgeMs?: number, - opts: { log?: boolean; onPruned?: (params: { key: string; entry: SessionEntry }) => void } = {}, -): number { - const maxAgeMs = overrideMaxAgeMs ?? resolveMaintenanceConfig().pruneAfterMs; - const cutoffMs = Date.now() - maxAgeMs; - let pruned = 0; - for (const [key, entry] of Object.entries(store)) { - if (entry?.updatedAt != null && entry.updatedAt < cutoffMs) { - opts.onPruned?.({ key, entry }); - delete store[key]; - pruned++; - } - } - if (pruned > 0 && opts.log !== false) { - log.info("pruned stale session entries", { pruned, maxAgeMs }); - } - return pruned; -} - -/** - * Cap the store to the N most recently updated entries. - * Entries without `updatedAt` are sorted last (removed first when over limit). - * Mutates `store` in-place. - */ -function getEntryUpdatedAt(entry?: SessionEntry): number { - return entry?.updatedAt ?? Number.NEGATIVE_INFINITY; -} - -export function getActiveSessionMaintenanceWarning(params: { - store: Record; - activeSessionKey: string; - pruneAfterMs: number; - maxEntries: number; - nowMs?: number; -}): SessionMaintenanceWarning | null { - const activeSessionKey = params.activeSessionKey.trim(); - if (!activeSessionKey) { - return null; - } - const activeEntry = params.store[activeSessionKey]; - if (!activeEntry) { - return null; - } - const now = params.nowMs ?? Date.now(); - const cutoffMs = now - params.pruneAfterMs; - const wouldPrune = activeEntry.updatedAt != null ? activeEntry.updatedAt < cutoffMs : false; - const keys = Object.keys(params.store); - const wouldCap = - keys.length > params.maxEntries && - keys - .toSorted((a, b) => getEntryUpdatedAt(params.store[b]) - getEntryUpdatedAt(params.store[a])) - .slice(params.maxEntries) - .includes(activeSessionKey); - - if (!wouldPrune && !wouldCap) { - return null; - } - - return { - activeSessionKey, - activeUpdatedAt: activeEntry.updatedAt, - totalEntries: keys.length, - pruneAfterMs: params.pruneAfterMs, - maxEntries: params.maxEntries, - wouldPrune, - wouldCap, - }; -} - -export function capEntryCount( - store: Record, - overrideMax?: number, - opts: { - log?: boolean; - onCapped?: (params: { key: string; entry: SessionEntry }) => void; - } = {}, -): number { - const maxEntries = overrideMax ?? resolveMaintenanceConfig().maxEntries; - const keys = Object.keys(store); - if (keys.length <= maxEntries) { - return 0; - } - - // Sort by updatedAt descending; entries without updatedAt go to the end (removed first). - const sorted = keys.toSorted((a, b) => { - const aTime = getEntryUpdatedAt(store[a]); - const bTime = getEntryUpdatedAt(store[b]); - return bTime - aTime; - }); - - const toRemove = sorted.slice(maxEntries); - for (const key of toRemove) { - const entry = store[key]; - if (entry) { - opts.onCapped?.({ key, entry }); - } - delete store[key]; - } - if (opts.log !== false) { - log.info("capped session entry count", { removed: toRemove.length, maxEntries }); - } - return toRemove.length; -} - -async function getSessionFileSize(storePath: string): Promise { - try { - const stat = await fs.promises.stat(storePath); - return stat.size; - } catch { - return null; - } -} - -/** - * Rotate the sessions file if it exceeds the configured size threshold. - * Renames the current file to `sessions.json.bak.{timestamp}` and cleans up - * old rotation backups, keeping only the 3 most recent `.bak.*` files. - */ -export async function rotateSessionFile( - storePath: string, - overrideBytes?: number, -): Promise { - const maxBytes = overrideBytes ?? resolveMaintenanceConfig().rotateBytes; - - // Check current file size (file may not exist yet). - const fileSize = await getSessionFileSize(storePath); - if (fileSize == null) { - return false; - } - - if (fileSize <= maxBytes) { - return false; - } - - // Rotate: rename current file to .bak.{timestamp} - const backupPath = `${storePath}.bak.${Date.now()}`; - try { - await fs.promises.rename(storePath, backupPath); - log.info("rotated session store file", { - backupPath: path.basename(backupPath), - sizeBytes: fileSize, - }); - } catch { - // If rename fails (e.g. file disappeared), skip rotation. - return false; - } - - // Clean up old backups — keep only the 3 most recent .bak.* files. - try { - const dir = path.dirname(storePath); - const baseName = path.basename(storePath); - const files = await fs.promises.readdir(dir); - const backups = files - .filter((f) => f.startsWith(`${baseName}.bak.`)) - .toSorted() - .toReversed(); - - const maxBackups = 3; - if (backups.length > maxBackups) { - const toDelete = backups.slice(maxBackups); - for (const old of toDelete) { - await fs.promises.unlink(path.join(dir, old)).catch(() => undefined); - } - log.info("cleaned up old session store backups", { deleted: toDelete.length }); - } - } catch { - // Best-effort cleanup; don't fail the write. - } - - return true; -} +export type { ResolvedSessionMaintenanceConfig, SessionMaintenanceWarning }; type SaveSessionStoreOptions = { /** Skip pruning, capping, and rotation (e.g. during one-time migrations). */ @@ -664,15 +322,14 @@ function updateSessionStoreWriteCaches(params: { serialized: string; }): void { const fileStat = getFileStatSnapshot(params.storePath); - SESSION_STORE_SERIALIZED_CACHE.set(params.storePath, params.serialized); + setSerializedSessionStore(params.storePath, params.serialized); if (!isSessionStoreCacheEnabled()) { - SESSION_STORE_CACHE.delete(params.storePath); + dropSessionStoreObjectCache(params.storePath); return; } - SESSION_STORE_CACHE.set(params.storePath, { - store: structuredClone(params.store), - loadedAt: Date.now(), + writeSessionStoreCache({ storePath: params.storePath, + store: params.store, mtimeMs: fileStat?.mtimeMs, sizeBytes: fileStat?.sizeBytes, serialized: params.serialized, @@ -807,7 +464,7 @@ async function saveSessionStoreUnlocked( await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); const json = JSON.stringify(store, null, 2); - if (SESSION_STORE_SERIALIZED_CACHE.get(storePath) === json) { + if (getSerializedSessionStore(storePath) === json) { updateSessionStoreWriteCaches({ storePath, store, serialized: json }); return; } diff --git a/src/daemon/runtime-paths.ts b/src/daemon/runtime-paths.ts index 8cea5792bb4..a3b737d15bf 100644 --- a/src/daemon/runtime-paths.ts +++ b/src/daemon/runtime-paths.ts @@ -3,6 +3,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { promisify } from "node:util"; import { isSupportedNodeVersion } from "../infra/runtime-guard.js"; +import { resolveStableNodePath } from "../infra/stable-node-path.js"; const VERSION_MANAGER_MARKERS = [ "/.nvm/", @@ -152,44 +153,7 @@ export function renderSystemNodeWarning( const selectedLabel = selectedNodePath ? ` Using ${selectedNodePath} for the daemon.` : ""; return `System Node ${versionLabel} at ${systemNode.path} is below the required Node 22+.${selectedLabel} Install Node 22+ from nodejs.org or Homebrew.`; } - -/** - * Homebrew Cellar paths (e.g. /opt/homebrew/Cellar/node/25.7.0/bin/node) - * break when Homebrew upgrades Node and removes the old version directory. - * Resolve these to a stable Homebrew-managed path that survives upgrades: - * - Default formula "node": /opt/node/bin/node or /bin/node - * - Versioned formula "node@22": /opt/node@22/bin/node (keg-only) - */ -export async function resolveStableNodePath(nodePath: string): Promise { - const cellarMatch = nodePath.match(/^(.+?)\/Cellar\/([^/]+)\/[^/]+\/bin\/node$/); - if (!cellarMatch) { - return nodePath; - } - const prefix = cellarMatch[1]; // e.g. /opt/homebrew - const formula = cellarMatch[2]; // e.g. "node" or "node@22" - - // Try the Homebrew opt symlink first — works for both default and versioned formulas. - const optPath = `${prefix}/opt/${formula}/bin/node`; - try { - await fs.access(optPath); - return optPath; - } catch { - // fall through - } - - // For the default "node" formula, also try the direct bin symlink. - if (formula === "node") { - const binPath = `${prefix}/bin/node`; - try { - await fs.access(binPath); - return binPath; - } catch { - // fall through - } - } - - return nodePath; -} +export { resolveStableNodePath }; export async function resolvePreferredNodePath(params: { env?: Record; diff --git a/src/discord/monitor/message-handler.preflight.ts b/src/discord/monitor/message-handler.preflight.ts index 471d8b4c24e..1f45e353e16 100644 --- a/src/discord/monitor/message-handler.preflight.ts +++ b/src/discord/monitor/message-handler.preflight.ts @@ -56,6 +56,7 @@ import { resolveDiscordMessageChannelId, resolveDiscordMessageText, } from "./message-utils.js"; +import { resolveDiscordPreflightAudioMentionContext } from "./preflight-audio.js"; import { resolveDiscordSenderIdentity, resolveDiscordWebhookId } from "./sender-identity.js"; import { resolveDiscordSystemEvent } from "./system-events.js"; import { isRecentlyUnboundThreadWebhookMessage } from "./thread-bindings.js"; @@ -498,50 +499,16 @@ export async function preflightDiscordMessage( isBoundThreadSession, }); - // Preflight audio transcription for mention detection in guilds - // This allows voice notes to be checked for mentions before being dropped - let preflightTranscript: string | undefined; - const hasAudioAttachment = message.attachments?.some((att: { content_type?: string }) => - att.content_type?.startsWith("audio/"), - ); - const hasTypedText = Boolean(message.content?.trim()); - const needsPreflightTranscription = - !isDirectMessage && - shouldRequireMention && - hasAudioAttachment && - // `baseText` includes media placeholders; gate on typed text only. - !hasTypedText && - mentionRegexes.length > 0; - - if (needsPreflightTranscription) { - try { - const { transcribeFirstAudio } = await import("../../media-understanding/audio-preflight.js"); - const audioPaths = - message.attachments - ?.filter((att: { content_type?: string; url: string }) => - att.content_type?.startsWith("audio/"), - ) - .map((att: { url: string }) => att.url) ?? []; - if (audioPaths.length > 0) { - const tempCtx = { - MediaUrls: audioPaths, - MediaTypes: message.attachments - ?.filter((att: { content_type?: string; url: string }) => - att.content_type?.startsWith("audio/"), - ) - .map((att: { content_type?: string }) => att.content_type) - .filter(Boolean) as string[], - }; - preflightTranscript = await transcribeFirstAudio({ - ctx: tempCtx, - cfg: params.cfg, - agentDir: undefined, - }); - } - } catch (err) { - logVerbose(`discord: audio preflight transcription failed: ${String(err)}`); - } - } + // Preflight audio transcription for mention detection in guilds. + // This allows voice notes to be checked for mentions before being dropped. + const { hasTypedText, transcript: preflightTranscript } = + await resolveDiscordPreflightAudioMentionContext({ + message, + isDirectMessage, + shouldRequireMention, + mentionRegexes, + cfg: params.cfg, + }); const mentionText = hasTypedText ? baseText : ""; const wasMentioned = diff --git a/src/discord/monitor/preflight-audio.ts b/src/discord/monitor/preflight-audio.ts new file mode 100644 index 00000000000..89e4ae8c3e1 --- /dev/null +++ b/src/discord/monitor/preflight-audio.ts @@ -0,0 +1,72 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import { logVerbose } from "../../globals.js"; + +type DiscordAudioAttachment = { + content_type?: string; + url?: string; +}; + +function collectAudioAttachments( + attachments: DiscordAudioAttachment[] | undefined, +): DiscordAudioAttachment[] { + if (!Array.isArray(attachments)) { + return []; + } + return attachments.filter((att) => att.content_type?.startsWith("audio/")); +} + +export async function resolveDiscordPreflightAudioMentionContext(params: { + message: { + attachments?: DiscordAudioAttachment[]; + content?: string; + }; + isDirectMessage: boolean; + shouldRequireMention: boolean; + mentionRegexes: RegExp[]; + cfg: OpenClawConfig; +}): Promise<{ + hasAudioAttachment: boolean; + hasTypedText: boolean; + transcript?: string; +}> { + const audioAttachments = collectAudioAttachments(params.message.attachments); + const hasAudioAttachment = audioAttachments.length > 0; + const hasTypedText = Boolean(params.message.content?.trim()); + const needsPreflightTranscription = + !params.isDirectMessage && + params.shouldRequireMention && + hasAudioAttachment && + // `baseText` includes media placeholders; gate on typed text only. + !hasTypedText && + params.mentionRegexes.length > 0; + + let transcript: string | undefined; + if (needsPreflightTranscription) { + try { + const { transcribeFirstAudio } = await import("../../media-understanding/audio-preflight.js"); + const audioUrls = audioAttachments + .map((att) => att.url) + .filter((url): url is string => typeof url === "string" && url.length > 0); + if (audioUrls.length > 0) { + transcript = await transcribeFirstAudio({ + ctx: { + MediaUrls: audioUrls, + MediaTypes: audioAttachments + .map((att) => att.content_type) + .filter((contentType): contentType is string => Boolean(contentType)), + }, + cfg: params.cfg, + agentDir: undefined, + }); + } + } catch (err) { + logVerbose(`discord: audio preflight transcription failed: ${String(err)}`); + } + } + + return { + hasAudioAttachment, + hasTypedText, + transcript, + }; +} diff --git a/src/infra/stable-node-path.ts b/src/infra/stable-node-path.ts new file mode 100644 index 00000000000..116b040eefa --- /dev/null +++ b/src/infra/stable-node-path.ts @@ -0,0 +1,39 @@ +import fs from "node:fs/promises"; + +/** + * Homebrew Cellar paths (e.g. /opt/homebrew/Cellar/node/25.7.0/bin/node) + * break when Homebrew upgrades Node and removes the old version directory. + * Resolve these to a stable Homebrew-managed path that survives upgrades: + * - Default formula "node": /opt/node/bin/node or /bin/node + * - Versioned formula "node@22": /opt/node@22/bin/node (keg-only) + */ +export async function resolveStableNodePath(nodePath: string): Promise { + const cellarMatch = nodePath.match(/^(.+?)\/Cellar\/([^/]+)\/[^/]+\/bin\/node$/); + if (!cellarMatch) { + return nodePath; + } + const prefix = cellarMatch[1]; // e.g. /opt/homebrew + const formula = cellarMatch[2]; // e.g. "node" or "node@22" + + // Try the Homebrew opt symlink first — works for both default and versioned formulas. + const optPath = `${prefix}/opt/${formula}/bin/node`; + try { + await fs.access(optPath); + return optPath; + } catch { + // fall through + } + + // For the default "node" formula, also try the direct bin symlink. + if (formula === "node") { + const binPath = `${prefix}/bin/node`; + try { + await fs.access(binPath); + return binPath; + } catch { + // fall through + } + } + + return nodePath; +} diff --git a/src/infra/update-runner.test.ts b/src/infra/update-runner.test.ts index 069bf1bea20..c415e4892c4 100644 --- a/src/infra/update-runner.test.ts +++ b/src/infra/update-runner.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { withEnvAsync } from "../test-utils/env.js"; import { pathExists } from "../utils.js"; +import { resolveStableNodePath } from "./stable-node-path.js"; import { runGatewayUpdate } from "./update-runner.js"; type CommandResponse = { stdout?: string; stderr?: string; code?: number | null }; @@ -49,7 +50,7 @@ describe("runGatewayUpdate", () => { // Shared fixtureRoot cleaned up in afterAll. }); - function createStableTagRunner(params: { + async function createStableTagRunner(params: { stableTag: string; uiIndexPath: string; onDoctor?: () => Promise; @@ -57,7 +58,8 @@ describe("runGatewayUpdate", () => { }) { const calls: string[] = []; let uiBuildCount = 0; - const doctorKey = `${process.execPath} ${path.join(tempDir, "openclaw.mjs")} doctor --non-interactive --fix`; + const doctorNodePath = await resolveStableNodePath(process.execPath); + const doctorKey = `${doctorNodePath} ${path.join(tempDir, "openclaw.mjs")} doctor --non-interactive --fix`; const runCommand = async (argv: string[]) => { const key = argv.join(" "); @@ -287,15 +289,15 @@ describe("runGatewayUpdate", () => { await setupUiIndex(); const stableTag = "v1.0.1-1"; const betaTag = "v1.0.0-beta.2"; + const doctorNodePath = await resolveStableNodePath(process.execPath); const { runner, calls } = createRunner({ ...buildStableTagResponses(stableTag, { additionalTags: [betaTag] }), "pnpm install": { stdout: "" }, "pnpm build": { stdout: "" }, "pnpm ui:build": { stdout: "" }, - [`${process.execPath} ${path.join(tempDir, "openclaw.mjs")} doctor --non-interactive --fix`]: - { - stdout: "", - }, + [`${doctorNodePath} ${path.join(tempDir, "openclaw.mjs")} doctor --non-interactive --fix`]: { + stdout: "", + }, }); const result = await runWithRunner(runner, { channel: "beta" }); @@ -544,7 +546,7 @@ describe("runGatewayUpdate", () => { const uiIndexPath = await setupUiIndex(); const stableTag = "v1.0.1-1"; - const { runCommand, calls, doctorKey, getUiBuildCount } = createStableTagRunner({ + const { runCommand, calls, doctorKey, getUiBuildCount } = await createStableTagRunner({ stableTag, uiIndexPath, onUiBuild: async (count) => { @@ -567,7 +569,7 @@ describe("runGatewayUpdate", () => { const uiIndexPath = await setupUiIndex(); const stableTag = "v1.0.1-1"; - const { runCommand } = createStableTagRunner({ + const { runCommand } = await createStableTagRunner({ stableTag, uiIndexPath, onUiBuild: async (count) => { diff --git a/src/infra/update-runner.ts b/src/infra/update-runner.ts index 7fde9322f99..5b1e31512da 100644 --- a/src/infra/update-runner.ts +++ b/src/infra/update-runner.ts @@ -10,6 +10,7 @@ import { detectPackageManager as detectPackageManagerImpl } from "./detect-packa import { readPackageName, readPackageVersion } from "./package-json.js"; import { normalizePackageTagInput } from "./package-tag.js"; import { trimLogTail } from "./restart-sentinel.js"; +import { resolveStableNodePath } from "./stable-node-path.js"; import { channelToNpmTag, DEFAULT_PACKAGE_CHANNEL, @@ -766,7 +767,8 @@ export async function runGatewayUpdate(opts: UpdateRunnerOptions = {}): Promise< // Use --fix so that doctor auto-strips unknown config keys introduced by // schema changes between versions, preventing a startup validation crash. - const doctorArgv = [process.execPath, doctorEntry, "doctor", "--non-interactive", "--fix"]; + const doctorNodePath = await resolveStableNodePath(process.execPath); + const doctorArgv = [doctorNodePath, doctorEntry, "doctor", "--non-interactive", "--fix"]; const doctorStep = await runStep( step("openclaw doctor", doctorArgv, gitRoot, { OPENCLAW_UPDATE_IN_PROGRESS: "1" }), ); diff --git a/src/slack/monitor/channel-type.ts b/src/slack/monitor/channel-type.ts new file mode 100644 index 00000000000..fafb334a19b --- /dev/null +++ b/src/slack/monitor/channel-type.ts @@ -0,0 +1,41 @@ +import type { SlackMessageEvent } from "../types.js"; + +export function inferSlackChannelType( + channelId?: string | null, +): SlackMessageEvent["channel_type"] | undefined { + const trimmed = channelId?.trim(); + if (!trimmed) { + return undefined; + } + if (trimmed.startsWith("D")) { + return "im"; + } + if (trimmed.startsWith("C")) { + return "channel"; + } + if (trimmed.startsWith("G")) { + return "group"; + } + return undefined; +} + +export function normalizeSlackChannelType( + channelType?: string | null, + channelId?: string | null, +): SlackMessageEvent["channel_type"] { + const normalized = channelType?.trim().toLowerCase(); + const inferred = inferSlackChannelType(channelId); + if ( + normalized === "im" || + normalized === "mpim" || + normalized === "channel" || + normalized === "group" + ) { + // D-prefix channel IDs are always DMs — override a contradicting channel_type. + if (inferred === "im" && normalized !== "im") { + return "im"; + } + return normalized; + } + return inferred ?? "channel"; +} diff --git a/src/slack/monitor/context.ts b/src/slack/monitor/context.ts index da50e5a0cbb..2127505f6e5 100644 --- a/src/slack/monitor/context.ts +++ b/src/slack/monitor/context.ts @@ -12,47 +12,10 @@ import type { SlackMessageEvent } from "../types.js"; import { normalizeAllowList, normalizeAllowListLower, normalizeSlackSlug } from "./allow-list.js"; import type { SlackChannelConfigEntries } from "./channel-config.js"; import { resolveSlackChannelConfig } from "./channel-config.js"; +import { normalizeSlackChannelType } from "./channel-type.js"; import { isSlackChannelAllowedByPolicy } from "./policy.js"; -export function inferSlackChannelType( - channelId?: string | null, -): SlackMessageEvent["channel_type"] | undefined { - const trimmed = channelId?.trim(); - if (!trimmed) { - return undefined; - } - if (trimmed.startsWith("D")) { - return "im"; - } - if (trimmed.startsWith("C")) { - return "channel"; - } - if (trimmed.startsWith("G")) { - return "group"; - } - return undefined; -} - -export function normalizeSlackChannelType( - channelType?: string | null, - channelId?: string | null, -): SlackMessageEvent["channel_type"] { - const normalized = channelType?.trim().toLowerCase(); - const inferred = inferSlackChannelType(channelId); - if ( - normalized === "im" || - normalized === "mpim" || - normalized === "channel" || - normalized === "group" - ) { - // D-prefix channel IDs are always DMs — override a contradicting channel_type. - if (inferred === "im" && normalized !== "im") { - return "im"; - } - return normalized; - } - return inferred ?? "channel"; -} +export { inferSlackChannelType, normalizeSlackChannelType } from "./channel-type.js"; export type SlackMonitorContext = { cfg: OpenClawConfig; diff --git a/src/slack/monitor/events/interactions.modal.ts b/src/slack/monitor/events/interactions.modal.ts new file mode 100644 index 00000000000..603b1ab79e2 --- /dev/null +++ b/src/slack/monitor/events/interactions.modal.ts @@ -0,0 +1,259 @@ +import { enqueueSystemEvent } from "../../../infra/system-events.js"; +import { parseSlackModalPrivateMetadata } from "../../modal-metadata.js"; +import { authorizeSlackSystemEventSender } from "../auth.js"; +import type { SlackMonitorContext } from "../context.js"; + +export type ModalInputSummary = { + blockId: string; + actionId: string; + actionType?: string; + inputKind?: "text" | "number" | "email" | "url" | "rich_text"; + value?: string; + selectedValues?: string[]; + selectedUsers?: string[]; + selectedChannels?: string[]; + selectedConversations?: string[]; + selectedLabels?: string[]; + selectedDate?: string; + selectedTime?: string; + selectedDateTime?: number; + inputValue?: string; + inputNumber?: number; + inputEmail?: string; + inputUrl?: string; + richTextValue?: unknown; + richTextPreview?: string; +}; + +export type SlackModalBody = { + user?: { id?: string }; + team?: { id?: string }; + view?: { + id?: string; + callback_id?: string; + private_metadata?: string; + root_view_id?: string; + previous_view_id?: string; + external_id?: string; + hash?: string; + state?: { values?: unknown }; + }; + is_cleared?: boolean; +}; + +type SlackModalEventBase = { + callbackId: string; + userId: string; + expectedUserId?: string; + viewId?: string; + sessionRouting: ReturnType; + payload: { + actionId: string; + callbackId: string; + viewId?: string; + userId: string; + teamId?: string; + rootViewId?: string; + previousViewId?: string; + externalId?: string; + viewHash?: string; + isStackedView?: boolean; + privateMetadata?: string; + routedChannelId?: string; + routedChannelType?: string; + inputs: ModalInputSummary[]; + }; +}; + +export type SlackModalInteractionKind = "view_submission" | "view_closed"; +export type SlackModalEventHandlerArgs = { ack: () => Promise; body: unknown }; +export type RegisterSlackModalHandler = ( + matcher: RegExp, + handler: (args: SlackModalEventHandlerArgs) => Promise, +) => void; + +type SlackInteractionContextPrefix = "slack:interaction:view" | "slack:interaction:view-closed"; + +function resolveModalSessionRouting(params: { + ctx: SlackMonitorContext; + metadata: ReturnType; +}): { sessionKey: string; channelId?: string; channelType?: string } { + const metadata = params.metadata; + if (metadata.sessionKey) { + return { + sessionKey: metadata.sessionKey, + channelId: metadata.channelId, + channelType: metadata.channelType, + }; + } + if (metadata.channelId) { + return { + sessionKey: params.ctx.resolveSlackSystemEventSessionKey({ + channelId: metadata.channelId, + channelType: metadata.channelType, + }), + channelId: metadata.channelId, + channelType: metadata.channelType, + }; + } + return { + sessionKey: params.ctx.resolveSlackSystemEventSessionKey({}), + }; +} + +function summarizeSlackViewLifecycleContext(view: { + root_view_id?: string; + previous_view_id?: string; + external_id?: string; + hash?: string; +}): { + rootViewId?: string; + previousViewId?: string; + externalId?: string; + viewHash?: string; + isStackedView?: boolean; +} { + const rootViewId = view.root_view_id; + const previousViewId = view.previous_view_id; + const externalId = view.external_id; + const viewHash = view.hash; + return { + rootViewId, + previousViewId, + externalId, + viewHash, + isStackedView: Boolean(previousViewId), + }; +} + +function resolveSlackModalEventBase(params: { + ctx: SlackMonitorContext; + body: SlackModalBody; + summarizeViewState: (values: unknown) => ModalInputSummary[]; +}): SlackModalEventBase { + const metadata = parseSlackModalPrivateMetadata(params.body.view?.private_metadata); + const callbackId = params.body.view?.callback_id ?? "unknown"; + const userId = params.body.user?.id ?? "unknown"; + const viewId = params.body.view?.id; + const inputs = params.summarizeViewState(params.body.view?.state?.values); + const sessionRouting = resolveModalSessionRouting({ + ctx: params.ctx, + metadata, + }); + return { + callbackId, + userId, + expectedUserId: metadata.userId, + viewId, + sessionRouting, + payload: { + actionId: `view:${callbackId}`, + callbackId, + viewId, + userId, + teamId: params.body.team?.id, + ...summarizeSlackViewLifecycleContext({ + root_view_id: params.body.view?.root_view_id, + previous_view_id: params.body.view?.previous_view_id, + external_id: params.body.view?.external_id, + hash: params.body.view?.hash, + }), + privateMetadata: params.body.view?.private_metadata, + routedChannelId: sessionRouting.channelId, + routedChannelType: sessionRouting.channelType, + inputs, + }, + }; +} + +export async function emitSlackModalLifecycleEvent(params: { + ctx: SlackMonitorContext; + body: SlackModalBody; + interactionType: SlackModalInteractionKind; + contextPrefix: SlackInteractionContextPrefix; + summarizeViewState: (values: unknown) => ModalInputSummary[]; + formatSystemEvent: (payload: Record) => string; +}): Promise { + const { callbackId, userId, expectedUserId, viewId, sessionRouting, payload } = + resolveSlackModalEventBase({ + ctx: params.ctx, + body: params.body, + summarizeViewState: params.summarizeViewState, + }); + const isViewClosed = params.interactionType === "view_closed"; + const isCleared = params.body.is_cleared === true; + const eventPayload = isViewClosed + ? { + interactionType: params.interactionType, + ...payload, + isCleared, + } + : { + interactionType: params.interactionType, + ...payload, + }; + + if (isViewClosed) { + params.ctx.runtime.log?.( + `slack:interaction view_closed callback=${callbackId} user=${userId} cleared=${isCleared}`, + ); + } else { + params.ctx.runtime.log?.( + `slack:interaction view_submission callback=${callbackId} user=${userId} inputs=${payload.inputs.length}`, + ); + } + + if (!expectedUserId) { + params.ctx.runtime.log?.( + `slack:interaction drop modal callback=${callbackId} user=${userId} reason=missing-expected-user`, + ); + return; + } + + const auth = await authorizeSlackSystemEventSender({ + ctx: params.ctx, + senderId: userId, + channelId: sessionRouting.channelId, + channelType: sessionRouting.channelType, + expectedSenderId: expectedUserId, + }); + if (!auth.allowed) { + params.ctx.runtime.log?.( + `slack:interaction drop modal callback=${callbackId} user=${userId} reason=${auth.reason ?? "unauthorized"}`, + ); + return; + } + + enqueueSystemEvent(params.formatSystemEvent(eventPayload), { + sessionKey: sessionRouting.sessionKey, + contextKey: [params.contextPrefix, callbackId, viewId, userId].filter(Boolean).join(":"), + }); +} + +export function registerModalLifecycleHandler(params: { + register: RegisterSlackModalHandler; + matcher: RegExp; + ctx: SlackMonitorContext; + interactionType: SlackModalInteractionKind; + contextPrefix: SlackInteractionContextPrefix; + summarizeViewState: (values: unknown) => ModalInputSummary[]; + formatSystemEvent: (payload: Record) => string; +}) { + params.register(params.matcher, async ({ ack, body }: SlackModalEventHandlerArgs) => { + await ack(); + if (params.ctx.shouldDropMismatchedSlackEvent?.(body)) { + params.ctx.runtime.log?.( + `slack:interaction drop ${params.interactionType} payload (mismatched app/team)`, + ); + return; + } + await emitSlackModalLifecycleEvent({ + ctx: params.ctx, + body: body as SlackModalBody, + interactionType: params.interactionType, + contextPrefix: params.contextPrefix, + summarizeViewState: params.summarizeViewState, + formatSystemEvent: params.formatSystemEvent, + }); + }); +} diff --git a/src/slack/monitor/events/interactions.ts b/src/slack/monitor/events/interactions.ts index 5f371dae2cd..3a242652bc9 100644 --- a/src/slack/monitor/events/interactions.ts +++ b/src/slack/monitor/events/interactions.ts @@ -1,10 +1,14 @@ import type { SlackActionMiddlewareArgs } from "@slack/bolt"; import type { Block, KnownBlock } from "@slack/web-api"; import { enqueueSystemEvent } from "../../../infra/system-events.js"; -import { parseSlackModalPrivateMetadata } from "../../modal-metadata.js"; import { authorizeSlackSystemEventSender } from "../auth.js"; import type { SlackMonitorContext } from "../context.js"; import { escapeSlackMrkdwn } from "../mrkdwn.js"; +import { + registerModalLifecycleHandler, + type ModalInputSummary, + type RegisterSlackModalHandler, +} from "./interactions.modal.js"; // Prefix for OpenClaw-generated action IDs to scope our handler const OPENCLAW_ACTION_PREFIX = "openclaw:"; @@ -68,58 +72,6 @@ type InteractionSummary = InteractionSelectionFields & { threadTs?: string; }; -type ModalInputSummary = InteractionSelectionFields & { - blockId: string; - actionId: string; -}; - -type SlackModalBody = { - user?: { id?: string }; - team?: { id?: string }; - view?: { - id?: string; - callback_id?: string; - private_metadata?: string; - root_view_id?: string; - previous_view_id?: string; - external_id?: string; - hash?: string; - state?: { values?: unknown }; - }; - is_cleared?: boolean; -}; - -type SlackModalEventBase = { - callbackId: string; - userId: string; - expectedUserId?: string; - viewId?: string; - sessionRouting: ReturnType; - payload: { - actionId: string; - callbackId: string; - viewId?: string; - userId: string; - teamId?: string; - rootViewId?: string; - previousViewId?: string; - externalId?: string; - viewHash?: string; - isStackedView?: boolean; - privateMetadata?: string; - routedChannelId?: string; - routedChannelType?: string; - inputs: ModalInputSummary[]; - }; -}; - -type SlackModalInteractionKind = "view_submission" | "view_closed"; -type SlackModalEventHandlerArgs = { ack: () => Promise; body: unknown }; -type RegisterSlackModalHandler = ( - matcher: RegExp, - handler: (args: SlackModalEventHandlerArgs) => Promise, -) => void; - function truncateInteractionString( value: string, max = SLACK_INTERACTION_STRING_MAX_CHARS, @@ -518,182 +470,6 @@ function summarizeViewState(values: unknown): ModalInputSummary[] { return entries; } -function resolveModalSessionRouting(params: { - ctx: SlackMonitorContext; - metadata: ReturnType; -}): { sessionKey: string; channelId?: string; channelType?: string } { - const metadata = params.metadata; - if (metadata.sessionKey) { - return { - sessionKey: metadata.sessionKey, - channelId: metadata.channelId, - channelType: metadata.channelType, - }; - } - if (metadata.channelId) { - return { - sessionKey: params.ctx.resolveSlackSystemEventSessionKey({ - channelId: metadata.channelId, - channelType: metadata.channelType, - }), - channelId: metadata.channelId, - channelType: metadata.channelType, - }; - } - return { - sessionKey: params.ctx.resolveSlackSystemEventSessionKey({}), - }; -} - -function summarizeSlackViewLifecycleContext(view: { - root_view_id?: string; - previous_view_id?: string; - external_id?: string; - hash?: string; -}): { - rootViewId?: string; - previousViewId?: string; - externalId?: string; - viewHash?: string; - isStackedView?: boolean; -} { - const rootViewId = view.root_view_id; - const previousViewId = view.previous_view_id; - const externalId = view.external_id; - const viewHash = view.hash; - return { - rootViewId, - previousViewId, - externalId, - viewHash, - isStackedView: Boolean(previousViewId), - }; -} - -function resolveSlackModalEventBase(params: { - ctx: SlackMonitorContext; - body: SlackModalBody; -}): SlackModalEventBase { - const metadata = parseSlackModalPrivateMetadata(params.body.view?.private_metadata); - const callbackId = params.body.view?.callback_id ?? "unknown"; - const userId = params.body.user?.id ?? "unknown"; - const viewId = params.body.view?.id; - const inputs = summarizeViewState(params.body.view?.state?.values); - const sessionRouting = resolveModalSessionRouting({ - ctx: params.ctx, - metadata, - }); - return { - callbackId, - userId, - expectedUserId: metadata.userId, - viewId, - sessionRouting, - payload: { - actionId: `view:${callbackId}`, - callbackId, - viewId, - userId, - teamId: params.body.team?.id, - ...summarizeSlackViewLifecycleContext({ - root_view_id: params.body.view?.root_view_id, - previous_view_id: params.body.view?.previous_view_id, - external_id: params.body.view?.external_id, - hash: params.body.view?.hash, - }), - privateMetadata: params.body.view?.private_metadata, - routedChannelId: sessionRouting.channelId, - routedChannelType: sessionRouting.channelType, - inputs, - }, - }; -} - -async function emitSlackModalLifecycleEvent(params: { - ctx: SlackMonitorContext; - body: SlackModalBody; - interactionType: SlackModalInteractionKind; - contextPrefix: "slack:interaction:view" | "slack:interaction:view-closed"; -}): Promise { - const { callbackId, userId, expectedUserId, viewId, sessionRouting, payload } = - resolveSlackModalEventBase({ - ctx: params.ctx, - body: params.body, - }); - const isViewClosed = params.interactionType === "view_closed"; - const isCleared = params.body.is_cleared === true; - const eventPayload = isViewClosed - ? { - interactionType: params.interactionType, - ...payload, - isCleared, - } - : { - interactionType: params.interactionType, - ...payload, - }; - - if (isViewClosed) { - params.ctx.runtime.log?.( - `slack:interaction view_closed callback=${callbackId} user=${userId} cleared=${isCleared}`, - ); - } else { - params.ctx.runtime.log?.( - `slack:interaction view_submission callback=${callbackId} user=${userId} inputs=${payload.inputs.length}`, - ); - } - - if (!expectedUserId) { - params.ctx.runtime.log?.( - `slack:interaction drop modal callback=${callbackId} user=${userId} reason=missing-expected-user`, - ); - return; - } - - const auth = await authorizeSlackSystemEventSender({ - ctx: params.ctx, - senderId: userId, - channelId: sessionRouting.channelId, - channelType: sessionRouting.channelType, - expectedSenderId: expectedUserId, - }); - if (!auth.allowed) { - params.ctx.runtime.log?.( - `slack:interaction drop modal callback=${callbackId} user=${userId} reason=${auth.reason ?? "unauthorized"}`, - ); - return; - } - - enqueueSystemEvent(formatSlackInteractionSystemEvent(eventPayload), { - sessionKey: sessionRouting.sessionKey, - contextKey: [params.contextPrefix, callbackId, viewId, userId].filter(Boolean).join(":"), - }); -} - -function registerModalLifecycleHandler(params: { - register: RegisterSlackModalHandler; - matcher: RegExp; - ctx: SlackMonitorContext; - interactionType: SlackModalInteractionKind; - contextPrefix: "slack:interaction:view" | "slack:interaction:view-closed"; -}) { - params.register(params.matcher, async ({ ack, body }: SlackModalEventHandlerArgs) => { - await ack(); - if (params.ctx.shouldDropMismatchedSlackEvent?.(body)) { - params.ctx.runtime.log?.( - `slack:interaction drop ${params.interactionType} payload (mismatched app/team)`, - ); - return; - } - await emitSlackModalLifecycleEvent({ - ctx: params.ctx, - body: body as SlackModalBody, - interactionType: params.interactionType, - contextPrefix: params.contextPrefix, - }); - }); -} - export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContext }) { const { ctx } = params; if (typeof ctx.app.action !== "function") { @@ -891,6 +667,8 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex ctx, interactionType: "view_submission", contextPrefix: "slack:interaction:view", + summarizeViewState, + formatSystemEvent: formatSlackInteractionSystemEvent, }); const viewClosed = ( @@ -909,5 +687,7 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex ctx, interactionType: "view_closed", contextPrefix: "slack:interaction:view-closed", + summarizeViewState, + formatSystemEvent: formatSlackInteractionSystemEvent, }); } diff --git a/src/slack/monitor/events/messages.test.ts b/src/slack/monitor/events/messages.test.ts index 989c4b3b96d..922458a40b1 100644 --- a/src/slack/monitor/events/messages.test.ts +++ b/src/slack/monitor/events/messages.test.ts @@ -17,6 +17,7 @@ vi.mock("../../../pairing/pairing-store.js", () => ({ })); type MessageHandler = (args: { event: Record; body: unknown }) => Promise; +type AppMentionHandler = MessageHandler; type MessageCase = { overrides?: SlackSystemEventTestOverrides; @@ -37,6 +38,19 @@ function createMessageHandlers(overrides?: SlackSystemEventTestOverrides) { }; } +function createAppMentionHandlers(overrides?: SlackSystemEventTestOverrides) { + const harness = createSlackSystemEventTestHarness(overrides); + const handleSlackMessage = vi.fn(async () => {}); + registerSlackMessageEvents({ + ctx: harness.ctx, + handleSlackMessage, + }); + return { + handler: harness.getHandler("app_mention") as AppMentionHandler | null, + handleSlackMessage, + }; +} + function makeChangedEvent(overrides?: { channel?: string; user?: string }) { const user = overrides?.user ?? "U1"; return { @@ -214,4 +228,42 @@ describe("registerSlackMessageEvents", () => { expect(handleSlackMessage).not.toHaveBeenCalled(); expect(messageQueueMock).toHaveBeenCalledTimes(1); }); + + it("skips app_mention events for DM channel ids even with contradictory channel_type", async () => { + const { handler, handleSlackMessage } = createAppMentionHandlers({ dmPolicy: "open" }); + expect(handler).toBeTruthy(); + + await handler!({ + event: { + type: "app_mention", + channel: "D123", + channel_type: "channel", + user: "U1", + text: "<@U_BOT> hello", + ts: "123.456", + }, + body: {}, + }); + + expect(handleSlackMessage).not.toHaveBeenCalled(); + }); + + it("routes app_mention events from channels to the message handler", async () => { + const { handler, handleSlackMessage } = createAppMentionHandlers({ dmPolicy: "open" }); + expect(handler).toBeTruthy(); + + await handler!({ + event: { + type: "app_mention", + channel: "C123", + channel_type: "channel", + user: "U1", + text: "<@U_BOT> hello", + ts: "123.789", + }, + body: {}, + }); + + expect(handleSlackMessage).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/slack/monitor/events/messages.ts b/src/slack/monitor/events/messages.ts index 9613d633ada..04a1b311958 100644 --- a/src/slack/monitor/events/messages.ts +++ b/src/slack/monitor/events/messages.ts @@ -2,6 +2,7 @@ import type { SlackEventMiddlewareArgs } from "@slack/bolt"; import { danger } from "../../../globals.js"; import { enqueueSystemEvent } from "../../../infra/system-events.js"; import type { SlackAppMentionEvent, SlackMessageEvent } from "../../types.js"; +import { normalizeSlackChannelType } from "../channel-type.js"; import type { SlackMonitorContext } from "../context.js"; import type { SlackMessageHandler } from "../message-handler.js"; import { resolveSlackMessageSubtypeHandler } from "./message-subtype-handlers.js"; @@ -66,7 +67,7 @@ export function registerSlackMessageEvents(params: { // Skip app_mention for DMs - they're already handled by message.im event // This prevents duplicate processing when both message and app_mention fire for DMs - const channelType = mention.channel_type; + const channelType = normalizeSlackChannelType(mention.channel_type, mention.channel); if (channelType === "im" || channelType === "mpim") { return; }