fix: sources display

This commit is contained in:
Alex
2026-02-05 12:58:46 +00:00
parent f41f69a268
commit f17f8162a1
8 changed files with 320 additions and 60 deletions

View File

@@ -61,6 +61,7 @@ def upload_index_files():
file_path = request.form.get("file_path")
directory_structure = request.form.get("directory_structure")
file_name_map = request.form.get("file_name_map")
if directory_structure:
try:
@@ -70,6 +71,14 @@ def upload_index_files():
directory_structure = {}
else:
directory_structure = {}
if file_name_map:
try:
file_name_map = json.loads(file_name_map)
except Exception:
logger.error("Error parsing file_name_map")
file_name_map = None
else:
file_name_map = None
storage = StorageCreator.get_storage()
index_base_path = f"indexes/{id}"
@@ -97,41 +106,43 @@ def upload_index_files():
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:
update_fields = {
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": file_path,
"directory_structure": directory_structure,
}
if file_name_map is not None:
update_fields["file_name_map"] = file_name_map
sources_collection.update_one(
{"_id": ObjectId(id)},
{
"$set": {
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": file_path,
"directory_structure": directory_structure,
}
},
{"$set": update_fields},
)
else:
sources_collection.insert_one(
{
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": file_path,
"directory_structure": directory_structure,
}
)
insert_doc = {
"_id": ObjectId(id),
"user": user,
"name": job_name,
"language": job_name,
"date": datetime.datetime.now(),
"model": settings.EMBEDDINGS_NAME,
"type": type,
"tokens": tokens,
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": file_path,
"directory_structure": directory_structure,
}
if file_name_map is not None:
insert_doc["file_name_map"] = file_name_map
sources_collection.insert_one(insert_doc)
return {"status": "ok"}

View File

@@ -64,13 +64,16 @@ class UploadFile(Resource):
safe_user = safe_filename(user)
dir_name = safe_filename(job_name)
base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
file_name_map = {}
try:
storage = StorageCreator.get_storage()
for file in files:
original_filename = file.filename
original_filename = os.path.basename(file.filename)
safe_file = safe_filename(original_filename)
if original_filename:
file_name_map[safe_file] = original_filename
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, safe_file)
@@ -142,6 +145,7 @@ class UploadFile(Resource):
user,
file_path=base_path,
filename=dir_name,
file_name_map=file_name_map,
)
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}", exc_info=True)
@@ -341,6 +345,14 @@ class ManageSourceFiles(Resource):
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
parent_dir = request.form.get("parent_dir", "")
file_name_map = source.get("file_name_map") or {}
if isinstance(file_name_map, str):
try:
file_name_map = json.loads(file_name_map)
except Exception:
file_name_map = {}
if not isinstance(file_name_map, dict):
file_name_map = {}
if parent_dir and (parent_dir.startswith("/") or ".." in parent_dir):
return make_response(
@@ -362,19 +374,35 @@ class ManageSourceFiles(Resource):
400,
)
added_files = []
map_updated = False
target_dir = source_file_path
if parent_dir:
target_dir = f"{source_file_path}/{parent_dir}"
for file in files:
if file.filename:
safe_filename_str = safe_filename(file.filename)
original_filename = os.path.basename(file.filename)
safe_filename_str = safe_filename(original_filename)
file_path = f"{target_dir}/{safe_filename_str}"
# Save file to storage
storage.save_file(file, file_path)
added_files.append(safe_filename_str)
if original_filename:
relative_key = (
f"{parent_dir}/{safe_filename_str}"
if parent_dir
else safe_filename_str
)
file_name_map[relative_key] = original_filename
map_updated = True
if map_updated:
sources_collection.update_one(
{"_id": ObjectId(source_id)},
{"$set": {"file_name_map": file_name_map}},
)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
@@ -421,6 +449,7 @@ class ManageSourceFiles(Resource):
# Remove files from storage and directory structure
removed_files = []
map_updated = False
for file_path in file_paths:
full_path = f"{source_file_path}/{file_path}"
@@ -429,6 +458,15 @@ class ManageSourceFiles(Resource):
if storage.file_exists(full_path):
storage.delete_file(full_path)
removed_files.append(file_path)
if file_path in file_name_map:
file_name_map.pop(file_path, None)
map_updated = True
if map_updated and isinstance(file_name_map, dict):
sources_collection.update_one(
{"_id": ObjectId(source_id)},
{"$set": {"file_name_map": file_name_map}},
)
# Trigger re-ingestion pipeline
from application.api.user.tasks import reingest_source_task
@@ -511,6 +549,20 @@ class ManageSourceFiles(Resource):
f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, "
f"Full path: {full_directory_path}"
)
if directory_path and file_name_map:
prefix = f"{directory_path.rstrip('/')}/"
keys_to_remove = [
key
for key in file_name_map.keys()
if key == directory_path or key.startswith(prefix)
]
if keys_to_remove:
for key in keys_to_remove:
file_name_map.pop(key, None)
sources_collection.update_one(
{"_id": ObjectId(source_id)},
{"$set": {"file_name_map": file_name_map}},
)
# Trigger re-ingestion pipeline

