"""@private"""

import json
import logging
from base64 import b64encode
from typing import Any, List, Union

import httpx

from langfuse.serializer import EventSerializer


class LangfuseClient:
    """HTTP client that posts serialized events to the ingestion endpoint.

    The client does not own the ``httpx.Client`` session it is given; it only
    uses it to send requests to ``<base_url>/api/public/ingestion``.
    """

    _public_key: str
    _secret_key: str
    _base_url: str
    _version: str
    _timeout: int
    _session: httpx.Client

    def __init__(
        self,
        public_key: str,
        secret_key: str,
        base_url: str,
        version: str,
        timeout: int,
        session: httpx.Client,
    ):
        """Store the credentials, endpoint settings and HTTP session.

        Args:
            public_key: Public key; used for basic auth and sent as a header.
            secret_key: Secret key; used for basic auth.
            base_url: Base URL of the API (one trailing slash is tolerated).
            version: SDK version reported in the request headers.
            timeout: Timeout passed to every request made on ``session``.
            session: The ``httpx.Client`` used to send requests.
        """
        self._public_key = public_key
        self._secret_key = secret_key
        self._base_url = base_url
        self._version = version
        self._timeout = timeout
        self._session = session

    def generate_headers(self):
        """Return the headers sent with every request.

        The ``Authorization`` header is HTTP basic auth built from
        ``public_key:secret_key``; the remaining headers identify the SDK.
        """
        credentials = f"{self._public_key}:{self._secret_key}".encode("utf-8")
        return {
            "Authorization": "Basic " + b64encode(credentials).decode("ascii"),
            "Content-Type": "application/json",
            "x_langfuse_sdk_name": "python",
            "x_langfuse_sdk_version": self._version,
            "x_langfuse_public_key": self._public_key,
        }

    def batch_post(self, **kwargs) -> httpx.Response:
        """Post the `kwargs` to the batch API endpoint for events.

        Returns:
            The raw ``httpx.Response`` when the upload succeeded.

        Raises:
            APIError: On an error status code or an undecodable 207 body.
            APIErrors: When a 207 response reports per-event errors.
        """
        log = logging.getLogger("langfuse")
        log.debug("uploading data: %s", kwargs)
        res = self.post(**kwargs)
        return self._process_response(
            res, success_message="data uploaded successfully", return_json=False
        )

    def post(self, **kwargs) -> httpx.Response:
        """Post the `kwargs` to the API.

        The keyword arguments are serialized to JSON with ``EventSerializer``
        and sent as the request body. The response is returned as-is; status
        handling is left to the caller.
        """
        log = logging.getLogger("langfuse")
        url = self._remove_trailing_slash(self._base_url) + "/api/public/ingestion"
        data = json.dumps(kwargs, cls=EventSerializer)
        log.debug("making request: %s to %s", data, url)
        headers = self.generate_headers()
        res = self._session.post(
            url, content=data, headers=headers, timeout=self._timeout
        )

        if res.status_code == 200:
            log.debug("data uploaded successfully")

        return res

    def _remove_trailing_slash(self, url: str) -> str:
        """Removes the trailing slash from a URL (at most one)."""
        if url.endswith("/"):
            return url[:-1]
        return url

    def _process_response(
        self, res: httpx.Response, success_message: str, *, return_json: bool = True
    ) -> Union[httpx.Response, Any]:
        """Turn an HTTP response into a return value or an exception.

        Args:
            res: The response to inspect.
            success_message: Debug message logged for a 200/201 response.
            return_json: Return the decoded JSON body when true, otherwise
                the response object itself.

        Returns:
            The decoded body or ``res``, for 200/201 responses and for 207
            responses that carry no errors.

        Raises:
            APIError: If a 200/201 body (with ``return_json``) or a 207 body
                is not valid JSON, or for any other status code.
            APIErrors: If a 207 body contains a non-empty ``"errors"`` list.
        """
        log = logging.getLogger("langfuse")
        log.debug("received response: %s", res.text)

        if res.status_code in (200, 201):
            log.debug(success_message)
            if not return_json:
                return res
            try:
                return res.json()
            except json.JSONDecodeError as exc:
                log.error("Response is not valid JSON.")
                # Chain the decode error so the root cause stays visible.
                raise APIError(
                    res.status_code, "Invalid JSON response received"
                ) from exc

        if res.status_code == 207:
            # The 207 body is always decoded, even when return_json is False,
            # because it has to be checked for per-item errors.
            try:
                payload = res.json()
            except json.JSONDecodeError as exc:
                log.error("Response is not valid JSON.")
                raise APIError(
                    res.status_code, "Invalid JSON response received"
                ) from exc

            errors = payload.get("errors", [])
            if errors:
                raise APIErrors(
                    [
                        APIError(
                            error.get("status"),
                            error.get("message", "No message provided"),
                            error.get("error", "No error details provided"),
                        )
                        for error in errors
                    ]
                )
            # Reuse the already-decoded payload instead of parsing twice.
            return payload if return_json else res

        # Any other status is an error: prefer the decoded body as the
        # message and fall back to the raw text when it cannot be decoded.
        try:
            payload = res.json()
        except (KeyError, ValueError) as exc:
            raise APIError(res.status_code, res.text) from exc
        log.error("received error response: %s", payload)
        raise APIError(res.status_code, payload)


class APIError(Exception):
    """A single error reported by the API or detected while reading a response.

    Attributes are set directly from the constructor arguments: ``status``,
    ``message`` and the optional ``details``.
    """

    def __init__(self, status: Union[int, str], message: str, details: Any = None):
        self.message = message
        self.status = status
        self.details = details

    def __str__(self):
        """Format the error as ``"<message> (<status>): <details>"``."""
        msg = "{0} ({1}): {2}"
        return msg.format(self.message, self.status, self.details)


class APIErrors(Exception):
    """A collection of ``APIError`` instances raised together."""

    def __init__(self, errors: List[APIError]):
        self.errors = errors

    def __str__(self):
        """Join the string form of every contained error, with a prefix."""
        errors = ", ".join(str(error) for error in self.errors)

        return f"[Langfuse] {errors}"
Memory