(feat:reingestion) split directories

This commit is contained in:
ManishMadan2882
2025-08-02 00:49:15 +05:30
parent 5212769848
commit 53225bda4e
3 changed files with 110 additions and 17 deletions

View File

@@ -77,10 +77,13 @@ def upload_index_files():
file_pkl = request.files["file_pkl"]
if file_pkl.filename == "":
return {"status": "no file name"}
# Save index files to storage
storage.save_file(file_faiss, f"{index_base_path}/index.faiss")
storage.save_file(file_pkl, f"{index_base_path}/index.pkl")
faiss_storage_path = f"{index_base_path}/index.faiss"
pkl_storage_path = f"{index_base_path}/index.pkl"
storage.save_file(file_faiss, faiss_storage_path)
storage.save_file(file_pkl, pkl_storage_path)
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:

View File

@@ -634,15 +634,16 @@ class ManageSourceFiles(Resource):
"ManageSourceFilesModel",
{
"source_id": fields.String(required=True, description="Source ID to modify"),
"operation": fields.String(required=True, description="Operation: 'add' or 'remove'"),
"operation": fields.String(required=True, description="Operation: 'add', 'remove', or 'remove_directory'"),
"file_paths": fields.List(fields.String, required=False, description="File paths to remove (for remove operation)"),
"directory_path": fields.String(required=False, description="Directory path to remove (for remove_directory operation)"),
"file": fields.Raw(required=False, description="Files to add (for add operation)"),
"parent_dir": fields.String(required=False, description="Parent directory path relative to source root"),
},
)
)
@api.doc(
description="Add or remove files from an existing source",
description="Add files, remove files, or remove directories from an existing source",
)
def post(self):
decoded_token = request.decoded_token
@@ -658,9 +659,9 @@ class ManageSourceFiles(Resource):
jsonify({"success": False, "message": "source_id and operation are required"}), 400
)
if operation not in ["add", "remove"]:
if operation not in ["add", "remove", "remove_directory"]:
return make_response(
jsonify({"success": False, "message": "operation must be 'add' or 'remove'"}), 400
jsonify({"success": False, "message": "operation must be 'add', 'remove', or 'remove_directory'"}), 400
)
try:
@@ -761,8 +762,78 @@ class ManageSourceFiles(Resource):
"reingest_task_id": task.id
}), 200)
elif operation == "remove_directory":
directory_path = request.form.get("directory_path")
if not directory_path:
return make_response(
jsonify({"success": False, "message": "directory_path required for remove_directory operation"}), 400
)
# Validate directory path (prevent path traversal)
if directory_path.startswith("/") or ".." in directory_path:
current_app.logger.warning(
f"Invalid directory path attempted for removal. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Invalid directory path"}), 400
)
full_directory_path = f"{source_file_path}/{directory_path}" if directory_path else source_file_path
if not storage.is_directory(full_directory_path):
current_app.logger.warning(
f"Directory not found or is not a directory for removal. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Directory not found or is not a directory"}), 404
)
success = storage.remove_directory(full_directory_path)
if not success:
current_app.logger.error(
f"Failed to remove directory from storage. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
return make_response(
jsonify({"success": False, "message": "Failed to remove directory"}), 500
)
current_app.logger.info(
f"Successfully removed directory. "
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
task = reingest_source_task.delay(source_id=source_id, user=user)
return make_response(jsonify({
"success": True,
"message": f"Successfully removed directory: {directory_path}",
"removed_directory": directory_path,
"reingest_task_id": task.id
}), 200)
except Exception as err:
current_app.logger.error(f"Error managing source files: {err}", exc_info=True)
error_context = f"operation={operation}, user={user}, source_id={source_id}"
if operation == "remove_directory":
directory_path = request.form.get("directory_path", "")
error_context += f", directory_path={directory_path}"
elif operation == "remove":
file_paths_str = request.form.get("file_paths", "")
error_context += f", file_paths={file_paths_str}"
elif operation == "add":
parent_dir = request.form.get("parent_dir", "")
error_context += f", parent_dir={parent_dir}"
current_app.logger.error(f"Error managing source files: {err} ({error_context})", exc_info=True)
return make_response(jsonify({"success": False, "message": "Operation failed"}), 500)
@@ -3438,15 +3509,22 @@ class GetChunks(Resource):
filtered_chunks = []
for chunk in chunks:
metadata = chunk.get("metadata", {})
# Filter by path if provided
if path:
chunk_source = metadata.get("source", "")
# Check if the chunk's source matches the requested path
if not chunk_source or not chunk_source.endswith(path):
continue
# Filter by search term if provided
if search_term:
text_match = search_term in chunk.get("text", "").lower()
title_match = search_term in metadata.get("title", "").lower()
if not (text_match or title_match):
continue
filtered_chunks.append(chunk)
chunks = filtered_chunks

View File

@@ -98,11 +98,23 @@ def download_file(url, params, dest_path):
def upload_index(full_path, file_data):
files = None
try:
if settings.VECTOR_STORE == "faiss":
faiss_path = full_path + "/index.faiss"
pkl_path = full_path + "/index.pkl"
if not os.path.exists(faiss_path):
logging.error(f"FAISS index file not found: {faiss_path}")
raise FileNotFoundError(f"FAISS index file not found: {faiss_path}")
if not os.path.exists(pkl_path):
logging.error(f"FAISS pickle file not found: {pkl_path}")
raise FileNotFoundError(f"FAISS pickle file not found: {pkl_path}")
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
"file_faiss": open(faiss_path, "rb"),
"file_pkl": open(pkl_path, "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"),
@@ -114,11 +126,11 @@ def upload_index(full_path, file_data):
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
response.raise_for_status()
except requests.RequestException as e:
except (requests.RequestException, FileNotFoundError) as e:
logging.error(f"Error uploading index: {e}")
raise
finally:
if settings.VECTOR_STORE == "faiss":
if settings.VECTOR_STORE == "faiss" and files is not None:
for file in files.values():
file.close()
@@ -552,7 +564,7 @@ def reingest_source_worker(self, source_id, user):
{"_id": ObjectId(source_id)},
{
"$set": {
"directory_structure": json.dumps(directory_structure),
"directory_structure": directory_structure,
"date": datetime.datetime.now(),
"tokens": total_tokens
}