from typing import Optional, Union, List, Any
import httpx
import logging
import os
import warnings
from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType
class LangfuseBaseCallbackHandler:
    """Hold the Langfuse client or stateful trace/span a callback handler uses.

    The handler runs in one of two modes, decided in ``__init__``:

    * a ``stateful_client`` (trace or span) is supplied: the handler attaches
      to it and does not create a ``Langfuse`` client (``self.langfuse``
      stays ``None``);
    * no ``stateful_client`` is supplied: a new ``Langfuse`` client is built
      from the given arguments / environment variables and ``self.trace``
      starts out as ``None``.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        *,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        debug: bool = False,
        stateful_client: Optional[
            Union[StatefulTraceClient, StatefulSpanClient]
        ] = None,
        update_stateful_client: bool = False,
        version: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        trace_name: Optional[str] = None,
        release: Optional[str] = None,
        metadata: Optional[Any] = None,
        tags: Optional[List[str]] = None,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[int] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,
        enabled: Optional[bool] = None,
        httpx_client: Optional[httpx.Client] = None,
        sdk_integration: str,
        sample_rate: Optional[float] = None,
    ) -> None:
        """Initialise the handler from a stateful client or fresh credentials.

        All parameters are keyword-only.

        Args:
            public_key: Forwarded to ``Langfuse``; falls back to the
                ``LANGFUSE_PUBLIC_KEY`` environment variable.
            secret_key: Forwarded to ``Langfuse``; falls back to the
                ``LANGFUSE_SECRET_KEY`` environment variable.
            host: Forwarded to ``Langfuse``; falls back to ``LANGFUSE_HOST``
                and then to ``https://cloud.langfuse.com``.
            debug: Forwarded to ``Langfuse``.
            stateful_client: Existing trace or span client to attach to. When
                given, no ``Langfuse`` client is created.
            update_stateful_client: Stored on the handler as
                ``self.update_stateful_client``.
            version, session_id, user_id, trace_name, metadata, tags: Stored
                on the handler as attributes of the same name.
            release: Stored on the handler and, when not ``None``, forwarded
                to ``Langfuse``.
            threads, flush_at, flush_interval, max_retries, timeout, enabled,
                httpx_client: Forwarded to ``Langfuse`` only when not ``None``.
            sdk_integration: Required; always forwarded to ``Langfuse``.
            sample_rate: Forwarded to ``Langfuse``; falls back to the
                ``LANGFUSE_SAMPLE_RATE`` environment variable and then ``1.0``.

        Raises:
            ValueError: If ``sample_rate`` is ``None`` and
                ``LANGFUSE_SAMPLE_RATE`` is set to a non-numeric value.
        """
        self.version = version
        self.session_id = session_id
        self.user_id = user_id
        self.trace_name = trace_name
        self.release = release
        self.metadata = metadata
        self.tags = tags

        self.root_span = None
        self.update_stateful_client = update_stateful_client
        self.langfuse = None

        # Explicit arguments take priority over environment variables.
        prio_public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        prio_secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
        prio_host = host or os.environ.get(
            "LANGFUSE_HOST", "https://cloud.langfuse.com"
        )
        # ``is not None`` (rather than ``or``) so an explicit 0.0 is respected.
        prio_sample_rate = (
            sample_rate
            if sample_rate is not None
            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
        )

        if stateful_client and isinstance(stateful_client, StatefulTraceClient):
            # Attach directly to the supplied trace.
            self.trace = stateful_client
            self._task_manager = stateful_client.task_manager
            return

        if stateful_client and isinstance(stateful_client, StatefulSpanClient):
            # Keep the span as root and build a trace client for the span's
            # trace from the span's own client, trace id and task manager.
            self.root_span = stateful_client
            self.trace = StatefulTraceClient(
                stateful_client.client,
                stateful_client.trace_id,
                StateType.TRACE,
                stateful_client.trace_id,
                stateful_client.task_manager,
            )
            self._task_manager = stateful_client.task_manager
            return

        args = {
            "public_key": prio_public_key,
            "secret_key": prio_secret_key,
            "host": prio_host,
            "debug": debug,
        }
        # Only pass options that were actually provided, so that ``Langfuse``
        # applies its own defaults for the rest.
        optional_args = {
            "release": release,
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "timeout": timeout,
            "enabled": enabled,
            "httpx_client": httpx_client,
            "sample_rate": prio_sample_rate,
        }
        args.update(
            {key: value for key, value in optional_args.items() if value is not None}
        )
        args["sdk_integration"] = sdk_integration

        self.langfuse = Langfuse(**args)
        self.trace: Optional[StatefulTraceClient] = None
        self._task_manager = self.langfuse.task_manager

    def get_trace_id(self):
        """Return the ID of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as
        it is not concurrency-safe. Please refer to the
        [documentation](https://langfuse.com/docs/integrations/langchain/get-started#interoperability)
        on how to use interop with the Langfuse SDK to get the id of a trace.

        Returns:
            The ID of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_id is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            # Attribute the warning to the caller, not to this file.
            stacklevel=2,
        )
        return self.trace.id if self.trace else None

    def get_trace_url(self):
        """Return the URL of the current/last trace (deprecated).

        This method is deprecated and will be removed in a future version as
        it is not concurrency-safe. Please refer to the
        [documentation](https://langfuse.com/docs/tracing/url) for more
        information.

        Returns:
            The URL of the current/last trace or None if no trace is available.
        """
        warnings.warn(
            "get_trace_url is deprecated, create a trace for this handler instead. See interop documentation of this integration for more information.",
            DeprecationWarning,
            # Attribute the warning to the caller, not to this file.
            stacklevel=2,
        )
        return self.trace.get_trace_url() if self.trace else None

    def flush(self) -> None:
        """Flush the task manager of the current trace or root span.

        If neither a trace nor a root span is set, only a debug message is
        logged.
        """
        if self.trace is not None:
            self.trace.task_manager.flush()
        elif self.root_span is not None:
            self.root_span.task_manager.flush()
        else:
            self.log.debug("There was no trace yet, hence no flushing possible.")

    def auth_check(self):
        """Check that the configured credentials can reach a project.

        Returns:
            The result of ``self.langfuse.auth_check()`` when the handler owns
            a ``Langfuse`` client; otherwise ``True`` when the trace's (or
            root span's) client lists at least one project; ``False`` when
            the handler has no client, trace or root span.

        Raises:
            Exception: If the trace's or root span's client returns no
                projects.
        """
        if self.langfuse is not None:
            return self.langfuse.auth_check()

        if self.trace is not None:
            return self._has_projects(self.trace.client)

        if self.root_span is not None:
            return self._has_projects(self.root_span.client)

        return False

    @staticmethod
    def _has_projects(client) -> bool:
        """Return True if *client* lists a project, else raise Exception."""
        projects = client.projects.get()
        # The response wraps the project list in ``.data``; calling ``len``
        # on the response object itself (as the root-span branch used to) is
        # not equivalent.
        if len(projects.data) == 0:
            raise Exception("No projects found for the keys.")
        return True