import contextlib
import copy
from collections import OrderedDict
from typing import Any, Dict, ItemsView, List, Optional, Set, Tuple
from moto import settings
from moto.core.common_models import CloudFormationModel
from moto.core.utils import camelcase_to_underscores, utcnow
from moto.ec2.models.elastic_network_interfaces import NetworkInterface
from moto.ec2.models.fleets import Fleet
from moto.ec2.models.instance_types import (
INSTANCE_TYPE_OFFERINGS,
InstanceTypeOfferingBackend,
)
from moto.ec2.models.launch_templates import LaunchTemplateVersion
from moto.ec2.models.security_groups import SecurityGroup
from moto.ec2.models.subnets import Subnet
from moto.packages.boto.ec2.blockdevicemapping import BlockDeviceMapping
from moto.packages.boto.ec2.instance import Instance as BotoInstance
from moto.packages.boto.ec2.instance import Reservation
from ..exceptions import (
AvailabilityZoneNotFromRegionError,
InvalidInstanceIdError,
InvalidInstanceTypeError,
InvalidParameterCombination,
InvalidParameterValueErrorUnknownAttribute,
InvalidSecurityGroupNotFoundError,
InvalidSubnetIdError,
OperationDisableApiStopNotPermitted,
OperationNotPermitted4,
)
from ..utils import (
convert_tag_spec,
filter_reservations,
random_eni_attach_id,
random_instance_id,
random_private_ip,
random_reservation_id,
utc_date_and_time,
)
from .core import TaggedEC2Resource
class InstanceState:
def __init__(self, name: str = "pending", code: int = 0):
self.name = name
self.code = code
class StateReason:
def __init__(self, message: str = "", code: str = ""):
self.message = message
self.code = code
class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
VALID_ATTRIBUTES = {
"instanceType",
"kernel",
"ramdisk",
"userData",
"disableApiTermination",
"instanceInitiatedShutdownBehavior",
"rootDeviceName",
"blockDeviceMapping",
"productCodes",
"sourceDestCheck",
"groupSet",
"ebsOptimized",
"sriovNetSupport",
"disableApiStop",
}
def __init__(
self,
ec2_backend: Any,
image_id: str,
user_data: Any,
security_groups: List[SecurityGroup],
**kwargs: Any,
):
super().__init__()
self.ec2_backend = ec2_backend
self.id = random_instance_id()
self.owner_id = ec2_backend.account_id
self.lifecycle: Optional[str] = kwargs.get("lifecycle")
nics = copy.deepcopy(kwargs.get("nics", []))
launch_template_arg = kwargs.get("launch_template", {})
if launch_template_arg and not image_id:
# the image id from the template should be used
template_version = ec2_backend._get_template_from_args(launch_template_arg)
self.image_id = template_version.image_id
else:
self.image_id = image_id
# Check if we have tags to process
if launch_template_arg:
template_version = ec2_backend._get_template_from_args(launch_template_arg)
tag_spec_set = template_version.data.get("TagSpecification", {})
tags = convert_tag_spec(tag_spec_set)
instance_tags = tags.get("instance", {})
self.add_tags(instance_tags)
self._state = InstanceState("running", 16)
self._reason = ""
self._state_reason = StateReason()
self.user_data = user_data
self.security_groups = security_groups
self.instance_type: str = kwargs.get("instance_type", "m1.small")
self.region_name = kwargs.get("region_name", "us-east-1")
placement = kwargs.get("placement", None)
self.placement_hostid = kwargs.get("placement_hostid")
self.subnet_id = kwargs.get("subnet_id")
if not self.subnet_id:
self.subnet_id = next(
(n["SubnetId"] for n in nics if "SubnetId" in n), None
)
in_ec2_classic = not bool(self.subnet_id)
self.key_name = kwargs.get("key_name")
self.ebs_optimized = kwargs.get("ebs_optimized", False)
self.monitoring_state = kwargs.get("monitoring_state", "disabled")
self.source_dest_check = "true"
self.launch_time = utc_date_and_time()
self.ami_launch_index = kwargs.get("ami_launch_index", 0)
self.disable_api_termination = kwargs.get("disable_api_termination", False)
self.instance_initiated_shutdown_behavior = (
kwargs.get("instance_initiated_shutdown_behavior") or "stop"
)
self.hibernation_options = kwargs.get("hibernation_options")
self.sriov_net_support = "simple"
self._spot_fleet_id = kwargs.get("spot_fleet_id", None)
self._fleet_id = kwargs.get("fleet_id", None)
self.associate_public_ip = kwargs.get("associate_public_ip", False)
if in_ec2_classic:
# If we are in EC2-Classic, autoassign a public IP
self.associate_public_ip = True
amis = self.ec2_backend.describe_images(filters={"image-id": self.image_id})
ami = amis[0] if amis else None
self.platform = ami.platform if ami else None
self.virtualization_type = ami.virtualization_type if ami else "paravirtual"
self.architecture = ami.architecture if ami else "x86_64"
self.root_device_name = ami.root_device_name if ami else None
self.disable_api_stop = kwargs.get("disable_api_stop", False)
self.iam_instance_profile = kwargs.get("iam_instance_profile")
# handle weird bug around user_data -- something grabs the repr(), so
# it must be clean
if isinstance(self.user_data, list) and len(self.user_data) > 0:
if isinstance(self.user_data[0], bytes):
# string will have a "b" prefix -- need to get rid of it
self.user_data[0] = self.user_data[0].decode("utf-8")
if self.subnet_id:
subnet: Subnet = ec2_backend.get_subnet(self.subnet_id)
self._placement.zone = subnet.availability_zone
if self.associate_public_ip is None:
# Mapping public ip hasnt been explicitly enabled or disabled
self.associate_public_ip = subnet.map_public_ip_on_launch == "true"
elif placement:
self._placement.zone = placement
else:
self._placement.zone = ec2_backend.region_name + "a"
self.block_device_mapping: BlockDeviceMapping = BlockDeviceMapping()
self._private_ips: Set[str] = set()
self.prep_nics(
nics,
private_ip=kwargs.get("private_ip"),
associate_public_ip=self.associate_public_ip,
security_groups=self.security_groups,
ipv6_address_count=kwargs.get("ipv6_address_count"),
)
@property
def vpc_id(self) -> Optional[str]:
if self.subnet_id:
with contextlib.suppress(InvalidSubnetIdError):
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
return subnet.vpc_id
if self.nics and 0 in self.nics:
return self.nics[0].subnet.vpc_id
return None
def __del__(self) -> None:
try:
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
for ip in self._private_ips:
subnet.del_subnet_ip(ip)
except Exception:
# Its not "super" critical we clean this up, as reset will do this
# worst case we'll get IP address exaustion... rarely
pass
def add_block_device(
self,
size: int,
device_path: str,
snapshot_id: Optional[str],
encrypted: bool,
delete_on_termination: bool,
kms_key_id: Optional[str],
volume_type: Optional[str],
) -> None:
volume = self.ec2_backend.create_volume(
size=size,
zone_name=self._placement.zone,
snapshot_id=snapshot_id,
encrypted=encrypted,
kms_key_id=kms_key_id,
volume_type=volume_type,
)
self.ec2_backend.attach_volume(
volume.id, self.id, device_path, delete_on_termination
)
def setup_defaults(self) -> None:
# Default have an instance with root volume should you not wish to
# override with attach volume cmd.
volume = self.ec2_backend.create_volume(size=8, zone_name=self._placement.zone)
self.ec2_backend.attach_volume(volume.id, self.id, "/dev/sda1", True)
def teardown_defaults(self) -> None:
for device_path in list(self.block_device_mapping.keys()):
volume = self.block_device_mapping[device_path]
volume_id = volume.volume_id
self.ec2_backend.detach_volume(volume_id, self.id, device_path)
if volume.delete_on_termination:
self.ec2_backend.delete_volume(volume_id)
@property
def get_block_device_mapping(self) -> ItemsView[str, Any]: # type: ignore[misc]
return self.block_device_mapping.items()
@property
def private_ip(self) -> Optional[str]:
return self.nics[0].private_ip_address
@property
def private_dns(self) -> str:
formatted_ip = self.private_ip.replace(".", "-") # type: ignore[union-attr]
if self.region_name == "us-east-1":
return f"ip-{formatted_ip}.ec2.internal"
else:
return f"ip-{formatted_ip}.{self.region_name}.compute.internal"
@property
def public_ip(self) -> Optional[str]:
return self.nics[0].public_ip
@property
def public_dns(self) -> Optional[str]:
if self.public_ip:
formatted_ip = self.public_ip.replace(".", "-")
if self.region_name == "us-east-1":
return f"ec2-{formatted_ip}.compute-1.amazonaws.com"
else:
return f"ec2-{formatted_ip}.{self.region_name}.compute.amazonaws.com"
return None
@staticmethod
def cloudformation_name_type() -> str:
return ""
@staticmethod
def cloudformation_type() -> str:
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-instance.html
return "AWS::EC2::Instance"
@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "Instance":
from ..models import ec2_backends
properties = cloudformation_json["Properties"]
ec2_backend = ec2_backends[account_id][region_name]
security_group_ids = properties.get("SecurityGroups", [])
group_names = [
ec2_backend.get_security_group_from_id(group_id).name # type: ignore[union-attr]
for group_id in security_group_ids
]
reservation = ec2_backend.run_instances(
image_id=properties["ImageId"],
user_data=properties.get("UserData"),
count=1,
security_group_names=group_names,
instance_type=properties.get("InstanceType", "m1.small"),
is_instance_type_default=not properties.get("InstanceType"),
subnet_id=properties.get("SubnetId"),
key_name=properties.get("KeyName"),
private_ip=properties.get("PrivateIpAddress"),
block_device_mappings=properties.get("BlockDeviceMappings", {}),
)
instance = reservation.instances[0]
for tag in properties.get("Tags", []):
instance.add_tag(tag["Key"], tag["Value"])
# Associating iam instance profile.
# TODO: Don't forget to implement replace_iam_instance_profile_association once update_from_cloudformation_json
# for ec2 instance will be implemented.
if properties.get("IamInstanceProfile"):
ec2_backend.associate_iam_instance_profile(
instance_id=instance.id,
iam_instance_profile_name=properties.get("IamInstanceProfile"),
)
return instance
@classmethod
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
from ..models import ec2_backends
ec2_backend = ec2_backends[account_id][region_name]
all_instances = ec2_backend.all_instances()
# the resource_name for instances is the stack name, logical id, and random suffix separated
# by hyphens. So to lookup the instances using the 'aws:cloudformation:logical-id' tag, we need to
# extract the logical-id from the resource_name
logical_id = resource_name.split("-")[1]
for instance in all_instances:
instance_tags = instance.get_tags()
for tag in instance_tags:
if (
tag["key"] == "aws:cloudformation:logical-id"
and tag["value"] == logical_id
):
instance.delete(account_id, region_name)
@property
def physical_resource_id(self) -> str:
return self.id
def start(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
nic.start()
self._state.name = "running"
self._state.code = 16
self._reason = ""
self._state_reason = StateReason()
return previous_state
def stop(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
nic.stop()
self._state.name = "stopped"
self._state.code = 80
self._reason = f"User initiated ({utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')})"
self._state_reason = StateReason(
"Client.UserInitiatedShutdown: User initiated shutdown",
"Client.UserInitiatedShutdown",
)
return previous_state
def is_running(self) -> bool:
return self._state.name == "running"
def delete(
self,
account_id: str,
region: str, # pylint: disable=unused-argument
) -> None:
self.terminate()
def terminate(self) -> InstanceState:
previous_state = copy.copy(self._state)
for nic in self.nics.values():
nic.stop()
if nic.delete_on_termination:
nic.delete()
self.teardown_defaults()
if self._spot_fleet_id or self._fleet_id:
fleet = self.ec2_backend.get_spot_fleet_request(self._spot_fleet_id)
if not fleet:
fleet = self.ec2_backend.get_fleet(
self._spot_fleet_id
) or self.ec2_backend.get_fleet(self._fleet_id)
for spec in fleet.launch_specs:
if (
spec.instance_type == self.instance_type
and spec.subnet_id == self.subnet_id
):
fleet.fulfilled_capacity -= spec.weighted_capacity
break
fleet.spot_requests = [
req for req in fleet.spot_requests if req.instance != self
]
if isinstance(fleet, Fleet):
fleet.on_demand_instances = [
inst
for inst in fleet.on_demand_instances
if inst["instance"] != self
]
self._state.name = "terminated"
self._state.code = 48
self._reason = f"User initiated ({utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')})"
self._state_reason = StateReason(
"Client.UserInitiatedShutdown: User initiated shutdown",
"Client.UserInitiatedShutdown",
)
# Disassociate iam instance profile if associated, otherwise iam_instance_profile_associations will
# be pointing to None.
if self.ec2_backend.iam_instance_profile_associations.get(self.id):
self.ec2_backend.disassociate_iam_instance_profile(
association_id=self.ec2_backend.iam_instance_profile_associations[
self.id
].id
)
return previous_state
def reboot(self) -> None:
self._state.name = "running"
self._state.code = 16
self._reason = ""
self._state_reason = StateReason()
@property
def dynamic_group_list(self) -> List[SecurityGroup]:
return self.security_groups
def _get_private_ip_from_nic(self, nic: Dict[str, Any]) -> Optional[str]:
private_ip = nic.get("PrivateIpAddress")
if private_ip:
return private_ip
for address in nic.get("PrivateIpAddresses", []):
if address.get("Primary") == "true":
return address.get("PrivateIpAddress")
return None
def prep_nics(
self,
nic_spec: List[Dict[str, Any]],
private_ip: Optional[str] = None,
associate_public_ip: Optional[bool] = None,
security_groups: Optional[List[SecurityGroup]] = None,
ipv6_address_count: Optional[int] = None,
) -> None:
self.nics: Dict[int, NetworkInterface] = {}
for nic in nic_spec:
if int(nic.get("DeviceIndex")) == 0: # type: ignore[arg-type]
nic_associate_public_ip = nic.get("AssociatePublicIpAddress")
if nic_associate_public_ip is not None:
associate_public_ip = nic_associate_public_ip == "true"
if private_ip is None:
private_ip = self._get_private_ip_from_nic(nic)
break
if self.subnet_id:
subnet: Subnet = self.ec2_backend.get_subnet(self.subnet_id)
if not private_ip:
private_ip = subnet.get_available_subnet_ip(instance=self)
else:
subnet.request_ip(private_ip, instance=self)
self._private_ips.add(private_ip)
elif private_ip is None:
# Preserve old behaviour if in EC2-Classic mode
private_ip = random_private_ip()
# Primary NIC defaults
primary_nic = {
"SubnetId": self.subnet_id,
"PrivateIpAddress": private_ip,
"AssociatePublicIpAddress": associate_public_ip,
}
primary_nic = dict((k, v) for k, v in primary_nic.items() if v)
# If empty NIC spec but primary NIC values provided, create NIC from
# them.
if primary_nic and not nic_spec:
nic_spec = [primary_nic]
nic_spec[0]["DeviceIndex"] = 0
# Flesh out data structures and associations
for nic in nic_spec:
device_index = int(nic.get("DeviceIndex")) # type: ignore[arg-type]
nic_id = nic.get("NetworkInterfaceId")
if nic_id:
# If existing NIC found, use it.
use_nic = self.ec2_backend.get_network_interface(nic_id)
use_nic.device_index = device_index
use_nic.public_ip_auto_assign = False
else:
# If primary NIC values provided, use them for the primary NIC.
if device_index == 0 and primary_nic:
nic.update(primary_nic)
if "SubnetId" in nic:
nic_subnet: Subnet = self.ec2_backend.get_subnet(nic["SubnetId"])
else:
# Get default Subnet
zone = self._placement.zone
nic_subnet = self.ec2_backend.get_default_subnet(
availability_zone=zone
)
group_ids = nic.get("SecurityGroupId") or []
if security_groups:
group_ids.extend([group.id for group in security_groups])
use_nic = self.ec2_backend.create_network_interface(
nic_subnet,
nic.get("PrivateIpAddress"),
device_index=device_index,
public_ip_auto_assign=nic.get("AssociatePublicIpAddress", False),
group_ids=group_ids,
delete_on_termination=nic.get("DeleteOnTermination") == "true",
ipv6_address_count=ipv6_address_count,
)
self.attach_eni(use_nic, device_index)
def attach_eni(self, eni: NetworkInterface, device_index: int) -> str:
device_index = int(device_index)
self.nics[device_index] = eni
# This is used upon associate/disassociate public IP.
eni.instance = self
eni.attachment_id = random_eni_attach_id()
eni.attach_time = utc_date_and_time()
eni.status = "in-use"
eni.device_index = device_index
return eni.attachment_id
def detach_eni(self, eni: NetworkInterface) -> None:
self.nics.pop(eni.device_index, None) # type: ignore[arg-type]
eni.instance = None
eni.attachment_id = None
eni.attach_time = None
eni.status = "available"
eni.device_index = None
@classmethod
def has_cfn_attr(cls, attr: str) -> bool:
return attr in [
"AvailabilityZone",
"PrivateDnsName",
"PublicDnsName",
"PrivateIp",
"PublicIp",
]
def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "AvailabilityZone":
return self.placement
elif attribute_name == "PrivateDnsName":
return self.private_dns
elif attribute_name == "PublicDnsName":
return self.public_dns
elif attribute_name == "PrivateIp":
return self.private_ip
elif attribute_name == "PublicIp":
return self.public_ip
raise UnformattedGetAttTemplateException()
def applies(self, filters: List[Dict[str, Any]]) -> bool:
if filters:
applicable = False
for f in filters:
acceptable_values = f["values"]
if f["name"] == "instance-state-name":
if self._state.name in acceptable_values:
applicable = True
if f["name"] == "instance-state-code":
if str(self._state.code) in acceptable_values:
applicable = True
return applicable
# If there are no filters, all instances are valid
return True
class InstanceBackend:
def __init__(self) -> None:
self.reservations: Dict[str, Reservation] = OrderedDict()
def get_instance(self, instance_id: str) -> Instance:
for instance in self.all_instances():
if instance.id == instance_id:
return instance
raise InvalidInstanceIdError(instance_id)
def run_instances(
self,
image_id: str,
count: int,
user_data: Optional[str],
security_group_names: List[str],
**kwargs: Any,
) -> Reservation:
"""
The Placement-parameter is validated to verify the availability-zone exists for the current region.
The InstanceType-parameter can be validated, to see if it is a known instance-type.
Enable this validation by setting the environment variable `MOTO_EC2_ENABLE_INSTANCE_TYPE_VALIDATION=true`
The ImageId-parameter can be validated, to see if it is a known AMI.
Enable this validation by setting the environment variable `MOTO_ENABLE_AMI_VALIDATION=true`
The KeyPair-parameter can be validated, to see if it is a known key-pair.
Enable this validation by setting the environment variable `MOTO_ENABLE_KEYPAIR_VALIDATION=true`
"""
location_type = "availability-zone" if kwargs.get("placement") else "region"
default_region = "us-east-1"
if settings.ENABLE_KEYPAIR_VALIDATION:
self.describe_key_pairs(key_names=[kwargs.get("key_name")]) # type: ignore[attr-defined]
if settings.ENABLE_AMI_VALIDATION:
self.describe_images(ami_ids=[image_id] if image_id else []) # type: ignore[attr-defined]
valid_instance_types = INSTANCE_TYPE_OFFERINGS[location_type]
if "region_name" in kwargs and kwargs.get("placement"):
valid_availability_zones = {
instance["Location"]
for instance in valid_instance_types[kwargs["region_name"]]
}
if kwargs["placement"] not in valid_availability_zones:
raise AvailabilityZoneNotFromRegionError(kwargs["placement"])
match_filters = InstanceTypeOfferingBackend().matches_filters
if not kwargs["is_instance_type_default"] and not any(
{
match_filters(
valid_instance,
{"instance-type": kwargs["instance_type"]},
location_type,
)
for valid_instance in valid_instance_types.get(
kwargs["region_name"]
if "region_name" in kwargs
else default_region,
{},
)
},
):
if settings.EC2_ENABLE_INSTANCE_TYPE_VALIDATION:
raise InvalidInstanceTypeError(kwargs["instance_type"])
security_groups = [
self.get_security_group_by_name_or_id(name) # type: ignore[attr-defined]
for name in security_group_names
]
for sg_id in kwargs.pop("security_group_ids", []):
if isinstance(sg_id, str):
sg = self.get_security_group_from_id(sg_id) # type: ignore[attr-defined]
if sg is None:
raise InvalidSecurityGroupNotFoundError(sg_id)
security_groups.append(sg)
else:
security_groups.append(sg_id)
new_reservation = Reservation(reservation_id=random_reservation_id())
self.reservations[new_reservation.id] = new_reservation
tags = kwargs.pop("tags", {})
instance_tags = tags.get("instance", {})
volume_tags = tags.get("volume", {})
for index in range(count):
kwargs["ami_launch_index"] = index
new_instance = Instance(
self, image_id, user_data, security_groups, **kwargs
)
new_reservation.instances.append(new_instance)
new_instance.add_tags(instance_tags)
block_device_mappings = None
if "block_device_mappings" not in kwargs:
new_instance.setup_defaults()
if "block_device_mappings" in kwargs:
block_device_mappings = kwargs["block_device_mappings"]
elif kwargs.get("launch_template"):
template = self._get_template_from_args(kwargs["launch_template"])
block_device_mappings = template.data.get("BlockDeviceMapping")
elif kwargs.get("launch_config"):
block_device_mappings = kwargs[
"launch_config"
].block_device_mapping_dict
if block_device_mappings:
for block_device in block_device_mappings:
device_name = block_device["DeviceName"]
volume_size = block_device["Ebs"].get("VolumeSize")
volume_type = block_device["Ebs"].get("VolumeType")
snapshot_id = block_device["Ebs"].get("SnapshotId")
encrypted = block_device["Ebs"].get("Encrypted", False)
if isinstance(encrypted, str):
encrypted = encrypted.lower() == "true"
delete_on_termination = block_device["Ebs"].get(
"DeleteOnTermination", False
)
kms_key_id = block_device["Ebs"].get("KmsKeyId")
if block_device.get("NoDevice") != "":
new_instance.add_block_device(
volume_size,
device_name,
snapshot_id,
encrypted,
delete_on_termination,
kms_key_id,
volume_type=volume_type,
)
if kwargs.get("instance_market_options"):
new_instance.lifecycle = "spot"
# Tag all created volumes.
for _, device in new_instance.get_block_device_mapping:
volumes = self.describe_volumes(volume_ids=[device.volume_id]) # type: ignore
for volume in volumes:
volume.add_tags(volume_tags)
return new_reservation
def start_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
started_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
previous_state = instance.start()
started_instances.append((instance, previous_state))
return started_instances
def stop_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
stopped_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
if instance.disable_api_stop == "true":
raise OperationDisableApiStopNotPermitted(instance.id)
previous_state = instance.stop()
stopped_instances.append((instance, previous_state))
return stopped_instances
def terminate_instances(
self, instance_ids: List[str]
) -> List[Tuple[Instance, InstanceState]]:
terminated_instances = []
if not instance_ids:
raise InvalidParameterCombination("No instances specified")
for instance in self.get_multi_instances_by_id(instance_ids):
if instance.disable_api_termination == "true":
raise OperationNotPermitted4(instance.id)
previous_state = instance.terminate()
terminated_instances.append((instance, previous_state))
return terminated_instances
def reboot_instances(self, instance_ids: List[str]) -> List[Instance]:
rebooted_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
instance.reboot()
rebooted_instances.append(instance)
return rebooted_instances
def modify_instance_attribute(
self, instance_id: str, key: str, value: Any
) -> Instance:
instance = self.get_instance(instance_id)
setattr(instance, key, value)
return instance
def modify_instance_security_groups(
self, instance_id: str, new_group_id_list: List[str]
) -> Instance:
instance = self.get_instance(instance_id)
new_group_list = []
for new_group_id in new_group_id_list:
new_group_list.append(self.get_security_group_from_id(new_group_id)) # type: ignore[attr-defined]
setattr(instance, "security_groups", new_group_list)
return instance
def describe_instance_attribute(
self, instance_id: str, attribute: str
) -> Tuple[Instance, Any]:
if attribute not in Instance.VALID_ATTRIBUTES:
raise InvalidParameterValueErrorUnknownAttribute(attribute)
if attribute == "groupSet":
key = "security_groups"
else:
key = camelcase_to_underscores(attribute)
instance = self.get_instance(instance_id)
value = getattr(instance, key)
return instance, value
def describe_instance_credit_specifications(
self, instance_ids: List[str]
) -> List[Instance]:
queried_instances = []
for instance in self.get_multi_instances_by_id(instance_ids):
queried_instances.append(instance)
return queried_instances
def all_instances(self, filters: Any = None) -> List[Instance]:
instances = []
for reservation in self.all_reservations():
for instance in reservation.instances:
if instance.applies(filters):
instances.append(instance)
return instances
def all_running_instances(self, filters: Any = None) -> List[Instance]:
instances = []
for reservation in self.all_reservations():
for instance in reservation.instances:
if instance.state_code == 16 and instance.applies(filters):
instances.append(instance)
return instances
def get_multi_instances_by_id(
self, instance_ids: List[str], filters: Any = None
) -> List[Instance]:
"""
:param instance_ids: A string list with instance ids
:return: A list with instance objects
"""
result = []
for reservation in self.all_reservations():
for instance in reservation.instances:
if instance.id in instance_ids:
if instance.applies(filters):
result.append(instance)
if instance_ids and len(instance_ids) > len(result):
result_ids = [i.id for i in result]
missing_instance_ids = [i for i in instance_ids if i not in result_ids]
raise InvalidInstanceIdError(missing_instance_ids)
return result
def get_instance_by_id(self, instance_id: str) -> Optional[Instance]:
for reservation in self.all_reservations():
for instance in reservation.instances:
if instance.id == instance_id:
return instance
return None
def get_reservations_by_instance_ids(
self, instance_ids: List[str], filters: Any = None
) -> List[Reservation]:
"""Go through all of the reservations and filter to only return those
associated with the given instance_ids.
"""
reservations = []
for reservation in self.all_reservations():
reservation_instance_ids = [
instance.id for instance in reservation.instances
]
matching_reservation = any(
instance_id in reservation_instance_ids for instance_id in instance_ids
)
if matching_reservation:
reservation.instances = [
instance
for instance in reservation.instances
if instance.id in instance_ids
]
reservations.append(reservation)
found_instance_ids = [
instance.id
for reservation in reservations
for instance in reservation.instances
]
if len(found_instance_ids) != len(instance_ids):
invalid_id = list(set(instance_ids).difference(set(found_instance_ids)))[0]
raise InvalidInstanceIdError(invalid_id)
if filters is not None:
reservations = filter_reservations(reservations, filters)
return reservations
def describe_instances(self, filters: Any = None) -> List[Reservation]:
return self.all_reservations(filters)
def describe_instance_status(
self, instance_ids: List[str], include_all_instances: bool, filters: Any
) -> List[Instance]:
if instance_ids:
return self.get_multi_instances_by_id(instance_ids, filters)
elif include_all_instances:
return self.all_instances(filters)
else:
return self.all_running_instances(filters)
def all_reservations(self, filters: Any = None) -> List[Reservation]:
reservations = [
copy.copy(reservation) for reservation in self.reservations.copy().values()
]
if filters is not None:
reservations = filter_reservations(reservations, filters)
return reservations
def _get_template_from_args(
self, launch_template_arg: Dict[str, Any]
) -> LaunchTemplateVersion:
template = (
self.describe_launch_templates( # type: ignore[attr-defined]
template_ids=[launch_template_arg["LaunchTemplateId"]]
)[0]
if "LaunchTemplateId" in launch_template_arg
else self.describe_launch_templates( # type: ignore[attr-defined]
template_names=[launch_template_arg["LaunchTemplateName"]]
)[0]
)
version = launch_template_arg.get("Version", template.latest_version_number)
template_version = template.get_version(version)
return template_version