mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
fix(voice-call): block Twilio webhook replay and stale transitions
This commit is contained in:
@@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Security/Voice Call: harden Twilio webhook replay handling by preserving provider event IDs through normalization, adding bounded replay dedupe, and enforcing per-call turn-token matching for call-state transitions. This ships in the next npm release. Thanks @jiseoung for reporting.
|
||||
- Security/Commands: enforce sender-only matching for `commands.allowFrom` by blocking conversation-shaped `From` identities (`channel:`, `group:`, `thread:`, `@g.us`) while preserving direct-message fallback when sender fields are missing. Ships in the next npm release. Thanks @jiseoung.
|
||||
- Config/Kilo Gateway: Kilo provider flow now surfaces an updated list of models. (#24921) thanks @gumadeiras.
|
||||
- Security/Sandbox: enforce `tools.exec.applyPatch.workspaceOnly` and `tools.fs.workspaceOnly` for `apply_patch` in sandbox-mounted paths so writes/deletes cannot escape the workspace boundary via mounts like `/agent` unless explicitly opted out (`tools.exec.applyPatch.workspaceOnly=false`). This ships in the next npm release. Thanks @tdjackey for reporting.
|
||||
|
||||
@@ -177,6 +177,12 @@ headers are trusted.
|
||||
`webhookSecurity.trustedProxyIPs` only trusts forwarded headers when the request
|
||||
remote IP matches the list.
|
||||
|
||||
Webhook replay protection is enabled for Twilio and Plivo. Replayed valid webhook
|
||||
requests are acknowledged but skipped for side effects.
|
||||
|
||||
Twilio conversation turns include a per-turn token in `<Gather>` callbacks, so
|
||||
stale/replayed speech callbacks cannot satisfy a newer pending transcript turn.
|
||||
|
||||
Example with a stable public host:
|
||||
|
||||
```json5
|
||||
|
||||
@@ -175,5 +175,7 @@ Actions:
|
||||
## Notes
|
||||
|
||||
- Uses webhook signature verification for Twilio/Telnyx/Plivo.
|
||||
- Adds replay protection for Twilio and Plivo webhooks (valid duplicate callbacks are ignored safely).
|
||||
- Twilio speech turns include a per-turn token so stale/replayed callbacks cannot complete a newer turn.
|
||||
- `responseModel` / `responseSystemPrompt` control AI auto-responses.
|
||||
- Media streaming requires `ws` and OpenAI Realtime API key.
|
||||
|
||||
@@ -17,12 +17,16 @@ import type {
|
||||
} from "./types.js";
|
||||
|
||||
class FakeProvider implements VoiceCallProvider {
|
||||
readonly name = "plivo" as const;
|
||||
readonly name: "plivo" | "twilio";
|
||||
readonly playTtsCalls: PlayTtsInput[] = [];
|
||||
readonly hangupCalls: HangupCallInput[] = [];
|
||||
readonly startListeningCalls: StartListeningInput[] = [];
|
||||
readonly stopListeningCalls: StopListeningInput[] = [];
|
||||
|
||||
constructor(name: "plivo" | "twilio" = "plivo") {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult {
|
||||
return { ok: true };
|
||||
}
|
||||
@@ -319,6 +323,61 @@ describe("CallManager", () => {
|
||||
expect(provider.stopListeningCalls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("ignores speech events with mismatched turnToken while waiting for transcript", async () => {
  // A provider named "twilio" is used so the listening request carries a turn token.
  const twilioProvider = new FakeProvider("twilio");
  const { manager, provider } = createManagerHarness({ transcriptTimeoutMs: 5000 }, twilioProvider);

  const started = await manager.initiateCall("+15550000004");
  expect(started.success).toBe(true);

  markCallAnswered(manager, started.callId, "evt-turn-token-answered");

  // Start the turn, then yield once so the manager can reach startListening.
  const turnPromise = manager.continueCall(started.callId, "Prompt");
  await new Promise((resolve) => setTimeout(resolve, 0));

  const expectedTurnToken = provider.startListeningCalls[0]?.turnToken;
  expect(typeof expectedTurnToken).toBe("string");

  // Delivers a final speech event for this call with the given token.
  const deliverFinalSpeech = (id: string, transcript: string, turnToken: string | undefined) =>
    manager.processEvent({
      id,
      type: "call.speech",
      callId: started.callId,
      providerCallId: "request-uuid",
      timestamp: Date.now(),
      transcript,
      isFinal: true,
      turnToken,
    });

  // A callback carrying the wrong token must leave the turn unresolved.
  deliverFinalSpeech("evt-turn-token-bad", "stale replay", "wrong-token");

  const pendingState = await Promise.race([
    turnPromise.then(() => "resolved"),
    new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 0)),
  ]);
  expect(pendingState).toBe("pending");

  // The callback carrying the issued token completes the turn.
  deliverFinalSpeech("evt-turn-token-good", "final answer", expectedTurnToken);

  const turnResult = await turnPromise;
  expect(turnResult.success).toBe(true);
  expect(turnResult.transcript).toBe("final answer");

  const transcriptTexts = manager.getCall(started.callId)?.transcript.map((entry) => entry.text);
  expect(transcriptTexts).toEqual(["Prompt", "final answer"]);
});
|
||||
|
||||
it("tracks latency metadata across multiple closed-loop turns", async () => {
|
||||
const { manager, provider } = createManagerHarness({
|
||||
transcriptTimeoutMs: 5000,
|
||||
|
||||
@@ -6,6 +6,7 @@ export type TranscriptWaiter = {
|
||||
resolve: (text: string) => void;
|
||||
reject: (err: Error) => void;
|
||||
timeout: NodeJS.Timeout;
|
||||
turnToken?: string;
|
||||
};
|
||||
|
||||
export type CallManagerRuntimeState = {
|
||||
|
||||
@@ -234,4 +234,49 @@ describe("processEvent (functional)", () => {
|
||||
expect(() => processEvent(ctx, event)).not.toThrow();
|
||||
expect(ctx.activeCalls.size).toBe(0);
|
||||
});
|
||||
|
||||
it("deduplicates by dedupeKey even when event IDs differ", () => {
  const now = Date.now();
  const ctx = createContext();
  const callId = "call-dedupe";
  const providerCallId = "provider-dedupe";

  // Seed one answered outbound call that both deliveries target.
  ctx.activeCalls.set(callId, {
    callId,
    providerCallId,
    provider: "plivo",
    direction: "outbound",
    state: "answered",
    from: "+15550000000",
    to: "+15550000001",
    startedAt: now,
    transcript: [],
    processedEventIds: [],
    metadata: {},
  });
  ctx.providerCallIdMap.set(providerCallId, callId);

  // Two deliveries with different event ids but the same dedupeKey.
  const deliveries = [
    { id: "evt-1", timestamp: now + 1 },
    { id: "evt-2", timestamp: now + 2 },
  ];
  for (const delivery of deliveries) {
    processEvent(ctx, {
      id: delivery.id,
      dedupeKey: "stable-key-1",
      type: "call.speech",
      callId,
      providerCallId,
      timestamp: delivery.timestamp,
      transcript: "hello",
      isFinal: true,
    });
  }

  // Only the first delivery may add a transcript entry and a processed key.
  const call = ctx.activeCalls.get(callId);
  expect(call?.transcript).toHaveLength(1);
  expect(Array.from(ctx.processedEventIds)).toEqual(["stable-key-1"]);
});
|
||||
});
|
||||
|
||||
@@ -92,10 +92,11 @@ function createInboundCall(params: {
|
||||
}
|
||||
|
||||
export function processEvent(ctx: EventContext, event: NormalizedEvent): void {
|
||||
if (ctx.processedEventIds.has(event.id)) {
|
||||
const dedupeKey = event.dedupeKey || event.id;
|
||||
if (ctx.processedEventIds.has(dedupeKey)) {
|
||||
return;
|
||||
}
|
||||
ctx.processedEventIds.add(event.id);
|
||||
ctx.processedEventIds.add(dedupeKey);
|
||||
|
||||
let call = findCall({
|
||||
activeCalls: ctx.activeCalls,
|
||||
@@ -158,7 +159,7 @@ export function processEvent(ctx: EventContext, event: NormalizedEvent): void {
|
||||
}
|
||||
}
|
||||
|
||||
call.processedEventIds.push(event.id);
|
||||
call.processedEventIds.push(dedupeKey);
|
||||
|
||||
switch (event.type) {
|
||||
case "call.initiated":
|
||||
@@ -192,8 +193,20 @@ export function processEvent(ctx: EventContext, event: NormalizedEvent): void {
|
||||
|
||||
case "call.speech":
|
||||
if (event.isFinal) {
|
||||
const hadWaiter = ctx.transcriptWaiters.has(call.callId);
|
||||
const resolved = resolveTranscriptWaiter(
|
||||
ctx,
|
||||
call.callId,
|
||||
event.transcript,
|
||||
event.turnToken,
|
||||
);
|
||||
if (hadWaiter && !resolved) {
|
||||
console.warn(
|
||||
`[voice-call] Ignoring speech event with mismatched turn token for ${call.callId}`,
|
||||
);
|
||||
break;
|
||||
}
|
||||
addTranscriptEntry(call, "user", event.transcript);
|
||||
resolveTranscriptWaiter(ctx, call.callId, event.transcript);
|
||||
}
|
||||
transitionState(call, "listening");
|
||||
break;
|
||||
|
||||
@@ -291,6 +291,7 @@ export async function continueCall(
|
||||
ctx.activeTurnCalls.add(callId);
|
||||
|
||||
const turnStartedAt = Date.now();
|
||||
const turnToken = provider.name === "twilio" ? crypto.randomUUID() : undefined;
|
||||
|
||||
try {
|
||||
await speak(ctx, callId, prompt);
|
||||
@@ -299,9 +300,9 @@ export async function continueCall(
|
||||
persistCallRecord(ctx.storePath, call);
|
||||
|
||||
const listenStartedAt = Date.now();
|
||||
await provider.startListening({ callId, providerCallId });
|
||||
await provider.startListening({ callId, providerCallId, turnToken });
|
||||
|
||||
const transcript = await waitForFinalTranscript(ctx, callId);
|
||||
const transcript = await waitForFinalTranscript(ctx, callId, turnToken);
|
||||
const transcriptReceivedAt = Date.now();
|
||||
|
||||
// Best-effort: stop listening after final transcript.
|
||||
|
||||
@@ -77,16 +77,25 @@ export function resolveTranscriptWaiter(
|
||||
ctx: TranscriptWaiterContext,
|
||||
callId: CallId,
|
||||
transcript: string,
|
||||
): void {
|
||||
turnToken?: string,
|
||||
): boolean {
|
||||
const waiter = ctx.transcriptWaiters.get(callId);
|
||||
if (!waiter) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
if (waiter.turnToken && waiter.turnToken !== turnToken) {
|
||||
return false;
|
||||
}
|
||||
clearTranscriptWaiter(ctx, callId);
|
||||
waiter.resolve(transcript);
|
||||
return true;
|
||||
}
|
||||
|
||||
export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise<string> {
|
||||
export function waitForFinalTranscript(
|
||||
ctx: TimerContext,
|
||||
callId: CallId,
|
||||
turnToken?: string,
|
||||
): Promise<string> {
|
||||
if (ctx.transcriptWaiters.has(callId)) {
|
||||
return Promise.reject(new Error("Already waiting for transcript"));
|
||||
}
|
||||
@@ -98,6 +107,6 @@ export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promi
|
||||
reject(new Error(`Timed out waiting for transcript after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
|
||||
ctx.transcriptWaiters.set(callId, { resolve, reject, timeout });
|
||||
ctx.transcriptWaiters.set(callId, { resolve, reject, timeout, turnToken });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -30,6 +30,29 @@ export interface PlivoProviderOptions {
|
||||
type PendingSpeak = { text: string; locale?: string };
|
||||
type PendingListen = { language?: string };
|
||||
|
||||
/**
 * Reads a single header value from a header map.
 *
 * The requested name is lower-cased before the lookup, so only lower-case
 * keys in `headers` can match. For a multi-valued header the first value is
 * returned.
 *
 * @param headers - Header map to read from.
 * @param name - Header name in any letter case.
 * @returns The header value, or `undefined` when the header is absent or its
 *   value list is empty.
 */
function getHeader(
  headers: Record<string, string | string[] | undefined>,
  name: string,
): string | undefined {
  const raw = headers[name.toLowerCase()];
  return Array.isArray(raw) ? raw[0] : raw;
}
|
||||
|
||||
/**
 * Builds a stable dedupe key for a Plivo webhook request.
 *
 * Sources are tried in order: the V3 signature nonce header, then the V2
 * signature nonce header, and finally a SHA-256 digest of the raw body.
 *
 * @param ctx - Webhook request context (headers and raw body are read).
 * @returns A key prefixed with `plivo:v3:`, `plivo:v2:` or `plivo:fallback:`.
 */
function createPlivoRequestDedupeKey(ctx: WebhookContext): string {
  const nonceSources = [
    ["v3", "x-plivo-signature-v3-nonce"],
    ["v2", "x-plivo-signature-v2-nonce"],
  ] as const;

  for (const [version, headerName] of nonceSources) {
    const nonce = getHeader(ctx.headers, headerName);
    if (nonce) {
      return `plivo:${version}:${nonce}`;
    }
  }

  // No nonce header present: fall back to hashing the raw request body.
  const bodyDigest = crypto.createHash("sha256").update(ctx.rawBody).digest("hex");
  return `plivo:fallback:${bodyDigest}`;
}
|
||||
|
||||
export class PlivoProvider implements VoiceCallProvider {
|
||||
readonly name = "plivo" as const;
|
||||
|
||||
@@ -104,7 +127,7 @@ export class PlivoProvider implements VoiceCallProvider {
|
||||
console.warn(`[plivo] Webhook verification failed: ${result.reason}`);
|
||||
}
|
||||
|
||||
return { ok: result.ok, reason: result.reason };
|
||||
return { ok: result.ok, reason: result.reason, isReplay: result.isReplay };
|
||||
}
|
||||
|
||||
parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult {
|
||||
@@ -173,7 +196,8 @@ export class PlivoProvider implements VoiceCallProvider {
|
||||
|
||||
// Normal events.
|
||||
const callIdFromQuery = this.getCallIdFromQuery(ctx);
|
||||
const event = this.normalizeEvent(parsed, callIdFromQuery);
|
||||
const dedupeKey = createPlivoRequestDedupeKey(ctx);
|
||||
const event = this.normalizeEvent(parsed, callIdFromQuery, dedupeKey);
|
||||
|
||||
return {
|
||||
events: event ? [event] : [],
|
||||
@@ -186,7 +210,11 @@ export class PlivoProvider implements VoiceCallProvider {
|
||||
};
|
||||
}
|
||||
|
||||
private normalizeEvent(params: URLSearchParams, callIdOverride?: string): NormalizedEvent | null {
|
||||
private normalizeEvent(
|
||||
params: URLSearchParams,
|
||||
callIdOverride?: string,
|
||||
dedupeKey?: string,
|
||||
): NormalizedEvent | null {
|
||||
const callUuid = params.get("CallUUID") || "";
|
||||
const requestUuid = params.get("RequestUUID") || "";
|
||||
|
||||
@@ -201,6 +229,7 @@ export class PlivoProvider implements VoiceCallProvider {
|
||||
|
||||
const baseEvent = {
|
||||
id: crypto.randomUUID(),
|
||||
dedupeKey,
|
||||
callId: callIdOverride || callUuid || requestUuid,
|
||||
providerCallId: callUuid || requestUuid || undefined,
|
||||
timestamp: Date.now(),
|
||||
|
||||
@@ -59,4 +59,38 @@ describe("TwilioProvider", () => {
|
||||
expect(result.providerResponseBody).toContain('<Parameter name="token" value="');
|
||||
expect(result.providerResponseBody).toContain("<Connect>");
|
||||
});
|
||||
|
||||
it("uses a stable dedupeKey for identical request payloads", () => {
  const provider = createProvider();
  const rawBody = "CallSid=CA789&Direction=inbound&SpeechResult=hello";

  // Builds a fresh context with the same body, query and idempotency header.
  const buildContext = () => ({
    ...createContext(rawBody, { callId: "call-1", turnToken: "turn-1" }),
    headers: { "i-twilio-idempotency-token": "idem-123" },
  });
  const ctxA = buildContext();
  const ctxB = buildContext();

  const eventA = provider.parseWebhookEvent(ctxA).events[0];
  const eventB = provider.parseWebhookEvent(ctxB).events[0];

  expect(eventA).toBeDefined();
  expect(eventB).toBeDefined();
  // Event ids differ per parse, while the dedupe key follows the idempotency token.
  expect(eventA?.id).not.toBe(eventB?.id);
  expect(eventA?.dedupeKey).toBe("twilio:idempotency:idem-123");
  expect(eventA?.dedupeKey).toBe(eventB?.dedupeKey);
});
|
||||
|
||||
it("keeps turnToken from query on speech events", () => {
  const provider = createProvider();
  const ctx = createContext("CallSid=CA222&Direction=inbound&SpeechResult=hello", {
    callId: "call-2",
    turnToken: "turn-xyz",
  });

  // The query-string turn token must appear on the normalized speech event.
  const parsed = provider.parseWebhookEvent(ctx);
  const speechEvent = parsed.events[0];
  expect(speechEvent?.type).toBe("call.speech");
  expect(speechEvent?.turnToken).toBe("turn-xyz");
});
|
||||
});
|
||||
|
||||
@@ -20,6 +20,33 @@ import type { VoiceCallProvider } from "./base.js";
|
||||
import { twilioApiRequest } from "./twilio/api.js";
|
||||
import { verifyTwilioProviderWebhook } from "./twilio/webhook.js";
|
||||
|
||||
/**
 * Returns one value for the named header.
 *
 * The lookup key is the lower-cased `name`. When the stored value is a list,
 * its first element is returned.
 *
 * @param headers - Header map to read from.
 * @param name - Header name in any letter case.
 * @returns The header value, or `undefined` when nothing is stored under the
 *   lower-cased name or the stored list is empty.
 */
function getHeader(
  headers: Record<string, string | string[] | undefined>,
  name: string,
): string | undefined {
  const entry = headers[name.toLowerCase()];
  if (!Array.isArray(entry)) {
    return entry;
  }
  return entry[0];
}
|
||||
|
||||
/**
 * Builds a stable dedupe key for a Twilio webhook request.
 *
 * When the `i-twilio-idempotency-token` header is present, the key is derived
 * from it. Otherwise the key is a SHA-256 digest over the signature header,
 * the trimmed `callId`, `flow` and `turnToken` query values, and the raw body.
 *
 * @param ctx - Webhook request context (headers, query and raw body are read).
 * @returns A key prefixed with `twilio:idempotency:` or `twilio:fallback:`.
 */
function createTwilioRequestDedupeKey(ctx: WebhookContext): string {
  const idempotencyToken = getHeader(ctx.headers, "i-twilio-idempotency-token");
  if (idempotencyToken) {
    return `twilio:idempotency:${idempotencyToken}`;
  }

  // Non-string or missing query values contribute an empty string.
  const queryText = (key: "callId" | "flow" | "turnToken"): string => {
    const value = ctx.query?.[key];
    return typeof value === "string" ? value.trim() : "";
  };

  const signature = getHeader(ctx.headers, "x-twilio-signature") ?? "";
  const material = `${signature}\n${queryText("callId")}\n${queryText("flow")}\n${queryText("turnToken")}\n${ctx.rawBody}`;
  const digest = crypto.createHash("sha256").update(material).digest("hex");
  return `twilio:fallback:${digest}`;
}
|
||||
|
||||
/**
|
||||
* Twilio Voice API provider implementation.
|
||||
*
|
||||
@@ -212,7 +239,16 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
typeof ctx.query?.callId === "string" && ctx.query.callId.trim()
|
||||
? ctx.query.callId.trim()
|
||||
: undefined;
|
||||
const event = this.normalizeEvent(params, callIdFromQuery);
|
||||
const turnTokenFromQuery =
|
||||
typeof ctx.query?.turnToken === "string" && ctx.query.turnToken.trim()
|
||||
? ctx.query.turnToken.trim()
|
||||
: undefined;
|
||||
const dedupeKey = createTwilioRequestDedupeKey(ctx);
|
||||
const event = this.normalizeEvent(params, {
|
||||
callIdOverride: callIdFromQuery,
|
||||
dedupeKey,
|
||||
turnToken: turnTokenFromQuery,
|
||||
});
|
||||
|
||||
// For Twilio, we must return TwiML. Most actions are driven by Calls API updates,
|
||||
// so the webhook response is typically a pause to keep the call alive.
|
||||
@@ -245,14 +281,24 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
/**
|
||||
* Convert Twilio webhook params to normalized event format.
|
||||
*/
|
||||
private normalizeEvent(params: URLSearchParams, callIdOverride?: string): NormalizedEvent | null {
|
||||
private normalizeEvent(
|
||||
params: URLSearchParams,
|
||||
options?: {
|
||||
callIdOverride?: string;
|
||||
dedupeKey?: string;
|
||||
turnToken?: string;
|
||||
},
|
||||
): NormalizedEvent | null {
|
||||
const callSid = params.get("CallSid") || "";
|
||||
const callIdOverride = options?.callIdOverride;
|
||||
|
||||
const baseEvent = {
|
||||
id: crypto.randomUUID(),
|
||||
dedupeKey: options?.dedupeKey,
|
||||
callId: callIdOverride || callSid,
|
||||
providerCallId: callSid,
|
||||
timestamp: Date.now(),
|
||||
turnToken: options?.turnToken,
|
||||
direction: TwilioProvider.parseDirection(params.get("Direction")),
|
||||
from: params.get("From") || undefined,
|
||||
to: params.get("To") || undefined,
|
||||
@@ -603,9 +649,14 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
throw new Error("Missing webhook URL for this call (provider state not initialized)");
|
||||
}
|
||||
|
||||
const actionUrl = new URL(webhookUrl);
|
||||
if (input.turnToken) {
|
||||
actionUrl.searchParams.set("turnToken", input.turnToken);
|
||||
}
|
||||
|
||||
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Gather input="speech" speechTimeout="auto" language="${input.language || "en-US"}" action="${escapeXml(webhookUrl)}" method="POST">
|
||||
<Gather input="speech" speechTimeout="auto" language="${input.language || "en-US"}" action="${escapeXml(actionUrl.toString())}" method="POST">
|
||||
</Gather>
|
||||
</Response>`;
|
||||
|
||||
|
||||
@@ -28,5 +28,6 @@ export function verifyTwilioProviderWebhook(params: {
|
||||
return {
|
||||
ok: result.ok,
|
||||
reason: result.reason,
|
||||
isReplay: result.isReplay,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -74,9 +74,13 @@ export type EndReason = z.infer<typeof EndReasonSchema>;
|
||||
|
||||
const BaseEventSchema = z.object({
|
||||
id: z.string(),
|
||||
// Stable provider-derived key for idempotency/replay dedupe.
|
||||
dedupeKey: z.string().optional(),
|
||||
callId: z.string(),
|
||||
providerCallId: z.string().optional(),
|
||||
timestamp: z.number(),
|
||||
// Optional per-turn nonce for speech events (Twilio <Gather> replay hardening).
|
||||
turnToken: z.string().optional(),
|
||||
// Optional fields for inbound call detection
|
||||
direction: z.enum(["inbound", "outbound"]).optional(),
|
||||
from: z.string().optional(),
|
||||
@@ -171,6 +175,8 @@ export type CallRecord = z.infer<typeof CallRecordSchema>;
|
||||
export type WebhookVerificationResult = {
|
||||
ok: boolean;
|
||||
reason?: string;
|
||||
/** Signature is valid, but request was seen before within replay window. */
|
||||
isReplay?: boolean;
|
||||
};
|
||||
|
||||
export type WebhookContext = {
|
||||
@@ -226,6 +232,8 @@ export type StartListeningInput = {
|
||||
callId: CallId;
|
||||
providerCallId: ProviderCallId;
|
||||
language?: string;
|
||||
/** Optional per-turn nonce for provider callbacks (replay hardening). */
|
||||
turnToken?: string;
|
||||
};
|
||||
|
||||
export type StopListeningInput = {
|
||||
|
||||
@@ -163,6 +163,40 @@ describe("verifyPlivoWebhook", () => {
|
||||
expect(result.ok).toBe(false);
|
||||
expect(result.reason).toMatch(/Missing Plivo signature headers/);
|
||||
});
|
||||
|
||||
it("marks replayed valid V3 requests as replay without failing auth", () => {
  const authToken = "test-auth-token";
  const nonce = "nonce-replay-v3";
  const urlWithQuery = "https://example.com/voice/webhook?flow=answer&callId=abc";
  const postBody = "CallUUID=uuid&CallStatus=in-progress&From=%2B15550000000";

  // One validly signed request context, verified twice below.
  const ctx = {
    headers: {
      host: "example.com",
      "x-forwarded-proto": "https",
      "x-plivo-signature-v3": plivoV3Signature({ authToken, urlWithQuery, postBody, nonce }),
      "x-plivo-signature-v3-nonce": nonce,
    },
    rawBody: postBody,
    url: urlWithQuery,
    method: "POST" as const,
    query: { flow: "answer", callId: "abc" },
  };

  const first = verifyPlivoWebhook(ctx, authToken);
  const second = verifyPlivoWebhook(ctx, authToken);

  // Both verifications succeed; only the second is flagged as a replay.
  expect(first.ok).toBe(true);
  expect(first.isReplay).toBeFalsy();
  expect(second.ok).toBe(true);
  expect(second.isReplay).toBe(true);
});
|
||||
});
|
||||
|
||||
describe("verifyTwilioWebhook", () => {
|
||||
@@ -197,6 +231,48 @@ describe("verifyTwilioWebhook", () => {
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("marks replayed valid requests as replay without failing auth", () => {
  const authToken = "test-auth-token";
  const publicUrl = "https://example.com/voice/webhook";
  const urlWithQuery = `${publicUrl}?callId=abc`;
  const postBody = "CallSid=CS777&CallStatus=completed&From=%2B15550000000";
  const signature = twilioSignature({ authToken, url: urlWithQuery, postBody });
  const headers = {
    host: "example.com",
    "x-forwarded-proto": "https",
    "x-twilio-signature": signature,
    "i-twilio-idempotency-token": "idem-replay-1",
  };

  // Verifies the same signed request; each call builds an equal context.
  const verifyDelivery = () =>
    verifyTwilioWebhook(
      {
        headers,
        rawBody: postBody,
        url: "http://local/voice/webhook?callId=abc",
        method: "POST",
        query: { callId: "abc" },
      },
      authToken,
      { publicUrl },
    );

  const first = verifyDelivery();
  const second = verifyDelivery();

  // Both verifications succeed; only the second is flagged as a replay.
  expect(first.ok).toBe(true);
  expect(first.isReplay).toBeFalsy();
  expect(second.ok).toBe(true);
  expect(second.isReplay).toBe(true);
});
|
||||
|
||||
it("rejects invalid signatures even when attacker injects forwarded host", () => {
|
||||
const authToken = "test-auth-token";
|
||||
const postBody = "CallSid=CS123&CallStatus=completed&From=%2B15550000000";
|
||||
|
||||
@@ -1,6 +1,63 @@
|
||||
import crypto from "node:crypto";
|
||||
import type { WebhookContext } from "./types.js";
|
||||
|
||||
const REPLAY_WINDOW_MS = 10 * 60 * 1000;
|
||||
const REPLAY_CACHE_MAX_ENTRIES = 10_000;
|
||||
const REPLAY_CACHE_PRUNE_INTERVAL = 64;
|
||||
|
||||
type ReplayCache = {
|
||||
seenUntil: Map<string, number>;
|
||||
calls: number;
|
||||
};
|
||||
|
||||
const twilioReplayCache: ReplayCache = {
|
||||
seenUntil: new Map<string, number>(),
|
||||
calls: 0,
|
||||
};
|
||||
|
||||
const plivoReplayCache: ReplayCache = {
|
||||
seenUntil: new Map<string, number>(),
|
||||
calls: 0,
|
||||
};
|
||||
|
||||
/**
 * Computes the SHA-256 digest of a string.
 *
 * @param input - Text to hash.
 * @returns The digest as a hex string.
 */
function sha256Hex(input: string): string {
  const hasher = crypto.createHash("sha256");
  hasher.update(input);
  return hasher.digest("hex");
}
|
||||
|
||||
/**
 * Shrinks a replay cache in two passes.
 *
 * First every entry whose expiry is at or before `now` is removed. Then,
 * while the cache still holds more than `REPLAY_CACHE_MAX_ENTRIES` entries,
 * the first key in the map's iteration order is removed.
 *
 * @param cache - Replay cache to prune in place.
 * @param now - Current time in milliseconds, compared against stored expiries.
 */
function pruneReplayCache(cache: ReplayCache, now: number): void {
  const entries = cache.seenUntil;

  // Pass 1: drop expired keys.
  entries.forEach((expiresAt, key) => {
    if (expiresAt <= now) {
      entries.delete(key);
    }
  });

  // Pass 2: enforce the size cap, one leading key per step.
  for (let excess = entries.size - REPLAY_CACHE_MAX_ENTRIES; excess > 0; excess -= 1) {
    const oldest = entries.keys().next().value;
    if (!oldest) {
      return;
    }
    entries.delete(oldest);
  }
}
|
||||
|
||||
/**
 * Records a replay key and reports whether it was already seen.
 *
 * Every `REPLAY_CACHE_PRUNE_INTERVAL` calls the cache is pruned before the
 * lookup. A key counts as a replay when it is stored with an expiry that is
 * still in the future. Otherwise it is stored with an expiry of
 * `now + REPLAY_WINDOW_MS`, and the cache is pruned again if it has grown past
 * `REPLAY_CACHE_MAX_ENTRIES`.
 *
 * @param cache - Replay cache to consult and update in place.
 * @param replayKey - Key identifying the webhook request.
 * @returns `true` when the key is within its replay window (nothing is
 *   updated), `false` when the key was newly recorded.
 */
function markReplay(cache: ReplayCache, replayKey: string): boolean {
  const now = Date.now();
  cache.calls += 1;
  if (cache.calls % REPLAY_CACHE_PRUNE_INTERVAL === 0) {
    pruneReplayCache(cache, now);
  }

  const existing = cache.seenUntil.get(replayKey);
  if (existing !== undefined && existing > now) {
    return true;
  }

  // `Map.set` on an existing key keeps its original position, which would
  // leave a refreshed key first in line for capacity eviction. Delete it
  // first so the entry moves to the end of the iteration order.
  cache.seenUntil.delete(replayKey);
  cache.seenUntil.set(replayKey, now + REPLAY_WINDOW_MS);
  if (cache.seenUntil.size > REPLAY_CACHE_MAX_ENTRIES) {
    pruneReplayCache(cache, now);
  }
  return false;
}
|
||||
|
||||
/**
|
||||
* Validate Twilio webhook signature using HMAC-SHA1.
|
||||
*
|
||||
@@ -328,6 +385,8 @@ export interface TwilioVerificationResult {
|
||||
verificationUrl?: string;
|
||||
/** Whether we're running behind ngrok free tier */
|
||||
isNgrokFreeTier?: boolean;
|
||||
/** Request is cryptographically valid but was already processed recently. */
|
||||
isReplay?: boolean;
|
||||
}
|
||||
|
||||
export interface TelnyxVerificationResult {
|
||||
@@ -335,6 +394,20 @@ export interface TelnyxVerificationResult {
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
/**
 * Builds the replay-cache key for a Twilio webhook request.
 *
 * When the `i-twilio-idempotency-token` header is present, the key is derived
 * from it. Otherwise the key is a SHA-256 digest over the verification URL,
 * the signature and the raw body.
 *
 * @param params.ctx - Webhook request context (headers and raw body are read).
 * @param params.signature - Signature value included in the fallback digest.
 * @param params.verificationUrl - URL included in the fallback digest.
 * @returns A key prefixed with `twilio:idempotency:` or `twilio:fallback:`.
 */
function createTwilioReplayKey(params: {
  ctx: WebhookContext;
  signature: string;
  verificationUrl: string;
}): string {
  const { ctx, signature, verificationUrl } = params;
  const idempotencyToken = getHeader(ctx.headers, "i-twilio-idempotency-token");
  if (!idempotencyToken) {
    const material = `${verificationUrl}\n${signature}\n${ctx.rawBody}`;
    return `twilio:fallback:${sha256Hex(material)}`;
  }
  return `twilio:idempotency:${idempotencyToken}`;
}
|
||||
|
||||
function decodeBase64OrBase64Url(input: string): Buffer {
|
||||
// Telnyx docs say Base64; some tooling emits Base64URL. Accept both.
|
||||
const normalized = input.replace(/-/g, "+").replace(/_/g, "/");
|
||||
@@ -505,7 +578,9 @@ export function verifyTwilioWebhook(
|
||||
const isValid = validateTwilioSignature(authToken, signature, verificationUrl, params);
|
||||
|
||||
if (isValid) {
|
||||
return { ok: true, verificationUrl };
|
||||
const replayKey = createTwilioReplayKey({ ctx, signature, verificationUrl });
|
||||
const isReplay = markReplay(twilioReplayCache, replayKey);
|
||||
return { ok: true, verificationUrl, isReplay };
|
||||
}
|
||||
|
||||
// Check if this is ngrok free tier - the URL might have different format
|
||||
@@ -533,6 +608,8 @@ export interface PlivoVerificationResult {
|
||||
verificationUrl?: string;
|
||||
/** Signature version used for verification */
|
||||
version?: "v3" | "v2";
|
||||
/** Request is cryptographically valid but was already processed recently. */
|
||||
isReplay?: boolean;
|
||||
}
|
||||
|
||||
function normalizeSignatureBase64(input: string): string {
|
||||
@@ -753,14 +830,17 @@ export function verifyPlivoWebhook(
|
||||
url: verificationUrl,
|
||||
postParams,
|
||||
});
|
||||
return ok
|
||||
? { ok: true, version: "v3", verificationUrl }
|
||||
: {
|
||||
ok: false,
|
||||
version: "v3",
|
||||
verificationUrl,
|
||||
reason: "Invalid Plivo V3 signature",
|
||||
};
|
||||
if (!ok) {
|
||||
return {
|
||||
ok: false,
|
||||
version: "v3",
|
||||
verificationUrl,
|
||||
reason: "Invalid Plivo V3 signature",
|
||||
};
|
||||
}
|
||||
const replayKey = `plivo:v3:${sha256Hex(`${verificationUrl}\n${nonceV3}`)}`;
|
||||
const isReplay = markReplay(plivoReplayCache, replayKey);
|
||||
return { ok: true, version: "v3", verificationUrl, isReplay };
|
||||
}
|
||||
|
||||
if (signatureV2 && nonceV2) {
|
||||
@@ -770,14 +850,17 @@ export function verifyPlivoWebhook(
|
||||
nonce: nonceV2,
|
||||
url: verificationUrl,
|
||||
});
|
||||
return ok
|
||||
? { ok: true, version: "v2", verificationUrl }
|
||||
: {
|
||||
ok: false,
|
||||
version: "v2",
|
||||
verificationUrl,
|
||||
reason: "Invalid Plivo V2 signature",
|
||||
};
|
||||
if (!ok) {
|
||||
return {
|
||||
ok: false,
|
||||
version: "v2",
|
||||
verificationUrl,
|
||||
reason: "Invalid Plivo V2 signature",
|
||||
};
|
||||
}
|
||||
const replayKey = `plivo:v2:${sha256Hex(`${verificationUrl}\n${nonceV2}`)}`;
|
||||
const isReplay = markReplay(plivoReplayCache, replayKey);
|
||||
return { ok: true, version: "v2", verificationUrl, isReplay };
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -45,12 +45,14 @@ const createCall = (startedAt: number): CallRecord => ({
|
||||
|
||||
const createManager = (calls: CallRecord[]) => {
|
||||
const endCall = vi.fn(async () => ({ success: true }));
|
||||
const processEvent = vi.fn();
|
||||
const manager = {
|
||||
getActiveCalls: () => calls,
|
||||
endCall,
|
||||
processEvent,
|
||||
} as unknown as CallManager;
|
||||
|
||||
return { manager, endCall };
|
||||
return { manager, endCall, processEvent };
|
||||
};
|
||||
|
||||
describe("VoiceCallWebhookServer stale call reaper", () => {
|
||||
@@ -116,3 +118,51 @@ describe("VoiceCallWebhookServer stale call reaper", () => {
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("VoiceCallWebhookServer replay handling", () => {
  it("acknowledges replayed webhook requests and skips event side effects", async () => {
    // Provider stub: verification succeeds but flags the request as a replay,
    // and parsing still yields one speech event.
    const replayProvider: VoiceCallProvider = {
      ...provider,
      verifyWebhook: () => ({ ok: true, isReplay: true }),
      parseWebhookEvent: () => ({
        events: [
          {
            id: "evt-replay",
            dedupeKey: "stable-replay",
            type: "call.speech",
            callId: "call-1",
            providerCallId: "provider-call-1",
            timestamp: Date.now(),
            transcript: "hello",
            isFinal: true,
          },
        ],
        statusCode: 200,
      }),
    };
    const { manager, processEvent } = createManager([]);
    const config = createConfig({ serve: { port: 0, bind: "127.0.0.1", path: "/voice/webhook" } });
    const server = new VoiceCallWebhookServer(config, manager, replayProvider);

    try {
      const baseUrl = await server.start();
      // Read the bound port off the underlying server when one is exposed.
      const boundAddress = (
        server as unknown as { server?: { address?: () => unknown } }
      ).server?.address?.();
      const requestUrl = new URL(baseUrl);
      const hasPort =
        boundAddress && typeof boundAddress === "object" && "port" in boundAddress && boundAddress.port;
      if (hasPort) {
        requestUrl.port = String(boundAddress.port);
      }

      const response = await fetch(requestUrl.toString(), {
        method: "POST",
        headers: { "content-type": "application/x-www-form-urlencoded" },
        body: "CallSid=CA123&SpeechResult=hello",
      });

      // The request is acknowledged, but the manager never sees the event.
      expect(response.status).toBe(200);
      expect(processEvent).not.toHaveBeenCalled();
    } finally {
      await server.stop();
    }
  });
});
|
||||
|
||||
@@ -346,11 +346,15 @@ export class VoiceCallWebhookServer {
|
||||
const result = this.provider.parseWebhookEvent(ctx);
|
||||
|
||||
// Process each event
|
||||
for (const event of result.events) {
|
||||
try {
|
||||
this.manager.processEvent(event);
|
||||
} catch (err) {
|
||||
console.error(`[voice-call] Error processing event ${event.type}:`, err);
|
||||
if (verification.isReplay) {
|
||||
console.warn("[voice-call] Replay detected; skipping event side effects");
|
||||
} else {
|
||||
for (const event of result.events) {
|
||||
try {
|
||||
this.manager.processEvent(event);
|
||||
} catch (err) {
|
||||
console.error(`[voice-call] Error processing event ${event.type}:`, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user