Compare commits

..

1 Commits

Author SHA1 Message Date
Pavel
5a891647bf parser functions change
token_func proposed change to chunking. open_ai_func proposed change to embedding_pipeline. Late chunking first  implementation requires further testing.
2024-11-20 21:40:57 +04:00
378 changed files with 11390 additions and 32560 deletions

View File

@@ -1,15 +0,0 @@
FROM python:3.12-bookworm
# Install Node.js 20.x
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
# Install global npm packages
RUN npm install -g husky vite
# Create and activate Python virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
WORKDIR /workspace

View File

@@ -1,49 +0,0 @@
# Welcome to DocsGPT Devcontainer
Welcome to the DocsGPT development environment! This guide will help you get started quickly.
## Starting Services
To run DocsGPT, you need to start three main services: Flask (backend), Celery (task queue), and Vite (frontend). Here are the commands to start each service within the devcontainer:
### Vite (Frontend)
```bash
cd frontend
npm run dev -- --host
```
### Flask (Backend)
```bash
flask --app application/app.py run --host=0.0.0.0 --port=7091
```
### Celery (Task Queue)
```bash
celery -A application.app.celery worker -l INFO
```
## Github Codespaces Instructions
### 1. Make Ports Public:
Go to the "Ports" panel in Codespaces (usually located at the bottom of the VS Code window).
For both port 5173 and 7091, right-click on the port and select "Make Public".
![CleanShot 2025-02-12 at 09 46 14@2x](https://github.com/user-attachments/assets/00a34b16-a7ef-47af-9648-87a7e3008475)
### 2. Update VITE_API_HOST:
After making port 7091 public, copy the public URL provided by Codespaces for port 7091.
Open the file frontend/.env.development.
Find the line VITE_API_HOST=http://localhost:7091.
Replace http://localhost:7091 with the public URL you copied from Codespaces.
![CleanShot 2025-02-12 at 09 46 56@2x](https://github.com/user-attachments/assets/c472242f-1079-4cd8-bc0b-2d78db22b94c)

View File

@@ -1,24 +0,0 @@
{
"name": "DocsGPT Dev Container",
"dockerComposeFile": ["docker-compose-dev.yaml", "docker-compose.override.yaml"],
"service": "dev",
"workspaceFolder": "/workspace",
"postCreateCommand": ".devcontainer/post-create-command.sh",
"forwardPorts": [7091, 5173, 6379, 27017],
"customizations": {
"vscode": {
"extensions": [
"ms-python.python",
"ms-toolsai.jupyter",
"esbenp.prettier-vscode",
"dbaeumer.vscode-eslint"
]
},
"codespaces": {
"openFiles": [
".devcontainer/devc-welcome.md",
"CONTRIBUTING.md"
]
}
}
}

View File

@@ -1,40 +0,0 @@
version: '3.8'
services:
dev:
build:
context: .
dockerfile: Dockerfile
volumes:
- ../:/workspace:cached
command: sleep infinity
depends_on:
redis:
condition: service_healthy
mongo:
condition: service_healthy
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- MONGO_URI=mongodb://mongo:27017/docsgpt
- CACHE_REDIS_URL=redis://redis:6379/2
networks:
- default
redis:
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 5
mongo:
healthcheck:
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
interval: 5s
timeout: 30s
retries: 5
networks:
default:
name: docsgpt-dev-network

View File

@@ -1,32 +0,0 @@
#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status
if [ ! -f frontend/.env.development ]; then
cp -n .env-template frontend/.env.development || true # Assuming .env-template is in the root
fi
# Determine VITE_API_HOST based on environment
if [ -n "$CODESPACES" ]; then
# Running in Codespaces
CODESPACE_NAME=$(echo "$CODESPACES" | cut -d'-' -f1) # Extract codespace name
PUBLIC_API_HOST="https://${CODESPACE_NAME}-7091.${GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN}"
echo "Setting VITE_API_HOST for Codespaces: $PUBLIC_API_HOST in frontend/.env.development"
sed -i "s|VITE_API_HOST=.*|VITE_API_HOST=$PUBLIC_API_HOST|" frontend/.env.development
else
# Not running in Codespaces (local devcontainer)
DEFAULT_API_HOST="http://localhost:7091"
echo "Setting VITE_API_HOST for local dev: $DEFAULT_API_HOST in frontend/.env.development"
sed -i "s|VITE_API_HOST=.*|VITE_API_HOST=$DEFAULT_API_HOST|" frontend/.env.development
fi
mkdir -p model
if [ ! -d model/all-mpnet-base-v2 ]; then
wget -q https://d3dg1063dc54p9.cloudfront.net/models/embeddings/mpnet-base-v2.zip -O model/mpnet-base-v2.zip
unzip -q model/mpnet-base-v2.zip -d model
rm model/mpnet-base-v2.zip
fi
pip install -r application/requirements.txt
cd frontend
npm install --include=dev

3
.github/FUNDING.yml vendored
View File

@@ -1,3 +0,0 @@
# These are supported funding model platforms
github: arc53

View File

@@ -8,12 +8,12 @@ updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/application" # Location of package manifests
schedule:
interval: "daily"
interval: "weekly"
- package-ecosystem: "npm" # See documentation for possible values
directory: "/frontend" # Location of package manifests
schedule:
interval: "daily"
interval: "weekly"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
interval: "weekly"

View File

@@ -1,40 +0,0 @@
name: Bandit Security Scan
on:
push:
branches:
- main
pull_request:
types: [opened, synchronize, reopened]
jobs:
bandit_scan:
if: ${{ github.repository == 'arc53/DocsGPT' }}
runs-on: ubuntu-latest
permissions:
security-events: write
actions: read
contents: read
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install bandit # Bandit is needed for this action
if [ -f application/requirements.txt ]; then pip install -r application/requirements.txt; fi
- name: Run Bandit scan
uses: PyCQA/bandit-action@v1
with:
severity: medium
confidence: medium
targets: application/
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -5,33 +5,20 @@ on:
types: [published]
jobs:
build:
deploy:
if: github.repository == 'arc53/DocsGPT'
strategy:
matrix:
include:
- platform: linux/amd64
runner: ubuntu-latest
suffix: amd64
- platform: linux/arm64
runner: ubuntu-24.04-arm
suffix: arm64
runs-on: ${{ matrix.runner }}
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Set up QEMU # Only needed for emulation, not for native arm64 builds
if: matrix.platform == 'linux/arm64'
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
@@ -46,67 +33,15 @@ jobs:
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push platform-specific images
- name: Build and push Docker images to docker.io and ghcr.io
uses: docker/build-push-action@v6
with:
file: './application/Dockerfile'
platforms: ${{ matrix.platform }}
platforms: linux/amd64
context: ./application
push: true
tags: |
${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}-${{ matrix.suffix }}
ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}-${{ matrix.suffix }}
provenance: false
sbom: false
${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }},${{ secrets.DOCKER_USERNAME }}/docsgpt:latest
ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }},ghcr.io/${{ github.repository_owner }}/docsgpt:latest
cache-from: type=registry,ref=${{ secrets.DOCKER_USERNAME }}/docsgpt:latest
cache-to: type=inline
manifest:
if: github.repository == 'arc53/DocsGPT'
needs: build
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to ghcr.io
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Create and push manifest for DockerHub
run: |
set -e
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }} \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt:latest \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:${{ github.event.release.tag_name }}-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt:latest
- name: Create and push manifest for ghcr.io
run: |
set -e
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }} \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt:latest \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:${{ github.event.release.tag_name }}-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt:latest

View File

@@ -5,33 +5,20 @@ on:
types: [published]
jobs:
build:
deploy:
if: github.repository == 'arc53/DocsGPT'
strategy:
matrix:
include:
- platform: linux/amd64
runner: ubuntu-latest
suffix: amd64
- platform: linux/arm64
runner: ubuntu-24.04-arm
suffix: arm64
runs-on: ${{ matrix.runner }}
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Set up QEMU # Only needed for emulation, not for native arm64 builds
if: matrix.platform == 'linux/arm64'
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
@@ -46,67 +33,16 @@ jobs:
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push platform-specific images
# Runs a single command using the runners shell
- name: Build and push Docker images to docker.io and ghcr.io
uses: docker/build-push-action@v6
with:
file: './frontend/Dockerfile'
platforms: ${{ matrix.platform }}
platforms: linux/amd64, linux/arm64
context: ./frontend
push: true
tags: |
${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}-${{ matrix.suffix }}
ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}-${{ matrix.suffix }}
provenance: false
sbom: false
${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }},${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:latest
ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }},ghcr.io/${{ github.repository_owner }}/docsgpt-fe:latest
cache-from: type=registry,ref=${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:latest
cache-to: type=inline
manifest:
if: github.repository == 'arc53/DocsGPT'
needs: build
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to ghcr.io
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Create and push manifest for DockerHub
run: |
set -e
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }} \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:latest \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:${{ github.event.release.tag_name }}-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:latest
- name: Create and push manifest for ghcr.io
run: |
set -e
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }} \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt-fe:latest \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:${{ github.event.release.tag_name }}-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt-fe:latest

View File

@@ -1,4 +1,4 @@
name: Build and push multi-arch DocsGPT Docker image
name: Build and push DocsGPT Docker image for development
on:
workflow_dispatch:
@@ -7,29 +7,20 @@ on:
- main
jobs:
build:
deploy:
if: github.repository == 'arc53/DocsGPT'
strategy:
matrix:
include:
- platform: linux/amd64
runner: ubuntu-latest
suffix: amd64
- platform: linux/arm64
runner: ubuntu-24.04-arm
suffix: arm64
runs-on: ${{ matrix.runner }}
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
@@ -44,57 +35,15 @@ jobs:
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push platform-specific images
- name: Build and push Docker images to docker.io and ghcr.io
uses: docker/build-push-action@v6
with:
file: './application/Dockerfile'
platforms: ${{ matrix.platform }}
platforms: linux/amd64
context: ./application
push: true
tags: |
${{ secrets.DOCKER_USERNAME }}/docsgpt:develop-${{ matrix.suffix }}
ghcr.io/${{ github.repository_owner }}/docsgpt:develop-${{ matrix.suffix }}
provenance: false
sbom: false
${{ secrets.DOCKER_USERNAME }}/docsgpt:develop
ghcr.io/${{ github.repository_owner }}/docsgpt:develop
cache-from: type=registry,ref=${{ secrets.DOCKER_USERNAME }}/docsgpt:develop
cache-to: type=inline
manifest:
if: github.repository == 'arc53/DocsGPT'
needs: build
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to ghcr.io
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Create and push manifest for DockerHub
run: |
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt:develop \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:develop-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt:develop-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt:develop
- name: Create and push manifest for ghcr.io
run: |
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt:develop \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:develop-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt:develop-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt:develop

View File

@@ -7,33 +7,20 @@ on:
- main
jobs:
build:
deploy:
if: github.repository == 'arc53/DocsGPT'
strategy:
matrix:
include:
- platform: linux/amd64
runner: ubuntu-latest
suffix: amd64
- platform: linux/arm64
runner: ubuntu-24.04-arm
suffix: arm64
runs-on: ${{ matrix.runner }}
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Set up QEMU # Only needed for emulation, not for native arm64 builds
if: matrix.platform == 'linux/arm64'
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
@@ -48,57 +35,15 @@ jobs:
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push platform-specific images
- name: Build and push Docker images to docker.io and ghcr.io
uses: docker/build-push-action@v6
with:
file: './frontend/Dockerfile'
platforms: ${{ matrix.platform }}
platforms: linux/amd64
context: ./frontend
push: true
tags: |
${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop-${{ matrix.suffix }}
ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop-${{ matrix.suffix }}
provenance: false
sbom: false
${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop
ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop
cache-from: type=registry,ref=${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop
cache-to: type=inline
manifest:
if: github.repository == 'arc53/DocsGPT'
needs: build
runs-on: ubuntu-latest
permissions:
packages: write
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
install: true
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to ghcr.io
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Create and push manifest for DockerHub
run: |
docker manifest create ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop-amd64 \
--amend ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop-arm64
docker manifest push ${{ secrets.DOCKER_USERNAME }}/docsgpt-fe:develop
- name: Create and push manifest for ghcr.io
run: |
docker manifest create ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop-amd64 \
--amend ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop-arm64
docker manifest push ghcr.io/${{ github.repository_owner }}/docsgpt-fe:develop

View File

@@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
@@ -23,8 +23,8 @@ jobs:
run: |
python -m pytest --cov=application --cov-report=xml
- name: Upload coverage reports to Codecov
if: github.event_name == 'pull_request' && matrix.python-version == '3.12'
uses: codecov/codecov-action@v5
if: github.event_name == 'pull_request' && matrix.python-version == '3.11'
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

1
.gitignore vendored
View File

@@ -113,7 +113,6 @@ venv.bak/
# Spyder project settings
.spyderproject
.spyproject
.jwt_secret_key
# Rope project settings
.ropeproject

38
.vscode/launch.json vendored
View File

@@ -11,44 +11,6 @@
"skipFiles": [
"<node_internals>/**"
]
},
{
"name": "Flask Debugger",
"type": "debugpy",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "application/app.py",
"PYTHONPATH": "${workspaceFolder}",
"FLASK_ENV": "development",
"FLASK_DEBUG": "1",
"FLASK_RUN_PORT": "7091",
"FLASK_RUN_HOST": "0.0.0.0"
},
"args": [
"run",
"--no-debugger"
],
"cwd": "${workspaceFolder}",
},
{
"name": "Celery Debugger",
"type": "debugpy",
"request": "launch",
"module": "celery",
"env": {
"PYTHONPATH": "${workspaceFolder}",
},
"args": [
"-A",
"application.app.celery",
"worker",
"-l",
"INFO",
"--pool=solo"
],
"cwd": "${workspaceFolder}"
}
]
}

View File

