import json
import logging
from typing import Any, Dict, Literal

# The OpenAI Agents SDK ("agents") is an optional dependency: probe for it
# once at import time and record the result so the tracing helpers below
# can be defined conditionally.
try:
    from agents import tracing  # type: ignore[import]

    HAVE_AGENTS = True
except ImportError:
    HAVE_AGENTS = False

# Module-level logger, per stdlib logging convention.
logger = logging.getLogger(__name__)

# Run types accepted by LangSmith for a traced run.
RunTypeT = Literal["tool", "chain", "llm", "retriever", "embedding", "prompt", "parser"]
if HAVE_AGENTS:
def parse_io(data: Any, default_key: str = "output") -> Dict:
    """Coerce span inputs or outputs into a dictionary.

    Args:
        data: The data to parse (can be inputs or outputs).
        default_key: Key to wrap non-dict data under ("input" or "output").

    Returns:
        Dict: ``data`` itself if already a dict; the decoded object if
        ``data`` is a JSON string encoding a dict; the result of
        ``model_dump`` for pydantic-style objects; otherwise
        ``{default_key: data}``.
    """
    if isinstance(data, dict):
        return data

    if isinstance(data, str):
        # A string may itself be JSON; only keep it unwrapped if it
        # decodes to a dict.
        try:
            decoded = json.loads(data)
        except json.JSONDecodeError:
            return {default_key: data}
        return decoded if isinstance(decoded, dict) else {default_key: data}

    dump = getattr(data, "model_dump", None)
    # Exclude classes themselves: only instances should be dumped.
    if data is not None and callable(dump) and not isinstance(data, type):
        try:
            return data.model_dump(exclude_none=True, mode="json")
        except Exception as e:
            logger.debug(
                f"Failed to use model_dump to serialize {type(data)} to JSON: {e}"
            )

    return {default_key: data}
def get_run_type(span: tracing.Span) -> RunTypeT:
    """Map an Agents SDK span type onto a LangSmith run type.

    Function/guardrail spans become "tool" runs, generation/response
    spans become "llm" runs, and everything else (agent, handoff,
    custom, or unknown) falls back to "chain".
    """
    kind = getattr(span.span_data, "type", None)
    if kind in ("function", "guardrail"):
        return "tool"
    if kind in ("generation", "response"):
        return "llm"
    # "agent", "handoff", "custom", and anything unrecognized -> chain.
    return "chain"
def get_run_name(span: tracing.Span) -> str:
    """Pick a human-readable run name for a span.

    Uses the span's own non-empty ``name`` when present; otherwise a
    capitalized default derived from its type, falling back to "Span".
    """
    explicit = getattr(span.span_data, "name", None)
    if explicit:
        return explicit
    defaults = {
        "generation": "Generation",
        "response": "Response",
        "handoff": "Handoff",
    }
    return defaults.get(getattr(span.span_data, "type", None), "Span")
def _extract_function_span_data(
    span_data: tracing.FunctionSpanData,
) -> Dict[str, Any]:
    """Pull the call arguments and result off a function (tool) span."""
    inputs = parse_io(span_data.input, "input")
    outputs = parse_io(span_data.output, "output")
    return {"inputs": inputs, "outputs": outputs}
def _extract_generation_span_data(
    span_data: tracing.GenerationSpanData,
) -> Dict[str, Any]:
    """Extract inputs/outputs, model params, and token usage from a generation span."""
    outputs = parse_io(span_data.output, "output")
    usage = span_data.usage
    if usage:
        # Translate the SDK's prompt/completion token names to
        # LangSmith's input/output naming.
        outputs["usage_metadata"] = {
            "total_tokens": usage.get("total_tokens"),
            "input_tokens": usage.get("prompt_tokens"),
            "output_tokens": usage.get("completion_tokens"),
        }
    return {
        "inputs": parse_io(span_data.input, "input"),
        "outputs": outputs,
        "invocation_params": {
            "model": span_data.model,
            "model_config": span_data.model_config,
        },
    }
