import datetime
import logging
import uuid
from typing import Dict, Optional

from langsmith import run_trees as rt

try:
    from agents import tracing  # type: ignore[import]

    import langsmith.wrappers._agent_utils as agent_utils

    HAVE_AGENTS = True
except ImportError:
    HAVE_AGENTS = False


class OpenAIAgentsTracingProcessor:
    """Tracing processor for the `OpenAI Agents SDK <https://openai.github.io/openai-agents-python/>`_.

    Traces all intermediate steps of your OpenAI Agent to LangSmith.

    Requirements: Make sure to install ``pip install -U langsmith[openai-agents]``.

    Args:
        client: An instance of langsmith.client.Client. If not provided,
            a default client is created.

    Example:
        .. code-block:: python

            from agents import (
                Agent,
                FileSearchTool,
                Runner,
                WebSearchTool,
                function_tool,
                set_trace_processors,
            )

            from langsmith.wrappers import OpenAIAgentsTracingProcessor

            set_trace_processors([OpenAIAgentsTracingProcessor()])


            @function_tool
            def get_weather(city: str) -> str:
                return f"The weather in {city} is sunny"


            haiku_agent = Agent(
                name="Haiku agent",
                instructions="Always respond in haiku form",
                model="o3-mini",
                tools=[get_weather],
            )
            agent = Agent(
                name="Assistant",
                tools=[WebSearchTool()],
                instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather",
                handoffs=[haiku_agent],
            )

            result = await Runner.run(
                agent,
                "write a haiku about the weather today and tell me a recent news story about new york",
            )
            print(result.final_output)
    """  # noqa: E501

    def __init__(self, *args, **kwargs):
        raise ImportError(
            "The `agents` package is not installed. "
            "Please install it with `pip install langsmith[openai-agents]`."
        )


from langsmith import client as ls_client

logger = logging.getLogger(__name__)

if HAVE_AGENTS:

    class OpenAIAgentsTracingProcessor(tracing.TracingProcessor):  # type: ignore[no-redef]
        """Tracing processor for the `OpenAI Agents SDK <https://openai.github.io/openai-agents-python/>`_.

        Traces all intermediate steps of your OpenAI Agent to LangSmith.

        Requirements: Make sure to install ``pip install -U langsmith[openai-agents]``.

        Args:
            client: An instance of langsmith.client.Client. If not provided,
                a default client is created.
            metadata: Metadata to associate with all traces.
            tags: Tags to associate with all traces.
            project_name: LangSmith project to trace to.
            name: Name of the root trace.

        Example:
            .. code-block:: python

                from agents import (
                    Agent,
                    FileSearchTool,
                    Runner,
                    WebSearchTool,
                    function_tool,
                    set_trace_processors,
                )

                from langsmith.wrappers import OpenAIAgentsTracingProcessor

                set_trace_processors([OpenAIAgentsTracingProcessor()])


                @function_tool
                def get_weather(city: str) -> str:
                    return f"The weather in {city} is sunny"


                haiku_agent = Agent(
                    name="Haiku agent",
                    instructions="Always respond in haiku form",
                    model="o3-mini",
                    tools=[get_weather],
                )
                agent = Agent(
                    name="Assistant",
                    tools=[WebSearchTool()],
                    instructions="speak in spanish. use Haiku agent if they ask for a haiku or for the weather",
                    handoffs=[haiku_agent],
                )

                result = await Runner.run(
                    agent,
                    "write a haiku about the weather today and tell me a recent news story about new york",
                )
                print(result.final_output)
        """  # noqa: E501

        def __init__(
            self,
            client: Optional[ls_client.Client] = None,
            *,
            metadata: Optional[dict] = None,
            tags: Optional[list[str]] = None,
            project_name: Optional[str] = None,
            name: Optional[str] = None,
        ):
            self.client = client or rt.get_cached_client()
            self._metadata = metadata
            self._tags = tags
            self._project_name = project_name
            self._name = name

            self._first_response_inputs: dict = {}
            self._last_response_outputs: dict = {}

            self._runs: Dict[str, str] = {}

        def on_trace_start(self, trace: tracing.Trace) -> None:
            if self._name:
                run_name = self._name
            elif trace.name:
                run_name = trace.name
            else:
                run_name = "Agent workflow"

            trace_run_id = str(uuid.uuid4())
            self._runs[trace.trace_id] = trace_run_id

            run_extra = {"metadata": self._metadata or {}}
            trace_dict = trace.export() or {}
            if trace_dict.get("group_id") is not None:
                run_extra["metadata"]["thread_id"] = trace_dict["group_id"]

            try:
                run_data: dict = dict(
                    name=run_name,
                    inputs={},
                    run_type="chain",
                    id=trace_run_id,
                    revision_id=None,
                    extra=run_extra,
                    tags=self._tags,
                    project_name=self._project_name,
                )
                self.client.create_run(**run_data)
            except Exception as e:
                logger.exception(f"Error creating trace run: {e}")

        def on_trace_end(self, trace: tracing.Trace) -> None:
            run_id = self._runs.pop(trace.trace_id, None)
            trace_dict = trace.export() or {}
            metadata = {**(trace_dict.get("metadata") or {}), **(self._metadata or {})}
            if run_id:
                try:
                    self.client.update_run(
                        run_id=run_id,
                        inputs=self._first_response_inputs.pop(trace.trace_id, {}),
                        outputs=self._last_response_outputs.pop(trace.trace_id, {}),
                        extra={"metadata": metadata},
                    )
                except Exception as e:
                    logger.exception(f"Error updating trace run: {e}")

        def on_span_start(self, span: tracing.Span) -> None:
            parent_run_id = self._runs.get(span.parent_id or span.trace_id)
            span_run_id = str(uuid.uuid4())
            self._runs[span.span_id] = span_run_id

            run_name = agent_utils.get_run_name(span)
            run_type = agent_utils.get_run_type(span)
            extracted = agent_utils.extract_span_data(span)

            try:
                run_data: dict = dict(
                    name=run_name,
                    run_type=run_type,
                    id=span_run_id,
                    parent_run_id=parent_run_id,
                    inputs=extracted.get("inputs", {}),
                )
                if span.started_at:
                    run_data["start_time"] = datetime.datetime.fromisoformat(
                        span.started_at
                    )
                self.client.create_run(**run_data)
            except Exception as e:
                logger.exception(f"Error creating span run: {e}")

        def on_span_end(self, span: tracing.Span) -> None:
            run_id = self._runs.pop(span.span_id, None)
            if run_id:
                extracted = agent_utils.extract_span_data(span)
                metadata = extracted.get("metadata", {})
                metadata["openai_parent_id"] = span.parent_id
                metadata["openai_trace_id"] = span.trace_id
                metadata["openai_span_id"] = span.span_id
                extracted["metadata"] = metadata
                outputs = extracted.pop("outputs", {})
                inputs = extracted.pop("inputs", {})
                run_data: dict = dict(
                    run_id=run_id,
                    error=str(span.error) if span.error else None,
                    outputs=outputs,
                    inputs=inputs,
                    extra=extracted,
                )
                if span.ended_at:
                    run_data["end_time"] = datetime.datetime.fromisoformat(
                        span.ended_at
                    )

                if isinstance(span.span_data, tracing.ResponseSpanData):
                    self._first_response_inputs[span.trace_id] = (
                        self._first_response_inputs.get(span.trace_id) or inputs
                    )
                    self._last_response_outputs[span.trace_id] = outputs

                self.client.update_run(**run_data)

        def shutdown(self) -> None:
            self.client.flush()

        def force_flush(self) -> None:
            self.client.flush()
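# --- Usage sketch (illustrative only, not part of the library) ----------------
# A minimal, hedged example of wiring this processor into the Agents SDK using
# the optional constructor arguments documented above (metadata, tags,
# project_name, name). The project name, metadata values, and prompt below are
# hypothetical placeholders.
#
# if HAVE_AGENTS:
#     from agents import Agent, Runner, set_trace_processors
#
#     set_trace_processors(
#         [
#             OpenAIAgentsTracingProcessor(
#                 project_name="my-agents-project",  # hypothetical project
#                 metadata={"env": "dev"},
#                 tags=["openai-agents"],
#                 name="Demo workflow",
#             )
#         ]
#     )
#     agent = Agent(name="Assistant", instructions="Be concise.")
#     result = Runner.run_sync(agent, "Say hello in one sentence.")
#     print(result.final_output)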