diff --git a/src/auto-reply/reply/abort-cutoff.ts b/src/auto-reply/reply/abort-cutoff.ts new file mode 100644 index 00000000000..44fb8b04ca3 --- /dev/null +++ b/src/auto-reply/reply/abort-cutoff.ts @@ -0,0 +1,138 @@ +import type { SessionEntry } from "../../config/sessions.js"; +import { updateSessionStore } from "../../config/sessions.js"; +import type { MsgContext } from "../templating.js"; + +export type AbortCutoff = { + messageSid?: string; + timestamp?: number; +}; + +type SessionAbortCutoffEntry = Pick; + +export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined { + const messageSid = + (typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) || + (typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) || + undefined; + const timestamp = + typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; + if (!messageSid && timestamp === undefined) { + return undefined; + } + return { messageSid, timestamp }; +} + +export function readAbortCutoffFromSessionEntry( + entry: SessionAbortCutoffEntry | undefined, +): AbortCutoff | undefined { + if (!entry) { + return undefined; + } + const messageSid = entry.abortCutoffMessageSid?.trim() || undefined; + const timestamp = + typeof entry.abortCutoffTimestamp === "number" && Number.isFinite(entry.abortCutoffTimestamp) + ? entry.abortCutoffTimestamp + : undefined; + if (!messageSid && timestamp === undefined) { + return undefined; + } + return { messageSid, timestamp }; +} + +export function hasAbortCutoff(entry: SessionAbortCutoffEntry | undefined): boolean { + return readAbortCutoffFromSessionEntry(entry) !== undefined; +} + +export function applyAbortCutoffToSessionEntry( + entry: SessionAbortCutoffEntry, + cutoff: AbortCutoff | undefined, +): void { + entry.abortCutoffMessageSid = cutoff?.messageSid; + entry.abortCutoffTimestamp = cutoff?.timestamp; +} + +export async function clearAbortCutoffInSession(params: { + sessionEntry?: SessionEntry; + sessionStore?: Record; + sessionKey?: string; + storePath?: string; +}): Promise { + const { sessionEntry, sessionStore, sessionKey, storePath } = params; + if (!sessionEntry || !sessionStore || !sessionKey || !hasAbortCutoff(sessionEntry)) { + return false; + } + + applyAbortCutoffToSessionEntry(sessionEntry, undefined); + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + + if (storePath) { + await updateSessionStore(storePath, (store) => { + const existing = store[sessionKey] ?? sessionEntry; + if (!existing) { + return; + } + applyAbortCutoffToSessionEntry(existing, undefined); + existing.updatedAt = Date.now(); + store[sessionKey] = existing; + }); + } + + return true; +} + +function toNumericMessageSid(value: string | undefined): bigint | undefined { + const trimmed = value?.trim(); + if (!trimmed || !/^\d+$/.test(trimmed)) { + return undefined; + } + try { + return BigInt(trimmed); + } catch { + return undefined; + } +} + +export function shouldSkipMessageByAbortCutoff(params: { + cutoffMessageSid?: string; + cutoffTimestamp?: number; + messageSid?: string; + timestamp?: number; +}): boolean { + const cutoffSid = params.cutoffMessageSid?.trim(); + const currentSid = params.messageSid?.trim(); + if (cutoffSid && currentSid) { + const cutoffNumeric = toNumericMessageSid(cutoffSid); + const currentNumeric = toNumericMessageSid(currentSid); + if (cutoffNumeric !== undefined && currentNumeric !== undefined) { + return currentNumeric <= cutoffNumeric; + } + if (currentSid === cutoffSid) { + return true; + } + } + if ( + typeof params.cutoffTimestamp === "number" && + Number.isFinite(params.cutoffTimestamp) && + typeof params.timestamp === "number" && + Number.isFinite(params.timestamp) + ) { + return params.timestamp <= params.cutoffTimestamp; + } + return false; +} + +export function shouldPersistAbortCutoff(params: { + commandSessionKey?: string; + targetSessionKey?: string; +}): boolean { + const commandSessionKey = params.commandSessionKey?.trim(); + const targetSessionKey = params.targetSessionKey?.trim(); + if (!commandSessionKey || !targetSessionKey) { + return true; + } + // Native targeted /stop can run from a slash/session-control key while the + // actual target session uses different message id/timestamp spaces. + // Persist cutoff only when command source and target are the same session. + return commandSessionKey === targetSessionKey; +} diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index a59ffaaee27..0b318272d20 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -20,9 +20,16 @@ import { parseAgentSessionKey } from "../../routing/session-key.js"; import { resolveCommandAuthorization } from "../command-auth.js"; import { normalizeCommandBody, type CommandNormalizeOptions } from "../commands-registry.js"; import type { FinalizedMsgContext, MsgContext } from "../templating.js"; +import { + applyAbortCutoffToSessionEntry, + resolveAbortCutoffFromContext, + shouldPersistAbortCutoff, +} from "./abort-cutoff.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; import { clearSessionQueues } from "./queue.js"; +export { resolveAbortCutoffFromContext, shouldSkipMessageByAbortCutoff } from "./abort-cutoff.js"; + const ABORT_TRIGGERS = new Set([ "stop", "esc", @@ -113,80 +120,6 @@ export function getAbortMemory(key: string): boolean | undefined { return ABORT_MEMORY.get(normalized); } -export type AbortCutoff = { - messageSid?: string; - timestamp?: number; -}; - -export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined { - const messageSid = - (typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) || - (typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) || - undefined; - const timestamp = - typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; - if (!messageSid && timestamp === undefined) { - return undefined; - } - return { messageSid, timestamp }; -} - -function toNumericMessageSid(value: string | undefined): bigint | undefined { - const trimmed = value?.trim(); - if (!trimmed || !/^\d+$/.test(trimmed)) { - return undefined; - } - try { - return BigInt(trimmed); - } catch { - return undefined; - } -} - -export function shouldSkipMessageByAbortCutoff(params: { - cutoffMessageSid?: string; - cutoffTimestamp?: number; - messageSid?: string; - timestamp?: number; -}): boolean { - const cutoffSid = params.cutoffMessageSid?.trim(); - const currentSid = params.messageSid?.trim(); - if (cutoffSid && currentSid) { - const cutoffNumeric = toNumericMessageSid(cutoffSid); - const currentNumeric = toNumericMessageSid(currentSid); - if (cutoffNumeric !== undefined && currentNumeric !== undefined) { - return currentNumeric <= cutoffNumeric; - } - if (currentSid === cutoffSid) { - return true; - } - } - if ( - typeof params.cutoffTimestamp === "number" && - Number.isFinite(params.cutoffTimestamp) && - typeof params.timestamp === "number" && - Number.isFinite(params.timestamp) - ) { - return params.timestamp <= params.cutoffTimestamp; - } - return false; -} - -function shouldPersistAbortCutoff(params: { - commandSessionKey?: string; - targetSessionKey?: string; -}): boolean { - const commandSessionKey = params.commandSessionKey?.trim(); - const targetSessionKey = params.targetSessionKey?.trim(); - if (!commandSessionKey || !targetSessionKey) { - return true; - } - // Native targeted /stop can run from a slash/session-control key while the - // actual target session uses different message id/timestamp spaces. - // Persist cutoff only when command source and target are the same session. - return commandSessionKey === targetSessionKey; -} - function pruneAbortMemory(): void { if (ABORT_MEMORY.size <= ABORT_MEMORY_MAX) { return; @@ -384,8 +317,7 @@ export async function tryFastAbortFromMessage(params: { : undefined; if (entry && key) { entry.abortedLastRun = true; - entry.abortCutoffMessageSid = abortCutoff?.messageSid; - entry.abortCutoffTimestamp = abortCutoff?.timestamp; + applyAbortCutoffToSessionEntry(entry, abortCutoff); entry.updatedAt = Date.now(); store[key] = entry; await updateSessionStore(storePath, (nextStore) => { @@ -394,8 +326,7 @@ export async function tryFastAbortFromMessage(params: { return; } nextEntry.abortedLastRun = true; - nextEntry.abortCutoffMessageSid = abortCutoff?.messageSid; - nextEntry.abortCutoffTimestamp = abortCutoff?.timestamp; + applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); nextEntry.updatedAt = Date.now(); nextStore[key] = nextEntry; }); diff --git a/src/auto-reply/reply/commands-session-abort.ts b/src/auto-reply/reply/commands-session-abort.ts new file mode 100644 index 00000000000..f6683cc4df7 --- /dev/null +++ b/src/auto-reply/reply/commands-session-abort.ts @@ -0,0 +1,172 @@ +import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js"; +import type { SessionEntry } from "../../config/sessions.js"; +import { logVerbose } from "../../globals.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; +import { + resolveAbortCutoffFromContext, + shouldPersistAbortCutoff, + type AbortCutoff, +} from "./abort-cutoff.js"; +import { + formatAbortReplyText, + isAbortTrigger, + resolveSessionEntryForKey, + setAbortMemory, + stopSubagentsForRequester, +} from "./abort.js"; +import { persistAbortTargetEntry } from "./commands-session-store.js"; +import type { CommandHandler } from "./commands-types.js"; +import { clearSessionQueues } from "./queue.js"; + +type AbortTarget = { + entry?: SessionEntry; + key?: string; + sessionId?: string; +}; + +function resolveAbortTarget(params: { + ctx: { CommandTargetSessionKey?: string | null }; + sessionKey?: string; + sessionEntry?: SessionEntry; + sessionStore?: Record; +}): AbortTarget { + const targetSessionKey = params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey; + const { entry, key } = resolveSessionEntryForKey(params.sessionStore, targetSessionKey); + if (entry && key) { + return { entry, key, sessionId: entry.sessionId }; + } + if (params.sessionEntry && params.sessionKey) { + return { + entry: params.sessionEntry, + key: params.sessionKey, + sessionId: params.sessionEntry.sessionId, + }; + } + return { entry: undefined, key: targetSessionKey, sessionId: undefined }; +} + +function resolveAbortCutoffForTarget(params: { + ctx: Parameters[0]["ctx"]; + commandSessionKey?: string; + targetSessionKey?: string; +}): AbortCutoff | undefined { + if ( + !shouldPersistAbortCutoff({ + commandSessionKey: params.commandSessionKey, + targetSessionKey: params.targetSessionKey, + }) + ) { + return undefined; + } + return resolveAbortCutoffFromContext(params.ctx); +} + +async function applyAbortTarget(params: { + abortTarget: AbortTarget; + sessionStore?: Record; + storePath?: string; + abortKey?: string; + abortCutoff?: AbortCutoff; +}) { + const { abortTarget } = params; + if (abortTarget.sessionId) { + abortEmbeddedPiRun(abortTarget.sessionId); + } + + const persisted = await persistAbortTargetEntry({ + entry: abortTarget.entry, + key: abortTarget.key, + sessionStore: params.sessionStore, + storePath: params.storePath, + abortCutoff: params.abortCutoff, + }); + if (!persisted && params.abortKey) { + setAbortMemory(params.abortKey, true); + } +} + +export const handleStopCommand: CommandHandler = async (params, allowTextCommands) => { + if (!allowTextCommands) { + return null; + } + if (params.command.commandBodyNormalized !== "/stop") { + return null; + } + if (!params.command.isAuthorizedSender) { + logVerbose( + `Ignoring /stop from unauthorized sender: ${params.command.senderId || ""}`, + ); + return { shouldContinue: false }; + } + const abortTarget = resolveAbortTarget({ + ctx: params.ctx, + sessionKey: params.sessionKey, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + }); + const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]); + if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { + logVerbose( + `stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, + ); + } + await applyAbortTarget({ + abortTarget, + sessionStore: params.sessionStore, + storePath: params.storePath, + abortKey: params.command.abortKey, + abortCutoff: resolveAbortCutoffForTarget({ + ctx: params.ctx, + commandSessionKey: params.sessionKey, + targetSessionKey: abortTarget.key, + }), + }); + + // Trigger internal hook for stop command + const hookEvent = createInternalHookEvent( + "command", + "stop", + abortTarget.key ?? params.sessionKey ?? "", + { + sessionEntry: abortTarget.entry ?? params.sessionEntry, + sessionId: abortTarget.sessionId, + commandSource: params.command.surface, + senderId: params.command.senderId, + }, + ); + await triggerInternalHook(hookEvent); + + const { stopped } = stopSubagentsForRequester({ + cfg: params.cfg, + requesterSessionKey: abortTarget.key ?? params.sessionKey, + }); + + return { shouldContinue: false, reply: { text: formatAbortReplyText(stopped) } }; +}; + +export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => { + if (!allowTextCommands) { + return null; + } + if (!isAbortTrigger(params.command.rawBodyNormalized)) { + return null; + } + const abortTarget = resolveAbortTarget({ + ctx: params.ctx, + sessionKey: params.sessionKey, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + }); + await applyAbortTarget({ + abortTarget, + sessionStore: params.sessionStore, + storePath: params.storePath, + abortKey: params.command.abortKey, + abortCutoff: resolveAbortCutoffForTarget({ + ctx: params.ctx, + commandSessionKey: params.sessionKey, + targetSessionKey: abortTarget.key, + }), + }); + return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; +}; diff --git a/src/auto-reply/reply/commands-session-store.ts b/src/auto-reply/reply/commands-session-store.ts new file mode 100644 index 00000000000..dd7e223d89b --- /dev/null +++ b/src/auto-reply/reply/commands-session-store.ts @@ -0,0 +1,53 @@ +import type { SessionEntry } from "../../config/sessions.js"; +import { updateSessionStore } from "../../config/sessions.js"; +import { applyAbortCutoffToSessionEntry, type AbortCutoff } from "./abort-cutoff.js"; +import type { CommandHandler } from "./commands-types.js"; + +type CommandParams = Parameters[0]; + +export async function persistSessionEntry(params: CommandParams): Promise { + if (!params.sessionEntry || !params.sessionStore || !params.sessionKey) { + return false; + } + params.sessionEntry.updatedAt = Date.now(); + params.sessionStore[params.sessionKey] = params.sessionEntry; + if (params.storePath) { + await updateSessionStore(params.storePath, (store) => { + store[params.sessionKey] = params.sessionEntry as SessionEntry; + }); + } + return true; +} + +export async function persistAbortTargetEntry(params: { + entry?: SessionEntry; + key?: string; + sessionStore?: Record; + storePath?: string; + abortCutoff?: AbortCutoff; +}): Promise { + const { entry, key, sessionStore, storePath, abortCutoff } = params; + if (!entry || !key || !sessionStore) { + return false; + } + + entry.abortedLastRun = true; + applyAbortCutoffToSessionEntry(entry, abortCutoff); + entry.updatedAt = Date.now(); + sessionStore[key] = entry; + + if (storePath) { + await updateSessionStore(storePath, (store) => { + const nextEntry = store[key] ?? entry; + if (!nextEntry) { + return; + } + nextEntry.abortedLastRun = true; + applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); + nextEntry.updatedAt = Date.now(); + store[key] = nextEntry; + }); + } + + return true; +} diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index 5a752fe367c..d6e21eda15d 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -1,52 +1,20 @@ -import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js"; import { parseDurationMs } from "../../cli/parse-duration.js"; import { isRestartEnabled } from "../../config/commands.js"; -import type { SessionEntry } from "../../config/sessions.js"; -import { updateSessionStore } from "../../config/sessions.js"; import { formatThreadBindingTtlLabel, getThreadBindingManager, setThreadBindingTtlBySessionKey, } from "../../discord/monitor/thread-bindings.js"; import { logVerbose } from "../../globals.js"; -import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { scheduleGatewaySigusr1Restart, triggerOpenClawRestart } from "../../infra/restart.js"; import { loadCostUsageSummary, loadSessionCostSummary } from "../../infra/session-cost-usage.js"; import { formatTokenCount, formatUsd } from "../../utils/usage-format.js"; import { parseActivationCommand } from "../group-activation.js"; import { parseSendPolicyCommand } from "../send-policy.js"; import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js"; -import { - formatAbortReplyText, - isAbortTrigger, - resolveAbortCutoffFromContext, - resolveSessionEntryForKey, - setAbortMemory, - stopSubagentsForRequester, -} from "./abort.js"; +import { handleAbortTrigger, handleStopCommand } from "./commands-session-abort.js"; +import { persistSessionEntry } from "./commands-session-store.js"; import type { CommandHandler } from "./commands-types.js"; -import { clearSessionQueues } from "./queue.js"; - -function resolveAbortTarget(params: { - ctx: { CommandTargetSessionKey?: string | null }; - sessionKey?: string; - sessionEntry?: SessionEntry; - sessionStore?: Record; -}) { - const targetSessionKey = params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey; - const { entry, key } = resolveSessionEntryForKey(params.sessionStore, targetSessionKey); - if (entry && key) { - return { entry, key, sessionId: entry.sessionId }; - } - if (params.sessionEntry && params.sessionKey) { - return { - entry: params.sessionEntry, - key: params.sessionKey, - sessionId: params.sessionEntry.sessionId, - }; - } - return { entry: undefined, key: targetSessionKey, sessionId: undefined }; -} const SESSION_COMMAND_PREFIX = "/session"; const SESSION_TTL_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]); @@ -95,53 +63,6 @@ function formatSessionExpiry(expiresAt: number) { return new Date(expiresAt).toISOString(); } -async function applyAbortTarget(params: { - abortTarget: ReturnType; - sessionStore?: Record; - storePath?: string; - abortKey?: string; - abortCutoff?: { messageSid?: string; timestamp?: number }; -}) { - const { abortTarget } = params; - if (abortTarget.sessionId) { - abortEmbeddedPiRun(abortTarget.sessionId); - } - if (abortTarget.entry && params.sessionStore && abortTarget.key) { - abortTarget.entry.abortedLastRun = true; - abortTarget.entry.abortCutoffMessageSid = params.abortCutoff?.messageSid; - abortTarget.entry.abortCutoffTimestamp = params.abortCutoff?.timestamp; - abortTarget.entry.updatedAt = Date.now(); - params.sessionStore[abortTarget.key] = abortTarget.entry; - if (params.storePath) { - await updateSessionStore(params.storePath, (store) => { - store[abortTarget.key] = { - ...abortTarget.entry, - abortedLastRun: true, - abortCutoffMessageSid: params.abortCutoff?.messageSid, - abortCutoffTimestamp: params.abortCutoff?.timestamp, - updatedAt: Date.now(), - }; - }); - } - } else if (params.abortKey) { - setAbortMemory(params.abortKey, true); - } -} - -async function persistSessionEntry(params: Parameters[0]): Promise { - if (!params.sessionEntry || !params.sessionStore || !params.sessionKey) { - return false; - } - params.sessionEntry.updatedAt = Date.now(); - params.sessionStore[params.sessionKey] = params.sessionEntry; - if (params.storePath) { - await updateSessionStore(params.storePath, (store) => { - store[params.sessionKey] = params.sessionEntry as SessionEntry; - }); - } - return true; -} - export const handleActivationCommand: CommandHandler = async (params, allowTextCommands) => { if (!allowTextCommands) { return null; @@ -483,90 +404,4 @@ export const handleRestartCommand: CommandHandler = async (params, allowTextComm }; }; -export const handleStopCommand: CommandHandler = async (params, allowTextCommands) => { - if (!allowTextCommands) { - return null; - } - if (params.command.commandBodyNormalized !== "/stop") { - return null; - } - if (!params.command.isAuthorizedSender) { - logVerbose( - `Ignoring /stop from unauthorized sender: ${params.command.senderId || ""}`, - ); - return { shouldContinue: false }; - } - const abortTarget = resolveAbortTarget({ - ctx: params.ctx, - sessionKey: params.sessionKey, - sessionEntry: params.sessionEntry, - sessionStore: params.sessionStore, - }); - const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]); - if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { - logVerbose( - `stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, - ); - } - await applyAbortTarget({ - abortTarget, - sessionStore: params.sessionStore, - storePath: params.storePath, - abortKey: params.command.abortKey, - abortCutoff: - params.sessionKey?.trim() && - abortTarget.key?.trim() && - params.sessionKey.trim() === abortTarget.key.trim() - ? resolveAbortCutoffFromContext(params.ctx) - : undefined, - }); - - // Trigger internal hook for stop command - const hookEvent = createInternalHookEvent( - "command", - "stop", - abortTarget.key ?? params.sessionKey ?? "", - { - sessionEntry: abortTarget.entry ?? params.sessionEntry, - sessionId: abortTarget.sessionId, - commandSource: params.command.surface, - senderId: params.command.senderId, - }, - ); - await triggerInternalHook(hookEvent); - - const { stopped } = stopSubagentsForRequester({ - cfg: params.cfg, - requesterSessionKey: abortTarget.key ?? params.sessionKey, - }); - - return { shouldContinue: false, reply: { text: formatAbortReplyText(stopped) } }; -}; - -export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => { - if (!allowTextCommands) { - return null; - } - if (!isAbortTrigger(params.command.rawBodyNormalized)) { - return null; - } - const abortTarget = resolveAbortTarget({ - ctx: params.ctx, - sessionKey: params.sessionKey, - sessionEntry: params.sessionEntry, - sessionStore: params.sessionStore, - }); - await applyAbortTarget({ - abortTarget, - sessionStore: params.sessionStore, - storePath: params.storePath, - abortKey: params.command.abortKey, - abortCutoff: - params.sessionKey?.trim() && - abortTarget.key?.trim() && - params.sessionKey.trim() === abortTarget.key.trim() - ? resolveAbortCutoffFromContext(params.ctx) - : undefined, - }); - return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; -}; +export { handleAbortTrigger, handleStopCommand }; diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index 2f399845172..4abb9a82f82 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -5,7 +5,6 @@ import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js"; import { getChannelDock } from "../../channels/dock.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions.js"; -import { updateSessionStore } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { resolveGatewayMessageChannel } from "../../utils/message-channel.js"; @@ -17,7 +16,13 @@ import { import type { MsgContext, TemplateContext } from "../templating.js"; import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { getAbortMemory, isAbortRequestText, shouldSkipMessageByAbortCutoff } from "./abort.js"; +import { + clearAbortCutoffInSession, + readAbortCutoffFromSessionEntry, + resolveAbortCutoffFromContext, + shouldSkipMessageByAbortCutoff, +} from "./abort-cutoff.js"; +import { getAbortMemory, isAbortRequestText } from "./abort.js"; import { buildStatusReply, handleCommands } from "./commands.js"; import type { InlineDirectives } from "./directive-handling.js"; import { isDirectiveOnly } from "./directive-handling.js"; @@ -253,54 +258,29 @@ export async function handleInlineActions(params: { await opts.onBlockReply(reply); }; - const clearAbortCutoff = async () => { - if (!sessionEntry || !sessionStore || !sessionKey) { - return; - } - if ( - sessionEntry.abortCutoffMessageSid === undefined && - sessionEntry.abortCutoffTimestamp === undefined - ) { - return; - } - sessionEntry.abortCutoffMessageSid = undefined; - sessionEntry.abortCutoffTimestamp = undefined; - sessionEntry.updatedAt = Date.now(); - sessionStore[sessionKey] = sessionEntry; - if (storePath) { - await updateSessionStore(storePath, (store) => { - const existing = store[sessionKey] ?? sessionEntry; - if (!existing) { - return; - } - existing.abortCutoffMessageSid = undefined; - existing.abortCutoffTimestamp = undefined; - existing.updatedAt = Date.now(); - store[sessionKey] = existing; - }); - } - }; - const isStopLikeInbound = isAbortRequestText(command.rawBodyNormalized); if (!isStopLikeInbound && sessionEntry) { - const shouldSkip = shouldSkipMessageByAbortCutoff({ - cutoffMessageSid: sessionEntry.abortCutoffMessageSid, - cutoffTimestamp: sessionEntry.abortCutoffTimestamp, - messageSid: - (typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) || - (typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) || - undefined, - timestamp: typeof ctx.Timestamp === "number" ? ctx.Timestamp : undefined, - }); + const cutoff = readAbortCutoffFromSessionEntry(sessionEntry); + const incoming = resolveAbortCutoffFromContext(ctx); + const shouldSkip = cutoff + ? shouldSkipMessageByAbortCutoff({ + cutoffMessageSid: cutoff.messageSid, + cutoffTimestamp: cutoff.timestamp, + messageSid: incoming?.messageSid, + timestamp: incoming?.timestamp, + }) + : false; if (shouldSkip) { typing.cleanup(); return { kind: "reply", reply: undefined }; } - if ( - sessionEntry.abortCutoffMessageSid !== undefined || - sessionEntry.abortCutoffTimestamp !== undefined - ) { - await clearAbortCutoff(); + if (cutoff) { + await clearAbortCutoffInSession({ + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); } } diff --git a/src/cron/service/timeout-policy.test.ts b/src/cron/service/timeout-policy.test.ts new file mode 100644 index 00000000000..69ca6aa46c3 --- /dev/null +++ b/src/cron/service/timeout-policy.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob } from "../types.js"; +import { + AGENT_TURN_SAFETY_TIMEOUT_MS, + DEFAULT_JOB_TIMEOUT_MS, + resolveCronJobTimeoutMs, +} from "./timeout-policy.js"; + +function makeJob(payload: CronJob["payload"]): CronJob { + const sessionTarget = payload.kind === "agentTurn" ? "isolated" : "main"; + return { + id: "job-1", + name: "job", + createdAtMs: 0, + updatedAtMs: 0, + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget, + wakeMode: "next-heartbeat", + payload, + state: {}, + }; +} + +describe("timeout-policy", () => { + it("uses default timeout for non-agent jobs", () => { + const timeout = resolveCronJobTimeoutMs(makeJob({ kind: "systemEvent", text: "hello" })); + expect(timeout).toBe(DEFAULT_JOB_TIMEOUT_MS); + }); + + it("uses expanded safety timeout for agentTurn jobs without explicit timeout", () => { + const timeout = resolveCronJobTimeoutMs(makeJob({ kind: "agentTurn", message: "hi" })); + expect(timeout).toBe(AGENT_TURN_SAFETY_TIMEOUT_MS); + }); + + it("disables timeout when timeoutSeconds <= 0", () => { + const timeout = resolveCronJobTimeoutMs( + makeJob({ kind: "agentTurn", message: "hi", timeoutSeconds: 0 }), + ); + expect(timeout).toBeUndefined(); + }); + + it("applies explicit timeoutSeconds when positive", () => { + const timeout = resolveCronJobTimeoutMs( + makeJob({ kind: "agentTurn", message: "hi", timeoutSeconds: 1.9 }), + ); + expect(timeout).toBe(1_900); + }); +}); diff --git a/src/cron/service/timeout-policy.ts b/src/cron/service/timeout-policy.ts new file mode 100644 index 00000000000..7b03b8bda52 --- /dev/null +++ b/src/cron/service/timeout-policy.ts @@ -0,0 +1,25 @@ +import type { CronJob } from "../types.js"; + +/** + * Maximum wall-clock time for a single job execution. Acts as a safety net + * on top of per-provider/per-agent timeouts to prevent one stuck job from + * wedging the entire cron lane. + */ +export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes + +/** + * Agent turns can legitimately run much longer than generic cron jobs. + * Use a larger safety ceiling when no explicit timeout is set. + */ +export const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes + +export function resolveCronJobTimeoutMs(job: CronJob): number | undefined { + const configuredTimeoutMs = + job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" + ? Math.floor(job.payload.timeoutSeconds * 1_000) + : undefined; + if (configuredTimeoutMs === undefined) { + return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS; + } + return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs; +} diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 6058777cdfd..8267d4c970a 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -18,6 +18,9 @@ import { import { locked } from "./locked.js"; import type { CronEvent, CronServiceState } from "./state.js"; import { ensureLoaded, persist } from "./store.js"; +import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js"; + +export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js"; const MAX_TIMER_DELAY_MS = 60_000; @@ -30,14 +33,6 @@ const MAX_TIMER_DELAY_MS = 60_000; */ const MIN_REFIRE_GAP_MS = 2_000; -/** - * Maximum wall-clock time for a single job execution. Acts as a safety net - * on top of the per-provider / per-agent timeouts to prevent one stuck job - * from wedging the entire cron lane. - */ -export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes -const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes - type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { jobId: string; @@ -47,17 +42,6 @@ type TimedCronRunOutcome = CronRunOutcome & endedAt: number; }; -function resolveCronJobTimeoutMs(job: CronJob): number | undefined { - const configuredTimeoutMs = - job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" - ? Math.floor(job.payload.timeoutSeconds * 1_000) - : undefined; - if (configuredTimeoutMs === undefined) { - return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS; - } - return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs; -} - export async function executeJobCoreWithTimeout( state: CronServiceState, job: CronJob,