mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 13:13:06 +00:00
fix(gateway): add TTL cleanup for 3 Maps that grow unbounded causing OOM (#52731)
Merged via squash.
Prepared head SHA: 4816a29de5
Co-authored-by: artwalker <44759507+artwalker@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
16
CHANGELOG.md
16
CHANGELOG.md
@@ -33,6 +33,22 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/tailscale: start Tailscale exposure and the gateway update check before awaiting channel and plugin sidecar startup so remote operators are not locked out when startup sidecars stall.
|
||||
- QQBot/streaming: make block streaming configurable per QQ bot account via `streaming.mode` (`"partial"` | `"off"`, default `"partial"`) instead of hardcoding it off, so responses can be delivered incrementally. (#63746)
|
||||
- Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky.
|
||||
- Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) Thanks @Daanvdplas and @gumadeiras.
|
||||
- Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman.
|
||||
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
|
||||
- Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) Thanks @jalehman.
|
||||
- `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) Thanks @ImLukeF.
|
||||
- Gateway/session reset: emit the typed `before_reset` hook for gateway `/new` and `/reset`, preserving reset-hook behavior even when the previous transcript has already been archived. (#53872) Thanks @VACInc.
|
||||
- Plugins/commands: pass the active host `sessionKey` into plugin command contexts, and include `sessionId` when it is already available from the active session entry, so bundled and third-party commands can resolve the current conversation reliably. (#59044) Thanks @jalehman.
|
||||
- Agents/auth: honor `models.providers.*.authHeader` for pi embedded runner model requests by injecting `Authorization: Bearer <apiKey>` when requested. (#54390) Thanks @lndyzwdxhs.
|
||||
- UI/compaction: keep the compaction indicator in a retry-pending state until the run actually finishes, so the UI does not show `Context compacted` prematurely. (#55132) Thanks @mpz4life.
|
||||
- Cron/tool schemas: keep cron tool schemas strict-model-friendly while still preserving `failureAlert=false`, nullable `agentId`/`sessionKey`, and flattened add/update recovery for the newly exposed cron job fields. (#55043) Thanks @brunolorente.
|
||||
- BlueBubbles/config: accept `enrichGroupParticipantsFromContacts` in the core strict config schema so gateways no longer fail validation or startup when the BlueBubbles plugin writes that field. (#56889) Thanks @zqchris.
|
||||
- Agents/failover: classify AbortError and stream-abort messages as timeout so Ollama NDJSON stream aborts stop showing `reason=unknown` in model fallback logs. (#58324) Thanks @yelog.
|
||||
- Exec approvals: route Slack, Discord, and Telegram approvals through the shared channel approval-capability path so native approval auth, delivery, and `/approve` handling stay aligned across channels while preserving Telegram session-key agent filtering. (#58634) Thanks @gumadeiras.
|
||||
- Matrix/runtime: resolve the verification/bootstrap runtime from a distinct packaged Matrix entry so global npm installs stop failing on crypto bootstrap with missing-module or recursive runtime alias errors. (#59249) Thanks @gumadeiras.
|
||||
- Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) Thanks @gumadeiras.
|
||||
- Gateway/agents: fix stale run-context TTL cleanup so the new maintenance sweep compiles and resets orphaned run sequence state correctly. (#52731) Thanks @artwalker.
|
||||
|
||||
## 2026.4.9
|
||||
|
||||
|
||||
@@ -248,9 +248,8 @@ export function createSubagentRunManager(params: {
|
||||
params.runs.set(nextRunId, next);
|
||||
params.ensureListener();
|
||||
params.persist();
|
||||
if (archiveAtMs) {
|
||||
params.startSweeper();
|
||||
}
|
||||
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
|
||||
params.startSweeper();
|
||||
void waitForSubagentCompletion(nextRunId, waitTimeoutMs);
|
||||
return true;
|
||||
};
|
||||
@@ -338,9 +337,8 @@ export function createSubagentRunManager(params: {
|
||||
}
|
||||
params.ensureListener();
|
||||
params.persist();
|
||||
if (archiveAtMs) {
|
||||
params.startSweeper();
|
||||
}
|
||||
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
|
||||
params.startSweeper();
|
||||
// Wait for subagent completion via gateway RPC (cross-process).
|
||||
// The in-process lifecycle listener is a fallback for embedded runs.
|
||||
void waitForSubagentCompletion(registerParams.runId, waitTimeoutMs);
|
||||
|
||||
@@ -108,6 +108,10 @@ const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
||||
* subsequent lifecycle `start` / `end` can cancel premature failure announces.
|
||||
*/
|
||||
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
/** Absolute TTL for session-mode runs after cleanup completes (no archiveAtMs). */
|
||||
const SESSION_RUN_TTL_MS = 5 * 60_000; // 5 minutes
|
||||
/** Absolute TTL for orphaned pendingLifecycleError entries. */
|
||||
const PENDING_ERROR_TTL_MS = 5 * 60_000; // 5 minutes
|
||||
|
||||
function loadSubagentRegistryRuntime() {
|
||||
subagentRegistryRuntimePromise ??= import("./subagent-registry.runtime.js");
|
||||
@@ -432,9 +436,8 @@ function restoreSubagentRunsOnce() {
|
||||
}
|
||||
// Resume pending work.
|
||||
ensureListener();
|
||||
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
|
||||
startSweeper();
|
||||
}
|
||||
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
|
||||
startSweeper();
|
||||
for (const runId of subagentRuns.keys()) {
|
||||
resumeSubagentRun(runId);
|
||||
}
|
||||
@@ -479,7 +482,25 @@ async function sweepSubagentRuns() {
|
||||
const now = Date.now();
|
||||
let mutated = false;
|
||||
for (const [runId, entry] of subagentRuns.entries()) {
|
||||
if (!entry.archiveAtMs || entry.archiveAtMs > now) {
|
||||
// Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes.
|
||||
// Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows.
|
||||
if (!entry.archiveAtMs) {
|
||||
if (typeof entry.cleanupCompletedAt === "number" && now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS) {
|
||||
clearPendingLifecycleError(runId);
|
||||
void notifyContextEngineSubagentEnded({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
reason: "swept",
|
||||
workspaceDir: entry.workspaceDir,
|
||||
});
|
||||
subagentRuns.delete(runId);
|
||||
mutated = true;
|
||||
if (!entry.retainAttachmentsOnKeep) {
|
||||
await safeRemoveAttachmentsDir(entry);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (entry.archiveAtMs > now) {
|
||||
continue;
|
||||
}
|
||||
clearPendingLifecycleError(runId);
|
||||
@@ -506,6 +527,13 @@ async function sweepSubagentRuns() {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
// Sweep orphaned pendingLifecycleError entries (absolute TTL).
|
||||
for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) {
|
||||
if (now - pending.endedAt > PENDING_ERROR_TTL_MS) {
|
||||
clearPendingLifecycleError(runId);
|
||||
}
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
persistSubagentRuns();
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import { sweepStaleRunContexts } from "../infra/agent-events.js";
|
||||
import { cleanOldMedia } from "../media/store.js";
|
||||
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import type { ChatRunEntry } from "./server-chat.js";
|
||||
@@ -151,6 +152,8 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.chatDeltaSentAt.delete(runId);
|
||||
params.chatDeltaLastBroadcastLen.delete(runId);
|
||||
}
|
||||
// Sweep stale agent run contexts (orphaned when lifecycle end/error is missed).
|
||||
sweepStaleRunContexts();
|
||||
}, 60_000);
|
||||
|
||||
if (typeof params.mediaCleanupTtlMs !== "number") {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { beforeEach, describe, expect, test } from "vitest";
|
||||
import { beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import {
|
||||
clearAgentRunContext,
|
||||
emitAgentEvent,
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
registerAgentRunContext,
|
||||
resetAgentEventsForTest,
|
||||
resetAgentRunContextForTest,
|
||||
sweepStaleRunContexts,
|
||||
} from "./agent-events.js";
|
||||
|
||||
type AgentEventsModule = typeof import("./agent-events.js");
|
||||
@@ -107,7 +108,7 @@ describe("agent-events sequencing", () => {
|
||||
isHeartbeat: true,
|
||||
});
|
||||
|
||||
expect(getAgentRunContext("run-ctx")).toEqual({
|
||||
expect(getAgentRunContext("run-ctx")).toMatchObject({
|
||||
sessionKey: "session-main",
|
||||
verboseLevel: "full",
|
||||
isHeartbeat: true,
|
||||
@@ -186,7 +187,7 @@ describe("agent-events sequencing", () => {
|
||||
|
||||
stop();
|
||||
|
||||
expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" });
|
||||
expect(second.getAgentRunContext("run-dup")).toMatchObject({ sessionKey: "session-dup" });
|
||||
expect(seen).toEqual([
|
||||
{ seq: 1, sessionKey: "session-dup" },
|
||||
{ seq: 2, sessionKey: "session-dup" },
|
||||
@@ -194,4 +195,40 @@ describe("agent-events sequencing", () => {
|
||||
|
||||
first.resetAgentEventsForTest();
|
||||
});
|
||||
|
||||
test("sweeps stale run contexts and clears their sequence state", async () => {
|
||||
const stop = vi.spyOn(Date, "now");
|
||||
stop.mockReturnValue(100);
|
||||
registerAgentRunContext("run-stale", { sessionKey: "session-stale", registeredAt: 100 });
|
||||
registerAgentRunContext("run-active", { sessionKey: "session-active", registeredAt: 100 });
|
||||
|
||||
stop.mockReturnValue(200);
|
||||
emitAgentEvent({ runId: "run-stale", stream: "assistant", data: { text: "stale" } });
|
||||
|
||||
stop.mockReturnValue(900);
|
||||
emitAgentEvent({ runId: "run-active", stream: "assistant", data: { text: "active" } });
|
||||
|
||||
stop.mockReturnValue(1_000);
|
||||
expect(sweepStaleRunContexts(500)).toBe(1);
|
||||
expect(getAgentRunContext("run-stale")).toBeUndefined();
|
||||
expect(getAgentRunContext("run-active")).toMatchObject({ sessionKey: "session-active" });
|
||||
|
||||
const seen: Array<{ runId: string; seq: number }> = [];
|
||||
const unsubscribe = onAgentEvent((evt) => {
|
||||
if (evt.runId === "run-stale" || evt.runId === "run-active") {
|
||||
seen.push({ runId: evt.runId, seq: evt.seq });
|
||||
}
|
||||
});
|
||||
|
||||
emitAgentEvent({ runId: "run-stale", stream: "assistant", data: { text: "restarted" } });
|
||||
emitAgentEvent({ runId: "run-active", stream: "assistant", data: { text: "continued" } });
|
||||
|
||||
unsubscribe();
|
||||
stop.mockRestore();
|
||||
|
||||
expect(seen).toEqual([
|
||||
{ runId: "run-stale", seq: 1 },
|
||||
{ runId: "run-active", seq: 2 },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -111,6 +111,10 @@ export type AgentRunContext = {
|
||||
isHeartbeat?: boolean;
|
||||
/** Whether control UI clients should receive chat/agent updates for this run. */
|
||||
isControlUiVisible?: boolean;
|
||||
/** Timestamp when this context was first registered (for TTL-based cleanup). */
|
||||
registeredAt?: number;
|
||||
/** Timestamp of last activity (updated on every emitAgentEvent). */
|
||||
lastActiveAt?: number;
|
||||
};
|
||||
|
||||
type AgentEventState = {
|
||||
@@ -136,7 +140,10 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
|
||||
const state = getAgentEventState();
|
||||
const existing = state.runContextById.get(runId);
|
||||
if (!existing) {
|
||||
state.runContextById.set(runId, { ...context });
|
||||
state.runContextById.set(runId, {
|
||||
...context,
|
||||
registeredAt: context.registeredAt ?? Date.now(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
|
||||
@@ -159,10 +166,34 @@ export function getAgentRunContext(runId: string) {
|
||||
|
||||
export function clearAgentRunContext(runId: string) {
|
||||
getAgentEventState().runContextById.delete(runId);
|
||||
getAgentEventState().seqByRun.delete(runId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep stale run contexts that exceeded the given TTL.
|
||||
* Guards against orphaned entries when lifecycle "end"/"error" events are missed.
|
||||
*/
|
||||
export function sweepStaleRunContexts(maxAgeMs = 30 * 60 * 1000): number {
|
||||
const state = getAgentEventState();
|
||||
const now = Date.now();
|
||||
let swept = 0;
|
||||
for (const [runId, ctx] of state.runContextById.entries()) {
|
||||
// Use lastActiveAt (refreshed on every event) to avoid sweeping active runs.
|
||||
// Fall back to registeredAt, then treat missing timestamps as infinitely old.
|
||||
const lastSeen = ctx.lastActiveAt ?? ctx.registeredAt;
|
||||
const age = lastSeen ? now - lastSeen : Infinity;
|
||||
if (age > maxAgeMs) {
|
||||
state.runContextById.delete(runId);
|
||||
state.seqByRun.delete(runId);
|
||||
swept++;
|
||||
}
|
||||
}
|
||||
return swept;
|
||||
}
|
||||
|
||||
export function resetAgentRunContextForTest() {
|
||||
getAgentEventState().runContextById.clear();
|
||||
getAgentEventState().seqByRun.clear();
|
||||
}
|
||||
|
||||
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
@@ -170,6 +201,9 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
|
||||
const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1;
|
||||
state.seqByRun.set(event.runId, nextSeq);
|
||||
const context = state.runContextById.get(event.runId);
|
||||
if (context) {
|
||||
context.lastActiveAt = Date.now();
|
||||
}
|
||||
const isControlUiVisible = context?.isControlUiVisible ?? true;
|
||||
const eventSessionKey =
|
||||
typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined;
|
||||
|
||||
Reference in New Issue
Block a user