mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-07 07:58:36 +00:00
916 lines
32 KiB
TypeScript
916 lines
32 KiB
TypeScript
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
|
|
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
|
import type { CanvasHostServer } from "../canvas-host/server.js";
|
|
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
|
import { createDefaultDeps } from "../cli/deps.js";
|
|
import { isRestartEnabled } from "../config/commands.flags.js";
|
|
import {
|
|
type OpenClawConfig,
|
|
applyConfigOverrides,
|
|
getRuntimeConfig,
|
|
isNixMode,
|
|
loadConfig,
|
|
promoteConfigSnapshotToLastKnownGood,
|
|
readConfigFileSnapshot,
|
|
recoverConfigFromLastKnownGood,
|
|
registerConfigWriteListener,
|
|
writeConfigFile,
|
|
} from "../config/config.js";
|
|
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
|
import { resolveMainSessionKey } from "../config/sessions.js";
|
|
import { clearAgentRunContext } from "../infra/agent-events.js";
|
|
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
|
|
import { isTruthyEnvValue, isVitestRuntimeEnv, logAcceptedEnvOption } from "../infra/env.js";
|
|
import { ensureOpenClawCliOnPath } from "../infra/path-env.js";
|
|
import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js";
|
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
|
|
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
|
|
import { runGlobalGatewayStopSafely } from "../plugins/hook-runner-global.js";
|
|
import { createPluginRuntime } from "../plugins/runtime/index.js";
|
|
import { getTotalQueueSize } from "../process/command-queue.js";
|
|
import type { RuntimeEnv } from "../runtime.js";
|
|
import {
|
|
clearSecretsRuntimeSnapshot,
|
|
getActiveSecretsRuntimeSnapshot,
|
|
} from "../secrets/runtime.js";
|
|
import {
|
|
getInspectableTaskRegistrySummary,
|
|
stopTaskRegistryMaintenance,
|
|
} from "../tasks/task-registry.maintenance.js";
|
|
import { runSetupWizard } from "../wizard/setup.js";
|
|
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
|
|
import { resolveGatewayAuth } from "./auth.js";
|
|
import { closeMcpLoopbackServer } from "./mcp-http.js";
|
|
import { createGatewayAuxHandlers } from "./server-aux-handlers.js";
|
|
import { createChannelManager } from "./server-channels.js";
|
|
import { createGatewayCloseHandler, runGatewayClosePrelude } from "./server-close.js";
|
|
import { resolveGatewayControlUiRootState } from "./server-control-ui-root.js";
|
|
import { buildGatewayCronService } from "./server-cron.js";
|
|
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
|
|
import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js";
|
|
import { GATEWAY_EVENTS } from "./server-methods-list.js";
|
|
import { coreGatewayHandlers } from "./server-methods.js";
|
|
import { loadGatewayModelCatalog } from "./server-model-catalog.js";
|
|
import { createGatewayNodeSessionRuntime } from "./server-node-session-runtime.js";
|
|
import { reloadDeferredGatewayPlugins } from "./server-plugin-bootstrap.js";
|
|
import { setFallbackGatewayContextResolver } from "./server-plugins.js";
|
|
import { startManagedGatewayConfigReloader } from "./server-reload-handlers.js";
|
|
import { createGatewayRequestContext } from "./server-request-context.js";
|
|
import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js";
|
|
import {
|
|
activateGatewayScheduledServices,
|
|
startGatewayRuntimeServices,
|
|
} from "./server-runtime-services.js";
|
|
import { createGatewayRuntimeState } from "./server-runtime-state.js";
|
|
import { startGatewayEventSubscriptions } from "./server-runtime-subscriptions.js";
|
|
import { resolveSessionKeyForRun } from "./server-session-key.js";
|
|
import {
|
|
enforceSharedGatewaySessionGenerationForConfigWrite,
|
|
getRequiredSharedGatewaySessionGeneration,
|
|
type SharedGatewaySessionGenerationState,
|
|
} from "./server-shared-auth-generation.js";
|
|
import {
|
|
createRuntimeSecretsActivator,
|
|
loadGatewayStartupConfigSnapshot,
|
|
prepareGatewayStartupConfig,
|
|
} from "./server-startup-config.js";
|
|
import { prepareGatewayPluginBootstrap } from "./server-startup-plugins.js";
|
|
import { STARTUP_UNAVAILABLE_GATEWAY_METHODS } from "./server-startup-unavailable-methods.js";
|
|
import { startGatewayEarlyRuntime, startGatewayPostAttachRuntime } from "./server-startup.js";
|
|
import { createWizardSessionTracker } from "./server-wizard-sessions.js";
|
|
import { attachGatewayWsHandlers } from "./server-ws-runtime.js";
|
|
import {
|
|
getHealthCache,
|
|
getHealthVersion,
|
|
getPresenceVersion,
|
|
incrementPresenceVersion,
|
|
refreshGatewayHealthSnapshot,
|
|
} from "./server/health-state.js";
|
|
import { resolveHookClientIpConfig } from "./server/hooks.js";
|
|
import { createReadinessChecker } from "./server/readiness.js";
|
|
import { loadGatewayTlsRuntime } from "./server/tls.js";
|
|
import { resolveSharedGatewaySessionGeneration } from "./server/ws-shared-generation.js";
|
|
import { maybeSeedControlUiAllowedOriginsAtStartup } from "./startup-control-ui-origins.js";
|
|
|
|
export { __resetModelCatalogCacheForTest } from "./server-model-catalog.js";
|
|
|
|
ensureOpenClawCliOnPath();
|
|
|
|
const MAX_MEDIA_TTL_HOURS = 24 * 7;
|
|
|
|
function resolveMediaCleanupTtlMs(ttlHoursRaw: number): number {
|
|
const ttlHours = Math.min(Math.max(ttlHoursRaw, 1), MAX_MEDIA_TTL_HOURS);
|
|
const ttlMs = ttlHours * 60 * 60_000;
|
|
if (!Number.isFinite(ttlMs) || !Number.isSafeInteger(ttlMs)) {
|
|
throw new Error(`Invalid media.ttlHours: ${String(ttlHoursRaw)}`);
|
|
}
|
|
return ttlMs;
|
|
}
|
|
|
|
const log = createSubsystemLogger("gateway");
|
|
const logCanvas = log.child("canvas");
|
|
const logDiscovery = log.child("discovery");
|
|
const logTailscale = log.child("tailscale");
|
|
const logChannels = log.child("channels");
|
|
|
|
let cachedChannelRuntime: ReturnType<typeof createPluginRuntime>["channel"] | null = null;
|
|
|
|
function getChannelRuntime() {
|
|
cachedChannelRuntime ??= createPluginRuntime().channel;
|
|
return cachedChannelRuntime;
|
|
}
|
|
|
|
const logHealth = log.child("health");
|
|
const logCron = log.child("cron");
|
|
const logReload = log.child("reload");
|
|
const logHooks = log.child("hooks");
|
|
const logPlugins = log.child("plugins");
|
|
const logWsControl = log.child("ws");
|
|
const logSecrets = log.child("secrets");
|
|
const gatewayRuntime = runtimeForLogger(log);
|
|
const canvasRuntime = runtimeForLogger(logCanvas);
|
|
|
|
function createGatewayStartupTrace() {
|
|
const enabled = isTruthyEnvValue(process.env.OPENCLAW_GATEWAY_STARTUP_TRACE);
|
|
const started = performance.now();
|
|
let last = started;
|
|
const emit = (name: string, durationMs: number, totalMs: number) => {
|
|
if (enabled) {
|
|
log.info(`startup trace: ${name} ${durationMs.toFixed(1)}ms total=${totalMs.toFixed(1)}ms`);
|
|
}
|
|
};
|
|
return {
|
|
mark(name: string) {
|
|
const now = performance.now();
|
|
emit(name, now - last, now - started);
|
|
last = now;
|
|
},
|
|
async measure<T>(name: string, run: () => Promise<T> | T): Promise<T> {
|
|
const before = performance.now();
|
|
try {
|
|
return await run();
|
|
} finally {
|
|
const now = performance.now();
|
|
emit(name, now - before, now - started);
|
|
last = now;
|
|
}
|
|
},
|
|
};
|
|
}
|
|
|
|
type AuthRateLimitConfig = Parameters<typeof createAuthRateLimiter>[0];
|
|
|
|
function createGatewayAuthRateLimiters(rateLimitConfig: AuthRateLimitConfig | undefined): {
|
|
rateLimiter?: AuthRateLimiter;
|
|
browserRateLimiter: AuthRateLimiter;
|
|
} {
|
|
const rateLimiter = rateLimitConfig ? createAuthRateLimiter(rateLimitConfig) : undefined;
|
|
// Browser-origin WS auth attempts always use loopback-non-exempt throttling.
|
|
const browserRateLimiter = createAuthRateLimiter({
|
|
...rateLimitConfig,
|
|
exemptLoopback: false,
|
|
});
|
|
return { rateLimiter, browserRateLimiter };
|
|
}
|
|
|
|
export type GatewayServer = {
|
|
close: (opts?: { reason?: string; restartExpectedMs?: number | null }) => Promise<void>;
|
|
};
|
|
|
|
export type GatewayServerOptions = {
|
|
/**
|
|
* Bind address policy for the Gateway WebSocket/HTTP server.
|
|
* - loopback: 127.0.0.1
|
|
* - lan: 0.0.0.0
|
|
* - tailnet: bind only to the Tailscale IPv4 address (100.64.0.0/10)
|
|
* - auto: prefer loopback, else LAN
|
|
*/
|
|
bind?: import("../config/config.js").GatewayBindMode;
|
|
/**
|
|
* Advanced override for the bind host, bypassing bind resolution.
|
|
* Prefer `bind` unless you really need a specific address.
|
|
*/
|
|
host?: string;
|
|
/**
|
|
* If false, do not serve the browser Control UI.
|
|
* Default: config `gateway.controlUi.enabled` (or true when absent).
|
|
*/
|
|
controlUiEnabled?: boolean;
|
|
/**
|
|
* If false, do not serve `POST /v1/chat/completions`.
|
|
* Default: config `gateway.http.endpoints.chatCompletions.enabled` (or false when absent).
|
|
*/
|
|
openAiChatCompletionsEnabled?: boolean;
|
|
/**
|
|
* If false, do not serve `POST /v1/responses` (OpenResponses API).
|
|
* Default: config `gateway.http.endpoints.responses.enabled` (or false when absent).
|
|
*/
|
|
openResponsesEnabled?: boolean;
|
|
/**
|
|
* Override gateway auth configuration (merges with config).
|
|
*/
|
|
auth?: import("../config/config.js").GatewayAuthConfig;
|
|
/**
|
|
* Override gateway Tailscale exposure configuration (merges with config).
|
|
*/
|
|
tailscale?: import("../config/config.js").GatewayTailscaleConfig;
|
|
/**
|
|
* Test-only: allow canvas host startup even when NODE_ENV/VITEST would disable it.
|
|
*/
|
|
allowCanvasHostInTests?: boolean;
|
|
/**
|
|
* Test-only: override the setup wizard runner.
|
|
*/
|
|
wizardRunner?: (
|
|
opts: import("../commands/onboard-types.js").OnboardOptions,
|
|
runtime: import("../runtime.js").RuntimeEnv,
|
|
prompter: import("../wizard/prompts.js").WizardPrompter,
|
|
) => Promise<void>;
|
|
/**
|
|
* Test-only: wait for post-listen sidecars such as plugin services before returning.
|
|
*/
|
|
awaitStartupSidecars?: boolean;
|
|
/**
|
|
* Optional startup timestamp used for concise readiness logging.
|
|
*/
|
|
startupStartedAt?: number;
|
|
};
|
|
|
|
export async function startGatewayServer(
|
|
port = 18789,
|
|
opts: GatewayServerOptions = {},
|
|
): Promise<GatewayServer> {
|
|
const minimalTestGateway =
|
|
isVitestRuntimeEnv() && process.env.OPENCLAW_TEST_MINIMAL_GATEWAY === "1";
|
|
|
|
// Ensure all default port derivations (browser/canvas) see the actual runtime port.
|
|
process.env.OPENCLAW_GATEWAY_PORT = String(port);
|
|
logAcceptedEnvOption({
|
|
key: "OPENCLAW_RAW_STREAM",
|
|
description: "raw stream logging enabled",
|
|
});
|
|
logAcceptedEnvOption({
|
|
key: "OPENCLAW_RAW_STREAM_PATH",
|
|
description: "raw stream log path override",
|
|
});
|
|
const startupTrace = createGatewayStartupTrace();
|
|
|
|
const startupConfigLoad = await startupTrace.measure("config.snapshot", () =>
|
|
loadGatewayStartupConfigSnapshot({
|
|
minimalTestGateway,
|
|
log,
|
|
}),
|
|
);
|
|
const configSnapshot = startupConfigLoad.snapshot;
|
|
|
|
const emitSecretsStateEvent = (
|
|
code: "SECRETS_RELOADER_DEGRADED" | "SECRETS_RELOADER_RECOVERED",
|
|
message: string,
|
|
cfg: OpenClawConfig,
|
|
) => {
|
|
enqueueSystemEvent(`[${code}] ${message}`, {
|
|
sessionKey: resolveMainSessionKey(cfg),
|
|
contextKey: code,
|
|
});
|
|
};
|
|
const activateRuntimeSecrets = createRuntimeSecretsActivator({
|
|
logSecrets,
|
|
emitStateEvent: emitSecretsStateEvent,
|
|
});
|
|
|
|
let cfgAtStart: OpenClawConfig;
|
|
let startupInternalWriteHash: string | null = null;
|
|
let startupLastGoodSnapshot = configSnapshot;
|
|
const startupRuntimeConfig = applyConfigOverrides(configSnapshot.config);
|
|
const authBootstrap = await startupTrace.measure("config.auth", () =>
|
|
prepareGatewayStartupConfig({
|
|
configSnapshot,
|
|
authOverride: opts.auth,
|
|
tailscaleOverride: opts.tailscale,
|
|
activateRuntimeSecrets,
|
|
}),
|
|
);
|
|
cfgAtStart = authBootstrap.cfg;
|
|
if (authBootstrap.generatedToken) {
|
|
if (authBootstrap.persistedGeneratedToken) {
|
|
log.info(
|
|
"Gateway auth token was missing. Generated a new token and saved it to config (gateway.auth.token).",
|
|
);
|
|
} else {
|
|
log.warn(
|
|
"Gateway auth token was missing. Generated a runtime token for this startup without changing config; restart will generate a different token. Persist one with `openclaw config set gateway.auth.mode token` and `openclaw config set gateway.auth.token <token>`.",
|
|
);
|
|
}
|
|
}
|
|
const diagnosticsEnabled = isDiagnosticsEnabled(cfgAtStart);
|
|
if (diagnosticsEnabled) {
|
|
startDiagnosticHeartbeat(undefined, { getConfig: getRuntimeConfig });
|
|
}
|
|
setGatewaySigusr1RestartPolicy({ allowExternal: isRestartEnabled(cfgAtStart) });
|
|
setPreRestartDeferralCheck(
|
|
() =>
|
|
getTotalQueueSize() +
|
|
getTotalPendingReplies() +
|
|
getActiveEmbeddedRunCount() +
|
|
getInspectableTaskRegistrySummary().active,
|
|
);
|
|
// Unconditional startup migration: seed gateway.controlUi.allowedOrigins for existing
|
|
// non-loopback installs that upgraded to v2026.2.26+ without required origins.
|
|
const controlUiSeed = minimalTestGateway
|
|
? { config: cfgAtStart, persistedAllowedOriginsSeed: false }
|
|
: await startupTrace.measure("control-ui.seed", () =>
|
|
maybeSeedControlUiAllowedOriginsAtStartup({
|
|
config: cfgAtStart,
|
|
writeConfig: writeConfigFile,
|
|
log,
|
|
}),
|
|
);
|
|
cfgAtStart = controlUiSeed.config;
|
|
// Capture the final config hash only after startup writes (plugin auto-enable,
|
|
// auth token generation, control-UI origin seeding) so the config reloader can
|
|
// suppress its own persistence events without rereading config on every boot.
|
|
if (
|
|
startupConfigLoad.wroteConfig ||
|
|
authBootstrap.persistedGeneratedToken ||
|
|
controlUiSeed.persistedAllowedOriginsSeed
|
|
) {
|
|
const startupSnapshot = await startupTrace.measure("config.final-snapshot", () =>
|
|
readConfigFileSnapshot(),
|
|
);
|
|
startupInternalWriteHash = startupSnapshot.hash ?? null;
|
|
startupLastGoodSnapshot = startupSnapshot;
|
|
}
|
|
const pluginBootstrap = await startupTrace.measure("plugins.bootstrap", () =>
|
|
prepareGatewayPluginBootstrap({
|
|
cfgAtStart,
|
|
startupRuntimeConfig,
|
|
minimalTestGateway,
|
|
log,
|
|
}),
|
|
);
|
|
const {
|
|
gatewayPluginConfigAtStart,
|
|
defaultWorkspaceDir,
|
|
deferredConfiguredChannelPluginIds,
|
|
startupPluginIds,
|
|
baseMethods,
|
|
} = pluginBootstrap;
|
|
let { pluginRegistry, baseGatewayMethods } = pluginBootstrap;
|
|
const channelLogs = Object.fromEntries(
|
|
listChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),
|
|
) as Record<ChannelId, ReturnType<typeof createSubsystemLogger>>;
|
|
const channelRuntimeEnvs = Object.fromEntries(
|
|
Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]),
|
|
) as unknown as Record<ChannelId, RuntimeEnv>;
|
|
const listActiveGatewayMethods = (nextBaseGatewayMethods: string[]) =>
|
|
Array.from(
|
|
new Set([
|
|
...nextBaseGatewayMethods,
|
|
...listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []),
|
|
]),
|
|
);
|
|
const runtimeConfig = await startupTrace.measure("runtime.config", () =>
|
|
resolveGatewayRuntimeConfig({
|
|
cfg: cfgAtStart,
|
|
port,
|
|
bind: opts.bind,
|
|
host: opts.host,
|
|
controlUiEnabled: opts.controlUiEnabled,
|
|
openAiChatCompletionsEnabled: opts.openAiChatCompletionsEnabled,
|
|
openResponsesEnabled: opts.openResponsesEnabled,
|
|
auth: opts.auth,
|
|
tailscale: opts.tailscale,
|
|
}),
|
|
);
|
|
const {
|
|
bindHost,
|
|
controlUiEnabled,
|
|
openAiChatCompletionsEnabled,
|
|
openAiChatCompletionsConfig,
|
|
openResponsesEnabled,
|
|
openResponsesConfig,
|
|
strictTransportSecurityHeader,
|
|
controlUiBasePath,
|
|
controlUiRoot: controlUiRootOverride,
|
|
resolvedAuth,
|
|
tailscaleConfig,
|
|
tailscaleMode,
|
|
} = runtimeConfig;
|
|
const getResolvedAuth = () =>
|
|
resolveGatewayAuth({
|
|
authConfig:
|
|
getActiveSecretsRuntimeSnapshot()?.config.gateway?.auth ?? getRuntimeConfig().gateway?.auth,
|
|
authOverride: opts.auth,
|
|
env: process.env,
|
|
tailscaleMode,
|
|
});
|
|
const resolveSharedGatewaySessionGenerationForConfig = (config: OpenClawConfig) =>
|
|
resolveSharedGatewaySessionGeneration(
|
|
resolveGatewayAuth({
|
|
authConfig: config.gateway?.auth,
|
|
authOverride: opts.auth,
|
|
env: process.env,
|
|
tailscaleMode,
|
|
}),
|
|
);
|
|
const resolveCurrentSharedGatewaySessionGeneration = () =>
|
|
resolveSharedGatewaySessionGeneration(getResolvedAuth());
|
|
const resolveSharedGatewaySessionGenerationForRuntimeSnapshot = () =>
|
|
resolveSharedGatewaySessionGeneration(
|
|
resolveGatewayAuth({
|
|
authConfig: getRuntimeConfig().gateway?.auth,
|
|
authOverride: opts.auth,
|
|
env: process.env,
|
|
tailscaleMode,
|
|
}),
|
|
);
|
|
const sharedGatewaySessionGenerationState: SharedGatewaySessionGenerationState = {
|
|
current: resolveCurrentSharedGatewaySessionGeneration(),
|
|
required: null,
|
|
};
|
|
const initialHooksConfig = runtimeConfig.hooksConfig;
|
|
const initialHookClientIpConfig = resolveHookClientIpConfig(cfgAtStart);
|
|
const canvasHostEnabled = runtimeConfig.canvasHostEnabled;
|
|
|
|
// Create auth rate limiters used by connect/auth flows.
|
|
const rateLimitConfig = cfgAtStart.gateway?.auth?.rateLimit;
|
|
const { rateLimiter: authRateLimiter, browserRateLimiter: browserAuthRateLimiter } =
|
|
createGatewayAuthRateLimiters(rateLimitConfig);
|
|
|
|
const controlUiRootState = await startupTrace.measure("control-ui.root", () =>
|
|
resolveGatewayControlUiRootState({
|
|
controlUiRootOverride,
|
|
controlUiEnabled,
|
|
gatewayRuntime,
|
|
log,
|
|
}),
|
|
);
|
|
|
|
const wizardRunner = opts.wizardRunner ?? runSetupWizard;
|
|
const { wizardSessions, findRunningWizard, purgeWizardSession } = createWizardSessionTracker();
|
|
|
|
const deps = createDefaultDeps();
|
|
let runtimeState: GatewayServerLiveState | null = null;
|
|
let canvasHostServer: CanvasHostServer | null = null;
|
|
const gatewayTls = await startupTrace.measure("tls.runtime", () =>
|
|
loadGatewayTlsRuntime(cfgAtStart.gateway?.tls, log.child("tls")),
|
|
);
|
|
if (cfgAtStart.gateway?.tls?.enabled && !gatewayTls.enabled) {
|
|
throw new Error(gatewayTls.error ?? "gateway tls: failed to enable");
|
|
}
|
|
const serverStartedAt = Date.now();
|
|
let startupSidecarsReady = minimalTestGateway;
|
|
const channelManager = createChannelManager({
|
|
loadConfig: () =>
|
|
applyPluginAutoEnable({
|
|
config: loadConfig(),
|
|
env: process.env,
|
|
}).config,
|
|
channelLogs,
|
|
channelRuntimeEnvs,
|
|
resolveChannelRuntime: getChannelRuntime,
|
|
});
|
|
const getReadiness = createReadinessChecker({
|
|
channelManager,
|
|
startedAt: serverStartedAt,
|
|
getStartupPending: () => !startupSidecarsReady,
|
|
});
|
|
log.info("starting HTTP server...");
|
|
const {
|
|
canvasHost,
|
|
releasePluginRouteRegistry,
|
|
httpServer,
|
|
httpServers,
|
|
httpBindHosts,
|
|
startListening,
|
|
wss,
|
|
preauthConnectionBudget,
|
|
clients,
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
agentRunSeq,
|
|
dedupe,
|
|
chatRunState,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
chatDeltaLastBroadcastLen,
|
|
addChatRun,
|
|
removeChatRun,
|
|
chatAbortControllers,
|
|
toolEventRecipients,
|
|
} = await startupTrace.measure("runtime.state", () =>
|
|
createGatewayRuntimeState({
|
|
cfg: cfgAtStart,
|
|
bindHost,
|
|
port,
|
|
controlUiEnabled,
|
|
controlUiBasePath,
|
|
controlUiRoot: controlUiRootState,
|
|
openAiChatCompletionsEnabled,
|
|
openAiChatCompletionsConfig,
|
|
openResponsesEnabled,
|
|
openResponsesConfig,
|
|
strictTransportSecurityHeader,
|
|
resolvedAuth,
|
|
rateLimiter: authRateLimiter,
|
|
gatewayTls,
|
|
getResolvedAuth,
|
|
hooksConfig: () => runtimeState?.hooksConfig ?? initialHooksConfig,
|
|
getHookClientIpConfig: () => runtimeState?.hookClientIpConfig ?? initialHookClientIpConfig,
|
|
pluginRegistry,
|
|
pinChannelRegistry: !minimalTestGateway,
|
|
deps,
|
|
canvasRuntime,
|
|
canvasHostEnabled,
|
|
allowCanvasHostInTests: opts.allowCanvasHostInTests,
|
|
logCanvas,
|
|
log,
|
|
logHooks,
|
|
logPlugins,
|
|
getReadiness,
|
|
}),
|
|
);
|
|
const {
|
|
nodeRegistry,
|
|
nodePresenceTimers,
|
|
sessionEventSubscribers,
|
|
sessionMessageSubscribers,
|
|
nodeSendToSession,
|
|
nodeSendToAllSubscribed,
|
|
nodeSubscribe,
|
|
nodeUnsubscribe,
|
|
nodeUnsubscribeAll,
|
|
broadcastVoiceWakeChanged,
|
|
hasMobileNodeConnected,
|
|
} = createGatewayNodeSessionRuntime({ broadcast });
|
|
applyGatewayLaneConcurrency(cfgAtStart);
|
|
|
|
runtimeState = createGatewayServerLiveState({
|
|
hooksConfig: initialHooksConfig,
|
|
hookClientIpConfig: initialHookClientIpConfig,
|
|
cronState: buildGatewayCronService({
|
|
cfg: cfgAtStart,
|
|
deps,
|
|
broadcast,
|
|
}),
|
|
gatewayMethods: listActiveGatewayMethods(baseGatewayMethods),
|
|
});
|
|
deps.cron = runtimeState.cronState.cron;
|
|
|
|
const runClosePrelude = async () =>
|
|
await runGatewayClosePrelude({
|
|
...(diagnosticsEnabled ? { stopDiagnostics: stopDiagnosticHeartbeat } : {}),
|
|
clearSkillsRefreshTimer: () => {
|
|
if (!runtimeState?.skillsRefreshTimer) {
|
|
return;
|
|
}
|
|
clearTimeout(runtimeState.skillsRefreshTimer);
|
|
runtimeState.skillsRefreshTimer = null;
|
|
},
|
|
skillsChangeUnsub: runtimeState.skillsChangeUnsub,
|
|
...(authRateLimiter ? { disposeAuthRateLimiter: () => authRateLimiter.dispose() } : {}),
|
|
disposeBrowserAuthRateLimiter: () => browserAuthRateLimiter.dispose(),
|
|
stopModelPricingRefresh: runtimeState.stopModelPricingRefresh,
|
|
stopChannelHealthMonitor: () => runtimeState?.channelHealthMonitor?.stop(),
|
|
clearSecretsRuntimeSnapshot,
|
|
closeMcpServer: async () => await closeMcpLoopbackServer(),
|
|
});
|
|
const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } =
|
|
channelManager;
|
|
const createCloseHandler = () =>
|
|
createGatewayCloseHandler({
|
|
bonjourStop: runtimeState.bonjourStop,
|
|
tailscaleCleanup: runtimeState.tailscaleCleanup,
|
|
canvasHost,
|
|
canvasHostServer,
|
|
releasePluginRouteRegistry,
|
|
stopChannel,
|
|
pluginServices: runtimeState.pluginServices,
|
|
cron: runtimeState.cronState.cron,
|
|
heartbeatRunner: runtimeState.heartbeatRunner,
|
|
updateCheckStop: runtimeState.stopGatewayUpdateCheck,
|
|
stopTaskRegistryMaintenance,
|
|
nodePresenceTimers,
|
|
broadcast,
|
|
tickInterval: runtimeState.tickInterval,
|
|
healthInterval: runtimeState.healthInterval,
|
|
dedupeCleanup: runtimeState.dedupeCleanup,
|
|
mediaCleanup: runtimeState.mediaCleanup,
|
|
agentUnsub: runtimeState.agentUnsub,
|
|
heartbeatUnsub: runtimeState.heartbeatUnsub,
|
|
transcriptUnsub: runtimeState.transcriptUnsub,
|
|
lifecycleUnsub: runtimeState.lifecycleUnsub,
|
|
chatRunState,
|
|
clients,
|
|
configReloader: runtimeState.configReloader,
|
|
wss,
|
|
httpServer,
|
|
httpServers,
|
|
});
|
|
const closeOnStartupFailure = async () => {
|
|
await runClosePrelude();
|
|
await createCloseHandler()({ reason: "gateway startup failed" });
|
|
};
|
|
|
|
try {
|
|
const earlyRuntime = await startupTrace.measure("runtime.early", () =>
|
|
startGatewayEarlyRuntime({
|
|
minimalTestGateway,
|
|
cfgAtStart,
|
|
port,
|
|
gatewayTls,
|
|
tailscaleMode,
|
|
log,
|
|
logDiscovery,
|
|
nodeRegistry,
|
|
broadcast,
|
|
nodeSendToAllSubscribed,
|
|
getPresenceVersion,
|
|
getHealthVersion,
|
|
refreshGatewayHealthSnapshot,
|
|
logHealth,
|
|
dedupe,
|
|
chatAbortControllers,
|
|
chatRunState,
|
|
chatRunBuffers,
|
|
chatDeltaSentAt,
|
|
chatDeltaLastBroadcastLen,
|
|
removeChatRun,
|
|
agentRunSeq,
|
|
nodeSendToSession,
|
|
...(typeof cfgAtStart.media?.ttlHours === "number"
|
|
? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) }
|
|
: {}),
|
|
skillsRefreshDelayMs: runtimeState.skillsRefreshDelayMs,
|
|
getSkillsRefreshTimer: () => runtimeState.skillsRefreshTimer,
|
|
setSkillsRefreshTimer: (timer) => {
|
|
runtimeState.skillsRefreshTimer = timer;
|
|
},
|
|
loadConfig,
|
|
}),
|
|
);
|
|
runtimeState.bonjourStop = earlyRuntime.bonjourStop;
|
|
runtimeState.skillsChangeUnsub = earlyRuntime.skillsChangeUnsub;
|
|
if (earlyRuntime.maintenance) {
|
|
runtimeState.tickInterval = earlyRuntime.maintenance.tickInterval;
|
|
runtimeState.healthInterval = earlyRuntime.maintenance.healthInterval;
|
|
runtimeState.dedupeCleanup = earlyRuntime.maintenance.dedupeCleanup;
|
|
runtimeState.mediaCleanup = earlyRuntime.maintenance.mediaCleanup;
|
|
}
|
|
|
|
Object.assign(
|
|
runtimeState,
|
|
startGatewayEventSubscriptions({
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
nodeSendToSession,
|
|
agentRunSeq,
|
|
chatRunState,
|
|
resolveSessionKeyForRun,
|
|
clearAgentRunContext,
|
|
toolEventRecipients,
|
|
sessionEventSubscribers,
|
|
sessionMessageSubscribers,
|
|
chatAbortControllers,
|
|
}),
|
|
);
|
|
|
|
Object.assign(
|
|
runtimeState,
|
|
startGatewayRuntimeServices({
|
|
minimalTestGateway,
|
|
cfgAtStart,
|
|
channelManager,
|
|
log,
|
|
}),
|
|
);
|
|
|
|
const { execApprovalManager, pluginApprovalManager, extraHandlers } = createGatewayAuxHandlers({
|
|
log,
|
|
activateRuntimeSecrets,
|
|
sharedGatewaySessionGenerationState,
|
|
resolveSharedGatewaySessionGenerationForConfig,
|
|
clients,
|
|
});
|
|
|
|
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
|
|
|
|
const unavailableGatewayMethods = new Set<string>(
|
|
minimalTestGateway ? [] : STARTUP_UNAVAILABLE_GATEWAY_METHODS,
|
|
);
|
|
const gatewayRequestContext = createGatewayRequestContext({
|
|
deps,
|
|
runtimeState,
|
|
execApprovalManager,
|
|
pluginApprovalManager,
|
|
loadGatewayModelCatalog,
|
|
getHealthCache,
|
|
refreshHealthSnapshot: refreshGatewayHealthSnapshot,
|
|
logHealth,
|
|
logGateway: log,
|
|
incrementPresenceVersion,
|
|
getHealthVersion,
|
|
broadcast,
|
|
broadcastToConnIds,
|
|
nodeSendToSession,
|
|
nodeSendToAllSubscribed,
|
|
nodeSubscribe,
|
|
nodeUnsubscribe,
|
|
nodeUnsubscribeAll,
|
|
hasConnectedMobileNode: hasMobileNodeConnected,
|
|
clients,
|
|
enforceSharedGatewayAuthGenerationForConfigWrite: (nextConfig: OpenClawConfig) => {
|
|
enforceSharedGatewaySessionGenerationForConfigWrite({
|
|
state: sharedGatewaySessionGenerationState,
|
|
nextConfig,
|
|
resolveRuntimeSnapshotGeneration: resolveSharedGatewaySessionGenerationForRuntimeSnapshot,
|
|
clients,
|
|
});
|
|
},
|
|
nodeRegistry,
|
|
agentRunSeq,
|
|
chatAbortControllers,
|
|
chatAbortedRuns: chatRunState.abortedRuns,
|
|
chatRunBuffers: chatRunState.buffers,
|
|
chatDeltaSentAt: chatRunState.deltaSentAt,
|
|
chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen,
|
|
addChatRun,
|
|
removeChatRun,
|
|
subscribeSessionEvents: sessionEventSubscribers.subscribe,
|
|
unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe,
|
|
subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe,
|
|
unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe,
|
|
unsubscribeAllSessionEvents: (connId: string) => {
|
|
sessionEventSubscribers.unsubscribe(connId);
|
|
sessionMessageSubscribers.unsubscribeAll(connId);
|
|
},
|
|
getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll,
|
|
registerToolEventRecipient: toolEventRecipients.add,
|
|
dedupe,
|
|
wizardSessions,
|
|
findRunningWizard,
|
|
purgeWizardSession,
|
|
getRuntimeSnapshot,
|
|
startChannel,
|
|
stopChannel,
|
|
markChannelLoggedOut,
|
|
wizardRunner,
|
|
broadcastVoiceWakeChanged,
|
|
unavailableGatewayMethods,
|
|
});
|
|
|
|
setFallbackGatewayContextResolver(() => gatewayRequestContext);
|
|
|
|
if (!minimalTestGateway) {
|
|
if (deferredConfiguredChannelPluginIds.length > 0) {
|
|
({ pluginRegistry, gatewayMethods: baseGatewayMethods } = reloadDeferredGatewayPlugins({
|
|
cfg: gatewayPluginConfigAtStart,
|
|
workspaceDir: defaultWorkspaceDir,
|
|
log,
|
|
coreGatewayHandlers,
|
|
baseMethods,
|
|
pluginIds: startupPluginIds,
|
|
logDiagnostics: false,
|
|
}));
|
|
runtimeState.gatewayMethods = listActiveGatewayMethods(baseGatewayMethods);
|
|
}
|
|
}
|
|
|
|
attachGatewayWsHandlers({
|
|
wss,
|
|
clients,
|
|
preauthConnectionBudget,
|
|
port,
|
|
gatewayHost: bindHost ?? undefined,
|
|
canvasHostEnabled: Boolean(canvasHost),
|
|
canvasHostServerPort,
|
|
resolvedAuth,
|
|
getResolvedAuth,
|
|
getRequiredSharedGatewaySessionGeneration: () =>
|
|
getRequiredSharedGatewaySessionGeneration(sharedGatewaySessionGenerationState),
|
|
rateLimiter: authRateLimiter,
|
|
browserRateLimiter: browserAuthRateLimiter,
|
|
gatewayMethods: runtimeState.gatewayMethods,
|
|
events: GATEWAY_EVENTS,
|
|
logGateway: log,
|
|
logHealth,
|
|
logWsControl,
|
|
extraHandlers: { ...pluginRegistry.gatewayHandlers, ...extraHandlers },
|
|
broadcast,
|
|
context: gatewayRequestContext,
|
|
});
|
|
await startListening();
|
|
startupTrace.mark("http.bound");
|
|
({
|
|
stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck,
|
|
tailscaleCleanup: runtimeState.tailscaleCleanup,
|
|
pluginServices: runtimeState.pluginServices,
|
|
} = await startupTrace.measure("runtime.post-attach", () =>
|
|
startGatewayPostAttachRuntime({
|
|
minimalTestGateway,
|
|
cfgAtStart,
|
|
bindHost,
|
|
bindHosts: httpBindHosts,
|
|
port,
|
|
tlsEnabled: gatewayTls.enabled,
|
|
log,
|
|
isNixMode,
|
|
startupStartedAt: opts.startupStartedAt,
|
|
broadcast,
|
|
tailscaleMode,
|
|
resetOnExit: tailscaleConfig.resetOnExit ?? false,
|
|
controlUiBasePath,
|
|
logTailscale,
|
|
gatewayPluginConfigAtStart,
|
|
pluginRegistry,
|
|
defaultWorkspaceDir,
|
|
deps,
|
|
startChannels,
|
|
logHooks,
|
|
logChannels,
|
|
unavailableGatewayMethods,
|
|
onPluginServices: (pluginServices) => {
|
|
runtimeState.pluginServices = pluginServices;
|
|
},
|
|
onSidecarsReady: () => {
|
|
startupSidecarsReady = true;
|
|
},
|
|
startupTrace,
|
|
awaitSidecars: opts.awaitStartupSidecars,
|
|
}),
|
|
));
|
|
startupTrace.mark("ready");
|
|
|
|
// HTTP is live before sidecars finish; /readyz stays red until the startup sidecars settle.
|
|
const activated = activateGatewayScheduledServices({
|
|
minimalTestGateway,
|
|
cfgAtStart,
|
|
cron: runtimeState.cronState.cron,
|
|
logCron,
|
|
log,
|
|
});
|
|
runtimeState.heartbeatRunner = activated.heartbeatRunner;
|
|
|
|
runtimeState.configReloader = startManagedGatewayConfigReloader({
|
|
minimalTestGateway,
|
|
initialConfig: cfgAtStart,
|
|
initialInternalWriteHash: startupInternalWriteHash,
|
|
watchPath: configSnapshot.path,
|
|
readSnapshot: readConfigFileSnapshot,
|
|
recoverSnapshot: recoverConfigFromLastKnownGood,
|
|
promoteSnapshot: promoteConfigSnapshotToLastKnownGood,
|
|
subscribeToWrites: registerConfigWriteListener,
|
|
deps,
|
|
broadcast,
|
|
getState: () => ({
|
|
hooksConfig: runtimeState.hooksConfig,
|
|
hookClientIpConfig: runtimeState.hookClientIpConfig,
|
|
heartbeatRunner: runtimeState.heartbeatRunner,
|
|
cronState: runtimeState.cronState,
|
|
channelHealthMonitor: runtimeState.channelHealthMonitor,
|
|
}),
|
|
setState: (nextState) => {
|
|
runtimeState.hooksConfig = nextState.hooksConfig;
|
|
runtimeState.hookClientIpConfig = nextState.hookClientIpConfig;
|
|
runtimeState.heartbeatRunner = nextState.heartbeatRunner;
|
|
runtimeState.cronState = nextState.cronState;
|
|
deps.cron = runtimeState.cronState.cron;
|
|
runtimeState.channelHealthMonitor = nextState.channelHealthMonitor;
|
|
},
|
|
startChannel,
|
|
stopChannel,
|
|
logHooks,
|
|
logChannels,
|
|
logCron,
|
|
logReload,
|
|
channelManager,
|
|
activateRuntimeSecrets,
|
|
resolveSharedGatewaySessionGenerationForConfig,
|
|
sharedGatewaySessionGenerationState,
|
|
clients,
|
|
});
|
|
await promoteConfigSnapshotToLastKnownGood(startupLastGoodSnapshot).catch((err) => {
|
|
log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`);
|
|
});
|
|
} catch (err) {
|
|
await closeOnStartupFailure();
|
|
throw err;
|
|
}
|
|
|
|
const close = createCloseHandler();
|
|
|
|
return {
|
|
close: async (opts) => {
|
|
// Run gateway_stop plugin hook before shutdown
|
|
await runGlobalGatewayStopSafely({
|
|
event: { reason: opts?.reason ?? "gateway stopping" },
|
|
ctx: { port },
|
|
onError: (err) => log.warn(`gateway_stop hook failed: ${String(err)}`),
|
|
});
|
|
await runClosePrelude();
|
|
await close(opts);
|
|
},
|
|
};
|
|
}
|