Add Amazon S3 support and synchronization features (#2244)

* Add Amazon S3 support and synchronization features

* refactor: remove unused variable in load_data test
This commit is contained in:
Alex
2025-12-30 18:26:51 +00:00
committed by GitHub
parent f910a82683
commit 9e7f1ad1c0
22 changed files with 1841 additions and 3 deletions

View File

@@ -9,6 +9,7 @@ from flask_restx import fields, Namespace, Resource
from application.api import api
from application.api.user.base import sources_collection
from application.api.user.tasks import sync_source
from application.core.settings import settings
from application.storage.storage_creator import StorageCreator
from application.utils import check_required_fields
@@ -20,6 +21,21 @@ sources_ns = Namespace(
)
def _get_provider_from_remote_data(remote_data):
if not remote_data:
return None
if isinstance(remote_data, dict):
return remote_data.get("provider")
if isinstance(remote_data, str):
try:
remote_data_obj = json.loads(remote_data)
except Exception:
return None
if isinstance(remote_data_obj, dict):
return remote_data_obj.get("provider")
return None
@sources_ns.route("/sources")
class CombinedJson(Resource):
@api.doc(description="Provide JSON file with combined available indexes")
@@ -41,6 +57,7 @@ class CombinedJson(Resource):
try:
for index in sources_collection.find({"user": user}).sort("date", -1):
provider = _get_provider_from_remote_data(index.get("remote_data"))
data.append(
{
"id": str(index["_id"]),
@@ -51,6 +68,7 @@ class CombinedJson(Resource):
"tokens": index.get("tokens", ""),
"retriever": index.get("retriever", "classic"),
"syncFrequency": index.get("sync_frequency", ""),
"provider": provider,
"is_nested": bool(index.get("directory_structure")),
"type": index.get(
"type", "file"
@@ -107,6 +125,7 @@ class PaginatedSources(Resource):
paginated_docs = []
for doc in documents:
provider = _get_provider_from_remote_data(doc.get("remote_data"))
doc_data = {
"id": str(doc["_id"]),
"name": doc.get("name", ""),
@@ -116,6 +135,7 @@ class PaginatedSources(Resource):
"tokens": doc.get("tokens", ""),
"retriever": doc.get("retriever", "classic"),
"syncFrequency": doc.get("sync_frequency", ""),
"provider": provider,
"isNested": bool(doc.get("directory_structure")),
"type": doc.get("type", "file"),
}
@@ -240,7 +260,7 @@ class ManageSync(Resource):
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
user = decoded_token.get("sub")
data = request.get_json()
data = request.get_json() or {}
required_fields = ["source_id", "sync_frequency"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
@@ -269,6 +289,72 @@ class ManageSync(Resource):
return make_response(jsonify({"success": True}), 200)
@sources_ns.route("/sync_source")
class SyncSource(Resource):
sync_source_model = api.model(
"SyncSourceModel",
{"source_id": fields.String(required=True, description="Source ID")},
)
@api.expect(sync_source_model)
@api.doc(description="Trigger an immediate sync for a source")
def post(self):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
user = decoded_token.get("sub")
data = request.get_json()
required_fields = ["source_id"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
source_id = data["source_id"]
if not ObjectId.is_valid(source_id):
return make_response(
jsonify({"success": False, "message": "Invalid source ID"}), 400
)
doc = sources_collection.find_one(
{"_id": ObjectId(source_id), "user": user}
)
if not doc:
return make_response(
jsonify({"success": False, "message": "Source not found"}), 404
)
source_type = doc.get("type", "")
if source_type.startswith("connector"):
return make_response(
jsonify(
{
"success": False,
"message": "Connector sources must be synced via /api/connectors/sync",
}
),
400,
)
source_data = doc.get("remote_data")
if not source_data:
return make_response(
jsonify({"success": False, "message": "Source is not syncable"}), 400
)
try:
task = sync_source.delay(
source_data=source_data,
job_name=doc.get("name", ""),
user=user,
loader=source_type,
sync_frequency=doc.get("sync_frequency", "never"),
retriever=doc.get("retriever", "classic"),
doc_id=source_id,
)
except Exception as err:
current_app.logger.error(
f"Error starting sync for source {source_id}: {err}",
exc_info=True,
)
return make_response(jsonify({"success": False}), 400)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@sources_ns.route("/directory_structure")
class DirectoryStructure(Resource):
@api.doc(

View File

@@ -187,6 +187,8 @@ class UploadRemote(Resource):
source_data = config.get("url")
elif data["source"] == "reddit":
source_data = config
elif data["source"] == "s3":
source_data = config
elif data["source"] in ConnectorCreator.get_supported_connectors():
session_token = config.get("session_token")
if not session_token:

View File

@@ -8,6 +8,7 @@ from application.worker import (
mcp_oauth,
mcp_oauth_status,
remote_worker,
sync,
sync_worker,
)
@@ -38,6 +39,30 @@ def schedule_syncs(self, frequency):
return resp
@celery.task(bind=True)
def sync_source(
self,
source_data,
job_name,
user,
loader,
sync_frequency,
retriever,
doc_id,
):
resp = sync(
self,
source_data,
job_name,
user,
loader,
sync_frequency,
retriever,
doc_id,
)
return resp
@celery.task(bind=True)
def store_attachment(self, file_info, user):
resp = attachment_worker(self, file_info, user)