"""In-memory models and backend used to mock the EMR service."""

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from dateutil.parser import parse as dtparse

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel, CloudFormationModel
from moto.emr.exceptions import (
    InvalidRequestException,
    ResourceNotFoundException,
    ValidationException,
)
from moto.utilities.utils import CamelToUnderscoresWalker, get_partition

from .utils import (
    EmrSecurityGroupManager,
    random_cluster_id,
    random_instance_group_id,
    random_step_id,
)

# AMI id passed to the EC2 backend whenever cluster instances are launched.
EXAMPLE_AMI_ID = "ami-12c6146b"


class FakeApplication(BaseModel):
    """Plain record describing an application added to a cluster."""

    def __init__(
        self, name: str, version: str, args: List[str], additional_info: Dict[str, str]
    ):
        # Falsy values (None, empty) are normalised to empty containers.
        self.additional_info = additional_info or {}
        self.args = args or []
        self.name = name
        self.version = version


class FakeBootstrapAction(BaseModel):
    """Plain record describing a bootstrap action of a cluster."""

    def __init__(self, args: List[str], name: str, script_path: str):
        self.args = args or []
        self.name = name
        self.script_path = script_path


class FakeInstance(BaseModel):
    """Links an EC2 instance id to the instance group it belongs to."""

    def __init__(
        self,
        ec2_instance_id: str,
        instance_group: "FakeInstanceGroup",
        instance_fleet_id: Optional[str] = None,
        instance_id: Optional[str] = None,
    ):
        # NOTE(review): the fallback id comes from the instance-*group* id
        # generator; no dedicated instance id generator is imported here.
        self.id = instance_id or random_instance_group_id()
        self.ec2_instance_id = ec2_instance_id
        self.instance_group = instance_group
        self.instance_fleet_id = instance_fleet_id


class FakeInstanceGroup(CloudFormationModel):
    """An instance group belonging to a cluster.

    The group is created directly in the ``RUNNING`` state with its creation,
    start and ready timestamps set to the current UTC time.
    """

    def __init__(
        self,
        cluster_id: str,
        instance_count: int,
        instance_role: str,
        instance_type: str,
        market: str = "ON_DEMAND",
        name: Optional[str] = None,
        instance_group_id: Optional[str] = None,
        bid_price: Optional[str] = None,
        ebs_configuration: Optional[Dict[str, Any]] = None,
        auto_scaling_policy: Optional[Dict[str, Any]] = None,
    ):
        self.id = instance_group_id or random_instance_group_id()
        # cluster_id must be set before auto_scaling_policy: the policy setter
        # substitutes it for the ${emr.clusterId} placeholder.
        self.cluster_id = cluster_id

        self.bid_price = bid_price
        self.market = market
        if name is None:
            # Default name derived from the role.
            if instance_role == "MASTER":
                name = "master"
            elif instance_role == "CORE":
                name = "slave"
            else:
                name = "Task instance group"
        self.name = name
        self.num_instances = instance_count
        self.role = instance_role
        self.instance_type = instance_type
        self.ebs_configuration = ebs_configuration
        self.auto_scaling_policy = auto_scaling_policy
        self.creation_datetime = datetime.now(timezone.utc)
        self.start_datetime = datetime.now(timezone.utc)
        self.ready_datetime = datetime.now(timezone.utc)
        self.end_datetime = None
        self.state = "RUNNING"

    def set_instance_count(self, instance_count: int) -> None:
        """Overwrite the number of instances in this group."""
        self.num_instances = instance_count

    @property
    def auto_scaling_policy(self) -> Any:  # type: ignore[misc]
        """The stored (snake_cased) auto scaling policy, or ``None``."""
        return self._auto_scaling_policy

    @auto_scaling_policy.setter
    def auto_scaling_policy(self, value: Any) -> None:
        """Store the policy, mark it ATTACHED and resolve cluster placeholders.

        ``None`` is stored as-is (no policy). Any other value is converted by
        ``CamelToUnderscoresWalker.parse`` and gets a ``status`` entry.
        """
        if value is None:
            self._auto_scaling_policy = value
            return
        self._auto_scaling_policy = CamelToUnderscoresWalker.parse(value)
        self._auto_scaling_policy["status"] = {"state": "ATTACHED"}
        # Transform common ${emr.clusterId} placeholder in any dimensions it occurs in.
        if "rules" in self._auto_scaling_policy:
            for rule in self._auto_scaling_policy["rules"]:
                if (
                    "trigger" in rule
                    and "cloud_watch_alarm_definition" in rule["trigger"]
                    and "dimensions" in rule["trigger"]["cloud_watch_alarm_definition"]
                ):
                    for dimension in rule["trigger"]["cloud_watch_alarm_definition"][
                        "dimensions"
                    ]:
                        if (
                            "value" in dimension
                            and dimension["value"] == "${emr.clusterId}"
                        ):
                            dimension["value"] = self.cluster_id

    @property
    def physical_resource_id(self) -> str:
        """CloudFormation physical id: the instance group id."""
        return self.id

    @staticmethod
    def cloudformation_type() -> str:
        return "AWS::EMR::InstanceGroupConfig"

    @classmethod
    def create_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Any,
        account_id: str,
        region_name: str,
        **kwargs: Any,
    ) -> "FakeInstanceGroup":
        """Create an instance group on the cluster named by ``JobFlowId``.

        Reads the resource ``Properties`` and delegates to
        ``ElasticMapReduceBackend.add_instance_groups``; returns the single
        group that was created.
        """
        properties = cloudformation_json["Properties"]
        job_flow_id = properties["JobFlowId"]
        ebs_config = properties.get("EbsConfiguration")
        if ebs_config:
            ebs_config = CamelToUnderscoresWalker.parse_dict(ebs_config)
        props = {
            "instance_count": properties.get("InstanceCount"),
            "instance_role": properties.get("InstanceRole"),
            "instance_type": properties.get("InstanceType"),
            "market": properties.get("Market"),
            "bid_price": properties.get("BidPrice"),
            "name": properties.get("Name"),
            "auto_scaling_policy": properties.get("AutoScalingPolicy"),
            "ebs_configuration": ebs_config,
        }

        emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]
        return emr_backend.add_instance_groups(
            cluster_id=job_flow_id, instance_groups=[props]
        )[0]


