mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
refactor(plugin-sdk): share direct dm ingress helpers
This commit is contained in:
@@ -36,6 +36,7 @@ function createRuntimeHarness() {
|
||||
commands: {
|
||||
shouldComputeCommandAuthorized: vi.fn(() => true),
|
||||
},
|
||||
resolveCommandAuthorizedFromAuthorizers: vi.fn(() => true),
|
||||
routing: {
|
||||
resolveAgentRoute: vi.fn(({ accountId, peer }) => ({
|
||||
agentId: "agent-nostr",
|
||||
@@ -45,6 +46,7 @@ function createRuntimeHarness() {
|
||||
},
|
||||
session: {
|
||||
resolveStorePath: vi.fn(() => "/tmp/nostr-session-store"),
|
||||
readSessionUpdatedAt: vi.fn(() => undefined),
|
||||
recordInboundSession,
|
||||
},
|
||||
reply: {
|
||||
|
||||
@@ -4,22 +4,19 @@ import {
|
||||
} from "openclaw/plugin-sdk/channel-config-helpers";
|
||||
import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing";
|
||||
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
|
||||
import { resolveCommandAuthorizedFromAuthorizers } from "openclaw/plugin-sdk/command-auth";
|
||||
import {
|
||||
buildPassiveChannelStatusSummary,
|
||||
buildTrafficStatusSummary,
|
||||
} from "openclaw/plugin-sdk/extension-shared";
|
||||
import {
|
||||
readStoreAllowFromForDmPolicy,
|
||||
resolveDmGroupAccessWithLists,
|
||||
} from "openclaw/plugin-sdk/security-runtime";
|
||||
import {
|
||||
buildChannelConfigSchema,
|
||||
createChannelReplyPipeline,
|
||||
collectStatusIssuesFromLastError,
|
||||
createPreCryptoDirectDmAuthorizer,
|
||||
createDefaultChannelRuntimeState,
|
||||
DEFAULT_ACCOUNT_ID,
|
||||
dispatchInboundDirectDmWithRuntime,
|
||||
formatPairingApproveHint,
|
||||
resolveInboundDirectDmAccessWithRuntime,
|
||||
type ChannelPlugin,
|
||||
} from "../runtime-api.js";
|
||||
import type { NostrProfile } from "./config-schema.js";
|
||||
@@ -73,27 +70,25 @@ function isNostrSenderAllowed(senderPubkey: string, allowFrom: string[]): boolea
|
||||
}
|
||||
|
||||
async function resolveNostrDirectAccess(params: {
|
||||
cfg: Parameters<typeof resolveInboundDirectDmAccessWithRuntime>[0]["cfg"];
|
||||
accountId: string;
|
||||
dmPolicy: "pairing" | "allowlist" | "open" | "disabled";
|
||||
allowFrom: Array<string | number> | undefined;
|
||||
senderPubkey: string;
|
||||
rawBody: string;
|
||||
runtime: Parameters<typeof resolveInboundDirectDmAccessWithRuntime>[0]["runtime"];
|
||||
}) {
|
||||
const storeAllowFrom =
|
||||
params.dmPolicy === "pairing"
|
||||
? await readStoreAllowFromForDmPolicy({
|
||||
provider: "nostr",
|
||||
accountId: params.accountId,
|
||||
dmPolicy: params.dmPolicy,
|
||||
})
|
||||
: [];
|
||||
|
||||
return resolveDmGroupAccessWithLists({
|
||||
isGroup: false,
|
||||
return resolveInboundDirectDmAccessWithRuntime({
|
||||
cfg: params.cfg,
|
||||
channel: "nostr",
|
||||
accountId: params.accountId,
|
||||
dmPolicy: params.dmPolicy,
|
||||
allowFrom: params.allowFrom,
|
||||
storeAllowFrom,
|
||||
groupAllowFromFallbackToAllowFrom: false,
|
||||
isSenderAllowed: (allowEntries) => isNostrSenderAllowed(params.senderPubkey, allowEntries),
|
||||
senderId: params.senderPubkey,
|
||||
rawBody: params.rawBody,
|
||||
isSenderAllowed: isNostrSenderAllowed,
|
||||
runtime: params.runtime,
|
||||
modeWhenAccessGroupsOff: "configured",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -288,170 +283,103 @@ export const nostrPlugin: ChannelPlugin<ResolvedNostrAccount> = {
|
||||
channel: "nostr",
|
||||
accountId: account.accountId,
|
||||
});
|
||||
const resolveInboundAccess = async (senderPubkey: string, rawBody: string) =>
|
||||
await resolveNostrDirectAccess({
|
||||
cfg: ctx.cfg,
|
||||
accountId: account.accountId,
|
||||
dmPolicy: account.config.dmPolicy ?? "pairing",
|
||||
allowFrom: account.config.allowFrom,
|
||||
senderPubkey,
|
||||
rawBody,
|
||||
runtime: {
|
||||
shouldComputeCommandAuthorized: runtime.channel.commands.shouldComputeCommandAuthorized,
|
||||
resolveCommandAuthorizedFromAuthorizers:
|
||||
runtime.channel.commands.resolveCommandAuthorizedFromAuthorizers,
|
||||
},
|
||||
});
|
||||
|
||||
// Track bus handle for metrics callback
|
||||
let busHandle: NostrBusHandle | null = null;
|
||||
|
||||
const authorizeSender = createPreCryptoDirectDmAuthorizer({
|
||||
resolveAccess: async (senderPubkey) => await resolveInboundAccess(senderPubkey, ""),
|
||||
issuePairingChallenge: async ({ senderId, reply }) => {
|
||||
await pairing.issueChallenge({
|
||||
senderId,
|
||||
senderIdLine: `Your Nostr pubkey: ${senderId}`,
|
||||
sendPairingReply: reply,
|
||||
onCreated: () => {
|
||||
ctx.log?.debug?.(`[${account.accountId}] nostr pairing request sender=${senderId}`);
|
||||
},
|
||||
onReplyError: (err) => {
|
||||
ctx.log?.warn?.(
|
||||
`[${account.accountId}] nostr pairing reply failed for ${senderId}: ${String(err)}`,
|
||||
);
|
||||
},
|
||||
});
|
||||
},
|
||||
onBlocked: ({ senderId, reason }) => {
|
||||
ctx.log?.debug?.(`[${account.accountId}] blocked Nostr sender ${senderId} (${reason})`);
|
||||
},
|
||||
});
|
||||
|
||||
const bus = await startNostrBus({
|
||||
accountId: account.accountId,
|
||||
privateKey: account.privateKey,
|
||||
relays: account.relays,
|
||||
authorizeSender: async ({ senderPubkey, reply }) => {
|
||||
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
||||
const access = await resolveNostrDirectAccess({
|
||||
accountId: account.accountId,
|
||||
dmPolicy,
|
||||
allowFrom: account.config.allowFrom,
|
||||
senderPubkey,
|
||||
});
|
||||
if (access.decision === "allow") {
|
||||
return "allow";
|
||||
}
|
||||
if (access.decision === "pairing") {
|
||||
await pairing.issueChallenge({
|
||||
senderId: senderPubkey,
|
||||
senderIdLine: `Your Nostr pubkey: ${senderPubkey}`,
|
||||
sendPairingReply: reply,
|
||||
onCreated: () => {
|
||||
ctx.log?.debug?.(
|
||||
`[${account.accountId}] nostr pairing request sender=${senderPubkey}`,
|
||||
);
|
||||
},
|
||||
onReplyError: (err) => {
|
||||
ctx.log?.warn?.(
|
||||
`[${account.accountId}] nostr pairing reply failed for ${senderPubkey}: ${String(err)}`,
|
||||
);
|
||||
},
|
||||
});
|
||||
return "pairing";
|
||||
}
|
||||
ctx.log?.debug?.(
|
||||
`[${account.accountId}] blocked Nostr sender ${senderPubkey} (${access.reason})`,
|
||||
);
|
||||
return "block";
|
||||
},
|
||||
authorizeSender: async ({ senderPubkey, reply }) =>
|
||||
await authorizeSender({ senderId: senderPubkey, reply }),
|
||||
onMessage: async (senderPubkey, text, reply, meta) => {
|
||||
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
||||
const access = await resolveNostrDirectAccess({
|
||||
accountId: account.accountId,
|
||||
dmPolicy,
|
||||
allowFrom: account.config.allowFrom,
|
||||
senderPubkey,
|
||||
});
|
||||
if (access.decision !== "allow") {
|
||||
const resolvedAccess = await resolveInboundAccess(senderPubkey, text);
|
||||
if (resolvedAccess.access.decision !== "allow") {
|
||||
ctx.log?.warn?.(
|
||||
`[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${access.reason})`,
|
||||
`[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.access.reason})`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const shouldComputeAuth = runtime.channel.commands.shouldComputeCommandAuthorized(
|
||||
text,
|
||||
ctx.cfg,
|
||||
);
|
||||
const senderAllowedForCommands = isNostrSenderAllowed(
|
||||
senderPubkey,
|
||||
access.effectiveAllowFrom,
|
||||
);
|
||||
const commandAuthorized = shouldComputeAuth
|
||||
? dmPolicy === "open"
|
||||
? true
|
||||
: resolveCommandAuthorizedFromAuthorizers({
|
||||
useAccessGroups: ctx.cfg.commands?.useAccessGroups !== false,
|
||||
authorizers: [
|
||||
{
|
||||
configured: access.effectiveAllowFrom.length > 0,
|
||||
allowed: senderAllowedForCommands,
|
||||
},
|
||||
],
|
||||
modeWhenAccessGroupsOff: "configured",
|
||||
})
|
||||
: undefined;
|
||||
|
||||
const route = runtime.channel.routing.resolveAgentRoute({
|
||||
await dispatchInboundDirectDmWithRuntime({
|
||||
cfg: ctx.cfg,
|
||||
runtime,
|
||||
channel: "nostr",
|
||||
channelLabel: "Nostr",
|
||||
accountId: account.accountId,
|
||||
peer: {
|
||||
kind: "direct",
|
||||
id: senderPubkey,
|
||||
},
|
||||
});
|
||||
const storePath = runtime.channel.session.resolveStorePath(ctx.cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
});
|
||||
const body = runtime.channel.reply.formatAgentEnvelope({
|
||||
channel: "Nostr",
|
||||
from: senderPubkey,
|
||||
senderId: senderPubkey,
|
||||
senderAddress: `nostr:${senderPubkey}`,
|
||||
recipientAddress: `nostr:${account.publicKey}`,
|
||||
conversationLabel: senderPubkey,
|
||||
rawBody: text,
|
||||
messageId: meta.eventId,
|
||||
timestamp: meta.createdAt * 1000,
|
||||
envelope: runtime.channel.reply.resolveEnvelopeFormatOptions(ctx.cfg),
|
||||
body: text,
|
||||
});
|
||||
const ctxPayload = runtime.channel.reply.finalizeInboundContext({
|
||||
Body: body,
|
||||
BodyForAgent: text,
|
||||
RawBody: text,
|
||||
CommandBody: text,
|
||||
From: `nostr:${senderPubkey}`,
|
||||
To: `nostr:${account.publicKey}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: "direct",
|
||||
ConversationLabel: senderPubkey,
|
||||
SenderId: senderPubkey,
|
||||
Provider: "nostr",
|
||||
Surface: "nostr",
|
||||
MessageSid: meta.eventId,
|
||||
MessageSidFull: meta.eventId,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
OriginatingChannel: "nostr",
|
||||
OriginatingTo: `nostr:${account.publicKey}`,
|
||||
});
|
||||
await runtime.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
|
||||
ctx: ctxPayload,
|
||||
commandAuthorized: resolvedAccess.commandAuthorized,
|
||||
deliver: async (payload) => {
|
||||
const outboundText =
|
||||
payload && typeof payload === "object" && "text" in payload
|
||||
? String((payload as { text?: string }).text ?? "")
|
||||
: "";
|
||||
if (!outboundText.trim()) {
|
||||
return;
|
||||
}
|
||||
const tableMode = runtime.channel.text.resolveMarkdownTableMode({
|
||||
cfg: ctx.cfg,
|
||||
channel: "nostr",
|
||||
accountId: account.accountId,
|
||||
});
|
||||
await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode));
|
||||
},
|
||||
onRecordError: (err) => {
|
||||
ctx.log?.error?.(
|
||||
`[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`,
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
|
||||
cfg: ctx.cfg,
|
||||
agentId: route.agentId,
|
||||
channel: "nostr",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
cfg: ctx.cfg,
|
||||
dispatcherOptions: {
|
||||
...replyPipeline,
|
||||
deliver: async (payload) => {
|
||||
const outboundText =
|
||||
payload && typeof payload === "object" && "text" in payload
|
||||
? String((payload as { text?: string }).text ?? "")
|
||||
: "";
|
||||
if (!outboundText.trim()) {
|
||||
return;
|
||||
}
|
||||
const tableMode = runtime.channel.text.resolveMarkdownTableMode({
|
||||
cfg: ctx.cfg,
|
||||
channel: "nostr",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode));
|
||||
},
|
||||
onError: (err, info) => {
|
||||
ctx.log?.error?.(
|
||||
`[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`,
|
||||
);
|
||||
},
|
||||
},
|
||||
replyOptions: {
|
||||
onModelSelected,
|
||||
onDispatchError: (err, info) => {
|
||||
ctx.log?.error?.(
|
||||
`[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`,
|
||||
);
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
@@ -7,6 +7,10 @@ import {
|
||||
type Event,
|
||||
} from "nostr-tools";
|
||||
import { decrypt, encrypt } from "nostr-tools/nip04";
|
||||
import {
|
||||
createDirectDmPreCryptoGuardPolicy,
|
||||
type DirectDmPreCryptoGuardPolicyOverrides,
|
||||
} from "../runtime-api.js";
|
||||
import type { NostrProfile } from "./config-schema.js";
|
||||
import { DEFAULT_RELAYS } from "./default-relays.js";
|
||||
import {
|
||||
@@ -33,13 +37,7 @@ import { createSeenTracker, type SeenTracker } from "./seen-tracker.js";
|
||||
const STARTUP_LOOKBACK_SEC = 120; // tolerate relay lag / clock skew
|
||||
const MAX_PERSISTED_EVENT_IDS = 5000;
|
||||
const STATE_PERSIST_DEBOUNCE_MS = 5000; // Debounce state writes
|
||||
const MAX_EVENT_FUTURE_SKEW_SEC = 120;
|
||||
const MAX_CIPHERTEXT_BYTES = 16 * 1024;
|
||||
const MAX_PLAINTEXT_BYTES = 8 * 1024;
|
||||
const RATE_LIMIT_WINDOW_MS = 60_000;
|
||||
const MAX_EVENTS_PER_SENDER_PER_WINDOW = 20;
|
||||
const MAX_EVENTS_GLOBAL_PER_WINDOW = 200;
|
||||
const MAX_TRACKED_RATE_LIMIT_KEYS = 4096;
|
||||
const DEFAULT_INBOUND_GUARD_POLICY = createDirectDmPreCryptoGuardPolicy();
|
||||
|
||||
// Circuit breaker configuration
|
||||
const CIRCUIT_BREAKER_THRESHOLD = 5; // failures before opening
|
||||
@@ -71,6 +69,8 @@ export interface NostrBusOptions {
|
||||
senderPubkey: string;
|
||||
reply: (text: string) => Promise<void>;
|
||||
}) => Promise<"allow" | "block" | "pairing">;
|
||||
/** Override pre-crypto DM guardrails for tests or future channel tuning (optional) */
|
||||
guardPolicy?: DirectDmPreCryptoGuardPolicyOverrides;
|
||||
/** Called on errors (optional) */
|
||||
onError?: (error: Error, context: string) => void;
|
||||
/** Called on connection status changes (optional) */
|
||||
@@ -404,6 +404,14 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
||||
const pool = new SimplePool();
|
||||
const accountId = options.accountId ?? pk.slice(0, 16);
|
||||
const gatewayStartedAt = Math.floor(Date.now() / 1000);
|
||||
const guardPolicy = createDirectDmPreCryptoGuardPolicy({
|
||||
...DEFAULT_INBOUND_GUARD_POLICY,
|
||||
...options.guardPolicy,
|
||||
rateLimit: {
|
||||
...DEFAULT_INBOUND_GUARD_POLICY.rateLimit,
|
||||
...options.guardPolicy?.rateLimit,
|
||||
},
|
||||
});
|
||||
|
||||
// Initialize metrics
|
||||
const metrics = onMetric ? createMetrics(onMetric) : createNoopMetrics();
|
||||
@@ -467,13 +475,13 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
||||
|
||||
const inflight = new Set<string>();
|
||||
const perSenderRateLimiter = createFixedWindowRateLimiter({
|
||||
windowMs: RATE_LIMIT_WINDOW_MS,
|
||||
maxRequests: MAX_EVENTS_PER_SENDER_PER_WINDOW,
|
||||
maxTrackedKeys: MAX_TRACKED_RATE_LIMIT_KEYS,
|
||||
windowMs: guardPolicy.rateLimit.windowMs,
|
||||
maxRequests: guardPolicy.rateLimit.maxPerSenderPerWindow,
|
||||
maxTrackedKeys: guardPolicy.rateLimit.maxTrackedSenderKeys,
|
||||
});
|
||||
const globalRateLimiter = createFixedWindowRateLimiter({
|
||||
windowMs: RATE_LIMIT_WINDOW_MS,
|
||||
maxRequests: MAX_EVENTS_GLOBAL_PER_WINDOW,
|
||||
windowMs: guardPolicy.rateLimit.windowMs,
|
||||
maxRequests: guardPolicy.rateLimit.maxGlobalPerWindow,
|
||||
maxTrackedKeys: 1,
|
||||
});
|
||||
|
||||
@@ -508,12 +516,12 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.created_at > Math.floor(Date.now() / 1000) + MAX_EVENT_FUTURE_SKEW_SEC) {
|
||||
if (event.created_at > Math.floor(Date.now() / 1000) + guardPolicy.maxFutureSkewSec) {
|
||||
metrics.emit("event.rejected.future");
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.kind !== 4) {
|
||||
if (!guardPolicy.allowedKinds.includes(event.kind)) {
|
||||
metrics.emit("event.rejected.wrong_kind");
|
||||
return;
|
||||
}
|
||||
@@ -570,7 +578,7 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
||||
}
|
||||
updateRateLimiterSizeMetric();
|
||||
|
||||
if (Buffer.byteLength(event.content, "utf8") > MAX_CIPHERTEXT_BYTES) {
|
||||
if (Buffer.byteLength(event.content, "utf8") > guardPolicy.maxCiphertextBytes) {
|
||||
metrics.emit("event.rejected.oversized_ciphertext");
|
||||
return;
|
||||
}
|
||||
@@ -598,7 +606,7 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
||||
return;
|
||||
}
|
||||
|
||||
if (Buffer.byteLength(plaintext, "utf8") > MAX_PLAINTEXT_BYTES) {
|
||||
if (Buffer.byteLength(plaintext, "utf8") > guardPolicy.maxPlaintextBytes) {
|
||||
metrics.emit("event.rejected.oversized_plaintext");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,12 @@ export {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../auto-reply/inbound-debounce.js";
|
||||
export {
|
||||
createDirectDmPreCryptoGuardPolicy,
|
||||
dispatchInboundDirectDmWithRuntime,
|
||||
type DirectDmPreCryptoGuardPolicy,
|
||||
type DirectDmPreCryptoGuardPolicyOverrides,
|
||||
} from "./direct-dm.js";
|
||||
export {
|
||||
formatInboundEnvelope,
|
||||
formatInboundFromLabel,
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resolveDmGroupAccessWithLists } from "../security/dm-policy-shared.js";
|
||||
export {
|
||||
createPreCryptoDirectDmAuthorizer,
|
||||
resolveInboundDirectDmAccessWithRuntime,
|
||||
type DirectDmCommandAuthorizationRuntime,
|
||||
type ResolvedInboundDirectDmAccess,
|
||||
} from "./direct-dm.js";
|
||||
|
||||
export {
|
||||
hasControlCommand,
|
||||
|
||||
169
src/plugin-sdk/direct-dm.test.ts
Normal file
169
src/plugin-sdk/direct-dm.test.ts
Normal file
@@ -0,0 +1,169 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import {
|
||||
createDirectDmPreCryptoGuardPolicy,
|
||||
createPreCryptoDirectDmAuthorizer,
|
||||
dispatchInboundDirectDmWithRuntime,
|
||||
resolveInboundDirectDmAccessWithRuntime,
|
||||
} from "./direct-dm.js";
|
||||
|
||||
const baseCfg = {
|
||||
commands: { useAccessGroups: true },
|
||||
} as unknown as OpenClawConfig;
|
||||
|
||||
describe("plugin-sdk/direct-dm", () => {
|
||||
it("resolves inbound DM access and command auth through one helper", async () => {
|
||||
const result = await resolveInboundDirectDmAccessWithRuntime({
|
||||
cfg: baseCfg,
|
||||
channel: "nostr",
|
||||
accountId: "default",
|
||||
dmPolicy: "pairing",
|
||||
allowFrom: [],
|
||||
senderId: "paired-user",
|
||||
rawBody: "/status",
|
||||
isSenderAllowed: (senderId, allowFrom) => allowFrom.includes(senderId),
|
||||
readStoreAllowFrom: async () => ["paired-user"],
|
||||
runtime: {
|
||||
shouldComputeCommandAuthorized: () => true,
|
||||
resolveCommandAuthorizedFromAuthorizers: ({ authorizers }) =>
|
||||
authorizers.some((entry) => entry.configured && entry.allowed),
|
||||
},
|
||||
modeWhenAccessGroupsOff: "configured",
|
||||
});
|
||||
|
||||
expect(result.access.decision).toBe("allow");
|
||||
expect(result.access.effectiveAllowFrom).toEqual(["paired-user"]);
|
||||
expect(result.senderAllowedForCommands).toBe(true);
|
||||
expect(result.commandAuthorized).toBe(true);
|
||||
});
|
||||
|
||||
it("creates a pre-crypto authorizer that issues pairing and blocks unknown senders", async () => {
|
||||
const issuePairingChallenge = vi.fn(async () => {});
|
||||
const onBlocked = vi.fn();
|
||||
const authorizer = createPreCryptoDirectDmAuthorizer({
|
||||
resolveAccess: async (senderId) => ({
|
||||
access:
|
||||
senderId === "pair-me"
|
||||
? {
|
||||
decision: "pairing" as const,
|
||||
reasonCode: "dm_policy_pairing_required",
|
||||
reason: "dmPolicy=pairing (not allowlisted)",
|
||||
effectiveAllowFrom: [],
|
||||
}
|
||||
: {
|
||||
decision: "block" as const,
|
||||
reasonCode: "dm_policy_disabled",
|
||||
reason: "dmPolicy=disabled",
|
||||
effectiveAllowFrom: [],
|
||||
},
|
||||
}),
|
||||
issuePairingChallenge,
|
||||
onBlocked,
|
||||
});
|
||||
|
||||
await expect(
|
||||
authorizer({
|
||||
senderId: "pair-me",
|
||||
reply: async () => {},
|
||||
}),
|
||||
).resolves.toBe("pairing");
|
||||
await expect(
|
||||
authorizer({
|
||||
senderId: "blocked",
|
||||
reply: async () => {},
|
||||
}),
|
||||
).resolves.toBe("block");
|
||||
|
||||
expect(issuePairingChallenge).toHaveBeenCalledTimes(1);
|
||||
expect(onBlocked).toHaveBeenCalledWith({
|
||||
senderId: "blocked",
|
||||
reason: "dmPolicy=disabled",
|
||||
reasonCode: "dm_policy_disabled",
|
||||
});
|
||||
});
|
||||
|
||||
it("builds a shared pre-crypto guard policy with partial overrides", () => {
|
||||
const policy = createDirectDmPreCryptoGuardPolicy({
|
||||
maxFutureSkewSec: 30,
|
||||
rateLimit: {
|
||||
maxPerSenderPerWindow: 5,
|
||||
},
|
||||
});
|
||||
|
||||
expect(policy.allowedKinds).toEqual([4]);
|
||||
expect(policy.maxFutureSkewSec).toBe(30);
|
||||
expect(policy.maxCiphertextBytes).toBe(16 * 1024);
|
||||
expect(policy.rateLimit.maxPerSenderPerWindow).toBe(5);
|
||||
expect(policy.rateLimit.maxGlobalPerWindow).toBe(200);
|
||||
});
|
||||
|
||||
it("dispatches direct DMs through the standard route/session/reply pipeline", async () => {
|
||||
const recordInboundSession = vi.fn(async () => {});
|
||||
const dispatchReplyWithBufferedBlockDispatcher = vi.fn(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "reply text" });
|
||||
});
|
||||
const deliver = vi.fn(async () => {});
|
||||
|
||||
const result = await dispatchInboundDirectDmWithRuntime({
|
||||
cfg: {
|
||||
session: { store: { type: "jsonl" } },
|
||||
} as never,
|
||||
runtime: {
|
||||
channel: {
|
||||
routing: {
|
||||
resolveAgentRoute: vi.fn(({ accountId, peer }) => ({
|
||||
agentId: "agent-main",
|
||||
accountId,
|
||||
sessionKey: `dm:${peer.id}`,
|
||||
})),
|
||||
},
|
||||
session: {
|
||||
resolveStorePath: vi.fn(() => "/tmp/direct-dm-session-store"),
|
||||
readSessionUpdatedAt: vi.fn(() => 1234),
|
||||
recordInboundSession,
|
||||
},
|
||||
reply: {
|
||||
resolveEnvelopeFormatOptions: vi.fn(() => ({ mode: "agent" })),
|
||||
formatAgentEnvelope: vi.fn(({ body }) => `env:${body}`),
|
||||
finalizeInboundContext: vi.fn((ctx) => ctx),
|
||||
dispatchReplyWithBufferedBlockDispatcher,
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
channel: "nostr",
|
||||
channelLabel: "Nostr",
|
||||
accountId: "default",
|
||||
peer: { kind: "direct", id: "sender-1" },
|
||||
senderId: "sender-1",
|
||||
senderAddress: "nostr:sender-1",
|
||||
recipientAddress: "nostr:bot-1",
|
||||
conversationLabel: "sender-1",
|
||||
rawBody: "hello world",
|
||||
messageId: "event-123",
|
||||
timestamp: 1_710_000_000_000,
|
||||
commandAuthorized: true,
|
||||
deliver,
|
||||
onRecordError: () => {},
|
||||
onDispatchError: () => {},
|
||||
});
|
||||
|
||||
expect(result.route).toMatchObject({
|
||||
agentId: "agent-main",
|
||||
accountId: "default",
|
||||
sessionKey: "dm:sender-1",
|
||||
});
|
||||
expect(result.storePath).toBe("/tmp/direct-dm-session-store");
|
||||
expect(result.ctxPayload).toMatchObject({
|
||||
Body: "env:hello world",
|
||||
BodyForAgent: "hello world",
|
||||
From: "nostr:sender-1",
|
||||
To: "nostr:bot-1",
|
||||
SenderId: "sender-1",
|
||||
MessageSid: "event-123",
|
||||
CommandAuthorized: true,
|
||||
});
|
||||
expect(recordInboundSession).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
|
||||
expect(deliver).toHaveBeenCalledWith({ text: "reply text" });
|
||||
});
|
||||
});
|
||||
314
src/plugin-sdk/direct-dm.ts
Normal file
314
src/plugin-sdk/direct-dm.ts
Normal file
@@ -0,0 +1,314 @@
|
||||
import type { FinalizedMsgContext } from "../auto-reply/templating.js";
|
||||
import type { ChannelId } from "../channels/plugins/types.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import {
|
||||
readStoreAllowFromForDmPolicy,
|
||||
resolveDmGroupAccessWithLists,
|
||||
type DmGroupAccessReasonCode,
|
||||
} from "../security/dm-policy-shared.js";
|
||||
import { resolveInboundRouteEnvelopeBuilderWithRuntime } from "./inbound-envelope.js";
|
||||
import { recordInboundSessionAndDispatchReply } from "./inbound-reply-dispatch.js";
|
||||
import type { OutboundReplyPayload } from "./reply-payload.js";
|
||||
|
||||
export type DirectDmCommandAuthorizationRuntime = {
|
||||
shouldComputeCommandAuthorized: (rawBody: string, cfg: OpenClawConfig) => boolean;
|
||||
resolveCommandAuthorizedFromAuthorizers: (params: {
|
||||
useAccessGroups: boolean;
|
||||
authorizers: Array<{ configured: boolean; allowed: boolean }>;
|
||||
modeWhenAccessGroupsOff?: "allow" | "deny" | "configured";
|
||||
}) => boolean;
|
||||
};
|
||||
|
||||
export type ResolvedInboundDirectDmAccess = {
|
||||
access: {
|
||||
decision: "allow" | "block" | "pairing";
|
||||
reasonCode: DmGroupAccessReasonCode;
|
||||
reason: string;
|
||||
effectiveAllowFrom: string[];
|
||||
};
|
||||
shouldComputeAuth: boolean;
|
||||
senderAllowedForCommands: boolean;
|
||||
commandAuthorized: boolean | undefined;
|
||||
};
|
||||
|
||||
/** Resolve direct-DM policy, effective allowlists, and optional command auth in one place. */
|
||||
export async function resolveInboundDirectDmAccessWithRuntime(params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel: ChannelId;
|
||||
accountId: string;
|
||||
dmPolicy?: string | null;
|
||||
allowFrom?: Array<string | number> | null;
|
||||
senderId: string;
|
||||
rawBody: string;
|
||||
isSenderAllowed: (senderId: string, allowFrom: string[]) => boolean;
|
||||
runtime: DirectDmCommandAuthorizationRuntime;
|
||||
modeWhenAccessGroupsOff?: "allow" | "deny" | "configured";
|
||||
readStoreAllowFrom?: (provider: ChannelId, accountId: string) => Promise<string[]>;
|
||||
}): Promise<ResolvedInboundDirectDmAccess> {
|
||||
const dmPolicy = params.dmPolicy ?? "pairing";
|
||||
const storeAllowFrom =
|
||||
dmPolicy === "pairing"
|
||||
? await readStoreAllowFromForDmPolicy({
|
||||
provider: params.channel,
|
||||
accountId: params.accountId,
|
||||
dmPolicy,
|
||||
readStore: params.readStoreAllowFrom,
|
||||
})
|
||||
: [];
|
||||
|
||||
const access = resolveDmGroupAccessWithLists({
|
||||
isGroup: false,
|
||||
dmPolicy,
|
||||
allowFrom: params.allowFrom,
|
||||
storeAllowFrom,
|
||||
groupAllowFromFallbackToAllowFrom: false,
|
||||
isSenderAllowed: (allowEntries) => params.isSenderAllowed(params.senderId, allowEntries),
|
||||
});
|
||||
|
||||
const shouldComputeAuth = params.runtime.shouldComputeCommandAuthorized(
|
||||
params.rawBody,
|
||||
params.cfg,
|
||||
);
|
||||
const senderAllowedForCommands = params.isSenderAllowed(
|
||||
params.senderId,
|
||||
access.effectiveAllowFrom,
|
||||
);
|
||||
const commandAuthorized = shouldComputeAuth
|
||||
? dmPolicy === "open"
|
||||
? true
|
||||
: params.runtime.resolveCommandAuthorizedFromAuthorizers({
|
||||
useAccessGroups: params.cfg.commands?.useAccessGroups !== false,
|
||||
authorizers: [
|
||||
{
|
||||
configured: access.effectiveAllowFrom.length > 0,
|
||||
allowed: senderAllowedForCommands,
|
||||
},
|
||||
],
|
||||
modeWhenAccessGroupsOff: params.modeWhenAccessGroupsOff,
|
||||
})
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
access: {
|
||||
decision: access.decision,
|
||||
reasonCode: access.reasonCode,
|
||||
reason: access.reason,
|
||||
effectiveAllowFrom: access.effectiveAllowFrom,
|
||||
},
|
||||
shouldComputeAuth,
|
||||
senderAllowedForCommands,
|
||||
commandAuthorized,
|
||||
};
|
||||
}
|
||||
|
||||
/** Convert resolved DM policy into a pre-crypto allow/block/pairing callback. */
|
||||
export function createPreCryptoDirectDmAuthorizer(params: {
|
||||
resolveAccess: (
|
||||
senderId: string,
|
||||
) => Promise<Pick<ResolvedInboundDirectDmAccess, "access"> | ResolvedInboundDirectDmAccess>;
|
||||
issuePairingChallenge?: (params: {
|
||||
senderId: string;
|
||||
reply: (text: string) => Promise<void>;
|
||||
}) => Promise<void>;
|
||||
onBlocked?: (params: {
|
||||
senderId: string;
|
||||
reason: string;
|
||||
reasonCode: DmGroupAccessReasonCode;
|
||||
}) => void;
|
||||
}) {
|
||||
return async (input: {
|
||||
senderId: string;
|
||||
reply: (text: string) => Promise<void>;
|
||||
}): Promise<"allow" | "block" | "pairing"> => {
|
||||
const resolved = await params.resolveAccess(input.senderId);
|
||||
const access = "access" in resolved ? resolved.access : resolved;
|
||||
if (access.decision === "allow") {
|
||||
return "allow";
|
||||
}
|
||||
if (access.decision === "pairing") {
|
||||
if (params.issuePairingChallenge) {
|
||||
await params.issuePairingChallenge({
|
||||
senderId: input.senderId,
|
||||
reply: input.reply,
|
||||
});
|
||||
}
|
||||
return "pairing";
|
||||
}
|
||||
params.onBlocked?.({
|
||||
senderId: input.senderId,
|
||||
reason: access.reason,
|
||||
reasonCode: access.reasonCode,
|
||||
});
|
||||
return "block";
|
||||
};
|
||||
}
|
||||
|
||||
export type DirectDmPreCryptoGuardPolicy = {
|
||||
allowedKinds: readonly number[];
|
||||
maxFutureSkewSec: number;
|
||||
maxCiphertextBytes: number;
|
||||
maxPlaintextBytes: number;
|
||||
rateLimit: {
|
||||
windowMs: number;
|
||||
maxPerSenderPerWindow: number;
|
||||
maxGlobalPerWindow: number;
|
||||
maxTrackedSenderKeys: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type DirectDmPreCryptoGuardPolicyOverrides = Partial<
|
||||
Omit<DirectDmPreCryptoGuardPolicy, "rateLimit">
|
||||
> & {
|
||||
rateLimit?: Partial<DirectDmPreCryptoGuardPolicy["rateLimit"]>;
|
||||
};
|
||||
|
||||
/** Shared policy object for DM-style pre-crypto guardrails. */
|
||||
export function createDirectDmPreCryptoGuardPolicy(
|
||||
overrides: DirectDmPreCryptoGuardPolicyOverrides = {},
|
||||
): DirectDmPreCryptoGuardPolicy {
|
||||
return {
|
||||
allowedKinds: overrides.allowedKinds ?? [4],
|
||||
maxFutureSkewSec: overrides.maxFutureSkewSec ?? 120,
|
||||
maxCiphertextBytes: overrides.maxCiphertextBytes ?? 16 * 1024,
|
||||
maxPlaintextBytes: overrides.maxPlaintextBytes ?? 8 * 1024,
|
||||
rateLimit: {
|
||||
windowMs: overrides.rateLimit?.windowMs ?? 60_000,
|
||||
maxPerSenderPerWindow: overrides.rateLimit?.maxPerSenderPerWindow ?? 20,
|
||||
maxGlobalPerWindow: overrides.rateLimit?.maxGlobalPerWindow ?? 200,
|
||||
maxTrackedSenderKeys: overrides.rateLimit?.maxTrackedSenderKeys ?? 4096,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
type DirectDmRoutePeer = {
|
||||
kind: "direct";
|
||||
id: string;
|
||||
};
|
||||
|
||||
type DirectDmRoute = {
|
||||
agentId: string;
|
||||
sessionKey: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
type DirectDmRuntime = {
|
||||
channel: {
|
||||
routing: {
|
||||
resolveAgentRoute: (params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
peer: DirectDmRoutePeer;
|
||||
}) => DirectDmRoute;
|
||||
};
|
||||
session: {
|
||||
resolveStorePath: typeof import("../config/sessions.js").resolveStorePath;
|
||||
readSessionUpdatedAt: (params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
}) => number | undefined;
|
||||
recordInboundSession: typeof import("../channels/session.js").recordInboundSession;
|
||||
};
|
||||
reply: {
|
||||
resolveEnvelopeFormatOptions: (
|
||||
cfg: OpenClawConfig,
|
||||
) => ReturnType<typeof import("../auto-reply/envelope.js").resolveEnvelopeFormatOptions>;
|
||||
formatAgentEnvelope: typeof import("../auto-reply/envelope.js").formatAgentEnvelope;
|
||||
finalizeInboundContext: typeof import("../auto-reply/reply/inbound-context.js").finalizeInboundContext;
|
||||
dispatchReplyWithBufferedBlockDispatcher: typeof import("../auto-reply/reply/provider-dispatcher.js").dispatchReplyWithBufferedBlockDispatcher;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
/** Route, envelope, record, and dispatch one direct-DM turn through the standard pipeline. */
|
||||
export async function dispatchInboundDirectDmWithRuntime(params: {
|
||||
cfg: OpenClawConfig;
|
||||
runtime: DirectDmRuntime;
|
||||
channel: string;
|
||||
channelLabel: string;
|
||||
accountId: string;
|
||||
peer: DirectDmRoutePeer;
|
||||
senderId: string;
|
||||
senderAddress: string;
|
||||
recipientAddress: string;
|
||||
conversationLabel: string;
|
||||
rawBody: string;
|
||||
messageId: string;
|
||||
timestamp?: number;
|
||||
commandAuthorized?: boolean;
|
||||
bodyForAgent?: string;
|
||||
commandBody?: string;
|
||||
provider?: string;
|
||||
surface?: string;
|
||||
originatingChannel?: string;
|
||||
originatingTo?: string;
|
||||
extraContext?: Record<string, unknown>;
|
||||
deliver: (payload: OutboundReplyPayload) => Promise<void>;
|
||||
onRecordError: (err: unknown) => void;
|
||||
onDispatchError: (err: unknown, info: { kind: string }) => void;
|
||||
}): Promise<{
|
||||
route: DirectDmRoute;
|
||||
storePath: string;
|
||||
ctxPayload: FinalizedMsgContext;
|
||||
}> {
|
||||
const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({
|
||||
cfg: params.cfg,
|
||||
channel: params.channel,
|
||||
accountId: params.accountId,
|
||||
peer: params.peer,
|
||||
runtime: params.runtime.channel,
|
||||
sessionStore: params.cfg.session?.store,
|
||||
});
|
||||
|
||||
const { storePath, body } = buildEnvelope({
|
||||
channel: params.channelLabel,
|
||||
from: params.conversationLabel,
|
||||
body: params.rawBody,
|
||||
timestamp: params.timestamp,
|
||||
});
|
||||
|
||||
const ctxPayload = params.runtime.channel.reply.finalizeInboundContext({
|
||||
Body: body,
|
||||
BodyForAgent: params.bodyForAgent ?? params.rawBody,
|
||||
RawBody: params.rawBody,
|
||||
CommandBody: params.commandBody ?? params.rawBody,
|
||||
From: params.senderAddress,
|
||||
To: params.recipientAddress,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? params.accountId,
|
||||
ChatType: "direct",
|
||||
ConversationLabel: params.conversationLabel,
|
||||
SenderId: params.senderId,
|
||||
Provider: params.provider ?? params.channel,
|
||||
Surface: params.surface ?? params.channel,
|
||||
MessageSid: params.messageId,
|
||||
MessageSidFull: params.messageId,
|
||||
Timestamp: params.timestamp,
|
||||
CommandAuthorized: params.commandAuthorized,
|
||||
OriginatingChannel: params.originatingChannel ?? params.channel,
|
||||
OriginatingTo: params.originatingTo ?? params.recipientAddress,
|
||||
...params.extraContext,
|
||||
});
|
||||
|
||||
await recordInboundSessionAndDispatchReply({
|
||||
cfg: params.cfg,
|
||||
channel: params.channel,
|
||||
accountId: route.accountId ?? params.accountId,
|
||||
agentId: route.agentId,
|
||||
routeSessionKey: route.sessionKey,
|
||||
storePath,
|
||||
ctxPayload,
|
||||
recordInboundSession: params.runtime.channel.session.recordInboundSession,
|
||||
dispatchReplyWithBufferedBlockDispatcher:
|
||||
params.runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
|
||||
deliver: params.deliver,
|
||||
onRecordError: params.onRecordError,
|
||||
onDispatchError: params.onDispatchError,
|
||||
});
|
||||
|
||||
return {
|
||||
route,
|
||||
storePath,
|
||||
ctxPayload,
|
||||
};
|
||||
}
|
||||
@@ -8,6 +8,16 @@ export type { ChannelSetupAdapter } from "../channels/plugins/types.adapters.js"
|
||||
export { formatPairingApproveHint } from "../channels/plugins/helpers.js";
|
||||
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
|
||||
export { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
|
||||
export {
|
||||
createDirectDmPreCryptoGuardPolicy,
|
||||
dispatchInboundDirectDmWithRuntime,
|
||||
type DirectDmPreCryptoGuardPolicy,
|
||||
type DirectDmPreCryptoGuardPolicyOverrides,
|
||||
} from "./direct-dm.js";
|
||||
export {
|
||||
createPreCryptoDirectDmAuthorizer,
|
||||
resolveInboundDirectDmAccessWithRuntime,
|
||||
} from "./direct-dm.js";
|
||||
export type { OpenClawConfig } from "../config/config.js";
|
||||
export { MarkdownConfigSchema } from "../config/zod-schema.core.js";
|
||||
export { readJsonBodyWithLimit, requestBodyErrorToText } from "../infra/http-body.js";
|
||||
|
||||
@@ -346,8 +346,10 @@ describe("plugin-sdk subpath exports", () => {
|
||||
]);
|
||||
expectSourceMentions("channel-inbound", [
|
||||
"buildMentionRegexes",
|
||||
"createDirectDmPreCryptoGuardPolicy",
|
||||
"createChannelInboundDebouncer",
|
||||
"createInboundDebouncer",
|
||||
"dispatchInboundDirectDmWithRuntime",
|
||||
"formatInboundEnvelope",
|
||||
"formatInboundFromLabel",
|
||||
"formatLocationText",
|
||||
@@ -446,8 +448,10 @@ describe("plugin-sdk subpath exports", () => {
|
||||
"listNativeCommandSpecsForConfig",
|
||||
"listSkillCommandsForAgents",
|
||||
"normalizeCommandBody",
|
||||
"createPreCryptoDirectDmAuthorizer",
|
||||
"resolveCommandAuthorization",
|
||||
"resolveCommandAuthorizedFromAuthorizers",
|
||||
"resolveInboundDirectDmAccessWithRuntime",
|
||||
"resolveControlCommandGate",
|
||||
"resolveDualTextControlCommandGate",
|
||||
"resolveNativeCommandSessionTargets",
|
||||
|
||||
Reference in New Issue
Block a user