fix: reset turns bypass active queue modes (#74144) (thanks @yelog)

This commit is contained in:
Peter Steinberger
2026-05-03 00:53:37 +01:00
parent 4c33f7d751
commit c8ab22997b
6 changed files with 95 additions and 14 deletions

View File

@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
- Gateway/responses: emit every client tool call from `/v1/responses` JSON and SSE responses when the agent invokes multiple client tools in a single turn, so multi-tool plans, graph orchestration calls, and similar batched flows no longer drop every call but the last. Fixes #52288. Thanks @CharZhou and @bonelli.
- Control UI/Gateway: avoid full session-list reloads for locally applied message-phase session updates, carry known session keys through transcript-file update events, and defer media provider listing when explicit generation model config is present. Refs #76236, #76203, #76188, #76107, and #76166. Thanks @BunsDev.
- Install/update: prune the obsolete `plugin-runtime-deps` state directory during packaged postinstall so upgrades from pre-2026.5.2 releases reclaim old bundled-plugin dependency caches without touching external plugin installs.
- Auto-reply/queue: treat reset-triggered `/new` and `/reset` turns as interrupt runs across active-run queue handling, so steer/followup modes cannot delay a fresh session behind existing work. Fixes #74093. (#74144) Thanks @ruji9527 and @yelog.
- Gateway: keep directly requested plugin tools invokable under restrictive tool profiles while preserving explicit deny lists and the HTTP safety deny list, preventing catalog/invoke mismatches that surface as "Tool not available". Thanks @BunsDev.
- Gateway/update: allow beta binaries to refresh gateway services when the config was last written by the matching stable release version, avoiding false newer-config downgrade blocks during beta channel updates.
- Channels: keep Matrix and Mattermost bundled in the core package instead of advertising external npm installs before those channels are cut over. Thanks @vincentkoc.

View File

@@ -958,6 +958,10 @@ export async function runReplyAgent(params: {
let activeSessionEntry = sessionEntry;
const activeSessionStore = sessionStore;
let activeIsNewSession = isNewSession;
const effectiveResetTriggered = resetTriggered === true;
const activeRunQueueMode = effectiveResetTriggered ? "interrupt" : resolvedQueue.mode;
const effectiveShouldSteer = !effectiveResetTriggered && shouldSteer;
const effectiveShouldFollowup = !effectiveResetTriggered && shouldFollowup;
const isHeartbeat = opts?.isHeartbeat === true;
const typingSignals = createTypingSignaler({
@@ -995,7 +999,7 @@ export async function runReplyAgent(params: {
}
};
if (shouldSteer && isStreaming) {
if (effectiveShouldSteer && isStreaming) {
const steerSessionId =
(sessionKey ? replyRunRegistry.resolveSessionId(sessionKey) : undefined) ??
followupRun.run.sessionId;
@@ -1003,7 +1007,7 @@ export async function runReplyAgent(params: {
steeringMode: resolvePiSteeringModeForQueueMode(resolvedQueue.mode),
...(resolvedQueue.debounceMs !== undefined ? { debounceMs: resolvedQueue.debounceMs } : {}),
});
if (steered && !shouldFollowup) {
if (steered && !effectiveShouldFollowup) {
await touchActiveSessionEntry();
typing.cleanup();
return undefined;
@@ -1013,8 +1017,9 @@ export async function runReplyAgent(params: {
const activeRunQueueAction = resolveActiveRunQueueAction({
isActive,
isHeartbeat,
shouldFollowup,
queueMode: resolvedQueue.mode,
shouldFollowup: effectiveShouldFollowup,
queueMode: activeRunQueueMode,
resetTriggered: effectiveResetTriggered,
});
const queuedRunFollowupTurn = createFollowupRunner({
@@ -1118,7 +1123,7 @@ export async function runReplyAgent(params: {
createReplyOperation({
sessionId: followupRun.run.sessionId,
sessionKey: replySessionKey ?? "",
resetTriggered: resetTriggered === true,
resetTriggered: effectiveResetTriggered,
upstreamAbortSignal: opts?.abortSignal,
});
} catch (error) {

View File

@@ -857,6 +857,32 @@ describe("runPreparedReply media-only handling", () => {
await expect(runPromise).resolves.toEqual({ text: "ok" });
expect(vi.mocked(runReplyAgent)).toHaveBeenCalledOnce();
});
it("treats reset-triggered steer mode as interrupt when the session lane is empty", async () => {
const queueSettings = await import("./queue/settings-runtime.js");
const piRuntime = await import("../../agents/pi-embedded.runtime.js");
const commandQueue = await import("../../process/command-queue.js");
vi.mocked(queueSettings.resolveQueueSettings).mockReturnValueOnce({ mode: "steer" });
vi.mocked(commandQueue.getQueueSize).mockReturnValueOnce(0);
vi.mocked(piRuntime.resolveActiveEmbeddedRunSessionId).mockReturnValue("session-active");
vi.mocked(piRuntime.abortEmbeddedPiRun).mockReturnValue(true);
const result = await runPreparedReply(
baseParams({
resetTriggered: true,
isNewSession: true,
sessionId: "session-reset-new",
}),
);
expect(result).toEqual({ text: "ok" });
expect(commandQueue.clearCommandLane).toHaveBeenCalledWith("session:session-key");
expect(piRuntime.abortEmbeddedPiRun).toHaveBeenCalledWith("session-active");
expect(vi.mocked(runReplyAgent)).toHaveBeenCalledOnce();
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call?.shouldSteer).toBe(false);
expect(call?.shouldFollowup).toBe(false);
expect(call?.resetTriggered).toBe(true);
});
it("rechecks same-session ownership after async prep before registering a new reply operation", async () => {
const { resolveSessionAuthProfileOverride } =
await import("../../agents/auth-profiles/session-override.js");

View File

@@ -819,11 +819,16 @@ export async function runPreparedReply(
? piRuntime.resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal)
: undefined;
const laneSize = sessionLaneKey ? getQueueSize(sessionLaneKey) : 0;
if (resolvedQueue.mode === "interrupt" && sessionLaneKey && laneSize > 0) {
const activeRunQueueMode = effectiveResetTriggered ? "interrupt" : resolvedQueue.mode;
const activeSessionIdForInterrupt = piRuntime?.resolveActiveEmbeddedRunSessionId(sessionKey);
if (
activeRunQueueMode === "interrupt" &&
sessionLaneKey &&
(laneSize > 0 || activeSessionIdForInterrupt)
) {
const cleared = clearCommandLane(sessionLaneKey);
const activeSessionId = piRuntime?.resolveActiveEmbeddedRunSessionId(sessionKey);
const aborted = piRuntime?.abortEmbeddedPiRun(
activeSessionId ?? preparedSessionState.sessionId,
activeSessionIdForInterrupt ?? preparedSessionState.sessionId,
);
logVerbose(`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`);
}
@@ -856,22 +861,24 @@ export async function runPreparedReply(
};
};
let { activeSessionId, isActive, isStreaming } = resolveQueueBusyState();
const shouldSteer = isSteeringQueueMode(resolvedQueue.mode);
const shouldSteer = !effectiveResetTriggered && isSteeringQueueMode(resolvedQueue.mode);
const shouldFollowup =
resolvedQueue.mode === "followup" ||
resolvedQueue.mode === "collect" ||
resolvedQueue.mode === "steer-backlog";
!effectiveResetTriggered &&
(resolvedQueue.mode === "followup" ||
resolvedQueue.mode === "collect" ||
resolvedQueue.mode === "steer-backlog");
const activeRunQueueAction = resolveActiveRunQueueAction({
isActive,
isHeartbeat: opts?.isHeartbeat === true,
shouldFollowup,
queueMode: resolvedQueue.mode,
queueMode: activeRunQueueMode,
resetTriggered: effectiveResetTriggered,
});
if (isActive && activeRunQueueAction === "run-now") {
const queueState = await resolvePreparedReplyQueueState({
activeRunQueueAction,
activeSessionId: activeSessionId ?? resolveActiveQueueSessionId(),
queueMode: resolvedQueue.mode,
queueMode: activeRunQueueMode,
sessionKey,
sessionId: sessionIdFinal,
abortActiveRun: (activeRunSessionId) =>

View File

@@ -47,4 +47,42 @@ describe("resolveActiveRunQueueAction", () => {
).toBe("enqueue-followup");
}
});
it("runs reset-triggered turns immediately while another run is active", () => {
for (const queueMode of ["steer", "queue", "collect", "followup"] as const) {
expect(
resolveActiveRunQueueAction({
isActive: true,
isHeartbeat: false,
shouldFollowup: true,
queueMode,
resetTriggered: true,
}),
).toBe("run-now");
}
});
it("keeps heartbeat drops ahead of reset-triggered turns", () => {
expect(
resolveActiveRunQueueAction({
isActive: true,
isHeartbeat: true,
shouldFollowup: true,
queueMode: "steer",
resetTriggered: true,
}),
).toBe("drop");
});
it("ignores reset-triggered policy when there is no active run", () => {
expect(
resolveActiveRunQueueAction({
isActive: false,
isHeartbeat: false,
shouldFollowup: true,
queueMode: "collect",
resetTriggered: true,
}),
).toBe("run-now");
});
});

View File

@@ -7,6 +7,7 @@ export function resolveActiveRunQueueAction(params: {
isHeartbeat: boolean;
shouldFollowup: boolean;
queueMode: QueueSettings["mode"];
resetTriggered?: boolean;
}): ActiveRunQueueAction {
if (!params.isActive) {
return "run-now";
@@ -14,6 +15,9 @@ export function resolveActiveRunQueueAction(params: {
if (params.isHeartbeat) {
return "drop";
}
if (params.resetTriggered) {
return "run-now";
}
if (params.shouldFollowup || params.queueMode === "steer" || params.queueMode === "queue") {
return "enqueue-followup";
}