docs: clarify gather_candidate_targets is shallow, not recursive

This commit is contained in:
giveen
2026-01-19 12:37:48 -07:00
parent 63233dc392
commit bdb0b1d908
17 changed files with 955 additions and 178 deletions

BIN
dupe-workspace.tar.gz Normal file

Binary file not shown.

BIN
expimp-workspace.tar.gz Normal file

Binary file not shown.

View File

@@ -106,7 +106,16 @@ class BaseAgent(ABC):
from ..tools.finish import TaskPlan
self._task_plan = TaskPlan()
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed importing TaskPlan: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to import TaskPlan: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about TaskPlan import failure")
# Fallback simple plan structure
class _SimplePlan:
def __init__(self):
@@ -127,8 +136,16 @@ class BaseAgent(ABC):
# Expose plan to runtime so tools like `finish` can access it
try:
self.runtime.plan = self._task_plan
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to attach plan to runtime: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to attach plan to runtime: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about runtime plan attach failure")
# Ensure agent starts idle
self.state_manager.transition_to(AgentState.IDLE)
@@ -448,11 +465,17 @@ class BaseAgent(ABC):
if active:
allowed = wm.list_targets(active)
for c in candidates:
try:
if not validation.is_target_in_scope(c, allowed):
for c in candidates:
try:
if not validation.is_target_in_scope(c, allowed):
out_of_scope.append(c)
except Exception as e:
import logging
logging.getLogger(__name__).exception(
"Error validating candidate target %s: %s", c, e
)
out_of_scope.append(c)
except Exception:
out_of_scope.append(c)
if active and out_of_scope:
# Block execution and return an explicit error requiring operator confirmation
@@ -478,6 +501,15 @@ class BaseAgent(ABC):
)
)
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error executing tool %s: %s", name, e)
try:
from ..interface.notifier import notify
notify("warning", f"Tool execution failed ({name}): {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about tool execution failure")
results.append(
ToolResult(
tool_call_id=tool_call_id,

View File

@@ -111,8 +111,16 @@ class CrewOrchestrator:
sections.append("\n".join(grouped[cat]))
notes_context = "\n\n".join(sections)
except Exception:
pass # Notes not available
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to gather notes for orchestrator prompt: %s", e)
try:
from ...interface.notifier import notify
notify("warning", f"Orchestrator: failed to gather notes: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about orchestrator notes failure: %s", ne)
# Format insights for prompt
insights_text = ""
@@ -271,6 +279,15 @@ class CrewOrchestrator:
break # Exit immediately after finish
except Exception as e:
import logging
logging.getLogger(__name__).exception("Worker tool execution failed (%s): %s", tc_name, e)
try:
from ...interface.notifier import notify
notify("warning", f"Worker tool execution failed ({tc_name}): {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about worker tool failure: %s", ne)
error_msg = f"Error: {e}"
yield {
"phase": "tool_result",
@@ -327,6 +344,15 @@ class CrewOrchestrator:
self.pool.finish_tokens = 0
break
except Exception as e:
import logging
logging.getLogger(__name__).exception("Auto-finish failed: %s", e)
try:
from ...interface.notifier import notify
notify("warning", f"Auto-finish failed: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about auto-finish failure: %s", ne)
yield {
"phase": "error",
"error": f"Auto-finish failed: {e}",
@@ -344,6 +370,15 @@ class CrewOrchestrator:
yield {"phase": "complete", "report": final_report}
except Exception as e:
import logging
logging.getLogger(__name__).exception("Orchestrator run failed: %s", e)
try:
from ...interface.notifier import notify
notify("error", f"CrewOrchestrator run failed: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about orchestrator run failure: %s", ne)
self.state = CrewState.ERROR
yield {"phase": "error", "error": str(e)}

View File

@@ -230,6 +230,15 @@ class WorkerPool:
raise
except Exception as e:
import logging
logging.getLogger(__name__).exception("Worker execution failed (%s): %s", worker.id, e)
try:
from ...interface.notifier import notify
notify("warning", f"Worker execution failed ({worker.id}): {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about worker execution failure: %s", ne)
worker.error = str(e)
worker.status = AgentStatus.ERROR
worker.completed_at = time.time()
@@ -239,8 +248,16 @@ class WorkerPool:
# Cleanup worker's isolated runtime
try:
await worker_runtime.stop()
except Exception:
pass # Best effort cleanup
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to stop worker runtime for %s: %s", worker.id, e)
try:
from ...interface.notifier import notify
notify("warning", f"Failed to stop worker runtime for {worker.id}: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about worker runtime stop failure: %s", ne)
async def _wait_for_dependencies(self, depends_on: List[str]) -> None:
"""Wait for dependent workers to complete."""
@@ -248,8 +265,17 @@ class WorkerPool:
if dep_id in self._tasks:
try:
await self._tasks[dep_id]
except (asyncio.CancelledError, Exception):
pass # Dependency failed, but we continue
except (asyncio.CancelledError, Exception) as e:
import logging
logging.getLogger(__name__).exception("Dependency wait failed for %s: %s", dep_id, e)
try:
from ...interface.notifier import notify
notify("warning", f"Dependency wait failed for {dep_id}: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about dependency wait failure: %s", ne)
# Dependency failed, but we continue
async def wait_for(self, agent_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""
@@ -269,8 +295,16 @@ class WorkerPool:
if agent_id in self._tasks:
try:
await self._tasks[agent_id]
except (asyncio.CancelledError, Exception):
pass
except (asyncio.CancelledError, Exception) as e:
import logging
logging.getLogger(__name__).exception("Waiting for agent task %s failed: %s", agent_id, e)
try:
from ...interface.notifier import notify
notify("warning", f"Waiting for agent {agent_id} failed: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about wait_for agent failure: %s", ne)
worker = self._workers.get(agent_id)
if worker:

View File

@@ -117,8 +117,16 @@ class PentestAgentAgent(BaseAgent):
sections.append("\n".join(grouped[cat]))
notes_context = "\n\n".join(sections)
except Exception:
pass # Notes not available
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to gather notes for agent prompt: %s", e)
try:
from ...interface.notifier import notify
notify("warning", f"Agent: failed to gather notes: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about agent notes failure")
# Get environment info from runtime
env = self.runtime.environment

View File

@@ -362,7 +362,14 @@ class ToolsScreen(ModalScreen):
def on_mount(self) -> None:
try:
tree = self.query_one("#tools-tree", Tree)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to query tools tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to initialize tools tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tools tree init failure: %s", e)
return
root = tree.root
@@ -376,8 +383,14 @@ class ToolsScreen(ModalScreen):
try:
tree.focus()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to focus tools tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to focus tools tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tools tree focus failure: %s", e)
@on(Tree.NodeSelected, "#tools-tree")
def on_tool_selected(self, event: Tree.NodeSelected) -> None:
@@ -406,10 +419,22 @@ class ToolsScreen(ModalScreen):
text.append(f"{name}\n", style="bold #d4d4d4")
text.append(str(desc), style="#d4d4d4")
desc_widget.update(text)
except Exception:
pass
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to update tool description pane: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to update tool description: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tool desc update failure: %s", e)
except Exception as e:
logging.getLogger(__name__).exception("Unhandled error in on_tool_selected: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: error handling tool selection: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tool selection error: %s", e)
@on(Button.Pressed, "#tools-close")
def close_tools(self) -> None:
@@ -722,7 +747,14 @@ class TokenDiagnostics(Static):
# Lazy import of token_tracker (best-effort)
try:
from ..tools import token_tracker
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to import token_tracker: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: token tracker import failed: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about token_tracker import failure: %s", e)
token_tracker = None
text.append("Token Usage Diagnostics\n", style="bold #d4d4d4")
@@ -744,8 +776,14 @@ class TokenDiagnostics(Static):
token_tracker.record_usage_sync(0, 0)
stats = token_tracker.get_stats_sync()
reset_occurred = True
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Token tracker reset failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Token tracker reset failed: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about token tracker reset failure: %s", e)
# Extract values
last_in = int(stats.get("last_input_tokens", 0) or 0)
@@ -764,7 +802,8 @@ class TokenDiagnostics(Static):
return None
try:
return float(v)
except Exception:
except Exception as e:
logging.getLogger(__name__).debug("Failed to parse env var %s: %s", name, e)
return "INVALID"
unified = _parse_env("COST_PER_MILLION")
@@ -839,7 +878,14 @@ class TokenDiagnostics(Static):
dl = float(daily_limit)
remaining_tokens = max(int(dl - new_daily_total), 0)
percent_used = (new_daily_total / max(1.0, dl)) * 100.0
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to compute daily limit values: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to compute daily token limit: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about daily limit computation failure: %s", e)
remaining_tokens = None
# Render structured panel with aligned labels and block bars
@@ -1200,8 +1246,14 @@ class PentestAgentTUI(App):
from .notifier import register_callback
register_callback(self._notifier_callback)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to register TUI notifier callback: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to register notifier callback: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about notifier registration failure: %s", ne)
# Call the textual worker - decorator returns a Worker, not a coroutine
_ = cast(Any, self._initialize_agent())
@@ -1258,17 +1310,24 @@ class PentestAgentTUI(App):
self._add_system(f"[!] RAG: {e}")
self.rag_engine = None
# MCP - auto-load if config exists
# MCP - auto-load only if enabled in environment
mcp_server_count = 0
try:
self.mcp_manager = MCPManager()
if self.mcp_manager.config_path.exists():
mcp_tools = await self.mcp_manager.connect_all()
for tool in mcp_tools:
register_tool_instance(tool)
mcp_server_count = len(self.mcp_manager.servers)
except Exception as e:
self._add_system(f"[!] MCP: {e}")
import os
launch_hexstrike = os.getenv("LAUNCH_HEXTRIKE", "false").lower() == "true"
launch_metasploit = os.getenv("LAUNCH_METASPLOIT_MCP", "false").lower() == "true"
if launch_hexstrike or launch_metasploit:
try:
self.mcp_manager = MCPManager()
if self.mcp_manager.config_path.exists():
mcp_tools = await self.mcp_manager.connect_all()
for tool in mcp_tools:
register_tool_instance(tool)
mcp_server_count = len(self.mcp_manager.servers)
except Exception as e:
self._add_system(f"[!] MCP: {e}")
else:
self.mcp_manager = None
mcp_server_count = 0
# Runtime - Docker or Local
if self.use_docker:
@@ -1346,8 +1405,14 @@ class PentestAgentTUI(App):
if mode:
bar.mode = mode
self._mode = mode
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to update status bar: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update status bar: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about status bar update failure: %s", ne)
def _show_notification(self, level: str, message: str) -> None:
"""Display a short operator-visible notification in the chat area."""
@@ -1358,8 +1423,8 @@ class PentestAgentTUI(App):
# Set status bar to error briefly for emphasis
if level.lower() in ("error", "critical"):
self._set_status("error")
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to show notification in TUI: %s", e)
def _notifier_callback(self, level: str, message: str) -> None:
"""Callback wired to `pentestagent.interface.notifier`.
@@ -1373,12 +1438,13 @@ class PentestAgentTUI(App):
try:
self.call_from_thread(self._show_notification, level, message)
return
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("call_from_thread failed in notifier callback: %s", e)
# Fall through to direct call
pass
self._show_notification(level, message)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Exception in notifier callback handling: %s", e)
def _add_message(self, widget: Static) -> None:
"""Add a message widget to chat"""
@@ -1387,8 +1453,14 @@ class PentestAgentTUI(App):
widget.add_class("message")
scroll.mount(widget)
scroll.scroll_end(animate=False)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to add message to chat: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add chat message: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about add_message failure: %s", ne)
def _add_system(self, content: str) -> None:
self._add_message(SystemMessage(content))
@@ -1423,7 +1495,14 @@ class PentestAgentTUI(App):
"""Mount a live memory diagnostics widget into the chat area."""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat-scroll for memory diagnostics: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: memory diagnostics unavailable: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about memory diagnostics availability: %s", ne)
self._add_system("Agent not initialized")
return
# Mount a new diagnostics panel with a unique ID and scroll into view
@@ -1431,21 +1510,35 @@ class PentestAgentTUI(App):
import uuid
panel_id = f"memory-diagnostics-{uuid.uuid4().hex}"
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to generate memory diagnostics panel id: %s", e)
panel_id = None
widget = MemoryDiagnostics(id=panel_id)
scroll.mount(widget)
try:
scroll.scroll_end(animate=False)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to scroll to memory diagnostics panel: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to scroll to memory diagnostics panel: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about scroll failure: %s", ne)
def _show_token_stats(self) -> None:
"""Mount a live token diagnostics widget into the chat area."""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat-scroll for token diagnostics: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: token diagnostics unavailable: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics availability: %s", ne)
self._add_system("Agent not initialized")
return
# Mount a new diagnostics panel with a unique ID and scroll into view
@@ -1453,15 +1546,28 @@ class PentestAgentTUI(App):
import uuid
panel_id = f"token-diagnostics-{uuid.uuid4().hex}"
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to generate token diagnostics panel id: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to generate token diagnostics panel id: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics panel id generation failure: %s", ne)
panel_id = None
widget = TokenDiagnostics(id=panel_id)
scroll.mount(widget)
try:
scroll.scroll_end(animate=False)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to scroll to token diagnostics panel: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to scroll to token diagnostics panel: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics scroll failure: %s", ne)
async def _show_notes(self) -> None:
"""Display saved notes"""
@@ -1580,10 +1686,16 @@ class PentestAgentTUI(App):
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to persist last target for workspace {active}: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about target persist error")
except Exception:
logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target persist error: %s", ne)
except Exception as e:
logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to persist last target: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about WorkspaceManager access failure: %s", ne)
# Update displayed Target in the UI
try:
@@ -1594,8 +1706,8 @@ class PentestAgentTUI(App):
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about target display error")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display error: %s", ne)
# Update the initial ready SystemMessage (if present) so Target appears under Runtime
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
@@ -1626,8 +1738,8 @@ class PentestAgentTUI(App):
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to refresh UI after target update: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error: %s", ne)
except Exception as e:
# Fallback to append if regex replacement fails, and surface warning
logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e)
@@ -1635,8 +1747,8 @@ class PentestAgentTUI(App):
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about target update error")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target update error: %s", ne)
child.message_content = (
child.message_content + f"\n Target: {target}"
)
@@ -1651,9 +1763,23 @@ class PentestAgentTUI(App):
scroll.mount_before(msg, first)
else:
scroll.mount(msg)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to mount target system message: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to display target: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display failure: %s", ne)
self._add_system(f" Target: {target}")
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed while applying target display: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed while updating target display: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display outer failure: %s", ne)
# Last resort: append a subtle system line
self._add_system(f" Target: {target}")
@@ -1884,8 +2010,14 @@ Be concise. Use the actual data from notes."""
self.agent.target = last
try:
self._apply_target_display(last)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to apply target display when restoring last target: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed restoring last target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about restore-last-target failure")
self._add_system(f"Active workspace: {active}")
return
@@ -1993,8 +2125,14 @@ Be concise. Use the actual data from notes."""
self.agent.target = last
try:
self._apply_target_display(last)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to apply target display when activating workspace: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to restore workspace target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about workspace target restore failure")
if existed:
self._add_system(f"Workspace '{name}' set active.")
@@ -2118,8 +2256,14 @@ Be concise. Use the actual data from notes."""
try:
self._crew_orchestrator_node.expand()
tree.select_node(self._crew_orchestrator_node)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to expand/select crew orchestrator node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to expand crew sidebar node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
self._viewing_worker_id = None
# Update stats
@@ -2153,9 +2297,22 @@ Be concise. Use the actual data from notes."""
)
try:
child.refresh()
except Exception as e:
logging.getLogger(__name__).exception("Failed to refresh child message: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to refresh UI element: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about child refresh failure")
except Exception as e:
logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception:
pass
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about target update error")
child.message_content = (
child.message_content + f"\n Target: {target}"
)
@@ -2187,8 +2344,14 @@ Be concise. Use the actual data from notes."""
chat_area = self.query_one("#chat-area")
chat_area.remove_class("with-sidebar")
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Sidebar error: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: sidebar error: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about sidebar error")
def _update_crew_stats(self) -> None:
"""Update crew stats panel."""
@@ -2231,8 +2394,14 @@ Be concise. Use the actual data from notes."""
stats = self.query_one("#crew-stats", Static)
stats.update(text)
stats.border_title = "# Stats"
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to hide sidebar: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to hide sidebar: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hide_sidebar failure")
def _update_spinner(self) -> None:
"""Update spinner animation for running workers."""
@@ -2254,8 +2423,14 @@ Be concise. Use the actual data from notes."""
if not has_running and self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = None
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to update crew stats: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update crew stats: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew stats update failure")
def _add_crew_worker(self, worker_id: str, worker_type: str, task: str) -> None:
"""Add a worker to the sidebar tree."""
@@ -2275,11 +2450,23 @@ Be concise. Use the actual data from notes."""
self._crew_worker_nodes[worker_id] = node
try:
self._crew_orchestrator_node.expand()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to expand crew orchestrator node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to expand crew node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
self._update_crew_stats()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to update spinner: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update spinner: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about spinner update failure")
def _update_crew_worker(self, worker_id: str, **updates) -> None:
"""Update a worker's state."""
@@ -2297,8 +2484,14 @@ Be concise. Use the actual data from notes."""
label = self._format_worker_label(worker_id)
self._crew_worker_nodes[worker_id].set_label(label)
self._update_crew_stats()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to add crew worker node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add crew worker node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about add_crew_worker failure")
def _format_worker_label(self, worker_id: str) -> Text:
"""Format worker label for tree."""
@@ -2394,8 +2587,14 @@ Be concise. Use the actual data from notes."""
if node:
node.add_leaf(f" {tool_name}")
node.expand()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to update crew worker display: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update crew worker display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about update_crew_worker failure")
@on(Tree.NodeSelected, "#workers-tree")
def on_worker_tree_selected(self, event: Tree.NodeSelected) -> None:
@@ -2538,8 +2737,14 @@ Be concise. Use the actual data from notes."""
if self._current_crew:
try:
await self._current_crew.cancel()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to add tool to worker node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add tool to worker: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about add_tool_to_worker failure")
self._current_crew = None
self._add_system(f"[!] Crew error: {e}\n{traceback.format_exc()}")
self._set_status("error")
@@ -2743,8 +2948,14 @@ Be concise. Use the actual data from notes."""
for worker_id, worker in self._crew_workers.items():
if worker.get("status") in ("running", "pending"):
self._update_crew_worker(worker_id, status="cancelled")
except Exception:
pass # Best effort
except Exception as e:
logging.getLogger(__name__).exception("Failed to cancel crew orchestrator cleanly: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed during crew cancellation: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew cancellation failure")
async def _reconnect_mcp_after_cancel(self) -> None:
"""Reconnect MCP servers after cancellation to restore clean state."""
@@ -2752,8 +2963,14 @@ Be concise. Use the actual data from notes."""
try:
if self.mcp_manager:
await self.mcp_manager.reconnect_all()
except Exception:
pass # Best effort - don't crash if reconnect fails
except Exception as e:
logging.getLogger(__name__).exception("Failed to reconnect MCP servers after cancel: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to reconnect MCP servers after cancel: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about MCP reconnect failure")
def action_show_help(self) -> None:
self.push_screen(HelpScreen())
@@ -2764,7 +2981,14 @@ Be concise. Use the actual data from notes."""
"""Recall previous input into the chat field."""
try:
inp = self.query_one("#chat-input", Input)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat input for history up: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: history navigation failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about history_up failure")
return
if not self._cmd_history:
@@ -2780,7 +3004,14 @@ Be concise. Use the actual data from notes."""
"""Recall next input (or clear when at end)."""
try:
inp = self.query_one("#chat-input", Input)
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat input for history down: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: history navigation failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about history_down failure")
return
if not self._cmd_history:
@@ -2800,14 +3031,26 @@ Be concise. Use the actual data from notes."""
try:
await self.mcp_manager.disconnect_all()
await asyncio.sleep(0.1)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to disconnect MCP manager on unmount: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: error during shutdown disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about MCP disconnect failure")
if self.runtime:
try:
await self.runtime.stop()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to stop runtime on unmount: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: runtime stop error during shutdown: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about runtime stop failure")
# ----- Entry Point -----

View File

@@ -97,8 +97,16 @@ class HexstrikeAdapter:
log_file = get_loot_file("artifacts/hexstrike.log")
with log_file.open("a") as fh:
fh.write(f"[HexstrikeAdapter] started pid={pid}\n")
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to write hexstrike start PID to log: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to write hexstrike PID to log: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike PID log failure")
# Start a background reader task to capture logs
loop = asyncio.get_running_loop()
@@ -125,9 +133,17 @@ class HexstrikeAdapter:
fh.write(line)
fh.flush()
except asyncio.CancelledError:
pass
except Exception:
pass
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing hexstrike output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike log capture failure")
async def stop(self, timeout: int = 5) -> None:
"""Stop the server process gracefully."""
@@ -141,10 +157,26 @@ class HexstrikeAdapter:
except asyncio.TimeoutError:
try:
proc.kill()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to kill hexstrike after timeout: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to kill hexstrike after timeout: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike kill failure")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error stopping hexstrike process: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping hexstrike process: {e}")
except Exception:
pass
except Exception:
pass
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop error")
self._process = None
@@ -152,8 +184,16 @@ class HexstrikeAdapter:
self._reader_task.cancel()
try:
await self._reader_task
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error awaiting hexstrike reader task: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error awaiting hexstrike reader task: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike reader await failure")
def stop_sync(self, timeout: int = 5) -> None:
"""Synchronous stop helper for use during process-exit cleanup.
@@ -177,7 +217,15 @@ class HexstrikeAdapter:
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass
import logging
logging.getLogger(__name__).exception("Failed to SIGTERM hexstrike pid: %s", pid)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to SIGTERM hexstrike pid {pid}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGTERM failure")
# wait briefly for process to exit
end = time.time() + float(timeout)
@@ -195,20 +243,52 @@ class HexstrikeAdapter:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
except Exception:
pass
import logging
logging.getLogger(__name__).exception("Failed to SIGKILL hexstrike pid: %s", pid)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to SIGKILL hexstrike pid {pid}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGKILL failure")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error during hexstrike stop_sync cleanup: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during hexstrike stop_sync cleanup: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop_sync cleanup error")
def __del__(self):
try:
self.stop_sync()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Exception during HexstrikeAdapter.__del__: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during HexstrikeAdapter cleanup: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike __del__ error")
# Clear references
try:
self._process = None
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to clear HexstrikeAdapter process reference: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to clear hexstrike process reference: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike process-clear failure")
async def health_check(self, timeout: int = 5) -> bool:
"""Check the server health endpoint. Returns True if healthy."""
@@ -219,7 +299,16 @@ class HexstrikeAdapter:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as resp:
return resp.status == 200
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("HexstrikeAdapter health_check (aiohttp) failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike health check failure")
return False
# Fallback: synchronous urllib in thread
@@ -229,7 +318,16 @@ class HexstrikeAdapter:
try:
with urllib.request.urlopen(url, timeout=timeout) as r:
return r.status == 200
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("HexstrikeAdapter health_check (urllib) failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike urllib health check failure")
return False
loop = asyncio.get_running_loop()

View File

@@ -134,7 +134,16 @@ class MetasploitAdapter:
await asyncio.sleep(0.5)
# If we fallthrough, msfrpcd didn't become ready in time
return
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to start msfrpcd: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to start msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd start failure")
return
async def _capture_msfrpcd_output(self) -> None:
@@ -150,9 +159,17 @@ class MetasploitAdapter:
fh.write(b"[msfrpcd] " + line)
fh.flush()
except asyncio.CancelledError:
pass
except Exception:
pass
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing msfrpcd output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"msfrpcd log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd log capture failure")
async def start(self, background: bool = True, timeout: int = 30) -> bool:
"""Start the vendored Metasploit MCP server.
@@ -172,8 +189,16 @@ class MetasploitAdapter:
if str(self.transport).lower() in ("http", "sse"):
try:
await self._start_msfrpcd_if_needed()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error starting msfrpcd: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error starting msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd error")
cmd = self._build_command()
resolved = shutil.which(self.python_cmd) or self.python_cmd
@@ -195,8 +220,16 @@ class MetasploitAdapter:
log_file = get_loot_file("artifacts/metasploit_mcp.log")
with log_file.open("a") as fh:
fh.write(f"[MetasploitAdapter] started pid={pid}\n")
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to write metasploit start PID to log: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to write metasploit PID to log: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit PID log failure")
# Start background reader
loop = asyncio.get_running_loop()
@@ -204,7 +237,16 @@ class MetasploitAdapter:
try:
return await self.health_check(timeout=timeout)
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("MetasploitAdapter health_check raised: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Metasploit health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit health check failure")
return False
async def _capture_output(self) -> None:
@@ -221,9 +263,17 @@ class MetasploitAdapter:
fh.write(line)
fh.flush()
except asyncio.CancelledError:
pass
except Exception:
pass
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing metasploit output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Metasploit log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit log capture failure")
async def stop(self, timeout: int = 5) -> None:
proc = self._process
@@ -238,8 +288,16 @@ class MetasploitAdapter:
proc.kill()
except Exception:
pass
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error waiting for process termination: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping metasploit adapter: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit stop error")
self._process = None
@@ -247,8 +305,16 @@ class MetasploitAdapter:
self._reader_task.cancel()
try:
await self._reader_task
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to kill msfrpcd during stop: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to kill msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd kill failure")
# Stop msfrpcd if we started it
try:
@@ -262,8 +328,16 @@ class MetasploitAdapter:
msf_proc.kill()
except Exception:
pass
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error stopping metasploit adapter cleanup: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping metasploit adapter: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit adapter cleanup error")
finally:
self._msfrpcd_proc = None

View File

@@ -210,7 +210,16 @@ class SSETransport(MCPTransport):
except asyncio.TimeoutError:
# If endpoint not discovered, continue; send() will try discovery
pass
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed opening SSE stream: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed opening SSE stream: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE open failure")
# If opening the SSE stream fails, still mark connected so
# send() can attempt POST discovery and report meaningful errors.
self._sse_response = None
@@ -312,7 +321,16 @@ class SSETransport(MCPTransport):
else:
self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}"
return
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error during SSE POST endpoint discovery: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during SSE POST endpoint discovery: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE discovery error")
return
async def disconnect(self):
@@ -323,21 +341,53 @@ class SSETransport(MCPTransport):
self._sse_task.cancel()
try:
await self._sse_task
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error awaiting SSE listener task during disconnect: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error awaiting SSE listener task during disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE listener await failure")
self._sse_task = None
except Exception:
pass
import logging
logging.getLogger(__name__).exception("Error cancelling SSE listener task during disconnect")
try:
from ..interface.notifier import notify
notify("warning", "Error cancelling SSE listener task during disconnect")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE listener cancellation error")
try:
if self._sse_response:
try:
await self._sse_response.release()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error releasing SSE response during disconnect: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error releasing SSE response during disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE response release error")
self._sse_response = None
except Exception:
pass
import logging
logging.getLogger(__name__).exception("Error handling SSE response during disconnect")
try:
from ..interface.notifier import notify
notify("warning", "Error handling SSE response during disconnect")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE response handling error")
# Fail any pending requests
async with self._pending_lock:
@@ -365,7 +415,10 @@ class SSETransport(MCPTransport):
async for raw in resp.content:
try:
line = raw.decode(errors="ignore").rstrip("\r\n")
except Exception:
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to decode SSE raw chunk: %s", e)
continue
if line == "":
# End of event; process accumulated lines
@@ -392,14 +445,30 @@ class SSETransport(MCPTransport):
self._post_url = f"{p.scheme}://{p.netloc}{endpoint}"
else:
self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}"
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed parsing SSE endpoint announcement: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed parsing SSE endpoint announcement: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint parse failure")
# Notify connect() that endpoint is ready
try:
if self._endpoint_ready and not self._endpoint_ready.is_set():
self._endpoint_ready.set()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to set SSE endpoint ready event: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to set SSE endpoint ready event: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint ready event failure")
else:
# Try to parse as JSON and resolve pending futures
try:
@@ -410,8 +479,16 @@ class SSETransport(MCPTransport):
fut = self._pending.get(msg_id)
if fut and not fut.done():
fut.set_result(obj)
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed parsing SSE event JSON or resolving pending future: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed parsing SSE event JSON or resolving pending future: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE event parse/future failure")
event_lines = []
else:

View File

@@ -3,6 +3,7 @@
import asyncio
import io
import tarfile
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Optional
@@ -77,7 +78,14 @@ class DockerRuntime(Runtime):
if self.container.status != "running":
self.container.start()
await asyncio.sleep(2) # Wait for container to fully start
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to get or start existing container: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"DockerRuntime: container check failed: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about docker container check failure: %s", ne)
# Create new container
volumes = {
str(Path.home() / ".pentestagent"): {
@@ -109,8 +117,14 @@ class DockerRuntime(Runtime):
try:
self.container.stop(timeout=10)
self.container.remove()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed stopping/removing container: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"DockerRuntime: failed stopping/removing container: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about docker stop error: %s", ne)
finally:
self.container = None
@@ -261,7 +275,14 @@ class DockerRuntime(Runtime):
try:
self.container.reload()
return self.container.status == "running"
except Exception:
except Exception as e:
logging.getLogger(__name__).exception("Failed to determine container running state: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"DockerRuntime: is_running check failed: {e}")
except Exception:
pass
return False
async def get_status(self) -> dict:

View File

@@ -2,6 +2,7 @@
import platform
import shutil
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Optional
@@ -303,8 +304,8 @@ def detect_environment() -> EnvironmentInfo:
with open("/proc/version", "r") as f:
if "microsoft" in f.read().lower():
os_name = "Linux (WSL)"
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).debug("WSL detection probe failed: %s", e)
# Detect available tools with categories
available_tools = []
@@ -478,8 +479,16 @@ class LocalRuntime(Runtime):
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Error cleaning up active process: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Runtime: error cleaning up process: {e}")
except Exception as ne:
logging.getLogger(__name__).exception(
"Failed to notify operator about process cleanup error: %s", ne
)
self._active_processes.clear()
# Clean up browser
@@ -492,29 +501,57 @@ class LocalRuntime(Runtime):
if self._page:
try:
await self._page.close()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to close browser page: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Runtime: failed to close browser page: {e}")
except Exception as ne:
logging.getLogger(__name__).exception(
"Failed to notify operator about browser page close error: %s", ne
)
self._page = None
if self._browser_context:
try:
await self._browser_context.close()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to close browser context: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Runtime: failed to close browser context: {e}")
except Exception as ne:
logging.getLogger(__name__).exception(
"Failed to notify operator about browser context close error: %s", ne
)
self._browser_context = None
if self._browser:
try:
await self._browser.close()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to close browser: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Runtime: failed to close browser: {e}")
except Exception as ne:
logging.getLogger(__name__).exception(
"Failed to notify operator about browser close error: %s", ne
)
self._browser = None
if self._playwright:
try:
await self._playwright.stop()
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception("Failed to stop playwright: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Runtime: failed to stop playwright: {e}")
except Exception as ne:
logging.getLogger(__name__).exception(
"Failed to notify operator about playwright stop error: %s", ne
)
self._playwright = None
async def _ensure_browser(self):

View File

@@ -114,7 +114,7 @@ def export_workspace(name: str, output: Optional[Path] = None, root: Optional[Pa
rel = p.relative_to(root)
entries.append(rel)
entries = sorted(entries, key=lambda p: str(p))
entries = sorted(entries, key=str)
# Create tar.gz
with tarfile.open(out_path, "w:gz") as tf:

View File

@@ -12,11 +12,18 @@ from .manager import TargetManager
def gather_candidate_targets(obj: Any) -> List[str]:
"""Extract candidate target strings from arguments (shallow).
"""
Extract candidate target strings from arguments (shallow, non-recursive).
This intentionally performs a shallow inspection to keep the function
fast and predictable; nested structures should be handled by callers
if required.
This function inspects only the top-level of the provided object (str or dict)
and collects values for common target keys (e.g., 'target', 'host', 'ip', etc.).
It does NOT recurse into nested dictionaries or lists. If you need to extract
targets from deeply nested structures, you must implement or call a recursive
extractor separately.
Rationale: Shallow extraction is fast and predictable for most tool argument
schemas. For recursive extraction, see the project documentation or extend
this function as needed.
"""
candidates: List[str] = []
if isinstance(obj, str):

View File

@@ -48,6 +48,9 @@ def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch):
# If load-on-init doesn't run, calling index() would re-index and rewrite the file
rag2.index()
assert rag2.get_chunk_count() >= 1
# Assert that the index file was not overwritten (mtime unchanged)
mtime_after = emb_path.stat().st_mtime
assert mtime_after == mtime_before, "Index file was unexpectedly overwritten (should have been loaded)"
mtime_after = emb_path.stat().st_mtime
assert mtime_after == mtime_before, "Expected persisted index to be loaded, not re-written"

51
tests/test_validation.py Normal file
View File

@@ -0,0 +1,51 @@
import pytest
from pentestagent.workspaces.validation import gather_candidate_targets, is_target_in_scope
def test_gather_candidate_targets_shallow():
args = {
"target": "10.0.0.1",
"hosts": ["host1", "host2"],
"nested": {"target": "should_not_find"},
"ip": "192.168.1.1",
"irrelevant": "nope"
}
result = gather_candidate_targets(args)
assert "10.0.0.1" in result
assert "host1" in result and "host2" in result
assert "192.168.1.1" in result
assert "should_not_find" not in result
assert "nope" not in result
def test_is_target_in_scope_ip_cidr_hostname():
allowed = ["192.168.0.0/16", "host.local", "10.0.0.1"]
# IP in CIDR
assert is_target_in_scope("192.168.1.5", allowed)
# Exact IP
assert is_target_in_scope("10.0.0.1", allowed)
# Hostname
assert is_target_in_scope("host.local", allowed)
# Not in scope
assert not is_target_in_scope("8.8.8.8", allowed)
assert not is_target_in_scope("otherhost", allowed)
def test_is_target_in_scope_cidr_vs_cidr():
allowed = ["10.0.0.0/24"]
# Subnet of allowed
assert is_target_in_scope("10.0.0.128/25", allowed)
# Same network
assert is_target_in_scope("10.0.0.0/24", allowed)
# Not a subnet
assert not is_target_in_scope("10.0.1.0/24", allowed)
def test_is_target_in_scope_single_ip_cidr():
allowed = ["10.0.0.1"]
# Single-IP network
assert is_target_in_scope("10.0.0.1/32", allowed)
# Not matching
assert not is_target_in_scope("10.0.0.2/32", allowed)
def test_is_target_in_scope_case_insensitive_hostname():
allowed = ["Example.COM"]
assert is_target_in_scope("example.com", allowed)
assert is_target_in_scope("EXAMPLE.com", allowed)
assert not is_target_in_scope("other.com", allowed)

View File

@@ -0,0 +1,57 @@
import os
import tarfile
from pathlib import Path
import pytest
from pentestagent.workspaces.utils import export_workspace, import_workspace
from pentestagent.workspaces.manager import WorkspaceManager
def test_export_import_workspace(tmp_path):
wm = WorkspaceManager(root=tmp_path)
name = "expimp"
wm.create(name)
wm.add_targets(name, ["10.1.1.1", "host1.local"])
# Add a file to workspace
loot_dir = tmp_path / "workspaces" / name / "loot"
loot_dir.mkdir(parents=True, exist_ok=True)
(loot_dir / "loot.txt").write_text("lootdata")
# Export
archive = export_workspace(name, root=tmp_path)
assert archive.exists()
with tarfile.open(archive, "r:gz") as tf:
members = tf.getnames()
assert any("loot.txt" in m for m in members)
assert any("meta.yaml" in m for m in members)
# Remove workspace, then import
ws_dir = tmp_path / "workspaces" / name
for rootdir, dirs, files in os.walk(ws_dir, topdown=False):
for f in files:
os.remove(Path(rootdir) / f)
for d in dirs:
os.rmdir(Path(rootdir) / d)
os.rmdir(ws_dir)
assert not ws_dir.exists()
imported = import_workspace(archive, root=tmp_path)
assert imported == name
assert (tmp_path / "workspaces" / name / "loot" / "loot.txt").exists()
assert (tmp_path / "workspaces" / name / "meta.yaml").exists()
def test_import_workspace_missing_meta(tmp_path):
# Create a tar.gz without meta.yaml
archive = tmp_path / "bad.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
tf.add(__file__, arcname="not_meta.txt")
with pytest.raises(ValueError):
import_workspace(archive, root=tmp_path)
def test_import_workspace_already_exists(tmp_path):
wm = WorkspaceManager(root=tmp_path)
name = "dupe"
wm.create(name)
archive = export_workspace(name, root=tmp_path)
with pytest.raises(FileExistsError):
import_workspace(archive, root=tmp_path)