mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-07 07:58:36 +00:00
fix(diagnostics): export talk and recovery metrics
This commit is contained in:
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
- PR triage: mark external pull requests with `proof: supplied` when Barnacle finds structured real behavior proof, keep stale negative proof labels in sync across CRLF-edited PR bodies, and let ClawSweeper own the stronger `proof: sufficient` judgement.
|
||||
- Sessions CLI: show the selected agent runtime in the `openclaw sessions` table so terminal output matches the runtime visibility already present in JSON/status surfaces. Thanks @vincentkoc.
|
||||
- Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface.
|
||||
- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids.
|
||||
- Google Meet/Voice Call: make Twilio dial-in joins speak through the realtime Gemini voice bridge with paced audio streaming, backpressure-aware buffering, barge-in queue clearing, same-session agent consult routing, duplicate-consult coalescing, and no TwiML fallback during realtime speech, giving Meet participants a much snappier OpenClaw voice agent. (#77064) Thanks @scoootscooob.
|
||||
- Voice Call/realtime: add opt-in OpenClaw agent voice context capsules and consult-cadence guidance so Gemini/OpenAI realtime calls can sound like the configured agent without consulting the full agent on every ordinary turn. Thanks @scoootscooob.
|
||||
- Docker/Gateway: harden the gateway container by dropping `NET_RAW` and `NET_ADMIN` capabilities and enabling `no-new-privileges` in the bundled `docker-compose.yml`. Thanks @VintageAyu.
|
||||
|
||||
@@ -70,11 +70,11 @@ openclaw plugins enable diagnostics-otel
|
||||
|
||||
## Signals exported
|
||||
|
||||
| Signal | What goes in it |
|
||||
| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| **Metrics** | Counters and histograms for token usage, cost, run duration, message flow, queue lanes, session state, exec, and memory pressure. |
|
||||
| **Traces** | Spans for model usage, model calls, harness lifecycle, tool execution, exec, webhook/message processing, context assembly, and tool loops. |
|
||||
| **Logs** | Structured `logging.file` records exported over OTLP when `diagnostics.otel.logs` is enabled. |
|
||||
| Signal | What goes in it |
|
||||
| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| **Metrics** | Counters and histograms for token usage, cost, run duration, message flow, Talk events, queue lanes, session state/recovery, exec, and memory pressure. |
|
||||
| **Traces** | Spans for model usage, model calls, harness lifecycle, tool execution, exec, webhook/message processing, context assembly, and tool loops. |
|
||||
| **Logs** | Structured `logging.file` records exported over OTLP when `diagnostics.otel.logs` is enabled. |
|
||||
|
||||
Toggle `traces`, `metrics`, and `logs` independently. All three default to on
|
||||
when `diagnostics.otel.enabled` is true.
|
||||
@@ -129,6 +129,9 @@ Raw model/tool content is **not** exported by default. Spans carry bounded
|
||||
identifiers (channel, provider, model, error category, hash-only request ids)
|
||||
and never include prompt text, response text, tool inputs, tool outputs, or
|
||||
session keys.
|
||||
Talk metrics export only bounded event metadata such as mode, transport,
|
||||
provider, and event type. They do not include transcripts, audio payloads,
|
||||
session ids, turn ids, call ids, room ids, or handoff tokens.
|
||||
|
||||
Outbound model requests may include a W3C `traceparent` header. That header is
|
||||
generated only from OpenClaw-owned diagnostic trace context for the active model
|
||||
@@ -191,6 +194,12 @@ When any subkey is enabled, model and tool spans get bounded, redacted
|
||||
- `openclaw.message.delivery.started` (counter, attrs: `openclaw.channel`, `openclaw.delivery.kind`)
|
||||
- `openclaw.message.delivery.duration_ms` (histogram, attrs: `openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`, `openclaw.errorCategory`)
|
||||
|
||||
### Talk
|
||||
|
||||
- `openclaw.talk.event` (counter, attrs: `openclaw.talk.event_type`, `openclaw.talk.mode`, `openclaw.talk.transport`, `openclaw.talk.brain`, `openclaw.talk.provider`)
|
||||
- `openclaw.talk.event.duration_ms` (histogram, attrs: same as `openclaw.talk.event`; emitted when a Talk event reports duration)
|
||||
- `openclaw.talk.audio.bytes` (histogram, attrs: same as `openclaw.talk.event`; emitted for Talk audio frame events that report byte length)
|
||||
|
||||
### Queues and sessions
|
||||
|
||||
- `openclaw.queue.lane.enqueue` (counter, attrs: `openclaw.lane`)
|
||||
@@ -200,6 +209,9 @@ When any subkey is enabled, model and tool spans get bounded, redacted
|
||||
- `openclaw.session.state` (counter, attrs: `openclaw.state`, `openclaw.reason`)
|
||||
- `openclaw.session.stuck` (counter, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work)
|
||||
- `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work)
|
||||
- `openclaw.session.recovery.requested` (counter, attrs: `openclaw.state`, `openclaw.action`, `openclaw.active_work_kind`, `openclaw.reason`)
|
||||
- `openclaw.session.recovery.completed` (counter, attrs: `openclaw.state`, `openclaw.action`, `openclaw.status`, `openclaw.active_work_kind`, `openclaw.reason`)
|
||||
- `openclaw.session.recovery.age_ms` (histogram, attrs: same as the matching recovery counter)
|
||||
- `openclaw.run.attempt` (counter, attrs: `openclaw.attempt`)
|
||||
|
||||
### Session liveness telemetry
|
||||
|
||||
@@ -102,12 +102,18 @@ For traces, logs, OTLP push, and OpenTelemetry GenAI semantic attributes, see [O
|
||||
| `openclaw_harness_run_duration_seconds` | histogram | `channel`, `error_category`, `harness`, `model`, `outcome`, `phase`, `plugin`, `provider` |
|
||||
| `openclaw_message_processed_total` | counter | `channel`, `outcome`, `reason` |
|
||||
| `openclaw_message_processed_duration_seconds` | histogram | `channel`, `outcome`, `reason` |
|
||||
| `openclaw_message_delivery_started_total` | counter | `channel`, `delivery_kind` |
|
||||
| `openclaw_message_delivery_total` | counter | `channel`, `delivery_kind`, `error_category`, `outcome` |
|
||||
| `openclaw_message_delivery_duration_seconds` | histogram | `channel`, `delivery_kind`, `error_category`, `outcome` |
|
||||
| `openclaw_talk_event_total` | counter | `brain`, `event_type`, `mode`, `provider`, `transport` |
|
||||
| `openclaw_talk_event_duration_seconds` | histogram | `brain`, `event_type`, `mode`, `provider`, `transport` |
|
||||
| `openclaw_talk_audio_bytes` | histogram | `brain`, `event_type`, `mode`, `provider`, `transport` |
|
||||
| `openclaw_queue_lane_size` | gauge | `lane` |
|
||||
| `openclaw_queue_lane_wait_seconds` | histogram | `lane` |
|
||||
| `openclaw_session_state_total` | counter | `reason`, `state` |
|
||||
| `openclaw_session_queue_depth` | gauge | `state` |
|
||||
| `openclaw_session_recovery_total` | counter | `action`, `active_work_kind`, `state`, `status` |
|
||||
| `openclaw_session_recovery_age_seconds` | histogram | `action`, `active_work_kind`, `state`, `status` |
|
||||
| `openclaw_memory_bytes` | gauge | `kind` |
|
||||
| `openclaw_memory_rss_bytes` | histogram | none |
|
||||
| `openclaw_memory_pressure_total` | counter | `level`, `reason` |
|
||||
@@ -131,6 +137,7 @@ For traces, logs, OTLP push, and OpenTelemetry GenAI semantic attributes, see [O
|
||||
</Accordion>
|
||||
<Accordion title="What never appears in Prometheus output">
|
||||
- prompt text, response text, tool inputs, tool outputs, system prompts
|
||||
- Talk transcripts, audio payloads, call ids, room ids, handoff tokens, turn ids, and raw session ids
|
||||
- raw provider request IDs (only bounded hashes, where applicable, on spans — never on metrics)
|
||||
- session keys and session IDs
|
||||
- hostnames, file paths, secret values
|
||||
|
||||
@@ -2478,6 +2478,118 @@ describe("diagnostics-otel service", () => {
|
||||
await service.stop?.(ctx);
|
||||
});
|
||||
|
||||
test("exports session recovery and talk metrics with bounded attributes", async () => {
|
||||
const service = createDiagnosticsOtelService();
|
||||
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { metrics: true });
|
||||
await service.start(ctx);
|
||||
|
||||
emitTrustedDiagnosticEvent({
|
||||
type: "session.recovery.requested",
|
||||
sessionId: "session-should-not-export",
|
||||
sessionKey: "key-should-not-export",
|
||||
state: "processing",
|
||||
ageMs: 12_000,
|
||||
reason: "startup-sweep",
|
||||
activeWorkKind: "tool_call",
|
||||
allowActiveAbort: true,
|
||||
});
|
||||
emitTrustedDiagnosticEvent({
|
||||
type: "session.recovery.completed",
|
||||
sessionId: "session-should-not-export",
|
||||
sessionKey: "key-should-not-export",
|
||||
state: "processing",
|
||||
ageMs: 13_000,
|
||||
reason: "startup-sweep",
|
||||
activeWorkKind: "tool_call",
|
||||
status: "released",
|
||||
action: "abort-active-run",
|
||||
});
|
||||
emitTrustedDiagnosticEvent({
|
||||
type: "talk.event",
|
||||
sessionId: "talk-session-should-not-export",
|
||||
turnId: "turn-should-not-export",
|
||||
talkEventType: "input.audio.delta",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: "openai",
|
||||
byteLength: 320,
|
||||
});
|
||||
emitTrustedDiagnosticEvent({
|
||||
type: "talk.event",
|
||||
sessionId: "talk-session-should-not-export",
|
||||
talkEventType: "latency.metrics",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: "openai",
|
||||
durationMs: 45,
|
||||
});
|
||||
await flushDiagnosticEvents();
|
||||
|
||||
expect(
|
||||
telemetryState.counters.get("openclaw.session.recovery.requested")?.add,
|
||||
).toHaveBeenCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
"openclaw.state": "processing",
|
||||
"openclaw.action": "abort",
|
||||
"openclaw.active_work_kind": "tool_call",
|
||||
}),
|
||||
);
|
||||
expect(
|
||||
telemetryState.counters.get("openclaw.session.recovery.completed")?.add,
|
||||
).toHaveBeenCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
"openclaw.state": "processing",
|
||||
"openclaw.status": "released",
|
||||
"openclaw.action": "abort-active-run",
|
||||
}),
|
||||
);
|
||||
expect(
|
||||
telemetryState.histograms.get("openclaw.session.recovery.age_ms")?.record,
|
||||
).toHaveBeenCalledWith(
|
||||
13_000,
|
||||
expect.objectContaining({
|
||||
"openclaw.status": "released",
|
||||
}),
|
||||
);
|
||||
expect(telemetryState.counters.get("openclaw.talk.event")?.add).toHaveBeenCalledWith(1, {
|
||||
"openclaw.talk.brain": "agent-consult",
|
||||
"openclaw.talk.event_type": "input.audio.delta",
|
||||
"openclaw.talk.mode": "realtime",
|
||||
"openclaw.talk.provider": "openai",
|
||||
"openclaw.talk.transport": "gateway-relay",
|
||||
});
|
||||
expect(telemetryState.histograms.get("openclaw.talk.audio.bytes")?.record).toHaveBeenCalledWith(
|
||||
320,
|
||||
{
|
||||
"openclaw.talk.brain": "agent-consult",
|
||||
"openclaw.talk.event_type": "input.audio.delta",
|
||||
"openclaw.talk.mode": "realtime",
|
||||
"openclaw.talk.provider": "openai",
|
||||
"openclaw.talk.transport": "gateway-relay",
|
||||
},
|
||||
);
|
||||
expect(
|
||||
telemetryState.histograms.get("openclaw.talk.event.duration_ms")?.record,
|
||||
).toHaveBeenCalledWith(45, {
|
||||
"openclaw.talk.brain": "agent-consult",
|
||||
"openclaw.talk.event_type": "latency.metrics",
|
||||
"openclaw.talk.mode": "realtime",
|
||||
"openclaw.talk.provider": "openai",
|
||||
"openclaw.talk.transport": "gateway-relay",
|
||||
});
|
||||
|
||||
const talkCounterCalls = JSON.stringify(
|
||||
telemetryState.counters.get("openclaw.talk.event")?.add.mock.calls,
|
||||
);
|
||||
expect(talkCounterCalls).not.toContain("talk-session-should-not-export");
|
||||
expect(talkCounterCalls).not.toContain("turn-should-not-export");
|
||||
await service.stop?.(ctx);
|
||||
});
|
||||
|
||||
test("does not export model or tool content unless capture is explicitly enabled", async () => {
|
||||
const service = createDiagnosticsOtelService();
|
||||
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true });
|
||||
|
||||
@@ -95,6 +95,7 @@ type SessionRecoveryDiagnosticEvent = Extract<
|
||||
DiagnosticEventPayload,
|
||||
{ type: "session.recovery.requested" | "session.recovery.completed" }
|
||||
>;
|
||||
type TalkDiagnosticEvent = Extract<DiagnosticEventPayload, { type: "talk.event" }>;
|
||||
|
||||
const NO_CONTENT_CAPTURE: OtelContentCapturePolicy = {
|
||||
inputMessages: false,
|
||||
@@ -844,6 +845,18 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
description: "Age of sessions selected for recovery",
|
||||
},
|
||||
);
|
||||
const talkEventCounter = meter.createCounter("openclaw.talk.event", {
|
||||
unit: "1",
|
||||
description: "Talk events emitted by type",
|
||||
});
|
||||
const talkEventDurationHistogram = meter.createHistogram("openclaw.talk.event.duration_ms", {
|
||||
unit: "ms",
|
||||
description: "Talk event duration when reported",
|
||||
});
|
||||
const talkAudioBytesHistogram = meter.createHistogram("openclaw.talk.audio.bytes", {
|
||||
unit: "By",
|
||||
description: "Talk audio frame byte lengths",
|
||||
});
|
||||
const runAttemptCounter = meter.createCounter("openclaw.run.attempt", {
|
||||
unit: "1",
|
||||
description: "Run attempts",
|
||||
@@ -1526,6 +1539,28 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
sessionRecoveryAgeHistogram.record(evt.ageMs, attrs);
|
||||
};
|
||||
|
||||
const talkEventAttrs = (evt: TalkDiagnosticEvent): Record<string, string> => ({
|
||||
"openclaw.talk.brain": lowCardinalityAttr(evt.brain),
|
||||
"openclaw.talk.event_type": lowCardinalityAttr(evt.talkEventType),
|
||||
"openclaw.talk.mode": lowCardinalityAttr(evt.mode),
|
||||
"openclaw.talk.provider": lowCardinalityAttr(evt.provider),
|
||||
"openclaw.talk.transport": lowCardinalityAttr(evt.transport),
|
||||
});
|
||||
|
||||
const recordTalkEvent = (evt: TalkDiagnosticEvent, metadata: DiagnosticEventMetadata) => {
|
||||
if (!metadata.trusted) {
|
||||
return;
|
||||
}
|
||||
const attrs = talkEventAttrs(evt);
|
||||
talkEventCounter.add(1, attrs);
|
||||
if (typeof evt.durationMs === "number") {
|
||||
talkEventDurationHistogram.record(evt.durationMs, attrs);
|
||||
}
|
||||
if (typeof evt.byteLength === "number") {
|
||||
talkAudioBytesHistogram.record(evt.byteLength, attrs);
|
||||
}
|
||||
};
|
||||
|
||||
const recordRunAttempt = (evt: Extract<DiagnosticEventPayload, { type: "run.attempt" }>) => {
|
||||
runAttemptCounter.add(1, { "openclaw.attempt": evt.attempt });
|
||||
};
|
||||
@@ -2283,6 +2318,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
case "message.delivery.error":
|
||||
recordMessageDeliveryError(evt);
|
||||
return;
|
||||
case "talk.event":
|
||||
recordTalkEvent(evt, metadata);
|
||||
return;
|
||||
case "queue.lane.enqueue":
|
||||
recordLaneEnqueue(evt);
|
||||
return;
|
||||
|
||||
@@ -90,6 +90,17 @@ describe("diagnostics-prometheus service", () => {
|
||||
it("bounds messaging labels without exporting raw chat identifiers", () => {
|
||||
const store = __test__.createPrometheusMetricStore();
|
||||
|
||||
__test__.recordDiagnosticEvent(
|
||||
store,
|
||||
{
|
||||
...baseEvent(),
|
||||
type: "message.delivery.started",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
sessionKey: "session-should-not-export",
|
||||
},
|
||||
trusted,
|
||||
);
|
||||
__test__.recordDiagnosticEvent(
|
||||
store,
|
||||
{
|
||||
@@ -119,6 +130,9 @@ describe("diagnostics-prometheus service", () => {
|
||||
|
||||
const rendered = __test__.renderPrometheusMetrics(store);
|
||||
|
||||
expect(rendered).toContain(
|
||||
'openclaw_message_delivery_started_total{channel="matrix",delivery_kind="text"} 1',
|
||||
);
|
||||
expect(rendered).toContain(
|
||||
'openclaw_message_processed_total{channel="unknown",outcome="completed",reason="none"} 1',
|
||||
);
|
||||
@@ -127,9 +141,69 @@ describe("diagnostics-prometheus service", () => {
|
||||
);
|
||||
expect(rendered).not.toContain("chat-should-not-export");
|
||||
expect(rendered).not.toContain("message-should-not-export");
|
||||
expect(rendered).not.toContain("session-should-not-export");
|
||||
expect(rendered).not.toContain("progress draft");
|
||||
});
|
||||
|
||||
it("records session recovery and talk metrics without exporting raw ids or content", () => {
|
||||
const store = __test__.createPrometheusMetricStore();
|
||||
|
||||
__test__.recordDiagnosticEvent(
|
||||
store,
|
||||
{
|
||||
...baseEvent(),
|
||||
type: "session.recovery.completed",
|
||||
sessionId: "session-should-not-export",
|
||||
sessionKey: "key-should-not-export",
|
||||
state: "processing",
|
||||
stateGeneration: 2,
|
||||
ageMs: 12_000,
|
||||
queueDepth: 1,
|
||||
reason: "startup-sweep",
|
||||
activeWorkKind: "tool_call",
|
||||
allowActiveAbort: true,
|
||||
status: "released",
|
||||
action: "abort-active-run",
|
||||
},
|
||||
trusted,
|
||||
);
|
||||
__test__.recordDiagnosticEvent(
|
||||
store,
|
||||
{
|
||||
...baseEvent(),
|
||||
type: "talk.event",
|
||||
sessionId: "talk-session-should-not-export",
|
||||
turnId: "turn-should-not-export",
|
||||
talkEventType: "input.audio.delta",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: "openai",
|
||||
byteLength: 320,
|
||||
},
|
||||
trusted,
|
||||
);
|
||||
|
||||
const rendered = __test__.renderPrometheusMetrics(store);
|
||||
|
||||
expect(rendered).toContain(
|
||||
'openclaw_session_recovery_total{action="abort-active-run",active_work_kind="tool_call",state="processing",status="released"} 1',
|
||||
);
|
||||
expect(rendered).toContain(
|
||||
'openclaw_session_recovery_age_seconds_sum{action="abort-active-run",active_work_kind="tool_call",state="processing",status="released"} 12',
|
||||
);
|
||||
expect(rendered).toContain(
|
||||
'openclaw_talk_event_total{brain="agent-consult",event_type="input.audio.delta",mode="realtime",provider="openai",transport="gateway-relay"} 1',
|
||||
);
|
||||
expect(rendered).toContain(
|
||||
'openclaw_talk_audio_bytes_sum{brain="agent-consult",event_type="input.audio.delta",mode="realtime",provider="openai",transport="gateway-relay"} 320',
|
||||
);
|
||||
expect(rendered).not.toContain("session-should-not-export");
|
||||
expect(rendered).not.toContain("key-should-not-export");
|
||||
expect(rendered).not.toContain("talk-session-should-not-export");
|
||||
expect(rendered).not.toContain("turn-should-not-export");
|
||||
});
|
||||
|
||||
it("caps metric series growth and reports dropped series", () => {
|
||||
const store = __test__.createPrometheusMetricStore();
|
||||
|
||||
|
||||
@@ -351,6 +351,35 @@ function harnessLabels(evt: {
|
||||
};
|
||||
}
|
||||
|
||||
function sessionRecoveryLabels(
|
||||
evt: Extract<
|
||||
DiagnosticEventPayload,
|
||||
{ type: "session.recovery.requested" | "session.recovery.completed" }
|
||||
>,
|
||||
): LabelSet {
|
||||
return {
|
||||
action:
|
||||
evt.type === "session.recovery.completed"
|
||||
? lowCardinalityLabel(evt.action, "unknown")
|
||||
: evt.allowActiveAbort
|
||||
? "abort"
|
||||
: "recover",
|
||||
active_work_kind: lowCardinalityLabel(evt.activeWorkKind, "none"),
|
||||
state: evt.state,
|
||||
status: evt.type === "session.recovery.completed" ? evt.status : "requested",
|
||||
};
|
||||
}
|
||||
|
||||
function talkLabels(evt: Extract<DiagnosticEventPayload, { type: "talk.event" }>): LabelSet {
|
||||
return {
|
||||
brain: lowCardinalityLabel(evt.brain),
|
||||
event_type: lowCardinalityLabel(evt.talkEventType),
|
||||
mode: lowCardinalityLabel(evt.mode),
|
||||
provider: lowCardinalityLabel(evt.provider),
|
||||
transport: lowCardinalityLabel(evt.transport),
|
||||
};
|
||||
}
|
||||
|
||||
function recordModelUsage(
|
||||
store: PrometheusMetricStore,
|
||||
evt: Extract<DiagnosticEventPayload, { type: "model.usage" }>,
|
||||
@@ -497,6 +526,16 @@ function recordDiagnosticEvent(
|
||||
seconds(evt.durationMs),
|
||||
);
|
||||
return;
|
||||
case "message.delivery.started":
|
||||
store.counter(
|
||||
"openclaw_message_delivery_started_total",
|
||||
"Outbound message delivery attempts started.",
|
||||
{
|
||||
channel: lowCardinalityLabel(evt.channel),
|
||||
delivery_kind: lowCardinalityLabel(evt.deliveryKind, "other"),
|
||||
},
|
||||
);
|
||||
return;
|
||||
case "message.delivery.completed":
|
||||
case "message.delivery.error":
|
||||
store.counter(
|
||||
@@ -527,6 +566,36 @@ function recordDiagnosticEvent(
|
||||
seconds(evt.durationMs),
|
||||
);
|
||||
return;
|
||||
case "talk.event":
|
||||
store.counter("openclaw_talk_event_total", "Talk events emitted by type.", talkLabels(evt));
|
||||
store.histogram(
|
||||
"openclaw_talk_event_duration_seconds",
|
||||
"Talk event duration in seconds when reported.",
|
||||
talkLabels(evt),
|
||||
seconds(evt.durationMs),
|
||||
);
|
||||
store.histogram(
|
||||
"openclaw_talk_audio_bytes",
|
||||
"Talk audio frame byte lengths.",
|
||||
talkLabels(evt),
|
||||
numericValue(evt.byteLength),
|
||||
BYTE_BUCKETS,
|
||||
);
|
||||
return;
|
||||
case "session.recovery.requested":
|
||||
case "session.recovery.completed":
|
||||
store.counter(
|
||||
"openclaw_session_recovery_total",
|
||||
"Session recovery observations by status and action.",
|
||||
sessionRecoveryLabels(evt),
|
||||
);
|
||||
store.histogram(
|
||||
"openclaw_session_recovery_age_seconds",
|
||||
"Age of sessions selected for recovery in seconds.",
|
||||
sessionRecoveryLabels(evt),
|
||||
seconds(evt.ageMs),
|
||||
);
|
||||
return;
|
||||
case "queue.lane.enqueue":
|
||||
case "queue.lane.dequeue":
|
||||
store.gauge(
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
createRealtimeVoiceAgentTalkbackQueue,
|
||||
createTalkSessionController,
|
||||
createRealtimeVoiceBridgeSession,
|
||||
recordTalkDiagnosticEvent,
|
||||
type RealtimeVoiceAgentTalkbackQueue,
|
||||
type RealtimeVoiceBridgeSession,
|
||||
type RealtimeVoiceProviderPlugin,
|
||||
@@ -359,13 +360,16 @@ export async function startNodeRealtimeAudioBridge(params: {
|
||||
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
|
||||
const realtimeEvents: GoogleMeetRealtimeEventEntry[] = [];
|
||||
const strategy = params.config.realtime.strategy;
|
||||
const talk: TalkSessionController = createTalkSessionController({
|
||||
sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
});
|
||||
const talk: TalkSessionController = createTalkSessionController(
|
||||
{
|
||||
sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
const recentTalkEvents: TalkEvent[] = [];
|
||||
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
|
||||
if (event) {
|
||||
|
||||
@@ -23,6 +23,7 @@ import {
|
||||
REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ,
|
||||
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
|
||||
recordRealtimeVoiceBridgeEvent,
|
||||
recordTalkDiagnosticEvent,
|
||||
recordRealtimeVoiceTranscript,
|
||||
resamplePcm,
|
||||
resolveConfiguredRealtimeVoiceProvider,
|
||||
@@ -485,14 +486,17 @@ export async function startCommandAgentAudioBridge(params: {
|
||||
fullConfig: params.fullConfig,
|
||||
providers: params.providers,
|
||||
});
|
||||
const talk = createTalkSessionController({
|
||||
sessionId: `google-meet:${params.meetingSessionId}:agent`,
|
||||
mode: "stt-tts",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`,
|
||||
});
|
||||
const talk = createTalkSessionController(
|
||||
{
|
||||
sessionId: `google-meet:${params.meetingSessionId}:agent`,
|
||||
mode: "stt-tts",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
const recentTalkEvents: TalkEvent[] = [];
|
||||
const emitTalkEvent = (input: TalkEventInput) =>
|
||||
pushGoogleMeetTalkEvent(recentTalkEvents, talk.emit(input));
|
||||
@@ -1034,13 +1038,16 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
);
|
||||
const transcript: GoogleMeetRealtimeTranscriptEntry[] = [];
|
||||
const realtimeEvents: GoogleMeetRealtimeEventEntry[] = [];
|
||||
const talk: TalkSessionController = createTalkSessionController({
|
||||
sessionId: `google-meet:${params.meetingSessionId}:command-realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
});
|
||||
const talk: TalkSessionController = createTalkSessionController(
|
||||
{
|
||||
sessionId: `google-meet:${params.meetingSessionId}:command-realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
|
||||
provider: resolved.provider.id,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
const recentTalkEvents: TalkEvent[] = [];
|
||||
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
|
||||
if (event) {
|
||||
|
||||
@@ -16,6 +16,7 @@ import type {
|
||||
} from "openclaw/plugin-sdk/realtime-transcription";
|
||||
import {
|
||||
createTalkSessionController,
|
||||
recordTalkDiagnosticEvent,
|
||||
type TalkEvent,
|
||||
type TalkEventInput,
|
||||
type TalkSessionController,
|
||||
@@ -784,14 +785,17 @@ export class MediaStreamHandler {
|
||||
}
|
||||
|
||||
private createTalkEvents(callId: string, streamSid: string): TalkSessionController {
|
||||
return createTalkSessionController({
|
||||
sessionId: `voice-call:${callId}:${streamSid}`,
|
||||
mode: "stt-tts",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: this.config.transcriptionProvider.id,
|
||||
turnIdPrefix: `${streamSid}:turn`,
|
||||
});
|
||||
return createTalkSessionController(
|
||||
{
|
||||
sessionId: `voice-call:${callId}:${streamSid}`,
|
||||
mode: "stt-tts",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: this.config.transcriptionProvider.id,
|
||||
turnIdPrefix: `${streamSid}:turn`,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
}
|
||||
|
||||
private emitTalkEvent(session: StreamSession, input: TalkEventInput): void {
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
createTalkSessionController,
|
||||
createRealtimeVoiceBridgeSession,
|
||||
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
|
||||
recordTalkDiagnosticEvent,
|
||||
type RealtimeVoiceBridgeSession,
|
||||
type RealtimeVoiceProviderConfig,
|
||||
type RealtimeVoiceProviderPlugin,
|
||||
@@ -507,13 +508,16 @@ export class RealtimeCallHandler {
|
||||
|
||||
const { callId, initialGreetingInstructions } = registration;
|
||||
const callRecord = this.manager.getCallByProviderCallId(callSid);
|
||||
const talk: TalkSessionController = createTalkSessionController({
|
||||
sessionId: `voice-call:${callId}:realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: this.realtimeProvider.id,
|
||||
});
|
||||
const talk: TalkSessionController = createTalkSessionController(
|
||||
{
|
||||
sessionId: `voice-call:${callId}:realtime`,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: this.realtimeProvider.id,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => {
|
||||
if (event) {
|
||||
appendRecentTalkEventMetadata(callRecord, event);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createHash, randomBytes, randomUUID } from "node:crypto";
|
||||
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
|
||||
import {
|
||||
createTalkSessionController,
|
||||
type TalkBrain,
|
||||
@@ -319,13 +320,16 @@ function createTalkHandoffRoom(params: {
|
||||
provider?: string;
|
||||
}): TalkHandoffRoomState {
|
||||
return {
|
||||
talk: createTalkSessionController({
|
||||
sessionId: params.roomId,
|
||||
mode: params.mode,
|
||||
transport: params.transport,
|
||||
brain: params.brain,
|
||||
provider: params.provider,
|
||||
}),
|
||||
talk: createTalkSessionController(
|
||||
{
|
||||
sessionId: params.roomId,
|
||||
mode: params.mode,
|
||||
transport: params.transport,
|
||||
brain: params.brain,
|
||||
provider: params.provider,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js";
|
||||
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
|
||||
import {
|
||||
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
|
||||
type RealtimeVoiceBrowserAudioContract,
|
||||
@@ -160,13 +161,16 @@ export function createTalkRealtimeRelaySession(
|
||||
enforceRelaySessionLimits(params.connId);
|
||||
const relaySessionId = randomUUID();
|
||||
const expiresAtMs = Date.now() + RELAY_SESSION_TTL_MS;
|
||||
const talk = createTalkSessionController({
|
||||
sessionId: relaySessionId,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: params.provider.id,
|
||||
});
|
||||
const talk = createTalkSessionController(
|
||||
{
|
||||
sessionId: relaySessionId,
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: params.provider.id,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
let relay: RelaySession | undefined;
|
||||
const emit = (event: TalkRealtimeRelayEventPayload, talkEvent?: TalkEventInput) =>
|
||||
broadcastToOwner(params.context, params.connId, {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { RealtimeTranscriptionProviderPlugin } from "../plugins/types.js";
|
||||
import type { RealtimeTranscriptionProviderConfig } from "../realtime-transcription/provider-types.js";
|
||||
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
|
||||
import {
|
||||
type TalkEvent,
|
||||
type TalkEventInput,
|
||||
@@ -138,13 +139,16 @@ export function createTalkTranscriptionRelaySession(
|
||||
enforceTranscriptionSessionLimits(params.connId);
|
||||
const transcriptionSessionId = randomUUID();
|
||||
const expiresAtMs = Date.now() + TRANSCRIPTION_SESSION_TTL_MS;
|
||||
const talk = createTalkSessionController({
|
||||
sessionId: transcriptionSessionId,
|
||||
mode: "transcription",
|
||||
transport: "gateway-relay",
|
||||
brain: "none",
|
||||
provider: params.provider.id,
|
||||
});
|
||||
const talk = createTalkSessionController(
|
||||
{
|
||||
sessionId: transcriptionSessionId,
|
||||
mode: "transcription",
|
||||
transport: "gateway-relay",
|
||||
brain: "none",
|
||||
provider: params.provider.id,
|
||||
},
|
||||
{ onEvent: recordTalkDiagnosticEvent },
|
||||
);
|
||||
let relay: TranscriptionRelaySession | undefined;
|
||||
const emit = (event: TalkTranscriptionRelayEventPayload, talkEvent?: TalkEventInput): void => {
|
||||
broadcastToOwner(params.context, params.connId, {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import type { TalkBrain, TalkEventType, TalkMode, TalkTransport } from "../talk/talk-events.js";
|
||||
import {
|
||||
formatDiagnosticTraceparent,
|
||||
getActiveDiagnosticTraceContext,
|
||||
@@ -114,6 +115,21 @@ export type DiagnosticMessageDeliveryErrorEvent = DiagnosticMessageDeliveryBaseE
|
||||
errorCategory: string;
|
||||
};
|
||||
|
||||
export type DiagnosticTalkEvent = DiagnosticBaseEvent & {
|
||||
type: "talk.event";
|
||||
sessionId?: string;
|
||||
turnId?: string;
|
||||
captureId?: string;
|
||||
talkEventType: TalkEventType;
|
||||
mode: TalkMode;
|
||||
transport: TalkTransport;
|
||||
brain: TalkBrain;
|
||||
provider?: string;
|
||||
final?: boolean;
|
||||
durationMs?: number;
|
||||
byteLength?: number;
|
||||
};
|
||||
|
||||
export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & {
|
||||
type: "session.state";
|
||||
sessionKey?: string;
|
||||
@@ -548,6 +564,7 @@ export type DiagnosticEventPayload =
|
||||
| DiagnosticMessageDeliveryStartedEvent
|
||||
| DiagnosticMessageDeliveryCompletedEvent
|
||||
| DiagnosticMessageDeliveryErrorEvent
|
||||
| DiagnosticTalkEvent
|
||||
| DiagnosticSessionStateEvent
|
||||
| DiagnosticSessionLongRunningEvent
|
||||
| DiagnosticSessionStalledEvent
|
||||
@@ -623,6 +640,7 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
|
||||
"message.delivery.started",
|
||||
"message.delivery.completed",
|
||||
"message.delivery.error",
|
||||
"talk.event",
|
||||
"model.call.started",
|
||||
"model.call.completed",
|
||||
"model.call.error",
|
||||
|
||||
@@ -35,10 +35,12 @@ export {
|
||||
type TalkMode,
|
||||
type TalkTransport,
|
||||
} from "../talk/talk-events.js";
|
||||
export { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
|
||||
export {
|
||||
createTalkSessionController,
|
||||
normalizeTalkTransport,
|
||||
type TalkEnsureTurnResult,
|
||||
type TalkSessionControllerOptions,
|
||||
type TalkSessionController,
|
||||
type TalkSessionControllerParams,
|
||||
type TalkTurnFailure,
|
||||
|
||||
72
src/talk/diagnostics.test.ts
Normal file
72
src/talk/diagnostics.test.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
onInternalDiagnosticEvent,
|
||||
resetDiagnosticEventsForTest,
|
||||
type DiagnosticEventPayload,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
import { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "./diagnostics.js";
|
||||
import { createTalkEventSequencer } from "./talk-events.js";
|
||||
|
||||
describe("talk diagnostics", () => {
|
||||
beforeEach(() => {
|
||||
resetDiagnosticEventsForTest();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetDiagnosticEventsForTest();
|
||||
});
|
||||
|
||||
it("maps talk events to bounded diagnostic events without payload content", async () => {
|
||||
const diagnostics: Array<{ event: DiagnosticEventPayload; trusted: boolean }> = [];
|
||||
onInternalDiagnosticEvent((event, metadata) => {
|
||||
diagnostics.push({ event, trusted: metadata.trusted });
|
||||
});
|
||||
const events = createTalkEventSequencer({
|
||||
sessionId: "talk-session",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: "openai",
|
||||
});
|
||||
|
||||
const talkEvent = events.next({
|
||||
type: "input.audio.delta",
|
||||
turnId: "turn-1",
|
||||
payload: {
|
||||
byteLength: 320,
|
||||
text: "private transcript should not export",
|
||||
},
|
||||
});
|
||||
|
||||
expect(createTalkDiagnosticEvent(talkEvent)).toEqual({
|
||||
type: "talk.event",
|
||||
sessionId: "talk-session",
|
||||
turnId: "turn-1",
|
||||
captureId: undefined,
|
||||
talkEventType: "input.audio.delta",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
provider: "openai",
|
||||
final: undefined,
|
||||
durationMs: undefined,
|
||||
byteLength: 320,
|
||||
});
|
||||
|
||||
recordTalkDiagnosticEvent(talkEvent);
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(diagnostics).toHaveLength(1);
|
||||
expect(diagnostics[0]).toMatchObject({
|
||||
trusted: true,
|
||||
event: {
|
||||
type: "talk.event",
|
||||
talkEventType: "input.audio.delta",
|
||||
sessionId: "talk-session",
|
||||
turnId: "turn-1",
|
||||
byteLength: 320,
|
||||
},
|
||||
});
|
||||
expect(JSON.stringify(diagnostics[0]?.event)).not.toContain("private transcript");
|
||||
});
|
||||
});
|
||||
51
src/talk/diagnostics.ts
Normal file
51
src/talk/diagnostics.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import {
|
||||
emitTrustedDiagnosticEvent,
|
||||
type DiagnosticEventInput,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
import type { TalkEvent } from "./talk-events.js";
|
||||
|
||||
type TalkDiagnosticEventInput = Extract<DiagnosticEventInput, { type: "talk.event" }>;
|
||||
|
||||
export function createTalkDiagnosticEvent(event: TalkEvent): TalkDiagnosticEventInput {
|
||||
const payload = asRecord(event.payload);
|
||||
return {
|
||||
type: "talk.event",
|
||||
sessionId: event.sessionId,
|
||||
turnId: event.turnId,
|
||||
captureId: event.captureId,
|
||||
talkEventType: event.type,
|
||||
mode: event.mode,
|
||||
transport: event.transport,
|
||||
brain: event.brain,
|
||||
provider: event.provider,
|
||||
final: event.final,
|
||||
durationMs: firstFiniteNumber(payload, ["durationMs", "latencyMs", "elapsedMs"]),
|
||||
byteLength: firstFiniteNumber(payload, ["byteLength", "audioBytes"]),
|
||||
};
|
||||
}
|
||||
|
||||
export function recordTalkDiagnosticEvent(event: TalkEvent): void {
|
||||
emitTrustedDiagnosticEvent(createTalkDiagnosticEvent(event));
|
||||
}
|
||||
|
||||
function asRecord(value: unknown): Record<string, unknown> | undefined {
|
||||
return value && typeof value === "object" && !Array.isArray(value)
|
||||
? (value as Record<string, unknown>)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function firstFiniteNumber(
|
||||
record: Record<string, unknown> | undefined,
|
||||
keys: readonly string[],
|
||||
): number | undefined {
|
||||
if (!record) {
|
||||
return undefined;
|
||||
}
|
||||
for (const key of keys) {
|
||||
const value = record[key];
|
||||
if (typeof value === "number" && Number.isFinite(value) && value >= 0) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
@@ -97,6 +97,28 @@ describe("createTalkSessionController", () => {
|
||||
expect(talk.outputAudioActive).toBe(false);
|
||||
});
|
||||
|
||||
it("notifies an event hook for emitted and controller-created events", () => {
|
||||
const events: string[] = [];
|
||||
const talk = createTalkSessionController(
|
||||
{
|
||||
sessionId: "talk-session",
|
||||
mode: "realtime",
|
||||
transport: "gateway-relay",
|
||||
brain: "agent-consult",
|
||||
},
|
||||
{
|
||||
now: () => "2026-05-05T00:00:00.000Z",
|
||||
onEvent: (event) => events.push(event.type),
|
||||
},
|
||||
);
|
||||
|
||||
talk.emit({ type: "session.started", payload: {} });
|
||||
const turn = talk.ensureTurn();
|
||||
talk.endTurn({ turnId: turn.turnId });
|
||||
|
||||
expect(events).toEqual(["session.started", "turn.started", "turn.ended"]);
|
||||
});
|
||||
|
||||
it("clears stale output audio state when a replacement turn starts", () => {
|
||||
const talk = createController();
|
||||
|
||||
|
||||
@@ -49,9 +49,15 @@ export type TalkSessionControllerParams = TalkEventContext & {
|
||||
turnIdPrefix?: string;
|
||||
};
|
||||
|
||||
export type TalkSessionControllerOptions = {
|
||||
now?: () => Date | string;
|
||||
onEvent?: (event: TalkEvent) => void;
|
||||
sequencer?: TalkEventSequencer;
|
||||
};
|
||||
|
||||
export function createTalkSessionController(
|
||||
params: TalkSessionControllerParams,
|
||||
options: { now?: () => Date | string; sequencer?: TalkEventSequencer } = {},
|
||||
options: TalkSessionControllerOptions = {},
|
||||
): TalkSessionController {
|
||||
const { maxRecentEvents = 20, turnIdPrefix = "turn", ...context } = params;
|
||||
const sequencer = options.sequencer ?? createTalkEventSequencer(context, { now: options.now });
|
||||
@@ -65,6 +71,11 @@ export function createTalkSessionController(
|
||||
if (recentEvents.length > maxRecentEvents) {
|
||||
recentEvents.splice(0, recentEvents.length - maxRecentEvents);
|
||||
}
|
||||
try {
|
||||
options.onEvent?.(event as TalkEvent);
|
||||
} catch {
|
||||
// Diagnostics hooks must not break Talk delivery.
|
||||
}
|
||||
return event;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user