Merge pull request #1873 from ManishMadan2882/main

Sources are the new Docs
Authored by Alex on 2025-08-13 18:24:35 +01:00, committed by GitHub
40 changed files with 3412 additions and 1119 deletions

View File

@@ -1,5 +1,6 @@
import os
import datetime
import json
from flask import Blueprint, request, send_from_directory
from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
@@ -48,7 +49,17 @@ def upload_index_files():
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = request.form["sync_frequency"] if "sync_frequency" in request.form else None
original_file_path = request.form.get("original_file_path")
file_path = request.form.get("file_path")
directory_structure = request.form.get("directory_structure")
if directory_structure:
try:
directory_structure = json.loads(directory_structure)
except Exception:
logger.error("Error parsing directory_structure")
directory_structure = {}
else:
directory_structure = {}
storage = StorageCreator.get_storage()
index_base_path = f"indexes/{id}"
@@ -66,10 +77,13 @@ def upload_index_files():
file_pkl = request.files["file_pkl"]
if file_pkl.filename == "":
return {"status": "no file name"}
# Save index files to storage
- storage.save_file(file_faiss, f"{index_base_path}/index.faiss")
- storage.save_file(file_pkl, f"{index_base_path}/index.pkl")
+ faiss_storage_path = f"{index_base_path}/index.faiss"
+ pkl_storage_path = f"{index_base_path}/index.pkl"
+ storage.save_file(file_faiss, faiss_storage_path)
+ storage.save_file(file_pkl, pkl_storage_path)
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:
@@ -87,7 +101,8 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
"file_path": file_path,
"directory_structure": directory_structure,
}
},
)
@@ -105,7 +120,8 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
"file_path": file_path,
"directory_structure": directory_structure,
}
)
return {"status": "ok"}

View File

@@ -3,11 +3,11 @@ import json
import math
import os
import secrets
import shutil
import uuid
from functools import wraps
from typing import Optional, Tuple
import tempfile
import zipfile
from bson.binary import Binary, UuidRepresentation
from bson.dbref import DBRef
from bson.objectid import ObjectId
@@ -44,6 +44,7 @@ from application.utils import (
validate_function_name,
validate_required_fields,
)
from application.utils import num_tokens_from_string
from application.vectorstore.vector_creator import VectorCreator
storage = StorageCreator.get_storage()
@@ -474,7 +475,7 @@ class DeleteByIds(Resource):
@user_ns.route("/api/delete_old")
class DeleteOldIndexes(Resource):
@api.doc(
description="Deletes old indexes",
description="Deletes old indexes and associated files",
params={"source_id": "The source ID to delete"},
)
def get(self):
@@ -491,21 +492,40 @@ class DeleteOldIndexes(Resource):
)
if not doc:
return make_response(jsonify({"status": "not found"}), 404)
storage = StorageCreator.get_storage()
try:
# Delete vector index
if settings.VECTOR_STORE == "faiss":
- shutil.rmtree(os.path.join(current_dir, "indexes", str(doc["_id"])))
+ index_path = f"indexes/{str(doc['_id'])}"
+ if storage.file_exists(f"{index_path}/index.faiss"):
+     storage.delete_file(f"{index_path}/index.faiss")
+ if storage.file_exists(f"{index_path}/index.pkl"):
+     storage.delete_file(f"{index_path}/index.pkl")
else:
vectorstore = VectorCreator.create_vectorstore(
settings.VECTOR_STORE, source_id=str(doc["_id"])
)
vectorstore.delete_index()
if "file_path" in doc and doc["file_path"]:
file_path = doc["file_path"]
if storage.is_directory(file_path):
files = storage.list_files(file_path)
for f in files:
storage.delete_file(f)
else:
storage.delete_file(file_path)
except FileNotFoundError:
pass
except Exception as err:
current_app.logger.error(
f"Error deleting old indexes: {err}", exc_info=True
f"Error deleting files and indexes: {err}", exc_info=True
)
return make_response(jsonify({"success": False}), 400)
sources_collection.delete_one({"_id": ObjectId(source_id)})
return make_response(jsonify({"success": True}), 200)
@@ -549,146 +569,276 @@ class UploadFile(Resource):
# Create safe versions for filesystem operations
safe_user = safe_filename(user)
dir_name = safe_filename(job_name)
base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
try:
storage = StorageCreator.get_storage()
base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
- if len(files) > 1:
-     temp_files = []
-     for file in files:
-         filename = safe_filename(file.filename)
-         temp_path = f"{base_path}/temp/{filename}"
-         storage.save_file(file, temp_path)
-         temp_files.append(temp_path)
-         print(f"Saved file: {filename}")
-     zip_filename = f"{dir_name}.zip"
-     zip_path = f"{base_path}/{zip_filename}"
-     zip_temp_path = None
-     def create_zip_archive(temp_paths, dir_name, storage):
-         import tempfile
-         with tempfile.NamedTemporaryFile(
-             delete=False, suffix=".zip"
-         ) as temp_zip_file:
-             zip_output_path = temp_zip_file.name
-         with tempfile.TemporaryDirectory() as stage_dir:
-             for path in temp_paths:
-                 try:
-                     file_data = storage.get_file(path)
-                     with open(
-                         os.path.join(stage_dir, os.path.basename(path)),
-                         "wb",
-                     ) as f:
-                         f.write(file_data.read())
-                 except Exception as e:
-                     current_app.logger.error(
-                         f"Error processing file {path} for zipping: {e}",
-                         exc_info=True,
-                     )
-                     if os.path.exists(zip_output_path):
-                         os.remove(zip_output_path)
-                     raise
+ for file in files:
+     original_filename = file.filename
+     safe_file = safe_filename(original_filename)
+     with tempfile.TemporaryDirectory() as temp_dir:
+         temp_file_path = os.path.join(temp_dir, safe_file)
+         file.save(temp_file_path)
+         if zipfile.is_zipfile(temp_file_path):
try:
-             shutil.make_archive(
-                 base_name=zip_output_path.replace(".zip", ""),
-                 format="zip",
-                 root_dir=stage_dir,
-             )
+             with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
+                 zip_ref.extractall(path=temp_dir)
+             # Walk through extracted files and upload them
+             for root, _, files in os.walk(temp_dir):
+                 for extracted_file in files:
+                     if os.path.join(root, extracted_file) == temp_file_path:
+                         continue
+                     rel_path = os.path.relpath(os.path.join(root, extracted_file), temp_dir)
+                     storage_path = f"{base_path}/{rel_path}"
+                     with open(os.path.join(root, extracted_file), 'rb') as f:
+                         storage.save_file(f, storage_path)
except Exception as e:
-             current_app.logger.error(
-                 f"Error creating zip archive: {e}", exc_info=True
-             )
-             if os.path.exists(zip_output_path):
-                 os.remove(zip_output_path)
-             raise
-         return zip_output_path
-     try:
-         zip_temp_path = create_zip_archive(temp_files, dir_name, storage)
-         with open(zip_temp_path, "rb") as zip_file:
-             storage.save_file(zip_file, zip_path)
-         task = ingest.delay(
-             settings.UPLOAD_FOLDER,
-             [
-                 ".rst",
-                 ".md",
-                 ".pdf",
-                 ".txt",
-                 ".docx",
-                 ".csv",
-                 ".epub",
-                 ".html",
-                 ".mdx",
-                 ".json",
-                 ".xlsx",
-                 ".pptx",
-                 ".png",
-                 ".jpg",
-                 ".jpeg",
-             ],
-             job_name,
-             zip_filename,
-             user,
-             dir_name,
-             safe_user,
-         )
-     finally:
-         # Clean up temporary files
-         for temp_path in temp_files:
-             try:
-                 storage.delete_file(temp_path)
-             except Exception as e:
-                 current_app.logger.error(
-                     f"Error deleting temporary file {temp_path}: {e}",
-                     exc_info=True,
-                 )
-         # Clean up the zip file if it was created
-         if zip_temp_path and os.path.exists(zip_temp_path):
-             os.remove(zip_temp_path)
- else: # Keep this else block for single file upload
-     # For single file
-     file = files[0]
-     filename = safe_filename(file.filename)
-     file_path = f"{base_path}/{filename}"
-     storage.save_file(file, file_path)
-     task = ingest.delay(
-         settings.UPLOAD_FOLDER,
-         [
-             ".rst",
-             ".md",
-             ".pdf",
-             ".txt",
-             ".docx",
-             ".csv",
-             ".epub",
-             ".html",
-             ".mdx",
-             ".json",
-             ".xlsx",
-             ".pptx",
-             ".png",
-             ".jpg",
-             ".jpeg",
-         ],
-         job_name,
-         filename, # Corrected variable for single-file case
-         user,
-         dir_name,
-         safe_user,
-     )
current_app.logger.error(f"Error extracting zip: {e}", exc_info=True)
# If zip extraction fails, save the original zip file
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
else:
# For non-zip files, save directly
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
job_name,
user,
file_path=base_path,
filename=dir_name
)
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}", exc_info=True)
return make_response(jsonify({"success": False}), 400)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@user_ns.route("/api/manage_source_files")
class ManageSourceFiles(Resource):
@api.expect(
api.model(
"ManageSourceFilesModel",
{
"source_id": fields.String(required=True, description="Source ID to modify"),
"operation": fields.String(required=True, description="Operation: 'add', 'remove', or 'remove_directory'"),
"file_paths": fields.List(fields.String, required=False, description="File paths to remove (for remove operation)"),
"directory_path": fields.String(required=False, description="Directory path to remove (for remove_directory operation)"),
"file": fields.Raw(required=False, description="Files to add (for add operation)"),
"parent_dir": fields.String(required=False, description="Parent directory path relative to source root"),
},
)
)
@api.doc(
description="Add files, remove files, or remove directories from an existing source",
)
def post(self):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False, "message": "Unauthorized"}), 401)
user = decoded_token.get("sub")
source_id = request.form.get("source_id")
operation = request.form.get("operation")
if not source_id or not operation:
return make_response(
jsonify({"success": False, "message": "source_id and operation are required"}), 400
)
if operation not in ["add", "remove", "remove_directory"]:
return make_response(
jsonify({"success": False, "message": "operation must be 'add', 'remove', or 'remove_directory'"}), 400
)
try:
ObjectId(source_id)
except Exception:
return make_response(
jsonify({"success": False, "message": "Invalid source ID format"}), 400
)
try:
source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
if not source:
return make_response(
jsonify({"success": False, "message": "Source not found or access denied"}), 404
)
except Exception as err:
current_app.logger.error(f"Error finding source: {err}", exc_info=True)
return make_response(jsonify({"success": False, "message": "Database error"}), 500)
try:
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
parent_dir = request.form.get("parent_dir", "")
if parent_dir and (parent_dir.startswith("/") or ".." in parent_dir):
return make_response(
jsonify({"success": False, "message": "Invalid parent directory path"}), 400
)
if operation == "add":
files = request.files.getlist("file")
if not files or all(file.filename == "" for file in files):
return make_response(
jsonify({"success": False, "message": "No files provided for add operation"}), 400
)
added_files = []
target_dir = source_file_path
if parent_dir:
target_dir = f"{source_file_path}/{parent_dir}"
for file in files:
if file.filename:
safe_filename_str = safe_filename(file.filename)
file_path = f"{target_dir}/{safe_filename_str}"
# Save file to storage
storage.save_file(file, file_path)
added_files.append(safe_filename_str)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
task = reingest_source_task.delay(source_id=source_id, user=user)
return make_response(jsonify({
"success": True,
"message": f"Added {len(added_files)} files",
"added_files": added_files,
"parent_dir": parent_dir,
"reingest_task_id": task.id
}), 200)
elif operation == "remove":
file_paths_str = request.form.get("file_paths")
if not file_paths_str:
return make_response(
jsonify({"success": False, "message": "file_paths required for remove operation"}), 400
)
try:
file_paths = json.loads(file_paths_str) if isinstance(file_paths_str, str) else file_paths_str
except Exception:
return make_response(
jsonify({"success": False, "message": "Invalid file_paths format"}), 400
)
# Remove files from storage and directory structure
removed_files = []
for file_path in file_paths:
full_path = f"{source_file_path}/{file_path}"
# Remove from storage
if storage.file_exists(full_path):
storage.delete_file(full_path)
removed_files.append(file_path)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
task = reingest_source_task.delay(source_id=source_id, user=user)
return make_response(jsonify({
"success": True,
"message": f"Removed {len(removed_files)} files",
"removed_files": removed_files,
"reingest_task_id": task.id
}), 200)
elif operation == "remove_directory":
directory_path = request.form.get("directory_path")
if not directory_path:
return make_response(
jsonify({"success": False, "message": "directory_path required for remove_directory operation"}), 400
)
# Validate directory path (prevent path traversal)
if directory_path.startswith("/") or ".." in directory_path:
current_app.logger.warning(
f"Invalid directory path attempted for removal. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Invalid directory path"}), 400
)
full_directory_path = f"{source_file_path}/{directory_path}" if directory_path else source_file_path
if not storage.is_directory(full_directory_path):
current_app.logger.warning(
f"Directory not found or is not a directory for removal. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Directory not found or is not a directory"}), 404
)
success = storage.remove_directory(full_directory_path)
if not success:
current_app.logger.error(
f"Failed to remove directory from storage. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Failed to remove directory"}), 500
)
current_app.logger.info(
f"Successfully removed directory. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
task = reingest_source_task.delay(source_id=source_id, user=user)
return make_response(jsonify({
"success": True,
"message": f"Successfully removed directory: {directory_path}",
"removed_directory": directory_path,
"reingest_task_id": task.id
}), 200)
except Exception as err:
error_context = f"operation={operation}, user={user}, source_id={source_id}"
if operation == "remove_directory":
directory_path = request.form.get("directory_path", "")
error_context += f", directory_path={directory_path}"
elif operation == "remove":
file_paths_str = request.form.get("file_paths", "")
error_context += f", file_paths={file_paths_str}"
elif operation == "add":
parent_dir = request.form.get("parent_dir", "")
error_context += f", parent_dir={parent_dir}"
current_app.logger.error(f"Error managing source files: {err} ({error_context})", exc_info=True)
return make_response(jsonify({"success": False, "message": "Operation failed"}), 500)
@user_ns.route("/api/remote")
class UploadRemote(Resource):
@api.expect(
@@ -834,6 +984,7 @@ class PaginatedSources(Resource):
"tokens": doc.get("tokens", ""),
"retriever": doc.get("retriever", "classic"),
"syncFrequency": doc.get("sync_frequency", ""),
"isNested": bool(doc.get("directory_structure"))
}
paginated_docs.append(doc_data)
response = {
@@ -881,6 +1032,7 @@ class CombinedJson(Resource):
"tokens": index.get("tokens", ""),
"retriever": index.get("retriever", "classic"),
"syncFrequency": index.get("sync_frequency", ""),
"is_nested": bool(index.get("directory_structure"))
}
)
except Exception as err:
@@ -3374,8 +3526,14 @@ class DeleteTool(Resource):
@user_ns.route("/api/get_chunks")
class GetChunks(Resource):
@api.doc(
description="Retrieves all chunks associated with a document",
params={"id": "The document ID"},
description="Retrieves chunks from a document, optionally filtered by file path and search term",
params={
"id": "The document ID",
"page": "Page number for pagination",
"per_page": "Number of chunks per page",
"path": "Optional: Filter chunks by relative file path",
"search": "Optional: Search term to filter chunks by title or content"
},
)
def get(self):
decoded_token = request.decoded_token
@@ -3385,6 +3543,8 @@ class GetChunks(Resource):
doc_id = request.args.get("id")
page = int(request.args.get("page", 1))
per_page = int(request.args.get("per_page", 10))
path = request.args.get("path")
search_term = request.args.get("search", "").strip().lower()
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
@@ -3396,6 +3556,30 @@ class GetChunks(Resource):
try:
store = get_vector_store(doc_id)
chunks = store.get_chunks()
filtered_chunks = []
for chunk in chunks:
metadata = chunk.get("metadata", {})
# Filter by path if provided
if path:
chunk_source = metadata.get("source", "")
# Check if the chunk's source matches the requested path
if not chunk_source or not chunk_source.endswith(path):
continue
# Filter by search term if provided
if search_term:
text_match = search_term in chunk.get("text", "").lower()
title_match = search_term in metadata.get("title", "").lower()
if not (text_match or title_match):
continue
filtered_chunks.append(chunk)
chunks = filtered_chunks
total_chunks = len(chunks)
start = (page - 1) * per_page
end = start + per_page
@@ -3408,6 +3592,8 @@ class GetChunks(Resource):
"per_page": per_page,
"total": total_chunks,
"chunks": paginated_chunks,
"path": path if path else None,
"search": search_term if search_term else None
}
),
200,
@@ -3416,7 +3602,6 @@ class GetChunks(Resource):
current_app.logger.error(f"Error getting chunks: {e}", exc_info=True)
return make_response(jsonify({"success": False}), 500)
@user_ns.route("/api/add_chunk")
class AddChunk(Resource):
@api.expect(
@@ -3448,6 +3633,8 @@ class AddChunk(Resource):
doc_id = data.get("id")
text = data.get("text")
metadata = data.get("metadata", {})
token_count = num_tokens_from_string(text)
metadata["token_count"] = token_count
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
@@ -3544,6 +3731,12 @@ class UpdateChunk(Resource):
text = data.get("text")
metadata = data.get("metadata")
if text is not None:
token_count = num_tokens_from_string(text)
if metadata is None:
metadata = {}
metadata["token_count"] = token_count
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
doc = sources_collection.find_one({"_id": ObjectId(doc_id), "user": user})
@@ -3553,31 +3746,45 @@ class UpdateChunk(Resource):
)
try:
store = get_vector_store(doc_id)
chunks = store.get_chunks()
existing_chunk = next((c for c in chunks if c["doc_id"] == chunk_id), None)
if not existing_chunk:
return make_response(jsonify({"error": "Chunk not found"}), 404)
- deleted = store.delete_chunk(chunk_id)
- if not deleted:
-     return make_response(
-         jsonify({"error": "Failed to delete existing chunk"}), 500
-     )
new_text = text if text is not None else existing_chunk["text"]
- new_metadata = (
-     metadata if metadata is not None else existing_chunk["metadata"]
- )
- new_chunk_id = store.add_chunk(new_text, new_metadata)
+ if metadata is not None:
+     new_metadata = existing_chunk["metadata"].copy()
+     new_metadata.update(metadata)
+ else:
+     new_metadata = existing_chunk["metadata"].copy()
- return make_response(
-     jsonify(
-         {
-             "message": "Chunk updated successfully",
-             "new_chunk_id": new_chunk_id,
-         }
-     ),
-     200,
- )
+ if text is not None:
+     new_metadata["token_count"] = num_tokens_from_string(new_text)
+ try:
+     new_chunk_id = store.add_chunk(new_text, new_metadata)
+     deleted = store.delete_chunk(chunk_id)
+     if not deleted:
+         current_app.logger.warning(f"Failed to delete old chunk {chunk_id}, but new chunk {new_chunk_id} was created")
+     return make_response(
+         jsonify(
+             {
+                 "message": "Chunk updated successfully",
+                 "chunk_id": new_chunk_id,
+                 "original_chunk_id": chunk_id,
+             }
+         ),
+         200,
+     )
+ except Exception as add_error:
+     current_app.logger.error(f"Failed to add updated chunk: {add_error}")
+     return make_response(
+         jsonify({"error": "Failed to update chunk - addition failed"}), 500
+     )
except Exception as e:
current_app.logger.error(f"Error updating chunk: {e}", exc_info=True)
return make_response(jsonify({"success": False}), 500)
@@ -3681,3 +3888,51 @@ class ServeImage(Resource):
return make_response(
jsonify({"success": False, "message": "Error retrieving image"}), 500
)
@user_ns.route("/api/directory_structure")
class DirectoryStructure(Resource):
@api.doc(
description="Get the directory structure for a document",
params={"id": "The document ID"},
)
def get(self):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
user = decoded_token.get("sub")
doc_id = request.args.get("id")
if not doc_id:
return make_response(
jsonify({"error": "Document ID is required"}), 400
)
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid document ID"}), 400)
try:
doc = sources_collection.find_one({"_id": ObjectId(doc_id), "user": user})
if not doc:
return make_response(
jsonify({"error": "Document not found or access denied"}), 404
)
directory_structure = doc.get("directory_structure", {})
return make_response(
jsonify({
"success": True,
"directory_structure": directory_structure,
"base_path": doc.get("file_path", "")
}), 200
)
except Exception as e:
current_app.logger.error(
f"Error retrieving directory structure: {e}", exc_info=True
)
return make_response(
jsonify({"success": False, "error": str(e)}), 500
)
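
A hedged sketch of driving the new source-file management and directory-structure endpoints end to end. Host, token, ids, and file names are placeholders; only the form fields and routes shown above are used.

import json
import requests

API = "http://localhost:7091"               # placeholder host
HEADERS = {"Authorization": "Bearer <jwt>"}  # assumed auth scheme
SOURCE_ID = "<source-object-id>"

# Add a file under a sub-directory of the source; the handler also queues a
# reingest_source_task and returns its id.
with open("changelog.md", "rb") as f:
    resp = requests.post(
        f"{API}/api/manage_source_files",
        headers=HEADERS,
        data={"source_id": SOURCE_ID, "operation": "add", "parent_dir": "notes"},
        files={"file": f},
    )
print(resp.json().get("reingest_task_id"))

# Remove files by their paths relative to the source root (JSON-encoded list).
requests.post(
    f"{API}/api/manage_source_files",
    headers=HEADERS,
    data={
        "source_id": SOURCE_ID,
        "operation": "remove",
        "file_paths": json.dumps(["notes/old.md", "README.old"]),
    },
)

# Inspect the stored directory tree for the source.
tree = requests.get(
    f"{API}/api/directory_structure",
    headers=HEADERS,
    params={"id": SOURCE_ID},
).json()["directory_structure"]
print(tree)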

View File

@@ -11,8 +11,8 @@ from application.worker import (
@celery.task(bind=True)
- def ingest(self, directory, formats, job_name, filename, user, dir_name, user_dir):
-     resp = ingest_worker(self, directory, formats, job_name, filename, user, dir_name, user_dir)
+ def ingest(self, directory, formats, job_name, user, file_path, filename):
+     resp = ingest_worker(self, directory, formats, job_name, file_path, filename, user)
return resp
@@ -22,6 +22,13 @@ def ingest_remote(self, source_data, job_name, user, loader):
return resp
@celery.task(bind=True)
def reingest_source_task(self, source_id, user):
from application.worker import reingest_source_worker
resp = reingest_source_worker(self, source_id, user)
return resp
@celery.task(bind=True)
def schedule_syncs(self, frequency):
resp = sync_worker(self, frequency)
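
A sketch of how the reworked task signatures above might be dispatched, assuming a running Celery worker and broker; the argument values are placeholders.

from application.api.user.tasks import ingest, reingest_source_task

# New ingest signature: the storage path travels as file_path and the display
# name as filename, matching the call made in routes.py.
task = ingest.delay(
    "inputs",                      # directory
    [".md", ".pdf", ".txt"],       # formats
    "My Docs",                     # job_name (original, unsanitized)
    "local",                       # user
    file_path="inputs/local/my-docs",   # hypothetical storage path
    filename="my-docs",
)

# Incremental re-ingestion after files were added or removed from a source.
reingest = reingest_source_task.delay(source_id="<source-object-id>", user="local")
print(task.id, reingest.id)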

View File

@@ -32,16 +32,7 @@ class Chunker:
header, body = "", text # No header, treat entire text as body
return header, body
- def combine_documents(self, doc: Document, next_doc: Document) -> Document:
-     combined_text = doc.text + " " + next_doc.text
-     combined_token_count = len(self.encoding.encode(combined_text))
-     new_doc = Document(
-         text=combined_text,
-         doc_id=doc.doc_id,
-         embedding=doc.embedding,
-         extra_info={**(doc.extra_info or {}), "token_count": combined_token_count}
-     )
-     return new_doc
def split_document(self, doc: Document) -> List[Document]:
split_docs = []
@@ -82,26 +73,11 @@ class Chunker:
processed_docs.append(doc)
i += 1
elif token_count < self.min_tokens:
- if i + 1 < len(documents):
-     next_doc = documents[i + 1]
-     next_tokens = self.encoding.encode(next_doc.text)
-     if token_count + len(next_tokens) <= self.max_tokens:
-         # Combine small documents
-         combined_doc = self.combine_documents(doc, next_doc)
-         processed_docs.append(combined_doc)
-         i += 2
-     else:
-         # Keep the small document as is if adding next_doc would exceed max_tokens
-         doc.extra_info = doc.extra_info or {}
-         doc.extra_info["token_count"] = token_count
-         processed_docs.append(doc)
-         i += 1
- else:
-     # No next document to combine with; add the small document as is
-     doc.extra_info = doc.extra_info or {}
-     doc.extra_info["token_count"] = token_count
-     processed_docs.append(doc)
-     i += 1
+ doc.extra_info = doc.extra_info or {}
+ doc.extra_info["token_count"] = token_count
+ processed_docs.append(doc)
+ i += 1
else:
# Split large documents
processed_docs.extend(self.split_document(doc))
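
An illustrative restatement of the simplified per-document flow: small documents are no longer merged with their neighbour, they are kept as-is with their own token_count. This assumes a tiktoken encoding like the one Chunker uses; the threshold values are placeholders, not the application's settings.

import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")
MIN_TOKENS, MAX_TOKENS = 150, 1250  # placeholder thresholds

def handle(doc_text: str) -> str:
    token_count = len(encoding.encode(doc_text))
    if token_count > MAX_TOKENS:
        return "split into smaller documents"
    # Below MAX_TOKENS (including very small documents) the text is kept
    # intact and annotated with its own token_count in extra_info.
    return f"kept as-is (token_count={token_count})"

print(handle("a short release note"))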

View File

@@ -46,7 +46,7 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
docs_init=docs_init,
- source_id=folder_name,
+ source_id=source_id,
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
else:

View File

@@ -15,6 +15,7 @@ from application.parser.file.json_parser import JSONParser
from application.parser.file.pptx_parser import PPTXParser
from application.parser.file.image_parser import ImageParser
from application.parser.schema.base import Document
from application.utils import num_tokens_from_string
DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = {
".pdf": PDFParser(),
@@ -141,11 +142,12 @@ class SimpleDirectoryReader(BaseReader):
Returns:
List[Document]: A list of documents.
"""
data: Union[str, List[str]] = ""
data_list: List[str] = []
metadata_list = []
self.file_token_counts = {}
for input_file in self.input_files:
if input_file.suffix in self.file_extractor:
parser = self.file_extractor[input_file.suffix]
@@ -156,24 +158,48 @@ class SimpleDirectoryReader(BaseReader):
# do standard read
with open(input_file, "r", errors=self.errors) as f:
data = f.read()
- # Prepare metadata for this file
- if self.file_metadata is not None:
-     file_metadata = self.file_metadata(input_file.name)
+ # Calculate token count for this file
+ if isinstance(data, List):
+     file_tokens = sum(num_tokens_from_string(str(d)) for d in data)
else:
-     # Provide a default empty metadata
-     file_metadata = {'title': '', 'store': ''}
-     # TODO: Find a case with no metadata and check if breaks anything
+     file_tokens = num_tokens_from_string(str(data))
full_path = str(input_file.resolve())
self.file_token_counts[full_path] = file_tokens
base_metadata = {
'title': input_file.name,
'token_count': file_tokens,
}
if hasattr(self, 'input_dir'):
try:
relative_path = str(input_file.relative_to(self.input_dir))
base_metadata['source'] = relative_path
except ValueError:
base_metadata['source'] = str(input_file)
else:
base_metadata['source'] = str(input_file)
if self.file_metadata is not None:
custom_metadata = self.file_metadata(input_file.name)
base_metadata.update(custom_metadata)
if isinstance(data, List):
# Extend data_list with each item in the data list
data_list.extend([str(d) for d in data])
# For each item in the data list, add the file's metadata to metadata_list
- metadata_list.extend([file_metadata for _ in data])
+ metadata_list.extend([base_metadata for _ in data])
else:
# Add the single piece of data to data_list
data_list.append(str(data))
# Add the file's metadata to metadata_list
- metadata_list.append(file_metadata)
+ metadata_list.append(base_metadata)
# Build directory structure if input_dir is provided
if hasattr(self, 'input_dir'):
self.directory_structure = self.build_directory_structure(self.input_dir)
logging.info("Directory structure built successfully")
else:
self.directory_structure = {}
if concatenate:
return [Document("\n".join(data_list))]
@@ -181,3 +207,48 @@ class SimpleDirectoryReader(BaseReader):
return [Document(d, extra_info=m) for d, m in zip(data_list, metadata_list)]
else:
return [Document(d) for d in data_list]
def build_directory_structure(self, base_path):
"""Build a dictionary representing the directory structure.
Args:
base_path: The base path to start building the structure from.
Returns:
dict: A nested dictionary representing the directory structure.
"""
import mimetypes
def build_tree(path):
"""Helper function to recursively build the directory tree."""
result = {}
for item in path.iterdir():
if self.exclude_hidden and item.name.startswith('.'):
continue
if item.is_dir():
subtree = build_tree(item)
if subtree:
result[item.name] = subtree
else:
if self.required_exts is not None and item.suffix not in self.required_exts:
continue
full_path = str(item.resolve())
file_size_bytes = item.stat().st_size
mime_type = mimetypes.guess_type(item.name)[0] or "application/octet-stream"
file_info = {
"type": mime_type,
"size_bytes": file_size_bytes
}
if hasattr(self, 'file_token_counts') and full_path in self.file_token_counts:
file_info["token_count"] = self.file_token_counts[full_path]
result[item.name] = file_info
return result
return build_tree(Path(base_path))
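
For illustration only, the kind of nested mapping build_directory_structure returns for a small source tree; names, sizes, and counts below are invented.

example_structure = {
    "guides": {
        "install.md": {
            "type": "text/markdown",
            "size_bytes": 2048,
            "token_count": 512,  # present only when the file was parsed in this run
        },
    },
    "README.md": {
        "type": "text/markdown",
        "size_bytes": 1024,
        "token_count": 250,
    },
}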

View File

@@ -93,3 +93,32 @@ class BaseStorage(ABC):
List[str]: List of file paths
"""
pass
@abstractmethod
def is_directory(self, path: str) -> bool:
"""
Check if a path is a directory.
Args:
path: Path to check
Returns:
bool: True if the path is a directory
"""
pass
@abstractmethod
def remove_directory(self, directory: str) -> bool:
"""
Remove a directory and all its contents.
For local storage, this removes the directory and all files/subdirectories within it.
For S3 storage, this removes all objects with the directory path as a prefix.
Args:
directory: Directory path to remove
Returns:
bool: True if removal was successful, False otherwise
"""
pass
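
A toy, dictionary-backed illustration of the contract these two methods are expected to honour. It does not subclass the real BaseStorage and exists only to show the expected return values.

class InMemoryStorage:
    def __init__(self):
        self.files = {}  # maps "a/b/c.txt" -> bytes

    def is_directory(self, path: str) -> bool:
        # A "directory" exists if any stored key lives under that prefix.
        prefix = path.rstrip("/") + "/"
        return any(key.startswith(prefix) for key in self.files)

    def remove_directory(self, directory: str) -> bool:
        prefix = directory.rstrip("/") + "/"
        matched = [key for key in self.files if key.startswith(prefix)]
        if not matched:
            return False
        for key in matched:
            del self.files[key]
        return True


storage = InMemoryStorage()
storage.files["inputs/user/source/docs/a.md"] = b"hello"
assert storage.is_directory("inputs/user/source/docs")
assert storage.remove_directory("inputs/user/source/docs") is True
assert storage.remove_directory("inputs/user/source/docs") is False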

View File

@@ -101,3 +101,40 @@ class LocalStorage(BaseStorage):
raise FileNotFoundError(f"File not found: {full_path}")
return processor_func(local_path=full_path, **kwargs)
def is_directory(self, path: str) -> bool:
"""
Check if a path is a directory in local storage.
Args:
path: Path to check
Returns:
bool: True if the path is a directory, False otherwise
"""
full_path = self._get_full_path(path)
return os.path.isdir(full_path)
def remove_directory(self, directory: str) -> bool:
"""
Remove a directory and all its contents from local storage.
Args:
directory: Directory path to remove
Returns:
bool: True if removal was successful, False otherwise
"""
full_path = self._get_full_path(directory)
if not os.path.exists(full_path):
return False
if not os.path.isdir(full_path):
return False
try:
shutil.rmtree(full_path)
return True
except (OSError, PermissionError):
return False

View File

@@ -130,3 +130,77 @@ class S3Storage(BaseStorage):
except Exception as e:
logging.error(f"Error processing S3 file {path}: {e}", exc_info=True)
raise
def is_directory(self, path: str) -> bool:
"""
Check if a path is a directory in S3 storage.
In S3, directories are virtual concepts. A path is considered a directory
if there are objects with the path as a prefix.
Args:
path: Path to check
Returns:
bool: True if the path is a directory, False otherwise
"""
# Ensure path ends with a slash if not empty
if path and not path.endswith('/'):
path += '/'
response = self.s3.list_objects_v2(
Bucket=self.bucket_name,
Prefix=path,
MaxKeys=1
)
return 'Contents' in response
def remove_directory(self, directory: str) -> bool:
"""
Remove a directory and all its contents from S3 storage.
In S3, this removes all objects with the directory path as a prefix.
Since S3 doesn't have actual directories, this effectively removes
all files within the virtual directory structure.
Args:
directory: Directory path to remove
Returns:
bool: True if removal was successful, False otherwise
"""
# Ensure directory ends with a slash if not empty
if directory and not directory.endswith('/'):
directory += '/'
try:
# Get all objects with the directory prefix
objects_to_delete = []
paginator = self.s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=directory)
for page in pages:
if 'Contents' in page:
for obj in page['Contents']:
objects_to_delete.append({'Key': obj['Key']})
if not objects_to_delete:
return False
batch_size = 1000
for i in range(0, len(objects_to_delete), batch_size):
batch = objects_to_delete[i:i + batch_size]
response = self.s3.delete_objects(
Bucket=self.bucket_name,
Delete={'Objects': batch}
)
if 'Errors' in response and response['Errors']:
return False
return True
except ClientError:
return False
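
The 1000-key batching above matches the S3 DeleteObjects limit of at most 1000 keys per request. A standalone sketch of the same pattern follows; the bucket and prefix are placeholders.

import boto3

def delete_prefix(bucket: str, prefix: str) -> bool:
    s3 = boto3.client("s3")
    keys = []
    # Collect every object key under the virtual directory prefix.
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        keys.extend({"Key": obj["Key"]} for obj in page.get("Contents", []))
    if not keys:
        return False
    # DeleteObjects accepts at most 1000 keys per call, hence the batching.
    for i in range(0, len(keys), 1000):
        response = s3.delete_objects(Bucket=bucket, Delete={"Objects": keys[i:i + 1000]})
        if response.get("Errors"):
            return False
    return True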

View File

@@ -1,5 +1,6 @@
import os
import tempfile
import io
from langchain_community.vectorstores import FAISS
@@ -66,8 +67,37 @@ class FaissStore(BaseVectorStore):
def add_texts(self, *args, **kwargs):
return self.docsearch.add_texts(*args, **kwargs)
- def save_local(self, *args, **kwargs):
-     return self.docsearch.save_local(*args, **kwargs)
def _save_to_storage(self):
"""
Save the FAISS index to storage using temporary directory pattern.
Works consistently for both local and S3 storage.
"""
with tempfile.TemporaryDirectory() as temp_dir:
self.docsearch.save_local(temp_dir)
faiss_path = os.path.join(temp_dir, "index.faiss")
pkl_path = os.path.join(temp_dir, "index.pkl")
with open(faiss_path, "rb") as f_faiss:
faiss_data = f_faiss.read()
with open(pkl_path, "rb") as f_pkl:
pkl_data = f_pkl.read()
storage_path = get_vectorstore(self.source_id)
self.storage.save_file(io.BytesIO(faiss_data), f"{storage_path}/index.faiss")
self.storage.save_file(io.BytesIO(pkl_data), f"{storage_path}/index.pkl")
return True
def save_local(self, path=None):
if path:
os.makedirs(path, exist_ok=True)
self.docsearch.save_local(path)
self._save_to_storage()
return True
def delete_index(self, *args, **kwargs):
return self.docsearch.delete(*args, **kwargs)
@@ -103,13 +133,17 @@ class FaissStore(BaseVectorStore):
return chunks
def add_chunk(self, text, metadata=None):
"""Add a new chunk and save to storage."""
metadata = metadata or {}
doc = Document(text=text, extra_info=metadata).to_langchain_format()
doc_id = self.docsearch.add_documents([doc])
- self.save_local(self.path)
+ self._save_to_storage()
return doc_id
def delete_chunk(self, chunk_id):
"""Delete a chunk and save to storage."""
self.delete_index([chunk_id])
- self.save_local(self.path)
+ self._save_to_storage()
return True

View File

@@ -103,11 +103,23 @@ def download_file(url, params, dest_path):
def upload_index(full_path, file_data):
files = None
try:
if settings.VECTOR_STORE == "faiss":
faiss_path = full_path + "/index.faiss"
pkl_path = full_path + "/index.pkl"
if not os.path.exists(faiss_path):
logging.error(f"FAISS index file not found: {faiss_path}")
raise FileNotFoundError(f"FAISS index file not found: {faiss_path}")
if not os.path.exists(pkl_path):
logging.error(f"FAISS pickle file not found: {pkl_path}")
raise FileNotFoundError(f"FAISS pickle file not found: {pkl_path}")
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
"file_faiss": open(faiss_path, "rb"),
"file_pkl": open(pkl_path, "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"),
@@ -119,11 +131,11 @@ def upload_index(full_path, file_data):
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
response.raise_for_status()
- except requests.RequestException as e:
+ except (requests.RequestException, FileNotFoundError) as e:
logging.error(f"Error uploading index: {e}")
raise
finally:
if settings.VECTOR_STORE == "faiss":
if settings.VECTOR_STORE == "faiss" and files is not None:
for file in files.values():
file.close()
@@ -200,15 +212,8 @@ def run_agent_logic(agent_config, input_data):
def ingest_worker(
- self,
- directory,
- formats,
- job_name,
- filename,
- user,
- dir_name=None,
- user_dir=None,
- retriever="classic",
+ self, directory, formats, job_name, file_path, filename, user,
+ retriever="classic"
):
"""
Ingest and process documents.
@@ -218,10 +223,9 @@ def ingest_worker(
directory (str): Specifies the directory for ingesting ('inputs' or 'temp').
formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]).
job_name (str): Name of the job for this ingestion task (original, unsanitized).
- filename (str): Name of the file to be ingested.
+ file_path (str): Complete file path to use consistently throughout the pipeline.
+ filename (str): Original unsanitized filename provided by the user.
user (str): Identifier for the user initiating the ingestion (original, unsanitized).
- dir_name (str, optional): Sanitized directory name for filesystem operations.
- user_dir (str, optional): Sanitized user ID for filesystem operations.
retriever (str): Type of retriever to use for processing the documents.
Returns:
@@ -234,11 +238,8 @@ def ingest_worker(
sample = False
storage = StorageCreator.get_storage()
- full_path = os.path.join(directory, user_dir, dir_name)
- source_file_path = os.path.join(full_path, filename)
- logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": job_name})
+ logging.info(f"Ingest path: {file_path}", extra={"user": user, "job": job_name})
# Create temporary working directory
@@ -246,22 +247,46 @@ def ingest_worker(
try:
os.makedirs(temp_dir, exist_ok=True)
# Download file from storage to temp directory
if storage.is_directory(file_path):
# Handle directory case
logging.info(f"Processing directory: {file_path}")
files_list = storage.list_files(file_path)
for storage_file_path in files_list:
if storage.is_directory(storage_file_path):
continue
# Create relative path structure in temp directory
rel_path = os.path.relpath(storage_file_path, file_path)
local_file_path = os.path.join(temp_dir, rel_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
# Download file
try:
file_data = storage.get_file(storage_file_path)
with open(local_file_path, "wb") as f:
f.write(file_data.read())
except Exception as e:
logging.error(f"Error downloading file {storage_file_path}: {e}")
continue
else:
# Handle single file case
temp_filename = os.path.basename(file_path)
temp_file_path = os.path.join(temp_dir, temp_filename)
file_data = storage.get_file(file_path)
with open(temp_file_path, "wb") as f:
f.write(file_data.read())
-     temp_file_path = os.path.join(temp_dir, filename)
-     file_data = storage.get_file(source_file_path)
+     # Handle zip files
+     if temp_filename.endswith(".zip"):
+         logging.info(f"Extracting zip file: {temp_filename}")
+         extract_zip_recursive(
+             temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
+         )
-     with open(temp_file_path, "wb") as f:
-         f.write(file_data.read())
self.update_state(state="PROGRESS", meta={"current": 1})
-     # Handle zip files
-     if filename.endswith(".zip"):
-         logging.info(f"Extracting zip file: {filename}")
-         extract_zip_recursive(
-             temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
-         )
if sample:
logging.info(f"Sample mode enabled. Using {limit} documents.")
reader = SimpleDirectoryReader(
@@ -273,6 +298,9 @@ def ingest_worker(
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
directory_structure = getattr(reader, 'directory_structure', {})
logging.info(f"Directory structure from reader: {directory_structure}")
chunker = Chunker(
chunking_strategy="classic_chunk",
@@ -299,14 +327,15 @@ def ingest_worker(
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data = {
"name": job_name, # Use original job_name
"name": job_name,
"file": filename,
"user": user, # Use original user
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
"original_file_path": source_file_path,
"file_path": file_path,
"directory_structure": json.dumps(directory_structure),
}
upload_index(vector_store_path, file_data)
@@ -323,6 +352,252 @@ def ingest_worker(
}
def reingest_source_worker(self, source_id, user):
"""
Re-ingestion worker that handles incremental updates by:
1. Adding chunks from newly added files
2. Removing chunks from deleted files
Args:
self: Task instance
source_id: ID of the source to re-ingest
user: User identifier
Returns:
dict: Information about the re-ingestion task
"""
try:
from application.vectorstore.vector_creator import VectorCreator
self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing re-ingestion scan"})
source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
if not source:
raise ValueError(f"Source {source_id} not found or access denied")
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Scanning current files"})
with tempfile.TemporaryDirectory() as temp_dir:
# Download all files from storage to temp directory, preserving directory structure
if storage.is_directory(source_file_path):
files_list = storage.list_files(source_file_path)
for storage_file_path in files_list:
if storage.is_directory(storage_file_path):
continue
rel_path = os.path.relpath(storage_file_path, source_file_path)
local_file_path = os.path.join(temp_dir, rel_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
# Download file
try:
file_data = storage.get_file(storage_file_path)
with open(local_file_path, "wb") as f:
f.write(file_data.read())
except Exception as e:
logging.error(f"Error downloading file {storage_file_path}: {e}")
continue
reader = SimpleDirectoryReader(
input_dir=temp_dir,
recursive=True,
required_exts=[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
exclude_hidden=True,
file_metadata=metadata_from_filename,
)
reader.load_data()
directory_structure = reader.directory_structure
logging.info(f"Directory structure built with token counts: {directory_structure}")
try:
old_directory_structure = source.get("directory_structure") or {}
if isinstance(old_directory_structure, str):
try:
old_directory_structure = json.loads(old_directory_structure)
except Exception:
old_directory_structure = {}
def _flatten_directory_structure(struct, prefix=""):
files = set()
if isinstance(struct, dict):
for name, meta in struct.items():
current_path = os.path.join(prefix, name) if prefix else name
if isinstance(meta, dict) and ("type" in meta and "size_bytes" in meta):
files.add(current_path)
elif isinstance(meta, dict):
files |= _flatten_directory_structure(meta, current_path)
return files
old_files = _flatten_directory_structure(old_directory_structure)
new_files = _flatten_directory_structure(directory_structure)
added_files = sorted(new_files - old_files)
removed_files = sorted(old_files - new_files)
if added_files:
logging.info(f"Files added since last ingest: {added_files}")
else:
logging.info("No files added since last ingest.")
if removed_files:
logging.info(f"Files removed since last ingest: {removed_files}")
else:
logging.info("No files removed since last ingest.")
except Exception as e:
logging.error(f"Error comparing directory structures: {e}", exc_info=True)
added_files = []
removed_files = []
try:
if not added_files and not removed_files:
logging.info("No changes detected.")
return {
"source_id": source_id,
"user": user,
"status": "no_changes",
"added_files": [],
"removed_files": [],
}
vector_store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
source_id,
settings.EMBEDDINGS_KEY,
)
self.update_state(state="PROGRESS", meta={"current": 40, "status": "Processing file changes"})
# 1) Delete chunks from removed files
deleted = 0
if removed_files:
try:
for ch in vector_store.get_chunks() or []:
metadata = ch.get("metadata", {}) if isinstance(ch, dict) else getattr(ch, "metadata", {})
raw_source = metadata.get("source")
source_file = str(raw_source) if raw_source else ""
if source_file in removed_files:
cid = ch.get("doc_id")
if cid:
try:
vector_store.delete_chunk(cid)
deleted += 1
except Exception as de:
logging.error(f"Failed deleting chunk {cid}: {de}")
logging.info(f"Deleted {deleted} chunks from {len(removed_files)} removed files")
except Exception as e:
logging.error(f"Error during deletion of removed file chunks: {e}", exc_info=True)
# 2) Add chunks from new files
added = 0
if added_files:
try:
# Build list of local files for added files only
added_local_files = []
for rel_path in added_files:
local_path = os.path.join(temp_dir, rel_path)
if os.path.isfile(local_path):
added_local_files.append(local_path)
if added_local_files:
reader_new = SimpleDirectoryReader(
input_files=added_local_files,
exclude_hidden=True,
errors="ignore",
file_metadata=metadata_from_filename,
)
raw_docs_new = reader_new.load_data()
chunker_new = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False,
)
chunked_new = chunker_new.chunk(documents=raw_docs_new)
for file_path, token_count in reader_new.file_token_counts.items():
try:
rel_path = os.path.relpath(file_path, start=temp_dir)
path_parts = rel_path.split(os.sep)
current_dir = directory_structure
for part in path_parts[:-1]:
if part in current_dir and isinstance(current_dir[part], dict):
current_dir = current_dir[part]
else:
break
filename = path_parts[-1]
if filename in current_dir and isinstance(current_dir[filename], dict):
current_dir[filename]["token_count"] = token_count
logging.info(f"Updated token count for {rel_path}: {token_count}")
except Exception as e:
logging.warning(f"Could not update token count for {file_path}: {e}")
for d in chunked_new:
meta = dict(d.extra_info or {})
try:
raw_src = meta.get("source")
if isinstance(raw_src, str) and os.path.isabs(raw_src):
meta["source"] = os.path.relpath(raw_src, start=temp_dir)
except Exception:
pass
vector_store.add_chunk(d.text, metadata=meta)
added += 1
logging.info(f"Added {added} chunks from {len(added_files)} new files")
except Exception as e:
logging.error(f"Error during ingestion of new files: {e}", exc_info=True)
# 3) Update source directory structure timestamp
try:
total_tokens = sum(reader.file_token_counts.values())
sources_collection.update_one(
{"_id": ObjectId(source_id)},
{
"$set": {
"directory_structure": directory_structure,
"date": datetime.datetime.now(),
"tokens": total_tokens
}
},
)
except Exception as e:
logging.error(f"Error updating directory_structure in DB: {e}", exc_info=True)
self.update_state(state="PROGRESS", meta={"current": 100, "status": "Re-ingestion completed"})
return {
"source_id": source_id,
"user": user,
"status": "completed",
"added_files": added_files,
"removed_files": removed_files,
"chunks_added": added,
"chunks_deleted": deleted,
}
except Exception as e:
logging.error(f"Error while processing file changes: {e}", exc_info=True)
raise
except Exception as e:
logging.error(f"Error in reingest_source_worker: {e}", exc_info=True)
raise
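
A self-contained sketch of the directory-structure diffing that drives the incremental update above; the example trees are invented.

import os

def flatten(struct, prefix=""):
    """Collect relative file paths from a nested directory-structure dict."""
    files = set()
    if isinstance(struct, dict):
        for name, meta in struct.items():
            current = os.path.join(prefix, name) if prefix else name
            if isinstance(meta, dict) and "type" in meta and "size_bytes" in meta:
                files.add(current)                 # leaf entry describing a file
            elif isinstance(meta, dict):
                files |= flatten(meta, current)    # sub-directory, recurse
    return files

old = {"docs": {"a.md": {"type": "text/markdown", "size_bytes": 10}}}
new = {"docs": {"a.md": {"type": "text/markdown", "size_bytes": 10},
                "b.md": {"type": "text/markdown", "size_bytes": 20}}}

added = sorted(flatten(new) - flatten(old))    # ['docs/b.md'] on POSIX -> chunks get added
removed = sorted(flatten(old) - flatten(new))  # [] -> no chunks to delete
print(added, removed)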
def remote_worker(
self,
source_data,