mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-21 16:41:56 +00:00
refactor(reply): split abort cutoff and timeout policy modules
This commit is contained in:
138
src/auto-reply/reply/abort-cutoff.ts
Normal file
138
src/auto-reply/reply/abort-cutoff.ts
Normal file
@@ -0,0 +1,138 @@
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { updateSessionStore } from "../../config/sessions.js";
|
||||
import type { MsgContext } from "../templating.js";
|
||||
|
||||
export type AbortCutoff = {
|
||||
messageSid?: string;
|
||||
timestamp?: number;
|
||||
};
|
||||
|
||||
type SessionAbortCutoffEntry = Pick<SessionEntry, "abortCutoffMessageSid" | "abortCutoffTimestamp">;
|
||||
|
||||
export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined {
|
||||
const messageSid =
|
||||
(typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) ||
|
||||
(typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) ||
|
||||
undefined;
|
||||
const timestamp =
|
||||
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
|
||||
if (!messageSid && timestamp === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return { messageSid, timestamp };
|
||||
}
|
||||
|
||||
export function readAbortCutoffFromSessionEntry(
|
||||
entry: SessionAbortCutoffEntry | undefined,
|
||||
): AbortCutoff | undefined {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
const messageSid = entry.abortCutoffMessageSid?.trim() || undefined;
|
||||
const timestamp =
|
||||
typeof entry.abortCutoffTimestamp === "number" && Number.isFinite(entry.abortCutoffTimestamp)
|
||||
? entry.abortCutoffTimestamp
|
||||
: undefined;
|
||||
if (!messageSid && timestamp === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return { messageSid, timestamp };
|
||||
}
|
||||
|
||||
export function hasAbortCutoff(entry: SessionAbortCutoffEntry | undefined): boolean {
|
||||
return readAbortCutoffFromSessionEntry(entry) !== undefined;
|
||||
}
|
||||
|
||||
export function applyAbortCutoffToSessionEntry(
|
||||
entry: SessionAbortCutoffEntry,
|
||||
cutoff: AbortCutoff | undefined,
|
||||
): void {
|
||||
entry.abortCutoffMessageSid = cutoff?.messageSid;
|
||||
entry.abortCutoffTimestamp = cutoff?.timestamp;
|
||||
}
|
||||
|
||||
export async function clearAbortCutoffInSession(params: {
|
||||
sessionEntry?: SessionEntry;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
sessionKey?: string;
|
||||
storePath?: string;
|
||||
}): Promise<boolean> {
|
||||
const { sessionEntry, sessionStore, sessionKey, storePath } = params;
|
||||
if (!sessionEntry || !sessionStore || !sessionKey || !hasAbortCutoff(sessionEntry)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
applyAbortCutoffToSessionEntry(sessionEntry, undefined);
|
||||
sessionEntry.updatedAt = Date.now();
|
||||
sessionStore[sessionKey] = sessionEntry;
|
||||
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const existing = store[sessionKey] ?? sessionEntry;
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
applyAbortCutoffToSessionEntry(existing, undefined);
|
||||
existing.updatedAt = Date.now();
|
||||
store[sessionKey] = existing;
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function toNumericMessageSid(value: string | undefined): bigint | undefined {
|
||||
const trimmed = value?.trim();
|
||||
if (!trimmed || !/^\d+$/.test(trimmed)) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return BigInt(trimmed);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export function shouldSkipMessageByAbortCutoff(params: {
|
||||
cutoffMessageSid?: string;
|
||||
cutoffTimestamp?: number;
|
||||
messageSid?: string;
|
||||
timestamp?: number;
|
||||
}): boolean {
|
||||
const cutoffSid = params.cutoffMessageSid?.trim();
|
||||
const currentSid = params.messageSid?.trim();
|
||||
if (cutoffSid && currentSid) {
|
||||
const cutoffNumeric = toNumericMessageSid(cutoffSid);
|
||||
const currentNumeric = toNumericMessageSid(currentSid);
|
||||
if (cutoffNumeric !== undefined && currentNumeric !== undefined) {
|
||||
return currentNumeric <= cutoffNumeric;
|
||||
}
|
||||
if (currentSid === cutoffSid) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (
|
||||
typeof params.cutoffTimestamp === "number" &&
|
||||
Number.isFinite(params.cutoffTimestamp) &&
|
||||
typeof params.timestamp === "number" &&
|
||||
Number.isFinite(params.timestamp)
|
||||
) {
|
||||
return params.timestamp <= params.cutoffTimestamp;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function shouldPersistAbortCutoff(params: {
|
||||
commandSessionKey?: string;
|
||||
targetSessionKey?: string;
|
||||
}): boolean {
|
||||
const commandSessionKey = params.commandSessionKey?.trim();
|
||||
const targetSessionKey = params.targetSessionKey?.trim();
|
||||
if (!commandSessionKey || !targetSessionKey) {
|
||||
return true;
|
||||
}
|
||||
// Native targeted /stop can run from a slash/session-control key while the
|
||||
// actual target session uses different message id/timestamp spaces.
|
||||
// Persist cutoff only when command source and target are the same session.
|
||||
return commandSessionKey === targetSessionKey;
|
||||
}
|
||||
@@ -20,9 +20,16 @@ import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
import { resolveCommandAuthorization } from "../command-auth.js";
|
||||
import { normalizeCommandBody, type CommandNormalizeOptions } from "../commands-registry.js";
|
||||
import type { FinalizedMsgContext, MsgContext } from "../templating.js";
|
||||
import {
|
||||
applyAbortCutoffToSessionEntry,
|
||||
resolveAbortCutoffFromContext,
|
||||
shouldPersistAbortCutoff,
|
||||
} from "./abort-cutoff.js";
|
||||
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
|
||||
import { clearSessionQueues } from "./queue.js";
|
||||
|
||||
export { resolveAbortCutoffFromContext, shouldSkipMessageByAbortCutoff } from "./abort-cutoff.js";
|
||||
|
||||
const ABORT_TRIGGERS = new Set([
|
||||
"stop",
|
||||
"esc",
|
||||
@@ -113,80 +120,6 @@ export function getAbortMemory(key: string): boolean | undefined {
|
||||
return ABORT_MEMORY.get(normalized);
|
||||
}
|
||||
|
||||
export type AbortCutoff = {
|
||||
messageSid?: string;
|
||||
timestamp?: number;
|
||||
};
|
||||
|
||||
export function resolveAbortCutoffFromContext(ctx: MsgContext): AbortCutoff | undefined {
|
||||
const messageSid =
|
||||
(typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) ||
|
||||
(typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) ||
|
||||
undefined;
|
||||
const timestamp =
|
||||
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
|
||||
if (!messageSid && timestamp === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return { messageSid, timestamp };
|
||||
}
|
||||
|
||||
function toNumericMessageSid(value: string | undefined): bigint | undefined {
|
||||
const trimmed = value?.trim();
|
||||
if (!trimmed || !/^\d+$/.test(trimmed)) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return BigInt(trimmed);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export function shouldSkipMessageByAbortCutoff(params: {
|
||||
cutoffMessageSid?: string;
|
||||
cutoffTimestamp?: number;
|
||||
messageSid?: string;
|
||||
timestamp?: number;
|
||||
}): boolean {
|
||||
const cutoffSid = params.cutoffMessageSid?.trim();
|
||||
const currentSid = params.messageSid?.trim();
|
||||
if (cutoffSid && currentSid) {
|
||||
const cutoffNumeric = toNumericMessageSid(cutoffSid);
|
||||
const currentNumeric = toNumericMessageSid(currentSid);
|
||||
if (cutoffNumeric !== undefined && currentNumeric !== undefined) {
|
||||
return currentNumeric <= cutoffNumeric;
|
||||
}
|
||||
if (currentSid === cutoffSid) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (
|
||||
typeof params.cutoffTimestamp === "number" &&
|
||||
Number.isFinite(params.cutoffTimestamp) &&
|
||||
typeof params.timestamp === "number" &&
|
||||
Number.isFinite(params.timestamp)
|
||||
) {
|
||||
return params.timestamp <= params.cutoffTimestamp;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function shouldPersistAbortCutoff(params: {
|
||||
commandSessionKey?: string;
|
||||
targetSessionKey?: string;
|
||||
}): boolean {
|
||||
const commandSessionKey = params.commandSessionKey?.trim();
|
||||
const targetSessionKey = params.targetSessionKey?.trim();
|
||||
if (!commandSessionKey || !targetSessionKey) {
|
||||
return true;
|
||||
}
|
||||
// Native targeted /stop can run from a slash/session-control key while the
|
||||
// actual target session uses different message id/timestamp spaces.
|
||||
// Persist cutoff only when command source and target are the same session.
|
||||
return commandSessionKey === targetSessionKey;
|
||||
}
|
||||
|
||||
function pruneAbortMemory(): void {
|
||||
if (ABORT_MEMORY.size <= ABORT_MEMORY_MAX) {
|
||||
return;
|
||||
@@ -384,8 +317,7 @@ export async function tryFastAbortFromMessage(params: {
|
||||
: undefined;
|
||||
if (entry && key) {
|
||||
entry.abortedLastRun = true;
|
||||
entry.abortCutoffMessageSid = abortCutoff?.messageSid;
|
||||
entry.abortCutoffTimestamp = abortCutoff?.timestamp;
|
||||
applyAbortCutoffToSessionEntry(entry, abortCutoff);
|
||||
entry.updatedAt = Date.now();
|
||||
store[key] = entry;
|
||||
await updateSessionStore(storePath, (nextStore) => {
|
||||
@@ -394,8 +326,7 @@ export async function tryFastAbortFromMessage(params: {
|
||||
return;
|
||||
}
|
||||
nextEntry.abortedLastRun = true;
|
||||
nextEntry.abortCutoffMessageSid = abortCutoff?.messageSid;
|
||||
nextEntry.abortCutoffTimestamp = abortCutoff?.timestamp;
|
||||
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
|
||||
nextEntry.updatedAt = Date.now();
|
||||
nextStore[key] = nextEntry;
|
||||
});
|
||||
|
||||
172
src/auto-reply/reply/commands-session-abort.ts
Normal file
172
src/auto-reply/reply/commands-session-abort.ts
Normal file
@@ -0,0 +1,172 @@
|
||||
import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
||||
import {
|
||||
resolveAbortCutoffFromContext,
|
||||
shouldPersistAbortCutoff,
|
||||
type AbortCutoff,
|
||||
} from "./abort-cutoff.js";
|
||||
import {
|
||||
formatAbortReplyText,
|
||||
isAbortTrigger,
|
||||
resolveSessionEntryForKey,
|
||||
setAbortMemory,
|
||||
stopSubagentsForRequester,
|
||||
} from "./abort.js";
|
||||
import { persistAbortTargetEntry } from "./commands-session-store.js";
|
||||
import type { CommandHandler } from "./commands-types.js";
|
||||
import { clearSessionQueues } from "./queue.js";
|
||||
|
||||
type AbortTarget = {
|
||||
entry?: SessionEntry;
|
||||
key?: string;
|
||||
sessionId?: string;
|
||||
};
|
||||
|
||||
function resolveAbortTarget(params: {
|
||||
ctx: { CommandTargetSessionKey?: string | null };
|
||||
sessionKey?: string;
|
||||
sessionEntry?: SessionEntry;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
}): AbortTarget {
|
||||
const targetSessionKey = params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey;
|
||||
const { entry, key } = resolveSessionEntryForKey(params.sessionStore, targetSessionKey);
|
||||
if (entry && key) {
|
||||
return { entry, key, sessionId: entry.sessionId };
|
||||
}
|
||||
if (params.sessionEntry && params.sessionKey) {
|
||||
return {
|
||||
entry: params.sessionEntry,
|
||||
key: params.sessionKey,
|
||||
sessionId: params.sessionEntry.sessionId,
|
||||
};
|
||||
}
|
||||
return { entry: undefined, key: targetSessionKey, sessionId: undefined };
|
||||
}
|
||||
|
||||
function resolveAbortCutoffForTarget(params: {
|
||||
ctx: Parameters<CommandHandler>[0]["ctx"];
|
||||
commandSessionKey?: string;
|
||||
targetSessionKey?: string;
|
||||
}): AbortCutoff | undefined {
|
||||
if (
|
||||
!shouldPersistAbortCutoff({
|
||||
commandSessionKey: params.commandSessionKey,
|
||||
targetSessionKey: params.targetSessionKey,
|
||||
})
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return resolveAbortCutoffFromContext(params.ctx);
|
||||
}
|
||||
|
||||
async function applyAbortTarget(params: {
|
||||
abortTarget: AbortTarget;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
abortKey?: string;
|
||||
abortCutoff?: AbortCutoff;
|
||||
}) {
|
||||
const { abortTarget } = params;
|
||||
if (abortTarget.sessionId) {
|
||||
abortEmbeddedPiRun(abortTarget.sessionId);
|
||||
}
|
||||
|
||||
const persisted = await persistAbortTargetEntry({
|
||||
entry: abortTarget.entry,
|
||||
key: abortTarget.key,
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortCutoff: params.abortCutoff,
|
||||
});
|
||||
if (!persisted && params.abortKey) {
|
||||
setAbortMemory(params.abortKey, true);
|
||||
}
|
||||
}
|
||||
|
||||
export const handleStopCommand: CommandHandler = async (params, allowTextCommands) => {
|
||||
if (!allowTextCommands) {
|
||||
return null;
|
||||
}
|
||||
if (params.command.commandBodyNormalized !== "/stop") {
|
||||
return null;
|
||||
}
|
||||
if (!params.command.isAuthorizedSender) {
|
||||
logVerbose(
|
||||
`Ignoring /stop from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
|
||||
);
|
||||
return { shouldContinue: false };
|
||||
}
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx: params.ctx,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionEntry: params.sessionEntry,
|
||||
sessionStore: params.sessionStore,
|
||||
});
|
||||
const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]);
|
||||
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
|
||||
logVerbose(
|
||||
`stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
|
||||
);
|
||||
}
|
||||
await applyAbortTarget({
|
||||
abortTarget,
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff: resolveAbortCutoffForTarget({
|
||||
ctx: params.ctx,
|
||||
commandSessionKey: params.sessionKey,
|
||||
targetSessionKey: abortTarget.key,
|
||||
}),
|
||||
});
|
||||
|
||||
// Trigger internal hook for stop command
|
||||
const hookEvent = createInternalHookEvent(
|
||||
"command",
|
||||
"stop",
|
||||
abortTarget.key ?? params.sessionKey ?? "",
|
||||
{
|
||||
sessionEntry: abortTarget.entry ?? params.sessionEntry,
|
||||
sessionId: abortTarget.sessionId,
|
||||
commandSource: params.command.surface,
|
||||
senderId: params.command.senderId,
|
||||
},
|
||||
);
|
||||
await triggerInternalHook(hookEvent);
|
||||
|
||||
const { stopped } = stopSubagentsForRequester({
|
||||
cfg: params.cfg,
|
||||
requesterSessionKey: abortTarget.key ?? params.sessionKey,
|
||||
});
|
||||
|
||||
return { shouldContinue: false, reply: { text: formatAbortReplyText(stopped) } };
|
||||
};
|
||||
|
||||
export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => {
|
||||
if (!allowTextCommands) {
|
||||
return null;
|
||||
}
|
||||
if (!isAbortTrigger(params.command.rawBodyNormalized)) {
|
||||
return null;
|
||||
}
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx: params.ctx,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionEntry: params.sessionEntry,
|
||||
sessionStore: params.sessionStore,
|
||||
});
|
||||
await applyAbortTarget({
|
||||
abortTarget,
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff: resolveAbortCutoffForTarget({
|
||||
ctx: params.ctx,
|
||||
commandSessionKey: params.sessionKey,
|
||||
targetSessionKey: abortTarget.key,
|
||||
}),
|
||||
});
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
};
|
||||
53
src/auto-reply/reply/commands-session-store.ts
Normal file
53
src/auto-reply/reply/commands-session-store.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { updateSessionStore } from "../../config/sessions.js";
|
||||
import { applyAbortCutoffToSessionEntry, type AbortCutoff } from "./abort-cutoff.js";
|
||||
import type { CommandHandler } from "./commands-types.js";
|
||||
|
||||
type CommandParams = Parameters<CommandHandler>[0];
|
||||
|
||||
export async function persistSessionEntry(params: CommandParams): Promise<boolean> {
|
||||
if (!params.sessionEntry || !params.sessionStore || !params.sessionKey) {
|
||||
return false;
|
||||
}
|
||||
params.sessionEntry.updatedAt = Date.now();
|
||||
params.sessionStore[params.sessionKey] = params.sessionEntry;
|
||||
if (params.storePath) {
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
store[params.sessionKey] = params.sessionEntry as SessionEntry;
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
export async function persistAbortTargetEntry(params: {
|
||||
entry?: SessionEntry;
|
||||
key?: string;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
abortCutoff?: AbortCutoff;
|
||||
}): Promise<boolean> {
|
||||
const { entry, key, sessionStore, storePath, abortCutoff } = params;
|
||||
if (!entry || !key || !sessionStore) {
|
||||
return false;
|
||||
}
|
||||
|
||||
entry.abortedLastRun = true;
|
||||
applyAbortCutoffToSessionEntry(entry, abortCutoff);
|
||||
entry.updatedAt = Date.now();
|
||||
sessionStore[key] = entry;
|
||||
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const nextEntry = store[key] ?? entry;
|
||||
if (!nextEntry) {
|
||||
return;
|
||||
}
|
||||
nextEntry.abortedLastRun = true;
|
||||
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
|
||||
nextEntry.updatedAt = Date.now();
|
||||
store[key] = nextEntry;
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -1,52 +1,20 @@
|
||||
import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
|
||||
import { parseDurationMs } from "../../cli/parse-duration.js";
|
||||
import { isRestartEnabled } from "../../config/commands.js";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { updateSessionStore } from "../../config/sessions.js";
|
||||
import {
|
||||
formatThreadBindingTtlLabel,
|
||||
getThreadBindingManager,
|
||||
setThreadBindingTtlBySessionKey,
|
||||
} from "../../discord/monitor/thread-bindings.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
||||
import { scheduleGatewaySigusr1Restart, triggerOpenClawRestart } from "../../infra/restart.js";
|
||||
import { loadCostUsageSummary, loadSessionCostSummary } from "../../infra/session-cost-usage.js";
|
||||
import { formatTokenCount, formatUsd } from "../../utils/usage-format.js";
|
||||
import { parseActivationCommand } from "../group-activation.js";
|
||||
import { parseSendPolicyCommand } from "../send-policy.js";
|
||||
import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js";
|
||||
import {
|
||||
formatAbortReplyText,
|
||||
isAbortTrigger,
|
||||
resolveAbortCutoffFromContext,
|
||||
resolveSessionEntryForKey,
|
||||
setAbortMemory,
|
||||
stopSubagentsForRequester,
|
||||
} from "./abort.js";
|
||||
import { handleAbortTrigger, handleStopCommand } from "./commands-session-abort.js";
|
||||
import { persistSessionEntry } from "./commands-session-store.js";
|
||||
import type { CommandHandler } from "./commands-types.js";
|
||||
import { clearSessionQueues } from "./queue.js";
|
||||
|
||||
function resolveAbortTarget(params: {
|
||||
ctx: { CommandTargetSessionKey?: string | null };
|
||||
sessionKey?: string;
|
||||
sessionEntry?: SessionEntry;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
}) {
|
||||
const targetSessionKey = params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey;
|
||||
const { entry, key } = resolveSessionEntryForKey(params.sessionStore, targetSessionKey);
|
||||
if (entry && key) {
|
||||
return { entry, key, sessionId: entry.sessionId };
|
||||
}
|
||||
if (params.sessionEntry && params.sessionKey) {
|
||||
return {
|
||||
entry: params.sessionEntry,
|
||||
key: params.sessionKey,
|
||||
sessionId: params.sessionEntry.sessionId,
|
||||
};
|
||||
}
|
||||
return { entry: undefined, key: targetSessionKey, sessionId: undefined };
|
||||
}
|
||||
|
||||
const SESSION_COMMAND_PREFIX = "/session";
|
||||
const SESSION_TTL_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]);
|
||||
@@ -95,53 +63,6 @@ function formatSessionExpiry(expiresAt: number) {
|
||||
return new Date(expiresAt).toISOString();
|
||||
}
|
||||
|
||||
async function applyAbortTarget(params: {
|
||||
abortTarget: ReturnType<typeof resolveAbortTarget>;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
abortKey?: string;
|
||||
abortCutoff?: { messageSid?: string; timestamp?: number };
|
||||
}) {
|
||||
const { abortTarget } = params;
|
||||
if (abortTarget.sessionId) {
|
||||
abortEmbeddedPiRun(abortTarget.sessionId);
|
||||
}
|
||||
if (abortTarget.entry && params.sessionStore && abortTarget.key) {
|
||||
abortTarget.entry.abortedLastRun = true;
|
||||
abortTarget.entry.abortCutoffMessageSid = params.abortCutoff?.messageSid;
|
||||
abortTarget.entry.abortCutoffTimestamp = params.abortCutoff?.timestamp;
|
||||
abortTarget.entry.updatedAt = Date.now();
|
||||
params.sessionStore[abortTarget.key] = abortTarget.entry;
|
||||
if (params.storePath) {
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
store[abortTarget.key] = {
|
||||
...abortTarget.entry,
|
||||
abortedLastRun: true,
|
||||
abortCutoffMessageSid: params.abortCutoff?.messageSid,
|
||||
abortCutoffTimestamp: params.abortCutoff?.timestamp,
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
});
|
||||
}
|
||||
} else if (params.abortKey) {
|
||||
setAbortMemory(params.abortKey, true);
|
||||
}
|
||||
}
|
||||
|
||||
async function persistSessionEntry(params: Parameters<CommandHandler>[0]): Promise<boolean> {
|
||||
if (!params.sessionEntry || !params.sessionStore || !params.sessionKey) {
|
||||
return false;
|
||||
}
|
||||
params.sessionEntry.updatedAt = Date.now();
|
||||
params.sessionStore[params.sessionKey] = params.sessionEntry;
|
||||
if (params.storePath) {
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
store[params.sessionKey] = params.sessionEntry as SessionEntry;
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
export const handleActivationCommand: CommandHandler = async (params, allowTextCommands) => {
|
||||
if (!allowTextCommands) {
|
||||
return null;
|
||||
@@ -483,90 +404,4 @@ export const handleRestartCommand: CommandHandler = async (params, allowTextComm
|
||||
};
|
||||
};
|
||||
|
||||
export const handleStopCommand: CommandHandler = async (params, allowTextCommands) => {
|
||||
if (!allowTextCommands) {
|
||||
return null;
|
||||
}
|
||||
if (params.command.commandBodyNormalized !== "/stop") {
|
||||
return null;
|
||||
}
|
||||
if (!params.command.isAuthorizedSender) {
|
||||
logVerbose(
|
||||
`Ignoring /stop from unauthorized sender: ${params.command.senderId || "<unknown>"}`,
|
||||
);
|
||||
return { shouldContinue: false };
|
||||
}
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx: params.ctx,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionEntry: params.sessionEntry,
|
||||
sessionStore: params.sessionStore,
|
||||
});
|
||||
const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]);
|
||||
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
|
||||
logVerbose(
|
||||
`stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
|
||||
);
|
||||
}
|
||||
await applyAbortTarget({
|
||||
abortTarget,
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff:
|
||||
params.sessionKey?.trim() &&
|
||||
abortTarget.key?.trim() &&
|
||||
params.sessionKey.trim() === abortTarget.key.trim()
|
||||
? resolveAbortCutoffFromContext(params.ctx)
|
||||
: undefined,
|
||||
});
|
||||
|
||||
// Trigger internal hook for stop command
|
||||
const hookEvent = createInternalHookEvent(
|
||||
"command",
|
||||
"stop",
|
||||
abortTarget.key ?? params.sessionKey ?? "",
|
||||
{
|
||||
sessionEntry: abortTarget.entry ?? params.sessionEntry,
|
||||
sessionId: abortTarget.sessionId,
|
||||
commandSource: params.command.surface,
|
||||
senderId: params.command.senderId,
|
||||
},
|
||||
);
|
||||
await triggerInternalHook(hookEvent);
|
||||
|
||||
const { stopped } = stopSubagentsForRequester({
|
||||
cfg: params.cfg,
|
||||
requesterSessionKey: abortTarget.key ?? params.sessionKey,
|
||||
});
|
||||
|
||||
return { shouldContinue: false, reply: { text: formatAbortReplyText(stopped) } };
|
||||
};
|
||||
|
||||
export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => {
|
||||
if (!allowTextCommands) {
|
||||
return null;
|
||||
}
|
||||
if (!isAbortTrigger(params.command.rawBodyNormalized)) {
|
||||
return null;
|
||||
}
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx: params.ctx,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionEntry: params.sessionEntry,
|
||||
sessionStore: params.sessionStore,
|
||||
});
|
||||
await applyAbortTarget({
|
||||
abortTarget,
|
||||
sessionStore: params.sessionStore,
|
||||
storePath: params.storePath,
|
||||
abortKey: params.command.abortKey,
|
||||
abortCutoff:
|
||||
params.sessionKey?.trim() &&
|
||||
abortTarget.key?.trim() &&
|
||||
params.sessionKey.trim() === abortTarget.key.trim()
|
||||
? resolveAbortCutoffFromContext(params.ctx)
|
||||
: undefined,
|
||||
});
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
};
|
||||
export { handleAbortTrigger, handleStopCommand };
|
||||
|
||||
@@ -5,7 +5,6 @@ import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js";
|
||||
import { getChannelDock } from "../../channels/dock.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import { updateSessionStore } from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { generateSecureToken } from "../../infra/secure-random.js";
|
||||
import { resolveGatewayMessageChannel } from "../../utils/message-channel.js";
|
||||
@@ -17,7 +16,13 @@ import {
|
||||
import type { MsgContext, TemplateContext } from "../templating.js";
|
||||
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { getAbortMemory, isAbortRequestText, shouldSkipMessageByAbortCutoff } from "./abort.js";
|
||||
import {
|
||||
clearAbortCutoffInSession,
|
||||
readAbortCutoffFromSessionEntry,
|
||||
resolveAbortCutoffFromContext,
|
||||
shouldSkipMessageByAbortCutoff,
|
||||
} from "./abort-cutoff.js";
|
||||
import { getAbortMemory, isAbortRequestText } from "./abort.js";
|
||||
import { buildStatusReply, handleCommands } from "./commands.js";
|
||||
import type { InlineDirectives } from "./directive-handling.js";
|
||||
import { isDirectiveOnly } from "./directive-handling.js";
|
||||
@@ -253,54 +258,29 @@ export async function handleInlineActions(params: {
|
||||
await opts.onBlockReply(reply);
|
||||
};
|
||||
|
||||
const clearAbortCutoff = async () => {
|
||||
if (!sessionEntry || !sessionStore || !sessionKey) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
sessionEntry.abortCutoffMessageSid === undefined &&
|
||||
sessionEntry.abortCutoffTimestamp === undefined
|
||||
) {
|
||||
return;
|
||||
}
|
||||
sessionEntry.abortCutoffMessageSid = undefined;
|
||||
sessionEntry.abortCutoffTimestamp = undefined;
|
||||
sessionEntry.updatedAt = Date.now();
|
||||
sessionStore[sessionKey] = sessionEntry;
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const existing = store[sessionKey] ?? sessionEntry;
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
existing.abortCutoffMessageSid = undefined;
|
||||
existing.abortCutoffTimestamp = undefined;
|
||||
existing.updatedAt = Date.now();
|
||||
store[sessionKey] = existing;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const isStopLikeInbound = isAbortRequestText(command.rawBodyNormalized);
|
||||
if (!isStopLikeInbound && sessionEntry) {
|
||||
const shouldSkip = shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: sessionEntry.abortCutoffMessageSid,
|
||||
cutoffTimestamp: sessionEntry.abortCutoffTimestamp,
|
||||
messageSid:
|
||||
(typeof ctx.MessageSidFull === "string" && ctx.MessageSidFull.trim()) ||
|
||||
(typeof ctx.MessageSid === "string" && ctx.MessageSid.trim()) ||
|
||||
undefined,
|
||||
timestamp: typeof ctx.Timestamp === "number" ? ctx.Timestamp : undefined,
|
||||
});
|
||||
const cutoff = readAbortCutoffFromSessionEntry(sessionEntry);
|
||||
const incoming = resolveAbortCutoffFromContext(ctx);
|
||||
const shouldSkip = cutoff
|
||||
? shouldSkipMessageByAbortCutoff({
|
||||
cutoffMessageSid: cutoff.messageSid,
|
||||
cutoffTimestamp: cutoff.timestamp,
|
||||
messageSid: incoming?.messageSid,
|
||||
timestamp: incoming?.timestamp,
|
||||
})
|
||||
: false;
|
||||
if (shouldSkip) {
|
||||
typing.cleanup();
|
||||
return { kind: "reply", reply: undefined };
|
||||
}
|
||||
if (
|
||||
sessionEntry.abortCutoffMessageSid !== undefined ||
|
||||
sessionEntry.abortCutoffTimestamp !== undefined
|
||||
) {
|
||||
await clearAbortCutoff();
|
||||
if (cutoff) {
|
||||
await clearAbortCutoffInSession({
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
sessionKey,
|
||||
storePath,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
49
src/cron/service/timeout-policy.test.ts
Normal file
49
src/cron/service/timeout-policy.test.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { CronJob } from "../types.js";
|
||||
import {
|
||||
AGENT_TURN_SAFETY_TIMEOUT_MS,
|
||||
DEFAULT_JOB_TIMEOUT_MS,
|
||||
resolveCronJobTimeoutMs,
|
||||
} from "./timeout-policy.js";
|
||||
|
||||
function makeJob(payload: CronJob["payload"]): CronJob {
|
||||
const sessionTarget = payload.kind === "agentTurn" ? "isolated" : "main";
|
||||
return {
|
||||
id: "job-1",
|
||||
name: "job",
|
||||
createdAtMs: 0,
|
||||
updatedAtMs: 0,
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget,
|
||||
wakeMode: "next-heartbeat",
|
||||
payload,
|
||||
state: {},
|
||||
};
|
||||
}
|
||||
|
||||
describe("timeout-policy", () => {
|
||||
it("uses default timeout for non-agent jobs", () => {
|
||||
const timeout = resolveCronJobTimeoutMs(makeJob({ kind: "systemEvent", text: "hello" }));
|
||||
expect(timeout).toBe(DEFAULT_JOB_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("uses expanded safety timeout for agentTurn jobs without explicit timeout", () => {
|
||||
const timeout = resolveCronJobTimeoutMs(makeJob({ kind: "agentTurn", message: "hi" }));
|
||||
expect(timeout).toBe(AGENT_TURN_SAFETY_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("disables timeout when timeoutSeconds <= 0", () => {
|
||||
const timeout = resolveCronJobTimeoutMs(
|
||||
makeJob({ kind: "agentTurn", message: "hi", timeoutSeconds: 0 }),
|
||||
);
|
||||
expect(timeout).toBeUndefined();
|
||||
});
|
||||
|
||||
it("applies explicit timeoutSeconds when positive", () => {
|
||||
const timeout = resolveCronJobTimeoutMs(
|
||||
makeJob({ kind: "agentTurn", message: "hi", timeoutSeconds: 1.9 }),
|
||||
);
|
||||
expect(timeout).toBe(1_900);
|
||||
});
|
||||
});
|
||||
25
src/cron/service/timeout-policy.ts
Normal file
25
src/cron/service/timeout-policy.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import type { CronJob } from "../types.js";
|
||||
|
||||
/**
|
||||
* Maximum wall-clock time for a single job execution. Acts as a safety net
|
||||
* on top of per-provider/per-agent timeouts to prevent one stuck job from
|
||||
* wedging the entire cron lane.
|
||||
*/
|
||||
export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
|
||||
|
||||
/**
|
||||
* Agent turns can legitimately run much longer than generic cron jobs.
|
||||
* Use a larger safety ceiling when no explicit timeout is set.
|
||||
*/
|
||||
export const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes
|
||||
|
||||
export function resolveCronJobTimeoutMs(job: CronJob): number | undefined {
|
||||
const configuredTimeoutMs =
|
||||
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
|
||||
? Math.floor(job.payload.timeoutSeconds * 1_000)
|
||||
: undefined;
|
||||
if (configuredTimeoutMs === undefined) {
|
||||
return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS;
|
||||
}
|
||||
return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs;
|
||||
}
|
||||
@@ -18,6 +18,9 @@ import {
|
||||
import { locked } from "./locked.js";
|
||||
import type { CronEvent, CronServiceState } from "./state.js";
|
||||
import { ensureLoaded, persist } from "./store.js";
|
||||
import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-policy.js";
|
||||
|
||||
export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js";
|
||||
|
||||
const MAX_TIMER_DELAY_MS = 60_000;
|
||||
|
||||
@@ -30,14 +33,6 @@ const MAX_TIMER_DELAY_MS = 60_000;
|
||||
*/
|
||||
const MIN_REFIRE_GAP_MS = 2_000;
|
||||
|
||||
/**
|
||||
* Maximum wall-clock time for a single job execution. Acts as a safety net
|
||||
* on top of the per-provider / per-agent timeouts to prevent one stuck job
|
||||
* from wedging the entire cron lane.
|
||||
*/
|
||||
export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
|
||||
const AGENT_TURN_SAFETY_TIMEOUT_MS = 60 * 60_000; // 60 minutes
|
||||
|
||||
type TimedCronRunOutcome = CronRunOutcome &
|
||||
CronRunTelemetry & {
|
||||
jobId: string;
|
||||
@@ -47,17 +42,6 @@ type TimedCronRunOutcome = CronRunOutcome &
|
||||
endedAt: number;
|
||||
};
|
||||
|
||||
function resolveCronJobTimeoutMs(job: CronJob): number | undefined {
|
||||
const configuredTimeoutMs =
|
||||
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
|
||||
? Math.floor(job.payload.timeoutSeconds * 1_000)
|
||||
: undefined;
|
||||
if (configuredTimeoutMs === undefined) {
|
||||
return job.payload.kind === "agentTurn" ? AGENT_TURN_SAFETY_TIMEOUT_MS : DEFAULT_JOB_TIMEOUT_MS;
|
||||
}
|
||||
return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs;
|
||||
}
|
||||
|
||||
export async function executeJobCoreWithTimeout(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
|
||||
Reference in New Issue
Block a user