diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index 9e97e2ab..6b52a436 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -413,81 +413,85 @@ class UploadFile(Resource):
         user = secure_filename(decoded_token.get("sub"))
         job_name = secure_filename(request.form["name"])
+
         try:
-            save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
-            os.makedirs(save_dir, exist_ok=True)
-
+            from application.storage.storage_creator import StorageCreator
+            storage = StorageCreator.get_storage()
+
+            base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}"
+
             if len(files) > 1:
-                temp_dir = os.path.join(save_dir, "temp")
-                os.makedirs(temp_dir, exist_ok=True)
-
+                temp_files = []
                 for file in files:
                     filename = secure_filename(file.filename)
-                    file.save(os.path.join(temp_dir, filename))
+                    temp_path = f"{base_path}/temp/{filename}"
+                    storage.save_file(file, temp_path)
+                    temp_files.append(temp_path)
                     print(f"Saved file: {filename}")
-                zip_path = shutil.make_archive(
-                    base_name=os.path.join(save_dir, job_name),
-                    format="zip",
-                    root_dir=temp_dir,
-                )
-                final_filename = os.path.basename(zip_path)
-                shutil.rmtree(temp_dir)
+
+                zip_filename = f"{job_name}.zip"
+                zip_path = f"{base_path}/{zip_filename}"
+
+                def create_zip_archive(temp_paths, **kwargs):
+                    import tempfile
+                    with tempfile.TemporaryDirectory() as temp_dir:
+                        for path in temp_paths:
+                            file_data = storage.get_file(path)
+                            with open(os.path.join(temp_dir, os.path.basename(path)), 'wb') as f:
+                                f.write(file_data.read())
+
+                        # Create zip archive
+                        zip_temp = shutil.make_archive(
+                            base_name=os.path.join(temp_dir, job_name),
+                            format="zip",
+                            root_dir=temp_dir
+                        )
+
+                        return zip_temp
+
+                zip_temp_path = create_zip_archive(temp_files)
+                with open(zip_temp_path, 'rb') as zip_file:
+                    storage.save_file(zip_file, zip_path)
+
+                # Clean up temp files
+                for temp_path in temp_files:
+                    storage.delete_file(temp_path)
+
                 task = ingest.delay(
                     settings.UPLOAD_FOLDER,
                     [
-                        ".rst",
-                        ".md",
-                        ".pdf",
-                        ".txt",
-                        ".docx",
-                        ".csv",
-                        ".epub",
-                        ".html",
-                        ".mdx",
-                        ".json",
-                        ".xlsx",
-                        ".pptx",
-                        ".png",
-                        ".jpg",
-                        ".jpeg",
+                        ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                        ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                        ".jpg", ".jpeg",
                     ],
                     job_name,
-                    final_filename,
+                    zip_filename,
                     user,
                 )
             else:
+                # For single file
                 file = files[0]
-                final_filename = secure_filename(file.filename)
-                file_path = os.path.join(save_dir, final_filename)
-                file.save(file_path)
-
+                filename = secure_filename(file.filename)
+                file_path = f"{base_path}/{filename}"
+
+                storage.save_file(file, file_path)
+
                 task = ingest.delay(
                     settings.UPLOAD_FOLDER,
                     [
-                        ".rst",
-                        ".md",
-                        ".pdf",
-                        ".txt",
-                        ".docx",
-                        ".csv",
-                        ".epub",
-                        ".html",
-                        ".mdx",
-                        ".json",
-                        ".xlsx",
-                        ".pptx",
-                        ".png",
-                        ".jpg",
-                        ".jpeg",
+                        ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                        ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                        ".jpg", ".jpeg",
                     ],
                     job_name,
-                    final_filename,
+                    filename,
                     user,
                 )
         except Exception as err:
             current_app.logger.error(f"Error uploading file: {err}")
             return make_response(jsonify({"success": False}), 400)
+
         return make_response(jsonify({"success": True, "task_id": task.id}), 200)
