diff --git a/application/storage/db/repositories/tool_call_attempts.py b/application/storage/db/repositories/tool_call_attempts.py index d3624736..9b72c3ab 100644 --- a/application/storage/db/repositories/tool_call_attempts.py +++ b/application/storage/db/repositories/tool_call_attempts.py @@ -63,7 +63,8 @@ class ToolCallAttemptsRepository: message_id: Optional[str] = None, artifact_id: Optional[str] = None, ) -> None: - """Insert OR upgrade a row to ``executed``. + """Insert OR upgrade a row to ``executed`` — or ``confirmed`` when + there is no ``message_id``, as in ``mark_executed``. Used as a fallback when ``record_proposed`` failed (DB outage) and the tool ran anyway — preserves the journal so the @@ -72,6 +73,7 @@ class ToolCallAttemptsRepository: result_payload: dict = {"result": result} if artifact_id: result_payload["artifact_id"] = artifact_id + status = "executed" if message_id is not None else "confirmed" self._conn.execute( text( """ @@ -82,9 +84,9 @@ class ToolCallAttemptsRepository: (:call_id, CAST(:tool_id AS uuid), :tool_name, :action_name, CAST(:arguments AS jsonb), CAST(:result AS jsonb), CAST(:message_id AS uuid), - 'executed') + :status) ON CONFLICT (call_id) DO UPDATE - SET status = 'executed', + SET status = :status, result = EXCLUDED.result, message_id = COALESCE(EXCLUDED.message_id, tool_call_attempts.message_id) """ @@ -97,6 +99,7 @@ class ToolCallAttemptsRepository: "arguments": json.dumps(arguments if arguments is not None else {}, cls=PGNativeJSONEncoder), "result": json.dumps(result_payload, cls=PGNativeJSONEncoder), "message_id": message_id, + "status": status, }, ) @@ -108,7 +111,9 @@ class ToolCallAttemptsRepository: message_id: Optional[str] = None, artifact_id: Optional[str] = None, ) -> bool: - """Flip ``proposed`` → ``executed`` with the tool result. + """Flip ``proposed`` → ``executed``, or straight to ``confirmed`` + when there is no ``message_id`` (a ``save_conversation=False`` + request reserves no message, so no finalize will confirm it). ``artifact_id`` (when present) is stored alongside ``result`` in the JSONB as audit data — the reconciler reads it for diagnostic @@ -117,12 +122,14 @@ class ToolCallAttemptsRepository: result_payload: dict = {"result": result} if artifact_id: result_payload["artifact_id"] = artifact_id + status = "executed" if message_id is not None else "confirmed" sql = ( "UPDATE tool_call_attempts SET " - "status = 'executed', result = CAST(:result AS jsonb)" + "status = :status, result = CAST(:result AS jsonb)" ) params: dict[str, Any] = { "call_id": call_id, + "status": status, "result": json.dumps(result_payload, cls=PGNativeJSONEncoder), } if message_id is not None: diff --git a/tests/agents/test_tool_executor_three_phase.py b/tests/agents/test_tool_executor_three_phase.py index d36ab5a7..65020b24 100644 --- a/tests/agents/test_tool_executor_three_phase.py +++ b/tests/agents/test_tool_executor_three_phase.py @@ -1,10 +1,9 @@ """Tests for the journaled execute path on ToolExecutor. -Each tool call inserts a row into ``tool_call_attempts`` then flips -through ``proposed → executed`` (or ``proposed → failed``). The flip -to ``confirmed`` is owned by the message-finalize path and is only -asserted indirectly here (rows stay in ``executed`` so the reconciler -can pick them up). +Each tool call inserts a ``tool_call_attempts`` row and flips it +``proposed → executed`` (or ``→ failed``). With a ``message_id`` it +stays ``executed`` for the finalize path to confirm; without one +(``save_conversation=False``) it goes straight to ``confirmed``. """ from contextlib import contextmanager @@ -75,11 +74,24 @@ def _make_call(name="test_action_t1", call_id="c1"): return call +_TOOLS_DICT = { + "t1": { + "id": "00000000-0000-0000-0000-000000000001", + "name": "test_tool", + "config": {"key": "val"}, + "actions": [ + {"name": "test_action", "description": "T", "parameters": {"properties": {}}}, + ], + } +} + + @pytest.mark.unit class TestExecuteJournaling: - def test_happy_path_proposed_then_executed( + def test_no_message_id_proposed_then_confirmed( self, pg_conn, mock_tool_manager, monkeypatch ): + """No reserved message (``save_conversation=False``) → row lands ``confirmed``, not ``executed``.""" executor = ToolExecutor(user="u") monkeypatch.setattr( "application.agents.tool_executor.ToolActionParser", @@ -89,23 +101,12 @@ class TestExecuteJournaling: ) _patch_db(monkeypatch, pg_conn) - tools_dict = { - "t1": { - "id": "00000000-0000-0000-0000-000000000001", - "name": "test_tool", - "config": {"key": "val"}, - "actions": [ - {"name": "test_action", "description": "T", "parameters": {"properties": {}}}, - ], - } - } - - events, result = _drain(executor.execute(tools_dict, _make_call(), "MockLLM")) + events, result = _drain(executor.execute(_TOOLS_DICT, _make_call(), "MockLLM")) assert result[0] == "Tool result" row = _select_attempt(pg_conn, "c1") assert row is not None - assert row["status"] == "executed" + assert row["status"] == "confirmed" assert row["tool_name"] == "test_tool" assert row["action_name"] == "test_action" assert row["arguments"] == {"q": "v"} @@ -117,10 +118,7 @@ class TestExecuteJournaling: def test_executor_message_id_is_persisted_on_executed_row( self, pg_conn, mock_tool_manager, monkeypatch ): - """When the route stamps a placeholder message_id on the executor, - the journal row carries it forward so ``confirm_executed_tool_calls`` - can later flip it to ``confirmed``. - """ + """The executor's message_id is carried onto the journal row, which stays ``executed``.""" from application.storage.db.repositories.conversations import ( ConversationsRepository, ) @@ -147,18 +145,7 @@ class TestExecuteJournaling: ) _patch_db(monkeypatch, pg_conn) - tools_dict = { - "t1": { - "id": "00000000-0000-0000-0000-000000000001", - "name": "test_tool", - "config": {"key": "val"}, - "actions": [ - {"name": "test_action", "description": "T", "parameters": {"properties": {}}}, - ], - } - } - - _drain(executor.execute(tools_dict, _make_call(call_id="cm1"), "MockLLM")) + _drain(executor.execute(_TOOLS_DICT, _make_call(call_id="cm1"), "MockLLM")) row = _select_attempt(pg_conn, "cm1") assert row is not None @@ -180,18 +167,7 @@ class TestExecuteJournaling: RuntimeError("boom") ) - tools_dict = { - "t1": { - "id": "00000000-0000-0000-0000-000000000001", - "name": "test_tool", - "config": {"key": "val"}, - "actions": [ - {"name": "test_action", "description": "T", "parameters": {"properties": {}}}, - ], - } - } - - gen = executor.execute(tools_dict, _make_call(call_id="c2"), "MockLLM") + gen = executor.execute(_TOOLS_DICT, _make_call(call_id="c2"), "MockLLM") with pytest.raises(RuntimeError, match="boom"): _drain(gen) @@ -200,42 +176,10 @@ class TestExecuteJournaling: assert row["status"] == "failed" assert row["error"] == "boom" - def test_executed_row_lingers_for_reconciler_when_no_confirm( - self, pg_conn, mock_tool_manager, monkeypatch - ): - """No finalize_message call → row sits in ``executed``.""" - executor = ToolExecutor(user="u") - monkeypatch.setattr( - "application.agents.tool_executor.ToolActionParser", - lambda _cls, **kw: Mock( - parse_args=Mock(return_value=("t1", "test_action", {})) - ), - ) - _patch_db(monkeypatch, pg_conn) - - tools_dict = { - "t1": { - "id": "00000000-0000-0000-0000-000000000001", - "name": "test_tool", - "config": {"key": "val"}, - "actions": [ - {"name": "test_action", "description": "T", "parameters": {"properties": {}}}, - ], - } - } - - _drain(executor.execute(tools_dict, _make_call(call_id="c3"), "MockLLM")) - - row = _select_attempt(pg_conn, "c3") - assert row["status"] == "executed" - # Partial index `tool_call_attempts_pending_ts_idx` selects rows - # in ('proposed','executed') — the reconciler reads those. - assert row["status"] in ("proposed", "executed") - @pytest.mark.unit class TestRepository: - def test_proposed_then_executed_round_trip(self, pg_conn): + def test_proposed_then_confirmed_when_no_message(self, pg_conn): from application.storage.db.repositories.tool_call_attempts import ( ToolCallAttemptsRepository, ) @@ -249,7 +193,50 @@ class TestRepository: assert repo.mark_executed("c-x", {"out": "ok"}) is True row = _select_attempt(pg_conn, "c-x") + assert row["status"] == "confirmed" + assert row["message_id"] is None + assert row["result"] == {"result": {"out": "ok"}} + + def test_mark_executed_with_message_stays_executed(self, pg_conn): + from application.storage.db.repositories.conversations import ( + ConversationsRepository, + ) + from application.storage.db.repositories.tool_call_attempts import ( + ToolCallAttemptsRepository, + ) + + # FK constraint: message_id must reference a real row. + conv_repo = ConversationsRepository(pg_conn) + conv = conv_repo.create("u-repo", "repo-msg-test") + msg = conv_repo.reserve_message( + str(conv["id"]), + prompt="q?", + placeholder_response="...", + request_id="req-repo-1", + status="pending", + ) + message_uuid = str(msg["id"]) + + repo = ToolCallAttemptsRepository(pg_conn) + repo.record_proposed("c-m", "tool", "act", {}) + assert ( + repo.mark_executed("c-m", {"out": "ok"}, message_id=message_uuid) is True + ) + row = _select_attempt(pg_conn, "c-m") assert row["status"] == "executed" + assert str(row["message_id"]) == message_uuid + + def test_upsert_executed_without_message_confirms(self, pg_conn): + """``upsert_executed`` (DB-outage fallback) with no ``message_id`` lands ``confirmed``.""" + from application.storage.db.repositories.tool_call_attempts import ( + ToolCallAttemptsRepository, + ) + + repo = ToolCallAttemptsRepository(pg_conn) + repo.upsert_executed("c-up", "tool", "act", {"a": 1}, {"out": "ok"}) + row = _select_attempt(pg_conn, "c-up") + assert row["status"] == "confirmed" + assert row["message_id"] is None assert row["result"] == {"result": {"out": "ok"}} def test_mark_failed_sets_error(self, pg_conn):