Merge branch 'main' into feat/analytics-and-logs

This commit is contained in:
Siddhant Rai
2024-09-11 17:58:04 +05:30
committed by GitHub
51 changed files with 1116 additions and 1498 deletions

View File

@@ -9,6 +9,7 @@ import traceback
from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.dbref import DBRef
from application.core.settings import settings
from application.llm.llm_creator import LLMCreator
@@ -20,7 +21,7 @@ logger = logging.getLogger(__name__)
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
sources_collection = db["sources"]
prompts_collection = db["prompts"]
api_key_collection = db["api_keys"]
user_logs_collection = db["user_logs"]
@@ -37,9 +38,7 @@ if settings.MODEL_NAME: # in case there is particular model name configured
gpt_model = settings.MODEL_NAME
# load the prompts
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
chat_combine_template = f.read()
@@ -75,35 +74,34 @@ def run_async_chain(chain, question, chat_history):
def get_data_from_api_key(api_key):
data = api_key_collection.find_one({"key": api_key})
# # Raise custom exception if the API key is not found
if data is None:
raise Exception("Invalid API Key, please generate new key", 401)
if "retriever" not in data:
data["retriever"] = None
if "source" in data and isinstance(data["source"], DBRef):
source_doc = db.dereference(data["source"])
data["source"] = str(source_doc["_id"])
if "retriever" in source_doc:
data["retriever"] = source_doc["retriever"]
else:
data["source"] = {}
return data
def get_vectorstore(data):
if "active_docs" in data:
if data["active_docs"].split("/")[0] == "default":
vectorstore = ""
elif data["active_docs"].split("/")[0] == "local":
vectorstore = "indexes/" + data["active_docs"]
else:
vectorstore = "vectors/" + data["active_docs"]
if data["active_docs"] == "default":
vectorstore = ""
else:
vectorstore = ""
vectorstore = os.path.join("application", vectorstore)
return vectorstore
def get_retriever(source_id: str):
doc = sources_collection.find_one({"_id": ObjectId(source_id)})
if doc is None:
raise Exception("Source document does not exist", 404)
retriever_name = None if "retriever" not in doc else doc["retriever"]
return retriever_name
def is_azure_configured():
return (
settings.OPENAI_API_BASE
and settings.OPENAI_API_VERSION
and settings.AZURE_DEPLOYMENT_NAME
)
return settings.OPENAI_API_BASE and settings.OPENAI_API_VERSION and settings.AZURE_DEPLOYMENT_NAME
def save_conversation(conversation_id, question, response, source_log_docs, llm):
@@ -263,33 +261,33 @@ def stream():
else:
token_limit = settings.DEFAULT_MAX_HISTORY
# check if active_docs or api_key is set
## retriever can be "brave_search, duckduck_search or classic"
retriever_name = data["retriever"] if "retriever" in data else "classic"
# check if active_docs or api_key is set
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key["chunks"])
prompt_id = data_key["prompt_id"]
source = {"active_docs": data_key["source"]}
retriever_name = data_key["retriever"] or retriever_name
user_api_key = data["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
source = {"active_docs" : data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
else:
source = {}
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
else:
retriever_name = source["active_docs"]
current_app.logger.info(
f"/stream - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
current_app.logger.info(f"/stream - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})}
)
prompt = get_prompt(prompt_id)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
@@ -369,6 +367,10 @@ def api_answer():
else:
token_limit = settings.DEFAULT_MAX_HISTORY
## retriever can be brave_search, duckduck_search or classic
retriever_name = data["retriever"] if "retriever" in data else "classic"
# use try and except to check for exception
try:
# check if the vectorstore is set
if "api_key" in data:
@@ -376,15 +378,15 @@ def api_answer():
chunks = int(data_key["chunks"])
prompt_id = data_key["prompt_id"]
source = {"active_docs": data_key["source"]}
retriever_name = data_key["retriever"] or retriever_name
user_api_key = data["api_key"]
else:
source = data
elif "active_docs" in data:
source = {"active_docs":data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
else:
retriever_name = source["active_docs"]
source = {}
user_api_key = None
prompt = get_prompt(prompt_id)
@@ -421,8 +423,8 @@ def api_answer():
)
result = {"answer": response_full, "sources": source_log_docs}
result["conversation_id"] = save_conversation(
conversation_id, question, response_full, source_log_docs, llm
result["conversation_id"] = str(
save_conversation(conversation_id, question, response_full, source_log_docs, llm)
)
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
@@ -459,19 +461,19 @@ def api_search():
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
chunks = int(data_key["chunks"])
source = {"active_docs": data_key["source"]}
user_api_key = data["api_key"]
source = {"active_docs":data_key["source"]}
user_api_key = data_key["api_key"]
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
source = {"active_docs":data["active_docs"]}
user_api_key = None
else:
source = {}
user_api_key = None
if source["active_docs"].split("/")[0] in ["default", "local"]:
retriever_name = "classic"
if "retriever" in data:
retriever_name = data["retriever"]
else:
retriever_name = source["active_docs"]
retriever_name = "classic"
if "token_limit" in data:
token_limit = data["token_limit"]
else:

View File

@@ -3,13 +3,13 @@ import datetime
from flask import Blueprint, request, send_from_directory
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
from application.core.settings import settings
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
sources_collection = db["sources"]
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -35,7 +35,12 @@ def upload_index_files():
return {"status": "no name"}
job_name = secure_filename(request.form["name"])
tokens = secure_filename(request.form["tokens"])
save_dir = os.path.join(current_dir, "indexes", user, job_name)
retriever = secure_filename(request.form["retriever"])
id = secure_filename(request.form["id"])
type = secure_filename(request.form["type"])
remote_data = secure_filename(request.form["remote_data"]) if "remote_data" in request.form else None
save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss":
if "file_faiss" not in request.files:
print("No file part")
@@ -55,17 +60,19 @@ def upload_index_files():
os.makedirs(save_dir)
file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
# create entry in vectors_collection
vectors_collection.insert_one(
# create entry in sources_collection
sources_collection.insert_one(
{
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"location": save_dir,
"date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"model": settings.EMBEDDINGS_NAME,
"type": "local",
"tokens": tokens
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data
}
)
return {"status": "ok"}

View File

@@ -20,7 +20,7 @@ from application.vectorstore.vector_creator import VectorCreator
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
sources_collection = db["sources"]
prompts_collection = db["prompts"]
feedback_collection = db["feedback"]
api_key_collection = db["api_keys"]
@@ -30,9 +30,7 @@ user_logs_collection = db["user_logs"]
user = Blueprint("user", __name__)
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def generate_minute_range(start_date, end_date):
@@ -83,9 +81,7 @@ def get_conversations():
conversations = conversations_collection.find().sort("date", -1).limit(30)
list_conversations = []
for conversation in conversations:
list_conversations.append(
{"id": str(conversation["_id"]), "name": conversation["name"]}
)
list_conversations.append({"id": str(conversation["_id"]), "name": conversation["name"]})
# list_conversations = [{"id": "default", "name": "default"}, {"id": "jeff", "name": "jeff"}]
@@ -116,15 +112,10 @@ def api_feedback():
question = data["question"]
answer = data["answer"]
feedback = data["feedback"]
feedback_collection.insert_one(
{
"question": question,
"answer": answer,
"feedback": feedback,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
)
new_doc = {"question": question, "answer": answer, "feedback": feedback, "timestamp": datetime.datetime.now(datetime.timezone.utc)}
if "api_key" in data:
new_doc["api_key"] = data["api_key"]
feedback_collection.insert_one(new_doc)
return {"status": "ok"}
@@ -137,7 +128,7 @@ def delete_by_ids():
return {"status": "error"}
if settings.VECTOR_STORE == "faiss":
result = vectors_collection.delete_index(ids=ids)
result = sources_collection.delete_index(ids=ids)
if result:
return {"status": "ok"}
return {"status": "error"}
@@ -147,28 +138,24 @@ def delete_by_ids():
def delete_old():
"""Delete old indexes."""
import shutil
path = request.args.get("path")
dirs = path.split("/")
dirs_clean = []
for i in range(0, len(dirs)):
dirs_clean.append(secure_filename(dirs[i]))
# check that path strats with indexes or vectors
if dirs_clean[0] not in ["indexes", "vectors"]:
return {"status": "error"}
path_clean = "/".join(dirs_clean)
vectors_collection.delete_one({"name": dirs_clean[-1], "user": dirs_clean[-2]})
source_id = request.args.get("source_id")
doc = sources_collection.find_one({
"_id": ObjectId(source_id),
"user": "local",
})
if(doc is None):
return {"status":"not found"},404
if settings.VECTOR_STORE == "faiss":
try:
shutil.rmtree(os.path.join(current_dir, path_clean))
shutil.rmtree(os.path.join(current_dir, str(doc["_id"])))
except FileNotFoundError:
pass
else:
vetorstore = VectorCreator.create_vectorstore(
settings.VECTOR_STORE, path=os.path.join(current_dir, path_clean)
)
vetorstore = VectorCreator.create_vectorstore(settings.VECTOR_STORE, source_id=str(doc["_id"]))
vetorstore.delete_index()
sources_collection.delete_one({
"_id": ObjectId(source_id),
})
return {"status": "ok"}
@@ -202,9 +189,7 @@ def upload_file():
file.save(os.path.join(temp_dir, filename))
# Use shutil.make_archive to zip the temp directory
zip_path = shutil.make_archive(
base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir
)
zip_path = shutil.make_archive(base_name=os.path.join(save_dir, job_name), format="zip", root_dir=temp_dir)
final_filename = os.path.basename(zip_path)
# Clean up the temporary directory after zipping
@@ -246,9 +231,7 @@ def upload_remote():
source_data = request.form["data"]
if source_data:
task = ingest_remote.delay(
source_data=source_data, job_name=job_name, user=user, loader=source
)
task = ingest_remote.delay(source_data=source_data, job_name=job_name, user=user, loader=source)
task_id = task.id
return {"status": "ok", "task_id": task_id}
else:
@@ -275,54 +258,36 @@ def combined_json():
data = [
{
"name": "default",
"language": "default",
"version": "",
"description": "default",
"fullName": "default",
"date": "default",
"docLink": "default",
"model": settings.EMBEDDINGS_NAME,
"location": "remote",
"tokens": "",
"retriever": "classic",
}
]
# structure: name, language, version, description, fullName, date, docLink
# append data from vectors_collection in sorted order in descending order of date
for index in vectors_collection.find({"user": user}).sort("date", -1):
# append data from sources_collection in sorted order in descending order of date
for index in sources_collection.find({"user": user}).sort("date", -1):
data.append(
{
"id": str(index["_id"]),
"name": index["name"],
"language": index["language"],
"version": "",
"description": index["name"],
"fullName": index["name"],
"date": index["date"],
"docLink": index["location"],
"model": settings.EMBEDDINGS_NAME,
"location": "local",
"tokens": index["tokens"] if ("tokens" in index.keys()) else "",
"retriever": index["retriever"] if ("retriever" in index.keys()) else "classic",
}
)
if settings.VECTOR_STORE == "faiss":
data_remote = requests.get(
"https://d3dg1063dc54p9.cloudfront.net/combined.json"
).json()
for index in data_remote:
index["location"] = "remote"
data.append(index)
if "duckduck_search" in settings.RETRIEVERS_ENABLED:
data.append(
{
"name": "DuckDuckGo Search",
"language": "en",
"version": "",
"description": "duckduck_search",
"fullName": "DuckDuckGo Search",
"date": "duckduck_search",
"docLink": "duckduck_search",
"model": settings.EMBEDDINGS_NAME,
"location": "custom",
"tokens": "",
"retriever": "duckduck_search",
}
)
if "brave_search" in settings.RETRIEVERS_ENABLED:
@@ -330,14 +295,11 @@ def combined_json():
{
"name": "Brave Search",
"language": "en",
"version": "",
"description": "brave_search",
"fullName": "Brave Search",
"date": "brave_search",
"docLink": "brave_search",
"model": settings.EMBEDDINGS_NAME,
"location": "custom",
"tokens": "",
"retriever": "brave_search",
}
)
@@ -346,39 +308,13 @@ def combined_json():
@user.route("/api/docs_check", methods=["POST"])
def check_docs():
# check if docs exist in a vectorstore folder
data = request.get_json()
# split docs on / and take first part
if data["docs"].split("/")[0] == "local":
return {"status": "exists"}
vectorstore = "vectors/" + secure_filename(data["docs"])
base_path = "https://raw.githubusercontent.com/arc53/DocsHUB/main/"
if os.path.exists(vectorstore) or data["docs"] == "default":
return {"status": "exists"}
else:
file_url = urlparse(base_path + vectorstore + "index.faiss")
if (
file_url.scheme in ["https"]
and file_url.netloc == "raw.githubusercontent.com"
and file_url.path.startswith("/arc53/DocsHUB/main/")
):
r = requests.get(file_url.geturl())
if r.status_code != 200:
return {"status": "null"}
else:
if not os.path.exists(vectorstore):
os.makedirs(vectorstore)
with open(vectorstore + "index.faiss", "wb") as f:
f.write(r.content)
r = requests.get(base_path + vectorstore + "index.pkl")
with open(vectorstore + "index.pkl", "wb") as f:
f.write(r.content)
else:
return {"status": "null"}
return {"status": "loaded"}
return {"status": "not found"}
@user.route("/api/create_prompt", methods=["POST"])
@@ -409,9 +345,7 @@ def get_prompts():
list_prompts.append({"id": "creative", "name": "creative", "type": "public"})
list_prompts.append({"id": "strict", "name": "strict", "type": "public"})
for prompt in prompts:
list_prompts.append(
{"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"}
)
list_prompts.append({"id": str(prompt["_id"]), "name": prompt["name"], "type": "private"})
return jsonify(list_prompts)
@@ -420,21 +354,15 @@ def get_prompts():
def get_single_prompt():
prompt_id = request.args.get("id")
if prompt_id == "default":
with open(
os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r"
) as f:
with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
chat_combine_template = f.read()
return jsonify({"content": chat_combine_template})
elif prompt_id == "creative":
with open(
os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r"
) as f:
with open(os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r") as f:
chat_reduce_creative = f.read()
return jsonify({"content": chat_reduce_creative})
elif prompt_id == "strict":
with open(
os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r"
) as f:
with open(os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r") as f:
chat_reduce_strict = f.read()
return jsonify({"content": chat_reduce_strict})
@@ -463,9 +391,7 @@ def update_prompt_name():
# check if name is null
if name == "":
return {"status": "error"}
prompts_collection.update_one(
{"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}}
)
prompts_collection.update_one({"_id": ObjectId(id)}, {"$set": {"name": name, "content": content}})
return {"status": "ok"}
@@ -475,12 +401,23 @@ def get_api_keys():
keys = api_key_collection.find({"user": user})
list_keys = []
for key in keys:
if "source" in key and isinstance(key["source"],DBRef):
source = db.dereference(key["source"])
if source is None:
continue
else:
source_name = source["name"]
elif "retriever" in key:
source_name = key["retriever"]
else:
continue
list_keys.append(
{
"id": str(key["_id"]),
"name": key["name"],
"key": key["key"][:4] + "..." + key["key"][-4:],
"source": key["source"],
"source": source_name,
"prompt_id": key["prompt_id"],
"chunks": key["chunks"],
}
@@ -492,21 +429,22 @@ def get_api_keys():
def create_api_key():
data = request.get_json()
name = data["name"]
source = data["source"]
prompt_id = data["prompt_id"]
chunks = data["chunks"]
key = str(uuid.uuid4())
user = "local"
resp = api_key_collection.insert_one(
{
"name": name,
"key": key,
"source": source,
"user": user,
"prompt_id": prompt_id,
"chunks": chunks,
}
)
new_api_key = {
"name": name,
"key": key,
"user": user,
"prompt_id": prompt_id,
"chunks": chunks,
}
if "source" in data and ObjectId.is_valid(data["source"]):
new_api_key["source"] = DBRef("sources", ObjectId(data["source"]))
if "retriever" in data:
new_api_key["retriever"] = data["retriever"]
resp = api_key_collection.insert_one(new_api_key)
new_id = str(resp.inserted_id)
return {"id": new_id, "key": key}
@@ -533,36 +471,37 @@ def share_conversation():
conversation_id = data["conversation_id"]
isPromptable = request.args.get("isPromptable").lower() == "true"
conversation = conversations_collection.find_one(
{"_id": ObjectId(conversation_id)}
)
conversation = conversations_collection.find_one({"_id": ObjectId(conversation_id)})
if(conversation is None):
raise Exception("Conversation does not exist")
current_n_queries = len(conversation["queries"])
##generate binary representation of uuid
explicit_binary = Binary.from_uuid(uuid.uuid4(), UuidRepresentation.STANDARD)
if isPromptable:
source = "default" if "source" not in data else data["source"]
prompt_id = "default" if "prompt_id" not in data else data["prompt_id"]
chunks = "2" if "chunks" not in data else data["chunks"]
name = conversation["name"] + "(shared)"
pre_existing_api_document = api_key_collection.find_one(
{
new_api_key_data = {
"prompt_id": prompt_id,
"chunks": chunks,
"source": source,
"user": user,
}
if "source" in data and ObjectId.is_valid(data["source"]):
new_api_key_data["source"] = DBRef("sources",ObjectId(data["source"]))
elif "retriever" in data:
new_api_key_data["retriever"] = data["retriever"]
pre_existing_api_document = api_key_collection.find_one(
new_api_key_data
)
api_uuid = str(uuid.uuid4())
if pre_existing_api_document:
api_uuid = pre_existing_api_document["key"]
pre_existing = shared_conversations_collections.find_one(
{
"conversation_id": DBRef(
"conversations", ObjectId(conversation_id)
),
"conversation_id": DBRef("conversations", ObjectId(conversation_id)),
"isPromptable": isPromptable,
"first_n_queries": current_n_queries,
"user": user,
@@ -593,21 +532,18 @@ def share_conversation():
"api_key": api_uuid,
}
)
return jsonify(
{"success": True, "identifier": str(explicit_binary.as_uuid())}
)
return jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())})
else:
api_key_collection.insert_one(
{
"name": name,
"key": api_uuid,
"source": source,
"user": user,
"prompt_id": prompt_id,
"chunks": chunks,
}
)
shared_conversations_collections.insert_one(
api_uuid = str(uuid.uuid4())
new_api_key_data["key"] = api_uuid
new_api_key_data["name"] = name
if "source" in data and ObjectId.is_valid(data["source"]):
new_api_key_data["source"] = DBRef("sources", ObjectId(data["source"]))
if "retriever" in data:
new_api_key_data["retriever"] = data["retriever"]
api_key_collection.insert_one(new_api_key_data)
shared_conversations_collections.insert_one(
{
"uuid": explicit_binary,
"conversation_id": {
@@ -619,12 +555,10 @@ def share_conversation():
"user": user,
"api_key": api_uuid,
}
)
)
## Identifier as route parameter in frontend
return (
jsonify(
{"success": True, "identifier": str(explicit_binary.as_uuid())}
),
jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())}),
201,
)
@@ -639,9 +573,7 @@ def share_conversation():
)
if pre_existing is not None:
return (
jsonify(
{"success": True, "identifier": str(pre_existing["uuid"].as_uuid())}
),
jsonify({"success": True, "identifier": str(pre_existing["uuid"].as_uuid())}),
200,
)
else:
@@ -659,9 +591,7 @@ def share_conversation():
)
## Identifier as route parameter in frontend
return (
jsonify(
{"success": True, "identifier": str(explicit_binary.as_uuid())}
),
jsonify({"success": True, "identifier": str(explicit_binary.as_uuid())}),
201,
)
except Exception as err:
@@ -673,16 +603,10 @@ def share_conversation():
@user.route("/api/shared_conversation/<string:identifier>", methods=["GET"])
def get_publicly_shared_conversations(identifier: str):
try:
query_uuid = Binary.from_uuid(
uuid.UUID(identifier), UuidRepresentation.STANDARD
)
query_uuid = Binary.from_uuid(uuid.UUID(identifier), UuidRepresentation.STANDARD)
shared = shared_conversations_collections.find_one({"uuid": query_uuid})
conversation_queries = []
if (
shared
and "conversation_id" in shared
and isinstance(shared["conversation_id"], DBRef)
):
if shared and "conversation_id" in shared and isinstance(shared["conversation_id"], DBRef):
# Resolve the DBRef
conversation_ref = shared["conversation_id"]
conversation = db.dereference(conversation_ref)
@@ -696,9 +620,7 @@ def get_publicly_shared_conversations(identifier: str):
),
404,
)
conversation_queries = conversation["queries"][
: (shared["first_n_queries"])
]
conversation_queries = conversation["queries"][: (shared["first_n_queries"])]
for query in conversation_queries:
query.pop("sources") ## avoid exposing sources
else:

View File

@@ -18,7 +18,7 @@ class Settings(BaseSettings):
DEFAULT_MAX_HISTORY: int = 150
MODEL_TOKEN_LIMITS: dict = {"gpt-3.5-turbo": 4096, "claude-2": 1e5}
UPLOAD_FOLDER: str = "inputs"
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant"
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus"
RETRIEVERS_ENABLED: list = ["classic_rag", "duckduck_search"] # also brave_search
API_URL: str = "http://localhost:7091" # backend url for celery worker
@@ -62,6 +62,11 @@ class Settings(BaseSettings):
QDRANT_PATH: Optional[str] = None
QDRANT_DISTANCE_FUNC: str = "Cosine"
# Milvus vectorstore config
MILVUS_COLLECTION_NAME: Optional[str] = "docsgpt"
MILVUS_URI: Optional[str] = "./milvus_local.db" # milvus lite version as default
MILVUS_TOKEN: Optional[str] = ""
BRAVE_SEARCH_API_KEY: Optional[str] = None
FLASK_DEBUG_MODE: bool = False

View File

@@ -11,12 +11,14 @@ from retry import retry
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i):
def store_add_texts_with_retry(store, i, id):
# add source_id to the metadata
i.metadata["source_id"] = str(id)
store.add_texts([i.page_content], metadatas=[i.metadata])
# store_pine.add_texts([i.page_content], metadatas=[i.metadata])
def call_openai_api(docs, folder_name, task_status):
def call_openai_api(docs, folder_name, id, task_status):
# Function to create a vector store from the documents and save it to disk
if not os.path.exists(f"{folder_name}"):
@@ -32,13 +34,13 @@ def call_openai_api(docs, folder_name, task_status):
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
docs_init=docs_init,
path=f"{folder_name}",
source_id=f"{folder_name}",
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
else:
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
path=f"{folder_name}",
source_id=str(id),
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
# Uncomment for MPNet embeddings
@@ -57,7 +59,7 @@ def call_openai_api(docs, folder_name, task_status):
task_status.update_state(
state="PROGRESS", meta={"current": int((c1 / s1) * 100)}
)
store_add_texts_with_retry(store, i)
store_add_texts_with_retry(store, i, id)
except Exception as e:
print(e)
print("Error on ", i)

View File

@@ -9,13 +9,15 @@ EbookLib==0.18
elasticsearch==8.14.0
escodegen==1.0.11
esprima==4.0.1
Flask==3.0.1
faiss-cpu==1.8.0
Flask==3.0.3
faiss-cpu==1.8.0.post1
gunicorn==23.0.0
html2text==2020.1.16
javalang==0.13.0
langchain==0.1.4
langchain-openai==0.0.5
langchain==0.2.16
langchain-community==0.2.16
langchain-core==0.2.38
langchain-openai==0.1.23
openapi3_parser==1.1.16
pandas==2.2.2
pydantic_settings==2.4.0
@@ -26,9 +28,9 @@ qdrant-client==1.11.0
redis==5.0.1
Requests==2.32.0
retry==0.9.2
sentence-transformers
tiktoken
sentence-transformers==3.0.1
tiktoken==0.7.0
torch
tqdm==4.66.3
transformers==4.44.0
Werkzeug==3.0.3
tqdm==4.66.5
transformers==4.44.2
Werkzeug==3.0.4

View File

@@ -1,4 +1,3 @@
import os
from application.retriever.base import BaseRetriever
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
@@ -21,7 +20,7 @@ class ClassicRAG(BaseRetriever):
user_api_key=None,
):
self.question = question
self.vectorstore = self._get_vectorstore(source=source)
self.vectorstore = source['active_docs'] if 'active_docs' in source else None
self.chat_history = chat_history
self.prompt = prompt
self.chunks = chunks
@@ -38,21 +37,6 @@ class ClassicRAG(BaseRetriever):
)
self.user_api_key = user_api_key
def _get_vectorstore(self, source):
if "active_docs" in source:
if source["active_docs"].split("/")[0] == "default":
vectorstore = ""
elif source["active_docs"].split("/")[0] == "local":
vectorstore = "indexes/" + source["active_docs"]
else:
vectorstore = "vectors/" + source["active_docs"]
if source["active_docs"] == "default":
vectorstore = ""
else:
vectorstore = ""
vectorstore = os.path.join("application", vectorstore)
return vectorstore
def _get_data(self):
if self.chunks == 0:
docs = []

