try:
import anthropic
from anthropic.resources import AsyncMessages
except ImportError:
raise ModuleNotFoundError("Please install the Anthropic SDK to use this feature: 'pip install anthropic'")
import time
import uuid
from typing import Any, Dict, Optional
from posthog.ai.utils import call_llm_and_track_usage_async, get_model_params, merge_system_prompt, with_privacy_mode
from posthog.client import Client as PostHogClient
class AsyncAnthropic(anthropic.AsyncAnthropic):
    """
    Async Anthropic client whose ``messages`` resource reports LLM usage to PostHog.

    Behaves like ``anthropic.AsyncAnthropic`` except that ``self.messages`` is
    replaced with an ``AsyncWrappedMessages`` instance bound to this client.
    """

    # PostHog client that the wrapped messages resource sends events through.
    _ph_client: PostHogClient

    def __init__(self, posthog_client: PostHogClient, **kwargs):
        """
        Build the underlying Anthropic client and attach PostHog tracking.

        Args:
            posthog_client: PostHog client used to capture usage events.
            **kwargs: Forwarded unchanged to ``anthropic.AsyncAnthropic.__init__``.
        """
        super().__init__(**kwargs)
        # Swap in the tracking-aware messages resource and remember the
        # PostHog client it reads via ``self._client._ph_client``.
        self.messages = AsyncWrappedMessages(self)
        self._ph_client = posthog_client
