"""Implementation of the AWS Config Service APIs.""" import json import re import time from datetime import datetime from typing import Any, Dict, List, Optional, Union from moto.config.exceptions import ( DuplicateTags, InsufficientPermissionsException, InvalidConfigurationRecorderNameException, InvalidDeliveryChannelNameException, InvalidDeliveryFrequency, InvalidLimitException, InvalidNextTokenException, InvalidParameterValueException, InvalidRecordingGroupException, InvalidResourceParameters, InvalidResourceTypeException, InvalidResultTokenException, InvalidS3KeyPrefixException, InvalidS3KmsKeyArnException, InvalidSNSTopicARNException, InvalidTagCharacters, LastDeliveryChannelDeleteFailedException, MaxNumberOfConfigRulesExceededException, MaxNumberOfConfigurationRecordersExceededException, MaxNumberOfDeliveryChannelsExceededException, MissingRequiredConfigRuleParameterException, NameTooLongException, NoAvailableConfigurationRecorderException, NoAvailableDeliveryChannelException, NoSuchBucketException, NoSuchConfigRuleException, NoSuchConfigurationAggregatorException, NoSuchConfigurationRecorderException, NoSuchDeliveryChannelException, NoSuchOrganizationConformancePackException, NoSuchRetentionConfigurationException, ResourceInUseException, ResourceNotDiscoveredException, ResourceNotFoundException, TagKeyTooBig, TagValueTooBig, TooManyAccountSources, TooManyResourceIds, TooManyResourceKeys, TooManyTags, ValidationException, ) from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel, ConfigQueryModel from moto.core.responses import AWSServiceSpec from moto.core.utils import utcnow from moto.iam.config import policy_config_query, role_config_query from moto.moto_api._internal import mock_random as random from moto.s3.config import s3_config_query from moto.s3control.config import s3_account_public_access_block_query from moto.utilities.utils import get_partition, load_resource POP_STRINGS = [ "capitalizeStart", "CapitalizeStart", "capitalizeArn", "CapitalizeArn", "capitalizeARN", "CapitalizeARN", ] DEFAULT_PAGE_SIZE = 100 CONFIG_RULE_PAGE_SIZE = 25 # Map the Config resource type to a backend: RESOURCE_MAP: Dict[str, ConfigQueryModel[Any]] = { "AWS::S3::Bucket": s3_config_query, "AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query, "AWS::IAM::Role": role_config_query, "AWS::IAM::Policy": policy_config_query, } CAMEL_TO_SNAKE_REGEX = re.compile(r"(?<!^)(?=[A-Z])") MAX_TAGS_IN_ARG = 50 MANAGED_RULES = load_resource(__name__, "resources/aws_managed_rules.json") MANAGED_RULES_CONSTRAINTS = MANAGED_RULES["ManagedRules"] def datetime2int(date: datetime) -> int: return int(time.mktime(date.timetuple())) def snake_to_camels(original: str, cap_start: bool, cap_arn: bool) -> str: parts = original.split("_") camel_cased = parts[0].lower() + "".join(p.title() for p in parts[1:]) if cap_arn: camel_cased = camel_cased.replace( "Arn", "ARN" ) # Some config services use 'ARN' instead of 'Arn' if cap_start: camel_cased = camel_cased[0].upper() + camel_cased[1::] return camel_cased def random_string() -> str: """Returns a random set of 8 lowercase letters for the Config Aggregator ARN""" return random.get_random_string(length=8, include_digits=False, lower_case=True) def validate_tag_key(tag_key: str, exception_param: str = "tags.X.member.key") -> None: """Validates the tag key. :param tag_key: The tag key to check against. :param exception_param: The exception parameter to send over to help format the message. This is to reflect the difference between the tag and untag APIs. :return: """ # Validate that the key length is correct: if len(tag_key) > 128: raise TagKeyTooBig(tag_key, param=exception_param) # Validate that the tag key fits the proper Regex: # [\w\s_.:/=+\-@]+ SHOULD be the same as the Java regex on the AWS # documentation: [\p{L}\p{Z}\p{N}_.:/=+\-@]+ match = re.findall(r"[\w\s_.:/=+\-@]+", tag_key) # Kudos if you can come up with a better way of doing a global search :) if not match or len(match[0]) < len(tag_key): raise InvalidTagCharacters(tag_key, param=exception_param) def check_tag_duplicate(all_tags: Dict[str, str], tag_key: str) -> None: """Validates that a tag key is not a duplicate :param all_tags: Dict to check if there is a duplicate tag. :param tag_key: The tag key to check against. :return: """ if all_tags.get(tag_key): raise DuplicateTags() def validate_tags(tags: List[Dict[str, str]]) -> Dict[str, str]: proper_tags: Dict[str, str] = {} if len(tags) > MAX_TAGS_IN_ARG: raise TooManyTags(tags) for tag in tags: # Validate the Key: validate_tag_key(tag["Key"]) check_tag_duplicate(proper_tags, tag["Key"]) # Validate the Value: if len(tag["Value"]) > 256: raise TagValueTooBig(tag["Value"]) proper_tags[tag["Key"]] = tag["Value"] return proper_tags def convert_to_class_args(dict_arg: Dict[str, Any]) -> Dict[str, Any]: """Return dict that can be used to instantiate it's representative class. Given a dictionary in the incoming API request, convert the keys to snake case to use as arguments when instatiating the representative class's __init__(). """ class_args = {} for key, value in dict_arg.items(): class_args[CAMEL_TO_SNAKE_REGEX.sub("_", key).lower()] = value # boto detects if extra/unknown arguments are provided, so it's not # necessary to do so here. return class_args class ConfigEmptyDictable(BaseModel): """Base class to make serialization easy. This assumes that the sub-class will NOT return 'None's in the JSON. """ def __init__(self, capitalize_start: bool = False, capitalize_arn: bool = True): """Assists with the serialization of the config object :param capitalize_start: For some Config services, the first letter is lowercase -- for others it's capital :param capitalize_arn: For some Config services, the API expects 'ARN' and for others, it expects 'Arn' """ self.capitalize_start = capitalize_start self.capitalize_arn = capitalize_arn def to_dict(self) -> Dict[str, Any]: data: Dict[str, Any] = {} for item, value in self.__dict__.items(): # ignore private attributes if not item.startswith("_") and value is not None: if isinstance(value, ConfigEmptyDictable): data[ snake_to_camels( item, self.capitalize_start, self.capitalize_arn ) ] = value.to_dict() else: data[ snake_to_camels( item, self.capitalize_start, self.capitalize_arn ) ] = value # Cleanse the extra properties: for prop in POP_STRINGS: data.pop(prop, None) return data class ConfigRecorderStatus(ConfigEmptyDictable): def __init__(self, name: str): super().__init__() self.name = name self.recording = False self.last_start_time: Optional[int] = None self.last_stop_time: Optional[int] = None self.last_status: Optional[str] = None self.last_error_code: Optional[str] = None self.last_error_message: Optional[str] = None self.last_status_change_time: Optional[int] = None def start(self) -> None: self.recording = True self.last_status = "PENDING" self.last_start_time = datetime2int(utcnow()) self.last_status_change_time = datetime2int(utcnow()) def stop(self) -> None: self.recording = False self.last_stop_time = datetime2int(utcnow()) self.last_status_change_time = datetime2int(utcnow()) class ConfigDeliverySnapshotProperties(ConfigEmptyDictable): def __init__(self, delivery_frequency: str): super().__init__() self.delivery_frequency = delivery_frequency class ConfigDeliveryChannel(ConfigEmptyDictable): def __init__( self, name: str, s3_bucket_name: str, prefix: Optional[str] = None, sns_arn: Optional[str] = None, s3_kms_key_arn: Optional[str] = None, snapshot_properties: Optional[ConfigDeliverySnapshotProperties] = None, ): super().__init__() self.name = name self.s3_bucket_name = s3_bucket_name self.s3_key_prefix = prefix self.s3_kms_key_arn = s3_kms_key_arn self.sns_topic_arn = sns_arn self.config_snapshot_delivery_properties = snapshot_properties def to_dict(self) -> Dict[str, Any]: """Need to override this function because the KMS Key ARN is written as `Arn` vs. SNS which is `ARN`.""" data = super().to_dict() # Fix the KMS ARN if it's here: kms_arn = data.pop("s3KmsKeyARN", None) if kms_arn: data["s3KmsKeyArn"] = kms_arn return data class RecordingGroup(ConfigEmptyDictable): def __init__( self, all_supported: bool = True, include_global_resource_types: bool = False, resource_types: Optional[List[str]] = None, exclusion_by_resource_types: Optional[Dict[str, List[str]]] = None, recording_strategy: Optional[Dict[str, str]] = None, ): super().__init__() self.all_supported = all_supported self.include_global_resource_types = include_global_resource_types self.resource_types = resource_types self.exclusion_by_resource_types = exclusion_by_resource_types self.recording_strategy = recording_strategy class ConfigRecorder(ConfigEmptyDictable): def __init__( self, role_arn: str, recording_group: RecordingGroup, name: str = "default", status: Optional[ConfigRecorderStatus] = None, ): super().__init__() self.name = name self.role_arn = role_arn self.recording_group = recording_group if not status: self.status = ConfigRecorderStatus(name) else: self.status = status class AccountAggregatorSource(ConfigEmptyDictable): def __init__( self, account_ids: List[str], aws_regions: Optional[List[str]] = None, all_aws_regions: Optional[bool] = None, ): super().__init__(capitalize_start=True) # Can't have both the regions and all_regions flag present -- also # can't have them both missing: if aws_regions and all_aws_regions: raise InvalidParameterValueException( "Your configuration aggregator contains a list of regions " "and also specifies the use of all regions. You must choose " "one of these options." ) if not (aws_regions or all_aws_regions): raise InvalidParameterValueException( "Your request does not specify any regions. Select AWS Config-supported " "regions and try again." ) self.account_ids = account_ids self.aws_regions = aws_regions if not all_aws_regions: all_aws_regions = False self.all_aws_regions = all_aws_regions class OrganizationAggregationSource(ConfigEmptyDictable): def __init__( self, role_arn: str, aws_regions: Optional[List[str]] = None, all_aws_regions: Optional[bool] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) # Can't have both the regions and all_regions flag present -- also # can't have them both missing: if aws_regions and all_aws_regions: raise InvalidParameterValueException( "Your configuration aggregator contains a list of regions and also specifies " "the use of all regions. You must choose one of these options." ) if not (aws_regions or all_aws_regions): raise InvalidParameterValueException( "Your request does not specify any regions. Select AWS Config-supported " "regions and try again." ) self.role_arn = role_arn self.aws_regions = aws_regions if not all_aws_regions: all_aws_regions = False self.all_aws_regions = all_aws_regions class ConfigAggregator(ConfigEmptyDictable): def __init__( self, name: str, account_id: str, region: str, account_sources: Optional[List[AccountAggregatorSource]] = None, org_source: Optional[OrganizationAggregationSource] = None, tags: Optional[Dict[str, str]] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) self.configuration_aggregator_name = name self.configuration_aggregator_arn = f"arn:{get_partition(region)}:config:{region}:{account_id}:config-aggregator/config-aggregator-{random_string()}" self.account_aggregation_sources = account_sources self.organization_aggregation_source = org_source self.creation_time = datetime2int(utcnow()) self.last_updated_time = datetime2int(utcnow()) # Tags are listed in the list_tags_for_resource API call. self.tags = tags or {} # Override the to_dict so that we can format the tags properly... def to_dict(self) -> Dict[str, Any]: result = super().to_dict() # Override the account aggregation sources if present: if self.account_aggregation_sources: result["AccountAggregationSources"] = [ a.to_dict() for a in self.account_aggregation_sources ] if self.tags: result["Tags"] = [ {"Key": key, "Value": value} for key, value in self.tags.items() ] return result class ConfigAggregationAuthorization(ConfigEmptyDictable): def __init__( self, account_id: str, current_region: str, authorized_account_id: str, authorized_aws_region: str, tags: Dict[str, str], ): super().__init__(capitalize_start=True, capitalize_arn=False) self.aggregation_authorization_arn = f"arn:{get_partition(current_region)}:config:{current_region}:{account_id}:aggregation-authorization/{authorized_account_id}/{authorized_aws_region}" self.authorized_account_id = authorized_account_id self.authorized_aws_region = authorized_aws_region self.creation_time = datetime2int(utcnow()) # Tags are listed in the list_tags_for_resource API call. self.tags = tags or {} class OrganizationConformancePack(ConfigEmptyDictable): def __init__( self, account_id: str, region: str, name: str, delivery_s3_bucket: str, delivery_s3_key_prefix: Optional[str] = None, input_parameters: Optional[List[Dict[str, Any]]] = None, excluded_accounts: Optional[List[str]] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) self._status = "CREATE_SUCCESSFUL" self._unique_pack_name = f"{name}-{random_string()}" self.conformance_pack_input_parameters = input_parameters or [] self.delivery_s3_bucket = delivery_s3_bucket self.delivery_s3_key_prefix = delivery_s3_key_prefix self.excluded_accounts = excluded_accounts or [] self.last_update_time = datetime2int(utcnow()) self.organization_conformance_pack_arn = f"arn:{get_partition(region)}:config:{region}:{account_id}:organization-conformance-pack/{self._unique_pack_name}" self.organization_conformance_pack_name = name def update( self, delivery_s3_bucket: str, delivery_s3_key_prefix: str, input_parameters: List[Dict[str, Any]], excluded_accounts: List[str], ) -> None: self._status = "UPDATE_SUCCESSFUL" self.conformance_pack_input_parameters = input_parameters self.delivery_s3_bucket = delivery_s3_bucket self.delivery_s3_key_prefix = delivery_s3_key_prefix self.excluded_accounts = excluded_accounts self.last_update_time = datetime2int(utcnow()) class Scope(ConfigEmptyDictable): """Defines resources that can trigger an evaluation for the rule. Per boto3 documentation, Scope can be one of: - one or more resource types, - combo of one resource type and one resource ID, - combo of tag key and value. If no scope is specified, evaluations are trigged when any resource in the recording group changes. """ def __init__( self, compliance_resource_types: Optional[List[str]] = None, tag_key: Optional[str] = None, tag_value: Optional[str] = None, compliance_resource_id: Optional[str] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) self.tags = None if tag_key or tag_value: if tag_value and not tag_key: raise InvalidParameterValueException( "Tag key should not be empty when tag value is provided in scope" ) if tag_key and len(tag_key) > 128: raise TagKeyTooBig(tag_key, "ConfigRule.Scope.TagKey") if tag_value and len(tag_value) > 256: raise TagValueTooBig(tag_value, "ConfigRule.Scope.TagValue") self.tags = {tag_key: tag_value} # Can't use more than one combo to specify scope - either tags, # resource types, or resource id and resource type. if self.tags and (compliance_resource_types or compliance_resource_id): raise InvalidParameterValueException( "Scope cannot be applied to both resource and tag" ) if compliance_resource_id and len(compliance_resource_types) != 1: # type: ignore[arg-type] raise InvalidParameterValueException( "A single resourceType should be provided when resourceId " "is provided in scope" ) self.compliance_resource_types = compliance_resource_types self.compliance_resource_id = compliance_resource_id class SourceDetail(ConfigEmptyDictable): """Source and type of event triggering AWS Config resource evaluation. Applies only to customer rules. """ MESSAGE_TYPES = { "ConfigurationItemChangeNotification", "ConfigurationSnapshotDeliveryCompleted", "OversizedConfigurationItemChangeNotification", "ScheduledNotification", } DEFAULT_FREQUENCY = "TwentyFour_Hours" FREQUENCY_TYPES = { "One_Hour", "Six_Hours", "Three_Hours", "Twelve_Hours", "TwentyFour_Hours", } EVENT_SOURCES = ["aws.config"] def __init__( self, event_source: Optional[str] = None, message_type: Optional[str] = None, maximum_execution_frequency: Optional[str] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) # If the event_source or message_type fields are not provided, # boto3 reports: "SourceDetails should be null/empty if the owner is # AWS. SourceDetails should be provided if the owner is CUSTOM_LAMBDA." # A more specific message will be used here instead. if not event_source: raise MissingRequiredConfigRuleParameterException( "Missing required parameter in ConfigRule.SourceDetails: 'EventSource'" ) if event_source not in SourceDetail.EVENT_SOURCES: raise ValidationException( f"Value '{event_source}' at " f"'configRule.source.sourceDetails.eventSource' failed " f"to satisfy constraint: Member must satisfy enum value set: {{" + ", ".join((SourceDetail.EVENT_SOURCES)) + "}" ) if not message_type: # boto3 doesn't have a specific error if this field is missing. raise MissingRequiredConfigRuleParameterException( "Missing required parameter in ConfigRule.SourceDetails: 'MessageType'" ) if message_type not in SourceDetail.MESSAGE_TYPES: raise ValidationException( f"Value '{message_type}' at " f"'configRule.source.sourceDetails.message_type' failed " f"to satisfy constraint: Member must satisfy enum value set: {{" + ", ".join(sorted(SourceDetail.MESSAGE_TYPES)) + "}" ) if maximum_execution_frequency: if maximum_execution_frequency not in SourceDetail.FREQUENCY_TYPES: raise ValidationException( f"Value '{maximum_execution_frequency}' at " f"'configRule.source.sourceDetails.maximumExecutionFrequency' " f"failed to satisfy constraint: " f"Member must satisfy enum value set: {{" + ", ".join(sorted(SourceDetail.FREQUENCY_TYPES)) + "}" ) if message_type in [ "ConfigurationItemChangeNotification", "OversizedConfigurationItemChangeNotification", ]: raise InvalidParameterValueException( "A maximum execution frequency is not allowed if " "MessageType is ConfigurationItemChangeNotification or " "OversizedConfigurationItemChangeNotification" ) else: # If no value is specified, use a default value for # maximum_execution_frequency for message types representing a # periodic trigger. if message_type in [ "ScheduledNotification", "ConfigurationSnapshotDeliveryCompleted", ]: maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY self.event_source = event_source self.message_type = message_type self.maximum_execution_frequency = maximum_execution_frequency class Source(ConfigEmptyDictable): """Defines rule owner, id and notification for triggering evaluation.""" OWNERS = {"AWS", "CUSTOM_LAMBDA"} def __init__( self, account_id: str, region: str, owner: str, source_identifier: str, source_details: Optional[SourceDetail] = None, ): super().__init__(capitalize_start=True, capitalize_arn=False) if owner not in Source.OWNERS: raise ValidationException( f"Value '{owner}' at 'configRule.source.owner' failed to " f"satisfy constraint: Member must satisfy enum value set: {{" + ", ".join(sorted(Source.OWNERS)) + "}" ) if owner == "AWS": # Can the Source ID be found in the dict of managed rule IDs? if source_identifier not in MANAGED_RULES_CONSTRAINTS: raise InvalidParameterValueException( f"The sourceIdentifier {source_identifier} is invalid. " f"Please refer to the documentation for a list of valid " f"sourceIdentifiers that can be used when AWS is the Owner" ) if source_details: raise InvalidParameterValueException( "SourceDetails should be null/empty if the owner is AWS. " "SourceDetails should be provided if the owner is " "CUSTOM_LAMBDA" ) self.owner = owner self.source_identifier = source_identifier self.source_details = None return # Otherwise, owner == "CUSTOM_LAMBDA" if not source_details: raise InvalidParameterValueException( "SourceDetails should be null/empty if the owner is AWS. " "SourceDetails should be provided if the owner is CUSTOM_LAMBDA" ) # Import is slow and as it's not needed for all config service # operations, only load it if needed. from moto.awslambda.utils import get_backend try: get_backend(account_id, region).get_function(source_identifier) except Exception: raise InsufficientPermissionsException( f"The AWS Lambda function {source_identifier} cannot be " f"invoked. Check the specified function ARN, and check the " f"function's permissions" ) details = [] for detail in source_details: # type: ignore[attr-defined] detail_dict = convert_to_class_args(detail) details.append(SourceDetail(**detail_dict)) self.source_details = details self.owner = owner self.source_identifier = source_identifier def to_dict(self) -> Dict[str, Any]: """Format the SourceDetails properly.""" result = super().to_dict() if self.source_details: result["SourceDetails"] = [x.to_dict() for x in self.source_details] return result class ConfigRule(ConfigEmptyDictable): """AWS Config Rule to evaluate compliance of resources to configuration. Can be a managed or custom config rule. Contains the instantiations of the Source and SourceDetail classes, and optionally the Scope class. """ MAX_RULES = 150 RULE_STATES = {"ACTIVE", "DELETING", "DELETING_RESULTS", "EVALUATING"} def __init__( self, account_id: str, region: str, config_rule: Dict[str, Any], tags: Dict[str, str], ): super().__init__(capitalize_start=True, capitalize_arn=False) self.account_id = account_id self.config_rule_name = config_rule.get("ConfigRuleName") if config_rule.get("ConfigRuleArn") or config_rule.get("ConfigRuleId"): raise InvalidParameterValueException( "ConfigRule Arn and Id can not be specified when creating a " "new ConfigRule. ConfigRule Arn and Id are generated by the " "service. Please try the request again without specifying " "ConfigRule Arn or Id" ) self.maximum_execution_frequency = None # keeps pylint happy self.modify_fields(region, config_rule, tags) self.config_rule_id = f"config-rule-{random_string():.6}" self.config_rule_arn = f"arn:{get_partition(region)}:config:{region}:{account_id}:config-rule/{self.config_rule_id}" def modify_fields( self, region: str, config_rule: Dict[str, Any], tags: Dict[str, str] ) -> None: """Initialize or update ConfigRule fields.""" self.config_rule_state = config_rule.get("ConfigRuleState", "ACTIVE") if self.config_rule_state not in ConfigRule.RULE_STATES: raise ValidationException( f"Value '{self.config_rule_state}' at " f"'configRule.configRuleState' failed to satisfy constraint: " f"Member must satisfy enum value set: {{" + ", ".join(sorted(ConfigRule.RULE_STATES)) + "}" ) if self.config_rule_state != "ACTIVE": raise InvalidParameterValueException( f"The ConfigRuleState {self.config_rule_state} is invalid. " f"Only the following values are permitted: ACTIVE" ) self.description = config_rule.get("Description") self.scope = None if "Scope" in config_rule: scope_dict = convert_to_class_args(config_rule["Scope"]) self.scope = Scope(**scope_dict) source_dict = convert_to_class_args(config_rule["Source"]) self.source = Source(self.account_id, region, **source_dict) self.input_parameters = config_rule.get("InputParameters") self.input_parameters_dict = {} if self.input_parameters: try: # A dictionary will be more useful when these parameters # are actually needed. self.input_parameters_dict = json.loads(self.input_parameters) except ValueError: raise InvalidParameterValueException( # pylint: disable=raise-missing-from f"Invalid json {self.input_parameters} passed in the " f"InputParameters field" ) self.maximum_execution_frequency = config_rule.get("MaximumExecutionFrequency") if self.maximum_execution_frequency: if self.maximum_execution_frequency not in SourceDetail.FREQUENCY_TYPES: raise ValidationException( f"Value '{self.maximum_execution_frequency}' at " f"'configRule.maximumExecutionFrequency' failed to " f"satisfy constraint: Member must satisfy enum value set: {{" + ", ".join(sorted(SourceDetail.FREQUENCY_TYPES)) + "}" ) # For an AWS managed rule, validate the parameters and trigger type. # Verify the MaximumExecutionFrequency makes sense as well. if self.source.owner == "AWS": self.validate_managed_rule() else: # Per the AWS documentation for a custom rule, ConfigRule's # MaximumExecutionFrequency can only be set if the message type # is ConfigSnapshotDeliveryProperties. However, if # ConfigSnapshotDeliveryProperties is used, the AWS console # leaves the Trigger Type blank and doesn't show the frequency. # If you edit the rule, it doesn't show the frequency either. # # If you provide two custom rules, one with a message type of # ConfigurationSnapshotDeliveryCompleted, one with # ScheduleNotification and specify a MaximumExecutionFrequency # for each, the first one is shown on the AWS console and the # second frequency is shown on the edit page. # # If you provide a custom rule for # OversizedConfigurationItemChangeNotification (not a periodic # trigger) with a MaximumExecutionFrequency for ConfigRule itself, # boto3 doesn't complain and describe_config_rule() shows the # frequency, but the AWS console and the edit page do not. # # So I'm not sure how to validate this situation or when to # set this value to a default value. pass self.created_by = config_rule.get("CreatedBy") if self.created_by: raise InvalidParameterValueException( "AWS Config populates the CreatedBy field for " "ServiceLinkedConfigRule. Try again without populating the " "CreatedBy field" ) self.last_updated_time = datetime2int(utcnow()) self.tags = tags def validate_managed_rule(self) -> None: """Validate parameters specific to managed rules.""" rule_info = MANAGED_RULES_CONSTRAINTS[self.source.source_identifier] param_names = self.input_parameters_dict.keys() # Verify input parameter names are actual parameters for the rule ID. if param_names: allowed_names = {x["Name"] for x in rule_info["Parameters"]} if not set(param_names).issubset(allowed_names): raise InvalidParameterValueException( f"Unknown parameters provided in the inputParameters: {self.input_parameters}" ) # Verify all the required parameters are specified. required_names = { x["Name"] for x in rule_info["Parameters"] if not x["Optional"] } diffs = required_names.difference(set(param_names)) if diffs: raise InvalidParameterValueException( "The required parameter [" + ", ".join(sorted(diffs)) + "] is not present in the inputParameters" ) # boto3 doesn't appear to be checking for valid types in the # InputParameters. It did give an error if a unquoted number was # used: "Blank spaces are not acceptable for input parameter: # MinimumPasswordLength. InputParameters': # '{"RequireNumbers":"true","MinimumPasswordLength":10}' # but I'm not going to attempt to detect that error. I could # check for ints, floats, strings and stringmaps, but boto3 doesn't # check. # WARNING: The AWS documentation indicates MaximumExecutionFrequency # can be specified for managed rules triggered at a periodic frequency. # However, boto3 allows a MaximumExecutionFrequency to be specified # for a AWS managed rule regardless of the frequency type. Also of # interest: triggers of "Configuration Changes and Periodic", # i.e., both trigger types. But again, the trigger type is ignored. # if rule_info["Trigger type"] == "Configuration changes": # if self.maximum_execution_frequency: # raise InvalidParameterValueException( # "A maximum execution frequency is not allowed for " # "rules triggered by configuration changes" # ) # # WARNING: boto3's describe_config_rule is not showing the # MaximumExecutionFrequency value as being updated, but the AWS # console shows the default value on the console. The default value # is used even if the rule is non-periodic # if "Periodic" in rule_info["Trigger type"]: # if not self.maximum_execution_frequency: # self.maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY # if not self.maximum_execution_frequency: # self.maximum_execution_frequency = SourceDetail.DEFAULT_FREQUENCY # Verify the rule is allowed for this region -- not yet implemented. class RetentionConfiguration(ConfigEmptyDictable): def __init__(self, retention_period_in_days: int, name: Optional[str] = None): super().__init__(capitalize_start=True, capitalize_arn=False) self.name = name or "default" self.retention_period_in_days = retention_period_in_days class ConfigBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.recorders: Dict[str, ConfigRecorder] = {} self.delivery_channels: Dict[str, ConfigDeliveryChannel] = {} self.config_aggregators: Dict[str, ConfigAggregator] = {} self.aggregation_authorizations: Dict[str, ConfigAggregationAuthorization] = {} self.organization_conformance_packs: Dict[str, OrganizationConformancePack] = {} self.config_rules: Dict[str, ConfigRule] = {} self.config_schema: Optional[AWSServiceSpec] = None self.retention_configuration: Optional[RetentionConfiguration] = None def _validate_resource_types(self, resource_list: List[str]) -> None: if not self.config_schema: self.config_schema = AWSServiceSpec( path="data/config/2014-11-12/service-2.json" ) # Verify that each entry exists in the supported list: bad_list = [] for resource in resource_list: if resource not in self.config_schema.shapes["ResourceType"]["enum"]: bad_list.append(resource) if bad_list: raise InvalidResourceTypeException( bad_list, self.config_schema.shapes["ResourceType"]["enum"] ) def _validate_delivery_snapshot_properties( self, properties: Dict[str, Any] ) -> None: if not self.config_schema: self.config_schema = AWSServiceSpec( path="data/config/2014-11-12/service-2.json" ) # Verify that the deliveryFrequency is set to an acceptable value: if ( properties.get("deliveryFrequency", None) not in self.config_schema.shapes["MaximumExecutionFrequency"]["enum"] ): raise InvalidDeliveryFrequency( properties.get("deliveryFrequency", None), self.config_schema.shapes["MaximumExecutionFrequency"]["enum"], ) def put_configuration_aggregator( self, config_aggregator: Dict[str, Any] ) -> Dict[str, Any]: # Validate the name: config_aggr_name = config_aggregator["ConfigurationAggregatorName"] if len(config_aggr_name) > 256: raise NameTooLongException(config_aggr_name, "configurationAggregatorName") account_sources: Optional[List[AccountAggregatorSource]] = None org_source: Optional[OrganizationAggregationSource] = None # Tag validation: tags = validate_tags(config_aggregator.get("Tags", [])) # Exception if both AccountAggregationSources and # OrganizationAggregationSource are supplied: if config_aggregator.get("AccountAggregationSources") and config_aggregator.get( "OrganizationAggregationSource" ): raise InvalidParameterValueException( "The configuration aggregator cannot be created because your " "request contains both the AccountAggregationSource and the " "OrganizationAggregationSource. Include only one aggregation " "source and try again." ) # If neither are supplied: if not config_aggregator.get( "AccountAggregationSources" ) and not config_aggregator.get("OrganizationAggregationSource"): raise InvalidParameterValueException( "The configuration aggregator cannot be created because your " "request is missing either the AccountAggregationSource or " "the OrganizationAggregationSource. Include the " "appropriate aggregation source and try again." ) if config_aggregator.get("AccountAggregationSources"): # Currently, only 1 account aggregation source can be set: if len(config_aggregator["AccountAggregationSources"]) > 1: raise TooManyAccountSources( len(config_aggregator["AccountAggregationSources"]) ) account_sources = [] for source in config_aggregator["AccountAggregationSources"]: account_sources.append( AccountAggregatorSource( source["AccountIds"], aws_regions=source.get("AwsRegions"), all_aws_regions=source.get("AllAwsRegions"), ) ) else: org_source = OrganizationAggregationSource( config_aggregator["OrganizationAggregationSource"]["RoleArn"], aws_regions=config_aggregator["OrganizationAggregationSource"].get( "AwsRegions" ), all_aws_regions=config_aggregator["OrganizationAggregationSource"].get( "AllAwsRegions" ), ) # Grab the existing one if it exists and update it: if not self.config_aggregators.get(config_aggr_name): aggregator = ConfigAggregator( config_aggr_name, account_id=self.account_id, region=self.region_name, account_sources=account_sources, org_source=org_source, tags=tags, ) self.config_aggregators[config_aggr_name] = aggregator else: aggregator = self.config_aggregators[config_aggr_name] aggregator.tags = tags aggregator.account_aggregation_sources = account_sources aggregator.organization_aggregation_source = org_source aggregator.last_updated_time = datetime2int(utcnow()) return aggregator.to_dict() def describe_configuration_aggregators( self, names: List[str], token: str, limit: Optional[int] ) -> Dict[str, Any]: limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit agg_list = [] result: Dict[str, Any] = {"ConfigurationAggregators": []} if names: for name in names: if not self.config_aggregators.get(name): raise NoSuchConfigurationAggregatorException(number=len(names)) agg_list.append(name) else: agg_list = list(self.config_aggregators.keys()) # Empty? if not agg_list: return result # Sort by name: sorted_aggregators = sorted(agg_list) # Get the start: if not token: start = 0 else: # Tokens for this moto feature are just the next names of the items in the list: if not self.config_aggregators.get(token): raise InvalidNextTokenException() start = sorted_aggregators.index(token) # Get the list of items to collect: agg_list = sorted_aggregators[start : (start + limit)] result["ConfigurationAggregators"] = [ self.config_aggregators[agg].to_dict() for agg in agg_list ] if len(sorted_aggregators) > (start + limit): result["NextToken"] = sorted_aggregators[start + limit] return result def delete_configuration_aggregator(self, config_aggregator: str) -> None: if not self.config_aggregators.get(config_aggregator): raise NoSuchConfigurationAggregatorException() del self.config_aggregators[config_aggregator] def put_aggregation_authorization( self, authorized_account: str, authorized_region: str, tags: List[Dict[str, str]], ) -> Dict[str, Any]: # Tag validation: tag_dict = validate_tags(tags or []) # Does this already exist? key = f"{authorized_account}/{authorized_region}" agg_auth = self.aggregation_authorizations.get(key) if not agg_auth: agg_auth = ConfigAggregationAuthorization( self.account_id, self.region_name, authorized_account, authorized_region, tags=tag_dict, ) self.aggregation_authorizations[key] = agg_auth else: # Only update the tags: agg_auth.tags = tag_dict return agg_auth.to_dict() def describe_aggregation_authorizations( self, token: Optional[str], limit: Optional[int] ) -> Dict[str, Any]: limit = DEFAULT_PAGE_SIZE if not limit or limit < 0 else limit result: Dict[str, Any] = {"AggregationAuthorizations": []} if not self.aggregation_authorizations: return result # Sort by name: sorted_authorizations = sorted(self.aggregation_authorizations.keys()) # Get the start: if not token: start = 0 else: # Tokens for this moto feature are just the next names of the items in the list: if not self.aggregation_authorizations.get(token): raise InvalidNextTokenException() start = sorted_authorizations.index(token) # Get the list of items to collect: auth_list = sorted_authorizations[start : (start + limit)] result["AggregationAuthorizations"] = [ self.aggregation_authorizations[auth].to_dict() for auth in auth_list ] if len(sorted_authorizations) > (start + limit): result["NextToken"] = sorted_authorizations[start + limit] return result def delete_aggregation_authorization( self, authorized_account: str, authorized_region: str ) -> None: # This will always return a 200 -- regardless if there is or isn't an existing # aggregation authorization. key = f"{authorized_account}/{authorized_region}" self.aggregation_authorizations.pop(key, None) def put_configuration_recorder(self, config_recorder: Dict[str, Any]) -> None: # Validate the name: if not config_recorder.get("name"): raise InvalidConfigurationRecorderNameException(config_recorder.get("name")) if len(config_recorder["name"]) > 256: raise NameTooLongException( config_recorder["name"], "configurationRecorder.name" ) # We're going to assume that the passed in Role ARN is correct. # Config currently only allows 1 configuration recorder for an account: if len(self.recorders) == 1 and not self.recorders.get(config_recorder["name"]): raise MaxNumberOfConfigurationRecordersExceededException( config_recorder["name"] ) # Is this updating an existing one? recorder_status = None if self.recorders.get(config_recorder["name"]): recorder_status = self.recorders[config_recorder["name"]].status # Validate the Recording Group: if config_recorder.get("recordingGroup") is None: recording_group = RecordingGroup( all_supported=True, include_global_resource_types=False, resource_types=[], exclusion_by_resource_types={"resourceTypes": []}, recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"}, ) else: rgroup = config_recorder["recordingGroup"] # If an empty dict is passed in, then bad: if not rgroup: raise InvalidRecordingGroupException() # Recording strategy must be one of the allowed enums: recording_strategy = rgroup.get("recordingStrategy", {}).get( "useOnly", None ) if recording_strategy not in { None, "ALL_SUPPORTED_RESOURCE_TYPES", "INCLUSION_BY_RESOURCE_TYPES", "EXCLUSION_BY_RESOURCE_TYPES", }: raise ValidationException( f"1 validation error detected: Value '{recording_strategy}' at 'configurationRecorder.recordingGroup.recordingStrategy.useOnly' failed to satisfy constraint:" f" Member must satisfy enum value set: [INCLUSION_BY_RESOURCE_TYPES, ALL_SUPPORTED_RESOURCE_TYPES, EXCLUSION_BY_RESOURCE_TYPES]" ) # Validate the allSupported: if rgroup.get("allSupported", False): if ( rgroup.get("resourceTypes", []) or ( rgroup.get("exclusionByResourceTypes", {"resourceTypes": []}) != {"resourceTypes": []} ) or recording_strategy not in {None, "ALL_SUPPORTED_RESOURCE_TYPES"} ): raise InvalidRecordingGroupException() recording_group = RecordingGroup( all_supported=True, include_global_resource_types=rgroup.get( "includeGlobalResourceTypes", False ), resource_types=[], exclusion_by_resource_types={"resourceTypes": []}, recording_strategy={"useOnly": "ALL_SUPPORTED_RESOURCE_TYPES"}, ) # Validate the specifically passed in resource types: elif rgroup.get("resourceTypes", []): if ( rgroup.get("includeGlobalResourceTypes", False) or ( rgroup.get("exclusionByResourceTypes", {"resourceTypes": []}) != {"resourceTypes": []} ) or recording_strategy not in {None, "INCLUSION_BY_RESOURCE_TYPES"} ): raise InvalidRecordingGroupException() # Validate that the resource list provided is correct: self._validate_resource_types(rgroup["resourceTypes"]) recording_group = RecordingGroup( all_supported=False, include_global_resource_types=False, resource_types=rgroup["resourceTypes"], exclusion_by_resource_types={"resourceTypes": []}, recording_strategy={"useOnly": "INCLUSION_BY_RESOURCE_TYPES"}, ) # Validate the excluded resource types: elif rgroup.get("exclusionByResourceTypes", {}): if not rgroup["exclusionByResourceTypes"].get("resourceTypes", []): raise InvalidRecordingGroupException() # The recording strategy must be provided for exclusions. if ( rgroup.get("includeGlobalResourceTypes", False) or recording_strategy != "EXCLUSION_BY_RESOURCE_TYPES" ): raise InvalidRecordingGroupException() # Validate that the resource list provided is correct: self._validate_resource_types( rgroup["exclusionByResourceTypes"]["resourceTypes"] ) recording_group = RecordingGroup( all_supported=False, include_global_resource_types=False, resource_types=[], exclusion_by_resource_types=rgroup["exclusionByResourceTypes"], recording_strategy={"useOnly": "EXCLUSION_BY_RESOURCE_TYPES"}, ) # If the resourceTypes is an empty list, this will be reached: else: raise InvalidRecordingGroupException() self.recorders[config_recorder["name"]] = ConfigRecorder( config_recorder["roleARN"], recording_group, name=config_recorder["name"], status=recorder_status, ) def describe_configuration_recorders( self, recorder_names: Optional[List[str]] ) -> List[Dict[str, Any]]: recorders: List[Dict[str, Any]] = [] if recorder_names: for rname in recorder_names: if not self.recorders.get(rname): raise NoSuchConfigurationRecorderException(rname) # Format the recorder: recorders.append(self.recorders[rname].to_dict()) else: for recorder in self.recorders.values(): recorders.append(recorder.to_dict()) return recorders def describe_configuration_recorder_status( self, recorder_names: List[str] ) -> List[Dict[str, Any]]: recorders: List[Dict[str, Any]] = [] if recorder_names: for rname in recorder_names: if not self.recorders.get(rname): raise NoSuchConfigurationRecorderException(rname) # Format the recorder: recorders.append(self.recorders[rname].status.to_dict()) else: for recorder in self.recorders.values(): recorders.append(recorder.status.to_dict()) return recorders def put_delivery_channel(self, delivery_channel: Dict[str, Any]) -> None: # Must have a configuration recorder: if not self.recorders: raise NoAvailableConfigurationRecorderException() # Validate the name: if not delivery_channel.get("name"): raise InvalidDeliveryChannelNameException(delivery_channel.get("name")) if len(delivery_channel["name"]) > 256: raise NameTooLongException(delivery_channel["name"], "deliveryChannel.name") # We are going to assume that the bucket exists -- but will verify if # the bucket provided is blank: if not delivery_channel.get("s3BucketName"): raise NoSuchBucketException() # We are going to assume that the bucket has the correct policy # attached to it. We are only going to verify # if the prefix provided is not an empty string: if delivery_channel.get("s3KeyPrefix", None) == "": raise InvalidS3KeyPrefixException() # Ditto for SNS -- Only going to assume that the ARN provided is not # an empty string: # NOTE: SNS "ARN" is all caps, but KMS "Arn" is UpperCamelCase! if delivery_channel.get("snsTopicARN", None) == "": raise InvalidSNSTopicARNException() # Ditto for S3 KMS Key ARN -- Only going to assume that the ARN provided is not # an empty string: if delivery_channel.get("s3KmsKeyArn", None) == "": raise InvalidS3KmsKeyArnException() # Config currently only allows 1 delivery channel for an account: if len(self.delivery_channels) == 1 and not self.delivery_channels.get( delivery_channel["name"] ): raise MaxNumberOfDeliveryChannelsExceededException(delivery_channel["name"]) if not delivery_channel.get("configSnapshotDeliveryProperties"): dprop = None else: # Validate the config snapshot delivery properties: self._validate_delivery_snapshot_properties( delivery_channel["configSnapshotDeliveryProperties"] ) dprop = ConfigDeliverySnapshotProperties( delivery_channel["configSnapshotDeliveryProperties"][ "deliveryFrequency" ] ) self.delivery_channels[delivery_channel["name"]] = ConfigDeliveryChannel( delivery_channel["name"], delivery_channel["s3BucketName"], prefix=delivery_channel.get("s3KeyPrefix", None), s3_kms_key_arn=delivery_channel.get("s3KmsKeyArn", None), sns_arn=delivery_channel.get("snsTopicARN", None), snapshot_properties=dprop, ) def describe_delivery_channels( self, channel_names: List[str] ) -> List[Dict[str, Any]]: channels: List[Dict[str, Any]] = [] if channel_names: for cname in channel_names: if not self.delivery_channels.get(cname): raise NoSuchDeliveryChannelException(cname) # Format the delivery channel: channels.append(self.delivery_channels[cname].to_dict()) else: for channel in self.delivery_channels.values(): channels.append(channel.to_dict()) return channels def start_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) # Must have a delivery channel available as well: if not self.delivery_channels: raise NoAvailableDeliveryChannelException() # Start recording: self.recorders[recorder_name].status.start() def stop_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) # Stop recording: self.recorders[recorder_name].status.stop() def delete_configuration_recorder(self, recorder_name: str) -> None: if not self.recorders.get(recorder_name): raise NoSuchConfigurationRecorderException(recorder_name) del self.recorders[recorder_name] def delete_delivery_channel(self, channel_name: str) -> None: if not self.delivery_channels.get(channel_name): raise NoSuchDeliveryChannelException(channel_name) # Check if a channel is recording -- if so, bad -- (there can only be 1 recorder): for recorder in self.recorders.values(): if recorder.status.recording: raise LastDeliveryChannelDeleteFailedException(channel_name) del self.delivery_channels[channel_name] def list_discovered_resources( self, resource_type: str, resource_ids: List[str], resource_name: str, limit: int, next_token: str, ) -> Dict[str, Any]: """Queries against AWS Config (non-aggregated) listing function. The listing function must exist for the resource backend. :param resource_type: :param backend_region: :param ids: :param name: :param limit: :param next_token: :return: """ identifiers: List[Dict[str, Any]] = [] new_token = None limit = limit or DEFAULT_PAGE_SIZE if limit > DEFAULT_PAGE_SIZE: raise InvalidLimitException(limit) if resource_ids and resource_name: raise InvalidResourceParameters() # Only 20 maximum Resource IDs: if resource_ids and len(resource_ids) > 20: raise TooManyResourceIds() # If resource type exists and the backend region is implemented in # moto, then call upon the resource type's Config Query class to # retrieve the list of resources that match the criteria: if RESOURCE_MAP.get(resource_type, {}): # Always provide the backend this request arrived from. backend_query_region = self.region_name # Is this a global resource type? -- if so, use the partition if ( RESOURCE_MAP[resource_type] .backends[self.account_id] .get(self.partition) ): backend_region = self.partition else: backend_region = self.region_name # For non-aggregated queries, we only care about the # backend_region. Need to verify that moto has implemented # the region for the given backend: if ( RESOURCE_MAP[resource_type] .backends[self.account_id] .get(backend_region) ): # Fetch the resources for the backend's region: identifiers, new_token = RESOURCE_MAP[ resource_type ].list_config_service_resources( self.account_id, partition=self.partition, resource_ids=resource_ids, resource_name=resource_name, limit=limit, next_token=next_token, backend_region=backend_query_region, ) resource_identifiers = [] for identifier in identifiers: item = {"resourceType": identifier["type"], "resourceId": identifier["id"]} # Some resource types lack names: if identifier.get("name"): item["resourceName"] = identifier["name"] resource_identifiers.append(item) result: Dict[str, Any] = {"resourceIdentifiers": resource_identifiers} if new_token: result["nextToken"] = new_token return result def list_aggregate_discovered_resources( self, aggregator_name: str, resource_type: str, filters: Dict[str, str], limit: Optional[int], next_token: Optional[str], ) -> Dict[str, Any]: """Queries AWS Config listing function that must exist for resource backend. As far a moto goes -- the only real difference between this function and the `list_discovered_resources` function is that this will require a Config Aggregator be set up a priori and can search based on resource regions. :param aggregator_name: :param resource_type: :param filters: :param limit: :param next_token: :return: """ if not self.config_aggregators.get(aggregator_name): raise NoSuchConfigurationAggregatorException() identifiers: List[Dict[str, Any]] = [] new_token = None filters = filters or {} limit = limit or DEFAULT_PAGE_SIZE if limit > DEFAULT_PAGE_SIZE: raise InvalidLimitException(limit) # If the resource type exists and the backend region is implemented # in moto, then call upon the resource type's Config Query class to # retrieve the list of resources that match the criteria: if RESOURCE_MAP.get(resource_type, {}): # We only care about a filter's Region, Resource Name, and Resource ID: resource_region = filters.get("Region") resource_id = [filters["ResourceId"]] if filters.get("ResourceId") else None resource_name = filters.get("ResourceName") identifiers, new_token = RESOURCE_MAP[ resource_type ].list_config_service_resources( self.account_id, partition=self.partition, resource_ids=resource_id, resource_name=resource_name, limit=limit, next_token=next_token, resource_region=resource_region, aggregator=self.config_aggregators.get(aggregator_name).__dict__, ) resource_identifiers = [] for identifier in identifiers: item = { "SourceAccountId": self.account_id, "SourceRegion": identifier["region"], "ResourceType": identifier["type"], "ResourceId": identifier["id"], } if identifier.get("name"): item["ResourceName"] = identifier["name"] resource_identifiers.append(item) result: Dict[str, Any] = {"ResourceIdentifiers": resource_identifiers} if new_token: result["NextToken"] = new_token return result def get_resource_config_history( self, resource_type: str, resource_id: str, backend_region: str ) -> Dict[str, Any]: """Returns configuration of resource for the current regional backend. Item returned in AWS Config format. NOTE: This is --NOT-- returning history as it is not supported in moto at this time. (PR's welcome!) As such, the later_time, earlier_time, limit, and next_token are ignored as this will only return 1 item. (If no items, it raises an exception). """ # If the type isn't implemented then we won't find the item: if resource_type not in RESOURCE_MAP: raise ResourceNotDiscoveredException(resource_type, resource_id) # Always provide the backend this request arrived from. backend_query_region = backend_region # Is the resource type global? partition = get_partition(backend_region) if RESOURCE_MAP[resource_type].backends[self.account_id].get(partition): backend_region = partition # If the backend region isn't implemented then we won't find the item: if ( not RESOURCE_MAP[resource_type] .backends[self.account_id] .get(backend_region) ): raise ResourceNotDiscoveredException(resource_type, resource_id) # Get the item: item = RESOURCE_MAP[resource_type].get_config_resource( account_id=self.account_id, partition=self.partition, resource_id=resource_id, backend_region=backend_query_region, ) if not item: raise ResourceNotDiscoveredException(resource_type, resource_id) item["accountId"] = self.account_id return {"configurationItems": [item]} def batch_get_resource_config( self, resource_keys: List[Dict[str, str]], backend_region: str ) -> Dict[str, Any]: """Returns configuration of resource for the current regional backend. Item is returned in AWS Config format. :param resource_keys: :param backend_region: """ # Can't have more than 100 items if len(resource_keys) > 100: raise TooManyResourceKeys( ["com.amazonaws.starling.dove.ResourceKey@12345"] * len(resource_keys) ) results = [] for resource in resource_keys: # Does the resource type exist? if not RESOURCE_MAP.get(resource["resourceType"]): # Not found so skip. continue config_backend_region = backend_region # Always provide the backend this request arrived from. backend_query_region = backend_region # Is the resource type global? partition = get_partition(backend_region) if ( RESOURCE_MAP[resource["resourceType"]] .backends[self.account_id] .get(partition) ): config_backend_region = partition # If the backend region isn't implemented then we won't find the item: if ( not RESOURCE_MAP[resource["resourceType"]] .backends[self.account_id] .get(config_backend_region) ): continue # Get the item: item = RESOURCE_MAP[resource["resourceType"]].get_config_resource( self.account_id, partition=self.partition, resource_id=resource["resourceId"], backend_region=backend_query_region, ) if not item: continue item["accountId"] = self.account_id results.append(item) return { "baseConfigurationItems": results, "unprocessedResourceKeys": [], } # At this time, moto is not adding unprocessed items. def batch_get_aggregate_resource_config( self, aggregator_name: str, resource_identifiers: List[Dict[str, str]] ) -> Dict[str, Any]: """Returns configuration of resource for current regional backend. Item is returned in AWS Config format. As far a moto goes -- the only real difference between this function and the `batch_get_resource_config` function is that this will require a Config Aggregator be set up a priori and can search based on resource regions. Note: moto will IGNORE the resource account ID in the search query. """ if not self.config_aggregators.get(aggregator_name): raise NoSuchConfigurationAggregatorException() # Can't have more than 100 items if len(resource_identifiers) > 100: raise TooManyResourceKeys( ["com.amazonaws.starling.dove.AggregateResourceIdentifier@12345"] * len(resource_identifiers) ) found = [] not_found = [] for identifier in resource_identifiers: resource_type = identifier["ResourceType"] resource_region = identifier["SourceRegion"] resource_id = identifier["ResourceId"] resource_name = identifier.get("ResourceName", None) # Does the resource type exist? if not RESOURCE_MAP.get(resource_type): not_found.append(identifier) continue # Get the item: item = RESOURCE_MAP[resource_type].get_config_resource( self.account_id, partition=self.partition, resource_id=resource_id, resource_name=resource_name, resource_region=resource_region, ) if not item: not_found.append(identifier) continue item["accountId"] = self.account_id # The 'tags' field is not included in aggregate results for some reason... item.pop("tags", None) found.append(item) return { "BaseConfigurationItems": found, "UnprocessedResourceIdentifiers": not_found, } def put_evaluations( self, evaluations: Optional[List[Dict[str, Any]]] = None, result_token: Optional[str] = None, test_mode: Optional[bool] = False, ) -> Dict[str, List[Any]]: if not evaluations: raise InvalidParameterValueException( "The Evaluations object in your request cannot be null." "Add the required parameters and try again." ) if not result_token: raise InvalidResultTokenException() # Moto only supports PutEvaluations with test mode currently # (missing rule and token support). if not test_mode: raise NotImplementedError( "PutEvaluations without TestMode is not yet implemented" ) return { "FailedEvaluations": [], } # At this time, moto is not adding failed evaluations. def put_organization_conformance_pack( self, name: str, template_s3_uri: str, template_body: str, delivery_s3_bucket: str, delivery_s3_key_prefix: str, input_parameters: List[Dict[str, str]], excluded_accounts: List[str], ) -> Dict[str, Any]: # a real validation of the content of the template is missing at the moment if not template_s3_uri and not template_body: raise ValidationException("Template body is invalid") if not re.match(r"s3://.*", template_s3_uri): raise ValidationException( "1 validation error detected: " f"Value '{template_s3_uri}' at 'templateS3Uri' failed to satisfy constraint: " "Member must satisfy regular expression pattern: " "s3://.*" ) pack = self.organization_conformance_packs.get(name) if pack: pack.update( delivery_s3_bucket=delivery_s3_bucket, delivery_s3_key_prefix=delivery_s3_key_prefix, input_parameters=input_parameters, excluded_accounts=excluded_accounts, ) else: pack = OrganizationConformancePack( account_id=self.account_id, region=self.region_name, name=name, delivery_s3_bucket=delivery_s3_bucket, delivery_s3_key_prefix=delivery_s3_key_prefix, input_parameters=input_parameters, excluded_accounts=excluded_accounts, ) self.organization_conformance_packs[name] = pack return { "OrganizationConformancePackArn": pack.organization_conformance_pack_arn } def describe_organization_conformance_packs( self, names: List[str] ) -> Dict[str, Any]: packs = [] for name in names: pack = self.organization_conformance_packs.get(name) if not pack: raise NoSuchOrganizationConformancePackException( "One or more organization conformance packs with " "specified names are not present. Ensure your names are " "correct and try your request again later." ) packs.append(pack.to_dict()) return {"OrganizationConformancePacks": packs} def describe_organization_conformance_pack_statuses( self, names: List[str] ) -> Dict[str, Any]: packs = [] statuses = [] if names: for name in names: pack = self.organization_conformance_packs.get(name) if not pack: raise NoSuchOrganizationConformancePackException( "One or more organization conformance packs with " "specified names are not present. Ensure your names " "are correct and try your request again later." ) packs.append(pack) else: packs = list(self.organization_conformance_packs.values()) for pack in packs: statuses.append( { "OrganizationConformancePackName": pack.organization_conformance_pack_name, "Status": pack._status, "LastUpdateTime": pack.last_update_time, } ) return {"OrganizationConformancePackStatuses": statuses} def get_organization_conformance_pack_detailed_status( self, name: str ) -> Dict[str, Any]: pack = self.organization_conformance_packs.get(name) if not pack: raise NoSuchOrganizationConformancePackException( "One or more organization conformance packs with specified names are not present. " "Ensure your names are correct and try your request again later." ) # actually here would be a list of all accounts in the organization statuses = [ { "AccountId": self.account_id, "ConformancePackName": f"OrgConformsPack-{pack._unique_pack_name}", "Status": pack._status, "LastUpdateTime": datetime2int(utcnow()), } ] return {"OrganizationConformancePackDetailedStatuses": statuses} def delete_organization_conformance_pack(self, name: str) -> None: pack = self.organization_conformance_packs.get(name) if not pack: raise NoSuchOrganizationConformancePackException( "Could not find an OrganizationConformancePack for given " f"request with resourceName {name}" ) self.organization_conformance_packs.pop(name) def _match_arn( self, resource_arn: str ) -> Union[ConfigRule, ConfigAggregator, ConfigAggregationAuthorization]: """Return config instance that has a matching ARN.""" # The allowed resources are ConfigRule, ConfigurationAggregator, # and AggregatorAuthorization. allowed_resources = { "configuration_aggregator_arn": self.config_aggregators, "aggregation_authorization_arn": self.aggregation_authorizations, "config_rule_arn": self.config_rules, } # Find matching config for given resource_arn among all the # allowed config resources. matched_config = None for arn_attribute, configs in allowed_resources.items(): for config in configs.values(): # type: ignore[attr-defined] if resource_arn == getattr(config, arn_attribute): matched_config = config break if not matched_config: raise ResourceNotFoundException(resource_arn) return matched_config def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None: """Add tags in config with a matching ARN.""" # Tag validation: tag_dict = validate_tags(tags) # Find config with a matching ARN. matched_config = self._match_arn(resource_arn) # Merge the new tags with the existing tags. matched_config.tags.update(tag_dict) def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: """Remove tags in config with a matching ARN. If the tags in the tag_keys don't match any keys for that ARN, they're just ignored. """ if len(tag_keys) > MAX_TAGS_IN_ARG: raise TooManyTags(tag_keys) # Find config with a matching ARN. matched_config = self._match_arn(resource_arn) for tag_key in tag_keys: matched_config.tags.pop(tag_key, None) def list_tags_for_resource( self, resource_arn: str, limit: int ) -> Dict[str, List[Dict[str, str]]]: """Return list of tags for AWS Config resource.""" # The limit argument is essentially ignored as a config instance # can only have 50 tags, but we'll check the argument anyway. # Although the boto3 documentation indicates the limit is 50, boto3 # accepts a limit value up to 100 as does the AWS CLI. limit = limit or DEFAULT_PAGE_SIZE if limit > DEFAULT_PAGE_SIZE: raise InvalidLimitException(limit) matched_config = self._match_arn(resource_arn) return { "Tags": [ {"Key": k, "Value": v} for k, v in sorted(matched_config.tags.items()) ] } def put_config_rule( self, config_rule: Dict[str, Any], tags: Optional[List[Dict[str, str]]] = None ) -> str: """Add/Update config rule for evaluating resource compliance. TBD - Only the "accounting" of config rules are handled at the moment. No events are created or triggered. There is no interaction with the config recorder. """ # If there is no rule_name, use the ARN or ID to get the # rule_name. rule_name = config_rule.get("ConfigRuleName") if rule_name: if len(rule_name) > 128: raise NameTooLongException(rule_name, "configRule.configRuleName", 128) else: # Can we find the rule using the ARN or ID? rule_arn = config_rule.get("ConfigRuleArn") rule_id = config_rule.get("ConfigRuleId") if not rule_arn and not rule_id: raise InvalidParameterValueException( "One or more identifiers needs to be provided. Provide " "Name or Id or Arn" ) for config_rule_obj in self.config_rules.values(): if rule_id and config_rule_obj.config_rule_id == rule_id: rule_name = config_rule_obj.config_rule_name break if rule_arn and config_rule_obj.config_rule_arn == rule_arn: rule_name = config_rule_obj.config_rule_name break if not rule_name: raise InvalidParameterValueException( "One or more identifiers needs to be provided. Provide " "Name or Id or Arn" ) tag_dict = validate_tags(tags or []) # With the rule_name, determine whether it's for an existing rule # or whether a new rule should be created. rule = self.config_rules.get(rule_name) if rule: # Rule exists. Make sure it isn't in use for another activity. rule_state = rule.config_rule_state if rule_state != "ACTIVE": activity = "deleted" if rule_state.startswith("DELET") else "evaluated" raise ResourceInUseException( f"The rule {rule_name} is currently being {activity}. " f"Please retry after some time" ) # Update the current rule. rule.modify_fields(self.region_name, config_rule, tag_dict) else: # Create a new ConfigRule if the limit hasn't been reached. if len(self.config_rules) == ConfigRule.MAX_RULES: raise MaxNumberOfConfigRulesExceededException( rule_name, ConfigRule.MAX_RULES ) rule = ConfigRule(self.account_id, self.region_name, config_rule, tag_dict) self.config_rules[rule_name] = rule return "" def describe_config_rules( self, config_rule_names: Optional[List[str]], next_token: Optional[str] ) -> Dict[str, Any]: """Return details for the given ConfigRule names or for all rules.""" result: Dict[str, Any] = {"ConfigRules": []} if not self.config_rules: return result rule_list = [] if config_rule_names: for name in config_rule_names: if not self.config_rules.get(name): raise NoSuchConfigRuleException(name) rule_list.append(name) else: rule_list = list(self.config_rules.keys()) # The rules are not sorted alphanumerically. sorted_rules = sorted(rule_list) start = 0 if next_token: if not self.config_rules.get(next_token): raise InvalidNextTokenException() start = sorted_rules.index(next_token) rule_list = sorted_rules[start : start + CONFIG_RULE_PAGE_SIZE] result["ConfigRules"] = [self.config_rules[x].to_dict() for x in rule_list] if len(sorted_rules) > (start + CONFIG_RULE_PAGE_SIZE): result["NextToken"] = sorted_rules[start + CONFIG_RULE_PAGE_SIZE] return result def delete_config_rule(self, rule_name: str) -> None: """Delete config rule used for evaluating resource compliance.""" rule = self.config_rules.get(rule_name) if not rule: raise NoSuchConfigRuleException(rule_name) # The following logic is not applicable for moto as far as I can tell. # if rule.config_rule_state == "DELETING": # raise ResourceInUseException( # f"The rule {rule_name} is currently being deleted. Please " # f"retry after some time" # ) rule.config_rule_state = "DELETING" self.config_rules.pop(rule_name) def put_retention_configuration( self, retention_period_in_days: int ) -> Dict[str, Any]: """Creates a Retention Configuration.""" if retention_period_in_days < 30: raise ValidationException( f"Value '{retention_period_in_days}' at 'retentionPeriodInDays' failed to satisfy constraint: Member must have value greater than or equal to 30" ) if retention_period_in_days > 2557: raise ValidationException( f"Value '{retention_period_in_days}' at 'retentionPeriodInDays' failed to satisfy constraint: Member must have value less than or equal to 2557" ) self.retention_configuration = RetentionConfiguration(retention_period_in_days) return {"RetentionConfiguration": self.retention_configuration.to_dict()} def describe_retention_configurations( self, retention_configuration_names: Optional[List[str]] ) -> List[Dict[str, Any]]: """ This will return the retention configuration if one is present. This should only receive at most 1 name in. It will raise a ValidationException if more than 1 is supplied. """ # Handle the cases where we get a retention name to search for: if retention_configuration_names: if len(retention_configuration_names) > 1: raise ValidationException( f"Value '{retention_configuration_names}' at 'retentionConfigurationNames' failed to satisfy constraint: Member must have length less than or equal to 1" ) # If we get a retention name to search for, and we don't have it, then we need to raise an exception: if ( not self.retention_configuration or not self.retention_configuration.name == retention_configuration_names[0] ): raise NoSuchRetentionConfigurationException( retention_configuration_names[0] ) # If we found it, then return it: return [self.retention_configuration.to_dict()] # If no name was supplied: if self.retention_configuration: return [self.retention_configuration.to_dict()] return [] def delete_retention_configuration(self, retention_configuration_name: str) -> None: """This will delete the retention configuration if one is present with the provided name.""" if ( not self.retention_configuration or not self.retention_configuration.name == retention_configuration_name ): raise NoSuchRetentionConfigurationException(retention_configuration_name) self.retention_configuration = None config_backends = BackendDict(ConfigBackend, "config")
Memory