Mirror of https://github.com/arc53/DocsGPT.git (synced 2025-11-29 16:43:16 +00:00)
(feat:storage) file, indexes uploads
@@ -133,71 +133,105 @@ def ingest_worker(
     limit = None
     exclude = True
     sample = False
-    full_path = os.path.join(directory, user, name_job)
-
-    logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
-    file_data = {"name": name_job, "file": filename, "user": user}
-
+    storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
+    temp_dir = tempfile.mkdtemp()
+    full_path = os.path.join(temp_dir, name_job)
+
     if not os.path.exists(full_path):
         os.makedirs(full_path)
-    download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
-
-    # check if file is .zip and extract it
-    if filename.endswith(".zip"):
-        extract_zip_recursive(
-            os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
-        )
-
-    self.update_state(state="PROGRESS", meta={"current": 1})
-
-    raw_docs = SimpleDirectoryReader(
-        input_dir=full_path,
-        input_files=input_files,
-        recursive=recursive,
-        required_exts=formats,
-        num_files_limit=limit,
-        exclude_hidden=exclude,
-        file_metadata=metadata_from_filename,
-    ).load_data()
-
-    chunker = Chunker(
-        chunking_strategy="classic_chunk",
-        max_tokens=MAX_TOKENS,
-        min_tokens=MIN_TOKENS,
-        duplicate_headers=False
-    )
-    raw_docs = chunker.chunk(documents=raw_docs)
-
-    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-    id = ObjectId()
-
-    embed_and_store_documents(docs, full_path, id, self)
-    tokens = count_tokens_docs(docs)
-    self.update_state(state="PROGRESS", meta={"current": 100})
-
-    if sample:
-        for i in range(min(5, len(raw_docs))):
-            logging.info(f"Sample document {i}: {raw_docs[i]}")
-
-    file_data.update({
-        "tokens": tokens,
-        "retriever": retriever,
-        "id": str(id),
-        "type": "local",
-    })
-    upload_index(full_path, file_data)
-
-    # delete local
-    shutil.rmtree(full_path)
-
-    return {
-        "directory": directory,
-        "formats": formats,
-        "name_job": name_job,
-        "filename": filename,
-        "user": user,
-        "limited": False,
-    }
+
+    logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job})
+    file_data = {"name": name_job, "file": filename, "user": user}
+
+    try:
+        file_path = f"{directory}/{user}/{name_job}/{filename}"
+
+        try:
+            file_obj = storage.get_file(file_path)
+
+            local_file_path = os.path.join(full_path, filename)
+            with open(local_file_path, 'wb') as f:
+                shutil.copyfileobj(file_obj, f)
+
+            # check if file is .zip and extract it
+            if filename.endswith(".zip"):
+                extract_zip_recursive(
+                    os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
+                )
+        except FileNotFoundError as e:
+            logging.error(f"File not found in storage: {file_path}")
+            raise FileNotFoundError(f"File not found: {file_path}") from e
+
+        self.update_state(state="PROGRESS", meta={"current": 1})
+
+        raw_docs = SimpleDirectoryReader(
+            input_dir=full_path,
+            input_files=input_files,
+            recursive=recursive,
+            required_exts=formats,
+            num_files_limit=limit,
+            exclude_hidden=exclude,
+            file_metadata=metadata_from_filename,
+        ).load_data()
+
+        chunker = Chunker(
+            chunking_strategy="classic_chunk",
+            max_tokens=MAX_TOKENS,
+            min_tokens=MIN_TOKENS,
+            duplicate_headers=False
+        )
+        raw_docs = chunker.chunk(documents=raw_docs)
+        if sample:
+            for i in range(min(5, len(raw_docs))):
+                logging.info(f"Sample document {i}: {raw_docs[i]}")
+
+        docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+        id = ObjectId()
+
+        vector_dir = os.path.join(temp_dir, "vector_store")
+        os.makedirs(vector_dir, exist_ok=True)
+
+        embed_and_store_documents(docs, vector_dir, str(id), self)
+        tokens = count_tokens_docs(docs)
+        self.update_state(state="PROGRESS", meta={"current": 100})
+
+        file_data.update({
+            "tokens": tokens,
+            "retriever": retriever,
+            "id": str(id),
+            "type": "local",
+        })
+
+        mongo = MongoDB.get_client()
+        db = mongo["docsgpt"]
+        sources_collection = db["sources"]
+
+        sources_collection.insert_one({
+            "_id": id,
+            "name": name_job,
+            "user": user,
+            "date": datetime.datetime.now(),
+            "tokens": tokens,
+            "retriever": retriever,
+            "type": "local",
+            "storage_type": settings.STORAGE_TYPE,
+            "original_file_path": file_path
+        })
+
+        return {
+            "directory": directory,
+            "formats": formats,
+            "name_job": name_job,
+            "filename": filename,
+            "user": user,
+            "limited": False,
+        }
+    finally:
+        if os.path.exists(temp_dir):
+            shutil.rmtree(temp_dir)
 
 
 def remote_worker(
     self,
||||
Reference in New Issue
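The lifecycle the worker now follows (fetch the upload out of storage into a scratch directory, process it, always clean up) can be exercised on its own. A hedged, standalone walk-through using the LocalStorage sketch above; the stored path "inputs/notes.txt" and its contents are hypothetical.

import io
import os
import shutil
import tempfile

# Stage a fake upload first so the walk-through is self-contained.
storage = StorageCreator.create_storage("local")
storage.save_file(io.BytesIO(b"hello docs"), "inputs/notes.txt")

temp_dir = tempfile.mkdtemp()
try:
    local_path = os.path.join(temp_dir, "notes.txt")
    with storage.get_file("inputs/notes.txt") as src, open(local_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    # ... parse, chunk, and embed local_path here, as the worker does ...
finally:
    # Mirrors the worker's finally block: the scratch dir never outlives the job.
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)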
Block a user
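The new sources record also persists storage_type and original_file_path, which lets downstream code find its way back to the raw upload. A sketch of that read path using pymongo and the StorageCreator sketch above; the connection URI and the query values are hypothetical, while the database name "docsgpt", the collection "sources", and the field names come from the diff.

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # hypothetical URI
sources = client["docsgpt"]["sources"]

source = sources.find_one({"user": "demo-user", "name": "demo-job"})  # hypothetical values
if source:
    # storage_type selects the backend; original_file_path is the key the
    # worker used when it first read the upload out of storage.
    storage = StorageCreator.create_storage(source["storage_type"])
    file_obj = storage.get_file(source["original_file_path"])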