from contextlib import contextmanager
import datetime as dt
import logging
import os
import typing
import uuid
import backoff
import httpx
from enum import Enum
import time
import tracemalloc
from typing import (
Any,
Dict,
Optional,
Literal,
Union,
List,
Sequence,
overload,
)
import urllib.parse
import warnings
from dataclasses import dataclass
from langfuse.api.resources.commons.types.dataset_run_with_items import (
DatasetRunWithItems,
)
from langfuse.api.resources.commons.types.observations_view import ObservationsView
from langfuse.api.resources.commons.types.session import Session
from langfuse.api.resources.commons.types.trace_with_details import TraceWithDetails
from langfuse.api.resources.datasets.types.paginated_dataset_runs import (
PaginatedDatasetRuns,
)
from langfuse.api.resources.ingestion.types.create_event_body import CreateEventBody
from langfuse.api.resources.ingestion.types.create_generation_body import (
CreateGenerationBody,
)
from langfuse.api.resources.ingestion.types.create_span_body import CreateSpanBody
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
from langfuse.api.resources.ingestion.types.trace_body import TraceBody
from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
from langfuse.api.resources.ingestion.types.update_generation_body import (
UpdateGenerationBody,
)
from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody
from langfuse.api.resources.observations.types.observations_views import (
ObservationsViews,
)
from langfuse.api.resources.prompts.types import (
CreatePromptRequest_Chat,
CreatePromptRequest_Text,
Prompt_Text,
Prompt_Chat,
)
from langfuse.api.resources.trace.types.traces import Traces
from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
MetaResponse,
)
from langfuse.model import (
CreateDatasetItemRequest,
CreateDatasetRequest,
CreateDatasetRunItemRequest,
ChatMessageDict,
DatasetItem,
DatasetStatus,
ModelUsage,
PromptClient,
ChatPromptClient,
TextPromptClient,
)
from langfuse.prompt_cache import PromptCache
try:
import pydantic.v1 as pydantic # type: ignore
except ImportError:
import pydantic # type: ignore
from langfuse.api.client import FernLangfuse
from langfuse.environment import get_common_release_envs
from langfuse.logging import clean_logger
from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
from langfuse.request import LangfuseClient
from langfuse.task_manager import TaskManager
from langfuse.types import SpanLevel, ScoreDataType
from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
from .version import __version__ as version
@dataclass
class FetchTracesResponse:
    """Response object for fetch_traces method."""

    # Page of traces returned by the API.
    data: typing.List[TraceWithDetails]
    # Pagination metadata (page, limit, total item/page counts).
    meta: MetaResponse
@dataclass
class FetchTraceResponse:
    """Response object for fetch_trace method."""

    # The single trace, including its full observation/score details.
    data: TraceWithFullDetails
@dataclass
class FetchObservationsResponse:
    """Response object for fetch_observations method."""

    # Page of observations returned by the API.
    data: typing.List[ObservationsView]
    # Pagination metadata (page, limit, total item/page counts).
    meta: MetaResponse
@dataclass
class FetchObservationResponse:
    """Response object for fetch_observation method."""

    # The single observation fetched by id.
    data: Observation
@dataclass
class FetchSessionsResponse:
    """Response object for fetch_sessions method."""

    # Page of sessions returned by the API.
    data: typing.List[Session]
    # Pagination metadata (page, limit, total item/page counts).
    meta: MetaResponse
class Langfuse(object):
"""Langfuse Python client.
Attributes:
log (logging.Logger): Logger for the Langfuse client.
base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction.
httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API.
client (FernLangfuse): Core interface for Langfuse API interaction.
task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks.
release (str): Identifies the release number or hash of the application.
prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances.
Example:
Initiating the Langfuse client should always be first step to use Langfuse.
```python
import os
from langfuse import Langfuse
# Set the public and secret keys as environment variables
os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
os.environ['LANGFUSE_SECRET_KEY'] = secret_key
# Initialize the Langfuse client using the credentials
langfuse = Langfuse()
```
"""
log = logging.getLogger("langfuse")
"""Logger for the Langfuse client."""
host: str
"""Host of Langfuse API."""
    def __init__(
        self,
        public_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        host: Optional[str] = None,
        release: Optional[str] = None,
        debug: bool = False,
        threads: Optional[int] = None,
        flush_at: Optional[int] = None,
        flush_interval: Optional[float] = None,
        max_retries: Optional[int] = None,
        timeout: Optional[int] = None,  # seconds
        sdk_integration: Optional[str] = "default",
        httpx_client: Optional[httpx.Client] = None,
        enabled: Optional[bool] = True,
        sample_rate: Optional[float] = None,
    ):
        """Initialize the Langfuse client.

        Args:
            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
            threads: Number of consumer threads to execute network requests. Helps scaling the SDK for high load. Only increase this if you run into scaling issues.
            flush_at: Max batch size that's sent to the API.
            flush_interval: Max delay until a new batch is sent to the API.
            max_retries: Max number of retries in case of API/network errors.
            timeout: Timeout of API requests in seconds. Defaults to 20 seconds.
            httpx_client: Pass your own httpx client for more customizability of requests.
            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
            enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
            sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.

        Raises:
            ValueError: If public_key or secret_key are not set and not found in environment variables.

        Example:
            Initiating the Langfuse client should always be first step to use Langfuse.
            ```python
            import os
            from langfuse import Langfuse

            # Set the public and secret keys as environment variables
            os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
            os.environ['LANGFUSE_SECRET_KEY'] = secret_key

            # Initialize the Langfuse client using the credentials
            langfuse = Langfuse()
            ```
        """
        self.enabled = enabled
        # Credentials may come from arguments or from environment variables.
        public_key = public_key or os.environ.get("LANGFUSE_PUBLIC_KEY")
        secret_key = secret_key or os.environ.get("LANGFUSE_SECRET_KEY")
        sample_rate = (
            sample_rate
            if sample_rate
            is not None  # needs explicit None check, as 0 is a valid value
            else float(os.environ.get("LANGFUSE_SAMPLE_RATE", 1.0))
        )
        # An out-of-range sample rate disables the client instead of clamping.
        if sample_rate is not None and (
            sample_rate > 1 or sample_rate < 0
        ):  # default value 1 will be set in the taskmanager
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since the sample rate provided is not between 0 and 1."
            )
        # NOTE(review): `or` makes an explicit 0 fall back to the env/default
        # value for these numeric options — confirm this is intended.
        threads = threads or int(os.environ.get("LANGFUSE_THREADS", 1))
        flush_at = flush_at or int(os.environ.get("LANGFUSE_FLUSH_AT", 15))
        flush_interval = flush_interval or float(
            os.environ.get("LANGFUSE_FLUSH_INTERVAL", 0.5)
        )
        max_retries = max_retries or int(os.environ.get("LANGFUSE_MAX_RETRIES", 3))
        timeout = timeout or int(os.environ.get("LANGFUSE_TIMEOUT", 20))
        # Missing credentials disable the client (warning only) rather than
        # raising, so instrumented apps keep working without Langfuse configured.
        if not self.enabled:
            self.log.warning(
                "Langfuse client is disabled. No observability data will be sent."
            )
        elif not public_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )
        elif not secret_key:
            self.enabled = False
            self.log.warning(
                "Langfuse client is disabled since no secret_key was provided as a parameter or environment variable 'LANGFUSE_SECRET_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client"
            )
        # Debug flag from parameter or env var; env comparison is case-sensitive ("True").
        set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True")
        if set_debug is True:
            # Ensures that debug level messages are logged when debug mode is on.
            # Otherwise, defaults to WARNING level.
            # See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided
            logging.basicConfig()
            self.log.setLevel(logging.DEBUG)
            clean_logger()
        else:
            self.log.setLevel(logging.WARNING)
            clean_logger()
        self.base_url = (
            host
            if host
            else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
        )
        self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
        # Fern-generated API client; shares the httpx client so connection
        # pooling and timeouts stay consistent across requests.
        self.client = FernLangfuse(
            base_url=self.base_url,
            username=public_key,
            password=secret_key,
            x_langfuse_sdk_name="python",
            x_langfuse_sdk_version=version,
            x_langfuse_public_key=public_key,
            httpx_client=self.httpx_client,
        )
        # Lower-level client used by the background TaskManager for ingestion.
        langfuse_client = LangfuseClient(
            public_key=public_key,
            secret_key=secret_key,
            base_url=self.base_url,
            version=version,
            timeout=timeout,
            session=self.httpx_client,
        )
        args = {
            "threads": threads,
            "flush_at": flush_at,
            "flush_interval": flush_interval,
            "max_retries": max_retries,
            "client": langfuse_client,
            "public_key": public_key,
            "sdk_name": "python",
            "sdk_version": version,
            "sdk_integration": sdk_integration,
            "enabled": self.enabled,
            "sample_rate": sample_rate,
        }
        self.task_manager = TaskManager(**args)
        self.trace_id = None  # id of the most recent trace; None until one exists
        self.release = self._get_release_value(release)
        self.prompt_cache = PromptCache()
def _get_release_value(self, release: Optional[str] = None) -> Optional[str]:
if release:
return release
elif "LANGFUSE_RELEASE" in os.environ:
return os.environ["LANGFUSE_RELEASE"]
else:
return get_common_release_envs()
    def get_trace_id(self) -> Optional[str]:
        """Get the current trace id.

        Returns None if no trace has been created by this client instance yet
        (`self.trace_id` is initialized to None in `__init__`).
        """
        return self.trace_id
    def get_trace_url(self) -> str:
        """Get the URL of the current trace to view it in the Langfuse UI.

        NOTE(review): if no trace has been created yet, `self.trace_id` is None
        and the returned URL ends in the literal "None".
        """
        return f"{self.base_url}/trace/{self.trace_id}"
