diff --git a/application/agents/agentic_agent.py b/application/agents/agentic_agent.py index f642402a..c8af7a2a 100644 --- a/application/agents/agentic_agent.py +++ b/application/agents/agentic_agent.py @@ -3,9 +3,9 @@ from typing import Dict, Generator, Optional from application.agents.base import BaseAgent from application.agents.tools.internal_search import ( - INTERNAL_TOOL_ENTRY, INTERNAL_TOOL_ID, build_internal_tool_config, + build_internal_tool_entry, ) from application.logging import LogContext @@ -39,9 +39,13 @@ class AgenticAgent(BaseAgent): source = self.retriever_config.get("source", {}) has_sources = bool(source.get("active_docs")) if self.retriever_config and has_sources: - internal_entry = dict(INTERNAL_TOOL_ENTRY) + has_dir = _sources_have_directory_structure(source) + internal_entry = build_internal_tool_entry( + has_directory_structure=has_dir + ) internal_entry["config"] = build_internal_tool_config( - **self.retriever_config + **self.retriever_config, + has_directory_structure=has_dir, ) tools_dict[INTERNAL_TOOL_ID] = internal_entry @@ -74,3 +78,40 @@ class AgenticAgent(BaseAgent): tool = self.tool_executor._loaded_tools.get(cache_key) if tool and hasattr(tool, "retrieved_docs") and tool.retrieved_docs: self.retrieved_docs = tool.retrieved_docs + + +def _sources_have_directory_structure(source: Dict) -> bool: + """Check if any of the active sources have directory_structure in MongoDB.""" + active_docs = source.get("active_docs", []) + if not active_docs: + return False + + try: + from bson.objectid import ObjectId + from application.core.mongo_db import MongoDB + + mongo = MongoDB.get_client() + db = mongo[settings.MONGO_DB_NAME] + sources_collection = db["sources"] + + if isinstance(active_docs, str): + active_docs = [active_docs] + + for doc_id in active_docs: + try: + source_doc = sources_collection.find_one( + {"_id": ObjectId(doc_id)}, + {"directory_structure": 1}, + ) + if source_doc and source_doc.get("directory_structure"): + return True + except Exception: + continue + except Exception as e: + logger.debug(f"Could not check directory structure: {e}") + + return False + + +# Import settings at module level for _sources_have_directory_structure +from application.core.settings import settings # noqa: E402 diff --git a/application/agents/research_agent.py b/application/agents/research_agent.py index 1e4f9a73..b743cb16 100644 --- a/application/agents/research_agent.py +++ b/application/agents/research_agent.py @@ -6,10 +6,11 @@ from typing import Dict, Generator, List, Optional from application.agents.base import BaseAgent from application.agents.tool_executor import ToolExecutor +from application.agents.agentic_agent import _sources_have_directory_structure from application.agents.tools.internal_search import ( - INTERNAL_TOOL_ENTRY, INTERNAL_TOOL_ID, build_internal_tool_config, + build_internal_tool_entry, ) from application.agents.tools.think import THINK_TOOL_ENTRY, THINK_TOOL_ID from application.logging import LogContext @@ -275,9 +276,13 @@ class ResearchAgent(BaseAgent): source = self.retriever_config.get("source", {}) has_sources = bool(source.get("active_docs")) if self.retriever_config and has_sources: - internal_entry = dict(INTERNAL_TOOL_ENTRY) + has_dir = _sources_have_directory_structure(source) + internal_entry = build_internal_tool_entry( + has_directory_structure=has_dir + ) internal_entry["config"] = build_internal_tool_config( - **self.retriever_config + **self.retriever_config, + has_directory_structure=has_dir, ) tools_dict[INTERNAL_TOOL_ID] = internal_entry elif self.retriever_config and not has_sources: diff --git a/application/agents/tools/internal_search.py b/application/agents/tools/internal_search.py index 0b8c2c47..18b9c514 100644 --- a/application/agents/tools/internal_search.py +++ b/application/agents/tools/internal_search.py @@ -1,3 +1,4 @@ +import json import logging from typing import Dict, List, Optional @@ -13,19 +14,25 @@ class InternalSearchTool(Tool): Instead of pre-fetching docs into the prompt, the LLM decides when and what to search. Supports multiple searches per session. + + Optional capabilities (enabled when sources have directory_structure): + - path_filter on search: restrict results to a specific file/folder + - list_files action: browse the file/folder structure """ def __init__(self, config: Dict): self.config = config self.retrieved_docs: List[Dict] = [] self._retriever = None + self._directory_structure: Optional[Dict] = None + self._dir_structure_loaded = False def _get_retriever(self): if self._retriever is None: self._retriever = RetrieverCreator.create_retriever( self.config.get("retriever_name", "classic"), source=self.config.get("source", {}), - chat_history=[], # no rephrasing — LLM controls the query + chat_history=[], prompt="", chunks=int(self.config.get("chunks", 2)), doc_token_limit=int(self.config.get("doc_token_limit", 50000)), @@ -38,11 +45,65 @@ class InternalSearchTool(Tool): ) return self._retriever - def execute_action(self, action_name: str, **kwargs): - if action_name != "search": - return f"Unknown action: {action_name}" + def _get_directory_structure(self) -> Optional[Dict]: + """Load directory structure from MongoDB for the configured sources.""" + if self._dir_structure_loaded: + return self._directory_structure + self._dir_structure_loaded = True + source = self.config.get("source", {}) + active_docs = source.get("active_docs", []) + if not active_docs: + return None + + try: + from bson.objectid import ObjectId + from application.core.mongo_db import MongoDB + + mongo = MongoDB.get_client() + db = mongo[settings.MONGO_DB_NAME] + sources_collection = db["sources"] + + if isinstance(active_docs, str): + active_docs = [active_docs] + + merged_structure = {} + for doc_id in active_docs: + try: + source_doc = sources_collection.find_one( + {"_id": ObjectId(doc_id)} + ) + if not source_doc: + continue + dir_str = source_doc.get("directory_structure") + if dir_str: + if isinstance(dir_str, str): + dir_str = json.loads(dir_str) + source_name = source_doc.get("name", doc_id) + if len(active_docs) > 1: + merged_structure[source_name] = dir_str + else: + merged_structure = dir_str + except Exception as e: + logger.debug(f"Could not load dir structure for {doc_id}: {e}") + + self._directory_structure = merged_structure if merged_structure else None + except Exception as e: + logger.debug(f"Failed to load directory structures: {e}") + + return self._directory_structure + + def execute_action(self, action_name: str, **kwargs): + if action_name == "search": + return self._execute_search(**kwargs) + elif action_name == "list_files": + return self._execute_list_files(**kwargs) + return f"Unknown action: {action_name}" + + def _execute_search(self, **kwargs) -> str: query = kwargs.get("query", "") + path_filter = kwargs.get("path_filter", "") + if not query: return "Error: 'query' parameter is required." @@ -56,6 +117,19 @@ class InternalSearchTool(Tool): if not docs: return "No documents found matching your query." + # Apply path filter if specified + if path_filter: + path_lower = path_filter.lower() + docs = [ + d + for d in docs + if path_lower in d.get("source", "").lower() + or path_lower in d.get("filename", "").lower() + or path_lower in d.get("title", "").lower() + ] + if not docs: + return f"No documents found matching query '{query}' in path '{path_filter}'." + # Accumulate for source tracking for doc in docs: if doc not in self.retrieved_docs: @@ -73,8 +147,81 @@ class InternalSearchTool(Tool): return "\n\n---\n\n".join(formatted) + def _execute_list_files(self, **kwargs) -> str: + path = kwargs.get("path", "") + dir_structure = self._get_directory_structure() + + if not dir_structure: + return "No file structure available for the current sources." + + # Navigate to the requested path + current = dir_structure + if path: + for part in path.strip("/").split("/"): + if not part: + continue + if isinstance(current, dict) and part in current: + current = current[part] + else: + return f"Path '{path}' not found in the file structure." + + # Format the structure for the LLM + return self._format_structure(current, path or "/") + + def _format_structure(self, node: Dict, current_path: str) -> str: + if not isinstance(node, dict): + return f"'{current_path}' is a file, not a directory." + + lines = [f"File structure at '{current_path}':\n"] + folders = [] + files = [] + + for name, value in sorted(node.items()): + if isinstance(value, dict): + # Check if it's a file metadata dict or a folder + if "type" in value or "size_bytes" in value or "token_count" in value: + # It's a file with metadata + size = value.get("token_count", "") + ftype = value.get("type", "") + info_parts = [] + if ftype: + info_parts.append(ftype) + if size: + info_parts.append(f"{size} tokens") + info = f" ({', '.join(info_parts)})" if info_parts else "" + files.append(f" {name}{info}") + else: + # It's a folder + count = self._count_files(value) + folders.append(f" {name}/ ({count} items)") + else: + files.append(f" {name}") + + if folders: + lines.append("Folders:") + lines.extend(folders) + if files: + lines.append("Files:") + lines.extend(files) + if not folders and not files: + lines.append(" (empty)") + + return "\n".join(lines) + + def _count_files(self, node: Dict) -> int: + count = 0 + for value in node.values(): + if isinstance(value, dict): + if "type" in value or "size_bytes" in value or "token_count" in value: + count += 1 + else: + count += self._count_files(value) + else: + count += 1 + return count + def get_actions_metadata(self): - return [ + actions = [ { "name": "search", "description": ( @@ -89,12 +236,47 @@ class InternalSearchTool(Tool): "description": "The search query. Be specific and focused.", "filled_by_llm": True, "required": True, - } + }, } }, } ] + # Add path_filter and list_files only if directory structure exists + has_structure = self.config.get("has_directory_structure", False) + if has_structure: + actions[0]["parameters"]["properties"]["path_filter"] = { + "type": "string", + "description": ( + "Optional: filter results to a specific file or folder path. " + "Use list_files first to see available paths." + ), + "filled_by_llm": True, + "required": False, + } + actions.append( + { + "name": "list_files", + "description": ( + "Browse the file and folder structure of the knowledge base. " + "Use this to see what files are available before searching. " + "Optionally provide a path to browse a specific folder." + ), + "parameters": { + "properties": { + "path": { + "type": "string", + "description": "Optional: folder path to browse. Leave empty for root.", + "filled_by_llm": True, + "required": False, + } + } + }, + } + ) + + return actions + def get_config_requirements(self): return {} @@ -102,9 +284,25 @@ class InternalSearchTool(Tool): # Constants for building synthetic tools_dict entries INTERNAL_TOOL_ID = "internal" -INTERNAL_TOOL_ENTRY = { - "name": "internal_search", - "actions": [ + +def build_internal_tool_entry(has_directory_structure: bool = False) -> Dict: + """Build the tools_dict entry for InternalSearchTool. + + Dynamically includes list_files and path_filter based on + whether the sources have directory structure. + """ + search_params = { + "properties": { + "query": { + "type": "string", + "description": "The search query. Be specific and focused.", + "filled_by_llm": True, + "required": True, + } + } + } + + actions = [ { "name": "search", "description": ( @@ -113,19 +311,47 @@ INTERNAL_TOOL_ENTRY = { "You can call this multiple times with different queries." ), "active": True, - "parameters": { - "properties": { - "query": { - "type": "string", - "description": "The search query. Be specific and focused.", - "filled_by_llm": True, - "required": True, - } - } - }, + "parameters": search_params, } - ], -} + ] + + if has_directory_structure: + search_params["properties"]["path_filter"] = { + "type": "string", + "description": ( + "Optional: filter results to a specific file or folder path. " + "Use list_files first to see available paths." + ), + "filled_by_llm": True, + "required": False, + } + actions.append( + { + "name": "list_files", + "description": ( + "Browse the file and folder structure of the knowledge base. " + "Use this to see what files are available before searching. " + "Optionally provide a path to browse a specific folder." + ), + "active": True, + "parameters": { + "properties": { + "path": { + "type": "string", + "description": "Optional: folder path to browse. Leave empty for root.", + "filled_by_llm": True, + "required": False, + } + } + }, + } + ) + + return {"name": "internal_search", "actions": actions} + + +# Keep backward compat +INTERNAL_TOOL_ENTRY = build_internal_tool_entry(has_directory_structure=False) def build_internal_tool_config( @@ -139,6 +365,7 @@ def build_internal_tool_config( llm_name: str = None, api_key: str = None, decoded_token: Optional[Dict] = None, + has_directory_structure: bool = False, ) -> Dict: """Build the config dict for InternalSearchTool.""" return { @@ -152,4 +379,5 @@ def build_internal_tool_config( "llm_name": llm_name or settings.LLM_PROVIDER, "api_key": api_key or settings.API_KEY, "decoded_token": decoded_token, + "has_directory_structure": has_directory_structure, }