import atexit import logging import numbers import os import platform import sys import warnings from datetime import datetime, timedelta from typing import Any, Optional, Union from uuid import UUID, uuid4 import distro # For Linux OS detection from dateutil.tz import tzutc from six import string_types from posthog.consumer import Consumer from posthog.exception_capture import ExceptionCapture from posthog.exception_utils import exc_info_from_error, exceptions_from_error_tuple, handle_in_app from posthog.feature_flags import InconclusiveMatchError, match_feature_flag_properties from posthog.poller import Poller from posthog.request import DEFAULT_HOST, APIError, batch_post, decide, determine_server_host, get, remote_config from posthog.types import ( DecideResponse, FeatureFlag, FlagMetadata, FlagsAndPayloads, FlagValue, normalize_decide_response, to_flags_and_payloads, to_payloads, to_values, ) from posthog.utils import SizeLimitedDict, clean, guess_timezone, remove_trailing_slash from posthog.version import VERSION try: import queue except ImportError: import Queue as queue ID_TYPES = (numbers.Number, string_types, UUID) MAX_DICT_SIZE = 50_000 def get_os_info(): """ Returns standardized OS name and version information. Similar to how user agent parsing works in JS. """ os_name = "" os_version = "" platform_name = sys.platform if platform_name.startswith("win"): os_name = "Windows" if hasattr(platform, "win32_ver"): win_version = platform.win32_ver()[0] if win_version: os_version = win_version elif platform_name == "darwin": os_name = "Mac OS X" if hasattr(platform, "mac_ver"): mac_version = platform.mac_ver()[0] if mac_version: os_version = mac_version elif platform_name.startswith("linux"): os_name = "Linux" linux_info = distro.info() if linux_info["version"]: os_version = linux_info["version"] elif platform_name.startswith("freebsd"): os_name = "FreeBSD" if hasattr(platform, "release"): os_version = platform.release() else: os_name = platform_name if hasattr(platform, "release"): os_version = platform.release() return os_name, os_version def system_context() -> dict[str, Any]: os_name, os_version = get_os_info() return { "$python_runtime": platform.python_implementation(), "$python_version": "%s.%s.%s" % (sys.version_info[:3]), "$os": os_name, "$os_version": os_version, } class Client(object): """Create a new PostHog client.""" log = logging.getLogger("posthog") def __init__( self, api_key=None, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=0.5, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, project_api_key=None, disabled=False, disable_geoip=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, exception_autocapture_integrations=None, project_root=None, privacy_mode=False, ): self.queue = queue.Queue(max_queue_size) # api_key: This should be the Team API Key (token), public self.api_key = project_api_key or api_key require("api_key", self.api_key, string_types) self.on_error = on_error self.debug = debug self.send = send self.sync_mode = sync_mode # Used for session replay URL generation - we don't want the server host here. self.raw_host = host or DEFAULT_HOST self.host = determine_server_host(host) self.gzip = gzip self.timeout = timeout self._feature_flags = None # private variable to store flags self.feature_flags_by_key = None self.group_type_mapping = None self.cohorts = None self.poll_interval = poll_interval self.feature_flags_request_timeout_seconds = feature_flags_request_timeout_seconds self.poller = None self.distinct_ids_feature_flags_reported = SizeLimitedDict(MAX_DICT_SIZE, set) self.disabled = disabled self.disable_geoip = disable_geoip self.historical_migration = historical_migration self.super_properties = super_properties self.enable_exception_autocapture = enable_exception_autocapture self.log_captured_exceptions = log_captured_exceptions self.exception_autocapture_integrations = exception_autocapture_integrations self.exception_capture = None self.privacy_mode = privacy_mode if project_root is None: try: project_root = os.getcwd() except Exception: project_root = None self.project_root = project_root # personal_api_key: This should be a generated Personal API Key, private self.personal_api_key = personal_api_key if debug: # Ensures that debug level messages are logged when debug mode is on. # Otherwise, defaults to WARNING level. See https://docs.python.org/3/howto/logging.html#what-happens-if-no-configuration-is-provided logging.basicConfig() self.log.setLevel(logging.DEBUG) else: self.log.setLevel(logging.WARNING) if self.enable_exception_autocapture: self.exception_capture = ExceptionCapture(self, integrations=self.exception_autocapture_integrations) if sync_mode: self.consumers = None else: # On program exit, allow the consumer thread to exit cleanly. # This prevents exceptions and a messy shutdown when the # interpreter is destroyed before the daemon thread finishes # execution. However, it is *not* the same as flushing the queue! # To guarantee all messages have been delivered, you'll still need # to call flush(). if send: atexit.register(self.join) for n in range(thread): self.consumers = [] consumer = Consumer( self.queue, self.api_key, host=self.host, on_error=on_error, flush_at=flush_at, flush_interval=flush_interval, gzip=gzip, retries=max_retries, timeout=timeout, historical_migration=historical_migration, ) self.consumers.append(consumer) # if we've disabled sending, just don't start the consumer if send: consumer.start() @property def feature_flags(self): """ Get the local evaluation feature flags. """ return self._feature_flags @feature_flags.setter def feature_flags(self, flags): """ Set the local evaluation feature flags. """ self._feature_flags = flags or [] self.feature_flags_by_key = {flag["key"]: flag for flag in self._feature_flags if flag.get("key") is not None} assert ( self.feature_flags_by_key is not None ), "feature_flags_by_key should be initialized when feature_flags is set" def identify(self, distinct_id=None, properties=None, context=None, timestamp=None, uuid=None, disable_geoip=None): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = properties or {} require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) msg = { "timestamp": timestamp, "distinct_id": distinct_id, "$set": properties, "event": "$identify", "uuid": uuid, } return self._enqueue(msg, disable_geoip) def get_feature_variants( self, distinct_id, groups=None, person_properties=None, group_properties=None, disable_geoip=None ) -> dict[str, Union[bool, str]]: """ Get feature flag variants for a distinct_id by calling decide. """ resp_data = self.get_decide(distinct_id, groups, person_properties, group_properties, disable_geoip) return to_values(resp_data) or {} def get_feature_payloads( self, distinct_id, groups=None, person_properties=None, group_properties=None, disable_geoip=None ) -> dict[str, str]: """ Get feature flag payloads for a distinct_id by calling decide. """ resp_data = self.get_decide(distinct_id, groups, person_properties, group_properties, disable_geoip) return to_payloads(resp_data) or {} def get_feature_flags_and_payloads( self, distinct_id, groups=None, person_properties=None, group_properties=None, disable_geoip=None ) -> FlagsAndPayloads: """ Get feature flags and payloads for a distinct_id by calling decide. """ resp = self.get_decide(distinct_id, groups, person_properties, group_properties, disable_geoip) return to_flags_and_payloads(resp) def get_decide( self, distinct_id, groups=None, person_properties=None, group_properties=None, disable_geoip=None ) -> DecideResponse: require("distinct_id", distinct_id, ID_TYPES) if disable_geoip is None: disable_geoip = self.disable_geoip if groups: require("groups", groups, dict) else: groups = {} request_data = { "distinct_id": distinct_id, "groups": groups, "person_properties": person_properties, "group_properties": group_properties, "disable_geoip": disable_geoip, } resp_data = decide(self.api_key, self.host, timeout=self.feature_flags_request_timeout_seconds, **request_data) return normalize_decide_response(resp_data) def capture( self, distinct_id=None, event=None, properties=None, context=None, timestamp=None, uuid=None, groups=None, send_feature_flags=False, disable_geoip=None, ): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = {**(properties or {}), **system_context()} require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) require("event", event, string_types) msg = { "properties": properties, "timestamp": timestamp, "distinct_id": distinct_id, "event": event, "uuid": uuid, } if groups: require("groups", groups, dict) msg["properties"]["$groups"] = groups extra_properties: dict[str, Any] = {} feature_variants: Optional[dict[str, Union[bool, str]]] = {} if send_feature_flags: try: feature_variants = self.get_feature_variants(distinct_id, groups, disable_geoip=disable_geoip) except Exception as e: self.log.exception(f"[FEATURE FLAGS] Unable to get feature variants: {e}") elif self.feature_flags and event != "$feature_flag_called": # Local evaluation is enabled, flags are loaded, so try and get all flags we can without going to the server feature_variants = self.get_all_flags( distinct_id, groups=(groups or {}), disable_geoip=disable_geoip, only_evaluate_locally=True ) for feature, variant in (feature_variants or {}).items(): extra_properties[f"$feature/{feature}"] = variant active_feature_flags = [key for (key, value) in (feature_variants or {}).items() if value is not False] if active_feature_flags: extra_properties["$active_feature_flags"] = active_feature_flags if extra_properties: msg["properties"] = {**extra_properties, **msg["properties"]} return self._enqueue(msg, disable_geoip) def set(self, distinct_id=None, properties=None, context=None, timestamp=None, uuid=None, disable_geoip=None): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = properties or {} require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) msg = { "timestamp": timestamp, "distinct_id": distinct_id, "$set": properties, "event": "$set", "uuid": uuid, } return self._enqueue(msg, disable_geoip) def set_once(self, distinct_id=None, properties=None, context=None, timestamp=None, uuid=None, disable_geoip=None): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = properties or {} require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) msg = { "timestamp": timestamp, "distinct_id": distinct_id, "$set_once": properties, "event": "$set_once", "uuid": uuid, } return self._enqueue(msg, disable_geoip) def group_identify( self, group_type=None, group_key=None, properties=None, context=None, timestamp=None, uuid=None, disable_geoip=None, distinct_id=None, ): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = properties or {} require("group_type", group_type, ID_TYPES) require("group_key", group_key, ID_TYPES) require("properties", properties, dict) if distinct_id: require("distinct_id", distinct_id, ID_TYPES) else: distinct_id = "${}_{}".format(group_type, group_key) msg = { "event": "$groupidentify", "properties": { "$group_type": group_type, "$group_key": group_key, "$group_set": properties, }, "distinct_id": distinct_id, "timestamp": timestamp, "uuid": uuid, } return self._enqueue(msg, disable_geoip) def alias(self, previous_id=None, distinct_id=None, context=None, timestamp=None, uuid=None, disable_geoip=None): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) require("previous_id", previous_id, ID_TYPES) require("distinct_id", distinct_id, ID_TYPES) msg = { "properties": { "distinct_id": previous_id, "alias": distinct_id, }, "timestamp": timestamp, "event": "$create_alias", "distinct_id": previous_id, } return self._enqueue(msg, disable_geoip) def page( self, distinct_id=None, url=None, properties=None, context=None, timestamp=None, uuid=None, disable_geoip=None ): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) properties = properties or {} require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) require("url", url, string_types) properties["$current_url"] = url msg = { "event": "$pageview", "properties": properties, "timestamp": timestamp, "distinct_id": distinct_id, "uuid": uuid, } return self._enqueue(msg, disable_geoip) def capture_exception( self, exception=None, distinct_id=None, properties=None, context=None, timestamp=None, uuid=None, groups=None, **kwargs, ): if context is not None: warnings.warn( "The 'context' parameter is deprecated and will be removed in a future version.", DeprecationWarning, stacklevel=2, ) # this function shouldn't ever throw an error, so it logs exceptions instead of raising them. # this is important to ensure we don't unexpectedly re-raise exceptions in the user's code. try: properties = properties or {} # if there's no distinct_id, we'll generate one and set personless mode # via $process_person_profile = false if distinct_id is None: properties["$process_person_profile"] = False distinct_id = uuid4() require("distinct_id", distinct_id, ID_TYPES) require("properties", properties, dict) if exception is not None: exc_info = exc_info_from_error(exception) else: exc_info = sys.exc_info() if exc_info is None or exc_info == (None, None, None): self.log.warning("No exception information available") return # Format stack trace for cymbal all_exceptions_with_trace = exceptions_from_error_tuple(exc_info) # Add in-app property to frames in the exceptions event = handle_in_app( { "exception": { "values": all_exceptions_with_trace, }, }, project_root=self.project_root, ) all_exceptions_with_trace_and_in_app = event["exception"]["values"] properties = { "$exception_type": all_exceptions_with_trace_and_in_app[0].get("type"), "$exception_message": all_exceptions_with_trace_and_in_app[0].get("value"), "$exception_list": all_exceptions_with_trace_and_in_app, "$exception_personURL": f"{remove_trailing_slash(self.raw_host)}/project/{self.api_key}/person/{distinct_id}", **properties, } if self.log_captured_exceptions: self.log.exception(exception, extra=kwargs) return self.capture(distinct_id, "$exception", properties, context, timestamp, uuid, groups) except Exception as e: self.log.exception(f"Failed to capture exception: {e}") def _enqueue(self, msg, disable_geoip): """Push a new `msg` onto the queue, return `(success, msg)`""" if self.disabled: return False, "disabled" timestamp = msg["timestamp"] if timestamp is None: timestamp = datetime.now(tz=tzutc()) require("timestamp", timestamp, datetime) # add common timestamp = guess_timezone(timestamp) msg["timestamp"] = timestamp.isoformat() # only send if "uuid" is truthy if "uuid" in msg: uuid = msg.pop("uuid") if uuid: msg["uuid"] = stringify_id(uuid) if not msg.get("properties"): msg["properties"] = {} msg["properties"]["$lib"] = "posthog-python" msg["properties"]["$lib_version"] = VERSION if disable_geoip is None: disable_geoip = self.disable_geoip if disable_geoip: msg["properties"]["$geoip_disable"] = True if self.super_properties: msg["properties"] = {**msg["properties"], **self.super_properties} msg["distinct_id"] = stringify_id(msg.get("distinct_id", None)) msg = clean(msg) self.log.debug("queueing: %s", msg) # if send is False, return msg as if it was successfully queued if not self.send: return True, msg if self.sync_mode: self.log.debug("enqueued with blocking %s.", msg["event"]) batch_post( self.api_key, self.host, gzip=self.gzip, timeout=self.timeout, batch=[msg], historical_migration=self.historical_migration, ) return True, msg try: self.queue.put(msg, block=False) self.log.debug("enqueued %s.", msg["event"]) return True, msg except queue.Full: self.log.warning("analytics-python queue is full") return False, msg def flush(self): """Forces a flush from the internal queue to the server""" queue = self.queue size = queue.qsize() queue.join() # Note that this message may not be precise, because of threading. self.log.debug("successfully flushed about %s items.", size) def join(self): """Ends the consumer thread once the queue is empty. Blocks execution until finished """ for consumer in self.consumers: consumer.pause() try: consumer.join() except RuntimeError: # consumer thread has not started pass if self.poller: self.poller.stop() def shutdown(self): """Flush all messages and cleanly shutdown the client""" self.flush() self.join() if self.exception_capture: self.exception_capture.close() def _load_feature_flags(self): try: response = get( self.personal_api_key, f"/api/feature_flag/local_evaluation/?token={self.api_key}&send_cohorts", self.host, timeout=10, ) self.feature_flags = response["flags"] or [] self.group_type_mapping = response["group_type_mapping"] or {} self.cohorts = response["cohorts"] or {} except APIError as e: if e.status == 401: self.log.error( "[FEATURE FLAGS] Error loading feature flags: To use feature flags, please set a valid personal_api_key. More information: https://posthog.com/docs/api/overview" ) if self.debug: raise APIError( status=401, message="You are using a write-only key with feature flags. " "To use feature flags, please set a personal_api_key " "More information: https://posthog.com/docs/api/overview", ) elif e.status == 402: self.log.warning( "[FEATURE FLAGS] PostHog feature flags quota limited, resetting feature flag data. Learn more about billing limits at https://posthog.com/docs/billing/limits-alerts" ) # Reset all feature flag data when quota limited self.feature_flags = [] self.group_type_mapping = {} self.cohorts = {} if self.debug: raise APIError( status=402, message="PostHog feature flags quota limited", ) else: self.log.error(f"[FEATURE FLAGS] Error loading feature flags: {e}") except Exception as e: self.log.warning( "[FEATURE FLAGS] Fetching feature flags failed with following error. We will retry in %s seconds." % self.poll_interval ) self.log.warning(e) self._last_feature_flag_poll = datetime.now(tz=tzutc()) def load_feature_flags(self): if not self.personal_api_key: self.log.warning("[FEATURE FLAGS] You have to specify a personal_api_key to use feature flags.") self.feature_flags = [] return self._load_feature_flags() if not (self.poller and self.poller.is_alive()): self.poller = Poller(interval=timedelta(seconds=self.poll_interval), execute=self._load_feature_flags) self.poller.start() def _compute_flag_locally( self, feature_flag, distinct_id, *, groups={}, person_properties={}, group_properties={}, warn_on_unknown_groups=True, ) -> FlagValue: if feature_flag.get("ensure_experience_continuity", False): raise InconclusiveMatchError("Flag has experience continuity enabled") if not feature_flag.get("active"): return False flag_filters = feature_flag.get("filters") or {} aggregation_group_type_index = flag_filters.get("aggregation_group_type_index") if aggregation_group_type_index is not None: group_name = self.group_type_mapping.get(str(aggregation_group_type_index)) if not group_name: self.log.warning( f"[FEATURE FLAGS] Unknown group type index {aggregation_group_type_index} for feature flag {feature_flag['key']}" ) # failover to `/decide/` raise InconclusiveMatchError("Flag has unknown group type index") if group_name not in groups: # Group flags are never enabled in `groups` aren't passed in # don't failover to `/decide/`, since response will be the same if warn_on_unknown_groups: self.log.warning( f"[FEATURE FLAGS] Can't compute group feature flag: {feature_flag['key']} without group names passed in" ) else: self.log.debug( f"[FEATURE FLAGS] Can't compute group feature flag: {feature_flag['key']} without group names passed in" ) return False focused_group_properties = group_properties[group_name] return match_feature_flag_properties(feature_flag, groups[group_name], focused_group_properties) else: return match_feature_flag_properties(feature_flag, distinct_id, person_properties, self.cohorts) def feature_enabled( self, key, distinct_id, *, groups={}, person_properties={}, group_properties={}, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, ): response = self.get_feature_flag( key, distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties, only_evaluate_locally=only_evaluate_locally, send_feature_flag_events=send_feature_flag_events, disable_geoip=disable_geoip, ) if response is None: return None return bool(response) def get_feature_flag( self, key, distinct_id, *, groups={}, person_properties={}, group_properties={}, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, ) -> Optional[FlagValue]: """ Get a feature flag value for a key by evaluating locally or remotely depending on whether local evaluation is enabled and the flag can be locally evaluated. This also captures the $feature_flag_called event unless send_feature_flag_events is False. """ require("key", key, string_types) require("distinct_id", distinct_id, ID_TYPES) require("groups", groups, dict) if self.disabled: return None person_properties, group_properties = self._add_local_person_and_group_properties( distinct_id, groups, person_properties, group_properties ) response = self._locally_evaluate_flag(key, distinct_id, groups, person_properties, group_properties) flag_details = None request_id = None flag_was_locally_evaluated = response is not None if not flag_was_locally_evaluated and not only_evaluate_locally: try: flag_details, request_id = self._get_feature_flag_details_from_decide( key, distinct_id, groups, person_properties, group_properties, disable_geoip ) response = flag_details.get_value() if flag_details else False self.log.debug(f"Successfully computed flag remotely: #{key} -> #{response}") except Exception as e: self.log.exception(f"[FEATURE FLAGS] Unable to get flag remotely: {e}") if send_feature_flag_events: self._capture_feature_flag_called( distinct_id, key, response or False, None, flag_was_locally_evaluated, groups, disable_geoip, request_id, flag_details, ) return response def _locally_evaluate_flag( self, key: str, distinct_id: str, groups: dict[str, str], person_properties: dict[str, str], group_properties: dict[str, str], ) -> Optional[FlagValue]: if self.feature_flags is None and self.personal_api_key: self.load_feature_flags() response = None if self.feature_flags: assert ( self.feature_flags_by_key is not None ), "feature_flags_by_key should be initialized when feature_flags is set" # Local evaluation flag = self.feature_flags_by_key.get(key) if flag: try: response = self._compute_flag_locally( flag, distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties, ) self.log.debug(f"Successfully computed flag locally: {key} -> {response}") except InconclusiveMatchError as e: self.log.debug(f"Failed to compute flag {key} locally: {e}") except Exception as e: self.log.exception(f"[FEATURE FLAGS] Error while computing variant locally: {e}") return response def get_feature_flag_payload( self, key, distinct_id, *, match_value=None, groups={}, person_properties={}, group_properties={}, only_evaluate_locally=False, send_feature_flag_events=True, disable_geoip=None, ): if self.disabled: return None if match_value is None: person_properties, group_properties = self._add_local_person_and_group_properties( distinct_id, groups, person_properties, group_properties ) match_value = self._locally_evaluate_flag(key, distinct_id, groups, person_properties, group_properties) response = None payload = None flag_details = None request_id = None if match_value is not None: payload = self._compute_payload_locally(key, match_value) flag_was_locally_evaluated = payload is not None if not flag_was_locally_evaluated and not only_evaluate_locally: try: flag_details, request_id = self._get_feature_flag_details_from_decide( key, distinct_id, groups, person_properties, group_properties, disable_geoip ) payload = flag_details.metadata.payload if flag_details else None response = flag_details.get_value() if flag_details else False except Exception as e: self.log.exception(f"[FEATURE FLAGS] Unable to get feature flags and payloads: {e}") if send_feature_flag_events: self._capture_feature_flag_called( distinct_id, key, response or False, payload, flag_was_locally_evaluated, groups, disable_geoip, request_id, flag_details, ) return payload def _get_feature_flag_details_from_decide( self, key: str, distinct_id: str, groups: dict[str, str], person_properties: dict[str, str], group_properties: dict[str, str], disable_geoip: Optional[bool], ) -> tuple[Optional[FeatureFlag], Optional[str]]: """ Calls /decide and returns the flag details and request id """ resp_data = self.get_decide(distinct_id, groups, person_properties, group_properties, disable_geoip) request_id = resp_data.get("requestId") flags = resp_data.get("flags") flag_details = flags.get(key) if flags else None return flag_details, request_id def _capture_feature_flag_called( self, distinct_id: str, key: str, response: FlagValue, payload: Optional[str], flag_was_locally_evaluated: bool, groups: dict[str, str], disable_geoip: Optional[bool], request_id: Optional[str], flag_details: Optional[FeatureFlag], ): feature_flag_reported_key = f"{key}_{str(response)}" if feature_flag_reported_key not in self.distinct_ids_feature_flags_reported[distinct_id]: properties: dict[str, Any] = { "$feature_flag": key, "$feature_flag_response": response, "locally_evaluated": flag_was_locally_evaluated, f"$feature/{key}": response, } if payload: properties["$feature_flag_payload"] = payload if request_id: properties["$feature_flag_request_id"] = request_id if isinstance(flag_details, FeatureFlag): if flag_details.reason and flag_details.reason.description: properties["$feature_flag_reason"] = flag_details.reason.description if isinstance(flag_details.metadata, FlagMetadata): if flag_details.metadata.version: properties["$feature_flag_version"] = flag_details.metadata.version if flag_details.metadata.id: properties["$feature_flag_id"] = flag_details.metadata.id self.capture( distinct_id, "$feature_flag_called", properties, groups=groups, disable_geoip=disable_geoip, ) self.distinct_ids_feature_flags_reported[distinct_id].add(feature_flag_reported_key) def get_remote_config_payload(self, key: str): if self.disabled: return None if self.personal_api_key is None: self.log.warning( "[FEATURE FLAGS] You have to specify a personal_api_key to fetch decrypted feature flag payloads." ) return None try: return remote_config( self.personal_api_key, self.host, key, timeout=self.feature_flags_request_timeout_seconds, ) except Exception as e: self.log.exception(f"[FEATURE FLAGS] Unable to get decrypted feature flag payload: {e}") def _compute_payload_locally(self, key: str, match_value: FlagValue) -> Optional[str]: payload = None if self.feature_flags_by_key is None: return payload flag_definition = self.feature_flags_by_key.get(key) if flag_definition: flag_filters = flag_definition.get("filters") or {} flag_payloads = flag_filters.get("payloads") or {} # For boolean flags, convert True to "true" # For multivariate flags, use the variant string as-is lookup_value = "true" if isinstance(match_value, bool) and match_value else str(match_value) payload = flag_payloads.get(lookup_value, None) return payload def get_all_flags( self, distinct_id, *, groups={}, person_properties={}, group_properties={}, only_evaluate_locally=False, disable_geoip=None, ) -> Optional[dict[str, Union[bool, str]]]: response = self.get_all_flags_and_payloads( distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties, only_evaluate_locally=only_evaluate_locally, disable_geoip=disable_geoip, ) return response["featureFlags"] def get_all_flags_and_payloads( self, distinct_id, *, groups={}, person_properties={}, group_properties={}, only_evaluate_locally=False, disable_geoip=None, ) -> FlagsAndPayloads: if self.disabled: return {"featureFlags": None, "featureFlagPayloads": None} person_properties, group_properties = self._add_local_person_and_group_properties( distinct_id, groups, person_properties, group_properties ) response, fallback_to_decide = self._get_all_flags_and_payloads_locally( distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties ) if fallback_to_decide and not only_evaluate_locally: try: decide_response = self.get_decide( distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties, disable_geoip=disable_geoip, ) return to_flags_and_payloads(decide_response) except Exception as e: self.log.exception(f"[FEATURE FLAGS] Unable to get feature flags and payloads: {e}") return response def _get_all_flags_and_payloads_locally( self, distinct_id, *, groups={}, person_properties={}, group_properties={}, warn_on_unknown_groups=False ) -> tuple[FlagsAndPayloads, bool]: require("distinct_id", distinct_id, ID_TYPES) require("groups", groups, dict) if self.feature_flags is None and self.personal_api_key: self.load_feature_flags() flags: dict[str, FlagValue] = {} payloads: dict[str, str] = {} fallback_to_decide = False # If loading in previous line failed if self.feature_flags: for flag in self.feature_flags: try: flags[flag["key"]] = self._compute_flag_locally( flag, distinct_id, groups=groups, person_properties=person_properties, group_properties=group_properties, warn_on_unknown_groups=warn_on_unknown_groups, ) matched_payload = self._compute_payload_locally(flag["key"], flags[flag["key"]]) if matched_payload: payloads[flag["key"]] = matched_payload except InconclusiveMatchError: # No need to log this, since it's just telling us to fall back to `/decide` fallback_to_decide = True except Exception as e: self.log.exception(f"[FEATURE FLAGS] Error while computing variant and payload: {e}") fallback_to_decide = True else: fallback_to_decide = True return {"featureFlags": flags, "featureFlagPayloads": payloads}, fallback_to_decide def feature_flag_definitions(self): return self.feature_flags def _add_local_person_and_group_properties(self, distinct_id, groups, person_properties, group_properties): all_person_properties = {"distinct_id": distinct_id, **(person_properties or {})} all_group_properties = {} if groups: for group_name in groups: all_group_properties[group_name] = { "$group_key": groups[group_name], **(group_properties.get(group_name) or {}), } return all_person_properties, all_group_properties def require(name, field, data_type): """Require that the named `field` has the right `data_type`""" if not isinstance(field, data_type): msg = "{0} must have {1}, got: {2}".format(name, data_type, field) raise AssertionError(msg) def stringify_id(val): if val is None: return None if isinstance(val, string_types): return val return str(val)
Memory