Merge branch 'main' into feat/agent-refactor-and-logging

Alex committed 2025-03-05 16:04:09 -05:00 (committed by GitHub)
28 changed files with 624 additions and 1072 deletions

View File

@@ -116,8 +116,9 @@ def is_azure_configured():
def save_conversation(
-    conversation_id, question, response, source_log_docs, tool_calls, llm, index=None
+    conversation_id, question, response, source_log_docs, tool_calls, llm, index=None, api_key=None
):
+    current_time = datetime.datetime.now(datetime.timezone.utc)
if conversation_id is not None and index is not None:
conversations_collection.update_one(
{"_id": ObjectId(conversation_id), f"queries.{index}": {"$exists": True}},
@@ -127,6 +128,7 @@ def save_conversation(
f"queries.{index}.response": response,
f"queries.{index}.sources": source_log_docs,
f"queries.{index}.tool_calls": tool_calls,
f"queries.{index}.timestamp": current_time
}
},
)
@@ -145,6 +147,7 @@ def save_conversation(
"response": response,
"sources": source_log_docs,
"tool_calls": tool_calls,
"timestamp": current_time
}
}
},
@@ -169,21 +172,25 @@ def save_conversation(
]
completion = llm.gen(model=gpt_model, messages=messages_summary, max_tokens=30)
-        conversation_id = conversations_collection.insert_one(
-            {
-                "user": "local",
-                "date": datetime.datetime.utcnow(),
-                "name": completion,
-                "queries": [
-                    {
-                        "prompt": question,
-                        "response": response,
-                        "sources": source_log_docs,
-                        "tool_calls": tool_calls,
-                    }
-                ],
-            }
-        ).inserted_id
+        conversation_data = {
+            "user": "local",
+            "date": datetime.datetime.utcnow(),
+            "name": completion,
+            "queries": [
+                {
+                    "prompt": question,
+                    "response": response,
+                    "sources": source_log_docs,
+                    "tool_calls": tool_calls,
+                    "timestamp": current_time
+                }
+            ],
+        }
+        if api_key:
+            api_key_doc = api_key_collection.find_one({"key": api_key})
+            if api_key_doc:
+                conversation_data["api_key"] = api_key_doc["key"]
+        conversation_id = conversations_collection.insert_one(conversation_data).inserted_id
return conversation_id
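After this change every stored query carries its own timestamp, and conversations created through an API key are tagged with that key. A sketch of the document shape save_conversation now produces (field values are illustrative placeholders, not taken from this diff):

# illustrative shape of a conversation document after this change
conversation_doc = {
    "user": "local",
    "date": "<utcnow>",                 # conversation-level creation date
    "name": "<LLM-generated summary>",
    "api_key": "<key>",                 # only present when api_key resolves in api_key_collection
    "queries": [
        {
            "prompt": "<question>",
            "response": "<answer>",
            "sources": [],
            "tool_calls": [],
            "timestamp": "<tz-aware UTC datetime>",  # new per-query field
        }
    ],
}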
@@ -198,7 +205,6 @@ def get_prompt(prompt_id):
prompt = prompts_collection.find_one({"_id": ObjectId(prompt_id)})["content"]
return prompt
def complete_stream(
question,
agent,
@@ -207,8 +213,14 @@ def complete_stream(
-    user_api_key,
-    isNoneDoc=False,
-    index=None,
+    question,
+    retriever,
+    conversation_id,
+    user_api_key,
+    isNoneDoc=False,
+    index=None,
+    should_save_conversation=True
):
try:
response_full = ""
source_log_docs = []
@@ -239,9 +251,12 @@ def complete_stream(
doc["source"] = "None"
llm = LLMCreator.create_llm(
-        settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=user_api_key
+        settings.LLM_NAME,
+        api_key=settings.API_KEY,
+        user_api_key=user_api_key
)
-        if user_api_key is None:
+        if should_save_conversation:
conversation_id = save_conversation(
conversation_id,
question,
@@ -250,10 +265,14 @@ def complete_stream(
tool_calls,
llm,
index,
+                api_key=user_api_key
)
-            # send data.type = "end" to indicate that the stream has ended as json
-            data = json.dumps({"type": "id", "id": str(conversation_id)})
-            yield f"data: {data}\n\n"
+        else:
+            conversation_id = None
+        # send data.type = "id" with the conversation id as json
+        data = json.dumps({"type": "id", "id": str(conversation_id)})
+        yield f"data: {data}\n\n"
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
@@ -316,6 +335,9 @@ class Stream(Resource):
"index": fields.Integer(
required=False, description="The position where query is to be updated"
),
"save_conversation": fields.Boolean(
required=False, default=True, description="Flag to save conversation"
),
},
)
@@ -330,6 +352,8 @@ class Stream(Resource):
if missing_fields:
return missing_fields
+        save_conv = data.get("save_conversation", True)
try:
question = data["question"]
history = limit_chat_history(
@@ -400,6 +424,7 @@ class Stream(Resource):
user_api_key=user_api_key,
isNoneDoc=data.get("isNoneDoc"),
index=index,
+                should_save_conversation=save_conv,
),
mimetype="text/event-stream",
)

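Taken together, these changes let a caller opt out of persistence: with save_conversation set to false, complete_stream skips saving and streams an id of None. A minimal client sketch, assuming a local deployment where the resource is mounted at /stream (the route path, port, and the exact set of required request fields are assumptions, not shown in this diff):

import json
import requests

resp = requests.post(
    "http://localhost:7091/stream",  # assumed address and route
    json={
        "question": "What changed in this PR?",
        "history": "[]",
        "save_conversation": False,  # new flag added above
    },
    stream=True,
)
for line in resp.iter_lines(decode_unicode=True):
    # server-sent events arrive as lines of the form: data: {...}
    if line and line.startswith("data: "):
        event = json.loads(line[len("data: "):])
        if event.get("type") == "id":
            print("conversation id:", event["id"])  # the string "None" when nothing was saved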
View File

@@ -107,11 +107,14 @@ class DeleteAllConversations(Resource):
@user_ns.route("/api/get_conversations")
class GetConversations(Resource):
@api.doc(
description="Retrieve a list of the latest 30 conversations",
description="Retrieve a list of the latest 30 conversations (excluding API key conversations)",
)
def get(self):
try:
-            conversations = conversations_collection.find().sort("date", -1).limit(30)
+            conversations = conversations_collection.find(
+                {"api_key": {"$exists": False}}
+            ).sort("date", -1).limit(30)
list_conversations = [
{"id": str(conversation["_id"]), "name": conversation["name"]}
for conversation in conversations
@@ -214,17 +217,34 @@ class SubmitFeedback(Resource):
return missing_fields
try:
-            conversations_collection.update_one(
-                {
-                    "_id": ObjectId(data["conversation_id"]),
-                    f"queries.{data['question_index']}": {"$exists": True},
-                },
-                {
-                    "$set": {
-                        f"queries.{data['question_index']}.feedback": data["feedback"]
-                    }
-                },
-            )
+            if data["feedback"] is None:
+                # Remove feedback and feedback_timestamp if feedback is null
+                conversations_collection.update_one(
+                    {
+                        "_id": ObjectId(data["conversation_id"]),
+                        f"queries.{data['question_index']}": {"$exists": True},
+                    },
+                    {
+                        "$unset": {
+                            f"queries.{data['question_index']}.feedback": "",
+                            f"queries.{data['question_index']}.feedback_timestamp": ""
+                        }
+                    },
+                )
+            else:
+                # Set feedback and feedback_timestamp if feedback has a value
+                conversations_collection.update_one(
+                    {
+                        "_id": ObjectId(data["conversation_id"]),
+                        f"queries.{data['question_index']}": {"$exists": True},
+                    },
+                    {
+                        "$set": {
+                            f"queries.{data['question_index']}.feedback": data["feedback"],
+                            f"queries.{data['question_index']}.feedback_timestamp": datetime.datetime.now(datetime.timezone.utc)
+                        }
+                    },
+                )
except Exception as err:
current_app.logger.error(f"Error submitting feedback: {err}")
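The feedback endpoint now distinguishes a null feedback value (clear both feedback and feedback_timestamp via $unset) from a concrete value (set both via $set). A hedged client sketch, assuming the resource is reachable at /api/feedback and that feedback values are plain strings (neither detail is confirmed by this diff):

import requests

BASE = "http://localhost:7091"  # assumed deployment address

# set feedback: writes queries.N.feedback and queries.N.feedback_timestamp
requests.post(f"{BASE}/api/feedback", json={
    "conversation_id": "<conversation ObjectId as string>",
    "question_index": 0,
    "feedback": "LIKE",  # assumed value format
})

# clear feedback: null triggers the $unset branch, removing both fields
requests.post(f"{BASE}/api/feedback", json={
    "conversation_id": "<conversation ObjectId as string>",
    "question_index": 0,
    "feedback": None,
})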
@@ -1186,21 +1206,12 @@ class GetMessageAnalytics(Resource):
get_message_analytics_model = api.model(
"GetMessageAnalyticsModel",
{
"api_key_id": fields.String(
required=False,
description="API Key ID",
),
"api_key_id": fields.String(required=False, description="API Key ID"),
"filter_option": fields.String(
required=False,
description="Filter option for analytics",
default="last_30_days",
-                enum=[
-                    "last_hour",
-                    "last_24_hour",
-                    "last_7_days",
-                    "last_15_days",
-                    "last_30_days",
-                ],
+                enum=["last_hour", "last_24_hour", "last_7_days", "last_15_days", "last_30_days"],
),
},
)
@@ -1221,42 +1232,21 @@ class GetMessageAnalytics(Resource):
except Exception as err:
current_app.logger.error(f"Error getting API key: {err}")
return make_response(jsonify({"success": False}), 400)
end_date = datetime.datetime.now(datetime.timezone.utc)
if filter_option == "last_hour":
start_date = end_date - datetime.timedelta(hours=1)
group_format = "%Y-%m-%d %H:%M:00"
-            group_stage = {
-                "$group": {
-                    "_id": {
-                        "minute": {
-                            "$dateToString": {"format": group_format, "date": "$date"}
-                        }
-                    },
-                    "total_messages": {"$sum": 1},
-                }
-            }
elif filter_option == "last_24_hour":
start_date = end_date - datetime.timedelta(hours=24)
group_format = "%Y-%m-%d %H:00"
-            group_stage = {
-                "$group": {
-                    "_id": {
-                        "hour": {
-                            "$dateToString": {"format": group_format, "date": "$date"}
-                        }
-                    },
-                    "total_messages": {"$sum": 1},
-                }
-            }
else:
if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
filter_days = (
-                    6
-                    if filter_option == "last_7_days"
-                    else (14 if filter_option == "last_15_days" else 29)
+                    6 if filter_option == "last_7_days"
+                    else 14 if filter_option == "last_15_days"
+                    else 29
)
else:
return make_response(
@@ -1264,36 +1254,44 @@ class GetMessageAnalytics(Resource):
)
start_date = end_date - datetime.timedelta(days=filter_days)
start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
-            end_date = end_date.replace(
-                hour=23, minute=59, second=59, microsecond=999999
-            )
+            end_date = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
group_format = "%Y-%m-%d"
-            group_stage = {
-                "$group": {
-                    "_id": {
-                        "day": {
-                            "$dateToString": {"format": group_format, "date": "$date"}
-                        }
-                    },
-                    "total_messages": {"$sum": 1},
-                }
-            }
try:
-            match_stage = {
-                "$match": {
-                    "date": {"$gte": start_date, "$lte": end_date},
-                }
-            }
-            if api_key:
-                match_stage["$match"]["api_key"] = api_key
-            message_data = conversations_collection.aggregate(
-                [
-                    match_stage,
-                    group_stage,
-                    {"$sort": {"_id": 1}},
-                ]
-            )
+            pipeline = [
+                # Initial match for API key if provided
+                {
+                    "$match": {
+                        "api_key": api_key if api_key else {"$exists": False}
+                    }
+                },
+                {"$unwind": "$queries"},
+                # Match queries within the time range
+                {
+                    "$match": {
+                        "queries.timestamp": {
+                            "$gte": start_date,
+                            "$lte": end_date
+                        }
+                    }
+                },
+                # Group by formatted timestamp
+                {
+                    "$group": {
+                        "_id": {
+                            "$dateToString": {
+                                "format": group_format,
+                                "date": "$queries.timestamp"
+                            }
+                        },
+                        "count": {"$sum": 1}
+                    }
+                },
+                # Sort by timestamp
+                {"$sort": {"_id": 1}}
+            ]
+            message_data = conversations_collection.aggregate(pipeline)
if filter_option == "last_hour":
intervals = generate_minute_range(start_date, end_date)
@@ -1305,12 +1303,7 @@ class GetMessageAnalytics(Resource):
daily_messages = {interval: 0 for interval in intervals}
for entry in message_data:
-                if filter_option == "last_hour":
-                    daily_messages[entry["_id"]["minute"]] = entry["total_messages"]
-                elif filter_option == "last_24_hour":
-                    daily_messages[entry["_id"]["hour"]] = entry["total_messages"]
-                else:
-                    daily_messages[entry["_id"]["day"]] = entry["total_messages"]
+                daily_messages[entry["_id"]] = entry["count"]
except Exception as err:
current_app.logger.error(f"Error getting message analytics: {err}")
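The reworked analytics count individual queries by their own timestamp rather than whole conversations by their creation date, so a 30-message conversation now contributes 30 data points instead of one. A standalone sketch of the same pipeline shape, assuming a local MongoDB and docsgpt/conversations as database and collection names (the names are assumptions; the pipeline mirrors the one above):

import datetime
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumed local instance
conversations = client["docsgpt"]["conversations"]  # assumed names

end = datetime.datetime.now(datetime.timezone.utc)
start = (end - datetime.timedelta(days=6)).replace(hour=0, minute=0, second=0, microsecond=0)

pipeline = [
    # without an API key, restrict to conversations that have none
    {"$match": {"api_key": {"$exists": False}}},
    # one output document per query, so each query's timestamp can be matched
    {"$unwind": "$queries"},
    {"$match": {"queries.timestamp": {"$gte": start, "$lte": end}}},
    # bucket per day and count queries, not conversations
    {"$group": {
        "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$queries.timestamp"}},
        "count": {"$sum": 1},
    }},
    {"$sort": {"_id": 1}},
]
for row in conversations.aggregate(pipeline):
    print(row["_id"], row["count"])

The $unwind stage is what lets each query contribute its own bucket entry; grouping on the conversation-level date cannot do that.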
@@ -1358,6 +1351,7 @@ class GetTokenAnalytics(Resource):
except Exception as err:
current_app.logger.error(f"Error getting API key: {err}")
return make_response(jsonify({"success": False}), 400)
end_date = datetime.datetime.now(datetime.timezone.utc)
if filter_option == "last_hour":
@@ -1378,7 +1372,6 @@ class GetTokenAnalytics(Resource):
},
}
}
elif filter_option == "last_24_hour":
start_date = end_date - datetime.timedelta(hours=24)
group_format = "%Y-%m-%d %H:00"
@@ -1397,7 +1390,6 @@ class GetTokenAnalytics(Resource):
},
}
}
else:
if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
filter_days = (
@@ -1439,6 +1431,8 @@ class GetTokenAnalytics(Resource):
}
if api_key:
match_stage["$match"]["api_key"] = api_key
+        else:
+            match_stage["$match"]["api_key"] = {"$exists": False}
token_usage_data = token_usage_collection.aggregate(
[
@@ -1517,11 +1511,11 @@ class GetFeedbackAnalytics(Resource):
if filter_option == "last_hour":
start_date = end_date - datetime.timedelta(hours=1)
group_format = "%Y-%m-%d %H:%M:00"
-            date_field = {"$dateToString": {"format": group_format, "date": "$date"}}
+            date_field = {"$dateToString": {"format": group_format, "date": "$queries.feedback_timestamp"}}
elif filter_option == "last_24_hour":
start_date = end_date - datetime.timedelta(hours=24)
group_format = "%Y-%m-%d %H:00"
-            date_field = {"$dateToString": {"format": group_format, "date": "$date"}}
+            date_field = {"$dateToString": {"format": group_format, "date": "$queries.feedback_timestamp"}}
else:
if filter_option in ["last_7_days", "last_15_days", "last_30_days"]:
filter_days = (
@@ -1539,17 +1533,19 @@ class GetFeedbackAnalytics(Resource):
hour=23, minute=59, second=59, microsecond=999999
)
group_format = "%Y-%m-%d"
-            date_field = {"$dateToString": {"format": group_format, "date": "$date"}}
+            date_field = {"$dateToString": {"format": group_format, "date": "$queries.feedback_timestamp"}}
try:
match_stage = {
"$match": {
"date": {"$gte": start_date, "$lte": end_date},
"queries": {"$exists": True, "$ne": []},
"queries.feedback_timestamp": {"$gte": start_date, "$lte": end_date},
"queries.feedback": {"$exists": True}
}
}
if api_key:
match_stage["$match"]["api_key"] = api_key
+            else:
+                match_stage["$match"]["api_key"] = {"$exists": False}
# Unwind the queries array to process each query separately
pipeline = [