mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-21 13:44:03 +00:00
Outbound: centralize payload normalization plan
This commit is contained in:
@@ -14,10 +14,11 @@ import { resolveMessageChannelSelection } from "../../infra/outbound/channel-sel
|
||||
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
|
||||
import { buildOutboundResultEnvelope } from "../../infra/outbound/envelope.js";
|
||||
import {
|
||||
createOutboundPayloadPlan,
|
||||
formatOutboundPayloadLog,
|
||||
type NormalizedOutboundPayload,
|
||||
normalizeOutboundPayloads,
|
||||
normalizeOutboundPayloadsForJson,
|
||||
projectOutboundPayloadPlanForJson,
|
||||
projectOutboundPayloadPlanForOutbound,
|
||||
} from "../../infra/outbound/payloads.js";
|
||||
import type { OutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
@@ -266,7 +267,8 @@ export async function deliverAgentCommandResult(params: {
|
||||
accountId: resolvedAccountId,
|
||||
applyChannelTransforms: deliver,
|
||||
});
|
||||
const normalizedPayloads = normalizeOutboundPayloadsForJson(normalizedReplyPayloads);
|
||||
const outboundPayloadPlan = createOutboundPayloadPlan(normalizedReplyPayloads);
|
||||
const normalizedPayloads = projectOutboundPayloadPlanForJson(outboundPayloadPlan);
|
||||
if (opts.json) {
|
||||
runtime.log(
|
||||
JSON.stringify(
|
||||
@@ -288,7 +290,7 @@ export async function deliverAgentCommandResult(params: {
|
||||
return { payloads: [], meta: result.meta };
|
||||
}
|
||||
|
||||
const deliveryPayloads = normalizeOutboundPayloads(normalizedReplyPayloads);
|
||||
const deliveryPayloads = projectOutboundPayloadPlanForOutbound(outboundPayloadPlan);
|
||||
const logPayload = (payload: NormalizedOutboundPayload) => {
|
||||
if (opts.json) {
|
||||
return;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { normalizeChannelId } from "../../channels/plugins/index.js";
|
||||
import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js";
|
||||
@@ -13,7 +12,10 @@ import {
|
||||
ensureOutboundSessionEntry,
|
||||
resolveOutboundSessionRoute,
|
||||
} from "../../infra/outbound/outbound-session.js";
|
||||
import { normalizeReplyPayloadsForDelivery } from "../../infra/outbound/payloads.js";
|
||||
import {
|
||||
createOutboundPayloadPlan,
|
||||
projectOutboundPayloadPlanForMirror,
|
||||
} from "../../infra/outbound/payloads.js";
|
||||
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
|
||||
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
|
||||
@@ -405,16 +407,11 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
});
|
||||
const deliveryTarget = idLikeTarget?.to ?? resolvedTarget.to;
|
||||
const outboundDeps = context.deps ? createOutboundSendDeps(context.deps) : undefined;
|
||||
const mirrorPayloads = normalizeReplyPayloadsForDelivery([
|
||||
{ text: message, mediaUrl, mediaUrls },
|
||||
]);
|
||||
const mirrorText = mirrorPayloads
|
||||
.map((payload) => payload.text)
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
const mirrorMediaUrls = mirrorPayloads.flatMap(
|
||||
(payload) => resolveSendableOutboundReplyParts(payload).mediaUrls,
|
||||
);
|
||||
const outboundPayloads = [{ text: message, mediaUrl, mediaUrls }];
|
||||
const outboundPayloadPlan = createOutboundPayloadPlan(outboundPayloads);
|
||||
const mirrorProjection = projectOutboundPayloadPlanForMirror(outboundPayloadPlan);
|
||||
const mirrorText = mirrorProjection.text;
|
||||
const mirrorMediaUrls = mirrorProjection.mediaUrls;
|
||||
const providedSessionKey = normalizeOptionalLowercaseString(request.sessionKey);
|
||||
const explicitAgentId = normalizeOptionalString(request.agentId);
|
||||
const sessionAgentId = providedSessionKey
|
||||
@@ -460,7 +457,7 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
channel: outboundChannel,
|
||||
to: deliveryTarget,
|
||||
accountId,
|
||||
payloads: [{ text: message, mediaUrl, mediaUrls }],
|
||||
payloads: outboundPayloads,
|
||||
session: outboundSession,
|
||||
gifPlayback: request.gifPlayback,
|
||||
threadId: threadId ?? null,
|
||||
|
||||
@@ -838,6 +838,36 @@ describe("deliverOutboundPayloads", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("writes raw payloads to the queue before normalization", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w-raw", toJid: "jid" });
|
||||
const rawPayloads: DeliverOutboundPayload[] = [
|
||||
{ text: "NO_REPLY" },
|
||||
{ text: '{"action":"NO_REPLY"}' },
|
||||
{ text: "caption\nMEDIA:https://x.test/a.png" },
|
||||
{ text: "NO_REPLY", mediaUrl: " https://x.test/b.png " },
|
||||
];
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: whatsappChunkConfig,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: rawPayloads,
|
||||
deps: { whatsapp: sendWhatsApp },
|
||||
});
|
||||
|
||||
expect(queueMocks.enqueueDelivery).toHaveBeenCalledTimes(1);
|
||||
expect(queueMocks.enqueueDelivery).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
payloads: [
|
||||
{ text: "NO_REPLY" },
|
||||
{ text: '{"action":"NO_REPLY"}' },
|
||||
{ text: "caption\nMEDIA:https://x.test/a.png" },
|
||||
{ text: "NO_REPLY", mediaUrl: " https://x.test/b.png " },
|
||||
],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("acks the queue entry when delivery is aborted", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
const abortController = new AbortController();
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
import {
|
||||
resolveSendableOutboundReplyParts,
|
||||
sendMediaWithLeadingCaption,
|
||||
} from "openclaw/plugin-sdk/reply-payload";
|
||||
import { sendMediaWithLeadingCaption } from "openclaw/plugin-sdk/reply-payload";
|
||||
import {
|
||||
chunkByParagraph,
|
||||
chunkMarkdownTextWithMode,
|
||||
@@ -35,8 +32,13 @@ import type { OutboundDeliveryResult } from "./deliver-types.js";
|
||||
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
|
||||
import type { OutboundIdentity } from "./identity.js";
|
||||
import type { DeliveryMirror } from "./mirror.js";
|
||||
import type { NormalizedOutboundPayload } from "./payloads.js";
|
||||
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
|
||||
import {
|
||||
createOutboundPayloadPlan,
|
||||
projectOutboundPayloadPlanForDelivery,
|
||||
summarizeOutboundPayloadForTransport,
|
||||
type NormalizedOutboundPayload,
|
||||
type OutboundPayloadPlan,
|
||||
} from "./payloads.js";
|
||||
import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
|
||||
import type { OutboundSessionContext } from "./session-context.js";
|
||||
import type { OutboundChannel } from "./targets.js";
|
||||
@@ -289,12 +291,8 @@ type DeliverOutboundPayloadsCoreParams = {
|
||||
gatewayClientScopes?: readonly string[];
|
||||
};
|
||||
|
||||
function collectPayloadMediaSources(payloads: ReplyPayload[]): string[] {
|
||||
const mediaSources: string[] = [];
|
||||
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
|
||||
mediaSources.push(...resolveSendableOutboundReplyParts(payload).mediaUrls);
|
||||
}
|
||||
return mediaSources;
|
||||
function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): string[] {
|
||||
return plan.flatMap((entry) => entry.parts.mediaUrls);
|
||||
}
|
||||
|
||||
export type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & {
|
||||
@@ -326,11 +324,11 @@ function normalizeEmptyPayloadForDelivery(payload: ReplyPayload): ReplyPayload |
|
||||
}
|
||||
|
||||
function normalizePayloadsForChannelDelivery(
|
||||
payloads: ReplyPayload[],
|
||||
plan: readonly OutboundPayloadPlan[],
|
||||
handler: ChannelHandler,
|
||||
): ReplyPayload[] {
|
||||
const normalizedPayloads: ReplyPayload[] = [];
|
||||
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
|
||||
for (const payload of projectOutboundPayloadPlanForDelivery(plan)) {
|
||||
let sanitizedPayload = payload;
|
||||
if (handler.sanitizeText && sanitizedPayload.text) {
|
||||
if (!handler.shouldSkipPlainTextSanitization?.(sanitizedPayload)) {
|
||||
@@ -354,14 +352,7 @@ function normalizePayloadsForChannelDelivery(
|
||||
}
|
||||
|
||||
function buildPayloadSummary(payload: ReplyPayload): NormalizedOutboundPayload {
|
||||
const parts = resolveSendableOutboundReplyParts(payload);
|
||||
return {
|
||||
text: parts.text,
|
||||
mediaUrls: parts.mediaUrls,
|
||||
audioAsVoice: payload.audioAsVoice === true ? true : undefined,
|
||||
interactive: payload.interactive,
|
||||
channelData: payload.channelData,
|
||||
};
|
||||
return summarizeOutboundPayloadForTransport(payload);
|
||||
}
|
||||
|
||||
function createMessageSentEmitter(params: {
|
||||
@@ -561,13 +552,14 @@ async function deliverOutboundPayloadsCore(
|
||||
params: DeliverOutboundPayloadsCoreParams,
|
||||
): Promise<OutboundDeliveryResult[]> {
|
||||
const { cfg, channel, to, payloads } = params;
|
||||
const outboundPayloadPlan = createOutboundPayloadPlan(payloads);
|
||||
const accountId = params.accountId;
|
||||
const deps = params.deps;
|
||||
const abortSignal = params.abortSignal;
|
||||
const mediaAccess = resolveAgentScopedOutboundMediaAccess({
|
||||
cfg,
|
||||
agentId: params.session?.agentId ?? params.mirror?.agentId,
|
||||
mediaSources: collectPayloadMediaSources(payloads),
|
||||
mediaSources: collectPayloadMediaSources(outboundPayloadPlan),
|
||||
sessionKey: params.session?.key,
|
||||
messageProvider: params.session?.key ? undefined : channel,
|
||||
accountId: params.session?.requesterAccountId ?? accountId,
|
||||
@@ -643,7 +635,7 @@ async function deliverOutboundPayloadsCore(
|
||||
results.push(await handler.sendText(chunk, overrides));
|
||||
}
|
||||
};
|
||||
const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, handler);
|
||||
const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler);
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
|
||||
const mirrorIsGroup = params.mirror?.isGroup;
|
||||
|
||||
@@ -188,6 +188,109 @@ describe("sendMessage", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("applies mirror matrix semantics for MEDIA and silent token variants", async () => {
|
||||
const matrix: Array<{
|
||||
name: string;
|
||||
content: string;
|
||||
mediaUrl?: string;
|
||||
expectedPayloads: Array<{
|
||||
text: string;
|
||||
mediaUrl: string | null;
|
||||
mediaUrls: string[];
|
||||
}>;
|
||||
expectedMirror: {
|
||||
text: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
}> = [
|
||||
{
|
||||
name: "MEDIA directives",
|
||||
content: "Here\nMEDIA:https://example.com/a.png\nMEDIA:https://example.com/b.png",
|
||||
expectedPayloads: [
|
||||
{
|
||||
text: "Here",
|
||||
mediaUrl: null,
|
||||
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
|
||||
},
|
||||
],
|
||||
expectedMirror: {
|
||||
text: "Here",
|
||||
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exact NO_REPLY",
|
||||
content: "NO_REPLY",
|
||||
expectedPayloads: [],
|
||||
expectedMirror: {
|
||||
text: "NO_REPLY",
|
||||
mediaUrls: undefined,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "JSON NO_REPLY",
|
||||
content: '{\n "action": "NO_REPLY"\n}',
|
||||
expectedPayloads: [],
|
||||
expectedMirror: {
|
||||
text: '{\n "action": "NO_REPLY"\n}',
|
||||
mediaUrls: undefined,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exact NO_REPLY with explicit media",
|
||||
content: "NO_REPLY",
|
||||
mediaUrl: "https://example.com/c.png",
|
||||
expectedPayloads: [
|
||||
{
|
||||
text: "",
|
||||
mediaUrl: "https://example.com/c.png",
|
||||
mediaUrls: ["https://example.com/c.png"],
|
||||
},
|
||||
],
|
||||
expectedMirror: {
|
||||
text: "NO_REPLY",
|
||||
mediaUrls: ["https://example.com/c.png"],
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
for (const entry of matrix) {
|
||||
mocks.deliverOutboundPayloads.mockClear();
|
||||
|
||||
await sendMessage({
|
||||
cfg: {},
|
||||
channel: "telegram",
|
||||
to: "123456",
|
||||
content: entry.content,
|
||||
...(entry.mediaUrl ? { mediaUrl: entry.mediaUrl } : {}),
|
||||
mirror: {
|
||||
sessionKey: "agent:main:telegram:dm:123456",
|
||||
},
|
||||
});
|
||||
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
const deliveryCall = mocks.deliverOutboundPayloads.mock.calls[0]?.[0] as
|
||||
| {
|
||||
payloads?: Array<{ text?: string; mediaUrl?: string; mediaUrls?: string[] }>;
|
||||
mirror?: unknown;
|
||||
}
|
||||
| undefined;
|
||||
const payloadSummary = (deliveryCall?.payloads ?? []).map((payload) => ({
|
||||
text: payload.text ?? "",
|
||||
mediaUrl: payload.mediaUrl ?? null,
|
||||
mediaUrls: payload.mediaUrls ?? [],
|
||||
}));
|
||||
expect(payloadSummary, entry.name).toEqual(entry.expectedPayloads);
|
||||
expect(deliveryCall?.mirror, entry.name).toEqual(
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:telegram:dm:123456",
|
||||
text: entry.expectedMirror.text,
|
||||
mediaUrls: entry.expectedMirror.mediaUrls,
|
||||
}),
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it("recovers telegram plugin resolution so message/send does not fail with Unknown channel: telegram", async () => {
|
||||
const telegramPlugin = {
|
||||
outbound: { deliveryMode: "direct" },
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { PollInput } from "../../polls.js";
|
||||
import { normalizePollInput } from "../../polls.js";
|
||||
@@ -16,7 +15,11 @@ import {
|
||||
type OutboundSendDeps,
|
||||
} from "./deliver.js";
|
||||
import type { OutboundMirror } from "./mirror.js";
|
||||
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
|
||||
import {
|
||||
createOutboundPayloadPlan,
|
||||
projectOutboundPayloadPlanForDelivery,
|
||||
projectOutboundPayloadPlanForMirror,
|
||||
} from "./payloads.js";
|
||||
import { buildOutboundSessionContext } from "./session-context.js";
|
||||
import { resolveOutboundTarget } from "./targets.js";
|
||||
|
||||
@@ -234,20 +237,17 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
|
||||
const channel = await resolveRequiredChannel({ cfg, channel: params.channel });
|
||||
const plugin = resolveRequiredPlugin(channel, cfg);
|
||||
const deliveryMode = plugin.outbound?.deliveryMode ?? "direct";
|
||||
const normalizedPayloads = normalizeReplyPayloadsForDelivery([
|
||||
const outboundPlan = createOutboundPayloadPlan([
|
||||
{
|
||||
text: params.content,
|
||||
mediaUrl: params.mediaUrl,
|
||||
mediaUrls: params.mediaUrls,
|
||||
},
|
||||
]);
|
||||
const mirrorText = normalizedPayloads
|
||||
.map((payload) => payload.text)
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
const mirrorMediaUrls = normalizedPayloads.flatMap(
|
||||
(payload) => resolveSendableOutboundReplyParts(payload).mediaUrls,
|
||||
);
|
||||
const normalizedPayloads = projectOutboundPayloadPlanForDelivery(outboundPlan);
|
||||
const mirrorProjection = projectOutboundPayloadPlanForMirror(outboundPlan);
|
||||
const mirrorText = mirrorProjection.text;
|
||||
const mirrorMediaUrls = mirrorProjection.mediaUrls;
|
||||
const primaryMediaUrl = mirrorMediaUrls[0] ?? params.mediaUrl ?? null;
|
||||
|
||||
if (params.dryRun) {
|
||||
|
||||
@@ -1,13 +1,32 @@
|
||||
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||||
import { typedCases } from "../../test-utils/typed-cases.js";
|
||||
import {
|
||||
createOutboundPayloadPlan,
|
||||
formatOutboundPayloadLog,
|
||||
normalizeOutboundPayloads,
|
||||
normalizeOutboundPayloadsForJson,
|
||||
normalizeReplyPayloadsForDelivery,
|
||||
projectOutboundPayloadPlanForDelivery,
|
||||
projectOutboundPayloadPlanForJson,
|
||||
projectOutboundPayloadPlanForMirror,
|
||||
projectOutboundPayloadPlanForOutbound,
|
||||
} from "./payloads.js";
|
||||
|
||||
function resolveMirrorProjection(payloads: readonly ReplyPayload[]) {
|
||||
const normalized = normalizeReplyPayloadsForDelivery(payloads);
|
||||
return {
|
||||
text: normalized
|
||||
.map((payload) => payload.text)
|
||||
.filter((text): text is string => Boolean(text))
|
||||
.join("\n"),
|
||||
mediaUrls: normalized.flatMap(
|
||||
(payload) => resolveSendableOutboundReplyParts(payload).mediaUrls,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
describe("normalizeReplyPayloadsForDelivery", () => {
|
||||
it("parses directives, merges media, and preserves reply metadata", () => {
|
||||
expect(
|
||||
@@ -77,6 +96,167 @@ describe("normalizeReplyPayloadsForDelivery", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps mixed NO_REPLY text literal and only suppresses exact sentinel payloads", () => {
|
||||
expect(
|
||||
normalizeReplyPayloadsForDelivery([
|
||||
{ text: "NO_REPLY thanks for the update" },
|
||||
{ text: "NO_REPLY" },
|
||||
{ text: "thanks NO_REPLY" },
|
||||
]),
|
||||
).toEqual([
|
||||
{
|
||||
text: "NO_REPLY thanks for the update",
|
||||
mediaUrls: undefined,
|
||||
mediaUrl: undefined,
|
||||
replyToId: undefined,
|
||||
replyToCurrent: false,
|
||||
replyToTag: false,
|
||||
audioAsVoice: false,
|
||||
},
|
||||
{
|
||||
text: "thanks NO_REPLY",
|
||||
mediaUrls: undefined,
|
||||
mediaUrl: undefined,
|
||||
replyToId: undefined,
|
||||
replyToCurrent: false,
|
||||
replyToTag: false,
|
||||
audioAsVoice: false,
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps silent token payloads when media exists", () => {
|
||||
expect(
|
||||
normalizeReplyPayloadsForDelivery([
|
||||
{ text: "NO_REPLY", mediaUrl: "https://x.test/one.png" },
|
||||
{ text: '{"action":"NO_REPLY"}', mediaUrls: ["https://x.test/two.png"] },
|
||||
]),
|
||||
).toEqual([
|
||||
{
|
||||
text: "",
|
||||
mediaUrls: ["https://x.test/one.png"],
|
||||
mediaUrl: "https://x.test/one.png",
|
||||
replyToId: undefined,
|
||||
replyToCurrent: false,
|
||||
replyToTag: false,
|
||||
audioAsVoice: false,
|
||||
},
|
||||
{
|
||||
text: "",
|
||||
mediaUrls: ["https://x.test/two.png"],
|
||||
mediaUrl: undefined,
|
||||
replyToId: undefined,
|
||||
replyToCurrent: false,
|
||||
replyToTag: false,
|
||||
audioAsVoice: false,
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("is idempotent for already-normalized delivery payloads", () => {
|
||||
const once = normalizeReplyPayloadsForDelivery([
|
||||
{
|
||||
text: "Hello",
|
||||
mediaUrls: ["https://x.test/a.png"],
|
||||
replyToId: "123",
|
||||
replyToTag: true,
|
||||
replyToCurrent: true,
|
||||
audioAsVoice: true,
|
||||
},
|
||||
{
|
||||
text: "",
|
||||
channelData: { provider: "line" },
|
||||
},
|
||||
]);
|
||||
const twice = normalizeReplyPayloadsForDelivery(once);
|
||||
expect(twice).toEqual(once);
|
||||
});
|
||||
|
||||
it("captures a tricky payload matrix snapshot", () => {
|
||||
const input: ReplyPayload[] = [
|
||||
{ text: "NO_REPLY" },
|
||||
{ text: "NO_REPLY with details" },
|
||||
{ text: '{"action":"NO_REPLY"}' },
|
||||
{ text: '{"action":"NO_REPLY","note":"keep"}' },
|
||||
{ text: "NO_REPLY", mediaUrl: "https://x.test/m1.png" },
|
||||
{ text: "MEDIA:https://x.test/m2.png\n[[audio_as_voice]] [[reply_to: 444]] hi" },
|
||||
{ text: "headline", btw: { question: "what changed?" } },
|
||||
{ text: " \n\t ", channelData: { mode: "custom" } },
|
||||
{ text: "Reasoning block", isReasoning: true },
|
||||
];
|
||||
expect(normalizeReplyPayloadsForDelivery(input)).toMatchInlineSnapshot(`
|
||||
[
|
||||
{
|
||||
"audioAsVoice": false,
|
||||
"mediaUrl": undefined,
|
||||
"mediaUrls": undefined,
|
||||
"replyToCurrent": false,
|
||||
"replyToId": undefined,
|
||||
"replyToTag": false,
|
||||
"text": "NO_REPLY with details",
|
||||
},
|
||||
{
|
||||
"audioAsVoice": false,
|
||||
"mediaUrl": undefined,
|
||||
"mediaUrls": undefined,
|
||||
"replyToCurrent": false,
|
||||
"replyToId": undefined,
|
||||
"replyToTag": false,
|
||||
"text": "{"action":"NO_REPLY","note":"keep"}",
|
||||
},
|
||||
{
|
||||
"audioAsVoice": false,
|
||||
"mediaUrl": "https://x.test/m1.png",
|
||||
"mediaUrls": [
|
||||
"https://x.test/m1.png",
|
||||
],
|
||||
"replyToCurrent": false,
|
||||
"replyToId": undefined,
|
||||
"replyToTag": false,
|
||||
"text": "",
|
||||
},
|
||||
{
|
||||
"audioAsVoice": true,
|
||||
"mediaUrl": "https://x.test/m2.png",
|
||||
"mediaUrls": [
|
||||
"https://x.test/m2.png",
|
||||
],
|
||||
"replyToCurrent": false,
|
||||
"replyToId": "444",
|
||||
"replyToTag": true,
|
||||
"text": "hi",
|
||||
},
|
||||
{
|
||||
"audioAsVoice": false,
|
||||
"btw": {
|
||||
"question": "what changed?",
|
||||
},
|
||||
"mediaUrl": undefined,
|
||||
"mediaUrls": undefined,
|
||||
"replyToCurrent": false,
|
||||
"replyToId": undefined,
|
||||
"replyToTag": false,
|
||||
"text": "BTW
|
||||
Question: what changed?
|
||||
|
||||
headline",
|
||||
},
|
||||
{
|
||||
"audioAsVoice": false,
|
||||
"channelData": {
|
||||
"mode": "custom",
|
||||
},
|
||||
"mediaUrl": undefined,
|
||||
"mediaUrls": undefined,
|
||||
"replyToCurrent": false,
|
||||
"replyToId": undefined,
|
||||
"replyToTag": false,
|
||||
"text": "",
|
||||
},
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
it("keeps renderable channel-data payloads and reply-to-current markers", () => {
|
||||
expect(
|
||||
normalizeReplyPayloadsForDelivery([
|
||||
@@ -211,6 +391,64 @@ describe("normalizeOutboundPayloads", () => {
|
||||
]),
|
||||
).toEqual([{ text: "BTW\nQuestion: what is 17 * 19?\n\n323", mediaUrls: [] }]);
|
||||
});
|
||||
|
||||
it("keeps delivery and mirror projections aligned", () => {
|
||||
const payloads: ReplyPayload[] = [
|
||||
{ text: "Hello" },
|
||||
{ text: "MEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png" },
|
||||
{ text: '{"action":"NO_REPLY"}' },
|
||||
{ text: "NO_REPLY", mediaUrl: "https://x.test/c.png" },
|
||||
];
|
||||
|
||||
const deliveryProjection = normalizeOutboundPayloads(payloads);
|
||||
const mirrorProjection = resolveMirrorProjection(payloads);
|
||||
|
||||
expect(mirrorProjection.text).toBe(
|
||||
deliveryProjection
|
||||
.map((payload) => payload.text)
|
||||
.filter((text) => Boolean(text))
|
||||
.join("\n"),
|
||||
);
|
||||
expect(mirrorProjection.mediaUrls).toEqual(
|
||||
deliveryProjection.flatMap((payload) => payload.mediaUrls),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("OutboundPayloadPlan projections", () => {
|
||||
const matrix: ReplyPayload[] = [
|
||||
{ text: "hello" },
|
||||
{ text: "NO_REPLY" },
|
||||
{ text: "NO_REPLY", mediaUrl: "https://x.test/1.png" },
|
||||
{ text: "MEDIA:https://x.test/2.png\nworld" },
|
||||
{ text: '{"action":"NO_REPLY","note":"keep"}' },
|
||||
{ text: "reasoning", isReasoning: true },
|
||||
{ text: " \n", channelData: { mode: "flex" } },
|
||||
];
|
||||
|
||||
it("matches normalizeReplyPayloadsForDelivery", () => {
|
||||
const plan = createOutboundPayloadPlan(matrix);
|
||||
expect(projectOutboundPayloadPlanForDelivery(plan)).toEqual(
|
||||
normalizeReplyPayloadsForDelivery(matrix),
|
||||
);
|
||||
});
|
||||
|
||||
it("matches normalizeOutboundPayloads", () => {
|
||||
const plan = createOutboundPayloadPlan(matrix);
|
||||
expect(projectOutboundPayloadPlanForOutbound(plan)).toEqual(normalizeOutboundPayloads(matrix));
|
||||
});
|
||||
|
||||
it("matches normalizeOutboundPayloadsForJson", () => {
|
||||
const plan = createOutboundPayloadPlan(matrix);
|
||||
expect(projectOutboundPayloadPlanForJson(plan)).toEqual(
|
||||
normalizeOutboundPayloadsForJson(matrix),
|
||||
);
|
||||
});
|
||||
|
||||
it("matches mirror projection behavior", () => {
|
||||
const plan = createOutboundPayloadPlan(matrix);
|
||||
expect(projectOutboundPayloadPlanForMirror(plan)).toEqual(resolveMirrorProjection(matrix));
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatOutboundPayloadLog", () => {
|
||||
|
||||
@@ -30,6 +30,18 @@ export type OutboundPayloadJson = {
|
||||
channelData?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type OutboundPayloadPlan = {
|
||||
payload: ReplyPayload;
|
||||
parts: ReturnType<typeof resolveSendableOutboundReplyParts>;
|
||||
hasInteractive: boolean;
|
||||
hasChannelData: boolean;
|
||||
};
|
||||
|
||||
export type OutboundPayloadMirror = {
|
||||
text: string;
|
||||
mediaUrls: string[];
|
||||
};
|
||||
|
||||
function mergeMediaUrls(...lists: Array<ReadonlyArray<string | undefined> | undefined>): string[] {
|
||||
const seen = new Set<string>();
|
||||
const merged: string[] = [];
|
||||
@@ -52,85 +64,108 @@ function mergeMediaUrls(...lists: Array<ReadonlyArray<string | undefined> | unde
|
||||
return merged;
|
||||
}
|
||||
|
||||
export function normalizeReplyPayloadsForDelivery(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): ReplyPayload[] {
|
||||
const normalized: ReplyPayload[] = [];
|
||||
for (const payload of payloads) {
|
||||
if (shouldSuppressReasoningPayload(payload)) {
|
||||
continue;
|
||||
}
|
||||
const parsed = parseReplyDirectives(payload.text ?? "");
|
||||
const explicitMediaUrls = payload.mediaUrls ?? parsed.mediaUrls;
|
||||
const explicitMediaUrl = payload.mediaUrl ?? parsed.mediaUrl;
|
||||
const mergedMedia = mergeMediaUrls(
|
||||
explicitMediaUrls,
|
||||
explicitMediaUrl ? [explicitMediaUrl] : undefined,
|
||||
);
|
||||
const hasMultipleMedia = (explicitMediaUrls?.length ?? 0) > 1;
|
||||
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
|
||||
const next: ReplyPayload = {
|
||||
...payload,
|
||||
text:
|
||||
formatBtwTextForExternalDelivery({
|
||||
...payload,
|
||||
text: parsed.text ?? "",
|
||||
}) ?? "",
|
||||
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
|
||||
mediaUrl: resolvedMediaUrl,
|
||||
replyToId: payload.replyToId ?? parsed.replyToId,
|
||||
replyToTag: payload.replyToTag || parsed.replyToTag,
|
||||
replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
|
||||
audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),
|
||||
};
|
||||
if (parsed.isSilent && mergedMedia.length === 0) {
|
||||
continue;
|
||||
}
|
||||
if (!isRenderablePayload(next)) {
|
||||
continue;
|
||||
}
|
||||
normalized.push(next);
|
||||
function createOutboundPayloadPlanEntry(payload: ReplyPayload): OutboundPayloadPlan | null {
|
||||
if (shouldSuppressReasoningPayload(payload)) {
|
||||
return null;
|
||||
}
|
||||
return normalized;
|
||||
const parsed = parseReplyDirectives(payload.text ?? "");
|
||||
const explicitMediaUrls = payload.mediaUrls ?? parsed.mediaUrls;
|
||||
const explicitMediaUrl = payload.mediaUrl ?? parsed.mediaUrl;
|
||||
const mergedMedia = mergeMediaUrls(
|
||||
explicitMediaUrls,
|
||||
explicitMediaUrl ? [explicitMediaUrl] : undefined,
|
||||
);
|
||||
if (parsed.isSilent && mergedMedia.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const hasMultipleMedia = (explicitMediaUrls?.length ?? 0) > 1;
|
||||
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
|
||||
const normalizedPayload: ReplyPayload = {
|
||||
...payload,
|
||||
text:
|
||||
formatBtwTextForExternalDelivery({
|
||||
...payload,
|
||||
text: parsed.text ?? "",
|
||||
}) ?? "",
|
||||
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
|
||||
mediaUrl: resolvedMediaUrl,
|
||||
replyToId: payload.replyToId ?? parsed.replyToId,
|
||||
replyToTag: payload.replyToTag || parsed.replyToTag,
|
||||
replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
|
||||
audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),
|
||||
};
|
||||
if (!isRenderablePayload(normalizedPayload)) {
|
||||
return null;
|
||||
}
|
||||
const parts = resolveSendableOutboundReplyParts(normalizedPayload);
|
||||
const hasChannelData = hasReplyChannelData(normalizedPayload.channelData);
|
||||
return {
|
||||
payload: normalizedPayload,
|
||||
parts,
|
||||
hasInteractive: hasInteractiveReplyBlocks(normalizedPayload.interactive),
|
||||
hasChannelData,
|
||||
};
|
||||
}
|
||||
|
||||
export function normalizeOutboundPayloads(
|
||||
export function createOutboundPayloadPlan(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): OutboundPayloadPlan[] {
|
||||
// Intentionally scoped to channel-agnostic normalization and projection inputs.
|
||||
// Transport concerns (queueing, hooks, retries), channel transforms, and
|
||||
// heartbeat-specific token semantics remain outside this plan boundary.
|
||||
const plan: OutboundPayloadPlan[] = [];
|
||||
for (const payload of payloads) {
|
||||
const entry = createOutboundPayloadPlanEntry(payload);
|
||||
if (!entry) {
|
||||
continue;
|
||||
}
|
||||
plan.push(entry);
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
export function projectOutboundPayloadPlanForDelivery(
|
||||
plan: readonly OutboundPayloadPlan[],
|
||||
): ReplyPayload[] {
|
||||
return plan.map((entry) => entry.payload);
|
||||
}
|
||||
|
||||
export function projectOutboundPayloadPlanForOutbound(
|
||||
plan: readonly OutboundPayloadPlan[],
|
||||
): NormalizedOutboundPayload[] {
|
||||
const normalizedPayloads: NormalizedOutboundPayload[] = [];
|
||||
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
|
||||
const parts = resolveSendableOutboundReplyParts(payload);
|
||||
const interactive = payload.interactive;
|
||||
const channelData = payload.channelData;
|
||||
const hasChannelData = hasReplyChannelData(channelData);
|
||||
const hasInteractive = hasInteractiveReplyBlocks(interactive);
|
||||
const text = parts.text;
|
||||
for (const entry of plan) {
|
||||
const payload = entry.payload;
|
||||
const text = entry.parts.text;
|
||||
if (
|
||||
!hasReplyPayloadContent({ ...payload, text, mediaUrls: parts.mediaUrls }, { hasChannelData })
|
||||
!hasReplyPayloadContent(
|
||||
{ ...payload, text, mediaUrls: entry.parts.mediaUrls },
|
||||
{ hasChannelData: entry.hasChannelData },
|
||||
)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
normalizedPayloads.push({
|
||||
text,
|
||||
mediaUrls: parts.mediaUrls,
|
||||
mediaUrls: entry.parts.mediaUrls,
|
||||
audioAsVoice: payload.audioAsVoice === true ? true : undefined,
|
||||
...(hasInteractive ? { interactive } : {}),
|
||||
...(hasChannelData ? { channelData } : {}),
|
||||
...(entry.hasInteractive ? { interactive: payload.interactive } : {}),
|
||||
...(entry.hasChannelData ? { channelData: payload.channelData } : {}),
|
||||
});
|
||||
}
|
||||
return normalizedPayloads;
|
||||
}
|
||||
|
||||
export function normalizeOutboundPayloadsForJson(
|
||||
payloads: readonly ReplyPayload[],
|
||||
export function projectOutboundPayloadPlanForJson(
|
||||
plan: readonly OutboundPayloadPlan[],
|
||||
): OutboundPayloadJson[] {
|
||||
const normalized: OutboundPayloadJson[] = [];
|
||||
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
|
||||
const parts = resolveSendableOutboundReplyParts(payload);
|
||||
for (const entry of plan) {
|
||||
const payload = entry.payload;
|
||||
normalized.push({
|
||||
text: parts.text,
|
||||
text: entry.parts.text,
|
||||
mediaUrl: payload.mediaUrl ?? null,
|
||||
mediaUrls: parts.mediaUrls.length ? parts.mediaUrls : undefined,
|
||||
mediaUrls: entry.parts.mediaUrls.length ? entry.parts.mediaUrls : undefined,
|
||||
audioAsVoice: payload.audioAsVoice === true ? true : undefined,
|
||||
interactive: payload.interactive,
|
||||
channelData: payload.channelData,
|
||||
@@ -139,6 +174,49 @@ export function normalizeOutboundPayloadsForJson(
|
||||
return normalized;
|
||||
}
|
||||
|
||||
export function projectOutboundPayloadPlanForMirror(
|
||||
plan: readonly OutboundPayloadPlan[],
|
||||
): OutboundPayloadMirror {
|
||||
return {
|
||||
text: plan
|
||||
.map((entry) => entry.payload.text)
|
||||
.filter((text): text is string => Boolean(text))
|
||||
.join("\n"),
|
||||
mediaUrls: plan.flatMap((entry) => entry.parts.mediaUrls),
|
||||
};
|
||||
}
|
||||
|
||||
export function summarizeOutboundPayloadForTransport(
|
||||
payload: ReplyPayload,
|
||||
): NormalizedOutboundPayload {
|
||||
const parts = resolveSendableOutboundReplyParts(payload);
|
||||
return {
|
||||
text: parts.text,
|
||||
mediaUrls: parts.mediaUrls,
|
||||
audioAsVoice: payload.audioAsVoice === true ? true : undefined,
|
||||
interactive: payload.interactive,
|
||||
channelData: payload.channelData,
|
||||
};
|
||||
}
|
||||
|
||||
export function normalizeReplyPayloadsForDelivery(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): ReplyPayload[] {
|
||||
return projectOutboundPayloadPlanForDelivery(createOutboundPayloadPlan(payloads));
|
||||
}
|
||||
|
||||
export function normalizeOutboundPayloads(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): NormalizedOutboundPayload[] {
|
||||
return projectOutboundPayloadPlanForOutbound(createOutboundPayloadPlan(payloads));
|
||||
}
|
||||
|
||||
export function normalizeOutboundPayloadsForJson(
|
||||
payloads: readonly ReplyPayload[],
|
||||
): OutboundPayloadJson[] {
|
||||
return projectOutboundPayloadPlanForJson(createOutboundPayloadPlan(payloads));
|
||||
}
|
||||
|
||||
export function formatOutboundPayloadLog(
|
||||
payload: Pick<NormalizedOutboundPayload, "text" | "channelData"> & {
|
||||
mediaUrls: readonly string[];
|
||||
|
||||
Reference in New Issue
Block a user