class CrawlerLoader(BaseRemote):
    """Breadth-first crawler that loads same-domain pages via WebBaseLoader.

    Starting from a seed URL, fetches pages, converts each one to the
    project's ``Document`` schema, and follows in-domain links until
    ``limit`` pages have been visited.
    """

    def __init__(self, limit=10):
        """
        :param limit: Maximum number of pages to visit (``None`` = unbounded).
        """
        self.loader = WebBaseLoader  # Loader class; instantiated per URL below
        self.limit = limit

    def load_data(self, inputs):
        """Crawl starting at ``inputs`` (a URL, or a list whose first item is the URL).

        :return: list of ``Document`` — one per successfully loaded page.
        """
        url = inputs
        # Accept either a bare URL or a list of URLs (use the first).
        if isinstance(url, list) and url:
            url = url[0]

        # Ensure the URL has a scheme (default to http).
        if not urlparse(url).scheme:
            url = "http://" + url

        visited_urls = set()
        # Same-origin prefix used to keep the crawl on the start domain.
        base_url = urlparse(url).scheme + "://" + urlparse(url).hostname
        urls_to_visit = [url]
        loaded_content = []

        while urls_to_visit:
            current_url = urls_to_visit.pop(0)
            visited_urls.add(current_url)

            try:
                # Fix: bound the request so a stalled server cannot hang the
                # whole crawl. 10 s matches the timeout used by the markdown
                # crawler in this package.
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()
                loader = self.loader([current_url])
                # Convert the loaded documents to the project's Document schema.
                for doc in loader.load():
                    loaded_content.append(
                        Document(
                            doc.page_content,
                            extra_info=doc.metadata
                        )
                    )
            except Exception as e:
                # Best-effort crawl: report the failure and continue with the
                # next queued URL.
                print(f"Error processing URL {current_url}: {e}")
                continue

            # Collect links from the fetched page, keeping only same-domain ones.
            soup = BeautifulSoup(response.text, 'html.parser')
            all_links = [
                urljoin(current_url, a['href'])
                for a in soup.find_all('a', href=True)
                if base_url in urljoin(current_url, a['href'])
            ]

            # Queue links that have not been visited yet, then de-duplicate.
            urls_to_visit.extend([link for link in all_links if link not in visited_urls])
            urls_to_visit = list(set(urls_to_visit))

            # Stop crawling once the page limit is reached.
            if self.limit is not None and len(visited_urls) >= self.limit:
                break

        return loaded_content
