import datetime
import json
import time
from typing import Any, Dict, List, Optional, Tuple, Union
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import utcfromtimestamp, utcnow
from moto.moto_api._internal import mock_random
from .exceptions import (
ClientError,
InvalidParameterException,
InvalidRequestException,
OperationNotPermittedOnReplica,
ResourceExistsException,
ResourceNotFoundException,
SecretHasNoValueException,
SecretNotFoundException,
SecretStageVersionMismatchException,
)
from .list_secrets.filters import (
description_filter,
filter_all,
name_filter,
tag_key,
tag_value,
)
from .utils import (
SecretsManagerSecretIdentifier,
get_secret_name_from_partial_arn,
random_password,
)
# Page size used by the paginated calls in this module when the caller
# does not supply an explicit maximum number of results.
MAX_RESULTS_DEFAULT = 100
def filter_primary_region(
    secret: Union["FakeSecret", "ReplicaSecret"], values: List[str]
) -> bool:
    """Report whether *secret* matches the ``primary-region`` filter.

    A primary secret matches only when it has at least one replica and its
    own region is listed in *values*. A replica matches when the region of
    the secret it was replicated from is listed in *values*.

    :param secret: the primary or replica secret being tested
    :param values: region names supplied with the filter
    :return: ``True`` on a match, ``False`` otherwise (including for objects
        that are neither a primary nor a replica secret)
    """
    if isinstance(secret, FakeSecret):
        return len(secret.replicas) > 0 and secret.region in values
    if isinstance(secret, ReplicaSecret):
        return secret.source.region in values
    # Always hand back a real bool instead of silently falling through to None.
    return False
# Maps every supported filter key to the predicate that evaluates it.
# Each predicate is invoked as ``predicate(secret, values)`` (see _matches).
_filter_functions = {
"all": filter_all,
"name": name_filter,
"description": description_filter,
"tag-key": tag_key,
"tag-value": tag_value,
"primary-region": filter_primary_region,
# Other services do not create secrets in Moto (yet)
# So if you're looking for any Secrets owned by a Service, you'll never get any results
"owning-service": lambda x, y: False,
}
def filter_keys() -> List[str]:
    """Return the names of all filter keys known to this module."""
    supported = list(_filter_functions)
    return supported
def _matches(
secret: Union["FakeSecret", "ReplicaSecret"], filters: List[Dict[str, Any]]
) -> bool:
is_match = True
for f in filters:
# Filter names are pre-validated in the resource layer
filter_function = _filter_functions.get(f["Key"])
is_match = is_match and filter_function(secret, f["Values"]) # type: ignore
return is_match
class SecretsManager(BaseModel):
    """Minimal model object that only remembers the region it was built for."""

    def __init__(self, region_name: str):
        # Stored under ``region`` (not ``region_name``)
        self.region = region_name
