# Portions of this file are derived from getsentry/sentry-javascript by Software, Inc. dba Sentry # Licensed under the MIT License # copied and adapted from https://github.com/getsentry/sentry-python/blob/269d96d6e9821122fbff280e6a26956e5ed03c0b/sentry_sdk/utils.py#L689 # 💖open source (under MIT License) # We want to keep payloads as similar to Sentry as possible for easy interoperability import linecache import os import re import sys from datetime import datetime from typing import TYPE_CHECKING try: # Python 3.11 from builtins import BaseExceptionGroup except ImportError: # Python 3.10 and below BaseExceptionGroup = None # type: ignore DEFAULT_MAX_VALUE_LENGTH = 1024 if TYPE_CHECKING: from types import FrameType, TracebackType from typing import ( # noqa: F401 Any, Callable, Dict, Iterator, List, Literal, Optional, Set, Tuple, Type, TypedDict, TypeVar, Union, cast, ) ExcInfo = Union[ Tuple[Type[BaseException], BaseException, Optional[TracebackType]], Tuple[None, None, None], ] LogLevelStr = Literal["fatal", "critical", "error", "warning", "info", "debug"] Event = TypedDict( "Event", { "breadcrumbs": Dict[Literal["values"], List[Dict[str, Any]]], # TODO: We can expand on this type "check_in_id": str, "contexts": Dict[str, Dict[str, object]], "dist": str, "duration": Optional[float], "environment": str, "errors": List[Dict[str, Any]], # TODO: We can expand on this type "event_id": str, "exception": Dict[Literal["values"], List[Dict[str, Any]]], # TODO: We can expand on this type # "extra": MutableMapping[str, object], # "fingerprint": List[str], "level": LogLevelStr, # "logentry": Mapping[str, object], "logger": str, # "measurements": Dict[str, MeasurementValue], "message": str, "modules": Dict[str, str], # "monitor_config": Mapping[str, object], "monitor_slug": Optional[str], "platform": Literal["python"], "profile": object, # Should be sentry_sdk.profiler.Profile, but we can't import that here due to circular imports "release": str, "request": Dict[str, object], # "sdk": Mapping[str, object], "server_name": str, "spans": List[Dict[str, object]], "stacktrace": Dict[str, object], # We access this key in the code, but I am unsure whether we ever set it "start_timestamp": datetime, "status": Optional[str], # "tags": MutableMapping[ # str, str # ], # Tags must be less than 200 characters each "threads": Dict[Literal["values"], List[Dict[str, Any]]], # TODO: We can expand on this type "timestamp": Optional[datetime], # Must be set before sending the event "transaction": str, # "transaction_info": Mapping[str, Any], # TODO: We can expand on this type "type": Literal["check_in", "transaction"], "user": Dict[str, object], "_metrics_summary": Dict[str, object], }, total=False, ) epoch = datetime(1970, 1, 1) BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$") SENSITIVE_DATA_SUBSTITUTE = "[Filtered]" def to_timestamp(value): # type: (datetime) -> float return (value - epoch).total_seconds() def format_timestamp(value): # type: (datetime) -> str return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ") def event_hint_with_exc_info(exc_info=None): # type: (Optional[ExcInfo]) -> Dict[str, Optional[ExcInfo]] """Creates a hint with the exc info filled in.""" if exc_info is None: exc_info = sys.exc_info() else: exc_info = exc_info_from_error(exc_info) if exc_info[0] is None: exc_info = None return {"exc_info": exc_info} class AnnotatedValue: """ Meta information for a data field in the event payload. This is to tell Relay that we have tampered with the fields value. See: https://github.com/getsentry/relay/blob/be12cd49a0f06ea932ed9b9f93a655de5d6ad6d1/relay-general/src/types/meta.rs#L407-L423 """ __slots__ = ("value", "metadata") def __init__(self, value, metadata): # type: (Optional[Any], Dict[str, Any]) -> None self.value = value self.metadata = metadata def __eq__(self, other): # type: (Any) -> bool if not isinstance(other, AnnotatedValue): return False return self.value == other.value and self.metadata == other.metadata @classmethod def removed_because_raw_data(cls): # type: () -> AnnotatedValue """The value was removed because it could not be parsed. This is done for request body values that are not json nor a form.""" return AnnotatedValue( value="", metadata={ "rem": [ # Remark [ "!raw", # Unparsable raw data "x", # The fields original value was removed ] ] }, ) @classmethod def removed_because_over_size_limit(cls): # type: () -> AnnotatedValue """The actual value was removed because the size of the field exceeded the configured maximum size (specified with the max_request_body_size sdk option)""" return AnnotatedValue( value="", metadata={ "rem": [ # Remark [ "!config", # Because of configured maximum size "x", # The fields original value was removed ] ] }, ) @classmethod def substituted_because_contains_sensitive_data(cls): # type: () -> AnnotatedValue """The actual value was removed because it contained sensitive information.""" return AnnotatedValue( value=SENSITIVE_DATA_SUBSTITUTE, metadata={ "rem": [ # Remark [ "!config", # Because of SDK configuration (in this case the config is the hard coded removal of certain django cookies) "s", # The fields original value was substituted ] ] }, ) if TYPE_CHECKING: T = TypeVar("T") Annotated = Union[AnnotatedValue, T] def get_type_name(cls): # type: (Optional[type]) -> Optional[str] return getattr(cls, "__qualname__", None) or getattr(cls, "__name__", None) def get_type_module(cls): # type: (Optional[type]) -> Optional[str] mod = getattr(cls, "__module__", None) if mod not in (None, "builtins", "__builtins__"): return mod return None def should_hide_frame(frame: "FrameType") -> bool: try: mod = frame.f_globals["__name__"] if mod.startswith("sentry_sdk."): return True except (AttributeError, KeyError): pass for flag_name in "__traceback_hide__", "__tracebackhide__": try: if frame.f_locals[flag_name]: return True except Exception: pass return False def iter_stacks(tb): # type: (Optional[TracebackType]) -> Iterator[TracebackType] tb_ = tb # type: Optional[TracebackType] while tb_ is not None: if not should_hide_frame(tb_.tb_frame): yield tb_ tb_ = tb_.tb_next def get_lines_from_file( filename, # type: str lineno, # type: int max_length=None, # type: Optional[int] loader=None, # type: Optional[Any] module=None, # type: Optional[str] ): # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]] context_lines = 5 source = None if loader is not None and hasattr(loader, "get_source"): try: source_str = loader.get_source(module) # type: Optional[str] except (ImportError, IOError): source_str = None if source_str is not None: source = source_str.splitlines() if source is None: try: source = linecache.getlines(filename) except (OSError, IOError): return [], None, [] if not source: return [], None, [] lower_bound = max(0, lineno - context_lines) upper_bound = min(lineno + 1 + context_lines, len(source)) try: pre_context = [strip_string(line.strip("\r\n"), max_length=max_length) for line in source[lower_bound:lineno]] context_line = strip_string(source[lineno].strip("\r\n"), max_length=max_length) post_context = [ strip_string(line.strip("\r\n"), max_length=max_length) for line in source[(lineno + 1) : upper_bound] # noqa: E203 ] return pre_context, context_line, post_context except IndexError: # the file may have changed since it was loaded into memory return [], None, [] def get_source_context( frame, # type: FrameType tb_lineno, # type: int max_value_length=None, # type: Optional[int] ): # type: (...) -> Tuple[List[Annotated[str]], Optional[Annotated[str]], List[Annotated[str]]] try: abs_path = frame.f_code.co_filename # type: Optional[str] except Exception: abs_path = None try: module = frame.f_globals["__name__"] except Exception: return [], None, [] try: loader = frame.f_globals["__loader__"] except Exception: loader = None lineno = tb_lineno - 1 if lineno is not None and abs_path: return get_lines_from_file(abs_path, lineno, max_value_length, loader=loader, module=module) return [], None, [] def safe_str(value): # type: (Any) -> str try: return str(value) except Exception: return safe_repr(value) def safe_repr(value): # type: (Any) -> str try: return repr(value) except Exception: return "<broken repr>" def filename_for_module(module, abs_path): # type: (Optional[str], Optional[str]) -> Optional[str] if not abs_path or not module: return abs_path try: if abs_path.endswith(".pyc"): abs_path = abs_path[:-1] base_module = module.split(".", 1)[0] if base_module == module: return os.path.basename(abs_path) base_module_path = sys.modules[base_module].__file__ if not base_module_path: return abs_path return abs_path.split(base_module_path.rsplit(os.sep, 2)[0], 1)[-1].lstrip(os.sep) except Exception: return abs_path def serialize_frame( frame, tb_lineno=None, include_local_variables=True, include_source_context=True, max_value_length=None, custom_repr=None, ): # type: (FrameType, Optional[int], bool, bool, Optional[int], Optional[Callable[..., Optional[str]]]) -> Dict[str, Any] f_code = getattr(frame, "f_code", None) if not f_code: abs_path = None function = None else: abs_path = frame.f_code.co_filename function = frame.f_code.co_name try: module = frame.f_globals["__name__"] except Exception: module = None if tb_lineno is None: tb_lineno = frame.f_lineno rv = { "platform": "python", "filename": filename_for_module(module, abs_path) or None, "abs_path": os.path.abspath(abs_path) if abs_path else None, "function": function or "<unknown>", "module": module, "lineno": tb_lineno, } # type: Dict[str, Any] if include_source_context: rv["pre_context"], rv["context_line"], rv["post_context"] = get_source_context( frame, tb_lineno, max_value_length ) if include_local_variables: # TODO(nk): Sort out this current invalid import # from sentry_sdk.serializer import serialize # rv["vars"] = serialize( # dict(frame.f_locals), is_vars=True, custom_repr=custom_repr # ) pass return rv def current_stacktrace( include_local_variables=True, # type: bool include_source_context=True, # type: bool max_value_length=None, # type: Optional[int] ): # type: (...) -> Dict[str, Any] __tracebackhide__ = True frames = [] f = sys._getframe() # type: Optional[FrameType] while f is not None: if not should_hide_frame(f): frames.append( serialize_frame( f, include_local_variables=include_local_variables, include_source_context=include_source_context, max_value_length=max_value_length, ) ) f = f.f_back frames.reverse() return {"frames": frames, "type": "raw"} def get_errno(exc_value): # type: (BaseException) -> Optional[Any] return getattr(exc_value, "errno", None) def get_error_message(exc_value): # type: (Optional[BaseException]) -> str return getattr(exc_value, "message", "") or getattr(exc_value, "detail", "") or safe_str(exc_value) def single_exception_from_error_tuple( exc_type, # type: Optional[type] exc_value, # type: Optional[BaseException] tb, # type: Optional[TracebackType] client_options=None, # type: Optional[Dict[str, Any]] mechanism=None, # type: Optional[Dict[str, Any]] exception_id=None, # type: Optional[int] parent_id=None, # type: Optional[int] source=None, # type: Optional[str] ): # type: (...) -> Dict[str, Any] """ Creates a dict that goes into the events `exception.values` list and is ingestible by Sentry. See the Exception Interface documentation for more details: https://develop.sentry.dev/sdk/event-payloads/exception/ """ exception_value = {} # type: Dict[str, Any] exception_value["mechanism"] = mechanism.copy() if mechanism else {"type": "generic", "handled": True} if exception_id is not None: exception_value["mechanism"]["exception_id"] = exception_id if exc_value is not None: errno = get_errno(exc_value) else: errno = None if errno is not None: exception_value["mechanism"].setdefault("meta", {}).setdefault("errno", {}).setdefault("number", errno) if source is not None: exception_value["mechanism"]["source"] = source is_root_exception = exception_id == 0 if not is_root_exception and parent_id is not None: exception_value["mechanism"]["parent_id"] = parent_id exception_value["mechanism"]["type"] = "chained" if is_root_exception and "type" not in exception_value["mechanism"]: exception_value["mechanism"]["type"] = "generic" is_exception_group = BaseExceptionGroup is not None and isinstance(exc_value, BaseExceptionGroup) if is_exception_group: exception_value["mechanism"]["is_exception_group"] = True exception_value["module"] = get_type_module(exc_type) exception_value["type"] = get_type_name(exc_type) exception_value["value"] = get_error_message(exc_value) if client_options is None: include_local_variables = True include_source_context = True max_value_length = DEFAULT_MAX_VALUE_LENGTH # fallback custom_repr = None else: include_local_variables = client_options["include_local_variables"] include_source_context = client_options["include_source_context"] max_value_length = client_options["max_value_length"] custom_repr = client_options.get("custom_repr") frames = [ serialize_frame( tb.tb_frame, tb_lineno=tb.tb_lineno, include_local_variables=include_local_variables, include_source_context=include_source_context, max_value_length=max_value_length, custom_repr=custom_repr, ) for tb in iter_stacks(tb) ] if frames: exception_value["stacktrace"] = {"frames": frames, "type": "raw"} return exception_value HAS_CHAINED_EXCEPTIONS = hasattr(Exception, "__suppress_context__") if HAS_CHAINED_EXCEPTIONS: def walk_exception_chain(exc_info): # type: (ExcInfo) -> Iterator[ExcInfo] exc_type, exc_value, tb = exc_info seen_exceptions = [] seen_exception_ids = set() # type: Set[int] while exc_type is not None and exc_value is not None and id(exc_value) not in seen_exception_ids: yield exc_type, exc_value, tb # Avoid hashing random types we don't know anything # about. Use the list to keep a ref so that the `id` is # not used for another object. seen_exceptions.append(exc_value) seen_exception_ids.add(id(exc_value)) if exc_value.__suppress_context__: cause = exc_value.__cause__ else: cause = exc_value.__context__ if cause is None: break exc_type = type(cause) exc_value = cause tb = getattr(cause, "__traceback__", None) else: def walk_exception_chain(exc_info): # type: (ExcInfo) -> Iterator[ExcInfo] yield exc_info def exceptions_from_error( exc_type, # type: Optional[type] exc_value, # type: Optional[BaseException] tb, # type: Optional[TracebackType] client_options=None, # type: Optional[Dict[str, Any]] mechanism=None, # type: Optional[Dict[str, Any]] exception_id=0, # type: int parent_id=0, # type: int source=None, # type: Optional[str] ): # type: (...) -> Tuple[int, List[Dict[str, Any]]] """ Creates the list of exceptions. This can include chained exceptions and exceptions from an ExceptionGroup. See the Exception Interface documentation for more details: https://develop.sentry.dev/sdk/event-payloads/exception/ """ parent = single_exception_from_error_tuple( exc_type=exc_type, exc_value=exc_value, tb=tb, client_options=client_options, mechanism=mechanism, exception_id=exception_id, parent_id=parent_id, source=source, ) exceptions = [parent] parent_id = exception_id exception_id += 1 should_supress_context = hasattr(exc_value, "__suppress_context__") and exc_value.__suppress_context__ # type: ignore if should_supress_context: # Add direct cause. # The field `__cause__` is set when raised with the exception (using the `from` keyword). exception_has_cause = exc_value and hasattr(exc_value, "__cause__") and exc_value.__cause__ is not None if exception_has_cause: cause = exc_value.__cause__ # type: ignore (exception_id, child_exceptions) = exceptions_from_error( exc_type=type(cause), exc_value=cause, tb=getattr(cause, "__traceback__", None), client_options=client_options, mechanism=mechanism, exception_id=exception_id, source="__cause__", ) exceptions.extend(child_exceptions) else: # Add indirect cause. # The field `__context__` is assigned if another exception occurs while handling the exception. exception_has_content = exc_value and hasattr(exc_value, "__context__") and exc_value.__context__ is not None if exception_has_content: context = exc_value.__context__ # type: ignore (exception_id, child_exceptions) = exceptions_from_error( exc_type=type(context), exc_value=context, tb=getattr(context, "__traceback__", None), client_options=client_options, mechanism=mechanism, exception_id=exception_id, source="__context__", ) exceptions.extend(child_exceptions) # Add exceptions from an ExceptionGroup. is_exception_group = exc_value and hasattr(exc_value, "exceptions") if is_exception_group: for idx, e in enumerate(exc_value.exceptions): # type: ignore (exception_id, child_exceptions) = exceptions_from_error( exc_type=type(e), exc_value=e, tb=getattr(e, "__traceback__", None), client_options=client_options, mechanism=mechanism, exception_id=exception_id, parent_id=parent_id, source="exceptions[%s]" % idx, ) exceptions.extend(child_exceptions) return (exception_id, exceptions) def exceptions_from_error_tuple( exc_info, # type: ExcInfo client_options=None, # type: Optional[Dict[str, Any]] mechanism=None, # type: Optional[Dict[str, Any]] ): # type: (...) -> List[Dict[str, Any]] exc_type, exc_value, tb = exc_info is_exception_group = BaseExceptionGroup is not None and isinstance(exc_value, BaseExceptionGroup) if is_exception_group: (_, exceptions) = exceptions_from_error( exc_type=exc_type, exc_value=exc_value, tb=tb, client_options=client_options, mechanism=mechanism, exception_id=0, parent_id=0, ) else: exceptions = [] for exc_type, exc_value, tb in walk_exception_chain(exc_info): exceptions.append(single_exception_from_error_tuple(exc_type, exc_value, tb, client_options, mechanism)) exceptions.reverse() return exceptions def to_string(value): # type: (str) -> str try: return str(value) except UnicodeDecodeError: return repr(value)[1:-1] def iter_event_stacktraces(event): # type: (Event) -> Iterator[Dict[str, Any]] if "stacktrace" in event: yield event["stacktrace"] if "threads" in event: for thread in event["threads"].get("values") or (): if "stacktrace" in thread: yield thread["stacktrace"] if "exception" in event: for exception in event["exception"].get("values") or (): if "stacktrace" in exception: yield exception["stacktrace"] def iter_event_frames(event): # type: (Event) -> Iterator[Dict[str, Any]] for stacktrace in iter_event_stacktraces(event): for frame in stacktrace.get("frames") or (): yield frame def handle_in_app(event, in_app_exclude=None, in_app_include=None, project_root=None): # type: (Event, Optional[List[str]], Optional[List[str]], Optional[str]) -> Event for stacktrace in iter_event_stacktraces(event): set_in_app_in_frames( stacktrace.get("frames"), in_app_exclude=in_app_exclude, in_app_include=in_app_include, project_root=project_root, ) return event def set_in_app_in_frames(frames, in_app_exclude, in_app_include, project_root=None): # type: (Any, Optional[List[str]], Optional[List[str]], Optional[str]) -> Optional[Any] if not frames: return None for frame in frames: # if frame has already been marked as in_app, skip it current_in_app = frame.get("in_app") if current_in_app is not None: continue module = frame.get("module") # check if module in frame is in the list of modules to include if _module_in_list(module, in_app_include): frame["in_app"] = True continue # check if module in frame is in the list of modules to exclude if _module_in_list(module, in_app_exclude): frame["in_app"] = False continue # if frame has no abs_path, skip further checks abs_path = frame.get("abs_path") if abs_path is None: continue if _is_external_source(abs_path): frame["in_app"] = False continue if _is_in_project_root(abs_path, project_root): frame["in_app"] = True continue return frames def exc_info_from_error(error): # type: (Union[BaseException, ExcInfo]) -> ExcInfo if isinstance(error, tuple) and len(error) == 3: exc_type, exc_value, tb = error elif isinstance(error, BaseException): tb = getattr(error, "__traceback__", None) if tb is not None: exc_type = type(error) exc_value = error else: exc_type, exc_value, tb = sys.exc_info() if exc_value is not error: tb = None exc_value = error exc_type = type(error) else: raise ValueError("Expected Exception object to report, got %s!" % type(error)) exc_info = (exc_type, exc_value, tb) if TYPE_CHECKING: # This cast is safe because exc_type and exc_value are either both # None or both not None. exc_info = cast(ExcInfo, exc_info) return exc_info def event_from_exception( exc_info, # type: Union[BaseException, ExcInfo] client_options=None, # type: Optional[Dict[str, Any]] mechanism=None, # type: Optional[Dict[str, Any]] ): # type: (...) -> Tuple[Event, Dict[str, Any]] exc_info = exc_info_from_error(exc_info) hint = event_hint_with_exc_info(exc_info) return ( { "level": "error", "exception": {"values": exceptions_from_error_tuple(exc_info, client_options, mechanism)}, }, hint, ) def _module_in_list(name, items): # type: (str | None, Optional[List[str]]) -> bool if name is None: return False if not items: return False for item in items: if item == name or name.startswith(item + "."): return True return False def _is_external_source(abs_path): # type: (str) -> bool # check if frame is in 'site-packages' or 'dist-packages' external_source = re.search(r"[\\/](?:dist|site)-packages[\\/]", abs_path) is not None return external_source def _is_in_project_root(abs_path, project_root): # type: (str, Optional[str]) -> bool if project_root is None: return False # check if path is in the project root if abs_path.startswith(project_root): return True return False def _truncate_by_bytes(string, max_bytes): # type: (str, int) -> str """ Truncate a UTF-8-encodable string to the last full codepoint so that it fits in max_bytes. """ truncated = string.encode("utf-8")[: max_bytes - 3].decode("utf-8", errors="ignore") return truncated + "..." def _get_size_in_bytes(value): # type: (str) -> Optional[int] try: return len(value.encode("utf-8")) except (UnicodeEncodeError, UnicodeDecodeError): return None def strip_string(value, max_length=None): # type: (str, Optional[int]) -> Union[AnnotatedValue, str] if not value: return value if max_length is None: max_length = DEFAULT_MAX_VALUE_LENGTH byte_size = _get_size_in_bytes(value) text_size = len(value) if byte_size is not None and byte_size > max_length: # truncate to max_length bytes, preserving code points truncated_value = _truncate_by_bytes(value, max_length) elif text_size is not None and text_size > max_length: # fallback to truncating by string length truncated_value = value[: max_length - 3] + "..." else: return value return AnnotatedValue( value=truncated_value, metadata={ "len": byte_size or text_size, "rem": [["!limit", "x", max_length - 3, max_length]], }, )
Memory