class CrawlerLoader(BaseRemote):
    def __init__(self, limit=10, allow_subdomains=False):
        """
        Given a URL, crawl web pages up to ``self.limit``, convert the HTML
        content of each page to Markdown, and return a list of Document objects.

        :param limit: The maximum number of pages to crawl.
        :param allow_subdomains: If True, also crawl pages on subdomains of the
            base domain; if False, stay on the base domain itself.
        """
        self.limit = limit
        self.allow_subdomains = allow_subdomains
        # One session for the whole crawl: connection reuse across requests.
        self.session = requests.Session()

    def load_data(self, inputs):
        """Crawl starting at ``inputs`` (a URL, or a list whose first item is the URL)."""
        url = inputs
        if isinstance(url, list) and url:
            url = url[0]

        # Ensure the URL has a scheme (if not, default to http)
        if not urlparse(url).scheme:
            url = "http://" + url

        # Keep track of visited URLs to avoid revisiting the same page
        visited_urls = set()

        # Determine the base domain for link filtering using tldextract
        base_domain = self._get_base_domain(url)
        urls_to_visit = {url}
        documents = []

        while urls_to_visit:
            current_url = urls_to_visit.pop()

            # Skip if already visited
            if current_url in visited_urls:
                continue
            visited_urls.add(current_url)

            # Fetch the page content
            html_content = self._fetch_page(current_url)
            if html_content is None:
                continue

            # Convert the HTML to Markdown for cleaner text formatting
            title, language, processed_markdown = self._process_html_to_markdown(html_content, current_url)
            if processed_markdown:
                # Create a Document for each visited page
                documents.append(
                    Document(
                        processed_markdown,  # content
                        None,  # doc_id
                        None,  # embedding
                        {"source": current_url, "title": title, "language": language}  # extra_info
                    )
                )

            # Extract links and filter them according to domain rules
            new_links = self._extract_links(html_content, current_url)
            filtered_links = self._filter_links(new_links, base_domain)

            # Add any new, not-yet-visited links to the queue
            urls_to_visit.update(link for link in filtered_links if link not in visited_urls)

            # If we've reached the limit, stop crawling
            if self.limit is not None and len(visited_urls) >= self.limit:
                break

        return documents

    def _fetch_page(self, url):
        """Return the page body as text, or None on any request failure."""
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            print(f"Error fetching URL {url}: {e}")
            return None

    def _process_html_to_markdown(self, html_content, current_url):
        """Extract (title, language, markdown) from raw HTML."""
        soup = BeautifulSoup(html_content, 'html.parser')
        title_tag = soup.find('title')
        title = title_tag.text.strip() if title_tag else "No Title"

        # Extract the document language from <html lang="...">, default "en".
        language_tag = soup.find('html')
        language = language_tag.get('lang', 'en') if language_tag else "en"

        markdownified = markdownify(html_content, heading_style="ATX", newline_style="BACKSLASH")
        # Collapse runs of three or more newlines down to exactly three.
        markdownified = re.sub(r'\n{3,}', '\n\n\n', markdownified)
        return title, language, markdownified

    def _extract_links(self, html_content, current_url):
        """Return [(absolute_url, anchor_text), ...] for every <a href> on the page."""
        soup = BeautifulSoup(html_content, 'html.parser')
        links = []
        for a in soup.find_all('a', href=True):
            full_url = urljoin(current_url, a['href'])
            links.append((full_url, a.text.strip()))
        return links

    def _get_base_domain(self, url):
        """Return the registered domain of *url* as "domain.suffix".

        Fix: hosts without a public suffix (e.g. "localhost", raw IPs) used to
        produce a malformed value with a trailing dot ("localhost."); fall back
        to the bare domain in that case.
        """
        extracted = tldextract.extract(url)
        if not extracted.suffix:
            return extracted.domain
        return f"{extracted.domain}.{extracted.suffix}"

    def _filter_links(self, links, base_domain):
        """
        Filter the extracted links to only include those that match the crawling criteria:
        - If allow_subdomains is True, allow any link on the same registered domain
          (including subdomains such as sub.example.com).
        - If allow_subdomains is False, only allow the base domain itself
          ("www" is tolerated as an alias of the apex domain).

        Fix: the previous implementation compared only tldextract's
        domain.suffix, which strips subdomains — so sub.example.com passed the
        "exact match" test even with allow_subdomains=False, and the
        endswith("." + base_domain) branch was unreachable dead code.
        """
        filtered = []
        for link, _ in links:
            parsed_link = urlparse(link)
            if not parsed_link.netloc:
                continue

            extracted = tldextract.extract(parsed_link.netloc)
            if not extracted.suffix:
                link_base = extracted.domain
            else:
                link_base = f"{extracted.domain}.{extracted.suffix}"

            if self.allow_subdomains:
                # Any host whose registered domain matches (covers subdomains,
                # since tldextract strips the subdomain part).
                if link_base == base_domain:
                    filtered.append(link)
            else:
                # Registered domain must match AND the host must not be a real
                # subdomain ("" = apex; "www" treated as the same site).
                if link_base == base_domain and extracted.subdomain in ("", "www"):
                    filtered.append(link)
        return filtered
id, self) + elif operation_mode == "sync": + if not doc_id or not ObjectId.is_valid(doc_id): + logging.error("Invalid doc_id provided for sync operation: %s", doc_id) + raise ValueError("doc_id must be provided for sync operation.") + id = ObjectId(doc_id) + embed_and_store_documents(docs, full_path, id, self) - tokens = count_tokens_docs(docs) - if operation_mode == "upload": - id = ObjectId() - embed_and_store_documents(docs, full_path, id, self) - elif operation_mode == "sync": - if not doc_id or not ObjectId.is_valid(doc_id): - raise ValueError("doc_id must be provided for sync operation.") - id = ObjectId(doc_id) - embed_and_store_documents(docs, full_path, id, self) - self.update_state(state="PROGRESS", meta={"current": 100}) + self.update_state(state="PROGRESS", meta={"current": 100}) - file_data = { - "name": name_job, - "user": user, - "tokens": tokens, - "retriever": retriever, - "id": str(id), - "type": loader, - "remote_data": source_data, - "sync_frequency": sync_frequency, - } - upload_index(full_path, file_data) + file_data = { + "name": name_job, + "user": user, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": loader, + "remote_data": source_data, + "sync_frequency": sync_frequency, + } + upload_index(full_path, file_data) - shutil.rmtree(full_path) + except Exception as e: + logging.error("Error in remote_worker task: %s", str(e), exc_info=True) + raise + finally: + if os.path.exists(full_path): + shutil.rmtree(full_path) + + logging.info("remote_worker task completed successfully") return {"urls": source_data, "name_job": name_job, "user": user, "limited": False} def sync(