"""KafkaBackend class with methods for supported APIs."""
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.utils import get_partition

from ..utilities.tagging_service import TaggingService
class FakeKafkaCluster(BaseModel):
    """In-memory record of a single Kafka cluster.

    The same class stores both cluster flavours; ``cluster_type`` decides
    which group of attributes is meaningful and which ARN resource type is
    generated. A random UUID is assigned as the cluster id on construction.
    """

    def __init__(
        self,
        cluster_name: str,
        account_id: str,
        region_name: str,
        cluster_type: str,
        tags: Optional[Dict[str, str]] = None,
        broker_node_group_info: Optional[Dict[str, Any]] = None,
        kafka_version: Optional[str] = None,
        number_of_broker_nodes: Optional[int] = None,
        configuration_info: Optional[Dict[str, Any]] = None,
        serverless_config: Optional[Dict[str, Any]] = None,
        encryption_info: Optional[Dict[str, Any]] = None,
        enhanced_monitoring: str = "DEFAULT",
        open_monitoring: Optional[Dict[str, Any]] = None,
        logging_info: Optional[Dict[str, Any]] = None,
        storage_mode: str = "LOCAL",
        current_version: str = "1.0",
        client_authentication: Optional[Dict[str, Any]] = None,
        state: str = "CREATING",
        active_operation_arn: Optional[str] = None,
        zookeeper_connect_string: Optional[str] = None,
        zookeeper_connect_string_tls: Optional[str] = None,
    ):
        """Store the supplied settings and derive the id, timestamp and ARN.

        Args:
            cluster_name: Name of the cluster.
            account_id: Account id embedded in the generated ARN.
            region_name: Region embedded in the generated ARN.
            cluster_type: ``"PROVISIONED"`` selects the ``cluster`` ARN
                resource type; any other value selects ``serverless-cluster``.
            tags: Optional tags; ``None`` is stored as an empty dict.
            state: Initial cluster state, ``"CREATING"`` by default.

        All remaining arguments are stored unchanged on the instance under
        the attribute of the same name.
        """
        # General attributes
        self.cluster_id = str(uuid.uuid4())
        self.cluster_name = cluster_name
        self.account_id = account_id
        self.region_name = region_name
        self.cluster_type = cluster_type
        self.tags = tags or {}
        self.state = state
        # Timezone-aware UTC, so the serialized timestamp carries an explicit
        # offset and does not depend on the local timezone of the host.
        self.creation_time = datetime.now(timezone.utc).isoformat()
        self.current_version = current_version
        self.active_operation_arn = active_operation_arn
        # Must run after cluster_id, region_name, account_id and cluster_type
        # are set, because the ARN is assembled from them.
        self.arn = self._generate_arn()

        # Attributes specific to PROVISIONED clusters
        self.broker_node_group_info = broker_node_group_info
        self.kafka_version = kafka_version
        self.number_of_broker_nodes = number_of_broker_nodes
        self.configuration_info = configuration_info
        self.encryption_info = encryption_info
        self.enhanced_monitoring = enhanced_monitoring
        self.open_monitoring = open_monitoring
        self.logging_info = logging_info
        self.storage_mode = storage_mode
        self.client_authentication = client_authentication
        self.zookeeper_connect_string = zookeeper_connect_string
        self.zookeeper_connect_string_tls = zookeeper_connect_string_tls

        # Attributes specific to SERVERLESS clusters
        self.serverless_config = serverless_config

    def _generate_arn(self) -> str:
        """Build the cluster ARN from partition, region, account, type and id."""
        resource_type = (
            "cluster" if self.cluster_type == "PROVISIONED" else "serverless-cluster"
        )
        partition = get_partition(self.region_name)
        return (
            f"arn:{partition}:kafka:{self.region_name}:{self.account_id}"
            f":{resource_type}/{self.cluster_id}"
        )