View File

@@ -5,15 +5,16 @@ from application.retriever.brave_search import BraveRetSearch
class RetrieverCreator:
retievers = {
retrievers = {
'classic': ClassicRAG,
'duckduck_search': DuckDuckSearch,
'brave_search': BraveRetSearch
'brave_search': BraveRetSearch,
'default': ClassicRAG
}
@classmethod
def create_retriever(cls, type, *args, **kwargs):
retiever_class = cls.retievers.get(type.lower())
retiever_class = cls.retrievers.get(type.lower())
if not retiever_class:
raise ValueError(f"No retievers class found for type {type}")
return retiever_class(*args, **kwargs)

View File

@@ -1,13 +1,30 @@
from abc import ABC, abstractmethod
import os
from langchain_community.embeddings import (
HuggingFaceEmbeddings,
CohereEmbeddings,
HuggingFaceInstructEmbeddings,
)
from sentence_transformers import SentenceTransformer
from langchain_openai import OpenAIEmbeddings
from application.core.settings import settings
class EmbeddingsWrapper:
def __init__(self, model_name, *args, **kwargs):
self.model = SentenceTransformer(model_name, config_kwargs={'allow_dangerous_deserialization': True}, *args, **kwargs)
self.dimension = self.model.get_sentence_embedding_dimension()
def embed_query(self, query: str):
return self.model.encode(query).tolist()
def embed_documents(self, documents: list):
return self.model.encode(documents).tolist()
def __call__(self, text):
if isinstance(text, str):
return self.embed_query(text)
elif isinstance(text, list):
return self.embed_documents(text)
else:
raise ValueError("Input must be a string or a list of strings")
class EmbeddingsSingleton:
_instances = {}
@@ -23,16 +40,15 @@ class EmbeddingsSingleton:
def _create_instance(embeddings_name, *args, **kwargs):
embeddings_factory = {
"openai_text-embedding-ada-002": OpenAIEmbeddings,
"huggingface_sentence-transformers/all-mpnet-base-v2": HuggingFaceEmbeddings,
"huggingface_sentence-transformers-all-mpnet-base-v2": HuggingFaceEmbeddings,
"huggingface_hkunlp/instructor-large": HuggingFaceInstructEmbeddings,
"cohere_medium": CohereEmbeddings
"huggingface_sentence-transformers/all-mpnet-base-v2": lambda: EmbeddingsWrapper("sentence-transformers/all-mpnet-base-v2"),
"huggingface_sentence-transformers-all-mpnet-base-v2": lambda: EmbeddingsWrapper("sentence-transformers/all-mpnet-base-v2"),
"huggingface_hkunlp/instructor-large": lambda: EmbeddingsWrapper("hkunlp/instructor-large"),
}
if embeddings_name not in embeddings_factory:
raise ValueError(f"Invalid embeddings_name: {embeddings_name}")
return embeddings_factory[embeddings_name](*args, **kwargs)
if embeddings_name in embeddings_factory:
return embeddings_factory[embeddings_name](*args, **kwargs)
else:
return EmbeddingsWrapper(embeddings_name, *args, **kwargs)
class BaseVectorStore(ABC):
def __init__(self):
@@ -58,22 +74,14 @@ class BaseVectorStore(ABC):
embeddings_name,
openai_api_key=embeddings_key
)
elif embeddings_name == "cohere_medium":
embedding_instance = EmbeddingsSingleton.get_instance(
embeddings_name,
cohere_api_key=embeddings_key
)
elif embeddings_name == "huggingface_sentence-transformers/all-mpnet-base-v2":
if os.path.exists("./model/all-mpnet-base-v2"):
embedding_instance = EmbeddingsSingleton.get_instance(
embeddings_name,
model_name="./model/all-mpnet-base-v2",
model_kwargs={"device": "cpu"}
embeddings_name="./model/all-mpnet-base-v2",
)
else:
embedding_instance = EmbeddingsSingleton.get_instance(
embeddings_name,
model_kwargs={"device": "cpu"}
)
else:
embedding_instance = EmbeddingsSingleton.get_instance(embeddings_name)

