# Reconstructed from the webhook hunk of commit 22c7015c ("refactor: webhook
# listener handle both POST and GET requests") for application/api/user/routes.py.
# NOTE(review): the remaining hunks of that patch (import reordering and what
# appear to be formatting-only changes inside UploadFile / StoreAttachment) show
# only partial method bodies and are not reproduced here.
from functools import wraps


def require_agent(func):
    """Resolve the agent addressed by the ``webhook_token`` URL variable.

    Before the wrapped view runs, the token is looked up in
    ``agents_collection`` (field ``incoming_webhook_token``). On success the
    wrapped callable receives two extra keyword arguments:

    * ``agent`` - the matched document (only ``_id`` is projected);
    * ``agent_id_str`` - ``str(agent["_id"])``.

    Returns:
        A 400 JSON response when the token is missing or empty, a 404 JSON
        response when no agent matches (the wrapped callable is not invoked
        in either case), otherwise whatever the wrapped callable returns.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        webhook_token = kwargs.get("webhook_token")
        if not webhook_token:
            return make_response(
                jsonify({"success": False, "message": "Webhook token missing"}), 400
            )

        agent = agents_collection.find_one(
            {"incoming_webhook_token": webhook_token}, {"_id": 1}
        )
        if not agent:
            # The token is a credential taken from the URL: never write it to
            # the logs in full. A short repr()'d prefix is enough to correlate
            # attempts and keeps control characters out of the log line.
            current_app.logger.warning(
                "Webhook attempt with invalid token (prefix=%r)", webhook_token[:4]
            )
            return make_response(
                jsonify({"success": False, "message": "Agent not found"}), 404
            )

        kwargs["agent"] = agent
        kwargs["agent_id_str"] = str(agent["_id"])
        return func(*args, **kwargs)

    return wrapper


# The URL variable must be named ``webhook_token``: ``require_agent`` and both
# handlers read it from the view keyword arguments.
@user_ns.route("/api/webhooks/agents/<string:webhook_token>")
class AgentWebhookListener(Resource):
    """Webhook endpoint that enqueues agent processing for POST and GET calls."""

    # Applied to every HTTP method handler of this resource; supplies the
    # ``agent`` and ``agent_id_str`` keyword arguments.
    method_decorators = [require_agent]

    def _enqueue_webhook_task(self, agent_id_str, payload, source_method):
        """Queue ``process_agent_webhook`` for the agent and build the response.

        Args:
            agent_id_str: String form of the agent's ``_id``.
            payload: Data forwarded to the task (parsed JSON body or query
                parameters). An empty payload is logged as a warning but is
                still enqueued.
            source_method: HTTP verb that triggered the call; used in logs only.

        Returns:
            A 200 JSON response carrying ``task_id`` on success, or a 500 JSON
            response when enqueuing raises.
        """
        if not payload:
            current_app.logger.warning(
                "Webhook (%s) received for agent %s with empty payload.",
                source_method,
                agent_id_str,
            )

        # Log the payload's shape, not its content: webhook bodies are
        # caller-controlled, may carry sensitive data and can be arbitrarily
        # large.
        payload_size = len(payload) if isinstance(payload, (dict, list, str)) else "n/a"
        current_app.logger.info(
            "Incoming %s webhook for agent %s. Enqueuing task (payload type=%s, size=%s).",
            source_method,
            agent_id_str,
            type(payload).__name__,
            payload_size,
        )

        try:
            task = process_agent_webhook.delay(
                agent_id=agent_id_str,
                payload=payload,
            )
        except Exception:
            # Boundary handler: a failure to enqueue is reported to the caller
            # as a server error; the traceback goes to the log.
            current_app.logger.exception(
                "Error enqueuing webhook task (%s) for agent %s",
                source_method,
                agent_id_str,
            )
            return make_response(
                jsonify({"success": False, "message": "Error processing webhook"}), 500
            )
        else:
            current_app.logger.info(
                "Task %s enqueued for agent %s (%s).",
                task.id,
                agent_id_str,
                source_method,
            )
            return make_response(jsonify({"success": True, "task_id": task.id}), 200)

    @api.doc(
        description="Webhook listener for agent events (POST). Expects JSON payload, which is used to trigger processing.",
    )
    def post(self, webhook_token, agent, agent_id_str):
        """Enqueue processing using the JSON request body as payload."""
        # silent=True makes malformed JSON or a non-JSON content type yield
        # None instead of aborting inside Flask, so the caller always gets the
        # JSON error response below.
        payload = request.get_json(silent=True)
        if payload is None:
            return make_response(
                jsonify(
                    {
                        "success": False,
                        "message": "Invalid or missing JSON data in request body",
                    }
                ),
                400,
            )
        return self._enqueue_webhook_task(agent_id_str, payload, source_method="POST")

    @api.doc(
        description="Webhook listener for agent events (GET). Uses URL query parameters as payload to trigger processing.",
    )
    def get(self, webhook_token, agent, agent_id_str):
        """Enqueue processing using the URL query parameters as payload."""
        # flat=True keeps only the first value of a repeated query parameter.
        payload = request.args.to_dict(flat=True)
        return self._enqueue_webhook_task(agent_id_str, payload, source_method="GET")