class FakeSecret:
    """A primary (non-replica) secret held by the mocked backend.

    Besides plain metadata the object keeps ``versions`` (version id ->
    version dict, each carrying a ``version_stages`` list), the id of the
    default version, rotation settings, and the list of replicas that were
    created from this secret.
    """

    def __init__(
        self,
        account_id: str,
        region_name: str,
        secret_id: str,
        secret_version: Dict[str, Any],
        version_id: str,
        secret_string: Optional[str] = None,
        secret_binary: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[List[Dict[str, str]]] = None,
        kms_key_id: Optional[str] = None,
        version_stages: Optional[List[str]] = None,
        last_changed_date: Optional[int] = None,
        created_date: Optional[int] = None,
        replica_regions: Optional[List[Dict[str, str]]] = None,
        force_overwrite: bool = False,
    ):
        """Build the secret and, when requested, its replicas.

        ``secret_version`` is only registered (and made the default version)
        when a string or binary value was supplied; otherwise the secret
        starts without any version.

        :raises InvalidParameterException: if a replica region equals the
            secret's own region (raised by :meth:`create_replicas`)
        """
        self.secret_id = secret_id
        self.name = secret_id
        self.arn = SecretsManagerSecretIdentifier(
            account_id, region_name, secret_id
        ).generate(tags=tags)
        self.account_id = account_id
        self.region = region_name
        self.secret_string = secret_string
        self.secret_binary = secret_binary
        self.description = description
        # An empty tag list is normalised to None
        self.tags = tags or None
        self.kms_key_id = kms_key_id
        self.version_stages = version_stages
        self.last_changed_date = last_changed_date
        self.created_date = created_date
        # We should only return Rotation details after it's been requested
        self.rotation_requested = False
        self.rotation_enabled = False
        self.rotation_lambda_arn = ""
        self.auto_rotate_after_days = 0
        self.deleted_date: Optional[float] = None
        self.policy: Optional[str] = None
        self.next_rotation_date: Optional[int] = None
        self.last_rotation_date: Optional[int] = None

        self.versions: Dict[str, Dict[str, Any]] = {}
        if secret_string or secret_binary:
            self.versions = {version_id: secret_version}
            self.set_default_version_id(version_id)
        else:
            self.set_default_version_id(None)

        self.replicas = self.create_replicas(
            replica_regions or [], force_overwrite=force_overwrite
        )

    @property
    def owning_service(self) -> Optional[str]:
        """Value of the ``aws:secretsmanager:owningService`` tag, if present."""
        for tag in self.tags or []:
            if tag["Key"] == "aws:secretsmanager:owningService":
                return tag["Value"]
        return None

    def create_replicas(
        self, replica_regions: List[Dict[str, str]], force_overwrite: bool
    ) -> List["ReplicaSecret"]:
        """Create one replica per entry in *replica_regions*.

        Successful replicas are registered in the target region's backend
        under their ARN. When the target region already knows a secret with
        this name, the existing entry is replaced if *force_overwrite* is
        set; otherwise a replica with status ``Failed`` is returned and
        nothing is stored in that region.

        :param replica_regions: dicts with at least a ``Region`` key
        :param force_overwrite: replace same-named secrets in target regions
        :return: the replicas created by this call (not yet added to
            ``self.replicas``)
        :raises InvalidParameterException: if any requested region is the
            secret's own region
        """
        # Validate first, before we create anything
        for replica_config in replica_regions or []:
            if replica_config["Region"] == self.region:
                raise InvalidParameterException("Invalid replica region.")

        replicas: List[ReplicaSecret] = []
        for replica_config in replica_regions or []:
            replica_region = replica_config["Region"]
            backend = secretsmanager_backends[self.account_id][replica_region]
            name_taken = self.name in backend.secrets
            if name_taken and not force_overwrite:
                # Report the actual secret name rather than a fixed placeholder
                message = (
                    f"Replication failed: Secret name {self.name} already exists "
                    f"in region {backend.region_name}."
                )
                replica = ReplicaSecret(self, replica_region, "Failed", message)
            else:
                if name_taken:
                    backend.secrets.pop(self.name)
                replica = ReplicaSecret(self, replica_region)
                backend.secrets[replica.arn] = replica
            replicas.append(replica)
        return replicas

    def update(
        self,
        description: Optional[str] = None,
        tags: Optional[List[Dict[str, str]]] = None,
        kms_key_id: Optional[str] = None,
        last_changed_date: Optional[int] = None,
    ) -> None:
        """Overwrite description and tags; update the other fields if given.

        ``description`` and ``tags`` are always replaced (an empty tag list
        becomes ``None``), whereas ``kms_key_id`` and ``last_changed_date``
        are left untouched when passed as ``None``.
        """
        self.description = description
        self.tags = tags or None
        if last_changed_date is not None:
            self.last_changed_date = last_changed_date
        if kms_key_id is not None:
            self.kms_key_id = kms_key_id

    def set_default_version_id(self, version_id: Optional[str]) -> None:
        """Record *version_id* as the default version (``None`` for no version)."""
        self.default_version_id = version_id

    def reset_default_version(
        self, secret_version: Dict[str, Any], version_id: str
    ) -> None:
        """Register *secret_version* and make it the default version.

        Any existing ``AWSPREVIOUS`` label is dropped and the version that was
        the default so far gets ``["AWSPREVIOUS"]`` as its only stage.
        """
        # remove all old AWSPREVIOUS stages
        for old_version in self.versions.values():
            if "AWSPREVIOUS" in old_version["version_stages"]:
                old_version["version_stages"].remove("AWSPREVIOUS")

        if self.default_version_id:
            # set old AWSCURRENT secret to AWSPREVIOUS
            previous_current_version_id = self.default_version_id
            self.versions[previous_current_version_id]["version_stages"] = [
                "AWSPREVIOUS"
            ]

        self.versions[version_id] = secret_version
        self.default_version_id = version_id

    def remove_version_stages_from_old_versions(
        self, version_stages: List[str]
    ) -> None:
        """Strip each label in *version_stages* from every stored version."""
        for version_stage in version_stages:
            for old_version in self.versions.values():
                if version_stage in old_version["version_stages"]:
                    old_version["version_stages"].remove(version_stage)

    def delete(self, deleted_date: float) -> None:
        """Mark the secret as deleted by storing *deleted_date*."""
        self.deleted_date = deleted_date

    def restore(self) -> None:
        """Clear the deletion marker."""
        self.deleted_date = None

    def is_deleted(self) -> bool:
        """Return ``True`` while a deletion date is set."""
        return self.deleted_date is not None

    def to_short_dict(
        self,
        include_version_stages: bool = False,
        version_id: Optional[str] = None,
        include_version_id: bool = True,
    ) -> str:
        """Serialise ARN, name and optional version details to a JSON string.

        :param include_version_stages: add ``VersionStages`` of the version
        :param version_id: version to report; defaults to the default version
        :param include_version_id: add ``VersionId`` when a version is known
        :return: JSON document (a string, despite the method name)
        """
        if not version_id:
            version_id = self.default_version_id
        dct: Dict[str, Any] = {
            "ARN": self.arn,
            "Name": self.name,
        }
        if include_version_id and version_id:
            dct["VersionId"] = version_id
        if version_id and include_version_stages:
            dct["VersionStages"] = self.versions[version_id]["version_stages"]
        if self.replicas:
            dct["ReplicationStatus"] = [replica.config for replica in self.replicas]
        return json.dumps(dct)

    def to_dict(self) -> Dict[str, Any]:
        """Return the full description of the secret as a dict.

        Optional keys (KMS key, owning service, tags, description, version
        mappings, rotation details, replication status) are only present
        when the corresponding data exists.
        """
        version_id_to_stages = self._form_version_ids_to_stages()

        dct: Dict[str, Any] = {
            "ARN": self.arn,
            "Name": self.name,
            "LastChangedDate": self.last_changed_date,
            "LastAccessedDate": None,
            "NextRotationDate": self.next_rotation_date,
            "DeletedDate": self.deleted_date,
            "CreatedDate": self.created_date,
        }
        # The default key alias is never reported
        if self.kms_key_id != SecretsManagerBackend.DEFAULT_KMS_KEY_ALIAS:
            dct["KmsKeyId"] = self.kms_key_id
        if self.owning_service is not None:
            dct["OwningService"] = self.owning_service
        if self.tags is not None:
            dct["Tags"] = self.tags
        if self.description:
            dct["Description"] = self.description
        if self.versions:
            dct.update(
                {
                    # Key used by describe_secret
                    "VersionIdsToStages": version_id_to_stages,
                    # Key used by list_secrets
                    "SecretVersionsToStages": version_id_to_stages,
                }
            )
        if self.rotation_requested:
            dct.update(
                {
                    "RotationEnabled": self.rotation_enabled,
                    "RotationLambdaARN": self.rotation_lambda_arn,
                    "RotationRules": {
                        "AutomaticallyAfterDays": self.auto_rotate_after_days
                    },
                    "LastRotatedDate": self.last_rotation_date,
                }
            )
        if self.replicas:
            dct["ReplicationStatus"] = [replica.config for replica in self.replicas]
        return dct

    def _form_version_ids_to_stages(self) -> Dict[str, List[str]]:
        """Map every version id to its list of staging labels."""
        return {
            version_id: version["version_stages"]
            for version_id, version in self.versions.items()
        }
