from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
from .exceptions import AlreadyExistsException, ResourceNotFoundException
class Plan(BaseModel):
def __init__(
self,
backup_plan: Dict[str, Any],
creator_request_id: str,
backend: "BackupBackend",
):
self.backup_plan_id = str(mock_random.uuid4())
self.backup_plan_arn = f"arn:{get_partition(backend.region_name)}:backup:{backend.region_name}:{backend.account_id}:backup-plan:{self.backup_plan_id}"
self.creation_date = unix_time()
ran_str = mock_random.get_random_string(length=48)
self.version_id = ran_str
self.creator_request_id = creator_request_id
self.backup_plan = backup_plan
adv_settings = backup_plan.get("AdvancedBackupSettings")
self.advanced_backup_settings = adv_settings or []
self.deletion_date: Optional[float] = None
# Deletion Date is updated when the backup_plan is deleted
self.last_execution_date = None # start_restore_job not yet supported
rules = backup_plan["Rules"]
for rule in rules:
rule["ScheduleExpression"] = rule.get(
"ScheduleExpression", "cron(0 5 ? * * *)"
) # Default CRON expression in UTC
rule["StartWindowMinutes"] = rule.get(
"StartWindowMinutes", 480
) # Default=480
rule["CompletionWindowMinutes"] = rule.get(
"CompletionWindowMinutes", 10080
) # Default=10080
rule["ScheduleExpressionTimezone"] = rule.get(
"ScheduleExpressionTimezone", "Etc/UTC"
) # set to Etc/UTc by default
rule["RuleId"] = str(mock_random.uuid4())
def to_dict(self) -> Dict[str, Any]:
dct = {
"BackupPlanId": self.backup_plan_id,
"BackupPlanArn": self.backup_plan_arn,
"CreationDate": self.creation_date,
"VersionId": self.version_id,
"AdvancedBackupSettings": self.advanced_backup_settings,
}
return {k: v for k, v in dct.items() if v}
def to_get_dict(self) -> Dict[str, Any]:
dct = self.to_dict()
dct_options = {
"BackupPlan": self.backup_plan,
"CreatorRequestId": self.creator_request_id,
"DeletionDate": self.deletion_date,
"LastExecutionDate": self.last_execution_date,
}
for key, value in dct_options.items():
if value is not None:
dct[key] = value
return dct
def to_list_dict(self) -> Dict[str, Any]:
dct = self.to_get_dict()
dct.pop("BackupPlan")
dct["BackupPlanName"] = self.backup_plan.get("BackupPlanName")
return dct
class Vault(BaseModel):
def __init__(
self,
backup_vault_name: str,
encryption_key_arn: str,
creator_request_id: str,
backend: "BackupBackend",
):
self.backup_vault_name = backup_vault_name
self.backup_vault_arn = f"arn:{get_partition(backend.region_name)}:backup:{backend.region_name}:{backend.account_id}:backup-vault:{backup_vault_name}"
self.creation_date = unix_time()
self.encryption_key_arn = encryption_key_arn
self.creator_request_id = creator_request_id
self.num_of_recovery_points = 0 # start_backup_job not yet supported
self.locked = False # put_backup_vault_lock_configuration
self.min_retention_days = 0 # put_backup_vault_lock_configuration
self.max_retention_days = 0 # put_backup_vault_lock_configuration
self.lock_date = None # put_backup_vault_lock_configuration
def to_dict(self) -> Dict[str, Any]:
dct = {
"BackupVaultName": self.backup_vault_name,
"BackupVaultArn": self.backup_vault_arn,
"CreationDate": self.creation_date,
}
return dct
def to_list_dict(self) -> Dict[str, Any]:
dct = self.to_dict()
dct_options: Dict[str, Any] = dict()
dct_options = {
"EncryptionKeyArn": self.encryption_key_arn,
"CreatorRequestId": self.creator_request_id,
"NumberOfRecoveryPoints": self.num_of_recovery_points,
"Locked": self.locked,
"MinRetentionDays": self.min_retention_days,
"MaxRetentionDays": self.max_retention_days,
"LockDate": self.lock_date,
}
for key, value in dct_options.items():
if value is not None:
dct[key] = value
return dct
class BackupBackend(BaseBackend):
"""Implementation of Backup APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.vaults: Dict[str, Vault] = dict()
self.plans: Dict[str, Plan] = dict()
self.tagger = TaggingService()
def create_backup_plan(
self,
backup_plan: Dict[str, Any],
backup_plan_tags: Dict[str, str],
creator_request_id: str,
) -> Plan:
if backup_plan["BackupPlanName"] in list(
p.backup_plan["BackupPlanName"] for p in list(self.plans.values())
):
raise AlreadyExistsException(
msg="Backup plan with the same plan document already exists"
)
plan = Plan(
backup_plan=backup_plan,
creator_request_id=creator_request_id,
backend=self,
)
if backup_plan_tags:
self.tag_resource(plan.backup_plan_arn, backup_plan_tags)
self.plans[plan.backup_plan_id] = plan
return plan
def get_backup_plan(self, backup_plan_id: str, version_id: Optional[Any]) -> Plan:
msg = "Failed reading Backup plan with provided version"
if backup_plan_id not in self.plans:
raise ResourceNotFoundException(msg=msg)
plan = self.plans[backup_plan_id]
if version_id:
if plan.version_id == version_id:
return plan
else:
raise ResourceNotFoundException(msg=msg)
return plan
def delete_backup_plan(self, backup_plan_id: str) -> Tuple[str, str, float, str]:
if backup_plan_id not in self.plans:
raise ResourceNotFoundException(
msg="Failed reading Backup plan with provided version"
)
deletion_date = unix_time()
res = self.plans[backup_plan_id]
res.deletion_date = deletion_date
return res.backup_plan_id, res.backup_plan_arn, deletion_date, res.version_id
def list_backup_plans(self, include_deleted: Any) -> List[Plan]:
"""
Pagination is not yet implemented
"""
plans_list = deepcopy(self.plans)
for plan in list(plans_list.values()):
backup_plan_id = plan.backup_plan_id
if plan.deletion_date is not None:
plans_list.pop(backup_plan_id)
if include_deleted:
return list(self.plans.values())
return list(plans_list.values())
def create_backup_vault(
self,
backup_vault_name: str,
backup_vault_tags: Dict[str, str],
encryption_key_arn: str,
creator_request_id: str,
) -> Vault:
if backup_vault_name in self.vaults:
raise AlreadyExistsException(
msg="Backup vault with the same name already exists"
)
vault = Vault(
backup_vault_name=backup_vault_name,
encryption_key_arn=encryption_key_arn,
creator_request_id=creator_request_id,
backend=self,
)
if backup_vault_tags:
self.tag_resource(vault.backup_vault_arn, backup_vault_tags)
self.vaults[backup_vault_name] = vault
return vault
def list_backup_vaults(self) -> List[Vault]:
"""
Pagination is not yet implemented
"""
return list(self.vaults.values())
def list_tags(self, resource_arn: str) -> Dict[str, str]:
"""
Pagination is not yet implemented
"""
return self.tagger.get_tag_dict_for_resource(resource_arn)
def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
tags_input = TaggingService.convert_dict_to_tags_input(tags or {})
self.tagger.tag_resource(resource_arn, tags_input)
def untag_resource(self, resource_arn: str, tag_key_list: List[str]) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_key_list)
backup_backends = BackendDict(BackupBackend, "backup")