class FakeStep(BaseModel):
    """A step of a cluster; ``state`` is chosen by the caller."""

    def __init__(
        self,
        state: str,
        name: str = "",
        jar: str = "",
        args: Optional[List[str]] = None,
        properties: Optional[Dict[str, str]] = None,
        action_on_failure: str = "TERMINATE_CLUSTER",
    ):
        self.id = random_step_id()

        self.action_on_failure = action_on_failure
        self.args = args or []
        self.name = name
        self.jar = jar
        self.properties = properties or {}

        self.creation_datetime = datetime.now(timezone.utc)
        self.end_datetime = None
        self.ready_datetime = None
        self.start_datetime: Optional[datetime] = None
        self.state = state

    def start(self) -> None:
        """Record the current UTC time as the step's start time."""
        self.start_datetime = datetime.now(timezone.utc)


class FakeCluster(CloudFormationModel):
    """A cluster (job flow) owned by an ``ElasticMapReduceBackend``.

    Constructing a cluster registers it in ``emr_backend.clusters``, creates
    MASTER/CORE instance groups when the corresponding instance types are given
    in ``instance_attrs``, and immediately walks the state machine
    (STARTING -> BOOTSTRAPPING -> WAITING, and on to TERMINATED when there are
    no steps and ``keep_job_flow_alive_when_no_steps`` is falsy).
    """

    def __init__(
        self,
        emr_backend: "ElasticMapReduceBackend",
        name: str,
        log_uri: str,
        job_flow_role: str,
        service_role: str,
        steps: List[Dict[str, Any]],
        instance_attrs: Dict[str, Any],
        bootstrap_actions: Optional[List[Dict[str, Any]]] = None,
        configurations: Optional[List[Dict[str, Any]]] = None,
        cluster_id: Optional[str] = None,
        visible_to_all_users: str = "false",
        release_label: Optional[str] = None,
        requested_ami_version: Optional[str] = None,
        running_ami_version: Optional[str] = None,
        custom_ami_id: Optional[str] = None,
        step_concurrency_level: int = 1,
        security_configuration: Optional[str] = None,
        kerberos_attributes: Optional[Dict[str, str]] = None,
        auto_scaling_role: Optional[str] = None,
    ):
        self.id = cluster_id or random_cluster_id()
        # Register first: add_instance_groups() below looks the cluster up by id.
        emr_backend.clusters[self.id] = self
        self.emr_backend = emr_backend

        self.applications: List[FakeApplication] = []

        self.bootstrap_actions: List[FakeBootstrapAction] = []
        for bootstrap_action in bootstrap_actions or []:
            self.add_bootstrap_action(bootstrap_action)

        self.configurations = configurations or []

        self.tags: Dict[str, str] = {}

        self.log_uri = log_uri
        self.name = name
        self.normalized_instance_hours = 0

        self.steps: List[FakeStep] = []
        self.add_steps(steps)

        self.set_visibility(visible_to_all_users)

        self.instance_group_ids: List[str] = []
        self.instances: List[FakeInstance] = []
        self.master_instance_group_id: Optional[str] = None
        self.core_instance_group_id: Optional[str] = None
        if (
            "master_instance_type" in instance_attrs
            and instance_attrs["master_instance_type"]
        ):
            self.emr_backend.add_instance_groups(
                self.id,
                [
                    {
                        "instance_count": 1,
                        "instance_role": "MASTER",
                        "instance_type": instance_attrs["master_instance_type"],
                        "market": "ON_DEMAND",
                        "name": "master",
                    }
                ],
            )
        if (
            "slave_instance_type" in instance_attrs
            and instance_attrs["slave_instance_type"]
        ):
            self.emr_backend.add_instance_groups(
                self.id,
                [
                    {
                        # presumably instance_count includes the master node,
                        # hence the "- 1" -- verify against the caller.
                        "instance_count": instance_attrs["instance_count"] - 1,
                        "instance_role": "CORE",
                        "instance_type": instance_attrs["slave_instance_type"],
                        "market": "ON_DEMAND",
                        "name": "slave",
                    }
                ],
            )
        self.additional_master_security_groups = instance_attrs.get(
            "additional_master_security_groups"
        )
        self.additional_slave_security_groups = instance_attrs.get(
            "additional_slave_security_groups"
        )
        self.availability_zone = instance_attrs.get("availability_zone")
        self.ec2_key_name = instance_attrs.get("ec2_key_name")
        self.ec2_subnet_id = instance_attrs.get("ec2_subnet_id")
        self.hadoop_version = instance_attrs.get("hadoop_version")
        # NOTE: the two assignments below run after the instance groups were
        # added, so they overwrite values add_instance_group() may have set.
        self.keep_job_flow_alive_when_no_steps = instance_attrs.get(
            "keep_job_flow_alive_when_no_steps"
        )
        self.master_security_group = instance_attrs.get(
            "emr_managed_master_security_group"
        )
        self.service_access_security_group = instance_attrs.get(
            "service_access_security_group"
        )
        self.slave_security_group = instance_attrs.get(
            "emr_managed_slave_security_group"
        )
        self.termination_protected = instance_attrs.get("termination_protected")

        self.release_label = release_label
        self.requested_ami_version = requested_ami_version
        self.running_ami_version = running_ami_version
        self.custom_ami_id = custom_ami_id

        self.role = job_flow_role or "EMRJobflowDefault"
        self.service_role = service_role
        self.step_concurrency_level = step_concurrency_level

        self.creation_datetime = datetime.now(timezone.utc)
        self.start_datetime: Optional[datetime] = None
        self.ready_datetime: Optional[datetime] = None
        self.end_datetime: Optional[datetime] = None
        self.state: Optional[str] = None

        self.start_cluster()
        self.run_bootstrap_actions()
        if self.steps:
            self.steps[0].start()
        self.security_configuration = (
            security_configuration  # ToDo: Raise if doesn't already exist.
        )
        self.kerberos_attributes = kerberos_attributes
        self.auto_scaling_role = auto_scaling_role

    @property
    def arn(self) -> str:
        """ARN built from the backend's partition, region, account and this id."""
        return f"arn:{get_partition(self.emr_backend.region_name)}:elasticmapreduce:{self.emr_backend.region_name}:{self.emr_backend.account_id}:cluster/{self.id}"

    @property
    def instance_groups(self) -> List[FakeInstanceGroup]:
        """Instance groups of this cluster, resolved through the backend."""
        return self.emr_backend.get_instance_groups(self.instance_group_ids)

    @property
    def master_instance_type(self) -> str:
        """Instance type of the MASTER group (KeyError if there is none)."""
        return self.emr_backend.instance_groups[
            self.master_instance_group_id  # type: ignore
        ].instance_type

    @property
    def slave_instance_type(self) -> str:
        """Instance type of the CORE group (KeyError if there is none)."""
        return self.emr_backend.instance_groups[
            self.core_instance_group_id  # type: ignore
        ].instance_type

    @property
    def instance_count(self) -> int:
        """Total number of instances across all instance groups."""
        return sum(group.num_instances for group in self.instance_groups)

    def start_cluster(self) -> None:
        """Move to STARTING and stamp the start time (UTC)."""
        self.state = "STARTING"
        self.start_datetime = datetime.now(timezone.utc)

    def run_bootstrap_actions(self) -> None:
        """Pass through BOOTSTRAPPING to WAITING; terminate an idle cluster.

        The cluster is terminated right away when it has no steps and
        ``keep_job_flow_alive_when_no_steps`` is falsy.
        """
        self.state = "BOOTSTRAPPING"
        self.ready_datetime = datetime.now(timezone.utc)
        self.state = "WAITING"
        if not self.steps:
            if not self.keep_job_flow_alive_when_no_steps:
                self.terminate()

    def terminate(self) -> None:
        """Pass through TERMINATING to TERMINATED and stamp the end time (UTC)."""
        self.state = "TERMINATING"
        self.end_datetime = datetime.now(timezone.utc)
        self.state = "TERMINATED"

    def add_applications(self, applications: List[Dict[str, Any]]) -> None:
        """Append a ``FakeApplication`` for every dict in ``applications``.

        Missing keys default to an empty string / list / dict.
        """
        self.applications.extend(
            [
                FakeApplication(
                    name=app.get("name", ""),
                    version=app.get("version", ""),
                    args=app.get("args", []),
                    additional_info=app.get("additional_info", {}),
                )
                for app in applications
            ]
        )

    def add_bootstrap_action(self, bootstrap_action: Dict[str, Any]) -> None:
        """Append a bootstrap action built from the dict's keyword arguments."""
        self.bootstrap_actions.append(FakeBootstrapAction(**bootstrap_action))

    def add_instance_group(self, instance_group: FakeInstanceGroup) -> None:
        """Attach an instance group to the cluster.

        Only one MASTER and one CORE group are allowed. A MASTER group with
        more than one instance is treated as an HA cluster: it must have
        exactly 3 instances and forces keep-alive and termination protection.

        :raises Exception: a second MASTER or CORE group is added.
        :raises ValidationException: an HA master group does not have 3 instances.
        """
        if instance_group.role == "MASTER":
            if self.master_instance_group_id:
                raise Exception("Cannot add another master instance group")
            self.master_instance_group_id = instance_group.id
            num_master_nodes = instance_group.num_instances
            if num_master_nodes > 1:
                # Cluster is HA
                if num_master_nodes != 3:
                    raise ValidationException(
                        "Master instance group must have exactly 3 instances for HA clusters."
                    )
                self.keep_job_flow_alive_when_no_steps = True
                self.termination_protected = True
        if instance_group.role == "CORE":
            if self.core_instance_group_id:
                raise Exception("Cannot add another core instance group")
            self.core_instance_group_id = instance_group.id
        self.instance_group_ids.append(instance_group.id)

    def add_instance(self, instance: FakeInstance) -> None:
        """Record an instance as belonging to this cluster."""
        self.instances.append(instance)

    def add_steps(self, steps: List[Dict[str, Any]]) -> List[FakeStep]:
        """Create steps from keyword dicts and return the newly added ones.

        The first step the cluster ever receives is created RUNNING; every
        later one is PENDING. The cluster state is set to RUNNING afterwards.
        """
        added_steps = []
        for step in steps:
            if self.steps:
                # If we already have other steps, this one is pending
                fake = FakeStep(state="PENDING", **step)
            else:
                fake = FakeStep(state="RUNNING", **step)
            self.steps.append(fake)
            added_steps.append(fake)
        self.state = "RUNNING"
        return added_steps

    def add_tags(self, tags: Dict[str, str]) -> None:
        """Merge ``tags`` into the cluster's tags (existing keys are replaced)."""
        self.tags.update(tags)

    def remove_tags(self, tag_keys: List[str]) -> None:
        """Remove the given tag keys; unknown keys are ignored."""
        for key in tag_keys:
            self.tags.pop(key, None)

    def set_termination_protection(self, value: bool) -> None:
        self.termination_protected = value

    def set_visibility(self, visibility: str) -> None:
        self.visible_to_all_users = visibility

    @property
    def physical_resource_id(self) -> str:
        """CloudFormation physical id: the cluster id."""
        return self.id

    @staticmethod
    def cloudformation_type() -> str:
        return "AWS::EMR::Cluster"

    @classmethod
    def has_cfn_attr(cls, attr: str) -> bool:
        """Only the ``Id`` attribute is available through ``Fn::GetAtt``."""
        return attr in ["Id"]

    def get_cfn_attribute(self, attribute_name: str) -> str:
        """Return the value for ``Fn::GetAtt``; only ``Id`` is supported.

        :raises UnformattedGetAttTemplateException: for any other attribute.
        """
        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

        if attribute_name == "Id":
            return self.id
        raise UnformattedGetAttTemplateException()

    @classmethod
    def create_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Any,
        account_id: str,
        region_name: str,
        **kwargs: Any,
    ) -> "FakeCluster":
        """Create a cluster from CloudFormation ``Properties``.

        The ``Instances`` property is augmented in place with the snake_case
        keys the backend expects, the cluster is created through
        ``run_job_flow`` with no steps, and ``Tags`` are applied afterwards.
        """
        properties = cloudformation_json["Properties"]

        instance_attrs = properties.get("Instances", {})
        instance_attrs["ec2_subnet_id"] = instance_attrs.get("Ec2SubnetId")
        instance_attrs["emr_managed_master_security_group"] = instance_attrs.get(
            "EmrManagedMasterSecurityGroup"
        )
        instance_attrs["emr_managed_slave_security_group"] = instance_attrs.get(
            "EmrManagedSlaveSecurityGroup"
        )
        instance_attrs["service_access_security_group"] = instance_attrs.get(
            "ServiceAccessSecurityGroup"
        )
        instance_attrs["additional_master_security_groups"] = instance_attrs.get(
            "AdditionalMasterSecurityGroups", []
        )
        instance_attrs["additional_slave_security_groups"] = instance_attrs.get(
            "AdditionalSlaveSecurityGroups", []
        )

        emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]
        cluster = emr_backend.run_job_flow(
            name=properties["Name"],
            log_uri=properties.get("LogUri"),
            job_flow_role=properties["JobFlowRole"],
            service_role=properties["ServiceRole"],
            steps=[],
            instance_attrs=instance_attrs,
            kerberos_attributes=properties.get("KerberosAttributes", {}),
            release_label=properties.get("ReleaseLabel"),
            custom_ami_id=properties.get("CustomAmiId"),
        )
        tags = {item["Key"]: item["Value"] for item in properties.get("Tags", [])}
        cluster.add_tags(tags)
        return cluster

    @classmethod
    def delete_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Dict[str, Any],
        account_id: str,
        region_name: str,
    ) -> None:
        """Terminate the job flow whose id is ``resource_name``."""
        emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]
        emr_backend.terminate_job_flows([resource_name])