class ReplicaSecret:
    """Read-only view of a :class:`FakeSecret` replicated into another region.

    Name, secret id, versions and default version are always read through
    from the source secret; only the ARN, region and replication status are
    held by the replica itself.
    """

    def __init__(
        self,
        source: "FakeSecret",
        region: str,
        status: Optional[str] = None,
        message: Optional[str] = None,
    ):
        """Create a replica of *source* in *region*.

        :param status: replication status; defaults to ``"InSync"``
        :param message: status message; defaults to ``"Replication succeeded"``
        """
        self.source = source
        # Swap only the first occurrence of the source region, so a secret
        # name that happens to contain the region string is left intact.
        # NOTE(review): relies on the region field preceding the name in the
        # generated ARN - confirm against SecretsManagerSecretIdentifier.
        self.arn = source.arn.replace(source.region, region, 1)
        self.region = region
        self.status = status or "InSync"
        self.message = message or "Replication succeeded"
        # An explicit status is only passed for failed replications, in which
        # case nothing was stored in the target region.
        self.has_replica = status is None
        self.config = {
            "Region": self.region,
            "KmsKeyId": SecretsManagerBackend.DEFAULT_KMS_KEY_ALIAS,
            "Status": self.status,
            "StatusMessage": self.message,
        }

    def is_deleted(self) -> bool:
        """Replicas are never reported as deleted."""
        return False

    def to_dict(self) -> Dict[str, Any]:
        """Describe the replica: the source's dict with its own ARN and the primary region."""
        dct = self.source.to_dict()
        dct["ARN"] = self.arn
        dct["PrimaryRegion"] = self.source.region
        return dct

    @property
    def default_version_id(self) -> Optional[str]:
        """Default version id of the source secret."""
        return self.source.default_version_id

    @property
    def versions(self) -> Dict[str, Dict[str, Any]]:  # type: ignore[misc]
        """Versions of the source secret (the same dict object, not a copy)."""
        return self.source.versions

    @property
    def name(self) -> str:
        """Name of the source secret."""
        return self.source.name

    @property
    def secret_id(self) -> str:
        """Secret id of the source secret."""
        return self.source.secret_id
class SecretsStore(Dict[str, Union[FakeSecret, ReplicaSecret]]):
    """Dictionary of secrets that can be addressed in several ways."""

    # Keys handed to this dictionary come in three flavours:
    # secret names, full ARNs, and partial ARNs.
    # Every retrieval method therefore has to work out which one it received.

    def _find_by_arn_or_name(
        self, key: str
    ) -> Optional[Union[FakeSecret, ReplicaSecret]]:
        """Return the first stored secret whose ARN or name equals *key*."""
        for candidate in dict.values(self):
            if candidate.arn == key or candidate.name == key:
                return candidate
        return None

    def __setitem__(self, key: str, value: Union[FakeSecret, ReplicaSecret]) -> None:
        super().__setitem__(key, value)

    def __getitem__(self, key: str) -> Union[FakeSecret, ReplicaSecret]:
        found = self._find_by_arn_or_name(key)
        if found is not None:
            return found
        # No exact hit: treat the key as a partial ARN
        return super().__getitem__(get_secret_name_from_partial_arn(key))

    def __contains__(self, key: str) -> bool:  # type: ignore
        if self._find_by_arn_or_name(key) is not None:
            return True
        partial_name = get_secret_name_from_partial_arn(key)
        return dict.__contains__(self, partial_name)  # type: ignore

    def get(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]:  # type: ignore
        found = self._find_by_arn_or_name(key)
        if found is not None:
            return found
        return super().get(get_secret_name_from_partial_arn(key))

    def pop(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]:  # type: ignore
        # Translate an ARN/name hit into the secret's name; the scan is not
        # cut short, so the last matching entry decides.
        for candidate in dict.values(self):
            if candidate.arn == key or candidate.name == key:
                key = candidate.name
        partial_name = get_secret_name_from_partial_arn(key)
        return super().pop(partial_name, None)
class SecretsManagerBackend(BaseBackend):
DEFAULT_KMS_KEY_ALIAS = "alias/aws/secretsmanager"
def __init__(self, region_name: str, account_id: str):
    """Set up an empty secret store for one region/account pair."""
    super().__init__(region_name, account_id)
    # All secrets (primaries and replicas) known to this backend
    self.secrets = SecretsStore()
def _is_valid_identifier(self, identifier: str) -> bool:
return identifier in self.secrets
def _unix_time_secs(self, dt: datetime.datetime) -> float:
    """Convert *dt* into the number of seconds elapsed since timestamp 0."""
    elapsed = dt - utcfromtimestamp(0)
    return elapsed.total_seconds()
