from typing import Any, Dict, Optional
from moto.core.responses import BaseResponse
from ..exceptions import EC2ClientError, EmptyTagSpecError, InvalidParameter
from ..utils import convert_tag_spec
class EC2BaseResponse(BaseResponse):
@property
def ec2_backend(self) -> Any: # type: ignore[misc]
from moto.ec2.models import ec2_backends
return ec2_backends[self.current_account][self.region]
def _filters_from_querystring(self) -> Dict[str, str]:
# [{"Name": x1, "Value": y1}, ..]
_filters = self._get_multi_param("Filter.", skip_result_conversion=True)
# return {x1: y1, ...}
try:
return {f["Name"]: f.get("Value", []) for f in _filters}
except KeyError:
raise EC2ClientError(
"InvalidParameterValue", "The filter 'null' is invalid."
)
def _parse_tag_specification(
self, expected_type: Optional[str] = None
) -> Dict[str, Dict[str, str]]:
# [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}]
tag_spec_set = self._get_multi_param(
"TagSpecification", skip_result_conversion=True
)
if not tag_spec_set:
tag_spec_set = self._get_multi_param(
"TagSpecifications", skip_result_conversion=True
)
if not tag_spec_set:
return {}
tags_dict = (
tag_spec_set[0] if isinstance(tag_spec_set, list) else tag_spec_set
) # awscli allows for a json string to be passed and it should be allowed
if "ResourceType" not in tags_dict:
raise InvalidParameter("Tag specification resource type must have a value")
if expected_type and tags_dict["ResourceType"] != expected_type:
raise InvalidParameter(
f"'{tags_dict['ResourceType']}' is not a valid taggable resource type for this operation."
)
if "Tag" not in tags_dict:
if tags_dict.get("ResourceType") == "subnet":
raise InvalidParameter("Tag specification must have at least one tag")
raise EmptyTagSpecError
return convert_tag_spec(tag_spec_set)