View File

@@ -9,9 +9,9 @@ import elasticsearch
class ElasticsearchStore(BaseVectorStore):
_es_connection = None # Class attribute to hold the Elasticsearch connection
def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX):
def __init__(self, source_id, embeddings_key, index_name=settings.ELASTIC_INDEX):
super().__init__()
self.path = path.replace("application/indexes/", "").rstrip("/")
self.source_id = source_id.replace("application/indexes/", "").rstrip("/")
self.embeddings_key = embeddings_key
self.index_name = index_name
@@ -81,7 +81,7 @@ class ElasticsearchStore(BaseVectorStore):
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key)
vector = embeddings.embed_query(question)
knn = {
"filter": [{"match": {"metadata.store.keyword": self.path}}],
"filter": [{"match": {"metadata.source_id.keyword": self.source_id}}],
"field": "vector",
"k": k,
"num_candidates": 100,
@@ -100,7 +100,7 @@ class ElasticsearchStore(BaseVectorStore):
}
}
],
"filter": [{"match": {"metadata.store.keyword": self.path}}],
"filter": [{"match": {"metadata.source_id.keyword": self.source_id}}],
}
},
"rank": {"rrf": {}},
@@ -209,5 +209,4 @@ class ElasticsearchStore(BaseVectorStore):
def delete_index(self):
self._es_connection.delete_by_query(index=self.index_name, query={"match": {
"metadata.store.keyword": self.path}},)
"metadata.source_id.keyword": self.source_id}},)

