mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-26 07:57:40 +00:00
refactor: consolidate core runtime state helpers
This commit is contained in:
@@ -59,6 +59,14 @@ const queueState = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
|
||||
nextTaskId: 1,
|
||||
}));
|
||||
|
||||
function normalizeLane(lane: string): string {
|
||||
return lane.trim() || CommandLane.Main;
|
||||
}
|
||||
|
||||
function getLaneDepth(state: LaneState): number {
|
||||
return state.queue.length + state.activeTaskIds.size;
|
||||
}
|
||||
|
||||
function getLaneState(lane: string): LaneState {
|
||||
const existing = queueState.lanes.get(lane);
|
||||
if (existing) {
|
||||
@@ -159,7 +167,7 @@ export function markGatewayDraining(): void {
|
||||
}
|
||||
|
||||
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const cleaned = normalizeLane(lane);
|
||||
const state = getLaneState(cleaned);
|
||||
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
|
||||
drainLane(cleaned);
|
||||
@@ -176,7 +184,7 @@ export function enqueueCommandInLane<T>(
|
||||
if (queueState.gatewayDraining) {
|
||||
return Promise.reject(new GatewayDrainingError());
|
||||
}
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const cleaned = normalizeLane(lane);
|
||||
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
|
||||
const state = getLaneState(cleaned);
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
@@ -188,7 +196,7 @@ export function enqueueCommandInLane<T>(
|
||||
warnAfterMs,
|
||||
onWait: opts?.onWait,
|
||||
});
|
||||
logLaneEnqueue(cleaned, state.queue.length + state.activeTaskIds.size);
|
||||
logLaneEnqueue(cleaned, getLaneDepth(state));
|
||||
drainLane(cleaned);
|
||||
});
|
||||
}
|
||||
@@ -204,24 +212,24 @@ export function enqueueCommand<T>(
|
||||
}
|
||||
|
||||
export function getQueueSize(lane: string = CommandLane.Main) {
|
||||
const resolved = lane.trim() || CommandLane.Main;
|
||||
const resolved = normalizeLane(lane);
|
||||
const state = queueState.lanes.get(resolved);
|
||||
if (!state) {
|
||||
return 0;
|
||||
}
|
||||
return state.queue.length + state.activeTaskIds.size;
|
||||
return getLaneDepth(state);
|
||||
}
|
||||
|
||||
export function getTotalQueueSize() {
|
||||
let total = 0;
|
||||
for (const s of queueState.lanes.values()) {
|
||||
total += s.queue.length + s.activeTaskIds.size;
|
||||
total += getLaneDepth(s);
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const cleaned = normalizeLane(lane);
|
||||
const state = queueState.lanes.get(cleaned);
|
||||
if (!state) {
|
||||
return 0;
|
||||
|
||||
Reference in New Issue
Block a user