def _client_request_token_validator(self, client_request_token: str) -> None:
token_length = len(client_request_token)
if token_length < 32 or token_length > 64:
msg = "ClientRequestToken must be 32-64 characters long."
raise InvalidParameterException(msg)
def _from_client_request_token(self, client_request_token: Optional[str]) -> str:
if client_request_token:
self._client_request_token_validator(client_request_token)
return client_request_token
else:
return str(mock_random.uuid4())
def cancel_rotate_secret(self, secret_id: str) -> str:
    """Disable rotation for a secret.

    :param secret_id: name, ARN or partial ARN of the secret
    :return: the secret's short JSON description
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    :raises InvalidRequestException: if the secret is marked deleted or has
        no rotation Lambda ARN configured
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    if isinstance(secret, ReplicaSecret):
        raise OperationNotPermittedOnReplica
    if secret.is_deleted():
        raise InvalidRequestException(
            "You tried to perform the operation on a secret that's currently marked deleted."
        )

    if not secret.rotation_lambda_arn:
        # This response doesn't make much sense for `CancelRotateSecret`, but this is what AWS has documented ...
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_CancelRotateSecret.html
        raise InvalidRequestException(
            # The two halves are joined by implicit concatenation, so the
            # separating space has to be spelled out explicitly.
            "You tried to enable rotation on a secret that doesn't already have a Lambda function ARN configured "
            "and you didn't include such an ARN as a parameter in this call."
        )

    secret.rotation_enabled = False
    return secret.to_short_dict()
def get_secret_value(
    self, secret_id: str, version_id: str, version_stage: str
) -> Dict[str, Any]:
    """Return the value and metadata of one version of a secret.

    The version is selected by *version_id*, else by the first version that
    carries *version_stage*, else the secret's default version.

    :param secret_id: name, ARN or partial ARN of the secret
    :param version_id: explicit version id; ``None`` or ``""`` if not given
    :param version_stage: staging label; ``None`` or ``""`` if not given
    :return: dict with ``ARN``, ``Name``, ``VersionId``, ``VersionStages``,
        ``CreatedDate`` and ``SecretString`` and/or ``SecretBinary``
    :raises SecretNotFoundException: unknown secret, or no version carries
        the requested staging label
    :raises SecretStageVersionMismatchException: both selectors were given
        but the version does not carry the label
    :raises InvalidRequestException: the secret is marked deleted
    :raises ResourceNotFoundException: the selected version does not exist
    :raises SecretHasNoValueException: the version holds neither a string
        nor a binary value
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    # Internal callers pass "" rather than None for "no VersionId", so test
    # truthiness; otherwise the not-found message below blames a VersionId
    # that the caller never supplied.
    version_id_provided = bool(version_id)

    if version_id and version_stage:
        versions_dict = self.secrets[secret_id].versions
        if (
            version_id in versions_dict
            and version_stage not in versions_dict[version_id]["version_stages"]
        ):
            raise SecretStageVersionMismatchException()

    if not version_id and version_stage:
        # set version_id to match version_stage
        versions_dict = self.secrets[secret_id].versions
        for ver_id, ver_val in versions_dict.items():
            if version_stage in ver_val["version_stages"]:
                version_id = ver_id
                break
        if not version_id:
            raise SecretNotFoundException()

    # TODO check this part
    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to "
            "perform the operation on a secret that's currently marked deleted."
        )

    secret = self.secrets[secret_id]
    version_id = version_id or secret.default_version_id or "AWSCURRENT"

    secret_version = secret.versions.get(version_id)
    if not secret_version:
        _type = "staging label" if not version_id_provided else "VersionId"
        raise ResourceNotFoundException(
            f"Secrets Manager can't find the specified secret value for {_type}: {version_id}"
        )

    response_data = {
        "ARN": secret.arn,
        "Name": secret.name,
        "VersionId": secret_version["version_id"],
        "VersionStages": secret_version["version_stages"],
        "CreatedDate": secret_version["createdate"],
    }

    if "secret_string" in secret_version:
        response_data["SecretString"] = secret_version["secret_string"]
    if "secret_binary" in secret_version:
        response_data["SecretBinary"] = secret_version["secret_binary"]
    if (
        "secret_string" not in secret_version
        and "secret_binary" not in secret_version
    ):
        raise SecretHasNoValueException(version_stage or "AWSCURRENT")

    return response_data
def batch_get_secret_value(
    self,
    secret_id_list: Optional[List[str]] = None,
    filters: Optional[List[Dict[str, Any]]] = None,
    max_results: Optional[int] = None,
    next_token: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], List[Any], Optional[str]]:
    """Fetch the values of several secrets, chosen by id list or by filters.

    :return: tuple of (page of secret values, errors, next token); the
        errors list is currently always empty
    :raises InvalidParameterException: if both an id list and filters are
        given, or ``max_results`` is given without filters
    """
    collected = []
    errors: List[Any] = []

    if secret_id_list and filters:
        raise InvalidParameterException(
            "Either 'SecretIdList' or 'Filters' must be provided, but not both."
        )
    if max_results and not filters:
        raise InvalidParameterException(
            "'Filters' not specified. 'Filters' must also be specified when 'MaxResults' is provided."
        )

    for requested_id in secret_id_list or []:
        # TODO perhaps there should be a check if the secret id is valid identifier
        # and add an error to the list if not
        try:
            # TODO investigate the behaviour when the secret doesn't exist or has been deleted,
            # might need to add an error to the list
            value = self.get_secret_value(requested_id, "", "")
        except (SecretNotFoundException, InvalidRequestException):
            continue
        collected.append(value)

    if filters:
        for candidate in self.secrets.values():
            if not _matches(candidate, filters):
                continue
            if isinstance(candidate, FakeSecret):
                lookup_id = candidate.secret_id
            elif isinstance(candidate, ReplicaSecret):
                # Replicas are resolved through the secret they mirror
                lookup_id = candidate.source.secret_id
            else:
                continue
            collected.append(self.get_secret_value(lookup_id, "", ""))

    page, following_token = self._get_secret_values_page_and_next_token(
        collected, max_results, next_token
    )
    return page, errors, following_token
