import time import uuid from typing import Any, Callable, Dict, List, Optional from httpx import URL from posthog.client import Client as PostHogClient def get_model_params(kwargs: Dict[str, Any]) -> Dict[str, Any]: """ Extracts model parameters from the kwargs dictionary. """ model_params = {} for param in [ "temperature", "max_tokens", # Deprecated field "max_completion_tokens", "top_p", "frequency_penalty", "presence_penalty", "n", "stop", "stream", # OpenAI-specific field "streaming", # Anthropic-specific field ]: if param in kwargs and kwargs[param] is not None: model_params[param] = kwargs[param] return model_params def get_usage(response, provider: str) -> Dict[str, Any]: if provider == "anthropic": return { "input_tokens": response.usage.input_tokens, "output_tokens": response.usage.output_tokens, "cache_read_input_tokens": response.usage.cache_read_input_tokens, "cache_creation_input_tokens": response.usage.cache_creation_input_tokens, } elif provider == "openai": cached_tokens = 0 input_tokens = 0 output_tokens = 0 reasoning_tokens = 0 # responses api if hasattr(response.usage, "input_tokens"): input_tokens = response.usage.input_tokens if hasattr(response.usage, "output_tokens"): output_tokens = response.usage.output_tokens if hasattr(response.usage, "input_tokens_details") and hasattr( response.usage.input_tokens_details, "cached_tokens" ): cached_tokens = response.usage.input_tokens_details.cached_tokens if hasattr(response.usage, "output_tokens_details") and hasattr( response.usage.output_tokens_details, "reasoning_tokens" ): reasoning_tokens = response.usage.output_tokens_details.reasoning_tokens # chat completions if hasattr(response.usage, "prompt_tokens"): input_tokens = response.usage.prompt_tokens if hasattr(response.usage, "completion_tokens"): output_tokens = response.usage.completion_tokens if hasattr(response.usage, "prompt_tokens_details") and hasattr( response.usage.prompt_tokens_details, "cached_tokens" ): cached_tokens = response.usage.prompt_tokens_details.cached_tokens return { "input_tokens": input_tokens, "output_tokens": output_tokens, "cache_read_input_tokens": cached_tokens, "reasoning_tokens": reasoning_tokens, } return { "input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "reasoning_tokens": 0, } def format_response(response, provider: str): """ Format a regular (non-streaming) response. """ output = [] if response is None: return output if provider == "anthropic": return format_response_anthropic(response) elif provider == "openai": return format_response_openai(response) return output def format_response_anthropic(response): output = [] for choice in response.content: if choice.text: output.append( { "role": "assistant", "content": choice.text, } ) return output def format_response_openai(response): output = [] if hasattr(response, "choices"): for choice in response.choices: # Handle Chat Completions response format if hasattr(choice, "message") and choice.message and choice.message.content: output.append( { "content": choice.message.content, "role": choice.message.role, } ) # Handle Responses API format if hasattr(response, "output"): for item in response.output: if item.type == "message": # Extract text content from the content list if hasattr(item, "content") and isinstance(item.content, list): for content_item in item.content: if ( hasattr(content_item, "type") and content_item.type == "output_text" and hasattr(content_item, "text") ): output.append( { "content": content_item.text, "role": item.role, } ) elif hasattr(content_item, "text"): output.append( { "content": content_item.text, "role": item.role, } ) elif ( hasattr(content_item, "type") and content_item.type == "input_image" and hasattr(content_item, "image_url") ): output.append( { "content": { "type": "image", "image": content_item.image_url, }, "role": item.role, } ) else: output.append( { "content": item.content, "role": item.role, } ) return output def format_tool_calls(response, provider: str): if provider == "anthropic": if hasattr(response, "tools") and response.tools and len(response.tools) > 0: return response.tools elif provider == "openai": # Handle both Chat Completions and Responses API if hasattr(response, "choices") and response.choices: # Check for tool_calls in message (Chat Completions format) if ( hasattr(response.choices[0], "message") and hasattr(response.choices[0].message, "tool_calls") and response.choices[0].message.tool_calls ): return response.choices[0].message.tool_calls # Check for tool_calls directly in response (Responses API format) if hasattr(response.choices[0], "tool_calls") and response.choices[0].tool_calls: return response.choices[0].tool_calls return None def merge_system_prompt(kwargs: Dict[str, Any], provider: str): messages: List[Dict[str, Any]] = [] if provider == "anthropic": messages = kwargs.get("messages") or [] if kwargs.get("system") is None: return messages return [{"role": "system", "content": kwargs.get("system")}] + messages # For OpenAI, handle both Chat Completions and Responses API if kwargs.get("messages") is not None: messages = list(kwargs.get("messages", [])) if kwargs.get("input") is not None: input_data = kwargs.get("input") if isinstance(input_data, list): messages.extend(input_data) else: messages.append({"role": "user", "content": input_data}) # Check if system prompt is provided as a separate parameter if kwargs.get("system") is not None: has_system = any(msg.get("role") == "system" for msg in messages) if not has_system: messages = [{"role": "system", "content": kwargs.get("system")}] + messages # For Responses API, add instructions to the system prompt if provided if kwargs.get("instructions") is not None: # Find the system message if it exists system_idx = next((i for i, msg in enumerate(messages) if msg.get("role") == "system"), None) if system_idx is not None: # Append instructions to existing system message system_content = messages[system_idx].get("content", "") messages[system_idx]["content"] = f"{system_content}\n\n{kwargs.get('instructions')}" else: # Create a new system message with instructions messages = [{"role": "system", "content": kwargs.get("instructions")}] + messages return messages def call_llm_and_track_usage( posthog_distinct_id: Optional[str], ph_client: PostHogClient, provider: str, posthog_trace_id: Optional[str], posthog_properties: Optional[Dict[str, Any]], posthog_privacy_mode: bool, posthog_groups: Optional[Dict[str, Any]], base_url: URL, call_method: Callable[..., Any], **kwargs: Any, ) -> Any: """ Common usage-tracking logic for both sync and async calls. call_method: the llm call method (e.g. openai.chat.completions.create) """ start_time = time.time() response = None error = None http_status = 200 usage: Dict[str, Any] = {} error_params: Dict[str, any] = {} try: response = call_method(**kwargs) except Exception as exc: error = exc http_status = getattr(exc, "status_code", 0) # default to 0 becuase its likely an SDK error error_params = { "$ai_is_error": True, "$ai_error": exc.__str__(), } finally: end_time = time.time() latency = end_time - start_time if posthog_trace_id is None: posthog_trace_id = str(uuid.uuid4()) if response and hasattr(response, "usage"): usage = get_usage(response, provider) messages = merge_system_prompt(kwargs, provider) event_properties = { "$ai_provider": provider, "$ai_model": kwargs.get("model"), "$ai_model_parameters": get_model_params(kwargs), "$ai_input": with_privacy_mode(ph_client, posthog_privacy_mode, messages), "$ai_output_choices": with_privacy_mode( ph_client, posthog_privacy_mode, format_response(response, provider) ), "$ai_http_status": http_status, "$ai_input_tokens": usage.get("input_tokens", 0), "$ai_output_tokens": usage.get("output_tokens", 0), "$ai_latency": latency, "$ai_trace_id": posthog_trace_id, "$ai_base_url": str(base_url), **(posthog_properties or {}), **(error_params or {}), } tool_calls = format_tool_calls(response, provider) if tool_calls: event_properties["$ai_tools"] = with_privacy_mode(ph_client, posthog_privacy_mode, tool_calls) if usage.get("cache_read_input_tokens") is not None and usage.get("cache_read_input_tokens", 0) > 0: event_properties["$ai_cache_read_input_tokens"] = usage.get("cache_read_input_tokens", 0) if usage.get("cache_creation_input_tokens") is not None and usage.get("cache_creation_input_tokens", 0) > 0: event_properties["$ai_cache_creation_input_tokens"] = usage.get("cache_creation_input_tokens", 0) if usage.get("reasoning_tokens") is not None and usage.get("reasoning_tokens", 0) > 0: event_properties["$ai_reasoning_tokens"] = usage.get("reasoning_tokens", 0) if posthog_distinct_id is None: event_properties["$process_person_profile"] = False # Process instructions for Responses API if provider == "openai" and kwargs.get("instructions") is not None: event_properties["$ai_instructions"] = with_privacy_mode( ph_client, posthog_privacy_mode, kwargs.get("instructions") ) # send the event to posthog if hasattr(ph_client, "capture") and callable(ph_client.capture): ph_client.capture( distinct_id=posthog_distinct_id or posthog_trace_id, event="$ai_generation", properties=event_properties, groups=posthog_groups, ) if error: raise error return response async def call_llm_and_track_usage_async( posthog_distinct_id: Optional[str], ph_client: PostHogClient, provider: str, posthog_trace_id: Optional[str], posthog_properties: Optional[Dict[str, Any]], posthog_privacy_mode: bool, posthog_groups: Optional[Dict[str, Any]], base_url: URL, call_async_method: Callable[..., Any], **kwargs: Any, ) -> Any: start_time = time.time() response = None error = None http_status = 200 usage: Dict[str, Any] = {} error_params: Dict[str, any] = {} try: response = await call_async_method(**kwargs) except Exception as exc: error = exc http_status = getattr(exc, "status_code", 0) # default to 0 because its likely an SDK error error_params = { "$ai_is_error": True, "$ai_error": exc.__str__(), } finally: end_time = time.time() latency = end_time - start_time if posthog_trace_id is None: posthog_trace_id = str(uuid.uuid4()) if response and hasattr(response, "usage"): usage = get_usage(response, provider) messages = merge_system_prompt(kwargs, provider) event_properties = { "$ai_provider": provider, "$ai_model": kwargs.get("model"), "$ai_model_parameters": get_model_params(kwargs), "$ai_input": with_privacy_mode(ph_client, posthog_privacy_mode, messages), "$ai_output_choices": with_privacy_mode( ph_client, posthog_privacy_mode, format_response(response, provider) ), "$ai_http_status": http_status, "$ai_input_tokens": usage.get("input_tokens", 0), "$ai_output_tokens": usage.get("output_tokens", 0), "$ai_latency": latency, "$ai_trace_id": posthog_trace_id, "$ai_base_url": str(base_url), **(posthog_properties or {}), **(error_params or {}), } tool_calls = format_tool_calls(response, provider) if tool_calls: event_properties["$ai_tools"] = with_privacy_mode(ph_client, posthog_privacy_mode, tool_calls) if usage.get("cache_read_input_tokens") is not None and usage.get("cache_read_input_tokens", 0) > 0: event_properties["$ai_cache_read_input_tokens"] = usage.get("cache_read_input_tokens", 0) if usage.get("cache_creation_input_tokens") is not None and usage.get("cache_creation_input_tokens", 0) > 0: event_properties["$ai_cache_creation_input_tokens"] = usage.get("cache_creation_input_tokens", 0) if posthog_distinct_id is None: event_properties["$process_person_profile"] = False # Process instructions for Responses API if provider == "openai" and kwargs.get("instructions") is not None: event_properties["$ai_instructions"] = with_privacy_mode( ph_client, posthog_privacy_mode, kwargs.get("instructions") ) # send the event to posthog if hasattr(ph_client, "capture") and callable(ph_client.capture): ph_client.capture( distinct_id=posthog_distinct_id or posthog_trace_id, event="$ai_generation", properties=event_properties, groups=posthog_groups, ) if error: raise error return response def with_privacy_mode(ph_client: PostHogClient, privacy_mode: bool, value: Any): if ph_client.privacy_mode or privacy_mode: return None return value
Memory