fix: restore worker runtime state

This commit is contained in:
Peter Steinberger
2026-05-06 09:22:40 +01:00
parent 84d793db29
commit 883eed2e95
5 changed files with 211 additions and 7 deletions

View File

@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import {
onAgentEvent as onParentAgentEvent,
@@ -19,6 +20,7 @@ import { serializeWorkerError } from "./errors.js";
function createFixtureWorkerUrl(): URL {
const source = `
import fs from "node:fs/promises";
import { parentPort } from "node:worker_threads";
let runStarted = false;
@@ -27,7 +29,23 @@ function createFixtureWorkerUrl(): URL {
parentPort.postMessage(message);
}
parentPort.on("message", (message) => {
async function writeSessionStoreUpdate(message, tag) {
if (!message.params.storePath || !message.params.sessionKey) {
return;
}
await fs.writeFile(
message.params.storePath,
JSON.stringify({
[message.params.sessionKey]: {
sessionId: message.params.sessionId,
updatedAt: 456,
model: "worker-" + tag
}
}, null, 2)
);
}
parentPort.on("message", async (message) => {
if (message.type === "abort") {
if (runStarted) {
post({ type: "error", error: { name: "AbortError", message: "aborted:" + String(message.reason ?? "") } });
@@ -84,6 +102,11 @@ function createFixtureWorkerUrl(): URL {
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
return;
}
if (message.params.body === "mutate-store-then-throw") {
await writeSessionStoreUpdate(message, "error");
post({ type: "error", error: { name: "FixtureError", message: "fixture failed", code: "FIXTURE" } });
return;
}
if (message.params.body === "switch") {
post({
type: "error",
@@ -104,6 +127,9 @@ function createFixtureWorkerUrl(): URL {
if (message.params.body === "wait") {
return;
}
if (message.params.body === "mutate-store") {
await writeSessionStoreUpdate(message, "result");
}
post({
type: "result",
@@ -272,6 +298,70 @@ describe("agent runtime worker bridge", () => {
});
});
it("refreshes parent session store state after a worker result", async () => {
  // Arrange: the parent's in-memory store and the on-disk store both start
  // with the same stale entry (including a CLI session binding).
  const workerParams = await makeWorkerParams("mutate-store");
  const key = workerParams.sessionKey!;
  const staleEntry: SessionEntry = {
    sessionId: workerParams.sessionId,
    updatedAt: 1,
    model: "parent-stale",
    cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
  };
  const sessionStore: Record<string, SessionEntry> = { [key]: staleEntry };
  const storePath = path.join(path.dirname(workerParams.sessionFile), "sessions.json");
  await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));

  // Act: run the attempt in the fixture worker, handing it the same store
  // object and the path it was persisted to.
  await runAgentAttemptInWorker(
    { ...workerParams, sessionStore, storePath },
    { workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
  );

  // Assert: the very same object passed in now carries the values the worker
  // wrote, and the stale binding that existed only in the old entry is gone.
  const refreshed = sessionStore[key];
  expect(refreshed).toMatchObject({
    sessionId: workerParams.sessionId,
    updatedAt: 456,
    model: "worker-result",
  });
  expect(refreshed?.cliSessionBindings).toBeUndefined();
});
it("refreshes parent session store state after a worker error", async () => {
  // Arrange: the parent's in-memory store and the on-disk store both start
  // with the same stale entry (including a CLI session binding).
  const workerParams = await makeWorkerParams("mutate-store-then-throw");
  const key = workerParams.sessionKey!;
  const staleEntry: SessionEntry = {
    sessionId: workerParams.sessionId,
    updatedAt: 1,
    model: "parent-stale",
    cliSessionBindings: { "claude-cli": { sessionId: "stale-cli-session" } },
  };
  const sessionStore: Record<string, SessionEntry> = { [key]: staleEntry };
  const storePath = path.join(path.dirname(workerParams.sessionFile), "sessions.json");
  await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));

  // Act: the run is expected to reject with the fixture's error.
  const attempt = runAgentAttemptInWorker(
    { ...workerParams, sessionStore, storePath },
    { workerUrl: createFixtureWorkerUrl(), execArgv: [], usePermissions: false },
  );
  await expect(attempt).rejects.toMatchObject({
    name: "FixtureError",
  });

  // Assert: even on the failure path the parent's object reflects what the
  // worker wrote before it reported the error.
  const refreshed = sessionStore[key];
  expect(refreshed).toMatchObject({
    sessionId: workerParams.sessionId,
    updatedAt: 456,
    model: "worker-error",
  });
  expect(refreshed?.cliSessionBindings).toBeUndefined();
});
it("preserves live model switch errors across the worker boundary", async () => {
const error = new LiveSessionModelSwitchError({
provider: "anthropic",

View File

@@ -1,4 +1,7 @@
import fs from "node:fs";
import { Worker } from "node:worker_threads";
import { loadSessionStore } from "../../config/sessions/store.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import type { RunAgentAttemptParams } from "../command/attempt-execution.js";
@@ -153,6 +156,18 @@ function stripWorkerCallbacks(params: RunAgentAttemptParams): AgentRuntimeWorker
return { ...rest, opts };
}
/**
 * Overwrites the caller's in-memory session store with the store currently on disk.
 *
 * The object referenced by `params.sessionStore` is mutated in place: every
 * existing key is removed and the freshly loaded entries are copied in, so
 * anyone holding a reference to that object sees the new contents. Keys that
 * are absent from the on-disk store therefore disappear from the parent's copy.
 *
 * Does nothing when `params.sessionStore` or `params.storePath` is missing, or
 * when no file exists at `params.storePath`.
 *
 * Any exception raised by `loadSessionStore` propagates to the caller.
 *
 * @param params - Run parameters; only `sessionStore` and `storePath` are read.
 */
function syncParentSessionStoreFromDisk(params: RunAgentAttemptParams): void {
  if (!params.sessionStore || !params.storePath) {
    return;
  }
  if (!fs.existsSync(params.storePath)) {
    return;
  }
  // skipCache forces a read of the file itself rather than a cached copy.
  const diskStore = loadSessionStore(params.storePath, { skipCache: true, clone: false });
  const target = params.sessionStore as Record<string, SessionEntry>;
  // Empty the existing object first so entries removed on disk do not linger.
  Object.keys(target).forEach((staleKey) => {
    delete target[staleKey];
  });
  Object.assign(target, diskStore);
}
export async function runAgentAttemptInWorker(
params: RunAgentAttemptParams,
options: RunAgentAttemptInWorkerOptions = {},
@@ -194,7 +209,12 @@ export async function runAgentAttemptInWorker(
worker.once("error", (error) => {
settled = true;
cleanup();
reject(error);
try {
syncParentSessionStoreFromDisk(params);
reject(error);
} catch (syncError) {
reject(syncError);
}
});
worker.once("exit", (code) => {
if (settled) {
@@ -202,7 +222,12 @@ export async function runAgentAttemptInWorker(
}
settled = true;
cleanup();
reject(new Error(`Agent runtime worker exited before completing run (code ${code})`));
try {
syncParentSessionStoreFromDisk(params);
reject(new Error(`Agent runtime worker exited before completing run (code ${code})`));
} catch (syncError) {
reject(syncError);
}
});
worker.on("message", (message: AgentWorkerToParentMessage) => {
if (message.type === "agentEvent") {
@@ -224,15 +249,27 @@ export async function runAgentAttemptInWorker(
if (message.type === "result") {
settled = true;
cleanup();
resolve(message.result);
void worker.terminate();
try {
syncParentSessionStoreFromDisk(params);
resolve(message.result);
} catch (error) {
reject(error);
} finally {
void worker.terminate();
}
return;
}
if (message.type === "error") {
settled = true;
cleanup();
reject(deserializeWorkerError(message));
void worker.terminate();
try {
syncParentSessionStoreFromDisk(params);
reject(deserializeWorkerError(message));
} catch (error) {
reject(error);
} finally {
void worker.terminate();
}
}
});

View File

@@ -6,6 +6,7 @@ import type {
ParentToAgentWorkerMessage,
} from "./agent-runtime.types.js";
import { serializeWorkerError } from "./errors.js";
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
function post(message: AgentWorkerToParentMessage): void {
// oxlint-disable-next-line unicorn/require-post-message-target-origin -- worker_threads MessagePort has no targetOrigin.
@@ -31,6 +32,14 @@ parentPort?.on("message", (message: ParentToAgentWorkerMessage) => {
const stopRuntimeEventBridge = onAgentEvent((event) => {
post({ type: "agentEvent", origin: "runtime", event });
});
try {
restoreAgentWorkerPluginRuntime(message.params);
} catch (error: unknown) {
post(serializeWorkerError(error));
stopRuntimeEventBridge();
abortController = undefined;
return;
}
void runAgentAttempt({
...message.params,
opts: {

View File

@@ -0,0 +1,58 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
import { restoreAgentWorkerPluginRuntime } from "./plugin-runtime.js";
vi.mock("../runtime-plugins.js", () => ({
ensureRuntimePluginsLoaded: vi.fn(),
}));
const mockedEnsureRuntimePluginsLoaded = vi.mocked(ensureRuntimePluginsLoaded);
/**
 * Builds a fixed set of worker run parameters for the tests in this file.
 *
 * Every call returns a fresh object. The config enables a single plugin entry
 * named `demo`; the run context is an empty object cast to the expected type.
 */
function makeParams(): AgentRuntimeWorkerRunParams {
  const cfg = { plugins: { entries: { demo: { enabled: true } } } } as OpenClawConfig;
  const runContext = {} as AgentRuntimeWorkerRunParams["runContext"];
  const fixture: AgentRuntimeWorkerRunParams = {
    providerOverride: "openai",
    originalProvider: "openai",
    modelOverride: "gpt-5.5",
    cfg,
    sessionEntry: undefined,
    sessionId: "session-worker-test",
    sessionKey: "agent:main:worker-test",
    sessionAgentId: "main",
    sessionFile: "/tmp/openclaw-worker-session.jsonl",
    workspaceDir: "/tmp/openclaw-worker-workspace",
    body: "hello",
    isFallbackRetry: false,
    resolvedThinkLevel: "medium",
    timeoutMs: 1_000,
    runId: "run-worker-test",
    opts: { message: "hello", senderIsOwner: false },
    runContext,
    spawnedBy: undefined,
    messageChannel: undefined,
    skillsSnapshot: undefined,
    resolvedVerboseLevel: undefined,
    agentDir: "/tmp/openclaw-worker-agent",
    authProfileProvider: "openai",
    sessionHasHistory: false,
  };
  return fixture;
}
describe("agent worker plugin runtime", () => {
  beforeEach(() => {
    // Drop calls recorded by earlier tests so each assertion sees only its own.
    mockedEnsureRuntimePluginsLoaded.mockClear();
  });

  it("restores gateway-bindable runtime plugins before worker attempts", () => {
    const workerParams = makeParams();
    // The loader should receive the run's config and workspace directory,
    // with gateway subagent binding switched on.
    const expectedLoaderArgs = {
      config: workerParams.cfg,
      workspaceDir: workerParams.workspaceDir,
      allowGatewaySubagentBinding: true,
    };

    restoreAgentWorkerPluginRuntime(workerParams);

    expect(mockedEnsureRuntimePluginsLoaded).toHaveBeenCalledWith(expectedLoaderArgs);
  });
});

View File

@@ -0,0 +1,10 @@
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import type { AgentRuntimeWorkerRunParams } from "./agent-runtime.types.js";
/**
 * Asks the runtime plugin loader to load plugins for a worker run.
 *
 * Forwards the run's config and workspace directory to
 * `ensureRuntimePluginsLoaded`, always with `allowGatewaySubagentBinding`
 * set to `true`. Anything the loader throws propagates to the caller.
 *
 * @param params - Worker run parameters; only `cfg` and `workspaceDir` are read.
 */
export function restoreAgentWorkerPluginRuntime(params: AgentRuntimeWorkerRunParams): void {
  const { cfg: config, workspaceDir } = params;
  ensureRuntimePluginsLoaded({ config, workspaceDir, allowGatewaySubagentBinding: true });
}