Tasks: add blocked flow retry state (#58204)

This commit is contained in:
Mariano
2026-03-31 09:33:26 +02:00
committed by GitHub
parent 4e67e7c02c
commit 8d942000c9
9 changed files with 476 additions and 13 deletions

View File

@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- Background tasks: turn tasks into a real shared background-run control plane instead of ACP-only bookkeeping by unifying ACP, subagent, cron, and background CLI execution under one SQLite-backed ledger, routing detached lifecycle updates through the executor seam, adding audit/maintenance/status visibility, tightening auto-cleanup and lost-run recovery, improving task awareness in internal status/tool surfaces, and clarifying the split between heartbeat/main-session automation and detached scheduled runs. Thanks @vincentkoc and @mbelinky.
- Flows/tasks: add a minimal SQLite-backed flow registry plus task-to-flow linkage scaffolding, so orchestrated work can start gaining a first-class parent record without changing current task delivery behavior.
- Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task.
- Flows/tasks: persist blocked state on one-task flows and let the same flow reopen cleanly on retry, so blocked detached work can carry a parent-level reason and continue without fragmenting into a new job.
- Matrix/history: add optional room history context for Matrix group triggers via `channels.matrix.historyLimit`, with per-agent watermarks and retry-safe snapshots so failed trigger retries do not drift into newer room messages. (#57022) thanks @chain710.
- Diffs: skip unused viewer-versus-file SSR preload work so `diffs` view-only and file-only runs do less render work while keeping mode outputs aligned. (#57909) thanks @gumadeiras.
- Matrix/threads: add per-DM `threadReplies` overrides and keep thread session isolation aligned with the effective room or DM thread policy from the triggering message onward. (#57995) thanks @teconomix.

View File

@@ -14,6 +14,8 @@ type FlowRegistryRow = {
notify_policy: FlowRecord["notifyPolicy"];
goal: string;
current_step: string | null;
blocked_task_id: string | null;
blocked_summary: string | null;
created_at: number | bigint;
updated_at: number | bigint;
ended_at: number | bigint | null;
@@ -70,6 +72,8 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
notifyPolicy: row.notify_policy,
goal: row.goal,
...(row.current_step ? { currentStep: row.current_step } : {}),
...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}),
...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}),
createdAt: normalizeNumber(row.created_at) ?? 0,
updatedAt: normalizeNumber(row.updated_at) ?? 0,
...(endedAt != null ? { endedAt } : {}),
@@ -85,6 +89,8 @@ function bindFlowRecord(record: FlowRecord) {
notify_policy: record.notifyPolicy,
goal: record.goal,
current_step: record.currentStep ?? null,
blocked_task_id: record.blockedTaskId ?? null,
blocked_summary: record.blockedSummary ?? null,
created_at: record.createdAt,
updated_at: record.updatedAt,
ended_at: record.endedAt ?? null,
@@ -102,6 +108,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
created_at,
updated_at,
ended_at
@@ -117,6 +125,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
created_at,
updated_at,
ended_at
@@ -128,6 +138,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
@notify_policy,
@goal,
@current_step,
@blocked_task_id,
@blocked_summary,
@created_at,
@updated_at,
@ended_at
@@ -139,6 +151,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy = excluded.notify_policy,
goal = excluded.goal,
current_step = excluded.current_step,
blocked_task_id = excluded.blocked_task_id,
blocked_summary = excluded.blocked_summary,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
ended_at = excluded.ended_at
@@ -158,11 +172,15 @@ function ensureSchema(db: DatabaseSync) {
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
`);
ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT");
ensureColumn(db, "flow_runs", "blocked_summary", "TEXT");
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_session_key ON flow_runs(owner_session_key);`,
@@ -170,6 +188,19 @@ function ensureSchema(db: DatabaseSync) {
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
function ensureColumn(
db: DatabaseSync,
tableName: string,
columnName: string,
columnDefinition: string,
) {
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name?: string }>;
if (rows.some((row) => row.name === columnName)) {
return;
}
db.exec(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition};`);
}
function ensureFlowRegistryPermissions(pathname: string) {
const dir = resolveFlowRegistryDir(process.env);
mkdirSync(dir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });

View File

@@ -10,12 +10,15 @@ function createStoredFlow(): FlowRecord {
return {
flowId: "flow-restored",
ownerSessionKey: "agent:main:main",
status: "running",
status: "blocked",
notifyPolicy: "done_only",
goal: "Restored flow",
currentStep: "spawn_task",
blockedTaskId: "task-restored",
blockedSummary: "Writable session required.",
createdAt: 100,
updatedAt: 100,
endedAt: 120,
};
}
@@ -59,6 +62,8 @@ describe("flow-registry store runtime", () => {
expect(getFlowById("flow-restored")).toMatchObject({
flowId: "flow-restored",
goal: "Restored flow",
blockedTaskId: "task-restored",
blockedSummary: "Writable session required.",
});
expect(loadSnapshot).toHaveBeenCalledTimes(1);
@@ -110,7 +115,9 @@ describe("flow-registry store runtime", () => {
createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Secured flow",
status: "queued",
status: "blocked",
blockedTaskId: "task-secured",
blockedSummary: "Need auth.",
});
const registryDir = resolveFlowRegistryDir(process.env);

View File

@@ -6,6 +6,7 @@ import {
getFlowById,
listFlowRecords,
resetFlowRegistryForTests,
syncFlowFromTask,
updateFlowRecordById,
} from "./flow-registry.js";
@@ -130,4 +131,57 @@ describe("flow-registry", () => {
});
});
});
it("stores blocked metadata and clears it when a later task resumes the same flow", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetFlowRegistryForTests();
const created = createFlowRecord({
ownerSessionKey: "agent:main:main",
goal: "Fix permissions",
status: "running",
});
const blocked = syncFlowFromTask({
taskId: "task-blocked",
parentFlowId: created.flowId,
status: "succeeded",
terminalOutcome: "blocked",
notifyPolicy: "done_only",
label: "Fix permissions",
task: "Fix permissions",
lastEventAt: 200,
endedAt: 200,
terminalSummary: "Writable session required.",
});
expect(blocked).toMatchObject({
flowId: created.flowId,
status: "blocked",
blockedTaskId: "task-blocked",
blockedSummary: "Writable session required.",
endedAt: 200,
});
const resumed = syncFlowFromTask({
taskId: "task-retry",
parentFlowId: created.flowId,
status: "running",
notifyPolicy: "done_only",
label: "Fix permissions",
task: "Fix permissions",
lastEventAt: 260,
progressSummary: "Retrying with writable session",
});
expect(resumed).toMatchObject({
flowId: created.flowId,
status: "running",
});
expect(resumed?.blockedTaskId).toBeUndefined();
expect(resumed?.blockedSummary).toBeUndefined();
expect(resumed?.endedAt).toBeUndefined();
});
});
});

View File

@@ -25,6 +25,34 @@ function resolveFlowGoal(task: Pick<TaskRecord, "label" | "task">): string {
return task.label?.trim() || task.task.trim() || "Background task";
}
function resolveFlowBlockedSummary(
task: Pick<TaskRecord, "status" | "terminalOutcome" | "terminalSummary" | "progressSummary">,
): string | undefined {
if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") {
return undefined;
}
return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined;
}
type FlowRecordPatch = Partial<
Pick<
FlowRecord,
| "status"
| "notifyPolicy"
| "goal"
| "currentStep"
| "blockedTaskId"
| "blockedSummary"
| "updatedAt"
| "endedAt"
>
> & {
currentStep?: string | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
endedAt?: number | null;
};
export function deriveFlowStatusFromTask(
task: Pick<TaskRecord, "status" | "terminalOutcome">,
): FlowStatus {
@@ -89,6 +117,8 @@ export function createFlowRecord(params: {
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
createdAt?: number;
updatedAt?: number;
endedAt?: number;
@@ -103,6 +133,8 @@ export function createFlowRecord(params: {
notifyPolicy: ensureNotifyPolicy(params.notifyPolicy),
goal: params.goal,
currentStep: params.currentStep?.trim() || undefined,
blockedTaskId: params.blockedTaskId?.trim() || undefined,
blockedSummary: params.blockedSummary?.trim() || undefined,
createdAt: now,
updatedAt: params.updatedAt ?? now,
...(params.endedAt !== undefined ? { endedAt: params.endedAt } : {}),
@@ -116,6 +148,7 @@ export function createFlowForTask(params: {
task: Pick<
TaskRecord,
| "requesterSessionKey"
| "taskId"
| "notifyPolicy"
| "status"
| "terminalOutcome"
@@ -124,6 +157,8 @@ export function createFlowForTask(params: {
| "createdAt"
| "lastEventAt"
| "endedAt"
| "terminalSummary"
| "progressSummary"
>;
requesterOrigin?: FlowRecord["requesterOrigin"];
}): FlowRecord {
@@ -143,18 +178,16 @@ export function createFlowForTask(params: {
status: terminalFlowStatus,
notifyPolicy: params.task.notifyPolicy,
goal: resolveFlowGoal(params.task),
blockedTaskId:
terminalFlowStatus === "blocked" ? params.task.taskId.trim() || undefined : undefined,
blockedSummary: resolveFlowBlockedSummary(params.task),
createdAt: params.task.createdAt,
updatedAt: params.task.lastEventAt ?? params.task.createdAt,
...(endedAt !== undefined ? { endedAt } : {}),
});
}
export function updateFlowRecordById(
flowId: string,
patch: Partial<
Pick<FlowRecord, "status" | "notifyPolicy" | "goal" | "currentStep" | "updatedAt" | "endedAt">
>,
): FlowRecord | null {
export function updateFlowRecordById(flowId: string, patch: FlowRecordPatch): FlowRecord | null {
ensureFlowRegistryReady();
const current = flows.get(flowId);
if (!current) {
@@ -166,9 +199,19 @@ export function updateFlowRecordById(
...(patch.notifyPolicy ? { notifyPolicy: patch.notifyPolicy } : {}),
...(patch.goal ? { goal: patch.goal } : {}),
currentStep:
patch.currentStep === undefined ? current.currentStep : patch.currentStep.trim() || undefined,
patch.currentStep === undefined
? current.currentStep
: patch.currentStep?.trim() || undefined,
blockedTaskId:
patch.blockedTaskId === undefined
? current.blockedTaskId
: patch.blockedTaskId?.trim() || undefined,
blockedSummary:
patch.blockedSummary === undefined
? current.blockedSummary
: patch.blockedSummary?.trim() || undefined,
updatedAt: patch.updatedAt ?? Date.now(),
...(patch.endedAt !== undefined ? { endedAt: patch.endedAt } : {}),
endedAt: patch.endedAt === undefined ? current.endedAt : (patch.endedAt ?? undefined),
};
flows.set(flowId, next);
persistFlowUpsert(next);
@@ -186,6 +229,9 @@ export function syncFlowFromTask(
| "task"
| "lastEventAt"
| "endedAt"
| "taskId"
| "terminalSummary"
| "progressSummary"
>,
): FlowRecord | null {
const flowId = task.parentFlowId?.trim();
@@ -203,12 +249,15 @@ export function syncFlowFromTask(
status: terminalFlowStatus,
notifyPolicy: task.notifyPolicy,
goal: resolveFlowGoal(task),
blockedTaskId: terminalFlowStatus === "blocked" ? task.taskId.trim() || null : null,
blockedSummary:
terminalFlowStatus === "blocked" ? (resolveFlowBlockedSummary(task) ?? null) : null,
updatedAt: task.lastEventAt ?? Date.now(),
...(isTerminal
? {
endedAt: task.endedAt ?? task.lastEventAt ?? Date.now(),
}
: {}),
: { endedAt: null }),
});
}

View File

@@ -19,6 +19,8 @@ export type FlowRecord = {
notifyPolicy: TaskNotifyPolicy;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
createdAt: number;
updatedAt: number;
endedAt?: number;

View File

@@ -7,10 +7,16 @@ import {
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
retryBlockedFlowAsQueuedTaskRun,
retryBlockedFlowAsRunningTaskRun,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
} from "./task-executor.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
import {
findLatestTaskForFlowId,
findTaskByRunId,
resetTaskRegistryForTests,
} from "./task-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const hoisted = vi.hoisted(() => {
@@ -196,4 +202,142 @@ describe("task-executor", () => {
expect(listFlowRecords()).toEqual([]);
});
});
it("records blocked metadata on one-task flows and reuses the same flow for queued retries", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
childSessionKey: "agent:codex:acp:child",
runId: "run-executor-blocked",
task: "Patch file",
startedAt: 10,
deliveryStatus: "pending",
});
completeTaskRunByRunId({
runId: "run-executor-blocked",
endedAt: 40,
lastEventAt: 40,
terminalOutcome: "blocked",
terminalSummary: "Writable session required.",
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "blocked",
blockedTaskId: created.taskId,
blockedSummary: "Writable session required.",
endedAt: 40,
});
const retried = retryBlockedFlowAsQueuedTaskRun({
flowId: created.parentFlowId!,
runId: "run-executor-retry",
childSessionKey: "agent:codex:acp:retry-child",
});
expect(retried).toMatchObject({
found: true,
retried: true,
previousTask: expect.objectContaining({
taskId: created.taskId,
}),
task: expect.objectContaining({
parentFlowId: created.parentFlowId,
parentTaskId: created.taskId,
status: "queued",
runId: "run-executor-retry",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "queued",
});
expect(getFlowById(created.parentFlowId!)?.blockedTaskId).toBeUndefined();
expect(getFlowById(created.parentFlowId!)?.blockedSummary).toBeUndefined();
expect(getFlowById(created.parentFlowId!)?.endedAt).toBeUndefined();
expect(findLatestTaskForFlowId(created.parentFlowId!)).toMatchObject({
taskId: retried.task?.taskId,
});
});
});
it("can reopen blocked one-task flows directly into a running retry", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:subagent:child",
runId: "run-executor-blocked-running",
task: "Write summary",
startedAt: 10,
deliveryStatus: "pending",
});
completeTaskRunByRunId({
runId: "run-executor-blocked-running",
endedAt: 40,
lastEventAt: 40,
terminalOutcome: "blocked",
terminalSummary: "Need write approval.",
});
const retried = retryBlockedFlowAsRunningTaskRun({
flowId: created.parentFlowId!,
runId: "run-executor-running-retry",
childSessionKey: "agent:codex:subagent:retry",
startedAt: 55,
lastEventAt: 55,
progressSummary: "Retrying with approval",
});
expect(retried).toMatchObject({
found: true,
retried: true,
task: expect.objectContaining({
parentFlowId: created.parentFlowId,
status: "running",
runId: "run-executor-running-retry",
progressSummary: "Retrying with approval",
}),
});
expect(getFlowById(created.parentFlowId!)).toMatchObject({
flowId: created.parentFlowId,
status: "running",
});
});
});
it("refuses to retry flows that are not currently blocked", async () => {
await withTaskExecutorStateDir(async () => {
const created = createRunningTaskRun({
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-executor-not-blocked",
task: "Patch file",
startedAt: 10,
deliveryStatus: "pending",
});
const retried = retryBlockedFlowAsQueuedTaskRun({
flowId: created.parentFlowId!,
runId: "run-should-not-exist",
});
expect(retried).toMatchObject({
found: true,
retried: false,
reason: "Flow is not blocked.",
});
expect(findTaskByRunId("run-should-not-exist")).toBeUndefined();
});
});
});

View File

@@ -1,9 +1,10 @@
import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { createFlowForTask, deleteFlowRecordById } from "./flow-registry.js";
import { createFlowForTask, deleteFlowRecordById, getFlowById } from "./flow-registry.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForFlowId,
linkTaskToFlowById,
markTaskLostById,
markTaskRunningByRunId,
@@ -94,6 +95,141 @@ export function createQueuedTaskRun(params: {
});
}
type RetryBlockedFlowResult = {
found: boolean;
retried: boolean;
reason?: string;
previousTask?: TaskRecord;
task?: TaskRecord;
};
type RetryBlockedFlowParams = {
flowId: string;
sourceId?: string;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
childSessionKey?: string;
agentId?: string;
runId?: string;
label?: string;
task?: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
status: "queued" | "running";
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
};
function resolveRetryableBlockedFlowTask(flowId: string): {
flowFound: boolean;
retryable: boolean;
latestTask?: TaskRecord;
reason?: string;
} {
const flow = getFlowById(flowId);
if (!flow) {
return {
flowFound: false,
retryable: false,
reason: "Flow not found.",
};
}
const latestTask = findLatestTaskForFlowId(flowId);
if (!latestTask) {
return {
flowFound: true,
retryable: false,
reason: "Flow has no retryable task.",
};
}
if (flow.status !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Flow is not blocked.",
};
}
if (latestTask.status !== "succeeded" || latestTask.terminalOutcome !== "blocked") {
return {
flowFound: true,
retryable: false,
latestTask,
reason: "Latest flow task is not blocked.",
};
}
return {
flowFound: true,
retryable: true,
latestTask,
};
}
function retryBlockedFlowTask(params: RetryBlockedFlowParams): RetryBlockedFlowResult {
const resolved = resolveRetryableBlockedFlowTask(params.flowId);
if (!resolved.retryable || !resolved.latestTask) {
return {
found: resolved.flowFound,
retried: false,
reason: resolved.reason,
};
}
const flow = getFlowById(params.flowId);
if (!flow) {
return {
found: false,
retried: false,
reason: "Flow not found.",
previousTask: resolved.latestTask,
};
}
const task = createTaskRecord({
runtime: resolved.latestTask.runtime,
sourceId: params.sourceId ?? resolved.latestTask.sourceId,
requesterSessionKey: flow.ownerSessionKey,
requesterOrigin: params.requesterOrigin ?? flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: resolved.latestTask.taskId,
agentId: params.agentId ?? resolved.latestTask.agentId,
runId: params.runId,
label: params.label ?? resolved.latestTask.label,
task: params.task ?? resolved.latestTask.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy ?? resolved.latestTask.notifyPolicy,
deliveryStatus: params.deliveryStatus ?? "pending",
status: params.status,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
});
return {
found: true,
retried: true,
previousTask: resolved.latestTask,
task,
};
}
export function retryBlockedFlowAsQueuedTaskRun(
params: Omit<RetryBlockedFlowParams, "status" | "startedAt" | "lastEventAt" | "progressSummary">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "queued",
});
}
export function retryBlockedFlowAsRunningTaskRun(
params: Omit<RetryBlockedFlowParams, "status">,
): RetryBlockedFlowResult {
return retryBlockedFlowTask({
...params,
status: "running",
});
}
export function createRunningTaskRun(params: {
runtime: TaskRuntime;
sourceId?: string;

View File

@@ -1048,6 +1048,17 @@ export function createTaskRecord(params: {
kind: "upserted",
task: cloneTaskRecord(record),
}));
if (record.parentFlowId?.trim()) {
try {
syncFlowFromTask(record);
} catch (error) {
log.warn("Failed to sync parent flow from task create", {
taskId: record.taskId,
flowId: record.parentFlowId,
error,
});
}
}
if (isTerminalTaskStatus(record.status)) {
void maybeDeliverTaskTerminalUpdate(taskId);
}
@@ -1356,6 +1367,34 @@ export function findLatestTaskForSessionKey(sessionKey: string): TaskRecord | un
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForFlowId(flowId: string): TaskRecord[] {
ensureTaskRegistryReady();
const normalizedFlowId = flowId.trim();
if (!normalizedFlowId) {
return [];
}
return [...tasks.values()]
.map((task, insertionIndex) =>
task.parentFlowId?.trim() === normalizedFlowId
? { ...cloneTaskRecord(task), insertionIndex }
: null,
)
.filter(
(
task,
): task is TaskRecord & {
insertionIndex: number;
} => Boolean(task),
)
.toSorted(compareTasksNewestFirst)
.map(({ insertionIndex: _, ...task }) => task);
}
export function findLatestTaskForFlowId(flowId: string): TaskRecord | undefined {
const task = listTasksForFlowId(flowId)[0];
return task ? cloneTaskRecord(task) : undefined;
}
export function listTasksForSessionKey(sessionKey: string): TaskRecord[] {
ensureTaskRegistryReady();
const key = normalizeSessionIndexKey(sessionKey);