mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-16 18:34:18 +00:00
feat(wake): expose typed sessionKey on wake protocol + system event CLI
Adds an optional sessionKey to the WakeParamsSchema and threads it through the gateway wake handler, CronService.wake(), and the underlying timer.wake() ops so callers can target a specific session for async-task completion relays instead of always hitting the agent's main session. Also adds --session-key to `openclaw system event`. The schema rejects empty/non-string sessionKey at the gateway boundary; mismatched session keys (a key that does not belong to the resolving agent) fall back to the agent's main session inside resolveCronSessionKey, which is the existing safety path. Refs #52305 (companion to PR #50818, which closes the related cron-run remap slice at internal enqueue sites). Doesn't depend on #50818. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
committed by
Peter Steinberger
parent
13bc7037b1
commit
4ddd942f5f
@@ -600,6 +600,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Status/Claude CLI: show `oauth (claude-cli)` for working Claude CLI OAuth runtime sessions instead of `unknown` when no local auth profile exists. Fixes #78632. Thanks @gorkem2020.
|
||||
- Memory search: preserve keyword-only hybrid FTS matches when vector scoring is unavailable or below the configured minimum score, so exact lexical hits are not dropped by weighted min-score filtering.
|
||||
- Heartbeat/async exec: remap cron-run session keys to agent-main (or `"global"` under `session.scope=global`) at the bash exec, ACP, gateway node-event, and CLI watchdog enqueue sites, and treat cron-run descendants as ephemeral for retention pruning, so async exec completion events land in the same queue the heartbeat drains instead of being stranded under the ephemeral cron-run key. Refs #52305. Thanks @Kaspre.
|
||||
- Wake protocol/system event CLI: type an optional `sessionKey` on `WakeParamsSchema` and add `--session-key` to `openclaw system event` so callers can target a specific session for async-task completion relays instead of always hitting the agent's main session. Refs #52305.
|
||||
- Exec approvals/node: let trusted backend node invokes complete no-device Control UI approvals after the original request connection changes, while keeping node, command, cwd, env, and allow-once replay bindings enforced. Fixes #78569. Thanks @naturedogdog.
|
||||
- Agents/subagents: keep background completion delivery on the requester-agent handoff/queue-retry path instead of raw-sending child results directly, and strip child-result wrapper or OpenClaw runtime-context scaffolding from queued outbound retries. Fixes #78531. Thanks @EthanSK.
|
||||
- Sandbox: recreate cached browser bridges when JavaScript-evaluation permission changes, keep failed prune removals tracked for retry, and make cross-device directory moves copy-then-commit without partially emptying the source on failure.
|
||||
|
||||
@@ -31,14 +31,20 @@ openclaw system presence
|
||||
|
||||
## `system event`
|
||||
|
||||
Enqueue a system event on the **main** session. The next heartbeat will inject
|
||||
it as a `System:` line in the prompt. Use `--mode now` to trigger the heartbeat
|
||||
immediately; `next-heartbeat` waits for the next scheduled tick.
|
||||
Enqueue a system event on the **main** session by default. The next heartbeat
|
||||
will inject it as a `System:` line in the prompt. Use `--mode now` to trigger
|
||||
the heartbeat immediately; `next-heartbeat` waits for the next scheduled tick.
|
||||
|
||||
Pass `--session-key` to target a specific session (for example to relay an
|
||||
async-task completion back to the channel that started it).
|
||||
|
||||
Flags:
|
||||
|
||||
- `--text <text>`: required system event text.
|
||||
- `--mode <mode>`: `now` or `next-heartbeat` (default).
|
||||
- `--session-key <sessionKey>`: optional; target a specific agent session
|
||||
instead of the agent's main session. Keys that do not belong to the
|
||||
resolved agent fall back to the agent's main session.
|
||||
- `--json`: machine-readable output.
|
||||
- `--url`, `--token`, `--timeout`, `--expect-final`: shared Gateway RPC flags.
|
||||
|
||||
|
||||
@@ -67,6 +67,38 @@ describe("system-cli", () => {
|
||||
expect(runtimeErrors[0]).toContain("--mode must be now or next-heartbeat");
|
||||
});
|
||||
|
||||
it("forwards --session-key on system event", async () => {
|
||||
await runCli([
|
||||
"system",
|
||||
"event",
|
||||
"--text",
|
||||
"ping",
|
||||
"--session-key",
|
||||
"agent:main:telegram:dm:42",
|
||||
]);
|
||||
|
||||
expect(callGatewayFromCli).toHaveBeenCalledWith(
|
||||
"wake",
|
||||
expect.any(Object),
|
||||
{ mode: "next-heartbeat", text: "ping", sessionKey: "agent:main:telegram:dm:42" },
|
||||
{ expectFinal: false },
|
||||
);
|
||||
});
|
||||
|
||||
it("omits sessionKey from payload when --session-key not provided", async () => {
|
||||
await runCli(["system", "event", "--text", "ping"]);
|
||||
|
||||
const [, , params] = callGatewayFromCli.mock.calls[0]!;
|
||||
expect(params).not.toHaveProperty("sessionKey");
|
||||
});
|
||||
|
||||
it("treats empty --session-key as omitted", async () => {
|
||||
await runCli(["system", "event", "--text", "ping", "--session-key", " "]);
|
||||
|
||||
const [, , params] = callGatewayFromCli.mock.calls[0]!;
|
||||
expect(params).not.toHaveProperty("sessionKey");
|
||||
});
|
||||
|
||||
it.each([
|
||||
{ args: ["system", "heartbeat", "last"], method: "last-heartbeat", params: undefined },
|
||||
{
|
||||
|
||||
@@ -8,7 +8,12 @@ import { formatCliCommand } from "./command-format.js";
|
||||
import type { GatewayRpcOpts } from "./gateway-rpc.js";
|
||||
import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js";
|
||||
|
||||
type SystemEventOpts = GatewayRpcOpts & { text?: string; mode?: string; json?: boolean };
|
||||
type SystemEventOpts = GatewayRpcOpts & {
|
||||
text?: string;
|
||||
mode?: string;
|
||||
sessionKey?: string;
|
||||
json?: boolean;
|
||||
};
|
||||
type SystemGatewayOpts = GatewayRpcOpts & { json?: boolean };
|
||||
|
||||
const normalizeWakeMode = (raw: unknown) => {
|
||||
@@ -56,6 +61,10 @@ export function registerSystemCli(program: Command) {
|
||||
.description("Enqueue a system event and optionally trigger a heartbeat")
|
||||
.requiredOption("--text <text>", "System event text")
|
||||
.option("--mode <mode>", "Wake mode (now|next-heartbeat)", "next-heartbeat")
|
||||
.option(
|
||||
"--session-key <sessionKey>",
|
||||
"Target a specific session for the event (defaults to the agent's main session)",
|
||||
)
|
||||
.option("--json", "Output JSON", false),
|
||||
).action(async (opts: SystemEventOpts) => {
|
||||
await runSystemGatewayCommand(
|
||||
@@ -68,7 +77,13 @@ export function registerSystemCli(program: Command) {
|
||||
);
|
||||
}
|
||||
const mode = normalizeWakeMode(opts.mode);
|
||||
return await callGatewayFromCli("wake", opts, { mode, text }, { expectFinal: false });
|
||||
const sessionKey = normalizeOptionalString(opts.sessionKey);
|
||||
return await callGatewayFromCli(
|
||||
"wake",
|
||||
opts,
|
||||
sessionKey ? { mode, text, sessionKey } : { mode, text },
|
||||
{ expectFinal: false },
|
||||
);
|
||||
},
|
||||
"ok",
|
||||
);
|
||||
|
||||
@@ -30,5 +30,5 @@ export interface CronServiceContract {
|
||||
enqueueRun(id: string, mode?: CronRunMode): Promise<CronServiceRunResult>;
|
||||
getJob(id: string): CronJob | undefined;
|
||||
getDefaultAgentId(): string | undefined;
|
||||
wake(opts: { mode: CronWakeMode; text: string }): CronWakeResult;
|
||||
wake(opts: { mode: CronWakeMode; text: string; sessionKey?: string }): CronWakeResult;
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ export class CronService implements CronServiceContract {
|
||||
return this.state.deps.defaultAgentId;
|
||||
}
|
||||
|
||||
wake(opts: { mode: "now" | "next-heartbeat"; text: string }) {
|
||||
wake(opts: { mode: "now" | "next-heartbeat"; text: string; sessionKey?: string }) {
|
||||
return ops.wakeNow(this.state, opts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -857,7 +857,7 @@ export async function enqueueRun(state: CronServiceState, id: string, mode?: "du
|
||||
|
||||
export function wakeNow(
|
||||
state: CronServiceState,
|
||||
opts: { mode: "now" | "next-heartbeat"; text: string },
|
||||
opts: { mode: "now" | "next-heartbeat"; text: string; sessionKey?: string },
|
||||
) {
|
||||
return wake(state, opts);
|
||||
}
|
||||
|
||||
@@ -1781,15 +1781,21 @@ function emitJobFinished(
|
||||
|
||||
export function wake(
|
||||
state: CronServiceState,
|
||||
opts: { mode: "now" | "next-heartbeat"; text: string },
|
||||
opts: { mode: "now" | "next-heartbeat"; text: string; sessionKey?: string },
|
||||
) {
|
||||
const text = opts.text.trim();
|
||||
if (!text) {
|
||||
return { ok: false } as const;
|
||||
}
|
||||
state.deps.enqueueSystemEvent(text);
|
||||
const sessionKey = opts.sessionKey?.trim() || undefined;
|
||||
state.deps.enqueueSystemEvent(text, sessionKey ? { sessionKey } : undefined);
|
||||
if (opts.mode === "now") {
|
||||
state.deps.requestHeartbeat({ source: "manual", intent: "immediate", reason: "wake" });
|
||||
state.deps.requestHeartbeat({
|
||||
source: "manual",
|
||||
intent: "immediate",
|
||||
reason: "wake",
|
||||
...(sessionKey ? { sessionKey } : {}),
|
||||
});
|
||||
}
|
||||
return { ok: true } as const;
|
||||
}
|
||||
|
||||
83
src/cron/service/wake.test.ts
Normal file
83
src/cron/service/wake.test.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { wake } from "./timer.js";
|
||||
|
||||
function createState() {
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeat = vi.fn();
|
||||
return {
|
||||
state: {
|
||||
deps: {
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeat,
|
||||
},
|
||||
} as unknown as Parameters<typeof wake>[0],
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeat,
|
||||
};
|
||||
}
|
||||
|
||||
describe("wake (cron timer)", () => {
|
||||
it("returns ok:false on empty text without enqueueing or waking", () => {
|
||||
const { state, enqueueSystemEvent, requestHeartbeat } = createState();
|
||||
expect(wake(state, { mode: "now", text: " " })).toEqual({ ok: false });
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeat).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("enqueues without sessionKey when omitted", () => {
|
||||
const { state, enqueueSystemEvent, requestHeartbeat } = createState();
|
||||
expect(wake(state, { mode: "now", text: "ping" })).toEqual({ ok: true });
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("ping", undefined);
|
||||
expect(requestHeartbeat).toHaveBeenCalledWith({
|
||||
source: "manual",
|
||||
intent: "immediate",
|
||||
reason: "wake",
|
||||
});
|
||||
});
|
||||
|
||||
it("threads sessionKey to both enqueue and heartbeat on mode=now", () => {
|
||||
const { state, enqueueSystemEvent, requestHeartbeat } = createState();
|
||||
expect(
|
||||
wake(state, {
|
||||
mode: "now",
|
||||
text: "ping",
|
||||
sessionKey: "agent:main:telegram:dm:42",
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("ping", {
|
||||
sessionKey: "agent:main:telegram:dm:42",
|
||||
});
|
||||
expect(requestHeartbeat).toHaveBeenCalledWith({
|
||||
source: "manual",
|
||||
intent: "immediate",
|
||||
reason: "wake",
|
||||
sessionKey: "agent:main:telegram:dm:42",
|
||||
});
|
||||
});
|
||||
|
||||
it("threads sessionKey to enqueue only on mode=next-heartbeat", () => {
|
||||
const { state, enqueueSystemEvent, requestHeartbeat } = createState();
|
||||
expect(
|
||||
wake(state, {
|
||||
mode: "next-heartbeat",
|
||||
text: "ping",
|
||||
sessionKey: "agent:main:slack:42",
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("ping", {
|
||||
sessionKey: "agent:main:slack:42",
|
||||
});
|
||||
expect(requestHeartbeat).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("treats whitespace-only sessionKey as omitted", () => {
|
||||
const { state, enqueueSystemEvent, requestHeartbeat } = createState();
|
||||
wake(state, { mode: "now", text: "ping", sessionKey: " " });
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("ping", undefined);
|
||||
expect(requestHeartbeat).toHaveBeenCalledWith({
|
||||
source: "manual",
|
||||
intent: "immediate",
|
||||
reason: "wake",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -218,6 +218,7 @@ export const WakeParamsSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]),
|
||||
text: NonEmptyString,
|
||||
sessionKey: Type.Optional(NonEmptyString),
|
||||
},
|
||||
{ additionalProperties: true }, // external wake senders may attach opaque metadata
|
||||
);
|
||||
|
||||
@@ -279,7 +279,13 @@ export function buildGatewayCronService(params: {
|
||||
resolveSessionStorePath,
|
||||
sessionStorePath,
|
||||
enqueueSystemEvent: (text, opts) => {
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId);
|
||||
// When the caller passes only a sessionKey (e.g. `system event --session-key`),
|
||||
// derive agentId from that key so a non-default agent's session is not silently
|
||||
// rejected as foreign by resolveCronSessionKey and rerouted to the default agent.
|
||||
const derivedAgentId =
|
||||
opts?.agentId ??
|
||||
(opts?.sessionKey ? resolveAgentIdFromSessionKey(opts.sessionKey) : undefined);
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(derivedAgentId);
|
||||
const sessionKey = resolveCronSessionKey({
|
||||
runtimeConfig,
|
||||
agentId,
|
||||
|
||||
@@ -175,8 +175,13 @@ export const cronHandlers: GatewayRequestHandlers = {
|
||||
const p = params as {
|
||||
mode: "now" | "next-heartbeat";
|
||||
text: string;
|
||||
sessionKey?: string;
|
||||
};
|
||||
const result = context.cron.wake({ mode: p.mode, text: p.text });
|
||||
const result = context.cron.wake({
|
||||
mode: p.mode,
|
||||
text: p.text,
|
||||
...(p.sessionKey ? { sessionKey: p.sessionKey } : {}),
|
||||
});
|
||||
respond(true, result, undefined);
|
||||
},
|
||||
"cron.list": async ({ params, respond, context }) => {
|
||||
|
||||
@@ -77,6 +77,7 @@ function createCronContext(currentJob?: CronJob) {
|
||||
update: vi.fn(async () => ({ id: "cron-1" })),
|
||||
getDefaultAgentId: vi.fn(() => "main"),
|
||||
getJob: vi.fn(() => currentJob),
|
||||
wake: vi.fn(() => ({ ok: true }) as const),
|
||||
},
|
||||
logGateway: {
|
||||
info: vi.fn(),
|
||||
@@ -647,4 +648,66 @@ describe("cron method validation", () => {
|
||||
).rejects.toThrow("DB write failed");
|
||||
expect(respond).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
describe("wake", () => {
|
||||
async function invokeWake(params: Record<string, unknown>) {
|
||||
const context = createCronContext();
|
||||
const respond = vi.fn();
|
||||
await cronHandlers.wake({
|
||||
req: {} as never,
|
||||
params: params as never,
|
||||
respond: respond as never,
|
||||
context: context as never,
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
return { context, respond };
|
||||
}
|
||||
|
||||
it("forwards sessionKey to context.cron.wake when provided", async () => {
|
||||
const { context, respond } = await invokeWake({
|
||||
mode: "now",
|
||||
text: "ping",
|
||||
sessionKey: "agent:main:telegram:dm:42",
|
||||
});
|
||||
expect(context.cron.wake).toHaveBeenCalledWith({
|
||||
mode: "now",
|
||||
text: "ping",
|
||||
sessionKey: "agent:main:telegram:dm:42",
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true }, undefined);
|
||||
});
|
||||
|
||||
it("omits sessionKey when not provided", async () => {
|
||||
const { context, respond } = await invokeWake({
|
||||
mode: "next-heartbeat",
|
||||
text: "ping",
|
||||
});
|
||||
expect(context.cron.wake).toHaveBeenCalledWith({
|
||||
mode: "next-heartbeat",
|
||||
text: "ping",
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true }, undefined);
|
||||
});
|
||||
|
||||
it("omits sessionKey when explicitly empty string", async () => {
|
||||
const { context } = await invokeWake({
|
||||
mode: "now",
|
||||
text: "ping",
|
||||
sessionKey: "",
|
||||
});
|
||||
// empty-string sessionKey is rejected at schema (NonEmptyString)
|
||||
expect(context.cron.wake).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("rejects non-string sessionKey at schema", async () => {
|
||||
const { context, respond } = await invokeWake({
|
||||
mode: "now",
|
||||
text: "ping",
|
||||
sessionKey: 42,
|
||||
});
|
||||
expect(context.cron.wake).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(false, undefined, expect.any(Object));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user