import base64
from typing import Any, Dict, Iterable, List, Optional, Tuple
import xmltodict
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import utcnow
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
from .configuration import DEFAULT_CONFIGURATION_DATA
from .exceptions import (
UnknownBroker,
UnknownConfiguration,
UnknownEngineType,
UnknownUser,
)
class ConfigurationRevision(BaseModel):
def __init__(
self,
configuration_id: str,
revision_id: str,
description: str,
data: Optional[str] = None,
):
self.configuration_id = configuration_id
self.created = utcnow()
self.description = description
self.is_invalid = False
self.revision_id = revision_id
if data is None:
self.data = DEFAULT_CONFIGURATION_DATA
else:
self.data = data
def has_ldap_auth(self) -> bool:
try:
xml = base64.b64decode(self.data)
dct = xmltodict.parse(xml, dict_constructor=dict)
return (
"cachedLDAPAuthorizationMap"
in dct["broker"]["plugins"]["authorizationPlugin"]["map"]
)
except Exception:
# There are many configurations to enable LDAP
# We're only checking for one here
# If anything fails, lets assume it's not LDAP
return False
@property
def revision(self) -> str:
return self.revision_id
class Configuration(BaseModel):
def __init__(
self,
account_id: str,
region: str,
name: str,
engine_type: str,
engine_version: str,
):
self.id = f"c-{mock_random.get_random_hex(6)}"
self.arn = f"arn:{get_partition(region)}:mq:{region}:{account_id}:configuration:{self.id}"
self.created = utcnow()
self.name = name
self.engine_type = engine_type
self.engine_version = engine_version
self.revisions: Dict[str, ConfigurationRevision] = dict()
default_desc = (
f"Auto-generated default for {self.name} on {engine_type} {engine_version}"
)
latest_revision = ConfigurationRevision(
configuration_id=self.id, revision_id="1", description=default_desc
)
self.revisions[latest_revision.revision_id] = latest_revision
self.authentication_strategy = (
"ldap" if latest_revision.has_ldap_auth() else "simple"
)
def update(self, data: str, description: str) -> None:
max_revision_id, _ = sorted(self.revisions.items())[-1]
next_revision_id = str(int(max_revision_id) + 1)
latest_revision = ConfigurationRevision(
configuration_id=self.id,
revision_id=next_revision_id,
description=description,
data=data,
)
self.revisions[next_revision_id] = latest_revision
self.authentication_strategy = (
"ldap" if latest_revision.has_ldap_auth() else "simple"
)
def get_revision(self, revision_id: str) -> ConfigurationRevision:
return self.revisions[revision_id]
@property
def latest_revision(self) -> ConfigurationRevision:
_, latest_revision = sorted(self.revisions.items())[-1]
return latest_revision
class User(BaseModel):
def __init__(
self,
broker_id: str,
username: str,
console_access: Optional[bool] = None,
groups: Optional[List[str]] = None,
):
self.broker_id = broker_id
self.username = username
self.console_access = console_access or False
self.groups = groups or []
def update(
self, console_access: Optional[bool], groups: Optional[List[str]]
) -> None:
if console_access is not None:
self.console_access = console_access
if groups:
self.groups = groups
class Broker(BaseModel):
def __init__(
self,
name: str,
account_id: str,
region: str,
authentication_strategy: str,
auto_minor_version_upgrade: bool,
configuration: Dict[str, Any],
deployment_mode: str,
encryption_options: Optional[Dict[str, Any]],
engine_type: str,
engine_version: str,
host_instance_type: str,
ldap_server_metadata: Optional[Dict[str, Any]],
logs: Dict[str, bool],
maintenance_window_start_time: Optional[Dict[str, Any]],
publicly_accessible: bool,
security_groups: List[str],
storage_type: str,
subnet_ids: List[str],
users: List[Dict[str, Any]],
):
self.name = name
self.id = mock_random.get_random_hex(6)
self.arn = (
f"arn:{get_partition(region)}:mq:{region}:{account_id}:broker:{self.id}"
)
self.state = "RUNNING"
self.created = utcnow()
self.authentication_strategy = authentication_strategy
self.auto_minor_version_upgrade = auto_minor_version_upgrade
self.deployment_mode = deployment_mode
self.encryption_options = encryption_options
if not self.encryption_options:
self.encryption_options = {"useAwsOwnedKey": True}
self.engine_type = engine_type
self.engine_version = engine_version
self.host_instance_type = host_instance_type
self.ldap_server_metadata = ldap_server_metadata
self.logs = logs
if "general" not in self.logs:
self.logs["general"] = False
if "audit" not in self.logs:
if self.engine_type.upper() == "ACTIVEMQ":
self.logs["audit"] = False
self.maintenance_window_start_time = maintenance_window_start_time
if not self.maintenance_window_start_time:
self.maintenance_window_start_time = {
"dayOfWeek": "Sunday",
"timeOfDay": "00:00",
"timeZone": "UTC",
}
self.publicly_accessible = publicly_accessible
self.security_groups = security_groups
self.storage_type = storage_type
self.subnet_ids = subnet_ids
if not self.subnet_ids:
if self.deployment_mode == "CLUSTER_MULTI_AZ":
self.subnet_ids = [
"default-az1",
"default-az2",
"default-az3",
"default-az4",
]
elif self.deployment_mode == "ACTIVE_STANDBY_MULTI_AZ":
self.subnet_ids = ["active-subnet", "standby-subnet"]
else:
self.subnet_ids = ["default-subnet"]
self._users: Dict[str, User] = dict()
for user in users:
self.create_user(
username=user["username"],
groups=user.get("groups", []),
console_access=user.get("consoleAccess", False),
)
self.configurations: Dict[str, Any] = {"current": configuration, "history": []}
if self.engine_type.upper() == "RABBITMQ":
console_url = f"https://0000.mq.{region}.amazonaws.com"
endpoints = ["amqps://mockmq:5671"]
else:
console_url = f"https://0000.mq.{region}.amazonaws.com:8162"
endpoints = [
"ssl://mockmq:61617",
"amqp+ssl://mockmq:5671",
"stomp+ssl://mockmq:61614",
"mqtt+ssl://mockmq:8883",
"wss://mockmq:61619",
]
self.instances = [
{
"consoleURL": console_url,
"endpoints": endpoints,
"ipAddress": "192.168.0.1",
}
]
if deployment_mode == "ACTIVE_STANDBY_MULTI_AZ":
self.instances.append(
{
"consoleURL": console_url,
"endpoints": endpoints,
"ipAddress": "192.168.0.2",
}
)
def update(
self,
authentication_strategy: Optional[str],
auto_minor_version_upgrade: Optional[bool],
configuration: Optional[Dict[str, Any]],
engine_version: Optional[str],
host_instance_type: Optional[str],
ldap_server_metadata: Optional[Dict[str, Any]],
logs: Optional[Dict[str, bool]],
maintenance_window_start_time: Optional[Dict[str, str]],
security_groups: Optional[List[str]],
) -> None:
if authentication_strategy:
self.authentication_strategy = authentication_strategy
if auto_minor_version_upgrade is not None:
self.auto_minor_version_upgrade = auto_minor_version_upgrade
if configuration:
self.configurations["history"].append(self.configurations["current"])
self.configurations["current"] = configuration
if engine_version:
self.engine_version = engine_version
if host_instance_type:
self.host_instance_type = host_instance_type
if ldap_server_metadata:
self.ldap_server_metadata = ldap_server_metadata
if logs:
self.logs = logs
if maintenance_window_start_time:
self.maintenance_window_start_time = maintenance_window_start_time
if security_groups:
self.security_groups = security_groups
def reboot(self) -> None:
pass
def create_user(
self, username: str, console_access: bool, groups: List[str]
) -> None:
user = User(self.id, username, console_access, groups)
self._users[username] = user
def update_user(
self, username: str, console_access: bool, groups: List[str]
) -> None:
user = self.get_user(username)
user.update(console_access, groups)
def get_user(self, username: str) -> User:
if username not in self._users:
raise UnknownUser(username)
return self._users[username]
def delete_user(self, username: str) -> None:
self._users.pop(username, None)
def list_users(self) -> Iterable[User]:
return self._users.values()
@property
def users(self) -> Iterable[User]:
return self._users.values()
class MQBackend(BaseBackend):
"""
No EC2 integration exists yet - subnet ID's and security group values are not validated. Default values may not exist.
"""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.brokers: Dict[str, Broker] = dict()
self.configs: Dict[str, Configuration] = dict()
self.tagger = TaggingService()
def create_broker(
self,
authentication_strategy: str,
auto_minor_version_upgrade: bool,
broker_name: str,
configuration: Optional[Dict[str, Any]],
deployment_mode: str,
encryption_options: Optional[Dict[str, Any]],
engine_type: str,
engine_version: str,
host_instance_type: str,
ldap_server_metadata: Optional[Dict[str, Any]],
logs: Dict[str, bool],
maintenance_window_start_time: Optional[Dict[str, Any]],
publicly_accessible: bool,
security_groups: List[str],
storage_type: str,
subnet_ids: List[str],
tags: Dict[str, str],
users: List[Dict[str, Any]],
) -> Tuple[str, str]:
if configuration is None:
# create default configuration
default_config = self.create_configuration(
name=f"{broker_name}-configuration",
engine_type=engine_type,
engine_version=engine_version,
tags={},
)
configuration = {"id": default_config.id, "revision": 1}
broker = Broker(
name=broker_name,
account_id=self.account_id,
region=self.region_name,
authentication_strategy=authentication_strategy,
auto_minor_version_upgrade=auto_minor_version_upgrade,
configuration=configuration,
deployment_mode=deployment_mode,
encryption_options=encryption_options,
engine_type=engine_type,
engine_version=engine_version,
host_instance_type=host_instance_type,
ldap_server_metadata=ldap_server_metadata,
logs=logs,
maintenance_window_start_time=maintenance_window_start_time,
publicly_accessible=publicly_accessible,
security_groups=security_groups,
storage_type=storage_type,
subnet_ids=subnet_ids,
users=users,
)
self.brokers[broker.id] = broker
self.create_tags(broker.arn, tags)
return broker.arn, broker.id
def delete_broker(self, broker_id: str) -> None:
del self.brokers[broker_id]
def describe_broker(self, broker_id: str) -> Broker:
if broker_id not in self.brokers:
raise UnknownBroker(broker_id)
return self.brokers[broker_id]
def reboot_broker(self, broker_id: str) -> None:
self.brokers[broker_id].reboot()
def list_brokers(self) -> Iterable[Broker]:
"""
Pagination is not yet implemented
"""
return self.brokers.values()
def create_user(
self, broker_id: str, username: str, console_access: bool, groups: List[str]
) -> None:
broker = self.describe_broker(broker_id)
broker.create_user(username, console_access, groups)
def update_user(
self, broker_id: str, console_access: bool, groups: List[str], username: str
) -> None:
broker = self.describe_broker(broker_id)
broker.update_user(username, console_access, groups)
def describe_user(self, broker_id: str, username: str) -> User:
broker = self.describe_broker(broker_id)
return broker.get_user(username)
def delete_user(self, broker_id: str, username: str) -> None:
broker = self.describe_broker(broker_id)
broker.delete_user(username)
def list_users(self, broker_id: str) -> Iterable[User]:
broker = self.describe_broker(broker_id)
return broker.list_users()
def create_configuration(
self, name: str, engine_type: str, engine_version: str, tags: Dict[str, str]
) -> Configuration:
if engine_type.upper() not in ["ACTIVEMQ", "RABBITMQ"]:
raise UnknownEngineType(engine_type)
config = Configuration(
account_id=self.account_id,
region=self.region_name,
name=name,
engine_type=engine_type,
engine_version=engine_version,
)
self.configs[config.id] = config
self.tagger.tag_resource(
config.arn, self.tagger.convert_dict_to_tags_input(tags)
)
return config
def update_configuration(
self, config_id: str, data: str, description: str
) -> Configuration:
"""
No validation occurs on the provided XML. The authenticationStrategy may be changed depending on the provided configuration.
"""
config = self.configs[config_id]
config.update(data, description)
return config
def describe_configuration(self, config_id: str) -> Configuration:
if config_id not in self.configs:
raise UnknownConfiguration(config_id)
return self.configs[config_id]
def describe_configuration_revision(
self, config_id: str, revision_id: str
) -> ConfigurationRevision:
config = self.configs[config_id]
return config.get_revision(revision_id)
def list_configurations(self) -> Iterable[Configuration]:
"""
Pagination has not yet been implemented.
"""
return self.configs.values()
def create_tags(self, resource_arn: str, tags: Dict[str, str]) -> None:
self.tagger.tag_resource(
resource_arn, self.tagger.convert_dict_to_tags_input(tags)
)
def list_tags(self, arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(arn)
def delete_tags(self, resource_arn: str, tag_keys: List[str]) -> None:
if not isinstance(tag_keys, list):
tag_keys = [tag_keys]
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
def update_broker(
self,
authentication_strategy: str,
auto_minor_version_upgrade: bool,
broker_id: str,
configuration: Dict[str, Any],
engine_version: str,
host_instance_type: str,
ldap_server_metadata: Dict[str, Any],
logs: Dict[str, bool],
maintenance_window_start_time: Dict[str, str],
security_groups: List[str],
) -> None:
broker = self.describe_broker(broker_id)
broker.update(
authentication_strategy=authentication_strategy,
auto_minor_version_upgrade=auto_minor_version_upgrade,
configuration=configuration,
engine_version=engine_version,
host_instance_type=host_instance_type,
ldap_server_metadata=ldap_server_metadata,
logs=logs,
maintenance_window_start_time=maintenance_window_start_time,
security_groups=security_groups,
)
mq_backends = BackendDict(MQBackend, "mq")