def get_dataset(
self, name: str, *, fetch_items_page_size: Optional[int] = 50
) -> "DatasetClient":
"""Fetch a dataset by its name.
Args:
name (str): The name of the dataset to fetch.
fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
Returns:
DatasetClient: The dataset with the given name.
"""
try:
self.log.debug(f"Getting datasets {name}")
dataset = self.client.datasets.get(dataset_name=name)
dataset_items = []
page = 1
while True:
new_items = self.client.dataset_items.list(
dataset_name=name, page=page, limit=fetch_items_page_size
)
dataset_items.extend(new_items.data)
if new_items.meta.total_pages <= page:
break
page += 1
items = [DatasetItemClient(i, langfuse=self) for i in dataset_items]
return DatasetClient(dataset, items=items)
except Exception as e:
self.log.exception(e)
raise e
def get_dataset_item(self, id: str) -> "DatasetItemClient":
"""Get the dataset item with the given id."""
try:
self.log.debug(f"Getting dataset item {id}")
dataset_item = self.client.dataset_items.get(id=id)
return DatasetItemClient(dataset_item, langfuse=self)
except Exception as e:
self.log.exception(e)
raise e
def auth_check(self) -> bool:
"""Check if the provided credentials (public and secret key) are valid.
Raises:
Exception: If no projects were found for the provided credentials.
Note:
This method is blocking. It is discouraged to use it in production code.
"""
try:
projects = self.client.projects.get()
self.log.debug(
f"Auth check successful, found {len(projects.data)} projects"
)
if len(projects.data) == 0:
raise Exception(
"Auth check failed, no project found for the keys provided."
)
return True
except Exception as e:
self.log.exception(e)
raise e
def get_dataset_runs(
self,
dataset_name: str,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
) -> PaginatedDatasetRuns:
"""Get all dataset runs.
Args:
dataset_name (str): Name of the dataset.
page (Optional[int]): Page number of the dataset runs to return, starts at 1. Defaults to None.
limit (Optional[int]): Maximum number of dataset runs to return. Defaults to 50.
Returns:
PaginatedDatasetRuns: The dataset runs.
"""
try:
self.log.debug("Getting dataset runs")
return self.client.datasets.get_runs(
dataset_name=dataset_name, page=page, limit=limit
)
except Exception as e:
self.log.exception(e)
raise e
def get_dataset_run(
self,
dataset_name: str,
dataset_run_name: str,
) -> DatasetRunWithItems:
"""Get a dataset run.
Args:
dataset_name: Name of the dataset.
dataset_run_name: Name of the dataset run.
Returns:
DatasetRunWithItems: The dataset run.
"""
try:
self.log.debug(
f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}"
)
return self.client.datasets.get_run(
dataset_name=dataset_name, run_name=dataset_run_name
)
except Exception as e:
self.log.exception(e)
raise e
def create_dataset(
self,
name: str,
description: Optional[str] = None,
metadata: Optional[Any] = None,
) -> Dataset:
"""Create a dataset with the given name on Langfuse.
Args:
name: Name of the dataset to create.
description: Description of the dataset. Defaults to None.
metadata: Additional metadata. Defaults to None.
Returns:
Dataset: The created dataset as returned by the Langfuse API.
"""
try:
body = CreateDatasetRequest(
name=name, description=description, metadata=metadata
)
self.log.debug(f"Creating datasets {body}")
return self.client.datasets.create(request=body)
except Exception as e:
self.log.exception(e)
raise e
def create_dataset_item(
self,
dataset_name: str,
input: Optional[Any] = None,
expected_output: Optional[Any] = None,
metadata: Optional[Any] = None,
source_trace_id: Optional[str] = None,
source_observation_id: Optional[str] = None,
status: Optional[DatasetStatus] = None,
id: Optional[str] = None,
) -> DatasetItem:
"""Create a dataset item.
Upserts if an item with id already exists.
Args:
dataset_name: Name of the dataset in which the dataset item should be created.
input: Input data. Defaults to None. Can contain any dict, list or scalar.
expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar.
metadata: Additional metadata. Defaults to None. Can contain any dict, list or scalar.
source_trace_id: Id of the source trace. Defaults to None.
source_observation_id: Id of the source observation. Defaults to None.
status: Status of the dataset item. Defaults to ACTIVE for newly created items.
id: Id of the dataset item. Defaults to None. Provide your own id if you want to dedupe dataset items. Id needs to be globally unique and cannot be reused across datasets.
Returns:
DatasetItem: The created dataset item as returned by the Langfuse API.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Uploading items to the Langfuse dataset named "capital_cities"
langfuse.create_dataset_item(
dataset_name="capital_cities",
input={"input": {"country": "Italy"}},
expected_output={"expected_output": "Rome"},
metadata={"foo": "bar"}
)
```
"""
try:
body = CreateDatasetItemRequest(
datasetName=dataset_name,
input=input,
expectedOutput=expected_output,
metadata=metadata,
sourceTraceId=source_trace_id,
sourceObservationId=source_observation_id,
status=status,
id=id,
)
self.log.debug(f"Creating dataset item {body}")
return self.client.dataset_items.create(request=body)
except Exception as e:
self.log.exception(e)
raise e
def fetch_trace(
self,
id: str,
) -> FetchTraceResponse:
"""Fetch a trace via the Langfuse API by its id.
Args:
id: The id of the trace to fetch.
Returns:
FetchTraceResponse: The trace with full details as returned by the Langfuse API on `data`.
Raises:
Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
try:
self.log.debug(f"Getting trace {id}")
trace = self.client.trace.get(id)
return FetchTraceResponse(data=trace)
except Exception as e:
self.log.exception(e)
raise e
def get_trace(
self,
id: str,
) -> TraceWithFullDetails:
"""Get a trace via the Langfuse API by its id. Deprecated, use fetch_trace instead.
Args:
id: The id of the trace to fetch.
Returns:
TraceWithFullDetails: The trace with full details as returned by the Langfuse API.
Raises:
Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
warnings.warn(
"get_trace is deprecated, use fetch_trace instead.",
DeprecationWarning,
)
try:
self.log.debug(f"Getting trace {id}")
return self.client.trace.get(id)
except Exception as e:
self.log.exception(e)
raise e
def fetch_traces(
self,
*,
page: Optional[int] = None,
limit: Optional[int] = None,
user_id: Optional[str] = None,
name: Optional[str] = None,
session_id: Optional[str] = None,
from_timestamp: Optional[dt.datetime] = None,
to_timestamp: Optional[dt.datetime] = None,
order_by: Optional[str] = None,
tags: Optional[Union[str, Sequence[str]]] = None,
) -> FetchTracesResponse:
"""Fetch a list of traces in the current project matching the given parameters.
Args:
page (Optional[int]): Page number, starts at 1. Defaults to None.
limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
name (Optional[str]): Filter by name of traces. Defaults to None.
user_id (Optional[str]): Filter by user_id. Defaults to None.
session_id (Optional[str]): Filter by session_id. Defaults to None.
from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
Returns:
FetchTracesResponse, list of traces on `data` and metadata on `meta`.
Raises:
Exception: If an error occurred during the request.
"""
try:
self.log.debug(
f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
)
res = self.client.trace.list(
page=page,
limit=limit,
name=name,
user_id=user_id,
session_id=session_id,
from_timestamp=from_timestamp,
to_timestamp=to_timestamp,
order_by=order_by,
tags=tags,
)
return FetchTracesResponse(data=res.data, meta=res.meta)
except Exception as e:
self.log.exception(e)
raise e
def get_traces(
self,
*,
page: Optional[int] = None,
limit: Optional[int] = None,
user_id: Optional[str] = None,
name: Optional[str] = None,
session_id: Optional[str] = None,
from_timestamp: Optional[dt.datetime] = None,
to_timestamp: Optional[dt.datetime] = None,
order_by: Optional[str] = None,
tags: Optional[Union[str, Sequence[str]]] = None,
) -> Traces:
"""Get a list of traces in the current project matching the given parameters. Deprecated, use fetch_traces instead.
Args:
page (Optional[int]): Page number, starts at 1. Defaults to None.
limit (Optional[int]): Limit of items per page. If you encounter API issues due to too large page sizes, try to reduce the limit. Defaults to None.
name (Optional[str]): Filter by name of traces. Defaults to None.
user_id (Optional[str]): Filter by user_id. Defaults to None.
session_id (Optional[str]): Filter by session_id. Defaults to None.
from_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp on or after this datetime. Defaults to None.
to_timestamp (Optional[dt.datetime]): Retrieve only traces with a timestamp before this datetime. Defaults to None.
order_by (Optional[str]): Format of the string `[field].[asc/desc]`. Fields: id, timestamp, name, userId, release, version, public, bookmarked, sessionId. Example: `timestamp.asc`. Defaults to None.
tags (Optional[Union[str, Sequence[str]]]): Filter by tags. Defaults to None.
Returns:
List of Traces
Raises:
Exception: If an error occurred during the request.
"""
warnings.warn(
"get_traces is deprecated, use fetch_traces instead.",
DeprecationWarning,
)
try:
self.log.debug(
f"Getting traces... {page}, {limit}, {name}, {user_id}, {session_id}, {from_timestamp}, {to_timestamp}, {order_by}, {tags}"
)
return self.client.trace.list(
page=page,
limit=limit,
name=name,
user_id=user_id,
session_id=session_id,
from_timestamp=from_timestamp,
to_timestamp=to_timestamp,
order_by=order_by,
tags=tags,
)
except Exception as e:
self.log.exception(e)
raise e
def fetch_observations(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
trace_id: typing.Optional[str] = None,
parent_observation_id: typing.Optional[str] = None,
from_start_time: typing.Optional[dt.datetime] = None,
to_start_time: typing.Optional[dt.datetime] = None,
type: typing.Optional[str] = None,
) -> FetchObservationsResponse:
"""Get a list of observations in the current project matching the given parameters.
Args:
page (Optional[int]): Page number of the observations to return. Defaults to None.
limit (Optional[int]): Maximum number of observations to return. Defaults to None.
name (Optional[str]): Name of the observations to return. Defaults to None.
user_id (Optional[str]): User identifier. Defaults to None.
trace_id (Optional[str]): Trace identifier. Defaults to None.
parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
type (Optional[str]): Type of the observation. Defaults to None.
Returns:
FetchObservationsResponse, list of observations on `data` and metadata on `meta`.
Raises:
Exception: If an error occurred during the request.
"""
try:
self.log.debug(
f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
)
res = self.client.observations.get_many(
page=page,
limit=limit,
name=name,
user_id=user_id,
trace_id=trace_id,
parent_observation_id=parent_observation_id,
from_start_time=from_start_time,
to_start_time=to_start_time,
type=type,
)
return FetchObservationsResponse(data=res.data, meta=res.meta)
except Exception as e:
self.log.exception(e)
raise e
def get_observations(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
trace_id: typing.Optional[str] = None,
parent_observation_id: typing.Optional[str] = None,
from_start_time: typing.Optional[dt.datetime] = None,
to_start_time: typing.Optional[dt.datetime] = None,
type: typing.Optional[str] = None,
) -> ObservationsViews:
"""Get a list of observations in the current project matching the given parameters. Deprecated, use fetch_observations instead.
Args:
page (Optional[int]): Page number of the observations to return. Defaults to None.
limit (Optional[int]): Maximum number of observations to return. Defaults to None.
name (Optional[str]): Name of the observations to return. Defaults to None.
user_id (Optional[str]): User identifier. Defaults to None.
trace_id (Optional[str]): Trace identifier. Defaults to None.
parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None.
from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
type (Optional[str]): Type of the observation. Defaults to None.
Returns:
List of ObservationsViews: List of observations in the project matching the given parameters.
Raises:
Exception: If an error occurred during the request.
"""
warnings.warn(
"get_observations is deprecated, use fetch_observations instead.",
DeprecationWarning,
)
try:
self.log.debug(
f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {from_start_time}, {to_start_time}, {type}"
)
return self.client.observations.get_many(
page=page,
limit=limit,
name=name,
user_id=user_id,
trace_id=trace_id,
parent_observation_id=parent_observation_id,
from_start_time=from_start_time,
to_start_time=to_start_time,
type=type,
)
except Exception as e:
self.log.exception(e)
raise e
def get_generations(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
trace_id: typing.Optional[str] = None,
from_start_time: typing.Optional[dt.datetime] = None,
to_start_time: typing.Optional[dt.datetime] = None,
parent_observation_id: typing.Optional[str] = None,
) -> ObservationsViews:
"""Get a list of generations in the current project matching the given parameters. Deprecated, use fetch_observations(type='GENERATION') instead.
Args:
page (Optional[int]): Page number of the generations to return. Defaults to None.
limit (Optional[int]): Maximum number of generations to return. Defaults to None.
name (Optional[str]): Name of the generations to return. Defaults to None.
user_id (Optional[str]): User identifier of the generations to return. Defaults to None.
trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None.
from_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time on or after this datetime. Defaults to None.
to_start_time (Optional[dt.datetime]): Retrieve only observations with a start_time before this datetime. Defaults to None.
parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None.
Returns:
List of ObservationsViews: List of generations in the project matching the given parameters.
Raises:
Exception: If an error occurred during the request.
"""
warnings.warn(
"get_generations is deprecated, use `fetch_observations(type='GENERATION')` instead.",
DeprecationWarning,
)
return self.get_observations(
page=page,
limit=limit,
name=name,
user_id=user_id,
trace_id=trace_id,
parent_observation_id=parent_observation_id,
from_start_time=from_start_time,
to_start_time=to_start_time,
type="GENERATION",
)
def fetch_observation(
self,
id: str,
) -> FetchObservationResponse:
"""Get an observation in the current project with the given identifier.
Args:
id: The identifier of the observation to fetch.
Returns:
FetchObservationResponse: The observation with the given id on `data`.
Raises:
Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
try:
self.log.debug(f"Getting observation {id}")
observation = self.client.observations.get(id)
return FetchObservationResponse(data=observation)
except Exception as e:
self.log.exception(e)
raise e
def get_observation(
self,
id: str,
) -> Observation:
"""Get an observation in the current project with the given identifier. Deprecated, use fetch_observation instead.
Args:
id: The identifier of the observation to fetch.
Raises:
Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
warnings.warn(
"get_observation is deprecated, use fetch_observation instead.",
DeprecationWarning,
)
try:
self.log.debug(f"Getting observation {id}")
return self.client.observations.get(id)
except Exception as e:
self.log.exception(e)
raise e
def fetch_sessions(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
from_timestamp: typing.Optional[dt.datetime] = None,
to_timestamp: typing.Optional[dt.datetime] = None,
) -> FetchSessionsResponse:
"""Get a list of sessions in the current project.
Args:
page (Optional[int]): Page number of the sessions to return. Defaults to None.
limit (Optional[int]): Maximum number of sessions to return. Defaults to None.
from_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp on or after this datetime. Defaults to None.
to_timestamp (Optional[dt.datetime]): Retrieve only sessions with a timestamp before this datetime. Defaults to None.
Returns:
FetchSessionsResponse, list of sessions on `data` and metadata on `meta`.
Raises:
Exception: If an error occurred during the request.
"""
try:
self.log.debug(
f"Getting sessions... {page}, {limit}, {from_timestamp}, {to_timestamp}"
)
res = self.client.sessions.list(
page=page,
limit=limit,
from_timestamp=from_timestamp,
to_timestamp=to_timestamp,
)
return FetchSessionsResponse(data=res.data, meta=res.meta)
except Exception as e:
self.log.exception(e)
raise e
@overload
def get_prompt(
self,
name: str,
version: Optional[int] = None,
*,
label: Optional[str] = None,
type: Literal["chat"],
cache_ttl_seconds: Optional[int] = None,
fallback: Optional[List[ChatMessageDict]] = None,
max_retries: Optional[int] = None,
fetch_timeout_seconds: Optional[int] = None,
) -> ChatPromptClient: ...
@overload
def get_prompt(
self,
name: str,
version: Optional[int] = None,
*,
label: Optional[str] = None,
type: Literal["text"] = "text",
cache_ttl_seconds: Optional[int] = None,
fallback: Optional[str] = None,
max_retries: Optional[int] = None,
fetch_timeout_seconds: Optional[int] = None,
) -> TextPromptClient: ...
def get_prompt(
self,
name: str,
version: Optional[int] = None,
*,
label: Optional[str] = None,
type: Literal["chat", "text"] = "text",
cache_ttl_seconds: Optional[int] = None,
fallback: Union[Optional[List[ChatMessageDict]], Optional[str]] = None,
max_retries: Optional[int] = None,
fetch_timeout_seconds: Optional[int] = None,
) -> PromptClient:
"""Get a prompt.
This method attempts to fetch the requested prompt from the local cache. If the prompt is not found
in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again
and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will
return the expired prompt as a fallback.
Args:
name (str): The name of the prompt to retrieve.
Keyword Args:
version (Optional[int]): The version of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
label: Optional[str]: The label of the prompt to retrieve. If no label and version is specified, the `production` label is returned. Specify either version or label, not both.
cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a
keyword argument. If not set, defaults to 60 seconds.
type: Literal["chat", "text"]: The type of the prompt to retrieve. Defaults to "text".
fallback: Union[Optional[List[ChatMessageDict]], Optional[str]]: The prompt string to return if fetching the prompt fails. Important on the first call where no cached prompt is available. Follows Langfuse prompt formatting with double curly braces for variables. Defaults to None.
max_retries: Optional[int]: The maximum number of retries in case of API/network errors. Defaults to 2. The maximum value is 4. Retries have an exponential backoff with a maximum delay of 10 seconds.
fetch_timeout_seconds: Optional[int]: The timeout in milliseconds for fetching the prompt. Defaults to the default timeout set on the SDK, which is 10 seconds per default.
Returns:
The prompt object retrieved from the cache or directly fetched if not cached or expired of type
- TextPromptClient, if type argument is 'text'.
- ChatPromptClient, if type argument is 'chat'.
Raises:
Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an
expired prompt in the cache, in which case it logs a warning and returns the expired prompt.
"""
if version is not None and label is not None:
raise ValueError("Cannot specify both version and label at the same time.")
if not name:
raise ValueError("Prompt name cannot be empty.")
cache_key = PromptCache.generate_cache_key(name, version=version, label=label)
bounded_max_retries = self._get_bounded_max_retries(
max_retries, default_max_retries=2, max_retries_upper_bound=4
)
self.log.debug(f"Getting prompt '{cache_key}'")
cached_prompt = self.prompt_cache.get(cache_key)
if cached_prompt is None:
self.log.debug(f"Prompt '{cache_key}' not found in cache.")
try:
return self._fetch_prompt_and_update_cache(
name,
version=version,
label=label,
ttl_seconds=cache_ttl_seconds,
max_retries=bounded_max_retries,
fetch_timeout_seconds=fetch_timeout_seconds,
)
except Exception as e:
if fallback:
self.log.warn(
f"Returning fallback prompt for '{cache_key}' due to fetch error: {e}"
)
fallback_client_args = {
"name": name,
"prompt": fallback,
"type": type,
"version": version or 0,
"config": {},
"labels": [label] if label else [],
"tags": [],
}
if type == "text":
return TextPromptClient(
prompt=Prompt_Text(**fallback_client_args),
is_fallback=True,
)
if type == "chat":
return ChatPromptClient(
prompt=Prompt_Chat(**fallback_client_args),
is_fallback=True,
)
raise e
if cached_prompt.is_expired():
try:
return self._fetch_prompt_and_update_cache(
name,
version=version,
label=label,
ttl_seconds=cache_ttl_seconds,
max_retries=bounded_max_retries,
fetch_timeout_seconds=fetch_timeout_seconds,
)
except Exception as e:
self.log.warn(
f"Returning expired prompt cache for '{cache_key}' due to fetch error: {e}"
)
return cached_prompt.value
return cached_prompt.value
def _fetch_prompt_and_update_cache(
self,
name: str,
*,
version: Optional[int] = None,
label: Optional[str] = None,
ttl_seconds: Optional[int] = None,
max_retries: int,
fetch_timeout_seconds,
) -> PromptClient:
try:
cache_key = PromptCache.generate_cache_key(
name, version=version, label=label
)
self.log.debug(f"Fetching prompt '{cache_key}' from server...")
@backoff.on_exception(backoff.constant, Exception, max_tries=max_retries)
def fetch_prompts():
return self.client.prompts.get(
self._url_encode(name),
version=version,
label=label,
request_options={
"timeout_in_seconds": fetch_timeout_seconds,
}
if fetch_timeout_seconds is not None
else None,
)
prompt_response = fetch_prompts()
if prompt_response.type == "chat":
prompt = ChatPromptClient(prompt_response)
else:
prompt = TextPromptClient(prompt_response)
self.prompt_cache.set(cache_key, prompt, ttl_seconds)
return prompt
except Exception as e:
self.log.exception(f"Error while fetching prompt '{cache_key}': {e}")
raise e
def _get_bounded_max_retries(
self,
max_retries: Optional[int],
*,
default_max_retries: int = 2,
max_retries_upper_bound: int = 4,
) -> int:
if max_retries is None:
return default_max_retries
bounded_max_retries = min(
max(max_retries, 0),
max_retries_upper_bound,
)
return bounded_max_retries
@overload
def create_prompt(
self,
*,
name: str,
prompt: List[ChatMessageDict],
is_active: Optional[bool] = None, # deprecated
labels: List[str] = [],
tags: Optional[List[str]] = None,
type: Optional[Literal["chat"]],
config: Optional[Any] = None,
) -> ChatPromptClient: ...
@overload
def create_prompt(
self,
*,
name: str,
prompt: str,
is_active: Optional[bool] = None, # deprecated
labels: List[str] = [],
tags: Optional[List[str]] = None,
type: Optional[Literal["text"]] = "text",
config: Optional[Any] = None,
) -> TextPromptClient: ...
def create_prompt(
self,
*,
name: str,
prompt: Union[str, List[ChatMessageDict]],
is_active: Optional[bool] = None, # deprecated
labels: List[str] = [],
tags: Optional[List[str]] = None,
type: Optional[Literal["chat", "text"]] = "text",
config: Optional[Any] = None,
) -> PromptClient:
"""Create a new prompt in Langfuse.
Keyword Args:
name : The name of the prompt to be created.
prompt : The content of the prompt to be created.
is_active [DEPRECATED] : A flag indicating whether the prompt is active or not. This is deprecated and will be removed in a future release. Please use the 'production' label instead.
labels: The labels of the prompt. Defaults to None. To create a default-served prompt, add the 'production' label.
tags: The tags of the prompt. Defaults to None. Will be applied to all versions of the prompt.
config: Additional structured data to be saved with the prompt. Defaults to None.
type: The type of the prompt to be created. "chat" vs. "text". Defaults to "text".
Returns:
TextPromptClient: The prompt if type argument is 'text'.
ChatPromptClient: The prompt if type argument is 'chat'.
"""
try:
self.log.debug(f"Creating prompt {name=}, {version=}, {labels=}")
# Handle deprecated is_active flag
if is_active:
self.log.warning(
"The 'is_active' flag is deprecated and will be removed in a future release. Please use the 'production' label instead."
)
labels = labels if "production" in labels else labels + ["production"]
if type == "chat":
if not isinstance(prompt, list):
raise ValueError(
"For 'chat' type, 'prompt' must be a list of chat messages with role and content attributes."
)
request = CreatePromptRequest_Chat(
name=name,
prompt=prompt,
labels=labels,
tags=tags,
config=config or {},
type="chat",
)
server_prompt = self.client.prompts.create(request=request)
return ChatPromptClient(prompt=server_prompt)
if not isinstance(prompt, str):
raise ValueError("For 'text' type, 'prompt' must be a string.")
request = CreatePromptRequest_Text(
name=name,
prompt=prompt,
labels=labels,
tags=tags,
config=config or {},
type="text",
)
server_prompt = self.client.prompts.create(request=request)
return TextPromptClient(prompt=server_prompt)
except Exception as e:
self.log.exception(e)
raise e
def _url_encode(self, url: str) -> str:
return urllib.parse.quote(url)
def trace(
self,
*,
id: typing.Optional[str] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
session_id: typing.Optional[str] = None,
version: typing.Optional[str] = None,
input: typing.Optional[typing.Any] = None,
output: typing.Optional[typing.Any] = None,
metadata: typing.Optional[typing.Any] = None,
tags: typing.Optional[typing.List[str]] = None,
timestamp: typing.Optional[dt.datetime] = None,
public: typing.Optional[bool] = None,
**kwargs,
) -> "StatefulTraceClient":
"""Create a trace.
Args:
id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id.
name: Identifier of the trace. Useful for sorting/filtering in the UI.
input: The input of the trace. Can be any JSON object.
output: The output of the trace. Can be any JSON object.
metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
timestamp: The timestamp of the trace. Defaults to the current time if not provided.
public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
**kwargs: Additional keyword arguments that can be included in the trace.
Returns:
StatefulTraceClient: The created trace.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
trace = langfuse.trace(
name="example-application",
user_id="user-1234")
)
```
"""
new_id = id or str(uuid.uuid4())
self.trace_id = new_id
try:
new_dict = {
"id": new_id,
"name": name,
"userId": user_id,
"sessionId": session_id
or kwargs.get("sessionId", None), # backward compatibility
"release": self.release,
"version": version,
"metadata": metadata,
"input": input,
"output": output,
"tags": tags,
"timestamp": timestamp or _get_timestamp(),
"public": public,
}
if kwargs is not None:
new_dict.update(kwargs)
new_body = TraceBody(**new_dict)
self.log.debug(f"Creating trace {new_body}")
event = {
"id": str(uuid.uuid4()),
"type": "trace-create",
"body": new_body.dict(exclude_none=True),
}
self.task_manager.add_task(
event,
)
except Exception as e:
self.log.exception(e)
finally:
self._log_memory_usage()
return StatefulTraceClient(
self.client, new_id, StateType.TRACE, new_id, self.task_manager
)
def _log_memory_usage(self):
try:
is_malloc_tracing_enabled = bool(int(os.getenv("PYTHONTRACEMALLOC", 0)))
report_interval = int(os.getenv("LANGFUSE_DEBUG_MEMORY_REPORT_INTERVAL", 0))
top_k_items = int(os.getenv("LANGFUSE_DEBUG_MEMORY_TOP_K", 10))
if (
not is_malloc_tracing_enabled
or report_interval <= 0
or round(time.monotonic()) % report_interval != 0
):
return
snapshot = tracemalloc.take_snapshot().statistics("lineno")
total_memory_usage = sum([stat.size for stat in snapshot]) / 1024 / 1024
memory_usage_total_items = [f"{stat}" for stat in snapshot]
memory_usage_langfuse_items = [
stat for stat in memory_usage_total_items if "/langfuse/" in stat
]
logged_memory_usage = {
"all_files": [f"{stat}" for stat in memory_usage_total_items][
:top_k_items
],
"langfuse_files": [f"{stat}" for stat in memory_usage_langfuse_items][
:top_k_items
],
"total_usage": f"{total_memory_usage:.2f} MB",
"langfuse_queue_length": self.task_manager._queue.qsize(),
}
self.log.debug("Memory usage: ", logged_memory_usage)
event = SdkLogBody(log=logged_memory_usage)
self.task_manager.add_task(
{
"id": str(uuid.uuid4()),
"type": "sdk-log",
"timestamp": _get_timestamp(),
"body": event.dict(),
}
)
except Exception as e:
self.log.exception(e)
@overload
def score(
self,
*,
name: str,
value: float,
data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
trace_id: typing.Optional[str] = None,
id: typing.Optional[str] = None,
comment: typing.Optional[str] = None,
observation_id: typing.Optional[str] = None,
config_id: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient": ...
@overload
def score(
self,
*,
name: str,
value: str,
data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
trace_id: typing.Optional[str] = None,
id: typing.Optional[str] = None,
comment: typing.Optional[str] = None,
observation_id: typing.Optional[str] = None,
config_id: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient": ...
    def score(
        self,
        *,
        name: str,
        value: typing.Union[float, str],
        data_type: typing.Optional[ScoreDataType] = None,
        trace_id: typing.Optional[str] = None,
        id: typing.Optional[str] = None,
        comment: typing.Optional[str] = None,
        observation_id: typing.Optional[str] = None,
        config_id: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulClient":
        """Create a score attached to a trace (and optionally an observation).

        Args:
            name (str): Identifier of the score.
            value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
            data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
              When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
            trace_id (str): The id of the trace to which the score should be attached.
            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
            comment (Optional[str]): Additional context/explanation of the score.
            observation_id (Optional[str]): The id of the observation to which the score should be attached.
            config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
            **kwargs: Additional keyword arguments to include in the score.

        Returns:
            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name="example-application")

            # Get id of created trace
            trace_id = trace.id

            # Add score to the trace
            trace = langfuse.score(
                trace_id=trace_id,
                name="user-explicit-feedback",
                value=0.9,
                comment="I like how personalized the response is"
            )
            ```
        """
        # Fall back to the trace most recently created by this client, else a fresh id.
        trace_id = trace_id or self.trace_id or str(uuid.uuid4())
        new_id = id or str(uuid.uuid4())
        try:
            new_dict = {
                "id": new_id,
                "trace_id": trace_id,
                "observation_id": observation_id,
                "name": name,
                "value": value,
                "data_type": data_type,
                "comment": comment,
                "config_id": config_id,
                **kwargs,
            }
            self.log.debug(f"Creating score {new_dict}...")
            new_body = ScoreBody(**new_dict)
            event = {
                "id": str(uuid.uuid4()),
                "type": "score-create",
                "body": new_body.dict(exclude_none=True),
            }
            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # A StatefulClient is returned even if queuing the score failed;
            # errors are logged, never raised to the caller.
            if observation_id is not None:
                return StatefulClient(
                    self.client,
                    observation_id,
                    StateType.OBSERVATION,
                    trace_id,
                    self.task_manager,
                )
            else:
                return StatefulClient(
                    self.client, new_id, StateType.TRACE, new_id, self.task_manager
                )
    def span(
        self,
        *,
        id: typing.Optional[str] = None,
        trace_id: typing.Optional[str] = None,
        parent_observation_id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Create a span.

        A span represents durations of units of work in a trace.
        Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.

        If no trace_id is provided, a new trace is created just for this span.

        Args:
            id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
            trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated.
            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The created span.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            trace = langfuse.trace(name = "llm-feature")

            # Create a span
            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)

            # Create a nested span
            nested_span = langfuse.span(name = "retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
            ```
        """
        new_span_id = id or str(uuid.uuid4())
        new_trace_id = trace_id or str(uuid.uuid4())
        # Remember the trace id so later calls (e.g. score()) can default to it.
        self.trace_id = new_trace_id
        try:
            span_body = {
                "id": new_span_id,
                "trace_id": new_trace_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "parent_observation_id": parent_observation_id,
                "version": version,
                "end_time": end_time,
                "trace": {"release": self.release},
                **kwargs,
            }
            # If the caller supplied no trace, create one implicitly, named
            # after the span (falling back to the trace id).
            if trace_id is None:
                self._generate_trace(new_trace_id, name or new_trace_id)
            self.log.debug(f"Creating span {span_body}...")
            span_body = CreateSpanBody(**span_body)
            event = {
                "id": str(uuid.uuid4()),
                "type": "span-create",
                "body": span_body.dict(exclude_none=True),
            }
            self.log.debug(f"Creating span {event}...")
            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            self._log_memory_usage()

            # A client is returned even if queuing failed; errors are logged, not raised.
            return StatefulSpanClient(
                self.client,
                new_span_id,
                StateType.OBSERVATION,
                new_trace_id,
                self.task_manager,
            )
    def event(
        self,
        *,
        id: typing.Optional[str] = None,
        trace_id: typing.Optional[str] = None,
        parent_observation_id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Create an event.

        An event represents a discrete event in a trace.
        Usually, you want to add a event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.

        If no trace_id is provided, a new trace is created just for this event.

        Args:
            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
            input (Optional[Any]): The input to the event. Can be any JSON object.
            output (Optional[Any]): The output to the event. Can be any JSON object.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the event.

        Returns:
            StatefulSpanClient: The created event.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            trace = langfuse.trace(name = "llm-feature")

            # Create an event
            retrieval = langfuse.event(name = "retrieval", trace_id = trace.id)
            ```
        """
        event_id = id or str(uuid.uuid4())
        new_trace_id = trace_id or str(uuid.uuid4())
        # Remember the trace id so later calls (e.g. score()) can default to it.
        self.trace_id = new_trace_id
        try:
            event_body = {
                "id": event_id,
                "trace_id": new_trace_id,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "parent_observation_id": parent_observation_id,
                "version": version,
                "trace": {"release": self.release},
                **kwargs,
            }
            # If the caller supplied no trace, create one implicitly, named
            # after the event (falling back to the trace id).
            if trace_id is None:
                self._generate_trace(new_trace_id, name or new_trace_id)
            request = CreateEventBody(**event_body)
            event = {
                "id": str(uuid.uuid4()),
                "type": "event-create",
                "body": request.dict(exclude_none=True),
            }
            self.log.debug(f"Creating event {event}...")
            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        finally:
            # Return inside finally: a client is handed back even if queuing
            # failed — errors are logged, never raised to the caller.
            return StatefulSpanClient(
                self.client,
                event_id,
                StateType.OBSERVATION,
                new_trace_id,
                self.task_manager,
            )
    def generation(
        self,
        *,
        id: typing.Optional[str] = None,
        trace_id: typing.Optional[str] = None,
        parent_observation_id: typing.Optional[str] = None,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Create a generation.

        A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.

        Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.

        If no trace_id is provided, a new trace is created just for this generation.

        Args:
            id (Optional[str]): The id of the generation can be set, defaults to random id.
            trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created
            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The created generation.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a generation in Langfuse
            generation = langfuse.generation(
                name="summary-generation",
                model="gpt-3.5-turbo",
                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
                input=[{"role": "system", "content": "You are a helpful assistant."},
                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
                metadata={"interface": "whatsapp"}
            )
            ```
        """
        new_trace_id = trace_id or str(uuid.uuid4())
        new_generation_id = id or str(uuid.uuid4())
        # Remember the trace id so later calls (e.g. score()) can default to it.
        self.trace_id = new_trace_id
        try:
            generation_body = {
                "id": new_generation_id,
                "trace_id": new_trace_id,
                "release": self.release,
                "name": name,
                "start_time": start_time or _get_timestamp(),
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "parent_observation_id": parent_observation_id,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                "trace": {"release": self.release},
                **_create_prompt_context(prompt),
                **kwargs,
            }
            # NOTE(review): unlike span()/event(), this inlines the implicit
            # trace creation instead of calling _generate_trace, and does not
            # default the trace name to the trace id — confirm whether this
            # divergence is intentional.
            if trace_id is None:
                trace = {
                    "id": new_trace_id,
                    "release": self.release,
                    "name": name,
                }
                request = TraceBody(**trace)

                event = {
                    "id": str(uuid.uuid4()),
                    "type": "trace-create",
                    "body": request.dict(exclude_none=True),
                }

                self.log.debug(f"Creating trace {event}...")

                self.task_manager.add_task(event)

            self.log.debug(f"Creating generation max {generation_body} {usage}...")
            # `request` is intentionally reused here for the generation body.
            request = CreateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-create",
                "body": request.dict(exclude_none=True),
            }

            self.log.debug(f"Creating top-level generation {event} ...")
            self.task_manager.add_task(event)

        except Exception as e:
            self.log.exception(e)
        finally:
            # Return inside finally: a client is handed back even if queuing
            # failed — errors are logged, never raised to the caller.
            return StatefulGenerationClient(
                self.client,
                new_generation_id,
                StateType.OBSERVATION,
                new_trace_id,
                self.task_manager,
            )
def _generate_trace(self, trace_id: str, name: str):
trace_dict = {
"id": trace_id,
"release": self.release,
"name": name,
}
trace_body = TraceBody(**trace_dict)
event = {
"id": str(uuid.uuid4()),
"type": "trace-create",
"body": trace_body.dict(exclude_none=True),
}
self.log.debug(f"Creating trace {event}...")
self.task_manager.add_task(event)
    def join(self):
        """Blocks until all consumer Threads are terminated. The SDK calls this upon termination of the Python Interpreter.

        If called before flushing, consumers might terminate before sending all events to Langfuse API. This method is called at exit of the SDK, right before the Python interpreter closes.
        To guarantee all messages have been delivered, you still need to call flush().
        """
        try:
            # Errors are logged rather than raised; join returns None in that case.
            return self.task_manager.join()
        except Exception as e:
            self.log.exception(e)
    def flush(self):
        """Flush the internal event queue to the Langfuse API. It blocks until the queue is empty. It should be called when the application shuts down.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Some operations with Langfuse

            # Flushing all events to end Langfuse cleanly
            langfuse.flush()
            ```
        """
        try:
            # Errors are logged rather than raised; flush returns None in that case.
            return self.task_manager.flush()
        except Exception as e:
            self.log.exception(e)
    def shutdown(self):
        """Initiate a graceful shutdown of the Langfuse SDK, ensuring all events are sent to Langfuse API and all consumer Threads are terminated.

        This function calls flush() and join() consecutively resulting in a complete shutdown of the SDK. On success of this function, no more events will be sent to Langfuse API.
        As the SDK calls join() already on shutdown, refer to flush() to ensure all events arrive at the Langfuse API.
        """
        try:
            # Errors are logged rather than raised; shutdown returns None in that case.
            return self.task_manager.shutdown()
        except Exception as e:
            self.log.exception(e)
class StateType(Enum):
    """Enum to distinguish observation and trace states.

    StatefulClient uses this to decide how ids are attached to ingested
    events (observations also carry a parent_observation_id).

    Attributes:
        OBSERVATION (int): Observation state.
        TRACE (int): Trace state.
    """

    OBSERVATION = 1
    TRACE = 0
class StatefulClient(object):
"""Base class for handling stateful operations in the Langfuse system.
This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events,
associating them with either an observation or a trace based on the specified state type.
Attributes:
client (FernLangfuse): Core interface for Langfuse API interactions.
id (str): Unique identifier of the stateful client (either observation or trace).
state_type (StateType): Enum indicating whether the client is an observation or a trace.
trace_id (str): Id of the trace associated with the stateful client.
task_manager (TaskManager): Manager handling asynchronous tasks for the client.
"""
log = logging.getLogger("langfuse")
    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulClient.

        Args:
            client (FernLangfuse): Core interface for Langfuse API interactions.
            id (str): Unique identifier of the stateful client (either observation or trace).
            state_type (StateType): Enum indicating whether the client is an observation or a trace.
            trace_id (str): Id of the trace associated with the stateful client.
            task_manager (TaskManager): Manager handling asynchronous tasks for the client.
        """
        # Plain attribute assignment only — no I/O or validation happens here.
        self.client = client
        self.trace_id = trace_id
        self.id = id
        self.state_type = state_type
        self.task_manager = task_manager
def _add_state_to_event(self, body: dict):
if self.state_type == StateType.OBSERVATION:
body["parent_observation_id"] = self.id
body["trace_id"] = self.trace_id
else:
body["trace_id"] = self.id
return body
def _add_default_values(self, body: dict):
if body.get("start_time") is None:
body["start_time"] = _get_timestamp()
return body
def generation(
self,
*,
id: typing.Optional[str] = None,
name: typing.Optional[str] = None,
start_time: typing.Optional[dt.datetime] = None,
end_time: typing.Optional[dt.datetime] = None,
metadata: typing.Optional[typing.Any] = None,
level: typing.Optional[SpanLevel] = None,
status_message: typing.Optional[str] = None,
version: typing.Optional[str] = None,
completion_start_time: typing.Optional[dt.datetime] = None,
model: typing.Optional[str] = None,
model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
input: typing.Optional[typing.Any] = None,
output: typing.Optional[typing.Any] = None,
usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
prompt: typing.Optional[PromptClient] = None,
**kwargs,
) -> "StatefulGenerationClient":
"""Create a generation nested within the current observation or trace.
A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI.
Args:
id (Optional[str]): The id of the generation can be set, defaults to random id.
name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
model (Optional[str]): The name of the model used for the generation.
model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
**kwargs: Additional keyword arguments to include in the generation.
Returns:
StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Create a trace
trace = langfuse.trace(name = "llm-feature")
# Create a nested generation in Langfuse
generation = trace.generation(
name="summary-generation",
model="gpt-3.5-turbo",
model_parameters={"maxTokens": "1000", "temperature": "0.9"},
input=[{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Please generate a summary of the following documents ..."}],
metadata={"interface": "whatsapp"}
)
```
"""
generation_id = id or str(uuid.uuid4())
try:
generation_body = {
"id": generation_id,
"name": name,
"start_time": start_time or _get_timestamp(),
"metadata": metadata,
"level": level,
"status_message": status_message,
"version": version,
"end_time": end_time,
"completion_start_time": completion_start_time,
"model": model,
"model_parameters": model_parameters,
"input": input,
"output": output,
"usage": _convert_usage_input(usage) if usage is not None else None,
**_create_prompt_context(prompt),
**kwargs,
}
generation_body = self._add_state_to_event(generation_body)
new_body = self._add_default_values(generation_body)
new_body = CreateGenerationBody(**new_body)
event = {
"id": str(uuid.uuid4()),
"type": "generation-create",
"body": new_body.dict(exclude_none=True, exclude_unset=False),
}
self.log.debug(f"Creating generation {new_body}...")
self.task_manager.add_task(event)
except Exception as e:
self.log.exception(e)
finally:
return StatefulGenerationClient(
self.client,
generation_id,
StateType.OBSERVATION,
self.trace_id,
task_manager=self.task_manager,
)
def span(
self,
*,
id: typing.Optional[str] = None,
name: typing.Optional[str] = None,
start_time: typing.Optional[dt.datetime] = None,
end_time: typing.Optional[dt.datetime] = None,
metadata: typing.Optional[typing.Any] = None,
input: typing.Optional[typing.Any] = None,
output: typing.Optional[typing.Any] = None,
level: typing.Optional[SpanLevel] = None,
status_message: typing.Optional[str] = None,
version: typing.Optional[str] = None,
**kwargs,
) -> "StatefulSpanClient":
"""Create a span nested within the current observation or trace.
A span represents durations of units of work in a trace.
Args:
id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.
name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
input (Optional[dict]): The input to the span. Can be any JSON object.
output (Optional[dict]): The output to the span. Can be any JSON object.
version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
**kwargs: Additional keyword arguments to include in the span.
Returns:
StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Create a trace
trace = langfuse.trace(name = "llm-feature")
# Create a span
retrieval = langfuse.span(name = "retrieval")
```
"""
span_id = id or str(uuid.uuid4())
try:
span_body = {
"id": span_id,
"name": name,
"start_time": start_time or _get_timestamp(),
"metadata": metadata,
"input": input,
"output": output,
"level": level,
"status_message": status_message,
"version": version,
"end_time": end_time,
**kwargs,
}
self.log.debug(f"Creating span {span_body}...")
new_dict = self._add_state_to_event(span_body)
new_body = self._add_default_values(new_dict)
event = CreateSpanBody(**new_body)
event = {
"id": str(uuid.uuid4()),
"type": "span-create",
"body": event.dict(exclude_none=True),
}
self.task_manager.add_task(event)
except Exception as e:
self.log.exception(e)
finally:
return StatefulSpanClient(
self.client,
span_id,
StateType.OBSERVATION,
self.trace_id,
task_manager=self.task_manager,
)
@overload
def score(
self,
*,
id: typing.Optional[str] = None,
name: str,
value: float,
data_type: typing.Optional[Literal["NUMERIC", "BOOLEAN"]] = None,
comment: typing.Optional[str] = None,
config_id: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient": ...
@overload
def score(
self,
*,
id: typing.Optional[str] = None,
name: str,
value: str,
data_type: typing.Optional[Literal["CATEGORICAL"]] = "CATEGORICAL",
comment: typing.Optional[str] = None,
config_id: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient": ...
def score(
self,
*,
id: typing.Optional[str] = None,
name: str,
value: typing.Union[float, str],
data_type: typing.Optional[ScoreDataType] = None,
comment: typing.Optional[str] = None,
config_id: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient":
"""Create a score attached for the current observation or trace.
Args:
name (str): Identifier of the score.
value (Union[float, str]): The value of the score. Should be passed as float for numeric and boolean scores and as string for categorical scores.
data_type (Optional[ScoreDataType]): The data type of the score. When not set, the data type is inferred from the score config's data type, when present.
When no config is set, the data type is inferred from the value's type, i.e. float values are categorized as numeric scores and string values as categorical scores.
comment (Optional[str]): Additional context/explanation of the score.
id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
config_id (Optional[str]): The id of the score config. When set, the score value is validated against the config. Defaults to None.
**kwargs: Additional keyword arguments to include in the score.
Returns:
StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Create a trace
trace = langfuse.trace(name="example-application")
# Add score to the trace
trace = trace.score(
name="user-explicit-feedback",
value=0.8,
comment="I like how personalized the response is"
)
```
"""
score_id = id or str(uuid.uuid4())
try:
new_score = {
"id": score_id,
"trace_id": self.trace_id,
"name": name,
"value": value,
"data_type": data_type,
"comment": comment,
"config_id": config_id,
**kwargs,
}
self.log.debug(f"Creating score {new_score}...")
new_dict = self._add_state_to_event(new_score)
if self.state_type == StateType.OBSERVATION:
new_dict["observationId"] = self.id
request = ScoreBody(**new_dict)
event = {
"id": str(uuid.uuid4()),
"type": "score-create",
"body": request.dict(exclude_none=True),
}
self.task_manager.add_task(event)
except Exception as e:
self.log.exception(e)
finally:
return StatefulClient(
self.client,
self.id,
self.state_type,
self.trace_id,
task_manager=self.task_manager,
)
def event(
self,
*,
id: typing.Optional[str] = None,
name: typing.Optional[str] = None,
start_time: typing.Optional[dt.datetime] = None,
metadata: typing.Optional[typing.Any] = None,
input: typing.Optional[typing.Any] = None,
output: typing.Optional[typing.Any] = None,
level: typing.Optional[SpanLevel] = None,
status_message: typing.Optional[str] = None,
version: typing.Optional[str] = None,
**kwargs,
) -> "StatefulClient":
"""Create an event nested within the current observation or trace.
An event represents a discrete event in a trace.
Args:
id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
input (Optional[Any]): The input to the event. Can be any JSON object.
output (Optional[Any]): The output to the event. Can be any JSON object.
level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
**kwargs: Additional keyword arguments to include in the event.
Returns:
StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Create a trace
trace = langfuse.trace(name = "llm-feature")
# Create an event
retrieval = trace.event(name = "retrieval")
```
"""
event_id = id or str(uuid.uuid4())
try:
event_body = {
"id": event_id,
"name": name,
"start_time": start_time or _get_timestamp(),
"metadata": metadata,
"input": input,
"output": output,
"level": level,
"status_message": status_message,
"version": version,
**kwargs,
}
new_dict = self._add_state_to_event(event_body)
new_body = self._add_default_values(new_dict)
request = CreateEventBody(**new_body)
event = {
"id": str(uuid.uuid4()),
"type": "event-create",
"body": request.dict(exclude_none=True),
}
self.log.debug(f"Creating event {event}...")
self.task_manager.add_task(event)
except Exception as e:
self.log.exception(e)
finally:
return StatefulClient(
self.client,
event_id,
StateType.OBSERVATION,
self.trace_id,
self.task_manager,
)
def get_trace_url(self):
"""Get the URL to see the current trace in the Langfuse UI."""
return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"
class StatefulGenerationClient(StatefulClient):
    """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient.

    This client extends the capabilities of the StatefulClient to specifically handle generation,
    allowing for the creation, update, and termination of generation processes in Langfuse.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the generation.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the generation.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulGenerationClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """Update the generation.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The updated generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # Update the generation
            generation = generation.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            generation_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                "completion_start_time": completion_start_time,
                "model": model,
                "model_parameters": model_parameters,
                "input": input,
                "output": output,
                "usage": _convert_usage_input(usage) if usage is not None else None,
                **_create_prompt_context(prompt),
                **kwargs,
            }

            self.log.debug(f"Update generation {generation_body}...")

            request = UpdateGenerationBody(**generation_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "generation-update",
                "body": request.dict(exclude_none=True, exclude_unset=False),
            }

            self.log.debug(f"Update generation {event}...")
            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        # Returned outside `finally`: a `return` inside `finally` would
        # silently swallow BaseExceptions such as KeyboardInterrupt (B012).
        return StatefulGenerationClient(
            self.client,
            self.id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        completion_start_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        model: typing.Optional[str] = None,
        model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        usage: typing.Optional[typing.Union[pydantic.BaseModel, ModelUsage]] = None,
        prompt: typing.Optional[PromptClient] = None,
        **kwargs,
    ) -> "StatefulGenerationClient":
        """End the generation, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time.
            end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time.
            completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration.
            metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event.
            version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            model (Optional[str]): The name of the model used for the generation.
            model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs.
            input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object.
            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
            usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
            **kwargs: Additional keyword arguments to include in the generation.

        Returns:
            StatefulGenerationClient: The ended generation. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested generation in Langfuse
            generation = trace.generation(name="summary-generation")

            # End the generation and update its properties
            generation = generation.end(metadata={"interface": "whatsapp"})
            ```
        """
        # Delegates to update(); the only difference is defaulting end_time to "now".
        return self.update(
            name=name,
            start_time=start_time,
            end_time=end_time or _get_timestamp(),
            metadata=metadata,
            level=level,
            status_message=status_message,
            version=version,
            completion_start_time=completion_start_time,
            model=model,
            model_parameters=model_parameters,
            input=input,
            output=output,
            usage=usage,
            prompt=prompt,
            **kwargs,
        )
class StatefulSpanClient(StatefulClient):
    """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient.

    Attributes:
        client (FernLangfuse): Core interface for Langfuse API interaction.
        id (str): Unique identifier of the span.
        state_type (StateType): Type of the stateful entity (observation or trace).
        trace_id (str): Id of trace associated with the span.
        task_manager (TaskManager): Manager for handling asynchronous tasks.
    """

    log = logging.getLogger("langfuse")

    def __init__(
        self,
        client: FernLangfuse,
        id: str,
        state_type: StateType,
        trace_id: str,
        task_manager: TaskManager,
    ):
        """Initialize the StatefulSpanClient."""
        super().__init__(client, id, state_type, trace_id, task_manager)

    # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
    def update(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """Update the span.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # Update the span
            span = span.update(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "id": self.id,
                "trace_id": self.trace_id,  # Included to avoid relying on the order of events sent to the API
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time,
                **kwargs,
            }

            self.log.debug(f"Update span {span_body}...")

            request = UpdateSpanBody(**span_body)

            event = {
                "id": str(uuid.uuid4()),
                "type": "span-update",
                "body": request.dict(exclude_none=True),
            }

            self.task_manager.add_task(event)
        except Exception as e:
            self.log.exception(e)
        # Returned outside `finally`: a `return` inside `finally` would
        # silently swallow BaseExceptions such as KeyboardInterrupt (B012).
        return StatefulSpanClient(
            self.client,
            self.id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def end(
        self,
        *,
        name: typing.Optional[str] = None,
        start_time: typing.Optional[dt.datetime] = None,
        end_time: typing.Optional[dt.datetime] = None,
        metadata: typing.Optional[typing.Any] = None,
        input: typing.Optional[typing.Any] = None,
        output: typing.Optional[typing.Any] = None,
        level: typing.Optional[SpanLevel] = None,
        status_message: typing.Optional[str] = None,
        version: typing.Optional[str] = None,
        **kwargs,
    ) -> "StatefulSpanClient":
        """End the span, optionally updating its properties.

        Args:
            name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI.
            start_time (Optional[datetime]): The time at which the span started, defaults to the current time.
            end_time (Optional[datetime]): The time at which the span ended. Defaults to the current time.
            metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API.
            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
            status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event.
            input (Optional[dict]): The input to the span. Can be any JSON object.
            output (Optional[dict]): The output to the span. Can be any JSON object.
            version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging.
            **kwargs: Additional keyword arguments to include in the span.

        Returns:
            StatefulSpanClient: The updated span. Passthrough for chaining.

        Example:
            ```python
            from langfuse import Langfuse

            langfuse = Langfuse()

            # Create a trace
            trace = langfuse.trace(name = "llm-feature")

            # Create a nested span in Langfuse
            span = trace.span(name="retrieval")

            # End the span and update its properties
            span = span.end(metadata={"interface": "whatsapp"})
            ```
        """
        try:
            span_body = {
                "name": name,
                "start_time": start_time,
                "metadata": metadata,
                "input": input,
                "output": output,
                "level": level,
                "status_message": status_message,
                "version": version,
                "end_time": end_time or _get_timestamp(),
                **kwargs,
            }
            # Call update() purely for its side effect. The previous
            # `return self.update(...)` here was dead code: a `return` in the
            # `finally` block always overrode it. We keep returning a fresh
            # client below to preserve that observable behavior, but outside
            # `finally` so BaseExceptions are no longer swallowed (B012).
            self.update(**span_body)
        except Exception as e:
            self.log.warning(e)
        return StatefulSpanClient(
            self.client,
            self.id,
            StateType.OBSERVATION,
            self.trace_id,
            task_manager=self.task_manager,
        )

    def get_langchain_handler(self, update_parent: bool = False):
        """Get langchain callback handler associated with the current span.

        Args:
            update_parent (bool): If set to True, the parent observation will be updated with the outcome of the Langchain run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient.
        """
        # Imported lazily to avoid a hard dependency on langchain.
        from langfuse.callback import CallbackHandler

        return CallbackHandler(
            stateful_client=self, update_stateful_client=update_parent
        )
class StatefulTraceClient(StatefulClient):
"""Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient.
Attributes:
client (FernLangfuse): Core interface for Langfuse API interaction.
id (str): Unique identifier of the trace.
state_type (StateType): Type of the stateful entity (observation or trace).
trace_id (str): The trace ID associated with this client.
task_manager (TaskManager): Manager for handling asynchronous tasks.
"""
log = logging.getLogger("langfuse")
def __init__(
self,
client: FernLangfuse,
id: str,
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
):
"""Initialize the StatefulTraceClient."""
super().__init__(client, id, state_type, trace_id, task_manager)
self.task_manager = task_manager
def update(
    self,
    *,
    name: typing.Optional[str] = None,
    user_id: typing.Optional[str] = None,
    session_id: typing.Optional[str] = None,
    version: typing.Optional[str] = None,
    release: typing.Optional[str] = None,
    input: typing.Optional[typing.Any] = None,
    output: typing.Optional[typing.Any] = None,
    metadata: typing.Optional[typing.Any] = None,
    tags: typing.Optional[typing.List[str]] = None,
    public: typing.Optional[bool] = None,
    **kwargs,
) -> "StatefulTraceClient":
    """Update the trace.

    Queues a trace upsert event with the ingestion task manager; the update is
    applied asynchronously. Never raises: any failure is logged and a fresh
    stateful client is returned regardless, so calls can always be chained.

    Args:
        name: Identifier of the trace. Useful for sorting/filtering in the UI.
        input: The input of the trace. Can be any JSON object.
        output: The output of the trace. Can be any JSON object.
        metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
        user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
        session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
        version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
        release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
        tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
        public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
        **kwargs: Additional keyword arguments that can be included in the trace.

    Returns:
        StatefulTraceClient: The updated trace. Passthrough for chaining.

    Example:
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        # Create a trace
        trace = langfuse.trace(
            name="example-application",
            user_id="user-1234"
        )

        # Update the trace
        trace = trace.update(
            output={"result": "success"},
            metadata={"interface": "whatsapp"}
        )
        ```
    """
    try:
        # The ingestion API expects camelCase field names; None values are
        # stripped below via exclude_none so unset fields are not overwritten.
        trace_body = {
            "id": self.id,
            "name": name,
            "userId": user_id,
            "sessionId": session_id
            or kwargs.get("sessionId", None),  # backward compatibility
            "version": version,
            "release": release,
            "input": input,
            "output": output,
            "metadata": metadata,
            "public": public,
            "tags": tags,
            **kwargs,
        }
        self.log.debug(f"Update trace {trace_body}...")

        request = TraceBody(**trace_body)

        # The event id is a fresh uuid for the ingestion event itself; the
        # trace id travels inside the body. "trace-create" is also used for
        # updates — the body carries the existing trace id (self.id).
        event = {
            "id": str(uuid.uuid4()),
            "type": "trace-create",
            "body": request.dict(exclude_none=True),
        }

        self.task_manager.add_task(event)

    except Exception as e:
        self.log.exception(e)
    finally:
        # NOTE: returning from `finally` intentionally suppresses any exception
        # raised above — tracing must never crash the host application. A new
        # stateful client is always returned for chaining.
        return StatefulTraceClient(
            self.client,
            self.id,
            StateType.TRACE,
            self.trace_id,
            task_manager=self.task_manager,
        )
def get_langchain_handler(self, update_parent: bool = False):
"""Get langchain callback handler associated with the current trace.
This method creates and returns a CallbackHandler instance, linking it with the current
trace. Use this if you want to group multiple Langchain runs within a single trace.
Args:
update_parent (bool): If set to True, the parent trace will be updated with the outcome of the Langchain run.
Raises:
ImportError: If the 'langchain' module is not installed, indicating missing functionality.
Returns:
CallbackHandler: Langchain callback handler linked to the current trace.
Example:
```python
from langfuse import Langfuse
langfuse = Langfuse()
# Create a trace
trace = langfuse.trace(name = "llm-feature")
# Get a langchain callback handler
handler = trace.get_langchain_handler()
```
"""
try:
from langfuse.callback import CallbackHandler
self.log.debug(f"Creating new handler for trace {self.id}")
return CallbackHandler(
stateful_client=self,
debug=self.log.level == logging.DEBUG,
update_stateful_client=update_parent,
)
except Exception as e:
self.log.exception(e)
def getNewHandler(self):
"""Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated."""
return self.get_langchain_handler()
class DatasetItemClient:
    """Class for managing dataset items in Langfuse.

    Args:
        id (str): Unique identifier of the dataset item.
        status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
        input (Any): Input data of the dataset item.
        expected_output (Optional[Any]): Expected output of the dataset item.
        metadata (Optional[Any]): Additional metadata of the dataset item.
        source_trace_id (Optional[str]): Identifier of the source trace.
        source_observation_id (Optional[str]): Identifier of the source observation.
        dataset_id (str): Identifier of the dataset to which this item belongs.
        dataset_name (str): Name of the dataset to which this item belongs.
        created_at (datetime): Timestamp of dataset item creation.
        updated_at (datetime): Timestamp of the last update to the dataset item.
        langfuse (Langfuse): Instance of Langfuse client for API interactions.

    Example:
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            # Generate a completion using the input of every item
            completion, generation = llm_app.run(item.input)

            # Evaluate the completion
            generation.score(
                name="example-score",
                value=1
            )
        ```
    """

    log = logging.getLogger("langfuse")

    id: str
    status: DatasetStatus
    input: typing.Any
    expected_output: typing.Optional[typing.Any]
    metadata: Optional[Any]
    source_trace_id: typing.Optional[str]
    source_observation_id: typing.Optional[str]
    dataset_id: str
    dataset_name: str
    created_at: dt.datetime
    updated_at: dt.datetime
    langfuse: Langfuse

    def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
        """Initialize the DatasetItemClient by copying fields from the API object."""
        self.id = dataset_item.id
        self.status = dataset_item.status
        self.input = dataset_item.input
        self.expected_output = dataset_item.expected_output
        self.metadata = dataset_item.metadata
        self.source_trace_id = dataset_item.source_trace_id
        self.source_observation_id = dataset_item.source_observation_id
        self.dataset_id = dataset_item.dataset_id
        self.dataset_name = dataset_item.dataset_name
        self.created_at = dataset_item.created_at
        self.updated_at = dataset_item.updated_at
        self.langfuse = langfuse

    def flush(self, observation: StatefulClient, run_name: str):
        """Flushes an observations task manager's queue.

        Used before creating a dataset run item to ensure all events are persistent.

        Args:
            observation (StatefulClient): The observation or trace client associated with the dataset item.
            run_name (str): The name of the dataset run. Unused; kept for backward compatibility.
        """
        observation.task_manager.flush()

    def link(
        self,
        trace_or_observation: typing.Union[StatefulClient, str, None],
        run_name: str,
        run_metadata: Optional[Any] = None,
        run_description: Optional[str] = None,
        trace_id: Optional[str] = None,
        observation_id: Optional[str] = None,
    ):
        """Link the dataset item to observation within a specific dataset run. Creates a dataset run item.

        Args:
            trace_or_observation (Union[StatefulClient, str, None]): The trace or observation object to link. Deprecated: can also be an observation ID.
            run_name (str): The name of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            run_description (Optional[str]): Description of the dataset run.
            trace_id (Optional[str]): The trace ID to link to the dataset item. Set trace_or_observation to None if trace_id is provided.
            observation_id (Optional[str]): The observation ID to link to the dataset item (optional). Set trace_or_observation to None if trace_id is provided.

        Raises:
            ValueError: If neither a stateful client, a legacy observation id string, nor a trace_id is provided.
        """
        # Both start unset; which ones get filled depends on the input form.
        parsed_trace_id: Optional[str] = None
        parsed_observation_id: Optional[str] = None

        if isinstance(trace_or_observation, StatefulClient):
            # Extract the ids from the stateful client based on its state type.
            if trace_or_observation.state_type == StateType.TRACE:
                parsed_trace_id = trace_or_observation.trace_id
            elif trace_or_observation.state_type == StateType.OBSERVATION:
                parsed_observation_id = trace_or_observation.id
                parsed_trace_id = trace_or_observation.trace_id
        # legacy support for observation_id passed as a bare string
        elif isinstance(trace_or_observation, str):
            parsed_observation_id = trace_or_observation
        elif trace_or_observation is None:
            if trace_id is not None:
                parsed_trace_id = trace_id
                if observation_id is not None:
                    parsed_observation_id = observation_id
            else:
                raise ValueError(
                    "trace_id must be provided if trace_or_observation is None"
                )
        else:
            raise ValueError(
                "trace_or_observation (arg) or trace_id (kwarg) must be provided to link the dataset item"
            )

        self.log.debug(
            f"Creating dataset run item: {run_name} {self.id} {parsed_trace_id} {parsed_observation_id}"
        )
        self.langfuse.client.dataset_run_items.create(
            request=CreateDatasetRunItemRequest(
                runName=run_name,
                datasetItemId=self.id,
                traceId=parsed_trace_id,
                observationId=parsed_observation_id,
                metadata=run_metadata,
                runDescription=run_description,
            )
        )

    def get_langchain_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
    ):
        """Create and get a langchain callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.

        Returns:
            CallbackHandler: An instance of CallbackHandler linked to the dataset item.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        # A dedicated trace carries the run; linking it creates the run item.
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        return trace.get_langchain_handler(update_parent=True)

    @contextmanager
    def observe(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        trace_id: Optional[str] = None,
    ):
        """Observes a dataset run within the Langfuse client.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): The description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata for the dataset run.
            trace_id (Optional[str]): The trace id to use for the dataset run. A new id is generated if not provided.

        Yields:
            str: The id of the trace associated with the dataset run.
        """
        from langfuse.decorators import langfuse_context

        root_trace_id = trace_id or str(uuid.uuid4())

        langfuse_context._set_root_trace_id(root_trace_id)

        try:
            yield root_trace_id
        finally:
            # Link in `finally` so the run item is created even when the
            # observed block raises — the partial trace is still recorded.
            self.link(
                run_name=run_name,
                run_metadata=run_metadata,
                run_description=run_description,
                trace_or_observation=None,
                trace_id=root_trace_id,
            )

    @contextmanager
    def observe_llama_index(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Context manager for observing LlamaIndex operations linked to this dataset item.

        This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging
        and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all
        operations performed within the context are linked to the appropriate dataset item and run in Langfuse.

        Args:
            run_name (str): The name of the dataset run.
            run_description (Optional[str]): Description of the dataset run. Defaults to None.
            run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass
                to the LlamaIndex integration constructor. Defaults to None (no extra arguments).

        Yields:
            LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations,
                or None if the integration could not be set up.

        Example:
            ```python
            dataset_item = dataset.items[0]

            with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler:
                # Perform LlamaIndex operations here
                some_llama_index_operation()
            ```

        Raises:
            ImportError: If required modules for LlamaIndex integration are not available.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)
        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        # Pre-bind everything the cleanup below touches, so `yield`/`finally`
        # are safe even when the optional llama-index setup fails (previously
        # a failed setup caused a NameError inside the caller's `with` block).
        callback_handler = None
        settings = None
        llama_core = None
        prev_global_handler = None
        prev_langfuse_handler = None

        try:
            import llama_index.core
            from llama_index.core import Settings
            from llama_index.core.callbacks import CallbackManager
            from langfuse.llama_index import LlamaIndexCallbackHandler

            llama_core = llama_index.core
            settings = Settings

            callback_handler = LlamaIndexCallbackHandler(
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            # Temporarily unset the global handler when it is a
            # LlamaIndexCallbackHandler: LlamaIndex does not deduplicate
            # handlers of the same type, so the existing one must be removed
            # before ours is installed; it is restored on exit.
            prev_global_handler = llama_index.core.global_handler
            if isinstance(prev_global_handler, LlamaIndexCallbackHandler):
                llama_index.core.global_handler = None

            if Settings.callback_manager is None:
                Settings.callback_manager = CallbackManager([callback_handler])
            else:
                # Iterate over a copy: remove_handler mutates the handler list.
                for handler in list(Settings.callback_manager.handlers):
                    if isinstance(handler, LlamaIndexCallbackHandler):
                        prev_langfuse_handler = handler
                        Settings.callback_manager.remove_handler(handler)

                Settings.callback_manager.add_handler(callback_handler)
        except Exception as e:
            self.log.exception(e)

        try:
            # Yields None when setup failed above; callers should handle that.
            yield callback_handler
        finally:
            # Reset the handlers, but only if setup actually installed them.
            if callback_handler is not None and settings is not None:
                settings.callback_manager.remove_handler(callback_handler)

                if prev_langfuse_handler is not None:
                    settings.callback_manager.add_handler(prev_langfuse_handler)

            if llama_core is not None:
                llama_core.global_handler = prev_global_handler

    def get_llama_index_handler(
        self,
        *,
        run_name: str,
        run_description: Optional[str] = None,
        run_metadata: Optional[Any] = None,
        llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Create and get a llama-index callback handler linked to this dataset item.

        Args:
            run_name (str): The name of the dataset run to be used in the callback handler.
            run_description (Optional[str]): Description of the dataset run.
            run_metadata (Optional[Any]): Additional metadata to include in dataset run.
            llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Additional keyword arguments to pass to the LlamaIndex integration constructor. Defaults to None (no extra arguments).

        Returns:
            LlamaIndexCallbackHandler: An instance of LlamaIndexCallbackHandler linked to the dataset item,
                or None when handler creation fails.
        """
        metadata = {
            "dataset_item_id": self.id,
            "run_name": run_name,
            "dataset_id": self.dataset_id,
        }
        trace = self.langfuse.trace(name="dataset-run", metadata=metadata)

        self.link(
            trace, run_name, run_metadata=run_metadata, run_description=run_description
        )

        try:
            # Imported lazily so the SDK works without llama-index installed.
            from langfuse.llama_index.llama_index import LlamaIndexCallbackHandler

            callback_handler = LlamaIndexCallbackHandler(
                **(llama_index_integration_constructor_kwargs or {}),
            )
            callback_handler.set_root(trace, update_root=True)

            return callback_handler
        except Exception as e:
            self.log.exception(e)
class DatasetClient:
    """Class for managing datasets in Langfuse.

    Attributes:
        id (str): Unique identifier of the dataset.
        name (str): Name of the dataset.
        description (Optional[str]): Description of the dataset.
        metadata (Optional[typing.Any]): Additional metadata of the dataset.
        project_id (str): Identifier of the project to which the dataset belongs.
        dataset_name (str): Name of the dataset. Deprecated alias of `name`, kept for backward compatibility.
        created_at (datetime): Timestamp of dataset creation.
        updated_at (datetime): Timestamp of the last update to the dataset.
        items (List[DatasetItemClient]): List of dataset items associated with the dataset.
        runs (List[str]): List of dataset runs associated with the dataset. Deprecated.

    Example:
        Print the input of each dataset item in a dataset.
        ```python
        from langfuse import Langfuse

        langfuse = Langfuse()

        dataset = langfuse.get_dataset("<dataset_name>")

        for item in dataset.items:
            print(item.input)
        ```
    """

    id: str
    name: str
    description: Optional[str]
    project_id: str
    dataset_name: str  # for backward compatibility, to be deprecated
    metadata: Optional[Any]
    created_at: dt.datetime
    updated_at: dt.datetime
    items: typing.List[DatasetItemClient]
    # NOTE(review): class-level mutable default shared across all instances and
    # never assigned in __init__; retained only for backward compatibility.
    runs: typing.List[str] = []  # deprecated

    def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
        """Initialize the DatasetClient.

        Copies the fields of the API `Dataset` object onto this client and
        attaches the pre-wrapped dataset items.

        Args:
            dataset (Dataset): API dataset object to copy fields from.
            items (List[DatasetItemClient]): Dataset item clients belonging to the dataset.
        """
        self.id = dataset.id
        self.name = dataset.name
        self.description = dataset.description
        self.project_id = dataset.project_id
        self.metadata = dataset.metadata
        self.dataset_name = dataset.name  # for backward compatibility, to be deprecated
        self.created_at = dataset.created_at
        self.updated_at = dataset.updated_at
        self.items = items