diff --git a/docs/concepts/qa-e2e-automation.md b/docs/concepts/qa-e2e-automation.md index fa2e022f92b..faa72fa5c56 100644 --- a/docs/concepts/qa-e2e-automation.md +++ b/docs/concepts/qa-e2e-automation.md @@ -132,6 +132,22 @@ agent. The baseline list should stay broad enough to cover: - repo-reading and docs-reading - one small build task such as Lobster Invaders +## Transport adapters + +`qa-lab` owns a generic transport seam for markdown QA scenarios. +`qa-channel` is the first adapter on that seam, but the design target is wider: +future real or synthetic channels should plug into the same suite runner +instead of adding a transport-specific QA runner. + +At the architecture level, the split is: + +- `qa-lab` owns scenario execution, worker concurrency, artifact writing, and reporting. +- the transport adapter owns gateway config, readiness, inbound and outbound observation, transport actions, and normalized transport state. +- scenarios stay markdown-first under `qa/scenarios/`. + +Maintainer-facing adoption guidance for new channel adapters lives in +[Testing](/help/testing#adding-a-channel-to-qa). + ## Reporting `qa-lab` exports a Markdown protocol report from the observed bus timeline. diff --git a/docs/help/testing.md b/docs/help/testing.md index 38f98bb203e..37a4798d8a1 100644 --- a/docs/help/testing.md +++ b/docs/help/testing.md @@ -87,6 +87,78 @@ transport coverage matrix. | Matrix | x | x | x | x | x | x | x | x | | | Telegram | x | | | | | | | | x | +### Adding a channel to QA + +Adding a channel to the markdown QA system requires exactly two things: + +1. A transport adapter for the channel. +2. A scenario pack that exercises the channel contract. + +Do not add a channel-specific QA runner when the shared `qa-lab` runner can +own the flow. + +`qa-lab` owns the shared mechanics: + +- suite startup and teardown +- worker concurrency +- artifact writing +- report generation +- scenario execution +- compatibility aliases for older `qa-channel` scenarios + +The channel adapter owns the transport contract: + +- how the gateway is configured for that transport +- how readiness is checked +- how inbound events are injected +- how outbound messages are observed +- how transcripts and normalized transport state are exposed +- how transport-backed actions are executed +- how transport-specific reset or cleanup is handled + +The minimum adoption bar for a new channel is: + +1. Implement the transport adapter on the shared `qa-lab` seam. +2. Register the adapter in the transport registry. +3. Keep transport-specific mechanics inside the adapter or the channel harness. +4. Author or adapt markdown scenarios under `qa/scenarios/`. +5. Use the generic scenario helpers for new scenarios. +6. Keep existing compatibility aliases working unless the repo is doing an intentional migration. + +The decision rule is strict: + +- If behavior can be expressed once in `qa-lab`, put it in `qa-lab`. +- If behavior depends on one channel transport, keep it in that adapter or plugin harness. +- If a scenario needs a new capability that more than one channel can use, add a generic helper instead of a channel-specific branch in `suite.ts`. +- If a behavior is only meaningful for one transport, keep the scenario transport-specific and make that explicit in the scenario contract. + +Preferred generic helper names for new scenarios are: + +- `waitForTransportReady` +- `waitForChannelReady` +- `injectInboundMessage` +- `injectOutboundMessage` +- `waitForTransportOutboundMessage` +- `waitForChannelOutboundMessage` +- `waitForNoTransportOutbound` +- `getTransportSnapshot` +- `readTransportMessage` +- `readTransportTranscript` +- `formatTransportTranscript` +- `resetTransport` + +Compatibility aliases remain available for existing scenarios, including: + +- `waitForQaChannelReady` +- `waitForOutboundMessage` +- `waitForNoOutbound` +- `formatConversationTranscript` +- `resetBus` + +New channel work should use the generic helper names. +Compatibility aliases exist to avoid a flag day migration, not as the model for +new scenario authoring. + ## Test suites (what runs where) Think of the suites as “increasing realism” (and increasing flakiness/cost): diff --git a/extensions/qa-channel/index.ts b/extensions/qa-channel/index.ts index b9cc59f5e72..cbeac3e6e85 100644 --- a/extensions/qa-channel/index.ts +++ b/extensions/qa-channel/index.ts @@ -10,7 +10,7 @@ export default defineBundledChannelEntry({ exportName: "qaChannelPlugin", }, runtime: { - specifier: "./runtime-api.js", + specifier: "./api.js", exportName: "setQaChannelRuntime", }, }); diff --git a/extensions/qa-channel/runtime-api.ts b/extensions/qa-channel/runtime-api.ts index 0548753cc4f..84cbd4e09d3 100644 --- a/extensions/qa-channel/runtime-api.ts +++ b/extensions/qa-channel/runtime-api.ts @@ -1,2 +1,3 @@ export * from "./src/runtime-api.js"; +export { getQaChannelRuntime, setQaChannelRuntime } from "./src/runtime.js"; export * from "./src/runtime.js"; diff --git a/extensions/qa-lab/src/cli-paths.ts b/extensions/qa-lab/src/cli-paths.ts index 529527fdff6..392ae9f75b4 100644 --- a/extensions/qa-lab/src/cli-paths.ts +++ b/extensions/qa-lab/src/cli-paths.ts @@ -1,3 +1,4 @@ +import fs from "node:fs/promises"; import path from "node:path"; export function resolveRepoRelativeOutputDir(repoRoot: string, outputDir?: string) { @@ -14,3 +15,106 @@ export function resolveRepoRelativeOutputDir(repoRoot: string, outputDir?: strin } return resolved; } + +async function resolveNearestExistingPath(targetPath: string) { + let current = path.resolve(targetPath); + while (true) { + try { + await fs.lstat(current); + return current; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + } + const parent = path.dirname(current); + if (parent === current) { + throw new Error(`failed to resolve existing path for ${targetPath}`); + } + current = parent; + } +} + +function assertRepoRelativePath(repoRoot: string, targetPath: string, label: string) { + const relative = path.relative(repoRoot, targetPath); + if (relative.startsWith("..") || path.isAbsolute(relative)) { + throw new Error(`${label} must stay within the repo root.`); + } + return relative; +} + +async function assertNoSymlinkSegments(repoRoot: string, targetPath: string, label: string) { + const relative = assertRepoRelativePath(repoRoot, targetPath, label); + let current = repoRoot; + for (const segment of relative.split(path.sep).filter((entry) => entry.length > 0)) { + current = path.join(current, segment); + let stats: Awaited> | null = null; + try { + stats = await fs.lstat(current); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + break; + } + throw error; + } + if (stats.isSymbolicLink()) { + throw new Error(`${label} must not traverse symlinks.`); + } + } +} + +export async function assertRepoBoundPath(repoRoot: string, targetPath: string, label: string) { + const repoRootResolved = path.resolve(repoRoot); + const targetResolved = path.resolve(targetPath); + assertRepoRelativePath(repoRootResolved, targetResolved, label); + await assertNoSymlinkSegments(repoRootResolved, targetResolved, label); + const repoRootReal = await fs.realpath(repoRootResolved); + const nearestExistingPath = await resolveNearestExistingPath(targetResolved); + const nearestExistingReal = await fs.realpath(nearestExistingPath); + assertRepoRelativePath(repoRootReal, nearestExistingReal, label); + return targetResolved; +} + +export async function ensureRepoBoundDirectory( + repoRoot: string, + targetDir: string, + label: string, + opts?: { mode?: number }, +) { + const repoRootResolved = path.resolve(repoRoot); + const targetResolved = path.resolve(targetDir); + const relative = assertRepoRelativePath(repoRootResolved, targetResolved, label); + const repoRootReal = await fs.realpath(repoRootResolved); + let current = repoRootResolved; + for (const segment of relative.split(path.sep).filter((entry) => entry.length > 0)) { + current = path.join(current, segment); + while (true) { + try { + const stats = await fs.lstat(current); + if (stats.isSymbolicLink()) { + throw new Error(`${label} must not traverse symlinks.`); + } + if (!stats.isDirectory()) { + throw new Error(`${label} must point to a directory.`); + } + break; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code !== "ENOENT") { + throw error; + } + try { + await fs.mkdir(current, { recursive: false, mode: opts?.mode }); + } catch (mkdirError) { + if ((mkdirError as NodeJS.ErrnoException).code === "EEXIST") { + continue; + } + throw mkdirError; + } + } + } + } + const targetReal = await fs.realpath(targetResolved); + assertRepoRelativePath(repoRootReal, targetReal, label); + return targetResolved; +} diff --git a/extensions/qa-lab/src/cli.runtime.test.ts b/extensions/qa-lab/src/cli.runtime.test.ts index 9f4651816f0..6c0efab1755 100644 --- a/extensions/qa-lab/src/cli.runtime.test.ts +++ b/extensions/qa-lab/src/cli.runtime.test.ts @@ -172,6 +172,7 @@ describe("qa cli runtime", () => { expect(runQaSuiteFromRuntime).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), outputDir: path.resolve("/tmp/openclaw-repo", ".artifacts/qa/frontier"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "anthropic/claude-sonnet-4-6", @@ -275,6 +276,7 @@ describe("qa cli runtime", () => { expect(runQaSuiteFromRuntime).toHaveBeenCalledWith( expect.objectContaining({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", }), ); @@ -290,6 +292,7 @@ describe("qa cli runtime", () => { expect(runQaSuiteFromRuntime).toHaveBeenCalledWith( expect.objectContaining({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", scenarioIds: ["channel-chat-baseline", "thread-follow-up"], concurrency: 3, }), @@ -497,6 +500,7 @@ describe("qa cli runtime", () => { expect(runQaManualLane).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", @@ -523,6 +527,7 @@ describe("qa cli runtime", () => { expect(runQaMultipass).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), outputDir: path.resolve("/tmp/openclaw-repo", ".artifacts/qa-multipass"), + transportId: "qa-channel", providerMode: "mock-openai", primaryModel: undefined, alternateModel: undefined, @@ -551,6 +556,7 @@ describe("qa cli runtime", () => { expect(runQaMultipass).toHaveBeenCalledWith( expect.objectContaining({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", @@ -579,6 +585,7 @@ describe("qa cli runtime", () => { expect(runQaManualLane).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "mock-openai", primaryModel: "mock-openai/gpt-5.4", alternateModel: "mock-openai/gpt-5.4-alt", @@ -596,6 +603,7 @@ describe("qa cli runtime", () => { expect(runQaManualLane).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", @@ -615,6 +623,7 @@ describe("qa cli runtime", () => { expect(runQaManualLane).toHaveBeenCalledWith({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "anthropic/claude-sonnet-4-6", alternateModel: "anthropic/claude-sonnet-4-6", @@ -634,6 +643,7 @@ describe("qa cli runtime", () => { expect(runQaManualLane).toHaveBeenCalledWith( expect.objectContaining({ repoRoot: path.resolve("/tmp/openclaw-repo"), + transportId: "qa-channel", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", diff --git a/extensions/qa-lab/src/cli.runtime.ts b/extensions/qa-lab/src/cli.runtime.ts index ae5f4de09cc..3f75624ddea 100644 --- a/extensions/qa-lab/src/cli.runtime.ts +++ b/extensions/qa-lab/src/cli.runtime.ts @@ -16,6 +16,7 @@ import { runQaManualLane } from "./manual-lane.runtime.js"; import { startQaMockOpenAiServer } from "./mock-openai-server.js"; import { runQaMultipass } from "./multipass.runtime.js"; import { normalizeQaThinkingLevel, type QaThinkingLevel } from "./qa-gateway-config.js"; +import { normalizeQaTransportId } from "./qa-transport-registry.js"; import { defaultQaModelForMode, normalizeQaProviderMode, @@ -214,6 +215,7 @@ export async function runQaLabSelfCheckCommand(opts: { repoRoot?: string; output export async function runQaSuiteCommand(opts: { repoRoot?: string; outputDir?: string; + transportId?: string; runner?: string; providerMode?: QaProviderModeInput; primaryModel?: string; @@ -229,6 +231,7 @@ export async function runQaSuiteCommand(opts: { disk?: string; }) { const repoRoot = path.resolve(opts.repoRoot ?? process.cwd()); + const transportId = normalizeQaTransportId(opts.transportId); const runner = (opts.runner ?? "host").trim().toLowerCase(); const scenarioIds = resolveQaParityPackScenarioIds({ parityPack: opts.parityPack, @@ -255,6 +258,7 @@ export async function runQaSuiteCommand(opts: { const result = await runQaMultipass({ repoRoot, outputDir: resolveRepoRelativeOutputDir(repoRoot, opts.outputDir), + transportId, providerMode, primaryModel: opts.primaryModel, alternateModel: opts.alternateModel, @@ -278,6 +282,7 @@ export async function runQaSuiteCommand(opts: { const result = await runQaSuiteFromRuntime({ repoRoot, outputDir: resolveRepoRelativeOutputDir(repoRoot, opts.outputDir), + transportId, providerMode, primaryModel: opts.primaryModel, alternateModel: opts.alternateModel, @@ -375,6 +380,7 @@ export async function runQaCharacterEvalCommand(opts: { export async function runQaManualLaneCommand(opts: { repoRoot?: string; + transportId?: string; providerMode?: QaProviderModeInput; primaryModel?: string; alternateModel?: string; @@ -383,6 +389,7 @@ export async function runQaManualLaneCommand(opts: { timeoutMs?: number; }) { const repoRoot = path.resolve(opts.repoRoot ?? process.cwd()); + const transportId = normalizeQaTransportId(opts.transportId); const providerMode: QaProviderMode = opts.providerMode === undefined ? "live-frontier" : normalizeQaProviderMode(opts.providerMode); const models = resolveQaManualLaneModels({ @@ -392,6 +399,7 @@ export async function runQaManualLaneCommand(opts: { }); const result = await runQaManualLane({ repoRoot, + transportId, providerMode, primaryModel: models.primaryModel, alternateModel: models.alternateModel, diff --git a/extensions/qa-lab/src/cli.ts b/extensions/qa-lab/src/cli.ts index b520d0e6136..899eebdb3ad 100644 --- a/extensions/qa-lab/src/cli.ts +++ b/extensions/qa-lab/src/cli.ts @@ -21,6 +21,7 @@ async function runQaSelfCheck(opts: { repoRoot?: string; output?: string }) { async function runQaSuite(opts: { repoRoot?: string; outputDir?: string; + transportId?: string; providerMode?: QaProviderModeInput; primaryModel?: string; alternateModel?: string; @@ -70,6 +71,7 @@ async function runQaCharacterEval(opts: { async function runQaManualLane(opts: { repoRoot?: string; + transportId?: string; providerMode?: QaProviderModeInput; primaryModel?: string; alternateModel?: string; @@ -160,6 +162,7 @@ export function registerQaLabCli(program: Command) { .option("--repo-root ", "Repository root to target when running from a neutral cwd") .option("--output-dir ", "Suite artifact directory") .option("--runner ", "Execution runner: host or multipass", "host") + .option("--transport ", "QA transport id", "qa-channel") .option( "--provider-mode ", "Provider mode: mock-openai or live-frontier (legacy live-openai still works)", @@ -185,6 +188,7 @@ export function registerQaLabCli(program: Command) { async (opts: { repoRoot?: string; outputDir?: string; + transport?: string; runner?: string; providerMode?: QaProviderModeInput; model?: string; @@ -202,6 +206,7 @@ export function registerQaLabCli(program: Command) { await runQaSuite({ repoRoot: opts.repoRoot, outputDir: opts.outputDir, + transportId: opts.transport, runner: opts.runner, providerMode: opts.providerMode, primaryModel: opts.model, @@ -308,6 +313,7 @@ export function registerQaLabCli(program: Command) { .description("Run a one-off QA agent prompt against the selected provider/model lane") .requiredOption("--message ", "Prompt to send to the QA agent") .option("--repo-root ", "Repository root to target when running from a neutral cwd") + .option("--transport ", "QA transport id", "qa-channel") .option( "--provider-mode ", "Provider mode: mock-openai or live-frontier (legacy live-openai still works)", @@ -321,6 +327,7 @@ export function registerQaLabCli(program: Command) { async (opts: { message: string; repoRoot?: string; + transport?: string; providerMode?: QaProviderModeInput; model?: string; altModel?: string; @@ -329,6 +336,7 @@ export function registerQaLabCli(program: Command) { }) => { await runQaManualLane({ repoRoot: opts.repoRoot, + transportId: opts.transport, providerMode: opts.providerMode, primaryModel: opts.model, alternateModel: opts.altModel, diff --git a/extensions/qa-lab/src/docker-harness.ts b/extensions/qa-lab/src/docker-harness.ts index bcb94aa4664..40a31af49d3 100644 --- a/extensions/qa-lab/src/docker-harness.ts +++ b/extensions/qa-lab/src/docker-harness.ts @@ -2,6 +2,10 @@ import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; import { seedQaAgentWorkspace } from "./qa-agent-workspace.js"; +import { + createQaChannelGatewayConfig, + QA_CHANNEL_REQUIRED_PLUGIN_IDS, +} from "./qa-channel-transport.js"; import { buildQaGatewayConfig } from "./qa-gateway-config.js"; const QA_LAB_INTERNAL_PORT = 43123; @@ -257,9 +261,12 @@ export async function writeQaDockerHarnessFiles(params: { gatewayPort: 18789, gatewayToken, providerBaseUrl, - qaBusBaseUrl, workspaceDir: "/tmp/openclaw/workspace", controlUiRoot: "/app/dist/control-ui", + transportPluginIds: QA_CHANNEL_REQUIRED_PLUGIN_IDS, + transportConfig: createQaChannelGatewayConfig({ + baseUrl: qaBusBaseUrl, + }), }); const files = [ diff --git a/extensions/qa-lab/src/gateway-child.test.ts b/extensions/qa-lab/src/gateway-child.test.ts index 6a11cef254c..886dc2682c5 100644 --- a/extensions/qa-lab/src/gateway-child.test.ts +++ b/extensions/qa-lab/src/gateway-child.test.ts @@ -1,5 +1,5 @@ import { spawn } from "node:child_process"; -import { lstat, mkdir, mkdtemp, readFile, readdir, rm, writeFile } from "node:fs/promises"; +import { lstat, mkdir, mkdtemp, readFile, readdir, rm, symlink, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; @@ -346,6 +346,151 @@ describe("buildQaRuntimeEnv", () => { expect(child.exitCode !== null || child.signalCode !== null).toBe(true); }); + + it("treats bind collisions as retryable gateway startup errors", () => { + expect( + __testing.isRetryableGatewayStartupError( + "another gateway instance is already listening on ws://127.0.0.1:43124", + ), + ).toBe(true); + expect( + __testing.isRetryableGatewayStartupError( + "failed to bind gateway socket on ws://127.0.0.1:43124: Error: listen EADDRINUSE", + ), + ).toBe(true); + expect(__testing.isRetryableGatewayStartupError("gateway failed to become healthy")).toBe( + false, + ); + }); + + it("treats startup token mismatches as retryable rpc startup errors", () => { + expect( + __testing.isRetryableRpcStartupError( + "unauthorized: gateway token mismatch (set gateway.remote.token to match gateway.auth.token)", + ), + ).toBe(true); + expect(__testing.isRetryableRpcStartupError("permission denied")).toBe(false); + }); + + it("probes gateway health with a one-shot HEAD request through the SSRF guard", async () => { + const release = vi.fn(async () => {}); + fetchWithSsrFGuardMock.mockResolvedValue({ + response: { ok: true }, + release, + }); + + await expect( + __testing.fetchLocalGatewayHealth({ + baseUrl: "http://127.0.0.1:43124", + healthPath: "/readyz", + }), + ).resolves.toBe(true); + + expect(fetchWithSsrFGuardMock).toHaveBeenCalledWith( + expect.objectContaining({ + url: "http://127.0.0.1:43124/readyz", + init: expect.objectContaining({ + method: "HEAD", + headers: { + connection: "close", + }, + signal: expect.any(AbortSignal), + }), + policy: { allowPrivateNetwork: true }, + auditContext: "qa-lab-gateway-child-health", + }), + ); + expect(release).toHaveBeenCalledTimes(1); + }); + + it("preserves only sanitized gateway debug artifacts", async () => { + const tempRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-preserve-src-")); + const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-preserve-repo-")); + cleanups.push(async () => { + await rm(tempRoot, { recursive: true, force: true }); + await rm(repoRoot, { recursive: true, force: true }); + }); + + const stdoutLogPath = path.join(tempRoot, "gateway.stdout.log"); + const stderrLogPath = path.join(tempRoot, "gateway.stderr.log"); + const artifactDir = path.join(repoRoot, ".artifacts", "qa-e2e", "gateway-runtime"); + await mkdir(path.dirname(artifactDir), { recursive: true }); + await writeFile( + stdoutLogPath, + 'OPENCLAW_GATEWAY_TOKEN=qa-suite-token\nOPENAI_API_KEY="openai-live"\nurl=http://127.0.0.1:18789/#token=abc123', + "utf8", + ); + await writeFile(stderrLogPath, "Authorization: Bearer secret+/token=123456", "utf8"); + await mkdir(path.join(tempRoot, "state"), { recursive: true }); + await writeFile(path.join(tempRoot, "state", "secret.txt"), "do-not-copy", "utf8"); + + await __testing.preserveQaGatewayDebugArtifacts({ + preserveToDir: artifactDir, + stdoutLogPath, + stderrLogPath, + tempRoot, + repoRoot, + }); + + expect((await readdir(artifactDir)).toSorted()).toEqual([ + "README.txt", + "gateway.stderr.log", + "gateway.stdout.log", + ]); + await expect(readFile(path.join(artifactDir, "gateway.stdout.log"), "utf8")).resolves.toBe( + "OPENCLAW_GATEWAY_TOKEN=\nOPENAI_API_KEY=\nurl=http://127.0.0.1:18789/#token=", + ); + await expect(readFile(path.join(artifactDir, "gateway.stderr.log"), "utf8")).resolves.toBe( + "Authorization: Bearer ", + ); + await expect(readFile(path.join(artifactDir, "README.txt"), "utf8")).resolves.toContain( + "was not copied because it may contain credentials or auth tokens", + ); + }); + + it("rejects preserved gateway artifacts outside the repo root", async () => { + await expect( + __testing.assertQaArtifactDirWithinRepo("/tmp/openclaw-repo", "/tmp/outside"), + ).rejects.toThrow("QA gateway artifact directory must stay within the repo root."); + }); + + it("rejects preserved gateway artifacts that traverse symlinks", async () => { + const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-guard-repo-")); + const outsideRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-guard-outside-")); + cleanups.push(async () => { + await rm(repoRoot, { recursive: true, force: true }); + await rm(outsideRoot, { recursive: true, force: true }); + }); + await mkdir(path.join(repoRoot, ".artifacts"), { recursive: true }); + await symlink(outsideRoot, path.join(repoRoot, ".artifacts", "qa-e2e"), "dir"); + + await expect( + __testing.assertQaArtifactDirWithinRepo( + repoRoot, + path.join(repoRoot, ".artifacts", "qa-e2e", "gateway-runtime"), + ), + ).rejects.toThrow("QA gateway artifact directory must not traverse symlinks."); + }); + + it("cleans startup temp roots when they are not preserved", async () => { + const tempRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-cleanup-src-")); + const stagedRoot = await mkdtemp(path.join(os.tmpdir(), "qa-gateway-cleanup-stage-")); + cleanups.push(async () => { + await rm(tempRoot, { recursive: true, force: true }); + await rm(stagedRoot, { recursive: true, force: true }); + }); + + await writeFile(path.join(tempRoot, "openclaw.json"), "{}", "utf8"); + await writeFile(path.join(stagedRoot, "marker.txt"), "x", "utf8"); + + await __testing.cleanupQaGatewayTempRoots({ + tempRoot, + stagedBundledPluginsRoot: stagedRoot, + }); + + await expect(lstat(tempRoot)).rejects.toMatchObject({ code: "ENOENT" }); + await expect(lstat(stagedRoot)).rejects.toMatchObject({ code: "ENOENT" }); + }); }); describe("resolveQaControlUiRoot", () => { diff --git a/extensions/qa-lab/src/gateway-child.ts b/extensions/qa-lab/src/gateway-child.ts index 947b7227e42..5693b139fa4 100644 --- a/extensions/qa-lab/src/gateway-child.ts +++ b/extensions/qa-lab/src/gateway-child.ts @@ -1,6 +1,6 @@ import { spawn, type ChildProcess } from "node:child_process"; import { randomUUID } from "node:crypto"; -import { createWriteStream, existsSync } from "node:fs"; +import { createWriteStream, existsSync, type WriteStream } from "node:fs"; import fs from "node:fs/promises"; import net from "node:net"; import os from "node:os"; @@ -16,10 +16,13 @@ import { import type { ModelProviderConfig } from "openclaw/plugin-sdk/provider-model-shared"; import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import { assertRepoBoundPath, ensureRepoBoundDirectory } from "./cli-paths.js"; +import { formatQaGatewayLogsForError, redactQaGatewayDebugText } from "./gateway-log-redaction.js"; import { startQaGatewayRpcClient } from "./gateway-rpc-client.js"; import { splitQaModelRef } from "./model-selection.js"; import { seedQaAgentWorkspace } from "./qa-agent-workspace.js"; import { buildQaGatewayConfig, type QaThinkingLevel } from "./qa-gateway-config.js"; +import type { QaTransportAdapter } from "./qa-transport.js"; const QA_LIVE_ENV_ALIASES = Object.freeze([ { @@ -81,9 +84,8 @@ const QA_LIVE_ANTHROPIC_SETUP_TOKEN_PROFILE_ID = "anthropic:qa-setup-token"; const QA_OPENAI_PLUGIN_ID = "openai"; const QA_LIVE_CLI_BACKEND_PRESERVE_ENV = "OPENCLAW_LIVE_CLI_BACKEND_PRESERVE_ENV"; const QA_LIVE_CLI_BACKEND_AUTH_MODE_ENV = "OPENCLAW_LIVE_CLI_BACKEND_AUTH_MODE"; - export type QaCliBackendAuthMode = "auto" | "api-key" | "subscription"; - +const QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS = 5; async function getFreePort() { return await new Promise((resolve, reject) => { const server = net.createServer(); @@ -99,6 +101,98 @@ async function getFreePort() { }); } +async function closeWriteStream(stream: WriteStream) { + await new Promise((resolve) => { + stream.end(() => resolve()); + }); +} + +async function writeSanitizedQaGatewayDebugLog(params: { sourcePath: string; targetPath: string }) { + const contents = await fs.readFile(params.sourcePath, "utf8").catch((error) => { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return ""; + } + throw error; + }); + await fs.writeFile(params.targetPath, redactQaGatewayDebugText(contents), "utf8"); +} + +async function assertQaArtifactDirWithinRepo(repoRoot: string, artifactDir: string) { + return await assertRepoBoundPath(repoRoot, artifactDir, "QA gateway artifact directory"); +} + +async function clearQaGatewayArtifactDir(dir: string) { + for (const entry of await fs.readdir(dir, { withFileTypes: true })) { + await fs.rm(path.join(dir, entry.name), { recursive: true, force: true }); + } +} + +async function cleanupQaGatewayTempRoots(params: { + tempRoot: string; + stagedBundledPluginsRoot?: string | null; +}) { + await fs.rm(params.tempRoot, { recursive: true, force: true }).catch(() => {}); + if (params.stagedBundledPluginsRoot) { + await fs.rm(params.stagedBundledPluginsRoot, { recursive: true, force: true }).catch(() => {}); + } +} + +async function preserveQaGatewayDebugArtifacts(params: { + preserveToDir: string; + stdoutLogPath: string; + stderrLogPath: string; + tempRoot: string; + repoRoot?: string; +}) { + const preserveToDir = params.repoRoot + ? await ensureRepoBoundDirectory( + params.repoRoot, + params.preserveToDir, + "QA gateway artifact directory", + { + mode: 0o700, + }, + ) + : params.preserveToDir; + await fs.mkdir(preserveToDir, { recursive: true, mode: 0o700 }); + await clearQaGatewayArtifactDir(preserveToDir); + await Promise.all([ + writeSanitizedQaGatewayDebugLog({ + sourcePath: params.stdoutLogPath, + targetPath: path.join(preserveToDir, "gateway.stdout.log"), + }), + writeSanitizedQaGatewayDebugLog({ + sourcePath: params.stderrLogPath, + targetPath: path.join(preserveToDir, "gateway.stderr.log"), + }), + ]); + await fs.writeFile( + path.join(preserveToDir, "README.txt"), + [ + "Only sanitized gateway debug artifacts are preserved here.", + "The full QA gateway runtime was not copied because it may contain credentials or auth tokens.", + `Original runtime temp root: ${params.tempRoot}`, + "", + ].join("\n"), + "utf8", + ); +} + +function isRetryableGatewayStartupError(details: string) { + return ( + details.includes("another gateway instance is already listening on ws://") || + details.includes("failed to bind gateway socket on ws://") || + details.includes("EADDRINUSE") || + details.includes("address already in use") + ); +} + +function appendQaGatewayTempRoot(details: string, tempRoot: string) { + return details.includes(tempRoot) + ? details + : `${details}\nQA gateway temp root preserved at ${tempRoot}`; +} + export function normalizeQaProviderModeEnv( env: NodeJS.ProcessEnv, providerMode?: "mock-openai" | "live-frontier", @@ -315,6 +409,10 @@ async function fetchLocalGatewayHealth(params: { const { response, release } = await fetchWithSsrFGuard({ url: `${params.baseUrl}${params.healthPath}`, init: { + method: "HEAD", + headers: { + connection: "close", + }, signal: AbortSignal.timeout(2_000), }, policy: { allowPrivateNetwork: true }, @@ -328,9 +426,15 @@ async function fetchLocalGatewayHealth(params: { } export const __testing = { + assertQaArtifactDirWithinRepo, buildQaRuntimeEnv, + cleanupQaGatewayTempRoots, fetchLocalGatewayHealth, isRetryableGatewayCallError, + isRetryableRpcStartupError, + isRetryableGatewayStartupError, + preserveQaGatewayDebugArtifacts, + redactQaGatewayDebugText, readQaLiveProviderConfigOverrides, resolveQaLiveAnthropicSetupToken, stageQaLiveAnthropicSetupToken, @@ -700,7 +804,10 @@ async function waitForGatewayReady(params: { function isRetryableRpcStartupError(error: unknown) { const details = formatErrorMessage(error); return ( + details.includes("gateway timeout after") || details.includes("handshake timeout") || + details.includes("gateway token mismatch") || + details.includes("token mismatch") || details.includes("gateway closed (1000") || details.includes("gateway closed (1006") || details.includes("gateway closed (1012)") @@ -719,8 +826,8 @@ export function resolveQaControlUiRoot(params: { repoRoot: string; controlUiEnab export async function startQaGatewayChild(params: { repoRoot: string; providerBaseUrl?: string; - qaBusBaseUrl: string; - includeQaChannel?: boolean; + transport: Pick; + transportBaseUrl: string; controlUiAllowedOrigins?: string[]; providerMode?: "mock-openai" | "live-frontier"; primaryModel?: string; @@ -743,7 +850,6 @@ export async function startQaGatewayChild(params: { const xdgDataHome = path.join(tempRoot, "xdg-data"); const xdgCacheHome = path.join(tempRoot, "xdg-cache"); const configPath = path.join(tempRoot, "openclaw.json"); - const gatewayPort = await getFreePort(); const gatewayToken = `qa-suite-${randomUUID()}`; await seedQaAgentWorkspace({ workspaceDir, @@ -775,216 +881,305 @@ export async function startQaGatewayChild(params: { providerConfigs: liveProviderConfigs, }) : undefined; - let cfg = buildQaGatewayConfig({ - bind: "loopback", - gatewayPort, - gatewayToken, - providerBaseUrl: params.providerBaseUrl, - qaBusBaseUrl: params.qaBusBaseUrl, - includeQaChannel: params.includeQaChannel, - workspaceDir, - controlUiRoot: resolveQaControlUiRoot({ - repoRoot: params.repoRoot, + const buildGatewayConfig = (gatewayPort: number) => + buildQaGatewayConfig({ + bind: "loopback", + gatewayPort, + gatewayToken, + providerBaseUrl: params.providerBaseUrl, + workspaceDir, + controlUiRoot: resolveQaControlUiRoot({ + repoRoot: params.repoRoot, + controlUiEnabled: params.controlUiEnabled, + }), + controlUiAllowedOrigins: params.controlUiAllowedOrigins, + providerMode: params.providerMode, + primaryModel: params.primaryModel, + alternateModel: params.alternateModel, + enabledPluginIds, + transportPluginIds: params.transport.requiredPluginIds, + transportConfig: params.transport.createGatewayConfig({ + baseUrl: params.transportBaseUrl, + }), + liveProviderConfigs, + fastMode: params.fastMode, + thinkingDefault: params.thinkingDefault, controlUiEnabled: params.controlUiEnabled, - }), - controlUiAllowedOrigins: params.controlUiAllowedOrigins, - providerMode: params.providerMode, - primaryModel: params.primaryModel, - alternateModel: params.alternateModel, - enabledPluginIds, - liveProviderConfigs, - fastMode: params.fastMode, - thinkingDefault: params.thinkingDefault, - controlUiEnabled: params.controlUiEnabled, - }); - cfg = await stageQaLiveAnthropicSetupToken({ - cfg, - stateDir, - }); - cfg = params.mutateConfig ? params.mutateConfig(cfg) : cfg; - await fs.writeFile(configPath, `${JSON.stringify(cfg, null, 2)}\n`, { - encoding: "utf8", - mode: 0o600, - }); - const allowedPluginIds = [...(cfg.plugins?.allow ?? []), "openai"].filter( - (pluginId, index, array): pluginId is string => { - return ( - typeof pluginId === "string" && pluginId.length > 0 && array.indexOf(pluginId) === index - ); - }, - ); - const bundledPluginsSourceRoot = resolveQaBundledPluginsSourceRoot(params.repoRoot); - const { bundledPluginsDir, stagedRoot: stagedBundledPluginsRoot } = - await createQaBundledPluginsDir({ - repoRoot: params.repoRoot, - tempRoot, - allowedPluginIds, }); - const runtimeHostVersion = await resolveQaRuntimeHostVersion({ - repoRoot: params.repoRoot, - bundledPluginsSourceRoot, - allowedPluginIds, - }); - + const buildStagedGatewayConfig = async (gatewayPort: number) => { + let cfg = buildGatewayConfig(gatewayPort); + cfg = await stageQaLiveAnthropicSetupToken({ + cfg, + stateDir, + }); + return params.mutateConfig ? params.mutateConfig(cfg) : cfg; + }; const stdout: Buffer[] = []; const stderr: Buffer[] = []; const stdoutLogPath = path.join(tempRoot, "gateway.stdout.log"); const stderrLogPath = path.join(tempRoot, "gateway.stderr.log"); const stdoutLog = createWriteStream(stdoutLogPath, { flags: "a" }); const stderrLog = createWriteStream(stderrLogPath, { flags: "a" }); - const env = buildQaRuntimeEnv({ - configPath, - gatewayToken, - homeDir, - stateDir, - xdgConfigHome, - xdgDataHome, - xdgCacheHome, - bundledPluginsDir, - compatibilityHostVersion: runtimeHostVersion, - providerMode: params.providerMode, - forwardHostHomeForClaudeCli: liveProviderIds.includes("claude-cli"), - claudeCliAuthMode: params.claudeCliAuthMode, - }); - const child = spawn( - process.execPath, - [ - distEntryPath, - "gateway", - "run", - "--port", - String(gatewayPort), - "--bind", - "loopback", - "--allow-unconfigured", - ], - { - cwd: runtimeCwd, - env, - detached: process.platform !== "win32", - stdio: ["ignore", "pipe", "pipe"], - }, - ); - child.stdout.on("data", (chunk) => { - const buffer = Buffer.from(chunk); - stdout.push(buffer); - stdoutLog.write(buffer); - }); - child.stderr.on("data", (chunk) => { - const buffer = Buffer.from(chunk); - stderr.push(buffer); - stderrLog.write(buffer); - }); - - const baseUrl = `http://127.0.0.1:${gatewayPort}`; - const wsUrl = `ws://127.0.0.1:${gatewayPort}`; const logs = () => `${Buffer.concat(stdout).toString("utf8")}\n${Buffer.concat(stderr).toString("utf8")}`.trim(); const keepTemp = process.env.OPENCLAW_QA_KEEP_TEMP === "1"; + let gatewayPort = 0; + let baseUrl = ""; + let wsUrl = ""; + let child: ReturnType | null = null; + let cfg: ReturnType | null = null; + let rpcClient: Awaited> | null = null; + let stagedBundledPluginsRoot: string | null = null; + let env: NodeJS.ProcessEnv | null = null; - let rpcClient; try { - await waitForGatewayReady({ - baseUrl, - logs, - child, - timeoutMs: 120_000, - }); - let lastRpcError: unknown = null; - for (let attempt = 1; attempt <= 4; attempt += 1) { + for (let attempt = 1; attempt <= QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS; attempt += 1) { + gatewayPort = await getFreePort(); + baseUrl = `http://127.0.0.1:${gatewayPort}`; + wsUrl = `ws://127.0.0.1:${gatewayPort}`; + cfg = await buildStagedGatewayConfig(gatewayPort); + if (!env) { + const allowedPluginIds = [...(cfg.plugins?.allow ?? []), "openai"].filter( + (pluginId, index, array): pluginId is string => { + return ( + typeof pluginId === "string" && + pluginId.length > 0 && + array.indexOf(pluginId) === index + ); + }, + ); + const bundledPluginsSourceRoot = resolveQaBundledPluginsSourceRoot(params.repoRoot); + const { bundledPluginsDir, stagedRoot } = await createQaBundledPluginsDir({ + repoRoot: params.repoRoot, + tempRoot, + allowedPluginIds, + }); + stagedBundledPluginsRoot = stagedRoot; + const runtimeHostVersion = await resolveQaRuntimeHostVersion({ + repoRoot: params.repoRoot, + bundledPluginsSourceRoot, + allowedPluginIds, + }); + env = buildQaRuntimeEnv({ + configPath, + gatewayToken, + homeDir, + stateDir, + xdgConfigHome, + xdgDataHome, + xdgCacheHome, + bundledPluginsDir, + compatibilityHostVersion: runtimeHostVersion, + providerMode: params.providerMode, + forwardHostHomeForClaudeCli: liveProviderIds.includes("claude-cli"), + claudeCliAuthMode: params.claudeCliAuthMode, + }); + } + await fs.writeFile(configPath, `${JSON.stringify(cfg, null, 2)}\n`, { + encoding: "utf8", + mode: 0o600, + }); + if (!env) { + throw new Error("qa gateway runtime env not initialized"); + } + + const attemptChild = spawn( + process.execPath, + [ + distEntryPath, + "gateway", + "run", + "--port", + String(gatewayPort), + "--bind", + "loopback", + "--allow-unconfigured", + ], + { + cwd: runtimeCwd, + env, + detached: process.platform !== "win32", + stdio: ["ignore", "pipe", "pipe"], + }, + ); + attemptChild.stdout.on("data", (chunk) => { + const buffer = Buffer.from(chunk); + stdout.push(buffer); + stdoutLog.write(buffer); + }); + attemptChild.stderr.on("data", (chunk) => { + const buffer = Buffer.from(chunk); + stderr.push(buffer); + stderrLog.write(buffer); + }); + child = attemptChild; + try { - rpcClient = await startQaGatewayRpcClient({ + await waitForGatewayReady({ + baseUrl, + logs, + child: attemptChild, + timeoutMs: 120_000, + }); + const attemptRpcClient = await startQaGatewayRpcClient({ wsUrl, token: gatewayToken, logs, }); - break; - } catch (error) { - lastRpcError = error; - if (attempt >= 4 || !isRetryableRpcStartupError(error)) { + try { + let rpcReady = false; + let lastRpcStartupError: unknown = null; + for (let rpcAttempt = 1; rpcAttempt <= 4; rpcAttempt += 1) { + try { + await attemptRpcClient.request("config.get", {}, { timeoutMs: 10_000 }); + rpcReady = true; + break; + } catch (error) { + lastRpcStartupError = error; + if (rpcAttempt >= 4 || !isRetryableRpcStartupError(error)) { + throw error; + } + await sleep(500 * rpcAttempt); + await waitForGatewayReady({ + baseUrl, + logs, + child: attemptChild, + timeoutMs: 15_000, + }); + } + } + if (!rpcReady) { + throw lastRpcStartupError ?? new Error("qa gateway rpc client failed to start"); + } + } catch (error) { + await attemptRpcClient.stop().catch(() => {}); throw error; } - await sleep(500 * attempt); - await waitForGatewayReady({ - baseUrl, - logs, - child, - timeoutMs: 15_000, + rpcClient = attemptRpcClient; + break; + } catch (error) { + const details = formatErrorMessage(error); + const retryable = + attempt < QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS && + (isRetryableGatewayStartupError(`${details}\n${logs()}`) || + isRetryableRpcStartupError(error)); + if (rpcClient) { + await rpcClient.stop().catch(() => {}); + rpcClient = null; + } + await stopQaGatewayChildProcessTree(attemptChild, { + gracefulTimeoutMs: 1_500, + forceTimeoutMs: 1_500, }); + child = null; + if (!retryable) { + throw error; + } + stdoutLog.write( + `[qa-lab] gateway child startup attempt ${attempt}/${QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS} hit a transient startup race on port ${gatewayPort}; retrying with a new port\n`, + ); } } - if (!rpcClient) { - throw lastRpcError ?? new Error("qa gateway rpc client failed to start"); - } - } catch (error) { - stdoutLog.end(); - stderrLog.end(); - await stopQaGatewayChildProcessTree(child, { gracefulTimeoutMs: 1_000 }).catch(() => {}); - if (!keepTemp && stagedBundledPluginsRoot) { - await fs.rm(stagedBundledPluginsRoot, { recursive: true, force: true }).catch(() => {}); - } - throw error; - } - return { - cfg, - baseUrl, - wsUrl, - pid: child.pid ?? null, - token: gatewayToken, - workspaceDir, - tempRoot, - configPath, - runtimeEnv: env, - logs, - async restart(signal: NodeJS.Signals = "SIGUSR1") { - if (!child.pid) { - throw new Error("qa gateway child has no pid"); - } - process.kill(child.pid, signal); - }, - async call( - method: string, - rpcParams?: unknown, - opts?: { expectFinal?: boolean; timeoutMs?: number; retryOnRestart?: boolean }, - ) { - const timeoutMs = opts?.timeoutMs ?? 20_000; - const retryOnRestart = opts?.retryOnRestart !== false; - let lastDetails = ""; - for (let attempt = 1; attempt <= 3; attempt += 1) { - try { - return await rpcClient.request(method, rpcParams, { - ...opts, - timeoutMs, - }); - } catch (error) { - const details = formatErrorMessage(error); - lastDetails = details; - if (attempt >= 3 || !retryOnRestart || !isRetryableGatewayCallError(details)) { - throw new Error(`${details}\nGateway logs:\n${logs()}`, { cause: error }); + if (!child || !cfg || !baseUrl || !wsUrl || !rpcClient || !env) { + throw new Error("qa gateway child failed to start"); + } + const runningChild = child; + const runningRpcClient = rpcClient; + const runningEnv = env; + + return { + cfg, + baseUrl, + wsUrl, + pid: child.pid ?? null, + token: gatewayToken, + workspaceDir, + tempRoot, + configPath, + runtimeEnv: runningEnv, + logs, + async restart(signal: NodeJS.Signals = "SIGUSR1") { + if (!runningChild.pid) { + throw new Error("qa gateway child has no pid"); + } + process.kill(runningChild.pid, signal); + }, + async call( + method: string, + rpcParams?: unknown, + opts?: { expectFinal?: boolean; timeoutMs?: number }, + ) { + const timeoutMs = opts?.timeoutMs ?? 20_000; + let lastDetails = ""; + for (let attempt = 1; attempt <= 3; attempt += 1) { + try { + return await runningRpcClient.request(method, rpcParams, { + ...opts, + timeoutMs, + }); + } catch (error) { + const details = formatErrorMessage(error); + lastDetails = details; + if (attempt >= 3 || !isRetryableGatewayCallError(details)) { + throw new Error(`${details}${formatQaGatewayLogsForError(logs())}`, { cause: error }); + } + await waitForGatewayReady({ + baseUrl, + logs, + child: runningChild, + timeoutMs: Math.max(10_000, timeoutMs), + }); } - await waitForGatewayReady({ - baseUrl, - logs, - child, - timeoutMs: Math.max(10_000, timeoutMs), + } + throw new Error(`${lastDetails}${formatQaGatewayLogsForError(logs())}`); + }, + async stop(opts?: { keepTemp?: boolean; preserveToDir?: string }) { + await runningRpcClient.stop().catch(() => {}); + await stopQaGatewayChildProcessTree(runningChild); + await closeWriteStream(stdoutLog); + await closeWriteStream(stderrLog); + if (opts?.preserveToDir && !(opts?.keepTemp ?? keepTemp)) { + await preserveQaGatewayDebugArtifacts({ + preserveToDir: opts.preserveToDir, + stdoutLogPath, + stderrLogPath, + tempRoot, + repoRoot: params.repoRoot, }); } - } - throw new Error(`${lastDetails}\nGateway logs:\n${logs()}`); - }, - async stop(opts?: { keepTemp?: boolean }) { - await rpcClient.stop().catch(() => {}); - stdoutLog.end(); - stderrLog.end(); - await stopQaGatewayChildProcessTree(child); - if (!(opts?.keepTemp ?? keepTemp)) { - await fs.rm(tempRoot, { recursive: true, force: true }); - if (stagedBundledPluginsRoot) { - await fs.rm(stagedBundledPluginsRoot, { recursive: true, force: true }); + if (!(opts?.keepTemp ?? keepTemp)) { + await cleanupQaGatewayTempRoots({ + tempRoot, + stagedBundledPluginsRoot, + }); } - } - }, - }; + }, + }; + } catch (error) { + await rpcClient?.stop().catch(() => {}); + if (child) { + await stopQaGatewayChildProcessTree(child, { + gracefulTimeoutMs: 1_500, + forceTimeoutMs: 1_500, + }); + } + await closeWriteStream(stdoutLog); + await closeWriteStream(stderrLog); + if (!keepTemp) { + await cleanupQaGatewayTempRoots({ + tempRoot, + stagedBundledPluginsRoot, + }); + } + throw new Error( + keepTemp + ? appendQaGatewayTempRoot(formatErrorMessage(error), tempRoot) + : formatErrorMessage(error), + { + cause: error, + }, + ); + } } diff --git a/extensions/qa-lab/src/gateway-log-redaction.ts b/extensions/qa-lab/src/gateway-log-redaction.ts new file mode 100644 index 00000000000..fc2c064f494 --- /dev/null +++ b/extensions/qa-lab/src/gateway-log-redaction.ts @@ -0,0 +1,45 @@ +const QA_GATEWAY_DEBUG_SECRET_ENV_VARS = Object.freeze([ + "ANTHROPIC_API_KEY", + "ANTHROPIC_OAUTH_TOKEN", + "AWS_ACCESS_KEY_ID", + "AWS_BEARER_TOKEN_BEDROCK", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + "ANTHROPIC_API_KEYS", + "GEMINI_API_KEY", + "GEMINI_API_KEYS", + "GOOGLE_API_KEY", + "MISTRAL_API_KEY", + "OPENAI_API_KEY", + "OPENAI_API_KEYS", + "OPENCLAW_GATEWAY_TOKEN", + "OPENCLAW_LIVE_ANTHROPIC_KEY", + "OPENCLAW_LIVE_ANTHROPIC_KEYS", + "OPENCLAW_LIVE_GEMINI_KEY", + "OPENCLAW_LIVE_OPENAI_KEY", + "VOYAGE_API_KEY", +]); + +export function redactQaGatewayDebugText(text: string) { + let redacted = text; + for (const envVar of QA_GATEWAY_DEBUG_SECRET_ENV_VARS) { + const escapedEnvVar = envVar.replaceAll(/[.*+?^${}()|[\]\\]/g, "\\$&"); + redacted = redacted.replace( + new RegExp(`\\b(${escapedEnvVar})(\\s*[=:]\\s*)([^\\s"';,]+|"[^"]*"|'[^']*')`, "g"), + `$1$2`, + ); + redacted = redacted.replace( + new RegExp(`("${escapedEnvVar}"\\s*:\\s*)"[^"]*"`, "g"), + `$1""`, + ); + } + return redacted + .replaceAll(/\bsk-ant-oat01-[A-Za-z0-9_-]+\b/g, "") + .replaceAll(/\bBearer\s+[^\s"'<>]{8,}/gi, "Bearer ") + .replaceAll(/([?#&]token=)[^&\s]+/gi, "$1"); +} + +export function formatQaGatewayLogsForError(logs: string) { + const sanitized = redactQaGatewayDebugText(logs).trim(); + return sanitized.length > 0 ? `\nGateway logs:\n${sanitized}` : ""; +} diff --git a/extensions/qa-lab/src/gateway-rpc-client.test.ts b/extensions/qa-lab/src/gateway-rpc-client.test.ts index 2f13d6f53e1..885d646de17 100644 --- a/extensions/qa-lab/src/gateway-rpc-client.test.ts +++ b/extensions/qa-lab/src/gateway-rpc-client.test.ts @@ -64,11 +64,11 @@ describe("startQaGatewayRpcClient", () => { const client = await startQaGatewayRpcClient({ wsUrl: "ws://127.0.0.1:18789", token: "qa-token", - logs: () => "qa logs", + logs: () => "OPENCLAW_GATEWAY_TOKEN=secret-token\nAuthorization: Bearer secret+/token=123456", }); await expect(client.request("health")).rejects.toThrow( - "gateway not connected\nGateway logs:\nqa logs", + "gateway not connected\nGateway logs:\nOPENCLAW_GATEWAY_TOKEN=\nAuthorization: Bearer ", ); }); @@ -76,13 +76,93 @@ describe("startQaGatewayRpcClient", () => { const client = await startQaGatewayRpcClient({ wsUrl: "ws://127.0.0.1:18789", token: "qa-token", - logs: () => "qa logs", + logs: () => "url=http://127.0.0.1:18789/#token=abc123", }); await client.stop(); await expect(client.request("health")).rejects.toThrow( - "gateway rpc client already stopped\nGateway logs:\nqa logs", + "gateway rpc client already stopped\nGateway logs:\nurl=http://127.0.0.1:18789/#token=", ); }); + + it("does not serialize requests across different gateway clients", async () => { + let resolveFirst: ((value: { ok: boolean }) => void) | null = null; + gatewayRpcMock.callGatewayFromCli + .mockImplementationOnce( + async () => + await new Promise<{ ok: boolean }>((resolve) => { + resolveFirst = resolve; + }), + ) + .mockResolvedValueOnce({ ok: true }); + + const firstClient = await startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:18789", + token: "qa-token-a", + logs: () => "qa logs a", + }); + const secondClient = await startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:28789", + token: "qa-token-b", + logs: () => "qa logs b", + }); + + const firstRequest = firstClient.request("health"); + await Promise.resolve(); + + await expect(secondClient.request("status")).resolves.toEqual({ ok: true }); + expect(gatewayRpcMock.callGatewayFromCli).toHaveBeenNthCalledWith( + 2, + "status", + { + url: "ws://127.0.0.1:28789", + token: "qa-token-b", + timeout: "20000", + expectFinal: undefined, + json: true, + }, + {}, + { + expectFinal: undefined, + progress: false, + }, + ); + + expect(resolveFirst).not.toBeNull(); + resolveFirst!({ ok: true }); + await expect(firstRequest).resolves.toEqual({ ok: true }); + }); + + it("still serializes requests within the same gateway client", async () => { + let releaseFirst: (() => void) | null = null; + gatewayRpcMock.callGatewayFromCli + .mockImplementationOnce( + async () => + await new Promise<{ ok: boolean }>((resolve) => { + releaseFirst = () => resolve({ ok: true }); + }), + ) + .mockResolvedValueOnce({ ok: true }); + + const client = await startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:18789", + token: "qa-token", + logs: () => "qa logs", + }); + + const firstRequest = client.request("health"); + await Promise.resolve(); + const secondRequest = client.request("status"); + await Promise.resolve(); + + expect(gatewayRpcMock.callGatewayFromCli).toHaveBeenCalledTimes(1); + + expect(releaseFirst).not.toBeNull(); + releaseFirst!(); + + await expect(firstRequest).resolves.toEqual({ ok: true }); + await expect(secondRequest).resolves.toEqual({ ok: true }); + expect(gatewayRpcMock.callGatewayFromCli).toHaveBeenCalledTimes(2); + }); }); diff --git a/extensions/qa-lab/src/gateway-rpc-client.ts b/extensions/qa-lab/src/gateway-rpc-client.ts index 48c3fb6fe23..56b4889e538 100644 --- a/extensions/qa-lab/src/gateway-rpc-client.ts +++ b/extensions/qa-lab/src/gateway-rpc-client.ts @@ -1,4 +1,5 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { formatQaGatewayLogsForError } from "./gateway-log-redaction.js"; import { callGatewayFromCli } from "./runtime-api.js"; type QaGatewayRpcRequestOptions = { @@ -13,18 +14,16 @@ export type QaGatewayRpcClient = { function formatQaGatewayRpcError(error: unknown, logs: () => string) { const details = formatErrorMessage(error); - return new Error(`${details}\nGateway logs:\n${logs()}`); + return new Error(`${details}${formatQaGatewayLogsForError(logs())}`); } -let qaGatewayRpcQueue = Promise.resolve(); - -async function runQueuedQaGatewayRpc(task: () => Promise): Promise { - const run = qaGatewayRpcQueue.then(task, task); - qaGatewayRpcQueue = run.then( +function runQueuedQaGatewayRpc(queue: Promise, task: () => Promise) { + const run = queue.then(task, task); + const nextQueue = run.then( () => undefined, () => undefined, ); - return await run; + return { run, nextQueue }; } export async function startQaGatewayRpcClient(params: { @@ -34,6 +33,7 @@ export async function startQaGatewayRpcClient(params: { }): Promise { const wrapError = (error: unknown) => formatQaGatewayRpcError(error, params.logs); let stopped = false; + let queue = Promise.resolve(); return { async request(method, rpcParams, opts) { @@ -41,7 +41,8 @@ export async function startQaGatewayRpcClient(params: { throw wrapError(new Error("gateway rpc client already stopped")); } try { - return await runQueuedQaGatewayRpc( + const { run, nextQueue } = runQueuedQaGatewayRpc( + queue, async () => await callGatewayFromCli( method, @@ -59,6 +60,8 @@ export async function startQaGatewayRpcClient(params: { }, ), ); + queue = nextQueue; + return await run; } catch (error) { throw wrapError(error); } diff --git a/extensions/qa-lab/src/lab-server.ts b/extensions/qa-lab/src/lab-server.ts index 13d6bb2e136..5ee07a417c0 100644 --- a/extensions/qa-lab/src/lab-server.ts +++ b/extensions/qa-lab/src/lab-server.ts @@ -29,6 +29,7 @@ import type { QaLabServerStartParams, } from "./lab-server.types.js"; import type { QaRunnerModelOption } from "./model-catalog.runtime.js"; +import { createQaChannelGatewayConfig } from "./qa-channel-transport.js"; import { createIdleQaRunnerSnapshot, createQaRunOutputDir, @@ -510,17 +511,7 @@ function tryResolveUiAsset( } function createQaLabConfig(baseUrl: string): OpenClawConfig { - return { - channels: { - "qa-channel": { - enabled: true, - baseUrl, - botUserId: "openclaw", - botDisplayName: "OpenClaw QA", - allowFrom: ["*"], - }, - }, - }; + return createQaChannelGatewayConfig({ baseUrl }); } async function startQaGatewayLoop(params: { state: QaBusState; baseUrl: string }) { @@ -633,6 +624,7 @@ export async function startQaLabServer( const result = await runQaSelfCheckAgainstState({ state, cfg: gateway?.cfg ?? createQaLabConfig(listenUrl), + transportId: "qa-channel", outputPath: params?.outputPath, repoRoot, }); diff --git a/extensions/qa-lab/src/live-transports/matrix/matrix-live.runtime.ts b/extensions/qa-lab/src/live-transports/matrix/matrix-live.runtime.ts index fdf4ab7bec6..0b480c6ac66 100644 --- a/extensions/qa-lab/src/live-transports/matrix/matrix-live.runtime.ts +++ b/extensions/qa-lab/src/live-transports/matrix/matrix-live.runtime.ts @@ -324,7 +324,11 @@ export async function runMatrixQaLive(params: { try { gatewayHarness = await startQaLiveLaneGateway({ repoRoot, - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: { + requiredPluginIds: [], + createGatewayConfig: () => ({}), + }, + transportBaseUrl: "http://127.0.0.1:43123", providerMode, primaryModel, alternateModel, diff --git a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts index 5cdb75106ff..99cabf09c2c 100644 --- a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts +++ b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts @@ -15,6 +15,29 @@ vi.mock("../../mock-openai-server.js", () => ({ import { startQaLiveLaneGateway } from "./live-gateway.runtime.js"; +function createStubTransport(baseUrl = "http://127.0.0.1:43123") { + return { + requiredPluginIds: ["qa-channel"], + createGatewayConfig: () => ({ + channels: { + "qa-channel": { + enabled: true, + baseUrl, + botUserId: "openclaw", + botDisplayName: "OpenClaw QA", + allowFrom: ["*"], + pollTimeoutMs: 250, + }, + }, + messages: { + groupChat: { + mentionPatterns: ["\\b@?openclaw\\b"], + }, + }, + }), + }; +} + describe("startQaLiveLaneGateway", () => { const gatewayStop = vi.fn(); const mockStop = vi.fn(); @@ -41,7 +64,8 @@ describe("startQaLiveLaneGateway", () => { it("threads the mock provider base url into the gateway child", async () => { const harness = await startQaLiveLaneGateway({ repoRoot: "/tmp/openclaw-repo", - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: createStubTransport(), + transportBaseUrl: "http://127.0.0.1:43123", providerMode: "mock-openai", primaryModel: "mock-openai/gpt-5.4", alternateModel: "mock-openai/gpt-5.4-alt", @@ -54,7 +78,7 @@ describe("startQaLiveLaneGateway", () => { }); expect(startQaGatewayChild).toHaveBeenCalledWith( expect.objectContaining({ - includeQaChannel: false, + transportBaseUrl: "http://127.0.0.1:43123", providerBaseUrl: "http://127.0.0.1:44080/v1", providerMode: "mock-openai", }), @@ -68,7 +92,8 @@ describe("startQaLiveLaneGateway", () => { it("skips mock bootstrap for live frontier runs", async () => { const harness = await startQaLiveLaneGateway({ repoRoot: "/tmp/openclaw-repo", - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: createStubTransport(), + transportBaseUrl: "http://127.0.0.1:43123", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", @@ -78,7 +103,7 @@ describe("startQaLiveLaneGateway", () => { expect(startQaMockOpenAiServer).not.toHaveBeenCalled(); expect(startQaGatewayChild).toHaveBeenCalledWith( expect.objectContaining({ - includeQaChannel: false, + transportBaseUrl: "http://127.0.0.1:43123", providerBaseUrl: undefined, providerMode: "live-frontier", }), @@ -92,7 +117,8 @@ describe("startQaLiveLaneGateway", () => { gatewayStop.mockRejectedValueOnce(new Error("gateway down")); const harness = await startQaLiveLaneGateway({ repoRoot: "/tmp/openclaw-repo", - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: createStubTransport(), + transportBaseUrl: "http://127.0.0.1:43123", providerMode: "mock-openai", primaryModel: "mock-openai/gpt-5.4", alternateModel: "mock-openai/gpt-5.4-alt", @@ -111,7 +137,8 @@ describe("startQaLiveLaneGateway", () => { mockStop.mockRejectedValueOnce(new Error("mock down")); const harness = await startQaLiveLaneGateway({ repoRoot: "/tmp/openclaw-repo", - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: createStubTransport(), + transportBaseUrl: "http://127.0.0.1:43123", providerMode: "mock-openai", primaryModel: "mock-openai/gpt-5.4", alternateModel: "mock-openai/gpt-5.4-alt", diff --git a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.ts b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.ts index 9e5c9068e02..7550b728dec 100644 --- a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.ts +++ b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.ts @@ -28,7 +28,13 @@ async function stopQaLiveLaneResources(resources: { export async function startQaLiveLaneGateway(params: { repoRoot: string; - qaBusBaseUrl: string; + transport: { + requiredPluginIds: readonly string[]; + createGatewayConfig: (params: { + baseUrl: string; + }) => Pick; + }; + transportBaseUrl: string; controlUiAllowedOrigins?: string[]; providerMode: "mock-openai" | "live-frontier"; primaryModel: string; @@ -50,8 +56,8 @@ export async function startQaLiveLaneGateway(params: { const gateway = await startQaGatewayChild({ repoRoot: params.repoRoot, providerBaseUrl: mock ? `${mock.baseUrl}/v1` : undefined, - qaBusBaseUrl: params.qaBusBaseUrl, - includeQaChannel: false, + transport: params.transport, + transportBaseUrl: params.transportBaseUrl, controlUiAllowedOrigins: params.controlUiAllowedOrigins, providerMode: params.providerMode, primaryModel: params.primaryModel, diff --git a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts index 10f2373656e..94d23be6643 100644 --- a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts +++ b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts @@ -838,7 +838,11 @@ export async function runTelegramQaLive(params: { const gatewayHarness = await startQaLiveLaneGateway({ repoRoot, - qaBusBaseUrl: "http://127.0.0.1:43123", + transport: { + requiredPluginIds: [], + createGatewayConfig: () => ({}), + }, + transportBaseUrl: "http://127.0.0.1:0", providerMode, primaryModel, alternateModel, diff --git a/extensions/qa-lab/src/manual-lane.runtime.ts b/extensions/qa-lab/src/manual-lane.runtime.ts index cdea2e2998c..a20c0621557 100644 --- a/extensions/qa-lab/src/manual-lane.runtime.ts +++ b/extensions/qa-lab/src/manual-lane.runtime.ts @@ -6,9 +6,11 @@ import { startQaLabServer } from "./lab-server.js"; import { resolveQaLiveTurnTimeoutMs } from "./live-timeout.js"; import { startQaMockOpenAiServer } from "./mock-openai-server.js"; import type { QaThinkingLevel } from "./qa-gateway-config.js"; +import { createQaTransportAdapter, type QaTransportId } from "./qa-transport-registry.js"; type QaManualLaneParams = { repoRoot: string; + transportId?: QaTransportId; providerMode: "mock-openai" | "live-frontier"; primaryModel: string; alternateModel: string; @@ -48,6 +50,10 @@ export async function runQaManualLane(params: QaManualLaneParams) { repoRoot: params.repoRoot, embeddedGateway: "disabled", }); + const transport = createQaTransportAdapter({ + id: params.transportId ?? "qa-channel", + state: lab.state, + }); const mock = params.providerMode === "mock-openai" ? await startQaMockOpenAiServer({ @@ -58,7 +64,8 @@ export async function runQaManualLane(params: QaManualLaneParams) { const gateway = await startQaGatewayChild({ repoRoot: params.repoRoot, providerBaseUrl: mock ? `${mock.baseUrl}/v1` : undefined, - qaBusBaseUrl: lab.listenUrl, + transport, + transportBaseUrl: lab.listenUrl, providerMode: params.providerMode, primaryModel: params.primaryModel, alternateModel: params.alternateModel, @@ -74,6 +81,9 @@ export async function runQaManualLane(params: QaManualLaneParams) { timeoutMs: params.timeoutMs, }); try { + const delivery = transport.buildAgentDelivery({ + target: "dm:qa-operator", + }); const started = (await gateway.call( "agent", { @@ -82,10 +92,10 @@ export async function runQaManualLane(params: QaManualLaneParams) { sessionKey: `agent:qa:manual:${sessionSuffix}`, message: params.message, deliver: true, - channel: "qa-channel", + channel: delivery.channel, to: "dm:qa-operator", - replyChannel: "qa-channel", - replyTo: "dm:qa-operator", + replyChannel: delivery.replyChannel, + replyTo: delivery.replyTo, }, { timeoutMs: 30_000 }, )) as { runId?: string }; diff --git a/extensions/qa-lab/src/model-catalog.runtime.ts b/extensions/qa-lab/src/model-catalog.runtime.ts index 54146348c66..afcf97cd847 100644 --- a/extensions/qa-lab/src/model-catalog.runtime.ts +++ b/extensions/qa-lab/src/model-catalog.runtime.ts @@ -2,6 +2,10 @@ import { spawn } from "node:child_process"; import fs from "node:fs/promises"; import path from "node:path"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import { + createQaChannelGatewayConfig, + QA_CHANNEL_REQUIRED_PLUGIN_IDS, +} from "./qa-channel-transport.js"; import { buildQaGatewayConfig } from "./qa-gateway-config.js"; const QA_FRONTIER_PROVIDER_IDS = ["anthropic", "google", "openai"] as const; @@ -103,7 +107,6 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string; signa bind: "loopback", gatewayPort: 0, gatewayToken: "qa-model-catalog", - qaBusBaseUrl: "http://127.0.0.1:9", workspaceDir, providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", @@ -111,6 +114,10 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string; signa enabledProviderIds: [...QA_FRONTIER_PROVIDER_IDS], imageGenerationModel: null, controlUiEnabled: false, + transportPluginIds: QA_CHANNEL_REQUIRED_PLUGIN_IDS, + transportConfig: createQaChannelGatewayConfig({ + baseUrl: "http://127.0.0.1:9", + }), }); await fs.writeFile(configPath, `${JSON.stringify(cfg, null, 2)}\n`, "utf8"); diff --git a/extensions/qa-lab/src/multipass.runtime.ts b/extensions/qa-lab/src/multipass.runtime.ts index e6e0918d332..a62a7e9863a 100644 --- a/extensions/qa-lab/src/multipass.runtime.ts +++ b/extensions/qa-lab/src/multipass.runtime.ts @@ -108,6 +108,7 @@ export type QaMultipassPlan = { memory: string; disk: string; pnpmVersion: string; + transportId: string; providerMode: "mock-openai" | "live-frontier"; primaryModel?: string; alternateModel?: string; @@ -329,6 +330,7 @@ function appendScenarioArgs(command: string[], scenarioIds: string[]) { export function createQaMultipassPlan(params: { repoRoot: string; outputDir?: string; + transportId?: string; providerMode?: "mock-openai" | "live-frontier"; primaryModel?: string; alternateModel?: string; @@ -342,6 +344,7 @@ export function createQaMultipassPlan(params: { }) { const outputDir = params.outputDir ?? createQaMultipassOutputDir(params.repoRoot); const scenarioIds = [...new Set(params.scenarioIds ?? [])]; + const transportId = params.transportId?.trim() || "qa-channel"; const providerMode = params.providerMode ?? "mock-openai"; const forwardedEnv = providerMode === "live-frontier" ? resolveForwardedLiveEnv() : {}; const hostCodexHomePath = forwardedEnv.CODEX_HOME; @@ -359,6 +362,8 @@ export function createQaMultipassPlan(params: { "openclaw", "qa", "suite", + "--transport", + transportId, "--provider-mode", providerMode, "--output-dir", @@ -385,6 +390,7 @@ export function createQaMultipassPlan(params: { memory: params.memory ?? qaMultipassDefaultResources.memory, disk: params.disk ?? qaMultipassDefaultResources.disk, pnpmVersion: validatePnpmVersion(resolvePnpmVersion(params.repoRoot)), + transportId, providerMode, primaryModel: params.primaryModel, alternateModel: params.alternateModel, @@ -629,6 +635,7 @@ async function tryCopyGuestBootstrapLog(plan: QaMultipassPlan) { export async function runQaMultipass(params: { repoRoot: string; outputDir?: string; + transportId?: string; providerMode?: "mock-openai" | "live-frontier"; primaryModel?: string; alternateModel?: string; diff --git a/extensions/qa-lab/src/qa-channel-transport.test.ts b/extensions/qa-lab/src/qa-channel-transport.test.ts new file mode 100644 index 00000000000..d6f3682d96d --- /dev/null +++ b/extensions/qa-lab/src/qa-channel-transport.test.ts @@ -0,0 +1,122 @@ +import { describe, expect, it, vi } from "vitest"; +import { createQaBusState } from "./bus-state.js"; +import { createQaChannelTransport } from "./qa-channel-transport.js"; + +describe("qa channel transport", () => { + it("creates gateway action config for qa-channel", () => { + const transport = createQaChannelTransport(createQaBusState()); + + expect( + transport.createGatewayConfig({ + baseUrl: "http://127.0.0.1:43123", + }), + ).toEqual({ + channels: { + "qa-channel": { + enabled: true, + baseUrl: "http://127.0.0.1:43123", + botUserId: "openclaw", + botDisplayName: "OpenClaw QA", + allowFrom: ["*"], + pollTimeoutMs: 250, + }, + }, + messages: { + groupChat: { + mentionPatterns: ["\\b@?openclaw\\b"], + }, + }, + }); + }); + + it("builds agent delivery params for qa-channel replies", () => { + const transport = createQaChannelTransport(createQaBusState()); + + expect(transport.buildAgentDelivery({ target: "dm:qa-operator" })).toEqual({ + channel: "qa-channel", + replyChannel: "qa-channel", + replyTo: "dm:qa-operator", + }); + }); + + it("waits until the qa-channel default account is running", async () => { + const transport = createQaChannelTransport(createQaBusState()); + const call = vi + .fn() + .mockResolvedValueOnce({ + channelAccounts: { + "qa-channel": [{ accountId: "default", running: false }], + }, + }) + .mockResolvedValueOnce({ + channelAccounts: { + "qa-channel": [{ accountId: "default", running: true, restartPending: false }], + }, + }); + + await transport.waitReady({ + gateway: { call }, + timeoutMs: 2_000, + }); + + expect(call).toHaveBeenCalledTimes(2); + }); + + it("inherits the shared normalized message capabilities", async () => { + const transport = createQaChannelTransport(createQaBusState()); + + const inbound = await transport.capabilities.sendInboundMessage({ + accountId: "default", + conversation: { id: "dm:qa-operator", kind: "direct" }, + senderId: "qa-operator", + text: "hello from the operator", + }); + + expect(transport.capabilities.getNormalizedMessageState().messages).toHaveLength(1); + expect( + await transport.capabilities.readNormalizedMessage({ + messageId: inbound.id, + }), + ).toMatchObject({ + id: inbound.id, + text: "hello from the operator", + }); + }); + + it("inherits the shared failure-aware wait helper", async () => { + const transport = createQaChannelTransport(createQaBusState()); + let injected = false; + + await expect( + transport.capabilities.waitForCondition( + async () => { + if (!injected) { + injected = true; + await transport.capabilities.injectOutboundMessage({ + accountId: "default", + to: "dm:qa-operator", + text: "⚠️ agent failed before reply: synthetic failure for wait helper", + }); + } + return undefined; + }, + 50, + 10, + ), + ).rejects.toThrow("synthetic failure for wait helper"); + }); + + it("captures a fresh failure cursor for each wait helper call", async () => { + const transport = createQaChannelTransport(createQaBusState()); + + await transport.capabilities.injectOutboundMessage({ + accountId: "default", + to: "dm:qa-operator", + text: "⚠️ agent failed before reply: stale failure should not leak", + }); + + await expect(transport.capabilities.waitForCondition(async () => "ok", 50, 10)).resolves.toBe( + "ok", + ); + }); +}); diff --git a/extensions/qa-lab/src/qa-channel-transport.ts b/extensions/qa-lab/src/qa-channel-transport.ts new file mode 100644 index 00000000000..3c3967b43ba --- /dev/null +++ b/extensions/qa-lab/src/qa-channel-transport.ts @@ -0,0 +1,123 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import type { QaBusState } from "./bus-state.js"; +import { QaStateBackedTransportAdapter, waitForQaTransportCondition } from "./qa-transport.js"; +import type { + QaTransportActionName, + QaTransportGatewayConfig, + QaTransportGatewayClient, + QaTransportReportParams, +} from "./qa-transport.js"; +import { qaChannelPlugin } from "./runtime-api.js"; + +export const QA_CHANNEL_ID = "qa-channel"; +export const QA_CHANNEL_ACCOUNT_ID = "default"; +export const QA_CHANNEL_REQUIRED_PLUGIN_IDS = Object.freeze([QA_CHANNEL_ID]); + +async function waitForQaChannelReady(params: { + gateway: QaTransportGatewayClient; + timeoutMs?: number; +}) { + await waitForQaTransportCondition( + async () => { + try { + const payload = (await params.gateway.call( + "channels.status", + { probe: false, timeoutMs: 2_000 }, + { timeoutMs: 5_000 }, + )) as { + channelAccounts?: Record< + string, + Array<{ + accountId?: string; + running?: boolean; + restartPending?: boolean; + }> + >; + }; + const accounts = payload.channelAccounts?.[QA_CHANNEL_ID] ?? []; + const account = + accounts.find((entry) => entry.accountId === QA_CHANNEL_ACCOUNT_ID) ?? accounts[0]; + return account?.running && account.restartPending !== true ? true : undefined; + } catch { + return undefined; + } + }, + params.timeoutMs ?? 45_000, + 500, + ); +} + +export function createQaChannelGatewayConfig(params: { + baseUrl: string; +}): QaTransportGatewayConfig { + return { + channels: { + [QA_CHANNEL_ID]: { + enabled: true, + baseUrl: params.baseUrl, + botUserId: "openclaw", + botDisplayName: "OpenClaw QA", + allowFrom: ["*"], + pollTimeoutMs: 250, + }, + }, + messages: { + groupChat: { + mentionPatterns: ["\\b@?openclaw\\b"], + }, + }, + }; +} + +function createQaChannelReportNotes(params: QaTransportReportParams) { + return [ + params.providerMode === "mock-openai" + ? "Runs against qa-channel + qa-lab bus + real gateway child + mock OpenAI provider." + : `Runs against qa-channel + qa-lab bus + real gateway child + live frontier models (${params.primaryModel}, ${params.alternateModel})${params.fastMode ? " with fast mode enabled" : ""}.`, + params.concurrency > 1 + ? `Scenarios run in isolated gateway workers with concurrency ${params.concurrency}.` + : "Scenarios run serially in one gateway worker.", + "Cron uses a one-minute schedule assertion plus forced execution for fast verification.", + ]; +} + +async function handleQaChannelAction(params: { + action: QaTransportActionName; + args: Record; + cfg: OpenClawConfig; + accountId?: string | null; +}) { + return await qaChannelPlugin.actions?.handleAction?.({ + channel: QA_CHANNEL_ID, + action: params.action, + cfg: params.cfg, + accountId: params.accountId?.trim() || QA_CHANNEL_ACCOUNT_ID, + params: params.args, + }); +} + +class QaChannelTransport extends QaStateBackedTransportAdapter { + constructor(state: QaBusState) { + super({ + id: QA_CHANNEL_ID, + label: "qa-channel + qa-lab bus", + accountId: QA_CHANNEL_ACCOUNT_ID, + requiredPluginIds: QA_CHANNEL_REQUIRED_PLUGIN_IDS, + state, + }); + } + + createGatewayConfig = createQaChannelGatewayConfig; + waitReady = waitForQaChannelReady; + buildAgentDelivery = ({ target }: { target: string }) => ({ + channel: QA_CHANNEL_ID, + replyChannel: QA_CHANNEL_ID, + replyTo: target, + }); + handleAction = handleQaChannelAction; + createReportNotes = createQaChannelReportNotes; +} + +export function createQaChannelTransport(state: QaBusState) { + return new QaChannelTransport(state); +} diff --git a/extensions/qa-lab/src/qa-gateway-config.test.ts b/extensions/qa-lab/src/qa-gateway-config.test.ts index 5c23e94f23c..17cd3853e5e 100644 --- a/extensions/qa-lab/src/qa-gateway-config.test.ts +++ b/extensions/qa-lab/src/qa-gateway-config.test.ts @@ -4,6 +4,30 @@ import { DEFAULT_QA_CONTROL_UI_ALLOWED_ORIGINS, mergeQaControlUiAllowedOrigins, } from "./qa-gateway-config.js"; +import type { QaTransportGatewayConfig } from "./qa-transport.js"; + +function createQaChannelTransportParams(baseUrl = "http://127.0.0.1:43124") { + return { + transportPluginIds: ["qa-channel"], + transportConfig: { + channels: { + "qa-channel": { + enabled: true, + baseUrl, + botUserId: "openclaw", + botDisplayName: "OpenClaw QA", + allowFrom: ["*"], + pollTimeoutMs: 250, + }, + }, + messages: { + groupChat: { + mentionPatterns: ["\\b@?openclaw\\b"], + }, + }, + } satisfies QaTransportGatewayConfig, + }; +} function getPrimaryModel(value: unknown): string | undefined { if (typeof value === "string") { @@ -23,8 +47,8 @@ describe("buildQaGatewayConfig", () => { gatewayPort: 18789, gatewayToken: "token", providerBaseUrl: "http://127.0.0.1:44080/v1", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", + ...createQaChannelTransportParams(), }); expect(getPrimaryModel(cfg.agents?.defaults?.model)).toBe("mock-openai/gpt-5.4"); @@ -34,6 +58,12 @@ describe("buildQaGatewayConfig", () => { expect(cfg.plugins?.entries?.["qa-channel"]).toEqual({ enabled: true }); expect(cfg.plugins?.entries?.openai).toBeUndefined(); expect(cfg.gateway?.reload?.deferralTimeoutMs).toBe(1_000); + expect(cfg.channels?.["qa-channel"]).toMatchObject({ + enabled: true, + baseUrl: "http://127.0.0.1:43124", + pollTimeoutMs: 250, + }); + expect(cfg.messages?.groupChat?.mentionPatterns).toEqual(["\\b@?openclaw\\b"]); }); it("can omit qa-channel for live transport gateway children", () => { @@ -42,9 +72,9 @@ describe("buildQaGatewayConfig", () => { gatewayPort: 18789, gatewayToken: "token", providerBaseUrl: "http://127.0.0.1:44080/v1", - qaBusBaseUrl: "http://127.0.0.1:43124", - includeQaChannel: false, workspaceDir: "/tmp/qa-workspace", + transportPluginIds: [], + transportConfig: {}, }); expect(cfg.plugins?.allow).toEqual(["memory-core"]); @@ -57,12 +87,12 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", providerMode: "live-frontier", fastMode: true, primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", + ...createQaChannelTransportParams(), }); expect(getPrimaryModel(cfg.agents?.defaults?.model)).toBe("openai/gpt-5.4"); @@ -80,12 +110,12 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", providerMode: "live-frontier", primaryModel: "anthropic/claude-sonnet-4-6", alternateModel: "google/gemini-pro-test", imageGenerationModel: null, + ...createQaChannelTransportParams(), }); expect(cfg.plugins?.allow).toEqual(["memory-core", "anthropic", "google", "qa-channel"]); @@ -100,13 +130,13 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", providerMode: "live-frontier", primaryModel: "codex-cli/test-model", alternateModel: "codex-cli/test-model", imageGenerationModel: null, enabledPluginIds: ["openai"], + ...createQaChannelTransportParams(), }); expect(getPrimaryModel(cfg.agents?.defaults?.model)).toBe("codex-cli/test-model"); @@ -120,13 +150,13 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", providerMode: "live-frontier", primaryModel: "custom-openai/model-a", alternateModel: "custom-openai/model-a", imageGenerationModel: null, enabledPluginIds: ["openai"], + ...createQaChannelTransportParams(), liveProviderConfigs: { "custom-openai": { baseUrl: "https://api.example.test/v1", @@ -158,12 +188,12 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", providerMode: "live-frontier", primaryModel: "openai/gpt-5.4", alternateModel: "openai/gpt-5.4", thinkingDefault: "xhigh", + ...createQaChannelTransportParams(), }); expect(cfg.agents?.defaults?.thinkingDefault).toBe("xhigh"); @@ -177,9 +207,9 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", controlUiEnabled: false, + ...createQaChannelTransportParams(), }); expect(cfg.gateway?.controlUi?.enabled).toBe(false); @@ -192,9 +222,9 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", controlUiRoot: "/tmp/openclaw/dist/control-ui", + ...createQaChannelTransportParams(), }); expect(cfg.gateway?.controlUi?.enabled).toBe(true); @@ -211,10 +241,10 @@ describe("buildQaGatewayConfig", () => { bind: "loopback", gatewayPort: 18789, gatewayToken: "token", - qaBusBaseUrl: "http://127.0.0.1:43124", workspaceDir: "/tmp/qa-workspace", controlUiRoot: "/tmp/openclaw/dist/control-ui", controlUiAllowedOrigins: ["http://127.0.0.1:60196"], + ...createQaChannelTransportParams(), }); expect(cfg.gateway?.controlUi?.root).toBe("/tmp/openclaw/dist/control-ui"); diff --git a/extensions/qa-lab/src/qa-gateway-config.ts b/extensions/qa-lab/src/qa-gateway-config.ts index 413dce79abe..d2432c7224e 100644 --- a/extensions/qa-lab/src/qa-gateway-config.ts +++ b/extensions/qa-lab/src/qa-gateway-config.ts @@ -7,6 +7,7 @@ import { splitQaModelRef, type QaProviderMode, } from "./model-selection.js"; +import type { QaTransportGatewayConfig } from "./qa-transport.js"; export const DEFAULT_QA_CONTROL_UI_ALLOWED_ORIGINS = Object.freeze([ "http://127.0.0.1:18789", @@ -56,8 +57,6 @@ export function buildQaGatewayConfig(params: { gatewayPort: number; gatewayToken: string; providerBaseUrl?: string; - qaBusBaseUrl: string; - includeQaChannel?: boolean; workspaceDir: string; controlUiRoot?: string; controlUiAllowedOrigins?: string[]; @@ -68,11 +67,12 @@ export function buildQaGatewayConfig(params: { imageGenerationModel?: string | null; enabledProviderIds?: string[]; enabledPluginIds?: string[]; + transportPluginIds?: readonly string[]; + transportConfig?: QaTransportGatewayConfig; liveProviderConfigs?: Record; fastMode?: boolean; thinkingDefault?: QaThinkingLevel; }): OpenClawConfig { - const includeQaChannel = params.includeQaChannel !== false; const mockProviderBaseUrl = params.providerBaseUrl ?? "http://127.0.0.1:44080/v1"; const mockOpenAiProvider: ModelProviderConfig = { baseUrl: mockProviderBaseUrl, @@ -163,14 +163,23 @@ export function buildQaGatewayConfig(params: { ), ] : []; + const transportPluginIds = [...new Set(params.transportPluginIds ?? [])] + .map((pluginId) => pluginId.trim()) + .filter((pluginId) => pluginId.length > 0); const pluginEntries = providerMode === "live-frontier" ? Object.fromEntries(selectedPluginIds.map((pluginId) => [pluginId, { enabled: true }])) : {}; - const allowedPlugins = - providerMode === "live-frontier" - ? ["memory-core", ...selectedPluginIds, ...(includeQaChannel ? ["qa-channel"] : [])] - : ["memory-core", ...(includeQaChannel ? ["qa-channel"] : [])]; + const transportPluginEntries = Object.fromEntries( + transportPluginIds.map((pluginId) => [pluginId, { enabled: true }]), + ); + const allowedPlugins = [ + ...new Set( + providerMode === "live-frontier" + ? ["memory-core", ...selectedPluginIds, ...transportPluginIds] + : ["memory-core", ...transportPluginIds], + ), + ]; const liveModelParams = providerMode === "live-frontier" ? (modelRef: string) => ({ @@ -199,7 +208,7 @@ export function buildQaGatewayConfig(params: { enabled: true, }, ...pluginEntries, - ...(includeQaChannel ? { "qa-channel": { enabled: true } } : {}), + ...transportPluginEntries, }, }, agents: { @@ -307,24 +316,7 @@ export function buildQaGatewayConfig(params: { mode: "off", }, }, - ...(includeQaChannel - ? { - channels: { - "qa-channel": { - enabled: true, - baseUrl: params.qaBusBaseUrl, - botUserId: "openclaw", - botDisplayName: "OpenClaw QA", - allowFrom: ["*"], - pollTimeoutMs: 250, - }, - }, - } - : {}), - messages: { - groupChat: { - mentionPatterns: ["\\b@?openclaw\\b"], - }, - }, + ...(params.transportConfig?.channels ? { channels: params.transportConfig.channels } : {}), + ...(params.transportConfig?.messages ? { messages: params.transportConfig.messages } : {}), } satisfies OpenClawConfig; } diff --git a/extensions/qa-lab/src/qa-transport-registry.ts b/extensions/qa-lab/src/qa-transport-registry.ts new file mode 100644 index 00000000000..6f1050c6403 --- /dev/null +++ b/extensions/qa-lab/src/qa-transport-registry.ts @@ -0,0 +1,29 @@ +import type { QaBusState } from "./bus-state.js"; +import { createQaChannelTransport } from "./qa-channel-transport.js"; +import type { QaTransportAdapter } from "./qa-transport.js"; + +export type QaTransportId = "qa-channel"; + +export function normalizeQaTransportId(input?: string | null): QaTransportId { + const transportId = input?.trim() || "qa-channel"; + switch (transportId) { + case "qa-channel": + return transportId; + default: + throw new Error(`unsupported QA transport: ${transportId}`); + } +} + +export function createQaTransportAdapter(params: { + id: QaTransportId; + state: QaBusState; +}): QaTransportAdapter { + switch (params.id) { + case "qa-channel": + return createQaChannelTransport(params.state); + default: { + const unsupported: never = params.id; + throw new Error(`unsupported QA transport: ${String(unsupported)}`); + } + } +} diff --git a/extensions/qa-lab/src/qa-transport.ts b/extensions/qa-lab/src/qa-transport.ts new file mode 100644 index 00000000000..8767912387a --- /dev/null +++ b/extensions/qa-lab/src/qa-transport.ts @@ -0,0 +1,223 @@ +import { setTimeout as sleep } from "node:timers/promises"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import { extractQaFailureReplyText } from "./reply-failure.js"; +import type { + QaBusInboundMessageInput, + QaBusMessage, + QaBusOutboundMessageInput, + QaBusSearchMessagesInput, + QaBusReadMessageInput, + QaBusStateSnapshot, + QaBusWaitForInput, +} from "./runtime-api.js"; + +export type QaTransportGatewayClient = { + call: ( + method: string, + params?: unknown, + options?: { + timeoutMs?: number; + }, + ) => Promise; +}; + +export type QaTransportActionName = "delete" | "edit" | "react" | "thread-create"; + +export type QaTransportReportParams = { + providerMode: "mock-openai" | "live-frontier"; + primaryModel: string; + alternateModel: string; + fastMode: boolean; + concurrency: number; +}; + +export type QaTransportGatewayConfig = Pick; + +export type QaTransportState = { + reset: () => void | Promise; + getSnapshot: () => QaBusStateSnapshot; + addInboundMessage: (input: QaBusInboundMessageInput) => QaBusMessage | Promise; + addOutboundMessage: (input: QaBusOutboundMessageInput) => QaBusMessage | Promise; + readMessage: ( + input: QaBusReadMessageInput, + ) => QaBusMessage | null | undefined | Promise; + searchMessages: (input: QaBusSearchMessagesInput) => QaBusMessage[] | Promise; + waitFor: (input: QaBusWaitForInput) => Promise; +}; + +export type QaTransportFailureCursorSpace = "all" | "outbound"; + +export type QaTransportFailureAssertionOptions = { + sinceIndex?: number; + cursorSpace?: QaTransportFailureCursorSpace; +}; + +export type QaTransportCommonCapabilities = { + sendInboundMessage: QaTransportState["addInboundMessage"]; + injectOutboundMessage: QaTransportState["addOutboundMessage"]; + waitForOutboundMessage: (input: QaBusWaitForInput) => Promise; + getNormalizedMessageState: () => QaBusStateSnapshot; + resetNormalizedMessageState: () => Promise; + readNormalizedMessage: QaTransportState["readMessage"]; + executeGenericAction: (params: { + action: QaTransportActionName; + args: Record; + cfg: OpenClawConfig; + accountId?: string | null; + }) => Promise; + waitForReady: (params: { + gateway: QaTransportGatewayClient; + timeoutMs?: number; + }) => Promise; + waitForCondition: ( + check: () => T | Promise | null | undefined, + timeoutMs?: number, + intervalMs?: number, + ) => Promise; + assertNoFailureReplies: (options?: QaTransportFailureAssertionOptions) => void; +}; + +export async function waitForQaTransportCondition( + check: () => T | Promise | null | undefined, + timeoutMs = 15_000, + intervalMs = 100, +): Promise { + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + const value = await check(); + if (value !== null && value !== undefined) { + return value; + } + await sleep(intervalMs); + } + throw new Error(`timed out after ${timeoutMs}ms`); +} + +export function findFailureOutboundMessage( + state: QaTransportState, + options?: QaTransportFailureAssertionOptions, +) { + const cursorSpace = options?.cursorSpace ?? "outbound"; + const observedMessages = + cursorSpace === "all" + ? state.getSnapshot().messages.slice(options?.sinceIndex ?? 0) + : state + .getSnapshot() + .messages.filter((message) => message.direction === "outbound") + .slice(options?.sinceIndex ?? 0); + return observedMessages.find( + (message) => + message.direction === "outbound" && Boolean(extractQaFailureReplyText(message.text)), + ); +} + +export function assertNoFailureReplies( + state: QaTransportState, + options?: QaTransportFailureAssertionOptions, +) { + const failureMessage = findFailureOutboundMessage(state, options); + if (failureMessage) { + throw new Error(extractQaFailureReplyText(failureMessage.text) ?? failureMessage.text); + } +} + +export function createFailureAwareTransportWaitForCondition(state: QaTransportState) { + return async function waitForTransportCondition( + check: () => T | Promise | null | undefined, + timeoutMs = 15_000, + intervalMs = 100, + ): Promise { + const sinceIndex = state.getSnapshot().messages.length; + return await waitForQaTransportCondition( + async () => { + assertNoFailureReplies(state, { + sinceIndex, + cursorSpace: "all", + }); + return await check(); + }, + timeoutMs, + intervalMs, + ); + }; +} + +export type QaTransportAdapter = { + id: string; + label: string; + accountId: string; + requiredPluginIds: readonly string[]; + state: QaTransportState; + capabilities: QaTransportCommonCapabilities; + createGatewayConfig: (params: { baseUrl: string }) => QaTransportGatewayConfig; + waitReady: (params: { gateway: QaTransportGatewayClient; timeoutMs?: number }) => Promise; + buildAgentDelivery: (params: { target: string }) => { + channel: string; + replyChannel: string; + replyTo: string; + }; + handleAction: (params: { + action: QaTransportActionName; + args: Record; + cfg: OpenClawConfig; + accountId?: string | null; + }) => Promise; + createReportNotes: (params: QaTransportReportParams) => string[]; +}; + +export abstract class QaStateBackedTransportAdapter implements QaTransportAdapter { + readonly id: string; + readonly label: string; + readonly accountId: string; + readonly requiredPluginIds: readonly string[]; + readonly state: QaTransportState; + readonly capabilities: QaTransportCommonCapabilities; + + protected constructor(params: { + id: string; + label: string; + accountId: string; + requiredPluginIds: readonly string[]; + state: QaTransportState; + }) { + this.id = params.id; + this.label = params.label; + this.accountId = params.accountId; + this.requiredPluginIds = params.requiredPluginIds; + this.state = params.state; + this.capabilities = { + sendInboundMessage: this.state.addInboundMessage.bind(this.state), + injectOutboundMessage: this.state.addOutboundMessage.bind(this.state), + waitForOutboundMessage: this.state.waitFor.bind(this.state), + getNormalizedMessageState: this.state.getSnapshot.bind(this.state), + resetNormalizedMessageState: async () => { + await this.state.reset(); + }, + readNormalizedMessage: this.state.readMessage.bind(this.state), + executeGenericAction: (params) => this.handleAction(params), + waitForReady: (params) => this.waitReady(params), + waitForCondition: createFailureAwareTransportWaitForCondition(this.state), + assertNoFailureReplies: (options) => { + assertNoFailureReplies(this.state, options); + }, + }; + } + + abstract createGatewayConfig: (params: { baseUrl: string }) => QaTransportGatewayConfig; + abstract waitReady: (params: { + gateway: QaTransportGatewayClient; + timeoutMs?: number; + }) => Promise; + abstract buildAgentDelivery: (params: { target: string }) => { + channel: string; + replyChannel: string; + replyTo: string; + }; + abstract handleAction: (params: { + action: QaTransportActionName; + args: Record; + cfg: OpenClawConfig; + accountId?: string | null; + }) => Promise; + abstract createReportNotes: (params: QaTransportReportParams) => string[]; +} diff --git a/extensions/qa-lab/src/scenario-flow-runner.ts b/extensions/qa-lab/src/scenario-flow-runner.ts index 81eade69ed2..345f511a994 100644 --- a/extensions/qa-lab/src/scenario-flow-runner.ts +++ b/extensions/qa-lab/src/scenario-flow-runner.ts @@ -1,5 +1,5 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import type { QaBusState } from "./bus-state.js"; +import type { QaTransportState } from "./qa-transport.js"; import type { QaScenarioFlow, QaSeedScenarioWithSource } from "./scenario-catalog.js"; type QaSuiteStep = { @@ -19,7 +19,7 @@ type QaSuiteScenarioResult = { }; type QaFlowApi = Record & { - state: QaBusState; + state: QaTransportState; scenario: QaSeedScenarioWithSource; config: Record; runScenario: (name: string, steps: QaSuiteStep[]) => Promise; diff --git a/extensions/qa-lab/src/scenario.ts b/extensions/qa-lab/src/scenario.ts index 623f09db293..3d89a494885 100644 --- a/extensions/qa-lab/src/scenario.ts +++ b/extensions/qa-lab/src/scenario.ts @@ -1,8 +1,12 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import type { QaBusState } from "./bus-state.js"; +import type { QaTransportActionName, QaTransportState } from "./qa-transport.js"; export type QaScenarioStepContext = { - state: QaBusState; + state: QaTransportState; + performAction?: ( + action: QaTransportActionName, + args: Record, + ) => Promise; }; export type QaScenarioStep = { diff --git a/extensions/qa-lab/src/self-check-scenario.ts b/extensions/qa-lab/src/self-check-scenario.ts index bdcd6026605..8bf72e6525a 100644 --- a/extensions/qa-lab/src/self-check-scenario.ts +++ b/extensions/qa-lab/src/self-check-scenario.ts @@ -1,15 +1,14 @@ import { extractQaToolPayload } from "./extract-tool-payload.js"; -import { qaChannelPlugin, type OpenClawConfig } from "./runtime-api.js"; import type { QaScenarioDefinition } from "./scenario.js"; -export function createQaSelfCheckScenario(cfg: OpenClawConfig): QaScenarioDefinition { +export function createQaSelfCheckScenario(): QaScenarioDefinition { return { name: "Synthetic Slack-class roundtrip", steps: [ { name: "DM echo roundtrip", async run({ state }) { - state.addInboundMessage({ + await state.addInboundMessage({ conversation: { id: "alice", kind: "direct" }, senderId: "alice", senderName: "Alice", @@ -25,26 +24,23 @@ export function createQaSelfCheckScenario(cfg: OpenClawConfig): QaScenarioDefini }, { name: "Thread create and threaded echo", - async run({ state }) { - const threadResult = await qaChannelPlugin.actions?.handleAction?.({ - channel: "qa-channel", - action: "thread-create", - cfg, - accountId: "default", - params: { - channelId: "qa-room", - title: "QA thread", - }, + async run({ state, performAction }) { + if (!performAction) { + throw new Error("self-check action dispatcher is not configured"); + } + const threadResult = await performAction("thread-create", { + channelId: "qa-room", + title: "QA thread", }); - const threadPayload = extractQaToolPayload(threadResult) as - | { thread?: { id?: string } } - | undefined; + const threadPayload = extractQaToolPayload( + threadResult as Parameters[0], + ) as { thread?: { id?: string } } | undefined; const threadId = threadPayload?.thread?.id; if (!threadId) { throw new Error("thread-create did not return thread id"); } - state.addInboundMessage({ + await state.addInboundMessage({ conversation: { id: "qa-room", kind: "channel", title: "QA Room" }, senderId: "alice", senderName: "Alice", @@ -63,54 +59,51 @@ export function createQaSelfCheckScenario(cfg: OpenClawConfig): QaScenarioDefini }, { name: "Reaction, edit, delete lifecycle", - async run({ state }) { - const outbound = state - .searchMessages({ query: "qa-echo: inside thread", conversationId: "qa-room" }) - .at(-1); - if (!outbound) { + async run({ state, performAction }) { + if (!performAction) { + throw new Error("self-check action dispatcher is not configured"); + } + const outboundMessage = ( + await state.searchMessages({ + query: "qa-echo: inside thread", + conversationId: "qa-room", + }) + ).at(-1); + if (!outboundMessage) { throw new Error("threaded outbound message not found"); } - await qaChannelPlugin.actions?.handleAction?.({ - channel: "qa-channel", - action: "react", - cfg, - accountId: "default", - params: { - messageId: outbound.id, - emoji: "white_check_mark", - }, + await performAction("react", { + messageId: outboundMessage.id, + emoji: "white_check_mark", }); - const reacted = state.readMessage({ messageId: outbound.id }); + const reacted = await state.readMessage({ messageId: outboundMessage.id }); + if (!reacted) { + throw new Error("reacted message not found"); + } if (reacted.reactions.length === 0) { throw new Error("reaction not recorded"); } - await qaChannelPlugin.actions?.handleAction?.({ - channel: "qa-channel", - action: "edit", - cfg, - accountId: "default", - params: { - messageId: outbound.id, - text: "qa-echo: inside thread (edited)", - }, + await performAction("edit", { + messageId: outboundMessage.id, + text: "qa-echo: inside thread (edited)", }); - const edited = state.readMessage({ messageId: outbound.id }); + const edited = await state.readMessage({ messageId: outboundMessage.id }); + if (!edited) { + throw new Error("edited message not found"); + } if (!edited.text.includes("(edited)")) { throw new Error("edit not recorded"); } - await qaChannelPlugin.actions?.handleAction?.({ - channel: "qa-channel", - action: "delete", - cfg, - accountId: "default", - params: { - messageId: outbound.id, - }, + await performAction("delete", { + messageId: outboundMessage.id, }); - const deleted = state.readMessage({ messageId: outbound.id }); + const deleted = await state.readMessage({ messageId: outboundMessage.id }); + if (!deleted) { + throw new Error("deleted message not found"); + } if (!deleted.deleted) { throw new Error("delete not recorded"); } diff --git a/extensions/qa-lab/src/self-check.ts b/extensions/qa-lab/src/self-check.ts index 848de9e0857..4c29fdf2bb3 100644 --- a/extensions/qa-lab/src/self-check.ts +++ b/extensions/qa-lab/src/self-check.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import type { QaBusState } from "./bus-state.js"; +import { createQaTransportAdapter, type QaTransportId } from "./qa-transport-registry.js"; import { renderQaMarkdownReport } from "./report.js"; import { runQaScenario, type QaScenarioResult } from "./scenario.js"; import { createQaSelfCheckScenario } from "./self-check-scenario.js"; @@ -24,15 +25,27 @@ export function resolveQaSelfCheckOutputPath(params?: { outputPath?: string; rep export async function runQaSelfCheckAgainstState(params: { state: QaBusState; cfg: OpenClawConfig; + transportId?: QaTransportId; outputPath?: string; repoRoot?: string; notes?: string[]; }): Promise { const startedAt = new Date(); - params.state.reset(); - const scenarioResult = await runQaScenario(createQaSelfCheckScenario(params.cfg), { + const transport = createQaTransportAdapter({ + id: params.transportId ?? "qa-channel", state: params.state, }); + params.state.reset(); + const scenarioResult = await runQaScenario(createQaSelfCheckScenario(), { + state: params.state, + performAction: async (action, args) => + await transport.handleAction({ + action, + args, + cfg: params.cfg, + accountId: transport.accountId, + }), + }); const checks = [ { name: "QA self-check scenario", diff --git a/extensions/qa-lab/src/suite.test.ts b/extensions/qa-lab/src/suite.test.ts index 70b23e80e61..95e932a3e67 100644 --- a/extensions/qa-lab/src/suite.test.ts +++ b/extensions/qa-lab/src/suite.test.ts @@ -1,6 +1,9 @@ -import { describe, expect, it } from "vitest"; +import { lstat, mkdir, mkdtemp, rm, symlink } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; import { createQaBusState } from "./bus-state.js"; -import { qaSuiteTesting } from "./suite.js"; +import { qaSuiteTesting, runQaSuite } from "./suite.js"; describe("qa suite failure reply handling", () => { const makeScenario = ( @@ -39,6 +42,57 @@ describe("qa suite failure reply handling", () => { } }); + it("keeps programmatic suite output dirs within the repo root", async () => { + const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-suite-existing-root-")); + try { + await expect( + qaSuiteTesting.resolveQaSuiteOutputDir( + repoRoot, + path.join(repoRoot, ".artifacts", "qa-e2e", "custom"), + ), + ).resolves.toBe(path.join(repoRoot, ".artifacts", "qa-e2e", "custom")); + await expect( + lstat(path.join(repoRoot, ".artifacts", "qa-e2e", "custom")).then((stats) => + stats.isDirectory(), + ), + ).resolves.toBe(true); + await expect( + qaSuiteTesting.resolveQaSuiteOutputDir(repoRoot, "/tmp/outside"), + ).rejects.toThrow("QA suite outputDir must stay within the repo root."); + } finally { + await rm(repoRoot, { recursive: true, force: true }); + } + }); + + it("rejects symlinked suite output dirs that escape the repo root", async () => { + const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-suite-root-")); + const outsideRoot = await mkdtemp(path.join(os.tmpdir(), "qa-suite-outside-")); + try { + await mkdir(path.join(repoRoot, ".artifacts"), { recursive: true }); + await symlink(outsideRoot, path.join(repoRoot, ".artifacts", "qa-e2e"), "dir"); + + await expect( + qaSuiteTesting.resolveQaSuiteOutputDir(repoRoot, ".artifacts/qa-e2e/custom"), + ).rejects.toThrow("QA suite outputDir must not traverse symlinks."); + } finally { + await rm(repoRoot, { recursive: true, force: true }); + await rm(outsideRoot, { recursive: true, force: true }); + } + }); + + it("rejects unsupported transport ids before starting the lab", async () => { + const startLab = vi.fn(); + + await expect( + runQaSuite({ + transportId: "qa-nope" as unknown as "qa-channel", + startLab, + }), + ).rejects.toThrow("unsupported QA transport: qa-nope"); + + expect(startLab).not.toHaveBeenCalled(); + }); + it("maps suite work with bounded concurrency while preserving order", async () => { let active = 0; let maxActive = 0; @@ -272,4 +326,56 @@ describe("qa suite failure reply handling", () => { await expect(pending).rejects.toThrow('No API key found for provider "openai".'); }); + + it("reads transport transcripts with generic helper names", () => { + const state = createQaBusState(); + state.addInboundMessage({ + conversation: { id: "qa-operator", kind: "direct" }, + senderId: "alice", + senderName: "Alice", + text: "hello", + }); + state.addOutboundMessage({ + to: "dm:qa-operator", + text: "working on it", + senderId: "openclaw", + senderName: "OpenClaw QA", + }); + state.addOutboundMessage({ + to: "dm:qa-operator", + text: "done", + senderId: "openclaw", + senderName: "OpenClaw QA", + }); + + const messages = qaSuiteTesting.readTransportTranscript(state, { + conversationId: "qa-operator", + direction: "outbound", + }); + const formatted = qaSuiteTesting.formatTransportTranscript(state, { + conversationId: "qa-operator", + }); + + expect(messages.map((message) => message.text)).toEqual(["working on it", "done"]); + expect(formatted).toContain("USER Alice: hello"); + expect(formatted).toContain("ASSISTANT OpenClaw QA: working on it"); + }); + + it("waits for outbound replies through the generic transport alias", async () => { + const state = createQaBusState(); + const pending = qaSuiteTesting.waitForTransportOutboundMessage( + state, + (candidate) => candidate.conversation.id === "qa-operator" && candidate.text.includes("done"), + 5_000, + ); + + state.addOutboundMessage({ + to: "dm:qa-operator", + text: "done", + senderId: "openclaw", + senderName: "OpenClaw QA", + }); + + await expect(pending).resolves.toMatchObject({ text: "done" }); + }); }); diff --git a/extensions/qa-lab/src/suite.ts b/extensions/qa-lab/src/suite.ts index 871b4db0e3f..9a131f030eb 100644 --- a/extensions/qa-lab/src/suite.ts +++ b/extensions/qa-lab/src/suite.ts @@ -15,7 +15,7 @@ import { import { buildAgentSessionKey } from "openclaw/plugin-sdk/routing"; import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; -import type { QaBusState } from "./bus-state.js"; +import { ensureRepoBoundDirectory, resolveRepoRelativeOutputDir } from "./cli-paths.js"; import { waitForCronRunCompletion } from "./cron-run-wait.js"; import { hasDiscoveryLabels, @@ -40,6 +40,20 @@ import { } from "./model-selection.js"; import { hasModelSwitchContinuityEvidence } from "./model-switch-eval.js"; import type { QaThinkingLevel } from "./qa-gateway-config.js"; +import { + createQaTransportAdapter, + normalizeQaTransportId, + type QaTransportId, +} from "./qa-transport-registry.js"; +import type { + QaTransportAdapter, + QaTransportActionName, + QaTransportState, +} from "./qa-transport.js"; +import { + createFailureAwareTransportWaitForCondition, + findFailureOutboundMessage as findTransportFailureOutboundMessage, +} from "./qa-transport.js"; import { extractQaFailureReplyText } from "./reply-failure.js"; import { renderQaMarkdownReport, type QaReportCheck, type QaReportScenario } from "./report.js"; import { qaChannelPlugin, type QaBusMessage } from "./runtime-api.js"; @@ -63,6 +77,7 @@ type QaSuiteEnvironment = { mock: Awaited> | null; gateway: Awaited>; cfg: OpenClawConfig; + transport: QaTransportAdapter; repoRoot: string; providerMode: "mock-openai" | "live-frontier"; primaryModel: string; @@ -75,6 +90,7 @@ export type QaSuiteRunParams = { repoRoot?: string; outputDir?: string; providerMode?: QaProviderMode | "live-openai"; + transportId?: QaTransportId; primaryModel?: string; alternateModel?: string; fastMode?: boolean; @@ -84,6 +100,7 @@ export type QaSuiteRunParams = { lab?: QaLabServerHandle; startLab?: QaSuiteStartLabFn; concurrency?: number; + controlUiEnabled?: boolean; }; function requireQaSuiteStartLab(startLab: QaSuiteStartLabFn | undefined): QaSuiteStartLabFn { @@ -248,6 +265,24 @@ function liveTurnTimeoutMs(env: QaSuiteEnvironment, fallbackMs: number) { return resolveQaLiveTurnTimeoutMs(env, fallbackMs); } +async function resolveQaSuiteOutputDir(repoRoot: string, outputDir?: string) { + const targetDir = !outputDir + ? path.join(repoRoot, ".artifacts", "qa-e2e", `suite-${Date.now().toString(36)}`) + : outputDir; + if (!path.isAbsolute(targetDir)) { + const resolved = resolveRepoRelativeOutputDir(repoRoot, targetDir); + if (!resolved) { + throw new Error("QA suite outputDir must be set."); + } + return await ensureRepoBoundDirectory(repoRoot, resolved, "QA suite outputDir", { + mode: 0o700, + }); + } + return await ensureRepoBoundDirectory(repoRoot, targetDir, "QA suite outputDir", { + mode: 0o700, + }); +} + export type QaSuiteResult = { outputDir: string; reportPath: string; @@ -257,20 +292,6 @@ export type QaSuiteResult = { watchUrl: string; }; -function createQaActionConfig(baseUrl: string): OpenClawConfig { - return { - channels: { - "qa-channel": { - enabled: true, - baseUrl, - botUserId: "openclaw", - botDisplayName: "OpenClaw QA", - allowFrom: ["*"], - }, - }, - }; -} - async function waitForCondition( check: () => T | Promise | null | undefined, timeoutMs = 15_000, @@ -288,49 +309,18 @@ async function waitForCondition( } function findFailureOutboundMessage( - state: QaBusState, + state: QaTransportState, options?: { sinceIndex?: number; cursorSpace?: "all" | "outbound" }, ) { - const cursorSpace = options?.cursorSpace ?? "outbound"; - const observedMessages = - cursorSpace === "all" - ? state.getSnapshot().messages.slice(options?.sinceIndex ?? 0) - : state - .getSnapshot() - .messages.filter((message) => message.direction === "outbound") - .slice(options?.sinceIndex ?? 0); - return observedMessages.find( - (message) => - message.direction === "outbound" && Boolean(extractQaFailureReplyText(message.text)), - ); + return findTransportFailureOutboundMessage(state, options); } -function createScenarioWaitForCondition(state: QaBusState) { - const sinceIndex = state.getSnapshot().messages.length; - return async function waitForScenarioCondition( - check: () => T | Promise | null | undefined, - timeoutMs = 15_000, - intervalMs = 100, - ): Promise { - return await waitForCondition( - async () => { - const failureMessage = findFailureOutboundMessage(state, { - sinceIndex, - cursorSpace: "all", - }); - if (failureMessage) { - throw new Error(extractQaFailureReplyText(failureMessage.text) ?? failureMessage.text); - } - return await check(); - }, - timeoutMs, - intervalMs, - ); - }; +function createScenarioWaitForCondition(state: QaTransportState) { + return createFailureAwareTransportWaitForCondition(state); } async function waitForOutboundMessage( - state: QaBusState, + state: QaTransportState, predicate: (message: QaBusMessage) => boolean, timeoutMs = 15_000, options?: { sinceIndex?: number }, @@ -356,7 +346,7 @@ async function waitForOutboundMessage( }, timeoutMs); } -async function waitForNoOutbound(state: QaBusState, timeoutMs = 1_200) { +async function waitForNoOutbound(state: QaTransportState, timeoutMs = 1_200) { await sleep(timeoutMs); const outbound = state .getSnapshot() @@ -366,7 +356,7 @@ async function waitForNoOutbound(state: QaBusState, timeoutMs = 1_200) { } } -function recentOutboundSummary(state: QaBusState, limit = 5) { +function recentOutboundSummary(state: QaTransportState, limit = 5) { return state .getSnapshot() .messages.filter((message) => message.direction === "outbound") @@ -376,22 +366,47 @@ function recentOutboundSummary(state: QaBusState, limit = 5) { } function formatConversationTranscript( - state: QaBusState, + state: QaTransportState, params: { conversationId: string; threadId?: string; limit?: number; }, +) { + return formatTransportTranscript(state, params); +} + +function readTransportTranscript( + state: QaTransportState, + params: { + conversationId: string; + threadId?: string; + direction?: "inbound" | "outbound"; + limit?: number; + }, ) { const messages = state .getSnapshot() .messages.filter( (message) => message.conversation.id === params.conversationId && - (params.threadId ? message.threadId === params.threadId : true), + (params.threadId ? message.threadId === params.threadId : true) && + (params.direction ? message.direction === params.direction : true), ); - const selected = params.limit ? messages.slice(-params.limit) : messages; - return selected + return params.limit ? messages.slice(-params.limit) : messages; +} + +function formatTransportTranscript( + state: QaTransportState, + params: { + conversationId: string; + threadId?: string; + direction?: "inbound" | "outbound"; + limit?: number; + }, +) { + const messages = readTransportTranscript(state, params); + return messages .map((message) => { const direction = message.direction === "inbound" ? "user" : "assistant"; const speaker = message.senderName?.trim() || message.senderId; @@ -406,6 +421,26 @@ function formatConversationTranscript( .join("\n\n"); } +async function waitForTransportOutboundMessage( + state: QaTransportState, + predicate: (message: QaBusMessage) => boolean, + timeoutMs?: number, +) { + return await waitForOutboundMessage(state, predicate, timeoutMs); +} + +async function waitForChannelOutboundMessage( + state: QaTransportState, + predicate: (message: QaBusMessage) => boolean, + timeoutMs?: number, +) { + return await waitForTransportOutboundMessage(state, predicate, timeoutMs); +} + +async function waitForNoTransportOutbound(state: QaTransportState, timeoutMs = 1_200) { + await waitForNoOutbound(state, timeoutMs); +} + async function runScenario(name: string, steps: QaSuiteStep[]): Promise { const stepResults: QaReportCheck[] = []; for (const step of steps) { @@ -486,37 +521,17 @@ async function waitForGatewayHealthy(env: QaSuiteEnvironment, timeoutMs = 45_000 ); } -async function waitForQaChannelReady(env: QaSuiteEnvironment, timeoutMs = 45_000) { - await waitForCondition( - async () => { - try { - const payload = (await env.gateway.call( - "channels.status", - { probe: false, timeoutMs: 2_000 }, - { timeoutMs: 5_000 }, - )) as { - channelAccounts?: Record< - string, - Array<{ - accountId?: string; - running?: boolean; - restartPending?: boolean; - }> - >; - }; - const accounts = payload.channelAccounts?.["qa-channel"] ?? []; - const account = accounts.find((entry) => entry.accountId === "default") ?? accounts[0]; - if (account?.running && account.restartPending !== true) { - return true; - } - return undefined; - } catch { - return undefined; - } - }, +async function waitForTransportReady(env: QaSuiteEnvironment, timeoutMs = 45_000) { + await env.transport.waitReady({ + gateway: env.gateway, timeoutMs, - 500, - ); + }); +} + +async function waitForQaChannelReady(env: QaSuiteEnvironment, timeoutMs = 45_000) { + // Compatibility alias for existing markdown scenarios while qa-channel + // remains the only suite transport. + await waitForTransportReady(env, timeoutMs); } async function waitForConfigRestartSettle( @@ -613,7 +628,7 @@ async function runConfigMutation(params: { ...(params.note ? { note: params.note } : {}), restartDelayMs, }, - { timeoutMs: 45_000, retryOnRestart: false }, + { timeoutMs: 45_000 }, ); await waitForConfigRestartSettle(params.env, restartDelayMs); return result; @@ -888,6 +903,7 @@ async function startAgentRun( }, ) { const target = params.to ?? "dm:qa-operator"; + const delivery = env.transport.buildAgentDelivery({ target }); const started = (await env.gateway.call( "agent", { @@ -896,10 +912,10 @@ async function startAgentRun( sessionKey: params.sessionKey, message: params.message, deliver: true, - channel: "qa-channel", + channel: delivery.channel, to: target, - replyChannel: "qa-channel", - replyTo: target, + replyChannel: delivery.replyChannel, + replyTo: delivery.replyTo, ...(params.threadId ? { threadId: params.threadId } : {}), ...(params.provider ? { provider: params.provider } : {}), ...(params.model ? { model: params.model } : {}), @@ -961,7 +977,7 @@ async function forceMemoryIndex(params: { expectedNeedle: string; }) { await waitForGatewayHealthy(params.env, 60_000); - await waitForQaChannelReady(params.env, 60_000); + await waitForTransportReady(params.env, 60_000); await runQaCli(params.env, ["memory", "index", "--agent", "qa", "--force"], { timeoutMs: liveTurnTimeoutMs(params.env, 60_000), }); @@ -1074,7 +1090,7 @@ async function ensureImageGenerationConfigured(env: QaSuiteEnvironment) { env.providerMode === "mock-openai" ? { plugins: { - allow: ["memory-core", "openai", "qa-channel"], + allow: [...new Set(["memory-core", "openai", ...env.transport.requiredPluginIds])], entries: { openai: { enabled: true, @@ -1126,30 +1142,26 @@ async function ensureImageGenerationConfigured(env: QaSuiteEnvironment) { }, }); await waitForGatewayHealthy(env); - await waitForQaChannelReady(env, 60_000); + await waitForTransportReady(env, 60_000); } -type QaActionName = "delete" | "edit" | "react" | "thread-create"; - async function handleQaAction(params: { env: QaSuiteEnvironment; - action: QaActionName; + action: QaTransportActionName; args: Record; }) { - const result = await qaChannelPlugin.actions?.handleAction?.({ - channel: "qa-channel", + const result = await params.env.transport.handleAction({ action: params.action, + args: params.args, cfg: params.env.cfg, - accountId: "default", - params: params.args, }); - return extractQaToolPayload(result); + return extractQaToolPayload(result as Parameters[0]); } type QaScenarioFlowApi = { env: QaSuiteEnvironment; lab: QaSuiteEnvironment["lab"]; - state: QaBusState; + state: QaTransportState; scenario: ReturnType["scenarios"][number]; config: Record; fs: typeof fs; @@ -1159,11 +1171,18 @@ type QaScenarioFlowApi = { runScenario: typeof runScenario; waitForCondition: typeof waitForCondition; waitForOutboundMessage: typeof waitForOutboundMessage; + waitForTransportOutboundMessage: typeof waitForTransportOutboundMessage; + waitForChannelOutboundMessage: typeof waitForChannelOutboundMessage; waitForNoOutbound: typeof waitForNoOutbound; + waitForNoTransportOutbound: typeof waitForNoTransportOutbound; recentOutboundSummary: typeof recentOutboundSummary; formatConversationTranscript: typeof formatConversationTranscript; + readTransportTranscript: typeof readTransportTranscript; + formatTransportTranscript: typeof formatTransportTranscript; fetchJson: typeof fetchJson; waitForGatewayHealthy: typeof waitForGatewayHealthy; + waitForTransportReady: typeof waitForTransportReady; + waitForChannelReady: typeof waitForTransportReady; waitForQaChannelReady: typeof waitForQaChannelReady; waitForConfigRestartSettle: typeof waitForConfigRestartSettle; patchConfig: typeof patchConfig; @@ -1205,6 +1224,11 @@ type QaScenarioFlowApi = { imageUnderstandingPngBase64: string; imageUnderstandingLargePngBase64: string; imageUnderstandingValidPngBase64: string; + getTransportSnapshot: () => ReturnType; + resetTransport: () => Promise; + injectInboundMessage: QaTransportState["addInboundMessage"]; + injectOutboundMessage: QaTransportState["addOutboundMessage"]; + readTransportMessage: QaTransportState["readMessage"]; resetBus: () => Promise; reset: () => Promise; }; @@ -1216,7 +1240,7 @@ function createScenarioFlowApi( return { env, lab: env.lab, - state: env.lab.state, + state: env.transport.state, scenario, config: scenario.execution.config ?? {}, fs, @@ -1224,13 +1248,20 @@ function createScenarioFlowApi( sleep, randomUUID, runScenario, - waitForCondition: createScenarioWaitForCondition(env.lab.state), + waitForCondition: env.transport.capabilities.waitForCondition, waitForOutboundMessage, + waitForTransportOutboundMessage, + waitForChannelOutboundMessage, waitForNoOutbound, + waitForNoTransportOutbound, recentOutboundSummary, formatConversationTranscript, + readTransportTranscript, + formatTransportTranscript, fetchJson, waitForGatewayHealthy, + waitForTransportReady, + waitForChannelReady: waitForTransportReady, waitForQaChannelReady, waitForConfigRestartSettle, patchConfig, @@ -1272,12 +1303,20 @@ function createScenarioFlowApi( imageUnderstandingPngBase64: _QA_IMAGE_UNDERSTANDING_PNG_BASE64, imageUnderstandingLargePngBase64: _QA_IMAGE_UNDERSTANDING_LARGE_PNG_BASE64, imageUnderstandingValidPngBase64: QA_IMAGE_UNDERSTANDING_VALID_PNG_BASE64, + getTransportSnapshot: env.transport.capabilities.getNormalizedMessageState, + resetTransport: async () => { + await env.transport.capabilities.resetNormalizedMessageState(); + await sleep(100); + }, + injectInboundMessage: env.transport.capabilities.sendInboundMessage, + injectOutboundMessage: env.transport.capabilities.injectOutboundMessage, + readTransportMessage: env.transport.capabilities.readNormalizedMessage, resetBus: async () => { - env.lab.state.reset(); + await env.transport.capabilities.resetNormalizedMessageState(); await sleep(100); }, reset: async () => { - env.lab.state.reset(); + await env.transport.capabilities.resetNormalizedMessageState(); await sleep(100); }, }; @@ -1292,6 +1331,11 @@ export const qaSuiteTesting = { normalizeQaSuiteConcurrency, scenarioMatchesLiveLane, selectQaSuiteScenarios, + readTransportTranscript, + formatTransportTranscript, + resolveQaSuiteOutputDir, + waitForTransportOutboundMessage, + waitForNoTransportOutbound, waitForOutboundMessage, }; @@ -1311,21 +1355,14 @@ async function runScenarioDefinition( } function createQaSuiteReportNotes(params: { + transport: QaTransportAdapter; providerMode: "mock-openai" | "live-frontier"; primaryModel: string; alternateModel: string; fastMode: boolean; concurrency: number; }) { - return [ - params.providerMode === "mock-openai" - ? "Runs against qa-channel + qa-lab bus + real gateway child + mock OpenAI provider." - : `Runs against qa-channel + qa-lab bus + real gateway child + live frontier models (${params.primaryModel}, ${params.alternateModel})${params.fastMode ? " with fast mode enabled" : ""}.`, - params.concurrency > 1 - ? `Scenarios run in isolated gateway workers with concurrency ${params.concurrency}.` - : "Scenarios run serially in one gateway worker.", - "Cron uses a one-minute schedule assertion plus forced execution for fast verification.", - ]; + return params.transport.createReportNotes(params); } async function writeQaSuiteArtifacts(params: { @@ -1333,6 +1370,7 @@ async function writeQaSuiteArtifacts(params: { startedAt: Date; finishedAt: Date; scenarios: QaSuiteScenarioResult[]; + transport: QaTransportAdapter; providerMode: "mock-openai" | "live-frontier"; primaryModel: string; alternateModel: string; @@ -1378,6 +1416,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise ({ id: scenario.id, @@ -1447,6 +1487,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise { + // The gateway child already waits for /readyz before returning, but the + // selected transport can still be finishing account startup. Pay that + // readiness cost once here so the first scenario does not race bootstrap. + await waitForTransportReady(env, 120_000).catch(async () => { await waitForGatewayHealthy(env, 120_000); - await waitForQaChannelReady(env, 120_000); + await waitForTransportReady(env, 120_000); }); await sleep(1_000); const scenarios: QaSuiteScenarioResult[] = []; @@ -1653,6 +1708,9 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise scenario.status === "fail")) { + preserveGatewayRuntimeDir = path.join(outputDir, "artifacts", "gateway-runtime"); + } lab.setScenarioRun({ kind: "suite", status: "completed", @@ -1665,6 +1723,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise { expect(startAccount).toHaveBeenCalledTimes(1); }); + it("consumes rejected stop tasks during manual abort", async () => { + const unhandledRejection = vi.fn(); + process.on("unhandledRejection", unhandledRejection); + try { + const startAccount = vi.fn( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise((_resolve, reject) => { + abortSignal.addEventListener( + "abort", + () => { + reject(new Error("aborted")); + }, + { once: true }, + ); + }), + ); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + vi.runAllTicks(); + await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID); + await Promise.resolve(); + + expect(unhandledRejection).not.toHaveBeenCalled(); + } finally { + process.off("unhandledRejection", unhandledRejection); + } + }); + + it("does not allow a second account task to start when stop times out", async () => { + const startAccount = vi.fn( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise(() => { + abortSignal.addEventListener("abort", () => {}, { once: true }); + }), + ); + installTestRegistry( + createTestPlugin({ + startAccount, + }), + ); + const manager = createManager(); + + await manager.startChannels(); + const stopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID); + await vi.advanceTimersByTimeAsync(5_000); + await stopTask; + await manager.startChannel("discord", DEFAULT_ACCOUNT_ID); + + const snapshot = manager.getRuntimeSnapshot(); + const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID]; + expect(startAccount).toHaveBeenCalledTimes(1); + expect(account?.running).toBe(true); + expect(account?.restartPending).toBe(false); + expect(account?.lastError).toContain("channel stop timed out"); + }); + it("marks enabled/configured when account descriptors omit them", () => { installTestRegistry( createTestPlugin({ diff --git a/src/gateway/server-channels.ts b/src/gateway/server-channels.ts index 57ebb339cbe..1745e058ffe 100644 --- a/src/gateway/server-channels.ts +++ b/src/gateway/server-channels.ts @@ -26,6 +26,7 @@ const CHANNEL_RESTART_POLICY: BackoffPolicy = { jitter: 0.1, }; const MAX_RESTART_ATTEMPTS = 10; +const CHANNEL_STOP_ABORT_TIMEOUT_MS = 5_000; type SubsystemLogger = ReturnType; @@ -72,6 +73,31 @@ function cloneDefaultRuntime(channelId: ChannelId, accountId: string): ChannelAc return { ...resolveDefaultRuntime(channelId), accountId }; } +async function waitForChannelStopGracefully(task: Promise | undefined, timeoutMs: number) { + if (!task) { + return true; + } + return await new Promise((resolve) => { + let settled = false; + const timer = setTimeout(() => { + if (!settled) { + settled = true; + resolve(false); + } + }, timeoutMs); + timer.unref?.(); + const resolveSettled = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + resolve(true); + }; + void task.then(resolveSettled, resolveSettled); + }); +} + function applyDescribedAccountFields( next: ChannelAccountSnapshot, described: ChannelAccountSnapshot | undefined, @@ -527,6 +553,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage } manuallyStopped.add(restartKey(channelId, id)); abort?.abort(); + const log = channelLogs[channelId]; if (plugin?.gateway?.stopAccount) { const account = plugin.config.resolveAccount(cfg, id); await plugin.gateway.stopAccount({ @@ -540,10 +567,21 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage setStatus: (next) => setRuntime(channelId, id, next), }); } - try { - await task; - } catch { - // ignore + const stoppedCleanly = await waitForChannelStopGracefully( + task, + CHANNEL_STOP_ABORT_TIMEOUT_MS, + ); + if (!stoppedCleanly) { + log.warn?.( + `[${id}] channel stop exceeded ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms after abort; continuing shutdown`, + ); + setRuntime(channelId, id, { + accountId: id, + running: true, + restartPending: false, + lastError: `channel stop timed out after ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms`, + }); + return; } store.aborts.delete(id); store.tasks.delete(id); diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index b707b06ad23..97e6c138a7d 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -6,6 +6,8 @@ const mocks = { }; const WEBSOCKET_CLOSE_GRACE_MS = 1_000; const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; +const HTTP_CLOSE_GRACE_MS = 1_000; +const HTTP_CLOSE_FORCE_WAIT_MS = 5_000; vi.mock("../channels/plugins/index.js", () => ({ listChannelPlugins: () => [], @@ -133,6 +135,7 @@ describe("createGatewayCloseHandler", () => { await closePromise; expect(terminate).toHaveBeenCalledTimes(1); + expect(vi.getTimerCount()).toBe(0); expect( mocks.logWarn.mock.calls.some(([message]) => String(message).includes("websocket server close exceeded 1000ms"), @@ -156,10 +159,109 @@ describe("createGatewayCloseHandler", () => { await vi.advanceTimersByTimeAsync(WEBSOCKET_CLOSE_GRACE_MS + WEBSOCKET_CLOSE_FORCE_CONTINUE_MS); await closePromise; + expect(vi.getTimerCount()).toBe(0); expect( mocks.logWarn.mock.calls.some(([message]) => String(message).includes("websocket server close still pending after 250ms force window"), ), ).toBe(true); }); + + it("forces lingering HTTP connections closed when server close exceeds the grace window", async () => { + vi.useFakeTimers(); + + let closeCallback: ((err?: Error | null) => void) | null = null; + const closeAllConnections = vi.fn(() => { + closeCallback?.(null); + }); + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + stopTaskRegistryMaintenance: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { close: (cb: () => void) => cb() } as never, + httpServer: { + close: (cb: (err?: Error | null) => void) => { + closeCallback = cb; + }, + closeAllConnections, + closeIdleConnections: vi.fn(), + } as never, + }); + + const closePromise = close({ reason: "test shutdown" }); + await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS); + await closePromise; + + expect(closeAllConnections).toHaveBeenCalledTimes(1); + expect(vi.getTimerCount()).toBe(0); + expect( + mocks.logWarn.mock.calls.some(([message]) => + String(message).includes("http server close exceeded 1000ms"), + ), + ).toBe(true); + }); + + it("fails shutdown when http server close still hangs after force close", async () => { + vi.useFakeTimers(); + + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + stopTaskRegistryMaintenance: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { close: (cb: () => void) => cb() } as never, + httpServer: { + close: () => undefined, + closeAllConnections: vi.fn(), + closeIdleConnections: vi.fn(), + } as never, + }); + + const closePromise = close({ reason: "test shutdown" }); + const closeExpectation = expect(closePromise).rejects.toThrow( + "http server close still pending after forced connection shutdown (5000ms)", + ); + await vi.advanceTimersByTimeAsync(HTTP_CLOSE_GRACE_MS + HTTP_CLOSE_FORCE_WAIT_MS); + await closeExpectation; + expect(vi.getTimerCount()).toBe(0); + }); }); diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 6457bbe567d..ba8913ffab3 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -12,6 +12,31 @@ import { normalizeOptionalString } from "../shared/string-coerce.js"; const shutdownLog = createSubsystemLogger("gateway/shutdown"); const WEBSOCKET_CLOSE_GRACE_MS = 1_000; const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; +const HTTP_CLOSE_GRACE_MS = 1_000; +const HTTP_CLOSE_FORCE_WAIT_MS = 5_000; + +function createTimeoutRace(timeoutMs: number, onTimeout: () => T) { + let timer: ReturnType | null = setTimeout(() => { + timer = null; + resolve(onTimeout()); + }, timeoutMs); + timer.unref?.(); + + let resolve!: (value: T) => void; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + + return { + promise, + clear() { + if (timer) { + clearTimeout(timer); + timer = null; + } + }, + }; +} export async function runGatewayClosePrelude(params: { stopDiagnostics?: () => void; @@ -170,10 +195,15 @@ export function createGatewayCloseHandler(params: { await params.configReloader.stop().catch(() => {}); const wsClients = params.wss.clients ?? new Set(); const closePromise = new Promise((resolve) => params.wss.close(() => resolve())); + const websocketGraceTimeout = createTimeoutRace( + WEBSOCKET_CLOSE_GRACE_MS, + () => false as const, + ); const closedWithinGrace = await Promise.race([ closePromise.then(() => true), - new Promise((resolve) => setTimeout(() => resolve(false), WEBSOCKET_CLOSE_GRACE_MS)), + websocketGraceTimeout.promise, ]); + websocketGraceTimeout.clear(); if (!closedWithinGrace) { shutdownLog.warn( `websocket server close exceeded ${WEBSOCKET_CLOSE_GRACE_MS}ms; forcing shutdown continuation with ${wsClients.size} tracked client(s)`, @@ -185,17 +215,13 @@ export function createGatewayCloseHandler(params: { /* ignore */ } } - await Promise.race([ - closePromise, - new Promise((resolve) => - setTimeout(() => { - shutdownLog.warn( - `websocket server close still pending after ${WEBSOCKET_CLOSE_FORCE_CONTINUE_MS}ms force window; continuing shutdown`, - ); - resolve(); - }, WEBSOCKET_CLOSE_FORCE_CONTINUE_MS), - ), - ]); + const websocketForceTimeout = createTimeoutRace(WEBSOCKET_CLOSE_FORCE_CONTINUE_MS, () => { + shutdownLog.warn( + `websocket server close still pending after ${WEBSOCKET_CLOSE_FORCE_CONTINUE_MS}ms force window; continuing shutdown`, + ); + }); + await Promise.race([closePromise, websocketForceTimeout.promise]); + websocketForceTimeout.clear(); } const servers = params.httpServers && params.httpServers.length > 0 @@ -203,14 +229,41 @@ export function createGatewayCloseHandler(params: { : [params.httpServer]; for (const server of servers) { const httpServer = server as HttpServer & { + closeAllConnections?: () => void; closeIdleConnections?: () => void; }; if (typeof httpServer.closeIdleConnections === "function") { httpServer.closeIdleConnections(); } - await new Promise((resolve, reject) => + const closePromise = new Promise((resolve, reject) => httpServer.close((err) => (err ? reject(err) : resolve())), ); + const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const); + const closedWithinGrace = await Promise.race([ + closePromise.then(() => true), + httpGraceTimeout.promise, + ]); + httpGraceTimeout.clear(); + if (!closedWithinGrace) { + shutdownLog.warn( + `http server close exceeded ${HTTP_CLOSE_GRACE_MS}ms; forcing connection shutdown and waiting for close`, + ); + httpServer.closeAllConnections?.(); + const httpForceTimeout = createTimeoutRace( + HTTP_CLOSE_FORCE_WAIT_MS, + () => false as const, + ); + const closedAfterForce = await Promise.race([ + closePromise.then(() => true), + httpForceTimeout.promise, + ]); + httpForceTimeout.clear(); + if (!closedAfterForce) { + throw new Error( + `http server close still pending after forced connection shutdown (${HTTP_CLOSE_FORCE_WAIT_MS}ms)`, + ); + } + } } } finally { try {