From 266d256a07eaf7c62d9ce97c388e8a22fed39929 Mon Sep 17 00:00:00 2001
From: ManishMadan2882
Date: Wed, 30 Jul 2025 01:57:40 +0530
Subject: [PATCH] (feat: sources) file management, simple re-ingest

---
 application/api/user/routes.py | 139 +++++++++++++++++++++++++++++++++
 application/api/user/tasks.py  |   7 ++
 application/worker.py          | 123 +++++++++++++++++++++++++++++
 3 files changed, 269 insertions(+)

diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index 403008bd..d00b0623 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -627,6 +627,145 @@ class UploadFile(Resource):
         return make_response(jsonify({"success": True, "task_id": task.id}), 200)
 
 
+@user_ns.route("/api/manage_source_files")
+class ManageSourceFiles(Resource):
+    @api.expect(
+        api.model(
+            "ManageSourceFilesModel",
+            {
+                "source_id": fields.String(required=True, description="Source ID to modify"),
+                "operation": fields.String(required=True, description="Operation: 'add' or 'remove'"),
+                "file_paths": fields.List(fields.String, required=False, description="File paths to remove (for remove operation)"),
+                "file": fields.Raw(required=False, description="Files to add (for add operation)"),
+                "parent_dir": fields.String(required=False, description="Parent directory path relative to source root"),
+            },
+        )
+    )
+    @api.doc(
+        description="Add or remove files from an existing source",
+    )
+    def post(self):
+        decoded_token = request.decoded_token
+        if not decoded_token:
+            return make_response(jsonify({"success": False, "message": "Unauthorized"}), 401)
+
+        user = decoded_token.get("sub")
+        source_id = request.form.get("source_id")
+        operation = request.form.get("operation")
+
+        if not source_id or not operation:
+            return make_response(
+                jsonify({"success": False, "message": "source_id and operation are required"}), 400
+            )
+
+        if operation not in ["add", "remove"]:
+            return make_response(
+                jsonify({"success": False, "message": "operation must be 'add' or 'remove'"}), 400
+            )
+
+        try:
+            ObjectId(source_id)
+        except Exception:
+            return make_response(
+                jsonify({"success": False, "message": "Invalid source ID format"}), 400
+            )
+
+        try:
+            source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
+            if not source:
+                return make_response(
+                    jsonify({"success": False, "message": "Source not found or access denied"}), 404
+                )
+        except Exception as err:
+            current_app.logger.error(f"Error finding source: {err}", exc_info=True)
+            return make_response(jsonify({"success": False, "message": "Database error"}), 500)
+
+        try:
+            storage = StorageCreator.get_storage()
+            source_file_path = source.get("file_path", "")
+            parent_dir = request.form.get("parent_dir", "")
+
+            # Reject absolute paths and path traversal in the target directory
+            if parent_dir and (parent_dir.startswith("/") or ".." in parent_dir):
+                return make_response(
+                    jsonify({"success": False, "message": "Invalid parent directory path"}), 400
+                )
+
+            if operation == "add":
+                files = request.files.getlist("file")
+                if not files or all(file.filename == "" for file in files):
+                    return make_response(
+                        jsonify({"success": False, "message": "No files provided for add operation"}), 400
+                    )
+
+                added_files = []
+
+                target_dir = source_file_path
+                if parent_dir:
+                    target_dir = f"{source_file_path}/{parent_dir}"
+
+                for file in files:
+                    if file.filename:
+                        safe_filename_str = safe_filename(file.filename)
+                        file_path = f"{target_dir}/{safe_filename_str}"
+
+                        # Save file to storage
+                        storage.save_file(file, file_path)
+                        added_files.append(safe_filename_str)
+
+                # Trigger re-ingestion pipeline
+                from application.api.user.tasks import reingest_source_task
+
+                task = reingest_source_task.delay(source_id=source_id, user=user)
+
+                return make_response(jsonify({
+                    "success": True,
+                    "message": f"Added {len(added_files)} files",
+                    "added_files": added_files,
+                    "parent_dir": parent_dir,
+                    "reingest_task_id": task.id
+                }), 200)
+
+            elif operation == "remove":
+                file_paths_str = request.form.get("file_paths")
+                if not file_paths_str:
+                    return make_response(
+                        jsonify({"success": False, "message": "file_paths required for remove operation"}), 400
+                    )
+
+                try:
+                    file_paths = json.loads(file_paths_str) if isinstance(file_paths_str, str) else file_paths_str
+                except (ValueError, TypeError):
+                    return make_response(
+                        jsonify({"success": False, "message": "Invalid file_paths format"}), 400
+                    )
+
+                # Remove files from storage and directory structure
+                removed_files = []
+                for file_path in file_paths:
+                    # Reject absolute paths and path traversal in user-supplied paths
+                    if not isinstance(file_path, str) or file_path.startswith("/") or ".." in file_path:
+                        continue
+
+                    full_path = f"{source_file_path}/{file_path}"
+
+                    # Remove from storage
+                    if storage.file_exists(full_path):
+                        storage.delete_file(full_path)
+                        removed_files.append(file_path)
+
+                # Trigger re-ingestion pipeline
+                from application.api.user.tasks import reingest_source_task
+
+                task = reingest_source_task.delay(source_id=source_id, user=user)
+
+                return make_response(jsonify({
+                    "success": True,
+                    "message": f"Removed {len(removed_files)} files",
+                    "removed_files": removed_files,
+                    "reingest_task_id": task.id
+                }), 200)
+
+        except Exception as err:
+            current_app.logger.error(f"Error managing source files: {err}", exc_info=True)
+            return make_response(jsonify({"success": False, "message": "Operation failed"}), 500)
+
+
 @user_ns.route("/api/remote")
 class UploadRemote(Resource):
     @api.expect(
diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py
index aa40f37b..28a78c0d 100644
--- a/application/api/user/tasks.py
+++ b/application/api/user/tasks.py
@@ -22,6 +22,13 @@ def ingest_remote(self, source_data, job_name, user, loader):
     return resp
 
 
+@celery.task(bind=True)
+def reingest_source_task(self, source_id, user):
+    from application.worker import reingest_source_worker
+    resp = reingest_source_worker(self, source_id, user)
+    return resp
+
+
 @celery.task(bind=True)
 def schedule_syncs(self, frequency):
     resp = sync_worker(self, frequency)
diff --git a/application/worker.py b/application/worker.py
index 6e3cb1ae..5e19a202 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -336,6 +336,129 @@ def ingest_worker(
     }
 
 
+def reingest_source_worker(self, source_id, user):
+    """
+    Re-ingestion worker that scans the source's current files and rebuilds its index.
+    This is decoupled from the file management operations.
+
+    Args:
+        self: Task instance
+        source_id: ID of the source to re-ingest
+        user: User identifier
+
+    Returns:
+        dict: Information about the re-ingestion task
+    """
+    try:
+        from application.vectorstore.vector_creator import VectorCreator
+        from application.utils import num_tokens_from_string
+
+        self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing re-ingestion scan"})
+
+        source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
+        if not source:
+            raise ValueError(f"Source {source_id} not found or access denied")
+
+        storage = StorageCreator.get_storage()
+        source_file_path = source.get("file_path", "")
+        job_name = source.get("name", "")
+
+        self.update_state(state="PROGRESS", meta={"current": 20, "status": "Scanning current files"})
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            # Download all files from storage to temp directory, preserving directory structure
+            if storage.is_directory(source_file_path):
+                files_list = storage.list_files(source_file_path)
+
+                for storage_file_path in files_list:
+                    if storage.is_directory(storage_file_path):
+                        continue
+
+                    rel_path = os.path.relpath(storage_file_path, source_file_path)
+                    local_file_path = os.path.join(temp_dir, rel_path)
+
+                    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
+
+                    # Download file
+                    try:
+                        file_data = storage.get_file(storage_file_path)
+                        with open(local_file_path, "wb") as f:
+                            f.write(file_data.read())
+                    except Exception as e:
+                        logging.error(f"Error downloading file {storage_file_path}: {e}")
+                        continue
+
+            reader = SimpleDirectoryReader(
+                input_dir=temp_dir,
+                recursive=True,
+                required_exts=[
+                    ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                    ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                    ".jpg", ".jpeg",
+                ],
+                exclude_hidden=True,
+                file_metadata=metadata_from_filename,
+            )
+
+            raw_docs = reader.load_data()
+
+            directory_structure = getattr(reader, 'directory_structure', {})
+            logging.info(f"Directory structure from reader: {directory_structure}")
+
+            total_tokens = 0
+            for doc in raw_docs:
+                if hasattr(doc, 'extra_info') and 'token_count' in doc.extra_info:
+                    total_tokens += doc.extra_info['token_count']
+                else:
+                    doc_text = str(doc.text) if hasattr(doc, 'text') else str(doc)
+                    total_tokens += num_tokens_from_string(doc_text)
+
+            logging.info(f"Total tokens calculated: {total_tokens}")
+
+            self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing documents"})
+
+            temp_vector_dir = os.path.join(temp_dir, "vector_store")
+            os.makedirs(temp_vector_dir, exist_ok=True)
+
+            chunker = Chunker(
+                chunking_strategy="classic_chunk",
+                max_tokens=MAX_TOKENS,
+                min_tokens=MIN_TOKENS,
+                duplicate_headers=False,
+            )
+            chunked_docs = chunker.chunk(documents=raw_docs)
+
+            docs = [Document.to_langchain_format(doc) for doc in chunked_docs]
+
+            embed_and_store_documents(docs, temp_vector_dir, ObjectId(source_id), self)
+
+            file_data = {
+                "name": source.get("name", ""),
+                "file": source.get("name", ""),
+                "user": user,
+                "tokens": total_tokens,
+                "retriever": source.get("retriever", "classic"),
+                "id": source_id,
+                "type": source.get("type", "local"),
+                "file_path": source_file_path,
+                "directory_structure": json.dumps(directory_structure),
+            }
+
+            upload_index(temp_vector_dir, file_data)
+
+        return {
+            "source_id": source_id,
+            "user": user,
+            "tokens": total_tokens,
+            "status": "completed"
+        }
+
+    except Exception as e:
+        logging.error(f"Error in reingest_source_worker: {e}", exc_info=True)
+        raise
+
+
 def remote_worker(
     self,
     source_data,
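
Notes for reviewers (not part of the patch):

How a client is expected to exercise the new endpoint. This is a minimal
sketch under assumptions: the base URL, bearer token, source id, and file
names are placeholders, and it assumes the API authenticates requests via an
Authorization header that populates request.decoded_token.

    import json
    import requests

    BASE_URL = "http://localhost:7091"           # placeholder deployment address
    HEADERS = {"Authorization": "Bearer <jwt>"}  # placeholder token
    SOURCE_ID = "<24-hex ObjectId of an existing source>"

    # Add a file under the "docs" subdirectory of the source; the server
    # saves it to storage and queues reingest_source_task.
    with open("notes.md", "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/api/manage_source_files",
            headers=HEADERS,
            data={"source_id": SOURCE_ID, "operation": "add", "parent_dir": "docs"},
            files={"file": ("notes.md", f)},
        )
    print(resp.json())  # contains "reingest_task_id"

    # Remove files; file_paths is a JSON-encoded list of paths relative
    # to the source root.
    resp = requests.post(
        f"{BASE_URL}/api/manage_source_files",
        headers=HEADERS,
        data={
            "source_id": SOURCE_ID,
            "operation": "remove",
            "file_paths": json.dumps(["docs/notes.md"]),
        },
    )
    print(resp.json())

Both operations respond with a reingest_task_id identifying the Celery task
queued by the handler.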
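A sketch of inspecting that task from a Python shell with access to the
application's Celery app. The import path application.celery_init is an
assumption about where the celery instance lives; adjust it to wherever the
tasks module imports celery from.

    # Assumes the Celery broker/result backend used by the app is reachable.
    from celery.result import AsyncResult

    from application.celery_init import celery  # assumed location of the app

    result = AsyncResult("<reingest_task_id>", app=celery)
    print(result.state)  # PENDING / PROGRESS / SUCCESS / FAILURE
    print(result.info)   # update_state meta while running, return dict on success

Because the worker reports progress via update_state, result.info exposes the
"current" and "status" fields set inside reingest_source_worker.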