feat(workspaces): add unified /workspace lifecycle, target persistence, and workspace-scoped RAG

- Introduce a `/workspace` command for CLI and TUI with create/activate, list, info, note, clear, export, import, and help actions
- Persist workspace state via a `workspaces/.active` marker and enriched `meta.yaml` (targets, operator notes, last_active_at, last_target)
- Restore `last_target` on workspace activation and sync it to UI banner, agent state, and CLI output
- Enforce target normalization and ensure `last_target` always exists in workspace targets
- Route loot output to the workspace-local loot directory when a workspace is active
- Prefer workspace-local knowledge paths for indexing and RAG resolution
- Persist RAG indexes per workspace and load existing indexes before re-indexing
- Add deterministic workspace export/import utilities (excluding caches)
- Integrate workspace handling into TUI slash commands with modal help screen
This commit is contained in:
giveen
2026-01-19 08:41:38 -07:00
parent 50c8ec1936
commit e8ab673a13
20 changed files with 1439 additions and 56 deletions

9
.gitignore vendored
View File

@@ -81,3 +81,12 @@ Thumbs.db
tmp/
temp/
*.tmp
# Local test artifacts and test scripts (do not commit local test runs)
tests/*.log
tests/*.out
tests/output/
tests/tmp/
tests/*.local.py
scripts/test_*.sh
*.test.sh

View File

@@ -6,6 +6,10 @@ from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional
from ..config.constants import AGENT_MAX_ITERATIONS
from .state import AgentState, AgentStateManager
from types import MappingProxyType
from ..workspaces.manager import WorkspaceManager, TargetManager, WorkspaceError
from ..workspaces.utils import resolve_knowledge_paths
if TYPE_CHECKING:
from ..llm import LLM
@@ -106,6 +110,32 @@ class BaseAgent(ABC):
# Use tools as-is (finish accesses plan via runtime)
self.tools = list(tools)
@property
def workspace_context(self):
    """Build a read-only snapshot of the active workspace on every access.

    WorkspaceManager.get_active() is the single source of truth; nothing
    is cached between calls. Returns None when no workspace is active,
    otherwise an immutable mapping with the workspace name, its targets,
    and whether knowledge resolution is workspace- or globally-scoped.
    """
    manager = WorkspaceManager()
    name = manager.get_active()
    if not name:
        return None
    targets = list(manager.list_targets(name))
    paths = resolve_knowledge_paths()
    scope = "workspace" if paths.get("using_workspace") else "global"
    return MappingProxyType(
        {
            "name": name,
            "targets": targets,
            "has_targets": bool(targets),
            "knowledge_scope": scope,
        }
    )
@property
def state(self) -> AgentState:
"""Get current agent state."""
@@ -448,15 +478,120 @@ class BaseAgent(ABC):
if tool:
try:
result = await tool.execute(arguments, self.runtime)
results.append(
ToolResult(
tool_call_id=tool_call_id,
tool_name=name,
result=result,
success=True,
# Before executing, enforce target safety gate when workspace active
wm = WorkspaceManager()
active = wm.get_active()
def _gather_candidate_targets(obj) -> list:
"""Extract candidate target strings from arguments (shallow)."""
candidates = []
if isinstance(obj, str):
candidates.append(obj)
elif isinstance(obj, dict):
for k, v in obj.items():
if k.lower() in (
"target",
"host",
"hostname",
"ip",
"address",
"url",
"hosts",
"targets",
):
if isinstance(v, (list, tuple)):
for it in v:
if isinstance(it, str):
candidates.append(it)
elif isinstance(v, str):
candidates.append(v)
return candidates
def _is_target_in_scope(candidate: str, allowed: list) -> bool:
    """Check if candidate target is covered by any allowed target (IP/CIDR/hostname).

    The candidate is normalized first; candidates that fail normalization
    are always out of scope. Allowed entries may be IPs, CIDR networks,
    or hostnames; malformed entries are skipped rather than raising.
    """
    import ipaddress

    try:
        # Normalize candidate (raises WorkspaceError on invalid input).
        norm = TargetManager.normalize_target(candidate)
    except Exception:
        return False
    try:
        if "/" in norm:
            # Candidate is a network: in scope only when an allowed network
            # fully contains it, or when a single allowed IP equals a
            # one-address network (e.g. a /32).
            cand_net = ipaddress.ip_network(norm, strict=False)
            for a in allowed:
                try:
                    if "/" in a:
                        an = ipaddress.ip_network(a, strict=False)
                        if cand_net.subnet_of(an) or cand_net == an:
                            return True
                    else:
                        # BUGFIX: previously compared the allowed IP with
                        # list(cand_net.hosts())[0], which approved any
                        # network whose *first usable host* matched a single
                        # allowed IP (e.g. 10.0.0.0/8 vs 10.0.0.1) and
                        # materialized every host of the network. A single
                        # IP can only cover a single-address network.
                        if (
                            cand_net.num_addresses == 1
                            and cand_net.network_address == ipaddress.ip_address(a)
                        ):
                            return True
                except Exception:
                    continue
            return False
        else:
            # Candidate is a single IP address.
            cand_ip = ipaddress.ip_address(norm)
            for a in allowed:
                try:
                    if "/" in a:
                        an = ipaddress.ip_network(a, strict=False)
                        if cand_ip in an:
                            return True
                    else:
                        if TargetManager.normalize_target(a) == norm:
                            return True
                except Exception:
                    # Hostname allowed entries fall through to a direct
                    # case-insensitive comparison.
                    if isinstance(a, str) and a.lower() == norm.lower():
                        return True
            return False
    except Exception:
        # Candidate is likely a hostname: exact case-insensitive match only.
        for a in allowed:
            if a.lower() == norm.lower():
                return True
        return False
out_of_scope = []
if active:
allowed = wm.list_targets(active)
candidates = _gather_candidate_targets(arguments)
for c in candidates:
try:
if not _is_target_in_scope(c, allowed):
out_of_scope.append(c)
except Exception:
out_of_scope.append(c)
if active and out_of_scope:
# Block execution and return an explicit error requiring operator confirmation
results.append(
ToolResult(
tool_call_id=tool_call_id,
tool_name=name,
error=(
f"Out-of-scope target(s): {out_of_scope} - operator confirmation required. "
"Set workspace targets with /target or run tool manually."
),
success=False,
)
)
else:
result = await tool.execute(arguments, self.runtime)
results.append(
ToolResult(
tool_call_id=tool_call_id,
tool_name=name,
result=result,
success=True,
)
)
)
except Exception as e:
results.append(
ToolResult(

View File

@@ -127,6 +127,25 @@ Examples:
mcp_test = mcp_subparsers.add_parser("test", help="Test MCP server connection")
mcp_test.add_argument("name", help="Server name to test")
# workspace management
ws_parser = subparsers.add_parser(
"workspace", help="Workspace lifecycle and info commands"
)
ws_parser.add_argument(
"action",
nargs="?",
help="Action or workspace name. Subcommands: info, note, clear, export, import",
)
ws_parser.add_argument("rest", nargs=argparse.REMAINDER, help="Additional arguments")
# NOTE: use `workspace list` to list workspaces (handled by workspace subcommand)
# target management
tgt_parser = subparsers.add_parser(
"target", help="Add or list targets for the active workspace"
)
tgt_parser.add_argument("values", nargs="*", help="Targets to add (IP/CIDR/hostname)")
return parser, parser.parse_args()
@@ -304,6 +323,210 @@ def handle_mcp_command(args: argparse.Namespace):
console.print("[yellow]Use 'pentestagent mcp --help' for available commands[/]")
def handle_workspace_command(args: argparse.Namespace):
    """Handle workspace lifecycle commands and actions.

    Dispatch order: no action -> show the active workspace; then named
    subcommands (info, list, note, clear, export, import); any other
    action is treated as a workspace name to create (if missing) and
    activate. All errors are reported to stdout rather than raised.
    """
    # FIX: removed unused `import shutil`; import Path explicitly so the
    # export/import branches cannot hit a NameError if the module scope
    # lacks a pathlib import.
    from pathlib import Path

    from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError
    from pentestagent.workspaces.utils import export_workspace, import_workspace, resolve_knowledge_paths

    wm = WorkspaceManager()
    action = args.action
    rest = args.rest or []

    # No args -> show active workspace
    if not action:
        active = wm.get_active()
        if not active:
            print("No active workspace.")
        else:
            print(f"Active workspace: {active}")
        return

    # Subcommands
    if action == "info":
        # Show info for the active or a specified workspace.
        name = rest[0] if rest else wm.get_active()
        if not name:
            print("No workspace specified and no active workspace.")
            return
        try:
            meta = wm.get_meta(name)
            created = meta.get("created_at")
            last_active = meta.get("last_active_at")
            targets = meta.get("targets", [])
            kp = resolve_knowledge_paths()
            ks = "workspace" if kp.get("using_workspace") else "global"
            # Estimate loot size if present (best-effort; unreadable
            # files are skipped silently).
            import os

            loot_dir = (wm.workspace_path(name) / "loot").resolve()
            size = 0
            files = 0
            if loot_dir.exists():
                for rootp, _, filenames in os.walk(loot_dir):
                    for fn in filenames:
                        try:
                            fp = os.path.join(rootp, fn)
                            size += os.path.getsize(fp)
                            files += 1
                        except Exception:
                            pass
            print(f"Name: {name}")
            print(f"Created: {created}")
            print(f"Last active: {last_active}")
            print(f"Targets: {len(targets)}")
            print(f"Knowledge scope: {ks}")
            print(f"Loot files: {files}, approx size: {size} bytes")
        except Exception as e:
            print(f"Error retrieving workspace info: {e}")
        return

    if action == "list":
        # List all workspaces and mark the active one.
        try:
            wss = wm.list_workspaces()
            active = wm.get_active()
            if not wss:
                print("No workspaces found.")
                return
            for name in sorted(wss):
                prefix = "* " if name == active else "  "
                print(f"{prefix}{name}")
        except Exception as e:
            print(f"Error listing workspaces: {e}")
        return

    if action == "note":
        # Append operator note to active workspace (or specified).
        # NOTE(review): `workspace note hello world` treats "hello" as a
        # workspace name; the first word is only consumed as note text when
        # it equals the resolved name — confirm this is the intended CLI.
        name = rest[0] if rest and not rest[0].startswith("--") else wm.get_active()
        if not name:
            print("No active workspace. Set one with /workspace <name>.")
            return
        text = " ".join(rest[1:]) if rest and rest[0] == name else " ".join(rest)
        if not text:
            print("Usage: workspace note <text>")
            return
        try:
            wm.set_operator_note(name, text)
            print(f"Operator note saved for workspace '{name}'.")
        except Exception as e:
            print(f"Error saving note: {e}")
        return

    if action == "clear":
        # Deactivate by removing the active-workspace marker file.
        active = wm.get_active()
        if not active:
            print("No active workspace.")
            return
        marker = wm.active_marker()
        try:
            if marker.exists():
                marker.unlink()
            print(f"Workspace '{active}' deactivated.")
        except Exception as e:
            print(f"Error deactivating workspace: {e}")
        return

    if action == "export":
        # export <NAME> [--output file.tar.gz]
        if not rest:
            print("Usage: workspace export <NAME> [--output file.tar.gz]")
            return
        name = rest[0]
        out = None
        if "--output" in rest:
            idx = rest.index("--output")
            if idx + 1 < len(rest):
                out = Path(rest[idx + 1])
        try:
            archive = export_workspace(name, output=out)
            print(f"Workspace exported: {archive}")
        except Exception as e:
            print(f"Export failed: {e}")
        return

    if action == "import":
        # import <ARCHIVE>
        if not rest:
            print("Usage: workspace import <archive.tar.gz>")
            return
        archive = Path(rest[0])
        try:
            name = import_workspace(archive)
            print(f"Workspace imported: {name} (not activated)")
        except Exception as e:
            print(f"Import failed: {e}")
        return

    # Default: treat action as workspace name -> create and set active
    name = action
    try:
        existed = wm.workspace_path(name).exists()
        if not existed:
            wm.create(name)
        wm.set_active(name)
        # Restore last target if present (reported only; the CLI does not
        # apply it to a running agent).
        last = wm.get_meta_field(name, "last_target")
        if last:
            print(f"Workspace '{name}' set active. Restored target: {last}")
        else:
            if existed:
                print(f"Workspace '{name}' set active.")
            else:
                print(f"Workspace '{name}' created and set active.")
    except WorkspaceError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"Error creating workspace: {e}")
def handle_workspaces_list():
    """Print every workspace name, prefixing the active one with '* '."""
    from pentestagent.workspaces.manager import WorkspaceManager

    manager = WorkspaceManager()
    names = manager.list_workspaces()
    current = manager.get_active()
    if not names:
        print("No workspaces found.")
        return
    for ws_name in sorted(names):
        marker = "* " if ws_name == current else "  "
        print(f"{marker}{ws_name}")
def handle_target_command(args: argparse.Namespace):
    """Add targets to, or list targets of, the active workspace.

    With no values, prints the current target list; otherwise delegates
    validation/persistence to WorkspaceManager.add_targets.
    """
    from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError

    manager = WorkspaceManager()
    workspace = manager.get_active()
    if not workspace:
        print("No active workspace. Set one with /workspace <name>.")
        return
    values = args.values or []
    try:
        if not values:
            # Read-only path: just report what is configured.
            existing = manager.list_targets(workspace)
            if existing:
                print(f"Targets for workspace '{workspace}': {existing}")
            else:
                print(f"No targets for workspace '{workspace}'.")
            return
        updated = manager.add_targets(workspace, values)
        print(f"Targets for workspace '{workspace}': {updated}")
    except WorkspaceError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"Error updating targets: {e}")
def main():
"""Main entry point."""
parser, args = parse_arguments()
@@ -317,6 +540,16 @@ def main():
handle_mcp_command(args)
return
if args.command == "workspace":
handle_workspace_command(args)
return
# 'workspace list' handled by workspace subcommand
if args.command == "target":
handle_target_command(args)
return
if args.command == "run":
# Check model configuration
if not args.model:

View File

@@ -109,8 +109,8 @@ class HelpScreen(ModalScreen):
}
#help-container {
width: 60;
height: 26;
width: 110;
height: 30;
background: #121212;
border: solid #3a3a3a;
padding: 1 2;
@@ -195,6 +195,137 @@ class HelpScreen(ModalScreen):
self.app.pop_screen()
class WorkspaceHelpScreen(ModalScreen):
    """Help modal for workspace commands.

    Shows the /workspace sub-command reference in a centered panel,
    closable via Escape, 'q', or the Close button.
    """

    # Same close bindings as the main HelpScreen.
    BINDINGS = [
        Binding("escape", "dismiss", "Close"),
        Binding("q", "dismiss", "Close"),
    ]

    # Styling mirrors the main HelpScreen modal (dark panel, slim scrollbars).
    CSS = """
    WorkspaceHelpScreen {
        align: center middle;
        scrollbar-background: #1a1a1a;
        scrollbar-background-hover: #1a1a1a;
        scrollbar-background-active: #1a1a1a;
        scrollbar-color: #3a3a3a;
        scrollbar-color-hover: #3a3a3a;
        scrollbar-color-active: #3a3a3a;
        scrollbar-corner-color: #1a1a1a;
        scrollbar-size: 1 1;
    }
    #help-container {
        width: 60;
        height: 26;
        background: #121212;
        border: solid #3a3a3a;
        padding: 1 2;
        layout: vertical;
    }
    #help-title {
        text-align: center;
        text-style: bold;
        color: #d4d4d4;
        margin-bottom: 1;
    }
    #help-content {
        color: #9a9a9a;
    }
    #help-close {
        margin-top: 1;
        width: auto;
        min-width: 10;
        background: #1a1a1a;
        color: #9a9a9a;
        border: none;
    }
    #help-close:hover {
        background: #262626;
    }
    #help-close:focus {
        background: #262626;
        text-style: none;
    }
    """

    def compose(self) -> ComposeResult:
        """Build the modal layout: title, usage line, command table, Close button."""
        from rich.table import Table
        from rich.text import Text

        # Build a two-column table to prevent wrapping
        table = Table.grid(padding=(0, 3))
        table.add_column(justify="left", ratio=2)
        table.add_column(justify="left", ratio=3)
        # Header and usage
        header = Text("Workspace Commands", style="bold")
        usage = Text("Usage: /workspace <action> or /workspace <name>")
        # Commands list
        cmds = [
            ("/workspace", "Show active"),
            ("/workspace list", "List all workspaces"),
            ("/workspace info [NAME]", "Show workspace metadata"),
            ("/workspace note <text>", "Add operator note"),
            ("/workspace clear", "Deactivate workspace"),
            ("/workspace NAME", "Create or activate workspace"),
            ("/workspace help", "Show this help"),
        ]
        # Compose table rows
        table.add_row(Text("Commands:", style="bold"), Text(""))
        for left, right in cmds:
            table.add_row(left, right)
        yield Container(
            Static(header, id="help-title"),
            Static(usage, id="help-usage"),
            Static(table, id="help-content"),
            Center(Button("Close", id="help-close"), id="help-center"),
            id="help-container",
        )

    def _get_help_text(self) -> str:
        """Return a plain-text rendering of the same help (non-rich fallback)."""
        header = "Usage: /workspace <action> or /workspace <name>\n"
        cmds = [
            ("/workspace", "Show active"),
            ("/workspace list", "List all workspaces"),
            ("/workspace info [NAME]", "Show workspace metadata"),
            ("/workspace note <text>", "Add operator note"),
            ("/workspace clear", "Deactivate workspace"),
            ("/workspace NAME", "Create or activate workspace"),
            ("/workspace help", "Show this help"),
        ]
        # Build two-column layout with fixed left column width
        left_width = 44
        lines = [header, "Commands:\n"]
        for left, right in cmds:
            if len(left) >= left_width - 2:
                # if left is long, place on its own line
                lines.append(f"  {left}\n    {right}")
            else:
                pad = " " * (left_width - len(left))
                lines.append(f"  {left}{pad}{right}")
        return "\n".join(lines)

    def action_dismiss(self) -> None:
        # Keyboard close (Escape / q).
        self.app.pop_screen()

    @on(Button.Pressed, "#help-close")
    def close_help(self) -> None:
        # Mouse close via the Close button.
        self.app.pop_screen()
class ToolsScreen(ModalScreen):
"""Interactive tools browser — split-pane layout.
@@ -1393,6 +1524,26 @@ class PentestAgentTUI(App):
# Update agent's target if agent exists
if self.agent:
self.agent.target = target
# Persist to active workspace if present
try:
from pentestagent.workspaces.manager import WorkspaceManager
wm = WorkspaceManager()
active = wm.get_active()
if active:
try:
wm.set_last_target(active, target)
except Exception:
pass
except Exception:
pass
# Update displayed Target in the UI
try:
self._apply_target_display(target)
except Exception:
pass
# Update the initial ready SystemMessage (if present) so Target appears under Runtime
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
@@ -1401,15 +1552,31 @@ class PentestAgentTUI(App):
if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
child, "message_content", ""
):
# Append Target line if not already present
if "Target:" not in child.message_content:
child.message_content = (
child.message_content + f"\n Target: {target}"
)
# Replace existing Target line if present, otherwise append
try:
if "Target:" in child.message_content:
# replace the first Target line
import re
child.message_content = re.sub(
r"(?m)^\s*Target:.*$",
f" Target: {target}",
child.message_content,
count=1,
)
else:
child.message_content = (
child.message_content + f"\n Target: {target}"
)
try:
child.refresh()
except Exception:
pass
except Exception:
# Fallback to append if regex replacement fails
child.message_content = (
child.message_content + f"\n Target: {target}"
)
updated = True
break
if not updated:
@@ -1628,6 +1795,138 @@ Be concise. Use the actual data from notes."""
_ = cast(Any, self._run_report_generation())
elif cmd_original.startswith("/target"):
self._set_target(cmd_original)
elif cmd_original.startswith("/workspace"):
# Support lightweight workspace management from the TUI
try:
from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError
from pentestagent.workspaces.utils import resolve_knowledge_paths
from pathlib import Path
wm = WorkspaceManager()
rest = cmd_original[len("/workspace") :].strip()
if not rest:
active = wm.get_active()
if not active:
self._add_system("No active workspace.")
else:
# restore last target if present
last = wm.get_meta_field(active, "last_target")
if last:
self.target = last
if self.agent:
self.agent.target = last
try:
self._apply_target_display(last)
except Exception:
pass
self._add_system(f"Active workspace: {active}")
return
parts = rest.split()
verb = parts[0].lower()
if verb == "help":
try:
await self.push_screen(WorkspaceHelpScreen())
except Exception:
# Fallback: show inline help text
self._add_system(
"Usage: /workspace <action>\nCommands: list, info, note, clear, help, <name>"
)
return
if verb == "list":
wss = wm.list_workspaces()
if not wss:
self._add_system("No workspaces found.")
return
out = []
active = wm.get_active()
for name in sorted(wss):
prefix = "* " if name == active else " "
out.append(f"{prefix}{name}")
self._add_system("\n".join(out))
return
if verb == "info":
name = parts[1] if len(parts) > 1 else wm.get_active()
if not name:
self._add_system("No workspace specified and no active workspace.")
return
try:
meta = wm.get_meta(name)
created = meta.get("created_at")
last_active = meta.get("last_active_at")
targets = meta.get("targets", [])
kp = resolve_knowledge_paths()
ks = "workspace" if kp.get("using_workspace") else "global"
self._add_system(
f"Name: {name}\nCreated: {created}\nLast active: {last_active}\nTargets: {len(targets)}\nKnowledge scope: {ks}"
)
except Exception as e:
self._add_system(f"Error retrieving workspace info: {e}")
return
if verb == "note":
name = parts[1] if len(parts) > 1 and not parts[1].startswith("--") else wm.get_active()
if not name:
self._add_system("No active workspace. Set one with /workspace <name>.")
return
text = " ".join(parts[1:]) if len(parts) > 1 and parts[1] == name else " ".join(parts[1:])
if not text:
self._add_system("Usage: /workspace note <text>")
return
try:
wm.set_operator_note(name, text)
self._add_system(f"Operator note saved for workspace '{name}'.")
except Exception as e:
self._add_system(f"Error saving note: {e}")
return
if verb == "clear":
active = wm.get_active()
if not active:
self._add_system("No active workspace.")
return
marker = wm.active_marker()
try:
if marker.exists():
marker.unlink()
self._add_system(f"Workspace '{active}' deactivated.")
except Exception as e:
self._add_system(f"Error deactivating workspace: {e}")
return
# Default: treat rest as workspace name -> create (only if missing) and set active
name = rest
try:
existed = wm.workspace_path(name).exists()
if not existed:
wm.create(name)
wm.set_active(name)
# restore last target if set on workspace
last = wm.get_meta_field(name, "last_target")
if last:
self.target = last
if self.agent:
self.agent.target = last
try:
self._apply_target_display(last)
except Exception:
pass
if existed:
self._add_system(f"Workspace '{name}' set active.")
else:
self._add_system(f"Workspace '{name}' created and set active.")
except WorkspaceError as e:
self._add_system(f"Error: {e}")
except Exception as e:
self._add_system(f"Error creating workspace: {e}")
except Exception as e:
self._add_system(f"Workspace command error: {e}")
return
elif cmd_original.startswith("/agent"):
await self._parse_agent_command(cmd_original)
elif cmd_original.startswith("/crew"):
@@ -1748,6 +2047,53 @@ Be concise. Use the actual data from notes."""
except Exception as e:
self._add_system(f"[!] Sidebar error: {e}")
    def _apply_target_display(self, target: str) -> None:
        """Update or insert the Target line in the system/banner area.

        Looks for the "PentestAgent ready" SystemMessage in the chat
        scroll, replaces its first "Target:" line (or appends one), and
        falls back to mounting/printing a standalone Target message when
        the banner cannot be found or updated. All UI failures degrade
        gracefully rather than raising.
        """
        try:
            scroll = self.query_one("#chat-scroll", ScrollableContainer)
            updated = False
            for child in scroll.children:
                if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
                    child, "message_content", ""
                ):
                    # Replace existing Target line if present, otherwise append
                    try:
                        if "Target:" in child.message_content:
                            import re
                            # Replace only the first Target line, multiline mode.
                            child.message_content = re.sub(
                                r"(?m)^\s*Target:.*$",
                                f"  Target: {target}",
                                child.message_content,
                                count=1,
                            )
                        else:
                            child.message_content = (
                                child.message_content + f"\n  Target: {target}"
                            )
                        try:
                            child.refresh()
                        except Exception:
                            pass
                    except Exception:
                        # Fallback to append if the regex replacement fails.
                        child.message_content = (
                            child.message_content + f"\n  Target: {target}"
                        )
                    updated = True
                    break
            if not updated:
                # No banner found: mount a dedicated Target message at the top.
                try:
                    first = scroll.children[0] if scroll.children else None
                    msg = SystemMessage(f"  Target: {target}")
                    if first:
                        scroll.mount_before(msg, first)
                    else:
                        scroll.mount(msg)
                except Exception:
                    self._add_system(f"  Target: {target}")
        except Exception:
            # Last resort: plain system message.
            self._add_system(f"  Target: {target}")
def _hide_sidebar(self) -> None:
"""Hide the sidebar."""
try:

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from typing import Any, List
from .rag import Document
from ..workspaces.utils import resolve_knowledge_paths
@dataclass
@@ -51,6 +52,11 @@ class KnowledgeIndexer:
total_files = 0
indexed_files = 0
# If directory is the default 'knowledge', prefer workspace knowledge if available
if directory == Path("knowledge"):
kp = resolve_knowledge_paths()
directory = kp.get("sources", Path("knowledge"))
if not directory.exists():
return documents, IndexingResult(
0, 0, 0, [f"Directory not found: {directory}"]

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional
import numpy as np
from .embeddings import get_embeddings
from ..workspaces.utils import resolve_knowledge_paths
@dataclass
@@ -65,9 +66,34 @@ class RAGEngine:
chunks = []
self._source_files = set() # Reset source file tracking
# Resolve knowledge paths (prefer workspace if available)
if self.knowledge_path != Path("knowledge"):
sources_base = self.knowledge_path
kp = None
else:
kp = resolve_knowledge_paths()
sources_base = kp.get("sources", Path("knowledge"))
# If workspace has a persisted index and we're not forcing reindex, try to load it
try:
if kp and kp.get("using_workspace"):
emb_dir = kp.get("embeddings")
emb_dir.mkdir(parents=True, exist_ok=True)
idx_path = emb_dir / "index.pkl"
if idx_path.exists() and not force:
try:
self.load_index(idx_path)
return
except Exception:
# Fall through to re-index if loading fails
pass
except Exception:
# Non-fatal — continue to index from sources
pass
# Process all files in knowledge directory
if self.knowledge_path.exists():
for file in self.knowledge_path.rglob("*"):
if sources_base.exists():
for file in sources_base.rglob("*"):
if not file.is_file():
continue
@@ -127,6 +153,19 @@ class RAGEngine:
doc.embedding = self.embeddings[i]
self._indexed = True
# If using a workspace, persist the built index for faster future loads
try:
if kp and kp.get("using_workspace") and self.embeddings is not None:
emb_dir = kp.get("embeddings")
emb_dir.mkdir(parents=True, exist_ok=True)
idx_path = emb_dir / "index.pkl"
try:
self.save_index(idx_path)
except Exception:
# ignore save failures
pass
except Exception:
pass
def _chunk_text(
self, text: str, source: str, chunk_size: int = 1000, overlap: int = 200
@@ -408,6 +447,22 @@ class RAGEngine:
with open(path, "wb") as f:
pickle.dump(data, f)
def save_index_to_workspace(self, root: Optional[Path] = None, filename: str = "index.pkl"):
    """Save the current index into the active workspace embeddings path.

    Args:
        root: Optional project root to resolve workspaces (defaults to cwd)
        filename: Filename to use for the saved index
    """
    from pathlib import Path as _Path

    paths = resolve_knowledge_paths(root=root)
    embeddings_dir = paths.get("embeddings")
    embeddings_dir.mkdir(parents=True, exist_ok=True)
    self.save_index(_Path(embeddings_dir) / filename)
def load_index(self, path: Path):
"""
Load the index from disk.
@@ -437,3 +492,20 @@ class RAGEngine:
doc.embedding = self.embeddings[i]
self._indexed = True
def load_index_from_workspace(self, root: Optional[Path] = None, filename: str = "index.pkl"):
    """Load the index from the active workspace embeddings path.

    Args:
        root: Optional project root to resolve workspaces (defaults to cwd)
        filename: Filename used for the saved index

    Raises:
        FileNotFoundError: if no index file exists at the resolved path.
    """
    from pathlib import Path as _Path

    paths = resolve_knowledge_paths(root=root)
    index_path = _Path(paths.get("embeddings")) / filename
    if not index_path.exists():
        raise FileNotFoundError(f"Workspace index not found: {index_path}")
    self.load_index(index_path)

View File

@@ -24,9 +24,8 @@ except Exception:
aiohttp = None
LOOT_DIR = Path("loot/artifacts")
LOOT_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOOT_DIR / "hexstrike.log"
from ..workspaces.utils import get_loot_file
class HexstrikeAdapter:
@@ -97,7 +96,8 @@ class HexstrikeAdapter:
try:
pid = getattr(self._process, "pid", None)
if pid:
with LOG_FILE.open("a") as fh:
log_file = get_loot_file("artifacts/hexstrike.log")
with log_file.open("a") as fh:
fh.write(f"[HexstrikeAdapter] started pid={pid}\n")
except Exception:
pass
@@ -118,12 +118,12 @@ class HexstrikeAdapter:
return
try:
with LOG_FILE.open("ab") as fh:
log_file = get_loot_file("artifacts/hexstrike.log")
with log_file.open("ab") as fh:
while True:
line = await self._process.stdout.readline()
if not line:
break
# Prefix timestamps for easier debugging
fh.write(line)
fh.flush()
except asyncio.CancelledError:

View File

@@ -21,9 +21,7 @@ except Exception:
aiohttp = None
LOOT_DIR = Path("loot/artifacts")
LOOT_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOOT_DIR / "metasploit_mcp.log"
from ..workspaces.utils import get_loot_file
class MetasploitAdapter:
@@ -193,7 +191,8 @@ class MetasploitAdapter:
try:
pid = getattr(self._process, "pid", None)
if pid:
with LOG_FILE.open("a") as fh:
log_file = get_loot_file("artifacts/metasploit_mcp.log")
with log_file.open("a") as fh:
fh.write(f"[MetasploitAdapter] started pid={pid}\n")
except Exception:
pass
@@ -212,7 +211,8 @@ class MetasploitAdapter:
return
try:
with LOG_FILE.open("ab") as fh:
log_file = get_loot_file("artifacts/metasploit_mcp.log")
with log_file.open("ab") as fh:
while True:
line = await self._process.stdout.readline()
if not line:

View File

@@ -455,11 +455,14 @@ class LocalRuntime(Runtime):
async def start(self):
"""Start the local runtime."""
self._running = True
# Create organized loot directory structure
Path("loot").mkdir(exist_ok=True)
Path("loot/reports").mkdir(exist_ok=True)
Path("loot/artifacts").mkdir(exist_ok=True)
Path("loot/artifacts/screenshots").mkdir(exist_ok=True)
# Create organized loot directory structure (workspace-aware)
from ..workspaces.utils import get_loot_base
base = get_loot_base()
(base).mkdir(parents=True, exist_ok=True)
(base / "reports").mkdir(parents=True, exist_ok=True)
(base / "artifacts").mkdir(parents=True, exist_ok=True)
(base / "artifacts" / "screenshots").mkdir(parents=True, exist_ok=True)
async def stop(self):
"""Stop the local runtime gracefully."""
@@ -659,9 +662,10 @@ class LocalRuntime(Runtime):
kwargs["url"], timeout=timeout, wait_until="domcontentloaded"
)
# Save screenshot to loot/artifacts/screenshots/
output_dir = Path("loot/artifacts/screenshots")
output_dir.mkdir(parents=True, exist_ok=True)
# Save screenshot to workspace-aware loot/artifacts/screenshots/
from ..workspaces.utils import get_loot_file
output_dir = get_loot_file("artifacts/screenshots").parent
timestamp = int(time.time())
unique_id = uuid.uuid4().hex[:8]

View File

@@ -9,17 +9,27 @@ from ..registry import ToolSchema, register_tool
# Notes storage - kept at loot root for easy access
_notes: Dict[str, Dict[str, Any]] = {}
_notes_file: Path = Path("loot/notes.json")
# Optional override (tests can call set_notes_file)
_custom_notes_file: Path | None = None
# Lock for safe concurrent access from multiple agents (asyncio since agents are async tasks)
_notes_lock = asyncio.Lock()
def _notes_file_path() -> Path:
    """Resolve the notes file: explicit override first, else workspace loot."""
    from ...workspaces.utils import get_loot_file

    override = _custom_notes_file
    return override if override else get_loot_file("notes.json")
def _load_notes_unlocked() -> None:
"""Load notes from file (caller must hold lock)."""
global _notes
if _notes_file.exists():
nf = _notes_file_path()
if nf.exists():
try:
loaded = json.loads(_notes_file.read_text(encoding="utf-8"))
loaded = json.loads(nf.read_text(encoding="utf-8"))
# Migration: Convert legacy string values to dicts
_notes = {}
for k, v in loaded.items():
@@ -37,8 +47,9 @@ def _load_notes_unlocked() -> None:
def _save_notes_unlocked() -> None:
"""Save notes to file (caller must hold lock)."""
_notes_file.parent.mkdir(parents=True, exist_ok=True)
_notes_file.write_text(json.dumps(_notes, indent=2), encoding="utf-8")
nf = _notes_file_path()
nf.parent.mkdir(parents=True, exist_ok=True)
nf.write_text(json.dumps(_notes, indent=2), encoding="utf-8")
async def get_all_notes() -> Dict[str, Dict[str, Any]]:
@@ -52,9 +63,9 @@ async def get_all_notes() -> Dict[str, Dict[str, Any]]:
def get_all_notes_sync() -> Dict[str, Dict[str, Any]]:
"""Get all notes synchronously (read-only, best effort for prompts)."""
# If notes are empty, try to load from disk (safe read)
if not _notes and _notes_file.exists():
if not _notes and _notes_file_path().exists():
try:
loaded = json.loads(_notes_file.read_text(encoding="utf-8"))
loaded = json.loads(_notes_file_path().read_text(encoding="utf-8"))
# Migration for sync read
result = {}
for k, v in loaded.items():
@@ -74,14 +85,13 @@ def get_all_notes_sync() -> Dict[str, Dict[str, Any]]:
def set_notes_file(path: Path) -> None:
"""Set custom notes file path."""
global _notes_file
_notes_file = path
global _custom_notes_file
_custom_notes_file = Path(path)
# Can't use async here, so load without lock (called at init time)
_load_notes_unlocked()
# Load notes on module import (init time, no contention yet)
_load_notes_unlocked()
# Defer loading until first access to avoid caching active workspace path at import
# Validation schema - declarative rules for note structure

View File

@@ -11,8 +11,8 @@ from datetime import date
from pathlib import Path
from typing import Any, Dict
# Persistent storage (loot root)
_data_file: Path = Path("loot/token_usage.json")
# Persistent storage (loot root) - compute at use to respect active workspace
_custom_data_file: Path | None = None
_data_lock = threading.Lock()
# In-memory cache
@@ -27,9 +27,15 @@ _data: Dict[str, Any] = {
def _load_unlocked() -> None:
global _data
if _data_file.exists():
data_file = _custom_data_file or None
if not data_file:
from ..workspaces.utils import get_loot_file
data_file = get_loot_file("token_usage.json")
if data_file.exists():
try:
loaded = json.loads(_data_file.read_text(encoding="utf-8"))
loaded = json.loads(data_file.read_text(encoding="utf-8"))
# Merge with defaults to be robust to schema changes
d = {**_data, **(loaded or {})}
_data = d
@@ -45,14 +51,20 @@ def _load_unlocked() -> None:
def _save_unlocked() -> None:
_data_file.parent.mkdir(parents=True, exist_ok=True)
_data_file.write_text(json.dumps(_data, indent=2), encoding="utf-8")
data_file = _custom_data_file or None
if not data_file:
from ..workspaces.utils import get_loot_file
data_file = get_loot_file("token_usage.json")
data_file.parent.mkdir(parents=True, exist_ok=True)
data_file.write_text(json.dumps(_data, indent=2), encoding="utf-8")
def set_data_file(path: Path) -> None:
"""Override the data file (used by tests)."""
global _data_file
_data_file = path
global _custom_data_file
_custom_data_file = Path(path)
_load_unlocked()

View File

@@ -0,0 +1,3 @@
from .manager import WorkspaceManager, TargetManager, WorkspaceError
__all__ = ["WorkspaceManager", "TargetManager", "WorkspaceError"]

View File

@@ -0,0 +1,215 @@
"""WorkspaceManager: file-backed workspace and target management using YAML.
Design goals:
- Workspace metadata stored as YAML at workspaces/{name}/meta.yaml
- Active workspace marker stored at workspaces/.active
- No in-memory caching: all operations read/write files directly
- Lightweight hostname validation; accept IPs, CIDRs, hostnames
"""
from pathlib import Path
import re
import time
import ipaddress
from typing import List
import yaml
class WorkspaceError(Exception):
    """Raised for invalid workspace names/targets and metadata read failures."""
    pass
WORKSPACES_DIR_NAME = "workspaces"
NAME_RE = re.compile(r"^[A-Za-z0-9._-]{1,64}$")
def _safe_mkdir(path: Path):
path.mkdir(parents=True, exist_ok=True)
class TargetManager:
    """Validate and normalize targets (IP, CIDR, hostname).

    IPs and CIDRs are canonicalized via :mod:`ipaddress` (e.g. a host
    bit set in a CIDR is masked off).  Hostnames are validated
    label-by-label — each dot-separated label must be non-empty,
    alphanumeric at both ends, with optional interior hyphens — then
    lower-cased.  This rejects degenerate inputs such as ".", "a..b",
    "-host" or "host-" that a flat character-class check lets through.
    """

    HOST_RE = re.compile(r"^[A-Za-z0-9.-]{1,253}$")
    # One DNS label: alnum, optional interior hyphens, alnum at both ends.
    _LABEL_RE = re.compile(r"^[A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?$")

    @staticmethod
    def normalize_target(value: str) -> str:
        """Return the canonical form of *value*.

        Raises:
            WorkspaceError: if *value* is not a valid IP, CIDR, or hostname.
        """
        v = value.strip()
        # Try CIDR ('/' present) or plain IP first; ipaddress raises
        # ValueError (not arbitrary exceptions) on malformed input.
        try:
            if "/" in v:
                return str(ipaddress.ip_network(v, strict=False))
            return str(ipaddress.ip_address(v))
        except ValueError:
            pass  # not an IP/CIDR: fall back to hostname validation
        if TargetManager.HOST_RE.match(v):
            labels = v.split(".")
            # Empty labels (leading/trailing/double dots) and labels with
            # leading/trailing hyphens are invalid hostnames.
            if labels and all(TargetManager._LABEL_RE.match(lbl) for lbl in labels):
                return v.lower()
        raise WorkspaceError(f"Invalid target: {value}")

    @staticmethod
    def validate(value: str) -> bool:
        """Return True if *value* normalizes cleanly, False otherwise."""
        try:
            TargetManager.normalize_target(value)
            return True
        except WorkspaceError:
            return False
class WorkspaceManager:
    """File-backed workspace manager. No persistent in-memory state.

    Root defaults to the current working directory.  All state lives
    under ``{root}/workspaces``: one directory per workspace holding a
    ``meta.yaml``, plus a ``.active`` marker file naming the currently
    active workspace.  Every operation reads/writes the files directly so
    multiple instances (or processes) observe the same state.
    """

    def __init__(self, root: Path = Path(".")):
        self.root = Path(root)
        self.workspaces_dir = self.root / WORKSPACES_DIR_NAME
        _safe_mkdir(self.workspaces_dir)

    @staticmethod
    def _utc_now() -> str:
        """Return the current time as an ISO-8601 UTC string.

        The trailing 'Z' designates UTC, so we must format gmtime —
        formatting localtime with a literal 'Z' (the previous behavior)
        records a wrong instant on any non-UTC host.
        """
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def validate_name(self, name: str):
        """Raise WorkspaceError unless *name* is a safe directory name."""
        if not NAME_RE.match(name):
            raise WorkspaceError(
                "Invalid workspace name; allowed characters: A-Za-z0-9._- (1-64 chars)"
            )
        # prevent path traversal: NAME_RE already rejects '/', but it
        # permits '..', so this second check is required
        if "/" in name or ".." in name:
            raise WorkspaceError("Invalid workspace name; must not contain '/' or '..'")

    def workspace_path(self, name: str) -> Path:
        """Return the workspace directory path (validates *name*)."""
        self.validate_name(name)
        return self.workspaces_dir / name

    def meta_path(self, name: str) -> Path:
        """Return the path of the workspace's meta.yaml."""
        return self.workspace_path(name) / "meta.yaml"

    def active_marker(self) -> Path:
        """Return the path of the '.active' marker file."""
        return self.workspaces_dir / ".active"

    def create(self, name: str) -> dict:
        """Create the workspace tree and meta.yaml if missing; return meta.

        Idempotent: an existing workspace is left untouched and its
        current metadata is returned.
        """
        self.validate_name(name)
        p = self.workspace_path(name)
        # create required dirs
        for sub in ("loot", "knowledge/sources", "knowledge/embeddings", "notes", "memory"):
            _safe_mkdir(p / sub)
        # initialize meta if missing
        if not self.meta_path(name).exists():
            meta = {"name": name, "created_at": self._utc_now(), "targets": []}
            self._write_meta(name, meta)
            return meta
        return self._read_meta(name)

    def _read_meta(self, name: str) -> dict:
        """Load meta.yaml; synthesize a minimal dict when absent or empty.

        Raises:
            WorkspaceError: the file exists but cannot be read or parsed.
        """
        mp = self.meta_path(name)
        if not mp.exists():
            return {"name": name, "targets": []}
        try:
            data = yaml.safe_load(mp.read_text(encoding="utf-8"))
            if data is None:
                return {"name": name, "targets": []}
            # ensure required keys survive hand-edited files
            data.setdefault("name", name)
            data.setdefault("targets", [])
            return data
        except Exception as e:
            raise WorkspaceError(f"Failed to read meta for {name}: {e}")

    def _write_meta(self, name: str, meta: dict):
        """Persist *meta* to the workspace's meta.yaml (creates parents)."""
        mp = self.meta_path(name)
        mp.parent.mkdir(parents=True, exist_ok=True)
        mp.write_text(yaml.safe_dump(meta, sort_keys=False), encoding="utf-8")

    def set_active(self, name: str):
        """Mark *name* as the active workspace, creating it if needed."""
        # ensure workspace exists
        self.create(name)
        marker = self.active_marker()
        marker.write_text(name, encoding="utf-8")
        # update last_active_at in meta.yaml
        try:
            meta = self._read_meta(name)
            meta["last_active_at"] = self._utc_now()
            # ensure operator_notes and tool_runs exist
            meta.setdefault("operator_notes", "")
            meta.setdefault("tool_runs", [])
            self._write_meta(name, meta)
        except Exception:
            # Non-fatal - don't block activation on meta write errors
            pass

    def set_operator_note(self, name: str, note: str) -> dict:
        """Append or set operator_notes for a workspace (plain text)."""
        meta = self._read_meta(name)
        prev = meta.get("operator_notes", "") or ""
        meta["operator_notes"] = prev + "\n" + note if prev else note
        self._write_meta(name, meta)
        return meta

    def get_meta_field(self, name: str, field: str):
        """Return a single metadata field, or None when missing."""
        return self._read_meta(name).get(field)

    def get_active(self) -> str:
        """Return the active workspace name, or '' when none is set."""
        marker = self.active_marker()
        if not marker.exists():
            return ""
        return marker.read_text(encoding="utf-8").strip()

    def list_workspaces(self) -> List[str]:
        """Return workspace directory names (unsorted)."""
        if not self.workspaces_dir.exists():
            return []
        return [p.name for p in self.workspaces_dir.iterdir() if p.is_dir()]

    def get_meta(self, name: str) -> dict:
        """Return the full metadata dict for *name*."""
        return self._read_meta(name)

    def add_targets(self, name: str, values: List[str]) -> List[str]:
        """Normalize and add targets; return the stored (deduped) list.

        Raises:
            WorkspaceError: any value fails target normalization.
        """
        # read-modify-write for strict file-backed behavior
        meta = self._read_meta(name)
        existing = set(meta.get("targets", []))
        changed = False
        for v in values:
            norm = TargetManager.normalize_target(v)
            if norm not in existing:
                existing.add(norm)
                changed = True
        if changed:
            meta["targets"] = sorted(existing)
            self._write_meta(name, meta)
        return meta.get("targets", [])

    def set_last_target(self, name: str, value: str) -> str:
        """Set the workspace's last used target and ensure it's in the targets list."""
        norm = TargetManager.normalize_target(value)
        meta = self._read_meta(name)
        # ensure targets contains it
        existing = set(meta.get("targets", []))
        if norm not in existing:
            existing.add(norm)
            meta["targets"] = sorted(existing)
        meta["last_target"] = norm
        self._write_meta(name, meta)
        return norm

    def remove_target(self, name: str, value: str) -> List[str]:
        """Remove a (normalized) target; return the stored target list."""
        meta = self._read_meta(name)
        existing = set(meta.get("targets", []))
        norm = TargetManager.normalize_target(value)
        if norm in existing:
            existing.remove(norm)
            meta["targets"] = sorted(existing)
            self._write_meta(name, meta)
        return meta.get("targets", [])

    def list_targets(self, name: str) -> List[str]:
        """Return the stored target list for *name*."""
        return self._read_meta(name).get("targets", [])

View File

@@ -0,0 +1,175 @@
"""Utilities to route loot/output into the active workspace or global loot.
All functions are file-backed and do not cache the active workspace selection.
This module will emit a single warning per run if no active workspace is set.
"""
from pathlib import Path
import logging
from typing import Optional
from .manager import WorkspaceManager
_WARNED = False
def get_loot_base(root: Optional[Path] = None) -> Path:
    """Return the base loot directory: workspaces/{active}/loot or top-level `loot/`.

    The directory is created if missing.  A single warning is logged per
    process run when no workspace is active.
    """
    global _WARNED
    base_root = Path(root or "./")
    active = WorkspaceManager(root=base_root).get_active()
    if not active:
        if not _WARNED:
            logging.warning("No active workspace — writing loot to global loot/ directory.")
            _WARNED = True
        loot_dir = base_root / "loot"
    else:
        loot_dir = base_root / "workspaces" / active / "loot"
    loot_dir.mkdir(parents=True, exist_ok=True)
    return loot_dir
def get_loot_file(relpath: str, root: Optional[Path] = None) -> Path:
    """Return a Path for a file under the loot base, creating parent dirs.

    Example: get_loot_file('artifacts/hexstrike.log')
    """
    target = get_loot_base(root=root) / relpath
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def resolve_knowledge_paths(root: Optional[Path] = None) -> dict:
    """Resolve knowledge-related paths, preferring active workspace if present.

    Returns a dict with keys: base, sources, embeddings, graph, index,
    using_workspace.  The workspace tree wins only when it exists AND
    contains at least one entry (explicit opt-in); otherwise the global
    `knowledge/` tree is used.
    """
    base_root = Path(root or "./")
    active = WorkspaceManager(root=base_root).get_active()
    candidate = base_root / "workspaces" / active / "knowledge" if active else None

    using_workspace = False
    if candidate is not None and candidate.exists():
        try:
            # any content at all flips the preference to the workspace
            using_workspace = next(candidate.rglob("*"), None) is not None
        except Exception:
            using_workspace = False

    base = candidate if using_workspace else base_root / "knowledge"
    return {
        "base": base,
        "sources": base / "sources",
        "embeddings": base / "embeddings",
        "graph": base / "graph",
        "index": base / "index",
        "using_workspace": using_workspace,
    }
def export_workspace(name: str, output: Optional[Path] = None, root: Optional[Path] = None) -> Path:
"""Create a deterministic tar.gz archive of workspaces/{name}/ and return the archive path.
Excludes __pycache__ and *.pyc. Does not mutate workspace.
"""
import tarfile
root = Path(root or "./")
ws_dir = root / "workspaces" / name
if not ws_dir.exists() or not ws_dir.is_dir():
raise FileNotFoundError(f"Workspace not found: {name}")
out_path = Path(output) if output else Path(f"{name}-workspace.tar.gz")
# Use deterministic ordering
entries = []
for p in ws_dir.rglob("*"):
# skip __pycache__ and .pyc
if "__pycache__" in p.parts:
continue
if p.suffix == ".pyc":
continue
rel = p.relative_to(root)
entries.append(rel)
entries = sorted(entries, key=lambda p: str(p))
# Create tar.gz
with tarfile.open(out_path, "w:gz") as tf:
for rel in entries:
full = root / rel
# store with relative path (preserve workspaces/<name>/...)
tf.add(str(full), arcname=str(rel))
return out_path
def import_workspace(archive: Path, root: Optional[Path] = None) -> str:
    """Import a workspace tar.gz into workspaces/. Returns the workspace name.

    Requires meta.yaml present in the archive.

    Raises:
        FileNotFoundError: the archive does not exist.
        ValueError: no meta.yaml in the archive, or it lacks a 'name'.
        FileExistsError: a workspace with the same name already exists.
    """
    import shutil
    import tarfile
    import tempfile

    root = Path(root or "./")
    archive = Path(archive)
    if not archive.exists():
        raise FileNotFoundError(f"Archive not found: {archive}")
    with tempfile.TemporaryDirectory() as td:
        tdpath = Path(td)
        with tarfile.open(archive, "r:gz") as tf:
            # filter="data" blocks absolute paths, '..' traversal, and
            # special files in untrusted archives (tar-slip hardening;
            # Python 3.12+, backported to 3.8.17/3.9.17/3.10.12/3.11.4).
            try:
                tf.extractall(path=tdpath, filter="data")
            except TypeError:
                # Interpreter predates the 'filter' parameter.
                tf.extractall(path=tdpath)
        # sort so the choice of meta.yaml is deterministic (rglob order isn't)
        candidates = sorted(tdpath.rglob("meta.yaml"))
        if not candidates:
            raise ValueError("No meta.yaml found in archive")
        meta_file = candidates[0]
        # deferred import: yaml only needed once an archive is actually read
        import yaml
        meta = yaml.safe_load(meta_file.read_text(encoding="utf-8"))
        if not meta or not meta.get("name"):
            raise ValueError("meta.yaml missing 'name' field")
        name = meta["name"]
        dest = root / "workspaces" / name
        if dest.exists():
            raise FileExistsError(f"Workspace already exists: {name}")
        # Prefer an archive laid out as workspaces/<name>/...; otherwise
        # assume the directory containing meta.yaml is the workspace root.
        candidate_root = None
        for p in tdpath.iterdir():
            if p.is_dir() and p.name == "workspaces":
                candidate_root = p / name
                break
        if candidate_root and candidate_root.exists():
            src = candidate_root
        else:
            src = meta_file.parent
        dest.parent.mkdir(parents=True, exist_ok=True)
        # shutil.move works across filesystems; Path.replace raises EXDEV
        # when the temp dir lives on a different mount than the project.
        shutil.move(str(src), str(dest))
        return name

View File

@@ -36,7 +36,7 @@ typer>=0.12.0
pydantic>=2.7.0
pydantic-settings>=2.2.0
python-dotenv>=1.0.0
pyyaml>=6.0.0
PyYAML>=6.0
jinja2>=3.1.0
# Dev

View File

@@ -0,0 +1,50 @@
import os
from pathlib import Path
import pytest
from pentestagent.workspaces.manager import WorkspaceManager
from pentestagent.knowledge.rag import RAGEngine
from pentestagent.knowledge.indexer import KnowledgeIndexer
def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch):
    """End-to-end: indexer and RAG should prefer the active workspace's
    knowledge tree, persist the embeddings index inside it, and reload
    that index in a fresh engine instance.
    """
    # Use tmp_path as the project root
    monkeypatch.chdir(tmp_path)
    wm = WorkspaceManager(root=tmp_path)
    name = "ws_test"
    wm.create(name)
    wm.set_active(name)
    # Create a sample source file in the workspace sources
    src_dir = tmp_path / "workspaces" / name / "knowledge" / "sources"
    src_dir.mkdir(parents=True, exist_ok=True)
    sample = src_dir / "sample.md"
    sample.write_text("# Sample\n\nThis is a test knowledge document for RAG indexing.")
    # Ensure KnowledgeIndexer picks up the workspace source when indexing default 'knowledge'
    ki = KnowledgeIndexer()
    docs, result = ki.index_directory(Path("knowledge"))
    assert result.indexed_files >= 1
    assert len(docs) >= 1
    # Ensure the document source path points at the workspace file
    assert any("workspaces" in d.source and "sample.md" in d.source for d in docs)
    # Now run RAGEngine to build embeddings and verify saved index file appears
    rag = RAGEngine(use_local_embeddings=True)
    rag.index()
    emb_path = tmp_path / "workspaces" / name / "knowledge" / "embeddings" / "index.pkl"
    assert emb_path.exists(), f"Expected saved index at {emb_path}"
    # Ensure RAG engine has documents/chunks loaded
    assert rag.get_chunk_count() >= 1
    assert rag.get_document_count() >= 1
    # Now create a new RAGEngine and ensure it loads persisted index automatically
    rag2 = RAGEngine(use_local_embeddings=True)
    # If load-on-init doesn't run, calling index() should load from saved file
    rag2.index()
    assert rag2.get_chunk_count() >= 1

96
tests/test_workspace.py Normal file
View File

@@ -0,0 +1,96 @@
import os
from pathlib import Path
import pytest
from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError
def test_invalid_workspace_names(tmp_path: Path):
    """Names with traversal, slashes, emptiness, or >64 chars are rejected."""
    manager = WorkspaceManager(root=tmp_path)
    invalid = ["../escape", "name/with/slash", "..", "", "a" * 65]
    for candidate in invalid:
        with pytest.raises(WorkspaceError):
            manager.create(candidate)
def test_create_and_idempotent(tmp_path: Path):
    """create() builds the workspace tree and is safe to call twice."""
    manager = WorkspaceManager(root=tmp_path)
    name = "eng1"
    manager.create(name)
    ws_dir = tmp_path / "workspaces" / name
    assert ws_dir.exists()
    assert (ws_dir / "meta.yaml").exists()
    # Repeating create must not raise and must still return the metadata.
    assert manager.create(name)["name"] == name
def test_set_get_active(tmp_path: Path):
    """Activation writes the .active marker and get_active reads it back."""
    manager = WorkspaceManager(root=tmp_path)
    name = "activews"
    manager.create(name)
    manager.set_active(name)
    assert manager.get_active() == name
    marker = tmp_path / "workspaces" / ".active"
    assert marker.exists()
    assert marker.read_text(encoding="utf-8").strip() == name
def test_add_list_remove_targets(tmp_path: Path):
    """Targets are normalized, deduplicated, and removable."""
    manager = WorkspaceManager(root=tmp_path)
    name = "targets"
    manager.create(name)
    added = manager.add_targets(name, ["192.168.1.1", "192.168.0.0/16", "Example.COM"])  # hostname mixed case
    # Hostnames are lower-cased; IPs and CIDRs are kept canonical.
    for expected in ("192.168.1.1", "192.168.0.0/16", "example.com"):
        assert expected in added
    # Re-adding existing entries must not grow the list.
    assert len(manager.add_targets(name, ["192.168.1.1", "example.com"])) == len(added)
    # Removal drops only the requested target.
    remaining = manager.remove_target(name, "192.168.1.1")
    assert "192.168.1.1" not in remaining
def test_persistence_across_instances(tmp_path: Path):
    """A fresh manager instance sees targets written by an earlier one."""
    writer = WorkspaceManager(root=tmp_path)
    name = "persist"
    writer.create(name)
    writer.add_targets(name, ["10.0.0.1", "host.local"])
    # A second instance must read the same state straight from disk.
    reader = WorkspaceManager(root=tmp_path)
    stored = reader.list_targets(name)
    assert "10.0.0.1" in stored
    assert "host.local" in stored
def test_last_target_persistence(tmp_path: Path):
    """last_target is stored per workspace and survives new instances."""
    manager = WorkspaceManager(root=tmp_path)
    ws_a, ws_b = "wsA", "wsB"
    manager.create(ws_a)
    manager.create(ws_b)
    # set last target on each workspace independently
    norm_a = manager.set_last_target(ws_a, "192.168.0.4")
    norm_b = manager.set_last_target(ws_b, "192.168.0.165")
    # persisted in meta and mirrored into the targets list
    assert manager.get_meta_field(ws_a, "last_target") == norm_a
    assert manager.get_meta_field(ws_b, "last_target") == norm_b
    assert norm_a in manager.list_targets(ws_a)
    assert norm_b in manager.list_targets(ws_b)
    # a brand-new manager instance reads the same values from disk
    fresh = WorkspaceManager(root=tmp_path)
    assert fresh.get_meta_field(ws_a, "last_target") == norm_a
    assert fresh.get_meta_field(ws_b, "last_target") == norm_b

1
workspaces/.active Normal file
View File

@@ -0,0 +1 @@
Test2

View File

@@ -0,0 +1,8 @@
name: Test1
created_at: '2026-01-19T08:05:29Z'
targets:
- 192.168.0.4
last_active_at: '2026-01-19T08:28:24Z'
operator_notes: ''
tool_runs: []
last_target: 192.168.0.4

View File

@@ -0,0 +1,8 @@
name: Test2
created_at: '2026-01-19T08:05:55Z'
targets:
- 192.168.0.165
last_active_at: '2026-01-19T08:28:27Z'
operator_notes: ''
tool_runs: []
last_target: 192.168.0.165