View File

@@ -14,8 +14,19 @@ from application.worker import (
@celery.task(bind=True)
def ingest(self, directory, formats, job_name, user, file_path, filename):
resp = ingest_worker(self, directory, formats, job_name, file_path, filename, user)
def ingest(
    self,
    directory,
    formats,
    job_name,
    user,
    file_path,
    filename,
    file_name_map=None,
):
    """Task entry point that forwards an ingestion request to ``ingest_worker``.

    Note the argument order differs between the two callables: this task
    receives ``user`` before ``file_path``/``filename``, while
    ``ingest_worker`` is called with ``file_path, filename, user``.

    Args:
        self: Bound task instance, passed through to the worker.
        directory: Forwarded unchanged to ``ingest_worker``.
        formats: Forwarded unchanged to ``ingest_worker``.
        job_name: Forwarded unchanged to ``ingest_worker``.
        user: Forwarded unchanged to ``ingest_worker``.
        file_path: Forwarded unchanged to ``ingest_worker``.
        filename: Forwarded unchanged to ``ingest_worker``.
        file_name_map: Optional file-name map, forwarded by keyword.
            Defaults to None so callers that do not supply it keep working.

    Returns:
        Whatever ``ingest_worker`` returns.
    """
    return ingest_worker(
        self,
        directory,
        formats,
        job_name,
        file_path,
        filename,
        user,
        file_name_map=file_name_map,
    )

View File

@@ -52,6 +52,41 @@ def metadata_from_filename(title):
return {"title": title}
def _normalize_file_name_map(file_name_map):
    """Coerce a file-name map into a plain dict.

    The map may arrive as a dict or as its JSON-encoded string form.

    Args:
        file_name_map (dict|str|None): Mapping of safe relative paths to
            original filenames, or a JSON string encoding such a mapping.

    Returns:
        dict: The mapping itself when it is (or decodes to) a dict. An empty
        dict for falsy input, undecodable JSON, or a value of any other type.
    """
    if not file_name_map:
        return {}
    if isinstance(file_name_map, str):
        try:
            file_name_map = json.loads(file_name_map)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; RecursionError covers
            # pathologically nested payloads. A bad payload degrades to
            # "no map" rather than aborting the caller, but unrelated
            # failures are no longer swallowed.
            return {}
    return file_name_map if isinstance(file_name_map, dict) else {}
def _get_display_name(file_name_map, rel_path):
    """Look up the display (original) filename for a stored path.

    The full path is tried as a key first. If it is not present, the path's
    final component is tried, so a map keyed by bare file names still matches
    files referenced through a longer path.

    Args:
        file_name_map (dict): Mapping of stored path or file name to the
            display name.
        rel_path (str|os.PathLike|None): Path to look up.

    Returns:
        The mapped value, or None when the map or path is empty, the path is
        not path-like, or neither the path nor its basename is a key.
    """
    if not file_name_map or not rel_path:
        return None
    try:
        # Callers pass values read from untyped metadata dicts. Anything that
        # is not path-like cannot name a file, so report "no display name"
        # instead of raising from the lookups below.
        rel_path = os.fspath(rel_path)
    except TypeError:
        return None
    if rel_path in file_name_map:
        return file_name_map[rel_path]
    return file_name_map.get(os.path.basename(rel_path))
def _apply_display_names_to_structure(structure, file_name_map, prefix=""):
    """Annotate file nodes of a nested structure with their display names.

    Walks ``structure`` recursively. A dict value that has both ``"type"``
    and ``"size_bytes"`` keys is treated as a file node. Any other dict value
    is treated as a sub-tree and descended into, with its name appended to
    ``prefix`` using ``/``. Non-dict values are ignored.

    Args:
        structure: Nested dict to annotate. Returned untouched if it is not
            a dict.
        file_name_map (dict): Mapping consulted through
            ``_get_display_name``. When empty, nothing is changed.
        prefix (str): Path of ``structure`` relative to the root; empty at
            the top level.

    Returns:
        The same ``structure`` object. File nodes are mutated in place: a
        ``"display_name"`` entry is set when a truthy name is found.
    """
    if not isinstance(structure, dict) or not file_name_map:
        return structure
    for name, node in structure.items():
        if not isinstance(node, dict):
            continue
        rel_path = f"{prefix}/{name}" if prefix else name
        if "type" in node and "size_bytes" in node:
            display_name = _get_display_name(file_name_map, rel_path)
            if display_name:
                # Coerce to str so the stored value matches what the
                # ingestion paths write into document metadata.
                node["display_name"] = str(display_name)
        else:
            _apply_display_names_to_structure(node, file_name_map, rel_path)
    return structure
# Define a function to generate a random string of a given length.
@@ -375,7 +410,15 @@ def run_agent_logic(agent_config, input_data):
def ingest_worker(
self, directory, formats, job_name, file_path, filename, user, retriever="classic"
self,
directory,
formats,
job_name,
file_path,
filename,
user,
retriever="classic",
file_name_map=None,
):
"""
Ingest and process documents.
@@ -389,6 +432,7 @@ def ingest_worker(
filename (str): Original unsanitized filename provided by the user.
user (str): Identifier for the user initiating the ingestion (original, unsanitized).
retriever (str): Type of retriever to use for processing the documents.
file_name_map (dict|str|None): Optional mapping of safe relative paths to original filenames.
Returns:
dict: Information about the completed ingestion task, including input parameters and a "limited" flag.
@@ -468,6 +512,22 @@ def ingest_worker(
directory_structure = getattr(reader, "directory_structure", {})
logging.info(f"Directory structure from reader: {directory_structure}")
file_name_map = _normalize_file_name_map(file_name_map)
if file_name_map:
for doc in raw_docs:
extra_info = getattr(doc, "extra_info", None)
if not isinstance(extra_info, dict):
continue
rel_path = extra_info.get("source") or extra_info.get("file_path")
display_name = _get_display_name(file_name_map, rel_path)
if display_name:
display_name = str(display_name)
extra_info["filename"] = display_name
extra_info["file_name"] = display_name
extra_info["title"] = display_name
directory_structure = _apply_display_names_to_structure(
directory_structure, file_name_map
)
chunker = Chunker(
chunking_strategy="classic_chunk",
@@ -504,6 +564,8 @@ def ingest_worker(
"file_path": file_path,
"directory_structure": json.dumps(directory_structure),
}
if file_name_map:
file_data["file_name_map"] = json.dumps(file_name_map)
upload_index(vector_store_path, file_data)
except Exception as e:
@@ -547,6 +609,7 @@ def reingest_source_worker(self, source_id, user):
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
file_name_map = _normalize_file_name_map(source.get("file_name_map"))
self.update_state(
state="PROGRESS", meta={"current": 20, "status": "Scanning current files"}
@@ -781,6 +844,14 @@ def reingest_source_worker(self, source_id, user):
)
except Exception:
pass
display_name = _get_display_name(
file_name_map, meta.get("source")
)
if display_name:
display_name = str(display_name)
meta["filename"] = display_name
meta["file_name"] = display_name
meta["title"] = display_name
vector_store.add_chunk(d.text, metadata=meta)
added += 1
@@ -795,6 +866,9 @@ def reingest_source_worker(self, source_id, user):
# 3) Update source directory structure timestamp
try:
total_tokens = sum(reader.file_token_counts.values())
directory_structure = _apply_display_names_to_structure(
directory_structure, file_name_map
)
sources_collection.update_one(
{"_id": ObjectId(source_id)},