fix plugin sent hooks for Slack replies

This commit is contained in:
Bek
2026-05-05 18:22:46 -04:00
parent a36981a2c5
commit 19ccab136f
6 changed files with 457 additions and 13 deletions

View File

@@ -6,6 +6,7 @@ const SAME_TEXT = "same reply";
const createSlackDraftStreamMock = vi.fn();
const deliverRepliesMock = vi.fn(async () => {});
const emitSlackMessageSentHooksMock = vi.fn();
const finalizeSlackPreviewEditMock = vi.fn(async () => {});
const postMessageMock = vi.fn(async () => ({ ok: true, ts: "171234.999" }));
const appendSlackStreamMock = vi.fn(async () => {});
@@ -512,6 +513,7 @@ vi.mock("../replies.js", () => ({
markSent: () => {},
}),
deliverReplies: deliverRepliesMock,
emitSlackMessageSentHooks: emitSlackMessageSentHooksMock,
readSlackReplyBlocks: () => undefined,
resolveDeliveredSlackReplyThreadTs: (params: {
replyToMode: "off" | "first" | "all" | "batched";
@@ -519,6 +521,7 @@ vi.mock("../replies.js", () => ({
replyThreadTs?: string;
}) =>
(params.replyToMode === "off" ? undefined : params.payloadReplyToId) ?? params.replyThreadTs,
resolveSlackSentHookContent: (content: string) => content,
resolveSlackThreadTs: () => mockedReplyThreadTs,
}));
@@ -610,6 +613,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
beforeEach(() => {
createSlackDraftStreamMock.mockReset();
deliverRepliesMock.mockReset();
emitSlackMessageSentHooksMock.mockReset();
finalizeSlackPreviewEditMock.mockReset();
postMessageMock.mockClear();
appendSlackStreamMock.mockReset();
@@ -682,6 +686,14 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
text: "✅",
}),
);
expect(emitSlackMessageSentHooksMock).toHaveBeenCalledWith({
sessionKeyForInternalHooks: "agent:agent-1:slack:C123",
target: "channel:C123",
accountId: "default",
content: "✅",
success: true,
messageId: "171234.567",
});
expect(deliverRepliesMock).not.toHaveBeenCalled();
expect(draftStream.clear).not.toHaveBeenCalled();
});
@@ -932,6 +944,13 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
text: FINAL_REPLY_TEXT,
}),
);
expect(emitSlackMessageSentHooksMock).toHaveBeenCalledWith({
sessionKeyForInternalHooks: "agent:agent-1:slack:C123",
target: "channel:C123",
accountId: "default",
content: FINAL_REPLY_TEXT,
success: true,
});
expect(deliverRepliesMock).not.toHaveBeenCalled();
});
@@ -980,6 +999,24 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
);
});
it("passes the prepared dispatch session key to reply sent hooks", async () => {
const dispatchSessionKey = "agent:agent-1:slack:C123:thread:171234";
mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }];
await dispatchPreparedSlackMessage(
createPreparedSlackMessage({
ctxPayload: { SessionKey: dispatchSessionKey },
}),
);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })],
sessionKeyForInternalHooks: dispatchSessionKey,
}),
);
});
it("keeps multi-part block replies in the first reply thread after the plan is consumed", async () => {
mockedReplyThreadTsSequence = [THREAD_TS, undefined];
mockedDispatchSequence = [
@@ -1087,6 +1124,90 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
expect(session.stopped).toBe(true);
});
it("emits a sent hook for the delivered native stream prefix before fallback sends the pending tail", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [
{ kind: "block", payload: { text: "first flushed" } },
{ kind: "final", payload: { text: "second buffered" } },
];
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: true,
pendingText: "",
};
startSlackStreamMock.mockResolvedValueOnce(session);
appendSlackStreamMock.mockImplementationOnce(async () => {
session.pendingText = "\nsecond buffered";
});
stopSlackStreamMock.mockRejectedValueOnce(
new TestSlackStreamNotDeliveredError("\nsecond buffered", "user_not_found"),
);
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(emitSlackMessageSentHooksMock).toHaveBeenCalledWith({
sessionKeyForInternalHooks: "agent:agent-1:slack:C123",
target: "channel:C123",
accountId: "default",
content: "first flushed",
success: true,
});
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: "second buffered" })],
}),
);
});
it("excludes append-fallback text from the finalized native stream sent hook", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [
{ kind: "block", payload: { text: "first flushed" } },
{ kind: "tool", payload: { text: "second buffered" } },
{ kind: "final", payload: { text: "third failed" } },
];
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: true,
pendingText: "",
};
startSlackStreamMock.mockResolvedValueOnce(session);
appendSlackStreamMock
.mockImplementationOnce(async () => {
session.pendingText = "\nsecond buffered";
})
.mockImplementationOnce(async () => {
session.pendingText += "\nthird failed";
throw new Error("network socket closed");
});
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: "second buffered\nthird failed" })],
}),
);
expect(emitSlackMessageSentHooksMock).toHaveBeenCalledWith({
sessionKeyForInternalHooks: "agent:agent-1:slack:C123",
target: "channel:C123",
accountId: "default",
content: "first flushed",
success: true,
});
expect(emitSlackMessageSentHooksMock).not.toHaveBeenCalledWith(
expect.objectContaining({
content: "first flushed\nsecond buffered",
}),
);
});
it("routes all pending native stream text through chunked sender when an append flush fails", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [

View File

@@ -67,8 +67,10 @@ import { escapeSlackMrkdwn } from "../mrkdwn.js";
import {
createSlackReplyDeliveryPlan,
deliverReplies,
emitSlackMessageSentHooks,
readSlackReplyBlocks,
resolveDeliveredSlackReplyThreadTs,
resolveSlackSentHookContent,
resolveSlackThreadTs,
} from "../replies.js";
import {
@@ -245,6 +247,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const { ctx, account, message, route } = prepared;
const cfg = ctx.cfg;
const runtime = ctx.runtime;
const sessionKeyForSentHooks = prepared.ctxPayload.SessionKey ?? route.sessionKey;
// Resolve agent identity for Slack chat:write.customize overrides.
const outboundIdentity = resolveAgentOutboundIdentity(cfg, route.agentId);
@@ -470,6 +473,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
let streamedSentHookContent = "";
let streamedFallbackSentHookContent = "";
let streamedSentHookEmitted = false;
let usedReplyThreadTs: string | undefined;
let usedBlockReplyThreadTs: string | undefined;
let observedReplyDelivery = false;
@@ -518,12 +524,16 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
sessionKeyForInternalHooks: sessionKeyForSentHooks,
runtime,
textLimit: ctx.textLimit,
replyThreadTs: session.threadTs,
replyToMode: prepared.replyToMode,
...(slackIdentity ? { identity: slackIdentity } : {}),
});
streamedFallbackSentHookContent = streamedFallbackSentHookContent
? `${streamedFallbackSentHookContent}\n${fallbackText}`
: fallbackText;
markSlackStreamFallbackDelivered(session);
observedReplyDelivery = true;
usedReplyThreadTs ??= session.threadTs;
@@ -563,6 +573,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
sessionKeyForInternalHooks: sessionKeyForSentHooks,
runtime,
textLimit: ctx.textLimit,
replyThreadTs,
@@ -607,6 +618,38 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
return true;
};
const emitNativeStreamSentHook = (
session: SlackStreamSession,
params?: { pendingText?: string },
): void => {
if (streamedSentHookEmitted || !session.delivered) {
return;
}
let content = streamedSentHookContent.trim();
const pendingText = (params?.pendingText ?? streamedFallbackSentHookContent).trim();
if (pendingText) {
const maxOverlap = Math.min(content.length, pendingText.length);
for (let length = maxOverlap; length > 0; length -= 1) {
const fallbackPrefix = pendingText.slice(0, length);
if (content === fallbackPrefix || content.endsWith(`\n${fallbackPrefix}`)) {
content = content.slice(0, -length).trim();
break;
}
}
}
if (!content) {
return;
}
streamedSentHookEmitted = true;
emitSlackMessageSentHooks({
sessionKeyForInternalHooks: sessionKeyForSentHooks,
target: prepared.replyTarget,
accountId: account.accountId,
content,
success: true,
});
};
const deliverWithStreaming = async (params: {
payload: ReplyPayload;
kind: ReplyDispatchKind;
@@ -671,6 +714,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}),
userId: message.user,
});
streamedSentHookContent = text;
// startSlackStream may only buffer locally. Count delivery only after
// the SDK reports a real Slack response.
if (streamSession.delivered) {
@@ -702,6 +746,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
session: streamSession,
text: "\n" + text,
});
streamedSentHookContent += "\n" + text;
// appendSlackStream also buffers locally below the SDK threshold; avoid
// optimistic "done" status until Slack acknowledges a flush.
if (streamSession.delivered) {
@@ -835,8 +880,16 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
kind: info.kind,
});
},
onPreviewFinalized: (_preview) => {
onPreviewFinalized: (preview) => {
const finalThreadTs = usedReplyThreadTs ?? statusThreadTs;
emitSlackMessageSentHooks({
sessionKeyForInternalHooks: sessionKeyForSentHooks,
target: prepared.replyTarget,
accountId: account.accountId,
content: resolveSlackSentHookContent(trimmedFinalText, slackBlocks),
success: true,
messageId: preview.messageId,
});
observedReplyDelivery = true;
replyPlan.markSent();
deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs });
@@ -1227,8 +1280,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
if (finalStream && !finalStream.stopped) {
try {
await stopSlackStream({ session: finalStream });
emitNativeStreamSentHook(finalStream);
} catch (err) {
if (err instanceof SlackStreamNotDeliveredError) {
emitNativeStreamSentHook(finalStream, { pendingText: err.pendingText });
streamFallbackDelivered = await deliverPendingStreamFallback(finalStream, err);
} else {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));

View File

@@ -1,9 +1,29 @@
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const sendMock = vi.fn();
const messageHookRunner = vi.hoisted(() => ({
hasHooks: vi.fn((_name?: string) => false),
runMessageSent: vi.fn(async () => {}),
}));
const triggerInternalHook = vi.hoisted(() => vi.fn(async () => {}));
vi.mock("../send.js", () => ({
sendMessageSlack: (...args: unknown[]) => sendMock(...args),
}));
vi.mock("openclaw/plugin-sdk/hook-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/hook-runtime")>();
return {
...actual,
triggerInternalHook,
};
});
vi.mock("openclaw/plugin-sdk/plugin-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/plugin-runtime")>();
return {
...actual,
getGlobalHookRunner: () => messageHookRunner,
};
});
let deliverReplies: typeof import("./replies.js").deliverReplies;
let createSlackReplyDeliveryPlan: typeof import("./replies.js").createSlackReplyDeliveryPlan;
@@ -38,6 +58,10 @@ describe("deliverReplies identity passthrough", () => {
beforeEach(() => {
sendMock.mockReset();
messageHookRunner.hasHooks.mockReset();
messageHookRunner.hasHooks.mockReturnValue(false);
messageHookRunner.runMessageSent.mockReset();
triggerInternalHook.mockReset();
});
it("passes identity to sendMessageSlack for text replies", async () => {
sendMock.mockResolvedValue(undefined);
@@ -111,6 +135,44 @@ describe("deliverReplies identity passthrough", () => {
);
});
it("uses Slack block fallback text for block-only message_sent content", async () => {
messageHookRunner.hasHooks.mockImplementation((name?: string) => name === "message_sent");
sendMock.mockResolvedValue({ messageId: "1712345678.123456", channelId: "C123" });
const blocks = [
{
type: "section",
text: { type: "mrkdwn", text: "*Choose* a deployment target" },
},
];
await deliverReplies(
baseParams({
replies: [
{
text: "",
channelData: {
slack: {
blocks,
},
},
},
],
}),
);
expect(messageHookRunner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({
content: "*Choose* a deployment target",
success: true,
messageId: "1712345678.123456",
}),
expect.objectContaining({
channelId: "slack",
messageId: "1712345678.123456",
}),
);
});
it("renders interactive replies into Slack blocks during delivery", async () => {
sendMock.mockResolvedValue(undefined);
@@ -151,6 +213,94 @@ describe("deliverReplies identity passthrough", () => {
});
});
it("emits message_sent success with delivered Slack message id and session context", async () => {
messageHookRunner.hasHooks.mockImplementation((name?: string) => name === "message_sent");
sendMock.mockResolvedValue({ messageId: "1712345678.123456", channelId: "C123" });
await deliverReplies(
baseParams({
accountId: "work",
sessionKeyForInternalHooks: "agent:test:slack:channel:C123",
}),
);
expect(messageHookRunner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({
to: "C123",
content: "hello",
success: true,
messageId: "1712345678.123456",
sessionKey: "agent:test:slack:channel:C123",
}),
expect.objectContaining({
channelId: "slack",
accountId: "work",
conversationId: "C123",
sessionKey: "agent:test:slack:channel:C123",
messageId: "1712345678.123456",
}),
);
});
it("emits internal message:sent when session hook context is available", async () => {
sendMock.mockResolvedValue({ messageId: "1712345678.123456", channelId: "C123" });
await deliverReplies(
baseParams({
accountId: "work",
sessionKeyForInternalHooks: "agent:test:slack:channel:C123",
}),
);
expect(messageHookRunner.runMessageSent).not.toHaveBeenCalled();
expect(triggerInternalHook).toHaveBeenCalledWith(
expect.objectContaining({
type: "message",
action: "sent",
sessionKey: "agent:test:slack:channel:C123",
context: expect.objectContaining({
to: "C123",
content: "hello",
success: true,
channelId: "slack",
accountId: "work",
conversationId: "C123",
messageId: "1712345678.123456",
}),
}),
);
});
it("emits message_sent failure payload before rethrowing Slack send errors", async () => {
messageHookRunner.hasHooks.mockImplementation((name?: string) => name === "message_sent");
sendMock.mockRejectedValue(new Error("slack down"));
await expect(
deliverReplies(
baseParams({
accountId: "work",
sessionKeyForInternalHooks: "agent:test:slack:channel:C123",
}),
),
).rejects.toThrow("slack down");
expect(messageHookRunner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({
to: "C123",
content: "hello",
success: false,
error: "slack down",
sessionKey: "agent:test:slack:channel:C123",
}),
expect.objectContaining({
channelId: "slack",
accountId: "work",
conversationId: "C123",
sessionKey: "agent:test:slack:channel:C123",
}),
);
});
it("rejects replies when merged Slack blocks exceed the platform limit", async () => {
sendMock.mockResolvedValue(undefined);

View File

@@ -1,4 +1,15 @@
import type { MarkdownTableMode, OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
buildCanonicalSentMessageHookContext,
createInternalHookEvent,
fireAndForgetHook,
toInternalMessageSentContext,
toPluginMessageContext,
toPluginMessageSentEvent,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime";
import {
chunkMarkdownTextWithMode,
isSilentReplyText,
@@ -12,11 +23,75 @@ import {
} from "openclaw/plugin-sdk/reply-payload";
import { createReplyReferencePlanner } from "openclaw/plugin-sdk/reply-reference";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { buildSlackBlocksFallbackText } from "../blocks-fallback.js";
import { markdownToSlackMrkdwnChunks } from "../format.js";
import { SLACK_TEXT_LIMIT } from "../limits.js";
import { resolveSlackReplyBlocks } from "../reply-blocks.js";
import { truncateSlackText } from "../truncate.js";
import { sendMessageSlack, type SlackSendIdentity } from "./send.runtime.js";
export type SlackMessageSentHookParams = {
sessionKeyForInternalHooks?: string;
target: string;
accountId?: string;
content: string;
success: boolean;
error?: string;
messageId?: string;
};
export function resolveSlackSentHookContent(
content: string,
slackBlocks?: ReturnType<typeof readSlackReplyBlocks>,
): string {
const fallback = slackBlocks?.length ? buildSlackBlocksFallbackText(slackBlocks) : "";
return truncateSlackText(content || fallback, SLACK_TEXT_LIMIT);
}
export function emitSlackMessageSentHooks(params: SlackMessageSentHookParams): void {
const hookRunner = getGlobalHookRunner();
const hasMessageSentHooks = hookRunner?.hasHooks("message_sent") ?? false;
if (!hasMessageSentHooks && !params.sessionKeyForInternalHooks) {
return;
}
const canonical = buildCanonicalSentMessageHookContext({
to: params.target,
content: params.content,
success: params.success,
error: params.error,
channelId: "slack",
accountId: params.accountId,
conversationId: params.target,
sessionKey: params.sessionKeyForInternalHooks,
messageId: params.messageId,
});
if (hasMessageSentHooks) {
fireAndForgetHook(
Promise.resolve(
hookRunner!.runMessageSent(
toPluginMessageSentEvent(canonical),
toPluginMessageContext(canonical),
),
),
"slack: message_sent plugin hook failed",
);
}
if (!params.sessionKeyForInternalHooks) {
return;
}
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent(
"message",
"sent",
params.sessionKeyForInternalHooks,
toInternalMessageSentContext(canonical),
),
),
"slack: message:sent internal hook failed",
);
}
export function readSlackReplyBlocks(payload: ReplyPayload) {
return resolveSlackReplyBlocks(payload);
}
@@ -38,12 +113,41 @@ export async function deliverReplies(params: {
target: string;
token: string;
accountId?: string;
sessionKeyForInternalHooks?: string;
runtime: RuntimeEnv;
textLimit: number;
replyThreadTs?: string;
replyToMode: "off" | "first" | "all" | "batched";
identity?: SlackSendIdentity;
}) {
const sendWithSentHook = async (
content: string,
opts: Parameters<typeof sendMessageSlack>[2],
hookContent = content,
) => {
try {
const result = await sendMessageSlack(params.target, content, opts);
emitSlackMessageSentHooks({
sessionKeyForInternalHooks: params.sessionKeyForInternalHooks,
target: params.target,
accountId: params.accountId,
content: hookContent,
success: true,
messageId: result?.messageId,
});
return result;
} catch (err) {
emitSlackMessageSentHooks({
sessionKeyForInternalHooks: params.sessionKeyForInternalHooks,
target: params.target,
accountId: params.accountId,
content: hookContent,
success: false,
error: formatErrorMessage(err),
});
throw err;
}
};
for (const payload of params.replies) {
const threadTs = resolveDeliveredSlackReplyThreadTs({
replyToMode: params.replyToMode,
@@ -64,14 +168,18 @@ export async function deliverReplies(params: {
if (trimmed && isSilentReplyText(trimmed, SILENT_REPLY_TOKEN)) {
continue;
}
await sendMessageSlack(params.target, trimmed, {
cfg: params.cfg,
token: params.token,
threadTs,
accountId: params.accountId,
...(slackBlocks?.length ? { blocks: slackBlocks } : {}),
...(params.identity ? { identity: params.identity } : {}),
});
await sendWithSentHook(
trimmed,
{
cfg: params.cfg,
token: params.token,
threadTs,
accountId: params.accountId,
...(slackBlocks?.length ? { blocks: slackBlocks } : {}),
...(params.identity ? { identity: params.identity } : {}),
},
resolveSlackSentHookContent(trimmed, slackBlocks),
);
params.runtime.log?.(`delivered reply to ${params.target}`);
continue;
}
@@ -89,7 +197,7 @@ export async function deliverReplies(params: {
}
: undefined,
sendText: async (trimmed) => {
await sendMessageSlack(params.target, trimmed, {
await sendWithSentHook(trimmed, {
cfg: params.cfg,
token: params.token,
threadTs,
@@ -98,7 +206,7 @@ export async function deliverReplies(params: {
});
},
sendMedia: async ({ mediaUrl, caption }) => {
await sendMessageSlack(params.target, caption ?? "", {
await sendWithSentHook(caption ?? "", {
cfg: params.cfg,
token: params.token,
mediaUrl,

View File

@@ -1749,12 +1749,21 @@ describe("deliverOutboundPayloads", () => {
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
session: { key: "agent:test:matrix:room" },
deps: { matrix: sendMatrix },
});
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({ to: "!room:example", content: "hello", success: true }),
expect.objectContaining({ channelId: "matrix" }),
expect.objectContaining({
to: "!room:example",
content: "hello",
success: true,
sessionKey: "agent:test:matrix:room",
}),
expect.objectContaining({
channelId: "matrix",
sessionKey: "agent:test:matrix:room",
}),
);
});

View File

@@ -698,6 +698,7 @@ function createMessageSentEmitter(params: {
channelId: params.channel,
accountId: params.accountId ?? undefined,
conversationId: params.to,
sessionKey: params.sessionKeyForInternalHooks,
messageId: event.messageId,
isGroup: params.mirrorIsGroup,
groupId: params.mirrorGroupId,