diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index e9bae14b5ca..073349fdf27 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -1,7 +1,5 @@ -import path from "node:path"; import type { ReplyPayload } from "../../auto-reply/types.js"; import type { RenderedMessageBatchPlanItem } from "../../channels/message/types.js"; -import { resolveStateDir } from "../../config/paths.js"; import type { ReplyToMode } from "../../config/types.js"; import type { DB as OpenClawStateKyselyDatabase } from "../../state/openclaw-state-db.generated.js"; import { @@ -21,7 +19,6 @@ import type { OutboundMirror } from "./mirror.js"; import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; -const QUEUE_DIRNAME = "delivery-queue"; const QUEUE_NAME = "outbound-delivery"; export type QueuedRenderedMessageBatchPlan = { @@ -73,11 +70,6 @@ export interface QueuedDelivery extends QueuedDeliveryPayload { recoveryState?: "send_attempt_started" | "unknown_after_send"; } -export function resolveQueueDir(stateDir?: string): string { - const base = stateDir ?? resolveStateDir(); - return path.join(base, QUEUE_DIRNAME); -} - type DeliveryQueueDatabase = Pick; type DeliveryQueueEntryRow = { @@ -171,10 +163,8 @@ function persistQueueEntry(entry: QueuedDelivery, stateDir?: string): void { }, databaseOptions(stateDir)); } -/** Ensure the queue directory (and failed/ subdirectory) exist. */ -export async function ensureQueueDir(stateDir?: string): Promise { +function ensureDeliveryQueueStorage(stateDir?: string): void { openOpenClawStateDatabase(databaseOptions(stateDir)); - return resolveQueueDir(stateDir); } /** Persist a delivery entry before attempting send. Returns the entry ID. */ @@ -182,7 +172,7 @@ export async function enqueueDelivery( params: QueuedDeliveryPayload, stateDir?: string, ): Promise { - await ensureQueueDir(stateDir); + ensureDeliveryQueueStorage(stateDir); const id = generateSecureUuid(); const entry: QueuedDelivery = { id, @@ -276,7 +266,6 @@ export async function markDeliveryPlatformOutcomeUnknown( persistQueueEntry(entry, stateDir); } -/** Load a single pending delivery entry by ID from the queue directory. */ export async function loadPendingDelivery( id: string, stateDir?: string, @@ -292,7 +281,6 @@ export async function loadPendingDelivery( return normalized.entry; } -/** Load all pending delivery entries from the queue. */ export async function loadPendingDeliveries(stateDir?: string): Promise { const stateDatabase = openOpenClawStateDatabase(databaseOptions(stateDir)); const db = getNodeSqliteKysely(stateDatabase.db); diff --git a/src/infra/outbound/delivery-queue.storage.test.ts b/src/infra/outbound/delivery-queue.storage.test.ts index 6e5dfe96cdd..1c67ee4fddb 100644 --- a/src/infra/outbound/delivery-queue.storage.test.ts +++ b/src/infra/outbound/delivery-queue.storage.test.ts @@ -190,7 +190,7 @@ describe("delivery-queue storage", () => { }); describe("moveToFailed", () => { - it("moves entry to failed/ subdirectory", async () => { + it("moves entry to failed status", async () => { const id = await enqueueTextDelivery( { channel: "workspace", @@ -208,7 +208,7 @@ describe("delivery-queue storage", () => { }); describe("loadPendingDeliveries", () => { - it("returns empty array when queue directory does not exist", async () => { + it("returns empty array when the SQLite queue is empty", async () => { expect(await loadPendingDeliveries(path.join(tmpDir(), "no-such-dir"))).toEqual([]); }); diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 2ff6cf74ccd..51c2e195d62 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -1,7 +1,6 @@ export { ackDelivery, enqueueDelivery, - ensureQueueDir, failDelivery, loadPendingDelivery, loadPendingDeliveries, diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts index 32c5e37009e..b296e5e367f 100644 --- a/src/infra/session-delivery-queue-storage.ts +++ b/src/infra/session-delivery-queue-storage.ts @@ -1,7 +1,5 @@ import { createHash } from "node:crypto"; -import path from "node:path"; import type { ChatType } from "../channels/chat-type.js"; -import { resolveStateDir } from "../config/paths.js"; import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; import { openOpenClawStateDatabase, @@ -15,7 +13,6 @@ import { } from "./kysely-sync.js"; import { generateSecureUuid } from "./secure-random.js"; -const QUEUE_DIRNAME = "session-delivery-queue"; const QUEUE_NAME = "session-delivery"; type SessionDeliveryContext = { @@ -89,21 +86,15 @@ function parseQueueEntry(row: DeliveryQueueEntryRow | undefined): QueuedSessionD return typeof entry.id === "string" ? entry : null; } -export function resolveSessionDeliveryQueueDir(stateDir?: string): string { - const base = stateDir ?? resolveStateDir(); - return path.join(base, QUEUE_DIRNAME); -} - -async function ensureSessionDeliveryQueueDir(stateDir?: string): Promise { +function ensureSessionDeliveryQueueStorage(stateDir?: string): void { openOpenClawStateDatabase(databaseOptions(stateDir)); - return resolveSessionDeliveryQueueDir(stateDir); } export async function enqueueSessionDelivery( params: QueuedSessionDeliveryPayload, stateDir?: string, ): Promise { - await ensureSessionDeliveryQueueDir(stateDir); + ensureSessionDeliveryQueueStorage(stateDir); const id = buildEntryId(params.idempotencyKey); if (params.idempotencyKey) { diff --git a/src/infra/session-delivery-queue.storage.test.ts b/src/infra/session-delivery-queue.storage.test.ts index 703d9d5fe79..ff20c0cd20a 100644 --- a/src/infra/session-delivery-queue.storage.test.ts +++ b/src/infra/session-delivery-queue.storage.test.ts @@ -1,4 +1,5 @@ import fs from "node:fs/promises"; +import path from "node:path"; import { describe, expect, it } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { @@ -6,7 +7,6 @@ import { enqueueSessionDelivery, failSessionDelivery, loadPendingSessionDeliveries, - resolveSessionDeliveryQueueDir, } from "./session-delivery-queue.js"; describe("session-delivery queue storage", () => { @@ -72,7 +72,7 @@ describe("session-delivery queue storage", () => { await loadPendingSessionDeliveries(tempDir); - await expect(fs.access(resolveSessionDeliveryQueueDir(tempDir))).rejects.toMatchObject({ + await expect(fs.access(path.join(tempDir, "session-delivery-queue"))).rejects.toMatchObject({ code: "ENOENT", }); }); diff --git a/src/infra/session-delivery-queue.ts b/src/infra/session-delivery-queue.ts index 8c0d590d1c6..46552f7d41d 100644 --- a/src/infra/session-delivery-queue.ts +++ b/src/infra/session-delivery-queue.ts @@ -3,7 +3,6 @@ export { enqueueSessionDelivery, failSessionDelivery, loadPendingSessionDeliveries, - resolveSessionDeliveryQueueDir, } from "./session-delivery-queue-storage.js"; export type { QueuedSessionDelivery,