import json import re import sys from typing import Any, Dict, List, Tuple, Union from urllib.parse import unquote from moto.core.responses import TYPE_RESPONSE, BaseResponse from moto.utilities.aws_headers import amz_crc32 from moto.utilities.utils import ARN_PARTITION_REGEX from .exceptions import FunctionAlreadyExists, UnknownFunctionException from .models import LambdaBackend from .utils import get_backend class LambdaResponse(BaseResponse): def __init__(self) -> None: super().__init__(service_name="awslambda") @property def json_body(self) -> Dict[str, Any]: # type: ignore[misc] return json.loads(self.body) @property def backend(self) -> LambdaBackend: return get_backend(self.current_account, self.region) def add_permission(self) -> str: function_name = unquote(self.path.split("/")[-2]) qualifier = self.querystring.get("Qualifier", [None])[0] statement = self.body statement = self.backend.add_permission(function_name, qualifier, statement) return json.dumps({"Statement": json.dumps(statement)}) def get_policy(self) -> str: function_name = unquote(self.path.split("/")[-2]) qualifier = self.querystring.get("Qualifier", [None])[0] return self.backend.get_policy(function_name, qualifier) def remove_permission(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-3]) statement_id = self.path.split("/")[-1].split("?")[0] revision = self.querystring.get("RevisionId", "") if self.backend.get_function(function_name): self.backend.remove_permission(function_name, statement_id, revision) return 204, {"status": 204}, "{}" else: return 404, {"status": 404}, "{}" @amz_crc32 def invoke(self) -> Tuple[int, Dict[str, str], Union[str, bytes]]: response_headers: Dict[str, str] = {} # URL Decode in case it's a ARN: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("qualifier") payload = self.backend.invoke( function_name, qualifier, self.body, self.headers, response_headers ) if payload is not None: if self.headers.get("X-Amz-Invocation-Type") != "Event": if sys.getsizeof(payload) > 6000000: response_headers["Content-Length"] = "142" response_headers["x-amz-function-error"] = "Unhandled" error_dict = { "errorMessage": "Response payload size exceeded maximum allowed payload size (6291556 bytes).", "errorType": "Function.ResponseSizeTooLarge", } payload = json.dumps(error_dict).encode("utf-8") response_headers["content-type"] = "application/json" if self.headers.get("X-Amz-Invocation-Type") == "Event": status_code = 202 response_headers["status"] = "202" elif self.headers.get("X-Amz-Invocation-Type") == "DryRun": status_code = 204 response_headers["status"] = "204" else: if ( self.headers.get("X-Amz-Log-Type") != "Tail" and "x-amz-log-result" in response_headers ): del response_headers["x-amz-log-result"] status_code = 200 return status_code, response_headers, payload else: return 404, response_headers, "{}" @amz_crc32 def invoke_async(self) -> Tuple[int, Dict[str, str], Union[str, bytes]]: response_headers: Dict[str, Any] = {} function_name = unquote(self.path.rsplit("/", 3)[-3]) fn = self.backend.get_function(function_name, None) payload = fn.invoke(self.body, self.headers, response_headers) response_headers["Content-Length"] = str(len(payload)) response_headers["status"] = 202 return 202, response_headers, payload def list_functions(self) -> str: querystring = self.querystring func_version = querystring.get("FunctionVersion", [None])[0] result: Dict[str, List[Dict[str, Any]]] = {"Functions": []} for fn in self.backend.list_functions(func_version): json_data = fn.get_configuration() result["Functions"].append(json_data) return json.dumps(result) def list_versions_by_function(self) -> str: function_name = self.path.split("/")[-2] result: Dict[str, Any] = {"Versions": []} functions = self.backend.list_versions_by_function(function_name) for fn in functions: json_data = fn.get_configuration() result["Versions"].append(json_data) return json.dumps(result) def list_aliases(self) -> TYPE_RESPONSE: path = self.path function_name = path.split("/")[-2] result: Dict[str, Any] = {"Aliases": []} aliases = self.backend.list_aliases(function_name) for alias in aliases: json_data = alias.to_json() result["Aliases"].append(json_data) return 200, {}, json.dumps(result) def create_function(self) -> TYPE_RESPONSE: function_name = self.json_body["FunctionName"].rsplit(":", 1)[-1] try: self.backend.get_function(function_name, None) except UnknownFunctionException: fn = self.backend.create_function(self.json_body) config = fn.get_configuration(on_create=True) return 201, {"status": 201}, json.dumps(config) raise FunctionAlreadyExists(function_name) def create_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) config = self.backend.create_function_url_config(function_name, self.json_body) return 201, {"status": 201}, json.dumps(config.to_dict()) def delete_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) self.backend.delete_function_url_config(function_name) return 204, {"status": 204}, "{}" def get_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) config = self.backend.get_function_url_config(function_name) return 201, {"status": 201}, json.dumps(config.to_dict()) def update_function_url_config(self) -> str: function_name = unquote(self.path.split("/")[-2]) config = self.backend.update_function_url_config(function_name, self.json_body) return json.dumps(config.to_dict()) def create_event_source_mapping(self) -> TYPE_RESPONSE: fn = self.backend.create_event_source_mapping(self.json_body) config = fn.get_configuration() return 201, {"status": 201}, json.dumps(config) def list_event_source_mappings(self) -> str: event_source_arn = self.querystring.get("EventSourceArn", [None])[0] function_name = self.querystring.get("FunctionName", [None])[0] esms = self.backend.list_event_source_mappings(event_source_arn, function_name) result = {"EventSourceMappings": [esm.get_configuration() for esm in esms]} return json.dumps(result) def get_event_source_mapping(self) -> TYPE_RESPONSE: uuid = self.path.split("/")[-1] result = self.backend.get_event_source_mapping(uuid) if result: return 200, {}, json.dumps(result.get_configuration()) else: err = { "Type": "User", "Message": "The resource you requested does not exist.", } headers = {"x-amzn-errortype": "ResourceNotFoundException", "status": 404} return 404, headers, json.dumps(err) def update_event_source_mapping(self) -> TYPE_RESPONSE: uuid = self.path.split("/")[-1] result = self.backend.update_event_source_mapping(uuid, self.json_body) if result: return 202, {"status": 202}, json.dumps(result.get_configuration()) else: return 404, {}, "{}" def delete_event_source_mapping(self) -> TYPE_RESPONSE: uuid = self.path.split("/")[-1] esm = self.backend.delete_event_source_mapping(uuid) if esm: json_result = esm.get_configuration() json_result.update({"State": "Deleting"}) return 202, {"status": 202}, json.dumps(json_result) else: return 404, {}, "{}" def publish_version(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) description = self._get_param("Description") fn = self.backend.publish_version(function_name, description) config = fn.get_configuration() # type: ignore[union-attr] return 201, {"status": 201}, json.dumps(config) def delete_function(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 1)[-1]) qualifier = self._get_param("Qualifier", None) self.backend.delete_function(function_name, qualifier) return 204, {"status": 204}, "" @staticmethod def _set_configuration_qualifier( # type: ignore[misc] configuration: Dict[str, Any], function_name: str, qualifier: str ) -> Dict[str, Any]: # Qualifier may be explicitly passed or part of function name or ARN, extract it here if re.match(ARN_PARTITION_REGEX, function_name): # Extract from ARN if ":" in function_name.split(":function:")[-1]: qualifier = function_name.split(":")[-1] else: # Extract from function name if ":" in function_name: qualifier = function_name.split(":")[1] if qualifier is None or qualifier == "$LATEST": configuration["Version"] = "$LATEST" if qualifier == "$LATEST": configuration["FunctionArn"] += ":$LATEST" return configuration def get_function(self) -> str: function_name = unquote(self.path.rsplit("/", 1)[-1]) qualifier = self._get_param("Qualifier", None) fn = self.backend.get_function(function_name, qualifier) code = fn.get_code() code["Configuration"] = self._set_configuration_qualifier( code["Configuration"], function_name, qualifier ) return json.dumps(code) def get_function_configuration(self) -> str: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) fn = self.backend.get_function(function_name, qualifier) resp = self._set_configuration_qualifier( fn.get_configuration(), function_name, qualifier ) return json.dumps(resp) def _get_aws_region(self, full_url: str) -> str: region = self.region_regex.search(full_url) if region: return region.group(1) else: return self.default_region def list_tags(self) -> str: function_arn = unquote(self.path.rsplit("/", 1)[-1]) tags = self.backend.list_tags(function_arn) return json.dumps({"Tags": tags}) def tag_resource(self) -> str: function_arn = unquote(self.path.rsplit("/", 1)[-1]) self.backend.tag_resource(function_arn, self.json_body["Tags"]) return "{}" def untag_resource(self) -> TYPE_RESPONSE: function_arn = unquote(self.path.rsplit("/", 1)[-1]) tag_keys = self.querystring["tagKeys"] self.backend.untag_resource(function_arn, tag_keys) return 204, {"status": 204}, "{}" def update_function_configuration(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) resp = self.backend.update_function_configuration( function_name, qualifier, body=self.json_body ) if resp: return 200, {}, json.dumps(resp) else: return 404, {"status": 404}, "{}" def update_function_code(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) resp = self.backend.update_function_code( function_name, qualifier, body=self.json_body ) if resp: return 200, {}, json.dumps(resp) else: return 404, {"status": 404}, "{}" def get_function_code_signing_config(self) -> str: function_name = unquote(self.path.rsplit("/", 2)[-2]) resp = self.backend.get_function_code_signing_config(function_name) return json.dumps(resp) def get_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) self.backend.get_function(path_function_name) resp = self.backend.get_function_concurrency(path_function_name) return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp}) def delete_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) self.backend.get_function(path_function_name) self.backend.delete_function_concurrency(path_function_name) return 204, {"status": 204}, "{}" def put_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) self.backend.get_function(path_function_name) concurrency = self._get_param("ReservedConcurrentExecutions", None) resp = self.backend.put_function_concurrency(path_function_name, concurrency) return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp}) def list_layers(self) -> str: layers = self.backend.list_layers() return json.dumps({"Layers": layers}) def delete_layer_version(self) -> str: layer_name = unquote(self.path.split("/")[-3]) layer_version = self.path.split("/")[-1] self.backend.delete_layer_version(layer_name, layer_version) return "{}" def get_layer_version(self) -> str: layer_name = unquote(self.path.split("/")[-3]) layer_version = self.path.split("/")[-1] layer = self.backend.get_layer_version(layer_name, layer_version) return json.dumps(layer.get_layer_version()) def list_layer_versions(self) -> str: layer_name = self.path.rsplit("/", 2)[-2] layer_versions = self.backend.list_layer_versions(layer_name) return json.dumps( {"LayerVersions": [lv.get_layer_version() for lv in layer_versions]} ) def publish_layer_version(self) -> TYPE_RESPONSE: spec = self.json_body if "LayerName" not in spec: spec["LayerName"] = self.path.rsplit("/", 2)[-2] layer_version = self.backend.publish_layer_version(spec) config = layer_version.get_layer_version() return 201, {"status": 201}, json.dumps(config) def create_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) params = json.loads(self.body) alias_name = params.get("Name") description = params.get("Description", "") function_version = params.get("FunctionVersion") routing_config = params.get("RoutingConfig") alias = self.backend.create_alias( name=alias_name, function_name=function_name, function_version=function_version, description=description, routing_config=routing_config, ) return 201, {"status": 201}, json.dumps(alias.to_json()) def delete_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) self.backend.delete_alias(name=alias_name, function_name=function_name) return 201, {"status": 201}, "{}" def get_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) alias = self.backend.get_alias(name=alias_name, function_name=function_name) return 201, {"status": 201}, json.dumps(alias.to_json()) def update_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) params = json.loads(self.body) description = params.get("Description") function_version = params.get("FunctionVersion") routing_config = params.get("RoutingConfig") alias = self.backend.update_alias( name=alias_name, function_name=function_name, function_version=function_version, description=description, routing_config=routing_config, ) return 201, {"status": 201}, json.dumps(alias.to_json()) def put_function_event_invoke_config(self) -> str: function_name = unquote(self.path.rsplit("/", 2)[1]) response = self.backend.put_function_event_invoke_config( function_name, self.json_body ) return json.dumps(response) def get_function_event_invoke_config(self) -> str: function_name = unquote(self.path.rsplit("/", 2)[1]) response = self.backend.get_function_event_invoke_config(function_name) return json.dumps(response) def delete_function_event_invoke_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[1]) self.backend.delete_function_event_invoke_config(function_name) return 204, {"status": 204}, json.dumps({}) def update_function_event_invoke_config(self) -> str: function_name = unquote(self.path.rsplit("/", 2)[1]) response = self.backend.update_function_event_invoke_config( function_name, self.json_body ) return json.dumps(response) def list_function_event_invoke_configs(self) -> str: function_name = unquote(self.path.rsplit("/", 3)[1]) return json.dumps( self.backend.list_function_event_invoke_configs(function_name) ) def add_layer_version_permission(self) -> str: statement = self.body layer_name = self._get_param("LayerName") version_number = self._get_param("VersionNumber") statement, revision_id = self.backend.add_layer_version_permission( layer_name=layer_name, version_number=version_number, statement=statement, ) return json.dumps(dict(Statement=json.dumps(statement), RevisionId=revision_id)) def get_layer_version_policy(self) -> str: layer_name = self._get_param("LayerName") version_number = self._get_param("VersionNumber") return self.backend.get_layer_version_policy( layer_name=layer_name, version_number=version_number ) def remove_layer_version_permission(self) -> TYPE_RESPONSE: layer_name = self._get_param("LayerName") version_number = self._get_param("VersionNumber") statement_id = self.path.split("/")[-1].split("?")[0] revision = self.querystring.get("RevisionId", "") if self.backend.get_layer_version(layer_name, version_number): self.backend.remove_layer_version_permission( layer_name, version_number, statement_id, revision ) return 204, {"status": 204}, "{}" else: return 404, {"status": 404}, "{}"
Memory