Files
DocsGPT/extensions/chatwoot/app.py
2026-04-03 18:04:49 +01:00

116 lines
3.3 KiB
Python

import hashlib
import hmac
import os
import pprint
import dotenv
import requests
from flask import Flask, request
dotenv.load_dotenv()
docsgpt_url = os.getenv("docsgpt_url")
chatwoot_url = os.getenv("chatwoot_url")
docsgpt_key = os.getenv("docsgpt_key")
chatwoot_token = os.getenv("chatwoot_token")
chatwoot_webhook_secret = os.getenv("chatwoot_webhook_secret", "")
# account_id = os.getenv("account_id")
# assignee_id = os.getenv("assignee_id")
label_stop = "human-requested"
def send_to_bot(sender, message):
data = {
'sender': sender,
'question': message,
'api_key': docsgpt_key,
'embeddings_key': docsgpt_key,
'history': ''
}
headers = {"Content-Type": "application/json",
"Accept": "application/json"}
r = requests.post(f'{docsgpt_url}/api/answer',
json=data, headers=headers)
return r.json()['answer']
def send_to_chatwoot(account, conversation, message):
data = {
'content': message
}
url = f"{chatwoot_url}/api/v1/accounts/{account}/conversations/{conversation}/messages"
headers = {"Content-Type": "application/json",
"Accept": "application/json",
"api_access_token": f"{chatwoot_token}"}
r = requests.post(url,
json=data, headers=headers)
return r.json()
def is_valid_chatwoot_signature(raw_body: bytes, signature_header: str | None) -> bool:
"""Validate Chatwoot webhook signature using shared secret."""
if not chatwoot_webhook_secret or not signature_header:
return False
expected = hmac.new(
chatwoot_webhook_secret.encode("utf-8"), raw_body, hashlib.sha256
).hexdigest()
provided = signature_header.strip()
if provided.startswith("sha256="):
provided = provided.split("=", maxsplit=1)[1]
return hmac.compare_digest(provided, expected)
app = Flask(__name__)
@app.route('/docsgpt', methods=['POST'])
def docsgpt():
raw_body = request.get_data()
signature = request.headers.get("X-Chatwoot-Signature")
if not is_valid_chatwoot_signature(raw_body, signature):
return "Unauthorized", 401
data = request.get_json(silent=True)
if not isinstance(data, dict):
return "Invalid payload", 400
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(data)
try:
message_type = data['message_type']
except KeyError:
return "Not a message"
message = data['content']
conversation = data['conversation']['id']
contact = data['sender']['id']
account = data['account']['id']
assignee = data['conversation']['meta']['assignee']['id']
print(account)
print(label_stop)
print(data['conversation']['labels'])
print(assignee)
if label_stop in data['conversation']['labels']:
return "Label stop"
# elif str(account) != str(account_id):
# return "Not the right account"
# elif str(assignee) != str(assignee_id):
# return "Not the right assignee"
if (message_type == "incoming"):
bot_response = send_to_bot(contact, message)
create_message = send_to_chatwoot(
account, conversation, bot_response)
else:
return "Not an incoming message"
return create_message
if __name__ == '__main__':
app.run(host='0.0.0.0', port=80)