# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-locals
"""
The opentelemetry-instrumentation-asgi package provides an ASGI middleware that can be used
on any ASGI framework (such as Django-channels / Quart) to track request timing through OpenTelemetry.
Usage (Quart)
-------------
.. code-block:: python
from quart import Quart
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
app = Quart(__name__)
app.asgi_app = OpenTelemetryMiddleware(app.asgi_app)
@app.route("/")
async def hello():
return "Hello!"
if __name__ == "__main__":
app.run(debug=True)
Usage (Django 3.0)
------------------
Modify the application's ``asgi.py`` file as shown below.
.. code-block:: python
import os
from django.core.asgi import get_asgi_application
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'asgi_example.settings')
application = get_asgi_application()
application = OpenTelemetryMiddleware(application)
Usage (Raw ASGI)
----------------
.. code-block:: python
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
app = ... # An ASGI application.
app = OpenTelemetryMiddleware(app)
Configuration
-------------
Request/Response hooks
**********************
This instrumentation supports request and response hooks. These are functions that get called
right after a span is created for a request and right before the span is finished for the response.
- The server request hook is passed a server span and ASGI scope object for every incoming request.
- The client request hook is called with the internal span and an ASGI scope when the method ``receive`` is called.
- The client response hook is called with the internal span and an ASGI event when the method ``send`` is called.
For example,
.. code-block:: python
def server_request_hook(span: Span, scope: dict[str, Any]):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
def client_request_hook(span: Span, scope: dict[str, Any], message: dict[str, Any]):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_client_request_hook", "some-value")
def client_response_hook(span: Span, scope: dict[str, Any], message: dict[str, Any]):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
OpenTelemetryMiddleware().(application, server_request_hook=server_request_hook, client_request_hook=client_request_hook, client_response_hook=client_response_hook)
Capture HTTP request and response headers
*****************************************
You can configure the agent to capture specified HTTP headers as span attributes, according to the
`semantic convention <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers>`_.
Request headers
***************
To capture HTTP request headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST`` to a comma delimited list of HTTP header names.
For example,
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST="content-type,custom_request_header"
will extract ``content-type`` and ``custom_request_header`` from the request headers and add them as span attributes.
Request header names in ASGI are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.
Regular expressions may also be used to match multiple headers that correspond to the given pattern. For example:
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST="Accept.*,X-.*"
Would match all request headers that start with ``Accept`` and ``X-``.
To capture all request headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST`` to ``".*"``.
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*"
The name of the added span attribute will follow the format ``http.request.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.
For example:
``http.request.header.custom_request_header = ["<value1>", "<value2>"]``
Response headers
****************
To capture HTTP response headers as span attributes, set the environment variable
``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE`` to a comma delimited list of HTTP header names.
For example,
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE="content-type,custom_response_header"
will extract ``content-type`` and ``custom_response_header`` from the response headers and add them as span attributes.
Response header names in ASGI are case-insensitive. So, giving the header name as ``CUStom-Header`` in the environment
variable will capture the header named ``custom-header``.
Regular expressions may also be used to match multiple headers that correspond to the given pattern. For example:
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE="Content.*,X-.*"
Would match all response headers that start with ``Content`` and ``X-``.
To capture all response headers, set ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE`` to ``".*"``.
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*"
The name of the added span attribute will follow the format ``http.response.header.<header_name>`` where ``<header_name>``
is the normalized HTTP header name (lowercase, with ``-`` replaced by ``_``). The value of the attribute will be a
list containing the header values.
For example:
``http.response.header.custom_response_header = ["<value1>", "<value2>"]``
Sanitizing headers
******************
In order to prevent storing sensitive data such as personally identifiable information (PII), session keys, passwords,
etc, set the environment variable ``OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS``
to a comma delimited list of HTTP header names to be sanitized. Regexes may be used, and all header names will be
matched in a case-insensitive manner.
For example,
::
export OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS=".*session.*,set-cookie"
will replace the value of headers such as ``session-id`` and ``set-cookie`` with ``[REDACTED]`` in the span.
Note:
The environment variable names used to capture HTTP headers are still experimental, and thus are subject to change.
API
---
"""
from __future__ import annotations
import typing
import urllib
from collections import defaultdict
from functools import wraps
from timeit import default_timer
from typing import Any, Awaitable, Callable, DefaultDict, Tuple
from asgiref.compatibility import guarantee_single_callable
from opentelemetry import context, trace
from opentelemetry.instrumentation._semconv import (
_filter_semconv_active_request_count_attr,
_filter_semconv_duration_attrs,
_get_schema_url,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_report_old,
_server_active_requests_count_attrs_new,
_server_active_requests_count_attrs_old,
_server_duration_attrs_new,
_server_duration_attrs_old,
_set_http_flavor_version,
_set_http_host_server,
_set_http_method,
_set_http_net_host_port,
_set_http_peer_ip_server,
_set_http_peer_port_server,
_set_http_scheme,
_set_http_target,
_set_http_url,
_set_http_user_agent,
_set_status,
_StabilityMode,
)
from opentelemetry.instrumentation.asgi.types import (
ClientRequestHook,
ClientResponseHook,
ServerRequestHook,
)
from opentelemetry.instrumentation.asgi.version import __version__ # noqa
from opentelemetry.instrumentation.propagators import (
get_global_response_propagator,
)
from opentelemetry.instrumentation.utils import _start_internal_or_server_span
from opentelemetry.metrics import get_meter
from opentelemetry.propagators.textmap import Getter, Setter
from opentelemetry.semconv._incubating.metrics.http_metrics import (
create_http_server_active_requests,
create_http_server_request_body_size,
create_http_server_response_body_size,
)
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.metrics.http_metrics import (
HTTP_SERVER_REQUEST_DURATION,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import set_span_in_context
from opentelemetry.util.http import (
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE,
SanitizeValue,
_parse_url_query,
get_custom_headers,
normalise_request_header_name,
normalise_response_header_name,
remove_url_credentials,
sanitize_method,
)
class ASGIGetter(Getter[dict]):
def get(
self, carrier: dict, key: str
) -> typing.Optional[typing.List[str]]:
"""Getter implementation to retrieve a HTTP header value from the ASGI
scope.
Args:
carrier: ASGI scope object
key: header name in scope
Returns:
A list with a single string with the header value if it exists,
else None.
"""
headers = carrier.get("headers")
if not headers:
return None
# ASGI header keys are in lower case
key = key.lower()
decoded = [
_decode_header_item(_value)
for (_key, _value) in headers
if _decode_header_item(_key).lower() == key
]
if not decoded:
return None
return decoded
def keys(self, carrier: dict) -> typing.List[str]:
headers = carrier.get("headers") or []
return [_decode_header_item(_key) for (_key, _value) in headers]
asgi_getter = ASGIGetter()
class ASGISetter(Setter[dict]):
def set(self, carrier: dict, key: str, value: str) -> None: # pylint: disable=no-self-use
"""Sets response header values on an ASGI scope according to `the spec <https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event>`_.
Args:
carrier: ASGI scope object
key: response header name to set
value: response header value
Returns:
None
"""
headers = carrier.get("headers")
if not headers:
headers = []
carrier["headers"] = headers
headers.append([key.lower().encode(), value.encode()])
asgi_setter = ASGISetter()
# pylint: disable=too-many-branches
def collect_request_attributes(
scope, sem_conv_opt_in_mode=_StabilityMode.DEFAULT
):
"""Collects HTTP request attributes from the ASGI scope and returns a
dictionary to be used as span creation attributes."""
server_host, port, http_url = get_host_port_url_tuple(scope)
query_string = scope.get("query_string")
if query_string and http_url:
if isinstance(query_string, bytes):
query_string = query_string.decode("utf8")
http_url += "?" + urllib.parse.unquote(query_string)
result = {}
scheme = scope.get("scheme")
if scheme:
_set_http_scheme(result, scheme, sem_conv_opt_in_mode)
if server_host:
_set_http_host_server(result, server_host, sem_conv_opt_in_mode)
if port:
_set_http_net_host_port(result, port, sem_conv_opt_in_mode)
flavor = scope.get("http_version")
if flavor:
_set_http_flavor_version(result, flavor, sem_conv_opt_in_mode)
path = scope.get("path")
if path:
_set_http_target(
result, path, path, query_string, sem_conv_opt_in_mode
)
if http_url:
if _report_old(sem_conv_opt_in_mode):
_set_http_url(
result,
remove_url_credentials(http_url),
_StabilityMode.DEFAULT,
)
http_method = scope.get("method", "")
if http_method:
_set_http_method(
result,
http_method,
sanitize_method(http_method),
sem_conv_opt_in_mode,
)
http_host_value_list = asgi_getter.get(scope, "host")
if http_host_value_list:
if _report_old(sem_conv_opt_in_mode):
result[SpanAttributes.HTTP_SERVER_NAME] = ",".join(
http_host_value_list
)
http_user_agent = asgi_getter.get(scope, "user-agent")
if http_user_agent:
_set_http_user_agent(result, http_user_agent[0], sem_conv_opt_in_mode)
if "client" in scope and scope["client"] is not None:
_set_http_peer_ip_server(
result, scope.get("client")[0], sem_conv_opt_in_mode
)
_set_http_peer_port_server(
result, scope.get("client")[1], sem_conv_opt_in_mode
)
# remove None values
result = {k: v for k, v in result.items() if v is not None}
return result
def collect_custom_headers_attributes(
scope_or_response_message: dict[str, Any],
sanitize: SanitizeValue,
header_regexes: list[str],
normalize_names: Callable[[str], str],
) -> dict[str, list[str]]:
"""
Returns custom HTTP request or response headers to be added into SERVER span as span attributes.
Refer specifications:
- https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers
"""
headers: DefaultDict[str, list[str]] = defaultdict(list)
raw_headers = scope_or_response_message.get("headers")
if raw_headers:
for key, value in raw_headers:
# Decode headers before processing.
headers[_decode_header_item(key)].append(
_decode_header_item(value)
)
return sanitize.sanitize_header_values(
headers,
header_regexes,
normalize_names,
)
def get_host_port_url_tuple(scope):
"""Returns (host, port, full_url) tuple."""
server = scope.get("server") or ["0.0.0.0", 80]
port = server[1]
server_host = server[0] + (":" + str(port) if str(port) != "80" else "")
# using the scope path is enough, see:
# - https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope (see: root_path and path)
# - https://asgi.readthedocs.io/en/latest/specs/www.html#wsgi-compatibility (see: PATH_INFO)
# PATH_INFO can be derived by stripping root_path from path
# -> that means that the path should contain the root_path already, so prefixing it again is not necessary
# - https://wsgi.readthedocs.io/en/latest/definitions.html#envvar-PATH_INFO
full_path = scope.get("path", "")
http_url = scope.get("scheme", "http") + "://" + server_host + full_path
return server_host, port, http_url
def set_status_code(
span,
status_code,
metric_attributes=None,
sem_conv_opt_in_mode=_StabilityMode.DEFAULT,
):
"""Adds HTTP response attributes to span using the status_code argument."""
status_code_str = str(status_code)
try:
status_code = int(status_code)
except ValueError:
status_code = -1
if metric_attributes is None:
metric_attributes = {}
_set_status(
span,
metric_attributes,
status_code,
status_code_str,
server_span=True,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)
def get_default_span_details(scope: dict) -> Tuple[str, dict]:
"""
Default span name is the HTTP method and URL path, or just the method.
https://github.com/open-telemetry/opentelemetry-specification/pull/3165
https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/http/#name
Args:
scope: the ASGI scope dictionary
Returns:
a tuple of the span name, and any attributes to attach to the span.
"""
path = scope.get("path", "").strip()
method = sanitize_method(scope.get("method", "").strip())
if method == "_OTHER":
method = "HTTP"
if method and path: # http
return f"{method} {path}", {}
if path: # websocket
return path, {}
return method, {} # http with no path
def _collect_target_attribute(
scope: typing.Dict[str, typing.Any],
) -> typing.Optional[str]:
"""
Returns the target path as defined by the Semantic Conventions.
This value is suitable to use in metrics as it should replace concrete
values with a parameterized name. Example: /api/users/{user_id}
Refer to the specification
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md#parameterized-attributes
Note: this function requires specific code for each framework, as there's no
standard attribute to use.
"""
# FastAPI
root_path = scope.get("root_path", "")
route = scope.get("route")
path_format = getattr(route, "path_format", None)
if path_format:
return f"{root_path}{path_format}"
return None
class OpenTelemetryMiddleware:
"""The ASGI application middleware.
This class is an ASGI middleware that starts and annotates spans for any
requests it is invoked with.
Args:
app: The ASGI application callable to forward requests to.
default_span_details: Callback which should return a string and a tuple, representing the desired default span name and a
dictionary with any additional span attributes to set.
Optional: Defaults to get_default_span_details.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method receive is called.
client_response_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""
# pylint: disable=too-many-branches
def __init__(
self,
app,
excluded_urls=None,
default_span_details=None,
server_request_hook: ServerRequestHook = None,
client_request_hook: ClientRequestHook = None,
client_response_hook: ClientResponseHook = None,
tracer_provider=None,
meter_provider=None,
tracer=None,
meter=None,
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[typing.Literal["receive", "send"]] | None = None,
):
# initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.HTTP,
)
self.app = guarantee_single_callable(app)
self.tracer = (
trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url=_get_schema_url(sem_conv_opt_in_mode),
)
if tracer is None
else tracer
)
self.meter = (
get_meter(
__name__,
__version__,
meter_provider,
schema_url=_get_schema_url(sem_conv_opt_in_mode),
)
if meter is None
else meter
)
self.duration_histogram_old = None
if _report_old(sem_conv_opt_in_mode):
self.duration_histogram_old = self.meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_DURATION,
unit="ms",
description="Measures the duration of inbound HTTP requests.",
)
self.duration_histogram_new = None
if _report_new(sem_conv_opt_in_mode):
self.duration_histogram_new = self.meter.create_histogram(
name=HTTP_SERVER_REQUEST_DURATION,
description="Duration of HTTP server requests.",
unit="s",
)
self.server_response_size_histogram = None
if _report_old(sem_conv_opt_in_mode):
self.server_response_size_histogram = self.meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_RESPONSE_SIZE,
unit="By",
description="measures the size of HTTP response messages (compressed).",
)
self.server_response_body_size_histogram = None
if _report_new(sem_conv_opt_in_mode):
self.server_response_body_size_histogram = (
create_http_server_response_body_size(self.meter)
)
self.server_request_size_histogram = None
if _report_old(sem_conv_opt_in_mode):
self.server_request_size_histogram = self.meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_REQUEST_SIZE,
unit="By",
description="Measures the size of HTTP request messages (compressed).",
)
self.server_request_body_size_histogram = None
if _report_new(sem_conv_opt_in_mode):
self.server_request_body_size_histogram = (
create_http_server_request_body_size(self.meter)
)
self.active_requests_counter = create_http_server_active_requests(
self.meter
)
self.excluded_urls = excluded_urls
self.default_span_details = (
default_span_details or get_default_span_details
)
self.server_request_hook = server_request_hook
self.client_request_hook = client_request_hook
self.client_response_hook = client_response_hook
self.content_length_header = None
self._sem_conv_opt_in_mode = sem_conv_opt_in_mode
# Environment variables as constructor parameters
self.http_capture_headers_server_request = (
http_capture_headers_server_request
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
)
)
or None
)
self.http_capture_headers_server_response = (
http_capture_headers_server_response
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
)
)
or None
)
self.http_capture_headers_sanitize_fields = SanitizeValue(
http_capture_headers_sanitize_fields
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
)
)
or []
)
self.exclude_receive_span = (
"receive" in exclude_spans if exclude_spans else False
)
self.exclude_send_span = (
"send" in exclude_spans if exclude_spans else False
)
# pylint: disable=too-many-statements
async def __call__(
self,
scope: dict[str, Any],
receive: Callable[[], Awaitable[dict[str, Any]]],
send: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
"""The ASGI application
Args:
scope: An ASGI environment.
receive: An awaitable callable yielding dictionaries
send: An awaitable callable taking a single dictionary as argument.
"""
start = default_timer()
if scope["type"] not in ("http", "websocket"):
return await self.app(scope, receive, send)
_, _, url = get_host_port_url_tuple(scope)
if self.excluded_urls and self.excluded_urls.url_disabled(url):
return await self.app(scope, receive, send)
span_name, additional_attributes = self.default_span_details(scope)
attributes = collect_request_attributes(
scope, self._sem_conv_opt_in_mode
)
attributes.update(additional_attributes)
span, token = _start_internal_or_server_span(
tracer=self.tracer,
span_name=span_name,
start_time=None,
context_carrier=scope,
context_getter=asgi_getter,
attributes=attributes,
)
active_requests_count_attrs = _parse_active_request_count_attrs(
attributes,
self._sem_conv_opt_in_mode,
)
if scope["type"] == "http":
self.active_requests_counter.add(1, active_requests_count_attrs)
try:
with trace.use_span(span, end_on_exit=False) as current_span:
if current_span.is_recording():
for key, value in attributes.items():
current_span.set_attribute(key, value)
if current_span.kind == trace.SpanKind.SERVER:
custom_attributes = (
collect_custom_headers_attributes(
scope,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_request,
normalise_request_header_name,
)
if self.http_capture_headers_server_request
else {}
)
if len(custom_attributes) > 0:
current_span.set_attributes(custom_attributes)
if callable(self.server_request_hook):
self.server_request_hook(current_span, scope)
otel_receive = self._get_otel_receive(
span_name, scope, receive
)
otel_send = self._get_otel_send(
current_span,
span_name,
scope,
send,
attributes,
)
await self.app(scope, otel_receive, otel_send)
finally:
if scope["type"] == "http":
target = _collect_target_attribute(scope)
if target:
path, query = _parse_url_query(target)
_set_http_target(
attributes,
target,
path,
query,
self._sem_conv_opt_in_mode,
)
duration_s = default_timer() - start
duration_attrs_old = _parse_duration_attrs(
attributes, _StabilityMode.DEFAULT
)
if target:
duration_attrs_old[SpanAttributes.HTTP_TARGET] = target
duration_attrs_new = _parse_duration_attrs(
attributes, _StabilityMode.HTTP
)
if self.duration_histogram_old:
self.duration_histogram_old.record(
max(round(duration_s * 1000), 0), duration_attrs_old
)
if self.duration_histogram_new:
self.duration_histogram_new.record(
max(duration_s, 0), duration_attrs_new
)
self.active_requests_counter.add(
-1, active_requests_count_attrs
)
if self.content_length_header:
if self.server_response_size_histogram:
self.server_response_size_histogram.record(
self.content_length_header, duration_attrs_old
)
if self.server_response_body_size_histogram:
self.server_response_body_size_histogram.record(
self.content_length_header, duration_attrs_new
)
request_size = asgi_getter.get(scope, "content-length")
if request_size:
try:
request_size_amount = int(request_size[0])
except ValueError:
pass
else:
if self.server_request_size_histogram:
self.server_request_size_histogram.record(
request_size_amount, duration_attrs_old
)
if self.server_request_body_size_histogram:
self.server_request_body_size_histogram.record(
request_size_amount, duration_attrs_new
)
if token:
context.detach(token)
if span.is_recording():
span.end()
# pylint: enable=too-many-branches
def _get_otel_receive(self, server_span_name, scope, receive):
if self.exclude_receive_span:
return receive
@wraps(receive)
async def otel_receive():
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "receive"))
) as receive_span:
message = await receive()
if callable(self.client_request_hook):
self.client_request_hook(receive_span, scope, message)
if receive_span.is_recording():
if message["type"] == "websocket.receive":
set_status_code(
receive_span,
200,
None,
self._sem_conv_opt_in_mode,
)
receive_span.set_attribute(
"asgi.event.type", message["type"]
)
return message
return otel_receive
def _set_send_span(
self,
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
):
"""Set send span attributes and status code."""
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)
if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])
if status_code:
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
return expecting_trailers
def _set_server_span(
self, server_span, message, status_code, duration_attrs
):
"""Set server span attributes and status code."""
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(custom_response_attributes)
if status_code:
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
def _get_otel_send(
self,
server_span,
server_span_name,
scope,
send,
duration_attrs,
):
expecting_trailers = False
@wraps(send)
async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers
status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200
if not self.exclude_send_span:
expecting_trailers = self._set_send_span(
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
)
self._set_server_span(
server_span, message, status_code, duration_attrs
)
propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass
await send(message)
# pylint: disable=too-many-boolean-expressions
if (
not expecting_trailers
and message["type"] == "http.response.body"
and not message.get("more_body", False)
) or (
expecting_trailers
and message["type"] == "http.response.trailers"
and not message.get("more_trailers", False)
):
server_span.end()
return otel_send
def _parse_duration_attrs(
req_attrs, sem_conv_opt_in_mode=_StabilityMode.DEFAULT
):
return _filter_semconv_duration_attrs(
req_attrs,
_server_duration_attrs_old,
_server_duration_attrs_new,
sem_conv_opt_in_mode,
)
def _parse_active_request_count_attrs(
req_attrs, sem_conv_opt_in_mode=_StabilityMode.DEFAULT
):
return _filter_semconv_active_request_count_attr(
req_attrs,
_server_active_requests_count_attrs_old,
_server_active_requests_count_attrs_new,
sem_conv_opt_in_mode,
)
def _decode_header_item(value):
try:
return value.decode("utf-8")
except ValueError:
# ASGI header encoding specs, see:
# - https://asgi.readthedocs.io/en/latest/specs/www.html#wsgi-encoding-differences (see: WSGI encoding differences)
# - https://docs.python.org/3/library/codecs.html#text-encodings (see: Text Encodings)
return value.decode("unicode_escape")