Merge branch 'main' into feature_n/lancedb

Author: Alex
Date: 2024-10-19 16:22:39 +01:00
Committed by: GitHub
124 changed files with 8312 additions and 2573 deletions


@@ -1,29 +1,38 @@
import asyncio
import datetime
import json
import logging
import os
import sys
import traceback
from bson.dbref import DBRef
from bson.objectid import ObjectId
from flask import Blueprint, current_app, make_response, request, Response
from flask_restx import fields, Namespace, Resource
from pymongo import MongoClient
from application.core.settings import settings
from application.error import bad_request
from application.extensions import api
from application.llm.llm_creator import LLMCreator
from application.retriever.retriever_creator import RetrieverCreator
from application.utils import check_required_fields
logger = logging.getLogger(__name__)
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
sources_collection = db["sources"]
prompts_collection = db["prompts"]
api_key_collection = db["api_keys"]
user_logs_collection = db["user_logs"]
answer = Blueprint("answer", __name__)
answer_ns = Namespace("answer", description="Answer related operations", path="/")
api.add_namespace(answer_ns)
gpt_model = ""
# to have some kind of default behaviour
@@ -31,6 +40,8 @@ if settings.LLM_NAME == "openai":
gpt_model = "gpt-3.5-turbo"
elif settings.LLM_NAME == "anthropic":
gpt_model = "claude-2"
elif settings.LLM_NAME == "groq":
gpt_model = "llama3-8b-8192"
if settings.MODEL_NAME: # in case there is particular model name configured
gpt_model = settings.MODEL_NAME
@@ -74,27 +85,29 @@ def run_async_chain(chain, question, chat_history):
def get_data_from_api_key(api_key):
data = api_key_collection.find_one({"key": api_key})
# # Raise custom exception if the API key is not found
if data is None:
raise Exception("Invalid API Key, please generate new key", 401)
if "retriever" not in data:
data["retriever"] = None
if "source" in data and isinstance(data["source"], DBRef):
source_doc = db.dereference(data["source"])
data["source"] = str(source_doc["_id"])
if "retriever" in source_doc:
data["retriever"] = source_doc["retriever"]
else:
data["source"] = {}
return data
def get_vectorstore(data):
if "active_docs" in data:
if data["active_docs"].split("/")[0] == "default":
vectorstore = ""
elif data["active_docs"].split("/")[0] == "local":
vectorstore = "indexes/" + data["active_docs"]
else:
vectorstore = "vectors/" + data["active_docs"]
if data["active_docs"] == "default":
vectorstore = ""
else:
vectorstore = ""
vectorstore = os.path.join("application", vectorstore)
return vectorstore
def get_retriever(source_id: str):
doc = sources_collection.find_one({"_id": ObjectId(source_id)})
if doc is None:
raise Exception("Source document does not exist", 404)
retriever_name = None if "retriever" not in doc else doc["retriever"]
return retriever_name
def is_azure_configured():
@@ -180,6 +193,13 @@ def complete_stream(
response_full = ""
source_log_docs = []
answer = retriever.gen()
sources = retriever.search()
for source in sources:
if "text" in source:
source["text"] = source["text"][:100].strip() + "..."
if len(sources) > 0:
data = json.dumps({"type": "source", "source": sources})
yield f"data: {data}\n\n"
for line in answer:
if "answer" in line:
response_full += str(line["answer"])
@@ -203,6 +223,20 @@ def complete_stream(
data = json.dumps({"type": "id", "id": str(conversation_id)})
yield f"data: {data}\n\n"
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
{
"action": "stream_answer",
"level": "info",
"user": "local",
"api_key": user_api_key,
"question": question,
"response": response_full,
"sources": source_log_docs,
"retriever_params": retriever_params,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
)
data = json.dumps({"type": "end"})
yield f"data: {data}\n\n"
except Exception as e:
@@ -218,106 +252,134 @@ def complete_stream(
return
@answer.route("/stream", methods=["POST"])
def stream():
try:
data = request.get_json()
question = data["question"]
if "history" not in data:
history = []
else:
history = data["history"]
history = json.loads(history)
if "conversation_id" not in data:
conversation_id = None
else:
conversation_id = data["conversation_id"]
if "prompt_id" in data:
prompt_id = data["prompt_id"]
else:
prompt_id = "default"
if "selectedDocs" in data and data["selectedDocs"] is None:
chunks = 0
elif "chunks" in data:
chunks = int(data["chunks"])
else:
chunks = 2
if "token_limit" in data:
token_limit = data["token_limit"]
else:
token_limit = settings.DEFAULT_MAX_HISTORY
# check if active_docs or api_key is set
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key["chunks"])
prompt_id = data_key["prompt_id"]
source = {"active_docs": data_key["source"]}
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
user_api_key = None
else:
source = {}
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
else:
retriever_name = source["active_docs"]
current_app.logger.info(f"/stream - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})}
)
prompt = get_prompt(prompt_id)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
)
return Response(
complete_stream(
question=question,
retriever=retriever,
conversation_id=conversation_id,
user_api_key=user_api_key,
isNoneDoc=data.get("isNoneDoc"),
@answer_ns.route("/stream")
class Stream(Resource):
stream_model = api.model(
"StreamModel",
{
"question": fields.String(
required=True, description="Question to be asked"
),
mimetype="text/event-stream",
)
"history": fields.List(
fields.String, required=False, description="Chat history"
),
"conversation_id": fields.String(
required=False, description="Conversation ID"
),
"prompt_id": fields.String(
required=False, default="default", description="Prompt ID"
),
"selectedDocs": fields.String(
required=False, description="Selected documents"
),
"chunks": fields.Integer(
required=False, default=2, description="Number of chunks"
),
"token_limit": fields.Integer(required=False, description="Token limit"),
"retriever": fields.String(required=False, description="Retriever type"),
"api_key": fields.String(required=False, description="API key"),
"active_docs": fields.String(
required=False, description="Active documents"
),
"isNoneDoc": fields.Boolean(
required=False, description="Flag indicating if no document is used"
),
},
)
except ValueError:
message = "Malformed request body"
print("\033[91merr", str(message), file=sys.stderr)
return Response(
error_stream_generate(message),
status=400,
mimetype="text/event-stream",
)
except Exception as e:
current_app.logger.error(f"/stream - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()}
)
message = e.args[0]
status_code = 400
# # Custom exceptions with two arguments, index 1 as status code
if len(e.args) >= 2:
status_code = e.args[1]
return Response(
error_stream_generate(message),
status=status_code,
mimetype="text/event-stream",
)
@api.expect(stream_model)
@api.doc(description="Stream a response based on the question and retriever")
def post(self):
data = request.get_json()
required_fields = ["question"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
try:
question = data["question"]
history = data.get("history", [])
history = json.loads(history)
conversation_id = data.get("conversation_id")
prompt_id = data.get("prompt_id", "default")
if "selectedDocs" in data and data["selectedDocs"] is None:
chunks = 0
else:
chunks = int(data.get("chunks", 2))
token_limit = data.get("token_limit", settings.DEFAULT_MAX_HISTORY)
retriever_name = data.get("retriever", "classic")
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key.get("chunks", 2))
prompt_id = data_key.get("prompt_id", "default")
source = {"active_docs": data_key.get("source")}
retriever_name = data_key.get("retriever", retriever_name)
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
else:
source = {}
user_api_key = None
current_app.logger.info(
f"/stream - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
prompt = get_prompt(prompt_id)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
)
return Response(
complete_stream(
question=question,
retriever=retriever,
conversation_id=conversation_id,
user_api_key=user_api_key,
isNoneDoc=data.get("isNoneDoc"),
),
mimetype="text/event-stream",
)
except ValueError:
message = "Malformed request body"
print("\033[91merr", str(message), file=sys.stderr)
return Response(
error_stream_generate(message),
status=400,
mimetype="text/event-stream",
)
except Exception as e:
current_app.logger.error(
f"/stream - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
message = e.args[0]
status_code = 400
# Custom exceptions with two arguments, index 1 as status code
if len(e.args) >= 2:
status_code = e.args[1]
return Response(
error_stream_generate(message),
status=status_code,
mimetype="text/event-stream",
)
def error_stream_generate(err_response):
@@ -325,143 +387,235 @@ def error_stream_generate(err_response):
yield f"data: {data}\n\n"
@answer.route("/api/answer", methods=["POST"])
def api_answer():
data = request.get_json()
question = data["question"]
if "history" not in data:
history = []
else:
history = data["history"]
if "conversation_id" not in data:
conversation_id = None
else:
conversation_id = data["conversation_id"]
print("-" * 5)
if "prompt_id" in data:
prompt_id = data["prompt_id"]
else:
prompt_id = "default"
if "chunks" in data:
chunks = int(data["chunks"])
else:
chunks = 2
if "token_limit" in data:
token_limit = data["token_limit"]
else:
token_limit = settings.DEFAULT_MAX_HISTORY
try:
# check if the vectorstore is set
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key["chunks"])
prompt_id = data_key["prompt_id"]
source = {"active_docs": data_key["source"]}
user_api_key = data["api_key"]
else:
source = data
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
else:
retriever_name = source["active_docs"]
prompt = get_prompt(prompt_id)
current_app.logger.info(f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})}
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
)
source_log_docs = []
response_full = ""
for line in retriever.gen():
if "source" in line:
source_log_docs.append(line["source"])
elif "answer" in line:
response_full += line["answer"]
if data.get("isNoneDoc"):
for doc in source_log_docs:
doc["source"] = "None"
llm = LLMCreator.create_llm(
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=user_api_key
)
result = {"answer": response_full, "sources": source_log_docs}
result["conversation_id"] = save_conversation(
conversation_id, question, response_full, source_log_docs, llm
)
return result
except Exception as e:
current_app.logger.error(f"/api/answer - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()}
)
return bad_request(500, str(e))
@answer.route("/api/search", methods=["POST"])
def api_search():
data = request.get_json()
question = data["question"]
if "chunks" in data:
chunks = int(data["chunks"])
else:
chunks = 2
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key["chunks"])
source = {"active_docs": data_key["source"]}
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
user_api_key = None
else:
source = {}
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
else:
retriever_name = source["active_docs"]
if "token_limit" in data:
token_limit = data["token_limit"]
else:
token_limit = settings.DEFAULT_MAX_HISTORY
current_app.logger.info(f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})}
@answer_ns.route("/api/answer")
class Answer(Resource):
answer_model = api.model(
"AnswerModel",
{
"question": fields.String(
required=True, description="The question to answer"
),
"history": fields.List(
fields.String, required=False, description="Conversation history"
),
"conversation_id": fields.String(
required=False, description="Conversation ID"
),
"prompt_id": fields.String(
required=False, default="default", description="Prompt ID"
),
"chunks": fields.Integer(
required=False, default=2, description="Number of chunks"
),
"token_limit": fields.Integer(required=False, description="Token limit"),
"retriever": fields.String(required=False, description="Retriever type"),
"api_key": fields.String(required=False, description="API key"),
"active_docs": fields.String(
required=False, description="Active documents"
),
"isNoneDoc": fields.Boolean(
required=False, description="Flag indicating if no document is used"
),
},
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=[],
prompt="default",
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
@api.expect(answer_model)
@api.doc(description="Provide an answer based on the question and retriever")
def post(self):
data = request.get_json()
required_fields = ["question"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
try:
question = data["question"]
history = data.get("history", [])
conversation_id = data.get("conversation_id")
prompt_id = data.get("prompt_id", "default")
chunks = int(data.get("chunks", 2))
token_limit = data.get("token_limit", settings.DEFAULT_MAX_HISTORY)
retriever_name = data.get("retriever", "classic")
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key.get("chunks", 2))
prompt_id = data_key.get("prompt_id", "default")
source = {"active_docs": data_key.get("source")}
retriever_name = data_key.get("retriever", retriever_name)
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
else:
source = {}
user_api_key = None
prompt = get_prompt(prompt_id)
current_app.logger.info(
f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
)
source_log_docs = []
response_full = ""
for line in retriever.gen():
if "source" in line:
source_log_docs.append(line["source"])
elif "answer" in line:
response_full += line["answer"]
if data.get("isNoneDoc"):
for doc in source_log_docs:
doc["source"] = "None"
llm = LLMCreator.create_llm(
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=user_api_key
)
result = {"answer": response_full, "sources": source_log_docs}
result["conversation_id"] = str(
save_conversation(
conversation_id, question, response_full, source_log_docs, llm
)
)
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
{
"action": "api_answer",
"level": "info",
"user": "local",
"api_key": user_api_key,
"question": question,
"response": response_full,
"sources": source_log_docs,
"retriever_params": retriever_params,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
)
except Exception as e:
current_app.logger.error(
f"/api/answer - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
return bad_request(500, str(e))
return make_response(result, 200)
@answer_ns.route("/api/search")
class Search(Resource):
search_model = api.model(
"SearchModel",
{
"question": fields.String(
required=True, description="The question to search"
),
"chunks": fields.Integer(
required=False, default=2, description="Number of chunks"
),
"api_key": fields.String(
required=False, description="API key for authentication"
),
"active_docs": fields.String(
required=False, description="Active documents for retrieval"
),
"retriever": fields.String(required=False, description="Retriever type"),
"token_limit": fields.Integer(
required=False, description="Limit for tokens"
),
"isNoneDoc": fields.Boolean(
required=False, description="Flag indicating if no document is used"
),
},
)
docs = retriever.search()
if data.get("isNoneDoc"):
for doc in docs:
doc["source"] = "None"
return docs
@api.expect(search_model)
@api.doc(
description="Search for relevant documents based on the question and retriever"
)
def post(self):
data = request.get_json()
required_fields = ["question"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
try:
question = data["question"]
chunks = int(data.get("chunks", 2))
token_limit = data.get("token_limit", settings.DEFAULT_MAX_HISTORY)
retriever_name = data.get("retriever", "classic")
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key.get("chunks", 2))
source = {"active_docs": data_key.get("source")}
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
user_api_key = None
else:
source = {}
user_api_key = None
current_app.logger.info(
f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=[],
prompt="default",
chunks=chunks,
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
)
docs = retriever.search()
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
{
"action": "api_search",
"level": "info",
"user": "local",
"api_key": user_api_key,
"question": question,
"sources": docs,
"retriever_params": retriever_params,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
)
if data.get("isNoneDoc"):
for doc in docs:
doc["source"] = "None"
except Exception as e:
current_app.logger.error(
f"/api/search - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
return bad_request(500, str(e))
return make_response(docs, 200)
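Note: a minimal sketch of how the new flask-restx resources can be exercised locally; the base URL and payload values are assumptions for illustration (the default API_URL setting points at http://localhost:7091), and only "question" is required by the models above.

import requests

# Minimal sketch: POST a question to the new /api/answer resource.
# URL and payload values are placeholders; adjust to your deployment.
payload = {"question": "What is DocsGPT?", "history": []}
resp = requests.post("http://localhost:7091/api/answer", json=payload, timeout=120)
resp.raise_for_status()
body = resp.json()
print(body["answer"])
for src in body.get("sources", []):
    print("-", src.get("title"), src.get("source"))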


@@ -3,18 +3,23 @@ import datetime
from flask import Blueprint, request, send_from_directory
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
from application.core.settings import settings
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
sources_collection = db["sources"]
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
internal = Blueprint("internal", __name__)
internal = Blueprint('internal', __name__)
@internal.route("/api/download", methods=["get"])
def download_file():
user = secure_filename(request.args.get("user"))
@@ -24,7 +29,6 @@ def download_file():
return send_from_directory(save_dir, filename, as_attachment=True)
@internal.route("/api/upload_index", methods=["POST"])
def upload_index_files():
"""Upload two files(index.faiss, index.pkl) to the user's folder."""
@@ -35,7 +39,13 @@ def upload_index_files():
return {"status": "no name"}
job_name = secure_filename(request.form["name"])
tokens = secure_filename(request.form["tokens"])
save_dir = os.path.join(current_dir, "indexes", user, job_name)
retriever = secure_filename(request.form["retriever"])
id = secure_filename(request.form["id"])
type = secure_filename(request.form["type"])
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss":
if "file_faiss" not in request.files:
print("No file part")
@@ -50,22 +60,45 @@ def upload_index_files():
if file_pkl.filename == "":
return {"status": "no file name"}
# saves index files
if not os.path.exists(save_dir):
os.makedirs(save_dir)
file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
# create entry in vectors_collection
vectors_collection.insert_one(
{
"user": user,
"name": job_name,
"language": job_name,
"location": save_dir,
"date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"model": settings.EMBEDDINGS_NAME,
"type": "local",
"tokens": tokens
}
)
return {"status": "ok"}
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:
sources_collection.update_one(
{"_id": ObjectId(id)},
{
"$set": {
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
}
},
)
else:
sources_collection.insert_one(
{
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
}
)
return {"status": "ok"}

(File diff suppressed because it is too large.)


@@ -1,12 +1,38 @@
from application.worker import ingest_worker, remote_worker
from datetime import timedelta
from application.celery_init import celery
from application.worker import ingest_worker, remote_worker, sync_worker
@celery.task(bind=True)
def ingest(self, directory, formats, name_job, filename, user):
resp = ingest_worker(self, directory, formats, name_job, filename, user)
return resp
@celery.task(bind=True)
def ingest_remote(self, source_data, job_name, user, loader):
resp = remote_worker(self, source_data, job_name, user, loader)
return resp
@celery.task(bind=True)
def schedule_syncs(self, frequency):
resp = sync_worker(self, frequency)
return resp
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
timedelta(days=1),
schedule_syncs.s("daily"),
)
sender.add_periodic_task(
timedelta(weeks=1),
schedule_syncs.s("weekly"),
)
sender.add_periodic_task(
timedelta(days=30),
schedule_syncs.s("monthly"),
)
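Note: the new beat entries can be smoke-tested without waiting a day by enqueuing the task directly. A sketch only: the import path of this tasks module is assumed, and a running broker and worker are required.

from application.api.user.tasks import schedule_syncs  # assumed module path

# Enqueue one sync pass with the same argument the daily beat entry uses.
result = schedule_syncs.delay("daily")
print(result.id, result.status)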


@@ -1,15 +1,19 @@
import platform
import dotenv
from application.celery_init import celery
from flask import Flask, request, redirect
from application.core.settings import settings
from application.api.user.routes import user
from flask import Flask, redirect, request
from application.api.answer.routes import answer
from application.api.internal.routes import internal
from application.api.user.routes import user
from application.celery_init import celery
from application.core.logging_config import setup_logging
from application.core.settings import settings
from application.extensions import api
if platform.system() == "Windows":
import pathlib
pathlib.PosixPath = pathlib.WindowsPath
dotenv.load_dotenv()
@@ -23,16 +27,19 @@ app.config.update(
UPLOAD_FOLDER="inputs",
CELERY_BROKER_URL=settings.CELERY_BROKER_URL,
CELERY_RESULT_BACKEND=settings.CELERY_RESULT_BACKEND,
MONGO_URI=settings.MONGO_URI
MONGO_URI=settings.MONGO_URI,
)
celery.config_from_object("application.celeryconfig")
api.init_app(app)
@app.route("/")
def home():
if request.remote_addr in ('0.0.0.0', '127.0.0.1', 'localhost', '172.18.0.1'):
return redirect('http://localhost:5173')
if request.remote_addr in ("0.0.0.0", "127.0.0.1", "localhost", "172.18.0.1"):
return redirect("http://localhost:5173")
else:
return 'Welcome to DocsGPT Backend!'
return "Welcome to DocsGPT Backend!"
@app.after_request
def after_request(response):
@@ -41,6 +48,6 @@ def after_request(response):
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS")
return response
if __name__ == "__main__":
app.run(debug=settings.FLASK_DEBUG_MODE, port=7091)

application/cache.py (new file, 93 additions)

@@ -0,0 +1,93 @@
import redis
import time
import json
import logging
from threading import Lock
from application.core.settings import settings
from application.utils import get_hash
logger = logging.getLogger(__name__)
_redis_instance = None
_instance_lock = Lock()
def get_redis_instance():
global _redis_instance
if _redis_instance is None:
with _instance_lock:
if _redis_instance is None:
try:
_redis_instance = redis.Redis.from_url(settings.CACHE_REDIS_URL, socket_connect_timeout=2)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
_redis_instance = None
return _redis_instance
def gen_cache_key(*messages, model="docgpt"):
if not all(isinstance(msg, dict) for msg in messages):
raise ValueError("All messages must be dictionaries.")
messages_str = json.dumps(list(messages), sort_keys=True)
combined = f"{model}_{messages_str}"
cache_key = get_hash(combined)
return cache_key
def gen_cache(func):
def wrapper(self, model, messages, *args, **kwargs):
try:
cache_key = gen_cache_key(*messages)
redis_client = get_redis_instance()
if redis_client:
try:
cached_response = redis_client.get(cache_key)
if cached_response:
return cached_response.decode('utf-8')
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
result = func(self, model, messages, *args, **kwargs)
if redis_client:
try:
redis_client.set(cache_key, result, ex=1800)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
return result
except ValueError as e:
logger.error(e)
return "Error: No user message found in the conversation to generate a cache key."
return wrapper
def stream_cache(func):
def wrapper(self, model, messages, stream, *args, **kwargs):
cache_key = gen_cache_key(*messages)
logger.info(f"Stream cache key: {cache_key}")
redis_client = get_redis_instance()
if redis_client:
try:
cached_response = redis_client.get(cache_key)
if cached_response:
logger.info(f"Cache hit for stream key: {cache_key}")
cached_response = json.loads(cached_response.decode('utf-8'))
for chunk in cached_response:
yield chunk
time.sleep(0.03)
return
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
result = func(self, model, messages, stream, *args, **kwargs)
stream_cache_data = []
for chunk in result:
stream_cache_data.append(chunk)
yield chunk
if redis_client:
try:
redis_client.set(cache_key, json.dumps(stream_cache_data), ex=1800)
logger.info(f"Stream cache saved for key: {cache_key}")
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
return wrapper
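Note: in the project these decorators are attached at call time through BaseLLM._apply_decorator, but they also work as plain decorators, which makes for a quick manual check. The class below is a toy stand-in, not part of this diff; when Redis is unreachable the wrappers log the connection error and fall back to the undecorated call.

from application.cache import gen_cache

class EchoLLM:
    """Toy stand-in for an LLM provider, used only to exercise gen_cache."""

    @gen_cache
    def _raw_gen(self, model, messages, *args, **kwargs):
        # Pretend the provider echoes the last user message.
        return messages[-1]["content"]

llm = EchoLLM()
messages = [{"role": "user", "content": "hello"}]
# The first call computes the result and stores it under gen_cache_key(*messages);
# an identical second call is served from Redis when CACHE_REDIS_URL is reachable.
print(llm._raw_gen("docsgpt", messages))
print(llm._raw_gen("docsgpt", messages))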


@@ -21,6 +21,9 @@ class Settings(BaseSettings):
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb"
RETRIEVERS_ENABLED: list = ["classic_rag", "duckduck_search"] # also brave_search
# LLM Cache
CACHE_REDIS_URL: str = "redis://localhost:6379/2"
API_URL: str = "http://localhost:7091" # backend url for celery worker
API_KEY: Optional[str] = None # LLM api key


@@ -0,0 +1,7 @@
from flask_restx import Api
api = Api(
version="1.0",
title="DocsGPT API",
description="API for DocsGPT",
)


@@ -1,28 +1,29 @@
from abc import ABC, abstractmethod
from application.usage import gen_token_usage, stream_token_usage
from application.cache import stream_cache, gen_cache
class BaseLLM(ABC):
def __init__(self):
self.token_usage = {"prompt_tokens": 0, "generated_tokens": 0}
def _apply_decorator(self, method, decorator, *args, **kwargs):
return decorator(method, *args, **kwargs)
def _apply_decorator(self, method, decorators, *args, **kwargs):
for decorator in decorators:
method = decorator(method)
return method(self, *args, **kwargs)
@abstractmethod
def _raw_gen(self, model, messages, stream, *args, **kwargs):
pass
def gen(self, model, messages, stream=False, *args, **kwargs):
return self._apply_decorator(self._raw_gen, gen_token_usage)(
self, model=model, messages=messages, stream=stream, *args, **kwargs
)
decorators = [gen_token_usage, gen_cache]
return self._apply_decorator(self._raw_gen, decorators=decorators, model=model, messages=messages, stream=stream, *args, **kwargs)
@abstractmethod
def _raw_gen_stream(self, model, messages, stream, *args, **kwargs):
pass
def gen_stream(self, model, messages, stream=True, *args, **kwargs):
return self._apply_decorator(self._raw_gen_stream, stream_token_usage)(
self, model=model, messages=messages, stream=stream, *args, **kwargs
)
decorators = [stream_cache, stream_token_usage]
return self._apply_decorator(self._raw_gen_stream, decorators=decorators, model=model, messages=messages, stream=stream, *args, **kwargs)

application/llm/groq.py (new file, 45 additions)

@@ -0,0 +1,45 @@
from application.llm.base import BaseLLM
class GroqLLM(BaseLLM):
def __init__(self, api_key=None, user_api_key=None, *args, **kwargs):
from openai import OpenAI
super().__init__(*args, **kwargs)
self.client = OpenAI(api_key=api_key, base_url="https://api.groq.com/openai/v1")
self.api_key = api_key
self.user_api_key = user_api_key
def _raw_gen(
self,
baseself,
model,
messages,
stream=False,
**kwargs
):
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
self,
baseself,
model,
messages,
stream=True,
**kwargs
):
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
for line in response:
# import sys
# print(line.choices[0].delta.content, file=sys.stderr)
if line.choices[0].delta.content is not None:
yield line.choices[0].delta.content
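Note: once LLM_NAME is set to "groq", the existing creator path picks this class up. A quick manual check might look like the sketch below; the API key is a placeholder and the model name matches the default added in the answer routes.

from application.llm.llm_creator import LLMCreator

# Placeholder credentials; a real Groq API key is required.
llm = LLMCreator.create_llm("groq", api_key="gsk_placeholder", user_api_key=None)
messages = [{"role": "user", "content": "Say hello in one short sentence."}]

# Non-streaming call returns the full completion text.
print(llm.gen(model="llama3-8b-8192", messages=messages))

# Streaming call yields content deltas as they arrive.
for chunk in llm.gen_stream(model="llama3-8b-8192", messages=messages):
    print(chunk, end="", flush=True)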


@@ -1,3 +1,4 @@
from application.llm.groq import GroqLLM
from application.llm.openai import OpenAILLM, AzureOpenAILLM
from application.llm.sagemaker import SagemakerAPILLM
from application.llm.huggingface import HuggingFaceLLM
@@ -17,6 +18,7 @@ class LLMCreator:
"anthropic": AnthropicLLM,
"docsgpt": DocsGPTAPILLM,
"premai": PremAILLM,
"groq": GroqLLM
}
@classmethod


@@ -10,13 +10,14 @@ from application.parser.file.epub_parser import EpubParser
from application.parser.file.html_parser import HTMLParser
from application.parser.file.markdown_parser import MarkdownParser
from application.parser.file.rst_parser import RstParser
from application.parser.file.tabular_parser import PandasCSVParser
from application.parser.file.tabular_parser import PandasCSVParser,ExcelParser
from application.parser.schema.base import Document
DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = {
".pdf": PDFParser(),
".docx": DocxParser(),
".csv": PandasCSVParser(),
".xlsx":ExcelParser(),
".epub": EpubParser(),
".md": MarkdownParser(),
".rst": RstParser(),


@@ -113,3 +113,68 @@ class PandasCSVParser(BaseParser):
return (self._row_joiner).join(text_list)
else:
return text_list
class ExcelParser(BaseParser):
r"""Excel (.xlsx) parser.
Parses Excel files using Pandas `read_excel` function.
If special parameters are required, use the `pandas_config` dict.
Args:
concat_rows (bool): whether to concatenate all rows into one document.
If set to False, a Document will be created for each row.
True by default.
col_joiner (str): Separator to use for joining cols per row.
Set to ", " by default.
row_joiner (str): Separator to use for joining each row.
Only used when `concat_rows=True`.
Set to "\n" by default.
pandas_config (dict): Options for the `pandas.read_excel` function call.
Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html
for more information.
Set to empty dict by default, this means pandas will try to figure
out the table structure on its own.
"""
def __init__(
self,
*args: Any,
concat_rows: bool = True,
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
**kwargs: Any
) -> None:
"""Init params."""
super().__init__(*args, **kwargs)
self._concat_rows = concat_rows
self._col_joiner = col_joiner
self._row_joiner = row_joiner
self._pandas_config = pandas_config
def _init_parser(self) -> Dict:
"""Init parser."""
return {}
def parse_file(self, file: Path, errors: str = "ignore") -> Union[str, List[str]]:
"""Parse file."""
try:
import pandas as pd
except ImportError:
raise ValueError("pandas module is required to read Excel files.")
df = pd.read_excel(file, **self._pandas_config)
text_list = df.apply(
lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
).tolist()
if self._concat_rows:
return (self._row_joiner).join(text_list)
else:
return text_list
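Note: a small sketch of the new parser used on its own; the spreadsheet path is illustrative, and pandas plus openpyxl (both pinned in requirements.txt) do the actual reading.

from pathlib import Path
from application.parser.file.tabular_parser import ExcelParser

# Default behaviour: rows joined into a single text blob.
parser = ExcelParser(concat_rows=True, col_joiner=", ", row_joiner="\n")
text = parser.parse_file(Path("inputs/local/example/report.xlsx"))
print(text[:200])

# One string per row instead of a single blob.
rows = ExcelParser(concat_rows=False).parse_file(Path("inputs/local/example/report.xlsx"))
print(len(rows), "rows")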


@@ -1,9 +1,11 @@
import os
from application.vectorstore.vector_creator import VectorCreator
from application.core.settings import settings
from retry import retry
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
# from langchain_community.embeddings import HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInstructEmbeddings
@@ -11,12 +13,14 @@ from retry import retry
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i):
def store_add_texts_with_retry(store, i, id):
# add source_id to the metadata
i.metadata["source_id"] = str(id)
store.add_texts([i.page_content], metadatas=[i.metadata])
# store_pine.add_texts([i.page_content], metadatas=[i.metadata])
def call_openai_api(docs, folder_name, task_status):
def call_openai_api(docs, folder_name, id, task_status):
# Function to create a vector store from the documents and save it to disk
if not os.path.exists(f"{folder_name}"):
@@ -32,15 +36,16 @@ def call_openai_api(docs, folder_name, task_status):
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
docs_init=docs_init,
path=f"{folder_name}",
source_id=f"{folder_name}",
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
else:
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
path=f"{folder_name}",
source_id=str(id),
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
store.delete_index()
# Uncomment for MPNet embeddings
# model_name = "sentence-transformers/all-mpnet-base-v2"
# hf = HuggingFaceEmbeddings(model_name=model_name)
@@ -57,7 +62,7 @@ def call_openai_api(docs, folder_name, task_status):
task_status.update_state(
state="PROGRESS", meta={"current": int((c1 / s1) * 100)}
)
store_add_texts_with_retry(store, i)
store_add_texts_with_retry(store, i, id)
except Exception as e:
print(e)
print("Error on ", i)
@@ -68,5 +73,3 @@ def call_openai_api(docs, folder_name, task_status):
c1 += 1
if settings.VECTOR_STORE == "faiss":
store.save_local(f"{folder_name}")


@@ -0,0 +1,53 @@
import base64
import requests
from typing import List
from application.parser.remote.base import BaseRemote
from langchain_core.documents import Document
class GitHubLoader(BaseRemote):
def __init__(self):
self.access_token = None
self.headers = {
"Authorization": f"token {self.access_token}"
} if self.access_token else {}
return
def fetch_file_content(self, repo_url: str, file_path: str) -> str:
url = f"https://api.github.com/repos/{repo_url}/contents/{file_path}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
content = response.json()
if content.get("encoding") == "base64":
try:
decoded_content = base64.b64decode(content["content"]).decode("utf-8")
return f"Filename: {file_path}\n\n{decoded_content}"
except Exception as e:
print(f"Error decoding content for {file_path}: {e}")
raise
else:
return f"Filename: {file_path}\n\n{content['content']}"
else:
response.raise_for_status()
def fetch_repo_files(self, repo_url: str, path: str = "") -> List[str]:
url = f"https://api.github.com/repos/{repo_url}/contents/{path}"
response = requests.get(url, headers={**self.headers, "Accept": "application/vnd.github.v3.raw"})
contents = response.json()
files = []
for item in contents:
if item["type"] == "file":
files.append(item["path"])
elif item["type"] == "dir":
files.extend(self.fetch_repo_files(repo_url, item["path"]))
return files
def load_data(self, repo_url: str) -> List[Document]:
repo_name = repo_url.split("github.com/")[-1]
files = self.fetch_repo_files(repo_name)
documents = []
for file_path in files:
content = self.fetch_file_content(repo_name, file_path)
documents.append(Document(page_content=content, metadata={"title": file_path,
"source": f"https://github.com/{repo_name}/blob/main/{file_path}"}))
return documents
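Note: a minimal standalone run of the new loader; the repository below is only an example, unauthenticated GitHub API calls are heavily rate-limited, and access_token is currently always None in __init__. The generated source links also assume the default branch is named main (see the blob/main URL above).

from application.parser.remote.remote_creator import RemoteCreator

loader = RemoteCreator.create_loader("github")
docs = loader.load_data("https://github.com/arc53/DocsGPT")
print(len(docs), "documents")
first = docs[0]
print(first.metadata["title"], "->", first.metadata["source"])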


@@ -2,6 +2,7 @@ from application.parser.remote.sitemap_loader import SitemapLoader
from application.parser.remote.crawler_loader import CrawlerLoader
from application.parser.remote.web_loader import WebLoader
from application.parser.remote.reddit_loader import RedditPostsLoaderRemote
from application.parser.remote.github_loader import GitHubLoader
class RemoteCreator:
@@ -10,6 +11,7 @@ class RemoteCreator:
"sitemap": SitemapLoader,
"crawler": CrawlerLoader,
"reddit": RedditPostsLoaderRemote,
"github": GitHubLoader,
}
@classmethod


@@ -1,37 +1,87 @@
anthropic==0.34.0
anthropic==0.34.2
boto3==1.34.153
beautifulsoup4==4.12.3
celery==5.3.6
dataclasses_json==0.6.7
dataclasses-json==0.6.7
docx2txt==0.8
duckduckgo-search==6.2.6
EbookLib==0.18
elasticsearch==8.14.0
duckduckgo-search==6.3.0
ebooklib==0.18
elastic-transport==8.15.0
elasticsearch==8.15.1
escodegen==1.0.11
esprima==4.0.1
Flask==3.0.1
esutils==1.0.1
Flask==3.0.3
faiss-cpu==1.8.0.post1
flask-restx==1.3.0
gunicorn==23.0.0
html2text==2020.1.16
html2text==2024.2.26
javalang==0.13.0
langchain==0.2.16
langchain-community==0.2.16
langchain-core==0.2.38
langchain-openai==0.1.23
lancedb==0.13.0
openapi3_parser==1.1.16
pandas==2.2.2
pydantic_settings==2.4.0
jinja2==3.1.4
jiter==0.5.0
jmespath==1.0.1
joblib==1.4.2
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-spec==0.2.4
jsonschema-specifications==2023.7.1
kombu==5.4.2
langchain==0.3.0
langchain-community==0.3.0
langchain-core==0.3.2
langchain-openai==0.2.0
langchain-text-splitters==0.3.0
langsmith==0.1.125
lazy-object-proxy==1.10.0
lxml==5.3.0
markupsafe==2.1.5
marshmallow==3.22.0
mpmath==1.3.0
multidict==6.1.0
mypy-extensions==1.0.0
networkx==3.3
numpy==1.26.4
openai==1.46.1
openapi-schema-validator==0.6.2
openapi-spec-validator==0.6.0
openapi3-parser==1.1.18
orjson==3.10.7
packaging==24.1
pandas==2.2.3
openpyxl==3.1.5
pathable==0.4.3
pillow==10.4.0
portalocker==2.10.1
prance==23.6.21.0
primp==0.6.3
prompt-toolkit==3.0.47
protobuf==5.28.2
py==1.11.0
pydantic==2.9.2
pydantic-core==2.23.4
pydantic-settings==2.4.0
pymongo==4.8.0
PyPDF2==3.0.1
pypdf2==3.0.1
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
qdrant-client==1.11.0
redis==5.0.1
Requests==2.32.0
referencing==0.30.2
regex==2024.9.11
requests==2.32.3
retry==0.9.2
sentence-transformers
sentence-transformers==3.0.1
tiktoken==0.7.0
torch
tqdm==4.66.3
transformers==4.44.0
Werkzeug==3.0.3
tokenizers==0.19.1
torch==2.4.1
tqdm==4.66.5
transformers==4.44.2
typing-extensions==4.12.2
typing-inspect==0.9.0
tzdata==2024.2
urllib3==2.2.3
vine==5.1.0
wcwidth==0.2.13
werkzeug==3.0.4
yarl==1.11.1


@@ -12,3 +12,7 @@ class BaseRetriever(ABC):
@abstractmethod
def search(self, *args, **kwargs):
pass
@abstractmethod
def get_params(self):
pass


@@ -101,3 +101,15 @@ class BraveRetSearch(BaseRetriever):
def search(self):
return self._get_data()
def get_params(self):
return {
"question": self.question,
"source": self.source,
"chat_history": self.chat_history,
"prompt": self.prompt,
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key
}


@@ -1,4 +1,3 @@
import os
from application.retriever.base import BaseRetriever
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
@@ -21,7 +20,7 @@ class ClassicRAG(BaseRetriever):
user_api_key=None,
):
self.question = question
self.vectorstore = self._get_vectorstore(source=source)
self.vectorstore = source['active_docs'] if 'active_docs' in source else None
self.chat_history = chat_history
self.prompt = prompt
self.chunks = chunks
@@ -38,21 +37,6 @@ class ClassicRAG(BaseRetriever):
)
self.user_api_key = user_api_key
def _get_vectorstore(self, source):
if "active_docs" in source:
if source["active_docs"].split("/")[0] == "default":
vectorstore = ""
elif source["active_docs"].split("/")[0] == "local":
vectorstore = "indexes/" + source["active_docs"]
else:
vectorstore = "vectors/" + source["active_docs"]
if source["active_docs"] == "default":
vectorstore = ""
else:
vectorstore = ""
vectorstore = os.path.join("application", vectorstore)
return vectorstore
def _get_data(self):
if self.chunks == 0:
docs = []
@@ -61,13 +45,12 @@ class ClassicRAG(BaseRetriever):
settings.VECTOR_STORE, self.vectorstore, settings.EMBEDDINGS_KEY
)
docs_temp = docsearch.search(self.question, k=self.chunks)
print(docs_temp)
docs = [
{
"title": (
i.metadata["title"].split("/")[-1]
if i.metadata
else i.page_content
),
"title": i.metadata.get(
"title", i.metadata.get("post_title", i.page_content)
).split("/")[-1],
"text": i.page_content,
"source": (
i.metadata.get("source")
@@ -121,3 +104,15 @@ class ClassicRAG(BaseRetriever):
def search(self):
return self._get_data()
def get_params(self):
return {
"question": self.question,
"source": self.vectorstore,
"chat_history": self.chat_history,
"prompt": self.prompt,
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key
}


@@ -118,3 +118,15 @@ class DuckDuckSearch(BaseRetriever):
def search(self):
return self._get_data()
def get_params(self):
return {
"question": self.question,
"source": self.source,
"chat_history": self.chat_history,
"prompt": self.prompt,
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key
}


@@ -5,15 +5,16 @@ from application.retriever.brave_search import BraveRetSearch
class RetrieverCreator:
retievers = {
retrievers = {
'classic': ClassicRAG,
'duckduck_search': DuckDuckSearch,
'brave_search': BraveRetSearch
'brave_search': BraveRetSearch,
'default': ClassicRAG
}
@classmethod
def create_retriever(cls, type, *args, **kwargs):
retiever_class = cls.retievers.get(type.lower())
retiever_class = cls.retrievers.get(type.lower())
if not retiever_class:
raise ValueError(f"No retievers class found for type {type}")
return retiever_class(*args, **kwargs)
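Note: with the renamed mapping and the new "default" alias, construction looks like the sketch below. The keyword names mirror the calls in the answer routes; the argument values are placeholders.

from application.retriever.retriever_creator import RetrieverCreator

retriever = RetrieverCreator.create_retriever(
    "default",                     # falls back to ClassicRAG
    question="What is DocsGPT?",
    source={"active_docs": None},  # no source selected
    chat_history=[],
    prompt="default",
    chunks=0,                      # chunks=0 skips the vector search entirely
    token_limit=2000,
    gpt_model="gpt-3.5-turbo",
    user_api_key=None,
)
print(retriever.get_params())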


@@ -1,22 +1,48 @@
import tiktoken
import hashlib
from flask import jsonify, make_response
_encoding = None
def get_encoding():
global _encoding
if _encoding is None:
_encoding = tiktoken.get_encoding("cl100k_base")
return _encoding
def num_tokens_from_string(string: str) -> int:
encoding = get_encoding()
num_tokens = len(encoding.encode(string))
return num_tokens
def count_tokens_docs(docs):
docs_content = ""
for doc in docs:
docs_content += doc.page_content
tokens = num_tokens_from_string(docs_content)
return tokens
def check_required_fields(data, required_fields):
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return make_response(
jsonify(
{
"success": False,
"message": f"Missing fields: {', '.join(missing_fields)}",
}
),
400,
)
return None
def get_hash(data):
return hashlib.md5(data.encode()).hexdigest()
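Note: the two new helpers in action. check_required_fields builds its 400 response with jsonify, so it needs an application context; the snippet below spins up a throwaway Flask app purely for illustration.

from flask import Flask
from application.utils import check_required_fields, get_hash

app = Flask(__name__)
with app.test_request_context():
    print(check_required_fields({"question": "hi"}, ["question"]))  # None: nothing missing
    resp = check_required_fields({}, ["question"])
    print(resp.status_code, resp.get_json()["message"])             # 400 Missing fields: question

print(get_hash("docgpt_[]"))  # md5 hex digest, as used for the LLM cache keys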


@@ -9,9 +9,9 @@ import elasticsearch
class ElasticsearchStore(BaseVectorStore):
_es_connection = None # Class attribute to hold the Elasticsearch connection
def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX):
def __init__(self, source_id, embeddings_key, index_name=settings.ELASTIC_INDEX):
super().__init__()
self.path = path.replace("application/indexes/", "").rstrip("/")
self.source_id = source_id.replace("application/indexes/", "").rstrip("/")
self.embeddings_key = embeddings_key
self.index_name = index_name
@@ -81,7 +81,7 @@ class ElasticsearchStore(BaseVectorStore):
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key)
vector = embeddings.embed_query(question)
knn = {
"filter": [{"match": {"metadata.store.keyword": self.path}}],
"filter": [{"match": {"metadata.source_id.keyword": self.source_id}}],
"field": "vector",
"k": k,
"num_candidates": 100,
@@ -100,7 +100,7 @@ class ElasticsearchStore(BaseVectorStore):
}
}
],
"filter": [{"match": {"metadata.store.keyword": self.path}}],
"filter": [{"match": {"metadata.source_id.keyword": self.source_id}}],
}
},
"rank": {"rrf": {}},
@@ -209,5 +209,4 @@ class ElasticsearchStore(BaseVectorStore):
def delete_index(self):
self._es_connection.delete_by_query(index=self.index_name, query={"match": {
"metadata.store.keyword": self.path}},)
"metadata.source_id.keyword": self.source_id}},)


@@ -1,22 +1,29 @@
from langchain_community.vectorstores import FAISS
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
import os
def get_vectorstore(path: str) -> str:
if path:
vectorstore = os.path.join("application", "indexes", path)
else:
vectorstore = os.path.join("application")
return vectorstore
class FaissStore(BaseVectorStore):
def __init__(self, path, embeddings_key, docs_init=None):
def __init__(self, source_id: str, embeddings_key: str, docs_init=None):
super().__init__()
self.path = path
self.path = get_vectorstore(source_id)
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
if docs_init:
self.docsearch = FAISS.from_documents(
docs_init, embeddings
)
else:
self.docsearch = FAISS.load_local(
self.path, embeddings,
allow_dangerous_deserialization=True
)
try:
if docs_init:
self.docsearch = FAISS.from_documents(docs_init, embeddings)
else:
self.docsearch = FAISS.load_local(self.path, embeddings, allow_dangerous_deserialization=True)
except Exception:
raise
self.assert_embedding_dimensions(embeddings)
def search(self, *args, **kwargs):
@@ -32,16 +39,12 @@ class FaissStore(BaseVectorStore):
return self.docsearch.delete(*args, **kwargs)
def assert_embedding_dimensions(self, embeddings):
"""
Check that the word embedding dimension of the docsearch index matches
the dimension of the word embeddings used
"""
"""Check that the word embedding dimension of the docsearch index matches the dimension of the word embeddings used."""
if settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2":
try:
word_embedding_dimension = embeddings.dimension
except AttributeError as e:
raise AttributeError("'dimension' attribute not found in embeddings instance. Make sure the embeddings object is properly initialized.") from e
word_embedding_dimension = getattr(embeddings, 'dimension', None)
if word_embedding_dimension is None:
raise AttributeError("'dimension' attribute not found in embeddings instance.")
docsearch_index_dimension = self.docsearch.index.d
if word_embedding_dimension != docsearch_index_dimension:
raise ValueError(f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) " +
f"!= docsearch index dimension ({docsearch_index_dimension})")
raise ValueError(f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) != docsearch index dimension ({docsearch_index_dimension})")


@@ -1,11 +1,12 @@
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
from application.vectorstore.base import BaseVectorStore
from application.vectorstore.document_class import Document
class MongoDBVectorStore(BaseVectorStore):
def __init__(
self,
path: str = "",
source_id: str = "",
embeddings_key: str = "embeddings",
collection: str = "documents",
index_name: str = "vector_search_index",
@@ -18,7 +19,7 @@ class MongoDBVectorStore(BaseVectorStore):
self._embedding_key = embedding_key
self._embeddings_key = embeddings_key
self._mongo_uri = settings.MONGO_URI
self._path = path.replace("application/indexes/", "").rstrip("/")
self._source_id = source_id.replace("application/indexes/", "").rstrip("/")
self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
try:
@@ -33,27 +34,24 @@ class MongoDBVectorStore(BaseVectorStore):
self._database = self._client[database]
self._collection = self._database[collection]
def search(self, question, k=2, *args, **kwargs):
query_vector = self._embedding.embed_query(question)
pipeline = [
{
"$vectorSearch": {
"queryVector": query_vector,
"queryVector": query_vector,
"path": self._embedding_key,
"limit": k,
"numCandidates": k * 10,
"limit": k,
"numCandidates": k * 10,
"index": self._index_name,
"filter": {
"store": {"$eq": self._path}
}
"filter": {"source_id": {"$eq": self._source_id}},
}
}
]
cursor = self._collection.aggregate(pipeline)
results = []
for doc in cursor:
text = doc[self._text_key]
@@ -63,30 +61,32 @@ class MongoDBVectorStore(BaseVectorStore):
metadata = doc
results.append(Document(text, metadata))
return results
def _insert_texts(self, texts, metadatas):
if not texts:
return []
embeddings = self._embedding.embed_documents(texts)
to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m}
for t, m, embedding in zip(texts, metadatas, embeddings)
]
# insert the documents in MongoDB Atlas
insert_result = self._collection.insert_many(to_insert)
return insert_result.inserted_ids
def add_texts(self,
def add_texts(
self,
texts,
metadatas = None,
ids = None,
refresh_indices = True,
create_index_if_not_exists = True,
bulk_kwargs = None,
**kwargs,):
metadatas=None,
ids=None,
refresh_indices=True,
create_index_if_not_exists=True,
bulk_kwargs=None,
**kwargs,
):
#dims = self._embedding.client[1].word_embedding_dimension
# dims = self._embedding.client[1].word_embedding_dimension
# # check if index exists
# if create_index_if_not_exists:
# # check if index exists
@@ -121,6 +121,6 @@ class MongoDBVectorStore(BaseVectorStore):
if texts_batch:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
return result_ids
def delete_index(self, *args, **kwargs):
self._collection.delete_many({"store": self._path})
self._collection.delete_many({"source_id": self._source_id})


@@ -5,12 +5,12 @@ from qdrant_client import models
class QdrantStore(BaseVectorStore):
def __init__(self, path: str = "", embeddings_key: str = "embeddings"):
def __init__(self, source_id: str = "", embeddings_key: str = "embeddings"):
self._filter = models.Filter(
must=[
models.FieldCondition(
key="metadata.store",
match=models.MatchValue(value=path.replace("application/indexes/", "").rstrip("/")),
key="metadata.source_id",
match=models.MatchValue(value=source_id.replace("application/indexes/", "").rstrip("/")),
)
]
)


@@ -1,37 +1,44 @@
import logging
import os
import shutil
import string
import zipfile
from collections import Counter
from urllib.parse import urljoin
import logging
import requests
from bson.objectid import ObjectId
from pymongo import MongoClient
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.open_ai_func import call_openai_api
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
from application.parser.token_func import group_split
from application.utils import count_tokens_docs
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
sources_collection = db["sources"]
# Constants
MIN_TOKENS = 150
MAX_TOKENS = 1250
RECURSION_DEPTH = 2
# Define a function to extract metadata from a given filename.
def metadata_from_filename(title):
store = "/".join(title.split("/")[1:3])
return {"title": title, "store": store}
return {"title": title}
# Define a function to generate a random string of a given length.
def generate_random_string(length):
return "".join([string.ascii_letters[i % 52] for i in range(length)])
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
"""
Recursively extract zip files with a limit on recursion depth.
@@ -46,9 +53,13 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
logging.warning(f"Reached maximum recursion depth of {max_depth}")
return
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
os.remove(zip_path) # Remove the zip file after extracting
try:
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
os.remove(zip_path) # Remove the zip file after extracting
except Exception as e:
logging.error(f"Error extracting zip file {zip_path}: {e}")
return
# Check for nested zip files and extract them
for root, dirs, files in os.walk(extract_to):
@@ -58,9 +69,43 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
file_path = os.path.join(root, file)
extract_zip_recursive(file_path, root, current_depth + 1, max_depth)
def download_file(url, params, dest_path):
try:
response = requests.get(url, params=params)
response.raise_for_status()
with open(dest_path, "wb") as f:
f.write(response.content)
except requests.RequestException as e:
logging.error(f"Error downloading file: {e}")
raise
def upload_index(full_path, file_data):
try:
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
else:
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
response.raise_for_status()
except requests.RequestException as e:
logging.error(f"Error uploading index: {e}")
raise
finally:
if settings.VECTOR_STORE == "faiss":
for file in files.values():
file.close()
# Define the main function for ingesting and processing documents.
def ingest_worker(self, directory, formats, name_job, filename, user):
def ingest_worker(
self, directory, formats, name_job, filename, user, retriever="classic"
):
"""
Ingest and process documents.
@@ -71,43 +116,30 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
name_job (str): Name of the job for this ingestion task.
filename (str): Name of the file to be ingested.
user (str): Identifier for the user initiating the ingestion.
retriever (str): Type of retriever to use for processing the documents.
Returns:
dict: Information about the completed ingestion task, including input parameters and a "limited" flag.
"""
# directory = 'inputs' or 'temp'
# formats = [".rst", ".md"]
input_files = None
recursive = True
limit = None
exclude = True
# name_job = 'job1'
# filename = 'install.rst'
# user = 'local'
sample = False
token_check = True
min_tokens = 150
max_tokens = 1250
recursion_depth = 2
full_path = os.path.join(directory, user, name_job)
logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
# check if API_URL env variable is set
file_data = {"name": name_job, "file": filename, "user": user}
response = requests.get(
urljoin(settings.API_URL, "/api/download"), params=file_data
)
file = response.content
if not os.path.exists(full_path):
os.makedirs(full_path)
with open(os.path.join(full_path, filename), "wb") as f:
f.write(file)
download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, recursion_depth
os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
)
self.update_state(state="PROGRESS", meta={"current": 1})
@@ -123,14 +155,15 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
).load_data()
raw_docs = group_split(
documents=raw_docs,
min_tokens=min_tokens,
max_tokens=max_tokens,
min_tokens=MIN_TOKENS,
max_tokens=MAX_TOKENS,
token_check=token_check,
)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
id = ObjectId()
call_openai_api(docs, full_path, self)
call_openai_api(docs, full_path, id, self)
tokens = count_tokens_docs(docs)
self.update_state(state="PROGRESS", meta={"current": 100})
@@ -138,24 +171,13 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
# get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
# and send them to the server (provide user and name in form)
file_data = {"name": name_job, "user": user, "tokens":tokens}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
response = requests.get(
urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)
)
else:
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
file_data.update({
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
})
upload_index(full_path, file_data)
# delete local
shutil.rmtree(full_path)
@@ -169,47 +191,114 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
"limited": False,
}
def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
def remote_worker(
self,
source_data,
name_job,
user,
loader,
directory="temp",
retriever="classic",
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
token_check = True
min_tokens = 150
max_tokens = 1250
full_path = directory + "/" + user + "/" + name_job
full_path = os.path.join(directory, user, name_job)
if not os.path.exists(full_path):
os.makedirs(full_path)
self.update_state(state="PROGRESS", meta={"current": 1})
logging.info(f"Remote job: {full_path}", extra={"user": user, "job": name_job, source_data: source_data})
logging.info(
f"Remote job: {full_path}",
extra={"user": user, "job": name_job, "source_data": source_data},
)
remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)
docs = group_split(
documents=raw_docs,
min_tokens=min_tokens,
max_tokens=max_tokens,
min_tokens=MIN_TOKENS,
max_tokens=MAX_TOKENS,
token_check=token_check,
)
# docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
call_openai_api(docs, full_path, self)
tokens = count_tokens_docs(docs)
if operation_mode == "upload":
id = ObjectId()
call_openai_api(docs, full_path, id, self)
elif operation_mode == "sync":
if not doc_id or not ObjectId.is_valid(doc_id):
raise ValueError("doc_id must be provided for sync operation.")
id = ObjectId(doc_id)
call_openai_api(docs, full_path, id, self)
self.update_state(state="PROGRESS", meta={"current": 100})
# Proceed with uploading and cleaning as in the original function
file_data = {"name": name_job, "user": user, "tokens":tokens}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
else:
requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
file_data = {
"name": name_job,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": loader,
"remote_data": source_data,
"sync_frequency": sync_frequency,
}
upload_index(full_path, file_data)
shutil.rmtree(full_path)
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
def sync(
self,
source_data,
name_job,
user,
loader,
sync_frequency,
retriever,
doc_id=None,
directory="temp",
):
try:
remote_worker(
self,
source_data,
name_job,
user,
loader,
directory,
retriever,
sync_frequency,
"sync",
doc_id,
)
except Exception as e:
logging.error(f"Error during sync: {e}")
return {"status": "error", "error": str(e)}
return {"status": "success"}
def sync_worker(self, frequency):
sync_counts = Counter()
sources = sources_collection.find()
for doc in sources:
if doc.get("sync_frequency") == frequency:
name = doc.get("name")
user = doc.get("user")
source_type = doc.get("type")
source_data = doc.get("remote_data")
retriever = doc.get("retriever")
doc_id = str(doc.get("_id"))
resp = sync(
self, source_data, name, user, source_type, frequency, retriever, doc_id
)
sync_counts["total_sync_count"] += 1
sync_counts[
"sync_success" if resp["status"] == "success" else "sync_failure"
] += 1
return {
key: sync_counts[key]
for key in ["total_sync_count", "sync_success", "sync_failure"]
}