mirror of
https://github.com/GH05TCREW/pentestagent.git
synced 2026-03-07 22:33:38 +00:00
feat(memory): improve summarization
This commit is contained in:
@@ -2,22 +2,30 @@
|
||||
|
||||
from typing import Awaitable, Callable, List, Optional
|
||||
|
||||
SUMMARY_PROMPT = """Summarize this conversation history for a pentesting agent. Be terse.
|
||||
SUMMARY_PROMPT = """Summarize the following conversation segment for a penetration testing agent.
|
||||
The summary will be used to continue the security assessment, so preserve all critical operational details.
|
||||
|
||||
Focus on:
|
||||
- Targets discovered (IPs, domains, hosts)
|
||||
- Open ports and services found
|
||||
- Credentials or secrets discovered
|
||||
- Vulnerabilities identified
|
||||
- What was attempted and failed (to avoid repeating)
|
||||
- Current objective/progress
|
||||
What to preserve:
|
||||
- Discovered targets (IPs, domains, hostnames) and network topology
|
||||
- Services, versions, and technologies identified (keep exact version strings)
|
||||
- Open ports and running services with specific details
|
||||
- Vulnerabilities found or suspected (CVEs, misconfigurations, weaknesses)
|
||||
- Credentials, tokens, API keys, or authentication details discovered
|
||||
- Attack vectors attempted and their outcomes (success or failure)
|
||||
- System architecture and relationships between hosts
|
||||
- Important error messages or behaviors that may indicate vulnerabilities
|
||||
- Current testing strategy and next planned steps
|
||||
|
||||
Omit: verbose tool output, back-and-forth clarifications, redundant info.
|
||||
Compression approach:
|
||||
- Consolidate redundant or repetitive findings into single statements
|
||||
- Reduce verbose tool output while maintaining key technical findings
|
||||
- Keep technical precision: exact paths, URLs, parameters, version numbers
|
||||
- Remove conversational back-and-forth but preserve decisions made
|
||||
|
||||
Conversation to summarize:
|
||||
Conversation segment:
|
||||
{conversation}
|
||||
|
||||
Summary:"""
|
||||
Provide a concise technical summary:"""
|
||||
|
||||
|
||||
class ConversationMemory:
|
||||
@@ -59,6 +67,16 @@ class ConversationMemory:
|
||||
self._encoder = None
|
||||
return self._encoder
|
||||
|
||||
def _count_tokens_with_litellm(self, text: str, model: str) -> Optional[int]:
|
||||
"""Try to count tokens using litellm for better accuracy."""
|
||||
try:
|
||||
import litellm
|
||||
|
||||
count = litellm.token_counter(model=model, text=text)
|
||||
return int(count)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@property
|
||||
def token_budget(self) -> int:
|
||||
"""Available tokens for history."""
|
||||
@@ -159,7 +177,7 @@ class ConversationMemory:
|
||||
self, messages: List[dict], llm_call: Callable[[str], Awaitable[str]]
|
||||
) -> str:
|
||||
"""
|
||||
Summarize a list of messages.
|
||||
Summarize a list of messages using chunked approach for better granularity.
|
||||
|
||||
Args:
|
||||
messages: Messages to summarize
|
||||
@@ -168,18 +186,41 @@ class ConversationMemory:
|
||||
Returns:
|
||||
Summary string
|
||||
"""
|
||||
# Format messages for summarization
|
||||
conversation_text = self._format_for_summary(messages)
|
||||
if not messages:
|
||||
return "[No messages to summarize]"
|
||||
|
||||
# Call LLM for summary
|
||||
prompt = SUMMARY_PROMPT.format(conversation=conversation_text)
|
||||
# Use chunked summarization for better context preservation
|
||||
# Process in chunks of 8-12 messages for balance between detail and efficiency
|
||||
chunk_size = 10
|
||||
summaries = []
|
||||
|
||||
try:
|
||||
summary = await llm_call(prompt)
|
||||
return summary.strip()
|
||||
except Exception as e:
|
||||
# Fallback: simple truncation indicator
|
||||
return f"[{len(messages)} earlier messages - summarization failed: {e}]"
|
||||
for i in range(0, len(messages), chunk_size):
|
||||
chunk = messages[i : i + chunk_size]
|
||||
conversation_text = self._format_for_summary(chunk)
|
||||
prompt = SUMMARY_PROMPT.format(conversation=conversation_text)
|
||||
|
||||
try:
|
||||
chunk_summary = await llm_call(prompt)
|
||||
if chunk_summary and chunk_summary.strip():
|
||||
summaries.append(chunk_summary.strip())
|
||||
except Exception as e:
|
||||
# Log failure but continue with other chunks
|
||||
summaries.append(
|
||||
f"[{len(chunk)} messages from segment {i // chunk_size + 1} - summary failed: {e}]"
|
||||
)
|
||||
|
||||
# Combine chunk summaries
|
||||
if not summaries:
|
||||
return f"[{len(messages)} earlier messages - all summarization attempts failed]"
|
||||
|
||||
# If we have multiple summaries, join them with context markers
|
||||
if len(summaries) == 1:
|
||||
return summaries[0]
|
||||
else:
|
||||
combined = "\n\n".join(
|
||||
f"Segment {i + 1}: {summary}" for i, summary in enumerate(summaries)
|
||||
)
|
||||
return combined
|
||||
|
||||
def _format_for_summary(self, messages: List[dict]) -> str:
|
||||
"""Format messages as text for summarization."""
|
||||
@@ -188,9 +229,20 @@ class ConversationMemory:
|
||||
role = msg.get("role", "unknown")
|
||||
content = msg.get("content", "")
|
||||
|
||||
# Truncate very long messages for summarization input
|
||||
if len(content) > 2000:
|
||||
content = content[:2000] + "...[truncated]"
|
||||
# Preserve more content for tool outputs (they contain findings)
|
||||
# but still limit to avoid overwhelming the summarizer
|
||||
max_length = 4000 if role == "tool" else 2000
|
||||
if len(content) > max_length:
|
||||
# For tool outputs, try to preserve beginning and end
|
||||
if role == "tool":
|
||||
half = max_length // 2
|
||||
content = (
|
||||
content[:half]
|
||||
+ f"\n...[{len(content) - max_length} chars truncated]...\n"
|
||||
+ content[-half:]
|
||||
)
|
||||
else:
|
||||
content = content[:max_length] + "...[truncated]"
|
||||
|
||||
if role == "user":
|
||||
lines.append(f"User: {content}")
|
||||
@@ -199,6 +251,9 @@ class ConversationMemory:
|
||||
elif role == "tool":
|
||||
tool_name = msg.get("name", "tool")
|
||||
lines.append(f"Tool ({tool_name}): {content}")
|
||||
elif role == "system":
|
||||
# Skip system messages in summarization input
|
||||
continue
|
||||
|
||||
return "\n\n".join(lines)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user