class AsyncWrappedMessages(AsyncMessages):
    """
    Messages resource that records each Anthropic call in PostHog.

    Non-streaming calls are delegated to ``call_llm_and_track_usage_async``.
    Streaming calls are wrapped in an async generator that re-yields every
    event and captures one ``$ai_generation`` event when iteration ends.
    """

    _client: AsyncAnthropic

    # Usage counters copied from Anthropic usage objects into the PostHog event.
    _USAGE_KEYS = (
        "input_tokens",
        "output_tokens",
        "cache_read_input_tokens",
        "cache_creation_input_tokens",
    )

    async def create(
        self,
        posthog_distinct_id: Optional[str] = None,
        posthog_trace_id: Optional[str] = None,
        posthog_properties: Optional[Dict[str, Any]] = None,
        posthog_privacy_mode: bool = False,
        posthog_groups: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        """
        Create a message using Anthropic's API while tracking usage in PostHog.

        Args:
            posthog_distinct_id: Optional ID to associate with the usage event
            posthog_trace_id: Optional trace UUID for linking events; a random
                UUID4 string is generated when omitted
            posthog_properties: Optional dictionary of extra properties to include in the event
            posthog_privacy_mode: Whether to redact sensitive information in tracking
            posthog_groups: Optional group analytics properties
            **kwargs: Arguments passed to Anthropic's messages.create

        Returns:
            With a truthy ``stream`` kwarg, an async generator of stream events;
            otherwise whatever ``call_llm_and_track_usage_async`` returns.
        """
        if posthog_trace_id is None:
            posthog_trace_id = str(uuid.uuid4())

        if kwargs.get("stream", False):
            return await self._create_streaming(
                posthog_distinct_id,
                posthog_trace_id,
                posthog_properties,
                posthog_privacy_mode,
                posthog_groups,
                **kwargs,
            )

        return await call_llm_and_track_usage_async(
            posthog_distinct_id,
            self._client._ph_client,
            "anthropic",
            posthog_trace_id,
            posthog_properties,
            posthog_privacy_mode,
            posthog_groups,
            self._client.base_url,
            super().create,
            **kwargs,
        )

    async def stream(
        self,
        posthog_distinct_id: Optional[str] = None,
        posthog_trace_id: Optional[str] = None,
        posthog_properties: Optional[Dict[str, Any]] = None,
        posthog_privacy_mode: bool = False,
        posthog_groups: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        """
        Start a streamed message and track it in PostHog.

        Args:
            posthog_distinct_id: Optional ID to associate with the usage event
            posthog_trace_id: Optional trace UUID for linking events; a random
                UUID4 string is generated when omitted
            posthog_properties: Optional dictionary of extra properties to include in the event
            posthog_privacy_mode: Whether to redact sensitive information in tracking
            posthog_groups: Optional group analytics properties
            **kwargs: Arguments passed to Anthropic's messages.create

        Returns:
            An async generator that yields the stream events.
        """
        if posthog_trace_id is None:
            posthog_trace_id = str(uuid.uuid4())

        # `_create_streaming` consumes the response with `async for`, so the
        # underlying `create` call must be asked for a streamed response.
        # Without this the response is a single message object and iteration fails.
        kwargs["stream"] = True

        return await self._create_streaming(
            posthog_distinct_id,
            posthog_trace_id,
            posthog_properties,
            posthog_privacy_mode,
            posthog_groups,
            **kwargs,
        )

    @staticmethod
    def _merge_usage_stats(usage_stats: Dict[str, int], usage: Any) -> None:
        """Copy every non-None counter found on ``usage`` into ``usage_stats`` in place."""
        if not usage:
            return
        for key in AsyncWrappedMessages._USAGE_KEYS:
            value = getattr(usage, key, None)
            # Skip missing/None counters so a later partial usage object
            # does not wipe out values reported by an earlier event.
            if value is not None:
                usage_stats[key] = value

    @staticmethod
    def _extract_event_text(event: Any) -> str:
        """Return the text carried by a stream event, or ``""`` when it has none."""
        content = getattr(event, "content", None)
        if isinstance(content, str) and content:
            return content
        # Text deltas expose their text as `event.delta.text`.
        text = getattr(getattr(event, "delta", None), "text", None)
        if isinstance(text, str):
            return text
        return ""

    async def _create_streaming(
        self,
        posthog_distinct_id: Optional[str],
        posthog_trace_id: Optional[str],
        posthog_properties: Optional[Dict[str, Any]],
        posthog_privacy_mode: bool,
        posthog_groups: Optional[Dict[str, Any]],
        **kwargs: Any,
    ):
        """
        Call Anthropic and wrap the streamed response in a tracking generator.

        The returned async generator re-yields each event unchanged while
        collecting token usage and output text. When iteration finishes (or is
        aborted), its ``finally`` clause captures one PostHog event.

        Args:
            posthog_distinct_id: ID to associate with the usage event, if any
            posthog_trace_id: Trace ID to attach to the event, if any
            posthog_properties: Extra properties to include in the event
            posthog_privacy_mode: Whether to redact input/output in the event
            posthog_groups: Group analytics properties
            **kwargs: Arguments passed to Anthropic's messages.create

        Returns:
            An async generator yielding the events of the streamed response.
        """
        start_time = time.time()
        usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
        accumulated_content = []  # text fragments, joined once at the end
        response = await super().create(**kwargs)

        async def generator():
            try:
                async for event in response:
                    # Usage may sit on the event itself or on a nested
                    # `message` object; merge both so neither is lost.
                    message = getattr(event, "message", None)
                    self._merge_usage_stats(usage_stats, getattr(message, "usage", None))
                    self._merge_usage_stats(usage_stats, getattr(event, "usage", None))

                    text = self._extract_event_text(event)
                    if text:
                        accumulated_content.append(text)

                    yield event
            finally:
                # Runs on normal exhaustion, on error, and on early close, so
                # partially consumed streams are still reported.
                latency = time.time() - start_time
                output = "".join(accumulated_content)
                await self._capture_streaming_event(
                    posthog_distinct_id,
                    posthog_trace_id,
                    posthog_properties,
                    posthog_privacy_mode,
                    posthog_groups,
                    kwargs,
                    usage_stats,
                    latency,
                    output,
                )

        return generator()

    async def _capture_streaming_event(
        self,
        posthog_distinct_id: Optional[str],
        posthog_trace_id: Optional[str],
        posthog_properties: Optional[Dict[str, Any]],
        posthog_privacy_mode: bool,
        posthog_groups: Optional[Dict[str, Any]],
        kwargs: Dict[str, Any],
        usage_stats: Dict[str, int],
        latency: float,
        output: str,
    ):
        """
        Send one ``$ai_generation`` event describing a streamed call to PostHog.

        Args:
            posthog_distinct_id: ID to associate with the event; the trace ID
                is used instead when this is falsy
            posthog_trace_id: Trace ID for the event; a random UUID4 string is
                generated when ``None``
            posthog_properties: Extra properties; applied last, so they
                override the built-in ``$ai_*`` properties on key clashes
            posthog_privacy_mode: Passed to ``with_privacy_mode`` for input/output
            posthog_groups: Group analytics properties
            kwargs: The keyword arguments that were sent to messages.create
            usage_stats: Token counters collected while streaming
            latency: Seconds elapsed between the request and the end of iteration
            output: Concatenated text collected from the stream
        """
        if posthog_trace_id is None:
            posthog_trace_id = str(uuid.uuid4())

        ph_client = self._client._ph_client

        event_properties = {
            "$ai_provider": "anthropic",
            "$ai_model": kwargs.get("model"),
            "$ai_model_parameters": get_model_params(kwargs),
            "$ai_input": with_privacy_mode(
                ph_client,
                posthog_privacy_mode,
                merge_system_prompt(kwargs, "anthropic"),
            ),
            "$ai_output_choices": with_privacy_mode(
                ph_client,
                posthog_privacy_mode,
                [{"content": output, "role": "assistant"}],
            ),
            "$ai_http_status": 200,
            "$ai_input_tokens": usage_stats.get("input_tokens", 0),
            "$ai_output_tokens": usage_stats.get("output_tokens", 0),
            "$ai_cache_read_input_tokens": usage_stats.get("cache_read_input_tokens", 0),
            "$ai_cache_creation_input_tokens": usage_stats.get("cache_creation_input_tokens", 0),
            "$ai_latency": latency,
            "$ai_trace_id": posthog_trace_id,
            "$ai_base_url": str(self._client.base_url),
            **(posthog_properties or {}),
        }

        if posthog_distinct_id is None:
            # No user to attribute the event to.
            event_properties["$process_person_profile"] = False

        if hasattr(ph_client, "capture"):
            ph_client.capture(
                distinct_id=posthog_distinct_id or posthog_trace_id,
                event="$ai_generation",
                properties=event_properties,
                groups=posthog_groups,
            )