refactor: unify monitor abort lifecycle handling

This commit is contained in:
Peter Steinberger
2026-02-26 04:36:00 +01:00
parent 02c731826a
commit e915b4c64a
13 changed files with 319 additions and 103 deletions

View File

@@ -0,0 +1,13 @@
# Changelog Fragments
Use this directory when a PR should not edit `CHANGELOG.md` directly.
- One fragment file per PR.
- Recommended file name: `pr-<number>.md`.
- Include at least one line with both `#<pr>` and `thanks @<contributor>`.
Example:
```md
- Fix LINE monitor lifecycle wait ownership (#27001) (thanks @alice)
```

View File

@@ -129,50 +129,4 @@ describe("linePlugin gateway.startAccount", () => {
abort.abort();
await task;
});
it("stays pending until abort signal fires (no premature exit)", async () => {
const { runtime, monitorLineProvider } = createRuntime();
setLineRuntime(runtime);
const abort = new AbortController();
let resolved = false;
const task = linePlugin.gateway!.startAccount!(
createStartAccountCtx({
token: "token",
secret: "secret",
runtime: createRuntimeEnv(),
abortSignal: abort.signal,
}),
).then(() => {
resolved = true;
});
// Allow async internals to flush
await new Promise((r) => setTimeout(r, 50));
expect(monitorLineProvider).toHaveBeenCalled();
expect(resolved).toBe(false);
abort.abort();
await task;
expect(resolved).toBe(true);
});
it("resolves immediately when abortSignal is already aborted", async () => {
const { runtime } = createRuntime();
setLineRuntime(runtime);
const abort = new AbortController();
abort.abort();
await linePlugin.gateway!.startAccount!(
createStartAccountCtx({
token: "token",
secret: "secret",
runtime: createRuntimeEnv(),
abortSignal: abort.signal,
}),
);
});
});

View File

