diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index cea6c8ca..2081d6ea 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -6,15 +6,20 @@ from werkzeug.utils import secure_filename from bson.objectid import ObjectId from application.core.settings import settings + mongo = MongoClient(settings.MONGO_URI) db = mongo["docsgpt"] conversations_collection = db["conversations"] sources_collection = db["sources"] -current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +current_dir = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +) + + +internal = Blueprint("internal", __name__) -internal = Blueprint('internal', __name__) @internal.route("/api/download", methods=["get"]) def download_file(): user = secure_filename(request.args.get("user")) @@ -24,7 +29,6 @@ def download_file(): return send_from_directory(save_dir, filename, as_attachment=True) - @internal.route("/api/upload_index", methods=["POST"]) def upload_index_files(): """Upload two files(index.faiss, index.pkl) to the user's folder.""" @@ -38,7 +42,8 @@ def upload_index_files(): retriever = secure_filename(request.form["retriever"]) id = secure_filename(request.form["id"]) type = secure_filename(request.form["type"]) - remote_data = secure_filename(request.form["remote_data"]) if "remote_data" in request.form else None + remote_data = request.form["remote_data"] if "remote_data" in request.form else None + sync_frequency = secure_filename(request.form["sync_frequency"]) save_dir = os.path.join(current_dir, "indexes", str(id)) if settings.VECTOR_STORE == "faiss": @@ -55,24 +60,45 @@ def upload_index_files(): if file_pkl.filename == "": return {"status": "no file name"} # saves index files - + if not os.path.exists(save_dir): os.makedirs(save_dir) file_faiss.save(os.path.join(save_dir, "index.faiss")) file_pkl.save(os.path.join(save_dir, "index.pkl")) - # create entry in sources_collection - sources_collection.insert_one( - { - "_id": ObjectId(id), - "user": user, - "name": job_name, - "language": job_name, - "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"), - "model": settings.EMBEDDINGS_NAME, - "type": type, - "tokens": tokens, - "retriever": retriever, - "remote_data": remote_data - } - ) - return {"status": "ok"} \ No newline at end of file + + existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) + if existing_entry: + sources_collection.update_one( + {"_id": ObjectId(id)}, + { + "$set": { + "user": user, + "name": job_name, + "language": job_name, + "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"), + "model": settings.EMBEDDINGS_NAME, + "type": type, + "tokens": tokens, + "retriever": retriever, + "remote_data": remote_data, + "sync_frequency": sync_frequency, + } + }, + ) + else: + sources_collection.insert_one( + { + "_id": ObjectId(id), + "user": user, + "name": job_name, + "language": job_name, + "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"), + "model": settings.EMBEDDINGS_NAME, + "type": type, + "tokens": tokens, + "retriever": retriever, + "remote_data": remote_data, + "sync_frequency": sync_frequency, + } + ) + return {"status": "ok"} diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 35a9fd46..764195e3 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -289,14 +289,13 @@ def combined_json(): data.append( { "id": str(index["_id"]), - "name": index["name"], - "date": index["date"], + "name": index.get("name"), + "date": index.get("date"), "model": settings.EMBEDDINGS_NAME, "location": "local", - "tokens": index["tokens"] if ("tokens" in index.keys()) else "", - "retriever": ( - index["retriever"] if ("retriever" in index.keys()) else "classic" - ), + "tokens": index.get("tokens", ""), + "retriever": index.get("retriever", "classic"), + "syncFrequency": index.get("sync_frequency", ""), } ) if "duckduck_search" in settings.RETRIEVERS_ENABLED: @@ -1157,3 +1156,27 @@ def get_user_logs(): ), 200, ) + + +@user.route("/api/manage_sync", methods=["POST"]) +def manage_sync(): + data = request.get_json() + source_id = data.get("source_id") + sync_frequency = data.get("sync_frequency") + + if sync_frequency not in ["never", "daily", "weekly", "monthly"]: + return jsonify({"status": "invalid frequency"}), 400 + + update_data = {"$set": {"sync_frequency": sync_frequency}} + try: + sources_collection.update_one( + { + "_id": ObjectId(source_id), + "user": "local", + }, + update_data, + ) + except Exception as err: + print(err) + return jsonify({"status": "error"}), 400 + return jsonify({"status": "ok"}), 200 diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index 862b6dcd..73ad716e 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -1,12 +1,38 @@ -from application.worker import ingest_worker, remote_worker +from datetime import timedelta + from application.celery_init import celery +from application.worker import ingest_worker, remote_worker, sync_worker + @celery.task(bind=True) def ingest(self, directory, formats, name_job, filename, user): resp = ingest_worker(self, directory, formats, name_job, filename, user) return resp + @celery.task(bind=True) def ingest_remote(self, source_data, job_name, user, loader): resp = remote_worker(self, source_data, job_name, user, loader) return resp + + +@celery.task(bind=True) +def schedule_syncs(self, frequency): + resp = sync_worker(self, frequency) + return resp + + +@celery.on_after_configure.connect +def setup_periodic_tasks(sender, **kwargs): + sender.add_periodic_task( + timedelta(days=1), + schedule_syncs.s("daily"), + ) + sender.add_periodic_task( + timedelta(weeks=1), + schedule_syncs.s("weekly"), + ) + sender.add_periodic_task( + timedelta(days=30), + schedule_syncs.s("monthly"), + ) diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py index 84f92db9..3109f583 100755 --- a/application/parser/open_ai_func.py +++ b/application/parser/open_ai_func.py @@ -1,9 +1,11 @@ import os -from application.vectorstore.vector_creator import VectorCreator -from application.core.settings import settings from retry import retry +from application.core.settings import settings + +from application.vectorstore.vector_creator import VectorCreator + # from langchain_community.embeddings import HuggingFaceEmbeddings # from langchain_community.embeddings import HuggingFaceInstructEmbeddings @@ -12,7 +14,7 @@ from retry import retry @retry(tries=10, delay=60) def store_add_texts_with_retry(store, i, id): - # add source_id to the metadata + # add source_id to the metadata i.metadata["source_id"] = str(id) store.add_texts([i.page_content], metadatas=[i.metadata]) # store_pine.add_texts([i.page_content], metadatas=[i.metadata]) @@ -43,6 +45,7 @@ def call_openai_api(docs, folder_name, id, task_status): source_id=str(id), embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) + store.delete_index() # Uncomment for MPNet embeddings # model_name = "sentence-transformers/all-mpnet-base-v2" # hf = HuggingFaceEmbeddings(model_name=model_name) @@ -70,5 +73,3 @@ def call_openai_api(docs, folder_name, id, task_status): c1 += 1 if settings.VECTOR_STORE == "faiss": store.save_local(f"{folder_name}") - - diff --git a/application/vectorstore/mongodb.py b/application/vectorstore/mongodb.py index 32bca489..c577a5d5 100644 --- a/application/vectorstore/mongodb.py +++ b/application/vectorstore/mongodb.py @@ -1,7 +1,8 @@ -from application.vectorstore.base import BaseVectorStore from application.core.settings import settings +from application.vectorstore.base import BaseVectorStore from application.vectorstore.document_class import Document + class MongoDBVectorStore(BaseVectorStore): def __init__( self, @@ -33,27 +34,24 @@ class MongoDBVectorStore(BaseVectorStore): self._database = self._client[database] self._collection = self._database[collection] - def search(self, question, k=2, *args, **kwargs): query_vector = self._embedding.embed_query(question) pipeline = [ { "$vectorSearch": { - "queryVector": query_vector, + "queryVector": query_vector, "path": self._embedding_key, - "limit": k, - "numCandidates": k * 10, + "limit": k, + "numCandidates": k * 10, "index": self._index_name, - "filter": { - "source_id": {"$eq": self._source_id} - } + "filter": {"source_id": {"$eq": self._source_id}}, } } ] cursor = self._collection.aggregate(pipeline) - + results = [] for doc in cursor: text = doc[self._text_key] @@ -63,30 +61,32 @@ class MongoDBVectorStore(BaseVectorStore): metadata = doc results.append(Document(text, metadata)) return results - + def _insert_texts(self, texts, metadatas): if not texts: return [] embeddings = self._embedding.embed_documents(texts) + to_insert = [ {self._text_key: t, self._embedding_key: embedding, **m} for t, m, embedding in zip(texts, metadatas, embeddings) ] - # insert the documents in MongoDB Atlas + insert_result = self._collection.insert_many(to_insert) return insert_result.inserted_ids - - def add_texts(self, + + def add_texts( + self, texts, - metadatas = None, - ids = None, - refresh_indices = True, - create_index_if_not_exists = True, - bulk_kwargs = None, - **kwargs,): + metadatas=None, + ids=None, + refresh_indices=True, + create_index_if_not_exists=True, + bulk_kwargs=None, + **kwargs, + ): - - #dims = self._embedding.client[1].word_embedding_dimension + # dims = self._embedding.client[1].word_embedding_dimension # # check if index exists # if create_index_if_not_exists: # # check if index exists @@ -121,6 +121,6 @@ class MongoDBVectorStore(BaseVectorStore): if texts_batch: result_ids.extend(self._insert_texts(texts_batch, metadatas_batch)) return result_ids - + def delete_index(self, *args, **kwargs): - self._collection.delete_many({"source_id": self._source_id}) \ No newline at end of file + self._collection.delete_many({"source_id": self._source_id}) diff --git a/application/worker.py b/application/worker.py index 15603908..53f6c06a 100755 --- a/application/worker.py +++ b/application/worker.py @@ -1,21 +1,26 @@ +import logging import os import shutil import string import zipfile +from collections import Counter from urllib.parse import urljoin -import logging import requests from bson.objectid import ObjectId +from pymongo import MongoClient from application.core.settings import settings from application.parser.file.bulk import SimpleDirectoryReader -from application.parser.remote.remote_creator import RemoteCreator from application.parser.open_ai_func import call_openai_api +from application.parser.remote.remote_creator import RemoteCreator from application.parser.schema.base import Document from application.parser.token_func import group_split from application.utils import count_tokens_docs +mongo = MongoClient(settings.MONGO_URI) +db = mongo["docsgpt"] +sources_collection = db["sources"] # Define a function to extract metadata from a given filename. @@ -28,7 +33,9 @@ def generate_random_string(length): return "".join([string.ascii_letters[i % 52] for i in range(length)]) -current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +current_dir = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +) def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): @@ -59,7 +66,9 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): # Define the main function for ingesting and processing documents. -def ingest_worker(self, directory, formats, name_job, filename, user, retriever="classic"): +def ingest_worker( + self, directory, formats, name_job, filename, user, retriever="classic" +): """ Ingest and process documents. @@ -106,7 +115,9 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever= # check if file is .zip and extract it if filename.endswith(".zip"): - extract_zip_recursive(os.path.join(full_path, filename), full_path, 0, recursion_depth) + extract_zip_recursive( + os.path.join(full_path, filename), full_path, 0, recursion_depth + ) self.update_state(state="PROGRESS", meta={"current": 1}) @@ -139,15 +150,26 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever= # get files from outputs/inputs/index.faiss and outputs/inputs/index.pkl # and send them to the server (provide user and name in form) - file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever, "id": str(id), 'type': 'local'} + file_data = { + "name": name_job, + "user": user, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": "local", + } if settings.VECTOR_STORE == "faiss": files = { "file_faiss": open(full_path + "/index.faiss", "rb"), "file_pkl": open(full_path + "/index.pkl", "rb"), } - response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) + response = requests.post( + urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data + ) else: - response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data) + response = requests.post( + urljoin(settings.API_URL, "/api/upload_index"), data=file_data + ) # delete local shutil.rmtree(full_path) @@ -162,7 +184,18 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever= } -def remote_worker(self, source_data, name_job, user, loader, directory="temp", retriever="classic"): +def remote_worker( + self, + source_data, + name_job, + user, + loader, + directory="temp", + retriever="classic", + sync_frequency="never", + operation_mode="upload", + doc_id=None, +): token_check = True min_tokens = 150 max_tokens = 1250 @@ -171,7 +204,10 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp", r if not os.path.exists(full_path): os.makedirs(full_path) self.update_state(state="PROGRESS", meta={"current": 1}) - logging.info(f"Remote job: {full_path}", extra={"user": user, "job": name_job, source_data: source_data}) + logging.info( + f"Remote job: {full_path}", + extra={"user": user, "job": name_job, source_data: source_data}, + ) remote_loader = RemoteCreator.create_loader(loader) raw_docs = remote_loader.load_data(source_data) @@ -184,23 +220,93 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp", r ) # docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] tokens = count_tokens_docs(docs) - id = ObjectId() - call_openai_api(docs, full_path, id, self) + if operation_mode == "upload": + id = ObjectId() + call_openai_api(docs, full_path, id, self) + elif operation_mode == "sync": + if not doc_id or not ObjectId.is_valid(doc_id): + raise ValueError("doc_id must be provided for sync operation.") + id = ObjectId(doc_id) + call_openai_api(docs, full_path, id, self) self.update_state(state="PROGRESS", meta={"current": 100}) # Proceed with uploading and cleaning as in the original function - file_data = {"name": name_job, "user": user, "tokens": tokens, "retriever": retriever, - "id": str(id), 'type': loader, 'remote_data': source_data} + file_data = { + "name": name_job, + "user": user, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": loader, + "remote_data": source_data, + "sync_frequency": sync_frequency, + } if settings.VECTOR_STORE == "faiss": files = { "file_faiss": open(full_path + "/index.faiss", "rb"), "file_pkl": open(full_path + "/index.pkl", "rb"), } - requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) + requests.post( + urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data + ) else: requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data) shutil.rmtree(full_path) return {"urls": source_data, "name_job": name_job, "user": user, "limited": False} + + +def sync( + self, + source_data, + name_job, + user, + loader, + sync_frequency, + retriever, + doc_id=None, + directory="temp", +): + try: + remote_worker( + self, + source_data, + name_job, + user, + loader, + directory, + retriever, + sync_frequency, + "sync", + doc_id, + ) + except Exception as e: + return {"status": "error", "error": str(e)} + return {"status": "success"} + + +def sync_worker(self, frequency): + sync_counts = Counter() + sources = sources_collection.find() + for doc in sources: + if doc.get("sync_frequency") == frequency: + name = doc.get("name") + user = doc.get("user") + source_type = doc.get("type") + source_data = doc.get("remote_data") + retriever = doc.get("retriever") + doc_id = str(doc.get("_id")) + resp = sync( + self, source_data, name, user, source_type, frequency, retriever, doc_id + ) + sync_counts["total_sync_count"] += 1 + sync_counts[ + "sync_success" if resp["status"] == "success" else "sync_failure" + ] += 1 + + return { + key: sync_counts[key] + for key in ["total_sync_count", "sync_success", "sync_failure"] + } diff --git a/docker-compose.yaml b/docker-compose.yaml index 05c8c059..5b4d48ba 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -30,7 +30,7 @@ services: worker: build: ./application - command: celery -A application.app.celery worker -l INFO + command: celery -A application.app.celery worker -l INFO -B environment: - API_KEY=$API_KEY - EMBEDDINGS_KEY=$API_KEY diff --git a/frontend/src/api/endpoints.ts b/frontend/src/api/endpoints.ts index 742056b5..84674049 100644 --- a/frontend/src/api/endpoints.ts +++ b/frontend/src/api/endpoints.ts @@ -16,6 +16,7 @@ const endpoints = { TOKEN_ANALYTICS: '/api/get_token_analytics', FEEDBACK_ANALYTICS: '/api/get_feedback_analytics', LOGS: `/api/get_user_logs`, + MANAGE_SYNC: '/api/manage_sync', }, CONVERSATION: { ANSWER: '/api/answer', diff --git a/frontend/src/api/services/userService.ts b/frontend/src/api/services/userService.ts index 6d228177..c5bbba7d 100644 --- a/frontend/src/api/services/userService.ts +++ b/frontend/src/api/services/userService.ts @@ -31,6 +31,8 @@ const userService = { apiClient.post(endpoints.USER.FEEDBACK_ANALYTICS, data), getLogs: (data: any): Promise => apiClient.post(endpoints.USER.LOGS, data), + manageSync: (data: any): Promise => + apiClient.post(endpoints.USER.MANAGE_SYNC, data), }; export default userService; diff --git a/frontend/src/assets/sync.svg b/frontend/src/assets/sync.svg new file mode 100644 index 00000000..003dec43 --- /dev/null +++ b/frontend/src/assets/sync.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/frontend/src/components/DropdownMenu.tsx b/frontend/src/components/DropdownMenu.tsx new file mode 100644 index 00000000..787d3b84 --- /dev/null +++ b/frontend/src/components/DropdownMenu.tsx @@ -0,0 +1,88 @@ +import React from 'react'; + +type DropdownMenuProps = { + name: string; + options: { label: string; value: string }[]; + onSelect: (value: string) => void; + defaultValue?: string; + icon?: string; +}; + +export default function DropdownMenu({ + name, + options, + onSelect, + defaultValue = 'none', + icon, +}: DropdownMenuProps) { + const dropdownRef = React.useRef(null); + const [isOpen, setIsOpen] = React.useState(false); + const [selectedOption, setSelectedOption] = React.useState( + options.find((option) => option.value === defaultValue) || options[0], + ); + + const handleToggle = () => { + setIsOpen(!isOpen); + }; + const handleClickOutside = (event: MouseEvent) => { + if ( + dropdownRef.current && + !dropdownRef.current.contains(event.target as Node) + ) { + setIsOpen(false); + } + }; + const handleClickOption = (optionId: number) => { + setIsOpen(false); + setSelectedOption(options[optionId]); + onSelect(options[optionId].value); + }; + + React.useEffect(() => { + document.addEventListener('mousedown', handleClickOutside); + return () => { + document.removeEventListener('mousedown', handleClickOutside); + }; + }, []); + return ( +
+ +
+
+ {options.map((option, idx) => ( + + ))} +
+
+
+ ); +} diff --git a/frontend/src/models/misc.ts b/frontend/src/models/misc.ts index bf77fd0b..9affd0ab 100644 --- a/frontend/src/models/misc.ts +++ b/frontend/src/models/misc.ts @@ -11,6 +11,7 @@ export type Doc = { tokens?: string; type?: string; retriever?: string; + syncFrequency?: string; }; export type PromptProps = { diff --git a/frontend/src/settings/Documents.tsx b/frontend/src/settings/Documents.tsx index 342e2f52..ee88a98f 100644 --- a/frontend/src/settings/Documents.tsx +++ b/frontend/src/settings/Documents.tsx @@ -1,7 +1,14 @@ -import { DocumentsProps } from '../models/misc'; -import Trash from '../assets/trash.svg'; import PropTypes from 'prop-types'; import { useTranslation } from 'react-i18next'; +import { useDispatch } from 'react-redux'; + +import userService from '../api/services/userService'; +import SyncIcon from '../assets/sync.svg'; +import Trash from '../assets/trash.svg'; +import DropdownMenu from '../components/DropdownMenu'; +import { Doc, DocumentsProps } from '../models/misc'; +import { getDocs } from '../preferences/preferenceApi'; +import { setSourceDocs } from '../preferences/preferenceSlice'; // Utility function to format numbers const formatTokens = (tokens: number): string => { @@ -25,10 +32,29 @@ const Documents: React.FC = ({ handleDeleteDocument, }) => { const { t } = useTranslation(); + const dispatch = useDispatch(); + const syncOptions = [ + { label: 'Never', value: 'never' }, + { label: 'Daily', value: 'daily' }, + { label: 'Weekly', value: 'weekly' }, + { label: 'Monthly', value: 'monthly' }, + ]; + + const handleManageSync = (doc: Doc, sync_frequency: string) => { + userService + .manageSync({ source_id: doc.id, sync_frequency }) + .then(() => { + return getDocs(); + }) + .then((data) => { + dispatch(setSourceDocs(data)); + }) + .catch((error) => console.error(error)); + }; return (
-
-
+
+
@@ -64,18 +90,33 @@ const Documents: React.FC = ({ {document.type === 'remote' ? 'Pre-loaded' : 'Private'} ))}
- {document.type !== 'remote' && ( - Delete { - event.stopPropagation(); - handleDeleteDocument(index, document); - }} - /> - )} +
+ {document.type !== 'remote' && ( + Delete { + event.stopPropagation(); + handleDeleteDocument(index, document); + }} + /> + )} + {document.syncFrequency && ( +
+ { + handleManageSync(document, value); + }} + defaultValue={document.syncFrequency} + icon={SyncIcon} + /> +
+ )} +