"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
```diff
- import openai
+ from langfuse.openai import openai
```
Langfuse automatically tracks:
- All prompts/completions, with support for streaming, async, and function/tool calls
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)
The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
See docs for more details: https://langfuse.com/docs/integrations/openai
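
A minimal usage sketch (model name and identifiers below are placeholders):
```python
from langfuse.openai import openai

completion = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    # optional Langfuse-specific kwargs, stripped before the OpenAI call:
    name="hello-world",
    user_id="user-123",
    tags=["example"],
)

openai.flush_langfuse()  # flush queued events, e.g. at the end of short-lived scripts
```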
"""
import copy
import logging
import types
from collections import defaultdict
from typing import Any, List, Optional

from packaging.version import Version
from wrapt import wrap_function_wrapper

from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient
from langfuse.decorators import langfuse_context
from langfuse.utils import _get_timestamp
from langfuse.utils.langfuse_singleton import LangfuseSingleton

try:
    import openai
except ImportError:
    raise ModuleNotFoundError(
        "Please install OpenAI to use this feature: 'pip install openai'"
    )

# These imports require openai to be installed, so they must come after the guard above.
import openai.resources
from openai._types import NotGiven
try:
from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401
except ImportError:
AsyncAzureOpenAI = None
AsyncOpenAI = None
AzureOpenAI = None
OpenAI = None
log = logging.getLogger("langfuse")
class OpenAiDefinition:
module: str
object: str
method: str
type: str
sync: bool
def __init__(self, module: str, object: str, method: str, type: str, sync: bool):
self.module = module
self.object = object
self.method = method
self.type = type
self.sync = sync
OPENAI_METHODS_V0 = [
OpenAiDefinition(
module="openai",
object="ChatCompletion",
method="create",
type="chat",
sync=True,
),
OpenAiDefinition(
module="openai",
object="Completion",
method="create",
type="completion",
sync=True,
),
]
OPENAI_METHODS_V1 = [
OpenAiDefinition(
module="openai.resources.chat.completions",
object="Completions",
method="create",
type="chat",
sync=True,
),
OpenAiDefinition(
module="openai.resources.completions",
object="Completions",
method="create",
type="completion",
sync=True,
),
OpenAiDefinition(
module="openai.resources.chat.completions",
object="AsyncCompletions",
method="create",
type="chat",
sync=False,
),
OpenAiDefinition(
module="openai.resources.completions",
object="AsyncCompletions",
method="create",
type="completion",
sync=False,
),
]
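# Each definition above names a wrapt patch target; e.g. the first v1 entry patches
# "openai.resources.chat.completions.Completions.create" (see register_tracing below).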
class OpenAiArgsExtractor:
def __init__(
self,
name=None,
metadata=None,
trace_id=None,
session_id=None,
user_id=None,
tags=None,
parent_observation_id=None,
langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API
**kwargs,
):
self.args = {}
self.args["name"] = name
self.args["metadata"] = (
metadata
if "response_format" not in kwargs
else {**(metadata or {}), "response_format": kwargs["response_format"]}
)
self.args["trace_id"] = trace_id
self.args["session_id"] = session_id
self.args["user_id"] = user_id
self.args["tags"] = tags
self.args["parent_observation_id"] = parent_observation_id
self.args["langfuse_prompt"] = langfuse_prompt
self.kwargs = kwargs
def get_langfuse_args(self):
return {**self.args, **self.kwargs}
def get_openai_args(self):
return self.kwargs
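# Illustration (hypothetical values): the extractor splits the Langfuse-specific kwargs
# from the kwargs that are forwarded to OpenAI untouched:
#
#   extractor = OpenAiArgsExtractor(
#       name="my-generation", user_id="user-123", model="gpt-4o", messages=[...]
#   )
#   extractor.get_openai_args()    # {"model": "gpt-4o", "messages": [...]}
#   extractor.get_langfuse_args()  # same, plus name, user_id, trace settings, ...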
def _langfuse_wrapper(func):
def _with_langfuse(open_ai_definitions, initialize):
def wrapper(wrapped, instance, args, kwargs):
return func(open_ai_definitions, initialize, wrapped, args, kwargs)
return wrapper
return _with_langfuse
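# _langfuse_wrapper adapts a handler to wrapt's (wrapped, instance, args, kwargs)
# calling convention: _wrap(resource, initialize) returns the replacement function
# that wrap_function_wrapper installs over the original OpenAI method.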
def _extract_chat_prompt(kwargs: Any):
    """Extract the user input from the call kwargs.

    Returns the plain list of messages, or a dict that bundles the messages with any
    function/tool definitions passed alongside them.
    """
prompt = {}
if kwargs.get("functions") is not None:
prompt.update({"functions": kwargs["functions"]})
if kwargs.get("function_call") is not None:
prompt.update({"function_call": kwargs["function_call"]})
if kwargs.get("tools") is not None:
prompt.update({"tools": kwargs["tools"]})
if prompt:
        # if the user provided functions/tools, send them together with the messages to Langfuse
prompt.update(
{
"messages": _filter_image_data(kwargs.get("messages", [])),
}
)
return prompt
else:
# vanilla case, only send messages in openai format to langfuse
return _filter_image_data(kwargs.get("messages", []))
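# Example shape (hypothetical): with tools present, _extract_chat_prompt returns
#   {"tools": [...], "messages": [{"role": "user", "content": "..."}]}
# and otherwise just the plain messages list.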
def _extract_chat_response(kwargs: Any):
    """Extract the LLM output from the response message."""
response = {
"role": kwargs.get("role", None),
}
if kwargs.get("function_call") is not None:
response.update({"function_call": kwargs["function_call"]})
if kwargs.get("tool_calls") is not None:
response.update({"tool_calls": kwargs["tool_calls"]})
response.update(
{
"content": kwargs.get("content", None),
}
)
return response
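# Example shape (hypothetical): a tool-calling response message becomes
#   {"role": "assistant", "tool_calls": [...], "content": None}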
def _get_langfuse_data_from_kwargs(
resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs
):
    name = kwargs.get("name", "OpenAI-generation")
    if name is None:
        name = "OpenAI-generation"
    if not isinstance(name, str):
        raise TypeError("name must be a string")
decorator_context_observation_id = langfuse_context.get_current_observation_id()
decorator_context_trace_id = langfuse_context.get_current_trace_id()
trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id
if trace_id is not None and not isinstance(trace_id, str):
raise TypeError("trace_id must be a string")
session_id = kwargs.get("session_id", None)
if session_id is not None and not isinstance(session_id, str):
raise TypeError("session_id must be a string")
user_id = kwargs.get("user_id", None)
if user_id is not None and not isinstance(user_id, str):
raise TypeError("user_id must be a string")
tags = kwargs.get("tags", None)
if tags is not None and (
not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags)
):
raise TypeError("tags must be a list of strings")
# Update trace params in decorator context if specified in openai call
if decorator_context_trace_id:
langfuse_context.update_current_trace(
session_id=session_id, user_id=user_id, tags=tags
)
parent_observation_id = kwargs.get("parent_observation_id", None) or (
decorator_context_observation_id
if decorator_context_observation_id != decorator_context_trace_id
else None
)
if parent_observation_id is not None and not isinstance(parent_observation_id, str):
raise TypeError("parent_observation_id must be a string")
if parent_observation_id is not None and trace_id is None:
raise ValueError("parent_observation_id requires trace_id to be set")
metadata = kwargs.get("metadata", {})
if metadata is not None and not isinstance(metadata, dict):
raise TypeError("metadata must be a dictionary")
model = kwargs.get("model", None) or None
prompt = None
if resource.type == "completion":
prompt = kwargs.get("prompt", None)
elif resource.type == "chat":
prompt = _extract_chat_prompt(kwargs)
is_nested_trace = False
if trace_id:
is_nested_trace = True
langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags)
else:
trace_id = (
decorator_context_trace_id
or langfuse.trace(
session_id=session_id,
user_id=user_id,
tags=tags,
name=name,
input=prompt,
metadata=metadata,
).id
)
    def _parse_model_parameter(key, default):
        # OpenAI v1 uses NotGiven sentinels for omitted params; map them to our defaults
        value = kwargs.get(key, default)
        return default if isinstance(value, NotGiven) else value

    parsed_temperature = _parse_model_parameter("temperature", 1)
    parsed_max_tokens = _parse_model_parameter("max_tokens", float("inf"))
    parsed_top_p = _parse_model_parameter("top_p", 1)
    parsed_frequency_penalty = _parse_model_parameter("frequency_penalty", 0)
    parsed_presence_penalty = _parse_model_parameter("presence_penalty", 0)
    parsed_seed = _parse_model_parameter("seed", None)
    model_parameters = {
        "temperature": parsed_temperature,
        "max_tokens": parsed_max_tokens,  # snake_case to match the OpenAI parameter name
        "top_p": parsed_top_p,
        "frequency_penalty": parsed_frequency_penalty,
        "presence_penalty": parsed_presence_penalty,
    }
    if parsed_seed is not None:
        model_parameters["seed"] = parsed_seed
langfuse_prompt = kwargs.get("langfuse_prompt", None)
return {
"name": name,
"metadata": metadata,
"trace_id": trace_id,
"parent_observation_id": parent_observation_id,
"user_id": user_id,
"start_time": start_time,
"input": prompt,
"model_parameters": modelParameters,
"model": model or None,
"prompt": langfuse_prompt,
}, is_nested_trace
def _create_langfuse_update(
completion, generation: StatefulGenerationClient, completion_start_time, model=None
):
update = {
"end_time": _get_timestamp(),
"output": completion,
"completion_start_time": completion_start_time,
}
if model is not None:
update["model"] = model
generation.update(**update)
def _extract_streamed_openai_response(resource, chunks):
completion = defaultdict(str) if resource.type == "chat" else ""
model = None
completion_start_time = None
    for index, chunk in enumerate(chunks):
        if index == 0:
            completion_start_time = _get_timestamp()
        if _is_openai_v1():
            chunk = chunk.__dict__
        model = model or chunk.get("model", None) or None
        choices = chunk.get("choices", [])
        for choice in choices:
            if _is_openai_v1():
                choice = choice.__dict__
            if resource.type == "chat":
                delta = choice.get("delta", None)
                if _is_openai_v1() and delta is not None:
                    delta = delta.__dict__
                delta = delta or {}
if delta.get("role", None) is not None:
completion["role"] = delta["role"]
if delta.get("content", None) is not None:
completion["content"] = (
delta.get("content", None)
if completion["content"] is None
else completion["content"] + delta.get("content", None)
)
elif delta.get("function_call", None) is not None:
curr = completion["function_call"]
tool_call_chunk = delta.get("function_call", None)
if not curr:
completion["function_call"] = {
"name": getattr(tool_call_chunk, "name", ""),
"arguments": getattr(tool_call_chunk, "arguments", ""),
}
else:
curr["name"] = curr["name"] or getattr(
tool_call_chunk, "name", None
)
curr["arguments"] += getattr(tool_call_chunk, "arguments", "")
elif delta.get("tool_calls", None) is not None:
curr = completion["tool_calls"]
tool_call_chunk = getattr(
delta.get("tool_calls", None)[0], "function", None
)
if not curr:
completion["tool_calls"] = {
"name": getattr(tool_call_chunk, "name", ""),
"arguments": getattr(tool_call_chunk, "arguments", ""),
}
else:
curr["name"] = curr["name"] or getattr(
tool_call_chunk, "name", None
)
curr["arguments"] += getattr(tool_call_chunk, "arguments", None)
if resource.type == "completion":
completion += choice.get("text", None)
def get_response_for_chat():
return (
completion["content"]
or (
completion["function_call"]
and {
"role": "assistant",
"function_call": completion["function_call"],
}
)
or (
completion["tool_calls"]
and {
"role": "assistant",
"tool_calls": [{"function": completion["tool_calls"]}],
}
)
or None
)
return (
model,
completion_start_time,
get_response_for_chat() if resource.type == "chat" else completion,
)
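# Streaming note: each chunk only carries a delta, so the loop above stitches the
# final completion back together; e.g. the content deltas "Hel" + "lo" yield
# completion["content"] == "Hello" once the stream is exhausted.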
def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response):
model = response.get("model", None) or None
completion = None
if resource.type == "completion":
choices = response.get("choices", [])
if len(choices) > 0:
choice = choices[-1]
completion = choice.text if _is_openai_v1() else choice.get("text", None)
elif resource.type == "chat":
choices = response.get("choices", [])
if len(choices) > 0:
choice = choices[-1]
completion = (
_extract_chat_response(choice.message.__dict__)
if _is_openai_v1()
else choice.get("message", None)
)
usage = response.get("usage", None)
return model, completion, usage.__dict__ if _is_openai_v1() else usage
def _is_openai_v1():
return Version(openai.__version__) >= Version("1.0.0")
def _is_streaming_response(response):
return (
isinstance(response, types.GeneratorType)
or isinstance(response, types.AsyncGeneratorType)
or (_is_openai_v1() and isinstance(response, openai.Stream))
or (_is_openai_v1() and isinstance(response, openai.AsyncStream))
)
@_langfuse_wrapper
def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs):
new_langfuse: Langfuse = initialize()
start_time = _get_timestamp()
arg_extractor = OpenAiArgsExtractor(*args, **kwargs)
generation, is_nested_trace = _get_langfuse_data_from_kwargs(
open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
)
generation = new_langfuse.generation(**generation)
try:
openai_response = wrapped(**arg_extractor.get_openai_args())
if _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,
generation=generation,
langfuse=new_langfuse,
is_nested_trace=is_nested_trace,
)
else:
model, completion, usage = _get_langfuse_data_from_default_response(
open_ai_resource,
openai_response.__dict__ if _is_openai_v1() else openai_response,
)
generation.update(
model=model, output=completion, end_time=_get_timestamp(), usage=usage
)
            # Avoid updating the trace if the user provided their own trace_id.
if not is_nested_trace:
new_langfuse.trace(id=generation.trace_id, output=completion)
return openai_response
except Exception as ex:
log.warning(ex)
model = kwargs.get("model", None) or None
generation.update(
end_time=_get_timestamp(),
status_message=str(ex),
level="ERROR",
model=model,
usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
)
raise ex
@_langfuse_wrapper
async def _wrap_async(
open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs
):
new_langfuse = initialize()
start_time = _get_timestamp()
arg_extractor = OpenAiArgsExtractor(*args, **kwargs)
generation, is_nested_trace = _get_langfuse_data_from_kwargs(
open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args()
)
generation = new_langfuse.generation(**generation)
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())
if _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
generation=generation,
langfuse=new_langfuse,
is_nested_trace=is_nested_trace,
)
else:
model, completion, usage = _get_langfuse_data_from_default_response(
open_ai_resource,
openai_response.__dict__ if _is_openai_v1() else openai_response,
)
generation.update(
model=model,
output=completion,
end_time=_get_timestamp(),
usage=usage,
)
            # Avoid updating the trace if the user provided their own trace_id.
if not is_nested_trace:
new_langfuse.trace(id=generation.trace_id, output=completion)
return openai_response
    except Exception as ex:
        log.warning(ex)
        model = kwargs.get("model", None) or None
generation.update(
end_time=_get_timestamp(),
status_message=str(ex),
level="ERROR",
model=model,
usage={"input_cost": 0, "output_cost": 0, "total_cost": 0},
)
raise ex
class OpenAILangfuse:
_langfuse: Optional[Langfuse] = None
def initialize(self):
self._langfuse = LangfuseSingleton().get(
public_key=openai.langfuse_public_key,
secret_key=openai.langfuse_secret_key,
host=openai.langfuse_host,
debug=openai.langfuse_debug,
enabled=openai.langfuse_enabled,
sdk_integration="openai",
sample_rate=openai.langfuse_sample_rate,
)
return self._langfuse
    def flush(self):
        if self._langfuse is not None:
            self._langfuse.flush()
    def langfuse_auth_check(self):
        """Check if the provided Langfuse credentials (public and secret key) are valid.

        Raises:
            Exception: If no projects were found for the provided credentials.

        Note:
            This method is blocking. Avoid calling it in production code.
        """
if self._langfuse is None:
self.initialize()
return self._langfuse.auth_check()
def register_tracing(self):
resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0
for resource in resources:
wrap_function_wrapper(
resource.module,
f"{resource.object}.{resource.method}",
_wrap(resource, self.initialize)
if resource.sync
else _wrap_async(resource, self.initialize),
)
setattr(openai, "langfuse_public_key", None)
setattr(openai, "langfuse_secret_key", None)
setattr(openai, "langfuse_host", None)
setattr(openai, "langfuse_debug", None)
setattr(openai, "langfuse_enabled", True)
setattr(openai, "langfuse_sample_rate", None)
setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
setattr(openai, "flush_langfuse", self.flush)
modifier = OpenAILangfuse()
modifier.register_tracing()
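# Optional module-level configuration (a sketch; the key values are placeholders).
# These attributes are read by OpenAILangfuse.initialize() on the first traced call:
#
#   from langfuse.openai import openai
#   openai.langfuse_public_key = "pk-lf-..."
#   openai.langfuse_secret_key = "sk-lf-..."
#   openai.langfuse_host = "https://cloud.langfuse.com"
#   openai.langfuse_auth_check()  # optional: verify the credentials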
# DEPRECATED: Use `openai.langfuse_auth_check()` instead
def auth_check():
if modifier._langfuse is None:
modifier.initialize()
return modifier._langfuse.auth_check()
def _filter_image_data(messages: List[dict]):
    """Remove inlined base64 image data from messages before sending them to Langfuse.

    See https://platform.openai.com/docs/guides/vision?lang=python

    The messages list keeps its structure, but an 'image_url' entry is removed from a
    'content' item whenever its URL starts with 'data:image/' (i.e. inlined image data).
    """
output_messages = copy.deepcopy(messages)
for message in output_messages:
content = (
message.get("content", None)
if isinstance(message, dict)
else getattr(message, "content", None)
)
if content is not None:
for index, item in enumerate(content):
if isinstance(item, dict) and item.get("image_url", None) is not None:
url = item["image_url"]["url"]
if url.startswith("data:image/"):
del content[index]["image_url"]
return output_messages
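# Example (hypothetical message): the inlined image is dropped, the text is kept:
#   [{"role": "user", "content": [
#       {"type": "text", "text": "What is in this image?"},
#       {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,..."}},
#   ]}]
# -> the "image_url" key is removed from the second content item.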
class LangfuseResponseGeneratorSync:
def __init__(
self,
*,
resource,
response,
generation,
langfuse,
is_nested_trace,
):
self.items = []
self.resource = resource
self.response = response
self.generation = generation
self.langfuse = langfuse
self.is_nested_trace = is_nested_trace
def __iter__(self):
try:
for i in self.response:
self.items.append(i)
yield i
finally:
self._finalize()
def __enter__(self):
return self.__iter__()
def __exit__(self, exc_type, exc_value, traceback):
pass
def _finalize(self):
model, completion_start_time, completion = _extract_streamed_openai_response(
self.resource, self.items
)
        # Avoid updating the trace if the user provided their own trace_id.
if not self.is_nested_trace:
self.langfuse.trace(id=self.generation.trace_id, output=completion)
_create_langfuse_update(
completion, self.generation, completion_start_time, model=model
)
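# Streaming calls return this wrapper instead of the raw stream; iteration is
# transparent for the caller (a sketch, assuming the drop-in import above):
#
#   for chunk in openai.chat.completions.create(model="gpt-4o", messages=[...], stream=True):
#       ...  # consume chunks as usual; Langfuse is updated once the stream ends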
class LangfuseResponseGeneratorAsync:
def __init__(
self,
*,
resource,
response,
generation,
langfuse,
is_nested_trace,
):
self.items = []
self.resource = resource
self.response = response
self.generation = generation
self.langfuse = langfuse
self.is_nested_trace = is_nested_trace
async def __aiter__(self):
try:
async for i in self.response:
self.items.append(i)
yield i
finally:
await self._finalize()
async def __aenter__(self):
return self.__aiter__()
async def __aexit__(self, exc_type, exc_value, traceback):
pass
async def _finalize(self):
model, completion_start_time, completion = _extract_streamed_openai_response(
self.resource, self.items
)
        # Avoid updating the trace if the user provided their own trace_id.
if not self.is_nested_trace:
self.langfuse.trace(id=self.generation.trace_id, output=completion)
_create_langfuse_update(
completion, self.generation, completion_start_time, model=model
)