From bdb0b1d90829d5ed23740327620947cc1e91a720 Mon Sep 17 00:00:00 2001 From: giveen Date: Mon, 19 Jan 2026 12:37:48 -0700 Subject: [PATCH] docs: clarify gather_candidate_targets is shallow, not recursive --- dupe-workspace.tar.gz | Bin 0 -> 444 bytes expimp-workspace.tar.gz | Bin 0 -> 502 bytes pentestagent/agents/base_agent.py | 46 ++- pentestagent/agents/crew/orchestrator.py | 39 ++- pentestagent/agents/crew/worker_pool.py | 46 ++- pentestagent/agents/pa_agent/pa_agent.py | 12 +- pentestagent/interface/tui.py | 417 ++++++++++++++++++----- pentestagent/mcp/hexstrike_adapter.py | 138 ++++++-- pentestagent/mcp/metasploit_adapter.py | 110 +++++- pentestagent/mcp/transport.py | 107 +++++- pentestagent/runtime/docker_runtime.py | 29 +- pentestagent/runtime/runtime.py | 61 +++- pentestagent/workspaces/utils.py | 2 +- pentestagent/workspaces/validation.py | 15 +- tests/test_rag_workspace_integration.py | 3 + tests/test_validation.py | 51 +++ tests/test_workspace_utils.py | 57 ++++ 17 files changed, 955 insertions(+), 178 deletions(-) create mode 100644 dupe-workspace.tar.gz create mode 100644 expimp-workspace.tar.gz create mode 100644 tests/test_validation.py create mode 100644 tests/test_workspace_utils.py diff --git a/dupe-workspace.tar.gz b/dupe-workspace.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..7bb4cfc3b49a29989df537c9382f29914798f1c8 GIT binary patch literal 444 zcmV;t0Ym;DiwFo-fNp65|73M=Wi5Aaa%*#NVPj=3bYXG;?b=UE!eAW0@tu7YyLD=N zo@dV=9RwXacIZ?@JhcZYor`UfzWXLYMIm9$MfUx8m~6s8`F)>FP0}PiJ@>1#;EPa4 zdm&zI+X|8Cx96MvQYfjScohv`*|fgyP9ObGY8;pCeHl)qv*WRzbdN+^VT4no2nX+} zs%OokzY9%TEFVKUTU1Z;P)tMG@BBMwHe2b8c)xv}d1+*pneKqH-TWJ4WfYqs=l>zp zpf^PygvZxc;aA^q>CIf9#|G^QvmoJg;crFxzU$LqwcMmhPKJ$t841O`p^2$`j3NJ m{})x;%WpnEyZ_TJtCFmeAa!o z3wN_@`JKe&v2JP2#b&)*1Yhru@7~axo7b`RQO*1&O*a9Z_n*}!%{cPpTby~h+w)~y zeoS{a<}Wevw0pPvyX4az)<9vN1G~QM%Gz(Y!*55|n)ry~!&Bz?T*|CIx>BHNl>x*5 z`Ms;Bef2u;V|0Kw7lRMHF+NMe};RNC|6u>;fAjy|TC>W18qZBFt^TwBoF9Bn^bbE+&SLui?CtAz zbw0fQ->d9r`~QdaKjit&{E;ZGY@%_8j5E7d@2xG|nqK%YZC8WY p1)!Z@>~~!Fr$6`o!!yq({ None: """Wait for dependent workers to complete.""" @@ -248,8 +265,17 @@ class WorkerPool: if dep_id in self._tasks: try: await self._tasks[dep_id] - except (asyncio.CancelledError, Exception): - pass # Dependency failed, but we continue + except (asyncio.CancelledError, Exception) as e: + import logging + + logging.getLogger(__name__).exception("Dependency wait failed for %s: %s", dep_id, e) + try: + from ...interface.notifier import notify + + notify("warning", f"Dependency wait failed for {dep_id}: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about dependency wait failure: %s", ne) + # Dependency failed, but we continue async def wait_for(self, agent_ids: Optional[List[str]] = None) -> Dict[str, Any]: """ @@ -269,8 +295,16 @@ class WorkerPool: if agent_id in self._tasks: try: await self._tasks[agent_id] - except (asyncio.CancelledError, Exception): - pass + except (asyncio.CancelledError, Exception) as e: + import logging + + logging.getLogger(__name__).exception("Waiting for agent task %s failed: %s", agent_id, e) + try: + from ...interface.notifier import notify + + notify("warning", f"Waiting for agent {agent_id} failed: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about wait_for agent failure: %s", ne) worker = self._workers.get(agent_id) if worker: diff --git a/pentestagent/agents/pa_agent/pa_agent.py b/pentestagent/agents/pa_agent/pa_agent.py index f53a4a8..f4449fe 100644 --- a/pentestagent/agents/pa_agent/pa_agent.py +++ b/pentestagent/agents/pa_agent/pa_agent.py @@ -117,8 +117,16 @@ class PentestAgentAgent(BaseAgent): sections.append("\n".join(grouped[cat])) notes_context = "\n\n".join(sections) - except Exception: - pass # Notes not available + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to gather notes for agent prompt: %s", e) + try: + from ...interface.notifier import notify + + notify("warning", f"Agent: failed to gather notes: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about agent notes failure") # Get environment info from runtime env = self.runtime.environment diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 572e8f5..253902b 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -362,7 +362,14 @@ class ToolsScreen(ModalScreen): def on_mount(self) -> None: try: tree = self.query_one("#tools-tree", Tree) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to query tools tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to initialize tools tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about tools tree init failure: %s", e) return root = tree.root @@ -376,8 +383,14 @@ class ToolsScreen(ModalScreen): try: tree.focus() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to focus tools tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to focus tools tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about tools tree focus failure: %s", e) @on(Tree.NodeSelected, "#tools-tree") def on_tool_selected(self, event: Tree.NodeSelected) -> None: @@ -406,10 +419,22 @@ class ToolsScreen(ModalScreen): text.append(f"{name}\n", style="bold #d4d4d4") text.append(str(desc), style="#d4d4d4") desc_widget.update(text) - except Exception: - pass - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to update tool description pane: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to update tool description: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about tool desc update failure: %s", e) + except Exception as e: + logging.getLogger(__name__).exception("Unhandled error in on_tool_selected: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: error handling tool selection: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about tool selection error: %s", e) @on(Button.Pressed, "#tools-close") def close_tools(self) -> None: @@ -722,7 +747,14 @@ class TokenDiagnostics(Static): # Lazy import of token_tracker (best-effort) try: from ..tools import token_tracker - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to import token_tracker: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: token tracker import failed: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about token_tracker import failure: %s", e) token_tracker = None text.append("Token Usage Diagnostics\n", style="bold #d4d4d4") @@ -744,8 +776,14 @@ class TokenDiagnostics(Static): token_tracker.record_usage_sync(0, 0) stats = token_tracker.get_stats_sync() reset_occurred = True - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Token tracker reset failed: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Token tracker reset failed: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about token tracker reset failure: %s", e) # Extract values last_in = int(stats.get("last_input_tokens", 0) or 0) @@ -764,7 +802,8 @@ class TokenDiagnostics(Static): return None try: return float(v) - except Exception: + except Exception as e: + logging.getLogger(__name__).debug("Failed to parse env var %s: %s", name, e) return "INVALID" unified = _parse_env("COST_PER_MILLION") @@ -839,7 +878,14 @@ class TokenDiagnostics(Static): dl = float(daily_limit) remaining_tokens = max(int(dl - new_daily_total), 0) percent_used = (new_daily_total / max(1.0, dl)) * 100.0 - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to compute daily limit values: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to compute daily token limit: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about daily limit computation failure: %s", e) remaining_tokens = None # Render structured panel with aligned labels and block bars @@ -1200,8 +1246,14 @@ class PentestAgentTUI(App): from .notifier import register_callback register_callback(self._notifier_callback) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to register TUI notifier callback: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to register notifier callback: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about notifier registration failure: %s", ne) # Call the textual worker - decorator returns a Worker, not a coroutine _ = cast(Any, self._initialize_agent()) @@ -1258,17 +1310,24 @@ class PentestAgentTUI(App): self._add_system(f"[!] RAG: {e}") self.rag_engine = None - # MCP - auto-load if config exists + # MCP - auto-load only if enabled in environment mcp_server_count = 0 - try: - self.mcp_manager = MCPManager() - if self.mcp_manager.config_path.exists(): - mcp_tools = await self.mcp_manager.connect_all() - for tool in mcp_tools: - register_tool_instance(tool) - mcp_server_count = len(self.mcp_manager.servers) - except Exception as e: - self._add_system(f"[!] MCP: {e}") + import os + launch_hexstrike = os.getenv("LAUNCH_HEXTRIKE", "false").lower() == "true" + launch_metasploit = os.getenv("LAUNCH_METASPLOIT_MCP", "false").lower() == "true" + if launch_hexstrike or launch_metasploit: + try: + self.mcp_manager = MCPManager() + if self.mcp_manager.config_path.exists(): + mcp_tools = await self.mcp_manager.connect_all() + for tool in mcp_tools: + register_tool_instance(tool) + mcp_server_count = len(self.mcp_manager.servers) + except Exception as e: + self._add_system(f"[!] MCP: {e}") + else: + self.mcp_manager = None + mcp_server_count = 0 # Runtime - Docker or Local if self.use_docker: @@ -1346,8 +1405,14 @@ class PentestAgentTUI(App): if mode: bar.mode = mode self._mode = mode - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to update status bar: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to update status bar: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about status bar update failure: %s", ne) def _show_notification(self, level: str, message: str) -> None: """Display a short operator-visible notification in the chat area.""" @@ -1358,8 +1423,8 @@ class PentestAgentTUI(App): # Set status bar to error briefly for emphasis if level.lower() in ("error", "critical"): self._set_status("error") - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to show notification in TUI: %s", e) def _notifier_callback(self, level: str, message: str) -> None: """Callback wired to `pentestagent.interface.notifier`. @@ -1373,12 +1438,13 @@ class PentestAgentTUI(App): try: self.call_from_thread(self._show_notification, level, message) return - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("call_from_thread failed in notifier callback: %s", e) # Fall through to direct call pass self._show_notification(level, message) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Exception in notifier callback handling: %s", e) def _add_message(self, widget: Static) -> None: """Add a message widget to chat""" @@ -1387,8 +1453,14 @@ class PentestAgentTUI(App): widget.add_class("message") scroll.mount(widget) scroll.scroll_end(animate=False) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to add message to chat: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to add chat message: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about add_message failure: %s", ne) def _add_system(self, content: str) -> None: self._add_message(SystemMessage(content)) @@ -1423,7 +1495,14 @@ class PentestAgentTUI(App): """Mount a live memory diagnostics widget into the chat area.""" try: scroll = self.query_one("#chat-scroll", ScrollableContainer) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to query chat-scroll for memory diagnostics: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: memory diagnostics unavailable: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about memory diagnostics availability: %s", ne) self._add_system("Agent not initialized") return # Mount a new diagnostics panel with a unique ID and scroll into view @@ -1431,21 +1510,35 @@ class PentestAgentTUI(App): import uuid panel_id = f"memory-diagnostics-{uuid.uuid4().hex}" - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to generate memory diagnostics panel id: %s", e) panel_id = None widget = MemoryDiagnostics(id=panel_id) scroll.mount(widget) try: scroll.scroll_end(animate=False) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to scroll to memory diagnostics panel: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to scroll to memory diagnostics panel: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about scroll failure: %s", ne) def _show_token_stats(self) -> None: """Mount a live token diagnostics widget into the chat area.""" try: scroll = self.query_one("#chat-scroll", ScrollableContainer) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to query chat-scroll for token diagnostics: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: token diagnostics unavailable: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics availability: %s", ne) self._add_system("Agent not initialized") return # Mount a new diagnostics panel with a unique ID and scroll into view @@ -1453,15 +1546,28 @@ class PentestAgentTUI(App): import uuid panel_id = f"token-diagnostics-{uuid.uuid4().hex}" - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to generate token diagnostics panel id: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to generate token diagnostics panel id: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics panel id generation failure: %s", ne) panel_id = None widget = TokenDiagnostics(id=panel_id) scroll.mount(widget) try: scroll.scroll_end(animate=False) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to scroll to token diagnostics panel: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to scroll to token diagnostics panel: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics scroll failure: %s", ne) async def _show_notes(self) -> None: """Display saved notes""" @@ -1580,10 +1686,16 @@ class PentestAgentTUI(App): from pentestagent.interface.notifier import notify notify("warning", f"Failed to persist last target for workspace {active}: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about target persist error") - except Exception: - logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about target persist error: %s", ne) + except Exception as e: + logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed to persist last target: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about WorkspaceManager access failure: %s", ne) # Update displayed Target in the UI try: @@ -1594,8 +1706,8 @@ class PentestAgentTUI(App): from pentestagent.interface.notifier import notify notify("warning", f"Failed to update target display: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about target display error") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about target display error: %s", ne) # Update the initial ready SystemMessage (if present) so Target appears under Runtime try: scroll = self.query_one("#chat-scroll", ScrollableContainer) @@ -1626,8 +1738,8 @@ class PentestAgentTUI(App): from pentestagent.interface.notifier import notify notify("warning", f"Failed to refresh UI after target update: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error: %s", ne) except Exception as e: # Fallback to append if regex replacement fails, and surface warning logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e) @@ -1635,8 +1747,8 @@ class PentestAgentTUI(App): from pentestagent.interface.notifier import notify notify("warning", f"Failed to update target display: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about target update error") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about target update error: %s", ne) child.message_content = ( child.message_content + f"\n Target: {target}" ) @@ -1651,9 +1763,23 @@ class PentestAgentTUI(App): scroll.mount_before(msg, first) else: scroll.mount(msg) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to mount target system message: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed to display target: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about target display failure: %s", ne) self._add_system(f" Target: {target}") - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed while applying target display: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed while updating target display: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about target display outer failure: %s", ne) # Last resort: append a subtle system line self._add_system(f" Target: {target}") @@ -1884,8 +2010,14 @@ Be concise. Use the actual data from notes.""" self.agent.target = last try: self._apply_target_display(last) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to apply target display when restoring last target: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed restoring last target display: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about restore-last-target failure") self._add_system(f"Active workspace: {active}") return @@ -1993,8 +2125,14 @@ Be concise. Use the actual data from notes.""" self.agent.target = last try: self._apply_target_display(last) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to apply target display when activating workspace: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed to restore workspace target display: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about workspace target restore failure") if existed: self._add_system(f"Workspace '{name}' set active.") @@ -2118,8 +2256,14 @@ Be concise. Use the actual data from notes.""" try: self._crew_orchestrator_node.expand() tree.select_node(self._crew_orchestrator_node) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to expand/select crew orchestrator node: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to expand crew sidebar node: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure") self._viewing_worker_id = None # Update stats @@ -2153,9 +2297,22 @@ Be concise. Use the actual data from notes.""" ) try: child.refresh() + except Exception as e: + logging.getLogger(__name__).exception("Failed to refresh child message: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"TUI: failed to refresh UI element: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about child refresh failure") + except Exception as e: + logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e) + try: + from pentestagent.interface.notifier import notify + + notify("warning", f"Failed to update target display: {e}") except Exception: - pass - except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about target update error") child.message_content = ( child.message_content + f"\n Target: {target}" ) @@ -2187,8 +2344,14 @@ Be concise. Use the actual data from notes.""" chat_area = self.query_one("#chat-area") chat_area.remove_class("with-sidebar") - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Sidebar error: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: sidebar error: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about sidebar error") def _update_crew_stats(self) -> None: """Update crew stats panel.""" @@ -2231,8 +2394,14 @@ Be concise. Use the actual data from notes.""" stats = self.query_one("#crew-stats", Static) stats.update(text) stats.border_title = "# Stats" - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to hide sidebar: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to hide sidebar: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hide_sidebar failure") def _update_spinner(self) -> None: """Update spinner animation for running workers.""" @@ -2254,8 +2423,14 @@ Be concise. Use the actual data from notes.""" if not has_running and self._spinner_timer: self._spinner_timer.stop() self._spinner_timer = None - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to update crew stats: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to update crew stats: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about crew stats update failure") def _add_crew_worker(self, worker_id: str, worker_type: str, task: str) -> None: """Add a worker to the sidebar tree.""" @@ -2275,11 +2450,23 @@ Be concise. Use the actual data from notes.""" self._crew_worker_nodes[worker_id] = node try: self._crew_orchestrator_node.expand() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to expand crew orchestrator node: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to expand crew node: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure") self._update_crew_stats() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to update spinner: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to update spinner: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about spinner update failure") def _update_crew_worker(self, worker_id: str, **updates) -> None: """Update a worker's state.""" @@ -2297,8 +2484,14 @@ Be concise. Use the actual data from notes.""" label = self._format_worker_label(worker_id) self._crew_worker_nodes[worker_id].set_label(label) self._update_crew_stats() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to add crew worker node: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to add crew worker node: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about add_crew_worker failure") def _format_worker_label(self, worker_id: str) -> Text: """Format worker label for tree.""" @@ -2394,8 +2587,14 @@ Be concise. Use the actual data from notes.""" if node: node.add_leaf(f" {tool_name}") node.expand() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to update crew worker display: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to update crew worker display: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about update_crew_worker failure") @on(Tree.NodeSelected, "#workers-tree") def on_worker_tree_selected(self, event: Tree.NodeSelected) -> None: @@ -2538,8 +2737,14 @@ Be concise. Use the actual data from notes.""" if self._current_crew: try: await self._current_crew.cancel() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to add tool to worker node: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to add tool to worker: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about add_tool_to_worker failure") self._current_crew = None self._add_system(f"[!] Crew error: {e}\n{traceback.format_exc()}") self._set_status("error") @@ -2743,8 +2948,14 @@ Be concise. Use the actual data from notes.""" for worker_id, worker in self._crew_workers.items(): if worker.get("status") in ("running", "pending"): self._update_crew_worker(worker_id, status="cancelled") - except Exception: - pass # Best effort + except Exception as e: + logging.getLogger(__name__).exception("Failed to cancel crew orchestrator cleanly: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed during crew cancellation: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about crew cancellation failure") async def _reconnect_mcp_after_cancel(self) -> None: """Reconnect MCP servers after cancellation to restore clean state.""" @@ -2752,8 +2963,14 @@ Be concise. Use the actual data from notes.""" try: if self.mcp_manager: await self.mcp_manager.reconnect_all() - except Exception: - pass # Best effort - don't crash if reconnect fails + except Exception as e: + logging.getLogger(__name__).exception("Failed to reconnect MCP servers after cancel: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: failed to reconnect MCP servers after cancel: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about MCP reconnect failure") def action_show_help(self) -> None: self.push_screen(HelpScreen()) @@ -2764,7 +2981,14 @@ Be concise. Use the actual data from notes.""" """Recall previous input into the chat field.""" try: inp = self.query_one("#chat-input", Input) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to query chat input for history up: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: history navigation failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about history_up failure") return if not self._cmd_history: @@ -2780,7 +3004,14 @@ Be concise. Use the actual data from notes.""" """Recall next input (or clear when at end).""" try: inp = self.query_one("#chat-input", Input) - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to query chat input for history down: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: history navigation failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about history_down failure") return if not self._cmd_history: @@ -2800,14 +3031,26 @@ Be concise. Use the actual data from notes.""" try: await self.mcp_manager.disconnect_all() await asyncio.sleep(0.1) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to disconnect MCP manager on unmount: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: error during shutdown disconnect: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about MCP disconnect failure") if self.runtime: try: await self.runtime.stop() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to stop runtime on unmount: %s", e) + try: + from .notifier import notify + + notify("warning", f"TUI: runtime stop error during shutdown: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about runtime stop failure") # ----- Entry Point ----- diff --git a/pentestagent/mcp/hexstrike_adapter.py b/pentestagent/mcp/hexstrike_adapter.py index 97bc483..e6f44b3 100644 --- a/pentestagent/mcp/hexstrike_adapter.py +++ b/pentestagent/mcp/hexstrike_adapter.py @@ -97,8 +97,16 @@ class HexstrikeAdapter: log_file = get_loot_file("artifacts/hexstrike.log") with log_file.open("a") as fh: fh.write(f"[HexstrikeAdapter] started pid={pid}\n") - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to write hexstrike start PID to log: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to write hexstrike PID to log: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike PID log failure") # Start a background reader task to capture logs loop = asyncio.get_running_loop() @@ -125,9 +133,17 @@ class HexstrikeAdapter: fh.write(line) fh.flush() except asyncio.CancelledError: - pass - except Exception: - pass + return + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error capturing hexstrike output: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"HexStrike log capture failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike log capture failure") async def stop(self, timeout: int = 5) -> None: """Stop the server process gracefully.""" @@ -141,10 +157,26 @@ class HexstrikeAdapter: except asyncio.TimeoutError: try: proc.kill() + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to kill hexstrike after timeout: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to kill hexstrike after timeout: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike kill failure") + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error stopping hexstrike process: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error stopping hexstrike process: {e}") except Exception: - pass - except Exception: - pass + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop error") self._process = None @@ -152,8 +184,16 @@ class HexstrikeAdapter: self._reader_task.cancel() try: await self._reader_task - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error awaiting hexstrike reader task: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error awaiting hexstrike reader task: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike reader await failure") def stop_sync(self, timeout: int = 5) -> None: """Synchronous stop helper for use during process-exit cleanup. @@ -177,7 +217,15 @@ class HexstrikeAdapter: try: os.kill(pid, signal.SIGTERM) except Exception: - pass + import logging + + logging.getLogger(__name__).exception("Failed to SIGTERM hexstrike pid: %s", pid) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to SIGTERM hexstrike pid {pid}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGTERM failure") # wait briefly for process to exit end = time.time() + float(timeout) @@ -195,20 +243,52 @@ class HexstrikeAdapter: try: os.kill(pid, signal.SIGKILL) except Exception: - pass - except Exception: - pass + import logging + + logging.getLogger(__name__).exception("Failed to SIGKILL hexstrike pid: %s", pid) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to SIGKILL hexstrike pid {pid}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGKILL failure") + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error during hexstrike stop_sync cleanup: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error during hexstrike stop_sync cleanup: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop_sync cleanup error") def __del__(self): try: self.stop_sync() - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Exception during HexstrikeAdapter.__del__: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error during HexstrikeAdapter cleanup: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike __del__ error") # Clear references try: self._process = None - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to clear HexstrikeAdapter process reference: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to clear hexstrike process reference: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike process-clear failure") async def health_check(self, timeout: int = 5) -> bool: """Check the server health endpoint. Returns True if healthy.""" @@ -219,7 +299,16 @@ class HexstrikeAdapter: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=timeout) as resp: return resp.status == 200 - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("HexstrikeAdapter health_check (aiohttp) failed: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"HexStrike health check failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike health check failure") return False # Fallback: synchronous urllib in thread @@ -229,7 +318,16 @@ class HexstrikeAdapter: try: with urllib.request.urlopen(url, timeout=timeout) as r: return r.status == 200 - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("HexstrikeAdapter health_check (urllib) failed: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"HexStrike health check failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about hexstrike urllib health check failure") return False loop = asyncio.get_running_loop() diff --git a/pentestagent/mcp/metasploit_adapter.py b/pentestagent/mcp/metasploit_adapter.py index 93437ae..1578f59 100644 --- a/pentestagent/mcp/metasploit_adapter.py +++ b/pentestagent/mcp/metasploit_adapter.py @@ -134,7 +134,16 @@ class MetasploitAdapter: await asyncio.sleep(0.5) # If we fallthrough, msfrpcd didn't become ready in time return - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to start msfrpcd: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to start msfrpcd: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd start failure") return async def _capture_msfrpcd_output(self) -> None: @@ -150,9 +159,17 @@ class MetasploitAdapter: fh.write(b"[msfrpcd] " + line) fh.flush() except asyncio.CancelledError: - pass - except Exception: - pass + return + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error capturing msfrpcd output: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"msfrpcd log capture failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd log capture failure") async def start(self, background: bool = True, timeout: int = 30) -> bool: """Start the vendored Metasploit MCP server. @@ -172,8 +189,16 @@ class MetasploitAdapter: if str(self.transport).lower() in ("http", "sse"): try: await self._start_msfrpcd_if_needed() - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error starting msfrpcd: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error starting msfrpcd: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd error") cmd = self._build_command() resolved = shutil.which(self.python_cmd) or self.python_cmd @@ -195,8 +220,16 @@ class MetasploitAdapter: log_file = get_loot_file("artifacts/metasploit_mcp.log") with log_file.open("a") as fh: fh.write(f"[MetasploitAdapter] started pid={pid}\n") - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to write metasploit start PID to log: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to write metasploit PID to log: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about metasploit PID log failure") # Start background reader loop = asyncio.get_running_loop() @@ -204,7 +237,16 @@ class MetasploitAdapter: try: return await self.health_check(timeout=timeout) - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("MetasploitAdapter health_check raised: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Metasploit health check failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about metasploit health check failure") return False async def _capture_output(self) -> None: @@ -221,9 +263,17 @@ class MetasploitAdapter: fh.write(line) fh.flush() except asyncio.CancelledError: - pass - except Exception: - pass + return + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error capturing metasploit output: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Metasploit log capture failed: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about metasploit log capture failure") async def stop(self, timeout: int = 5) -> None: proc = self._process @@ -238,8 +288,16 @@ class MetasploitAdapter: proc.kill() except Exception: pass - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error waiting for process termination: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error stopping metasploit adapter: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about metasploit stop error") self._process = None @@ -247,8 +305,16 @@ class MetasploitAdapter: self._reader_task.cancel() try: await self._reader_task - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to kill msfrpcd during stop: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to kill msfrpcd: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd kill failure") # Stop msfrpcd if we started it try: @@ -262,8 +328,16 @@ class MetasploitAdapter: msf_proc.kill() except Exception: pass - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error stopping metasploit adapter cleanup: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error stopping metasploit adapter: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about metasploit adapter cleanup error") finally: self._msfrpcd_proc = None diff --git a/pentestagent/mcp/transport.py b/pentestagent/mcp/transport.py index 58d80eb..da93461 100644 --- a/pentestagent/mcp/transport.py +++ b/pentestagent/mcp/transport.py @@ -210,7 +210,16 @@ class SSETransport(MCPTransport): except asyncio.TimeoutError: # If endpoint not discovered, continue; send() will try discovery pass - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed opening SSE stream: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed opening SSE stream: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE open failure") # If opening the SSE stream fails, still mark connected so # send() can attempt POST discovery and report meaningful errors. self._sse_response = None @@ -312,7 +321,16 @@ class SSETransport(MCPTransport): else: self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}" return - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error during SSE POST endpoint discovery: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error during SSE POST endpoint discovery: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE discovery error") return async def disconnect(self): @@ -323,21 +341,53 @@ class SSETransport(MCPTransport): self._sse_task.cancel() try: await self._sse_task - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error awaiting SSE listener task during disconnect: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error awaiting SSE listener task during disconnect: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE listener await failure") self._sse_task = None except Exception: - pass + import logging + + logging.getLogger(__name__).exception("Error cancelling SSE listener task during disconnect") + try: + from ..interface.notifier import notify + + notify("warning", "Error cancelling SSE listener task during disconnect") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE listener cancellation error") try: if self._sse_response: try: await self._sse_response.release() - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Error releasing SSE response during disconnect: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Error releasing SSE response during disconnect: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE response release error") self._sse_response = None except Exception: - pass + import logging + + logging.getLogger(__name__).exception("Error handling SSE response during disconnect") + try: + from ..interface.notifier import notify + + notify("warning", "Error handling SSE response during disconnect") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE response handling error") # Fail any pending requests async with self._pending_lock: @@ -365,7 +415,10 @@ class SSETransport(MCPTransport): async for raw in resp.content: try: line = raw.decode(errors="ignore").rstrip("\r\n") - except Exception: + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to decode SSE raw chunk: %s", e) continue if line == "": # End of event; process accumulated lines @@ -392,14 +445,30 @@ class SSETransport(MCPTransport): self._post_url = f"{p.scheme}://{p.netloc}{endpoint}" else: self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}" - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed parsing SSE endpoint announcement: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed parsing SSE endpoint announcement: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint parse failure") # Notify connect() that endpoint is ready try: if self._endpoint_ready and not self._endpoint_ready.is_set(): self._endpoint_ready.set() - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed to set SSE endpoint ready event: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to set SSE endpoint ready event: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint ready event failure") else: # Try to parse as JSON and resolve pending futures try: @@ -410,8 +479,16 @@ class SSETransport(MCPTransport): fut = self._pending.get(msg_id) if fut and not fut.done(): fut.set_result(obj) - except Exception: - pass + except Exception as e: + import logging + + logging.getLogger(__name__).exception("Failed parsing SSE event JSON or resolving pending future: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed parsing SSE event JSON or resolving pending future: {e}") + except Exception: + logging.getLogger(__name__).exception("Failed to notify operator about SSE event parse/future failure") event_lines = [] else: diff --git a/pentestagent/runtime/docker_runtime.py b/pentestagent/runtime/docker_runtime.py index 0afdeb5..0ecfea6 100644 --- a/pentestagent/runtime/docker_runtime.py +++ b/pentestagent/runtime/docker_runtime.py @@ -3,6 +3,7 @@ import asyncio import io import tarfile +import logging from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -77,7 +78,14 @@ class DockerRuntime(Runtime): if self.container.status != "running": self.container.start() await asyncio.sleep(2) # Wait for container to fully start - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to get or start existing container: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"DockerRuntime: container check failed: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about docker container check failure: %s", ne) # Create new container volumes = { str(Path.home() / ".pentestagent"): { @@ -109,8 +117,14 @@ class DockerRuntime(Runtime): try: self.container.stop(timeout=10) self.container.remove() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed stopping/removing container: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"DockerRuntime: failed stopping/removing container: {e}") + except Exception as ne: + logging.getLogger(__name__).exception("Failed to notify operator about docker stop error: %s", ne) finally: self.container = None @@ -261,7 +275,14 @@ class DockerRuntime(Runtime): try: self.container.reload() return self.container.status == "running" - except Exception: + except Exception as e: + logging.getLogger(__name__).exception("Failed to determine container running state: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"DockerRuntime: is_running check failed: {e}") + except Exception: + pass return False async def get_status(self) -> dict: diff --git a/pentestagent/runtime/runtime.py b/pentestagent/runtime/runtime.py index 355bf34..e9f8c06 100644 --- a/pentestagent/runtime/runtime.py +++ b/pentestagent/runtime/runtime.py @@ -2,6 +2,7 @@ import platform import shutil +import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import TYPE_CHECKING, List, Optional @@ -303,8 +304,8 @@ def detect_environment() -> EnvironmentInfo: with open("/proc/version", "r") as f: if "microsoft" in f.read().lower(): os_name = "Linux (WSL)" - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).debug("WSL detection probe failed: %s", e) # Detect available tools with categories available_tools = [] @@ -478,8 +479,16 @@ class LocalRuntime(Runtime): proc.stdout.close() if proc.stderr: proc.stderr.close() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Error cleaning up active process: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"Runtime: error cleaning up process: {e}") + except Exception as ne: + logging.getLogger(__name__).exception( + "Failed to notify operator about process cleanup error: %s", ne + ) self._active_processes.clear() # Clean up browser @@ -492,29 +501,57 @@ class LocalRuntime(Runtime): if self._page: try: await self._page.close() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to close browser page: %s", e) + try: + from ..interface.notifier import notify + notify("warning", f"Runtime: failed to close browser page: {e}") + except Exception as ne: + logging.getLogger(__name__).exception( + "Failed to notify operator about browser page close error: %s", ne + ) self._page = None if self._browser_context: try: await self._browser_context.close() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to close browser context: %s", e) + try: + from ..interface.notifier import notify + notify("warning", f"Runtime: failed to close browser context: {e}") + except Exception as ne: + logging.getLogger(__name__).exception( + "Failed to notify operator about browser context close error: %s", ne + ) self._browser_context = None if self._browser: try: await self._browser.close() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to close browser: %s", e) + try: + from ..interface.notifier import notify + notify("warning", f"Runtime: failed to close browser: {e}") + except Exception as ne: + logging.getLogger(__name__).exception( + "Failed to notify operator about browser close error: %s", ne + ) self._browser = None if self._playwright: try: await self._playwright.stop() - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).exception("Failed to stop playwright: %s", e) + try: + from ..interface.notifier import notify + notify("warning", f"Runtime: failed to stop playwright: {e}") + except Exception as ne: + logging.getLogger(__name__).exception( + "Failed to notify operator about playwright stop error: %s", ne + ) self._playwright = None async def _ensure_browser(self): diff --git a/pentestagent/workspaces/utils.py b/pentestagent/workspaces/utils.py index dddf29c..9c3539b 100644 --- a/pentestagent/workspaces/utils.py +++ b/pentestagent/workspaces/utils.py @@ -114,7 +114,7 @@ def export_workspace(name: str, output: Optional[Path] = None, root: Optional[Pa rel = p.relative_to(root) entries.append(rel) - entries = sorted(entries, key=lambda p: str(p)) + entries = sorted(entries, key=str) # Create tar.gz with tarfile.open(out_path, "w:gz") as tf: diff --git a/pentestagent/workspaces/validation.py b/pentestagent/workspaces/validation.py index 6b44906..72220a1 100644 --- a/pentestagent/workspaces/validation.py +++ b/pentestagent/workspaces/validation.py @@ -12,11 +12,18 @@ from .manager import TargetManager def gather_candidate_targets(obj: Any) -> List[str]: - """Extract candidate target strings from arguments (shallow). + """ + Extract candidate target strings from arguments (shallow, non-recursive). - This intentionally performs a shallow inspection to keep the function - fast and predictable; nested structures should be handled by callers - if required. + This function inspects only the top-level of the provided object (str or dict) + and collects values for common target keys (e.g., 'target', 'host', 'ip', etc.). + It does NOT recurse into nested dictionaries or lists. If you need to extract + targets from deeply nested structures, you must implement or call a recursive + extractor separately. + + Rationale: Shallow extraction is fast and predictable for most tool argument + schemas. For recursive extraction, see the project documentation or extend + this function as needed. """ candidates: List[str] = [] if isinstance(obj, str): diff --git a/tests/test_rag_workspace_integration.py b/tests/test_rag_workspace_integration.py index bcfeb14..5d94d78 100644 --- a/tests/test_rag_workspace_integration.py +++ b/tests/test_rag_workspace_integration.py @@ -48,6 +48,9 @@ def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch): # If load-on-init doesn't run, calling index() would re-index and rewrite the file rag2.index() assert rag2.get_chunk_count() >= 1 + # Assert that the index file was not overwritten (mtime unchanged) + mtime_after = emb_path.stat().st_mtime + assert mtime_after == mtime_before, "Index file was unexpectedly overwritten (should have been loaded)" mtime_after = emb_path.stat().st_mtime assert mtime_after == mtime_before, "Expected persisted index to be loaded, not re-written" diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 0000000..67624f4 --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,51 @@ +import pytest +from pentestagent.workspaces.validation import gather_candidate_targets, is_target_in_scope + +def test_gather_candidate_targets_shallow(): + args = { + "target": "10.0.0.1", + "hosts": ["host1", "host2"], + "nested": {"target": "should_not_find"}, + "ip": "192.168.1.1", + "irrelevant": "nope" + } + result = gather_candidate_targets(args) + assert "10.0.0.1" in result + assert "host1" in result and "host2" in result + assert "192.168.1.1" in result + assert "should_not_find" not in result + assert "nope" not in result + +def test_is_target_in_scope_ip_cidr_hostname(): + allowed = ["192.168.0.0/16", "host.local", "10.0.0.1"] + # IP in CIDR + assert is_target_in_scope("192.168.1.5", allowed) + # Exact IP + assert is_target_in_scope("10.0.0.1", allowed) + # Hostname + assert is_target_in_scope("host.local", allowed) + # Not in scope + assert not is_target_in_scope("8.8.8.8", allowed) + assert not is_target_in_scope("otherhost", allowed) + +def test_is_target_in_scope_cidr_vs_cidr(): + allowed = ["10.0.0.0/24"] + # Subnet of allowed + assert is_target_in_scope("10.0.0.128/25", allowed) + # Same network + assert is_target_in_scope("10.0.0.0/24", allowed) + # Not a subnet + assert not is_target_in_scope("10.0.1.0/24", allowed) + +def test_is_target_in_scope_single_ip_cidr(): + allowed = ["10.0.0.1"] + # Single-IP network + assert is_target_in_scope("10.0.0.1/32", allowed) + # Not matching + assert not is_target_in_scope("10.0.0.2/32", allowed) + +def test_is_target_in_scope_case_insensitive_hostname(): + allowed = ["Example.COM"] + assert is_target_in_scope("example.com", allowed) + assert is_target_in_scope("EXAMPLE.com", allowed) + assert not is_target_in_scope("other.com", allowed) diff --git a/tests/test_workspace_utils.py b/tests/test_workspace_utils.py new file mode 100644 index 0000000..7f15f31 --- /dev/null +++ b/tests/test_workspace_utils.py @@ -0,0 +1,57 @@ +import os +import tarfile +from pathlib import Path +import pytest +from pentestagent.workspaces.utils import export_workspace, import_workspace +from pentestagent.workspaces.manager import WorkspaceManager + +def test_export_import_workspace(tmp_path): + wm = WorkspaceManager(root=tmp_path) + name = "expimp" + wm.create(name) + wm.add_targets(name, ["10.1.1.1", "host1.local"]) + # Add a file to workspace + loot_dir = tmp_path / "workspaces" / name / "loot" + loot_dir.mkdir(parents=True, exist_ok=True) + (loot_dir / "loot.txt").write_text("lootdata") + + # Export + archive = export_workspace(name, root=tmp_path) + assert archive.exists() + with tarfile.open(archive, "r:gz") as tf: + members = tf.getnames() + assert any("loot.txt" in m for m in members) + assert any("meta.yaml" in m for m in members) + + # Remove workspace, then import + ws_dir = tmp_path / "workspaces" / name + for rootdir, dirs, files in os.walk(ws_dir, topdown=False): + for f in files: + os.remove(Path(rootdir) / f) + for d in dirs: + os.rmdir(Path(rootdir) / d) + os.rmdir(ws_dir) + assert not ws_dir.exists() + + imported = import_workspace(archive, root=tmp_path) + assert imported == name + assert (tmp_path / "workspaces" / name / "loot" / "loot.txt").exists() + assert (tmp_path / "workspaces" / name / "meta.yaml").exists() + + +def test_import_workspace_missing_meta(tmp_path): + # Create a tar.gz without meta.yaml + archive = tmp_path / "bad.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + tf.add(__file__, arcname="not_meta.txt") + with pytest.raises(ValueError): + import_workspace(archive, root=tmp_path) + + +def test_import_workspace_already_exists(tmp_path): + wm = WorkspaceManager(root=tmp_path) + name = "dupe" + wm.create(name) + archive = export_workspace(name, root=tmp_path) + with pytest.raises(FileExistsError): + import_workspace(archive, root=tmp_path)