# Copyright (c) Microsoft Corporation. # All rights reserved. # # This code is licensed under the MIT License. import json import logging import os import socket import sys import time from urllib.parse import urlparse # Python 3+ from collections import UserDict # Python 3+ from typing import Optional, Union # Needed in Python 3.7 & 3.8 from .token_cache import TokenCache from .individual_cache import _IndividualCache as IndividualCache from .throttled_http_client import ThrottledHttpClientBase, RetryAfterParser from .cloudshell import _is_running_in_cloud_shell logger = logging.getLogger(__name__) class ManagedIdentityError(ValueError): pass class ManagedIdentity(UserDict): """Feed an instance of this class to :class:`msal.ManagedIdentityClient` to acquire token for the specified managed identity. """ # The key names used in config dict ID_TYPE = "ManagedIdentityIdType" # Contains keyword ManagedIdentity so its json equivalent will be more readable ID = "Id" # Valid values for key ID_TYPE CLIENT_ID = "ClientId" RESOURCE_ID = "ResourceId" OBJECT_ID = "ObjectId" SYSTEM_ASSIGNED = "SystemAssigned" _types_mapping = { # Maps type name in configuration to type name on wire CLIENT_ID: "client_id", RESOURCE_ID: "msi_res_id", # VM's IMDS prefers msi_res_id https://github.com/Azure/azure-rest-api-specs/blob/dba6ed1f03bda88ac6884c0a883246446cc72495/specification/imds/data-plane/Microsoft.InstanceMetadataService/stable/2018-10-01/imds.json#L233-L239 OBJECT_ID: "object_id", } @classmethod def is_managed_identity(cls, unknown): return (isinstance(unknown, ManagedIdentity) or cls.is_system_assigned(unknown) or cls.is_user_assigned(unknown)) @classmethod def is_system_assigned(cls, unknown): return isinstance(unknown, SystemAssignedManagedIdentity) or ( isinstance(unknown, dict) and unknown.get(cls.ID_TYPE) == cls.SYSTEM_ASSIGNED) @classmethod def is_user_assigned(cls, unknown): return isinstance(unknown, UserAssignedManagedIdentity) or ( isinstance(unknown, dict) and unknown.get(cls.ID_TYPE) in cls._types_mapping and unknown.get(cls.ID)) def __init__(self, identifier=None, id_type=None): # Undocumented. Use subclasses instead. super(ManagedIdentity, self).__init__({ self.ID_TYPE: id_type, self.ID: identifier, }) class SystemAssignedManagedIdentity(ManagedIdentity): """Represent a system-assigned managed identity. It is equivalent to a Python dict of:: {"ManagedIdentityIdType": "SystemAssigned", "Id": None} or a JSON blob of:: {"ManagedIdentityIdType": "SystemAssigned", "Id": null} """ def __init__(self): super(SystemAssignedManagedIdentity, self).__init__(id_type=self.SYSTEM_ASSIGNED) class UserAssignedManagedIdentity(ManagedIdentity): """Represent a user-assigned managed identity. Depends on the id you provided, the outcome is equivalent to one of the below:: {"ManagedIdentityIdType": "ClientId", "Id": "foo"} {"ManagedIdentityIdType": "ResourceId", "Id": "foo"} {"ManagedIdentityIdType": "ObjectId", "Id": "foo"} """ def __init__(self, *, client_id=None, resource_id=None, object_id=None): if client_id and not resource_id and not object_id: super(UserAssignedManagedIdentity, self).__init__( id_type=self.CLIENT_ID, identifier=client_id) elif not client_id and resource_id and not object_id: super(UserAssignedManagedIdentity, self).__init__( id_type=self.RESOURCE_ID, identifier=resource_id) elif not client_id and not resource_id and object_id: super(UserAssignedManagedIdentity, self).__init__( id_type=self.OBJECT_ID, identifier=object_id) else: raise ManagedIdentityError( "You shall specify one of the three parameters: " "client_id, resource_id, object_id") class _ThrottledHttpClient(ThrottledHttpClientBase): def __init__(self, http_client, **kwargs): super(_ThrottledHttpClient, self).__init__(http_client, **kwargs) self.get = IndividualCache( # All MIs (except Cloud Shell) use GETs mapping=self._expiring_mapping, key_maker=lambda func, args, kwargs: "REQ {} hash={} 429/5xx/Retry-After".format( args[0], # It is the endpoint, typically a constant per MI type self._hash( # Managed Identity flavors have inconsistent parameters. # We simply choose to hash them all. str(kwargs.get("params")) + str(kwargs.get("data"))), ), expires_in=RetryAfterParser(5).parse, # 5 seconds default for non-PCA )(http_client.get) class ManagedIdentityClient(object): """This API encapsulates multiple managed identity back-ends: VM, App Service, Azure Automation (Runbooks), Azure Function, Service Fabric, and Azure Arc. It also provides token cache support. .. note:: Cloud Shell support is NOT implemented in this class. Since MSAL Python 1.18 in May 2022, it has been implemented in :func:`PublicClientApplication.acquire_token_interactive` via calling pattern ``PublicClientApplication(...).acquire_token_interactive(scopes=[...], prompt="none")``. That is appropriate, because Cloud Shell yields a token with delegated permissions for the end user who has signed in to the Azure Portal (like what a ``PublicClientApplication`` does), not a token with application permissions for an app. """ __instance, _tenant = None, "managed_identity" # Placeholders _TOKEN_SOURCE = "token_source" _TOKEN_SOURCE_IDP = "identity_provider" _TOKEN_SOURCE_CACHE = "cache" def __init__( self, managed_identity: Union[ dict, ManagedIdentity, # Could use Type[ManagedIdentity] but it is deprecated in Python 3.9+ SystemAssignedManagedIdentity, UserAssignedManagedIdentity, ], *, http_client, token_cache=None, http_cache=None, ): """Create a managed identity client. :param managed_identity: It accepts an instance of :class:`SystemAssignedManagedIdentity` or :class:`UserAssignedManagedIdentity`. They are equivalent to a dict with a certain shape, which may be loaded from a JSON configuration file or an env var. :param http_client: An http client object. For example, you can use ``requests.Session()``, optionally with exponential backoff behavior demonstrated in this recipe:: import msal, requests from requests.adapters import HTTPAdapter, Retry s = requests.Session() retries = Retry(total=3, backoff_factor=0.1, status_forcelist=[ 429, 500, 501, 502, 503, 504]) s.mount('https://', HTTPAdapter(max_retries=retries)) managed_identity = ... client = msal.ManagedIdentityClient(managed_identity, http_client=s) :param token_cache: Optional. It accepts a :class:`msal.TokenCache` instance to store tokens. It will use an in-memory token cache by default. :param http_cache: Optional. It has the same characteristics as the :paramref:`msal.ClientApplication.http_cache`. Recipe 1: Hard code a managed identity for your app:: import msal, requests client = msal.ManagedIdentityClient( msal.UserAssignedManagedIdentity(client_id="foo"), http_client=requests.Session(), ) token = client.acquire_token_for_client("resource") Recipe 2: Write once, run everywhere. If you use different managed identity on different deployment, you may use an environment variable (such as MY_MANAGED_IDENTITY_CONFIG) to store a json blob like ``{"ManagedIdentityIdType": "ClientId", "Id": "foo"}`` or ``{"ManagedIdentityIdType": "SystemAssigned", "Id": null}``. The following app can load managed identity configuration dynamically:: import json, os, msal, requests config = os.getenv("MY_MANAGED_IDENTITY_CONFIG") assert config, "An ENV VAR with value should exist" client = msal.ManagedIdentityClient( json.loads(config), http_client=requests.Session(), ) token = client.acquire_token_for_client("resource") """ if not ManagedIdentity.is_managed_identity(managed_identity): raise ManagedIdentityError( f"Incorrect managed_identity: {managed_identity}") self._managed_identity = managed_identity self._http_client = _ThrottledHttpClient( # This class only throttles excess token acquisition requests. # It does not provide retry. # Retry is the http_client or caller's responsibility, not MSAL's. # # FWIW, here is the inconsistent retry recommendation. # 1. Only MI on VM defines exotic 404 and 410 retry recommendations # ( https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling ) # (especially for 410 which was supposed to be a permanent failure). # 2. MI on Service Fabric specifically suggests to not retry on 404. # ( https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-cluster-managed-identity-service-fabric-app-code#error-handling ) http_client.http_client # Patch the raw (unpatched) http client if isinstance(http_client, ThrottledHttpClientBase) else http_client, http_cache=http_cache, ) self._token_cache = token_cache or TokenCache() def _get_instance(self): if self.__instance is None: self.__instance = socket.getfqdn() # Moved from class definition to here return self.__instance def acquire_token_for_client( self, *, resource: str, # If/when we support scope, resource will become optional claims_challenge: Optional[str] = None, ): """Acquire token for the managed identity. The result will be automatically cached. Subsequent calls will automatically search from cache first. :param resource: The resource for which the token is acquired. :param claims_challenge: Optional. It is a string representation of a JSON object (which contains lists of claims being requested). The tenant admin may choose to revoke all Managed Identity tokens, and then a *claims challenge* will be returned by the target resource, as a `claims_challenge` directive in the `www-authenticate` header, even if the app developer did not opt in for the "CP1" client capability. Upon receiving a `claims_challenge`, MSAL will skip a token cache read, and will attempt to acquire a new token. .. note:: Known issue: When an Azure VM has only one user-assigned managed identity, and your app specifies to use system-assigned managed identity, Azure VM may still return a token for your user-assigned identity. This is a service-side behavior that cannot be changed by this library. `Azure VM docs <https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http>`_ """ access_token_from_cache = None client_id_in_cache = self._managed_identity.get( ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY") now = time.time() if not claims_challenge: # Then attempt token cache search matches = self._token_cache.find( self._token_cache.CredentialType.ACCESS_TOKEN, target=[resource], query=dict( client_id=client_id_in_cache, environment=self._get_instance(), realm=self._tenant, home_account_id=None, ), ) for entry in matches: expires_in = int(entry["expires_on"]) - now if expires_in < 5*60: # Then consider it expired continue # Removal is not necessary, it will be overwritten logger.debug("Cache hit an AT") access_token_from_cache = { # Mimic a real response "access_token": entry["secret"], "token_type": entry.get("token_type", "Bearer"), "expires_in": int(expires_in), # OAuth2 specs defines it as int self._TOKEN_SOURCE: self._TOKEN_SOURCE_CACHE, } if "refresh_on" in entry: access_token_from_cache["refresh_on"] = int(entry["refresh_on"]) if int(entry["refresh_on"]) < now: # aging break # With a fallback in hand, we break here to go refresh return access_token_from_cache # It is still good as new try: result = _obtain_token(self._http_client, self._managed_identity, resource) if "access_token" in result: expires_in = result.get("expires_in", 3600) if "refresh_in" not in result and expires_in >= 7200: result["refresh_in"] = int(expires_in / 2) self._token_cache.add(dict( client_id=client_id_in_cache, scope=[resource], token_endpoint="https://{}/{}".format( self._get_instance(), self._tenant), response=result, params={}, data={}, )) if "refresh_in" in result: result["refresh_on"] = int(now + result["refresh_in"]) result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP if (result and "error" not in result) or (not access_token_from_cache): return result except: # The exact HTTP exception is transportation-layer dependent # Typically network error. Potential AAD outage? if not access_token_from_cache: # It means there is no fall back option raise # We choose to bubble up the exception return access_token_from_cache def _scope_to_resource(scope): # This is an experimental reasonable-effort approach u = urlparse(scope) if u.scheme: return "{}://{}".format(u.scheme, u.netloc) return scope # There is no much else we can do here def _get_arc_endpoint(): if "IDENTITY_ENDPOINT" in os.environ and "IMDS_ENDPOINT" in os.environ: return os.environ["IDENTITY_ENDPOINT"] if ( # Defined in https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/hybrid-resource-provider/azure-arc-for-servers/specs/extension_authoring sys.platform == "linux" and os.path.exists("/opt/azcmagent/bin/himds") or sys.platform == "win32" and os.path.exists(os.path.expandvars( # Avoid Windows-only "%EnvVar%" syntax so that tests can be run on Linux r"${ProgramFiles}\AzureConnectedMachineAgent\himds.exe" )) ): return "http://localhost:40342/metadata/identity/oauth2/token" APP_SERVICE = object() AZURE_ARC = object() CLOUD_SHELL = object() # In MSAL Python, token acquisition was done by # PublicClientApplication(...).acquire_token_interactive(..., prompt="none") MACHINE_LEARNING = object() SERVICE_FABRIC = object() DEFAULT_TO_VM = object() # Unknown environment; default to VM; you may want to probe def get_managed_identity_source(): """Detect the current environment and return the likely identity source. When this function returns ``CLOUD_SHELL``, you should use :func:`msal.PublicClientApplication.acquire_token_interactive` with ``prompt="none"`` to obtain a token. """ if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ and "IDENTITY_SERVER_THUMBPRINT" in os.environ ): return SERVICE_FABRIC if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ: return APP_SERVICE if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ: return MACHINE_LEARNING if _get_arc_endpoint(): return AZURE_ARC if _is_running_in_cloud_shell(): return CLOUD_SHELL return DEFAULT_TO_VM def _obtain_token(http_client, managed_identity, resource): # A unified low-level API that talks to different Managed Identity if ("IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ and "IDENTITY_SERVER_THUMBPRINT" in os.environ ): if managed_identity: logger.debug( "Ignoring managed_identity parameter. " "Managed Identity in Service Fabric is configured in the cluster, " "not during runtime. See also " "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service") return _obtain_token_on_service_fabric( http_client, os.environ["IDENTITY_ENDPOINT"], os.environ["IDENTITY_HEADER"], os.environ["IDENTITY_SERVER_THUMBPRINT"], resource, ) if "IDENTITY_ENDPOINT" in os.environ and "IDENTITY_HEADER" in os.environ: return _obtain_token_on_app_service( http_client, os.environ["IDENTITY_ENDPOINT"], os.environ["IDENTITY_HEADER"], managed_identity, resource, ) if "MSI_ENDPOINT" in os.environ and "MSI_SECRET" in os.environ: # Back ported from https://github.com/Azure/azure-sdk-for-python/blob/azure-identity_1.15.0/sdk/identity/azure-identity/azure/identity/_credentials/azure_ml.py return _obtain_token_on_machine_learning( http_client, os.environ["MSI_ENDPOINT"], os.environ["MSI_SECRET"], managed_identity, resource, ) arc_endpoint = _get_arc_endpoint() if arc_endpoint: if ManagedIdentity.is_user_assigned(managed_identity): raise ManagedIdentityError( # Note: Azure Identity for Python raised exception too "Invalid managed_identity parameter. " "Azure Arc supports only system-assigned managed identity, " "See also " "https://learn.microsoft.com/en-us/azure/service-fabric/configure-existing-cluster-enable-managed-identity-token-service") return _obtain_token_on_arc(http_client, arc_endpoint, resource) return _obtain_token_on_azure_vm(http_client, managed_identity, resource) def _adjust_param(params, managed_identity, types_mapping=None): # Modify the params dict in place id_name = (types_mapping or ManagedIdentity._types_mapping).get( managed_identity.get(ManagedIdentity.ID_TYPE)) if id_name: params[id_name] = managed_identity[ManagedIdentity.ID] def _obtain_token_on_azure_vm(http_client, managed_identity, resource): # Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http logger.debug("Obtaining token via managed identity on Azure VM") params = { "api-version": "2018-02-01", "resource": resource, } _adjust_param(params, managed_identity) resp = http_client.get( os.getenv( "AZURE_POD_IDENTITY_AUTHORITY_HOST", "http://169.254.169.254" ).strip("/") + "/metadata/identity/oauth2/token", params=params, headers={"Metadata": "true"}, ) try: payload = json.loads(resp.text) if payload.get("access_token") and payload.get("expires_in"): return { # Normalizing the payload into OAuth2 format "access_token": payload["access_token"], "expires_in": int(payload["expires_in"]), "resource": payload.get("resource"), "token_type": payload.get("token_type", "Bearer"), } return payload # It would be {"error": ..., "error_description": ...} according to https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/how-to-use-vm-token#error-handling except json.decoder.JSONDecodeError: logger.debug("IMDS emits unexpected payload: %s", resp.text) raise def _obtain_token_on_app_service( http_client, endpoint, identity_header, managed_identity, resource, ): """Obtains token for `App Service <https://learn.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp#rest-endpoint-reference>`_, Azure Functions, and Azure Automation. """ # Prerequisite: Create your app service https://docs.microsoft.com/en-us/azure/app-service/quickstart-python # Assign it a managed identity https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity?tabs=portal%2Chttp # SSH into your container for testing https://docs.microsoft.com/en-us/azure/app-service/configure-linux-open-ssh-session logger.debug("Obtaining token via managed identity on Azure App Service") params = { "api-version": "2019-08-01", "resource": resource, } _adjust_param(params, managed_identity, types_mapping={ ManagedIdentity.CLIENT_ID: "client_id", ManagedIdentity.RESOURCE_ID: "mi_res_id", # App Service's resource id uses "mi_res_id" ManagedIdentity.OBJECT_ID: "object_id", }) resp = http_client.get( endpoint, params=params, headers={ "X-IDENTITY-HEADER": identity_header, "Metadata": "true", # Unnecessary yet harmless for App Service, # It will be needed by Azure Automation # https://docs.microsoft.com/en-us/azure/automation/enable-managed-identity-for-automation#get-access-token-for-system-assigned-managed-identity-using-http-get }, ) try: payload = json.loads(resp.text) if payload.get("access_token") and payload.get("expires_on"): return { # Normalizing the payload into OAuth2 format "access_token": payload["access_token"], "expires_in": int(payload["expires_on"]) - int(time.time()), "resource": payload.get("resource"), "token_type": payload.get("token_type", "Bearer"), } return { "error": "invalid_scope", # Empirically, wrong resource ends up with a vague statusCode=500 "error_description": "{}, {}".format( payload.get("statusCode"), payload.get("message")), } except json.decoder.JSONDecodeError: logger.debug("IMDS emits unexpected payload: %s", resp.text) raise def _obtain_token_on_machine_learning( http_client, endpoint, secret, managed_identity, resource, ): # Could not find protocol docs from https://docs.microsoft.com/en-us/azure/machine-learning # The following implementation is back ported from Azure Identity 1.15.0 logger.debug("Obtaining token via managed identity on Azure Machine Learning") params = {"api-version": "2017-09-01", "resource": resource} _adjust_param(params, managed_identity) if params["api-version"] == "2017-09-01" and "client_id" in params: # Workaround for a known bug in Azure ML 2017 API params["clientid"] = params.pop("client_id") resp = http_client.get( endpoint, params=params, headers={"secret": secret}, ) try: payload = json.loads(resp.text) if payload.get("access_token") and payload.get("expires_on"): return { # Normalizing the payload into OAuth2 format "access_token": payload["access_token"], "expires_in": int(payload["expires_on"]) - int(time.time()), "resource": payload.get("resource"), "token_type": payload.get("token_type", "Bearer"), } return { "error": "invalid_scope", # TODO: To be tested "error_description": "{}".format(payload), } except json.decoder.JSONDecodeError: logger.debug("IMDS emits unexpected payload: %s", resp.text) raise def _obtain_token_on_service_fabric( http_client, endpoint, identity_header, server_thumbprint, resource, ): """Obtains token for `Service Fabric <https://learn.microsoft.com/en-us/azure/service-fabric/>`_ """ # Deployment https://learn.microsoft.com/en-us/azure/service-fabric/service-fabric-get-started-containers-linux # See also https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/tests/managed-identity-live/service-fabric/service_fabric.md # Protocol https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#acquiring-an-access-token-using-rest-api logger.debug("Obtaining token via managed identity on Azure Service Fabric") resp = http_client.get( endpoint, params={"api-version": "2019-07-01-preview", "resource": resource}, headers={"Secret": identity_header}, ) try: payload = json.loads(resp.text) if payload.get("access_token") and payload.get("expires_on"): return { # Normalizing the payload into OAuth2 format "access_token": payload["access_token"], "expires_in": int( # Despite the example in docs shows an integer, payload["expires_on"] # Azure SDK team's test obtained a string. ) - int(time.time()), "resource": payload.get("resource"), "token_type": payload["token_type"], } error = payload.get("error", {}) # https://learn.microsoft.com/en-us/azure/service-fabric/how-to-managed-identity-service-fabric-app-code#error-handling error_mapping = { # Map Service Fabric errors into OAuth2 errors https://www.rfc-editor.org/rfc/rfc6749#section-5.2 "SecretHeaderNotFound": "unauthorized_client", "ManagedIdentityNotFound": "invalid_client", "ArgumentNullOrEmpty": "invalid_scope", } return { "error": error_mapping.get(payload["error"]["code"], "invalid_request"), "error_description": resp.text, } except json.decoder.JSONDecodeError: logger.debug("IMDS emits unexpected payload: %s", resp.text) raise _supported_arc_platforms_and_their_prefixes = { "linux": "/var/opt/azcmagent/tokens", "win32": os.path.expandvars(r"%ProgramData%\AzureConnectedMachineAgent\Tokens"), } class ArcPlatformNotSupportedError(ManagedIdentityError): pass def _obtain_token_on_arc(http_client, endpoint, resource): # https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication logger.debug("Obtaining token via managed identity on Azure Arc") resp = http_client.get( endpoint, params={"api-version": "2020-06-01", "resource": resource}, headers={"Metadata": "true"}, ) www_auth = "www-authenticate" # Header in lower case challenge = { # Normalized to lowercase, because header names are case-insensitive # https://datatracker.ietf.org/doc/html/rfc7230#section-3.2 k.lower(): v for k, v in resp.headers.items() if k.lower() == www_auth }.get(www_auth, "").split("=") # Output will be ["Basic realm", "content"] if not ( # https://datatracker.ietf.org/doc/html/rfc7617#section-2 len(challenge) == 2 and challenge[0].lower() == "basic realm"): raise ManagedIdentityError( "Unrecognizable WWW-Authenticate header: {}".format(resp.headers)) if sys.platform not in _supported_arc_platforms_and_their_prefixes: raise ArcPlatformNotSupportedError( f"Platform {sys.platform} was undefined and unsupported") filename = os.path.join( # This algorithm is documented in an internal doc https://msazure.visualstudio.com/One/_wiki/wikis/One.wiki/233012/VM-Extension-Authoring-for-Arc?anchor=2.-obtaining-tokens _supported_arc_platforms_and_their_prefixes[sys.platform], os.path.splitext(os.path.basename(challenge[1]))[0] + ".key") if os.stat(filename).st_size > 4096: # Check size BEFORE loading its content raise ManagedIdentityError("Local key file shall not be larger than 4KB") with open(filename) as f: secret = f.read() response = http_client.get( endpoint, params={"api-version": "2020-06-01", "resource": resource}, headers={"Metadata": "true", "Authorization": "Basic {}".format(secret)}, ) try: payload = json.loads(response.text) if payload.get("access_token") and payload.get("expires_in"): # Example: https://learn.microsoft.com/en-us/azure/azure-arc/servers/media/managed-identity-authentication/bash-token-output-example.png return { "access_token": payload["access_token"], "expires_in": int(payload["expires_in"]), "token_type": payload.get("token_type", "Bearer"), "resource": payload.get("resource"), } except json.decoder.JSONDecodeError: pass return { "error": "invalid_request", "error_description": response.text, }
Memory