from typing import Any, Dict, List, Tuple
from ._base_response import EC2BaseResponse
def try_parse_int(value: Any, default: Any = None) -> Any:
try:
return int(value)
except (TypeError, ValueError):
return default
def parse_sg_attributes_from_dict(sg_attributes: Dict[str, Any]) -> Tuple[Any, ...]:
ip_protocol = sg_attributes.get("IpProtocol", [None])[0]
from_port = sg_attributes.get("FromPort", [None])[0]
to_port = sg_attributes.get("ToPort", [None])[0]
ip_ranges: List[Dict[str, Any]] = []
ip_ranges_tree = sg_attributes.get("IpRanges") or {}
for ip_range_idx in sorted(ip_ranges_tree.keys()):
ip_range = {"CidrIp": ip_ranges_tree[ip_range_idx]["CidrIp"][0]}
if ip_ranges_tree[ip_range_idx].get("Description"):
ip_range["Description"] = ip_ranges_tree[ip_range_idx].get("Description")[0]
ip_ranges.append(ip_range)
ip_ranges_tree: Dict[str, Any] = sg_attributes.get("Ipv6Ranges") or {} # type: ignore[no-redef]
for ip_range_idx in sorted(ip_ranges_tree.keys()):
ip_range = {"CidrIpv6": ip_ranges_tree[ip_range_idx]["CidrIpv6"][0]}
if ip_ranges_tree[ip_range_idx].get("Description"):
ip_range["Description"] = ip_ranges_tree[ip_range_idx].get("Description")[0]
ip_ranges.append(ip_range)
if "CidrIp" in sg_attributes:
cidr_ip = sg_attributes.get("CidrIp")[0] # type: ignore
ip_ranges.append({"CidrIp": cidr_ip})
if "CidrIpv6" in sg_attributes:
cidr_ipv6 = sg_attributes.get("CidrIpv6")[0] # type: ignore
ip_ranges.append({"CidrIpv6": cidr_ipv6})
source_groups: List[Dict[str, Any]] = []
groups_tree: Dict[str, Any] = sg_attributes.get("Groups") or {}
for group_idx in sorted(groups_tree.keys()):
group_dict = groups_tree[group_idx]
source_group = {}
if "GroupId" in group_dict:
source_group["GroupId"] = group_dict["GroupId"][0]
if "GroupName" in group_dict:
source_group["GroupName"] = group_dict["GroupName"][0]
if "Description" in group_dict:
source_group["Description"] = group_dict["Description"][0]
if "OwnerId" in group_dict:
source_group["OwnerId"] = group_dict["OwnerId"][0]
source_groups.append(source_group)
prefix_list_ids: List[Dict[str, Any]] = []
pl_tree: Dict[str, Any] = sg_attributes.get("PrefixListIds") or {}
for pl_index in sorted(pl_tree):
pl_dict = pl_tree.get(pl_index, {})
pl_item = {}
if "PrefixListId" in pl_dict:
pl_item["PrefixListId"] = pl_dict.get("PrefixListId")[0]
if "Description" in pl_dict:
pl_item["Description"] = pl_dict.get("Description")[0]
if pl_item:
prefix_list_ids.append(pl_item)
return (ip_protocol, from_port, to_port, ip_ranges, source_groups, prefix_list_ids)
class SecurityGroups(EC2BaseResponse):
def _process_rules_from_querystring(self) -> Any:
group_name_or_id = self._get_param("GroupName") or self._get_param("GroupId")
security_rule_ids = self._get_multi_param("SecurityGroupRuleId")
querytree: Dict[str, Any] = {}
for key, value in self.querystring.items():
key_splitted = key.split(".")
key_splitted = [try_parse_int(e, e) for e in key_splitted]
d = querytree
for subkey in key_splitted[:-1]:
if subkey not in d:
d[subkey] = {}
d = d[subkey]
d[key_splitted[-1]] = value
sg_rule_tags = self._parse_tag_specification().get("security-group-rule", {})
if "IpPermissions" not in querytree:
# Handle single rule syntax
(
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups,
prefix_list_ids,
) = parse_sg_attributes_from_dict(querytree)
yield {
"group_name_or_id": group_name_or_id,
"ip_protocol": ip_protocol,
"from_port": from_port,
"to_port": to_port,
"ip_ranges": ip_ranges,
"sgrule_tags": sg_rule_tags,
"source_groups": source_groups,
"prefix_list_ids": prefix_list_ids,
"security_rule_ids": security_rule_ids,
}
ip_permissions = querytree.get("IpPermissions") or {}
for ip_permission_idx in sorted(ip_permissions.keys()):
ip_permission = ip_permissions[ip_permission_idx]
(
ip_protocol,
from_port,
to_port,
ip_ranges,
source_groups,
prefix_list_ids,
) = parse_sg_attributes_from_dict(ip_permission)
yield {
"group_name_or_id": group_name_or_id,
"ip_protocol": ip_protocol,
"from_port": from_port,
"to_port": to_port,
"ip_ranges": ip_ranges,
"sgrule_tags": sg_rule_tags,
"source_groups": source_groups,
"prefix_list_ids": prefix_list_ids,
"security_rule_ids": security_rule_ids,
}
def authorize_security_group_egress(self) -> str:
self.error_on_dryrun()
for kwargs in self._process_rules_from_querystring():
rules, group = self.ec2_backend.authorize_security_group_egress(**kwargs)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE)
return template.render(rules=rules, group=group)
def authorize_security_group_ingress(self) -> str:
self.error_on_dryrun()
for kwargs in self._process_rules_from_querystring():
rules, group = self.ec2_backend.authorize_security_group_ingress(**kwargs)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE)
return template.render(rules=rules, group=group)
def create_security_group(self) -> str:
name = self._get_param("GroupName")
description = self._get_param("GroupDescription")
vpc_id = self._get_param("VpcId")
tags = self._parse_tag_specification().get("security-group", {})
self.error_on_dryrun()
group = self.ec2_backend.create_security_group(
name, description, vpc_id=vpc_id, tags=tags
)
template = self.response_template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group)
def delete_security_group(self) -> str:
# TODO this should raise an error if there are instances in the group.
# See
# http://docs.aws.amazon.com/AWSEC2/latest/APIReference/ApiReference-query-DeleteSecurityGroup.html
name = self._get_param("GroupName")
sg_id = self._get_param("GroupId")
self.error_on_dryrun()
if name:
self.ec2_backend.delete_security_group(name)
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id)
return DELETE_GROUP_RESPONSE
def describe_security_groups(self) -> str:
groupnames = self._get_multi_param("GroupName")
group_ids = self._get_multi_param("GroupId")
filters = self._filters_from_querystring()
groups = self.ec2_backend.describe_security_groups(
group_ids=group_ids, groupnames=groupnames, filters=filters
)
template = self.response_template(DESCRIBE_SECURITY_GROUPS_RESPONSE)
return template.render(groups=groups)
def describe_security_group_rules(self) -> str:
group_id = self._get_param("GroupId")
sg_rule_ids = self._get_param("SecurityGroupRuleId.1")
filters = self._filters_from_querystring()
self.error_on_dryrun()
# if sg rule ids are not None then wrap in a list
# as expected by ec2_backend.describe_security_group_rules
if sg_rule_ids:
sg_rule_ids = [sg_rule_ids]
rules = self.ec2_backend.describe_security_group_rules(
group_id, sg_rule_ids, filters
)
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules)
def revoke_security_group_egress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.revoke_security_group_egress(**args)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self) -> str:
self.error_on_dryrun()
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.revoke_security_group_ingress(**args)
return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE
def update_security_group_rule_descriptions_ingress(self) -> str:
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.update_security_group_rule_descriptions_ingress(**args)
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_INGRESS
def update_security_group_rule_descriptions_egress(self) -> str:
for args in self._process_rules_from_querystring():
# we don't need this parameter to revoke
del args["sgrule_tags"]
self.ec2_backend.update_security_group_rule_descriptions_egress(**args)
return UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_EGRESS
CREATE_SECURITY_GROUP_RESPONSE = """<CreateSecurityGroupResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
<groupId>{{ group.id }}</groupId>
<securityGroupArn>{{ group.arn }}</securityGroupArn>
<tagSet>
{% for tag in group.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</CreateSecurityGroupResponse>"""
DESCRIBE_SECURITY_GROUP_RULES_RESPONSE = """
<DescribeSecurityGroupRulesResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>{{ request_id }}</requestId>
<securityGroupRuleSet>
{% for rule in rules %}
<item>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
<groupId>{{ rule.group_id }}</groupId>
<groupOwnerId>{{ rule.owner_id }}</groupOwnerId>
<fromPort>{{ rule.from_port if rule.from_port is not none else -1 }}</fromPort>
<toPort>{{ rule.to_port if rule.to_port is not none else -1 }}</toPort>
<isEgress>{{ 'true' if rule.is_egress else 'false' }}</isEgress>
<securityGroupRuleId>{{ rule.id }}</securityGroupRuleId>
{% if rule.ip_range %}
{% if rule.ip_range['CidrIp'] %}<cidrIpv4>{{ rule.ip_range['CidrIp'] }}</cidrIpv4>{% endif %}
{% if rule.ip_range['CidrIpv6'] %}<cidrIpv6>{{ rule.ip_range['CidrIpv6'] }}</cidrIpv6>{% endif %}
{% if rule.ip_range['Description'] %}<description>{{ rule.ip_range['Description'] }}</description>{% endif %}
{% endif %}
{% if rule.source_group %}
<referencedGroupInfo>
<groupId>{{ rule.source_group['GroupId'] }}</groupId><userId>{{ rule.source_group['OwnerId'] }}</userId>
</referencedGroupInfo>
{% if rule.source_group['Description'] %}<description>{{ rule.source_group['Description'] }}</description>{% endif %}
{% endif %}
{% if rule.prefix_list_id %}
<prefixListId>{{ rule.prefix_list_id['PrefixListId'] }}</prefixListId>
{% if rule.prefix_list_id['Description'] %}<description>{{ rule.prefix_list_id['Description'] }}</description>{% endif %}
{% endif %}
<tagSet>
{% for tag in rule.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
</securityGroupRuleSet>
</DescribeSecurityGroupRulesResponse>"""
DELETE_GROUP_RESPONSE = """<DeleteSecurityGroupResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</DeleteSecurityGroupResponse>"""
DESCRIBE_SECURITY_GROUPS_RESPONSE = """<DescribeSecurityGroupsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<securityGroupInfo>
{% for group in groups %}
<item>
<ownerId>{{ group.owner_id }}</ownerId>
<groupId>{{ group.id }}</groupId>
<groupName>{{ group.name }}</groupName>
<groupDescription>{{ group.description }}</groupDescription>
<securityGroupArn>{{ group.arn }}</securityGroupArn>
{% if group.vpc_id %}
<vpcId>{{ group.vpc_id }}</vpcId>
{% endif %}
<ipPermissions>
{% for rule in group.flattened_ingress_rules %}
<item>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<groups>
{% for source_group in rule.source_groups %}
<item>
{% if source_group.OwnerId and source_group.OwnerId != "" %}
<userId>{{ source_group.OwnerId }}</userId>
{% endif %}
{% if source_group.GroupId and source_group.GroupId != "" %}
<groupId>{{ source_group.GroupId }}</groupId>
{% endif %}
{% if source_group.GroupName and source_group.GroupName != "" %}
<groupName>{{ source_group.GroupName }}</groupName>
{% endif %}
{% if source_group.Description and source_group.Description != "" %}
<description>{{ source_group.Description }}</description>
{% endif %}
</item>
{% endfor %}
</groups>
<ipRanges>
{% for ip_range in rule.ip_ranges %}
{% if ip_range['CidrIp'] %}
<item>
<cidrIp>{{ ip_range['CidrIp'] }}</cidrIp>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endif %}
{% endfor %}
</ipRanges>
<ipv6Ranges>
{% for ip_range in rule.ip_ranges %}
{% if ip_range['CidrIpv6'] %}
<item>
<cidrIpv6>{{ ip_range['CidrIpv6'] }}</cidrIpv6>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endif %}
{% endfor %}
</ipv6Ranges>
<prefixListIds>
{% for prefix_list in rule.prefix_list_ids %}
<item>
<prefixListId>{{ prefix_list.PrefixListId }}</prefixListId>
{% if prefix_list.Description %}
<description>{{ prefix_list.Description }}</description>
{% endif %}
</item>
{% endfor %}
</prefixListIds>
</item>
{% endfor %}
</ipPermissions>
<ipPermissionsEgress>
{% for rule in group.flattened_egress_rules %}
<item>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<groups>
{% for source_group in rule.source_groups %}
<item>
{% if source_group.OwnerId and source_group.OwnerId != "" %}
<userId>{{ source_group.OwnerId }}</userId>
{% endif %}
{% if source_group.GroupId and source_group.GroupId != "" %}
<groupId>{{ source_group.GroupId }}</groupId>
{% endif %}
{% if source_group.GroupName and source_group.GroupName != "" %}
<groupName>{{ source_group.GroupName }}</groupName>
{% endif %}
{% if source_group.Description and source_group.Description != "" %}
<description>{{ source_group.Description }}</description>
{% endif %}
</item>
{% endfor %}
</groups>
<ipRanges>
{% for ip_range in rule.ip_ranges %}
{% if ip_range['CidrIp'] %}
<item>
<cidrIp>{{ ip_range['CidrIp'] }}</cidrIp>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endif %}
{% endfor %}
</ipRanges>
<ipv6Ranges>
{% for ip_range in rule.ip_ranges %}
{% if ip_range['CidrIpv6'] %}
<item>
<cidrIpv6>{{ ip_range['CidrIpv6'] }}</cidrIpv6>
{% if ip_range['Description'] %}
<description>{{ ip_range['Description'] }}</description>
{% endif %}
</item>
{% endif %}
{% endfor %}
</ipv6Ranges>
<prefixListIds>
{% for prefix_list in rule.prefix_list_ids %}
<item>
<prefixListId>{{ prefix_list.PrefixListId }}</prefixListId>
{% if prefix_list.Description %}
<description>{{ prefix_list.Description }}</description>
{% endif %}
</item>
{% endfor %}
</prefixListIds>
</item>
{% endfor %}
</ipPermissionsEgress>
<tagSet>
{% for tag in group.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
</securityGroupInfo>
</DescribeSecurityGroupsResponse>"""
AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE = """<AuthorizeSecurityGroupIngressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>b1f67202-c2c2-4ba4-8464-c8b1d8f5af7a</requestId>
<return>true</return>
<securityGroupRuleSet>
{% for rule in rules %}
<item>
{% if rule.ip_range %}
{% if rule.ip_range.CidrIp %}
<cidrIpv4>{{ rule.ip_range.CidrIp }}</cidrIpv4>
{% endif %}
{% if rule.ip_range.CidrIpv6 %}
<cidrIpv6>{{ rule.ip_range.CidrIpv6 }}</cidrIpv6>
{% endif %}
<description>{{ rule.ip_range.Description or '' }}</description>
{% endif %}
{% if rule.prefix_list_id %}
<prefixListId>{{ rule.prefix_list_id.PrefixListId }}</prefixListId>
<description>{{ rule.prefix_list_id.Description or '' }}</description>
{% endif %}
{% if rule.source_group %}
<referencedGroupInfo>
{% if rule.source_group.OwnerId and rule.source_group.OwnerId != "" %}
<userId>{{ rule.source_group.OwnerId }}</userId>
{% endif %}
{% if rule.source_group.GroupId and rule.source_group.GroupId != "" %}
<groupId>{{ rule.source_group.GroupId }}</groupId>
{% endif %}
{% if rule.source_group.VpcId and rule.source_group.VpcId != "" %}
<vpcId>{{ rule.source_group.VpcId }}</vpcId>
{% endif %}
</referencedGroupInfo>
<description>{{ rule.source_group.Description or '' }}</description>
{% endif %}
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
<groupId>{{ group.id }}</groupId>
<groupOwnerId>{{ rule.owner_id }}</groupOwnerId>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
<isEgress>false</isEgress>
<securityGroupRuleId>{{ rule.id }}</securityGroupRuleId>
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<tagSet>
{% for tag in rule.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
</securityGroupRuleSet>
</AuthorizeSecurityGroupIngressResponse>"""
REVOKE_SECURITY_GROUP_INGRESS_RESPONSE = """<RevokeSecurityGroupIngressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</RevokeSecurityGroupIngressResponse>"""
AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE = """<AuthorizeSecurityGroupEgressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>b1f67202-c2c2-4ba4-8464-c8b1d8f5af7a</requestId>
<return>true</return>
<securityGroupRuleSet>
{% for rule in rules %}
<item>
{% if rule.ip_range %}
{% if rule.ip_range.CidrIp %}
<cidrIpv4>{{ rule.ip_range.CidrIp }}</cidrIpv4>
{% endif %}
{% if rule.ip_range.CidrIpv6 %}
<cidrIpv6>{{ rule.ip_range.CidrIpv6 }}</cidrIpv6>
{% endif %}
<description>{{ rule.ip_range.Description or '' }}</description>
{% endif %}
{% if rule.prefix_list_id %}
<prefixListId>{{ rule.prefix_list_id.PrefixListId }}</prefixListId>
<description>{{ rule.prefix_list_id.Description or '' }}</description>
{% endif %}
{% if rule.source_group %}
<referencedGroupInfo>
{% if rule.source_group.OwnerId and rule.source_group.OwnerId != "" %}
<userId>{{ rule.source_group.OwnerId }}</userId>
{% endif %}
{% if rule.source_group.GroupId and rule.source_group.GroupId != "" %}
<groupId>{{ rule.source_group.GroupId }}</groupId>
{% endif %}
{% if rule.source_group.VpcId and rule.source_group.VpcId != "" %}
<vpcId>{{ rule.source_group.VpcId }}</vpcId>
{% endif %}
</referencedGroupInfo>
<description>{{ rule.source_group.Description or '' }}</description>
{% endif %}
{% if rule.from_port is not none %}
<fromPort>{{ rule.from_port }}</fromPort>
{% endif %}
<groupId>{{ group.id }}</groupId>
<groupOwnerId>{{ rule.owner_id }}</groupOwnerId>
<ipProtocol>{{ rule.ip_protocol }}</ipProtocol>
<isEgress>true</isEgress>
<securityGroupRuleId>{{ rule.id }}</securityGroupRuleId>
{% if rule.to_port is not none %}
<toPort>{{ rule.to_port }}</toPort>
{% endif %}
<tagSet>
{% for tag in rule.get_tags() %}
<item>
<key>{{ tag.key }}</key>
<value>{{ tag.value }}</value>
</item>
{% endfor %}
</tagSet>
</item>
{% endfor %}
</securityGroupRuleSet>
</AuthorizeSecurityGroupEgressResponse>"""
REVOKE_SECURITY_GROUP_EGRESS_RESPONSE = """<RevokeSecurityGroupEgressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</RevokeSecurityGroupEgressResponse>"""
UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_INGRESS = """<UpdateSecurityGroupRuleDescriptionsIngressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</UpdateSecurityGroupRuleDescriptionsIngressResponse>"""
UPDATE_SECURITY_GROUP_RULE_DESCRIPTIONS_EGRESS = """<UpdateSecurityGroupRuleDescriptionsEgressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<return>true</return>
</UpdateSecurityGroupRuleDescriptionsEgressResponse>"""