def _extract_response_span_data(
    span_data: tracing.ResponseSpanData,
) -> Dict[str, Any]:
    """Extract LangSmith run fields from an OpenAI Responses API span.

    Args:
        span_data: The response span payload. ``input`` and ``response``
            may each independently be ``None``.

    Returns:
        Dict with any of ``inputs``, ``outputs``, ``invocation_params``,
        and ``metadata`` keys, depending on what the span carries.
    """
    data: Dict[str, Any] = {}
    if span_data.input is not None:
        data["inputs"] = {
            "input": span_data.input,
            # Fix: span_data.response can be None even when input is
            # present (the response check below exists for that reason),
            # so guard the attribute access instead of crashing.
            "instructions": (
                span_data.response.instructions
                if span_data.response is not None
                else None
            ),
        }
    if span_data.response is not None:
        response = span_data.response.model_dump(exclude_none=True, mode="json")
        data["outputs"] = {"output": response.pop("output", [])}
        if usage := response.pop("usage", None):
            # Rename the SDK's plural "tokens_details" keys to the
            # singular "token_details" form LangSmith expects.
            if "output_tokens_details" in usage:
                usage["output_token_details"] = usage.pop("output_tokens_details")
                usage["output_token_details"]["reasoning"] = usage[
                    "output_token_details"
                ].pop("reasoning_tokens", 0)
            if "input_tokens_details" in usage:
                usage["input_token_details"] = usage.pop("input_tokens_details")
                usage["input_token_details"]["cache_read"] = usage[
                    "input_token_details"
                ].pop("cached_tokens", 0)
            data["outputs"]["usage_metadata"] = usage
        # Keep only the recognized generation parameters.
        data["invocation_params"] = {
            k: v
            for k, v in response.items()
            if k
            in (
                "max_output_tokens",
                "model",
                "parallel_tool_calls",
                "reasoning",
                "temperature",
                "text",
                "tool_choice",
                "tools",
                "top_p",
                "truncation",
            )
        }
        # Everything else (minus output/usage/instructions and the
        # invocation params) becomes metadata, plus the ls_* keys
        # LangSmith uses to identify the model.
        metadata = {
            k: v
            for k, v in response.items()
            if k
            not in (
                {"output", "usage", "instructions"}.union(data["invocation_params"])
            )
        }
        metadata.update(
            {
                "ls_model_name": data["invocation_params"].get("model"),
                "ls_max_tokens": data["invocation_params"].get("max_output_tokens"),
                "ls_temperature": data["invocation_params"].get("temperature"),
                "ls_model_type": "chat",
                "ls_provider": "openai",
            }
        )
        data["metadata"] = metadata
    return data
def _extract_agent_span_data(span_data: tracing.AgentSpanData) -> Dict[str, Any]:
    """Map an agent span onto invocation params (tools/handoffs) and metadata."""
    invocation = {
        "tools": span_data.tools,
        "handoffs": span_data.handoffs,
    }
    meta = {"output_type": span_data.output_type}
    return {"invocation_params": invocation, "metadata": meta}
def _extract_handoff_span_data(
    span_data: tracing.HandoffSpanData,
) -> Dict[str, Any]:
    """Record the source and destination agents of a handoff as run inputs."""
    inputs = {
        "from_agent": span_data.from_agent,
        "to_agent": span_data.to_agent,
    }
    return {"inputs": inputs}
def _extract_guardrail_span_data(
    span_data: tracing.GuardrailSpanData,
) -> Dict[str, Any]:
    """Expose whether the guardrail fired as run metadata."""
    meta = {"triggered": span_data.triggered}
    return {"metadata": meta}
def _extract_custom_span_data(span_data: tracing.CustomSpanData) -> Dict[str, Any]:
    """Pass a custom span's user-supplied payload through as run metadata."""
    payload = span_data.data
    return {"metadata": payload}
def extract_span_data(span: tracing.Span) -> Dict[str, Any]:
    """Dispatch to the extractor matching the span's payload type.

    Returns a fresh dict of extracted run fields, or an empty dict for
    unrecognized span-data types.
    """
    # Ordered (type, extractor) pairs; first isinstance match wins,
    # mirroring the original if/elif chain.
    dispatch = (
        (tracing.FunctionSpanData, _extract_function_span_data),
        (tracing.GenerationSpanData, _extract_generation_span_data),
        (tracing.ResponseSpanData, _extract_response_span_data),
        (tracing.AgentSpanData, _extract_agent_span_data),
        (tracing.HandoffSpanData, _extract_handoff_span_data),
        (tracing.GuardrailSpanData, _extract_guardrail_span_data),
        (tracing.CustomSpanData, _extract_custom_span_data),
    )
    for span_cls, extractor in dispatch:
        if isinstance(span.span_data, span_cls):
            return dict(extractor(span.span_data))
    return {}