Gateway/iOS: replay queued foreground actions safely after resume (#40281)

Merged via squash.

- Local validation: `pnpm exec vitest run --config vitest.gateway.config.ts src/gateway/server-methods/nodes.invoke-wake.test.ts`
- Local validation: `pnpm build`
- mb-server validation: `pnpm exec vitest run --config vitest.gateway.config.ts src/gateway/server-methods/nodes.invoke-wake.test.ts`
- mb-server validation: `pnpm build`
- mb-server validation: `pnpm protocol:check`
This commit is contained in:
Mariano
2026-03-08 22:46:54 +01:00
committed by GitHub
parent 38543d8196
commit e806c479f5
12 changed files with 604 additions and 1 deletions

View File

@@ -23,6 +23,8 @@ const NODE_ROLE_METHODS = new Set([
"node.invoke.result",
"node.event",
"node.canvas.capability.refresh",
"node.pending.pull",
"node.pending.ack",
"skills.bins",
]);

View File

@@ -146,6 +146,8 @@ import {
NodeInvokeResultParamsSchema,
type NodeListParams,
NodeListParamsSchema,
type NodePendingAckParams,
NodePendingAckParamsSchema,
type NodePairApproveParams,
NodePairApproveParamsSchema,
type NodePairListParams,
@@ -285,6 +287,9 @@ export const validateNodePairVerifyParams = ajv.compile<NodePairVerifyParams>(
);
export const validateNodeRenameParams = ajv.compile<NodeRenameParams>(NodeRenameParamsSchema);
export const validateNodeListParams = ajv.compile<NodeListParams>(NodeListParamsSchema);
export const validateNodePendingAckParams = ajv.compile<NodePendingAckParams>(
NodePendingAckParamsSchema,
);
export const validateNodeDescribeParams = ajv.compile<NodeDescribeParams>(NodeDescribeParamsSchema);
export const validateNodeInvokeParams = ajv.compile<NodeInvokeParams>(NodeInvokeParamsSchema);
export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams>(
@@ -465,6 +470,7 @@ export {
NodePairRejectParamsSchema,
NodePairVerifyParamsSchema,
NodeListParamsSchema,
NodePendingAckParamsSchema,
NodeInvokeParamsSchema,
SessionsListParamsSchema,
SessionsPreviewParamsSchema,

View File

@@ -43,6 +43,13 @@ export const NodeRenameParamsSchema = Type.Object(
export const NodeListParamsSchema = Type.Object({}, { additionalProperties: false });
export const NodePendingAckParamsSchema = Type.Object(
{
ids: Type.Array(NonEmptyString, { minItems: 1 }),
},
{ additionalProperties: false },
);
export const NodeDescribeParamsSchema = Type.Object(
{ nodeId: NonEmptyString },
{ additionalProperties: false },

View File

@@ -118,6 +118,7 @@ import {
NodeInvokeResultParamsSchema,
NodeInvokeRequestEventSchema,
NodeListParamsSchema,
NodePendingAckParamsSchema,
NodePairApproveParamsSchema,
NodePairListParamsSchema,
NodePairRejectParamsSchema,
@@ -180,6 +181,7 @@ export const ProtocolSchemas = {
NodePairVerifyParams: NodePairVerifyParamsSchema,
NodeRenameParams: NodeRenameParamsSchema,
NodeListParams: NodeListParamsSchema,
NodePendingAckParams: NodePendingAckParamsSchema,
NodeDescribeParams: NodeDescribeParamsSchema,
NodeInvokeParams: NodeInvokeParamsSchema,
NodeInvokeResultParams: NodeInvokeResultParamsSchema,

View File

@@ -27,6 +27,7 @@ export type NodePairRejectParams = SchemaType<"NodePairRejectParams">;
export type NodePairVerifyParams = SchemaType<"NodePairVerifyParams">;
export type NodeRenameParams = SchemaType<"NodeRenameParams">;
export type NodeListParams = SchemaType<"NodeListParams">;
export type NodePendingAckParams = SchemaType<"NodePendingAckParams">;
export type NodeDescribeParams = SchemaType<"NodeDescribeParams">;
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;

View File

@@ -77,6 +77,8 @@ const BASE_METHODS = [
"node.list",
"node.describe",
"node.invoke",
"node.pending.pull",
"node.pending.ack",
"node.invoke.result",
"node.event",
"node.canvas.capability.refresh",

View File

@@ -49,6 +49,7 @@ type RespondCall = [
type TestNodeSession = {
nodeId: string;
commands: string[];
platform?: string;
};
const WAKE_WAIT_TIMEOUT_MS = 3_001;
@@ -102,6 +103,54 @@ async function invokeNode(params: {
return respond;
}
async function pullPending(nodeId: string) {
const respond = vi.fn();
await nodeHandlers["node.pending.pull"]({
params: {},
respond: respond as never,
context: {} as never,
client: {
connect: {
role: "node",
client: {
id: nodeId,
mode: "node",
name: "ios-test",
platform: "iOS 26.4.0",
version: "test",
},
},
} as never,
req: { type: "req", id: "req-node-pending", method: "node.pending.pull" },
isWebchatConnect: () => false,
});
return respond;
}
async function ackPending(nodeId: string, ids: string[]) {
const respond = vi.fn();
await nodeHandlers["node.pending.ack"]({
params: { ids },
respond: respond as never,
context: {} as never,
client: {
connect: {
role: "node",
client: {
id: nodeId,
mode: "node",
name: "ios-test",
platform: "iOS 26.4.0",
version: "test",
},
},
} as never,
req: { type: "req", id: "req-node-pending-ack", method: "node.pending.ack" },
isWebchatConnect: () => false,
});
return respond;
}
function mockSuccessfulWakeConfig(nodeId: string) {
mocks.loadApnsRegistration.mockResolvedValue({
nodeId,
@@ -229,4 +278,138 @@ describe("node.invoke APNs wake path", () => {
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(2);
expect(nodeRegistry.invoke).not.toHaveBeenCalled();
});
it("queues iOS foreground-only command failures and keeps them until acked", async () => {
mocks.loadApnsRegistration.mockResolvedValue(null);
const nodeRegistry = {
get: vi.fn(() => ({
nodeId: "ios-node-queued",
commands: ["canvas.navigate"],
platform: "iOS 26.4.0",
})),
invoke: vi.fn().mockResolvedValue({
ok: false,
error: {
code: "NODE_BACKGROUND_UNAVAILABLE",
message: "NODE_BACKGROUND_UNAVAILABLE: canvas/camera/screen commands require foreground",
},
}),
};
const respond = await invokeNode({
nodeRegistry,
requestParams: {
nodeId: "ios-node-queued",
command: "canvas.navigate",
params: { url: "http://example.com/" },
idempotencyKey: "idem-queued",
},
});
const call = respond.mock.calls[0] as RespondCall | undefined;
expect(call?.[0]).toBe(false);
expect(call?.[2]?.code).toBe(ErrorCodes.UNAVAILABLE);
expect(call?.[2]?.message).toBe("node command queued until iOS returns to foreground");
expect(mocks.sendApnsBackgroundWake).not.toHaveBeenCalled();
const pullRespond = await pullPending("ios-node-queued");
const pullCall = pullRespond.mock.calls[0] as RespondCall | undefined;
expect(pullCall?.[0]).toBe(true);
expect(pullCall?.[1]).toMatchObject({
nodeId: "ios-node-queued",
actions: [
expect.objectContaining({
command: "canvas.navigate",
paramsJSON: JSON.stringify({ url: "http://example.com/" }),
}),
],
});
const repeatedPullRespond = await pullPending("ios-node-queued");
const repeatedPullCall = repeatedPullRespond.mock.calls[0] as RespondCall | undefined;
expect(repeatedPullCall?.[0]).toBe(true);
expect(repeatedPullCall?.[1]).toMatchObject({
nodeId: "ios-node-queued",
actions: [
expect.objectContaining({
command: "canvas.navigate",
paramsJSON: JSON.stringify({ url: "http://example.com/" }),
}),
],
});
const queuedActionId = (pullCall?.[1] as { actions?: Array<{ id?: string }> } | undefined)
?.actions?.[0]?.id;
expect(queuedActionId).toBeTruthy();
const ackRespond = await ackPending("ios-node-queued", [queuedActionId!]);
const ackCall = ackRespond.mock.calls[0] as RespondCall | undefined;
expect(ackCall?.[0]).toBe(true);
expect(ackCall?.[1]).toMatchObject({
nodeId: "ios-node-queued",
ackedIds: [queuedActionId],
remainingCount: 0,
});
const emptyPullRespond = await pullPending("ios-node-queued");
const emptyPullCall = emptyPullRespond.mock.calls[0] as RespondCall | undefined;
expect(emptyPullCall?.[0]).toBe(true);
expect(emptyPullCall?.[1]).toMatchObject({
nodeId: "ios-node-queued",
actions: [],
});
});
it("dedupes queued foreground actions by idempotency key", async () => {
mocks.loadApnsRegistration.mockResolvedValue(null);
const nodeRegistry = {
get: vi.fn(() => ({
nodeId: "ios-node-dedupe",
commands: ["canvas.navigate"],
platform: "iPadOS 26.4.0",
})),
invoke: vi.fn().mockResolvedValue({
ok: false,
error: {
code: "NODE_BACKGROUND_UNAVAILABLE",
message: "NODE_BACKGROUND_UNAVAILABLE: canvas/camera/screen commands require foreground",
},
}),
};
await invokeNode({
nodeRegistry,
requestParams: {
nodeId: "ios-node-dedupe",
command: "canvas.navigate",
params: { url: "http://example.com/first" },
idempotencyKey: "idem-dedupe",
},
});
await invokeNode({
nodeRegistry,
requestParams: {
nodeId: "ios-node-dedupe",
command: "canvas.navigate",
params: { url: "http://example.com/first" },
idempotencyKey: "idem-dedupe",
},
});
const pullRespond = await pullPending("ios-node-dedupe");
const pullCall = pullRespond.mock.calls[0] as RespondCall | undefined;
expect(pullCall?.[0]).toBe(true);
expect(pullCall?.[1]).toMatchObject({
nodeId: "ios-node-dedupe",
actions: [
expect.objectContaining({
command: "canvas.navigate",
paramsJSON: JSON.stringify({ url: "http://example.com/first" }),
}),
],
});
const actions = (pullCall?.[1] as { actions?: unknown[] } | undefined)?.actions ?? [];
expect(actions).toHaveLength(1);
});
});

View File

@@ -1,3 +1,4 @@
import { randomUUID } from "node:crypto";
import { loadConfig } from "../../config/config.js";
import { listDevicePairing } from "../../infra/device-pairing.js";
import {
@@ -28,6 +29,7 @@ import {
validateNodeEventParams,
validateNodeInvokeParams,
validateNodeListParams,
validateNodePendingAckParams,
validateNodePairApproveParams,
validateNodePairListParams,
validateNodePairRejectParams,
@@ -50,6 +52,8 @@ const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
const NODE_WAKE_RECONNECT_POLL_MS = 150;
const NODE_WAKE_THROTTLE_MS = 15_000;
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
const NODE_PENDING_ACTION_MAX_PER_NODE = 64;
type NodeWakeState = {
lastWakeAtMs: number;
@@ -77,6 +81,17 @@ type NodeWakeNudgeAttempt = {
apnsReason?: string;
};
type PendingNodeAction = {
id: string;
nodeId: string;
command: string;
paramsJSON?: string;
idempotencyKey: string;
enqueuedAtMs: number;
};
const pendingNodeActionsById = new Map<string, PendingNodeAction[]>();
function isNodeEntry(entry: { role?: string; roles?: string[] }) {
if (entry.role === "node") {
return true;
@@ -91,6 +106,108 @@ async function delayMs(ms: number): Promise<void> {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
}
function isForegroundRestrictedIosCommand(command: string): boolean {
return (
command === "canvas.present" ||
command === "canvas.navigate" ||
command.startsWith("canvas.") ||
command.startsWith("camera.") ||
command.startsWith("screen.") ||
command.startsWith("talk.")
);
}
function shouldQueueAsPendingForegroundAction(params: {
platform?: string;
command: string;
error: unknown;
}): boolean {
const platform = (params.platform ?? "").trim().toLowerCase();
if (!platform.startsWith("ios") && !platform.startsWith("ipados")) {
return false;
}
if (!isForegroundRestrictedIosCommand(params.command)) {
return false;
}
const error =
params.error && typeof params.error === "object"
? (params.error as { code?: unknown; message?: unknown })
: null;
const code = typeof error?.code === "string" ? error.code.trim().toUpperCase() : "";
const message = typeof error?.message === "string" ? error.message.trim().toUpperCase() : "";
return code === "NODE_BACKGROUND_UNAVAILABLE" || message.includes("BACKGROUND_UNAVAILABLE");
}
function prunePendingNodeActions(nodeId: string, nowMs: number): PendingNodeAction[] {
const queue = pendingNodeActionsById.get(nodeId) ?? [];
const minTimestampMs = nowMs - NODE_PENDING_ACTION_TTL_MS;
const live = queue.filter((entry) => entry.enqueuedAtMs >= minTimestampMs);
if (live.length === 0) {
pendingNodeActionsById.delete(nodeId);
return [];
}
pendingNodeActionsById.set(nodeId, live);
return live;
}
function enqueuePendingNodeAction(params: {
nodeId: string;
command: string;
paramsJSON?: string;
idempotencyKey: string;
}): PendingNodeAction {
const nowMs = Date.now();
const queue = prunePendingNodeActions(params.nodeId, nowMs);
const existing = queue.find((entry) => entry.idempotencyKey === params.idempotencyKey);
if (existing) {
return existing;
}
const entry: PendingNodeAction = {
id: randomUUID(),
nodeId: params.nodeId,
command: params.command,
paramsJSON: params.paramsJSON,
idempotencyKey: params.idempotencyKey,
enqueuedAtMs: nowMs,
};
queue.push(entry);
if (queue.length > NODE_PENDING_ACTION_MAX_PER_NODE) {
queue.splice(0, queue.length - NODE_PENDING_ACTION_MAX_PER_NODE);
}
pendingNodeActionsById.set(params.nodeId, queue);
return entry;
}
function listPendingNodeActions(nodeId: string): PendingNodeAction[] {
return prunePendingNodeActions(nodeId, Date.now());
}
function ackPendingNodeActions(nodeId: string, ids: string[]): PendingNodeAction[] {
if (ids.length === 0) {
return listPendingNodeActions(nodeId);
}
const pending = prunePendingNodeActions(nodeId, Date.now());
const idSet = new Set(ids);
const remaining = pending.filter((entry) => !idSet.has(entry.id));
if (remaining.length === 0) {
pendingNodeActionsById.delete(nodeId);
return [];
}
pendingNodeActionsById.set(nodeId, remaining);
return remaining;
}
function toPendingParamsJSON(params: unknown): string | undefined {
if (params === undefined) {
return undefined;
}
try {
return JSON.stringify(params);
} catch {
return undefined;
}
}
async function maybeWakeNodeWithApns(
nodeId: string,
opts?: { force?: boolean },
@@ -596,6 +713,66 @@ export const nodeHandlers: GatewayRequestHandlers = {
undefined,
);
},
"node.pending.pull": async ({ params, respond, client }) => {
if (!validateNodeListParams(params)) {
respondInvalidParams({
respond,
method: "node.pending.pull",
validator: validateNodeListParams,
});
return;
}
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id;
const trimmedNodeId = String(nodeId ?? "").trim();
if (!trimmedNodeId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "nodeId required"));
return;
}
const pending = listPendingNodeActions(trimmedNodeId);
respond(
true,
{
nodeId: trimmedNodeId,
actions: pending.map((entry) => ({
id: entry.id,
command: entry.command,
paramsJSON: entry.paramsJSON ?? null,
enqueuedAtMs: entry.enqueuedAtMs,
})),
},
undefined,
);
},
"node.pending.ack": async ({ params, respond, client }) => {
if (!validateNodePendingAckParams(params)) {
respondInvalidParams({
respond,
method: "node.pending.ack",
validator: validateNodePendingAckParams,
});
return;
}
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id;
const trimmedNodeId = String(nodeId ?? "").trim();
if (!trimmedNodeId) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "nodeId required"));
return;
}
const ackIds = Array.from(
new Set((params.ids ?? []).map((value) => String(value ?? "").trim()).filter(Boolean)),
);
const remaining = ackPendingNodeActions(trimmedNodeId, ackIds);
respond(
true,
{
nodeId: trimmedNodeId,
ackedIds: ackIds,
remainingCount: remaining.length,
},
undefined,
);
},
"node.invoke": async ({ params, respond, context, client, req }) => {
if (!validateNodeInvokeParams(params)) {
respondInvalidParams({
@@ -759,7 +936,56 @@ export const nodeHandlers: GatewayRequestHandlers = {
timeoutMs: p.timeoutMs,
idempotencyKey: p.idempotencyKey,
});
if (!respondUnavailableOnNodeInvokeError(respond, res)) {
if (!res.ok) {
if (
shouldQueueAsPendingForegroundAction({
platform: nodeSession.platform,
command,
error: res.error,
})
) {
const paramsJSON = toPendingParamsJSON(forwardedParams.params);
const queued = enqueuePendingNodeAction({
nodeId,
command,
paramsJSON,
idempotencyKey: p.idempotencyKey,
});
const wake = await maybeWakeNodeWithApns(nodeId);
context.logGateway.info(
`node pending queued node=${nodeId} req=${req.id} command=${command} ` +
`queuedId=${queued.id} wakePath=${wake.path} wakeAvailable=${wake.available}`,
);
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
"node command queued until iOS returns to foreground",
{
retryable: true,
details: {
code: "QUEUED_UNTIL_FOREGROUND",
queuedActionId: queued.id,
nodeId,
command,
wake: {
path: wake.path,
available: wake.available,
throttled: wake.throttled,
apnsStatus: wake.apnsStatus,
apnsReason: wake.apnsReason,
},
nodeError: res.error ?? null,
},
},
),
);
return;
}
if (!respondUnavailableOnNodeInvokeError(respond, res)) {
return;
}
return;
}
const payload = res.payloadJSON ? safeParseJson(res.payloadJSON) : res.payload;