diff --git a/.gitignore b/.gitignore index c02f9b1..54d2aca 100644 --- a/.gitignore +++ b/.gitignore @@ -81,3 +81,12 @@ Thumbs.db tmp/ temp/ *.tmp + +# Local test artifacts and test scripts (do not commit local test runs) +tests/*.log +tests/*.out +tests/output/ +tests/tmp/ +tests/*.local.py +scripts/test_*.sh +*.test.sh diff --git a/pentestagent/agents/base_agent.py b/pentestagent/agents/base_agent.py index 2f96e85..7ecaf25 100644 --- a/pentestagent/agents/base_agent.py +++ b/pentestagent/agents/base_agent.py @@ -6,6 +6,10 @@ from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional from ..config.constants import AGENT_MAX_ITERATIONS from .state import AgentState, AgentStateManager +from types import MappingProxyType + +from ..workspaces.manager import WorkspaceManager, TargetManager, WorkspaceError +from ..workspaces.utils import resolve_knowledge_paths if TYPE_CHECKING: from ..llm import LLM @@ -106,6 +110,32 @@ class BaseAgent(ABC): # Use tools as-is (finish accesses plan via runtime) self.tools = list(tools) + @property + def workspace_context(self): + """Return a read-only workspace context built at access time. + + Uses WorkspaceManager.get_active() as the single source of truth + and does not cache state between calls. 
+ """ + wm = WorkspaceManager() + active = wm.get_active() + if not active: + return None + + targets = wm.list_targets(active) + + kp = resolve_knowledge_paths() + knowledge_scope = "workspace" if kp.get("using_workspace") else "global" + + ctx = { + "name": active, + "targets": list(targets), + "has_targets": bool(targets), + "knowledge_scope": knowledge_scope, + } + + return MappingProxyType(ctx) + @property def state(self) -> AgentState: """Get current agent state.""" @@ -448,15 +478,120 @@ class BaseAgent(ABC): if tool: try: - result = await tool.execute(arguments, self.runtime) - results.append( - ToolResult( - tool_call_id=tool_call_id, - tool_name=name, - result=result, - success=True, + # Before executing, enforce target safety gate when workspace active + wm = WorkspaceManager() + active = wm.get_active() + + def _gather_candidate_targets(obj) -> list: + """Extract candidate target strings from arguments (shallow).""" + candidates = [] + if isinstance(obj, str): + candidates.append(obj) + elif isinstance(obj, dict): + for k, v in obj.items(): + if k.lower() in ( + "target", + "host", + "hostname", + "ip", + "address", + "url", + "hosts", + "targets", + ): + if isinstance(v, (list, tuple)): + for it in v: + if isinstance(it, str): + candidates.append(it) + elif isinstance(v, str): + candidates.append(v) + return candidates + + def _is_target_in_scope(candidate: str, allowed: list) -> bool: + """Check if candidate target is covered by any allowed target (IP/CIDR/hostname).""" + import ipaddress + + try: + # normalize candidate + norm = TargetManager.normalize_target(candidate) + except Exception: + return False + + # If candidate is IP or CIDR, handle appropriately + try: + if "/" in norm: + cand_net = ipaddress.ip_network(norm, strict=False) + # If any allowed contains this network or equals it + for a in allowed: + try: + if "/" in a: + an = ipaddress.ip_network(a, strict=False) + if cand_net.subnet_of(an) or cand_net == an: + return True + else: + # 
allowed is IP/hostname + if ipaddress.ip_address(a) == list(cand_net.hosts())[0]: + return True + except Exception: + continue + return False + else: + cand_ip = ipaddress.ip_address(norm) + for a in allowed: + try: + if "/" in a: + an = ipaddress.ip_network(a, strict=False) + if cand_ip in an: + return True + else: + if TargetManager.normalize_target(a) == norm: + return True + except Exception: + # hostname allowed entries fall through + if isinstance(a, str) and a.lower() == norm.lower(): + return True + return False + except Exception: + # candidate is likely hostname + for a in allowed: + if a.lower() == norm.lower(): + return True + return False + + out_of_scope = [] + if active: + allowed = wm.list_targets(active) + candidates = _gather_candidate_targets(arguments) + for c in candidates: + try: + if not _is_target_in_scope(c, allowed): + out_of_scope.append(c) + except Exception: + out_of_scope.append(c) + + if active and out_of_scope: + # Block execution and return an explicit error requiring operator confirmation + results.append( + ToolResult( + tool_call_id=tool_call_id, + tool_name=name, + error=( + f"Out-of-scope target(s): {out_of_scope} - operator confirmation required. " + "Set workspace targets with /target or run tool manually." 
+ ), + success=False, + ) + ) + else: + result = await tool.execute(arguments, self.runtime) + results.append( + ToolResult( + tool_call_id=tool_call_id, + tool_name=name, + result=result, + success=True, + ) ) - ) except Exception as e: results.append( ToolResult( diff --git a/pentestagent/interface/main.py b/pentestagent/interface/main.py index bc48e2b..391b8ee 100644 --- a/pentestagent/interface/main.py +++ b/pentestagent/interface/main.py @@ -127,6 +127,25 @@ Examples: mcp_test = mcp_subparsers.add_parser("test", help="Test MCP server connection") mcp_test.add_argument("name", help="Server name to test") + # workspace management + ws_parser = subparsers.add_parser( + "workspace", help="Workspace lifecycle and info commands" + ) + ws_parser.add_argument( + "action", + nargs="?", + help="Action or workspace name. Subcommands: info, note, clear, export, import", + ) + ws_parser.add_argument("rest", nargs=argparse.REMAINDER, help="Additional arguments") + + # NOTE: use `workspace list` to list workspaces (handled by workspace subcommand) + + # target management + tgt_parser = subparsers.add_parser( + "target", help="Add or list targets for the active workspace" + ) + tgt_parser.add_argument("values", nargs="*", help="Targets to add (IP/CIDR/hostname)") + return parser, parser.parse_args() @@ -304,6 +323,210 @@ def handle_mcp_command(args: argparse.Namespace): console.print("[yellow]Use 'pentestagent mcp --help' for available commands[/]") +def handle_workspace_command(args: argparse.Namespace): + """Handle workspace lifecycle commands and actions.""" + import shutil + + from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + from pentestagent.workspaces.utils import export_workspace, import_workspace, resolve_knowledge_paths + + wm = WorkspaceManager() + + action = args.action + rest = args.rest or [] + + # No args -> show active workspace + if not action: + active = wm.get_active() + if not active: + print("No active workspace.") + else: + 
print(f"Active workspace: {active}") + return + + # Subcommands + if action == "info": + # show info for active or specified workspace + name = rest[0] if rest else wm.get_active() + if not name: + print("No workspace specified and no active workspace.") + return + try: + meta = wm.get_meta(name) + created = meta.get("created_at") + last_active = meta.get("last_active_at") + targets = meta.get("targets", []) + kp = resolve_knowledge_paths() + ks = "workspace" if kp.get("using_workspace") else "global" + # estimate loot size if present + import os + + loot_dir = (wm.workspace_path(name) / "loot").resolve() + size = 0 + files = 0 + if loot_dir.exists(): + for rootp, _, filenames in os.walk(loot_dir): + for fn in filenames: + try: + fp = os.path.join(rootp, fn) + size += os.path.getsize(fp) + files += 1 + except Exception: + pass + + print(f"Name: {name}") + print(f"Created: {created}") + print(f"Last active: {last_active}") + print(f"Targets: {len(targets)}") + print(f"Knowledge scope: {ks}") + print(f"Loot files: {files}, approx size: {size} bytes") + except Exception as e: + print(f"Error retrieving workspace info: {e}") + return + + if action == "list": + # list all workspaces and mark active + try: + wss = wm.list_workspaces() + active = wm.get_active() + if not wss: + print("No workspaces found.") + return + for name in sorted(wss): + prefix = "* " if name == active else " " + print(f"{prefix}{name}") + except Exception as e: + print(f"Error listing workspaces: {e}") + return + + if action == "note": + # Append operator note to active workspace (or specified) + name = rest[0] if rest and not rest[0].startswith("--") else wm.get_active() + if not name: + print("No active workspace. 
Set one with /workspace .") + return + text = " ".join(rest[1:]) if rest and rest[0] == name else " ".join(rest) + if not text: + print("Usage: workspace note ") + return + try: + wm.set_operator_note(name, text) + print(f"Operator note saved for workspace '{name}'.") + except Exception as e: + print(f"Error saving note: {e}") + return + + if action == "clear": + active = wm.get_active() + if not active: + print("No active workspace.") + return + marker = wm.active_marker() + try: + if marker.exists(): + marker.unlink() + print(f"Workspace '{active}' deactivated.") + except Exception as e: + print(f"Error deactivating workspace: {e}") + return + + if action == "export": + # export [--output file.tar.gz] + if not rest: + print("Usage: workspace export [--output file.tar.gz]") + return + name = rest[0] + out = None + if "--output" in rest: + idx = rest.index("--output") + if idx + 1 < len(rest): + out = Path(rest[idx + 1]) + try: + archive = export_workspace(name, output=out) + print(f"Workspace exported: {archive}") + except Exception as e: + print(f"Export failed: {e}") + return + + if action == "import": + # import + if not rest: + print("Usage: workspace import ") + return + archive = Path(rest[0]) + try: + name = import_workspace(archive) + print(f"Workspace imported: {name} (not activated)") + except Exception as e: + print(f"Import failed: {e}") + return + + # Default: treat action as workspace name -> create and set active + name = action + try: + existed = wm.workspace_path(name).exists() + if not existed: + wm.create(name) + wm.set_active(name) + + # restore last target if present + last = wm.get_meta_field(name, "last_target") + if last: + print(f"Workspace '{name}' set active. 
Restored target: {last}") + else: + if existed: + print(f"Workspace '{name}' set active.") + else: + print(f"Workspace '{name}' created and set active.") + except WorkspaceError as e: + print(f"Error: {e}") + except Exception as e: + print(f"Error creating workspace: {e}") + + +def handle_workspaces_list(): + from pentestagent.workspaces.manager import WorkspaceManager + + wm = WorkspaceManager() + wss = wm.list_workspaces() + active = wm.get_active() + if not wss: + print("No workspaces found.") + return + for name in sorted(wss): + prefix = "* " if name == active else " " + print(f"{prefix}{name}") + + + +def handle_target_command(args: argparse.Namespace): + """Handle target add/list commands.""" + from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + + wm = WorkspaceManager() + active = wm.get_active() + if not active: + print("No active workspace. Set one with /workspace .") + return + + vals = args.values or [] + try: + if not vals: + targets = wm.list_targets(active) + if not targets: + print(f"No targets for workspace '{active}'.") + else: + print(f"Targets for workspace '{active}': {targets}") + return + + saved = wm.add_targets(active, vals) + print(f"Targets for workspace '{active}': {saved}") + except WorkspaceError as e: + print(f"Error: {e}") + except Exception as e: + print(f"Error updating targets: {e}") + + def main(): """Main entry point.""" parser, args = parse_arguments() @@ -317,6 +540,16 @@ def main(): handle_mcp_command(args) return + if args.command == "workspace": + handle_workspace_command(args) + return + + # 'workspace list' handled by workspace subcommand + + if args.command == "target": + handle_target_command(args) + return + if args.command == "run": # Check model configuration if not args.model: diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index d7ce51f..6282cfa 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -109,8 +109,8 @@ class 
HelpScreen(ModalScreen): } #help-container { - width: 60; - height: 26; + width: 110; + height: 30; background: #121212; border: solid #3a3a3a; padding: 1 2; @@ -195,6 +195,137 @@ class HelpScreen(ModalScreen): self.app.pop_screen() +class WorkspaceHelpScreen(ModalScreen): + """Help modal for workspace commands.""" + + BINDINGS = [ + Binding("escape", "dismiss", "Close"), + Binding("q", "dismiss", "Close"), + ] + + CSS = """ + WorkspaceHelpScreen { + align: center middle; + scrollbar-background: #1a1a1a; + scrollbar-background-hover: #1a1a1a; + scrollbar-background-active: #1a1a1a; + scrollbar-color: #3a3a3a; + scrollbar-color-hover: #3a3a3a; + scrollbar-color-active: #3a3a3a; + scrollbar-corner-color: #1a1a1a; + scrollbar-size: 1 1; + } + + #help-container { + width: 60; + height: 26; + background: #121212; + border: solid #3a3a3a; + padding: 1 2; + layout: vertical; + } + + #help-title { + text-align: center; + text-style: bold; + color: #d4d4d4; + margin-bottom: 1; + } + + #help-content { + color: #9a9a9a; + } + + + #help-close { + margin-top: 1; + width: auto; + min-width: 10; + background: #1a1a1a; + color: #9a9a9a; + border: none; + } + + #help-close:hover { + background: #262626; + } + + #help-close:focus { + background: #262626; + text-style: none; + } + """ + def compose(self) -> ComposeResult: + from rich.table import Table + from rich.text import Text + + # Build a two-column table to prevent wrapping + table = Table.grid(padding=(0, 3)) + table.add_column(justify="left", ratio=2) + table.add_column(justify="left", ratio=3) + + # Header and usage + header = Text("Workspace Commands", style="bold") + usage = Text("Usage: /workspace or /workspace ") + + # Commands list + cmds = [ + ("/workspace", "Show active"), + ("/workspace list", "List all workspaces"), + ("/workspace info [NAME]", "Show workspace metadata"), + ("/workspace note ", "Add operator note"), + ("/workspace clear", "Deactivate workspace"), + ("/workspace NAME", "Create or activate 
workspace"), + ("/workspace help", "Show this help"), + ] + + # Compose table rows + table.add_row(Text("Commands:", style="bold"), Text("")) + + for left, right in cmds: + table.add_row(left, right) + + yield Container( + Static(header, id="help-title"), + Static(usage, id="help-usage"), + Static(table, id="help-content"), + Center(Button("Close", id="help-close"), id="help-center"), + id="help-container", + ) + + def _get_help_text(self) -> str: + header = "Usage: /workspace or /workspace \n" + cmds = [ + ("/workspace", "Show active"), + ("/workspace list", "List all workspaces"), + ("/workspace info [NAME]", "Show workspace metadata"), + ("/workspace note ", "Add operator note"), + ("/workspace clear", "Deactivate workspace"), + ("/workspace NAME", "Create or activate workspace"), + ("/workspace help", "Show this help"), + ] + + # Build two-column layout with fixed left column width + left_width = 44 + lines = [header, "Commands:\n"] + for left, right in cmds: + if len(left) >= left_width - 2: + # if left is long, place on its own line + lines.append(f" {left}\n {right}") + else: + pad = " " * (left_width - len(left)) + lines.append(f" {left}{pad}{right}") + + return "\n".join(lines) + + def action_dismiss(self) -> None: + self.app.pop_screen() + + @on(Button.Pressed, "#help-close") + def close_help(self) -> None: + self.app.pop_screen() + + class ToolsScreen(ModalScreen): """Interactive tools browser — split-pane layout. 
@@ -1393,6 +1524,26 @@ class PentestAgentTUI(App): # Update agent's target if agent exists if self.agent: self.agent.target = target + + # Persist to active workspace if present + try: + from pentestagent.workspaces.manager import WorkspaceManager + + wm = WorkspaceManager() + active = wm.get_active() + if active: + try: + wm.set_last_target(active, target) + except Exception: + pass + except Exception: + pass + + # Update displayed Target in the UI + try: + self._apply_target_display(target) + except Exception: + pass # Update the initial ready SystemMessage (if present) so Target appears under Runtime try: scroll = self.query_one("#chat-scroll", ScrollableContainer) @@ -1401,15 +1552,31 @@ class PentestAgentTUI(App): if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr( child, "message_content", "" ): - # Append Target line if not already present - if "Target:" not in child.message_content: - child.message_content = ( - child.message_content + f"\n Target: {target}" - ) + # Replace existing Target line if present, otherwise append + try: + if "Target:" in child.message_content: + # replace the first Target line + import re + + child.message_content = re.sub( + r"(?m)^\s*Target:.*$", + f" Target: {target}", + child.message_content, + count=1, + ) + else: + child.message_content = ( + child.message_content + f"\n Target: {target}" + ) try: child.refresh() except Exception: pass + except Exception: + # Fallback to append if regex replacement fails + child.message_content = ( + child.message_content + f"\n Target: {target}" + ) updated = True break if not updated: @@ -1628,6 +1795,138 @@ Be concise. 
Use the actual data from notes.""" _ = cast(Any, self._run_report_generation()) elif cmd_original.startswith("/target"): self._set_target(cmd_original) + elif cmd_original.startswith("/workspace"): + # Support lightweight workspace management from the TUI + try: + from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + from pentestagent.workspaces.utils import resolve_knowledge_paths + from pathlib import Path + + wm = WorkspaceManager() + rest = cmd_original[len("/workspace") :].strip() + + if not rest: + active = wm.get_active() + if not active: + self._add_system("No active workspace.") + else: + # restore last target if present + last = wm.get_meta_field(active, "last_target") + if last: + self.target = last + if self.agent: + self.agent.target = last + try: + self._apply_target_display(last) + except Exception: + pass + self._add_system(f"Active workspace: {active}") + return + + parts = rest.split() + verb = parts[0].lower() + + if verb == "help": + try: + await self.push_screen(WorkspaceHelpScreen()) + except Exception: + # Fallback: show inline help text + self._add_system( + "Usage: /workspace \nCommands: list, info, note, clear, help, " + ) + return + + if verb == "list": + wss = wm.list_workspaces() + if not wss: + self._add_system("No workspaces found.") + return + out = [] + active = wm.get_active() + for name in sorted(wss): + prefix = "* " if name == active else " " + out.append(f"{prefix}{name}") + self._add_system("\n".join(out)) + return + + if verb == "info": + name = parts[1] if len(parts) > 1 else wm.get_active() + if not name: + self._add_system("No workspace specified and no active workspace.") + return + try: + meta = wm.get_meta(name) + created = meta.get("created_at") + last_active = meta.get("last_active_at") + targets = meta.get("targets", []) + kp = resolve_knowledge_paths() + ks = "workspace" if kp.get("using_workspace") else "global" + self._add_system( + f"Name: {name}\nCreated: {created}\nLast active: 
{last_active}\nTargets: {len(targets)}\nKnowledge scope: {ks}" + ) + except Exception as e: + self._add_system(f"Error retrieving workspace info: {e}") + return + + if verb == "note": + name = parts[1] if len(parts) > 1 and not parts[1].startswith("--") else wm.get_active() + if not name: + self._add_system("No active workspace. Set one with /workspace .") + return + text = " ".join(parts[1:]) if len(parts) > 1 and parts[1] == name else " ".join(parts[1:]) + if not text: + self._add_system("Usage: /workspace note ") + return + try: + wm.set_operator_note(name, text) + self._add_system(f"Operator note saved for workspace '{name}'.") + except Exception as e: + self._add_system(f"Error saving note: {e}") + return + + if verb == "clear": + active = wm.get_active() + if not active: + self._add_system("No active workspace.") + return + marker = wm.active_marker() + try: + if marker.exists(): + marker.unlink() + self._add_system(f"Workspace '{active}' deactivated.") + except Exception as e: + self._add_system(f"Error deactivating workspace: {e}") + return + + # Default: treat rest as workspace name -> create (only if missing) and set active + name = rest + try: + existed = wm.workspace_path(name).exists() + if not existed: + wm.create(name) + wm.set_active(name) + # restore last target if set on workspace + last = wm.get_meta_field(name, "last_target") + if last: + self.target = last + if self.agent: + self.agent.target = last + try: + self._apply_target_display(last) + except Exception: + pass + + if existed: + self._add_system(f"Workspace '{name}' set active.") + else: + self._add_system(f"Workspace '{name}' created and set active.") + except WorkspaceError as e: + self._add_system(f"Error: {e}") + except Exception as e: + self._add_system(f"Error creating workspace: {e}") + except Exception as e: + self._add_system(f"Workspace command error: {e}") + return elif cmd_original.startswith("/agent"): await self._parse_agent_command(cmd_original) elif 
cmd_original.startswith("/crew"): @@ -1748,6 +2047,53 @@ Be concise. Use the actual data from notes.""" except Exception as e: self._add_system(f"[!] Sidebar error: {e}") + def _apply_target_display(self, target: str) -> None: + """Update or insert the Target line in the system/banner area.""" + try: + scroll = self.query_one("#chat-scroll", ScrollableContainer) + updated = False + for child in scroll.children: + if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr( + child, "message_content", "" + ): + # Replace existing Target line if present, otherwise append + try: + if "Target:" in child.message_content: + import re + + child.message_content = re.sub( + r"(?m)^\s*Target:.*$", + f" Target: {target}", + child.message_content, + count=1, + ) + else: + child.message_content = ( + child.message_content + f"\n Target: {target}" + ) + try: + child.refresh() + except Exception: + pass + except Exception: + child.message_content = ( + child.message_content + f"\n Target: {target}" + ) + updated = True + break + if not updated: + try: + first = scroll.children[0] if scroll.children else None + msg = SystemMessage(f" Target: {target}") + if first: + scroll.mount_before(msg, first) + else: + scroll.mount(msg) + except Exception: + self._add_system(f" Target: {target}") + except Exception: + self._add_system(f" Target: {target}") + def _hide_sidebar(self) -> None: """Hide the sidebar.""" try: diff --git a/pentestagent/knowledge/indexer.py b/pentestagent/knowledge/indexer.py index 1116d31..3f4ce73 100644 --- a/pentestagent/knowledge/indexer.py +++ b/pentestagent/knowledge/indexer.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Any, List from .rag import Document +from ..workspaces.utils import resolve_knowledge_paths @dataclass @@ -51,6 +52,11 @@ class KnowledgeIndexer: total_files = 0 indexed_files = 0 + # If directory is the default 'knowledge', prefer workspace knowledge if available + if directory == Path("knowledge"): + kp = 
resolve_knowledge_paths() + directory = kp.get("sources", Path("knowledge")) + if not directory.exists(): return documents, IndexingResult( 0, 0, 0, [f"Directory not found: {directory}"] diff --git a/pentestagent/knowledge/rag.py b/pentestagent/knowledge/rag.py index 9a113fa..2523a96 100644 --- a/pentestagent/knowledge/rag.py +++ b/pentestagent/knowledge/rag.py @@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional import numpy as np from .embeddings import get_embeddings +from ..workspaces.utils import resolve_knowledge_paths @dataclass @@ -65,9 +66,34 @@ class RAGEngine: chunks = [] self._source_files = set() # Reset source file tracking + # Resolve knowledge paths (prefer workspace if available) + if self.knowledge_path != Path("knowledge"): + sources_base = self.knowledge_path + kp = None + else: + kp = resolve_knowledge_paths() + sources_base = kp.get("sources", Path("knowledge")) + + # If workspace has a persisted index and we're not forcing reindex, try to load it + try: + if kp and kp.get("using_workspace"): + emb_dir = kp.get("embeddings") + emb_dir.mkdir(parents=True, exist_ok=True) + idx_path = emb_dir / "index.pkl" + if idx_path.exists() and not force: + try: + self.load_index(idx_path) + return + except Exception: + # Fall through to re-index if loading fails + pass + except Exception: + # Non-fatal — continue to index from sources + pass + # Process all files in knowledge directory - if self.knowledge_path.exists(): - for file in self.knowledge_path.rglob("*"): + if sources_base.exists(): + for file in sources_base.rglob("*"): if not file.is_file(): continue @@ -127,6 +153,19 @@ class RAGEngine: doc.embedding = self.embeddings[i] self._indexed = True + # If using a workspace, persist the built index for faster future loads + try: + if kp and kp.get("using_workspace") and self.embeddings is not None: + emb_dir = kp.get("embeddings") + emb_dir.mkdir(parents=True, exist_ok=True) + idx_path = emb_dir / "index.pkl" + try: + self.save_index(idx_path) 
+ except Exception: + # ignore save failures + pass + except Exception: + pass def _chunk_text( self, text: str, source: str, chunk_size: int = 1000, overlap: int = 200 @@ -408,6 +447,22 @@ class RAGEngine: with open(path, "wb") as f: pickle.dump(data, f) + def save_index_to_workspace(self, root: Optional[Path] = None, filename: str = "index.pkl"): + """ + Convenience helper to save the index into the active workspace embeddings path. + + Args: + root: Optional project root to resolve workspaces (defaults to cwd) + filename: Filename to use for the saved index + """ + from pathlib import Path as _P + + kp = resolve_knowledge_paths(root=root) + emb_dir = kp.get("embeddings") + emb_dir.mkdir(parents=True, exist_ok=True) + path = _P(emb_dir) / filename + self.save_index(path) + def load_index(self, path: Path): """ Load the index from disk. @@ -437,3 +492,20 @@ class RAGEngine: doc.embedding = self.embeddings[i] self._indexed = True + + def load_index_from_workspace(self, root: Optional[Path] = None, filename: str = "index.pkl"): + """ + Convenience helper to load the index from the active workspace embeddings path. 
+ + Args: + root: Optional project root to resolve workspaces (defaults to cwd) + filename: Filename used for the saved index + """ + from pathlib import Path as _P + + kp = resolve_knowledge_paths(root=root) + emb_dir = kp.get("embeddings") + path = _P(emb_dir) / filename + if not path.exists(): + raise FileNotFoundError(f"Workspace index not found: {path}") + self.load_index(path) diff --git a/pentestagent/mcp/hexstrike_adapter.py b/pentestagent/mcp/hexstrike_adapter.py index 67dcfa0..fbc7601 100644 --- a/pentestagent/mcp/hexstrike_adapter.py +++ b/pentestagent/mcp/hexstrike_adapter.py @@ -24,9 +24,8 @@ except Exception: aiohttp = None -LOOT_DIR = Path("loot/artifacts") -LOOT_DIR.mkdir(parents=True, exist_ok=True) -LOG_FILE = LOOT_DIR / "hexstrike.log" +from ..workspaces.utils import get_loot_file + class HexstrikeAdapter: @@ -97,7 +96,8 @@ class HexstrikeAdapter: try: pid = getattr(self._process, "pid", None) if pid: - with LOG_FILE.open("a") as fh: + log_file = get_loot_file("artifacts/hexstrike.log") + with log_file.open("a") as fh: fh.write(f"[HexstrikeAdapter] started pid={pid}\n") except Exception: pass @@ -118,12 +118,12 @@ class HexstrikeAdapter: return try: - with LOG_FILE.open("ab") as fh: + log_file = get_loot_file("artifacts/hexstrike.log") + with log_file.open("ab") as fh: while True: line = await self._process.stdout.readline() if not line: break - # Prefix timestamps for easier debugging fh.write(line) fh.flush() except asyncio.CancelledError: diff --git a/pentestagent/mcp/metasploit_adapter.py b/pentestagent/mcp/metasploit_adapter.py index e696d0e..f83c4ff 100644 --- a/pentestagent/mcp/metasploit_adapter.py +++ b/pentestagent/mcp/metasploit_adapter.py @@ -21,9 +21,7 @@ except Exception: aiohttp = None -LOOT_DIR = Path("loot/artifacts") -LOOT_DIR.mkdir(parents=True, exist_ok=True) -LOG_FILE = LOOT_DIR / "metasploit_mcp.log" +from ..workspaces.utils import get_loot_file class MetasploitAdapter: @@ -193,7 +191,8 @@ class MetasploitAdapter: try: pid = 
getattr(self._process, "pid", None) if pid: - with LOG_FILE.open("a") as fh: + log_file = get_loot_file("artifacts/metasploit_mcp.log") + with log_file.open("a") as fh: fh.write(f"[MetasploitAdapter] started pid={pid}\n") except Exception: pass @@ -212,7 +211,8 @@ class MetasploitAdapter: return try: - with LOG_FILE.open("ab") as fh: + log_file = get_loot_file("artifacts/metasploit_mcp.log") + with log_file.open("ab") as fh: while True: line = await self._process.stdout.readline() if not line: diff --git a/pentestagent/runtime/runtime.py b/pentestagent/runtime/runtime.py index c68c316..fcbb677 100644 --- a/pentestagent/runtime/runtime.py +++ b/pentestagent/runtime/runtime.py @@ -455,11 +455,14 @@ class LocalRuntime(Runtime): async def start(self): """Start the local runtime.""" self._running = True - # Create organized loot directory structure - Path("loot").mkdir(exist_ok=True) - Path("loot/reports").mkdir(exist_ok=True) - Path("loot/artifacts").mkdir(exist_ok=True) - Path("loot/artifacts/screenshots").mkdir(exist_ok=True) + # Create organized loot directory structure (workspace-aware) + from ..workspaces.utils import get_loot_base + + base = get_loot_base() + (base).mkdir(parents=True, exist_ok=True) + (base / "reports").mkdir(parents=True, exist_ok=True) + (base / "artifacts").mkdir(parents=True, exist_ok=True) + (base / "artifacts" / "screenshots").mkdir(parents=True, exist_ok=True) async def stop(self): """Stop the local runtime gracefully.""" @@ -659,9 +662,10 @@ class LocalRuntime(Runtime): kwargs["url"], timeout=timeout, wait_until="domcontentloaded" ) - # Save screenshot to loot/artifacts/screenshots/ - output_dir = Path("loot/artifacts/screenshots") - output_dir.mkdir(parents=True, exist_ok=True) + # Save screenshot to workspace-aware loot/artifacts/screenshots/ + from ..workspaces.utils import get_loot_file + + output_dir = get_loot_file("artifacts/screenshots").parent timestamp = int(time.time()) unique_id = uuid.uuid4().hex[:8] diff --git 
a/pentestagent/tools/notes/__init__.py b/pentestagent/tools/notes/__init__.py index 8a90e8e..52b1612 100644 --- a/pentestagent/tools/notes/__init__.py +++ b/pentestagent/tools/notes/__init__.py @@ -9,17 +9,27 @@ from ..registry import ToolSchema, register_tool # Notes storage - kept at loot root for easy access _notes: Dict[str, Dict[str, Any]] = {} -_notes_file: Path = Path("loot/notes.json") +# Optional override (tests can call set_notes_file) +_custom_notes_file: Path | None = None # Lock for safe concurrent access from multiple agents (asyncio since agents are async tasks) _notes_lock = asyncio.Lock() +def _notes_file_path() -> Path: + from ...workspaces.utils import get_loot_file + + if _custom_notes_file: + return _custom_notes_file + return get_loot_file("notes.json") + + def _load_notes_unlocked() -> None: """Load notes from file (caller must hold lock).""" global _notes - if _notes_file.exists(): + nf = _notes_file_path() + if nf.exists(): try: - loaded = json.loads(_notes_file.read_text(encoding="utf-8")) + loaded = json.loads(nf.read_text(encoding="utf-8")) # Migration: Convert legacy string values to dicts _notes = {} for k, v in loaded.items(): @@ -37,8 +47,9 @@ def _load_notes_unlocked() -> None: def _save_notes_unlocked() -> None: """Save notes to file (caller must hold lock).""" - _notes_file.parent.mkdir(parents=True, exist_ok=True) - _notes_file.write_text(json.dumps(_notes, indent=2), encoding="utf-8") + nf = _notes_file_path() + nf.parent.mkdir(parents=True, exist_ok=True) + nf.write_text(json.dumps(_notes, indent=2), encoding="utf-8") async def get_all_notes() -> Dict[str, Dict[str, Any]]: @@ -52,9 +63,9 @@ async def get_all_notes() -> Dict[str, Dict[str, Any]]: def get_all_notes_sync() -> Dict[str, Dict[str, Any]]: """Get all notes synchronously (read-only, best effort for prompts).""" # If notes are empty, try to load from disk (safe read) - if not _notes and _notes_file.exists(): + if not _notes and _notes_file_path().exists(): try: - loaded 
= json.loads(_notes_file.read_text(encoding="utf-8")) + loaded = json.loads(_notes_file_path().read_text(encoding="utf-8")) # Migration for sync read result = {} for k, v in loaded.items(): @@ -74,14 +85,13 @@ def get_all_notes_sync() -> Dict[str, Dict[str, Any]]: def set_notes_file(path: Path) -> None: """Set custom notes file path.""" - global _notes_file - _notes_file = path + global _custom_notes_file + _custom_notes_file = Path(path) # Can't use async here, so load without lock (called at init time) _load_notes_unlocked() -# Load notes on module import (init time, no contention yet) -_load_notes_unlocked() +# Defer loading until first access to avoid caching active workspace path at import # Validation schema - declarative rules for note structure diff --git a/pentestagent/tools/token_tracker.py b/pentestagent/tools/token_tracker.py index 848ffd2..278bb6c 100644 --- a/pentestagent/tools/token_tracker.py +++ b/pentestagent/tools/token_tracker.py @@ -11,8 +11,8 @@ from datetime import date from pathlib import Path from typing import Any, Dict -# Persistent storage (loot root) -_data_file: Path = Path("loot/token_usage.json") +# Persistent storage (loot root) - compute at use to respect active workspace +_custom_data_file: Path | None = None _data_lock = threading.Lock() # In-memory cache @@ -27,9 +27,15 @@ _data: Dict[str, Any] = { def _load_unlocked() -> None: global _data - if _data_file.exists(): + data_file = _custom_data_file or None + if not data_file: + from ..workspaces.utils import get_loot_file + + data_file = get_loot_file("token_usage.json") + + if data_file.exists(): try: - loaded = json.loads(_data_file.read_text(encoding="utf-8")) + loaded = json.loads(data_file.read_text(encoding="utf-8")) # Merge with defaults to be robust to schema changes d = {**_data, **(loaded or {})} _data = d @@ -45,14 +51,20 @@ def _load_unlocked() -> None: def _save_unlocked() -> None: - _data_file.parent.mkdir(parents=True, exist_ok=True) - 
_data_file.write_text(json.dumps(_data, indent=2), encoding="utf-8") + data_file = _custom_data_file or None + if not data_file: + from ..workspaces.utils import get_loot_file + + data_file = get_loot_file("token_usage.json") + + data_file.parent.mkdir(parents=True, exist_ok=True) + data_file.write_text(json.dumps(_data, indent=2), encoding="utf-8") def set_data_file(path: Path) -> None: """Override the data file (used by tests).""" - global _data_file - _data_file = path + global _custom_data_file + _custom_data_file = Path(path) _load_unlocked() diff --git a/pentestagent/workspaces/__init__.py b/pentestagent/workspaces/__init__.py new file mode 100644 index 0000000..bb6cc20 --- /dev/null +++ b/pentestagent/workspaces/__init__.py @@ -0,0 +1,3 @@ +from .manager import WorkspaceManager, TargetManager, WorkspaceError + +__all__ = ["WorkspaceManager", "TargetManager", "WorkspaceError"] diff --git a/pentestagent/workspaces/manager.py b/pentestagent/workspaces/manager.py new file mode 100644 index 0000000..b7a869f --- /dev/null +++ b/pentestagent/workspaces/manager.py @@ -0,0 +1,215 @@ +"""WorkspaceManager: file-backed workspace and target management using YAML. + +Design goals: +- Workspace metadata stored as YAML at workspaces/{name}/meta.yaml +- Active workspace marker stored at workspaces/.active +- No in-memory caching: all operations read/write files directly +- Lightweight hostname validation; accept IPs, CIDRs, hostnames +""" +from pathlib import Path +import re +import time +import ipaddress +from typing import List + +import yaml + + +class WorkspaceError(Exception): + pass + + +WORKSPACES_DIR_NAME = "workspaces" +NAME_RE = re.compile(r"^[A-Za-z0-9._-]{1,64}$") + + +def _safe_mkdir(path: Path): + path.mkdir(parents=True, exist_ok=True) + + +class TargetManager: + """Validate and normalize targets (IP, CIDR, hostname). + + Hostname validation is intentionally light: allow letters, digits, hyphens, dots. 
+ """ + + HOST_RE = re.compile(r"^[A-Za-z0-9.-]{1,253}$") + + @staticmethod + def normalize_target(value: str) -> str: + v = value.strip() + # try CIDR or IP + try: + if "/" in v: + net = ipaddress.ip_network(v, strict=False) + return str(net) + else: + ip = ipaddress.ip_address(v) + return str(ip) + except Exception: + # fallback to hostname validation (light) + if TargetManager.HOST_RE.match(v) and ".." not in v: + return v.lower() + raise WorkspaceError(f"Invalid target: {value}") + + @staticmethod + def validate(value: str) -> bool: + try: + TargetManager.normalize_target(value) + return True + except WorkspaceError: + return False + + +class WorkspaceManager: + """File-backed workspace manager. No persistent in-memory state. + + Root defaults to current working directory. + """ + + def __init__(self, root: Path = Path(".")): + self.root = Path(root) + self.workspaces_dir = self.root / WORKSPACES_DIR_NAME + _safe_mkdir(self.workspaces_dir) + + def validate_name(self, name: str): + if not NAME_RE.match(name): + raise WorkspaceError( + "Invalid workspace name; allowed characters: A-Za-z0-9._- (1-64 chars)" + ) + # prevent path traversal and slashes + if "/" in name or ".." 
in name: + raise WorkspaceError("Invalid workspace name; must not contain '/' or '..'") + + def workspace_path(self, name: str) -> Path: + self.validate_name(name) + return self.workspaces_dir / name + + def meta_path(self, name: str) -> Path: + return self.workspace_path(name) / "meta.yaml" + + def active_marker(self) -> Path: + return self.workspaces_dir / ".active" + + def create(self, name: str) -> dict: + self.validate_name(name) + p = self.workspace_path(name) + # create required dirs + for sub in ("loot", "knowledge/sources", "knowledge/embeddings", "notes", "memory"): + _safe_mkdir(p / sub) + + # initialize meta if missing + if not self.meta_path(name).exists(): + # use time.gmtime() so the trailing 'Z' (UTC designator) is truthful; bare strftime formats local time + meta = {"name": name, "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "targets": []} + self._write_meta(name, meta) + return meta + return self._read_meta(name) + + def _read_meta(self, name: str) -> dict: + mp = self.meta_path(name) + if not mp.exists(): + return {"name": name, "targets": []} + try: + data = yaml.safe_load(mp.read_text(encoding="utf-8")) + if data is None: + return {"name": name, "targets": []} + # ensure keys + data.setdefault("name", name) + data.setdefault("targets", []) + return data + except Exception as e: + raise WorkspaceError(f"Failed to read meta for {name}: {e}") + + def _write_meta(self, name: str, meta: dict): + mp = self.meta_path(name) + mp.parent.mkdir(parents=True, exist_ok=True) + mp.write_text(yaml.safe_dump(meta, sort_keys=False), encoding="utf-8") + + def set_active(self, name: str): + # ensure workspace exists + self.create(name) + marker = self.active_marker() + marker.write_text(name, encoding="utf-8") + # update last_active_at in meta.yaml + try: + meta = self._read_meta(name) + # UTC timestamp to match the 'Z' suffix (see create()) + meta["last_active_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + # ensure operator_notes and tool_runs exist + meta.setdefault("operator_notes", "") + meta.setdefault("tool_runs", []) + self._write_meta(name, meta) + except Exception: + # Non-fatal - don't block activation on
meta write errors + pass + + def set_operator_note(self, name: str, note: str) -> dict: + """Append or set operator_notes for a workspace (plain text).""" + meta = self._read_meta(name) + prev = meta.get("operator_notes", "") or "" + if prev: + new = prev + "\n" + note + else: + new = note + meta["operator_notes"] = new + self._write_meta(name, meta) + return meta + + def get_meta_field(self, name: str, field: str): + meta = self._read_meta(name) + return meta.get(field) + + def get_active(self) -> str: + marker = self.active_marker() + if not marker.exists(): + return "" + return marker.read_text(encoding="utf-8").strip() + + def list_workspaces(self) -> List[str]: + if not self.workspaces_dir.exists(): + return [] + return [p.name for p in self.workspaces_dir.iterdir() if p.is_dir()] + + def get_meta(self, name: str) -> dict: + return self._read_meta(name) + + def add_targets(self, name: str, values: List[str]) -> List[str]: + # read-modify-write for strict file-backed behavior + meta = self._read_meta(name) + existing = set(meta.get("targets", [])) + changed = False + for v in values: + norm = TargetManager.normalize_target(v) + if norm not in existing: + existing.add(norm) + changed = True + if changed: + meta["targets"] = sorted(existing) + self._write_meta(name, meta) + return meta.get("targets", []) + + def set_last_target(self, name: str, value: str) -> str: + """Set the workspace's last used target and ensure it's in the targets list.""" + norm = TargetManager.normalize_target(value) + meta = self._read_meta(name) + # ensure targets contains it + existing = set(meta.get("targets", [])) + if norm not in existing: + existing.add(norm) + meta["targets"] = sorted(existing) + meta["last_target"] = norm + self._write_meta(name, meta) + return norm + + def remove_target(self, name: str, value: str) -> List[str]: + meta = self._read_meta(name) + existing = set(meta.get("targets", [])) + norm = TargetManager.normalize_target(value) + if norm in existing: + 
existing.remove(norm) + meta["targets"] = sorted(existing) + self._write_meta(name, meta) + return meta.get("targets", []) + + def list_targets(self, name: str) -> List[str]: + meta = self._read_meta(name) + return meta.get("targets", []) diff --git a/pentestagent/workspaces/utils.py b/pentestagent/workspaces/utils.py new file mode 100644 index 0000000..e107966 --- /dev/null +++ b/pentestagent/workspaces/utils.py @@ -0,0 +1,175 @@ +"""Utilities to route loot/output into the active workspace or global loot. + +All functions are file-backed and do not cache the active workspace selection. +This module will emit a single warning per run if no active workspace is set. +""" +from pathlib import Path +import logging +from typing import Optional + +from .manager import WorkspaceManager + +_WARNED = False + + +def get_loot_base(root: Optional[Path] = None) -> Path: + """Return the base loot directory: workspaces/{active}/loot or top-level `loot/`. + + Emits a single warning if no workspace is active. + """ + global _WARNED + root = Path(root or "./") + wm = WorkspaceManager(root=root) + active = wm.get_active() + if active: + base = root / "workspaces" / active / "loot" + else: + if not _WARNED: + logging.warning("No active workspace — writing loot to global loot/ directory.") + _WARNED = True + base = root / "loot" + base.mkdir(parents=True, exist_ok=True) + return base + + +def get_loot_file(relpath: str, root: Optional[Path] = None) -> Path: + """Return a Path for a file under the loot base, creating parent dirs. + + Example: get_loot_file('artifacts/hexstrike.log') + """ + base = get_loot_base(root=root) + p = base / relpath + p.parent.mkdir(parents=True, exist_ok=True) + return p + + +def resolve_knowledge_paths(root: Optional[Path] = None) -> dict: + """Resolve knowledge-related paths, preferring active workspace if present. 
+ + Returns a dict with keys: base, sources, embeddings, graph, index, using_workspace + """ + root = Path(root or "./") + wm = WorkspaceManager(root=root) + active = wm.get_active() + + global_base = root / "knowledge" + workspace_base = root / "workspaces" / active / "knowledge" if active else None + + use_workspace = False + if workspace_base and workspace_base.exists(): + # prefer workspace if it has any content (explicit opt-in) + try: + if any(workspace_base.rglob("*")): + use_workspace = True + except Exception: + use_workspace = False + + if use_workspace: + base = workspace_base + else: + base = global_base + + paths = { + "base": base, + "sources": base / "sources", + "embeddings": base / "embeddings", + "graph": base / "graph", + "index": base / "index", + "using_workspace": use_workspace, + } + + return paths + + +def export_workspace(name: str, output: Optional[Path] = None, root: Optional[Path] = None) -> Path: + """Create a deterministic tar.gz archive of workspaces/{name}/ and return the archive path. + + Excludes __pycache__ and *.pyc. Does not mutate workspace. + """ + import tarfile + + root = Path(root or "./") + ws_dir = root / "workspaces" / name + if not ws_dir.exists() or not ws_dir.is_dir(): + raise FileNotFoundError(f"Workspace not found: {name}") + + out_path = Path(output) if output else Path(f"{name}-workspace.tar.gz") + + # Use deterministic ordering + entries = [] + for p in ws_dir.rglob("*"): + # skip __pycache__ and .pyc + if "__pycache__" in p.parts: + continue + if p.suffix == ".pyc": + continue + rel = p.relative_to(root) + entries.append(rel) + + entries = sorted(entries, key=lambda p: str(p)) + + # Create tar.gz + with tarfile.open(out_path, "w:gz") as tf: + for rel in entries: + full = root / rel + # store with relative path (preserve workspaces//...) 
+ tf.add(str(full), arcname=str(rel)) + + return out_path + + +def import_workspace(archive: Path, root: Optional[Path] = None) -> str: + """Import a workspace tar.gz into workspaces/. Returns workspace name. + + Fails if workspace already exists. Requires meta.yaml present in archive. + """ + import tarfile + import tempfile + + root = Path(root or "./") + archive = Path(archive) + if not archive.exists(): + raise FileNotFoundError(f"Archive not found: {archive}") + + with tempfile.TemporaryDirectory() as td: + tdpath = Path(td) + with tarfile.open(archive, "r:gz") as tf: + # "data" filter rejects absolute paths, ../ traversal, links escaping the + # destination, and device/special members (CVE-2007-4559). Available on + # 3.12+ and backported to 3.9.17/3.10.12/3.11.4. + tf.extractall(path=tdpath, filter="data") + + # Look for workspaces/<name>/meta.yaml or meta.yaml at root + candidates = list(tdpath.rglob("meta.yaml")) + if not candidates: + raise ValueError("No meta.yaml found in archive") + meta_file = candidates[0] + # read name + import yaml + + meta = yaml.safe_load(meta_file.read_text(encoding="utf-8")) + if not meta or not meta.get("name"): + raise ValueError("meta.yaml missing 'name' field") + name = meta["name"] + + dest = root / "workspaces" / name + if dest.exists(): + raise FileExistsError(f"Workspace already exists: {name}") + + # Move extracted tree into place + # Find root folder under tdpath that contains the workspace files + # If archive stored paths with workspaces/<name>/..., move that subtree + candidate_root = None + for p in tdpath.iterdir(): + if p.is_dir() and p.name == "workspaces": + candidate_root = p / name + break + if candidate_root and candidate_root.exists(): + # move candidate_root to dest + dest.parent.mkdir(parents=True, exist_ok=True) + candidate_root.replace(dest) + else: + # Otherwise, assume contents are directly the workspace folder + # move the parent of meta_file (or its containing dir) + src = meta_file.parent + dest.parent.mkdir(parents=True, exist_ok=True) + src.replace(dest) + + return name diff --git a/requirements.txt b/requirements.txt index 42d1309..d43da20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,7 +36,7 @@ 
typer>=0.12.0 pydantic>=2.7.0 pydantic-settings>=2.2.0 python-dotenv>=1.0.0 -pyyaml>=6.0.0 +PyYAML>=6.0 jinja2>=3.1.0 # Dev diff --git a/tests/test_rag_workspace_integration.py b/tests/test_rag_workspace_integration.py new file mode 100644 index 0000000..1ffca3d --- /dev/null +++ b/tests/test_rag_workspace_integration.py @@ -0,0 +1,50 @@ +import os +from pathlib import Path + +import pytest + +from pentestagent.workspaces.manager import WorkspaceManager +from pentestagent.knowledge.rag import RAGEngine +from pentestagent.knowledge.indexer import KnowledgeIndexer + + +def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch): + # Use tmp_path as the project root + monkeypatch.chdir(tmp_path) + + wm = WorkspaceManager(root=tmp_path) + name = "ws_test" + wm.create(name) + wm.set_active(name) + + # Create a sample source file in the workspace sources + src_dir = tmp_path / "workspaces" / name / "knowledge" / "sources" + src_dir.mkdir(parents=True, exist_ok=True) + sample = src_dir / "sample.md" + sample.write_text("# Sample\n\nThis is a test knowledge document for RAG indexing.") + + # Ensure KnowledgeIndexer picks up the workspace source when indexing default 'knowledge' + ki = KnowledgeIndexer() + docs, result = ki.index_directory(Path("knowledge")) + + assert result.indexed_files >= 1 + assert len(docs) >= 1 + # Ensure the document source path points at the workspace file + assert any("workspaces" in d.source and "sample.md" in d.source for d in docs) + + # Now run RAGEngine to build embeddings and verify saved index file appears + rag = RAGEngine(use_local_embeddings=True) + rag.index() + + emb_path = tmp_path / "workspaces" / name / "knowledge" / "embeddings" / "index.pkl" + assert emb_path.exists(), f"Expected saved index at {emb_path}" + + # Ensure RAG engine has documents/chunks loaded + assert rag.get_chunk_count() >= 1 + assert rag.get_document_count() >= 1 + + # Now create a new RAGEngine and ensure it loads persisted index automatically + rag2 = 
RAGEngine(use_local_embeddings=True) + # If load-on-init doesn't run, calling index() should load from saved file + rag2.index() + assert rag2.get_chunk_count() >= 1 diff --git a/tests/test_workspace.py b/tests/test_workspace.py new file mode 100644 index 0000000..1d7c48e --- /dev/null +++ b/tests/test_workspace.py @@ -0,0 +1,96 @@ +import os +from pathlib import Path + +import pytest + +from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + + +def test_invalid_workspace_names(tmp_path: Path): + wm = WorkspaceManager(root=tmp_path) + bad_names = ["../escape", "name/with/slash", "..", ""] + # overlong name + bad_names.append("a" * 65) + for n in bad_names: + with pytest.raises(WorkspaceError): + wm.create(n) + + +def test_create_and_idempotent(tmp_path: Path): + wm = WorkspaceManager(root=tmp_path) + name = "eng1" + meta = wm.create(name) + assert (tmp_path / "workspaces" / name).exists() + assert (tmp_path / "workspaces" / name / "meta.yaml").exists() + # create again should not raise and should return meta + meta2 = wm.create(name) + assert meta2["name"] == name + + +def test_set_get_active(tmp_path: Path): + wm = WorkspaceManager(root=tmp_path) + name = "activews" + wm.create(name) + wm.set_active(name) + assert wm.get_active() == name + marker = tmp_path / "workspaces" / ".active" + assert marker.exists() + assert marker.read_text(encoding="utf-8").strip() == name + + +def test_add_list_remove_targets(tmp_path: Path): + wm = WorkspaceManager(root=tmp_path) + name = "targets" + wm.create(name) + added = wm.add_targets(name, ["192.168.1.1", "192.168.0.0/16", "Example.COM"]) # hostname mixed case + # normalized entries + assert "192.168.1.1" in added + assert "192.168.0.0/16" in added + assert "example.com" in added + # dedupe + added2 = wm.add_targets(name, ["192.168.1.1", "example.com"]) + assert len(added2) == len(added) + # remove + after = wm.remove_target(name, "192.168.1.1") + assert "192.168.1.1" not in after + + +def 
test_persistence_across_instances(tmp_path: Path): + wm1 = WorkspaceManager(root=tmp_path) + name = "persist" + wm1.create(name) + wm1.add_targets(name, ["10.0.0.1", "host.local"]) + + # new manager instance reads from disk + wm2 = WorkspaceManager(root=tmp_path) + targets = wm2.list_targets(name) + assert "10.0.0.1" in targets + assert "host.local" in targets + + +def test_last_target_persistence(tmp_path: Path): + wm = WorkspaceManager(root=tmp_path) + a = "wsA" + b = "wsB" + wm.create(a) + wm.create(b) + + t1 = "192.168.0.4" + t2 = "192.168.0.165" + + # set last target on workspace A and B + norm1 = wm.set_last_target(a, t1) + norm2 = wm.set_last_target(b, t2) + + # persisted in meta + assert wm.get_meta_field(a, "last_target") == norm1 + assert wm.get_meta_field(b, "last_target") == norm2 + + # targets list contains the last target + assert norm1 in wm.list_targets(a) + assert norm2 in wm.list_targets(b) + + # new manager instance still sees last_target + wm2 = WorkspaceManager(root=tmp_path) + assert wm2.get_meta_field(a, "last_target") == norm1 + assert wm2.get_meta_field(b, "last_target") == norm2 diff --git a/workspaces/.active b/workspaces/.active new file mode 100644 index 0000000..da8f209 --- /dev/null +++ b/workspaces/.active @@ -0,0 +1 @@ +Test2 \ No newline at end of file diff --git a/workspaces/Test1/meta.yaml b/workspaces/Test1/meta.yaml new file mode 100644 index 0000000..9066d49 --- /dev/null +++ b/workspaces/Test1/meta.yaml @@ -0,0 +1,8 @@ +name: Test1 +created_at: '2026-01-19T08:05:29Z' +targets: +- 192.168.0.4 +last_active_at: '2026-01-19T08:28:24Z' +operator_notes: '' +tool_runs: [] +last_target: 192.168.0.4 diff --git a/workspaces/Test2/meta.yaml b/workspaces/Test2/meta.yaml new file mode 100644 index 0000000..2f40e00 --- /dev/null +++ b/workspaces/Test2/meta.yaml @@ -0,0 +1,8 @@ +name: Test2 +created_at: '2026-01-19T08:05:55Z' +targets: +- 192.168.0.165 +last_active_at: '2026-01-19T08:28:27Z' +operator_notes: '' +tool_runs: [] +last_target: 
192.168.0.165