From 150d9f4e372220d975dced95f736734699ce73b1 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 12 May 2026 19:11:14 +0100 Subject: [PATCH] test(tasks): cover cleanup_message_events task body Adds skipped-when-no-POSTGRES_URI and happy-path coverage for the Celery janitor. The skipped path returns the documented short-circuit shape without touching the repo. The happy path seeds a backdated row, runs the task against the pg_conn fixture, and asserts the retention window's row is deleted while in-window rows survive. Mirrors the TestCleanupPendingToolState pattern. --- tests/api/user/test_tasks.py | 87 ++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/tests/api/user/test_tasks.py b/tests/api/user/test_tasks.py index ef1adce7..a2792db0 100644 --- a/tests/api/user/test_tasks.py +++ b/tests/api/user/test_tasks.py @@ -428,6 +428,93 @@ class TestCleanupPendingToolState: } +class TestCleanupMessageEventsTask: + """Retention janitor delegates to MessageEventsRepository.cleanup_older_than.""" + + @pytest.mark.unit + def test_skips_when_postgres_uri_missing(self, monkeypatch): + from application.api.user.tasks import cleanup_message_events + from application.core.settings import settings + + monkeypatch.setattr(settings, "POSTGRES_URI", None, raising=False) + + result = cleanup_message_events.run() + assert result == {"deleted": 0, "skipped": "POSTGRES_URI not set"} + + @pytest.mark.unit + def test_deletes_rows_past_retention_window(self, pg_conn, monkeypatch): + import uuid + + from sqlalchemy import text as _text + + from application.api.user.tasks import cleanup_message_events + from application.core.settings import settings + from application.storage.db.repositories.message_events import ( + MessageEventsRepository, + ) + + # Seed parent rows so the FK on message_events holds. + user_id = f"user-{uuid.uuid4().hex[:8]}" + conv_id = uuid.uuid4() + msg_id = uuid.uuid4() + pg_conn.execute( + _text("INSERT INTO users (user_id) VALUES (:u)"), + {"u": user_id}, + ) + pg_conn.execute( + _text( + "INSERT INTO conversations (id, user_id, name) " + "VALUES (:id, :u, 'test')" + ), + {"id": conv_id, "u": user_id}, + ) + pg_conn.execute( + _text( + "INSERT INTO conversation_messages (id, conversation_id, " + "user_id, position) VALUES (:id, :c, :u, 0)" + ), + {"id": msg_id, "c": conv_id, "u": user_id}, + ) + + repo = MessageEventsRepository(pg_conn) + repo.record(str(msg_id), 0, "answer", {"chunk": "stale"}) + repo.record(str(msg_id), 1, "answer", {"chunk": "fresh"}) + # Backdate seq=0 past the default 14-day retention so the + # janitor catches it; seq=1 stays at "now" and must survive. + pg_conn.execute( + _text( + "UPDATE message_events SET created_at = now() - interval '20 days' " + "WHERE message_id = CAST(:id AS uuid) AND sequence_no = 0" + ), + {"id": str(msg_id)}, + ) + + monkeypatch.setattr( + settings, "POSTGRES_URI", "postgresql://stub", raising=False + ) + + @contextmanager + def _fake_begin(): + yield pg_conn + + fake_engine = MagicMock() + fake_engine.begin = _fake_begin + + with patch( + "application.storage.db.engine.get_engine", + return_value=fake_engine, + ): + result = cleanup_message_events.run() + + assert result == { + "deleted": 1, + "ttl_days": settings.MESSAGE_EVENTS_RETENTION_DAYS, + } + # Only the fresh row survives. + rows = repo.read_after(str(msg_id)) + assert [r["sequence_no"] for r in rows] == [1] + + class TestIngestIdempotency: """Same short-circuit applies to the ingest task path."""