refactor: webhook listener handle both POST and GET requests

This commit is contained in:
Siddhant Rai
2025-04-29 00:29:16 +05:30
parent cc67d4a1e2
commit 22c7015c69

View File

@@ -2,9 +2,10 @@ import datetime
import json
import math
import os
import secrets
import shutil
import uuid
import secrets
from functools import wraps
from bson.binary import Binary, UuidRepresentation
from bson.dbref import DBRef
@@ -18,8 +19,8 @@ from application.agents.tools.tool_manager import ToolManager
from application.api.user.tasks import (
ingest,
ingest_remote,
store_attachment,
process_agent_webhook,
store_attachment,
)
from application.core.mongo_db import MongoDB
from application.core.settings import settings
@@ -419,13 +420,14 @@ class UploadFile(Resource):
user = secure_filename(decoded_token.get("sub"))
job_name = secure_filename(request.form["name"])
try:
from application.storage.storage_creator import StorageCreator
storage = StorageCreator.get_storage()
base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}"
if len(files) > 1:
temp_files = []
for file in files:
@@ -434,41 +436,56 @@ class UploadFile(Resource):
storage.save_file(file, temp_path)
temp_files.append(temp_path)
print(f"Saved file: {filename}")
zip_filename = f"{job_name}.zip"
zip_path = f"{base_path}/{zip_filename}"
def create_zip_archive(temp_paths, **kwargs):
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
for path in temp_paths:
file_data = storage.get_file(path)
with open(os.path.join(temp_dir, os.path.basename(path)), 'wb') as f:
with open(
os.path.join(temp_dir, os.path.basename(path)), "wb"
) as f:
f.write(file_data.read())
# Create zip archive
zip_temp = shutil.make_archive(
base_name=os.path.join(temp_dir, job_name),
format="zip",
root_dir=temp_dir
root_dir=temp_dir,
)
return zip_temp
zip_temp_path = create_zip_archive(temp_files)
with open(zip_temp_path, 'rb') as zip_file:
with open(zip_temp_path, "rb") as zip_file:
storage.save_file(zip_file, zip_path)
# Clean up temp files
for temp_path in temp_files:
storage.delete_file(temp_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
],
job_name,
zip_filename,
@@ -479,15 +496,27 @@ class UploadFile(Resource):
file = files[0]
filename = secure_filename(file.filename)
file_path = f"{base_path}/{filename}"
storage.save_file(file, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
],
job_name,
filename,
@@ -497,7 +526,7 @@ class UploadFile(Resource):
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}")
return make_response(jsonify({"success": False}), 400)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@@ -1386,39 +1415,88 @@ class AgentWebhook(Resource):
)
@user_ns.route(f"/api/webhooks/agents/<string:webhook_token>")
class AgentWebhookListener(Resource):
@api.doc(description="Webhook listener for agent events")
def post(self, webhook_token):
def require_agent(func):
@wraps(func)
def wrapper(*args, **kwargs):
webhook_token = kwargs.get("webhook_token")
if not webhook_token:
return make_response(
jsonify({"success": False, "message": "Webhook token missing"}), 400
)
agent = agents_collection.find_one(
{"incoming_webhook_token": webhook_token}, {"_id": 1}
)
if not agent:
current_app.logger.warning(
f"Webhook attempt with invalid token: {webhook_token}"
)
return make_response(
jsonify({"success": False, "message": "Agent not found"}), 404
)
data = request.get_json()
if not data:
return make_response(
jsonify({"success": False, "message": "No data provided"}), 400
kwargs["agent"] = agent
kwargs["agent_id_str"] = str(agent["_id"])
return func(*args, **kwargs)
return wrapper
@user_ns.route(f"/api/webhooks/agents/<string:webhook_token>")
class AgentWebhookListener(Resource):
method_decorators = [require_agent]
def _enqueue_webhook_task(self, agent_id_str, payload, source_method):
if not payload:
current_app.logger.warning(
f"Webhook ({source_method}) received for agent {agent_id_str} with empty payload."
)
agent_id_str = str(agent["_id"])
current_app.logger.info(
f"Incoming webhook received for agent {agent_id_str}. Enqueuing task."
f"Incoming {source_method} webhook for agent {agent_id_str}. Enqueuing task with payload: {payload}"
)
try:
task = process_agent_webhook.delay(
agent_id=agent_id_str,
payload=data,
payload=payload,
)
current_app.logger.info(
f"Task {task.id} enqueued for agent {agent_id_str} ({source_method})."
)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
except Exception as err:
current_app.logger.error(f"Error processing webhook: {err}")
return make_response(
jsonify({"success": False, "message": "Error processing webhook"}), 400
current_app.logger.error(
f"Error enqueuing webhook task ({source_method}) for agent {agent_id_str}: {err}",
exc_info=True,
)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
return make_response(
jsonify({"success": False, "message": "Error processing webhook"}), 500
)
@api.doc(
description="Webhook listener for agent events (POST). Expects JSON payload, which is used to trigger processing.",
)
def post(self, webhook_token, agent, agent_id_str):
payload = request.get_json()
if payload is None:
return make_response(
jsonify(
{
"success": False,
"message": "Invalid or missing JSON data in request body",
}
),
400,
)
return self._enqueue_webhook_task(agent_id_str, payload, source_method="POST")
@api.doc(
description="Webhook listener for agent events (GET). Uses URL query parameters as payload to trigger processing.",
)
def get(self, webhook_token, agent, agent_id_str):
payload = request.args.to_dict(flat=True)
return self._enqueue_webhook_task(agent_id_str, payload, source_method="GET")
@user_ns.route("/api/share")
@@ -2872,9 +2950,9 @@ class StoreAttachment(Resource):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
file = request.files.get("file")
if not file or file.filename == "":
return make_response(
jsonify({"status": "error", "message": "Missing file"}),
@@ -2882,35 +2960,33 @@ class StoreAttachment(Resource):
)
user = secure_filename(decoded_token.get("sub"))
try:
attachment_id = ObjectId()
original_filename = secure_filename(file.filename)
relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}"
file_content = file.read()
file_info = {
"filename": original_filename,
"attachment_id": str(attachment_id),
"path": relative_path,
"file_content": file_content
"file_content": file_content,
}
task = store_attachment.delay(
file_info,
user
)
task = store_attachment.delay(file_info, user)
return make_response(
jsonify({
"success": True,
"task_id": task.id,
"message": "File uploaded successfully. Processing started."
}),
200
jsonify(
{
"success": True,
"task_id": task.id,
"message": "File uploaded successfully. Processing started.",
}
),
200,
)
except Exception as err:
current_app.logger.error(f"Error storing attachment: {err}")
return make_response(jsonify({"success": False, "error": str(err)}), 400)