mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 00:23:17 +00:00
159 lines
6.5 KiB
Python
159 lines
6.5 KiB
Python
import base64
|
|
import requests
|
|
import time
|
|
from typing import List, Optional
|
|
from application.parser.remote.base import BaseRemote
|
|
from application.parser.schema.base import Document
|
|
import mimetypes
|
|
from application.core.settings import settings
|
|
|
|
class GitHubLoader(BaseRemote):
|
|
def __init__(self):
|
|
self.access_token = settings.GITHUB_ACCESS_TOKEN
|
|
self.headers = {
|
|
"Authorization": f"token {self.access_token}",
|
|
"Accept": "application/vnd.github.v3+json"
|
|
} if self.access_token else {
|
|
"Accept": "application/vnd.github.v3+json"
|
|
}
|
|
return
|
|
|
|
def is_text_file(self, file_path: str) -> bool:
|
|
"""Determine if a file is a text file based on extension."""
|
|
# Common text file extensions
|
|
text_extensions = {
|
|
'.txt', '.md', '.markdown', '.rst', '.json', '.xml', '.yaml', '.yml',
|
|
'.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.c', '.cpp', '.h', '.hpp',
|
|
'.cs', '.go', '.rs', '.rb', '.php', '.swift', '.kt', '.scala',
|
|
'.html', '.css', '.scss', '.sass', '.less',
|
|
'.sh', '.bash', '.zsh', '.fish',
|
|
'.sql', '.r', '.m', '.mat',
|
|
'.ini', '.cfg', '.conf', '.config', '.env',
|
|
'.gitignore', '.dockerignore', '.editorconfig',
|
|
'.log', '.csv', '.tsv'
|
|
}
|
|
|
|
# Get file extension
|
|
file_lower = file_path.lower()
|
|
for ext in text_extensions:
|
|
if file_lower.endswith(ext):
|
|
return True
|
|
|
|
# Also check MIME type
|
|
mime_type, _ = mimetypes.guess_type(file_path)
|
|
if mime_type and (mime_type.startswith("text") or mime_type in ["application/json", "application/xml"]):
|
|
return True
|
|
|
|
return False
|
|
|
|
def fetch_file_content(self, repo_url: str, file_path: str) -> Optional[str]:
|
|
"""Fetch file content. Returns None if file should be skipped (binary files or empty files)."""
|
|
url = f"https://api.github.com/repos/{repo_url}/contents/{file_path}"
|
|
response = self._make_request(url)
|
|
|
|
content = response.json()
|
|
|
|
if content.get("encoding") == "base64":
|
|
if self.is_text_file(file_path): # Handle only text files
|
|
try:
|
|
decoded_content = base64.b64decode(content["content"]).decode("utf-8").strip()
|
|
# Skip empty files
|
|
if not decoded_content:
|
|
return None
|
|
return decoded_content
|
|
except Exception:
|
|
# If decoding fails, it's probably a binary file
|
|
return None
|
|
else:
|
|
# Skip binary files by returning None
|
|
return None
|
|
else:
|
|
file_content = content['content'].strip()
|
|
# Skip empty files
|
|
if not file_content:
|
|
return None
|
|
return file_content
|
|
|
|
def _make_request(self, url: str, max_retries: int = 3) -> requests.Response:
|
|
"""Make a request with retry logic for rate limiting"""
|
|
for attempt in range(max_retries):
|
|
response = requests.get(url, headers=self.headers)
|
|
|
|
if response.status_code == 200:
|
|
return response
|
|
elif response.status_code == 403:
|
|
# Check if it's a rate limit issue
|
|
try:
|
|
error_data = response.json()
|
|
error_msg = error_data.get("message", "")
|
|
|
|
# Check rate limit headers
|
|
remaining = response.headers.get("X-RateLimit-Remaining", "unknown")
|
|
reset_time = response.headers.get("X-RateLimit-Reset", "unknown")
|
|
|
|
print(f"GitHub API 403 Error: {error_msg}")
|
|
print(f"Rate limit remaining: {remaining}, Reset time: {reset_time}")
|
|
|
|
if "rate limit" in error_msg.lower():
|
|
if attempt < max_retries - 1:
|
|
wait_time = 2 ** attempt # Exponential backoff
|
|
print(f"Rate limit hit, waiting {wait_time} seconds before retry...")
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
# Provide helpful error message
|
|
if remaining == "0":
|
|
raise Exception(f"GitHub API rate limit exceeded. Please set GITHUB_ACCESS_TOKEN environment variable. Reset time: {reset_time}")
|
|
else:
|
|
raise Exception(f"GitHub API error: {error_msg}. This may require authentication - set GITHUB_ACCESS_TOKEN environment variable.")
|
|
except Exception as e:
|
|
if isinstance(e, Exception) and "GitHub API" in str(e):
|
|
raise
|
|
# If we can't parse the response, raise the original error
|
|
response.raise_for_status()
|
|
else:
|
|
response.raise_for_status()
|
|
|
|
return response
|
|
|
|
def fetch_repo_files(self, repo_url: str, path: str = "") -> List[str]:
|
|
url = f"https://api.github.com/repos/{repo_url}/contents/{path}"
|
|
response = self._make_request(url)
|
|
|
|
contents = response.json()
|
|
|
|
# Handle error responses from GitHub API
|
|
if isinstance(contents, dict) and "message" in contents:
|
|
raise Exception(f"GitHub API error: {contents.get('message')}")
|
|
|
|
# Ensure contents is a list
|
|
if not isinstance(contents, list):
|
|
raise TypeError(f"Expected list from GitHub API, got {type(contents).__name__}: {contents}")
|
|
|
|
files = []
|
|
for item in contents:
|
|
if item["type"] == "file":
|
|
files.append(item["path"])
|
|
elif item["type"] == "dir":
|
|
files.extend(self.fetch_repo_files(repo_url, item["path"]))
|
|
return files
|
|
|
|
def load_data(self, repo_url: str) -> List[Document]:
|
|
repo_name = repo_url.split("github.com/")[-1]
|
|
files = self.fetch_repo_files(repo_name)
|
|
documents = []
|
|
for file_path in files:
|
|
content = self.fetch_file_content(repo_name, file_path)
|
|
# Skip binary files (content is None)
|
|
if content is None:
|
|
continue
|
|
documents.append(Document(
|
|
text=content,
|
|
doc_id=file_path,
|
|
extra_info={
|
|
"title": file_path,
|
|
"source": f"https://github.com/{repo_name}/blob/main/{file_path}"
|
|
}
|
|
))
|
|
return documents
|