test: clear restart sentinel broad matchers

This commit is contained in:
Peter Steinberger
2026-05-10 08:15:11 +01:00
parent 4643ec761b
commit 1a2664e2ce

View File

@@ -293,6 +293,60 @@ vi.mock("./server-methods/agent-timestamp.js", () => ({
const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js");
function expectRecordFields(
record: unknown,
expected: Record<string, unknown>,
): Record<string, unknown> {
expect(record).toBeDefined();
const actual = record as Record<string, unknown>;
for (const [key, value] of Object.entries(expected)) {
expect(actual[key]).toEqual(value);
}
return actual;
}
function mockCallArg(mock: { mock: { calls: Array<Array<unknown>> } }, callIndex = 0): unknown {
const call = mock.mock.calls[callIndex];
expect(call).toBeDefined();
return call?.[0];
}
function lastMockCallArg(mock: { mock: { calls: Array<Array<unknown>> } }): unknown {
const call = mock.mock.calls.at(-1);
expect(call).toBeDefined();
return call?.[0];
}
function expectMockCallFields(
mock: { mock: { calls: Array<Array<unknown>> } },
expected: Record<string, unknown>,
callIndex = 0,
): Record<string, unknown> {
return expectRecordFields(mockCallArg(mock, callIndex), expected);
}
function expectNthSystemEventFields(callIndex: number, expected: Record<string, unknown>): void {
const call = mocks.enqueueSystemEvent.mock.calls[callIndex];
expect(call).toBeDefined();
expectRecordFields(call?.[1], expected);
}
function expectContinuationDispatchFields(
expected: Record<string, unknown>,
expectedCtx?: Record<string, unknown>,
callIndex = 0,
): Record<string, unknown> {
const params = expectMockCallFields(
mocks.recordInboundSessionAndDispatchReply,
expected,
callIndex,
);
if (expectedCtx) {
expectRecordFields(params.ctxPayload, expectedCtx);
}
return params;
}
describe("scheduleRestartSentinelWake", () => {
afterEach(() => {
vi.useRealTimers();
@@ -359,32 +413,26 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps });
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
session: { key: "agent:main:main", agentId: "agent-from-key" },
deps,
bestEffort: false,
skipQueue: true,
}),
);
expect(mocks.enqueueDelivery).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
payloads: [{ text: "restart message" }],
bestEffort: false,
}),
);
expectMockCallFields(mocks.deliverOutboundPayloads, {
channel: "whatsapp",
to: "+15550002",
session: { key: "agent:main:main", agentId: "agent-from-key" },
deps,
bestEffort: false,
skipQueue: true,
});
expectMockCallFields(mocks.enqueueDelivery, {
channel: "whatsapp",
to: "+15550002",
payloads: [{ text: "restart message" }],
bestEffort: false,
});
expect(mocks.ackDelivery).toHaveBeenCalledWith("queue-1");
expect(mocks.failDelivery).not.toHaveBeenCalled();
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
"restart message",
expect.objectContaining({
sessionKey: "agent:main:main",
}),
);
expect(mocks.enqueueSystemEvent.mock.calls[0]?.[0]).toBe("restart message");
expectNthSystemEventFields(0, {
sessionKey: "agent:main:main",
});
expect(mocks.requestHeartbeat).toHaveBeenCalledWith({
source: "restart-sentinel",
intent: "immediate",
@@ -409,32 +457,19 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(2);
expect(mocks.deliverOutboundPayloads).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
skipQueue: true,
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
skipQueue: true,
}),
);
expectMockCallFields(mocks.deliverOutboundPayloads, { skipQueue: true }, 0);
expectMockCallFields(mocks.deliverOutboundPayloads, { skipQueue: true }, 1);
expect(mocks.ackDelivery).toHaveBeenCalledWith("queue-1");
expect(mocks.failDelivery).not.toHaveBeenCalled();
expect(mocks.enqueueSystemEvent).toHaveBeenCalledTimes(1);
expect(mocks.requestHeartbeat).toHaveBeenCalledTimes(1);
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("retrying in 1000ms"),
expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
sessionKey: "agent:main:main",
attempt: 1,
maxAttempts: 45,
}),
);
expect(mocks.logWarn).toHaveBeenCalledWith(expect.stringContaining("retrying in 1000ms"), {
channel: "whatsapp",
to: "+15550002",
sessionKey: "agent:main:main",
attempt: 1,
maxAttempts: 45,
});
});
it("keeps one queued restart notice when outbound retries are exhausted", async () => {
@@ -484,14 +519,7 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
routeSessionKey: "agent:main:main",
ctxPayload: expect.objectContaining({
Body: "continue",
}),
}),
);
expectContinuationDispatchFields({ routeSessionKey: "agent:main:main" }, { Body: "continue" });
});
it("prefers top-level sentinel threadId for wake routing context", async () => {
@@ -511,15 +539,15 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
"restart message",
expect.objectContaining({
sessionKey: "agent:main:main",
deliveryContext: expect.objectContaining({
threadId: "fresh-thread",
}),
}),
);
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "fresh-thread",
},
});
});
it("dispatches agentTurn continuation after the restart notice in the same routed thread", async () => {
@@ -548,38 +576,36 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueDelivery).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [{ text: "restart message" }],
threadId: "thread-42",
}),
);
expectMockCallFields(mocks.enqueueDelivery, {
payloads: [{ text: "restart message" }],
threadId: "thread-42",
});
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
expectContinuationDispatchFields(
{
channel: "whatsapp",
accountId: "acct-2",
routeSessionKey: "agent:main:main",
ctxPayload: expect.objectContaining({
Body: "Reply with exactly: Yay! I did it!",
BodyForAgent: "stamped:Reply with exactly: Yay! I did it!",
BodyForCommands: "",
CommandBody: "",
CommandAuthorized: true,
GatewayClientScopes: ["operator.admin"],
InputProvenance: {
kind: "internal_system",
sourceChannel: "whatsapp",
sourceTool: "restart-sentinel",
},
SessionKey: "agent:main:main",
Provider: "webchat",
Surface: "webchat",
OriginatingChannel: "whatsapp",
OriginatingTo: "+15550002",
MessageThreadId: "thread-42",
}),
}),
},
{
Body: "Reply with exactly: Yay! I did it!",
BodyForAgent: "stamped:Reply with exactly: Yay! I did it!",
BodyForCommands: "",
CommandBody: "",
CommandAuthorized: true,
GatewayClientScopes: ["operator.admin"],
InputProvenance: {
kind: "internal_system",
sourceChannel: "whatsapp",
sourceTool: "restart-sentinel",
},
SessionKey: "agent:main:main",
Provider: "webchat",
Surface: "webchat",
OriginatingChannel: "whatsapp",
OriginatingTo: "+15550002",
MessageThreadId: "thread-42",
},
);
expect(mocks.requestHeartbeat).not.toHaveBeenCalled();
});
@@ -616,16 +642,16 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
expectContinuationDispatchFields(
{
channel: "telegram",
routeSessionKey: "agent:main:group",
ctxPayload: expect.objectContaining({
ChatType: "group",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-1001",
}),
}),
},
{
ChatType: "group",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-1001",
},
);
});
@@ -669,29 +695,29 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
expectContinuationDispatchFields(
{
channel: "telegram",
accountId: "default",
routeSessionKey: "agent:main:telegram:group:-1003826723328:topic:13757",
ctxPayload: expect.objectContaining({
Body: "continue in topic",
CommandAuthorized: true,
GatewayClientScopes: ["operator.admin"],
InputProvenance: {
kind: "internal_system",
sourceChannel: "telegram",
sourceTool: "restart-sentinel",
},
Provider: "webchat",
Surface: "webchat",
ChatType: "group",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-1003826723328:topic:13757",
ExplicitDeliverRoute: true,
MessageThreadId: "13757",
}),
}),
},
{
Body: "continue in topic",
CommandAuthorized: true,
GatewayClientScopes: ["operator.admin"],
InputProvenance: {
kind: "internal_system",
sourceChannel: "telegram",
sourceTool: "restart-sentinel",
},
Provider: "webchat",
Surface: "webchat",
ChatType: "group",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-1003826723328:topic:13757",
ExplicitDeliverRoute: true,
MessageThreadId: "13757",
},
);
});
@@ -742,24 +768,21 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
ctxPayload: expect.objectContaining({
ReplyToId: "reply:thread-42",
MessageThreadId: undefined,
}),
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [
{
text: "done",
replyToId: "reply:thread-42",
},
],
}),
expectContinuationDispatchFields(
{},
{
ReplyToId: "reply:thread-42",
MessageThreadId: undefined,
},
);
expectRecordFields(lastMockCallArg(mocks.deliverOutboundPayloads), {
payloads: [
{
text: "done",
replyToId: "reply:thread-42",
},
],
});
});
it("strips synthetic reply transport ids when no real reply target exists", async () => {
@@ -787,11 +810,9 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [{ text: "done" }],
}),
);
expectRecordFields(lastMockCallArg(mocks.deliverOutboundPayloads), {
payloads: [{ text: "done" }],
});
});
it("preserves non-synthetic reply transport ids from continuation payloads", async () => {
@@ -819,16 +840,14 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [
{
text: "done",
replyToId: "provider-reply-id",
},
],
}),
);
expectRecordFields(lastMockCallArg(mocks.deliverOutboundPayloads), {
payloads: [
{
text: "done",
replyToId: "provider-reply-id",
},
],
});
});
it("dispatches agentTurn continuation from session delivery context when sentinel routing is empty", async () => {
@@ -854,16 +873,16 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
expectContinuationDispatchFields(
{
channel: "telegram",
accountId: "default",
ctxPayload: expect.objectContaining({
Body: "continue",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:200482621",
}),
}),
},
{
Body: "continue",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:200482621",
},
);
});
@@ -887,19 +906,15 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
2,
"continue after restart",
expect.objectContaining({
sessionKey: "agent:main:main",
deliveryContext: expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "thread-42",
}),
}),
);
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(2, "continue after restart", {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "thread-42",
},
});
expect(mocks.requestHeartbeat).toHaveBeenNthCalledWith(1, {
source: "restart-sentinel",
intent: "immediate",
@@ -938,19 +953,15 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
2,
"continue after restart",
expect.objectContaining({
sessionKey: "agent:main:main",
deliveryContext: expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "thread-42",
}),
}),
);
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(2, "continue after restart", {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "thread-42",
},
});
});
it("logs and continues when continuation delivery fails", async () => {
@@ -1041,21 +1052,15 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(7);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
ctxPayload: expect.objectContaining({
MessageSid: "restart-sentinel:agent:main:main:agentTurn:123",
}),
}),
expectContinuationDispatchFields(
{},
{ MessageSid: "restart-sentinel:agent:main:main:agentTurn:123" },
0,
);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith(
7,
expect.objectContaining({
ctxPayload: expect.objectContaining({
MessageSid: "restart-sentinel:agent:main:main:agentTurn:123:retry:6",
}),
}),
expectContinuationDispatchFields(
{},
{ MessageSid: "restart-sentinel:agent:main:main:agentTurn:123:retry:6" },
6,
);
const deliveredBusyReply = (
mocks.deliverOutboundPayloads.mock.calls as unknown as Array<
@@ -1063,11 +1068,9 @@ describe("scheduleRestartSentinelWake", () => {
>
).some(([call]) => call.payloads?.some((payload) => payload.text === busyReply) === true);
expect(deliveredBusyReply).toBe(false);
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [{ text: "done" }],
}),
);
expectRecordFields(lastMockCallArg(mocks.deliverOutboundPayloads), {
payloads: [{ text: "done" }],
});
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining(
"retry failed for entry session-delivery-1: Error: restart continuation deferred because previous run is still shutting down",
@@ -1100,13 +1103,10 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
2,
"continue",
expect.objectContaining({
sessionKey: "agent:main:main",
}),
);
expect(mocks.enqueueSystemEvent.mock.calls[1]?.[0]).toBe("continue");
expectNthSystemEventFields(1, {
sessionKey: "agent:main:main",
});
expect(mocks.requestHeartbeat).toHaveBeenCalledTimes(2);
expect(mocks.logWarn).not.toHaveBeenCalled();
});
@@ -1133,14 +1133,11 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.removeRestartSentinelFile).not.toHaveBeenCalled();
expect(mocks.drainPendingSessionDeliveries).not.toHaveBeenCalled();
expect(mocks.logWarn).toHaveBeenCalledWith(
"startup task failed",
expect.objectContaining({
source: "restart-sentinel",
sessionKey: "agent:main:main",
reason: "queue write failed",
}),
);
expect(mocks.logWarn).toHaveBeenCalledWith("startup task failed", {
source: "restart-sentinel",
sessionKey: "agent:main:main",
reason: "queue write failed",
});
});
it("consumes continuation once and does not replay it on later startup cycles", async () => {
@@ -1203,13 +1200,10 @@ describe("scheduleRestartSentinelWake", () => {
sessionKey: "agent:main:main",
});
expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("continuation skipped"),
expect.objectContaining({
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
}),
);
expect(mocks.logWarn).toHaveBeenCalledWith(expect.stringContaining("continuation skipped"), {
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
});
});
it("skips outbound restart notice when no canonical delivery context survives restart", async () => {
mocks.readRestartSentinel.mockResolvedValue({
@@ -1225,12 +1219,10 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
"restart message",
expect.objectContaining({
sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
}),
);
expect(mocks.enqueueSystemEvent.mock.calls[0]?.[0]).toBe("restart message");
expectNthSystemEventFields(0, {
sessionKey: "agent:main:matrix:channel:!lowercased:example.org",
});
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
expect(mocks.enqueueDelivery).not.toHaveBeenCalled();
expect(mocks.resolveOutboundTarget).not.toHaveBeenCalled();
@@ -1264,18 +1256,14 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.requestHeartbeat).toHaveBeenCalledTimes(1);
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
expect.objectContaining({
channel: "qa-channel",
to: "channel:qa-room",
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "qa-channel",
to: "channel:qa-room",
}),
);
expectMockCallFields(mocks.resolveOutboundTarget, {
channel: "qa-channel",
to: "channel:qa-room",
});
expectMockCallFields(mocks.deliverOutboundPayloads, {
channel: "qa-channel",
to: "channel:qa-room",
});
});
it("merges base session routing into partial thread metadata", async () => {
@@ -1328,20 +1316,16 @@ describe("scheduleRestartSentinelWake", () => {
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: "acct-thread",
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: "acct-thread",
threadId: "$thread-event",
}),
);
expectMockCallFields(mocks.resolveOutboundTarget, {
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: "acct-thread",
});
expectMockCallFields(mocks.deliverOutboundPayloads, {
channel: "matrix",
to: "room:!MixedCase:example.org",
accountId: "acct-thread",
threadId: "$thread-event",
});
});
});