import logging import numbers import re from collections import defaultdict from dataclasses import asdict, is_dataclass from datetime import date, datetime, timezone from decimal import Decimal from uuid import UUID import six from dateutil.tz import tzlocal, tzutc log = logging.getLogger("posthog") def is_naive(dt): """Determines if a given datetime.datetime is naive.""" return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None def total_seconds(delta): """Determines total seconds with python < 2.7 compat.""" # http://stackoverflow.com/questions/3694835/python-2-6-5-divide-timedelta-with-timedelta return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 1e6) / 1e6 def guess_timezone(dt): """Attempts to convert a naive datetime to an aware datetime.""" if is_naive(dt): # attempts to guess the datetime.datetime.now() local timezone # case, and then defaults to utc delta = datetime.now() - dt if total_seconds(delta) < 5: # this was created using datetime.datetime.now() # so we are in the local timezone return dt.replace(tzinfo=tzlocal()) else: # at this point, the best we can do is guess UTC return dt.replace(tzinfo=tzutc()) return dt def remove_trailing_slash(host): if host.endswith("/"): return host[:-1] return host def clean(item): if isinstance(item, Decimal): return float(item) if isinstance(item, UUID): return str(item) if isinstance(item, (six.string_types, bool, numbers.Number, datetime, date, type(None))): return item if isinstance(item, (set, list, tuple)): return _clean_list(item) # Pydantic model try: # v2+ if hasattr(item, "model_dump") and callable(item.model_dump): item = item.model_dump() # v1 elif hasattr(item, "dict") and callable(item.dict): item = item.dict() except TypeError as e: log.debug(f"Could not serialize Pydantic-like model: {e}") pass if isinstance(item, dict): return _clean_dict(item) if is_dataclass(item) and not isinstance(item, type): return _clean_dataclass(item) return _coerce_unicode(item) def _clean_list(list_): return [clean(item) for item in list_] def _clean_dict(dict_): data = {} for k, v in six.iteritems(dict_): try: data[k] = clean(v) except TypeError: log.warning( 'Dictionary values must be serializeable to JSON "%s" value %s of type %s is unsupported.', k, v, type(v), ) return data def _clean_dataclass(dataclass_): data = asdict(dataclass_) data = _clean_dict(data) return data def _coerce_unicode(cmplx): try: item = cmplx.decode("utf-8", "strict") except AttributeError as exception: item = ":".join(exception) item.decode("utf-8", "strict") log.warning("Error decoding: %s", item) return None return item def is_valid_regex(value) -> bool: try: re.compile(value) return True except re.error: return False class SizeLimitedDict(defaultdict): def __init__(self, max_size, *args, **kwargs): super().__init__(*args, **kwargs) self.max_size = max_size def __setitem__(self, key, value): if len(self) >= self.max_size: self.clear() super().__setitem__(key, value) def convert_to_datetime_aware(date_obj): if date_obj.tzinfo is None: date_obj = date_obj.replace(tzinfo=timezone.utc) return date_obj def str_icontains(source, search): """ Check if a string contains another string, ignoring case. Args: source: The string to search within search: The substring to search for Returns: bool: True if search is a substring of source (case-insensitive), False otherwise Examples: >>> str_icontains("Hello World", "WORLD") True >>> str_icontains("Hello World", "python") False """ return str(search).casefold() in str(source).casefold() def str_iequals(value, comparand): """ Check if a string equals another string, ignoring case. Args: value: The string to compare comparand: The string to compare with Returns: bool: True if value and comparand are equal (case-insensitive), False otherwise Examples: >>> str_iequals("Hello World", "hello world") True >>> str_iequals("Hello World", "hello") False """ return str(value).casefold() == str(comparand).casefold()
Memory