mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-12-01 01:23:14 +00:00
feat: enhance MemoryTool and NotesTool with tool_id management and directory renaming tests (#2026)
This commit is contained in:
@@ -16,10 +16,40 @@ def memory_tool(monkeypatch) -> MemoryTool:
|
||||
tool_id = doc.get("tool_id")
|
||||
path = doc.get("path")
|
||||
key = f"{user_id}:{tool_id}:{path}"
|
||||
# Add _id to document if not present
|
||||
if "_id" not in doc:
|
||||
doc["_id"] = key
|
||||
self.docs[key] = doc
|
||||
return type("res", (), {"inserted_id": key})
|
||||
|
||||
def update_one(self, q, u, upsert=False):
|
||||
# Handle query by _id
|
||||
if "_id" in q:
|
||||
doc_id = q["_id"]
|
||||
if doc_id not in self.docs:
|
||||
return type("res", (), {"modified_count": 0})
|
||||
|
||||
if "$set" in u:
|
||||
old_doc = self.docs[doc_id].copy()
|
||||
old_doc.update(u["$set"])
|
||||
|
||||
# If path changed, update the dictionary key
|
||||
if "path" in u["$set"]:
|
||||
new_path = u["$set"]["path"]
|
||||
user_id = old_doc.get("user_id")
|
||||
tool_id = old_doc.get("tool_id")
|
||||
new_key = f"{user_id}:{tool_id}:{new_path}"
|
||||
|
||||
# Remove old key and add with new key
|
||||
del self.docs[doc_id]
|
||||
old_doc["_id"] = new_key
|
||||
self.docs[new_key] = old_doc
|
||||
else:
|
||||
self.docs[doc_id] = old_doc
|
||||
|
||||
return type("res", (), {"modified_count": 1})
|
||||
|
||||
# Handle query by user_id, tool_id, path
|
||||
user_id = q.get("user_id")
|
||||
tool_id = q.get("tool_id")
|
||||
path = q.get("path")
|
||||
@@ -29,7 +59,7 @@ def memory_tool(monkeypatch) -> MemoryTool:
|
||||
return type("res", (), {"modified_count": 0})
|
||||
|
||||
if key not in self.docs and upsert:
|
||||
self.docs[key] = {"user_id": user_id, "tool_id": tool_id, "path": path, "content": ""}
|
||||
self.docs[key] = {"user_id": user_id, "tool_id": tool_id, "path": path, "content": "", "_id": key}
|
||||
|
||||
if "$set" in u:
|
||||
self.docs[key].update(u["$set"])
|
||||
@@ -498,6 +528,33 @@ def test_memory_tool_isolation(monkeypatch) -> None:
|
||||
return type("res", (), {"inserted_id": key})
|
||||
|
||||
def update_one(self, q, u, upsert=False):
|
||||
# Handle query by _id
|
||||
if "_id" in q:
|
||||
doc_id = q["_id"]
|
||||
if doc_id not in self.docs:
|
||||
return type("res", (), {"modified_count": 0})
|
||||
|
||||
if "$set" in u:
|
||||
old_doc = self.docs[doc_id].copy()
|
||||
old_doc.update(u["$set"])
|
||||
|
||||
# If path changed, update the dictionary key
|
||||
if "path" in u["$set"]:
|
||||
new_path = u["$set"]["path"]
|
||||
user_id = old_doc.get("user_id")
|
||||
tool_id = old_doc.get("tool_id")
|
||||
new_key = f"{user_id}:{tool_id}:{new_path}"
|
||||
|
||||
# Remove old key and add with new key
|
||||
del self.docs[doc_id]
|
||||
old_doc["_id"] = new_key
|
||||
self.docs[new_key] = old_doc
|
||||
else:
|
||||
self.docs[doc_id] = old_doc
|
||||
|
||||
return type("res", (), {"modified_count": 1})
|
||||
|
||||
# Handle query by user_id, tool_id, path
|
||||
user_id = q.get("user_id")
|
||||
tool_id = q.get("tool_id")
|
||||
path = q.get("path")
|
||||
@@ -507,7 +564,7 @@ def test_memory_tool_isolation(monkeypatch) -> None:
|
||||
return type("res", (), {"modified_count": 0})
|
||||
|
||||
if key not in self.docs and upsert:
|
||||
self.docs[key] = {"user_id": user_id, "tool_id": tool_id, "path": path, "content": ""}
|
||||
self.docs[key] = {"user_id": user_id, "tool_id": tool_id, "path": path, "content": "", "_id": key}
|
||||
|
||||
if "$set" in u:
|
||||
self.docs[key].update(u["$set"])
|
||||
@@ -607,3 +664,102 @@ def test_paths_without_leading_slash(memory_tool) -> None:
|
||||
|
||||
view_nested = memory_tool.execute_action("view", path="projects/tasks.txt")
|
||||
assert "Task 1" in view_nested
|
||||
|
||||
|
||||
def test_rename_directory(memory_tool: MemoryTool) -> None:
|
||||
"""Test renaming a directory with files."""
|
||||
# Create files in a directory
|
||||
memory_tool.execute_action(
|
||||
"create",
|
||||
path="/docs/file1.txt",
|
||||
file_text="Content 1"
|
||||
)
|
||||
memory_tool.execute_action(
|
||||
"create",
|
||||
path="/docs/sub/file2.txt",
|
||||
file_text="Content 2"
|
||||
)
|
||||
|
||||
# Rename directory (with trailing slash)
|
||||
result = memory_tool.execute_action(
|
||||
"rename",
|
||||
old_path="/docs/",
|
||||
new_path="/archive/"
|
||||
)
|
||||
assert "renamed" in result.lower()
|
||||
assert "2 files" in result.lower()
|
||||
|
||||
# Verify old paths don't exist
|
||||
result = memory_tool.execute_action("view", path="/docs/file1.txt")
|
||||
assert "not found" in result.lower()
|
||||
|
||||
# Verify new paths exist
|
||||
result = memory_tool.execute_action("view", path="/archive/file1.txt")
|
||||
assert "Content 1" in result
|
||||
|
||||
result = memory_tool.execute_action("view", path="/archive/sub/file2.txt")
|
||||
assert "Content 2" in result
|
||||
|
||||
|
||||
def test_rename_directory_without_trailing_slash(memory_tool: MemoryTool) -> None:
|
||||
"""Test renaming a directory when new path is missing trailing slash."""
|
||||
# Create files in a directory
|
||||
memory_tool.execute_action(
|
||||
"create",
|
||||
path="/docs/file1.txt",
|
||||
file_text="Content 1"
|
||||
)
|
||||
memory_tool.execute_action(
|
||||
"create",
|
||||
path="/docs/sub/file2.txt",
|
||||
file_text="Content 2"
|
||||
)
|
||||
|
||||
# Rename directory - old path has slash, new path doesn't
|
||||
result = memory_tool.execute_action(
|
||||
"rename",
|
||||
old_path="/docs/",
|
||||
new_path="/archive" # Missing trailing slash
|
||||
)
|
||||
assert "renamed" in result.lower()
|
||||
|
||||
# Verify paths are correct (not corrupted like "/archivesub/file2.txt")
|
||||
result = memory_tool.execute_action("view", path="/archive/file1.txt")
|
||||
assert "Content 1" in result
|
||||
|
||||
result = memory_tool.execute_action("view", path="/archive/sub/file2.txt")
|
||||
assert "Content 2" in result
|
||||
|
||||
# Verify corrupted path doesn't exist
|
||||
result = memory_tool.execute_action("view", path="/archivesub/file2.txt")
|
||||
assert "not found" in result.lower()
|
||||
|
||||
|
||||
def test_view_file_line_numbers(memory_tool: MemoryTool) -> None:
|
||||
"""Test that view_range displays correct line numbers."""
|
||||
# Create a multiline file
|
||||
content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5"
|
||||
memory_tool.execute_action(
|
||||
"create",
|
||||
path="/numbered.txt",
|
||||
file_text=content
|
||||
)
|
||||
|
||||
# View lines 2-4
|
||||
result = memory_tool.execute_action(
|
||||
"view",
|
||||
path="/numbered.txt",
|
||||
view_range=[2, 4]
|
||||
)
|
||||
|
||||
# Check that line numbers are correct (should be 2, 3, 4 not 3, 4, 5)
|
||||
assert "2: Line 2" in result
|
||||
assert "3: Line 3" in result
|
||||
assert "4: Line 4" in result
|
||||
assert "1: Line 1" not in result
|
||||
assert "5: Line 5" not in result
|
||||
|
||||
# Verify no off-by-one error
|
||||
assert "3: Line 2" not in result # Wrong line number
|
||||
assert "4: Line 3" not in result # Wrong line number
|
||||
assert "5: Line 4" not in result # Wrong line number
|
||||
|
||||
Reference in New Issue
Block a user