def update_secret(
    self,
    secret_id: str,
    secret_string: Optional[str] = None,
    secret_binary: Optional[str] = None,
    client_request_token: Optional[str] = None,
    kms_key_id: Optional[str] = None,
    description: Optional[str] = None,
) -> str:
    """Update an existing secret, optionally adding a new version.

    Tags are carried over unchanged, and the current description is kept
    when no new one is supplied.

    :return: short JSON description; the version id is only included when
        a new value was stored
    :raises SecretNotFoundException: if the secret does not exist
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    :raises InvalidRequestException: if the secret is marked for deletion
    """
    # error if secret does not exist
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    existing = self.secrets[secret_id]
    if isinstance(existing, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    if existing.is_deleted():
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    updated, has_new_version = self._add_secret(
        secret_id,
        secret_string=secret_string,
        secret_binary=secret_binary,
        description=description or existing.description,
        version_id=client_request_token,
        tags=existing.tags,
        kms_key_id=kms_key_id,
    )
    return updated.to_short_dict(include_version_id=has_new_version)
def create_secret(
    self,
    name: str,
    secret_string: Optional[str],
    secret_binary: Optional[str],
    description: Optional[str],
    tags: Optional[List[Dict[str, str]]],
    kms_key_id: Optional[str],
    client_request_token: Optional[str],
    replica_regions: List[Dict[str, str]],
    force_overwrite: bool,
) -> str:
    """Create a new secret and return its short JSON description.

    :raises ResourceExistsException: if *name* is already a key of the store
    """
    already_present = name in self.secrets.keys()
    if already_present:
        raise ResourceExistsException(
            "A resource with the ID you requested already exists."
        )

    creation_options = {
        "secret_string": secret_string,
        "secret_binary": secret_binary,
        "description": description,
        "tags": tags,
        "kms_key_id": kms_key_id,
        "version_id": client_request_token,
        "replica_regions": replica_regions,
        "force_overwrite": force_overwrite,
    }
    created, has_new_version = self._add_secret(name, **creation_options)
    # The version id is only reported when a value was actually stored
    return created.to_short_dict(include_version_id=has_new_version)
def _add_secret(
    self,
    secret_id: str,
    secret_string: Optional[str] = None,
    secret_binary: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[List[Dict[str, str]]] = None,
    kms_key_id: Optional[str] = None,
    version_id: Optional[str] = None,
    version_stages: Optional[List[str]] = None,
    replica_regions: Optional[List[Dict[str, str]]] = None,
    force_overwrite: bool = False,
) -> Tuple[FakeSecret, bool]:
    """Create a secret, or update an existing one with a possible new version.

    :return: the secret plus a flag telling whether a string or binary
        value (i.e. a new version) was supplied
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    stages = ["AWSCURRENT"] if version_stages is None else version_stages
    version_id = self._from_client_request_token(version_id)

    secret_version = {
        "createdate": int(time.time()),
        "version_id": version_id,
        "version_stages": stages,
    }
    # Only the value kinds that were actually supplied are stored
    for field_name, field_value in (
        ("secret_string", secret_string),
        ("secret_binary", secret_binary),
    ):
        if field_value is not None:
            secret_version[field_name] = field_value

    new_version = not (secret_string is None and secret_binary is None)
    update_time = int(time.time())

    if secret_id not in self.secrets:
        created = FakeSecret(
            account_id=self.account_id,
            region_name=self.region_name,
            secret_id=secret_id,
            secret_string=secret_string,
            secret_binary=secret_binary,
            description=description,
            tags=tags,
            kms_key_id=kms_key_id,
            last_changed_date=update_time,
            created_date=update_time,
            version_id=version_id,
            secret_version=secret_version,
            replica_regions=replica_regions,
            force_overwrite=force_overwrite,
        )
        self.secrets[secret_id] = created
        return created, new_version

    existing = self.secrets[secret_id]
    if isinstance(existing, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    existing.update(description, tags, kms_key_id, last_changed_date=update_time)
    if new_version:
        if "AWSCURRENT" in stages:
            existing.reset_default_version(secret_version, version_id)
        else:
            # Custom labels move to the new version without touching the default
            existing.remove_version_stages_from_old_versions(stages)
            existing.versions[version_id] = secret_version
    return existing, new_version
def create_managed_secret(
    self,
    service_name: str,
    secret_id: str,
    secret_string: Optional[str] = None,
    secret_binary: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[List[Dict[str, str]]] = None,
    kms_key_id: Optional[str] = None,
    version_id: Optional[str] = None,
    replica_regions: Optional[List[Dict[str, str]]] = None,
    force_overwrite: bool = False,
) -> "FakeSecret":
    """Create an AWS managed secret for the specified service name.

    The secret is tagged with ``aws:secretsmanager:owningService`` set to
    *service_name*, and falls back to the default KMS key alias when no
    *kms_key_id* is given. The caller's *tags* list is left unmodified.

    :return: the created (or updated) secret
    """
    if kms_key_id is None:
        kms_key_id = self.DEFAULT_KMS_KEY_ALIAS

    managed_tag = {
        "Key": "aws:secretsmanager:owningService",
        "Value": service_name,
    }
    # Build a fresh list so the managed tag never leaks into the list object
    # owned by the caller.
    all_tags = [*(tags or []), managed_tag]

    secret, _ = self._add_secret(
        secret_id,
        secret_string=secret_string,
        secret_binary=secret_binary,
        description=description,
        tags=all_tags,
        kms_key_id=kms_key_id,
        version_id=version_id,
        replica_regions=replica_regions,
        force_overwrite=force_overwrite,
    )
    return secret
def put_secret_value(
    self,
    secret_id: str,
    secret_string: str,
    secret_binary: str,
    client_request_token: str,
    version_stages: List[str],
) -> str:
    """Store a new version of an existing secret.

    Description and tags of the secret are carried over unchanged.

    :return: short JSON description including the new version's stages
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    existing = self.secrets[secret_id]
    if isinstance(existing, ReplicaSecret):
        raise OperationNotPermittedOnReplica
    kept_tags = existing.tags
    kept_description = existing.description

    version_id = self._from_client_request_token(client_request_token)

    updated, _ = self._add_secret(
        secret_id,
        secret_string,
        secret_binary,
        version_id=version_id,
        description=kept_description,
        tags=kept_tags,
        version_stages=version_stages,
    )
    return updated.to_short_dict(include_version_stages=True, version_id=version_id)
def describe_secret(self, secret_id: str) -> Union[FakeSecret, ReplicaSecret]:
    """Look up a secret (primary or replica) by name, ARN or partial ARN.

    :raises SecretNotFoundException: if no such secret exists
    """
    if self._is_valid_identifier(secret_id):
        return self.secrets[secret_id]
    raise SecretNotFoundException()
def rotate_secret(
    self,
    secret_id: str,
    client_request_token: Optional[str] = None,
    rotation_lambda_arn: Optional[str] = None,
    rotation_rules: Optional[Dict[str, Any]] = None,
    rotate_immediately: bool = True,
) -> str:
    """Configure rotation for a secret and start a rotation.

    If the secret already has versions, a new ``AWSPENDING`` version is
    added. With a rotation Lambda configured, the Lambda is invoked for
    each rotation step (only ``testSecret`` when *rotate_immediately* is
    false). Without a Lambda the pending version is promoted to
    ``AWSCURRENT`` directly.

    :param client_request_token: id for the new version; generated if absent
    :return: the secret's short JSON description
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    :raises InvalidRequestException: if the secret is marked deleted or a
        previous rotation is considered to be in progress
    :raises InvalidParameterException: for an over-long Lambda ARN, an
        out-of-range rotation period or an invalid client request token
    :raises ResourceNotFoundException: if the rotation Lambda does not exist
    """
    rotation_days = "AutomaticallyAfterDays"

    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    if isinstance(secret, ReplicaSecret):
        raise OperationNotPermittedOnReplica
    if secret.is_deleted():
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to "
            "perform the operation on a secret that's currently marked deleted."
        )

    if rotation_lambda_arn:
        if len(rotation_lambda_arn) > 2048:
            msg = "RotationLambdaARN must <= 2048 characters long."
            raise InvalidParameterException(msg)

    if rotation_rules:
        if rotation_days in rotation_rules:
            rotation_period = rotation_rules[rotation_days]
            if rotation_period < 1 or rotation_period > 1000:
                msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000."
                raise InvalidParameterException(msg)

            secret.next_rotation_date = int(time.time()) + (
                int(rotation_period) * 86400
            )

    # The rotation function must end with the versions of the secret in
    # one of two states:
    #
    #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
    #    same version of the secret, or
    #  - The AWSPENDING staging label is not attached to any version of the secret.
    #
    # If the AWSPENDING staging label is present but not attached to the same
    # version as AWSCURRENT then any later invocation of RotateSecret assumes
    # that a previous rotation request is still in progress and returns an error.
    #
    # NOTE(review): the check below raises when AWSPENDING and AWSCURRENT sit
    # on the SAME version, which looks like the opposite of the rule described
    # above. Behaviour is deliberately left unchanged here - please confirm.
    try:
        version = next(
            version
            for version in secret.versions.values()
            if "AWSPENDING" in version["version_stages"]
        )
        if "AWSCURRENT" in version["version_stages"]:
            msg = "Previous rotation request is still in progress."
            raise InvalidRequestException(msg)

    except StopIteration:
        # Pending is not present in any version
        pass

    # Resolve the version id up front, whether or not the secret has versions.
    # It is also needed further down as the ClientRequestToken handed to the
    # rotation Lambda, and would otherwise be unbound for a versionless secret.
    new_version_id = self._from_client_request_token(client_request_token)

    if secret.versions:
        # We add a "pending" stage. The previous version remains as "current" for now.
        # Caller is responsible for creating the new secret in the Lambda
        secret_version = {
            "createdate": int(time.time()),
            "version_id": new_version_id,
            "version_stages": ["AWSPENDING"],
        }

        if not rotate_immediately:
            if secret.secret_string is not None:
                secret_version["secret_string"] = secret.secret_string
            if secret.secret_binary is not None:
                secret_version["secret_binary"] = secret.secret_binary

        secret.remove_version_stages_from_old_versions(["AWSPENDING"])
        secret.versions[new_version_id] = secret_version

    secret.rotation_requested = True
    secret.rotation_lambda_arn = rotation_lambda_arn or ""
    if rotation_rules:
        secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
    if secret.auto_rotate_after_days > 0:
        secret.rotation_enabled = True

    # Begin the rotation process for the given secret by invoking the lambda function.
    if secret.rotation_lambda_arn:
        from moto.awslambda.utils import get_backend

        lambda_backend = get_backend(self.account_id, self.region_name)

        request_headers: Dict[str, Any] = {}
        response_headers: Dict[str, Any] = {}

        try:
            lambda_backend.get_function(secret.rotation_lambda_arn)
        except Exception:
            msg = f"Resource not found for ARN '{secret.rotation_lambda_arn}'."
            raise ResourceNotFoundException(msg)

        rotation_steps = ["create", "set", "test", "finish"]
        if not rotate_immediately:
            # if you don't immediately rotate the secret,
            # Secrets Manager tests the rotation configuration by running the testSecret step of the Lambda rotation function.
            rotation_steps = ["test"]
        for step in rotation_steps:
            lambda_backend.invoke(
                secret.rotation_lambda_arn,
                qualifier=None,
                body=json.dumps(
                    {
                        "Step": step + "Secret",
                        "SecretId": secret.name,
                        "ClientRequestToken": new_version_id,
                    }
                ),
                headers=request_headers,
                response_headers=response_headers,
            )
        # If we don't rotate, we only invoke the testSecret step
        # This should be done with the existing (old) version ID
        # Never point the default at a version that does not exist, which
        # can only happen when the secret had no versions to begin with.
        if rotate_immediately and new_version_id in secret.versions:
            secret.set_default_version_id(new_version_id)
    elif secret.versions:
        # AWS will always require a Lambda ARN
        # without that, Moto can still apply the 'AWSCURRENT'-label
        # This only makes sense if we have a version
        secret.reset_default_version(
            secret.versions[new_version_id], new_version_id
        )
        secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]

    secret.last_rotation_date = int(time.time())
    return secret.to_short_dict()