View File

@@ -1,12 +1,22 @@
from langchain_community.vectorstores import FAISS
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
import os
def get_vectorstore(path):
if path:
vectorstore = "indexes/"+path
vectorstore = os.path.join("application", vectorstore)
else:
vectorstore = os.path.join("application")
return vectorstore
class FaissStore(BaseVectorStore):
def __init__(self, path, embeddings_key, docs_init=None):
def __init__(self, source_id, embeddings_key, docs_init=None):
super().__init__()
self.path = path
self.path = get_vectorstore(source_id)
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
if docs_init:
self.docsearch = FAISS.from_documents(
@@ -14,7 +24,8 @@ class FaissStore(BaseVectorStore):
)
else:
self.docsearch = FAISS.load_local(
self.path, embeddings
self.path, embeddings,
allow_dangerous_deserialization=True
)
self.assert_embedding_dimensions(embeddings)
@@ -37,10 +48,10 @@ class FaissStore(BaseVectorStore):
"""
if settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2":
try:
word_embedding_dimension = embeddings.client[1].word_embedding_dimension
word_embedding_dimension = embeddings.dimension
except AttributeError as e:
raise AttributeError("word_embedding_dimension not found in embeddings.client[1]") from e
raise AttributeError("'dimension' attribute not found in embeddings instance. Make sure the embeddings object is properly initialized.") from e
docsearch_index_dimension = self.docsearch.index.d
if word_embedding_dimension != docsearch_index_dimension:
raise ValueError(f"word_embedding_dimension ({word_embedding_dimension}) " +
f"!= docsearch_index_word_embedding_dimension ({docsearch_index_dimension})")
raise ValueError(f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) " +
f"!= docsearch index dimension ({docsearch_index_dimension})")

View File

@@ -0,0 +1,37 @@
from typing import List, Optional
from uuid import uuid4
from application.core.settings import settings
from application.vectorstore.base import BaseVectorStore
class MilvusStore(BaseVectorStore):
def __init__(self, path: str = "", embeddings_key: str = "embeddings"):
super().__init__()
from langchain_milvus import Milvus
connection_args = {
"uri": settings.MILVUS_URI,
"token": settings.MILVUS_TOKEN,
}
self._docsearch = Milvus(
embedding_function=self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key),
collection_name=settings.MILVUS_COLLECTION_NAME,
connection_args=connection_args,
)
self._path = path
def search(self, question, k=2, *args, **kwargs):
return self._docsearch.similarity_search(query=question, k=k, filter={"path": self._path} *args, **kwargs)
def add_texts(self, texts: List[str], metadatas: Optional[List[dict]], *args, **kwargs):
ids = [str(uuid4()) for _ in range(len(texts))]
return self._docsearch.add_texts(texts=texts, metadatas=metadatas, ids=ids, *args, **kwargs)
def save_local(self, *args, **kwargs):
pass
def delete_index(self, *args, **kwargs):
pass

View File

@@ -5,7 +5,7 @@ from application.vectorstore.document_class import Document
class MongoDBVectorStore(BaseVectorStore):
def __init__(
self,
path: str = "",
source_id: str = "",
embeddings_key: str = "embeddings",
collection: str = "documents",
index_name: str = "vector_search_index",
@@ -18,7 +18,7 @@ class MongoDBVectorStore(BaseVectorStore):
self._embedding_key = embedding_key
self._embeddings_key = embeddings_key
self._mongo_uri = settings.MONGO_URI
self._path = path.replace("application/indexes/", "").rstrip("/")
self._source_id = source_id.replace("application/indexes/", "").rstrip("/")
self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
try:
@@ -46,7 +46,7 @@ class MongoDBVectorStore(BaseVectorStore):
"numCandidates": k * 10,
"index": self._index_name,
"filter": {
"store": {"$eq": self._path}
"source_id": {"$eq": self._source_id}
}
}
}
@@ -123,4 +123,4 @@ class MongoDBVectorStore(BaseVectorStore):
return result_ids
def delete_index(self, *args, **kwargs):
self._collection.delete_many({"store": self._path})
self._collection.delete_many({"source_id": self._source_id})

View File

@@ -5,12 +5,12 @@ from qdrant_client import models
class QdrantStore(BaseVectorStore):
def __init__(self, path: str = "", embeddings_key: str = "embeddings"):
def __init__(self, source_id: str = "", embeddings_key: str = "embeddings"):
self._filter = models.Filter(
must=[
models.FieldCondition(
key="metadata.store",
match=models.MatchValue(value=path.replace("application/indexes/", "").rstrip("/")),
key="metadata.source_id",
match=models.MatchValue(value=source_id.replace("application/indexes/", "").rstrip("/")),
)
]
)

View File

@@ -1,5 +1,6 @@
from application.vectorstore.faiss import FaissStore
from application.vectorstore.elasticsearch import ElasticsearchStore
from application.vectorstore.milvus import MilvusStore
from application.vectorstore.mongodb import MongoDBVectorStore
from application.vectorstore.qdrant import QdrantStore
@@ -10,6 +11,7 @@ class VectorCreator:
"elasticsearch": ElasticsearchStore,
"mongodb": MongoDBVectorStore,
"qdrant": QdrantStore,
"milvus": MilvusStore,
}
@classmethod

View File

@@ -6,6 +6,7 @@ from urllib.parse import urljoin
import logging
import requests
from bson.objectid import ObjectId
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
@@ -16,10 +17,10 @@ from application.parser.token_func import group_split
from application.utils import count_tokens_docs
# Define a function to extract metadata from a given filename.
def metadata_from_filename(title):
store = "/".join(title.split("/")[1:3])
return {"title": title, "store": store}
return {"title": title}
# Define a function to generate a random string of a given length.
@@ -27,9 +28,7 @@ def generate_random_string(length):
return "".join([string.ascii_letters[i % 52] for i in range(length)])
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
@@ -60,7 +59,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
# Define the main function for ingesting and processing documents.
def ingest_worker(self, directory, formats, name_job, filename, user):
def ingest_worker(self, directory, formats, name_job, filename, user, retriever="classic"):
"""
Ingest and process documents.
@@ -71,6 +70,7 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
name_job (str): Name of the job for this ingestion task.
filename (str): Name of the file to be ingested.
user (str): Identifier for the user initiating the ingestion.
retriever (str): Type of retriever to use for processing the documents.
Returns:
dict: Information about the completed ingestion task, including input parameters and a "limited" flag.
@@ -106,9 +106,7 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, recursion_depth
)
extract_zip_recursive(os.path.join(full_path, filename), full_path, 0, recursion_depth)
self.update_state(state="PROGRESS", meta={"current": 1})
@@ -129,8 +127,9 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
id = ObjectId()
call_openai_api(docs, full_path, self)
call_openai_api(docs, full_path, id, self)
tokens = count_tokens_docs(docs)
self.update_state(state="PROGRESS", meta={"current": 100})
@@ -140,22 +139,15 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
# get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
# and send them to the server (provide user and name in form)
file_data = {"name": name_job, "user": user, "tokens":tokens}
file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever, "id": str(id), 'type': 'local'}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
response = requests.get(
urljoin(settings.API_URL, "/api/delete_old?path=" + full_path)
)
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
else:
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
# delete local
shutil.rmtree(full_path)
@@ -170,7 +162,7 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
}
def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
def remote_worker(self, source_data, name_job, user, loader, directory="temp", retriever="classic"):
token_check = True
min_tokens = 150
max_tokens = 1250
@@ -191,25 +183,24 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp"):
token_check=token_check,
)
# docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
call_openai_api(docs, full_path, self)
tokens = count_tokens_docs(docs)
id = ObjectId()
call_openai_api(docs, full_path, id, self)
self.update_state(state="PROGRESS", meta={"current": 100})
# Proceed with uploading and cleaning as in the original function
file_data = {"name": name_job, "user": user, "tokens":tokens}
file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever,
"id": str(id), 'type': loader, 'remote_data': source_data}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
else:
requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
shutil.rmtree(full_path)
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}