class FakeSecurityConfiguration(CloudFormationModel):
    """A named security configuration with its creation time (UTC)."""

    def __init__(self, name: str, security_configuration: str):
        self.name = name
        self.security_configuration = security_configuration
        self.creation_date_time = datetime.now(timezone.utc)

    @property
    def physical_resource_id(self) -> str:
        """CloudFormation physical id: the configuration name."""
        return self.name

    @staticmethod
    def cloudformation_type() -> str:
        return "AWS::EMR::SecurityConfiguration"

    @classmethod
    def create_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Any,
        account_id: str,
        region_name: str,
        **kwargs: Any,
    ) -> "FakeSecurityConfiguration":
        """Create the configuration; the name falls back to ``resource_name``."""
        emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]

        properties = cloudformation_json["Properties"]
        return emr_backend.create_security_configuration(
            name=properties.get("Name") or resource_name,
            security_configuration=properties.get("SecurityConfiguration", {}),
        )

    @classmethod
    def delete_from_cloudformation_json(  # type: ignore[misc]
        cls,
        resource_name: str,
        cloudformation_json: Dict[str, Any],
        account_id: str,
        region_name: str,
    ) -> None:
        """Delete the configuration; the name falls back to ``resource_name``."""
        emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]

        properties = cloudformation_json["Properties"]
        name = properties.get("Name") or resource_name
        emr_backend.delete_security_configuration(name)