def get_random_password(
    self,
    password_length: int,
    exclude_characters: str,
    exclude_numbers: bool,
    exclude_punctuation: bool,
    exclude_uppercase: bool,
    exclude_lowercase: bool,
    include_space: bool,
    require_each_included_type: bool,
) -> str:
    """Generate a random password and wrap it in a JSON document.

    :return: JSON string with a single ``RandomPassword`` key
    :raises ClientError: if *password_length* exceeds 4096
    :raises InvalidParameterException: if *password_length* is below 4
    """
    # password size must have value less than or equal to 4096
    if password_length > 4096:
        raise ClientError(
            "ClientError: An error occurred (ValidationException) "
            "when calling the GetRandomPassword operation: 1 validation error detected: "
            f"Value '{password_length}' at 'passwordLength' "
            "failed to satisfy constraint: Member must have value less than or equal to 4096"
        )
    if password_length < 4:
        raise InvalidParameterException(
            "InvalidParameterException: An error occurred (InvalidParameterException) "
            "when calling the GetRandomPassword operation: Password length is too short based on the required types."
        )

    generated = random_password(
        password_length,
        exclude_characters,
        exclude_numbers,
        exclude_punctuation,
        exclude_uppercase,
        exclude_lowercase,
        include_space,
        require_each_included_type,
    )
    return json.dumps({"RandomPassword": generated})
