from __future__ import annotations
import copy
import datetime
import re
from collections import OrderedDict, namedtuple
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from botocore.utils import merge_dicts
SECONDS_IN_ONE_DAY = 24 * 60 * 60
FilterDef = namedtuple(
"FilterDef",
[
# A list of object attributes to check against the filter values.
# Set to None if filter is not yet implemented in `moto`.
"attrs_to_check",
# Description of the filter, e.g. 'Object Identifiers'.
# Used in filter error messaging.
"description",
],
)
class DbInstanceEngine(str, Enum):
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/create_db_instance.html
# 2023-11-08
AURORA_MYSQL = "aurora-mysql"
AURORA_POSTGRESQL = "aurora-postgresql"
CUSTOM_ORACLE_EE = "custom-oracle-ee"
CUSTOM_ORACLE_EE_CDB = "custom-oracle-ee-cdb"
CUSTOM_SQLSERVER_EE = "custom-sqlserver-ee"
CUSTOM_SQLSERVER_SE = "custom-sqlserver-se"
CUSTOM_SQLSERVER_WEB = "custom-sqlserver-web"
MARIADB = "mariadb"
MYSQL = "mysql"
ORACLE_EE = "oracle-ee"
ORACLE_EE_CDB = "oracle-ee-cdb"
ORACLE_SE2 = "oracle-se2"
ORACLE_SE2_CDB = "oracle-se2-cdb"
POSTGRES = "postgres"
SQLSERVER_EE = "sqlserver-ee"
SQLSERVER_SE = "sqlserver-se"
SQLSERVER_EX = "sqlserver-ex"
SQLSERVER_WEB = "sqlserver-web"
@classmethod
def valid_db_instance_engine(self) -> List[str]:
return sorted([item.value for item in DbInstanceEngine])
class ClusterEngine(str, Enum):
AURORA_POSTGRESQL = "aurora-postgresql"
AURORA_MYSQL = "aurora-mysql"
NEPTUNE = "neptune"
RDS_POSTGRESQL = "postgres"
RDS_MYSQL = "mysql"
@classmethod
def list_cluster_engines(self) -> List[str]:
return sorted([item.value for item in ClusterEngine])
@classmethod
def serverless_engines(self) -> List[str]:
return [ClusterEngine.AURORA_MYSQL, ClusterEngine.AURORA_POSTGRESQL]
def get_object_value(obj: Any, attr: str) -> Any:
"""Retrieves an arbitrary attribute value from an object.
Nested attributes can be specified using dot notation,
e.g. 'parent.child'.
:param object obj:
A valid Python object.
:param str attr:
The attribute name of the value to retrieve from the object.
:returns:
The attribute value, if it exists, or None.
:rtype:
any
"""
keys = attr.split(".")
val = obj
for key in keys:
if hasattr(val, key):
val = getattr(val, key)
else:
return None
return val
def merge_filters(
filters_to_update: Optional[Dict[str, Any]], filters_to_merge: Dict[str, Any]
) -> Dict[str, Any]:
"""Given two groups of filters, merge the second into the first.
List values are appended instead of overwritten:
>>> merge_filters({'filter-name': ['value1']}, {'filter-name':['value2']})
>>> {'filter-name': ['value1', 'value2']}
:param filters_to_update:
The filters to update.
:type filters_to_update:
dict[str, list] or None
:param filters_to_merge:
The filters to merge.
:type filters_to_merge:
dict[str, list] or None
:returns:
The updated filters.
:rtype:
dict[str, list]
"""
if filters_to_update is None:
filters_to_update = {}
if filters_to_merge is None:
filters_to_merge = {}
merge_dicts(filters_to_update, filters_to_merge, append_lists=True)
return filters_to_update
def validate_filters(
filters: Dict[str, Any], filter_defs: Dict[str, FilterDef]
) -> None:
"""Validates filters against a set of filter definitions.
Raises standard Python exceptions which should be caught
and translated to an appropriate AWS/Moto exception higher
up the call stack.
:param dict[str, list] filters:
The filters to validate.
:param dict[str, FilterDef] filter_defs:
The filter definitions to validate against.
:returns: None
:rtype: None
:raises KeyError:
if filter name not found in the filter definitions.
:raises ValueError:
if filter values is an empty list.
:raises NotImplementedError:
if `moto` does not yet support this filter.
"""
for filter_name, filter_values in filters.items():
filter_def = filter_defs.get(filter_name)
if filter_def is None:
raise KeyError(f"Unrecognized filter name: {filter_name}")
if not filter_values:
raise ValueError(f"The list of {filter_def.description} must not be empty.")
if filter_def.attrs_to_check is None:
raise NotImplementedError(
f"{filter_name} filter has not been implemented in Moto yet."
)
def apply_filter(resources: Any, filters: Any, filter_defs: Any) -> Any:
"""Apply an arbitrary filter to a group of resources.
:param dict[str, object] resources:
A dictionary mapping resource identifiers to resource objects.
:param dict[str, list] filters:
The filters to apply.
:param dict[str, FilterDef] filter_defs:
The supported filter definitions for the resource type.
:returns:
The filtered collection of resources.
:rtype:
dict[str, object]
"""
resources_filtered = OrderedDict()
for identifier, obj in resources.items():
matches_filter = False
for filter_name, filter_values in filters.items():
filter_def = filter_defs.get(filter_name)
for attr in filter_def.attrs_to_check:
if get_object_value(obj, attr) in filter_values:
matches_filter = True
break
else:
matches_filter = False
if not matches_filter:
break
if matches_filter:
resources_filtered[identifier] = obj
return resources_filtered
def get_start_date_end_date(
base_date: str, window: str
) -> Tuple[datetime.datetime, datetime.datetime]:
"""Gets the start date and end date given DDD:HH24:MM-DDD:HH24:MM.
:param base_date:
type datetime
:param window:
DDD:HH24:MM-DDD:HH24:MM
:returns:
Start and End Date in datetime format
:rtype:
tuple
"""
days = {"mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6, "sun": 7}
start = datetime.datetime.strptime(
base_date + " " + window[4:9], "%d/%m/%y %H:%M"
) + datetime.timedelta(days=days[window[0:3]])
end = datetime.datetime.strptime(
base_date + " " + window[14::], "%d/%m/%y %H:%M"
) + datetime.timedelta(days=days[window[10:13]])
return start, end
def get_start_date_end_date_from_time(
base_date: str, window: str
) -> Tuple[datetime.datetime, datetime.datetime, bool]:
"""Gets the start date and end date given HH24:MM-HH24:MM.
:param window:
HH24:MM-HH24:MM
:returns:
Start and End Date in datetime format
along with flag for spills over a day
This is useful when determine time overlaps
:rtype:
tuple
"""
times = window.split("-")
spillover = False
start = datetime.datetime.strptime(base_date + " " + times[0], "%d/%m/%y %H:%M")
end = datetime.datetime.strptime(base_date + " " + times[1], "%d/%m/%y %H:%M")
if end < start:
end += datetime.timedelta(days=1)
spillover = True
return start, end, spillover
def get_overlap_between_two_date_ranges(
start_time_1: datetime.datetime,
end_time_1: datetime.datetime,
start_time_2: datetime.datetime,
end_time_2: datetime.datetime,
) -> int:
"""
Determines overlap between 2 date ranges. Returns the overlap in seconds.
"""
latest_start = max(start_time_1, start_time_2)
earliest_end = min(end_time_1, end_time_2)
delta = earliest_end - latest_start
return (delta.days * SECONDS_IN_ONE_DAY) + delta.seconds
def valid_preferred_maintenance_window(
maintenance_window: Any, backup_window: Any
) -> Optional[str]:
"""Determines validity of preferred_maintenance_window
:param maintenance_windown:
type DDD:HH24:MM-DDD:HH24:MM
:param backup_window:
type HH24:MM-HH24:MM
:returns:
message
:rtype:
str
"""
MINUTES_30 = 1800
HOURS_24 = 86400
base_date = datetime.datetime.now().strftime("%d/%m/%y")
try:
p = re.compile(
"([a-z]{3}):([0-9]{2}):([0-9]{2})-([a-z]{3}):([0-9]{2}):([0-9]{2})"
)
if len(maintenance_window) != 19 or re.search(p, maintenance_window) is None:
return f"Invalid maintenance window format: {maintenance_window}. Should be specified as a range ddd:hh24:mi-ddd:hh24:mi (24H Clock UTC). Example: Sun:23:45-Mon:00:15"
if backup_window:
(
backup_window_start,
backup_window_end,
backup_spill,
) = get_start_date_end_date_from_time(base_date, backup_window)
(
maintenance_window_start,
maintenance_window_end,
maintenance_spill,
) = get_start_date_end_date_from_time(
base_date, maintenance_window[4:10] + maintenance_window[14::]
)
if (
get_overlap_between_two_date_ranges(
backup_window_start,
backup_window_end,
maintenance_window_start,
maintenance_window_end,
)
>= 0
):
return "The backup window and maintenance window must not overlap."
# Due to spill overs, adjust the windows
elif maintenance_spill:
backup_window_start += datetime.timedelta(days=1)
backup_window_end += datetime.timedelta(days=1)
elif backup_spill:
maintenance_window_start += datetime.timedelta(days=1)
maintenance_window_end += datetime.timedelta(days=1)
# If spills, rerun overlap test with adjusted windows
if maintenance_spill or backup_spill:
if (
get_overlap_between_two_date_ranges(
backup_window_start,
backup_window_end,
maintenance_window_start,
maintenance_window_end,
)
>= 0
):
return "The backup window and maintenance window must not overlap."
maintenance_window_start, maintenance_window_end = get_start_date_end_date(
base_date, maintenance_window
)
delta = maintenance_window_end - maintenance_window_start
delta_seconds = delta.seconds + (delta.days * SECONDS_IN_ONE_DAY)
if delta_seconds >= MINUTES_30 and delta_seconds <= HOURS_24:
return None
elif delta_seconds >= 0 and delta_seconds <= MINUTES_30:
return "The maintenance window must be at least 30 minutes."
else:
return "Maintenance window must be less than 24 hours."
except Exception:
return f"Invalid day:hour:minute value: {maintenance_window}"
ORDERABLE_DB_INSTANCE_ENCODING = {
"Engine": "E",
"EngineVersion": "EV",
"DBInstanceClass": "DBIC",
"LicenseModel": "L",
"AvailabilityZones": "AZ",
"MultiAZCapable": "MC",
"ReadReplicaCapable": "RC",
"Vpc": "V",
"SupportsStorageEncryption": "SE",
"StorageType": "ST",
"SupportsIops": "SI",
"SupportsEnhancedMonitoring": "SM",
"SupportsIAMDatabaseAuthentication": "SIAM",
"SupportsPerformanceInsights": "SPI",
"AvailableProcessorFeatures": "APF",
"SupportedEngineModes": "SEM",
"SupportsKerberosAuthentication": "SK",
"OutpostCapable": "O",
"SupportedActivityStreamModes": "SSM",
"SupportsGlobalDatabases": "SGD",
"SupportsClusters": "SC",
"SupportedNetworkTypes": "SN",
"SupportsStorageThroughput": "SST",
}
ORDERABLE_DB_INSTANCE_DECODING = {
v: k for (k, v) in ORDERABLE_DB_INSTANCE_ENCODING.items()
}
def encode_orderable_db_instance(db: Dict[str, Any]) -> Dict[str, Any]:
encoded = copy.deepcopy(db)
if "AvailabilityZones" in encoded:
encoded["AvailabilityZones"] = [
az["Name"] for az in encoded["AvailabilityZones"]
]
return {
ORDERABLE_DB_INSTANCE_ENCODING.get(key, key): value
for key, value in encoded.items()
}
def decode_orderable_db_instance(db: Dict[str, Any]) -> Dict[str, Any]:
decoded = copy.deepcopy(db)
decoded_az = ORDERABLE_DB_INSTANCE_ENCODING.get(
"AvailabilityZones", "AvailabilityZones"
)
if decoded_az in decoded:
decoded["AvailabilityZones"] = [{"Name": az} for az in decoded[decoded_az]]
return {
ORDERABLE_DB_INSTANCE_DECODING.get(key, key): value
for key, value in decoded.items()
}