@@ -27,7 +27,6 @@ Before creating issues, please check out how the latest version of our app looks
### 👨‍💻 If you're interested in contributing code, here are some important things to know:
For instructions on setting up a development environment, please refer to our [Development Deployment Guide](https://docs.docsgpt.cloud/Deploying/Development-Environment).
Tech Stack Overview:
@@ -35,40 +34,19 @@ Tech Stack Overview:
- 🖥 Backend: Developed in Python 🐍
### 🌐 Frontend Contributions (⚛️ React, Vite)
### 🌐 If you are looking to contribute to frontend (⚛React, Vite):
* The updated Figma design can be found [here](https://www.figma.com/file/OXLtrl1EAy885to6S69554/DocsGPT?node-id=0%3A1&t=hjWVuxRg9yi5YkJ9-1). Please try to follow the guidelines.
* **Coding Style:** We follow a strict coding style enforced by ESLint and Prettier. Please ensure your code adheres to the configuration provided in our repository's `fronetend/.eslintrc.js` file. We recommend configuring your editor with ESLint and Prettier to help with this.
* **Component Structure:** Strive for small, reusable components. Favor functional components and hooks over class components where possible.
* **State Management** If you need to add stores, please use Redux.
- The current frontend is being migrated from [`/application`](https://github.com/arc53/DocsGPT/tree/main/application) to [`/frontend`](https://github.com/arc53/DocsGPT/tree/main/frontend) with a new design, so please contribute to the new one.
- Check out this [milestone](https://github.com/arc53/DocsGPT/milestone/1) and its issues.
- The updated Figma design can be found [here](https://www.figma.com/file/OXLtrl1EAy885to6S69554/DocsGPT?node-id=0%3A1&t=hjWVuxRg9yi5YkJ9-1).
### 🖥 Backend Contributions (🐍 Python)
Please try to follow the guidelines.
- Review our issues and contribute to [`/application`](https://github.com/arc53/DocsGPT/tree/main/application)
### 🖥 If you are looking to contribute to Backend (🐍 Python):
- Review our issues and contribute to [`/application`](https://github.com/arc53/DocsGPT/tree/main/application) or [`/scripts`](https://github.com/arc53/DocsGPT/tree/main/scripts) (please disregard old [`ingest_rst.py`](https://github.com/arc53/DocsGPT/blob/main/scripts/old/ingest_rst.py) [`ingest_rst_sphinx.py`](https://github.com/arc53/DocsGPT/blob/main/scripts/old/ingest_rst_sphinx.py) files; these will be deprecated soon).
- All new code should be covered with unit tests ([pytest](https://github.com/pytest-dev/pytest)). Please find tests under [`/tests`](https://github.com/arc53/DocsGPT/tree/main/tests) folder.
- Before submitting your Pull Request, ensure it can be queried after ingesting some test data.
- **Coding Style:** We adhere to the [PEP 8](https://www.python.org/dev/peps/pep-0008/) style guide for Python code. We use `ruff` as our linter and code formatter. Please ensure your code is formatted correctly and passes `ruff` checks before submitting.
- **Type Hinting:** Please use type hints for all function arguments and return values. This improves code readability and helps catch errors early. Example:
```python
def my_function(name: str, count: int) -> list[str]:
...
```
- **Docstrings:** All functions and classes should have docstrings explaining their purpose, parameters, and return values. We prefer the [Google style docstrings](https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html). Example:
```python
def my_function(name: str, count: int) -> list[str]:
"""Does something with a name and a count.
Args:
name: The name to use.
count: The number of times to do it.
Returns:
A list of strings.
"""
...
```
### Testing

41
HACKTOBERFEST.md Normal file
View File

@@ -0,0 +1,41 @@
# **🎉 Join the Hacktoberfest with DocsGPT and win a Free T-shirt and other prizes! 🎉**
Welcome, contributors! We're excited to announce that DocsGPT is participating in Hacktoberfest. Get involved by submitting meaningful pull requests.
All contributors with accepted PRs will receive a cool Holopin! 🤩 (Watch out for a reply in your PR to collect it).
### 🏆 Top 50 contributors will receive a special T-shirt
### 🏆 [LLM Document analysis by LexEU competition](https://github.com/arc53/DocsGPT/blob/main/lexeu-competition.md):
A separate competition is available for those who submit new retrieval / workflow method that will analyze a Document using EU laws.
With 200$, 100$, 50$ prize for 1st, 2nd and 3rd place respectively.
You can find more information [here](https://github.com/arc53/DocsGPT/blob/main/lexeu-competition.md)
## 📜 Here's How to Contribute:
```text
🛠️ Code: This is the golden ticket! Make meaningful contributions through PRs.
🧩 API extension: Build an app utilising DocsGPT API. We prefer submissions that showcase original ideas and turn the API into an AI agent.
They can be a completely separate repos.
For example:
https://github.com/arc53/tg-bot-docsgpt-extenstion or
https://github.com/arc53/DocsGPT-cli
Non-Code Contributions:
📚 Wiki: Improve our documentation, create a guide or change existing documentation.
🖥️ Design: Improve the UI/UX or design a new feature.
📝 Blogging or Content Creation: Write articles or create videos to showcase DocsGPT or highlight your contributions!
```
### 📝 Guidelines for Pull Requests:
- Familiarize yourself with the current contributions and our [Roadmap](https://github.com/orgs/arc53/projects/2).
- Before contributing we highly advise that you check existing [issues](https://github.com/arc53/DocsGPT/issues) or [create](https://github.com/arc53/DocsGPT/issues/new/choose) an issue and wait to get assigned.
- Once you are finished with your contribution, please fill in this [form](https://airtable.com/appikMaJwdHhC1SDP/pagoblCJ9W29wf6Hf/form).
- Refer to the [Documentation](https://docs.docsgpt.cloud/).
- Feel free to join our [Discord](https://discord.gg/n5BX8dh8rU) server. We're here to help newcomers, so don't hesitate to jump in! Join us [here](https://discord.gg/n5BX8dh8rU).
Thank you very much for considering contributing to DocsGPT during Hacktoberfest! 🙏 Your contributions (not just simple typos) could earn you a stylish new t-shirt and other prizes as a token of our appreciation. 🎁 Join us, and let's code together! 🚀

223
README.md
View File

@@ -3,11 +3,13 @@
</h1>
<p align="center">
<strong>Open-Source RAG Assistant</strong>
<strong>Open-Source Documentation Assistant</strong>
</p>
<p align="left">
<strong><a href="https://www.docsgpt.cloud/">DocsGPT</a></strong> is an open-source genAI tool that helps users get reliable answers from any knowledge source, while avoiding hallucinations. It enables quick and reliable information retrieval, with tooling and agentic system capability built in.
<strong><a href="https://www.docsgpt.cloud/">DocsGPT</a></strong> is a cutting-edge open-source solution that streamlines the process of finding information in the project documentation. With its integration of the powerful <strong>GPT</strong> models, developers can easily ask questions about a project and receive accurate answers.
Say goodbye to time-consuming manual searches, and let <strong><a href="https://www.docsgpt.cloud/">DocsGPT</a></strong> help you quickly find the information you need. Try it out and see how it revolutionizes your project documentation experience. Contribute to its development and be a part of the future of AI-powered assistance.
</p>
<div align="center">
@@ -15,131 +17,178 @@
<a href="https://github.com/arc53/DocsGPT">![link to main GitHub showing Stars number](https://img.shields.io/github/stars/arc53/docsgpt?style=social)</a>
<a href="https://github.com/arc53/DocsGPT">![link to main GitHub showing Forks number](https://img.shields.io/github/forks/arc53/docsgpt?style=social)</a>
<a href="https://github.com/arc53/DocsGPT/blob/main/LICENSE">![link to license file](https://img.shields.io/github/license/arc53/docsgpt)</a>
<a href="https://www.bestpractices.dev/projects/9907"><img src="https://www.bestpractices.dev/projects/9907/badge"></a>
<a href="https://discord.gg/n5BX8dh8rU">![link to discord](https://img.shields.io/discord/1070046503302877216)</a>
<a href="https://twitter.com/docsgptai">![X (formerly Twitter) URL](https://img.shields.io/twitter/follow/docsgptai)</a>
<a href="https://docs.docsgpt.cloud/quickstart">⚡️ Quickstart</a><a href="https://app.docsgpt.cloud/">☁️ Cloud Version</a><a href="https://discord.gg/n5BX8dh8rU">💬 Discord</a>
<br>
<a href="https://docs.docsgpt.cloud/">📖 Documentation</a><a href="https://github.com/arc53/DocsGPT/blob/main/CONTRIBUTING.md">👫 Contribute</a><a href="https://blog.docsgpt.cloud/">🗞 Blog</a>
<br>
</div>
<div align="center">
<img src="https://d3dg1063dc54p9.cloudfront.net/videos/demov7.gif" alt="video-example-of-docs-gpt" width="800" height="450">
</div>
<h3 align="left">
<strong>Key Features:</strong>
</h3>
<ul align="left">
<li><strong>🗂️ Wide Format Support:</strong> Reads PDF, DOCX, CSV, XLSX, EPUB, MD, RST, HTML, MDX, JSON, PPTX, and images.</li>
<li><strong>🌐 Web & Data Integration:</strong> Ingests from URLs, sitemaps, Reddit, GitHub and web crawlers.</li>
<li><strong>✅ Reliable Answers:</strong> Get accurate, hallucination-free responses with source citations viewable in a clean UI.</li>
<li><strong>🔑 Streamlined API Keys:</strong> Generate keys linked to your settings, documents, and models, simplifying chatbot and integration setup.</li>
<li><strong>🔗 Actionable Tooling:</strong> Connect to APIs, tools, and other services to enable LLM actions.</li>
<li><strong>🧩 Pre-built Integrations:</strong> Use readily available HTML/React chat widgets, search tools, Discord/Telegram bots, and more.</li>
<li><strong>🔌 Flexible Deployment:</strong> Works with major LLMs (OpenAI, Google, Anthropic) and local models (Ollama, llama_cpp).</li>
<li><strong>🏢 Secure & Scalable:</strong> Run privately and securely with Kubernetes support, designed for enterprise-grade reliability.</li>
</ul>
## Roadmap
- [x] Full GoogleAI compatibility (Jan 2025)
- [x] Add tools (Jan 2025)
- [x] Manually updating chunks in the app UI (Feb 2025)
- [x] Devcontainer for easy development (Feb 2025)
- [x] ReACT agent (March 2025)
- [x] Chatbots menu re-design to handle tools, agent types, and more (April 2025)
- [x] New input box in the conversation menu (April 2025)
- [x] Add triggerable actions / tools (webhook) (April 2025)
- [ ] Anthropic Tool compatibility (May 2025)
- [ ] Add OAuth 2.0 authentication for tools and sources
- [ ] Agent scheduling
You can find our full roadmap [here](https://github.com/orgs/arc53/projects/2). Please don't hesitate to contribute or create issues, it helps us improve DocsGPT!
### Production Support / Help for Companies:
We're eager to provide personalized assistance when deploying your DocsGPT to a live environment.
[Get a Demo :wave:](https://www.docsgpt.cloud/contact)
[Book a Meeting :wave:](https://cal.com/arc53/docsgpt-demo-b2b)
[Send Email :email:](mailto:support@docsgpt.cloud?subject=DocsGPT%20support%2Fsolutions)
[Send Email :email:](mailto:contact@arc53.com?subject=DocsGPT%20support%2Fsolutions)
## Join the Lighthouse Program 🌟
Calling all developers and GenAI innovators! The **DocsGPT Lighthouse Program** connects technical leaders actively deploying or extending DocsGPT in real-world scenarios. Collaborate directly with our team to shape the roadmap, access priority support, and build enterprise-ready solutions with exclusive community insights.
<img src="https://github.com/user-attachments/assets/9a1f21de-7a15-4e42-9424-70d22ba5a913" alt="video-example-of-docs-gpt" width="1000" height="500">
[Learn More & Apply →](https://docs.google.com/forms/d/1KAADiJinUJ8EMQyfTXUIGyFbqINNClNR3jBNWq7DgTE)
## Roadmap
You can find our roadmap [here](https://github.com/orgs/arc53/projects/2). Please don't hesitate to contribute or create issues, it helps us improve DocsGPT!
## Our Open-Source Models Optimized for DocsGPT:
| Name | Base Model | Requirements (or similar) |
| --------------------------------------------------------------------- | ----------- | ------------------------- |
| [Docsgpt-7b-mistral](https://huggingface.co/Arc53/docsgpt-7b-mistral) | Mistral-7b | 1xA10G gpu |
| [Docsgpt-14b](https://huggingface.co/Arc53/docsgpt-14b) | llama-2-14b | 2xA10 gpu's |
| [Docsgpt-40b-falcon](https://huggingface.co/Arc53/docsgpt-40b-falcon) | falcon-40b | 8xA10G gpu's |
If you don't have enough resources to run it, you can use bitsnbytes to quantize.
## End to End AI Framework for Information Retrieval
![Architecture chart](https://github.com/user-attachments/assets/fc6a7841-ddfc-45e6-b5a0-d05fe648cbe2)
## Useful Links
- :mag: :fire: [Cloud Version](https://app.docsgpt.cloud/)
- :speech_balloon: :tada: [Join our Discord](https://discord.gg/n5BX8dh8rU)
- :books: :sunglasses: [Guides](https://docs.docsgpt.cloud/)
- :couple: [Interested in contributing?](https://github.com/arc53/DocsGPT/blob/main/CONTRIBUTING.md)
- :file_folder: :rocket: [How to use any other documentation](https://docs.docsgpt.cloud/Guides/How-to-train-on-other-documentation)
- :house: :closed_lock_with_key: [How to host it locally (so all data will stay on-premises)](https://docs.docsgpt.cloud/Guides/How-to-use-different-LLM)
## Project Structure
- Application - Flask app (main application).
- Extensions - Chrome extension.
- Scripts - Script that creates similarity search index for other libraries.
- Frontend - Frontend uses <a href="https://vitejs.dev/">Vite</a> and <a href="https://react.dev/">React</a>.
## QuickStart
> [!Note]
> Make sure you have [Docker](https://docs.docker.com/engine/install/) installed
A more detailed [Quickstart](https://docs.docsgpt.cloud/quickstart) is available in our documentation
On Mac OS or Linux, write:
1. **Clone the repository:**
`./setup.sh`
```bash
git clone https://github.com/arc53/DocsGPT.git
cd DocsGPT
It will install all the dependencies and allow you to download the local model, use OpenAI or use our LLM API.
Otherwise, refer to this Guide for Windows:
1. Download and open this repository with `git clone https://github.com/arc53/DocsGPT.git`
2. Create a `.env` file in your root directory and set the env variables and `VITE_API_STREAMING` to true or false, depending on whether you want streaming answers or not.
It should look like this inside:
```
LLM_NAME=[docsgpt or openai or others]
VITE_API_STREAMING=true
API_KEY=[if LLM_NAME is openai]
```
**For macOS and Linux:**
See optional environment variables in the [/.env-template](https://github.com/arc53/DocsGPT/blob/main/.env-template) and [/application/.env_sample](https://github.com/arc53/DocsGPT/blob/main/application/.env_sample) files.
2. **Run the setup script:**
3. Run [./run-with-docker-compose.sh](https://github.com/arc53/DocsGPT/blob/main/run-with-docker-compose.sh).
4. Navigate to http://localhost:5173/.
```bash
./setup.sh
To stop, just run `Ctrl + C`.
## Development Environments
### Spin up Mongo and Redis
For development, only two containers are used from [docker-compose.yaml](https://github.com/arc53/DocsGPT/blob/main/docker-compose.yaml) (by deleting all services except for Redis and Mongo).
See file [docker-compose-dev.yaml](./docker-compose-dev.yaml).
Run
```
docker compose -f docker-compose-dev.yaml build
docker compose -f docker-compose-dev.yaml up -d
```
**For Windows:**
2. **Run the PowerShell setup script:**
```powershell
PowerShell -ExecutionPolicy Bypass -File .\setup.ps1
```
Either script will guide you through setting up DocsGPT. Four options available: using the public API, running locally, connecting to a local inference engine, or using a cloud API provider. Scripts will automatically configure your `.env` file and handle necessary downloads and installations based on your chosen option.
**Navigate to http://localhost:5173/**
To stop DocsGPT, open a terminal in the `DocsGPT` directory and run:
```bash
docker compose -f deployment/docker-compose.yaml down
```
(or use the specific `docker compose down` command shown after running the setup script).
### Run the Backend
> [!Note]
> For development environment setup instructions, please refer to the [Development Environment Guide](https://docs.docsgpt.cloud/Deploying/Development-Environment).
> Make sure you have Python 3.10 or 3.11 installed.
1. Export required environment variables or prepare a `.env` file in the project folder:
- Copy [.env-template](https://github.com/arc53/DocsGPT/blob/main/application/.env-template) and create `.env`.
(check out [`application/core/settings.py`](application/core/settings.py) if you want to see more config options.)
2. (optional) Create a Python virtual environment:
You can follow the [Python official documentation](https://docs.python.org/3/tutorial/venv.html) for virtual environments.
a) On Mac OS and Linux
```commandline
python -m venv venv
. venv/bin/activate
```
b) On Windows
```commandline
python -m venv venv
venv/Scripts/activate
```
3. Download embedding model and save it in the `model/` folder:
You can use the script below, or download it manually from [here](https://d3dg1063dc54p9.cloudfront.net/models/embeddings/mpnet-base-v2.zip), unzip it and save it in the `model/` folder.
```commandline
wget https://d3dg1063dc54p9.cloudfront.net/models/embeddings/mpnet-base-v2.zip
unzip mpnet-base-v2.zip -d model
rm mpnet-base-v2.zip
```
4. Install dependencies for the backend:
```commandline
pip install -r application/requirements.txt
```
5. Run the app using `flask --app application/app.py run --host=0.0.0.0 --port=7091`.
6. Start worker with `celery -A application.app.celery worker -l INFO`.
### Start Frontend
> [!Note]
> Make sure you have Node version 16 or higher.
1. Navigate to the [/frontend](https://github.com/arc53/DocsGPT/tree/main/frontend) folder.
2. Install the required packages `husky` and `vite` (ignore if already installed).
```commandline
npm install husky -g
npm install vite -g
```
3. Install dependencies by running `npm install --include=dev`.
4. Run the app using `npm run dev`.
## Contributing
Please refer to the [CONTRIBUTING.md](CONTRIBUTING.md) file for information about how to get involved. We welcome issues, questions, and pull requests.
## Architecture
![Architecture chart](https://github.com/user-attachments/assets/fc6a7841-ddfc-45e6-b5a0-d05fe648cbe2)
## Project Structure
- Application - Flask app (main application).
- Extensions - Extensions, like react widget or discord bot.
- Frontend - Frontend uses <a href="https://vitejs.dev/">Vite</a> and <a href="https://react.dev/">React</a>.
- Scripts - Miscellaneous scripts.
## Code Of Conduct
We as members, contributors, and leaders, pledge to make participation in our community a harassment-free experience for everyone, regardless of age, body size, visible or invisible disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation. Please refer to the [CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md) file for more information about contributing.
## Many Thanks To Our Contributors⚡
<a href="https://github.com/arc53/DocsGPT/graphs/contributors" alt="View Contributors">

View File

@@ -6,20 +6,21 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y software-properties-common && \
add-apt-repository ppa:deadsnakes/ppa && \
# Install necessary packages and Python
apt-get update && \
apt-get install -y --no-install-recommends gcc wget unzip libc6-dev python3.12 python3.12-venv && \
apt-get install -y --no-install-recommends gcc wget unzip libc6-dev python3.11 python3.11-distutils python3.11-venv && \
rm -rf /var/lib/apt/lists/*
# Verify Python installation and setup symlink
RUN if [ -f /usr/bin/python3.12 ]; then \
ln -s /usr/bin/python3.12 /usr/bin/python; \
RUN if [ -f /usr/bin/python3.11 ]; then \
ln -s /usr/bin/python3.11 /usr/bin/python; \
else \
echo "Python 3.12 not found"; exit 1; \
echo "Python 3.11 not found"; exit 1; \
fi
# Download and unzip the model
RUN wget https://d3dg1063dc54p9.cloudfront.net/models/embeddings/mpnet-base-v2.zip && \
unzip mpnet-base-v2.zip -d models && \
unzip mpnet-base-v2.zip -d model && \
rm mpnet-base-v2.zip
# Install Rust
@@ -32,7 +33,7 @@ RUN apt-get remove --purge -y wget unzip && apt-get autoremove -y && rm -rf /var
COPY requirements.txt .
# Setup Python virtual environment
RUN python3.12 -m venv /venv
RUN python3.11 -m venv /venv
# Activate virtual environment and install Python packages
ENV PATH="/venv/bin:$PATH"
@@ -48,8 +49,9 @@ FROM ubuntu:24.04 as final
RUN apt-get update && \
apt-get install -y software-properties-common && \
add-apt-repository ppa:deadsnakes/ppa && \
apt-get update && apt-get install -y --no-install-recommends python3.12 && \
ln -s /usr/bin/python3.12 /usr/bin/python && \
# Install Python
apt-get update && apt-get install -y --no-install-recommends python3.11 && \
ln -s /usr/bin/python3.11 /usr/bin/python && \
rm -rf /var/lib/apt/lists/*
# Set working directory
@@ -61,8 +63,7 @@ RUN groupadd -r appuser && \
# Copy the virtual environment and model from the builder stage
COPY --from=builder /venv /venv
COPY --from=builder /models /app/models
COPY --from=builder /model /app/model
# Copy your application code
COPY . /app/application
@@ -84,4 +85,4 @@ EXPOSE 7091
USER appuser
# Start Gunicorn
CMD ["gunicorn", "-w", "1", "--timeout", "120", "--bind", "0.0.0.0:7091", "--preload", "application.wsgi:app"]
CMD ["gunicorn", "-w", "2", "--timeout", "120", "--bind", "0.0.0.0:7091", "application.wsgi:app"]

View File

@@ -1,16 +0,0 @@
from application.agents.classic_agent import ClassicAgent
from application.agents.react_agent import ReActAgent
class AgentCreator:
agents = {
"classic": ClassicAgent,
"react": ReActAgent,
}
@classmethod
def create_agent(cls, type, *args, **kwargs):
agent_class = cls.agents.get(type.lower())
if not agent_class:
raise ValueError(f"No agent class found for type {type}")
return agent_class(*args, **kwargs)

View File

@@ -1,327 +0,0 @@
import uuid
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Optional
from bson.objectid import ObjectId
from application.agents.tools.tool_action_parser import ToolActionParser
from application.agents.tools.tool_manager import ToolManager
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.llm.handlers.handler_creator import LLMHandlerCreator
from application.llm.llm_creator import LLMCreator
from application.logging import build_stack_data, log_activity, LogContext
from application.retriever.base import BaseRetriever
class BaseAgent(ABC):
def __init__(
self,
endpoint: str,
llm_name: str,
gpt_model: str,
api_key: str,
user_api_key: Optional[str] = None,
prompt: str = "",
chat_history: Optional[List[Dict]] = None,
decoded_token: Optional[Dict] = None,
attachments: Optional[List[Dict]] = None,
):
self.endpoint = endpoint
self.llm_name = llm_name
self.gpt_model = gpt_model
self.api_key = api_key
self.user_api_key = user_api_key
self.prompt = prompt
self.decoded_token = decoded_token or {}
self.user: str = decoded_token.get("sub")
self.tool_config: Dict = {}
self.tools: List[Dict] = []
self.tool_calls: List[Dict] = []
self.chat_history: List[Dict] = chat_history if chat_history is not None else []
self.llm = LLMCreator.create_llm(
llm_name,
api_key=api_key,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
self.llm_handler = LLMHandlerCreator.create_handler(
llm_name if llm_name else "default"
)
self.attachments = attachments or []
@log_activity()
def gen(
self, query: str, retriever: BaseRetriever, log_context: LogContext = None
) -> Generator[Dict, None, None]:
yield from self._gen_inner(query, retriever, log_context)
@abstractmethod
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
pass
def _get_tools(self, api_key: str = None) -> Dict[str, Dict]:
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
agents_collection = db["agents"]
tools_collection = db["user_tools"]
agent_data = agents_collection.find_one({"key": api_key or self.user_api_key})
tool_ids = agent_data.get("tools", []) if agent_data else []
tools = (
tools_collection.find(
{"_id": {"$in": [ObjectId(tool_id) for tool_id in tool_ids]}}
)
if tool_ids
else []
)
tools = list(tools)
tools_by_id = {str(tool["_id"]): tool for tool in tools} if tools else {}
return tools_by_id
def _get_user_tools(self, user="local"):
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
user_tools_collection = db["user_tools"]
user_tools = user_tools_collection.find({"user": user, "status": True})
user_tools = list(user_tools)
tools_by_id = {str(tool["_id"]): tool for tool in user_tools}
return tools_by_id
def _build_tool_parameters(self, action):
params = {"type": "object", "properties": {}, "required": []}
for param_type in ["query_params", "headers", "body", "parameters"]:
if param_type in action and action[param_type].get("properties"):
for k, v in action[param_type]["properties"].items():
if v.get("filled_by_llm", True):
params["properties"][k] = {
key: value
for key, value in v.items()
if key != "filled_by_llm" and key != "value"
}
params["required"].append(k)
return params
def _prepare_tools(self, tools_dict):
self.tools = [
{
"type": "function",
"function": {
"name": f"{action['name']}_{tool_id}",
"description": action["description"],
"parameters": self._build_tool_parameters(action),
},
}
for tool_id, tool in tools_dict.items()
if (
(tool["name"] == "api_tool" and "actions" in tool.get("config", {}))
or (tool["name"] != "api_tool" and "actions" in tool)
)
for action in (
tool["config"]["actions"].values()
if tool["name"] == "api_tool"
else tool["actions"]
)
if action.get("active", True)
]
def _execute_tool_action(self, tools_dict, call):
parser = ToolActionParser(self.llm.__class__.__name__)
tool_id, action_name, call_args = parser.parse_args(call)
call_id = getattr(call, "id", None) or str(uuid.uuid4())
tool_call_data = {
"tool_name": tools_dict[tool_id]["name"],
"call_id": call_id,
"action_name": f"{action_name}_{tool_id}",
"arguments": call_args,
}
yield {"type": "tool_call", "data": {**tool_call_data, "status": "pending"}}
tool_data = tools_dict[tool_id]
action_data = (
tool_data["config"]["actions"][action_name]
if tool_data["name"] == "api_tool"
else next(
action
for action in tool_data["actions"]
if action["name"] == action_name
)
)
query_params, headers, body, parameters = {}, {}, {}, {}
param_types = {
"query_params": query_params,
"headers": headers,
"body": body,
"parameters": parameters,
}
for param_type, target_dict in param_types.items():
if param_type in action_data and action_data[param_type].get("properties"):
for param, details in action_data[param_type]["properties"].items():
if param not in call_args and "value" in details:
target_dict[param] = details["value"]
for param, value in call_args.items():
for param_type, target_dict in param_types.items():
if param_type in action_data and param in action_data[param_type].get(
"properties", {}
):
target_dict[param] = value
tm = ToolManager(config={})
tool = tm.load_tool(
tool_data["name"],
tool_config=(
{
"url": tool_data["config"]["actions"][action_name]["url"],
"method": tool_data["config"]["actions"][action_name]["method"],
"headers": headers,
"query_params": query_params,
}
if tool_data["name"] == "api_tool"
else tool_data["config"]
),
)
if tool_data["name"] == "api_tool":
print(
f"Executing api: {action_name} with query_params: {query_params}, headers: {headers}, body: {body}"
)
result = tool.execute_action(action_name, **body)
else:
print(f"Executing tool: {action_name} with args: {call_args}")
result = tool.execute_action(action_name, **parameters)
tool_call_data["result"] = (
f"{str(result)[:50]}..." if len(str(result)) > 50 else result
)
yield {"type": "tool_call", "data": {**tool_call_data, "status": "completed"}}
self.tool_calls.append(tool_call_data)
return result, call_id
def _get_truncated_tool_calls(self):
return [
{
**tool_call,
"result": (
f"{str(tool_call['result'])[:50]}..."
if len(str(tool_call["result"])) > 50
else tool_call["result"]
),
"status": "completed",
}
for tool_call in self.tool_calls
]
def _build_messages(
self,
system_prompt: str,
query: str,
retrieved_data: List[Dict],
) -> List[Dict]:
docs_together = "\n".join([doc["text"] for doc in retrieved_data])
p_chat_combine = system_prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
for i in self.chat_history:
if "prompt" in i and "response" in i:
messages_combine.append({"role": "user", "content": i["prompt"]})
messages_combine.append({"role": "assistant", "content": i["response"]})
if "tool_calls" in i:
for tool_call in i["tool_calls"]:
call_id = tool_call.get("call_id") or str(uuid.uuid4())
function_call_dict = {
"function_call": {
"name": tool_call.get("action_name"),
"args": tool_call.get("arguments"),
"call_id": call_id,
}
}
function_response_dict = {
"function_response": {
"name": tool_call.get("action_name"),
"response": {"result": tool_call.get("result")},
"call_id": call_id,
}
}
messages_combine.append(
{"role": "assistant", "content": [function_call_dict]}
)
messages_combine.append(
{"role": "tool", "content": [function_response_dict]}
)
messages_combine.append({"role": "user", "content": query})
return messages_combine
def _retriever_search(
self,
retriever: BaseRetriever,
query: str,
log_context: Optional[LogContext] = None,
) -> List[Dict]:
retrieved_data = retriever.search(query)
if log_context:
data = build_stack_data(retriever, exclude_attributes=["llm"])
log_context.stacks.append({"component": "retriever", "data": data})
return retrieved_data
def _llm_gen(self, messages: List[Dict], log_context: Optional[LogContext] = None):
gen_kwargs = {"model": self.gpt_model, "messages": messages}
if (
hasattr(self.llm, "_supports_tools")
and self.llm._supports_tools
and self.tools
):
gen_kwargs["tools"] = self.tools
resp = self.llm.gen_stream(**gen_kwargs)
if log_context:
data = build_stack_data(self.llm, exclude_attributes=["client"])
log_context.stacks.append({"component": "llm", "data": data})
return resp
def _llm_handler(
self,
resp,
tools_dict: Dict,
messages: List[Dict],
log_context: Optional[LogContext] = None,
attachments: Optional[List[Dict]] = None,
):
resp = self.llm_handler.process_message_flow(
self, resp, tools_dict, messages, attachments, True
)
if log_context:
data = build_stack_data(self.llm_handler, exclude_attributes=["tool_calls"])
log_context.stacks.append({"component": "llm_handler", "data": data})
return resp
def _handle_response(self, response, tools_dict, messages, log_context):
if isinstance(response, str):
yield {"answer": response}
return
if hasattr(response, "message") and getattr(response.message, "content", None):
yield {"answer": response.message.content}
return
processed_response_gen = self._llm_handler(
response, tools_dict, messages, log_context, self.attachments
)
for event in processed_response_gen:
if isinstance(event, str):
yield {"answer": event}
elif hasattr(event, "message") and getattr(event.message, "content", None):
yield {"answer": event.message.content}
elif isinstance(event, dict) and "type" in event:
yield event

View File

@@ -1,53 +0,0 @@
from typing import Dict, Generator
from application.agents.base import BaseAgent
from application.logging import LogContext
from application.retriever.base import BaseRetriever
import logging
logger = logging.getLogger(__name__)
class ClassicAgent(BaseAgent):
"""A simplified classic agent with clear execution flow.
Usage:
1. Processes a query through retrieval
2. Sets up available tools
3. Generates responses using LLM
4. Handles tool interactions if needed
5. Returns standardized outputs
Easy to extend by overriding specific steps.
"""
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
# Step 1: Retrieve relevant data
retrieved_data = self._retriever_search(retriever, query, log_context)
# Step 2: Prepare tools
tools_dict = (
self._get_user_tools(self.user)
if not self.user_api_key
else self._get_tools(self.user_api_key)
)
self._prepare_tools(tools_dict)
# Step 3: Build and process messages
messages = self._build_messages(self.prompt, query, retrieved_data)
llm_response = self._llm_gen(messages, log_context)
# Step 4: Handle the response
yield from self._handle_response(
llm_response, tools_dict, messages, log_context
)
# Step 5: Return metadata
yield {"sources": retrieved_data}
yield {"tool_calls": self._get_truncated_tool_calls()}
# Log tool calls for debugging
log_context.stacks.append(
{"component": "agent", "data": {"tool_calls": self.tool_calls.copy()}}
)

View File

@@ -1,229 +0,0 @@
import os
from typing import Dict, Generator, List, Any
import logging
from application.agents.base import BaseAgent
from application.logging import build_stack_data, LogContext
from application.retriever.base import BaseRetriever
logger = logging.getLogger(__name__)
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
with open(
os.path.join(current_dir, "application/prompts", "react_planning_prompt.txt"), "r"
) as f:
planning_prompt_template = f.read()
with open(
os.path.join(current_dir, "application/prompts", "react_final_prompt.txt"),
"r",
) as f:
final_prompt_template = f.read()
MAX_ITERATIONS_REASONING = 10
class ReActAgent(BaseAgent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.plan: str = ""
self.observations: List[str] = []
def _extract_content_from_llm_response(self, resp: Any) -> str:
"""
Helper to extract string content from various LLM response types.
Handles strings, message objects (OpenAI-like), and streams.
Adapt stream handling for your specific LLM client if not OpenAI.
"""
collected_content = []
if isinstance(resp, str):
collected_content.append(resp)
elif ( # OpenAI non-streaming or Anthropic non-streaming (older SDK style)
hasattr(resp, "message")
and hasattr(resp.message, "content")
and resp.message.content is not None
):
collected_content.append(resp.message.content)
elif ( # OpenAI non-streaming (Pydantic model), Anthropic new SDK non-streaming
hasattr(resp, "choices") and resp.choices and
hasattr(resp.choices[0], "message") and
hasattr(resp.choices[0].message, "content") and
resp.choices[0].message.content is not None
):
collected_content.append(resp.choices[0].message.content) # OpenAI
elif ( # Anthropic new SDK non-streaming content block
hasattr(resp, "content") and isinstance(resp.content, list) and resp.content and
hasattr(resp.content[0], "text")
):
collected_content.append(resp.content[0].text) # Anthropic
else:
# Assume resp is a stream if not a recognized object
try:
for chunk in resp: # This will fail if resp is not iterable (e.g. a non-streaming response object)
content_piece = ""
# OpenAI-like stream
if hasattr(chunk, 'choices') and len(chunk.choices) > 0 and \
hasattr(chunk.choices[0], 'delta') and \
hasattr(chunk.choices[0].delta, 'content') and \
chunk.choices[0].delta.content is not None:
content_piece = chunk.choices[0].delta.content
# Anthropic-like stream (ContentBlockDelta)
elif hasattr(chunk, 'type') and chunk.type == 'content_block_delta' and \
hasattr(chunk, 'delta') and hasattr(chunk.delta, 'text'):
content_piece = chunk.delta.text
elif isinstance(chunk, str): # Simplest case: stream of strings
content_piece = chunk
if content_piece:
collected_content.append(content_piece)
except TypeError: # If resp is not iterable (e.g. a final response object that wasn't caught above)
logger.debug(f"Response type {type(resp)} could not be iterated as a stream. It might be a non-streaming object not handled by specific checks.")
except Exception as e:
logger.error(f"Error processing potential stream chunk: {e}, chunk was: {getattr(chunk, '__dict__', chunk)}")
return "".join(collected_content)
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
# Reset state for this generation call
self.plan = ""
self.observations = []
retrieved_data = self._retriever_search(retriever, query, log_context)
if self.user_api_key:
tools_dict = self._get_tools(self.user_api_key)
else:
tools_dict = self._get_user_tools(self.user)
self._prepare_tools(tools_dict)
docs_together = "\n".join([doc["text"] for doc in retrieved_data])
iterating_reasoning = 0
while iterating_reasoning < MAX_ITERATIONS_REASONING:
iterating_reasoning += 1
# 1. Create Plan
logger.info("ReActAgent: Creating plan...")
plan_stream = self._create_plan(query, docs_together, log_context)
current_plan_parts = []
yield {"thought": f"Reasoning... (iteration {iterating_reasoning})\n\n"}
for line_chunk in plan_stream:
current_plan_parts.append(line_chunk)
yield {"thought": line_chunk}
self.plan = "".join(current_plan_parts)
if self.plan:
self.observations.append(f"Plan: {self.plan} Iteration: {iterating_reasoning}")
max_obs_len = 20000
obs_str = "\n".join(self.observations)
if len(obs_str) > max_obs_len:
obs_str = obs_str[:max_obs_len] + "\n...[observations truncated]"
execution_prompt_str = (
(self.prompt or "")
+ f"\n\nFollow this plan:\n{self.plan}"
+ f"\n\nObservations:\n{obs_str}"
+ f"\n\nIf there is enough data to complete user query '{query}', Respond with 'SATISFIED' only. Otherwise, continue. Dont Menstion 'SATISFIED' in your response if you are not ready. "
)
messages = self._build_messages(execution_prompt_str, query, retrieved_data)
resp_from_llm_gen = self._llm_gen(messages, log_context)
initial_llm_thought_content = self._extract_content_from_llm_response(resp_from_llm_gen)
if initial_llm_thought_content:
self.observations.append(f"Initial thought/response: {initial_llm_thought_content}")
else:
logger.info("ReActAgent: Initial LLM response (before handler) had no textual content (might be only tool calls).")
resp_after_handler = self._llm_handler(resp_from_llm_gen, tools_dict, messages, log_context)
for tool_call_info in self.tool_calls: # Iterate over self.tool_calls populated by _llm_handler
observation_string = (
f"Executed Action: Tool '{tool_call_info.get('tool_name', 'N/A')}' "
f"with arguments '{tool_call_info.get('arguments', '{}')}'. Result: '{str(tool_call_info.get('result', ''))[:200]}...'"
)
self.observations.append(observation_string)
content_after_handler = self._extract_content_from_llm_response(resp_after_handler)
if content_after_handler:
self.observations.append(f"Response after tool execution: {content_after_handler}")
else:
logger.info("ReActAgent: LLM response after handler had no textual content.")
if log_context:
log_context.stacks.append(
{"component": "agent_tool_calls", "data": {"tool_calls": self.tool_calls.copy()}}
)
yield {"sources": retrieved_data}
display_tool_calls = []
for tc in self.tool_calls:
cleaned_tc = tc.copy()
if len(str(cleaned_tc.get("result", ""))) > 50:
cleaned_tc["result"] = str(cleaned_tc["result"])[:50] + "..."
display_tool_calls.append(cleaned_tc)
if display_tool_calls:
yield {"tool_calls": display_tool_calls}
if "SATISFIED" in content_after_handler:
logger.info("ReActAgent: LLM satisfied with the plan and data. Stopping reasoning.")
break
# 3. Create Final Answer based on all observations
final_answer_stream = self._create_final_answer(query, self.observations, log_context)
for answer_chunk in final_answer_stream:
yield {"answer": answer_chunk}
logger.info("ReActAgent: Finished generating final answer.")
def _create_plan(
self, query: str, docs_data: str, log_context: LogContext = None
) -> Generator[str, None, None]:
plan_prompt_filled = planning_prompt_template.replace("{query}", query)
if "{summaries}" in plan_prompt_filled:
summaries = docs_data if docs_data else "No documents retrieved."
plan_prompt_filled = plan_prompt_filled.replace("{summaries}", summaries)
plan_prompt_filled = plan_prompt_filled.replace("{prompt}", self.prompt or "")
plan_prompt_filled = plan_prompt_filled.replace("{observations}", "\n".join(self.observations))
messages = [{"role": "user", "content": plan_prompt_filled}]
plan_stream_from_llm = self.llm.gen_stream(
model=self.gpt_model, messages=messages, tools=getattr(self, 'tools', None) # Use self.tools
)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "planning_llm", "data": data})
for chunk in plan_stream_from_llm:
content_piece = self._extract_content_from_llm_response(chunk)
if content_piece:
yield content_piece
def _create_final_answer(
self, query: str, observations: List[str], log_context: LogContext = None
) -> Generator[str, None, None]:
observation_string = "\n".join(observations)
max_obs_len = 10000
if len(observation_string) > max_obs_len:
observation_string = observation_string[:max_obs_len] + "\n...[observations truncated]"
logger.warning("ReActAgent: Truncated observations for final answer prompt due to length.")
final_answer_prompt_filled = final_prompt_template.format(
query=query, observations=observation_string
)
messages = [{"role": "user", "content": final_answer_prompt_filled}]
# Final answer should synthesize, not call tools.
final_answer_stream_from_llm = self.llm.gen_stream(
model=self.gpt_model, messages=messages, tools=None
)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "final_answer_llm", "data": data})
for chunk in final_answer_stream_from_llm:
content_piece = self._extract_content_from_llm_response(chunk)
if content_piece:
yield content_piece

View File

@@ -1,72 +0,0 @@
import json
import requests
from application.agents.tools.base import Tool
class APITool(Tool):
"""
API Tool
A flexible tool for performing various API actions (e.g., sending messages, retrieving data) via custom user-specified APIs
"""
def __init__(self, config):
self.config = config
self.url = config.get("url", "")
self.method = config.get("method", "GET")
self.headers = config.get("headers", {"Content-Type": "application/json"})
self.query_params = config.get("query_params", {})
def execute_action(self, action_name, **kwargs):
return self._make_api_call(
self.url, self.method, self.headers, self.query_params, kwargs
)
def _make_api_call(self, url, method, headers, query_params, body):
if query_params:
url = f"{url}?{requests.compat.urlencode(query_params)}"
# if isinstance(body, dict):
# body = json.dumps(body)
try:
print(f"Making API call: {method} {url} with body: {body}")
if body == "{}":
body = None
response = requests.request(method, url, headers=headers, data=body)
response.raise_for_status()
content_type = response.headers.get(
"Content-Type", "application/json"
).lower()
if "application/json" in content_type:
try:
data = response.json()
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}. Raw response: {response.text}")
return {
"status_code": response.status_code,
"message": f"API call returned invalid JSON. Error: {e}",
"data": response.text,
}
elif "text/" in content_type or "application/xml" in content_type:
data = response.text
elif not response.content:
data = None
else:
print(f"Unsupported content type: {content_type}")
data = response.content
return {
"status_code": response.status_code,
"data": data,
"message": "API call successful.",
}
except requests.exceptions.RequestException as e:
return {
"status_code": response.status_code if response else None,
"message": f"API call failed: {str(e)}",
}
def get_actions_metadata(self):
return []
def get_config_requirements(self):
return {}

View File

@@ -1,21 +0,0 @@
from abc import ABC, abstractmethod
class Tool(ABC):
@abstractmethod
def execute_action(self, action_name: str, **kwargs):
pass
@abstractmethod
def get_actions_metadata(self):
"""
Returns a list of JSON objects describing the actions supported by the tool.
"""
pass
@abstractmethod
def get_config_requirements(self):
"""
Returns a dictionary describing the configuration requirements for the tool.
"""
pass

View File

@@ -1,217 +0,0 @@
import requests
from application.agents.tools.base import Tool
class BraveSearchTool(Tool):
"""
Brave Search
A tool for performing web and image searches using the Brave Search API.
Requires an API key for authentication.
"""
def __init__(self, config):
self.config = config
self.token = config.get("token", "")
self.base_url = "https://api.search.brave.com/res/v1"
def execute_action(self, action_name, **kwargs):
actions = {
"brave_web_search": self._web_search,
"brave_image_search": self._image_search,
}
if action_name in actions:
return actions[action_name](**kwargs)
else:
raise ValueError(f"Unknown action: {action_name}")
def _web_search(self, query, country="ALL", search_lang="en", count=10,
offset=0, safesearch="off", freshness=None,
result_filter=None, extra_snippets=False, summary=False):
"""
Performs a web search using the Brave Search API.
"""
print(f"Performing Brave web search for: {query}")
url = f"{self.base_url}/web/search"
# Build query parameters
params = {
"q": query,
"country": country,
"search_lang": search_lang,
"count": min(count, 20),
"offset": min(offset, 9),
"safesearch": safesearch
}
# Add optional parameters only if they have values
if freshness:
params["freshness"] = freshness
if result_filter:
params["result_filter"] = result_filter
if extra_snippets:
params["extra_snippets"] = 1
if summary:
params["summary"] = 1
# Set up headers
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": self.token
}
# Make the request
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
return {
"status_code": response.status_code,
"results": response.json(),
"message": "Search completed successfully."
}
else:
return {
"status_code": response.status_code,
"message": f"Search failed with status code: {response.status_code}."
}
def _image_search(self, query, country="ALL", search_lang="en", count=5,
safesearch="off", spellcheck=False):
"""
Performs an image search using the Brave Search API.
"""
print(f"Performing Brave image search for: {query}")
url = f"{self.base_url}/images/search"
# Build query parameters
params = {
"q": query,
"country": country,
"search_lang": search_lang,
"count": min(count, 100), # API max is 100
"safesearch": safesearch,
"spellcheck": 1 if spellcheck else 0
}
# Set up headers
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": self.token
}
# Make the request
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
return {
"status_code": response.status_code,
"results": response.json(),
"message": "Image search completed successfully."
}
else:
return {
"status_code": response.status_code,
"message": f"Image search failed with status code: {response.status_code}."
}
def get_actions_metadata(self):
return [
{
"name": "brave_web_search",
"description": "Perform a web search using Brave Search",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query (max 400 characters, 50 words)",
},
# "country": {
# "type": "string",
# "description": "The 2-character country code (default: US)",
# },
"search_lang": {
"type": "string",
"description": "The search language preference (default: en)",
},
# "count": {
# "type": "integer",
# "description": "Number of results to return (max 20, default: 10)",
# },
# "offset": {
# "type": "integer",
# "description": "Pagination offset (max 9, default: 0)",
# },
# "safesearch": {
# "type": "string",
# "description": "Filter level for adult content (off, moderate, strict)",
# },
"freshness": {
"type": "string",
"description": "Time filter for results (pd: last 24h, pw: last week, pm: last month, py: last year)",
},
# "result_filter": {
# "type": "string",
# "description": "Comma-delimited list of result types to include",
# },
# "extra_snippets": {
# "type": "boolean",
# "description": "Get additional excerpts from result pages",
# },
# "summary": {
# "type": "boolean",
# "description": "Enable summary generation in search results",
# }
},
"required": ["query"],
"additionalProperties": False,
},
},
{
"name": "brave_image_search",
"description": "Perform an image search using Brave Search",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query (max 400 characters, 50 words)",
},
# "country": {
# "type": "string",
# "description": "The 2-character country code (default: US)",
# },
# "search_lang": {
# "type": "string",
# "description": "The search language preference (default: en)",
# },
"count": {
"type": "integer",
"description": "Number of results to return (max 100, default: 5)",
},
# "safesearch": {
# "type": "string",
# "description": "Filter level for adult content (off, strict). Default: strict",
# },
# "spellcheck": {
# "type": "boolean",
# "description": "Whether to spellcheck provided query (default: true)",
# }
},
"required": ["query"],
"additionalProperties": False,
},
}
]
def get_config_requirements(self):
return {
"token": {
"type": "string",
"description": "Brave Search API key for authentication"
},
}

View File

@@ -1,76 +0,0 @@
import requests
from application.agents.tools.base import Tool
class CryptoPriceTool(Tool):
"""
CryptoPrice
A tool for retrieving cryptocurrency prices using the CryptoCompare public API
"""
def __init__(self, config):
self.config = config
def execute_action(self, action_name, **kwargs):
actions = {"cryptoprice_get": self._get_price}
if action_name in actions:
return actions[action_name](**kwargs)
else:
raise ValueError(f"Unknown action: {action_name}")
def _get_price(self, symbol, currency):
"""
Fetches the current price of a given cryptocurrency symbol in the specified currency.
Example:
symbol = "BTC"
currency = "USD"
returns price in USD.
"""
url = f"https://min-api.cryptocompare.com/data/price?fsym={symbol.upper()}&tsyms={currency.upper()}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if currency.upper() in data:
return {
"status_code": response.status_code,
"price": data[currency.upper()],
"message": f"Price of {symbol.upper()} in {currency.upper()} retrieved successfully.",
}
else:
return {
"status_code": response.status_code,
"message": f"Couldn't find price for {symbol.upper()} in {currency.upper()}.",
}
else:
return {
"status_code": response.status_code,
"message": "Failed to retrieve price.",
}
def get_actions_metadata(self):
return [
{
"name": "cryptoprice_get",
"description": "Retrieve the price of a specified cryptocurrency in a given currency",
"parameters": {
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "The cryptocurrency symbol (e.g. BTC)",
},
"currency": {
"type": "string",
"description": "The currency in which you want the price (e.g. USD)",
},
},
"required": ["symbol", "currency"],
"additionalProperties": False,
},
}
]
def get_config_requirements(self):
# No specific configuration needed for this tool as it just queries a public endpoint
return {}

View File

@@ -1,127 +0,0 @@
import requests
from application.agents.tools.base import Tool
class NtfyTool(Tool):
"""
Ntfy Tool
A tool for sending notifications to ntfy topics on a specified server.
"""
def __init__(self, config):
"""
Initialize the NtfyTool with configuration.
Args:
config (dict): Configuration dictionary containing the access token.
"""
self.config = config
self.token = config.get("token", "")
def execute_action(self, action_name, **kwargs):
"""
Execute the specified action with given parameters.
Args:
action_name (str): Name of the action to execute.
**kwargs: Parameters for the action, including server_url.
Returns:
dict: Result of the action with status code and message.
Raises:
ValueError: If the action name is unknown.
"""
actions = {
"ntfy_send_message": self._send_message,
}
if action_name in actions:
return actions[action_name](**kwargs)
else:
raise ValueError(f"Unknown action: {action_name}")
def _send_message(self, server_url, message, topic, title=None, priority=None):
"""
Send a message to an ntfy topic on the specified server.
Args:
server_url (str): Base URL of the ntfy server (e.g., https://ntfy.sh).
message (str): The message text to send.
topic (str): The topic to send the message to.
title (str, optional): Title of the notification.
priority (int, optional): Priority of the notification (1-5).
Returns:
dict: Response with status code and a confirmation message.
Raises:
ValueError: If priority is not an integer between 1 and 5.
"""
url = f"{server_url.rstrip('/')}/{topic}"
headers = {}
if title:
headers["X-Title"] = title
if priority:
try:
priority = int(priority)
except (ValueError, TypeError):
raise ValueError("Priority must be convertible to an integer")
if priority < 1 or priority > 5:
raise ValueError("Priority must be an integer between 1 and 5")
headers["X-Priority"] = str(priority)
if self.token:
headers["Authorization"] = f"Basic {self.token}"
data = message.encode("utf-8")
response = requests.post(url, headers=headers, data=data)
return {"status_code": response.status_code, "message": "Message sent"}
def get_actions_metadata(self):
"""
Provide metadata about available actions.
Returns:
list: List of dictionaries describing each action.
"""
return [
{
"name": "ntfy_send_message",
"description": "Send a notification to an ntfy topic",
"parameters": {
"type": "object",
"properties": {
"server_url": {
"type": "string",
"description": "Base URL of the ntfy server",
},
"message": {
"type": "string",
"description": "Text to send in the notification",
},
"topic": {
"type": "string",
"description": "Topic to send the notification to",
},
"title": {
"type": "string",
"description": "Title of the notification (optional)",
},
"priority": {
"type": "integer",
"description": "Priority of the notification (1-5, optional)",
},
},
"required": ["server_url", "message", "topic"],
"additionalProperties": False,
},
},
]
def get_config_requirements(self):
"""
Specify the configuration requirements.
Returns:
dict: Dictionary describing required config parameters.
"""
return {
"token": {"type": "string", "description": "Access token for authentication"},
}

View File

@@ -1,163 +0,0 @@
import psycopg2
from application.agents.tools.base import Tool
class PostgresTool(Tool):
"""
PostgreSQL Database Tool
A tool for connecting to a PostgreSQL database using a connection string,
executing SQL queries, and retrieving schema information.
"""
def __init__(self, config):
self.config = config
self.connection_string = config.get("token", "")
def execute_action(self, action_name, **kwargs):
actions = {
"postgres_execute_sql": self._execute_sql,
"postgres_get_schema": self._get_schema,
}
if action_name in actions:
return actions[action_name](**kwargs)
else:
raise ValueError(f"Unknown action: {action_name}")
def _execute_sql(self, sql_query):
"""
Executes an SQL query against the PostgreSQL database using a connection string.
"""
conn = None # Initialize conn to None for error handling
try:
conn = psycopg2.connect(self.connection_string)
cur = conn.cursor()
cur.execute(sql_query)
conn.commit()
if sql_query.strip().lower().startswith("select"):
column_names = [desc[0] for desc in cur.description] if cur.description else []
results = []
rows = cur.fetchall()
for row in rows:
results.append(dict(zip(column_names, row)))
response_data = {"data": results, "column_names": column_names}
else:
row_count = cur.rowcount
response_data = {"message": f"Query executed successfully, {row_count} rows affected."}
cur.close()
return {
"status_code": 200,
"message": "SQL query executed successfully.",
"response_data": response_data,
}
except psycopg2.Error as e:
error_message = f"Database error: {e}"
print(f"Database error: {e}")
return {
"status_code": 500,
"message": "Failed to execute SQL query.",
"error": error_message,
}
finally:
if conn: # Ensure connection is closed even if errors occur
conn.close()
def _get_schema(self, db_name):
"""
Retrieves the schema of the PostgreSQL database using a connection string.
"""
conn = None # Initialize conn to None for error handling
try:
conn = psycopg2.connect(self.connection_string)
cur = conn.cursor()
cur.execute("""
SELECT
table_name,
column_name,
data_type,
column_default,
is_nullable
FROM
information_schema.columns
WHERE
table_schema = 'public'
ORDER BY
table_name,
ordinal_position;
""")
schema_data = {}
for row in cur.fetchall():
table_name, column_name, data_type, column_default, is_nullable = row
if table_name not in schema_data:
schema_data[table_name] = []
schema_data[table_name].append({
"column_name": column_name,
"data_type": data_type,
"column_default": column_default,
"is_nullable": is_nullable
})
cur.close()
return {
"status_code": 200,
"message": "Database schema retrieved successfully.",
"schema": schema_data,
}
except psycopg2.Error as e:
error_message = f"Database error: {e}"
print(f"Database error: {e}")
return {
"status_code": 500,
"message": "Failed to retrieve database schema.",
"error": error_message,
}
finally:
if conn: # Ensure connection is closed even if errors occur
conn.close()
def get_actions_metadata(self):
return [
{
"name": "postgres_execute_sql",
"description": "Execute an SQL query against the PostgreSQL database and return the results. Use this tool to interact with the database, e.g., retrieve specific data or perform updates. Only SELECT queries will return data, other queries will return execution status.",
"parameters": {
"type": "object",
"properties": {
"sql_query": {
"type": "string",
"description": "The SQL query to execute.",
},
},
"required": ["sql_query"],
"additionalProperties": False,
},
},
{
"name": "postgres_get_schema",
"description": "Retrieve the schema of the PostgreSQL database, including tables and their columns. Use this to understand the database structure before executing queries. db_name is 'default' if not provided.",
"parameters": {
"type": "object",
"properties": {
"db_name": {
"type": "string",
"description": "The name of the database to retrieve the schema for.",
},
},
"required": ["db_name"],
"additionalProperties": False,
},
},
]
def get_config_requirements(self):
return {
"token": {
"type": "string",
"description": "PostgreSQL database connection string (e.g., 'postgresql://user:password@host:port/dbname')",
},
}

View File

@@ -1,83 +0,0 @@
import requests
from markdownify import markdownify
from application.agents.tools.base import Tool
from urllib.parse import urlparse
class ReadWebpageTool(Tool):
"""
Read Webpage (browser)
A tool to fetch the HTML content of a URL and convert it to Markdown.
"""
def __init__(self, config=None):
"""
Initializes the tool.
:param config: Optional configuration dictionary. Not used by this tool.
"""
self.config = config
def execute_action(self, action_name: str, **kwargs) -> str:
"""
Executes the specified action. For this tool, the only action is 'read_webpage'.
:param action_name: The name of the action to execute. Should be 'read_webpage'.
:param kwargs: Keyword arguments, must include 'url'.
:return: The Markdown content of the webpage or an error message.
"""
if action_name != "read_webpage":
return f"Error: Unknown action '{action_name}'. This tool only supports 'read_webpage'."
url = kwargs.get("url")
if not url:
return "Error: URL parameter is missing."
# Ensure the URL has a scheme (if not, default to http)
parsed_url = urlparse(url)
if not parsed_url.scheme:
url = "http://" + url
try:
response = requests.get(url, timeout=10, headers={'User-Agent': 'DocsGPT-Agent/1.0'})
response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
html_content = response.text
#soup = BeautifulSoup(html_content, 'html.parser')
markdown_content = markdownify(html_content, heading_style="ATX", newline_style="BACKSLASH")
return markdown_content
except requests.exceptions.RequestException as e:
return f"Error fetching URL {url}: {e}"
except Exception as e:
return f"Error processing URL {url}: {e}"
def get_actions_metadata(self):
"""
Returns metadata for the actions supported by this tool.
"""
return [
{
"name": "read_webpage",
"description": "Fetches the HTML content of a given URL and returns it as clean Markdown text. Input must be a valid URL.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The fully qualified URL of the webpage to read (e.g., 'https://www.example.com').",
}
},
"required": ["url"],
"additionalProperties": False,
},
}
]
def get_config_requirements(self):
"""
Returns a dictionary describing the configuration requirements for the tool.
This tool does not require any specific configuration.
"""
return {}

View File

@@ -1,86 +0,0 @@
import requests
from application.agents.tools.base import Tool
class TelegramTool(Tool):
"""
Telegram Bot
A flexible Telegram tool for performing various actions (e.g., sending messages, images).
Requires a bot token and chat ID for configuration
"""
def __init__(self, config):
self.config = config
self.token = config.get("token", "")
def execute_action(self, action_name, **kwargs):
actions = {
"telegram_send_message": self._send_message,
"telegram_send_image": self._send_image,
}
if action_name in actions:
return actions[action_name](**kwargs)
else:
raise ValueError(f"Unknown action: {action_name}")
def _send_message(self, text, chat_id):
print(f"Sending message: {text}")
url = f"https://api.telegram.org/bot{self.token}/sendMessage"
payload = {"chat_id": chat_id, "text": text}
response = requests.post(url, data=payload)
return {"status_code": response.status_code, "message": "Message sent"}
def _send_image(self, image_url, chat_id):
print(f"Sending image: {image_url}")
url = f"https://api.telegram.org/bot{self.token}/sendPhoto"
payload = {"chat_id": chat_id, "photo": image_url}
response = requests.post(url, data=payload)
return {"status_code": response.status_code, "message": "Image sent"}
def get_actions_metadata(self):
return [
{
"name": "telegram_send_message",
"description": "Send a notification to Telegram chat",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Text to send in the notification",
},
"chat_id": {
"type": "string",
"description": "Chat ID to send the notification to",
},
},
"required": ["text"],
"additionalProperties": False,
},
},
{
"name": "telegram_send_image",
"description": "Send an image to the Telegram chat",
"parameters": {
"type": "object",
"properties": {
"image_url": {
"type": "string",
"description": "URL of the image to send",
},
"chat_id": {
"type": "string",
"description": "Chat ID to send the image to",
},
},
"required": ["image_url"],
"additionalProperties": False,
},
},
]
def get_config_requirements(self):
return {
"token": {"type": "string", "description": "Bot token for authentication"},
}

View File

@@ -1,37 +0,0 @@
import json
import logging
logger = logging.getLogger(__name__)
class ToolActionParser:
def __init__(self, llm_type):
self.llm_type = llm_type
self.parsers = {
"OpenAILLM": self._parse_openai_llm,
"GoogleLLM": self._parse_google_llm,
}
def parse_args(self, call):
parser = self.parsers.get(self.llm_type, self._parse_openai_llm)
return parser(call)
def _parse_openai_llm(self, call):
try:
call_args = json.loads(call.arguments)
tool_id = call.name.split("_")[-1]
action_name = call.name.rsplit("_", 1)[0]
except (AttributeError, TypeError) as e:
logger.error(f"Error parsing OpenAI LLM call: {e}")
return None, None, None
return tool_id, action_name, call_args
def _parse_google_llm(self, call):
try:
call_args = call.arguments
tool_id = call.name.split("_")[-1]
action_name = call.name.rsplit("_", 1)[0]
except (AttributeError, TypeError) as e:
logger.error(f"Error parsing Google LLM call: {e}")
return None, None, None
return tool_id, action_name, call_args

View File

@@ -1,42 +0,0 @@
import importlib
import inspect
import os
import pkgutil
from application.agents.tools.base import Tool
class ToolManager:
def __init__(self, config):
self.config = config
self.tools = {}
self.load_tools()
def load_tools(self):
tools_dir = os.path.join(os.path.dirname(__file__))
for finder, name, ispkg in pkgutil.iter_modules([tools_dir]):
if name == "base" or name.startswith("__"):
continue
module = importlib.import_module(f"application.agents.tools.{name}")
for member_name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, Tool) and obj is not Tool:
tool_config = self.config.get(name, {})
self.tools[name] = obj(tool_config)
def load_tool(self, tool_name, tool_config):
self.config[tool_name] = tool_config
module = importlib.import_module(f"application.agents.tools.{tool_name}")
for member_name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, Tool) and obj is not Tool:
return obj(tool_config)
def execute_action(self, tool_name, action_name, **kwargs):
if tool_name not in self.tools:
raise ValueError(f"Tool '{tool_name}' not loaded")
return self.tools[tool_name].execute_action(action_name, **kwargs)
def get_all_actions_metadata(self):
metadata = []
for tool in self.tools.values():
metadata.extend(tool.get_actions_metadata())
return metadata

View File

@@ -3,14 +3,14 @@ import datetime
import json
import logging
import os
import sys
import traceback
from bson.dbref import DBRef
from bson.objectid import ObjectId
from flask import Blueprint, make_response, request, Response
from flask import Blueprint, current_app, make_response, request, Response
from flask_restx import fields, Namespace, Resource
from application.agents.agent_creator import AgentCreator
from application.core.mongo_db import MongoDB
from application.core.settings import settings
@@ -18,18 +18,17 @@ from application.error import bad_request
from application.extensions import api
from application.llm.llm_creator import LLMCreator
from application.retriever.retriever_creator import RetrieverCreator
from application.utils import check_required_fields, limit_chat_history
from application.utils import check_required_fields
logger = logging.getLogger(__name__)
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
sources_collection = db["sources"]
prompts_collection = db["prompts"]
agents_collection = db["agents"]
api_key_collection = db["api_keys"]
user_logs_collection = db["user_logs"]
attachments_collection = db["attachments"]
answer = Blueprint("answer", __name__)
answer_ns = Namespace("answer", description="Answer related operations", path="/")
@@ -37,17 +36,15 @@ api.add_namespace(answer_ns)
gpt_model = ""
# to have some kind of default behaviour
if settings.LLM_PROVIDER == "openai":
gpt_model = "gpt-4o-mini"
elif settings.LLM_PROVIDER == "anthropic":
if settings.LLM_NAME == "openai":
gpt_model = "gpt-3.5-turbo"
elif settings.LLM_NAME == "anthropic":
gpt_model = "claude-2"
elif settings.LLM_PROVIDER == "groq":
elif settings.LLM_NAME == "groq":
gpt_model = "llama3-8b-8192"
elif settings.LLM_PROVIDER == "novita":
gpt_model = "deepseek/deepseek-r1"
if settings.LLM_NAME: # in case there is particular model name configured
gpt_model = settings.LLM_NAME
if settings.MODEL_NAME: # in case there is particular model name configured
gpt_model = settings.MODEL_NAME
# load the prompts
current_dir = os.path.dirname(
@@ -86,51 +83,22 @@ def run_async_chain(chain, question, chat_history):
return result
def get_agent_key(agent_id, user_id):
if not agent_id:
return None, False, None
try:
agent = agents_collection.find_one({"_id": ObjectId(agent_id)})
if agent is None:
raise Exception("Agent not found", 404)
is_owner = agent.get("user") == user_id
if is_owner:
agents_collection.update_one(
{"_id": ObjectId(agent_id)},
{"$set": {"lastUsedAt": datetime.datetime.now(datetime.timezone.utc)}},
)
return str(agent["key"]), False, None
is_shared_with_user = agent.get(
"shared_publicly", False
) or user_id in agent.get("shared_with", [])
if is_shared_with_user:
return str(agent["key"]), True, agent.get("shared_token")
raise Exception("Unauthorized access to the agent", 403)
except Exception as e:
logger.error(f"Error in get_agent_key: {str(e)}", exc_info=True)
raise
def get_data_from_api_key(api_key):
data = agents_collection.find_one({"key": api_key})
if not data:
raise Exception("Invalid API Key, please generate a new key", 401)
data = api_key_collection.find_one({"key": api_key})
# # Raise custom exception if the API key is not found
if data is None:
raise Exception("Invalid API Key, please generate new key", 401)
source = data.get("source")
if isinstance(source, DBRef):
source_doc = db.dereference(source)
if "retriever" not in data:
data["retriever"] = None
if "source" in data and isinstance(data["source"], DBRef):
source_doc = db.dereference(data["source"])
data["source"] = str(source_doc["_id"])
data["retriever"] = source_doc.get("retriever", data.get("retriever"))
if "retriever" in source_doc:
data["retriever"] = source_doc["retriever"]
else:
data["source"] = {}
return data
@@ -150,44 +118,8 @@ def is_azure_configured():
)
def save_conversation(
conversation_id,
question,
response,
thought,
source_log_docs,
tool_calls,
llm,
decoded_token,
index=None,
api_key=None,
agent_id=None,
is_shared_usage=False,
shared_token=None,
attachment_ids=None,
):
current_time = datetime.datetime.now(datetime.timezone.utc)
if conversation_id is not None and index is not None:
conversations_collection.update_one(
{"_id": ObjectId(conversation_id), f"queries.{index}": {"$exists": True}},
{
"$set": {
f"queries.{index}.prompt": question,
f"queries.{index}.response": response,
f"queries.{index}.thought": thought,
f"queries.{index}.sources": source_log_docs,
f"queries.{index}.tool_calls": tool_calls,
f"queries.{index}.timestamp": current_time,
f"queries.{index}.attachments": attachment_ids,
}
},
)
##remove following queries from the array
conversations_collection.update_one(
{"_id": ObjectId(conversation_id), f"queries.{index}": {"$exists": True}},
{"$push": {"queries": {"$each": [], "$slice": index + 1}}},
)
elif conversation_id is not None and conversation_id != "None":
def save_conversation(conversation_id, question, response, source_log_docs, llm):
if conversation_id is not None and conversation_id != "None":
conversations_collection.update_one(
{"_id": ObjectId(conversation_id)},
{
@@ -195,11 +127,7 @@ def save_conversation(
"queries": {
"prompt": question,
"response": response,
"thought": thought,
"sources": source_log_docs,
"tool_calls": tool_calls,
"timestamp": current_time,
"attachments": attachment_ids,
}
}
},
@@ -219,38 +147,28 @@ def save_conversation(
"role": "user",
"content": "Summarise following conversation in no more than 3 words, "
"respond ONLY with the summary, use the same language as the "
"system \n\nUser: " + question + "\n\n" + "AI: " + response,
"system \n\nUser: "
+ question
+ "\n\n"
+ "AI: "
+ response,
},
]
completion = llm.gen(model=gpt_model, messages=messages_summary, max_tokens=30)
conversation_data = {
"user": decoded_token.get("sub"),
conversation_id = conversations_collection.insert_one(
{
"user": "local",
"date": datetime.datetime.utcnow(),
"name": completion,
"queries": [
{
"prompt": question,
"response": response,
"thought": thought,
"sources": source_log_docs,
"tool_calls": tool_calls,
"timestamp": current_time,
"attachments": attachment_ids,
}
],
}
if api_key:
if agent_id:
conversation_data["agent_id"] = agent_id
if is_shared_usage:
conversation_data["is_shared_usage"] = is_shared_usage
conversation_data["shared_token"] = shared_token
api_key_doc = agents_collection.find_one({"key": api_key})
if api_key_doc:
conversation_data["api_key"] = api_key_doc["key"]
conversation_id = conversations_collection.insert_one(
conversation_data
).inserted_id
return conversation_id
@@ -268,84 +186,39 @@ def get_prompt(prompt_id):
def complete_stream(
question,
agent,
retriever,
conversation_id,
user_api_key,
decoded_token,
isNoneDoc=False,
index=None,
should_save_conversation=True,
attachment_ids=None,
agent_id=None,
is_shared_usage=False,
shared_token=None,
question, retriever, conversation_id, user_api_key, isNoneDoc=False
):
try:
response_full, thought, source_log_docs, tool_calls = "", "", [], []
answer = agent.gen(query=question, retriever=retriever)
response_full = ""
source_log_docs = []
answer = retriever.gen()
sources = retriever.search()
for source in sources:
if "text" in source:
source["text"] = source["text"][:100].strip() + "..."
if len(sources) > 0:
data = json.dumps({"type": "source", "source": sources})
yield f"data: {data}\n\n"
for line in answer:
if "answer" in line:
response_full += str(line["answer"])
data = json.dumps({"type": "answer", "answer": line["answer"]})
yield f"data: {data}\n\n"
elif "sources" in line:
truncated_sources = []
source_log_docs = line["sources"]
for source in line["sources"]:
truncated_source = source.copy()
if "text" in truncated_source:
truncated_source["text"] = (
truncated_source["text"][:100].strip() + "..."
)
truncated_sources.append(truncated_source)
if len(truncated_sources) > 0:
data = json.dumps({"type": "source", "source": truncated_sources})
yield f"data: {data}\n\n"
elif "tool_calls" in line:
tool_calls = line["tool_calls"]
elif "thought" in line:
thought += line["thought"]
data = json.dumps({"type": "thought", "thought": line["thought"]})
yield f"data: {data}\n\n"
elif "type" in line:
data = json.dumps(line)
yield f"data: {data}\n\n"
elif "source" in line:
source_log_docs.append(line["source"])
if isNoneDoc:
for doc in source_log_docs:
doc["source"] = "None"
llm = LLMCreator.create_llm(
settings.LLM_PROVIDER,
api_key=settings.API_KEY,
user_api_key=user_api_key,
decoded_token=decoded_token,
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=user_api_key
)
if should_save_conversation:
if user_api_key is None:
conversation_id = save_conversation(
conversation_id,
question,
response_full,
thought,
source_log_docs,
tool_calls,
llm,
decoded_token,
index,
api_key=user_api_key,
attachment_ids=attachment_ids,
agent_id=agent_id,
is_shared_usage=is_shared_usage,
shared_token=shared_token,
conversation_id, question, response_full, source_log_docs, llm
)
else:
conversation_id = None
# send data.type = "end" to indicate that the stream has ended as json
data = json.dumps({"type": "id", "id": str(conversation_id)})
yield f"data: {data}\n\n"
@@ -355,24 +228,25 @@ def complete_stream(
{
"action": "stream_answer",
"level": "info",
"user": decoded_token.get("sub"),
"user": "local",
"api_key": user_api_key,
"question": question,
"response": response_full,
"sources": source_log_docs,
"retriever_params": retriever_params,
"attachments": attachment_ids,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
)
data = json.dumps({"type": "end"})
yield f"data: {data}\n\n"
except Exception as e:
logger.error(f"Error in stream: {str(e)}", exc_info=True)
print("\033[91merr", str(e), file=sys.stderr)
traceback.print_exc()
data = json.dumps(
{
"type": "error",
"error": "Please try again later. We apologize for any inconvenience.",
"error_exception": str(e),
}
)
yield f"data: {data}\n\n"
@@ -408,17 +282,6 @@ class Stream(Resource):
"isNoneDoc": fields.Boolean(
required=False, description="Flag indicating if no document is used"
),
"index": fields.Integer(
required=False, description="Index of the query to update"
),
"save_conversation": fields.Boolean(
required=False,
default=True,
description="Whether to save the conversation",
),
"attachments": fields.List(
fields.String, required=False, description="List of attachment IDs"
),
},
)
@@ -427,37 +290,22 @@ class Stream(Resource):
def post(self):
data = request.get_json()
required_fields = ["question"]
if "index" in data:
required_fields = ["question", "conversation_id"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
save_conv = data.get("save_conversation", True)
try:
question = data["question"]
history = limit_chat_history(
json.loads(data.get("history", "[]")), gpt_model=gpt_model
)
history = data.get("history", [])
history = json.loads(history)
conversation_id = data.get("conversation_id")
prompt_id = data.get("prompt_id", "default")
attachment_ids = data.get("attachments", [])
index = data.get("index", None)
chunks = int(data.get("chunks", 2))
token_limit = data.get("token_limit", settings.DEFAULT_MAX_HISTORY)
retriever_name = data.get("retriever", "classic")
agent_id = data.get("agent_id", None)
agent_type = settings.AGENT_NAME
decoded_token = getattr(request, "decoded_token", None)
user_sub = decoded_token.get("sub") if decoded_token else None
agent_key, is_shared_usage, shared_token = get_agent_key(agent_id, user_sub)
if agent_key:
data.update({"api_key": agent_key})
else:
agent_id = None
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
@@ -466,55 +314,27 @@ class Stream(Resource):
source = {"active_docs": data_key.get("source")}
retriever_name = data_key.get("retriever", retriever_name)
user_api_key = data["api_key"]
agent_type = data_key.get("agent_type", agent_type)
if is_shared_usage:
decoded_token = request.decoded_token
else:
decoded_token = {"sub": data_key.get("user")}
is_shared_usage = False
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
decoded_token = request.decoded_token
else:
source = {}
user_api_key = None
decoded_token = request.decoded_token
if not decoded_token:
return make_response({"error": "Unauthorized"}, 401)
attachments = get_attachments_content(
attachment_ids, decoded_token.get("sub")
)
logger.info(
f"/stream - request_data: {data}, source: {source}, attachments: {len(attachments)}",
current_app.logger.info(
f"/stream - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
prompt = get_prompt(prompt_id)
if "isNoneDoc" in data and data["isNoneDoc"] is True:
chunks = 0
agent = AgentCreator.create_agent(
agent_type,
endpoint="stream",
llm_name=settings.LLM_PROVIDER,
gpt_model=gpt_model,
api_key=settings.API_KEY,
user_api_key=user_api_key,
prompt=prompt,
chat_history=history,
decoded_token=decoded_token,
attachments=attachments,
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
@@ -522,44 +342,39 @@ class Stream(Resource):
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
return Response(
complete_stream(
question=question,
agent=agent,
retriever=retriever,
conversation_id=conversation_id,
user_api_key=user_api_key,
decoded_token=decoded_token,
isNoneDoc=data.get("isNoneDoc"),
index=index,
should_save_conversation=save_conv,
attachment_ids=attachment_ids,
agent_id=agent_id,
is_shared_usage=is_shared_usage,
shared_token=shared_token,
),
mimetype="text/event-stream",
)
except ValueError:
message = "Malformed request body"
logger.error(f"/stream - error: {message}")
print("\033[91merr", str(message), file=sys.stderr)
return Response(
error_stream_generate(message),
status=400,
mimetype="text/event-stream",
)
except Exception as e:
logger.error(
current_app.logger.error(
f"/stream - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
message = e.args[0]
status_code = 400
# Custom exceptions with two arguments, index 1 as status code
if len(e.args) >= 2:
status_code = e.args[1]
return Response(
error_stream_generate("Unknown error occurred"),
error_stream_generate(message),
status=status_code,
mimetype="text/event-stream",
)
@@ -613,15 +428,12 @@ class Answer(Resource):
try:
question = data["question"]
history = limit_chat_history(
json.loads(data.get("history", [])), gpt_model=gpt_model
)
history = data.get("history", [])
conversation_id = data.get("conversation_id")
prompt_id = data.get("prompt_id", "default")
chunks = int(data.get("chunks", 2))
token_limit = data.get("token_limit", settings.DEFAULT_MAX_HISTORY)
retriever_name = data.get("retriever", "classic")
agent_type = settings.AGENT_NAME
if "api_key" in data:
data_key = get_data_from_api_key(data["api_key"])
@@ -630,44 +442,24 @@ class Answer(Resource):
source = {"active_docs": data_key.get("source")}
retriever_name = data_key.get("retriever", retriever_name)
user_api_key = data["api_key"]
agent_type = data_key.get("agent_type", agent_type)
decoded_token = {"sub": data_key.get("user")}
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
retriever_name = get_retriever(data["active_docs"]) or retriever_name
user_api_key = None
decoded_token = request.decoded_token
else:
source = {}
user_api_key = None
decoded_token = request.decoded_token
if not decoded_token:
return make_response({"error": "Unauthorized"}, 401)
prompt = get_prompt(prompt_id)
logger.info(
current_app.logger.info(
f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
agent = AgentCreator.create_agent(
agent_type,
endpoint="api/answer",
llm_name=settings.LLM_PROVIDER,
gpt_model=gpt_model,
api_key=settings.API_KEY,
user_api_key=user_api_key,
prompt=prompt,
chat_history=history,
decoded_token=decoded_token,
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=history,
prompt=prompt,
@@ -675,84 +467,36 @@ class Answer(Resource):
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
response_full = ""
source_log_docs = []
tool_calls = []
stream_ended = False
thought = ""
for line in complete_stream(
question=question,
agent=agent,
retriever=retriever,
conversation_id=conversation_id,
user_api_key=user_api_key,
decoded_token=decoded_token,
isNoneDoc=data.get("isNoneDoc"),
index=None,
should_save_conversation=False,
):
try:
event_data = line.replace("data: ", "").strip()
event = json.loads(event_data)
if event["type"] == "answer":
response_full += event["answer"]
elif event["type"] == "source":
source_log_docs = event["source"]
elif event["type"] == "tool_calls":
tool_calls = event["tool_calls"]
elif event["type"] == "thought":
thought = event["thought"]
elif event["type"] == "error":
logger.error(f"Error from stream: {event['error']}")
return bad_request(500, event["error"])
elif event["type"] == "end":
stream_ended = True
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Error parsing stream event: {e}, line: {line}")
continue
if not stream_ended:
logger.error("Stream ended unexpectedly without an 'end' event.")
return bad_request(500, "Stream ended unexpectedly.")
response_full = ""
for line in retriever.gen():
if "source" in line:
source_log_docs.append(line["source"])
elif "answer" in line:
response_full += line["answer"]
if data.get("isNoneDoc"):
for doc in source_log_docs:
doc["source"] = "None"
llm = LLMCreator.create_llm(
settings.LLM_PROVIDER,
api_key=settings.API_KEY,
user_api_key=user_api_key,
decoded_token=decoded_token,
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=user_api_key
)
result = {"answer": response_full, "sources": source_log_docs}
result["conversation_id"] = str(
save_conversation(
conversation_id,
question,
response_full,
thought,
source_log_docs,
tool_calls,
llm,
decoded_token,
api_key=user_api_key,
conversation_id, question, response_full, source_log_docs, llm
)
)
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
{
"action": "api_answer",
"level": "info",
"user": decoded_token.get("sub"),
"user": "local",
"api_key": user_api_key,
"question": question,
"response": response_full,
@@ -763,7 +507,7 @@ class Answer(Resource):
)
except Exception as e:
logger.error(
current_app.logger.error(
f"/api/answer - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
@@ -821,28 +565,21 @@ class Search(Resource):
chunks = int(data_key.get("chunks", 2))
source = {"active_docs": data_key.get("source")}
user_api_key = data["api_key"]
decoded_token = {"sub": data_key.get("user")}
elif "active_docs" in data:
source = {"active_docs": data["active_docs"]}
user_api_key = None
decoded_token = request.decoded_token
else:
source = {}
user_api_key = None
decoded_token = request.decoded_token
if not decoded_token:
return make_response({"error": "Unauthorized"}, 401)
logger.info(
current_app.logger.info(
f"/api/answer - request_data: {data}, source: {source}",
extra={"data": json.dumps({"request_data": data, "source": source})},
)
retriever = RetrieverCreator.create_retriever(
retriever_name,
question=question,
source=source,
chat_history=[],
prompt="default",
@@ -850,17 +587,16 @@ class Search(Resource):
token_limit=token_limit,
gpt_model=gpt_model,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
docs = retriever.search(question)
docs = retriever.search()
retriever_params = retriever.get_params()
user_logs_collection.insert_one(
{
"action": "api_search",
"level": "info",
"user": decoded_token.get("sub"),
"user": "local",
"api_key": user_api_key,
"question": question,
"sources": docs,
@@ -874,41 +610,10 @@ class Search(Resource):
doc["source"] = "None"
except Exception as e:
logger.error(
current_app.logger.error(
f"/api/search - error: {str(e)} - traceback: {traceback.format_exc()}",
extra={"error": str(e), "traceback": traceback.format_exc()},
)
return bad_request(500, str(e))
return make_response(docs, 200)
def get_attachments_content(attachment_ids, user):
"""
Retrieve content from attachment documents based on their IDs.
Args:
attachment_ids (list): List of attachment document IDs
user (str): User identifier to verify ownership
Returns:
list: List of dictionaries containing attachment content and metadata
"""
if not attachment_ids:
return []
attachments = []
for attachment_id in attachment_ids:
try:
attachment_doc = attachments_collection.find_one(
{"_id": ObjectId(attachment_id), "user": user}
)
if attachment_doc:
attachments.append(attachment_doc)
except Exception as e:
logger.error(
f"Error retrieving attachment {attachment_id}: {e}", exc_info=True
)
return attachments

View File

@@ -3,15 +3,12 @@ import datetime
from flask import Blueprint, request, send_from_directory
from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
import logging
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.storage.storage_creator import StorageCreator
logger = logging.getLogger(__name__)
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
sources_collection = db["sources"]
@@ -37,39 +34,37 @@ def upload_index_files():
"""Upload two files(index.faiss, index.pkl) to the user's folder."""
if "user" not in request.form:
return {"status": "no user"}
user = request.form["user"]
user = secure_filename(request.form["user"])
if "name" not in request.form:
return {"status": "no name"}
job_name = request.form["name"]
tokens = request.form["tokens"]
retriever = request.form["retriever"]
id = request.form["id"]
type = request.form["type"]
job_name = secure_filename(request.form["name"])
tokens = secure_filename(request.form["tokens"])
retriever = secure_filename(request.form["retriever"])
id = secure_filename(request.form["id"])
type = secure_filename(request.form["type"])
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = request.form["sync_frequency"] if "sync_frequency" in request.form else None
original_file_path = request.form.get("original_file_path")
storage = StorageCreator.get_storage()
index_base_path = f"indexes/{id}"
sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss":
if "file_faiss" not in request.files:
logger.error("No file_faiss part")
print("No file part")
return {"status": "no file"}
file_faiss = request.files["file_faiss"]
if file_faiss.filename == "":
return {"status": "no file name"}
if "file_pkl" not in request.files:
logger.error("No file_pkl part")
print("No file part")
return {"status": "no file"}
file_pkl = request.files["file_pkl"]
if file_pkl.filename == "":
return {"status": "no file name"}
# saves index files
# Save index files to storage
storage.save_file(file_faiss, f"{index_base_path}/index.faiss")
storage.save_file(file_pkl, f"{index_base_path}/index.pkl")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:
@@ -87,7 +82,6 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
}
},
)
@@ -105,7 +99,6 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
}
)
return {"status": "ok"}

File diff suppressed because it is too large Load Diff

View File

@@ -1,18 +1,12 @@
from datetime import timedelta
from application.celery_init import celery
from application.worker import (
agent_webhook_worker,
attachment_worker,
ingest_worker,
remote_worker,
sync_worker,
)
from application.worker import ingest_worker, remote_worker, sync_worker
@celery.task(bind=True)
def ingest(self, directory, formats, job_name, filename, user, dir_name, user_dir):
resp = ingest_worker(self, directory, formats, job_name, filename, user, dir_name, user_dir)
def ingest(self, directory, formats, name_job, filename, user):
resp = ingest_worker(self, directory, formats, name_job, filename, user)
return resp
@@ -28,18 +22,6 @@ def schedule_syncs(self, frequency):
return resp
@celery.task(bind=True)
def store_attachment(self, file_info, user):
resp = attachment_worker(self, file_info, user)
return resp
@celery.task(bind=True)
def process_agent_webhook(self, agent_id, payload):
resp = agent_webhook_worker(self, agent_id, payload)
return resp
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(

View File

@@ -1,24 +1,15 @@
import os
import platform
import uuid
import dotenv
from flask import Flask, jsonify, redirect, request
from jose import jwt
from application.auth import handle_auth
from flask import Flask, redirect, request
from application.api.answer.routes import answer
from application.api.internal.routes import internal
from application.api.user.routes import user
from application.celery_init import celery
from application.core.logging_config import setup_logging
setup_logging()
from application.api.answer.routes import answer # noqa: E402
from application.api.internal.routes import internal # noqa: E402
from application.api.user.routes import user # noqa: E402
from application.celery_init import celery # noqa: E402
from application.core.settings import settings # noqa: E402
from application.extensions import api # noqa: E402
from application.core.settings import settings
from application.extensions import api
if platform.system() == "Windows":
import pathlib
@@ -26,6 +17,7 @@ if platform.system() == "Windows":
pathlib.PosixPath = pathlib.WindowsPath
dotenv.load_dotenv()
setup_logging()
app = Flask(__name__)
app.register_blueprint(user)
@@ -40,25 +32,6 @@ app.config.update(
celery.config_from_object("application.celeryconfig")
api.init_app(app)
if settings.AUTH_TYPE in ("simple_jwt", "session_jwt") and not settings.JWT_SECRET_KEY:
key_file = ".jwt_secret_key"
try:
with open(key_file, "r") as f:
settings.JWT_SECRET_KEY = f.read().strip()
except FileNotFoundError:
new_key = os.urandom(32).hex()
with open(key_file, "w") as f:
f.write(new_key)
settings.JWT_SECRET_KEY = new_key
except Exception as e:
raise RuntimeError(f"Failed to setup JWT_SECRET_KEY: {e}")
SIMPLE_JWT_TOKEN = None
if settings.AUTH_TYPE == "simple_jwt":
payload = {"sub": "local"}
SIMPLE_JWT_TOKEN = jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm="HS256")
print(f"Generated Simple JWT Token: {SIMPLE_JWT_TOKEN}")
@app.route("/")
def home():
@@ -68,47 +41,11 @@ def home():
return "Welcome to DocsGPT Backend!"
@app.route("/api/config")
def get_config():
response = {
"auth_type": settings.AUTH_TYPE,
"requires_auth": settings.AUTH_TYPE in ["simple_jwt", "session_jwt"],
}
return jsonify(response)
@app.route("/api/generate_token")
def generate_token():
if settings.AUTH_TYPE == "session_jwt":
new_user_id = str(uuid.uuid4())
token = jwt.encode(
{"sub": new_user_id}, settings.JWT_SECRET_KEY, algorithm="HS256"
)
return jsonify({"token": token})
return jsonify({"error": "Token generation not allowed in current auth mode"}), 400
@app.before_request
def authenticate_request():
if request.method == "OPTIONS":
return "", 200
decoded_token = handle_auth(request)
if not decoded_token:
request.decoded_token = None
elif "error" in decoded_token:
return jsonify(decoded_token), 401
else:
request.decoded_token = decoded_token
@app.after_request
def after_request(response):
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type,Authorization")
response.headers.add(
"Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"
)
response.headers.add("Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS")
return response

View File

@@ -1,28 +0,0 @@
from jose import jwt
from application.core.settings import settings
def handle_auth(request, data={}):
if settings.AUTH_TYPE in ["simple_jwt", "session_jwt"]:
jwt_token = request.headers.get("Authorization")
if not jwt_token:
return None
jwt_token = jwt_token.replace("Bearer ", "")
try:
decoded_token = jwt.decode(
jwt_token,
settings.JWT_SECRET_KEY,
algorithms=["HS256"],
options={"verify_exp": False},
)
return decoded_token
except Exception as e:
return {
"message": f"Authentication error: {str(e)}",
"error": "invalid_token",
}
else:
return {"sub": "local"}

View File

@@ -1,92 +1,66 @@
import redis
import time
import json
import logging
import time
from threading import Lock
import redis
from application.core.settings import settings
from application.utils import get_hash
logger = logging.getLogger(__name__)
_redis_instance = None
_redis_creation_failed = False
_instance_lock = Lock()
def get_redis_instance():
global _redis_instance, _redis_creation_failed
if _redis_instance is None and not _redis_creation_failed:
global _redis_instance
if _redis_instance is None:
with _instance_lock:
if _redis_instance is None and not _redis_creation_failed:
if _redis_instance is None:
try:
_redis_instance = redis.Redis.from_url(
settings.CACHE_REDIS_URL, socket_connect_timeout=2
)
except ValueError as e:
logger.error(f"Invalid Redis URL: {e}")
_redis_creation_failed = True # Stop future attempts
_redis_instance = None
_redis_instance = redis.Redis.from_url(settings.CACHE_REDIS_URL, socket_connect_timeout=2)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
_redis_instance = None # Keep trying for connection errors
_redis_instance = None
return _redis_instance
def gen_cache_key(messages, model="docgpt", tools=None):
def gen_cache_key(*messages, model="docgpt"):
if not all(isinstance(msg, dict) for msg in messages):
raise ValueError("All messages must be dictionaries.")
messages_str = json.dumps(messages)
tools_str = json.dumps(str(tools)) if tools else ""
combined = f"{model}_{messages_str}_{tools_str}"
messages_str = json.dumps(list(messages), sort_keys=True)
combined = f"{model}_{messages_str}"
cache_key = get_hash(combined)
return cache_key
def gen_cache(func):
def wrapper(self, model, messages, stream, tools=None, *args, **kwargs):
if tools is not None:
return func(self, model, messages, stream, tools, *args, **kwargs)
def wrapper(self, model, messages, *args, **kwargs):
try:
cache_key = gen_cache_key(messages, model, tools)
except ValueError as e:
logger.error(f"Cache key generation failed: {e}")
return func(self, model, messages, stream, tools, *args, **kwargs)
cache_key = gen_cache_key(*messages)
redis_client = get_redis_instance()
if redis_client:
try:
cached_response = redis_client.get(cache_key)
if cached_response:
return cached_response.decode("utf-8")
except Exception as e:
logger.error(f"Error getting cached response: {e}", exc_info=True)
return cached_response.decode('utf-8')
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
result = func(self, model, messages, stream, tools, *args, **kwargs)
if redis_client and isinstance(result, str):
result = func(self, model, messages, *args, **kwargs)
if redis_client:
try:
redis_client.set(cache_key, result, ex=1800)
except Exception as e:
logger.error(f"Error setting cache: {e}", exc_info=True)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
return result
except ValueError as e:
logger.error(e)
return "Error: No user message found in the conversation to generate a cache key."
return wrapper
def stream_cache(func):
def wrapper(self, model, messages, stream, tools=None, *args, **kwargs):
if tools is not None:
yield from func(self, model, messages, stream, tools, *args, **kwargs)
return
try:
cache_key = gen_cache_key(messages, model, tools)
except ValueError as e:
logger.error(f"Cache key generation failed: {e}")
yield from func(self, model, messages, stream, tools, *args, **kwargs)
return
def wrapper(self, model, messages, stream, *args, **kwargs):
cache_key = gen_cache_key(*messages)
logger.info(f"Stream cache key: {cache_key}")
redis_client = get_redis_instance()
if redis_client:
@@ -94,24 +68,26 @@ def stream_cache(func):
cached_response = redis_client.get(cache_key)
if cached_response:
logger.info(f"Cache hit for stream key: {cache_key}")
cached_response = json.loads(cached_response.decode("utf-8"))
cached_response = json.loads(cached_response.decode('utf-8'))
for chunk in cached_response:
yield chunk
time.sleep(0.03) # Simulate streaming delay
time.sleep(0.03)
return
except Exception as e:
logger.error(f"Error getting cached stream: {e}", exc_info=True)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
result = func(self, model, messages, stream, *args, **kwargs)
stream_cache_data = []
for chunk in func(self, model, messages, stream, tools, *args, **kwargs):
for chunk in result:
stream_cache_data.append(chunk)
yield chunk
stream_cache_data.append(str(chunk))
if redis_client:
try:
redis_client.set(cache_key, json.dumps(stream_cache_data), ex=1800)
logger.info(f"Stream cache saved for key: {cache_key}")
except Exception as e:
logger.error(f"Error setting stream cache: {e}", exc_info=True)
except redis.ConnectionError as e:
logger.error(f"Redis connection error: {e}")
return wrapper

View File

@@ -2,22 +2,14 @@ from celery import Celery
from application.core.settings import settings
from celery.signals import setup_logging
def make_celery(app_name=__name__):
celery = Celery(
app_name,
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
)
celery = Celery(app_name, broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND)
celery.conf.update(settings)
return celery
@setup_logging.connect
def config_loggers(*args, **kwargs):
from application.core.logging_config import setup_logging
setup_logging()
celery = make_celery()

View File

@@ -1,43 +1,25 @@
import os
from pathlib import Path
from typing import Optional
import os
from pydantic_settings import BaseSettings
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class Settings(BaseSettings):
AUTH_TYPE: Optional[str] = None
LLM_PROVIDER: str = "docsgpt"
LLM_NAME: Optional[str] = (
None # if LLM_PROVIDER is openai, LLM_NAME can be gpt-4 or gpt-3.5-turbo
)
LLM_NAME: str = "docsgpt"
MODEL_NAME: Optional[str] = None # if LLM_NAME is openai, MODEL_NAME can be gpt-4 or gpt-3.5-turbo
EMBEDDINGS_NAME: str = "huggingface_sentence-transformers/all-mpnet-base-v2"
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/1"
MONGO_URI: str = "mongodb://localhost:27017/docsgpt"
MONGO_DB_NAME: str = "docsgpt"
LLM_PATH: str = os.path.join(current_dir, "models/docsgpt-7b-f16.gguf")
MODEL_PATH: str = os.path.join(current_dir, "models/docsgpt-7b-f16.gguf")
DEFAULT_MAX_HISTORY: int = 150
LLM_TOKEN_LIMITS: dict = {
"gpt-4o-mini": 128000,
"gpt-3.5-turbo": 4096,
"claude-2": 1e5,
"gemini-2.0-flash-exp": 1e6,
}
MODEL_TOKEN_LIMITS: dict = {"gpt-3.5-turbo": 4096, "claude-2": 1e5}
UPLOAD_FOLDER: str = "inputs"
PARSE_PDF_AS_IMAGE: bool = False
VECTOR_STORE: str = (
"faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb"
)
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb"
RETRIEVERS_ENABLED: list = ["classic_rag", "duckduck_search"] # also brave_search
AGENT_NAME: str = "classic"
FALLBACK_LLM_PROVIDER: Optional[str] = None # provider for fallback llm
FALLBACK_LLM_NAME: Optional[str] = None # model name for fallback llm
FALLBACK_LLM_API_KEY: Optional[str] = None # api key for fallback llm
# LLM Cache
CACHE_REDIS_URL: str = "redis://localhost:6379/2"
@@ -45,18 +27,12 @@ class Settings(BaseSettings):
API_URL: str = "http://localhost:7091" # backend url for celery worker
API_KEY: Optional[str] = None # LLM api key
EMBEDDINGS_KEY: Optional[str] = (
None # api key for embeddings (if using openai, just copy API_KEY)
)
EMBEDDINGS_KEY: Optional[str] = None # api key for embeddings (if using openai, just copy API_KEY)
OPENAI_API_BASE: Optional[str] = None # azure openai api base url
OPENAI_API_VERSION: Optional[str] = None # azure openai api version
AZURE_DEPLOYMENT_NAME: Optional[str] = None # azure deployment name for answering
AZURE_EMBEDDINGS_DEPLOYMENT_NAME: Optional[str] = (
None # azure deployment name for embeddings
)
OPENAI_BASE_URL: Optional[str] = (
None # openai base url for open ai compatable models
)
AZURE_EMBEDDINGS_DEPLOYMENT_NAME: Optional[str] = None # azure deployment name for embeddings
OPENAI_BASE_URL: Optional[str] = None # openai base url for open ai compatable models
# elasticsearch
ELASTIC_CLOUD_ID: Optional[str] = None # cloud id for elasticsearch
@@ -96,16 +72,10 @@ class Settings(BaseSettings):
# LanceDB vectorstore config
LANCEDB_PATH: str = "/tmp/lancedb" # Path where LanceDB stores its local data
LANCEDB_TABLE_NAME: Optional[str] = (
"docsgpts" # Name of the table to use for storing vectors
)
LANCEDB_TABLE_NAME: Optional[str] = "docsgpts" # Name of the table to use for storing vectors
BRAVE_SEARCH_API_KEY: Optional[str] = None
FLASK_DEBUG_MODE: bool = False
STORAGE_TYPE: str = "local" # local or s3
URL_STRATEGY: str = "backend" # backend or s3
JWT_SECRET_KEY: str = ""
path = Path(__file__).parent.parent.absolute()

View File

@@ -17,7 +17,7 @@ class AnthropicLLM(BaseLLM):
self.AI_PROMPT = AI_PROMPT
def _raw_gen(
self, baseself, model, messages, stream=False, tools=None, max_tokens=300, **kwargs
self, baseself, model, messages, stream=False, max_tokens=300, **kwargs
):
context = messages[0]["content"]
user_question = messages[-1]["content"]
@@ -34,7 +34,7 @@ class AnthropicLLM(BaseLLM):
return completion.completion
def _raw_gen_stream(
self, baseself, model, messages, stream=True, tools=None, max_tokens=300, **kwargs
self, baseself, model, messages, stream=True, max_tokens=300, **kwargs
):
context = messages[0]["content"]
user_question = messages[-1]["content"]

View File

@@ -1,130 +1,29 @@
import logging
from abc import ABC, abstractmethod
from application.cache import gen_cache, stream_cache
from application.core.settings import settings
from application.usage import gen_token_usage, stream_token_usage
logger = logging.getLogger(__name__)
from application.cache import stream_cache, gen_cache
class BaseLLM(ABC):
def __init__(
self,
decoded_token=None,
):
self.decoded_token = decoded_token
def __init__(self):
self.token_usage = {"prompt_tokens": 0, "generated_tokens": 0}
self.fallback_provider = settings.FALLBACK_LLM_PROVIDER
self.fallback_model_name = settings.FALLBACK_LLM_NAME
self.fallback_llm_api_key = settings.FALLBACK_LLM_API_KEY
self._fallback_llm = None
@property
def fallback_llm(self):
"""Lazy-loaded fallback LLM instance."""
if (
self._fallback_llm is None
and self.fallback_provider
and self.fallback_model_name
):
try:
from application.llm.llm_creator import LLMCreator
self._fallback_llm = LLMCreator.create_llm(
self.fallback_provider,
self.fallback_llm_api_key,
None,
self.decoded_token,
)
except Exception as e:
logger.error(
f"Failed to initialize fallback LLM: {str(e)}", exc_info=True
)
return self._fallback_llm
def _execute_with_fallback(
self, method_name: str, decorators: list, *args, **kwargs
):
"""
Unified method execution with fallback support.
Args:
method_name: Name of the raw method ('_raw_gen' or '_raw_gen_stream')
decorators: List of decorators to apply
*args: Positional arguments
**kwargs: Keyword arguments
"""
def decorated_method():
method = getattr(self, method_name)
def _apply_decorator(self, method, decorators, *args, **kwargs):
for decorator in decorators:
method = decorator(method)
return method(self, *args, **kwargs)
try:
return decorated_method()
except Exception as e:
if not self.fallback_llm:
logger.error(f"Primary LLM failed and no fallback available: {str(e)}")
raise
logger.warning(
f"Falling back to {self.fallback_provider}/{self.fallback_model_name}. Error: {str(e)}"
)
fallback_method = getattr(
self.fallback_llm, method_name.replace("_raw_", "")
)
return fallback_method(*args, **kwargs)
def gen(self, model, messages, stream=False, tools=None, *args, **kwargs):
decorators = [gen_token_usage, gen_cache]
return self._execute_with_fallback(
"_raw_gen",
decorators,
model=model,
messages=messages,
stream=stream,
tools=tools,
*args,
**kwargs,
)
def gen_stream(self, model, messages, stream=True, tools=None, *args, **kwargs):
decorators = [stream_cache, stream_token_usage]
return self._execute_with_fallback(
"_raw_gen_stream",
decorators,
model=model,
messages=messages,
stream=stream,
tools=tools,
*args,
**kwargs,
)
@abstractmethod
def _raw_gen(self, model, messages, stream, tools, *args, **kwargs):
def _raw_gen(self, model, messages, stream, *args, **kwargs):
pass
def gen(self, model, messages, stream=False, *args, **kwargs):
decorators = [gen_token_usage, gen_cache]
return self._apply_decorator(self._raw_gen, decorators=decorators, model=model, messages=messages, stream=stream, *args, **kwargs)
@abstractmethod
def _raw_gen_stream(self, model, messages, stream, *args, **kwargs):
pass
def supports_tools(self):
return hasattr(self, "_supports_tools") and callable(
getattr(self, "_supports_tools")
)
def _supports_tools(self):
raise NotImplementedError("Subclass must implement _supports_tools method")
def get_supported_attachment_types(self):
"""
Return a list of MIME types supported by this LLM for file uploads.
Returns:
list: List of supported MIME types
"""
return [] # Default: no attachments supported
def gen_stream(self, model, messages, stream=True, *args, **kwargs):
decorators = [stream_cache, stream_token_usage]
return self._apply_decorator(self._raw_gen_stream, decorators=decorators, model=model, messages=messages, stream=stream, *args, **kwargs)

View File

@@ -1,131 +1,34 @@
import json
from application.core.settings import settings
from application.llm.base import BaseLLM
import json
import requests
class DocsGPTAPILLM(BaseLLM):
def __init__(self, api_key=None, user_api_key=None, *args, **kwargs):
from openai import OpenAI
super().__init__(*args, **kwargs)
self.client = OpenAI(api_key="sk-docsgpt-public", base_url="https://oai.arc53.com")
self.user_api_key = user_api_key
self.api_key = api_key
self.user_api_key = user_api_key
self.endpoint = "https://llm.arc53.com"
def _clean_messages_openai(self, messages):
cleaned_messages = []
for message in messages:
role = message.get("role")
content = message.get("content")
def _raw_gen(self, baseself, model, messages, stream=False, *args, **kwargs):
response = requests.post(
f"{self.endpoint}/answer", json={"messages": messages, "max_new_tokens": 30}
)
response_clean = response.json()["a"].replace("###", "")
if role == "model":
role = "assistant"
return response_clean
if role and content is not None:
if isinstance(content, str):
cleaned_messages.append({"role": role, "content": content})
elif isinstance(content, list):
for item in content:
if "text" in item:
cleaned_messages.append(
{"role": role, "content": item["text"]}
)
elif "function_call" in item:
tool_call = {
"id": item["function_call"]["call_id"],
"type": "function",
"function": {
"name": item["function_call"]["name"],
"arguments": json.dumps(
item["function_call"]["args"]
),
},
}
cleaned_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [tool_call],
}
)
elif "function_response" in item:
cleaned_messages.append(
{
"role": "tool",
"tool_call_id": item["function_response"][
"call_id"
],
"content": json.dumps(
item["function_response"]["response"]["result"]
),
}
)
else:
raise ValueError(
f"Unexpected content dictionary format: {item}"
)
else:
raise ValueError(f"Unexpected content type: {type(content)}")
return cleaned_messages
def _raw_gen(
self,
baseself,
model,
messages,
stream=False,
tools=None,
engine=settings.AZURE_DEPLOYMENT_NAME,
**kwargs,
):
messages = self._clean_messages_openai(messages)
if tools:
response = self.client.chat.completions.create(
model="docsgpt",
messages=messages,
stream=stream,
tools=tools,
**kwargs,
)
return response.choices[0]
else:
response = self.client.chat.completions.create(
model="docsgpt", messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
self,
baseself,
model,
messages,
def _raw_gen_stream(self, baseself, model, messages, stream=True, *args, **kwargs):
response = requests.post(
f"{self.endpoint}/stream",
json={"messages": messages, "max_new_tokens": 256},
stream=True,
tools=None,
engine=settings.AZURE_DEPLOYMENT_NAME,
**kwargs,
):
messages = self._clean_messages_openai(messages)
if tools:
response = self.client.chat.completions.create(
model="docsgpt",
messages=messages,
stream=stream,
tools=tools,
**kwargs,
)
else:
response = self.client.chat.completions.create(
model="docsgpt", messages=messages, stream=stream, **kwargs
)
for line in response:
if len(line.choices) > 0 and line.choices[0].delta.content is not None and len(line.choices[0].delta.content) > 0:
yield line.choices[0].delta.content
elif len(line.choices) > 0:
yield line.choices[0]
def _supports_tools(self):
return True
for line in response.iter_lines():
if line:
data_str = line.decode("utf-8")
if data_str.startswith("data: "):
data = json.loads(data_str[6:])
yield data["a"]

View File

@@ -1,227 +1,21 @@
from google import genai
from google.genai import types
import logging
import json
from application.llm.base import BaseLLM
from application.storage.storage_creator import StorageCreator
from application.core.settings import settings
class GoogleLLM(BaseLLM):
def __init__(self, api_key=None, user_api_key=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_key = api_key
self.user_api_key = user_api_key
self.client = genai.Client(api_key=self.api_key)
self.storage = StorageCreator.get_storage()
def get_supported_attachment_types(self):
"""
Return a list of MIME types supported by Google Gemini for file uploads.
Returns:
list: List of supported MIME types
"""
return [
'application/pdf',
'image/png',
'image/jpeg',
'image/jpg',
'image/webp',
'image/gif'
]
def prepare_messages_with_attachments(self, messages, attachments=None):
"""
Process attachments using Google AI's file API for more efficient handling.
Args:
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content and metadata.
Returns:
list: Messages formatted with file references for Google AI API.
"""
if not attachments:
return messages
prepared_messages = messages.copy()
# Find the user message to attach files to the last one
user_message_index = None
for i in range(len(prepared_messages) - 1, -1, -1):
if prepared_messages[i].get("role") == "user":
user_message_index = i
break
if user_message_index is None:
user_message = {"role": "user", "content": []}
prepared_messages.append(user_message)
user_message_index = len(prepared_messages) - 1
if isinstance(prepared_messages[user_message_index].get("content"), str):
text_content = prepared_messages[user_message_index]["content"]
prepared_messages[user_message_index]["content"] = [
{"type": "text", "text": text_content}
]
elif not isinstance(prepared_messages[user_message_index].get("content"), list):
prepared_messages[user_message_index]["content"] = []
files = []
for attachment in attachments:
mime_type = attachment.get('mime_type')
if mime_type in self.get_supported_attachment_types():
try:
file_uri = self._upload_file_to_google(attachment)
logging.info(f"GoogleLLM: Successfully uploaded file, got URI: {file_uri}")
files.append({"file_uri": file_uri, "mime_type": mime_type})
except Exception as e:
logging.error(f"GoogleLLM: Error uploading file: {e}", exc_info=True)
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"text": f"[File could not be processed: {attachment.get('path', 'unknown')}]"
})
if files:
logging.info(f"GoogleLLM: Adding {len(files)} files to message")
prepared_messages[user_message_index]["content"].append({
"files": files
})
return prepared_messages
def _upload_file_to_google(self, attachment):
"""
Upload a file to Google AI and return the file URI.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Returns:
str: Google AI file URI for the uploaded file.
"""
if 'google_file_uri' in attachment:
return attachment['google_file_uri']
file_path = attachment.get('path')
if not file_path:
raise ValueError("No file path provided in attachment")
if not self.storage.file_exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
file_uri = self.storage.process_file(
file_path,
lambda local_path, **kwargs: self.client.files.upload(file=local_path).uri
)
from application.core.mongo_db import MongoDB
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
attachments_collection = db["attachments"]
if '_id' in attachment:
attachments_collection.update_one(
{"_id": attachment['_id']},
{"$set": {"google_file_uri": file_uri}}
)
return file_uri
except Exception as e:
logging.error(f"Error uploading file to Google AI: {e}", exc_info=True)
raise
def _clean_messages_google(self, messages):
cleaned_messages = []
for message in messages:
role = message.get("role")
content = message.get("content")
if role == "assistant":
role = "model"
parts = []
if role and content is not None:
if isinstance(content, str):
parts = [types.Part.from_text(text=content)]
elif isinstance(content, list):
for item in content:
if "text" in item:
parts.append(types.Part.from_text(text=item["text"]))
elif "function_call" in item:
parts.append(
types.Part.from_function_call(
name=item["function_call"]["name"],
args=item["function_call"]["args"],
)
)
elif "function_response" in item:
parts.append(
types.Part.from_function_response(
name=item["function_response"]["name"],
response=item["function_response"]["response"],
)
)
elif "files" in item:
for file_data in item["files"]:
parts.append(
types.Part.from_uri(
file_uri=file_data["file_uri"],
mime_type=file_data["mime_type"]
)
)
else:
raise ValueError(
f"Unexpected content dictionary format:{item}"
)
else:
raise ValueError(f"Unexpected content type: {type(content)}")
cleaned_messages.append(types.Content(role=role, parts=parts))
return cleaned_messages
def _clean_tools_format(self, tools_list):
genai_tools = []
for tool_data in tools_list:
if tool_data["type"] == "function":
function = tool_data["function"]
parameters = function["parameters"]
properties = parameters.get("properties", {})
if properties:
genai_function = dict(
name=function["name"],
description=function["description"],
parameters={
"type": "OBJECT",
"properties": {
k: {
**v,
"type": v["type"].upper() if v["type"] else None,
return [
{
"role": "model" if message["role"] == "system" else message["role"],
"parts": [message["content"]],
}
for k, v in properties.items()
},
"required": (
parameters["required"]
if "required" in parameters
else []
),
},
)
else:
genai_function = dict(
name=function["name"],
description=function["description"],
)
genai_tool = types.Tool(function_declarations=[genai_function])
genai_tools.append(genai_tool)
return genai_tools
for message in messages[1:]
]
def _raw_gen(
self,
@@ -229,31 +23,12 @@ class GoogleLLM(BaseLLM):
model,
messages,
stream=False,
tools=None,
formatting="openai",
**kwargs,
**kwargs
):
client = genai.Client(api_key=self.api_key)
if formatting == "openai":
messages = self._clean_messages_google(messages)
config = types.GenerateContentConfig()
if messages[0].role == "system":
config.system_instruction = messages[0].parts[0].text
messages = messages[1:]
if tools:
cleaned_tools = self._clean_tools_format(tools)
config.tools = cleaned_tools
response = client.models.generate_content(
model=model,
contents=messages,
config=config,
)
return response
else:
response = client.models.generate_content(
model=model, contents=messages, config=config
)
import google.generativeai as genai
genai.configure(api_key=self.api_key)
model = genai.GenerativeModel(model, system_instruction=messages[0]["content"])
response = model.generate_content(self._clean_messages_google(messages))
return response.text
def _raw_gen_stream(
@@ -262,52 +37,12 @@ class GoogleLLM(BaseLLM):
model,
messages,
stream=True,
tools=None,
formatting="openai",
**kwargs,
**kwargs
):
client = genai.Client(api_key=self.api_key)
if formatting == "openai":
messages = self._clean_messages_google(messages)
config = types.GenerateContentConfig()
if messages[0].role == "system":
config.system_instruction = messages[0].parts[0].text
messages = messages[1:]
if tools:
cleaned_tools = self._clean_tools_format(tools)
config.tools = cleaned_tools
# Check if we have both tools and file attachments
has_attachments = False
for message in messages:
for part in message.parts:
if hasattr(part, 'file_data') and part.file_data is not None:
has_attachments = True
break
if has_attachments:
break
logging.info(f"GoogleLLM: Starting stream generation. Model: {model}, Messages: {json.dumps(messages, default=str)}, Has attachments: {has_attachments}")
response = client.models.generate_content_stream(
model=model,
contents=messages,
config=config,
)
for chunk in response:
if hasattr(chunk, "candidates") and chunk.candidates:
for candidate in chunk.candidates:
if candidate.content and candidate.content.parts:
for part in candidate.content.parts:
if part.function_call:
yield part
elif part.text:
yield part.text
elif hasattr(chunk, "text"):
yield chunk.text
def _supports_tools(self):
return True
import google.generativeai as genai
genai.configure(api_key=self.api_key)
model = genai.GenerativeModel(model, system_instruction=messages[0]["content"])
response = model.generate_content(self._clean_messages_google(messages), stream=True)
for line in response:
if line.text is not None:
yield line.text

View File

@@ -1,32 +1,45 @@
from application.llm.base import BaseLLM
from openai import OpenAI
class GroqLLM(BaseLLM):
def __init__(self, api_key=None, user_api_key=None, *args, **kwargs):
from openai import OpenAI
super().__init__(*args, **kwargs)
self.client = OpenAI(api_key=api_key, base_url="https://api.groq.com/openai/v1")
self.api_key = api_key
self.user_api_key = user_api_key
def _raw_gen(self, baseself, model, messages, stream=False, tools=None, **kwargs):
if tools:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, tools=tools, **kwargs
)
return response.choices[0]
else:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
self, baseself, model, messages, stream=True, tools=None, **kwargs
def _raw_gen(
self,
baseself,
model,
messages,
stream=False,
**kwargs
):
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
self,
baseself,
model,
messages,
stream=True,
**kwargs
):
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
for line in response:
# import sys
# print(line.choices[0].delta.content, file=sys.stderr)
if line.choices[0].delta.content is not None:
yield line.choices[0].delta.content

View File

@@ -1,335 +0,0 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generator, List, Optional, Union
from application.logging import build_stack_data
logger = logging.getLogger(__name__)
@dataclass
class ToolCall:
"""Represents a tool/function call from the LLM."""
id: str
name: str
arguments: Union[str, Dict]
index: Optional[int] = None
@classmethod
def from_dict(cls, data: Dict) -> "ToolCall":
"""Create ToolCall from dictionary."""
return cls(
id=data.get("id", ""),
name=data.get("name", ""),
arguments=data.get("arguments", {}),
index=data.get("index"),
)
@dataclass
class LLMResponse:
"""Represents a response from the LLM."""
content: str
tool_calls: List[ToolCall]
finish_reason: str
raw_response: Any
@property
def requires_tool_call(self) -> bool:
"""Check if the response requires tool calls."""
return bool(self.tool_calls) and self.finish_reason == "tool_calls"
class LLMHandler(ABC):
"""Abstract base class for LLM handlers."""
def __init__(self):
self.llm_calls = []
self.tool_calls = []
@abstractmethod
def parse_response(self, response: Any) -> LLMResponse:
"""Parse raw LLM response into standardized format."""
pass
@abstractmethod
def create_tool_message(self, tool_call: ToolCall, result: Any) -> Dict:
"""Create a tool result message for the conversation history."""
pass
@abstractmethod
def _iterate_stream(self, response: Any) -> Generator:
"""Iterate through streaming response chunks."""
pass
def process_message_flow(
self,
agent,
initial_response,
tools_dict: Dict,
messages: List[Dict],
attachments: Optional[List] = None,
stream: bool = False,
) -> Union[str, Generator]:
"""
Main orchestration method for processing LLM message flow.
Args:
agent: The agent instance
initial_response: Initial LLM response
tools_dict: Dictionary of available tools
messages: Conversation history
attachments: Optional attachments
stream: Whether to use streaming
Returns:
Final response or generator for streaming
"""
messages = self.prepare_messages(agent, messages, attachments)
if stream:
return self.handle_streaming(agent, initial_response, tools_dict, messages)
else:
return self.handle_non_streaming(
agent, initial_response, tools_dict, messages
)
def prepare_messages(
self, agent, messages: List[Dict], attachments: Optional[List] = None
) -> List[Dict]:
"""
Prepare messages with attachments and provider-specific formatting.
Args:
agent: The agent instance
messages: Original messages
attachments: List of attachments
Returns:
Prepared messages list
"""
if not attachments:
return messages
logger.info(f"Preparing messages with {len(attachments)} attachments")
supported_types = agent.llm.get_supported_attachment_types()
supported_attachments = [
a for a in attachments if a.get("mime_type") in supported_types
]
unsupported_attachments = [
a for a in attachments if a.get("mime_type") not in supported_types
]
# Process supported attachments with the LLM's custom method
if supported_attachments:
logger.info(
f"Processing {len(supported_attachments)} supported attachments"
)
messages = agent.llm.prepare_messages_with_attachments(
messages, supported_attachments
)
# Process unsupported attachments with default method
if unsupported_attachments:
logger.info(
f"Processing {len(unsupported_attachments)} unsupported attachments"
)
messages = self._append_unsupported_attachments(
messages, unsupported_attachments
)
return messages
def _append_unsupported_attachments(
self, messages: List[Dict], attachments: List[Dict]
) -> List[Dict]:
"""
Default method to append unsupported attachment content to system prompt.
Args:
messages: Current messages
attachments: List of unsupported attachments
Returns:
Updated messages list
"""
prepared_messages = messages.copy()
attachment_texts = []
for attachment in attachments:
logger.info(f"Adding attachment {attachment.get('id')} to context")
if "content" in attachment:
attachment_texts.append(
f"Attached file content:\n\n{attachment['content']}"
)
if attachment_texts:
combined_text = "\n\n".join(attachment_texts)
system_msg = next(
(msg for msg in prepared_messages if msg.get("role") == "system"),
{"role": "system", "content": ""},
)
if system_msg not in prepared_messages:
prepared_messages.insert(0, system_msg)
system_msg["content"] += f"\n\n{combined_text}"
return prepared_messages
def handle_tool_calls(
self, agent, tool_calls: List[ToolCall], tools_dict: Dict, messages: List[Dict]
) -> Generator:
"""
Execute tool calls and update conversation history.
Args:
agent: The agent instance
tool_calls: List of tool calls to execute
tools_dict: Available tools dictionary
messages: Current conversation history
Returns:
Updated messages list
"""
updated_messages = messages.copy()
for call in tool_calls:
try:
self.tool_calls.append(call)
tool_executor_gen = agent._execute_tool_action(tools_dict, call)
while True:
try:
yield next(tool_executor_gen)
except StopIteration as e:
tool_response, call_id = e.value
break
updated_messages.append(
{
"role": "assistant",
"content": [
{
"function_call": {
"name": call.name,
"args": call.arguments,
"call_id": call_id,
}
}
],
}
)
updated_messages.append(self.create_tool_message(call, tool_response))
except Exception as e:
logger.error(f"Error executing tool: {str(e)}", exc_info=True)
updated_messages.append(
{
"role": "tool",
"content": f"Error executing tool: {str(e)}",
"tool_call_id": call.id,
}
)
return updated_messages
def handle_non_streaming(
self, agent, response: Any, tools_dict: Dict, messages: List[Dict]
) -> Generator:
"""
Handle non-streaming response flow.
Args:
agent: The agent instance
response: Current LLM response
tools_dict: Available tools dictionary
messages: Conversation history
Returns:
Final response after processing all tool calls
"""
parsed = self.parse_response(response)
self.llm_calls.append(build_stack_data(agent.llm))
while parsed.requires_tool_call:
tool_handler_gen = self.handle_tool_calls(
agent, parsed.tool_calls, tools_dict, messages
)
while True:
try:
yield next(tool_handler_gen)
except StopIteration as e:
messages = e.value
break
response = agent.llm.gen(
model=agent.gpt_model, messages=messages, tools=agent.tools
)
parsed = self.parse_response(response)
self.llm_calls.append(build_stack_data(agent.llm))
return parsed.content
def handle_streaming(
self, agent, response: Any, tools_dict: Dict, messages: List[Dict]
) -> Generator:
"""
Handle streaming response flow.
Args:
agent: The agent instance
response: Current LLM response
tools_dict: Available tools dictionary
messages: Conversation history
Yields:
Streaming response chunks
"""
buffer = ""
tool_calls = {}
for chunk in self._iterate_stream(response):
if isinstance(chunk, str):
yield chunk
continue
parsed = self.parse_response(chunk)
if parsed.tool_calls:
for call in parsed.tool_calls:
if call.index not in tool_calls:
tool_calls[call.index] = call
else:
existing = tool_calls[call.index]
if call.id:
existing.id = call.id
if call.name:
existing.name = call.name
if call.arguments:
existing.arguments += call.arguments
if parsed.finish_reason == "tool_calls":
tool_handler_gen = self.handle_tool_calls(
agent, list(tool_calls.values()), tools_dict, messages
)
while True:
try:
yield next(tool_handler_gen)
except StopIteration as e:
messages = e.value
break
tool_calls = {}
response = agent.llm.gen_stream(
model=agent.gpt_model, messages=messages, tools=agent.tools
)
self.llm_calls.append(build_stack_data(agent.llm))
yield from self.handle_streaming(agent, response, tools_dict, messages)
return
if parsed.content:
buffer += parsed.content
yield buffer
buffer = ""
if parsed.finish_reason == "stop":
return

View File

@@ -1,78 +0,0 @@
import uuid
from typing import Any, Dict, Generator
from application.llm.handlers.base import LLMHandler, LLMResponse, ToolCall
class GoogleLLMHandler(LLMHandler):
"""Handler for Google's GenAI API."""
def parse_response(self, response: Any) -> LLMResponse:
"""Parse Google response into standardized format."""
if isinstance(response, str):
return LLMResponse(
content=response,
tool_calls=[],
finish_reason="stop",
raw_response=response,
)
if hasattr(response, "candidates"):
parts = response.candidates[0].content.parts if response.candidates else []
tool_calls = [
ToolCall(
id=str(uuid.uuid4()),
name=part.function_call.name,
arguments=part.function_call.args,
)
for part in parts
if hasattr(part, "function_call") and part.function_call is not None
]
content = " ".join(
part.text
for part in parts
if hasattr(part, "text") and part.text is not None
)
return LLMResponse(
content=content,
tool_calls=tool_calls,
finish_reason="tool_calls" if tool_calls else "stop",
raw_response=response,
)
else:
tool_calls = []
if hasattr(response, "function_call"):
tool_calls.append(
ToolCall(
id=str(uuid.uuid4()),
name=response.function_call.name,
arguments=response.function_call.args,
)
)
return LLMResponse(
content=response.text if hasattr(response, "text") else "",
tool_calls=tool_calls,
finish_reason="tool_calls" if tool_calls else "stop",
raw_response=response,
)
def create_tool_message(self, tool_call: ToolCall, result: Any) -> Dict:
"""Create Google-style tool message."""
from google.genai import types
return {
"role": "tool",
"content": [
types.Part.from_function_response(
name=tool_call.name, response={"result": result}
).to_json_dict()
],
}
def _iterate_stream(self, response: Any) -> Generator:
"""Iterate through Google streaming response."""
for chunk in response:
yield chunk

View File

@@ -1,18 +0,0 @@
from application.llm.handlers.base import LLMHandler
from application.llm.handlers.google import GoogleLLMHandler
from application.llm.handlers.openai import OpenAILLMHandler
class LLMHandlerCreator:
handlers = {
"openai": OpenAILLMHandler,
"google": GoogleLLMHandler,
"default": OpenAILLMHandler,
}
@classmethod
def create_handler(cls, llm_type: str, *args, **kwargs) -> LLMHandler:
handler_class = cls.handlers.get(llm_type.lower())
if not handler_class:
raise ValueError(f"No LLM handler class found for type {llm_type}")
return handler_class(*args, **kwargs)

View File

@@ -1,57 +0,0 @@
from typing import Any, Dict, Generator
from application.llm.handlers.base import LLMHandler, LLMResponse, ToolCall
class OpenAILLMHandler(LLMHandler):
"""Handler for OpenAI API."""
def parse_response(self, response: Any) -> LLMResponse:
"""Parse OpenAI response into standardized format."""
if isinstance(response, str):
return LLMResponse(
content=response,
tool_calls=[],
finish_reason="stop",
raw_response=response,
)
message = getattr(response, "message", None) or getattr(response, "delta", None)
tool_calls = []
if hasattr(message, "tool_calls"):
tool_calls = [
ToolCall(
id=getattr(tc, "id", ""),
name=getattr(tc.function, "name", ""),
arguments=getattr(tc.function, "arguments", ""),
index=getattr(tc, "index", None),
)
for tc in message.tool_calls or []
]
return LLMResponse(
content=getattr(message, "content", ""),
tool_calls=tool_calls,
finish_reason=getattr(response, "finish_reason", ""),
raw_response=response,
)
def create_tool_message(self, tool_call: ToolCall, result: Any) -> Dict:
"""Create OpenAI-style tool message."""
return {
"role": "tool",
"content": [
{
"function_response": {
"name": tool_call.name,
"response": {"result": result},
"call_id": tool_call.id,
}
}
],
}
def _iterate_stream(self, response: Any) -> Generator:
"""Iterate through OpenAI streaming response."""
for chunk in response:
yield chunk

View File

@@ -2,7 +2,6 @@ from application.llm.base import BaseLLM
from application.core.settings import settings
import threading
class LlamaSingleton:
_instances = {}
_lock = threading.Lock() # Add a lock for thread synchronization
@@ -30,7 +29,7 @@ class LlamaCpp(BaseLLM):
self,
api_key=None,
user_api_key=None,
llm_name=settings.LLM_PATH,
llm_name=settings.MODEL_PATH,
*args,
**kwargs,
):
@@ -43,18 +42,14 @@ class LlamaCpp(BaseLLM):
context = messages[0]["content"]
user_question = messages[-1]["content"]
prompt = f"### Instruction \n {user_question} \n ### Context \n {context} \n ### Answer \n"
result = LlamaSingleton.query_model(
self.llama, prompt, max_tokens=150, echo=False
)
result = LlamaSingleton.query_model(self.llama, prompt, max_tokens=150, echo=False)
return result["choices"][0]["text"].split("### Answer \n")[-1]
def _raw_gen_stream(self, baseself, model, messages, stream=True, **kwargs):
context = messages[0]["content"]
user_question = messages[-1]["content"]
prompt = f"### Instruction \n {user_question} \n ### Context \n {context} \n ### Answer \n"
result = LlamaSingleton.query_model(
self.llama, prompt, max_tokens=150, echo=False, stream=stream
)
result = LlamaSingleton.query_model(self.llama, prompt, max_tokens=150, echo=False, stream=stream)
for item in result:
for choice in item["choices"]:
yield choice["text"]

View File

@@ -7,7 +7,6 @@ from application.llm.anthropic import AnthropicLLM
from application.llm.docsgpt_provider import DocsGPTAPILLM
from application.llm.premai import PremAILLM
from application.llm.google_ai import GoogleLLM
from application.llm.novita import NovitaLLM
class LLMCreator:
@@ -21,15 +20,12 @@ class LLMCreator:
"docsgpt": DocsGPTAPILLM,
"premai": PremAILLM,
"groq": GroqLLM,
"google": GoogleLLM,
"novita": NovitaLLM,
"google": GoogleLLM
}
@classmethod
def create_llm(cls, type, api_key, user_api_key, decoded_token, *args, **kwargs):
def create_llm(cls, type, api_key, user_api_key, *args, **kwargs):
llm_class = cls.llms.get(type.lower())
if not llm_class:
raise ValueError(f"No LLM class found for type {type}")
return llm_class(
api_key, user_api_key, decoded_token=decoded_token, *args, **kwargs
)
return llm_class(api_key, user_api_key, *args, **kwargs)

View File

@@ -1,32 +0,0 @@
from application.llm.base import BaseLLM
from openai import OpenAI
class NovitaLLM(BaseLLM):
def __init__(self, api_key=None, user_api_key=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = OpenAI(api_key=api_key, base_url="https://api.novita.ai/v3/openai")
self.api_key = api_key
self.user_api_key = user_api_key
def _raw_gen(self, baseself, model, messages, stream=False, tools=None, **kwargs):
if tools:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, tools=tools, **kwargs
)
return response.choices[0]
else:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
self, baseself, model, messages, stream=True, tools=None, **kwargs
):
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
for line in response:
if line.choices[0].delta.content is not None:
yield line.choices[0].delta.content

View File

@@ -1,10 +1,6 @@
import json
import base64
import logging
from application.core.settings import settings
from application.llm.base import BaseLLM
from application.storage.storage_creator import StorageCreator
from application.core.settings import settings
class OpenAILLM(BaseLLM):
@@ -13,82 +9,15 @@ class OpenAILLM(BaseLLM):
from openai import OpenAI
super().__init__(*args, **kwargs)
if isinstance(settings.OPENAI_BASE_URL, str) and settings.OPENAI_BASE_URL.strip():
self.client = OpenAI(api_key=api_key, base_url=settings.OPENAI_BASE_URL)
if settings.OPENAI_BASE_URL:
self.client = OpenAI(
api_key=api_key,
base_url=settings.OPENAI_BASE_URL
)
else:
DEFAULT_OPENAI_API_BASE = "https://api.openai.com/v1"
self.client = OpenAI(api_key=api_key, base_url=DEFAULT_OPENAI_API_BASE)
self.client = OpenAI(api_key=api_key)
self.api_key = api_key
self.user_api_key = user_api_key
self.storage = StorageCreator.get_storage()
def _clean_messages_openai(self, messages):
cleaned_messages = []
for message in messages:
role = message.get("role")
content = message.get("content")
if role == "model":
role = "assistant"
if role and content is not None:
if isinstance(content, str):
cleaned_messages.append({"role": role, "content": content})
elif isinstance(content, list):
for item in content:
if "text" in item:
cleaned_messages.append(
{"role": role, "content": item["text"]}
)
elif "function_call" in item:
tool_call = {
"id": item["function_call"]["call_id"],
"type": "function",
"function": {
"name": item["function_call"]["name"],
"arguments": json.dumps(
item["function_call"]["args"]
),
},
}
cleaned_messages.append(
{
"role": "assistant",
"content": None,
"tool_calls": [tool_call],
}
)
elif "function_response" in item:
cleaned_messages.append(
{
"role": "tool",
"tool_call_id": item["function_response"][
"call_id"
],
"content": json.dumps(
item["function_response"]["response"]["result"]
),
}
)
elif isinstance(item, dict):
content_parts = []
if "text" in item:
content_parts.append({"type": "text", "text": item["text"]})
elif "type" in item and item["type"] == "text" and "text" in item:
content_parts.append(item)
elif "type" in item and item["type"] == "file" and "file" in item:
content_parts.append(item)
elif "type" in item and item["type"] == "image_url" and "image_url" in item:
content_parts.append(item)
cleaned_messages.append({"role": role, "content": content_parts})
else:
raise ValueError(
f"Unexpected content dictionary format: {item}"
)
else:
raise ValueError(f"Unexpected content type: {type(content)}")
return cleaned_messages
def _raw_gen(
self,
@@ -96,24 +25,13 @@ class OpenAILLM(BaseLLM):
model,
messages,
stream=False,
tools=None,
engine=settings.AZURE_DEPLOYMENT_NAME,
**kwargs,
**kwargs
):
messages = self._clean_messages_openai(messages)
if tools:
response = self.client.chat.completions.create(
model=model,
messages=messages,
stream=stream,
tools=tools,
**kwargs,
)
return response.choices[0]
else:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
return response.choices[0].message.content
def _raw_gen_stream(
@@ -122,204 +40,34 @@ class OpenAILLM(BaseLLM):
model,
messages,
stream=True,
tools=None,
engine=settings.AZURE_DEPLOYMENT_NAME,
**kwargs,
**kwargs
):
messages = self._clean_messages_openai(messages)
if tools:
response = self.client.chat.completions.create(
model=model,
messages=messages,
stream=stream,
tools=tools,
**kwargs,
)
else:
response = self.client.chat.completions.create(
model=model, messages=messages, stream=stream, **kwargs
)
for line in response:
if len(line.choices) > 0 and line.choices[0].delta.content is not None and len(line.choices[0].delta.content) > 0:
# import sys
# print(line.choices[0].delta.content, file=sys.stderr)
if line.choices[0].delta.content is not None:
yield line.choices[0].delta.content
elif len(line.choices) > 0:
yield line.choices[0]
def _supports_tools(self):
return True
def get_supported_attachment_types(self):
"""
Return a list of MIME types supported by OpenAI for file uploads.
Returns:
list: List of supported MIME types
"""
return [
'application/pdf',
'image/png',
'image/jpeg',
'image/jpg',
'image/webp',
'image/gif'
]
def prepare_messages_with_attachments(self, messages, attachments=None):
"""
Process attachments using OpenAI's file API for more efficient handling.
Args:
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content and metadata.
Returns:
list: Messages formatted with file references for OpenAI API.
"""
if not attachments:
return messages
prepared_messages = messages.copy()
# Find the user message to attach file_id to the last one
user_message_index = None
for i in range(len(prepared_messages) - 1, -1, -1):
if prepared_messages[i].get("role") == "user":
user_message_index = i
break
if user_message_index is None:
user_message = {"role": "user", "content": []}
prepared_messages.append(user_message)
user_message_index = len(prepared_messages) - 1
if isinstance(prepared_messages[user_message_index].get("content"), str):
text_content = prepared_messages[user_message_index]["content"]
prepared_messages[user_message_index]["content"] = [
{"type": "text", "text": text_content}
]
elif not isinstance(prepared_messages[user_message_index].get("content"), list):
prepared_messages[user_message_index]["content"] = []
for attachment in attachments:
mime_type = attachment.get('mime_type')
if mime_type and mime_type.startswith('image/'):
try:
base64_image = self._get_base64_image(attachment)
prepared_messages[user_message_index]["content"].append({
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{base64_image}"
}
})
except Exception as e:
logging.error(f"Error processing image attachment: {e}", exc_info=True)
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"text": f"[Image could not be processed: {attachment.get('path', 'unknown')}]"
})
# Handle PDFs using the file API
elif mime_type == 'application/pdf':
try:
file_id = self._upload_file_to_openai(attachment)
prepared_messages[user_message_index]["content"].append({
"type": "file",
"file": {"file_id": file_id}
})
except Exception as e:
logging.error(f"Error uploading PDF to OpenAI: {e}", exc_info=True)
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"text": f"File content:\n\n{attachment['content']}"
})
return prepared_messages
def _get_base64_image(self, attachment):
"""
Convert an image file to base64 encoding.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Returns:
str: Base64-encoded image data.
"""
file_path = attachment.get('path')
if not file_path:
raise ValueError("No file path provided in attachment")
try:
with self.storage.get_file(file_path) as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except FileNotFoundError:
raise FileNotFoundError(f"File not found: {file_path}")
def _upload_file_to_openai(self, attachment):
"""
Upload a file to OpenAI and return the file_id.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Expected keys:
- path: Path to the file
- id: Optional MongoDB ID for caching
Returns:
str: OpenAI file_id for the uploaded file.
"""
import logging
if 'openai_file_id' in attachment:
return attachment['openai_file_id']
file_path = attachment.get('path')
if not self.storage.file_exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
file_id = self.storage.process_file(
file_path,
lambda local_path, **kwargs: self.client.files.create(
file=open(local_path, 'rb'),
purpose="assistants"
).id
)
from application.core.mongo_db import MongoDB
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
attachments_collection = db["attachments"]
if '_id' in attachment:
attachments_collection.update_one(
{"_id": attachment['_id']},
{"$set": {"openai_file_id": file_id}}
)
return file_id
except Exception as e:
logging.error(f"Error uploading file to OpenAI: {e}", exc_info=True)
raise
class AzureOpenAILLM(OpenAILLM):
def __init__(
self, api_key, user_api_key, *args, **kwargs
self, openai_api_key, openai_api_base, openai_api_version, deployment_name
):
super().__init__(api_key)
super().__init__(openai_api_key)
self.api_base = (settings.OPENAI_API_BASE,)
self.api_version = (settings.OPENAI_API_VERSION,)
self.deployment_name = (settings.AZURE_DEPLOYMENT_NAME,)
from openai import AzureOpenAI
self.client = AzureOpenAI(
api_key=api_key,
api_key=openai_api_key,
api_version=settings.OPENAI_API_VERSION,
azure_endpoint=settings.OPENAI_API_BASE
api_base=settings.OPENAI_API_BASE,
deployment_name=settings.AZURE_DEPLOYMENT_NAME,
)

View File

@@ -76,7 +76,7 @@ class SagemakerAPILLM(BaseLLM):
self.endpoint = settings.SAGEMAKER_ENDPOINT
self.runtime = runtime
def _raw_gen(self, baseself, model, messages, stream=False, tools=None, **kwargs):
def _raw_gen(self, baseself, model, messages, stream=False, **kwargs):
context = messages[0]["content"]
user_question = messages[-1]["content"]
prompt = f"### Instruction \n {user_question} \n ### Context \n {context} \n ### Answer \n"
@@ -105,7 +105,7 @@ class SagemakerAPILLM(BaseLLM):
print(result[0]["generated_text"], file=sys.stderr)
return result[0]["generated_text"][len(prompt) :]
def _raw_gen_stream(self, baseself, model, messages, stream=True, tools=None, **kwargs):
def _raw_gen_stream(self, baseself, model, messages, stream=True, **kwargs):
context = messages[0]["content"]
user_question = messages[-1]["content"]
prompt = f"### Instruction \n {user_question} \n ### Context \n {context} \n ### Answer \n"

View File

@@ -1,154 +0,0 @@
import datetime
import functools
import inspect
import logging
import uuid
from typing import Any, Callable, Dict, Generator, List
from application.core.mongo_db import MongoDB
from application.core.settings import settings
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class LogContext:
def __init__(self, endpoint, activity_id, user, api_key, query):
self.endpoint = endpoint
self.activity_id = activity_id
self.user = user
self.api_key = api_key
self.query = query
self.stacks = []
def build_stack_data(
obj: Any,
include_attributes: List[str] = None,
exclude_attributes: List[str] = None,
custom_data: Dict = None,
) -> Dict:
if obj is None:
raise ValueError("The 'obj' parameter cannot be None")
data = {}
if include_attributes is None:
include_attributes = []
for name, value in inspect.getmembers(obj):
if (
not name.startswith("_")
and not inspect.ismethod(value)
and not inspect.isfunction(value)
):
include_attributes.append(name)
for attr_name in include_attributes:
if exclude_attributes and attr_name in exclude_attributes:
continue
try:
attr_value = getattr(obj, attr_name)
if attr_value is not None:
if isinstance(attr_value, (int, float, str, bool)):
data[attr_name] = attr_value
elif isinstance(attr_value, list):
if all(isinstance(item, dict) for item in attr_value):
data[attr_name] = attr_value
elif all(hasattr(item, "__dict__") for item in attr_value):
data[attr_name] = [item.__dict__ for item in attr_value]
else:
data[attr_name] = [str(item) for item in attr_value]
elif isinstance(attr_value, dict):
data[attr_name] = {k: str(v) for k, v in attr_value.items()}
except AttributeError as e:
logging.warning(f"AttributeError while accessing {attr_name}: {e}")
except AttributeError:
pass
if custom_data:
data.update(custom_data)
return data
def log_activity() -> Callable:
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
activity_id = str(uuid.uuid4())
data = build_stack_data(args[0])
endpoint = data.get("endpoint", "")
user = data.get("user", "local")
api_key = data.get("user_api_key", "")
query = kwargs.get("query", getattr(args[0], "query", ""))
context = LogContext(endpoint, activity_id, user, api_key, query)
kwargs["log_context"] = context
logging.info(
f"Starting activity: {endpoint} - {activity_id} - User: {user}"
)
generator = func(*args, **kwargs)
yield from _consume_and_log(generator, context)
return wrapper
return decorator
def _consume_and_log(generator: Generator, context: "LogContext"):
try:
for item in generator:
yield item
except Exception as e:
logging.exception(f"Error in {context.endpoint} - {context.activity_id}: {e}")
context.stacks.append({"component": "error", "data": {"message": str(e)}})
_log_to_mongodb(
endpoint=context.endpoint,
activity_id=context.activity_id,
user=context.user,
api_key=context.api_key,
query=context.query,
stacks=context.stacks,
level="error",
)
raise
finally:
_log_to_mongodb(
endpoint=context.endpoint,
activity_id=context.activity_id,
user=context.user,
api_key=context.api_key,
query=context.query,
stacks=context.stacks,
level="info",
)
def _log_to_mongodb(
endpoint: str,
activity_id: str,
user: str,
api_key: str,
query: str,
stacks: List[Dict],
level: str,
) -> None:
try:
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
user_logs_collection = db["stack_logs"]
log_entry = {
"endpoint": endpoint,
"id": activity_id,
"level": level,
"user": user,
"api_key": api_key,
"query": query,
"stacks": stacks,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
}
user_logs_collection.insert_one(log_entry)
logging.debug(f"Logged activity to MongoDB: {activity_id}")
except Exception as e:
logging.error(f"Failed to log to MongoDB: {e}", exc_info=True)

View File

@@ -1,5 +1,5 @@
import re
from typing import List, Tuple
from typing import List, Tuple, Union
import logging
from application.parser.schema.base import Document
from application.utils import get_encoding

View File

@@ -19,7 +19,7 @@ def add_text_to_store_with_retry(store, doc, source_id):
doc.metadata["source_id"] = str(source_id)
store.add_texts([doc.page_content], metadatas=[doc.metadata])
except Exception as e:
logging.error(f"Failed to add document with retry: {e}", exc_info=True)
logging.error(f"Failed to add document with retry: {e}")
raise
@@ -61,7 +61,7 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
# Process and embed documents
for idx, doc in tqdm(
enumerate(docs),
docs,
desc="Embedding 🦖",
unit="docs",
total=total_docs,
@@ -69,13 +69,13 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
):
try:
# Update task status for progress tracking
progress = int(((idx + 1) / total_docs) * 100)
progress = int((idx / total_docs) * 100)
task_status.update_state(state="PROGRESS", meta={"current": progress})
# Add document to vector store
add_text_to_store_with_retry(store, doc, source_id)
except Exception as e:
logging.error(f"Error embedding document {idx}: {e}", exc_info=True)
logging.error(f"Error embedding document {idx}: {e}")
logging.info(f"Saving progress at document {idx} out of {total_docs}")
store.save_local(folder_name)
break

View File

@@ -13,7 +13,6 @@ from application.parser.file.rst_parser import RstParser
from application.parser.file.tabular_parser import PandasCSVParser,ExcelParser
from application.parser.file.json_parser import JSONParser
from application.parser.file.pptx_parser import PPTXParser
from application.parser.file.image_parser import ImageParser
from application.parser.schema.base import Document
DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = {
@@ -28,9 +27,6 @@ DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = {
".mdx": MarkdownParser(),
".json":JSONParser(),
".pptx":PPTXParser(),
".png": ImageParser(),
".jpg": ImageParser(),
".jpeg": ImageParser(),
}
@@ -158,7 +154,7 @@ class SimpleDirectoryReader(BaseReader):
data = f.read()
# Prepare metadata for this file
if self.file_metadata is not None:
file_metadata = self.file_metadata(input_file.name)
file_metadata = self.file_metadata(str(input_file))
else:
# Provide a default empty metadata
file_metadata = {'title': '', 'store': ''}

View File

@@ -7,8 +7,7 @@ from pathlib import Path
from typing import Dict
from application.parser.file.base_parser import BaseParser
from application.core.settings import settings
import requests
class PDFParser(BaseParser):
"""PDF parser."""
@@ -19,32 +18,22 @@ class PDFParser(BaseParser):
def parse_file(self, file: Path, errors: str = "ignore") -> str:
"""Parse file."""
if settings.PARSE_PDF_AS_IMAGE:
doc2md_service = "https://llm.arc53.com/doc2md"
# alternatively you can use local vision capable LLM
with open(file, "rb") as file_loaded:
files = {'file': file_loaded}
response = requests.post(doc2md_service, files=files)
data = response.json()["markdown"]
return data
try:
from pypdf import PdfReader
import PyPDF2
except ImportError:
raise ValueError("pypdf is required to read PDF files.")
raise ValueError("PyPDF2 is required to read PDF files.")
text_list = []
with open(file, "rb") as fp:
# Create a PDF object
pdf = PdfReader(fp)
pdf = PyPDF2.PdfReader(fp)
# Get the number of pages in the PDF document
num_pages = len(pdf.pages)
# Iterate over every page
for page_index in range(num_pages):
for page in range(num_pages):
# Extract the text from the page
page = pdf.pages[page_index]
page_text = page.extract_text()
page_text = pdf.pages[page].extract_text()
text_list.append(page_text)
text = "\n".join(text_list)

View File

@@ -1,27 +0,0 @@
"""Image parser.
Contains parser for .png, .jpg, .jpeg files.
"""
from pathlib import Path
import requests
from typing import Dict, Union
from application.parser.file.base_parser import BaseParser
class ImageParser(BaseParser):
"""Image parser."""
def _init_parser(self) -> Dict:
"""Init parser."""
return {}
def parse_file(self, file: Path, errors: str = "ignore") -> Union[str, list[str]]:
doc2md_service = "https://llm.arc53.com/doc2md"
# alternatively you can use local vision capable LLM
with open(file, "rb") as file_loaded:
files = {'file': file_loaded}
response = requests.post(doc2md_service, files=files)
data = response.json()["markdown"]
return data

View File

@@ -91,25 +91,6 @@ class RstParser(BaseParser):
]
return rst_tups
def chunk_by_token_count(self, text: str, max_tokens: int = 100) -> List[str]:
"""Chunk text by token count."""
avg_token_length = 5
chunk_size = max_tokens * avg_token_length
chunks = []
for i in range(0, len(text), chunk_size):
chunk = text[i:i+chunk_size]
if i + chunk_size < len(text):
last_space = chunk.rfind(' ')
if last_space != -1:
chunk = chunk[:last_space]
chunks.append(chunk.strip())
return chunks
def remove_images(self, content: str) -> str:
pattern = r"\.\. image:: (.*)"
content = re.sub(pattern, "", content)
@@ -155,7 +136,7 @@ class RstParser(BaseParser):
return {}
def parse_tups(
self, filepath: Path, errors: str = "ignore",max_tokens: Optional[int] = 1000
self, filepath: Path, errors: str = "ignore"
) -> List[Tuple[Optional[str], str]]:
"""Parse file into tuples."""
with open(filepath, "r") as f:
@@ -175,15 +156,6 @@ class RstParser(BaseParser):
rst_tups = self.remove_whitespaces_excess(rst_tups)
if self._remove_characters_excess:
rst_tups = self.remove_characters_excess(rst_tups)
# Apply chunking if max_tokens is provided
if max_tokens is not None:
chunked_tups = []
for header, text in rst_tups:
chunks = self.chunk_by_token_count(text, max_tokens)
for idx, chunk in enumerate(chunks):
chunked_tups.append((f"{header} - Chunk {idx + 1}", chunk))
return chunked_tups
return rst_tups
def parse_file(

View File

@@ -74,12 +74,6 @@ class PandasCSVParser(BaseParser):
Set to empty dict by default, this means pandas will try to figure
out the separators, table head, etc. on its own.
header_period (int): Controls how headers are included in output:
- 0: Headers only at the beginning
- 1: Headers in every row
- N > 1: Headers every N rows
header_prefix (str): Prefix for header rows. Default is "HEADERS: ".
"""
def __init__(
@@ -89,8 +83,6 @@ class PandasCSVParser(BaseParser):
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
header_period: int = 20,
header_prefix: str = "HEADERS: ",
**kwargs: Any
) -> None:
"""Init params."""
@@ -99,8 +91,6 @@ class PandasCSVParser(BaseParser):
self._col_joiner = col_joiner
self._row_joiner = row_joiner
self._pandas_config = pandas_config
self._header_period = header_period
self._header_prefix = header_prefix
def _init_parser(self) -> Dict:
"""Init parser."""
@@ -114,26 +104,15 @@ class PandasCSVParser(BaseParser):
raise ValueError("pandas module is required to read CSV files.")
df = pd.read_csv(file, **self._pandas_config)
headers = df.columns.tolist()
header_row = f"{self._header_prefix}{self._col_joiner.join(headers)}"
if not self._concat_rows:
return df.apply(
text_list = df.apply(
lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
).tolist()
text_list = []
if self._header_period != 1:
text_list.append(header_row)
for i, row in df.iterrows():
if (self._header_period > 1 and i > 0 and i % self._header_period == 0):
text_list.append(header_row)
text_list.append(self._col_joiner.join(row.astype(str).tolist()))
if self._header_period == 1 and i < len(df) - 1:
text_list.append(header_row)
return self._row_joiner.join(text_list)
if self._concat_rows:
return (self._row_joiner).join(text_list)
else:
return text_list
class ExcelParser(BaseParser):
@@ -160,12 +139,6 @@ class ExcelParser(BaseParser):
Set to empty dict by default, this means pandas will try to figure
out the table structure on its own.
header_period (int): Controls how headers are included in output:
- 0: Headers only at the beginning (default)
- 1: Headers in every row
- N > 1: Headers every N rows
header_prefix (str): Prefix for header rows. Default is "HEADERS: ".
"""
def __init__(
@@ -175,8 +148,6 @@ class ExcelParser(BaseParser):
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
header_period: int = 20,
header_prefix: str = "HEADERS: ",
**kwargs: Any
) -> None:
"""Init params."""
@@ -185,8 +156,6 @@ class ExcelParser(BaseParser):
self._col_joiner = col_joiner
self._row_joiner = row_joiner
self._pandas_config = pandas_config
self._header_period = header_period
self._header_prefix = header_prefix
def _init_parser(self) -> Dict:
"""Init parser."""
@@ -200,22 +169,12 @@ class ExcelParser(BaseParser):
raise ValueError("pandas module is required to read Excel files.")
df = pd.read_excel(file, **self._pandas_config)
headers = df.columns.tolist()
header_row = f"{self._header_prefix}{self._col_joiner.join(headers)}"
if not self._concat_rows:
return df.apply(
text_list = df.apply(
lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
).tolist()
text_list = []
if self._header_period != 1:
text_list.append(header_row)
for i, row in df.iterrows():
if (self._header_period > 1 and i > 0 and i % self._header_period == 0):
text_list.append(header_row)
text_list.append(self._col_joiner.join(row.astype(str).tolist()))
if self._header_period == 1 and i < len(df) - 1:
text_list.append(header_row)
return self._row_joiner.join(text_list)
if self._concat_rows:
return (self._row_joiner).join(text_list)
else:
return text_list

View File

@@ -0,0 +1,94 @@
from typing import List, Tuple, Union, Optional
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
import torch
import torch.nn as nn
from application.parser.schema.base import Document
class LateChunker:
def __init__(self, model_name: str, late_tokens: int = 1000, **model_kwargs):
"""
Initialize the LateChunker with a model, tokenizer, and late_tokens limit.
Supports both transformers and sentence-transformers models.
"""
self.late_tokens = late_tokens
self.model_name = model_name
# Load model based on type
if "sentence-transformers" in model_name:
self.model = SentenceTransformer(model_name, **model_kwargs)
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
self.wrapper_type = "sentence_transformers"
else:
self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True, **model_kwargs)
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
self.wrapper_type = "transformers"
def tokenize_with_offsets(self, text: str):
"""Tokenize text and return tokens with character offsets."""
tokens = self.tokenizer.encode_plus(
text, return_offsets_mapping=True, add_special_tokens=False
)
return tokens["input_ids"], tokens["offset_mapping"]
def late_chunk_with_embeddings(
self, documents: List[Document]
) -> List[Tuple[str, List[Tuple[int, int]], List[float]]]:
"""
Combines documents into 'super chunks' that fit within `late_tokens` limit.
Outputs each super chunk with span annotations and embeddings.
"""
super_chunks = []
current_super_chunk_text = []
current_token_count = 0
span_annotations = []
for doc in documents:
doc_text = doc.text
input_ids, offsets = self.tokenize_with_offsets(doc_text)
doc_token_count = len(input_ids)
# Check if adding this document exceeds the late_tokens limit
if current_token_count + doc_token_count > self.late_tokens:
# Finalize the current super chunk
combined_text = " ".join(current_super_chunk_text)
embeddings = self.generate_embeddings(combined_text)
super_chunks.append((combined_text, span_annotations, embeddings))
# Reset for a new super chunk
current_super_chunk_text = []
span_annotations = []
current_token_count = 0
# Add document to the current super chunk
start_token = current_token_count
end_token = current_token_count + doc_token_count
span_annotations.append((start_token, end_token))
current_super_chunk_text.append(doc_text)
current_token_count = end_token
# Add the final super chunk if there are remaining documents
if current_super_chunk_text:
combined_text = " ".join(current_super_chunk_text)
embeddings = self.generate_embeddings(combined_text)
super_chunks.append((combined_text, span_annotations, embeddings))
return super_chunks
def generate_embeddings(self, text: str) -> List[float]:
"""Generate embeddings for a given text using the loaded model."""
if self.wrapper_type == "sentence_transformers":
# Sentence-Transformers
embeddings = self.model.encode([text])
return embeddings[0].tolist()
elif self.wrapper_type == "transformers":
# Transformers models
inputs = self.tokenizer(text, return_tensors="pt")
model_output = self.model(**inputs)
return model_output.last_hidden_state.mean(dim=1).squeeze().tolist()
else:
raise ValueError("Unsupported model type for embedding generation.")

View File

@@ -0,0 +1,75 @@
import os
from retry import retry
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
# from langchain_community.embeddings import HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInstructEmbeddings
# from langchain_community.embeddings import CohereEmbeddings
@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i, id):
# add source_id to the metadata
i.metadata["source_id"] = str(id)
store.add_texts([i.page_content], metadatas=[i.metadata])
# store_pine.add_texts([i.page_content], metadatas=[i.metadata])
def call_openai_api(docs, folder_name, id, task_status):
# Function to create a vector store from the documents and save it to disk
if not os.path.exists(f"{folder_name}"):
os.makedirs(f"{folder_name}")
from tqdm import tqdm
c1 = 0
if settings.VECTOR_STORE == "faiss":
docs_init = [docs[0]]
docs.pop(0)
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
docs_init=docs_init,
source_id=f"{folder_name}",
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
else:
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
source_id=str(id),
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
store.delete_index()
# Uncomment for MPNet embeddings
# model_name = "sentence-transformers/all-mpnet-base-v2"
# hf = HuggingFaceEmbeddings(model_name=model_name)
# store = FAISS.from_documents(docs_test, hf)
s1 = len(docs)
for i in tqdm(
docs,
desc="Embedding 🦖",
unit="docs",
total=len(docs),
bar_format="{l_bar}{bar}| Time Left: {remaining}",
):
try:
task_status.update_state(
state="PROGRESS", meta={"current": int((c1 / s1) * 100)}
)
store_add_texts_with_retry(store, i, id)
except Exception as e:
print(e)
print("Error on ", i)
print("Saving progress")
print(f"stopped at {c1} out of {len(docs)}")
store.save_local(f"{folder_name}")
break
c1 += 1
if settings.VECTOR_STORE == "faiss":
store.save_local(f"{folder_name}")

View File

@@ -1,18 +1,17 @@
import logging
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from application.parser.remote.base import BaseRemote
from application.parser.schema.base import Document
from langchain_community.document_loaders import WebBaseLoader
class CrawlerLoader(BaseRemote):
def __init__(self, limit=10):
from langchain_community.document_loaders import WebBaseLoader
self.loader = WebBaseLoader # Initialize the document loader
self.limit = limit # Set the limit for the number of pages to scrape
def load_data(self, inputs):
url = inputs
# Check if the input is a list and if it is, use the first element
if isinstance(url, list) and url:
url = url[0]
@@ -20,30 +19,25 @@ class CrawlerLoader(BaseRemote):
if not urlparse(url).scheme:
url = "http://" + url
visited_urls = set()
base_url = urlparse(url).scheme + "://" + urlparse(url).hostname
urls_to_visit = [url]
loaded_content = []
visited_urls = set() # Keep track of URLs that have been visited
base_url = urlparse(url).scheme + "://" + urlparse(url).hostname # Extract the base URL
urls_to_visit = [url] # List of URLs to be visited, starting with the initial URL
loaded_content = [] # Store the loaded content from each URL
# Continue crawling until there are no more URLs to visit
while urls_to_visit:
current_url = urls_to_visit.pop(0)
visited_urls.add(current_url)
current_url = urls_to_visit.pop(0) # Get the next URL to visit
visited_urls.add(current_url) # Mark the URL as visited
# Try to load and process the content from the current URL
try:
response = requests.get(current_url)
response.raise_for_status()
loader = self.loader([current_url])
docs = loader.load()
# Convert the loaded documents to your Document schema
for doc in docs:
loaded_content.append(
Document(
doc.page_content,
extra_info=doc.metadata
)
)
response = requests.get(current_url) # Fetch the content of the current URL
response.raise_for_status() # Raise an exception for HTTP errors
loader = self.loader([current_url]) # Initialize the document loader for the current URL
loaded_content.extend(loader.load()) # Load the content and add it to the loaded_content list
except Exception as e:
logging.error(f"Error processing URL {current_url}: {e}", exc_info=True)
# Print an error message if loading or processing fails and continue with the next URL
print(f"Error processing URL {current_url}: {e}")
continue
# Parse the HTML content to extract all links
@@ -51,15 +45,15 @@ class CrawlerLoader(BaseRemote):
all_links = [
urljoin(current_url, a['href'])
for a in soup.find_all('a', href=True)
if base_url in urljoin(current_url, a['href'])
if base_url in urljoin(current_url, a['href']) # Ensure links are from the same domain
]
# Add new links to the list of URLs to visit if they haven't been visited yet
urls_to_visit.extend([link for link in all_links if link not in visited_urls])
urls_to_visit = list(set(urls_to_visit))
urls_to_visit = list(set(urls_to_visit)) # Remove duplicate URLs
# Stop crawling if the limit of pages to scrape is reached
if self.limit is not None and len(visited_urls) >= self.limit:
break
return loaded_content
return loaded_content # Return the loaded content from all visited URLs

View File

@@ -1,139 +0,0 @@
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
from application.parser.remote.base import BaseRemote
import re
from markdownify import markdownify
from application.parser.schema.base import Document
import tldextract
class CrawlerLoader(BaseRemote):
def __init__(self, limit=10, allow_subdomains=False):
"""
Given a URL crawl web pages up to `self.limit`,
convert HTML content to Markdown, and returning a list of Document objects.
:param limit: The maximum number of pages to crawl.
:param allow_subdomains: If True, crawl pages on subdomains of the base domain.
"""
self.limit = limit
self.allow_subdomains = allow_subdomains
self.session = requests.Session()
def load_data(self, inputs):
url = inputs
if isinstance(url, list) and url:
url = url[0]
# Ensure the URL has a scheme (if not, default to http)
if not urlparse(url).scheme:
url = "http://" + url
# Keep track of visited URLs to avoid revisiting the same page
visited_urls = set()
# Determine the base domain for link filtering using tldextract
base_domain = self._get_base_domain(url)
urls_to_visit = {url}
documents = []
while urls_to_visit:
current_url = urls_to_visit.pop()
# Skip if already visited
if current_url in visited_urls:
continue
visited_urls.add(current_url)
# Fetch the page content
html_content = self._fetch_page(current_url)
if html_content is None:
continue
# Convert the HTML to Markdown for cleaner text formatting
title, language, processed_markdown = self._process_html_to_markdown(html_content, current_url)
if processed_markdown:
# Create a Document for each visited page
documents.append(
Document(
processed_markdown, # content
None, # doc_id
None, # embedding
{"source": current_url, "title": title, "language": language} # extra_info
)
)
# Extract links and filter them according to domain rules
new_links = self._extract_links(html_content, current_url)
filtered_links = self._filter_links(new_links, base_domain)
# Add any new, not-yet-visited links to the queue
urls_to_visit.update(link for link in filtered_links if link not in visited_urls)
# If we've reached the limit, stop crawling
if self.limit is not None and len(visited_urls) >= self.limit:
break
return documents
def _fetch_page(self, url):
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error fetching URL {url}: {e}")
return None
def _process_html_to_markdown(self, html_content, current_url):
soup = BeautifulSoup(html_content, 'html.parser')
title_tag = soup.find('title')
title = title_tag.text.strip() if title_tag else "No Title"
# Extract language
language_tag = soup.find('html')
language = language_tag.get('lang', 'en') if language_tag else "en"
markdownified = markdownify(html_content, heading_style="ATX", newline_style="BACKSLASH")
# Reduce sequences of more than two newlines to exactly three
markdownified = re.sub(r'\n{3,}', '\n\n\n', markdownified)
return title, language, markdownified
def _extract_links(self, html_content, current_url):
soup = BeautifulSoup(html_content, 'html.parser')
links = []
for a in soup.find_all('a', href=True):
full_url = urljoin(current_url, a['href'])
links.append((full_url, a.text.strip()))
return links
def _get_base_domain(self, url):
extracted = tldextract.extract(url)
# Reconstruct the domain as domain.suffix
base_domain = f"{extracted.domain}.{extracted.suffix}"
return base_domain
def _filter_links(self, links, base_domain):
"""
Filter the extracted links to only include those that match the crawling criteria:
- If allow_subdomains is True, allow any link whose domain ends with the base_domain.
- If allow_subdomains is False, only allow exact matches of the base_domain.
"""
filtered = []
for link, _ in links:
parsed_link = urlparse(link)
if not parsed_link.netloc:
continue
extracted = tldextract.extract(parsed_link.netloc)
link_base = f"{extracted.domain}.{extracted.suffix}"
if self.allow_subdomains:
# For subdomains: sub.example.com ends with example.com
if link_base == base_domain or link_base.endswith("." + base_domain):
filtered.append(link)
else:
# Exact domain match
if link_base == base_domain:
filtered.append(link)
return filtered

View File

@@ -1,4 +1,3 @@
import logging
import requests
import re # Import regular expression library
import xml.etree.ElementTree as ET
@@ -33,7 +32,7 @@ class SitemapLoader(BaseRemote):
documents.extend(loader.load())
processed_urls += 1 # Increment the counter after processing each URL
except Exception as e:
logging.error(f"Error processing URL {url}: {e}", exc_info=True)
print(f"Error processing URL {url}: {e}")
continue
return documents

View File

@@ -1,8 +1,5 @@
import logging
from application.parser.remote.base import BaseRemote
from application.parser.schema.base import Document
from langchain_community.document_loaders import WebBaseLoader
from urllib.parse import urlparse
headers = {
"User-Agent": "Mozilla/5.0",
@@ -26,20 +23,10 @@ class WebLoader(BaseRemote):
urls = [urls]
documents = []
for url in urls:
# Check if the URL scheme is provided, if not, assume http
if not urlparse(url).scheme:
url = "http://" + url
try:
loader = self.loader([url], header_template=headers)
loaded_docs = loader.load()
for doc in loaded_docs:
documents.append(
Document(
doc.page_content,
extra_info=doc.metadata,
)
)
documents.extend(loader.load())
except Exception as e:
logging.error(f"Error processing URL {url}: {e}", exc_info=True)
print(f"Error processing URL {url}: {e}")
continue
return documents

View File

@@ -0,0 +1,79 @@
import re
from math import ceil
from typing import List
import tiktoken
from application.parser.schema.base import Document
def separate_header_and_body(text):
header_pattern = r"^(.*?\n){3}"
match = re.match(header_pattern, text)
header = match.group(0)
body = text[len(header):]
return header, body
def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
docs = []
current_group = None
for doc in documents:
doc_len = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
# Check if current group is empty or if the document can be added based on token count and matching metadata
if (current_group is None or
(len(tiktoken.get_encoding("cl100k_base").encode(current_group.text)) + doc_len < max_tokens and
doc_len < min_tokens and
current_group.extra_info == doc.extra_info)):
if current_group is None:
current_group = doc # Use the document directly to retain its metadata
else:
current_group.text += " " + doc.text # Append text to the current group
else:
docs.append(current_group)
current_group = doc # Start a new group with the current document
if current_group is not None:
docs.append(current_group)
return docs
def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
docs = []
for doc in documents:
token_length = len(tiktoken.get_encoding("cl100k_base").encode(doc.text))
if token_length <= max_tokens:
docs.append(doc)
else:
header, body = separate_header_and_body(doc.text)
if len(tiktoken.get_encoding("cl100k_base").encode(header)) > max_tokens:
body = doc.text
header = ""
num_body_parts = ceil(token_length / max_tokens)
part_length = ceil(len(body) / num_body_parts)
body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
for i, body_part in enumerate(body_parts):
new_doc = Document(text=header + body_part.strip(),
doc_id=f"{doc.doc_id}-{i}",
embedding=doc.embedding,
extra_info=doc.extra_info)
docs.append(new_doc)
return docs
def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
if not token_check:
return documents
print("Grouping small documents")
try:
documents = group_documents(documents=documents, min_tokens=min_tokens, max_tokens=max_tokens)
except Exception:
print("Grouping failed, try running without token_check")
print("Separating large documents")
try:
documents = split_documents(documents=documents, max_tokens=max_tokens)
except Exception:
print("Grouping failed, try running without token_check")
return documents

View File

@@ -1,15 +1,9 @@
You are a helpful AI assistant, DocsGPT. You are proactive and helpful. Try to use tools, if they are available to you,
be proactive and fill in missing information.
Users can Upload documents for your context as attachments or sources via UI using the Conversation input box.
If appropriate, your answers can include code examples, formatted as follows:
You are a DocsGPT, friendly and helpful AI assistant by Arc53 that provides help with documents. You give thorough answers with code examples if possible.
Use the following pieces of context to help answer the users question. If its not relevant to the question, provide friendly responses.
You have access to chat history, and can use it to help answer the question.
When using code examples, use the following format:
```(language)
(code)
```
Users are also able to see charts and diagrams if you use them with valid mermaid syntax in your responses.
Try to respond with mermaid charts if visualization helps with users queries.
You effectively utilize chat history, ensuring relevant and tailored responses.
Try to use additional provided context if it's available, otherwise use your knowledge and tool capabilities.
Allow yourself to be very creative and use your imagination.
----------------
Possible additional context from uploaded sources:
{summaries}

View File

@@ -1,14 +1,9 @@
You are a helpful AI assistant, DocsGPT. You are proactive and helpful. Try to use tools, if they are available to you,
be proactive and fill in missing information.
Users can Upload documents for your context as attachments or sources via UI using the Conversation input box.
You are a helpful AI assistant, DocsGPT, specializing in document assistance, designed to offer detailed and informative responses.
If appropriate, your answers can include code examples, formatted as follows:
```(language)
(code)
```
Users are also able to see charts and diagrams if you use them with valid mermaid syntax in your responses.
Try to respond with mermaid charts if visualization helps with users queries.
You effectively utilize chat history, ensuring relevant and tailored responses.
Try to use additional provided context if it's available, otherwise use your knowledge and tool capabilities.
If a question doesn't align with your context, you provide friendly and helpful replies.
----------------
Possible additional context from uploaded sources:
{summaries}

View File

@@ -1,17 +1,13 @@
You are a helpful AI assistant, DocsGPT. You are proactive and helpful. Try to use tools, if they are available to you,
be proactive and fill in missing information.
Users can Upload documents for your context as attachments or sources via UI using the Conversation input box.
If appropriate, your answers can include code examples, formatted as follows:
You are an AI Assistant, DocsGPT, adept at offering document assistance.
Your expertise lies in providing answer on top of provided context.
You can leverage the chat history if needed.
Answer the question based on the context below.
Keep the answer concise. Respond "Irrelevant context" if not sure about the answer.
If question is not related to the context, respond "Irrelevant context".
When using code examples, use the following format:
```(language)
(code)
```
Users are also able to see charts and diagrams if you use them with valid mermaid syntax in your responses.
Try to respond with mermaid charts if visualization helps with users queries.
You effectively utilize chat history, ensuring relevant and tailored responses.
Use context provided below or use available tools tool capabilities to answer user queries.
If you dont have enough information from the context or tools, answer "I don't know" or "I don't have enough information".
Never make up information or provide false information!
Allow yourself to be very creative and use your imagination.
----------------
Context from uploaded sources:
Context:
{summaries}

View File

@@ -1,3 +0,0 @@
Query: {query}
Observations: {observations}
Now, using the insights from the observations, formulate a well-structured and precise final answer.

View File

@@ -1,13 +0,0 @@
You are an AI assistant and talk like you're thinking out loud. Given the following query, outline a concise thought process that includes key steps and considerations necessary for effective analysis and response. Avoid pointwise formatting. The goal is to break down the query into manageable components without excessive detail, focusing on clarity and logical progression.
Include the following elements in your thought and execution process:
1. Identify the main objective of the query.
2. Determine any relevant context or background information needed to understand the query.
3. List potential approaches or methods to address the query.
4. Highlight any critical factors or constraints that may influence the outcome.
5. Plan with available tools to help you with the analysis but dont execute them. Tools will be executed by another AI.
Query: {query}
Summaries: {summaries}
Prompt: {prompt}
Observations(potentially previous tool calls): {observations}

View File

@@ -1,85 +1,89 @@
anthropic==0.49.0
boto3==1.38.18
beautifulsoup4==4.13.4
celery==5.4.0
anthropic==0.34.2
boto3==1.34.153
beautifulsoup4==4.12.3
celery==5.3.6
dataclasses-json==0.6.7
docx2txt==0.8
duckduckgo-search==7.5.2
duckduckgo-search==6.3.0
ebooklib==0.18
elastic-transport==8.15.0
elasticsearch==8.15.1
escodegen==1.0.11
esprima==4.0.1
esutils==1.0.1
Flask==3.1.1
faiss-cpu==1.9.0.post1
Flask==3.0.3
faiss-cpu==1.8.0.post1
flask-restx==1.3.0
google-genai==1.3.0
gTTS==2.5.4
gTTS==2.3.2
gunicorn==23.0.0
html2text==2024.2.26
javalang==0.13.0
jinja2==3.1.6
jiter==0.8.2
jinja2==3.1.4
jiter==0.5.0
jmespath==1.0.1
joblib==1.4.2
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-spec==0.2.4
jsonschema-specifications==2023.7.1
kombu==5.4.2
langchain==0.3.20
langchain-community==0.3.19
langchain-core==0.3.59
langchain-openai==0.3.16
langchain-text-splitters==0.3.8
langsmith==0.3.42
langchain==0.3.0
langchain-community==0.3.0
langchain-core==0.3.2
langchain-openai==0.2.0
langchain-text-splitters==0.3.0
langsmith==0.1.125
lazy-object-proxy==1.10.0
lxml==5.3.1
markupsafe==3.0.2
marshmallow==3.26.1
lxml==5.3.0
markupsafe==2.1.5
marshmallow==3.22.0
mpmath==1.3.0
multidict==6.4.3
multidict==6.1.0
mypy-extensions==1.0.0
networkx==3.4.2
numpy==2.2.1
openai==1.78.1
openapi3-parser==1.1.21
orjson==3.10.14
packaging==24.2
networkx==3.3
numpy==1.26.4
openai==1.46.1
openapi-schema-validator==0.6.2
openapi-spec-validator==0.6.0
openapi3-parser==1.1.18
orjson==3.10.7
packaging==24.1
pandas==2.2.3
openpyxl==3.1.5
pathable==0.4.4
pillow==11.1.0
portalocker>=2.7.0,<3.0.0
pathable==0.4.3
pillow==10.4.0
portalocker==2.10.1
prance==23.6.21.0
prompt-toolkit==3.0.51
protobuf==5.29.3
psycopg2-binary==2.9.10
primp==0.6.3
prompt-toolkit==3.0.47
protobuf==5.28.2
py==1.11.0
pydantic==2.10.6
pydantic-core==2.27.2
pydantic-settings==2.7.1
pymongo==4.11.3
pypdf==5.5.0
pydantic==2.9.2
pydantic-core==2.23.4
pydantic-settings==2.4.0
pymongo==4.8.0
pypdf2==3.0.1
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-jose==3.4.0
python-pptx==1.0.2
redis==5.2.1
referencing>=0.28.0,<0.31.0
regex==2024.11.6
qdrant-client==1.11.0
redis==5.0.1
referencing==0.30.2
regex==2024.9.11
requests==2.32.3
retry==0.9.2
sentence-transformers==3.3.1
tiktoken==0.8.0
tokenizers==0.21.0
torch==2.7.0
tqdm==4.67.1
transformers==4.51.3
sentence-transformers==3.0.1
tiktoken==0.7.0
tokenizers==0.19.1
torch==2.4.1
tqdm==4.66.5
transformers==4.44.2
typing-extensions==4.12.2
typing-inspect==0.9.0
tzdata==2024.2
urllib3==2.5.0
urllib3==2.2.3
vine==5.1.0
wcwidth==0.2.13
werkzeug==3.1.3
yarl==1.20.0
markdownify==1.1.0
tldextract==5.1.3
websockets==14.1
werkzeug==3.0.4
yarl==1.11.1

View File

@@ -1,16 +1,16 @@
import json
from langchain_community.tools import BraveSearch
from application.retriever.base import BaseRetriever
from application.core.settings import settings
from application.llm.llm_creator import LLMCreator
from application.retriever.base import BaseRetriever
from application.utils import num_tokens_from_string
from langchain_community.tools import BraveSearch
class BraveRetSearch(BaseRetriever):
def __init__(
self,
question,
source,
chat_history,
prompt,
@@ -18,9 +18,8 @@ class BraveRetSearch(BaseRetriever):
token_limit=150,
gpt_model="docsgpt",
user_api_key=None,
decoded_token=None,
):
self.question = ""
self.question = question
self.source = source
self.chat_history = chat_history
self.prompt = prompt
@@ -29,15 +28,14 @@ class BraveRetSearch(BaseRetriever):
self.token_limit = (
token_limit
if token_limit
< settings.LLM_TOKEN_LIMITS.get(
< settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
else settings.LLM_TOKEN_LIMITS.get(
else settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
)
self.user_api_key = user_api_key
self.decoded_token = decoded_token
def _get_data(self):
if self.chunks == 0:
@@ -59,7 +57,7 @@ class BraveRetSearch(BaseRetriever):
docs.append({"text": snippet, "title": title, "link": link})
except IndexError:
pass
if settings.LLM_PROVIDER == "llama.cpp":
if settings.LLM_NAME == "llama.cpp":
docs = [docs[0]]
return docs
@@ -74,29 +72,33 @@ class BraveRetSearch(BaseRetriever):
for doc in docs:
yield {"source": doc}
if len(self.chat_history) > 0:
if len(self.chat_history) > 1:
tokens_current_history = 0
# count tokens in history
for i in self.chat_history:
if "prompt" in i and "response" in i:
messages_combine.append({"role": "user", "content": i["prompt"]})
tokens_batch = num_tokens_from_string(i["prompt"]) + num_tokens_from_string(
i["response"]
)
if tokens_current_history + tokens_batch < self.token_limit:
tokens_current_history += tokens_batch
messages_combine.append(
{"role": "assistant", "content": i["response"]}
{"role": "user", "content": i["prompt"]}
)
messages_combine.append(
{"role": "system", "content": i["response"]}
)
messages_combine.append({"role": "user", "content": self.question})
llm = LLMCreator.create_llm(
settings.LLM_PROVIDER,
api_key=settings.API_KEY,
user_api_key=self.user_api_key,
decoded_token=self.decoded_token,
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=self.user_api_key
)
completion = llm.gen_stream(model=self.gpt_model, messages=messages_combine)
for line in completion:
yield {"answer": str(line)}
def search(self, query: str = ""):
if query:
self.question = query
def search(self):
return self._get_data()
def get_params(self):
@@ -108,5 +110,5 @@ class BraveRetSearch(BaseRetriever):
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key,
"user_api_key": self.user_api_key
}

View File

@@ -1,85 +1,44 @@
import logging
from application.core.settings import settings
from application.llm.llm_creator import LLMCreator
from application.retriever.base import BaseRetriever
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
from application.llm.llm_creator import LLMCreator
from application.utils import num_tokens_from_string
class ClassicRAG(BaseRetriever):
def __init__(
self,
question,
source,
chat_history=None,
prompt="",
chat_history,
prompt,
chunks=2,
token_limit=150,
gpt_model="docsgpt",
user_api_key=None,
llm_name=settings.LLM_PROVIDER,
api_key=settings.API_KEY,
decoded_token=None,
):
self.original_question = ""
self.chat_history = chat_history if chat_history is not None else []
self.question = question
self.vectorstore = source['active_docs'] if 'active_docs' in source else None
self.chat_history = chat_history
self.prompt = prompt
self.chunks = chunks
self.gpt_model = gpt_model
self.token_limit = (
token_limit
if token_limit
< settings.LLM_TOKEN_LIMITS.get(
< settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
else settings.LLM_TOKEN_LIMITS.get(
else settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
)
self.user_api_key = user_api_key
self.llm_name = llm_name
self.api_key = api_key
self.llm = LLMCreator.create_llm(
self.llm_name,
api_key=self.api_key,
user_api_key=self.user_api_key,
decoded_token=decoded_token,
)
self.vectorstore = source["active_docs"] if "active_docs" in source else None
self.question = self._rephrase_query()
self.decoded_token = decoded_token
def _rephrase_query(self):
if (
not self.original_question
or not self.chat_history
or self.chat_history == []
or self.chunks == 0
or self.vectorstore is None
):
return self.original_question
prompt = f"""Given the following conversation history:
{self.chat_history}
Rephrase the following user question to be a standalone search query
that captures all relevant context from the conversation:
"""
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": self.original_question},
]
try:
rephrased_query = self.llm.gen(model=self.gpt_model, messages=messages)
print(f"Rephrased query: {rephrased_query}")
return rephrased_query if rephrased_query else self.original_question
except Exception as e:
logging.error(f"Error rephrasing query: {e}", exc_info=True)
return self.original_question
def _get_data(self):
if self.chunks == 0 or self.vectorstore is None:
if self.chunks == 0:
docs = []
else:
docsearch = VectorCreator.create_vectorstore(
@@ -103,22 +62,52 @@ class ClassicRAG(BaseRetriever):
return docs
def gen():
pass
def gen(self):
docs = self._get_data()
def search(self, query: str = ""):
if query:
self.original_question = query
self.question = self._rephrase_query()
# join all page_content together with a newline
docs_together = "\n".join([doc["text"] for doc in docs])
p_chat_combine = self.prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
for doc in docs:
yield {"source": doc}
if len(self.chat_history) > 1:
tokens_current_history = 0
# count tokens in history
for i in self.chat_history:
if "prompt" in i and "response" in i:
tokens_batch = num_tokens_from_string(i["prompt"]) + num_tokens_from_string(
i["response"]
)
if tokens_current_history + tokens_batch < self.token_limit:
tokens_current_history += tokens_batch
messages_combine.append(
{"role": "user", "content": i["prompt"]}
)
messages_combine.append(
{"role": "system", "content": i["response"]}
)
messages_combine.append({"role": "user", "content": self.question})
llm = LLMCreator.create_llm(
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=self.user_api_key
)
completion = llm.gen_stream(model=self.gpt_model, messages=messages_combine)
for line in completion:
yield {"answer": str(line)}
def search(self):
return self._get_data()
def get_params(self):
return {
"question": self.original_question,
"rephrased_question": self.question,
"question": self.question,
"source": self.vectorstore,
"chat_history": self.chat_history,
"prompt": self.prompt,
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key,
"user_api_key": self.user_api_key
}

View File

@@ -1,15 +1,16 @@
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from application.retriever.base import BaseRetriever
from application.core.settings import settings
from application.llm.llm_creator import LLMCreator
from application.retriever.base import BaseRetriever
from application.utils import num_tokens_from_string
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
class DuckDuckSearch(BaseRetriever):
def __init__(
self,
question,
source,
chat_history,
prompt,
@@ -17,9 +18,8 @@ class DuckDuckSearch(BaseRetriever):
token_limit=150,
gpt_model="docsgpt",
user_api_key=None,
decoded_token=None,
):
self.question = ""
self.question = question
self.source = source
self.chat_history = chat_history
self.prompt = prompt
@@ -28,37 +28,53 @@ class DuckDuckSearch(BaseRetriever):
self.token_limit = (
token_limit
if token_limit
< settings.LLM_TOKEN_LIMITS.get(
< settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
else settings.LLM_TOKEN_LIMITS.get(
else settings.MODEL_TOKEN_LIMITS.get(
self.gpt_model, settings.DEFAULT_MAX_HISTORY
)
)
self.user_api_key = user_api_key
self.decoded_token = decoded_token
def _parse_lang_string(self, input_string):
result = []
current_item = ""
inside_brackets = False
for char in input_string:
if char == "[":
inside_brackets = True
elif char == "]":
inside_brackets = False
result.append(current_item)
current_item = ""
elif inside_brackets:
current_item += char
if inside_brackets:
result.append(current_item)
return result
def _get_data(self):
if self.chunks == 0:
docs = []
else:
wrapper = DuckDuckGoSearchAPIWrapper(max_results=self.chunks)
search = DuckDuckGoSearchResults(api_wrapper=wrapper, output_format="list")
search = DuckDuckGoSearchResults(api_wrapper=wrapper)
results = search.run(self.question)
results = self._parse_lang_string(results)
docs = []
for i in results:
try:
docs.append(
{
"text": i.get("snippet", "").strip(),
"title": i.get("title", "").strip(),
"link": i.get("link", "").strip(),
}
)
text = i.split("title:")[0]
title = i.split("title:")[1].split("link:")[0]
link = i.split("link:")[1]
docs.append({"text": text, "title": title, "link": link})
except IndexError:
pass
if settings.LLM_PROVIDER == "llama.cpp":
if settings.LLM_NAME == "llama.cpp":
docs = [docs[0]]
return docs
@@ -73,29 +89,33 @@ class DuckDuckSearch(BaseRetriever):
for doc in docs:
yield {"source": doc}
if len(self.chat_history) > 0:
if len(self.chat_history) > 1:
tokens_current_history = 0
# count tokens in history
for i in self.chat_history:
if "prompt" in i and "response" in i:
messages_combine.append({"role": "user", "content": i["prompt"]})
tokens_batch = num_tokens_from_string(i["prompt"]) + num_tokens_from_string(
i["response"]
)
if tokens_current_history + tokens_batch < self.token_limit:
tokens_current_history += tokens_batch
messages_combine.append(
{"role": "assistant", "content": i["response"]}
{"role": "user", "content": i["prompt"]}
)
messages_combine.append(
{"role": "system", "content": i["response"]}
)
messages_combine.append({"role": "user", "content": self.question})
llm = LLMCreator.create_llm(
settings.LLM_PROVIDER,
api_key=settings.API_KEY,
user_api_key=self.user_api_key,
decoded_token=self.decoded_token,
settings.LLM_NAME, api_key=settings.API_KEY, user_api_key=self.user_api_key
)
completion = llm.gen_stream(model=self.gpt_model, messages=messages_combine)
for line in completion:
yield {"answer": str(line)}
def search(self, query: str = ""):
if query:
self.question = query
def search(self):
return self._get_data()
def get_params(self):
@@ -107,5 +127,5 @@ class DuckDuckSearch(BaseRetriever):
"chunks": self.chunks,
"token_limit": self.token_limit,
"gpt_model": self.gpt_model,
"user_api_key": self.user_api_key,
"user_api_key": self.user_api_key
}

View File

@@ -3,18 +3,18 @@ from application.retriever.duckduck_search import DuckDuckSearch
from application.retriever.brave_search import BraveRetSearch
class RetrieverCreator:
retrievers = {
"classic": ClassicRAG,
"duckduck_search": DuckDuckSearch,
"brave_search": BraveRetSearch,
"default": ClassicRAG,
'classic': ClassicRAG,
'duckduck_search': DuckDuckSearch,
'brave_search': BraveRetSearch,
'default': ClassicRAG
}
@classmethod
def create_retriever(cls, type, *args, **kwargs):
retriever_type = (type or "default").lower()
retiever_class = cls.retrievers.get(retriever_type)
retiever_class = cls.retrievers.get(type.lower())
if not retiever_class:
raise ValueError(f"No retievers class found for type {type}")
return retiever_class(*args, **kwargs)

View File

@@ -1,94 +0,0 @@
"""Base storage class for file system abstraction."""
from abc import ABC, abstractmethod
from typing import BinaryIO, List, Callable
class BaseStorage(ABC):
"""Abstract base class for storage implementations."""
@abstractmethod
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""
Save a file to storage.
Args:
file_data: File-like object containing the data
path: Path where the file should be stored
Returns:
dict: A dictionary containing metadata about the saved file, including:
- 'path': The path where the file was saved
- 'storage_type': The type of storage (e.g., 'local', 's3')
- Other storage-specific metadata (e.g., 'uri', 'bucket_name', etc.)
"""
pass
@abstractmethod
def get_file(self, path: str) -> BinaryIO:
"""
Retrieve a file from storage.
Args:
path: Path to the file
Returns:
BinaryIO: File-like object containing the file data
"""
pass
@abstractmethod
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
This method handles the details of retrieving the file and providing
it to the processor function in an appropriate way based on the storage type.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
pass
@abstractmethod
def delete_file(self, path: str) -> bool:
"""
Delete a file from storage.
Args:
path: Path to the file
Returns:
bool: True if deletion was successful
"""
pass
@abstractmethod
def file_exists(self, path: str) -> bool:
"""
Check if a file exists.
Args:
path: Path to the file
Returns:
bool: True if the file exists
"""
pass
@abstractmethod
def list_files(self, directory: str) -> List[str]:
"""
List all files in a directory.
Args:
directory: Directory path to list
Returns:
List[str]: List of file paths
"""
pass

View File

@@ -1,103 +0,0 @@
"""Local file system implementation."""
import os
import shutil
from typing import BinaryIO, List, Callable
from application.storage.base import BaseStorage
class LocalStorage(BaseStorage):
"""Local file system storage implementation."""
def __init__(self, base_dir: str = None):
"""
Initialize local storage.
Args:
base_dir: Base directory for all operations. If None, uses current directory.
"""
self.base_dir = base_dir or os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def _get_full_path(self, path: str) -> str:
"""Get absolute path by combining base_dir and path."""
if os.path.isabs(path):
return path
return os.path.join(self.base_dir, path)
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""Save a file to local storage."""
full_path = self._get_full_path(path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
if hasattr(file_data, 'save'):
file_data.save(full_path)
else:
with open(full_path, 'wb') as f:
shutil.copyfileobj(file_data, f)
return {
'storage_type': 'local'
}
def get_file(self, path: str) -> BinaryIO:
"""Get a file from local storage."""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
raise FileNotFoundError(f"File not found: {full_path}")
return open(full_path, 'rb')
def delete_file(self, path: str) -> bool:
"""Delete a file from local storage."""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
return False
os.remove(full_path)
return True
def file_exists(self, path: str) -> bool:
"""Check if a file exists in local storage."""
full_path = self._get_full_path(path)
return os.path.exists(full_path)
def list_files(self, directory: str) -> List[str]:
"""List all files in a directory in local storage."""
full_path = self._get_full_path(directory)
if not os.path.exists(full_path):
return []
result = []
for root, _, files in os.walk(full_path):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), self.base_dir)
result.append(rel_path)
return result
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
For local storage, we can directly pass the full path to the processor.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
raise FileNotFoundError(f"File not found: {full_path}")
return processor_func(local_path=full_path, **kwargs)

View File

@@ -1,124 +0,0 @@
"""S3 storage implementation."""
import io
import os
from typing import BinaryIO, Callable, List
import boto3
from application.core.settings import settings
from application.storage.base import BaseStorage
from botocore.exceptions import ClientError
class S3Storage(BaseStorage):
"""AWS S3 storage implementation."""
def __init__(self, bucket_name=None):
"""
Initialize S3 storage.
Args:
bucket_name: S3 bucket name (optional, defaults to settings)
"""
self.bucket_name = bucket_name or getattr(
settings, "S3_BUCKET_NAME", "docsgpt-test-bucket"
)
# Get credentials from settings
aws_access_key_id = getattr(settings, "SAGEMAKER_ACCESS_KEY", None)
aws_secret_access_key = getattr(settings, "SAGEMAKER_SECRET_KEY", None)
region_name = getattr(settings, "SAGEMAKER_REGION", None)
self.s3 = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
)
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""Save a file to S3 storage."""
self.s3.upload_fileobj(file_data, self.bucket_name, path)
region = getattr(settings, "SAGEMAKER_REGION", None)
return {
"storage_type": "s3",
"bucket_name": self.bucket_name,
"uri": f"s3://{self.bucket_name}/{path}",
"region": region,
}
def get_file(self, path: str) -> BinaryIO:
"""Get a file from S3 storage."""
if not self.file_exists(path):
raise FileNotFoundError(f"File not found: {path}")
file_obj = io.BytesIO()
self.s3.download_fileobj(self.bucket_name, path, file_obj)
file_obj.seek(0)
return file_obj
def delete_file(self, path: str) -> bool:
"""Delete a file from S3 storage."""
try:
self.s3.delete_object(Bucket=self.bucket_name, Key=path)
return True
except ClientError:
return False
def file_exists(self, path: str) -> bool:
"""Check if a file exists in S3 storage."""
try:
self.s3.head_object(Bucket=self.bucket_name, Key=path)
return True
except ClientError:
return False
def list_files(self, directory: str) -> List[str]:
"""List all files in a directory in S3 storage."""
# Ensure directory ends with a slash if it's not empty
if directory and not directory.endswith("/"):
directory += "/"
result = []
paginator = self.s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=directory)
for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
result.append(obj["Key"])
return result
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
import logging
import tempfile
if not self.file_exists(path):
raise FileNotFoundError(f"File not found in S3: {path}")
with tempfile.NamedTemporaryFile(
suffix=os.path.splitext(path)[1], delete=True
) as temp_file:
try:
# Download the file from S3 to the temporary file
self.s3.download_fileobj(self.bucket_name, path, temp_file)
temp_file.flush()
return processor_func(local_path=temp_file.name, **kwargs)
except Exception as e:
logging.error(f"Error processing S3 file {path}: {e}", exc_info=True)
raise

View File

@@ -1,32 +0,0 @@
"""Storage factory for creating different storage implementations."""
from typing import Dict, Type
from application.storage.base import BaseStorage
from application.storage.local import LocalStorage
from application.storage.s3 import S3Storage
from application.core.settings import settings
class StorageCreator:
storages: Dict[str, Type[BaseStorage]] = {
"local": LocalStorage,
"s3": S3Storage,
}
_instance = None
@classmethod
def get_storage(cls) -> BaseStorage:
if cls._instance is None:
storage_type = getattr(settings, "STORAGE_TYPE", "local")
cls._instance = cls.create_storage(storage_type)
return cls._instance
@classmethod
def create_storage(cls, type_name: str, *args, **kwargs) -> BaseStorage:
storage_class = cls.storages.get(type_name.lower())
if not storage_class:
raise ValueError(f"No storage implementation found for type {type_name}")
return storage_class(*args, **kwargs)

View File

@@ -1,84 +1,29 @@
import asyncio
import websockets
import json
import base64
from io import BytesIO
import base64
from application.tts.base import BaseTTS
class ElevenlabsTTS(BaseTTS):
def __init__(self):
self.api_key = 'ELEVENLABS_API_KEY'# here you should put your api key
self.model = "eleven_flash_v2_5"
self.voice = "VOICE_ID" # this is the hash code for the voice not the name!
self.write_audio = 1
from elevenlabs.client import ElevenLabs
self.client = ElevenLabs(
api_key="ELEVENLABS_API_KEY",
)
def text_to_speech(self, text):
asyncio.run(self._text_to_speech_websocket(text))
lang = "en"
audio = self.client.generate(
text=text,
model="eleven_multilingual_v2",
voice="Brian",
)
audio_data = BytesIO()
for chunk in audio:
audio_data.write(chunk)
audio_bytes = audio_data.getvalue()
async def _text_to_speech_websocket(self, text):
uri = f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice}/stream-input?model_id={self.model}"
websocket = await websockets.connect(uri)
payload = {
"text": " ",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.8,
},
"xi_api_key": self.api_key,
}
await websocket.send(json.dumps(payload))
async def listen():
while 1:
try:
msg = await websocket.recv()
data = json.loads(msg)
if data.get("audio"):
print("audio received")
yield base64.b64decode(data["audio"])
elif data.get("isFinal"):
break
except websockets.exceptions.ConnectionClosed:
print("websocket closed")
break
listen_task = asyncio.create_task(self.stream(listen()))
await websocket.send(json.dumps({"text": text}))
# this is to signal the end of the text, either use this or flush
await websocket.send(json.dumps({"text": ""}))
await listen_task
async def stream(self, audio_stream):
if self.write_audio:
audio_bytes = BytesIO()
async for chunk in audio_stream:
if chunk:
audio_bytes.write(chunk)
with open("output_audio.mp3", "wb") as f:
f.write(audio_bytes.getvalue())
else:
async for chunk in audio_stream:
pass # depends on the streamer!
def test_elevenlabs_websocket():
"""
Tests the ElevenlabsTTS text_to_speech method with a sample prompt.
Prints out the base64-encoded result and writes it to 'output_audio.mp3'.
"""
# Instantiate your TTS class
tts = ElevenlabsTTS()
# Call the method with some sample text
tts.text_to_speech("Hello from ElevenLabs WebSocket!")
print("Saved audio to output_audio.mp3.")
if __name__ == "__main__":
test_elevenlabs_websocket()
# Encode to base64
audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
return audio_base64, lang

View File

@@ -1,24 +1,17 @@
import sys
from datetime import datetime
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.utils import num_tokens_from_object_or_list, num_tokens_from_string
from application.utils import num_tokens_from_string
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
db = mongo["docsgpt"]
usage_collection = db["token_usage"]
def update_token_usage(decoded_token, user_api_key, token_usage):
def update_token_usage(user_api_key, token_usage):
if "pytest" in sys.modules:
return
if decoded_token:
user_id = decoded_token["sub"]
else:
user_id = None
usage_data = {
"user_id": user_id,
"api_key": user_api_key,
"prompt_tokens": token_usage["prompt_tokens"],
"generated_tokens": token_usage["generated_tokens"],
@@ -28,38 +21,28 @@ def update_token_usage(decoded_token, user_api_key, token_usage):
def gen_token_usage(func):
def wrapper(self, model, messages, stream, tools, **kwargs):
def wrapper(self, model, messages, stream, **kwargs):
for message in messages:
if message["content"]:
self.token_usage["prompt_tokens"] += num_tokens_from_string(
message["content"]
)
result = func(self, model, messages, stream, tools, **kwargs)
if isinstance(result, str):
self.token_usage["prompt_tokens"] += num_tokens_from_string(message["content"])
result = func(self, model, messages, stream, **kwargs)
self.token_usage["generated_tokens"] += num_tokens_from_string(result)
else:
self.token_usage["generated_tokens"] += num_tokens_from_object_or_list(
result
)
update_token_usage(self.decoded_token, self.user_api_key, self.token_usage)
update_token_usage(self.user_api_key, self.token_usage)
return result
return wrapper
def stream_token_usage(func):
def wrapper(self, model, messages, stream, tools, **kwargs):
def wrapper(self, model, messages, stream, **kwargs):
for message in messages:
self.token_usage["prompt_tokens"] += num_tokens_from_string(
message["content"]
)
self.token_usage["prompt_tokens"] += num_tokens_from_string(message["content"])
batch = []
result = func(self, model, messages, stream, tools, **kwargs)
result = func(self, model, messages, stream, **kwargs)
for r in result:
batch.append(r)
yield r
for line in batch:
self.token_usage["generated_tokens"] += num_tokens_from_string(line)
update_token_usage(self.decoded_token, self.user_api_key, self.token_usage)
update_token_usage(self.user_api_key, self.token_usage)
return wrapper

View File

@@ -1,12 +1,6 @@
import hashlib
import os
import re
import uuid
import tiktoken
import hashlib
from flask import jsonify, make_response
from werkzeug.utils import secure_filename
from application.core.settings import settings
_encoding = None
@@ -19,49 +13,10 @@ def get_encoding():
return _encoding
def safe_filename(filename):
"""
Creates a safe filename that preserves the original extension.
Uses secure_filename, but ensures a proper filename is returned even with non-Latin characters.
Args:
filename (str): The original filename
Returns:
str: A safe filename that can be used for storage
"""
if not filename:
return str(uuid.uuid4())
_, extension = os.path.splitext(filename)
safe_name = secure_filename(filename)
# If secure_filename returns just the extension or an empty string
if not safe_name or safe_name == extension.lstrip("."):
return f"{str(uuid.uuid4())}{extension}"
return safe_name
def num_tokens_from_string(string: str) -> int:
encoding = get_encoding()
if isinstance(string, str):
num_tokens = len(encoding.encode(string))
return num_tokens
else:
return 0
def num_tokens_from_object_or_list(thing):
if isinstance(thing, list):
return sum([num_tokens_from_object_or_list(x) for x in thing])
elif isinstance(thing, dict):
return sum([num_tokens_from_object_or_list(x) for x in thing.values()])
elif isinstance(thing, str):
return num_tokens_from_string(thing)
else:
return 0
def count_tokens_docs(docs):
@@ -89,63 +44,5 @@ def check_required_fields(data, required_fields):
def get_hash(data):
return hashlib.md5(data.encode(), usedforsecurity=False).hexdigest()
return hashlib.md5(data.encode()).hexdigest()
def limit_chat_history(history, max_token_limit=None, gpt_model="docsgpt"):
"""
Limits chat history based on token count.
Returns a list of messages that fit within the token limit.
"""
from application.core.settings import settings
max_token_limit = (
max_token_limit
if max_token_limit
and max_token_limit
< settings.LLM_TOKEN_LIMITS.get(gpt_model, settings.DEFAULT_MAX_HISTORY)
else settings.LLM_TOKEN_LIMITS.get(gpt_model, settings.DEFAULT_MAX_HISTORY)
)
if not history:
return []
trimmed_history = []
tokens_current_history = 0
for message in reversed(history):
tokens_batch = 0
if "prompt" in message and "response" in message:
tokens_batch += num_tokens_from_string(message["prompt"])
tokens_batch += num_tokens_from_string(message["response"])
if "tool_calls" in message:
for tool_call in message["tool_calls"]:
tool_call_string = f"Tool: {tool_call.get('tool_name')} | Action: {tool_call.get('action_name')} | Args: {tool_call.get('arguments')} | Response: {tool_call.get('result')}"
tokens_batch += num_tokens_from_string(tool_call_string)
if tokens_current_history + tokens_batch < max_token_limit:
tokens_current_history += tokens_batch
trimmed_history.insert(0, message)
else:
break
return trimmed_history
def validate_function_name(function_name):
"""Validates if a function name matches the allowed pattern."""
if not re.match(r"^[a-zA-Z0-9_-]+$", function_name):
return False
return True
def generate_image_url(image_path):
strategy = getattr(settings, "URL_STRATEGY", "backend")
if strategy == "s3":
bucket_name = getattr(settings, "S3_BUCKET_NAME", "docsgpt-test-bucket")
region_name = getattr(settings, "SAGEMAKER_REGION", "eu-central-1")
return f"https://{bucket_name}.s3.{region_name}.amazonaws.com/{image_path}"
else:
base_url = getattr(settings, "API_URL", "http://localhost:7091")
return f"{base_url}/api/images/{image_path}"

View File

@@ -75,9 +75,9 @@ class BaseVectorStore(ABC):
openai_api_key=embeddings_key
)
elif embeddings_name == "huggingface_sentence-transformers/all-mpnet-base-v2":
if os.path.exists("./models/all-mpnet-base-v2"):
if os.path.exists("./model/all-mpnet-base-v2"):
embedding_instance = EmbeddingsSingleton.get_instance(
embeddings_name = "./models/all-mpnet-base-v2",
embeddings_name="./model/all-mpnet-base-v2",
)
else:
embedding_instance = EmbeddingsSingleton.get_instance(
@@ -87,4 +87,3 @@ class BaseVectorStore(ABC):
embedding_instance = EmbeddingsSingleton.get_instance(embeddings_name)
return embedding_instance

View File

@@ -1,6 +1,9 @@
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
from application.vectorstore.document_class import Document
import elasticsearch
class ElasticsearchStore(BaseVectorStore):
@@ -23,7 +26,8 @@ class ElasticsearchStore(BaseVectorStore):
else:
raise ValueError("Please provide either elasticsearch_url or cloud_id.")
import elasticsearch
ElasticsearchStore._es_connection = elasticsearch.Elasticsearch(**connection_params)
self.docsearch = ElasticsearchStore._es_connection
@@ -151,6 +155,8 @@ class ElasticsearchStore(BaseVectorStore):
**kwargs,
):
from elasticsearch.helpers import BulkIndexError, bulk
bulk_kwargs = bulk_kwargs or {}
import uuid
embeddings = []
@@ -183,7 +189,6 @@ class ElasticsearchStore(BaseVectorStore):
if len(requests) > 0:
from elasticsearch.helpers import BulkIndexError, bulk
try:
success, failed = bulk(
self._es_connection,

View File

@@ -1,64 +1,30 @@
import os
import tempfile
from langchain_community.vectorstores import FAISS
from application.core.settings import settings
from application.parser.schema.base import Document
from application.vectorstore.base import BaseVectorStore
from application.storage.storage_creator import StorageCreator
from application.core.settings import settings
import os
def get_vectorstore(path: str) -> str:
if path:
vectorstore = f"indexes/{path}"
vectorstore = os.path.join("application", "indexes", path)
else:
vectorstore = "indexes"
vectorstore = os.path.join("application")
return vectorstore
class FaissStore(BaseVectorStore):
def __init__(self, source_id: str, embeddings_key: str, docs_init=None):
super().__init__()
self.source_id = source_id
self.path = get_vectorstore(source_id)
self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
self.storage = StorageCreator.get_storage()
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
try:
if docs_init:
self.docsearch = FAISS.from_documents(docs_init, self.embeddings)
self.docsearch = FAISS.from_documents(docs_init, embeddings)
else:
with tempfile.TemporaryDirectory() as temp_dir:
faiss_path = f"{self.path}/index.faiss"
pkl_path = f"{self.path}/index.pkl"
self.docsearch = FAISS.load_local(self.path, embeddings, allow_dangerous_deserialization=True)
except Exception:
raise
if not self.storage.file_exists(
faiss_path
) or not self.storage.file_exists(pkl_path):
raise FileNotFoundError(
f"Index files not found in storage at {self.path}"
)
faiss_file = self.storage.get_file(faiss_path)
pkl_file = self.storage.get_file(pkl_path)
local_faiss_path = os.path.join(temp_dir, "index.faiss")
local_pkl_path = os.path.join(temp_dir, "index.pkl")
with open(local_faiss_path, "wb") as f:
f.write(faiss_file.read())
with open(local_pkl_path, "wb") as f:
f.write(pkl_file.read())
self.docsearch = FAISS.load_local(
temp_dir, self.embeddings, allow_dangerous_deserialization=True
)
except Exception as e:
raise Exception(f"Error loading FAISS index: {str(e)}")
self.assert_embedding_dimensions(self.embeddings)
self.assert_embedding_dimensions(embeddings)
def search(self, *args, **kwargs):
return self.docsearch.similarity_search(*args, **kwargs)
@@ -74,42 +40,11 @@ class FaissStore(BaseVectorStore):
def assert_embedding_dimensions(self, embeddings):
"""Check that the word embedding dimension of the docsearch index matches the dimension of the word embeddings used."""
if (
settings.EMBEDDINGS_NAME
== "huggingface_sentence-transformers/all-mpnet-base-v2"
):
word_embedding_dimension = getattr(embeddings, "dimension", None)
if settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2":
word_embedding_dimension = getattr(embeddings, 'dimension', None)
if word_embedding_dimension is None:
raise AttributeError(
"'dimension' attribute not found in embeddings instance."
)
raise AttributeError("'dimension' attribute not found in embeddings instance.")
docsearch_index_dimension = self.docsearch.index.d
if word_embedding_dimension != docsearch_index_dimension:
raise ValueError(
f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) != docsearch index dimension ({docsearch_index_dimension})"
)
def get_chunks(self):
chunks = []
if self.docsearch:
for doc_id, doc in self.docsearch.docstore._dict.items():
chunk_data = {
"doc_id": doc_id,
"text": doc.page_content,
"metadata": doc.metadata,
}
chunks.append(chunk_data)
return chunks
def add_chunk(self, text, metadata=None):
metadata = metadata or {}
doc = Document(text=text, extra_info=metadata).to_langchain_format()
doc_id = self.docsearch.add_documents([doc])
self.save_local(self.path)
return doc_id
def delete_chunk(self, chunk_id):
self.delete_index([chunk_id])
self.save_local(self.path)
return True
raise ValueError(f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) != docsearch index dimension ({docsearch_index_dimension})")

View File

@@ -1,4 +1,3 @@
import logging
from application.core.settings import settings
from application.vectorstore.base import BaseVectorStore
from application.vectorstore.document_class import Document
@@ -125,53 +124,3 @@ class MongoDBVectorStore(BaseVectorStore):
def delete_index(self, *args, **kwargs):
self._collection.delete_many({"source_id": self._source_id})
def get_chunks(self):
try:
chunks = []
cursor = self._collection.find({"source_id": self._source_id})
for doc in cursor:
doc_id = str(doc.get("_id"))
text = doc.get(self._text_key)
metadata = {
k: v
for k, v in doc.items()
if k
not in ["_id", self._text_key, self._embedding_key, "source_id"]
}
if text:
chunks.append(
{"doc_id": doc_id, "text": text, "metadata": metadata}
)
return chunks
except Exception as e:
logging.error(f"Error getting chunks: {e}", exc_info=True)
return []
def add_chunk(self, text, metadata=None):
metadata = metadata or {}
embeddings = self._embedding.embed_documents([text])
if not embeddings:
raise ValueError("Could not generate embedding for chunk")
chunk_data = {
self._text_key: text,
self._embedding_key: embeddings[0],
"source_id": self._source_id,
**metadata,
}
result = self._collection.insert_one(chunk_data)
return str(result.inserted_id)
def delete_chunk(self, chunk_id):
try:
from bson.objectid import ObjectId
object_id = ObjectId(chunk_id)
result = self._collection.delete_one({"_id": object_id})
return result.deleted_count > 0
except Exception as e:
logging.error(f"Error deleting chunk: {e}", exc_info=True)
return False

View File

@@ -1,12 +1,11 @@
from langchain_community.vectorstores.qdrant import Qdrant
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
from qdrant_client import models
class QdrantStore(BaseVectorStore):
def __init__(self, source_id: str = "", embeddings_key: str = "embeddings"):
from qdrant_client import models
from langchain_community.vectorstores.qdrant import Qdrant
self._filter = models.Filter(
must=[
models.FieldCondition(

View File

@@ -1,37 +1,25 @@
import datetime
import json
import logging
import mimetypes
import os
import shutil
import string
import tempfile
import zipfile
from collections import Counter
from urllib.parse import urljoin
import requests
from bson.dbref import DBRef
from bson.objectid import ObjectId
from application.agents.agent_creator import AgentCreator
from application.api.answer.routes import get_prompt
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.parser.chunking import Chunker
from application.parser.embedding_pipeline import embed_and_store_documents
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.open_ai_func import call_openai_api
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
from application.retriever.retriever_creator import RetrieverCreator
from application.storage.storage_creator import StorageCreator
from application.utils import count_tokens_docs, num_tokens_from_string
from application.parser.token_func import group_split
from application.utils import count_tokens_docs
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
db = mongo["docsgpt"]
sources_collection = db["sources"]
# Constants
@@ -39,22 +27,18 @@ MIN_TOKENS = 150
MAX_TOKENS = 1250
RECURSION_DEPTH = 2
# Define a function to extract metadata from a given filename.
def metadata_from_filename(title):
return {"title": title}
# Define a function to generate a random string of a given length.
def generate_random_string(length):
return "".join([string.ascii_letters[i % 52] for i in range(length)])
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
"""
Recursively extract zip files with a limit on recursion depth.
@@ -74,7 +58,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
zip_ref.extractall(extract_to)
os.remove(zip_path) # Remove the zip file after extracting
except Exception as e:
logging.error(f"Error extracting zip file {zip_path}: {e}", exc_info=True)
logging.error(f"Error extracting zip file {zip_path}: {e}")
return
# Check for nested zip files and extract them
@@ -85,7 +69,6 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
file_path = os.path.join(root, file)
extract_zip_recursive(file_path, root, current_depth + 1, max_depth)
def download_file(url, params, dest_path):
try:
response = requests.get(url, params=params)
@@ -96,7 +79,6 @@ def download_file(url, params, dest_path):
logging.error(f"Error downloading file: {e}")
raise
def upload_index(full_path, file_data):
try:
if settings.VECTOR_STORE == "faiss":
@@ -105,9 +87,7 @@ def upload_index(full_path, file_data):
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"),
files=files,
data=file_data,
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
)
else:
response = requests.post(
@@ -122,79 +102,9 @@ def upload_index(full_path, file_data):
for file in files.values():
file.close()
def run_agent_logic(agent_config, input_data):
try:
source = agent_config.get("source")
retriever = agent_config.get("retriever", "classic")
if isinstance(source, DBRef):
source_doc = db.dereference(source)
source = str(source_doc["_id"])
retriever = source_doc.get("retriever", agent_config.get("retriever"))
else:
source = {}
source = {"active_docs": source}
chunks = int(agent_config.get("chunks", 2))
prompt_id = agent_config.get("prompt_id", "default")
user_api_key = agent_config["key"]
agent_type = agent_config.get("agent_type", "classic")
decoded_token = {"sub": agent_config.get("user")}
prompt = get_prompt(prompt_id)
agent = AgentCreator.create_agent(
agent_type,
endpoint="webhook",
llm_name=settings.LLM_PROVIDER,
gpt_model=settings.LLM_NAME,
api_key=settings.API_KEY,
user_api_key=user_api_key,
prompt=prompt,
chat_history=[],
decoded_token=decoded_token,
attachments=[],
)
retriever = RetrieverCreator.create_retriever(
retriever,
source=source,
chat_history=[],
prompt=prompt,
chunks=chunks,
token_limit=settings.DEFAULT_MAX_HISTORY,
gpt_model=settings.LLM_NAME,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
answer = agent.gen(query=input_data, retriever=retriever)
response_full = ""
thought = ""
source_log_docs = []
tool_calls = []
for line in answer:
if "answer" in line:
response_full += str(line["answer"])
elif "sources" in line:
source_log_docs.extend(line["sources"])
elif "tool_calls" in line:
tool_calls.extend(line["tool_calls"])
elif "thought" in line:
thought += line["thought"]
result = {
"answer": response_full,
"sources": source_log_docs,
"tool_calls": tool_calls,
"thought": thought,
}
logging.info(f"Agent response: {result}")
return result
except Exception as e:
logging.error(f"Error in run_agent_logic: {e}", exc_info=True)
raise
# Define the main function for ingesting and processing documents.
def ingest_worker(
self, directory, formats, job_name, filename, user, dir_name=None, user_dir=None, retriever="classic"
self, directory, formats, name_job, filename, user, retriever="classic"
):
"""
Ingest and process documents.
@@ -203,11 +113,9 @@ def ingest_worker(
self: Reference to the instance of the task.
directory (str): Specifies the directory for ingesting ('inputs' or 'temp').
formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]).
job_name (str): Name of the job for this ingestion task (original, unsanitized).
name_job (str): Name of the job for this ingestion task.
filename (str): Name of the file to be ingested.
user (str): Identifier for the user initiating the ingestion (original, unsanitized).
dir_name (str, optional): Sanitized directory name for filesystem operations.
user_dir (str, optional): Sanitized user ID for filesystem operations.
user (str): Identifier for the user initiating the ingestion.
retriever (str): Type of retriever to use for processing the documents.
Returns:
@@ -218,99 +126,71 @@ def ingest_worker(
limit = None
exclude = True
sample = False
token_check = True
full_path = os.path.join(directory, user, name_job)
storage = StorageCreator.get_storage()
logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
file_data = {"name": name_job, "file": filename, "user": user}
full_path = os.path.join(directory, user_dir, dir_name)
source_file_path = os.path.join(full_path, filename)
if not os.path.exists(full_path):
os.makedirs(full_path)
download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": job_name})
# Create temporary working directory
with tempfile.TemporaryDirectory() as temp_dir:
try:
os.makedirs(temp_dir, exist_ok=True)
# Download file from storage to temp directory
temp_file_path = os.path.join(temp_dir, filename)
file_data = storage.get_file(source_file_path)
with open(temp_file_path, "wb") as f:
f.write(file_data.read())
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
)
self.update_state(state="PROGRESS", meta={"current": 1})
# Handle zip files
if filename.endswith(".zip"):
logging.info(f"Extracting zip file: {filename}")
extract_zip_recursive(
temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
)
if sample:
logging.info(f"Sample mode enabled. Using {limit} documents.")
reader = SimpleDirectoryReader(
input_dir=temp_dir,
raw_docs = SimpleDirectoryReader(
input_dir=full_path,
input_files=input_files,
recursive=recursive,
required_exts=formats,
num_files_limit=limit,
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
).load_data()
raw_docs = group_split(
documents=raw_docs,
min_tokens=MIN_TOKENS,
duplicate_headers=False,
max_tokens=MAX_TOKENS,
token_check=token_check,
)
raw_docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
id = ObjectId()
vector_store_path = os.path.join(temp_dir, "vector_store")
os.makedirs(vector_store_path, exist_ok=True)
embed_and_store_documents(docs, vector_store_path, id, self)
call_openai_api(docs, full_path, id, self)
tokens = count_tokens_docs(docs)
self.update_state(state="PROGRESS", meta={"current": 100})
if sample:
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data = {
"name": job_name, # Use original job_name
"file": filename,
"user": user, # Use original user
file_data.update({
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
"original_file_path": source_file_path,
}
})
upload_index(full_path, file_data)
upload_index(vector_store_path, file_data)
except Exception as e:
logging.error(f"Error in ingest_worker: {e}", exc_info=True)
raise
# delete local
shutil.rmtree(full_path)
return {
"directory": directory,
"formats": formats,
"name_job": job_name, # Use original job_name
"name_job": name_job,
"filename": filename,
"user": user, # Use original user
"user": user,
"limited": False,
}
def remote_worker(
self,
source_data,
@@ -323,37 +203,35 @@ def remote_worker(
operation_mode="upload",
doc_id=None,
):
token_check = True
full_path = os.path.join(directory, user, name_job)
if not os.path.exists(full_path):
os.makedirs(full_path)
self.update_state(state="PROGRESS", meta={"current": 1})
try:
logging.info("Initializing remote loader with type: %s", loader)
logging.info(
f"Remote job: {full_path}",
extra={"user": user, "job": name_job, "source_data": source_data},
)
remote_loader = RemoteCreator.create_loader(loader)
raw_docs = remote_loader.load_data(source_data)
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
docs = group_split(
documents=raw_docs,
min_tokens=MIN_TOKENS,
duplicate_headers=False,
max_tokens=MAX_TOKENS,
token_check=token_check,
)
docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
tokens = count_tokens_docs(docs)
logging.info("Total tokens calculated: %d", tokens)
if operation_mode == "upload":
id = ObjectId()
embed_and_store_documents(docs, full_path, id, self)
call_openai_api(docs, full_path, id, self)
elif operation_mode == "sync":
if not doc_id or not ObjectId.is_valid(doc_id):
logging.error("Invalid doc_id provided for sync operation: %s", doc_id)
raise ValueError("doc_id must be provided for sync operation.")
id = ObjectId(doc_id)
embed_and_store_documents(docs, full_path, id, self)
call_openai_api(docs, full_path, id, self)
self.update_state(state="PROGRESS", meta={"current": 100})
file_data = {
@@ -368,18 +246,10 @@ def remote_worker(
}
upload_index(full_path, file_data)
except Exception as e:
logging.error("Error in remote_worker task: %s", str(e), exc_info=True)
raise
finally:
if os.path.exists(full_path):
shutil.rmtree(full_path)
logging.info("remote_worker task completed successfully")
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
def sync(
self,
source_data,
@@ -405,11 +275,10 @@ def sync(
doc_id,
)
except Exception as e:
logging.error(f"Error during sync: {e}", exc_info=True)
logging.error(f"Error during sync: {e}")
return {"status": "error", "error": str(e)}
return {"status": "success"}
def sync_worker(self, frequency):
sync_counts = Counter()
sources = sources_collection.find()
@@ -433,122 +302,3 @@ def sync_worker(self, frequency):
key: sync_counts[key]
for key in ["total_sync_count", "sync_success", "sync_failure"]
}
def attachment_worker(self, file_info, user):
"""
Process and store a single attachment without vectorization.
"""
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
attachments_collection = db["attachments"]
filename = file_info["filename"]
attachment_id = file_info["attachment_id"]
relative_path = file_info["path"]
metadata = file_info.get("metadata", {})
try:
self.update_state(state="PROGRESS", meta={"current": 10})
storage = StorageCreator.get_storage()
self.update_state(
state="PROGRESS", meta={"current": 30, "status": "Processing content"}
)
content = storage.process_file(
relative_path,
lambda local_path, **kwargs: SimpleDirectoryReader(
input_files=[local_path], exclude_hidden=True, errors="ignore"
)
.load_data()[0]
.text,
)
token_count = num_tokens_from_string(content)
self.update_state(
state="PROGRESS", meta={"current": 80, "status": "Storing in database"}
)
mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
doc_id = ObjectId(attachment_id)
attachments_collection.insert_one(
{
"_id": doc_id,
"user": user,
"path": relative_path,
"filename": filename,
"content": content,
"token_count": token_count,
"mime_type": mime_type,
"date": datetime.datetime.now(),
"metadata": metadata,
}
)
logging.info(
f"Stored attachment with ID: {attachment_id}", extra={"user": user}
)
self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
return {
"filename": filename,
"path": relative_path,
"token_count": token_count,
"attachment_id": attachment_id,
"mime_type": mime_type,
"metadata": metadata,
}
except Exception as e:
logging.error(
f"Error processing file {filename}: {e}",
extra={"user": user},
exc_info=True,
)
raise
def agent_webhook_worker(self, agent_id, payload):
"""
Process the webhook payload for an agent.
Args:
self: Reference to the instance of the task.
agent_id (str): Unique identifier for the agent.
payload (dict): The payload data from the webhook.
Returns:
dict: Information about the processed webhook.
"""
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
agents_collection = db["agents"]
self.update_state(state="PROGRESS", meta={"current": 1})
try:
agent_oid = ObjectId(agent_id)
agent_config = agents_collection.find_one({"_id": agent_oid})
if not agent_config:
raise ValueError(f"Agent with ID {agent_id} not found.")
input_data = json.dumps(payload)
except Exception as e:
logging.error(f"Error processing agent webhook: {e}", exc_info=True)
return {"status": "error", "error": str(e)}
self.update_state(state="PROGRESS", meta={"current": 50})
try:
result = run_agent_logic(agent_config, input_data)
except Exception as e:
logging.error(f"Error running agent logic: {e}", exc_info=True)
return {"status": "error", "error": str(e)}
finally:
self.update_state(state="PROGRESS", meta={"current": 100})
logging.info(
f"Webhook processed for agent {agent_id}", extra={"agent_id": agent_id}
)
return {"status": "success", "result": result}

View File

@@ -1,19 +0,0 @@
name: docsgpt-oss
services:
redis:
image: redis:6-alpine
ports:
- 6379:6379
mongo:
image: mongo:6
ports:
- 27017:27017
volumes:
- mongodb_data_container:/data/db
volumes:
mongodb_data_container:

View File

@@ -1,11 +0,0 @@
version: "3.8"
services:
ollama:
image: ollama/ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
volumes:
ollama_data:

View File

@@ -1,16 +0,0 @@
version: "3.8"
services:
ollama:
image: ollama/ollama
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
deploy:
resources:
reservations:
devices:
- capabilities: [gpu]
volumes:
ollama_data:

View File

@@ -1,6 +1,6 @@
services:
frontend:
build: ../frontend
build: ./frontend
environment:
- VITE_API_HOST=http://localhost:7091
- VITE_API_STREAMING=$VITE_API_STREAMING
@@ -10,7 +10,7 @@ services:
- backend
backend:
build: ../application
build: ./application
environment:
- API_KEY=$OPENAI_API_KEY
- EMBEDDINGS_KEY=$OPENAI_API_KEY
@@ -25,15 +25,15 @@ services:
ports:
- "7091:7091"
volumes:
- ../application/indexes:/app/application/indexes
- ../application/inputs:/app/application/inputs
- ../application/vectors:/app/application/vectors
- ./application/indexes:/app/application/indexes
- ./application/inputs:/app/application/inputs
- ./application/vectors:/app/application/vectors
depends_on:
- redis
- mongo
worker:
build: ../application
build: ./application
command: celery -A application.app.celery worker -l INFO
environment:
- API_KEY=$OPENAI_API_KEY

View File

@@ -1,8 +1,8 @@
services:
frontend:
build: ../frontend
build: ./frontend
volumes:
- ../frontend/src:/app/src
- ./frontend/src:/app/src
environment:
- VITE_API_HOST=http://localhost:7091
- VITE_API_STREAMING=$VITE_API_STREAMING

Some files were not shown because too many files have changed in this diff Show More