Merge branch 'main' from upstream

Author: ManishMadan2882
Date:   2024-09-11 23:12:15 +05:30
32 changed files with 1622 additions and 165 deletions

View File

@@ -24,6 +24,7 @@ conversations_collection = db["conversations"]
 sources_collection = db["sources"]
 prompts_collection = db["prompts"]
 api_key_collection = db["api_keys"]
+user_logs_collection = db["user_logs"]
 
 answer = Blueprint("answer", __name__)
 gpt_model = ""
@@ -37,7 +38,9 @@ if settings.MODEL_NAME:  # in case there is particular model name configured
     gpt_model = settings.MODEL_NAME
 
 # load the prompts
-current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+current_dir = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)
 with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
     chat_combine_template = f.read()
@@ -98,9 +101,12 @@ def get_retriever(source_id: str):
     return retriever_name
 
 
 def is_azure_configured():
-    return settings.OPENAI_API_BASE and settings.OPENAI_API_VERSION and settings.AZURE_DEPLOYMENT_NAME
+    return (
+        settings.OPENAI_API_BASE
+        and settings.OPENAI_API_VERSION
+        and settings.AZURE_DEPLOYMENT_NAME
+    )
 
 
 def save_conversation(conversation_id, question, response, source_log_docs, llm):
@@ -201,6 +207,21 @@ def complete_stream(
         data = json.dumps({"type": "id", "id": str(conversation_id)})
         yield f"data: {data}\n\n"
 
+        retriever_params = retriever.get_params()
+        user_logs_collection.insert_one(
+            {
+                "action": "stream_answer",
+                "level": "info",
+                "user": "local",
+                "api_key": user_api_key,
+                "question": question,
+                "response": response_full,
+                "sources": source_log_docs,
+                "retriever_params": retriever_params,
+                "timestamp": datetime.datetime.now(datetime.timezone.utc),
+            }
+        )
+
         data = json.dumps({"type": "end"})
         yield f"data: {data}\n\n"
     except Exception as e:
@@ -258,7 +279,7 @@ def stream():
         user_api_key = data["api_key"]
     elif "active_docs" in data:
-        source = {"active_docs" : data["active_docs"]}
+        source = {"active_docs": data["active_docs"]}
         retriever_name = get_retriever(data["active_docs"]) or retriever_name
         user_api_key = None
@@ -266,12 +287,13 @@ def stream():
         source = {}
         user_api_key = None
 
-    current_app.logger.info(f"/stream - request_data: {data}, source: {source}",
-                            extra={"data": json.dumps({"request_data": data, "source": source})}
+    current_app.logger.info(
+        f"/stream - request_data: {data}, source: {source}",
+        extra={"data": json.dumps({"request_data": data, "source": source})},
     )
 
     prompt = get_prompt(prompt_id)
 
     retriever = RetrieverCreator.create_retriever(
         retriever_name,
         question=question,
@@ -304,8 +326,9 @@ def stream():
             mimetype="text/event-stream",
         )
     except Exception as e:
-        current_app.logger.error(f"/stream - error: {str(e)} - traceback: {traceback.format_exc()}",
-                                 extra={"error": str(e), "traceback": traceback.format_exc()}
+        current_app.logger.error(
+            f"/stream - error: {str(e)} - traceback: {traceback.format_exc()}",
+            extra={"error": str(e), "traceback": traceback.format_exc()},
         )
         message = e.args[0]
         status_code = 400
@@ -364,7 +387,7 @@ def api_answer():
             retriever_name = data_key["retriever"] or retriever_name
             user_api_key = data["api_key"]
         elif "active_docs" in data:
-            source = {"active_docs":data["active_docs"]}
+            source = {"active_docs": data["active_docs"]}
             retriever_name = get_retriever(data["active_docs"]) or retriever_name
             user_api_key = None
         else:
@@ -373,8 +396,9 @@ def api_answer():
         prompt = get_prompt(prompt_id)
 
-        current_app.logger.info(f"/api/answer - request_data: {data}, source: {source}",
-                                extra={"data": json.dumps({"request_data": data, "source": source})}
+        current_app.logger.info(
+            f"/api/answer - request_data: {data}, source: {source}",
+            extra={"data": json.dumps({"request_data": data, "source": source})},
         )
 
         retriever = RetrieverCreator.create_retriever(
@@ -406,13 +430,30 @@ def api_answer():
         result = {"answer": response_full, "sources": source_log_docs}
         result["conversation_id"] = str(
-            save_conversation(conversation_id, question, response_full, source_log_docs, llm)
+            save_conversation(
+                conversation_id, question, response_full, source_log_docs, llm
+            )
         )
+        retriever_params = retriever.get_params()
+        user_logs_collection.insert_one(
+            {
+                "action": "api_answer",
+                "level": "info",
+                "user": "local",
+                "api_key": user_api_key,
+                "question": question,
+                "response": response_full,
+                "sources": source_log_docs,
+                "retriever_params": retriever_params,
+                "timestamp": datetime.datetime.now(datetime.timezone.utc),
+            }
+        )
 
         return result
     except Exception as e:
-        current_app.logger.error(f"/api/answer - error: {str(e)} - traceback: {traceback.format_exc()}",
-                                 extra={"error": str(e), "traceback": traceback.format_exc()}
+        current_app.logger.error(
+            f"/api/answer - error: {str(e)} - traceback: {traceback.format_exc()}",
+            extra={"error": str(e), "traceback": traceback.format_exc()},
        )
         return bad_request(500, str(e))
@@ -431,7 +472,7 @@ def api_search():
         source = {"active_docs":data_key["source"]}
         user_api_key = data["api_key"]
     elif "active_docs" in data:
-        source = {"active_docs":data["active_docs"]}
+        source = {"active_docs": data["active_docs"]}
         user_api_key = None
     else:
         source = {}
@@ -445,9 +486,10 @@ def api_search():
         token_limit = data["token_limit"]
     else:
         token_limit = settings.DEFAULT_MAX_HISTORY
 
-    current_app.logger.info(f"/api/answer - request_data: {data}, source: {source}",
-                            extra={"data": json.dumps({"request_data": data, "source": source})}
+    current_app.logger.info(
+        f"/api/answer - request_data: {data}, source: {source}",
+        extra={"data": json.dumps({"request_data": data, "source": source})},
     )
 
     retriever = RetrieverCreator.create_retriever(
@@ -463,6 +505,20 @@ def api_search():
     )
     docs = retriever.search()
+    retriever_params = retriever.get_params()
+    user_logs_collection.insert_one(
+        {
+            "action": "api_search",
+            "level": "info",
+            "user": "local",
+            "api_key": user_api_key,
+            "question": question,
+            "sources": docs,
+            "retriever_params": retriever_params,
+            "timestamp": datetime.datetime.now(datetime.timezone.utc),
+        }
+    )
 
     if data.get("isNoneDoc"):
         for doc in docs:
             doc["source"] = "None"

View File

@@ -1,12 +1,15 @@
 import datetime
 import os
+import uuid
+import shutil
 
-from flask import Blueprint, request, jsonify
-from pymongo import MongoClient
-from bson.objectid import ObjectId
-import uuid
 from bson.binary import Binary, UuidRepresentation
-from werkzeug.utils import secure_filename
 from bson.dbref import DBRef
+from bson.objectid import ObjectId
+from flask import Blueprint, jsonify, request
+from pymongo import MongoClient
+from werkzeug.utils import secure_filename
 
 from application.api.user.tasks import ingest, ingest_remote
 from application.core.settings import settings
@@ -19,11 +22,36 @@ sources_collection = db["sources"]
 prompts_collection = db["prompts"]
 feedback_collection = db["feedback"]
 api_key_collection = db["api_keys"]
+token_usage_collection = db["token_usage"]
 shared_conversations_collections = db["shared_conversations"]
+user_logs_collection = db["user_logs"]
 
 user = Blueprint("user", __name__)
 
-current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+current_dir = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)
+
+
+def generate_minute_range(start_date, end_date):
+    return {
+        (start_date + datetime.timedelta(minutes=i)).strftime("%Y-%m-%d %H:%M:00"): 0
+        for i in range(int((end_date - start_date).total_seconds() // 60) + 1)
+    }
+
+
+def generate_hourly_range(start_date, end_date):
+    return {
+        (start_date + datetime.timedelta(hours=i)).strftime("%Y-%m-%d %H:00"): 0
+        for i in range(int((end_date - start_date).total_seconds() // 3600) + 1)
+    }
+
+
+def generate_date_range(start_date, end_date):
+    return {
+        (start_date + datetime.timedelta(days=i)).strftime("%Y-%m-%d"): 0
+        for i in range((end_date - start_date).days + 1)
+    }
 
 
 @user.route("/api/delete_conversation", methods=["POST"])
@@ -53,7 +81,9 @@ def get_conversations():
     conversations = conversations_collection.find().sort("date", -1).limit(30)
     list_conversations = []
     for conversation in conversations:
-        list_conversations.append({"id": str(conversation["_id"]), "name": conversation["name"]})
+        list_conversations.append(
+            {"id": str(conversation["_id"]), "name": conversation["name"]}
+        )
 
     # list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}]
@@ -84,7 +114,12 @@ def api_feedback():
     question = data["question"]
     answer = data["answer"]
     feedback = data["feedback"]
-    new_doc = {"question": question, "answer": answer, "feedback": feedback}
+    new_doc = {
+        "question": question,
+        "answer": answer,
+        "feedback": feedback,
+        "timestamp": datetime.datetime.now(datetime.timezone.utc),
+    }
     if "api_key" in data:
         new_doc["api_key"] = data["api_key"]
     feedback_collection.insert_one(new_doc)
@@ -110,24 +145,31 @@ def delete_by_ids():
 def delete_old():
     """Delete old indexes."""
-    import shutil
-
     source_id = request.args.get("source_id")
-    doc = sources_collection.find_one({
-        "_id": ObjectId(source_id),
-        "user": "local",
-    })
-    if(doc is None):
-        return {"status":"not found"},404
+    doc = sources_collection.find_one(
+        {
+            "_id": ObjectId(source_id),
+            "user": "local",
+        }
+    )
+    if doc is None:
+        return {"status": "not found"}, 404
     if settings.VECTOR_STORE == "faiss":
         try:
             shutil.rmtree(os.path.join(current_dir, str(doc["_id"])))
         except FileNotFoundError:
             pass
     else:
-        vetorstore = VectorCreator.create_vectorstore(settings.VECTOR_STORE, source_id=str(doc["_id"]))
+        vetorstore = VectorCreator.create_vectorstore(
+            settings.VECTOR_STORE, source_id=str(doc["_id"])
+        )
         vetorstore.delete_index()
-    sources_collection.delete_one({
-        "_id": ObjectId(source_id),
-    })
+    sources_collection.delete_one(
+        {
+            "_id": ObjectId(source_id),
+        }
+    )
 
     return {"status": "ok"}
@@ -161,7 +203,9 @@ def upload_file():
             file.save(os.path.join(temp_dir, filename))
 
         # Use shutil.make_archive to zip the temp directory
-        zip_path = shutil.make_archive(base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir)
+        zip_path = shutil.make_archive(
+            base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir
+        )
         final_filename = os.path.basename(zip_path)
 
         # Clean up the temporary directory after zipping
@@ -203,7 +247,9 @@ def upload_remote():
         source_data = request.form["data"]
 
         if source_data:
-            task = ingest_remote.delay(source_data=source_data, job_name=job_name, user=user, loader=source)
+            task = ingest_remote.delay(
+                source_data=source_data, job_name=job_name, user=user, loader=source
+            )
             task_id = task.id
             return {"status": "ok", "task_id": task_id}
         else:
@@ -248,7 +294,9 @@ def combined_json():
                 "model": settings.EMBEDDINGS_NAME,
                 "location": "local",
                 "tokens": index["tokens"] if ("tokens" in index.keys()) else "",
-                "retriever": index["retriever"] if ("retriever" in index.keys()) else "classic",
+                "retriever": (
+                    index["retriever"] if ("retriever" in index.keys()) else "classic"
+                ),
             }
         )
     if "duckduck_search" in settings.RETRIEVERS_ENABLED:
@@ -317,7 +365,9 @@ def get_prompts():
     list_prompts.append({"id": "creative", "name": "creative", "type": "public"})
     list_prompts.append({"id": "strict", "name": "strict", "type": "public"})
     for prompt in prompts:
-        list_prompts.append({"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"})
+        list_prompts.append(
+            {"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"}
+        )
 
     return jsonify(list_prompts)
@@ -326,15 +376,21 @@ def get_prompts():
 def get_single_prompt():
     prompt_id = request.args.get("id")
     if prompt_id == "default":
-        with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r"
+        ) as f:
             chat_combine_template = f.read()
         return jsonify({"content": chat_combine_template})
     elif prompt_id == "creative":
-        with open(os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r") as f:
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r"
+        ) as f:
             chat_reduce_creative = f.read()
         return jsonify({"content": chat_reduce_creative})
     elif prompt_id == "strict":
-        with open(os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r") as f:
+        with open(
+            os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r"
+        ) as f:
             chat_reduce_strict = f.read()
         return jsonify({"content": chat_reduce_strict})
@@ -363,7 +419,9 @@ def update_prompt_name():
     # check if name is null
     if name == "":
         return {"status": "error"}
-    prompts_collection.update_one({"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}})
+    prompts_collection.update_one(
+        {"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}}
+    )
     return {"status": "ok"}
@@ -373,7 +431,7 @@ def get_api_keys():
     keys = api_key_collection.find({"user": user})
     list_keys = []
     for key in keys:
-        if "source" in key and isinstance(key["source"],DBRef):
+        if "source" in key and isinstance(key["source"], DBRef):
             source = db.dereference(key["source"])
             if source is None:
                 continue
@@ -383,7 +441,7 @@ def get_api_keys():
             source_name = key["retriever"]
         else:
             continue
-
+
         list_keys.append(
             {
                 "id": str(key["_id"]),
@@ -443,8 +501,10 @@ def share_conversation():
         conversation_id = data["conversation_id"]
         isPromptable = request.args.get("isPromptable").lower() == "true"
 
-        conversation = conversations_collection.find_one({"_id": ObjectId(conversation_id)})
-        if(conversation is None):
+        conversation = conversations_collection.find_one(
+            {"_id": ObjectId(conversation_id)}
+        )
+        if conversation is None:
             raise Exception("Conversation does not exist")
 
         current_n_queries = len(conversation["queries"])
@@ -456,24 +516,24 @@ def share_conversation():
             chunks = "2" if "chunks" not in data else data["chunks"]
 
             name = conversation["name"] + "(shared)"
-            new_api_key_data = {
-                    "prompt_id": prompt_id,
-                    "chunks": chunks,
-                    "user": user,
-                }
+            new_api_key_data = {
+                "prompt_id": prompt_id,
+                "chunks": chunks,
+                "user": user,
+            }
             if "source" in data and ObjectId.is_valid(data["source"]):
-                new_api_key_data["source"] = DBRef("sources",ObjectId(data["source"]))
+                new_api_key_data["source"] = DBRef("sources", ObjectId(data["source"]))
             elif "retriever" in data:
                 new_api_key_data["retriever"] = data["retriever"]
 
-            pre_existing_api_document = api_key_collection.find_one(
-                new_api_key_data
-            )
+            pre_existing_api_document = api_key_collection.find_one(new_api_key_data)
             if pre_existing_api_document:
                 api_uuid = pre_existing_api_document["key"]
                 pre_existing = shared_conversations_collections.find_one(
                     {
-                        "conversation_id": DBRef("conversations", ObjectId(conversation_id)),
+                        "conversation_id": DBRef(
+                            "conversations", ObjectId(conversation_id)
+                        ),
                         "isPromptable": isPromptable,
                         "first_n_queries": current_n_queries,
                         "user": user,
@@ -504,33 +564,39 @@ def share_conversation():
                         "api_key": api_uuid,
                     }
                 )
-                return jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())})
+                return jsonify(
+                    {"success": True, "identifier": str(explicit_binary.as_uuid())}
+                )
             else:
                 api_uuid = str(uuid.uuid4())
                 new_api_key_data["key"] = api_uuid
                 new_api_key_data["name"] = name
                 if "source" in data and ObjectId.is_valid(data["source"]):
-                    new_api_key_data["source"] = DBRef("sources", ObjectId(data["source"]))
+                    new_api_key_data["source"] = DBRef(
+                        "sources", ObjectId(data["source"])
+                    )
                 if "retriever" in data:
                     new_api_key_data["retriever"] = data["retriever"]
                 api_key_collection.insert_one(new_api_key_data)
                 shared_conversations_collections.insert_one(
-                {
-                    "uuid": explicit_binary,
-                    "conversation_id": {
-                        "$ref": "conversations",
-                        "$id": ObjectId(conversation_id),
-                    },
-                    "isPromptable": isPromptable,
-                    "first_n_queries": current_n_queries,
-                    "user": user,
-                    "api_key": api_uuid,
-                }
-                )
+                    {
+                        "uuid": explicit_binary,
+                        "conversation_id": {
+                            "$ref": "conversations",
+                            "$id": ObjectId(conversation_id),
+                        },
+                        "isPromptable": isPromptable,
+                        "first_n_queries": current_n_queries,
+                        "user": user,
+                        "api_key": api_uuid,
+                    }
+                )
                 ## Identifier as route parameter in frontend
                 return (
-                    jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())}),
+                    jsonify(
+                        {"success": True, "identifier": str(explicit_binary.as_uuid())}
+                    ),
                     201,
                 )
@@ -545,7 +611,9 @@ def share_conversation():
         )
         if pre_existing is not None:
             return (
-                jsonify({"success": True, "identifier": str(pre_existing["uuid"].as_uuid())}),
+                jsonify(
+                    {"success": True, "identifier": str(pre_existing["uuid"].as_uuid())}
+                ),
                 200,
             )
         else:
@@ -563,7 +631,9 @@ def share_conversation():
             )
             ## Identifier as route parameter in frontend
             return (
-                jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())}),
+                jsonify(
+                    {"success": True, "identifier": str(explicit_binary.as_uuid())}
+                ),
                 201,
             )
     except Exception as err:
@@ -575,10 +645,16 @@ def share_conversation():
 @user.route("/api/shared_conversation/<string:identifier>", methods=["GET"])
 def get_publicly_shared_conversations(identifier: str):
     try:
-        query_uuid = Binary.from_uuid(uuid.UUID(identifier), UuidRepresentation.STANDARD)
+        query_uuid = Binary.from_uuid(
+            uuid.UUID(identifier), UuidRepresentation.STANDARD
+        )
         shared = shared_conversations_collections.find_one({"uuid": query_uuid})
         conversation_queries = []
-        if shared and "conversation_id" in shared and isinstance(shared["conversation_id"], DBRef):
+        if (
+            shared
+            and "conversation_id" in shared
+            and isinstance(shared["conversation_id"], DBRef)
+        ):
             # Resolve the DBRef
             conversation_ref = shared["conversation_id"]
             conversation = db.dereference(conversation_ref)
@@ -592,7 +668,9 @@ def get_publicly_shared_conversations(identifier: str):
                     ),
                     404,
                 )
-            conversation_queries = conversation["queries"][: (shared["first_n_queries"])]
+            conversation_queries = conversation["queries"][
+                : (shared["first_n_queries"])
+            ]
         else:
             return (
                 jsonify(
@@ -616,3 +694,466 @@ def get_publicly_shared_conversations(identifier: str):
     except Exception as err:
         print(err)
         return jsonify({"success": False, "error": str(err)}), 400
+
+
+@user.route("/api/get_message_analytics", methods=["POST"])
+def get_message_analytics():
+    data = request.get_json()
+    api_key_id = data.get("api_key_id")
+    filter_option = data.get("filter_option", "last_30_days")
+    try:
+        api_key = (
+            api_key_collection.find_one({"_id": ObjectId(api_key_id)})["key"]
+            if api_key_id
+            else None
+        )
+    except Exception as err:
+        print(err)
+        return jsonify({"success": False, "error": str(err)}), 400
+
+    end_date = datetime.datetime.now(datetime.timezone.utc)
+
+    if filter_option == "last_hour":
+        start_date = end_date - datetime.timedelta(hours=1)
+        group_format = "%Y-%m-%d %H:%M:00"
+        group_stage = {
+            "$group": {
+                "_id": {
+                    "minute": {
+                        "$dateToString": {"format": group_format, "date": "$date"}
+                    }
+                },
+                "total_messages": {"$sum": 1},
+            }
+        }
+    elif filter_option == "last_24_hour":
+        start_date = end_date - datetime.timedelta(hours=24)
+        group_format = "%Y-%m-%d %H:00"
+        group_stage = {
+            "$group": {
+                "_id": {
+                    "hour": {"$dateToString": {"format": group_format, "date": "$date"}}
+                },
+                "total_messages": {"$sum": 1},
+            }
+        }
+    else:
+        if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
+            filter_days = (
+                6
+                if filter_option == "last_7_days"
+                else (14 if filter_option == "last_15_days" else 29)
+            )
+        else:
+            return jsonify({"success": False, "error": "Invalid option"}), 400
+        start_date = end_date - datetime.timedelta(days=filter_days)
+        start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
+        end_date = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
+        group_format = "%Y-%m-%d"
+        group_stage = {
+            "$group": {
+                "_id": {
+                    "day": {"$dateToString": {"format": group_format, "date": "$date"}}
+                },
+                "total_messages": {"$sum": 1},
+            }
+        }
+
+    try:
+        match_stage = {
+            "$match": {
+                "date": {"$gte": start_date, "$lte": end_date},
+            }
+        }
+        if api_key:
+            match_stage["$match"]["api_key"] = api_key
+
+        message_data = conversations_collection.aggregate(
+            [
+                match_stage,
+                group_stage,
+                {"$sort": {"_id": 1}},
+            ]
+        )
+
+        if filter_option == "last_hour":
+            intervals = generate_minute_range(start_date, end_date)
+        elif filter_option == "last_24_hour":
+            intervals = generate_hourly_range(start_date, end_date)
+        else:
+            intervals = generate_date_range(start_date, end_date)
+
+        daily_messages = {interval: 0 for interval in intervals}
+
+        for entry in message_data:
+            if filter_option == "last_hour":
+                daily_messages[entry["_id"]["minute"]] = entry["total_messages"]
+            elif filter_option == "last_24_hour":
+                daily_messages[entry["_id"]["hour"]] = entry["total_messages"]
+            else:
+                daily_messages[entry["_id"]["day"]] = entry["total_messages"]
+
+    except Exception as err:
+        print(err)
+        return jsonify({"success": False, "error": str(err)}), 400
+
+    return jsonify({"success": True, "messages": daily_messages}), 200
@user.route("/api/get_token_analytics", methods=["POST"])
def get_token_analytics():
data = request.get_json()
api_key_id = data.get("api_key_id")
filter_option = data.get("filter_option", "last_30_days")
try:
api_key = (
api_key_collection.find_one({"_id": ObjectId(api_key_id)})["key"]
if api_key_id
else None
)
except Exception as err:
print(err)
return jsonify({"success": False, "error": str(err)}), 400
end_date = datetime.datetime.now(datetime.timezone.utc)
if filter_option == "last_hour":
start_date = end_date - datetime.timedelta(hours=1)
group_format = "%Y-%m-%d %H:%M:00"
group_stage = {
"$group": {
"_id": {
"minute": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
}
},
"total_tokens": {
"$sum": {"$add": ["$prompt_tokens", "$generated_tokens"]}
},
}
}
elif filter_option == "last_24_hour":
start_date = end_date - datetime.timedelta(hours=24)
group_format = "%Y-%m-%d %H:00"
group_stage = {
"$group": {
"_id": {
"hour": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
}
},
"total_tokens": {
"$sum": {"$add": ["$prompt_tokens", "$generated_tokens"]}
},
}
}
else:
if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
filter_days = (
6
if filter_option == "last_7_days"
else (14 if filter_option == "last_15_days" else 29)
)
else:
return jsonify({"success": False, "error": "Invalid option"}), 400
start_date = end_date - datetime.timedelta(days=filter_days)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
group_format = "%Y-%m-%d"
group_stage = {
"$group": {
"_id": {
"day": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
}
},
"total_tokens": {
"$sum": {"$add": ["$prompt_tokens", "$generated_tokens"]}
},
}
}
try:
match_stage = {
"$match": {
"timestamp": {"$gte": start_date, "$lte": end_date},
}
}
if api_key:
match_stage["$match"]["api_key"] = api_key
token_usage_data = token_usage_collection.aggregate(
[
match_stage,
group_stage,
{"$sort": {"_id": 1}},
]
)
if filter_option == "last_hour":
intervals = generate_minute_range(start_date, end_date)
elif filter_option == "last_24_hour":
intervals = generate_hourly_range(start_date, end_date)
else:
intervals = generate_date_range(start_date, end_date)
daily_token_usage = {interval: 0 for interval in intervals}
for entry in token_usage_data:
if filter_option == "last_hour":
daily_token_usage[entry["_id"]["minute"]] = entry["total_tokens"]
elif filter_option == "last_24_hour":
daily_token_usage[entry["_id"]["hour"]] = entry["total_tokens"]
else:
daily_token_usage[entry["_id"]["day"]] = entry["total_tokens"]
except Exception as err:
print(err)
return jsonify({"success": False, "error": str(err)}), 400
return jsonify({"success": True, "token_usage": daily_token_usage}), 200
@user.route("/api/get_feedback_analytics", methods=["POST"])
def get_feedback_analytics():
data = request.get_json()
api_key_id = data.get("api_key_id")
filter_option = data.get("filter_option", "last_30_days")
try:
api_key = (
api_key_collection.find_one({"_id": ObjectId(api_key_id)})["key"]
if api_key_id
else None
)
except Exception as err:
print(err)
return jsonify({"success": False, "error": str(err)}), 400
end_date = datetime.datetime.now(datetime.timezone.utc)
if filter_option == "last_hour":
start_date = end_date - datetime.timedelta(hours=1)
group_format = "%Y-%m-%d %H:%M:00"
group_stage_1 = {
"$group": {
"_id": {
"minute": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
},
"feedback": "$feedback",
},
"count": {"$sum": 1},
}
}
group_stage_2 = {
"$group": {
"_id": "$_id.minute",
"likes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "LIKE"]},
"$count",
0,
]
}
},
"dislikes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "DISLIKE"]},
"$count",
0,
]
}
},
}
}
elif filter_option == "last_24_hour":
start_date = end_date - datetime.timedelta(hours=24)
group_format = "%Y-%m-%d %H:00"
group_stage_1 = {
"$group": {
"_id": {
"hour": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
},
"feedback": "$feedback",
},
"count": {"$sum": 1},
}
}
group_stage_2 = {
"$group": {
"_id": "$_id.hour",
"likes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "LIKE"]},
"$count",
0,
]
}
},
"dislikes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "DISLIKE"]},
"$count",
0,
]
}
},
}
}
else:
if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
filter_days = (
6
if filter_option == "last_7_days"
else (14 if filter_option == "last_15_days" else 29)
)
else:
return jsonify({"success": False, "error": "Invalid option"}), 400
start_date = end_date - datetime.timedelta(days=filter_days)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
group_format = "%Y-%m-%d"
group_stage_1 = {
"$group": {
"_id": {
"day": {
"$dateToString": {"format": group_format, "date": "$timestamp"}
},
"feedback": "$feedback",
},
"count": {"$sum": 1},
}
}
group_stage_2 = {
"$group": {
"_id": "$_id.day",
"likes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "LIKE"]},
"$count",
0,
]
}
},
"dislikes": {
"$sum": {
"$cond": [
{"$eq": ["$_id.feedback", "DISLIKE"]},
"$count",
0,
]
}
},
}
}
try:
match_stage = {
"$match": {
"timestamp": {"$gte": start_date, "$lte": end_date},
}
}
if api_key:
match_stage["$match"]["api_key"] = api_key
feedback_data = feedback_collection.aggregate(
[
match_stage,
group_stage_1,
group_stage_2,
{"$sort": {"_id": 1}},
]
)
if filter_option == "last_hour":
intervals = generate_minute_range(start_date, end_date)
elif filter_option == "last_24_hour":
intervals = generate_hourly_range(start_date, end_date)
else:
intervals = generate_date_range(start_date, end_date)
daily_feedback = {
interval: {"positive": 0, "negative": 0} for interval in intervals
}
for entry in feedback_data:
daily_feedback[entry["_id"]] = {
"positive": entry["likes"],
"negative": entry["dislikes"],
}
except Exception as err:
print(err)
return jsonify({"success": False, "error": str(err)}), 400
return jsonify({"success": True, "feedback": daily_feedback}), 200
@user.route("/api/get_user_logs", methods=["POST"])
def get_user_logs():
data = request.get_json()
page = int(data.get("page", 1))
api_key_id = data.get("api_key_id")
page_size = int(data.get("page_size", 10))
skip = (page - 1) * page_size
try:
api_key = (
api_key_collection.find_one({"_id": ObjectId(api_key_id)})["key"]
if api_key_id
else None
)
except Exception as err:
print(err)
return jsonify({"success": False, "error": str(err)}), 400
query = {}
if api_key:
query = {"api_key": api_key}
items_cursor = (
user_logs_collection.find(query)
.sort("timestamp", -1)
.skip(skip)
.limit(page_size + 1)
)
items = list(items_cursor)
results = []
for item in items[:page_size]:
results.append(
{
"id": str(item.get("_id")),
"action": item.get("action"),
"level": item.get("level"),
"user": item.get("user"),
"question": item.get("question"),
"sources": item.get("sources"),
"retriever_params": item.get("retriever_params"),
"timestamp": item.get("timestamp"),
}
)
has_more = len(items) > page_size
return (
jsonify(
{
"success": True,
"logs": results,
"page": page,
"page_size": page_size,
"has_more": has_more,
}
),
200,
)
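
For reference, a minimal sketch of paging through the new /api/get_user_logs endpoint: it fetches page_size + 1 documents so has_more can be derived without a separate count query. The host, port, and payload values here are assumptions for illustration, not part of the diff:

import requests

page = 1
while True:
    resp = requests.post(
        "http://localhost:7091/api/get_user_logs",  # hypothetical host/port
        json={"page": page, "page_size": 10},  # "api_key_id" may be added to filter
    )
    payload = resp.json()
    for log in payload["logs"]:
        print(log["timestamp"], log["action"], log["question"])
    if not payload["has_more"]:
        break  # last page reached
    page += 1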