import base64 import json import os import re from typing import Any, Dict from moto.core.responses import BaseResponse from moto.kms.utils import RESERVED_ALIASE_TARGET_KEY_IDS, RESERVED_ALIASES from moto.utilities.utils import get_partition from .exceptions import ( AlreadyExistsException, NotAuthorizedException, NotFoundException, ValidationException, ) from .models import KmsBackend, kms_backends from .policy_validator import validate_policy class KmsResponse(BaseResponse): def __init__(self) -> None: super().__init__(service_name="kms") def _get_param(self, param_name: str, if_none: Any = None) -> Any: params = json.loads(self.body) for key in ("Plaintext", "CiphertextBlob", "Message"): if key in params: params[key] = base64.b64decode(params[key].encode("utf-8")) return params.get(param_name, if_none) @property def kms_backend(self) -> KmsBackend: return kms_backends[self.current_account][self.region] def _display_arn(self, key_id: str) -> str: if key_id.startswith("arn:"): return key_id if key_id.startswith("alias/"): id_type = "" else: id_type = "key/" return f"arn:{get_partition(self.region)}:kms:{self.region}:{self.current_account}:{id_type}{key_id}" def _validate_cmk_id(self, key_id: str) -> None: """Determine whether a CMK ID exists. - raw key ID - key ARN """ is_arn = key_id.startswith("arn:") and ":key/" in key_id # https://docs.aws.amazon.com/kms/latest/developerguide/multi-region-keys-overview.html # "Notice that multi-Region keys have a distinctive key ID that begins with mrk-. You can use the mrk- prefix to # identify MRKs programmatically." is_raw_key_id = re.match( r"^(mrk-)?[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$", key_id, re.IGNORECASE, ) if not is_arn and not is_raw_key_id: raise NotFoundException(f"Invalid keyId {key_id}") cmk_id = self.kms_backend.get_key_id(key_id) if cmk_id not in self.kms_backend.keys: raise NotFoundException(f"Key '{self._display_arn(key_id)}' does not exist") def _validate_alias(self, key_id: str) -> None: """Determine whether an alias exists. - alias name - alias ARN """ error = NotFoundException(f"Alias {self._display_arn(key_id)} is not found.") is_arn = key_id.startswith("arn:") and ":alias/" in key_id is_name = key_id.startswith("alias/") if not is_arn and not is_name: raise error alias_name = self.kms_backend.get_alias_name(key_id) cmk_id = self.kms_backend.get_key_id_from_alias(alias_name) if cmk_id is None: raise error def _validate_key_id(self, key_id: str) -> None: """Determine whether a key ID exists. - raw key ID - key ARN - alias name - alias ARN """ is_alias_arn = key_id.startswith("arn:") and ":alias/" in key_id is_alias_name = key_id.startswith("alias/") if is_alias_arn or is_alias_name: self._validate_alias(key_id) return self._validate_cmk_id(key_id) def _validate_key_policy(self, key_id: str, action: str) -> None: """ Validate whether the specified action is allowed, given the key policy """ key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id)) validate_policy(key, action) def create_key(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html""" policy = self._get_param("Policy") key_usage = self._get_param("KeyUsage") key_spec = self._get_param("KeySpec") or self._get_param( "CustomerMasterKeySpec" ) description = self._get_param("Description") tags = self._get_param("Tags") multi_region = self._get_param("MultiRegion") origin = self._get_param("Origin") or "AWS_KMS" key = self.kms_backend.create_key( policy, key_usage, key_spec, description, tags, multi_region, origin ) return json.dumps(key.to_dict()) def replicate_key(self) -> str: key_id = self._get_param("KeyId") self._validate_key_id(key_id) replica_region = self._get_param("ReplicaRegion") replica_key = self.kms_backend.replicate_key(key_id, replica_region) return json.dumps( { "ReplicaKeyMetadata": replica_key.to_dict()["KeyMetadata"], "ReplicaPolicy": replica_key.generate_default_policy(), } ) def update_key_description(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_UpdateKeyDescription.html""" key_id = self._get_param("KeyId") description = self._get_param("Description") self._validate_cmk_id(key_id) self.kms_backend.update_key_description(key_id, description) return json.dumps(None) def tag_resource(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_TagResource.html""" key_id = self._get_param("KeyId") tags = self._get_param("Tags") self._validate_cmk_id(key_id) self.kms_backend.tag_resource(key_id, tags) return "{}" def untag_resource(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_UntagResource.html""" key_id = self._get_param("KeyId") tag_names = self._get_param("TagKeys") self._validate_cmk_id(key_id) self.kms_backend.untag_resource(key_id, tag_names) return "{}" def list_resource_tags(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ListResourceTags.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) tags: Dict[str, Any] = self.kms_backend.list_resource_tags(key_id) tags.update({"NextMarker": None, "Truncated": False}) return json.dumps(tags) def describe_key(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html""" key_id = self._get_param("KeyId") self._validate_key_id(key_id) self._validate_key_policy(key_id, "kms:DescribeKey") key = self.kms_backend.describe_key(self.kms_backend.get_key_id(key_id)) return json.dumps(key.to_dict()) def list_keys(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html""" keys = self.kms_backend.list_keys() return json.dumps( { "Keys": [{"KeyArn": key.arn, "KeyId": key.id} for key in keys], "NextMarker": None, "Truncated": False, } ) def create_alias(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateAlias.html""" return self._set_alias() def update_alias(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_UpdateAlias.html""" return self._set_alias(update=True) def _set_alias(self, update: bool = False) -> str: alias_name = self._get_param("AliasName") target_key_id = self._get_param("TargetKeyId") if not alias_name.startswith("alias/"): raise ValidationException("Invalid identifier") if alias_name in RESERVED_ALIASES: raise NotAuthorizedException() if ":" in alias_name: raise ValidationException( f"{alias_name} contains invalid characters for an alias" ) if not re.match(r"^[a-zA-Z0-9:/_-]+$", alias_name): raise ValidationException( f"1 validation error detected: Value '{alias_name}' at 'aliasName' " "failed to satisfy constraint: Member must satisfy regular " "expression pattern: ^[a-zA-Z0-9:/_-]+$" ) if self.kms_backend.alias_exists(target_key_id): raise ValidationException("Aliases must refer to keys. Not aliases") if update: # delete any existing aliases with that name (should be a no-op if none exist) self.kms_backend.delete_alias(alias_name) if self.kms_backend.alias_exists(alias_name): raise AlreadyExistsException( f"An alias with the name arn:aws:kms:{self.region}:{self.current_account}:{alias_name} already exists" ) self._validate_cmk_id(target_key_id) if update: self.kms_backend.update_alias(target_key_id, alias_name) else: self.kms_backend.create_alias(target_key_id, alias_name) return json.dumps(None) def delete_alias(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_DeleteAlias.html""" alias_name = self._get_param("AliasName") if not alias_name.startswith("alias/"): raise ValidationException("Invalid identifier") self._validate_alias(alias_name) self.kms_backend.delete_alias(alias_name) return json.dumps(None) def list_aliases(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ListAliases.html""" region = self.region key_id = self._get_param("KeyId") if key_id is not None: self._validate_key_id(key_id) key_id = self.kms_backend.get_key_id(key_id) response_aliases = [] backend_aliases = self.kms_backend.list_aliases() for target_key_id, aliases in backend_aliases.items(): for alias_name in aliases: # TODO: add creation date and last updated in response_aliases response_aliases.append( { "AliasArn": f"arn:{get_partition(region)}:kms:{region}:{self.current_account}:{alias_name}", "AliasName": alias_name, "TargetKeyId": target_key_id, } ) for reserved_alias, target_key_id in RESERVED_ALIASE_TARGET_KEY_IDS.items(): exsisting = [ a for a in response_aliases if a["AliasName"] == reserved_alias ] if not exsisting: arn = f"arn:{get_partition(region)}:kms:{region}:{self.current_account}:{reserved_alias}" response_aliases.append( { "TargetKeyId": target_key_id, "AliasArn": arn, "AliasName": reserved_alias, } ) if key_id is not None: response_aliases = list( filter(lambda alias: alias["TargetKeyId"] == key_id, response_aliases) ) return json.dumps({"Truncated": False, "Aliases": response_aliases}) def create_grant(self) -> str: key_id = self._get_param("KeyId") grantee_principal = self._get_param("GranteePrincipal") retiring_principal = self._get_param("RetiringPrincipal") operations = self._get_param("Operations") name = self._get_param("Name") constraints = self._get_param("Constraints") grant_id, grant_token = self.kms_backend.create_grant( key_id, grantee_principal, operations, name, constraints=constraints, retiring_principal=retiring_principal, ) return json.dumps({"GrantId": grant_id, "GrantToken": grant_token}) def list_grants(self) -> str: key_id = self._get_param("KeyId") grant_id = self._get_param("GrantId") grants = self.kms_backend.list_grants(key_id=key_id, grant_id=grant_id) return json.dumps( { "Grants": [gr.to_json() for gr in grants], "GrantCount": len(grants), "Truncated": False, } ) def list_retirable_grants(self) -> str: retiring_principal = self._get_param("RetiringPrincipal") grants = self.kms_backend.list_retirable_grants(retiring_principal) return json.dumps( { "Grants": [gr.to_json() for gr in grants], "GrantCount": len(grants), "Truncated": False, } ) def revoke_grant(self) -> str: key_id = self._get_param("KeyId") grant_id = self._get_param("GrantId") self.kms_backend.revoke_grant(key_id, grant_id) return "{}" def retire_grant(self) -> str: key_id = self._get_param("KeyId") grant_id = self._get_param("GrantId") grant_token = self._get_param("GrantToken") self.kms_backend.retire_grant(key_id, grant_id, grant_token) return "{}" def enable_key_rotation(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKeyRotation.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.enable_key_rotation(key_id) return json.dumps(None) def disable_key_rotation(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKeyRotation.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.disable_key_rotation(key_id) return json.dumps(None) def get_key_rotation_status(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_GetKeyRotationStatus.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) rotation_enabled = self.kms_backend.get_key_rotation_status(key_id) return json.dumps({"KeyRotationEnabled": rotation_enabled}) def put_key_policy(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_PutKeyPolicy.html""" key_id = self._get_param("KeyId") policy_name = self._get_param("PolicyName") policy = self._get_param("Policy") _assert_default_policy(policy_name) self._validate_cmk_id(key_id) self.kms_backend.put_key_policy(key_id, policy) return json.dumps(None) def get_key_policy(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_GetKeyPolicy.html""" key_id = self._get_param("KeyId") policy_name = self._get_param("PolicyName") _assert_default_policy(policy_name) self._validate_cmk_id(key_id) policy = self.kms_backend.get_key_policy(key_id) or "{}" return json.dumps({"Policy": policy}) def list_key_policies(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeyPolicies.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.describe_key(key_id) return json.dumps({"Truncated": False, "PolicyNames": ["default"]}) def encrypt(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_Encrypt.html""" key_id = self._get_param("KeyId") encryption_context = self._get_param("EncryptionContext", {}) plaintext = self._get_param("Plaintext") self._validate_key_id(key_id) if isinstance(plaintext, str): plaintext = plaintext.encode("utf-8") ciphertext_blob, arn = self.kms_backend.encrypt( key_id=key_id, plaintext=plaintext, encryption_context=encryption_context ) ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8") return json.dumps({"CiphertextBlob": ciphertext_blob_response, "KeyId": arn}) def decrypt(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html""" ciphertext_blob = self._get_param("CiphertextBlob") encryption_context = self._get_param("EncryptionContext", {}) plaintext, arn = self.kms_backend.decrypt( ciphertext_blob=ciphertext_blob, encryption_context=encryption_context ) plaintext_response = base64.b64encode(plaintext).decode("utf-8") return json.dumps({"Plaintext": plaintext_response, "KeyId": arn}) def re_encrypt(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ReEncrypt.html""" ciphertext_blob = self._get_param("CiphertextBlob") source_encryption_context = self._get_param("SourceEncryptionContext", {}) destination_key_id = self._get_param("DestinationKeyId") destination_encryption_context = self._get_param( "DestinationEncryptionContext", {} ) self._validate_key_id(destination_key_id) ( new_ciphertext_blob, decrypting_arn, encrypting_arn, ) = self.kms_backend.re_encrypt( ciphertext_blob=ciphertext_blob, source_encryption_context=source_encryption_context, destination_key_id=destination_key_id, destination_encryption_context=destination_encryption_context, ) response_ciphertext_blob = base64.b64encode(new_ciphertext_blob).decode("utf-8") return json.dumps( { "CiphertextBlob": response_ciphertext_blob, "KeyId": encrypting_arn, "SourceKeyId": decrypting_arn, } ) def disable_key(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_DisableKey.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.disable_key(key_id) return json.dumps(None) def enable_key(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_EnableKey.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.enable_key(key_id) return json.dumps(None) def cancel_key_deletion(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_CancelKeyDeletion.html""" key_id = self._get_param("KeyId") self._validate_cmk_id(key_id) self.kms_backend.cancel_key_deletion(key_id) return json.dumps({"KeyId": key_id}) def schedule_key_deletion(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_ScheduleKeyDeletion.html""" key_id = self._get_param("KeyId") if self._get_param("PendingWindowInDays") is None: pending_window_in_days = 30 else: pending_window_in_days = self._get_param("PendingWindowInDays") self._validate_cmk_id(key_id) return json.dumps( { "KeyId": key_id, "DeletionDate": self.kms_backend.schedule_key_deletion( key_id, pending_window_in_days ), } ) def generate_data_key(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html""" key_id = self._get_param("KeyId") encryption_context = self._get_param("EncryptionContext", {}) number_of_bytes = self._get_param("NumberOfBytes") key_spec = self._get_param("KeySpec") # Param validation self._validate_key_id(key_id) if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 1): raise ValidationException( ( "1 validation error detected: Value '{number_of_bytes:d}' at 'numberOfBytes' failed " "to satisfy constraint: Member must have value less than or " "equal to 1024" ).format(number_of_bytes=number_of_bytes) ) if key_spec and key_spec not in ("AES_256", "AES_128"): raise ValidationException( ( "1 validation error detected: Value '{key_spec}' at 'keySpec' failed " "to satisfy constraint: Member must satisfy enum value set: " "[AES_256, AES_128]" ).format(key_spec=key_spec) ) if not key_spec and not number_of_bytes: raise ValidationException( "Please specify either number of bytes or key spec." ) if key_spec and number_of_bytes: raise ValidationException( "Please specify either number of bytes or key spec." ) plaintext, ciphertext_blob, key_arn = self.kms_backend.generate_data_key( key_id=key_id, encryption_context=encryption_context, number_of_bytes=number_of_bytes, key_spec=key_spec, ) plaintext_response = base64.b64encode(plaintext).decode("utf-8") ciphertext_blob_response = base64.b64encode(ciphertext_blob).decode("utf-8") return json.dumps( { "CiphertextBlob": ciphertext_blob_response, "Plaintext": plaintext_response, "KeyId": key_arn, # not alias } ) def generate_data_key_without_plaintext(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKeyWithoutPlaintext.html""" result = json.loads(self.generate_data_key()) del result["Plaintext"] return json.dumps(result) def generate_mac(self) -> str: message = self._get_param("Message") key_id = self._get_param("KeyId") mac_algorithm = self._get_param("MacAlgorithm") grant_tokens = self._get_param("GrantTokens") dry_run = self._get_param("DryRun") self._validate_key_id(key_id) mac_algorithms = { "HMAC_SHA_224", "HMAC_SHA_256", "HMAC_SHA_384", "HMAC_SHA_512", } if mac_algorithm and mac_algorithm not in mac_algorithms: raise ValidationException( f"MacAlgorithm must be one of {', '.join(mac_algorithms)}" ) mac, mac_algorithm, key_id = self.kms_backend.generate_mac( message=message, key_id=key_id, mac_algorithm=mac_algorithm, grant_tokens=grant_tokens, dry_run=dry_run, ) return json.dumps(dict(Mac=mac, MacAlgorithm=mac_algorithm, KeyId=key_id)) def generate_random(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateRandom.html""" number_of_bytes = self._get_param("NumberOfBytes") if number_of_bytes and (number_of_bytes > 1024 or number_of_bytes < 1): raise ValidationException( ( "1 validation error detected: Value '{number_of_bytes:d}' at 'numberOfBytes' failed " "to satisfy constraint: Member must have value less than or " "equal to 1024" ).format(number_of_bytes=number_of_bytes) ) entropy = os.urandom(number_of_bytes) response_entropy = base64.b64encode(entropy).decode("utf-8") return json.dumps({"Plaintext": response_entropy}) def sign(self) -> str: key_id = self._get_param("KeyId") message = self._get_param("Message") message_type = self._get_param("MessageType") signing_algorithm = self._get_param("SigningAlgorithm") self._validate_key_id(key_id) if isinstance(message, str): message = message.encode("utf-8") if message == b"": raise ValidationException( "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1" ) if not message_type: message_type = "RAW" key_id, signature, signing_algorithm = self.kms_backend.sign( key_id=key_id, message=message, signing_algorithm=signing_algorithm, ) signature_blob_response = base64.b64encode(signature).decode("utf-8") return json.dumps( { "KeyId": key_id, "Signature": signature_blob_response, "SigningAlgorithm": signing_algorithm, } ) def verify(self) -> str: """https://docs.aws.amazon.com/kms/latest/APIReference/API_Verify.html""" key_id = self._get_param("KeyId") message = self._get_param("Message") message_type = self._get_param("MessageType") signature = self._get_param("Signature") signing_algorithm = self._get_param("SigningAlgorithm") self._validate_key_id(key_id) if not message_type: message_type = "RAW" if isinstance(message, str): message = message.encode("utf-8") if message == b"": raise ValidationException( "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1" ) if isinstance(signature, str): # we return base64 signatures, when signing signature = base64.b64decode(signature.encode("utf-8")) if signature == b"": raise ValidationException( "1 validation error detected: Value at 'Signature' failed to satisfy constraint: Member must have length greater than or equal to 1" ) key_arn, signature_valid, signing_algorithm = self.kms_backend.verify( key_id=key_id, message=message, signature=signature, signing_algorithm=signing_algorithm, ) return json.dumps( { "KeyId": key_arn, "SignatureValid": signature_valid, "SigningAlgorithm": signing_algorithm, } ) def verify_mac(self) -> str: message = self._get_param("Message") mac = self._get_param("Mac") key_id = self._get_param("KeyId") mac_algorithm = self._get_param("MacAlgorithm") grant_tokens = self._get_param("GrantTokens") dry_run = self._get_param("DryRun") self._validate_key_id(key_id) mac_algorithms = { "HMAC_SHA_224", "HMAC_SHA_256", "HMAC_SHA_384", "HMAC_SHA_512", } if mac_algorithm and mac_algorithm not in mac_algorithms: raise ValidationException( f"MacAlgorithm must be one of {', '.join(mac_algorithms)}" ) self.kms_backend.verify_mac( message=message, key_id=key_id, mac_algorithm=mac_algorithm, mac=mac, grant_tokens=grant_tokens, dry_run=dry_run, ) return json.dumps(dict(KeyId=key_id, MacValid=True, MacAlgorithm=mac_algorithm)) def get_public_key(self) -> str: key_id = self._get_param("KeyId") self._validate_key_id(key_id) key, public_key = self.kms_backend.get_public_key(key_id) return json.dumps( { "CustomerMasterKeySpec": key.key_spec, "EncryptionAlgorithms": key.encryption_algorithms, "KeyId": key.id, "KeyUsage": key.key_usage, "PublicKey": base64.b64encode(public_key).decode("UTF-8"), "SigningAlgorithms": key.signing_algorithms, } ) def rotate_key_on_demand(self) -> str: key_id = self._get_param("KeyId") self._validate_key_id(key_id) key_id = self.kms_backend.rotate_key_on_demand( key_id=key_id, ) return json.dumps(dict(KeyId=key_id)) def list_key_rotations(self) -> str: key_id = self._get_param("KeyId") limit = self._get_param("Limit", 1000) marker = self._get_param("Marker") self._validate_key_id(key_id) rotations, next_marker = self.kms_backend.list_key_rotations( key_id=key_id, limit=limit, next_marker=marker ) is_truncated = next_marker is not None response = {"Rotations": rotations, "Truncated": is_truncated} if is_truncated: response["NextMarker"] = next_marker return json.dumps(response) def _assert_default_policy(policy_name: str) -> None: if policy_name != "default": raise NotFoundException("No such policy exists")
Memory