from sentry_sdk._types import MYPY from sentry_sdk.hub import Hub from sentry_sdk.integrations import Integration from sentry_sdk.scope import add_global_event_processor from sentry_sdk.utils import Dsn import posthog from posthog.request import DEFAULT_HOST from posthog.sentry import POSTHOG_ID_TAG if MYPY: from typing import Optional # noqa: F401 from sentry_sdk._types import Event, Hint # noqa: F401 class PostHogIntegration(Integration): identifier = "posthog-python" organization = None # The Sentry organization, used to send a direct link from PostHog to Sentry project_id = None # The Sentry project id, used to send a direct link from PostHog to Sentry prefix = "https://sentry.io/organizations/" # URL of a hosted sentry instance (default: https://sentry.io/organizations/) @staticmethod def setup_once(): @add_global_event_processor def processor(event, hint): # type: (Event, Optional[Hint]) -> Optional[Event] if Hub.current.get_integration(PostHogIntegration) is not None: if event.get("level") != "error": return event if event.get("tags", {}).get(POSTHOG_ID_TAG): posthog_distinct_id = event["tags"][POSTHOG_ID_TAG] event["tags"]["PostHog URL"] = f"{posthog.host or DEFAULT_HOST}/person/{posthog_distinct_id}" properties = { "$sentry_event_id": event["event_id"], "$sentry_exception": event["exception"], } if PostHogIntegration.organization: project_id = PostHogIntegration.project_id or ( not not Hub.current.client.dsn and Dsn(Hub.current.client.dsn).project_id ) if project_id: properties["$sentry_url"] = ( f"{PostHogIntegration.prefix}{PostHogIntegration.organization}/issues/?project={project_id}&query={event['event_id']}" ) posthog.capture(posthog_distinct_id, "$exception", properties) return event
Memory