mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 16:43:16 +00:00
152 lines
4.7 KiB
Python
152 lines
4.7 KiB
Python
import datetime
|
|
import functools
|
|
import inspect
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Any, Callable, Dict, Generator, List
|
|
|
|
from application.core.mongo_db import MongoDB
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
|
)
|
|
|
|
|
|
class LogContext:
|
|
def __init__(self, endpoint, activity_id, user, api_key, query):
|
|
self.endpoint = endpoint
|
|
self.activity_id = activity_id
|
|
self.user = user
|
|
self.api_key = api_key
|
|
self.query = query
|
|
self.stacks = []
|
|
|
|
|
|
def build_stack_data(
|
|
obj: Any,
|
|
include_attributes: List[str] = None,
|
|
exclude_attributes: List[str] = None,
|
|
custom_data: Dict = None,
|
|
) -> Dict:
|
|
data = {}
|
|
if include_attributes is None:
|
|
include_attributes = []
|
|
for name, value in inspect.getmembers(obj):
|
|
if (
|
|
not name.startswith("_")
|
|
and not inspect.ismethod(value)
|
|
and not inspect.isfunction(value)
|
|
):
|
|
include_attributes.append(name)
|
|
for attr_name in include_attributes:
|
|
if exclude_attributes and attr_name in exclude_attributes:
|
|
continue
|
|
try:
|
|
attr_value = getattr(obj, attr_name)
|
|
if attr_value is not None:
|
|
if isinstance(attr_value, (int, float, str, bool)):
|
|
data[attr_name] = attr_value
|
|
elif isinstance(attr_value, list):
|
|
if all(isinstance(item, dict) for item in attr_value):
|
|
data[attr_name] = attr_value
|
|
elif all(hasattr(item, "__dict__") for item in attr_value):
|
|
data[attr_name] = [item.__dict__ for item in attr_value]
|
|
else:
|
|
data[attr_name] = [str(item) for item in attr_value]
|
|
elif isinstance(attr_value, dict):
|
|
data[attr_name] = {k: str(v) for k, v in attr_value.items()}
|
|
else:
|
|
data[attr_name] = str(attr_value)
|
|
except AttributeError:
|
|
pass
|
|
if custom_data:
|
|
data.update(custom_data)
|
|
return data
|
|
|
|
|
|
def log_activity() -> Callable:
|
|
def decorator(func: Callable) -> Callable:
|
|
@functools.wraps(func)
|
|
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
|
activity_id = str(uuid.uuid4())
|
|
data = build_stack_data(args[0])
|
|
endpoint = data.get("endpoint", "")
|
|
user = data.get("user", "local")
|
|
api_key = data.get("user_api_key", "")
|
|
query = kwargs.get("query", getattr(args[0], "query", ""))
|
|
|
|
context = LogContext(endpoint, activity_id, user, api_key, query)
|
|
kwargs["log_context"] = context
|
|
|
|
logging.info(
|
|
f"Starting activity: {endpoint} - {activity_id} - User: {user}"
|
|
)
|
|
|
|
generator = func(*args, **kwargs)
|
|
yield from _consume_and_log(generator, context)
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
def _consume_and_log(generator: Generator, context: "LogContext"):
|
|
try:
|
|
for item in generator:
|
|
yield item
|
|
except Exception as e:
|
|
logging.exception(f"Error in {context.endpoint} - {context.activity_id}: {e}")
|
|
context.stacks.append({"component": "error", "data": {"message": str(e)}})
|
|
_log_to_mongodb(
|
|
endpoint=context.endpoint,
|
|
activity_id=context.activity_id,
|
|
user=context.user,
|
|
api_key=context.api_key,
|
|
query=context.query,
|
|
stacks=context.stacks,
|
|
level="error",
|
|
)
|
|
raise
|
|
finally:
|
|
_log_to_mongodb(
|
|
endpoint=context.endpoint,
|
|
activity_id=context.activity_id,
|
|
user=context.user,
|
|
api_key=context.api_key,
|
|
query=context.query,
|
|
stacks=context.stacks,
|
|
level="info",
|
|
)
|
|
|
|
|
|
def _log_to_mongodb(
|
|
endpoint: str,
|
|
activity_id: str,
|
|
user: str,
|
|
api_key: str,
|
|
query: str,
|
|
stacks: List[Dict],
|
|
level: str,
|
|
) -> None:
|
|
try:
|
|
mongo = MongoDB.get_client()
|
|
db = mongo["docsgpt"]
|
|
user_logs_collection = db["stack_logs"]
|
|
|
|
log_entry = {
|
|
"endpoint": endpoint,
|
|
"id": activity_id,
|
|
"level": level,
|
|
"user": user,
|
|
"api_key": api_key,
|
|
"query": query,
|
|
"stacks": stacks,
|
|
"timestamp": datetime.datetime.now(datetime.timezone.utc),
|
|
}
|
|
user_logs_collection.insert_one(log_entry)
|
|
logging.debug(f"Logged activity to MongoDB: {activity_id}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Failed to log to MongoDB: {e}")
|