Merge pull request #835 from arc53/feature/remote-loads

Feature/remote loads
This commit is contained in:
Alex
2024-03-01 15:42:42 +00:00
committed by GitHub
12 changed files with 461 additions and 45 deletions

View File

@@ -5,7 +5,7 @@ from pymongo import MongoClient
from bson.objectid import ObjectId
from werkzeug.utils import secure_filename
from application.api.user.tasks import ingest
from application.api.user.tasks import ingest, ingest_remote
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
@@ -157,6 +157,32 @@ def upload_file():
return {"status": "ok", "task_id": task_id}
else:
return {"status": "error"}
@user.route("/api/remote", methods=["POST"])
def upload_remote():
"""Upload a remote source to get vectorized and indexed."""
if "user" not in request.form:
return {"status": "no user"}
user = secure_filename(request.form["user"])
if "source" not in request.form:
return {"status": "no source"}
source = secure_filename(request.form["source"])
if "name" not in request.form:
return {"status": "no name"}
job_name = secure_filename(request.form["name"])
# check if the post request has the data field
if "data" not in request.form:
print("No data")
return {"status": "no data"}
source_data = request.form["data"]
if source_data:
task = ingest_remote.delay(source_data=source_data, job_name=job_name, user=user, loader=source)
# task id
task_id = task.id
return {"status": "ok", "task_id": task_id}
else:
return {"status": "error"}
@user.route("/api/task_status", methods=["GET"])
def task_status():
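
For reference, a minimal sketch of exercising the new endpoint from Python; the host and port are assumptions (adjust to wherever the API is served), while the form fields mirror exactly what the route above checks:

import requests

resp = requests.post(
    "http://localhost:7091/api/remote",   # assumed local API address
    data={
        "user": "local",                   # owner of the ingested index
        "source": "crawler",               # one of: url, sitemap, crawler
        "name": "docs-crawl",              # job name used for the index path
        "data": "https://example.com",     # URL(s) handed to the chosen loader
    },
)
print(resp.json())  # {"status": "ok", "task_id": "..."} on success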

View File

@@ -1,7 +1,12 @@
from application.worker import ingest_worker
from application.worker import ingest_worker, remote_worker
from application.celery import celery
@celery.task(bind=True)
def ingest(self, directory, formats, name_job, filename, user):
resp = ingest_worker(self, directory, formats, name_job, filename, user)
return resp
@celery.task(bind=True)
def ingest_remote(self, source_data, job_name, user, loader):
resp = remote_worker(self, source_data, job_name, user, loader)
return resp
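
The /api/remote route above dispatches this task asynchronously; a sketch of doing the same directly from Python, with illustrative argument values:

from application.api.user.tasks import ingest_remote

task = ingest_remote.delay(
    source_data="https://example.com",  # illustrative URL
    job_name="docs-crawl",
    user="local",
    loader="url",
)
print(task.id)  # pass this id to the /api/task_status endpoint to follow progress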

View File

@@ -0,0 +1,19 @@
"""Base reader class."""
from abc import abstractmethod
from typing import Any, List
from langchain.docstore.document import Document as LCDocument
from application.parser.schema.base import Document
class BaseRemote:
"""Utilities for loading data from a directory."""
@abstractmethod
def load_data(self, *args: Any, **load_kwargs: Any) -> List[Document]:
"""Load data from the input directory."""
def load_langchain_documents(self, **load_kwargs: Any) -> List[LCDocument]:
"""Load data in LangChain document format."""
docs = self.load_data(**load_kwargs)
return [d.to_langchain_format() for d in docs]
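
A minimal sketch of a concrete loader under this interface (the class is hypothetical, not part of this PR). Note that the concrete loaders added below return LangChain documents straight from WebBaseLoader, even though the abstract signature hints at the application's Document schema:

from typing import Any, List

from application.parser.remote.base import BaseRemote


class SingleUrlRemote(BaseRemote):
    """Hypothetical loader that fetches exactly one URL."""

    def load_data(self, inputs: Any) -> List:
        from langchain.document_loaders import WebBaseLoader
        url = inputs[0] if isinstance(inputs, list) else inputs
        return WebBaseLoader([url]).load()  # LangChain documents, as in the loaders below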

View File

@@ -0,0 +1,59 @@
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from application.parser.remote.base import BaseRemote
class CrawlerLoader(BaseRemote):
def __init__(self, limit=10):
from langchain.document_loaders import WebBaseLoader
self.loader = WebBaseLoader # Store the loader class; it is instantiated per URL below
self.limit = limit # Set the limit for the number of pages to scrape
def load_data(self, inputs):
url = inputs
# Check if the input is a list and if it is, use the first element
if isinstance(url, list) and url:
url = url[0]
# Check if the URL scheme is provided, if not, assume http
if not urlparse(url).scheme:
url = "http://" + url
visited_urls = set() # Keep track of URLs that have been visited
base_url = urlparse(url).scheme + "://" + urlparse(url).hostname # Extract the base URL
urls_to_visit = [url] # List of URLs to be visited, starting with the initial URL
loaded_content = [] # Store the loaded content from each URL
# Continue crawling until there are no more URLs to visit
while urls_to_visit:
current_url = urls_to_visit.pop(0) # Get the next URL to visit
visited_urls.add(current_url) # Mark the URL as visited
# Try to load and process the content from the current URL
try:
response = requests.get(current_url) # Fetch the content of the current URL
response.raise_for_status() # Raise an exception for HTTP errors
loader = self.loader([current_url]) # Initialize the document loader for the current URL
loaded_content.extend(loader.load()) # Load the content and add it to the loaded_content list
except Exception as e:
# Print an error message if loading or processing fails and continue with the next URL
print(f"Error processing URL {current_url}: {e}")
continue
# Parse the HTML content to extract all links
soup = BeautifulSoup(response.text, 'html.parser')
all_links = [
urljoin(current_url, a['href'])
for a in soup.find_all('a', href=True)
if base_url in urljoin(current_url, a['href']) # Ensure links are from the same domain
]
# Add new links to the list of URLs to visit if they haven't been visited yet
urls_to_visit.extend([link for link in all_links if link not in visited_urls])
urls_to_visit = list(set(urls_to_visit)) # Remove duplicate URLs
# Stop crawling if the limit of pages to scrape is reached
if self.limit is not None and len(visited_urls) >= self.limit:
break
return loaded_content # Return the loaded content from all visited URLs
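
A quick usage sketch (the URL is illustrative):

from application.parser.remote.crawler_loader import CrawlerLoader

loader = CrawlerLoader(limit=5)          # crawl at most 5 pages on the same domain
docs = loader.load_data("example.com")   # a missing scheme defaults to http://
print(len(docs))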

View File

@@ -0,0 +1,18 @@
from application.parser.remote.sitemap_loader import SitemapLoader
from application.parser.remote.crawler_loader import CrawlerLoader
from application.parser.remote.web_loader import WebLoader
class RemoteCreator:
loaders = {
'url': WebLoader,
'sitemap': SitemapLoader,
'crawler': CrawlerLoader
}
@classmethod
def create_loader(cls, type, *args, **kwargs):
loader_class = cls.loaders.get(type.lower())
if not loader_class:
raise ValueError(f"No LLM class found for type {type}")
return loader_class(*args, **kwargs)
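
The worker below resolves a loader from the request's 'source' string roughly like this (URL illustrative):

from application.parser.remote.remote_creator import RemoteCreator

loader = RemoteCreator.create_loader("sitemap", limit=10)    # extra kwargs are forwarded to the loader
docs = loader.load_data("https://example.com/sitemap.xml")
# RemoteCreator.create_loader("rss") would raise ValueError, since no loader is registered for that type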

View File

@@ -0,0 +1,81 @@
import requests
import re # Import regular expression library
import xml.etree.ElementTree as ET
from application.parser.remote.base import BaseRemote
class SitemapLoader(BaseRemote):
def __init__(self, limit=20):
from langchain.document_loaders import WebBaseLoader
self.loader = WebBaseLoader
self.limit = limit # Limit on the number of URLs to process
def load_data(self, inputs):
sitemap_url = inputs
# If the input is a list, use the first element
if isinstance(sitemap_url, list) and sitemap_url:
sitemap_url = sitemap_url[0]
urls = self._extract_urls(sitemap_url)
if not urls:
print(f"No URLs found in the sitemap: {sitemap_url}")
return []
# Load content of extracted URLs
documents = []
processed_urls = 0 # Counter for processed URLs
for url in urls:
if self.limit is not None and processed_urls >= self.limit:
break # Stop processing if the limit is reached
try:
loader = self.loader([url])
documents.extend(loader.load())
processed_urls += 1 # Increment the counter after processing each URL
except Exception as e:
print(f"Error processing URL {url}: {e}")
continue
return documents
def _extract_urls(self, sitemap_url):
try:
response = requests.get(sitemap_url)
response.raise_for_status() # Raise an exception for HTTP errors
except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError) as e:
print(f"Failed to fetch sitemap: {sitemap_url}. Error: {e}")
return []
# Determine if this is a sitemap or a URL
if self._is_sitemap(response):
# It's a sitemap, so parse it and extract URLs
return self._parse_sitemap(response.content)
else:
# It's not a sitemap, return the URL itself
return [sitemap_url]
def _is_sitemap(self, response):
content_type = response.headers.get('Content-Type', '')
if 'xml' in content_type or response.url.endswith('.xml'):
return True
if '<sitemapindex' in response.text or '<urlset' in response.text:
return True
return False
def _parse_sitemap(self, sitemap_content):
# Remove namespaces
sitemap_content = re.sub(' xmlns="[^"]+"', '', sitemap_content.decode('utf-8'), count=1)
root = ET.fromstring(sitemap_content)
urls = []
for loc in root.findall('.//url/loc'):
urls.append(loc.text)
# Check for nested sitemaps
for sitemap in root.findall('.//sitemap/loc'):
nested_sitemap_url = sitemap.text
urls.extend(self._extract_urls(nested_sitemap_url))
return urls
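
A usage sketch, with an illustrative sitemap URL:

from application.parser.remote.sitemap_loader import SitemapLoader

loader = SitemapLoader(limit=20)                             # load at most 20 pages
docs = loader.load_data("https://example.com/sitemap.xml")   # nested sitemap indexes are followed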

View File

@@ -0,0 +1,11 @@
from langchain.document_loaders import TelegramChatApiLoader
from application.parser.remote.base import BaseRemote
class TelegramChatApiRemote(BaseRemote):
def _init_parser(self, *args, **load_kwargs):
self.loader = TelegramChatApiLoader(**load_kwargs)
return {}
def parse_file(self, *args, **load_kwargs):
return

View File

@@ -0,0 +1,22 @@
from application.parser.remote.base import BaseRemote
class WebLoader(BaseRemote):
def __init__(self):
from langchain.document_loaders import WebBaseLoader
self.loader = WebBaseLoader
def load_data(self, inputs):
urls = inputs
if isinstance(urls, str):
urls = [urls] # Convert string to list if a single URL is passed
documents = []
for url in urls:
try:
loader = self.loader([url]) # Process URLs one by one
documents.extend(loader.load())
except Exception as e:
print(f"Error processing URL {url}: {e}")
continue # Continue with the next URL if an error occurs
return documents
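
A usage sketch (URLs illustrative):

from application.parser.remote.web_loader import WebLoader

docs = WebLoader().load_data(["https://example.com/a", "https://example.com/b"])
# a single URL string also works: WebLoader().load_data("https://example.com")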

View File

@@ -9,6 +9,7 @@ import requests
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.open_ai_func import call_openai_api
from application.parser.schema.base import Document
from application.parser.token_func import group_split
@@ -121,3 +122,49 @@ def ingest_worker(self, directory, formats, name_job, filename, user):
'user': user,
'limited': False
}
def remote_worker(self, source_data, name_job, user, directory='temp', loader='url'):
# sample = False
token_check = True
min_tokens = 150
max_tokens = 1250
full_path = directory + '/' + user + '/' + name_job
if not os.path.exists(full_path):
os.makedirs(full_path)
self.update_state(state='PROGRESS', meta={'current': 1})
# source_data: for the 'url' loader type this is simply the URL(s) to load
# Use RemoteCreator to load data from URL
remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)
docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)
#docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
call_openai_api(docs, full_path, self)
self.update_state(state='PROGRESS', meta={'current': 100})
# Proceed with uploading and cleaning as in the original function
file_data = {'name': name_job, 'user': user}
if settings.VECTOR_STORE == "faiss":
files = {'file_faiss': open(full_path + '/index.faiss', 'rb'),
'file_pkl': open(full_path + '/index.pkl', 'rb')}
requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
else:
requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)
shutil.rmtree(full_path)
return {
'urls': source_data,
'name_job': name_job,
'user': user,
'limited': False
}
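
The core of remote_worker, isolated as a sketch without the Celery state updates, index upload, or cleanup (parameter values mirror the defaults above; the URL is illustrative):

from application.parser.remote.remote_creator import RemoteCreator
from application.parser.token_func import group_split

raw_docs = RemoteCreator.create_loader("url").load_data("https://example.com")
docs = group_split(documents=raw_docs, min_tokens=150, max_tokens=1250, token_check=True)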