class ElasticMapReduceBackend(BaseBackend):
    """Per-account, per-region store of clusters, groups and configurations."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        self.clusters: Dict[str, FakeCluster] = {}
        self.instance_groups: Dict[str, FakeInstanceGroup] = {}
        self.security_configurations: Dict[str, FakeSecurityConfiguration] = {}
        self.block_public_access_configuration: Dict[str, Any] = {}

    @staticmethod
    def _paginate(
        items: List[Any], marker: Optional[str], max_items: int = 50
    ) -> Tuple[List[Any], Optional[str]]:
        """Return one page of ``items`` and the marker of the next page.

        ``marker`` is the stringified start index (``None`` means 0). The
        returned marker is ``None`` when no items remain after this page.
        """
        start_idx = 0 if marker is None else int(marker)
        end_idx = start_idx + max_items
        next_marker = None if len(items) <= end_idx else str(end_idx)
        return items[start_idx:end_idx], next_marker

    @property
    def ec2_backend(self) -> Any:  # type: ignore[misc]
        """
        :return: EC2 Backend
        :rtype: moto.ec2.models.EC2Backend
        """
        from moto.ec2 import ec2_backends

        return ec2_backends[self.account_id][self.region_name]

    def add_applications(
        self, cluster_id: str, applications: List[Dict[str, Any]]
    ) -> None:
        """Add applications to a cluster.

        :raises ResourceNotFoundException: the cluster does not exist.
        """
        cluster = self.describe_cluster(cluster_id)
        cluster.add_applications(applications)

    def add_instance_groups(
        self, cluster_id: str, instance_groups: List[Dict[str, Any]]
    ) -> List[FakeInstanceGroup]:
        """Create instance groups from keyword dicts and attach them to a cluster.

        Each group is stored in ``self.instance_groups`` and then handed to
        ``FakeCluster.add_instance_group``. Returns the created groups.
        """
        cluster = self.clusters[cluster_id]
        result_groups = []
        for instance_group in instance_groups:
            group = FakeInstanceGroup(cluster_id=cluster_id, **instance_group)
            self.instance_groups[group.id] = group
            cluster.add_instance_group(group)
            result_groups.append(group)
        return result_groups

    def run_instances(
        self,
        cluster_id: str,
        instances: Dict[str, Any],
        instance_group: FakeInstanceGroup,
    ) -> None:
        """Launch EC2 instances via the EC2 backend and record them on the cluster.

        Note that ``instances`` is modified: ``is_instance_type_default`` is
        added before the dict is forwarded as keyword arguments.
        """
        cluster = self.clusters[cluster_id]
        instances["is_instance_type_default"] = not instances.get("instance_type")
        response = self.ec2_backend.run_instances(
            EXAMPLE_AMI_ID, instances["instance_count"], "", [], **instances
        )
        for ec2_instance in response.instances:
            cluster.add_instance(
                FakeInstance(
                    ec2_instance_id=ec2_instance.id, instance_group=instance_group
                )
            )

    def add_job_flow_steps(
        self, job_flow_id: str, steps: List[Dict[str, Any]]
    ) -> List[FakeStep]:
        """Add steps to a cluster and return the newly created steps."""
        cluster = self.clusters[job_flow_id]
        return cluster.add_steps(steps)

    def add_tags(self, cluster_id: str, tags: Dict[str, str]) -> None:
        """Add tags to a cluster.

        :raises ResourceNotFoundException: the cluster does not exist.
        """
        cluster = self.describe_cluster(cluster_id)
        cluster.add_tags(tags)

    def describe_job_flows(
        self,
        job_flow_ids: Optional[List[str]] = None,
        job_flow_states: Optional[List[str]] = None,
        created_after: Optional[str] = None,
        created_before: Optional[str] = None,
    ) -> List[FakeCluster]:
        """Return clusters created in the last 60 days that match the filters.

        Results are sorted by id and capped at 512 entries.
        """
        clusters = list(self.clusters.values())

        within_two_month = datetime.now(timezone.utc) - timedelta(days=60)
        clusters = [c for c in clusters if c.creation_datetime >= within_two_month]

        if job_flow_ids:
            clusters = [c for c in clusters if c.id in job_flow_ids]
        if job_flow_states:
            clusters = [c for c in clusters if c.state in job_flow_states]
        if created_after:
            clusters = [
                c for c in clusters if c.creation_datetime > dtparse(created_after)
            ]
        if created_before:
            clusters = [
                c for c in clusters if c.creation_datetime < dtparse(created_before)
            ]

        # Amazon EMR can return a maximum of 512 job flow descriptions
        return sorted(clusters, key=lambda x: x.id)[:512]

    def describe_step(self, cluster_id: str, step_id: str) -> Optional[FakeStep]:
        """Return the step with ``step_id`` on the cluster, or ``None``."""
        cluster = self.clusters[cluster_id]
        for step in cluster.steps:
            if step.id == step_id:
                return step
        return None

    def describe_cluster(self, cluster_id: str) -> FakeCluster:
        """Return the cluster with the given id.

        :raises ResourceNotFoundException: the cluster does not exist.
        """
        if cluster_id in self.clusters:
            return self.clusters[cluster_id]
        raise ResourceNotFoundException("")

    def get_instance_groups(
        self, instance_group_ids: List[str]
    ) -> List[FakeInstanceGroup]:
        """Return the known groups whose id is in ``instance_group_ids``.

        Order follows ``self.instance_groups``; unknown ids are ignored.
        """
        return [
            group
            for group_id, group in self.instance_groups.items()
            if group_id in instance_group_ids
        ]

    def list_bootstrap_actions(
        self, cluster_id: str, marker: Optional[str] = None
    ) -> Tuple[List[FakeBootstrapAction], Optional[str]]:
        """Return one page (50 items) of bootstrap actions and the next marker."""
        return self._paginate(self.clusters[cluster_id].bootstrap_actions, marker)

    def list_clusters(
        self,
        cluster_states: Optional[List[str]] = None,
        created_after: Optional[str] = None,
        created_before: Optional[str] = None,
        marker: Optional[str] = None,
    ) -> Tuple[List[FakeCluster], Optional[str]]:
        """Return one page (50 items) of matching clusters, sorted by id."""
        clusters = list(self.clusters.values())
        if cluster_states:
            clusters = [c for c in clusters if c.state in cluster_states]
        if created_after:
            clusters = [
                c for c in clusters if c.creation_datetime > dtparse(created_after)
            ]
        if created_before:
            clusters = [
                c for c in clusters if c.creation_datetime < dtparse(created_before)
            ]
        clusters = sorted(clusters, key=lambda x: x.id)
        return self._paginate(clusters, marker)

    def list_instance_groups(
        self, cluster_id: str, marker: Optional[str] = None
    ) -> Tuple[List[FakeInstanceGroup], Optional[str]]:
        """Return one page (50 items) of the cluster's groups, sorted by id."""
        groups = sorted(self.clusters[cluster_id].instance_groups, key=lambda x: x.id)
        return self._paginate(groups, marker)

    def list_instances(
        self,
        cluster_id: str,
        marker: Optional[str] = None,
        instance_group_id: Optional[str] = None,
        instance_group_types: Optional[List[str]] = None,
    ) -> Tuple[List[FakeInstance], Optional[str]]:
        """Return one page (50 items) of the cluster's instances, sorted by id.

        Filters are applied *before* paginating so that the returned marker
        reflects the filtered result set rather than every instance of the
        cluster. Each returned instance gets a ``details`` attribute holding
        the matching EC2 backend instance.
        """
        instances = sorted(self.clusters[cluster_id].instances, key=lambda x: x.id)
        if instance_group_id:
            instances = [
                i for i in instances if i.instance_group.id == instance_group_id
            ]
        if instance_group_types:
            instances = [
                i for i in instances if i.instance_group.role in instance_group_types
            ]

        page, next_marker = self._paginate(instances, marker)
        # Only the instances actually returned need their EC2 details resolved.
        for instance in page:
            instance.details = self.ec2_backend.get_instance(instance.ec2_instance_id)  # type: ignore
        return page, next_marker

    def list_steps(
        self,
        cluster_id: str,
        marker: Optional[str] = None,
        step_ids: Optional[List[str]] = None,
        step_states: Optional[List[str]] = None,
    ) -> Tuple[List[FakeStep], Optional[str]]:
        """Return one page (50 items) of matching steps, newest first."""
        steps = sorted(
            self.clusters[cluster_id].steps,
            key=lambda o: o.creation_datetime,
            reverse=True,
        )
        if step_ids:
            steps = [s for s in steps if s.id in step_ids]
        if step_states:
            steps = [s for s in steps if s.state in step_states]
        return self._paginate(steps, marker)

    def modify_cluster(
        self, cluster_id: str, step_concurrency_level: int
    ) -> FakeCluster:
        """Set the cluster's step concurrency level and return the cluster."""
        cluster = self.clusters[cluster_id]
        cluster.step_concurrency_level = step_concurrency_level
        return cluster

    def modify_instance_groups(self, instance_groups: List[Dict[str, Any]]) -> None:
        """Update the instance count of each group named by ``instance_group_id``."""
        for instance_group in instance_groups:
            group = self.instance_groups[instance_group["instance_group_id"]]
            group.set_instance_count(int(instance_group["instance_count"]))

    def remove_tags(self, cluster_id: str, tag_keys: List[str]) -> None:
        """Remove tags from a cluster.

        :raises ResourceNotFoundException: the cluster does not exist.
        """
        cluster = self.describe_cluster(cluster_id)
        cluster.remove_tags(tag_keys)

    def _manage_security_groups(
        self,
        ec2_subnet_id: str,
        emr_managed_master_security_group: str,
        emr_managed_slave_security_group: str,
        service_access_security_group: str,
        **_: Any,
    ) -> Tuple[str, str, str]:
        """Resolve the (master, slave, service) security group ids for a cluster.

        The given values are returned unchanged when no subnet id is supplied
        or the subnet is unknown to the EC2 backend. Otherwise the groups are
        obtained from ``EmrSecurityGroupManager`` for the subnet's VPC.
        """
        default_return_value = (
            emr_managed_master_security_group,
            emr_managed_slave_security_group,
            service_access_security_group,
        )
        if not ec2_subnet_id:
            # TODO: Set up Security Groups in Default VPC.
            return default_return_value

        from moto.ec2.exceptions import InvalidSubnetIdError

        try:
            subnet = self.ec2_backend.get_subnet(ec2_subnet_id)
        except InvalidSubnetIdError:
            return default_return_value

        manager = EmrSecurityGroupManager(self.ec2_backend, subnet.vpc_id)
        master, slave, service = manager.manage_security_groups(
            emr_managed_master_security_group,
            emr_managed_slave_security_group,
            service_access_security_group,
        )
        return master.id, slave.id, service.id

    def run_job_flow(self, **kwargs: Any) -> FakeCluster:
        """Create a cluster from ``FakeCluster`` keyword arguments.

        The three security group entries of ``kwargs["instance_attrs"]`` are
        replaced with the ids resolved by ``_manage_security_groups`` first.
        """
        (
            kwargs["instance_attrs"]["emr_managed_master_security_group"],
            kwargs["instance_attrs"]["emr_managed_slave_security_group"],
            kwargs["instance_attrs"]["service_access_security_group"],
        ) = self._manage_security_groups(**kwargs["instance_attrs"])
        return FakeCluster(self, **kwargs)

    def set_visible_to_all_users(
        self, job_flow_ids: List[str], visible_to_all_users: str
    ) -> None:
        """Set the visibility flag on every listed cluster."""
        for job_flow_id in job_flow_ids:
            cluster = self.clusters[job_flow_id]
            cluster.set_visibility(visible_to_all_users)

    def set_termination_protection(self, job_flow_ids: List[str], value: bool) -> None:
        """Set termination protection on every listed cluster."""
        for job_flow_id in job_flow_ids:
            cluster = self.clusters[job_flow_id]
            cluster.set_termination_protection(value)

    def terminate_job_flows(self, job_flow_ids: List[str]) -> List[FakeCluster]:
        """Terminate the listed clusters and return those that were terminated.

        Termination-protected clusters are skipped; unprotected ones are still
        terminated before the error for the protected ones is raised.

        :raises ValidationException: at least one cluster is termination protected.
        """
        clusters_terminated = []
        clusters_protected = []
        for job_flow_id in job_flow_ids:
            cluster = self.clusters[job_flow_id]
            if cluster.termination_protected:
                clusters_protected.append(cluster)
                continue
            cluster.terminate()
            clusters_terminated.append(cluster)
        if clusters_protected:
            raise ValidationException(
                "Could not shut down one or more job flows since they are termination protected."
            )
        return clusters_terminated

    def put_auto_scaling_policy(
        self, instance_group_id: str, auto_scaling_policy: Optional[Dict[str, Any]]
    ) -> Optional[FakeInstanceGroup]:
        """Set the policy on an instance group; ``None`` if the group is unknown."""
        instance_groups = self.get_instance_groups(
            instance_group_ids=[instance_group_id]
        )
        if not instance_groups:
            return None
        instance_group = instance_groups[0]
        instance_group.auto_scaling_policy = auto_scaling_policy
        return instance_group

    def remove_auto_scaling_policy(self, instance_group_id: str) -> None:
        """Clear the auto scaling policy of an instance group."""
        self.put_auto_scaling_policy(instance_group_id, auto_scaling_policy=None)

    def create_security_configuration(
        self, name: str, security_configuration: str
    ) -> FakeSecurityConfiguration:
        """Store a new security configuration under ``name``.

        :raises InvalidRequestException: the name is already in use.
        """
        if name in self.security_configurations:
            raise InvalidRequestException(
                message=f"SecurityConfiguration with name '{name}' already exists."
            )
        config = FakeSecurityConfiguration(
            name=name, security_configuration=security_configuration
        )
        self.security_configurations[name] = config
        return config

    def get_security_configuration(self, name: str) -> FakeSecurityConfiguration:
        """Return the security configuration stored under ``name``.

        :raises InvalidRequestException: no such configuration exists.
        """
        if name not in self.security_configurations:
            raise InvalidRequestException(
                message=f"Security configuration with name '{name}' does not exist."
            )
        return self.security_configurations[name]

    def delete_security_configuration(self, name: str) -> None:
        """Delete the security configuration stored under ``name``.

        :raises InvalidRequestException: no such configuration exists.
        """
        if name not in self.security_configurations:
            raise InvalidRequestException(
                message=f"Security configuration with name '{name}' does not exist."
            )
        del self.security_configurations[name]

    def get_block_public_access_configuration(
        self,
    ) -> Dict[str, Any]:
        """Return the stored configuration (empty dict if none was ever put)."""
        return self.block_public_access_configuration

    def put_block_public_access_configuration(
        self,
        block_public_security_group_rules: bool,
        rule_ranges: Optional[List[Dict[str, int]]],
    ) -> None:
        """Replace the block-public-access configuration and its metadata.

        The metadata records the caller ARN reported by the STS backend and a
        timezone-aware (UTC) creation time, matching every other timestamp
        kept by this module.
        """
        from moto.sts import sts_backends

        sts_backend = sts_backends[self.account_id]["global"]
        _, user_arn, _ = sts_backend.get_caller_identity(
            self.account_id, region=self.region_name
        )
        self.block_public_access_configuration = {
            "block_public_access_configuration": {
                "block_public_security_group_rules": block_public_security_group_rules,
                "permitted_public_security_group_rule_ranges": [
                    {
                        "min_range": rule_range.get("MinRange"),
                        "max_range": rule_range.get("MaxRange"),
                    }
                    for rule_range in rule_ranges or []
                ],
            },
            "block_public_access_configuration_metadata": {
                "creation_date_time": datetime.now(timezone.utc),
                "created_by_arn": user_arn,
            },
        }


emr_backends = BackendDict(ElasticMapReduceBackend, "emr")
Memory