feat: sync remote sources through celery periodic tasks

This commit is contained in:
Siddhant Rai
2024-09-25 15:20:11 +05:30
parent f92658de82
commit 3d292aa485
13 changed files with 404 additions and 88 deletions

View File

@@ -6,15 +6,20 @@ from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
from application.core.settings import settings
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
sources_collection = db["sources"]
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
internal = Blueprint("internal", __name__)
internal = Blueprint('internal', __name__)
@internal.route("/api/download", methods=["get"])
def download_file():
user = secure_filename(request.args.get("user"))
@@ -24,7 +29,6 @@ def download_file():
return send_from_directory(save_dir, filename, as_attachment=True)
@internal.route("/api/upload_index", methods=["POST"])
def upload_index_files():
"""Upload two files(index.faiss, index.pkl) to the user's folder."""
@@ -38,7 +42,8 @@ def upload_index_files():
retriever = secure_filename(request.form["retriever"])
id = secure_filename(request.form["id"])
type = secure_filename(request.form["type"])
remote_data = secure_filename(request.form["remote_data"]) if "remote_data" in request.form else None
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = secure_filename(request.form["sync_frequency"])
save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss":
@@ -55,24 +60,45 @@ def upload_index_files():
if file_pkl.filename == "":
return {"status": "no file name"}
# saves index files
if not os.path.exists(save_dir):
os.makedirs(save_dir)
file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
# create entry in sources_collection
sources_collection.insert_one(
{
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data
}
)
return {"status": "ok"}
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:
sources_collection.update_one(
{"_id": ObjectId(id)},
{
"$set": {
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
}
},
)
else:
sources_collection.insert_one(
{
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
}
)
return {"status": "ok"}

View File

@@ -289,14 +289,13 @@ def combined_json():
data.append(
{
"id": str(index["_id"]),
"name": index["name"],
"date": index["date"],
"name": index.get("name"),
"date": index.get("date"),
"model": settings.EMBEDDINGS_NAME,
"location": "local",
"tokens": index["tokens"] if ("tokens" in index.keys()) else "",
"retriever": (
index["retriever"] if ("retriever" in index.keys()) else "classic"
),
"tokens": index.get("tokens", ""),
"retriever": index.get("retriever", "classic"),
"syncFrequency": index.get("sync_frequency", ""),
}
)
if "duckduck_search" in settings.RETRIEVERS_ENABLED:
@@ -1157,3 +1156,27 @@ def get_user_logs():
),
200,
)
@user.route("/api/manage_sync", methods=["POST"])
def manage_sync():
data = request.get_json()
source_id = data.get("source_id")
sync_frequency = data.get("sync_frequency")
if sync_frequency not in ["never", "daily", "weekly", "monthly"]:
return jsonify({"status": "invalid frequency"}), 400
update_data = {"$set": {"sync_frequency": sync_frequency}}
try:
sources_collection.update_one(
{
"_id": ObjectId(source_id),
"user": "local",
},
update_data,
)
except Exception as err:
print(err)
return jsonify({"status": "error"}), 400
return jsonify({"status": "ok"}), 200

View File

@@ -1,12 +1,38 @@
from application.worker import ingest_worker, remote_worker
from datetime import timedelta
from application.celery_init import celery
from application.worker import ingest_worker, remote_worker, sync_worker
@celery.task(bind=True)
def ingest(self, directory, formats, name_job, filename, user):
resp = ingest_worker(self, directory, formats, name_job, filename, user)
return resp
@celery.task(bind=True)
def ingest_remote(self, source_data, job_name, user, loader):
resp = remote_worker(self, source_data, job_name, user, loader)
return resp
@celery.task(bind=True)
def schedule_syncs(self, frequency):
resp = sync_worker(self, frequency)
return resp
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
timedelta(days=1),
schedule_syncs.s("daily"),
)
sender.add_periodic_task(
timedelta(weeks=1),
schedule_syncs.s("weekly"),
)
sender.add_periodic_task(
timedelta(days=30),
schedule_syncs.s("monthly"),
)

View File

@@ -1,9 +1,11 @@
import os
from application.vectorstore.vector_creator import VectorCreator
from application.core.settings import settings
from retry import retry
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
# from langchain_community.embeddings import HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInstructEmbeddings
@@ -12,7 +14,7 @@ from retry import retry
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i, id):
# add source_id to the metadata
# add source_id to the metadata
i.metadata["source_id"] = str(id)
store.add_texts([i.page_content], metadatas=[i.metadata])
# store_pine.add_texts([i.page_content], metadatas=[i.metadata])
@@ -43,6 +45,7 @@ def call_openai_api(docs, folder_name, id, task_status):
source_id=str(id),
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
store.delete_index()
# Uncomment for MPNet embeddings
# model_name = "sentence-transformers/all-mpnet-base-v2"
# hf = HuggingFaceEmbeddings(model_name=model_name)
@@ -70,5 +73,3 @@ def call_openai_api(docs, folder_name, id, task_status):
c1 += 1
if settings.VECTOR_STORE == "faiss":
store.save_local(f"{folder_name}")

View File