class KafkaBackend(BaseBackend):
    """Implementation of Kafka APIs."""

    # Placeholder values reported when a cluster has no real value recorded.
    _DEFAULT_ACTIVE_OPERATION_ARN = (
        "arn:aws:kafka:region:account-id:operation/active-operation"
    )
    _DEFAULT_ZOOKEEPER_CONNECT_STRING = "zookeeper.example.com:2181"

    def __init__(self, region_name: str, account_id: str):
        """Create an empty backend holding clusters keyed by their ARN."""
        super().__init__(region_name, account_id)
        self.clusters: Dict[str, FakeKafkaCluster] = {}
        self.tagger = TaggingService()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _matches_filters(
        cluster: FakeKafkaCluster,
        cluster_name_filter: Optional[str],
        cluster_type_filter: Optional[str] = None,
    ) -> bool:
        """Return True when the cluster passes the optional list filters.

        The name filter is a prefix match; the type filter is an exact match.
        A filter that is ``None`` or empty is not applied.
        """
        if cluster_name_filter and not (cluster.cluster_name or "").startswith(
            cluster_name_filter
        ):
            return False
        if cluster_type_filter and cluster.cluster_type != cluster_type_filter:
            return False
        return True

    def _common_cluster_info(self, cluster: FakeKafkaCluster) -> Dict[str, Any]:
        """Return the fields shared by both describe-cluster responses."""
        return {
            # Report the stored operation ARN; fall back to the placeholder
            # when none has been recorded on the cluster.
            "activeOperationArn": cluster.active_operation_arn
            or self._DEFAULT_ACTIVE_OPERATION_ARN,
            "clusterArn": cluster.arn,
            "clusterName": cluster.cluster_name,
            "creationTime": cluster.creation_time,
            "currentVersion": cluster.current_version,
            "state": cluster.state,
            "stateInfo": {
                "code": "string",
                "message": "Cluster state details.",
            },
            "tags": self.list_tags_for_resource(cluster.arn),
        }

    def _provisioned_info(self, cluster: FakeKafkaCluster) -> Dict[str, Any]:
        """Return the provisioned-cluster details, substituting defaults for unset values."""
        configuration_info = cluster.configuration_info or {}
        return {
            "brokerNodeGroupInfo": cluster.broker_node_group_info or {},
            "clientAuthentication": cluster.client_authentication or {},
            "currentBrokerSoftwareInfo": {
                "configurationArn": configuration_info.get("arn", "string"),
                "configurationRevision": configuration_info.get("revision", 1),
                "kafkaVersion": cluster.kafka_version,
            },
            "encryptionInfo": cluster.encryption_info or {},
            "enhancedMonitoring": cluster.enhanced_monitoring,
            "openMonitoring": cluster.open_monitoring or {},
            "loggingInfo": cluster.logging_info or {},
            "numberOfBrokerNodes": cluster.number_of_broker_nodes or 0,
            "zookeeperConnectString": cluster.zookeeper_connect_string
            or self._DEFAULT_ZOOKEEPER_CONNECT_STRING,
            "zookeeperConnectStringTls": cluster.zookeeper_connect_string_tls
            or self._DEFAULT_ZOOKEEPER_CONNECT_STRING,
            "storageMode": cluster.storage_mode,
            "customerActionStatus": "NONE",
        }

    # ------------------------------------------------------------------
    # V2 API
    # ------------------------------------------------------------------

    def create_cluster_v2(
        self,
        cluster_name: str,
        tags: Optional[Dict[str, str]],
        provisioned: Optional[Dict[str, Any]],
        serverless: Optional[Dict[str, Any]],
    ) -> Tuple[str, str, str, str]:
        """Create a provisioned or serverless cluster and store it by ARN.

        Args:
            cluster_name: Name of the new cluster.
            tags: Optional tags, stored on the cluster and in the tagger.
            provisioned: Settings for a provisioned cluster. Takes precedence
                when both ``provisioned`` and ``serverless`` are supplied.
            serverless: Settings for a serverless cluster; stored as-is.

        Returns:
            ``(cluster_arn, cluster_name, state, cluster_type)``.

        Raises:
            ValueError: If neither ``provisioned`` nor ``serverless`` holds a
                non-empty value, since the cluster type cannot be determined.
        """
        broker_node_group_info: Optional[Dict[str, Any]] = None
        kafka_version: Optional[str] = None
        number_of_broker_nodes: Optional[int] = None
        storage_mode: Optional[str] = None
        serverless_config: Optional[Dict[str, Any]] = None

        if provisioned:
            cluster_type = "PROVISIONED"
            broker_node_group_info = provisioned.get("brokerNodeGroupInfo")
            kafka_version = provisioned.get("kafkaVersion", "default-kafka-version")
            number_of_broker_nodes = int(provisioned.get("numberOfBrokerNodes", 1))
            storage_mode = provisioned.get("storageMode")
        elif serverless:
            cluster_type = "SERVERLESS"
            serverless_config = serverless
        else:
            # Without this guard the locals above would be unbound and the
            # call would fail later with an opaque UnboundLocalError.
            raise ValueError(
                "Either 'provisioned' or 'serverless' must be specified."
            )

        new_cluster = FakeKafkaCluster(
            cluster_name=cluster_name,
            account_id=self.account_id,
            region_name=self.region_name,
            cluster_type=cluster_type,
            broker_node_group_info=broker_node_group_info,
            kafka_version=kafka_version,
            number_of_broker_nodes=number_of_broker_nodes,
            serverless_config=serverless_config,
            tags=tags,
            state="CREATING",
            # A missing or empty storage mode falls back to "LOCAL".
            storage_mode=storage_mode or "LOCAL",
            current_version="1.0",
        )
        self.clusters[new_cluster.arn] = new_cluster

        if tags:
            self.tag_resource(new_cluster.arn, tags)

        return (
            new_cluster.arn,
            new_cluster.cluster_name,
            new_cluster.state,
            new_cluster.cluster_type,
        )

    def describe_cluster_v2(self, cluster_arn: str) -> Dict[str, Any]:
        """Return the V2 description of the cluster stored under ``cluster_arn``.

        The result contains the common fields plus ``clusterType`` and, based
        on that type, either a ``provisioned`` or a ``serverless`` section.

        Raises:
            KeyError: If no cluster is stored under ``cluster_arn``.
        """
        cluster = self.clusters[cluster_arn]
        cluster_info = self._common_cluster_info(cluster)
        cluster_info["clusterType"] = cluster.cluster_type

        if cluster.cluster_type == "PROVISIONED":
            cluster_info["provisioned"] = self._provisioned_info(cluster)
        elif cluster.cluster_type == "SERVERLESS":
            serverless_config = cluster.serverless_config or {}
            cluster_info["serverless"] = {
                "vpcConfigs": serverless_config.get("vpcConfigs", []),
                "clientAuthentication": serverless_config.get(
                    "clientAuthentication", {}
                ),
            }

        return cluster_info

    def list_clusters_v2(
        self,
        cluster_name_filter: Optional[str],
        cluster_type_filter: Optional[str],
        max_results: Optional[int],
        next_token: Optional[str],
    ) -> Tuple[List[Dict[str, Any]], Optional[str]]:
        """Return V2 descriptions of the stored clusters that match the filters.

        Args:
            cluster_name_filter: When set, only clusters whose name starts
                with this prefix are returned.
            cluster_type_filter: When set, only clusters of exactly this type
                are returned.
            max_results: Accepted for interface compatibility; not applied.
            next_token: Accepted for interface compatibility; not applied.

        Returns:
            ``(cluster_info_list, next_token)``. Pagination is not
            implemented, so every match is returned at once and the token is
            always ``None``.
        """
        cluster_info_list = [
            self.describe_cluster_v2(cluster_arn)
            for cluster_arn, cluster in self.clusters.items()
            if self._matches_filters(cluster, cluster_name_filter, cluster_type_filter)
        ]
        return cluster_info_list, None

    # ------------------------------------------------------------------
    # V1 API
    # ------------------------------------------------------------------

    def create_cluster(
        self,
        broker_node_group_info: Dict[str, Any],
        client_authentication: Optional[Dict[str, Any]],
        cluster_name: str,
        configuration_info: Optional[Dict[str, Any]] = None,
        encryption_info: Optional[Dict[str, Any]] = None,
        enhanced_monitoring: str = "DEFAULT",
        open_monitoring: Optional[Dict[str, Any]] = None,
        kafka_version: str = "2.8.1",
        logging_info: Optional[Dict[str, Any]] = None,
        number_of_broker_nodes: int = 1,
        tags: Optional[Dict[str, str]] = None,
        storage_mode: str = "LOCAL",
    ) -> Tuple[str, str, str]:
        """Create a ``PROVISIONED`` cluster and store it by ARN.

        All settings are passed through to ``FakeKafkaCluster`` unchanged.
        Tags, when given, are recorded in the tagger only: unlike
        ``create_cluster_v2`` they are not passed to the cluster object.

        Returns:
            ``(cluster_arn, cluster_name, state)``.
        """
        new_cluster = FakeKafkaCluster(
            cluster_name=cluster_name,
            account_id=self.account_id,
            region_name=self.region_name,
            cluster_type="PROVISIONED",
            broker_node_group_info=broker_node_group_info,
            client_authentication=client_authentication,
            kafka_version=kafka_version,
            number_of_broker_nodes=number_of_broker_nodes,
            configuration_info=configuration_info,
            encryption_info=encryption_info,
            enhanced_monitoring=enhanced_monitoring,
            open_monitoring=open_monitoring,
            logging_info=logging_info,
            storage_mode=storage_mode,
        )
        self.clusters[new_cluster.arn] = new_cluster

        if tags:
            self.tag_resource(new_cluster.arn, tags)

        return new_cluster.arn, new_cluster.cluster_name, new_cluster.state

    def describe_cluster(self, cluster_arn: str) -> Dict[str, Any]:
        """Return the V1 (flat) description of the cluster under ``cluster_arn``.

        The common fields and the provisioned details are merged into a
        single top-level dict.

        Raises:
            KeyError: If no cluster is stored under ``cluster_arn``.
        """
        cluster = self.clusters[cluster_arn]
        return {
            **self._common_cluster_info(cluster),
            **self._provisioned_info(cluster),
        }

    def list_clusters(
        self,
        cluster_name_filter: Optional[str],
        max_results: Optional[int],
        next_token: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Return a short summary of each stored cluster matching the filter.

        Args:
            cluster_name_filter: When set, only clusters whose name starts
                with this prefix are returned.
            max_results: Accepted for interface compatibility; not applied.
            next_token: Accepted for interface compatibility; not applied.
        """
        return [
            {
                "clusterArn": cluster.arn,
                "clusterName": cluster.cluster_name,
                "state": cluster.state,
                "creationTime": cluster.creation_time,
                "clusterType": cluster.cluster_type,
            }
            for cluster in self.clusters.values()
            if self._matches_filters(cluster, cluster_name_filter)
        ]

    def delete_cluster(self, cluster_arn: str, current_version: str) -> Tuple[str, str]:
        """Remove the cluster stored under ``cluster_arn``.

        ``current_version`` is accepted but not checked.

        Returns:
            ``(cluster_arn, state)`` where ``state`` is the state the cluster
            had when it was removed.

        Raises:
            KeyError: If no cluster is stored under ``cluster_arn``.
        """
        cluster = self.clusters.pop(cluster_arn)
        return cluster_arn, cluster.state

    # ------------------------------------------------------------------
    # Tagging
    # ------------------------------------------------------------------

    def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
        """Return the tagger's tag dict for ``resource_arn``."""
        return self.tagger.get_tag_dict_for_resource(resource_arn)

    def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
        """Record ``tags`` for ``resource_arn`` in the tagger.

        The mapping is converted to a list of ``{"Key": ..., "Value": ...}``
        dicts before being handed to the tagger.
        """
        tags_list = [{"Key": k, "Value": v} for k, v in tags.items()]
        self.tagger.tag_resource(resource_arn, tags_list)

    def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
        """Ask the tagger to remove the tags named in ``tag_keys``."""
        self.tagger.untag_resource_using_names(resource_arn, tag_keys)
# Module-level BackendDict constructed with KafkaBackend and the name "kafka".
# NOTE(review): presumably this is the lookup used to obtain the backend
# instance for a given account and region — confirm against BackendDict.
kafka_backends = BackendDict(KafkaBackend, "kafka")