mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
refactor: split subagent registry helpers
This commit is contained in:
355
src/agents/subagent-registry-helpers.ts
Normal file
355
src/agents/subagent-registry-helpers.ts
Normal file
@@ -0,0 +1,355 @@
|
||||
import { promises as fs } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveStorePath,
|
||||
updateSessionStore,
|
||||
type SessionEntry,
|
||||
} from "../config/sessions.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { type SubagentRunOutcome } from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_REASON_ERROR,
|
||||
SUBAGENT_ENDED_REASON_KILLED,
|
||||
} from "./subagent-lifecycle-events.js";
|
||||
import { runOutcomesEqual } from "./subagent-registry-completion.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
export const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000;
|
||||
export const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000;
|
||||
export const MAX_ANNOUNCE_RETRY_COUNT = 3;
|
||||
export const ANNOUNCE_EXPIRY_MS = 5 * 60_000;
|
||||
export const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000;
|
||||
|
||||
const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024;
|
||||
|
||||
export type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
||||
|
||||
export function capFrozenResultText(resultText: string): string {
|
||||
const trimmed = resultText.trim();
|
||||
if (!trimmed) {
|
||||
return "";
|
||||
}
|
||||
const totalBytes = Buffer.byteLength(trimmed, "utf8");
|
||||
if (totalBytes <= FROZEN_RESULT_TEXT_MAX_BYTES) {
|
||||
return trimmed;
|
||||
}
|
||||
const notice = `\n\n[truncated: frozen completion output exceeded ${Math.round(FROZEN_RESULT_TEXT_MAX_BYTES / 1024)}KB (${Math.round(totalBytes / 1024)}KB)]`;
|
||||
const maxPayloadBytes = Math.max(
|
||||
0,
|
||||
FROZEN_RESULT_TEXT_MAX_BYTES - Buffer.byteLength(notice, "utf8"),
|
||||
);
|
||||
const payload = Buffer.from(trimmed, "utf8").subarray(0, maxPayloadBytes).toString("utf8");
|
||||
return `${payload}${notice}`;
|
||||
}
|
||||
|
||||
export function resolveAnnounceRetryDelayMs(retryCount: number) {
|
||||
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
||||
// retryCount is "attempts already made", so retry #1 waits 1s, then 2s, 4s...
|
||||
const backoffExponent = Math.max(0, boundedRetryCount - 1);
|
||||
const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** backoffExponent;
|
||||
return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
export function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
|
||||
const retryCount = entry.announceRetryCount ?? 0;
|
||||
const endedAgoMs =
|
||||
typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined;
|
||||
const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a";
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}`,
|
||||
);
|
||||
}
|
||||
|
||||
function findSessionEntryByKey(store: Record<string, SessionEntry>, sessionKey: string) {
|
||||
const direct = store[sessionKey];
|
||||
if (direct) {
|
||||
return direct;
|
||||
}
|
||||
const normalized = sessionKey.toLowerCase();
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
if (key.toLowerCase() === normalized) {
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function resolveSubagentSessionStatus(
|
||||
entry: Pick<SubagentRunRecord, "endedAt" | "endedReason" | "outcome"> | null | undefined,
|
||||
): SessionEntry["status"] {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
if (!entry.endedAt) {
|
||||
return "running";
|
||||
}
|
||||
if (entry.endedReason === SUBAGENT_ENDED_REASON_KILLED) {
|
||||
return "killed";
|
||||
}
|
||||
const status = entry.outcome?.status;
|
||||
if (status === "error") {
|
||||
return "failed";
|
||||
}
|
||||
if (status === "timeout") {
|
||||
return "timeout";
|
||||
}
|
||||
return "done";
|
||||
}
|
||||
|
||||
function resolveSubagentSessionStartedAt(
|
||||
entry: Pick<SubagentRunRecord, "sessionStartedAt" | "startedAt" | "createdAt">,
|
||||
): number | undefined {
|
||||
if (typeof entry.sessionStartedAt === "number" && Number.isFinite(entry.sessionStartedAt)) {
|
||||
return entry.sessionStartedAt;
|
||||
}
|
||||
if (typeof entry.startedAt === "number" && Number.isFinite(entry.startedAt)) {
|
||||
return entry.startedAt;
|
||||
}
|
||||
return typeof entry.createdAt === "number" && Number.isFinite(entry.createdAt)
|
||||
? entry.createdAt
|
||||
: undefined;
|
||||
}
|
||||
|
||||
export function getSubagentSessionStartedAt(
|
||||
entry: Pick<SubagentRunRecord, "sessionStartedAt" | "startedAt" | "createdAt"> | null | undefined,
|
||||
): number | undefined {
|
||||
return entry ? resolveSubagentSessionStartedAt(entry) : undefined;
|
||||
}
|
||||
|
||||
export function getSubagentSessionRuntimeMs(
|
||||
entry:
|
||||
| Pick<SubagentRunRecord, "startedAt" | "endedAt" | "accumulatedRuntimeMs">
|
||||
| null
|
||||
| undefined,
|
||||
now = Date.now(),
|
||||
): number | undefined {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const accumulatedRuntimeMs =
|
||||
typeof entry.accumulatedRuntimeMs === "number" && Number.isFinite(entry.accumulatedRuntimeMs)
|
||||
? Math.max(0, entry.accumulatedRuntimeMs)
|
||||
: 0;
|
||||
|
||||
if (typeof entry.startedAt !== "number" || !Number.isFinite(entry.startedAt)) {
|
||||
return entry.accumulatedRuntimeMs != null ? accumulatedRuntimeMs : undefined;
|
||||
}
|
||||
|
||||
const currentRunEndedAt =
|
||||
typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt) ? entry.endedAt : now;
|
||||
return Math.max(0, accumulatedRuntimeMs + Math.max(0, currentRunEndedAt - entry.startedAt));
|
||||
}
|
||||
|
||||
export async function persistSubagentSessionTiming(entry: SubagentRunRecord) {
|
||||
const childSessionKey = entry.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const startedAt = getSubagentSessionStartedAt(entry);
|
||||
const endedAt =
|
||||
typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt) ? entry.endedAt : undefined;
|
||||
const runtimeMs =
|
||||
endedAt !== undefined
|
||||
? getSubagentSessionRuntimeMs(entry, endedAt)
|
||||
: getSubagentSessionRuntimeMs(entry);
|
||||
const status = resolveSubagentSessionStatus(entry);
|
||||
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
||||
if (!sessionEntry) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (typeof startedAt === "number" && Number.isFinite(startedAt)) {
|
||||
sessionEntry.startedAt = startedAt;
|
||||
} else {
|
||||
delete sessionEntry.startedAt;
|
||||
}
|
||||
|
||||
if (typeof endedAt === "number" && Number.isFinite(endedAt)) {
|
||||
sessionEntry.endedAt = endedAt;
|
||||
} else {
|
||||
delete sessionEntry.endedAt;
|
||||
}
|
||||
|
||||
if (typeof runtimeMs === "number" && Number.isFinite(runtimeMs)) {
|
||||
sessionEntry.runtimeMs = runtimeMs;
|
||||
} else {
|
||||
delete sessionEntry.runtimeMs;
|
||||
}
|
||||
|
||||
if (status) {
|
||||
sessionEntry.status = status;
|
||||
} else {
|
||||
delete sessionEntry.status;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function resolveSubagentRunOrphanReason(params: {
|
||||
entry: SubagentRunRecord;
|
||||
storeCache?: Map<string, Record<string, SessionEntry>>;
|
||||
}): SubagentRunOrphanReason | null {
|
||||
const childSessionKey = params.entry.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
return "missing-session-entry";
|
||||
}
|
||||
try {
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
let store = params.storeCache?.get(storePath);
|
||||
if (!store) {
|
||||
store = loadSessionStore(storePath);
|
||||
params.storeCache?.set(storePath, store);
|
||||
}
|
||||
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
||||
if (!sessionEntry) {
|
||||
return "missing-session-entry";
|
||||
}
|
||||
if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) {
|
||||
return "missing-session-id";
|
||||
}
|
||||
return null;
|
||||
} catch {
|
||||
// Best-effort guard: avoid false orphan pruning on transient read/config failures.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function safeRemoveAttachmentsDir(entry: SubagentRunRecord): Promise<void> {
|
||||
if (!entry.attachmentsDir || !entry.attachmentsRootDir) {
|
||||
return;
|
||||
}
|
||||
|
||||
const resolveReal = async (targetPath: string): Promise<string | null> => {
|
||||
try {
|
||||
return await fs.realpath(targetPath);
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException | undefined)?.code === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const [rootReal, dirReal] = await Promise.all([
|
||||
resolveReal(entry.attachmentsRootDir),
|
||||
resolveReal(entry.attachmentsDir),
|
||||
]);
|
||||
if (!dirReal) {
|
||||
return;
|
||||
}
|
||||
|
||||
const rootBase = rootReal ?? path.resolve(entry.attachmentsRootDir);
|
||||
const dirBase = dirReal;
|
||||
const rootWithSep = rootBase.endsWith(path.sep) ? rootBase : `${rootBase}${path.sep}`;
|
||||
if (!dirBase.startsWith(rootWithSep)) {
|
||||
return;
|
||||
}
|
||||
await fs.rm(dirBase, { recursive: true, force: true });
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
export function reconcileOrphanedRun(params: {
|
||||
runId: string;
|
||||
entry: SubagentRunRecord;
|
||||
reason: SubagentRunOrphanReason;
|
||||
source: "restore" | "resume";
|
||||
runs: Map<string, SubagentRunRecord>;
|
||||
resumedRuns: Set<string>;
|
||||
}) {
|
||||
const now = Date.now();
|
||||
let changed = false;
|
||||
if (typeof params.entry.endedAt !== "number") {
|
||||
params.entry.endedAt = now;
|
||||
changed = true;
|
||||
}
|
||||
const orphanOutcome: SubagentRunOutcome = {
|
||||
status: "error",
|
||||
error: `orphaned subagent run (${params.reason})`,
|
||||
};
|
||||
if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) {
|
||||
params.entry.outcome = orphanOutcome;
|
||||
changed = true;
|
||||
}
|
||||
if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) {
|
||||
params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR;
|
||||
changed = true;
|
||||
}
|
||||
if (params.entry.cleanupHandled !== true) {
|
||||
params.entry.cleanupHandled = true;
|
||||
changed = true;
|
||||
}
|
||||
if (typeof params.entry.cleanupCompletedAt !== "number") {
|
||||
params.entry.cleanupCompletedAt = now;
|
||||
changed = true;
|
||||
}
|
||||
const shouldDeleteAttachments =
|
||||
params.entry.cleanup === "delete" || !params.entry.retainAttachmentsOnKeep;
|
||||
if (shouldDeleteAttachments) {
|
||||
void safeRemoveAttachmentsDir(params.entry);
|
||||
}
|
||||
const removed = params.runs.delete(params.runId);
|
||||
params.resumedRuns.delete(params.runId);
|
||||
if (!removed && !changed) {
|
||||
return false;
|
||||
}
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
export function reconcileOrphanedRestoredRuns(params: {
|
||||
runs: Map<string, SubagentRunRecord>;
|
||||
resumedRuns: Set<string>;
|
||||
}) {
|
||||
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||
let changed = false;
|
||||
for (const [runId, entry] of params.runs.entries()) {
|
||||
const orphanReason = resolveSubagentRunOrphanReason({
|
||||
entry,
|
||||
storeCache,
|
||||
});
|
||||
if (!orphanReason) {
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
reconcileOrphanedRun({
|
||||
runId,
|
||||
entry,
|
||||
reason: orphanReason,
|
||||
source: "restore",
|
||||
runs: params.runs,
|
||||
resumedRuns: params.resumedRuns,
|
||||
})
|
||||
) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
export function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
||||
const config = cfg ?? loadConfig();
|
||||
const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
|
||||
if (!Number.isFinite(minutes) || minutes < 0) {
|
||||
return undefined;
|
||||
}
|
||||
if (minutes === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.max(1, Math.floor(minutes)) * 60_000;
|
||||
}
|
||||
@@ -1,14 +1,5 @@
|
||||
import { promises as fs } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveStorePath,
|
||||
updateSessionStore,
|
||||
type SessionEntry,
|
||||
} from "../config/sessions.js";
|
||||
import { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
||||
import { resolveContextEngine } from "../context-engine/registry.js";
|
||||
import type { SubagentEndReason } from "../context-engine/types.js";
|
||||
@@ -41,6 +32,24 @@ import {
|
||||
resolveLifecycleOutcomeFromRunOutcome,
|
||||
runOutcomesEqual,
|
||||
} from "./subagent-registry-completion.js";
|
||||
import {
|
||||
ANNOUNCE_COMPLETION_HARD_EXPIRY_MS,
|
||||
ANNOUNCE_EXPIRY_MS,
|
||||
capFrozenResultText,
|
||||
getSubagentSessionRuntimeMs,
|
||||
getSubagentSessionStartedAt,
|
||||
logAnnounceGiveUp,
|
||||
MAX_ANNOUNCE_RETRY_COUNT,
|
||||
MIN_ANNOUNCE_RETRY_DELAY_MS,
|
||||
persistSubagentSessionTiming,
|
||||
reconcileOrphanedRestoredRuns,
|
||||
reconcileOrphanedRun,
|
||||
resolveAnnounceRetryDelayMs,
|
||||
resolveArchiveAfterMs,
|
||||
resolveSubagentRunOrphanReason,
|
||||
resolveSubagentSessionStatus,
|
||||
safeRemoveAttachmentsDir,
|
||||
} from "./subagent-registry-helpers.js";
|
||||
import { subagentRuns } from "./subagent-registry-memory.js";
|
||||
import {
|
||||
countActiveDescendantRunsFromRuns,
|
||||
@@ -63,6 +72,11 @@ import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { resolveAgentTimeoutMs } from "./timeout.js";
|
||||
|
||||
export type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
export {
|
||||
getSubagentSessionRuntimeMs,
|
||||
getSubagentSessionStartedAt,
|
||||
resolveSubagentSessionStatus,
|
||||
} from "./subagent-registry-helpers.js";
|
||||
const log = createSubsystemLogger("agents/subagent-registry");
|
||||
|
||||
let sweeper: NodeJS.Timeout | null = null;
|
||||
@@ -71,263 +85,17 @@ let listenerStop: (() => void) | null = null;
|
||||
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
|
||||
var restoreAttempted = false;
|
||||
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
||||
const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000;
|
||||
const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000;
|
||||
/**
|
||||
* Maximum number of announce delivery attempts before giving up.
|
||||
* Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly
|
||||
* returns `false` due to stale state or transient conditions (#18264).
|
||||
*/
|
||||
const MAX_ANNOUNCE_RETRY_COUNT = 3;
|
||||
/**
|
||||
* Non-completion announce entries older than this are force-expired even if
|
||||
* delivery never succeeded.
|
||||
*/
|
||||
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
|
||||
/**
|
||||
* Completion-message flows can wait for descendants to finish, but this hard
|
||||
* cap prevents indefinite pending state when descendants never fully settle.
|
||||
*/
|
||||
const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000; // 30 minutes
|
||||
type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
||||
/**
|
||||
* Embedded runs can emit transient lifecycle `error` events while provider/model
|
||||
* retry is still in progress. Defer terminal error cleanup briefly so a
|
||||
* subsequent lifecycle `start` / `end` can cancel premature failure announces.
|
||||
*/
|
||||
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024;
|
||||
|
||||
function capFrozenResultText(resultText: string): string {
|
||||
const trimmed = resultText.trim();
|
||||
if (!trimmed) {
|
||||
return "";
|
||||
}
|
||||
const totalBytes = Buffer.byteLength(trimmed, "utf8");
|
||||
if (totalBytes <= FROZEN_RESULT_TEXT_MAX_BYTES) {
|
||||
return trimmed;
|
||||
}
|
||||
const notice = `\n\n[truncated: frozen completion output exceeded ${Math.round(FROZEN_RESULT_TEXT_MAX_BYTES / 1024)}KB (${Math.round(totalBytes / 1024)}KB)]`;
|
||||
const maxPayloadBytes = Math.max(
|
||||
0,
|
||||
FROZEN_RESULT_TEXT_MAX_BYTES - Buffer.byteLength(notice, "utf8"),
|
||||
);
|
||||
const payload = Buffer.from(trimmed, "utf8").subarray(0, maxPayloadBytes).toString("utf8");
|
||||
return `${payload}${notice}`;
|
||||
}
|
||||
|
||||
function resolveAnnounceRetryDelayMs(retryCount: number) {
|
||||
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
||||
// retryCount is "attempts already made", so retry #1 waits 1s, then 2s, 4s...
|
||||
const backoffExponent = Math.max(0, boundedRetryCount - 1);
|
||||
const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** backoffExponent;
|
||||
return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
|
||||
const retryCount = entry.announceRetryCount ?? 0;
|
||||
const endedAgoMs =
|
||||
typeof entry.endedAt === "number" ? Math.max(0, Date.now() - entry.endedAt) : undefined;
|
||||
const endedAgoLabel = endedAgoMs != null ? `${Math.round(endedAgoMs / 1000)}s` : "n/a";
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}`,
|
||||
);
|
||||
}
|
||||
|
||||
function persistSubagentRuns() {
|
||||
persistSubagentRunsToDisk(subagentRuns);
|
||||
}
|
||||
|
||||
function findSessionEntryByKey(store: Record<string, SessionEntry>, sessionKey: string) {
|
||||
const direct = store[sessionKey];
|
||||
if (direct) {
|
||||
return direct;
|
||||
}
|
||||
const normalized = sessionKey.toLowerCase();
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
if (key.toLowerCase() === normalized) {
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function resolveSubagentSessionStatus(
|
||||
entry: Pick<SubagentRunRecord, "endedAt" | "endedReason" | "outcome"> | null | undefined,
|
||||
): SessionEntry["status"] {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
if (!entry.endedAt) {
|
||||
return "running";
|
||||
}
|
||||
if (entry.endedReason === SUBAGENT_ENDED_REASON_KILLED) {
|
||||
return "killed";
|
||||
}
|
||||
const status = entry.outcome?.status;
|
||||
if (status === "error") {
|
||||
return "failed";
|
||||
}
|
||||
if (status === "timeout") {
|
||||
return "timeout";
|
||||
}
|
||||
return "done";
|
||||
}
|
||||
|
||||
async function persistSubagentSessionTiming(entry: SubagentRunRecord) {
|
||||
const childSessionKey = entry.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const startedAt = getSubagentSessionStartedAt(entry);
|
||||
const endedAt =
|
||||
typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt) ? entry.endedAt : undefined;
|
||||
const runtimeMs =
|
||||
endedAt !== undefined
|
||||
? getSubagentSessionRuntimeMs(entry, endedAt)
|
||||
: getSubagentSessionRuntimeMs(entry);
|
||||
const status = resolveSubagentSessionStatus(entry);
|
||||
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
||||
if (!sessionEntry) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (typeof startedAt === "number" && Number.isFinite(startedAt)) {
|
||||
sessionEntry.startedAt = startedAt;
|
||||
} else {
|
||||
delete sessionEntry.startedAt;
|
||||
}
|
||||
|
||||
if (typeof endedAt === "number" && Number.isFinite(endedAt)) {
|
||||
sessionEntry.endedAt = endedAt;
|
||||
} else {
|
||||
delete sessionEntry.endedAt;
|
||||
}
|
||||
|
||||
if (typeof runtimeMs === "number" && Number.isFinite(runtimeMs)) {
|
||||
sessionEntry.runtimeMs = runtimeMs;
|
||||
} else {
|
||||
delete sessionEntry.runtimeMs;
|
||||
}
|
||||
|
||||
if (status) {
|
||||
sessionEntry.status = status;
|
||||
} else {
|
||||
delete sessionEntry.status;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function resolveSubagentRunOrphanReason(params: {
|
||||
entry: SubagentRunRecord;
|
||||
storeCache?: Map<string, Record<string, SessionEntry>>;
|
||||
}): SubagentRunOrphanReason | null {
|
||||
const childSessionKey = params.entry.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
return "missing-session-entry";
|
||||
}
|
||||
try {
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
let store = params.storeCache?.get(storePath);
|
||||
if (!store) {
|
||||
store = loadSessionStore(storePath);
|
||||
params.storeCache?.set(storePath, store);
|
||||
}
|
||||
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
||||
if (!sessionEntry) {
|
||||
return "missing-session-entry";
|
||||
}
|
||||
if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) {
|
||||
return "missing-session-id";
|
||||
}
|
||||
return null;
|
||||
} catch {
|
||||
// Best-effort guard: avoid false orphan pruning on transient read/config failures.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function reconcileOrphanedRun(params: {
|
||||
runId: string;
|
||||
entry: SubagentRunRecord;
|
||||
reason: SubagentRunOrphanReason;
|
||||
source: "restore" | "resume";
|
||||
}) {
|
||||
const now = Date.now();
|
||||
let changed = false;
|
||||
if (typeof params.entry.endedAt !== "number") {
|
||||
params.entry.endedAt = now;
|
||||
changed = true;
|
||||
}
|
||||
const orphanOutcome: SubagentRunOutcome = {
|
||||
status: "error",
|
||||
error: `orphaned subagent run (${params.reason})`,
|
||||
};
|
||||
if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) {
|
||||
params.entry.outcome = orphanOutcome;
|
||||
changed = true;
|
||||
}
|
||||
if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) {
|
||||
params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR;
|
||||
changed = true;
|
||||
}
|
||||
if (params.entry.cleanupHandled !== true) {
|
||||
params.entry.cleanupHandled = true;
|
||||
changed = true;
|
||||
}
|
||||
if (typeof params.entry.cleanupCompletedAt !== "number") {
|
||||
params.entry.cleanupCompletedAt = now;
|
||||
changed = true;
|
||||
}
|
||||
const shouldDeleteAttachments =
|
||||
params.entry.cleanup === "delete" || !params.entry.retainAttachmentsOnKeep;
|
||||
if (shouldDeleteAttachments) {
|
||||
void safeRemoveAttachmentsDir(params.entry);
|
||||
}
|
||||
const removed = subagentRuns.delete(params.runId);
|
||||
resumedRuns.delete(params.runId);
|
||||
if (!removed && !changed) {
|
||||
return false;
|
||||
}
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
function reconcileOrphanedRestoredRuns() {
|
||||
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||
let changed = false;
|
||||
for (const [runId, entry] of subagentRuns.entries()) {
|
||||
const orphanReason = resolveSubagentRunOrphanReason({
|
||||
entry,
|
||||
storeCache,
|
||||
});
|
||||
if (!orphanReason) {
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
reconcileOrphanedRun({
|
||||
runId,
|
||||
entry,
|
||||
reason: orphanReason,
|
||||
source: "restore",
|
||||
})
|
||||
) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
const resumedRuns = new Set<string>();
|
||||
const endedHookInFlightRunIds = new Set<string>();
|
||||
const pendingLifecycleErrorByRunId = new Map<
|
||||
@@ -706,6 +474,8 @@ function resumeSubagentRun(runId: string) {
|
||||
entry,
|
||||
reason: orphanReason,
|
||||
source: "resume",
|
||||
runs: subagentRuns,
|
||||
resumedRuns,
|
||||
})
|
||||
) {
|
||||
persistSubagentRuns();
|
||||
@@ -786,7 +556,12 @@ function restoreSubagentRunsOnce() {
|
||||
if (restoredCount === 0) {
|
||||
return;
|
||||
}
|
||||
if (reconcileOrphanedRestoredRuns()) {
|
||||
if (
|
||||
reconcileOrphanedRestoredRuns({
|
||||
runs: subagentRuns,
|
||||
resumedRuns,
|
||||
})
|
||||
) {
|
||||
persistSubagentRuns();
|
||||
}
|
||||
if (subagentRuns.size === 0) {
|
||||
@@ -818,18 +593,6 @@ function restoreSubagentRunsOnce() {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
||||
const config = cfg ?? loadConfig();
|
||||
const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
|
||||
if (!Number.isFinite(minutes) || minutes < 0) {
|
||||
return undefined;
|
||||
}
|
||||
if (minutes === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.max(1, Math.floor(minutes)) * 60_000;
|
||||
}
|
||||
|
||||
function resolveSubagentWaitTimeoutMs(
|
||||
cfg: ReturnType<typeof loadConfig>,
|
||||
runTimeoutSeconds?: number,
|
||||
@@ -954,44 +717,6 @@ function ensureListener() {
|
||||
});
|
||||
}
|
||||
|
||||
async function safeRemoveAttachmentsDir(entry: SubagentRunRecord): Promise<void> {
|
||||
if (!entry.attachmentsDir || !entry.attachmentsRootDir) {
|
||||
return;
|
||||
}
|
||||
|
||||
const resolveReal = async (targetPath: string): Promise<string | null> => {
|
||||
try {
|
||||
return await fs.realpath(targetPath);
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException | undefined)?.code === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const [rootReal, dirReal] = await Promise.all([
|
||||
resolveReal(entry.attachmentsRootDir),
|
||||
resolveReal(entry.attachmentsDir),
|
||||
]);
|
||||
if (!dirReal) {
|
||||
return;
|
||||
}
|
||||
|
||||
const rootBase = rootReal ?? path.resolve(entry.attachmentsRootDir);
|
||||
// dirReal is guaranteed non-null here (early return above handles null case).
|
||||
const dirBase = dirReal;
|
||||
const rootWithSep = rootBase.endsWith(path.sep) ? rootBase : `${rootBase}${path.sep}`;
|
||||
if (!dirBase.startsWith(rootWithSep)) {
|
||||
return;
|
||||
}
|
||||
await fs.rm(dirBase, { recursive: true, force: true });
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
async function finalizeSubagentCleanup(
|
||||
runId: string,
|
||||
cleanup: "delete" | "keep",
|
||||
@@ -1260,51 +985,6 @@ export function clearSubagentRunSteerRestart(runId: string) {
|
||||
return true;
|
||||
}
|
||||
|
||||
function resolveSubagentSessionStartedAt(
|
||||
entry: Pick<SubagentRunRecord, "sessionStartedAt" | "startedAt" | "createdAt">,
|
||||
): number | undefined {
|
||||
if (typeof entry.sessionStartedAt === "number" && Number.isFinite(entry.sessionStartedAt)) {
|
||||
return entry.sessionStartedAt;
|
||||
}
|
||||
if (typeof entry.startedAt === "number" && Number.isFinite(entry.startedAt)) {
|
||||
return entry.startedAt;
|
||||
}
|
||||
return typeof entry.createdAt === "number" && Number.isFinite(entry.createdAt)
|
||||
? entry.createdAt
|
||||
: undefined;
|
||||
}
|
||||
|
||||
export function getSubagentSessionStartedAt(
|
||||
entry: Pick<SubagentRunRecord, "sessionStartedAt" | "startedAt" | "createdAt"> | null | undefined,
|
||||
): number | undefined {
|
||||
return entry ? resolveSubagentSessionStartedAt(entry) : undefined;
|
||||
}
|
||||
|
||||
export function getSubagentSessionRuntimeMs(
|
||||
entry:
|
||||
| Pick<SubagentRunRecord, "startedAt" | "endedAt" | "accumulatedRuntimeMs">
|
||||
| null
|
||||
| undefined,
|
||||
now = Date.now(),
|
||||
): number | undefined {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const accumulatedRuntimeMs =
|
||||
typeof entry.accumulatedRuntimeMs === "number" && Number.isFinite(entry.accumulatedRuntimeMs)
|
||||
? Math.max(0, entry.accumulatedRuntimeMs)
|
||||
: 0;
|
||||
|
||||
if (typeof entry.startedAt !== "number" || !Number.isFinite(entry.startedAt)) {
|
||||
return entry.accumulatedRuntimeMs != null ? accumulatedRuntimeMs : undefined;
|
||||
}
|
||||
|
||||
const currentRunEndedAt =
|
||||
typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt) ? entry.endedAt : now;
|
||||
return Math.max(0, accumulatedRuntimeMs + Math.max(0, currentRunEndedAt - entry.startedAt));
|
||||
}
|
||||
|
||||
export function replaceSubagentRunAfterSteer(params: {
|
||||
previousRunId: string;
|
||||
nextRunId: string;
|
||||
@@ -1347,7 +1027,7 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0;
|
||||
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
|
||||
const preserveFrozenResultFallback = params.preserveFrozenResultFallback === true;
|
||||
const sessionStartedAt = resolveSubagentSessionStartedAt(source) ?? now;
|
||||
const sessionStartedAt = getSubagentSessionStartedAt(source) ?? now;
|
||||
const accumulatedRuntimeMs =
|
||||
getSubagentSessionRuntimeMs(
|
||||
source,
|
||||
|
||||
Reference in New Issue
Block a user