@@ -1,7 +1,8 @@
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
from application.vectorstore.base import BaseVectorStore
from application.vectorstore.document_class import Document
class MongoDBVectorStore(BaseVectorStore):
def __init__(
self,
@@ -33,27 +34,24 @@ class MongoDBVectorStore(BaseVectorStore):
self._database = self._client[database]
self._collection = self._database[collection]
def search(self, question, k=2, *args, **kwargs):
query_vector = self._embedding.embed_query(question)
pipeline = [
{
"$vectorSearch": {
"queryVector": query_vector,
"queryVector": query_vector,
"path": self._embedding_key,
"limit": k,
"numCandidates": k * 10,
"limit": k,
"numCandidates": k * 10,
"index": self._index_name,
"filter": {
"source_id": {"$eq": self._source_id}
}
"filter": {"source_id": {"$eq": self._source_id}},
}
}
]
cursor = self._collection.aggregate(pipeline)
results = []
for doc in cursor:
text = doc[self._text_key]
@@ -63,30 +61,32 @@ class MongoDBVectorStore(BaseVectorStore):
metadata = doc
results.append(Document(text, metadata))
return results
def _insert_texts(self, texts, metadatas):
if not texts:
return []
embeddings = self._embedding.embed_documents(texts)
to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m}
for t, m, embedding in zip(texts, metadatas, embeddings)
]
# insert the documents in MongoDB Atlas
insert_result = self._collection.insert_many(to_insert)
return insert_result.inserted_ids
def add_texts(self,
def add_texts(
self,
texts,
metadatas = None,
ids = None,
refresh_indices = True,
create_index_if_not_exists = True,
bulk_kwargs = None,
**kwargs,):
metadatas=None,
ids=None,
refresh_indices=True,
create_index_if_not_exists=True,
bulk_kwargs=None,
**kwargs,
):
#dims = self._embedding.client[1].word_embedding_dimension
# dims = self._embedding.client[1].word_embedding_dimension
# # check if index exists
# if create_index_if_not_exists:
# # check if index exists
@@ -121,6 +121,6 @@ class MongoDBVectorStore(BaseVectorStore):
if texts_batch:
result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
return result_ids
def delete_index(self, *args, **kwargs):
self._collection.delete_many({"source_id": self._source_id})
self._collection.delete_many({"source_id": self._source_id})

View File

@@ -1,21 +1,26 @@
import logging
import os
import shutil
import string
import zipfile
from collections import Counter
from urllib.parse import urljoin
import logging
import requests
from bson.objectid import ObjectId
from pymongo import MongoClient
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.open_ai_func import call_openai_api
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
from application.parser.token_func import group_split
from application.utils import count_tokens_docs
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
sources_collection = db["sources"]
# Define a function to extract metadata from a given filename.
@@ -28,7 +33,9 @@ def generate_random_string(length):
return "".join([string.ascii_letters[i % 52] for i in range(length)])
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
@@ -59,7 +66,9 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
# Define the main function for ingesting and processing documents.
def ingest_worker(self, directory, formats, name_job, filename, user, retriever="classic"):
def ingest_worker(
self, directory, formats, name_job, filename, user, retriever="classic"
):
"""
Ingest and process documents.
@@ -106,7 +115,9 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever=
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(os.path.join(full_path, filename), full_path, 0, recursion_depth)
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, recursion_depth
)
self.update_state(state="PROGRESS", meta={"current": 1})
@@ -139,15 +150,26 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever=
# get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl
# and send them to the server (provide user and name in form)
file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever, "id": str(id), 'type': 'local'}
file_data = {
"name": name_job,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
else:
response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), data=file_data
)
# delete local
shutil.rmtree(full_path)
@@ -162,7 +184,18 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever=
}
def remote_worker(self, source_data, name_job, user, loader, directory="temp", retriever="classic"):
def remote_worker(
self,
source_data,
name_job,
user,
loader,
directory="temp",
retriever="classic",
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
token_check = True
min_tokens = 150
max_tokens = 1250
@@ -171,7 +204,10 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp", r
if not os.path.exists(full_path):
os.makedirs(full_path)
self.update_state(state="PROGRESS", meta={"current": 1})
logging.info(f"Remote job: {full_path}", extra={"user": user, "job": name_job, source_data: source_data})
logging.info(
f"Remote job: {full_path}",
extra={"user": user, "job": name_job, source_data: source_data},
)
remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)
@@ -184,23 +220,93 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp", r
)
# docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
tokens = count_tokens_docs(docs)
id = ObjectId()
call_openai_api(docs, full_path, id, self)
if operation_mode == "upload":
id = ObjectId()
call_openai_api(docs, full_path, id, self)
elif operation_mode == "sync":
if not doc_id or not ObjectId.is_valid(doc_id):
raise ValueError("doc_id must be provided for sync operation.")
id = ObjectId(doc_id)
call_openai_api(docs, full_path, id, self)
self.update_state(state="PROGRESS", meta={"current": 100})
# Proceed with uploading and cleaning as in the original function
file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever,
"id": str(id), 'type': loader, 'remote_data': source_data}
file_data = {
"name": name_job,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": loader,
"remote_data": source_data,
"sync_frequency": sync_frequency,
}
if settings.VECTOR_STORE == "faiss":
files = {
"file_faiss": open(full_path + "/index.faiss", "rb"),
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
else:
requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
shutil.rmtree(full_path)
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
def sync(
self,
source_data,
name_job,
user,
loader,
sync_frequency,
retriever,
doc_id=None,
directory="temp",
):
try:
remote_worker(
self,
source_data,
name_job,
user,
loader,
directory,
retriever,
sync_frequency,
"sync",
doc_id,
)
except Exception as e:
return {"status": "error", "error": str(e)}
return {"status": "success"}
def sync_worker(self, frequency):
sync_counts = Counter()
sources = sources_collection.find()
for doc in sources:
if doc.get("sync_frequency") == frequency:
name = doc.get("name")
user = doc.get("user")
source_type = doc.get("type")
source_data = doc.get("remote_data")
retriever = doc.get("retriever")
doc_id = str(doc.get("_id"))
resp = sync(
self, source_data, name, user, source_type, frequency, retriever, doc_id
)
sync_counts["total_sync_count"] += 1
sync_counts[
"sync_success" if resp["status"] == "success" else "sync_failure"
] += 1
return {
key: sync_counts[key]
for key in ["total_sync_count", "sync_success", "sync_failure"]
}