@@ -661,18 +661,6 @@ export const linePlugin: ChannelPlugin<ResolvedLineAccount> = {
webhookPath: account.config.webhookPath,
});
// Keep the provider alive until the abort signal fires. Without this,
// the startAccount promise resolves immediately after webhook registration
// and the channel supervisor treats the provider as "exited", triggering an
// auto-restart loop (up to 10 attempts).
await new Promise<void>((resolve) => {
if (ctx.abortSignal.aborted) {
resolve();
return;
}
ctx.abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return monitor;
},
logoutAccount: async ({ accountId, cfg }) => {

View File

@@ -664,6 +664,61 @@ validate_changelog_entry_for_pr() {
echo "changelog validated: found PR #$pr (contributor handle unavailable, skipping thanks check)"
}
# Print the changelog fragment paths (changelog/fragments/*.md) reported by the
# three-dot diff between origin/main and HEAD, one per line.
# Prints nothing and still succeeds when no fragment file is part of the diff.
changed_changelog_fragment_files() {
  git diff --name-only origin/main...HEAD -- changelog/fragments \
    | rg '^changelog/fragments/.*\.md$' \
    || true
}
# Validate that the changelog fragment files touched by a PR credit that PR.
#
# Arguments:
#   $1   - PR number; some added line must contain "#<pr>" or "openclaw#<pr>".
#   $2   - contributor handle; when empty or the literal "null" the
#          "thanks @<handle>" requirement is skipped.
#   $3.. - fragment file paths to inspect (at least one is required).
#
# Only lines added in the origin/main...HEAD diff are considered. Any violation
# prints a message and exits with status 1; otherwise a confirmation is printed.
validate_changelog_fragments_for_pr() {
  local pr="$1" contrib="$2"
  shift 2

  if (( $# < 1 )); then
    echo "No changelog fragments provided for validation."
    exit 1
  fi

  local pr_pattern="(#$pr|openclaw#$pr)"
  local fragment fragment_additions
  local all_added_lines=""

  for fragment in "$@"; do
    # Keep only added lines: skip the "+++" file header and strip the leading "+".
    fragment_additions=$(git diff --unified=0 origin/main...HEAD -- "$fragment" | awk '
      /^\+\+\+/ { next }
      /^\+/ { print substr($0, 2) }
    ')
    if [[ -z "$fragment_additions" ]]; then
      echo "$fragment is in diff but no added lines were detected."
      exit 1
    fi
    # Accumulate every fragment's additions, newline separated.
    all_added_lines+=$'\n'"$fragment_additions"
  done

  local pr_matches
  pr_matches=$(printf '%s\n' "$all_added_lines" | rg -in "$pr_pattern" || true)
  if [[ -z "$pr_matches" ]]; then
    echo "Changelog fragment update must reference PR #$pr (for example, (#$pr))."
    exit 1
  fi

  # Without a usable contributor handle only the PR reference can be checked.
  if [[ -z "$contrib" || "$contrib" == "null" ]]; then
    echo "changelog fragments validated: found PR #$pr (contributor handle unavailable, skipping thanks check)"
    return
  fi

  # The PR reference and the thanks must appear on the same added line.
  local credited_matches
  credited_matches=$(printf '%s\n' "$all_added_lines" | rg -in "$pr_pattern" | rg -i "thanks @$contrib" || true)
  if [[ -z "$credited_matches" ]]; then
    echo "Changelog fragment update must include both PR #$pr and thanks @$contrib on the entry line."
    exit 1
  fi

  echo "changelog fragments validated: found PR #$pr + thanks @$contrib"
  return 0
}
prepare_gates() {
local pr="$1"
enter_worktree "$pr" false
@@ -684,13 +739,30 @@ prepare_gates() {
docs_only=true
fi
# Enforce workflow policy: every prepared PR must include a changelog update.
if ! printf '%s\n' "$changed_files" | rg -q '^CHANGELOG\.md$'; then
echo "Missing CHANGELOG.md update in PR diff. This workflow requires a changelog entry."
local has_changelog_update=false
if printf '%s\n' "$changed_files" | rg -q '^CHANGELOG\.md$'; then
has_changelog_update=true
fi
local fragment_files
fragment_files=$(changed_changelog_fragment_files)
local has_fragment_update=false
if [ -n "$fragment_files" ]; then
has_fragment_update=true
fi
# Enforce workflow policy: every prepared PR must include either CHANGELOG.md
# or one or more changelog fragments.
if [ "$has_changelog_update" = "false" ] && [ "$has_fragment_update" = "false" ]; then
echo "Missing changelog update. Add CHANGELOG.md changes or changelog/fragments/*.md entry."
exit 1
fi
local contrib="${PR_AUTHOR:-}"
validate_changelog_entry_for_pr "$pr" "$contrib"
if [ "$has_changelog_update" = "true" ]; then
validate_changelog_entry_for_pr "$pr" "$contrib"
fi
if [ "$has_fragment_update" = "true" ]; then
mapfile -t fragment_file_list <<<"$fragment_files"
validate_changelog_fragments_for_pr "$pr" "$contrib" "${fragment_file_list[@]}"
fi
run_quiet_logged "pnpm build" ".local/gates-build.log" pnpm build
run_quiet_logged "pnpm check" ".local/gates-check.log" pnpm check

View File

@@ -0,0 +1,33 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js";
/** Build a minimal client stand-in whose `getPlugin` always yields `gatewayPlugin`. */
const makeClientStub = (gatewayPlugin: unknown) => ({
  getPlugin: vi.fn(() => gatewayPlugin),
});

describe("attachEarlyGatewayErrorGuard", () => {
  it("captures gateway errors until released", () => {
    const emitter = new EventEmitter();
    // A second listener proves the guard never swallows events for other subscribers.
    const otherListener = vi.fn();
    emitter.on("error", otherListener);

    const guard = attachEarlyGatewayErrorGuard(makeClientStub({ emitter }) as never);

    // While attached, the guard records the error.
    emitter.emit("error", new Error("Fatal Gateway error: 4014"));
    expect(guard.pendingErrors).toHaveLength(1);

    // After release, later errors are no longer recorded.
    guard.release();
    emitter.emit("error", new Error("Fatal Gateway error: 4000"));
    expect(guard.pendingErrors).toHaveLength(1);

    expect(otherListener).toHaveBeenCalledTimes(2);
  });

  it("returns noop guard when gateway emitter is unavailable", () => {
    const guard = attachEarlyGatewayErrorGuard(makeClientStub(undefined) as never);

    expect(guard.pendingErrors).toEqual([]);
    expect(() => guard.release()).not.toThrow();
  });
});

View File

@@ -0,0 +1,36 @@
import type { Client } from "@buape/carbon";
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
/**
 * Handle returned by `attachEarlyGatewayErrorGuard`.
 */
export type EarlyGatewayErrorGuard = {
// Errors received on the gateway emitter's "error" event while the guard was attached.
pendingErrors: unknown[];
// Detaches the guard's "error" listener; calling it more than once is harmless.
release: () => void;
};
/**
 * Start collecting "error" events from the client's gateway emitter.
 *
 * Every error emitted while the guard is attached is appended to
 * `pendingErrors`. Calling `release()` removes the listener; repeated calls
 * are no-ops.
 *
 * @param client - Client whose "gateway" plugin is looked up for an emitter.
 * @returns The guard handle. When no emitter can be resolved, an inert guard
 *   with an empty `pendingErrors` list and a no-op `release` is returned.
 */
export function attachEarlyGatewayErrorGuard(client: Client): EarlyGatewayErrorGuard {
  const pendingErrors: unknown[] = [];
  const emitter = getDiscordGatewayEmitter(client.getPlugin("gateway"));
  if (!emitter) {
    // Nothing to listen on, so hand back a guard that does nothing.
    return { pendingErrors, release: () => {} };
  }

  const captureError = (err: unknown): void => {
    pendingErrors.push(err);
  };
  emitter.on("error", captureError);

  // Tracks whether the listener is still registered so release() runs once.
  let attached = true;
  const release = (): void => {
    if (!attached) {
      return;
    }
    attached = false;
    emitter.removeListener("error", captureError);
  };

  return { pendingErrors, release };
}

View File

@@ -34,7 +34,6 @@ import { createDiscordRetryRunner } from "../../infra/retry-policy.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { createNonExitingRuntime, type RuntimeEnv } from "../../runtime.js";
import { resolveDiscordAccount } from "../accounts.js";
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
import { fetchDiscordApplicationId } from "../probe.js";
import { normalizeDiscordToken } from "../token.js";
import { createDiscordVoiceCommand } from "../voice/command.js";
@@ -52,6 +51,7 @@ import {
} from "./agent-components.js";
import { resolveDiscordSlashCommandConfig } from "./commands.js";
import { createExecApprovalButton, DiscordExecApprovalHandler } from "./exec-approvals.js";
import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js";
import { createDiscordGatewayPlugin } from "./gateway-plugin.js";
import {
DiscordMessageListener,
@@ -230,33 +230,6 @@ function isDiscordDisallowedIntentsError(err: unknown): boolean {
return message.includes(String(DISCORD_DISALLOWED_INTENTS_CODE));
}
// Guard handle: the errors captured so far plus a function that detaches the listener.
type EarlyGatewayErrorGuard = {
pendingErrors: unknown[];
release: () => void;
};
/**
 * Attach an "error" listener to the client's gateway emitter and collect every
 * error it receives into `pendingErrors` until `release()` removes the listener.
 * When no emitter can be resolved, the returned guard has a no-op `release`.
 */
function attachEarlyGatewayErrorGuard(client: Client): EarlyGatewayErrorGuard {
const pendingErrors: unknown[] = [];
const gateway = client.getPlugin<GatewayPlugin>("gateway");
const emitter = getDiscordGatewayEmitter(gateway);
if (!emitter) {
// No emitter available: return an inert guard.
return {
pendingErrors,
release: () => {},
};
}
// Record each gateway error so the caller can inspect it later.
const onGatewayError = (err: unknown) => {
pendingErrors.push(err);
};
emitter.on("error", onGatewayError);
return {
pendingErrors,
release: () => {
emitter.removeListener("error", onGatewayError);
},
};
}
export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
const cfg = opts.config ?? loadConfig();
const account = resolveDiscordAccount({

View File

@@ -0,0 +1,29 @@
import { describe, expect, it } from "vitest";
import { waitForAbortSignal } from "./abort-signal.js";
describe("waitForAbortSignal", () => {
  it("resolves immediately when signal is missing", async () => {
    const outcome = waitForAbortSignal(undefined);
    await expect(outcome).resolves.toBeUndefined();
  });

  it("resolves immediately when signal is already aborted", async () => {
    const controller = new AbortController();
    controller.abort();

    const outcome = waitForAbortSignal(controller.signal);
    await expect(outcome).resolves.toBeUndefined();
  });

  it("waits until abort fires", async () => {
    const controller = new AbortController();
    let settled = false;
    const pending = waitForAbortSignal(controller.signal).then(() => {
      settled = true;
    });

    // Yield once so an eagerly resolved promise would have had a chance to settle.
    await Promise.resolve();
    expect(settled).toBe(false);

    controller.abort();
    await pending;
    expect(settled).toBe(true);
  });
});

12
src/infra/abort-signal.ts Normal file
View File

@@ -0,0 +1,12 @@
/**
 * Wait until the given abort signal fires.
 *
 * @param signal - Signal to wait on. When it is omitted or already aborted,
 *   the returned promise resolves without waiting.
 * @returns A promise that resolves (never rejects) once `signal` has aborted.
 */
export async function waitForAbortSignal(signal?: AbortSignal): Promise<void> {
  const nothingToWaitFor = !signal || signal.aborted;
  if (nothingToWaitFor) {
    return;
  }

  // Bind to a const so the non-undefined narrowing holds inside the callbacks.
  const activeSignal = signal;
  await new Promise<void>((settle) => {
    const handleAbort = (): void => {
      activeSignal.removeEventListener("abort", handleAbort);
      settle();
    };
    activeSignal.addEventListener("abort", handleAbort, { once: true });
  });
}

View File

@@ -0,0 +1,92 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
// Shared mock handles. They are created through vi.hoisted so the vi.mock
// factories below can reference them.
const { createLineBotMock, registerPluginHttpRouteMock, unregisterHttpMock } = vi.hoisted(() => ({
// Fake bot exposing only `account` and `handleWebhook`.
createLineBotMock: vi.fn(() => ({
account: { accountId: "default" },
handleWebhook: vi.fn(),
})),
registerPluginHttpRouteMock: vi.fn(),
unregisterHttpMock: vi.fn(),
}));
// Replace bot creation with the fake bot above.
vi.mock("./bot.js", () => ({
createLineBot: createLineBotMock,
}));
// Path normalization stub: ignores the requested path and returns the fallback.
vi.mock("../plugins/http-path.js", () => ({
normalizePluginHttpPath: (_path: string | undefined, fallback: string) => fallback,
}));
// Route registration is mocked so tests can observe register/unregister calls.
vi.mock("../plugins/http-registry.js", () => ({
registerPluginHttpRoute: registerPluginHttpRouteMock,
}));
// The webhook handler factory returns a bare mock function.
vi.mock("./webhook-node.js", () => ({
createLineNodeWebhookHandler: vi.fn(() => vi.fn()),
}));
describe("monitorLineProvider lifecycle", () => {
  /** Load the module under test lazily so it is imported from within each test. */
  const loadMonitor = async () => (await import("./monitor.js")).monitorLineProvider;

  /** Options shared by every test; each test adds `abortSignal` only when it needs one. */
  const baseOptions = () => ({
    channelAccessToken: "token",
    channelSecret: "secret",
    config: {} as OpenClawConfig,
    runtime: {} as RuntimeEnv,
  });

  beforeEach(() => {
    createLineBotMock.mockClear();
    unregisterHttpMock.mockClear();
    // Registering a route hands back the unregister mock so tests can observe teardown.
    registerPluginHttpRouteMock.mockClear().mockReturnValue(unregisterHttpMock);
  });

  it("waits for abort before resolving", async () => {
    const monitorLineProvider = await loadMonitor();
    const controller = new AbortController();
    let settled = false;

    const pending = monitorLineProvider({
      ...baseOptions(),
      abortSignal: controller.signal,
    }).then((monitor) => {
      settled = true;
      return monitor;
    });

    // The route must be registered while the provider is still pending.
    await vi.waitFor(() => expect(registerPluginHttpRouteMock).toHaveBeenCalledTimes(1));
    expect(settled).toBe(false);

    controller.abort();
    await pending;
    expect(unregisterHttpMock).toHaveBeenCalledTimes(1);
  });

  it("stops immediately when signal is already aborted", async () => {
    const monitorLineProvider = await loadMonitor();
    const controller = new AbortController();
    controller.abort();

    await monitorLineProvider({
      ...baseOptions(),
      abortSignal: controller.signal,
    });

    expect(unregisterHttpMock).toHaveBeenCalledTimes(1);
  });

  it("returns immediately without abort signal and stop is idempotent", async () => {
    const monitorLineProvider = await loadMonitor();

    const monitor = await monitorLineProvider(baseOptions());
    expect(unregisterHttpMock).not.toHaveBeenCalled();

    // A second stop() must not unregister the route again.
    monitor.stop();
    monitor.stop();
    expect(unregisterHttpMock).toHaveBeenCalledTimes(1);
  });
});

View File

@@ -4,6 +4,7 @@ import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/pr
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
import type { OpenClawConfig } from "../config/config.js";
import { danger, logVerbose } from "../globals.js";
import { waitForAbortSignal } from "../infra/abort-signal.js";
import { normalizePluginHttpPath } from "../plugins/http-path.js";
import { registerPluginHttpRoute } from "../plugins/http-registry.js";
import type { RuntimeEnv } from "../runtime.js";
@@ -296,7 +297,12 @@ export async function monitorLineProvider(
logVerbose(`line: registered webhook handler at ${normalizedPath}`);
// Handle abort signal
let stopped = false;
const stopHandler = () => {
if (stopped) {
return;
}
stopped = true;
logVerbose(`line: stopping provider for account ${resolvedAccountId}`);
unregisterHttp();
recordChannelRuntimeState({
@@ -309,7 +315,12 @@ export async function monitorLineProvider(
});
};
abortSignal?.addEventListener("abort", stopHandler);
if (abortSignal?.aborted) {
stopHandler();
} else if (abortSignal) {
abortSignal.addEventListener("abort", stopHandler, { once: true });
await waitForAbortSignal(abortSignal);
}
return {
account: bot.account,

View File

@@ -1,4 +1,4 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { monitorTelegramProvider } from "./monitor.js";
type MockCtx = {
@@ -160,19 +160,30 @@ vi.mock("../auto-reply/reply.js", () => ({
}));
describe("monitorTelegramProvider (grammY)", () => {
let consoleErrorSpy: { mockRestore: () => void } | undefined;
beforeEach(() => {
loadConfig.mockReturnValue({
agents: { defaults: { maxConcurrent: 2 } },
channels: { telegram: {} },
});
initSpy.mockClear();
runSpy.mockClear();
runSpy.mockReset().mockImplementation(() =>
makeRunnerStub({
task: () => Promise.reject(new Error("runSpy called without explicit test stub")),
}),
);
computeBackoff.mockClear();
sleepWithAbort.mockClear();
startTelegramWebhookSpy.mockClear();
registerUnhandledRejectionHandlerMock.mockClear();
resetUnhandledRejection();
createTelegramBotErrors.length = 0;
consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
});
afterEach(() => {
consoleErrorSpy?.mockRestore();
});
it("processes a DM and sends reply", async () => {

View File

@@ -2,6 +2,7 @@ import { type RunOptions, run } from "@grammyjs/runner";
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
import type { OpenClawConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import { waitForAbortSignal } from "../infra/abort-signal.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts";
@@ -172,16 +173,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
});
const abortSignal = opts.abortSignal;
if (abortSignal && !abortSignal.aborted) {
await new Promise<void>((resolve) => {
const onAbort = () => {
abortSignal.removeEventListener("abort", onAbort);
resolve();
};
abortSignal.addEventListener("abort", onAbort, { once: true });
});
}
await waitForAbortSignal(opts.abortSignal);
return;
}