diff --git a/application/worker.py b/application/worker.py
index b5caa23e..d83639d7 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -133,62 +133,91 @@ def ingest_worker(
     limit = None
     exclude = True
     sample = False
+
+    storage = StorageCreator.get_storage()
+
     full_path = os.path.join(directory, user, name_job)
-
+    source_file_path = os.path.join(full_path, filename)
+
     logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
-    file_data = {"name": name_job, "file": filename, "user": user}
+
+    # Create temporary working directory
+    with tempfile.TemporaryDirectory() as temp_dir:
+        try:
+            os.makedirs(temp_dir, exist_ok=True)
+
+            # Download file from storage to temp directory
+            temp_file_path = os.path.join(temp_dir, filename)
+            file_data = storage.get_file(source_file_path)
+
+            with open(temp_file_path, 'wb') as f:
+                f.write(file_data.read())
+
+            self.update_state(state="PROGRESS", meta={"current": 1})
-    if not os.path.exists(full_path):
-        os.makedirs(full_path)
-    download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
+            # Handle zip files
+            if filename.endswith('.zip'):
+                logging.info(f"Extracting zip file: {filename}")
+                extract_zip_recursive(
+                    temp_file_path,
+                    temp_dir,
+                    current_depth=0,
+                    max_depth=RECURSION_DEPTH
+                )
-    # check if file is .zip and extract it
-    if filename.endswith(".zip"):
-        extract_zip_recursive(
-            os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
-        )
+            if sample:
+                logging.info(f"Sample mode enabled. Using {limit} documents.")
-    self.update_state(state="PROGRESS", meta={"current": 1})
+            reader = SimpleDirectoryReader(
+                input_dir=temp_dir,
+                input_files=input_files,
+                recursive=recursive,
+                required_exts=formats,
+                exclude_hidden=exclude,
+                file_metadata=metadata_from_filename,
+            )
+            raw_docs = reader.load_data()
-    raw_docs = SimpleDirectoryReader(
-        input_dir=full_path,
-        input_files=input_files,
-        recursive=recursive,
-        required_exts=formats,
-        num_files_limit=limit,
-        exclude_hidden=exclude,
-        file_metadata=metadata_from_filename,
-    ).load_data()
+            chunker = Chunker(
+                chunking_strategy="classic_chunk",
+                max_tokens=MAX_TOKENS,
+                min_tokens=MIN_TOKENS,
+                duplicate_headers=False
+            )
+            raw_docs = chunker.chunk(documents=raw_docs)
+
+            docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+
+            id = ObjectId()
+
+            vector_store_path = os.path.join(temp_dir, 'vector_store')
+            os.makedirs(vector_store_path, exist_ok=True)
+
+            embed_and_store_documents(docs, vector_store_path, id, self)
+
+            tokens = count_tokens_docs(docs)
+
+            self.update_state(state="PROGRESS", meta={"current": 100})
-    chunker = Chunker(
-        chunking_strategy="classic_chunk",
-        max_tokens=MAX_TOKENS,
-        min_tokens=MIN_TOKENS,
-        duplicate_headers=False
-    )
-    raw_docs = chunker.chunk(documents=raw_docs)
+            if sample:
+                for i in range(min(5, len(raw_docs))):
+                    logging.info(f"Sample document {i}: {raw_docs[i]}")
+            file_data = {
+                "name": name_job,
+                "file": filename,
+                "user": user,
+                "tokens": tokens,
+                "retriever": retriever,
+                "id": str(id),
+                "type": "local",
+            }
-    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-    id = ObjectId()
-    embed_and_store_documents(docs, full_path, id, self)
-    tokens = count_tokens_docs(docs)
-    self.update_state(state="PROGRESS", meta={"current": 100})
+            upload_index(vector_store_path, file_data)
-    if sample:
-        for i in range(min(5, len(raw_docs))):
-            logging.info(f"Sample document {i}: {raw_docs[i]}")
-
-    file_data.update({
-        "tokens": tokens,
-        "retriever": retriever,
-        "id": str(id),
-        "type": "local",
-    })
-    upload_index(full_path, file_data)
-
-    # delete local
-    shutil.rmtree(full_path)
+        except Exception as e:
+            logging.error(f"Error in ingest_worker: {e}", exc_info=True)
+            raise

     return {
         "directory": directory,