def list_secret_version_ids(self, secret_id: str) -> str:
    """List all versions of a secret as a JSON document.

    :param secret_id: name, ARN or partial ARN of the secret
    :return: JSON string with ``ARN``, ``Name``, ``NextToken`` (always
        empty) and ``Versions``
    :raises SecretNotFoundException: if the secret is unknown
    """
    # Fail with the service error instead of a bare KeyError from the store
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()
    secret = self.secrets[secret_id]

    now = int(time.time())
    version_list = [
        {
            # Report when the version was created, falling back to "now"
            # only for version dicts that carry no creation time.
            "CreatedDate": version.get("createdate", now),
            "LastAccessedDate": now,
            "VersionId": version_id,
            "VersionStages": version["version_stages"],
        }
        for version_id, version in secret.versions.items()
    ]

    return json.dumps(
        {
            # The ARN field must carry the ARN, not the secret id/name
            "ARN": secret.arn,
            "Name": secret.name,
            "NextToken": "",
            "Versions": version_list,
        }
    )
def list_secrets(
    self,
    filters: List[Dict[str, Any]],
    max_results: int = MAX_RESULTS_DEFAULT,
    next_token: Optional[str] = None,
    include_planned_deletion: bool = False,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Return one page of secret descriptions matching *filters*.

    Secrets with a deletion date are skipped unless
    *include_planned_deletion* is set.

    :return: tuple of (page of secret dicts, next token or ``None``)
    """
    matching: List[Dict[str, Any]] = []
    for candidate in self.secrets.values():
        # Objects without a ``deleted_date`` attribute are never skipped here
        scheduled_for_deletion = getattr(candidate, "deleted_date", None)
        if scheduled_for_deletion and not include_planned_deletion:
            continue
        if _matches(candidate, filters):
            matching.append(candidate.to_dict())

    return self._get_secret_values_page_and_next_token(
        matching, max_results, next_token
    )
def delete_secret(
    self,
    secret_id: str,
    recovery_window_in_days: Optional[int],
    force_delete_without_recovery: bool,
) -> Tuple[str, str, float]:
    """Delete a secret, either immediately or after a recovery window.

    A forced delete removes the secret from the store right away; otherwise
    the secret is marked deleted with a date *recovery_window_in_days*
    (default 30) in the future. Force-deleting an unknown secret succeeds
    and reports a freshly generated ARN.

    :return: tuple of (ARN, name, deletion time as seconds since timestamp 0)
    :raises InvalidParameterException: for a recovery window outside 7-30
        days, a window combined with a forced delete, or a secret that
        still has replicas
    :raises SecretNotFoundException: unknown secret without forced delete
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    :raises InvalidRequestException: the secret is already marked deleted
        and the delete is not forced
    """
    if recovery_window_in_days is not None and (
        recovery_window_in_days < 7 or recovery_window_in_days > 30
    ):
        raise InvalidParameterException(
            "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: The "
            "RecoveryWindowInDays value must be between 7 and 30 days (inclusive)."
        )

    if recovery_window_in_days and force_delete_without_recovery:
        raise InvalidParameterException(
            "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't "
            "use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
        )

    if not self._is_valid_identifier(secret_id):
        if not force_delete_without_recovery:
            raise SecretNotFoundException()
        # Forced deletes of unknown secrets are reported as successful
        arn = SecretsManagerSecretIdentifier(
            self.account_id, self.region_name, secret_id=secret_id
        ).generate()
        return arn, secret_id, self._unix_time_secs(utcnow())

    secret = self.secrets[secret_id]
    if isinstance(secret, ReplicaSecret):
        raise OperationNotPermittedOnReplica
    if len(secret.replicas) > 0:
        replica_regions = ",".join([rep.region for rep in secret.replicas])
        msg = f"You can't delete secret {secret_id} that still has replica regions [{replica_regions}]"
        raise InvalidParameterException(msg)

    if secret.is_deleted() and not force_delete_without_recovery:
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to "
            "perform the operation on a secret that's currently marked deleted."
        )

    deletion_date = utcnow()
    if force_delete_without_recovery:
        self.secrets.pop(secret_id)
    else:
        deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
        secret.delete(self._unix_time_secs(deletion_date))

    # ``secret`` is always a real FakeSecret at this point, so no further
    # existence check is needed before reporting it.
    return secret.arn, secret.name, self._unix_time_secs(deletion_date)
def restore_secret(self, secret_id: str) -> Tuple[str, str]:
    """Cancel the pending deletion of a secret.

    :return: tuple of (ARN, name)
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    target.restore()
    restored = (target.arn, target.name)
    return restored
def tag_resource(self, secret_id: str, tags: List[Dict[str, str]]) -> None:
    """Add *tags* to a secret, replacing existing tags that share a key.

    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    # Index by key: existing tags keep their position, new keys are appended
    tags_by_key = {existing["Key"]: existing for existing in target.tags or []}
    tags_by_key.update({incoming["Key"]: incoming for incoming in tags})
    target.tags = list(tags_by_key.values())
def untag_resource(self, secret_id: str, tag_keys: List[str]) -> None:
    """Remove every tag whose key is listed in *tag_keys*.

    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    if target.tags is not None:
        remaining = []
        for tag in target.tags:
            if tag["Key"] not in tag_keys:
                remaining.append(tag)
        target.tags = remaining
def update_secret_version_stage(
    self,
    secret_id: str,
    version_stage: str,
    remove_from_version_id: str,
    move_to_version_id: str,
) -> Tuple[str, str]:
    """Move a staging label between versions of a secret.

    :param version_stage: the label to move
    :param remove_from_version_id: version losing the label (may be empty)
    :param move_to_version_id: version gaining the label (may be empty)
    :return: tuple of (ARN, name)
    :raises SecretNotFoundException: if the secret is unknown
    :raises InvalidParameterException: for unknown versions, a label that
        is not on the source version, or moving ``AWSCURRENT`` without
        naming the version it is removed from
    """
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    versions = secret.versions
    moving_current = version_stage == "AWSCURRENT"

    if remove_from_version_id:
        if remove_from_version_id not in versions:
            raise InvalidParameterException(
                f"Not a valid version: {remove_from_version_id}"
            )

        source_stages = versions[remove_from_version_id]["version_stages"]
        if version_stage not in source_stages:
            raise InvalidParameterException(
                f"Version stage {version_stage} not found in version {remove_from_version_id}"
            )

        source_stages.remove(version_stage)
    elif moving_current:
        current_version = [
            candidate
            for candidate in versions
            if "AWSCURRENT" in versions[candidate]["version_stages"]
        ][0]
        err = f"The parameter RemoveFromVersionId can't be empty. Staging label AWSCURRENT is currently attached to version {current_version}, so you must explicitly reference that version in RemoveFromVersionId."
        raise InvalidParameterException(err)

    if move_to_version_id:
        if move_to_version_id not in versions:
            raise InvalidParameterException(
                f"Not a valid version: {move_to_version_id}"
            )

        versions[move_to_version_id]["version_stages"].append(version_stage)

    if moving_current:
        if remove_from_version_id:
            # Whenever you move AWSCURRENT, Secrets Manager automatically
            # moves the label AWSPREVIOUS to the version that AWSCURRENT
            # was removed from.
            for details in versions.values():
                if "AWSPREVIOUS" in details["version_stages"]:
                    details["version_stages"].remove("AWSPREVIOUS")
            versions[remove_from_version_id]["version_stages"].append(
                "AWSPREVIOUS"
            )

        if move_to_version_id:
            # The new current version must not stay labelled as previous
            target_stages = versions[move_to_version_id]["version_stages"]
            if "AWSPREVIOUS" in target_stages:
                target_stages.remove("AWSPREVIOUS")

    return secret.arn, secret.name
def put_resource_policy(self, secret_id: str, policy: str) -> Tuple[str, str]:
    """
    Attach *policy* to a secret and return the secret's (ARN, name).

    The BlockPublicPolicy-parameter is not yet implemented
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    target.policy = policy
    identity = (target.arn, target.name)
    return identity
def get_resource_policy(self, secret_id: str) -> str:
    """Return ARN, name and (if set) the resource policy as a JSON string.

    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    payload = {"ARN": target.arn, "Name": target.name}
    # The policy key is omitted entirely while no policy is attached
    if target.policy is not None:
        payload["ResourcePolicy"] = target.policy
    return json.dumps(payload)
def delete_resource_policy(self, secret_id: str) -> Tuple[str, str]:
    """Detach the resource policy from a secret and return its (ARN, name).

    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    target = self.secrets[secret_id]
    if isinstance(target, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    target.policy = None
    identity = (target.arn, target.name)
    return identity
def replicate_secret_to_regions(
    self,
    secret_id: str,
    replica_regions: List[Dict[str, str]],
    force_overwrite: bool,
) -> Tuple[str, List[Dict[str, Any]]]:
    """Replicate a secret into additional regions.

    :return: tuple of (the id as passed in, status dicts of all replicas
        of the secret, old and new)
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    primary = self.describe_secret(secret_id)
    if isinstance(primary, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    existing_replicas = primary.replicas
    existing_replicas.extend(
        primary.create_replicas(replica_regions, force_overwrite=force_overwrite)
    )
    return secret_id, [replica.config for replica in primary.replicas]
def remove_regions_from_replication(
    self, secret_id: str, replica_regions: List[str]
) -> Tuple[str, List[Dict[str, str]]]:
    """Drop the replicas of a secret that live in *replica_regions*.

    :return: tuple of (the id as passed in, status dicts of the replicas
        that remain)
    :raises SecretNotFoundException: if the secret is unknown
    :raises OperationNotPermittedOnReplica: if the id resolves to a replica
    """
    primary = self.describe_secret(secret_id)
    if isinstance(primary, ReplicaSecret):
        raise OperationNotPermittedOnReplica

    # Walk over a snapshot because the list is modified while looping
    for replica in list(primary.replicas):
        if replica.region not in replica_regions:
            continue
        regional_backend = secretsmanager_backends[self.account_id][replica.region]
        if replica.has_replica:
            # Plain dict.pop: replicas are stored under their ARN key
            dict.pop(regional_backend.secrets, replica.arn)
        primary.replicas.remove(replica)

    return secret_id, [replica.config for replica in primary.replicas]
def _get_secret_values_page_and_next_token(
self,
secret_list: List[Dict[str, Any]],
max_results: Optional[int],
next_token: Optional[str],
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
starting_point = int(next_token or 0)
ending_point = starting_point + int(max_results or MAX_RESULTS_DEFAULT)
secret_page = secret_list[starting_point:ending_point]
new_next_token = str(ending_point) if ending_point < len(secret_list) else None
return secret_page, new_next_token
# Registry of SecretsManagerBackend instances; code in this module indexes it
# as secretsmanager_backends[account_id][region].
secretsmanager_backends = BackendDict(SecretsManagerBackend, "secretsmanager")