import httpx import threading from typing import Optional from langfuse import Langfuse class LangfuseSingleton: _instance = None _lock = threading.Lock() _langfuse: Optional[Langfuse] = None def __new__(cls): if not cls._instance: with cls._lock: if not cls._instance: cls._instance = super(LangfuseSingleton, cls).__new__(cls) return cls._instance def get( self, *, public_key: Optional[str] = None, secret_key: Optional[str] = None, host: Optional[str] = None, release: Optional[str] = None, debug: Optional[bool] = None, threads: Optional[int] = None, flush_at: Optional[int] = None, flush_interval: Optional[int] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sdk_integration: Optional[str] = None, enabled: Optional[bool] = None, sample_rate: Optional[float] = None, ) -> Langfuse: if self._langfuse: return self._langfuse with self._lock: if self._langfuse: return self._langfuse langfuse_init_args = { "public_key": public_key, "secret_key": secret_key, "host": host, "release": release, "debug": debug, "threads": threads, "flush_at": flush_at, "flush_interval": flush_interval, "max_retries": max_retries, "timeout": timeout, "httpx_client": httpx_client, "sdk_integration": sdk_integration, "enabled": enabled, "sample_rate": sample_rate, } self._langfuse = Langfuse( **{k: v for k, v in langfuse_init_args.items() if v is not None} ) return self._langfuse def reset(self) -> None: with self._lock: if self._langfuse: self._langfuse.flush() self._langfuse = None
Memory