"""Handles incoming kafka requests, invokes methods, returns responses."""
import json
from urllib.parse import unquote
from moto.core.responses import BaseResponse
from .models import KafkaBackend, kafka_backends
class KafkaResponse(BaseResponse):
"""Handler for Kafka requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="kafka")
@property
def kafka_backend(self) -> KafkaBackend:
"""Return backend instance specific for this region."""
return kafka_backends[self.current_account][self.region]
def create_cluster_v2(self) -> str:
cluster_name = self._get_param("clusterName")
tags = self._get_param("tags")
provisioned = self._get_param("provisioned")
serverless = self._get_param("serverless")
cluster_arn, cluster_name, state, cluster_type = (
self.kafka_backend.create_cluster_v2(
cluster_name=cluster_name,
tags=tags,
provisioned=provisioned,
serverless=serverless,
)
)
return json.dumps(
dict(
clusterArn=cluster_arn,
clusterName=cluster_name,
state=state,
clusterType=cluster_type,
)
)
def describe_cluster_v2(self) -> str:
cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
cluster_info = self.kafka_backend.describe_cluster_v2(
cluster_arn=cluster_arn,
)
return json.dumps(dict(clusterInfo=cluster_info))
def list_clusters_v2(self) -> str:
cluster_name_filter = self._get_param("clusterNameFilter")
cluster_type_filter = self._get_param("clusterTypeFilter")
max_results = self._get_param("maxResults")
next_token = self._get_param("nextToken")
cluster_info_list, next_token = self.kafka_backend.list_clusters_v2(
cluster_name_filter=cluster_name_filter,
cluster_type_filter=cluster_type_filter,
max_results=max_results,
next_token=next_token,
)
return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))
def list_tags_for_resource(self) -> str:
resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
tags = self.kafka_backend.list_tags_for_resource(
resource_arn=resource_arn,
)
return json.dumps(dict(tags=tags))
def tag_resource(self) -> str:
resource_arn = unquote(self._get_param("resourceArn"))
tags = self._get_param("tags")
self.kafka_backend.tag_resource(
resource_arn=resource_arn,
tags=tags,
)
return json.dumps(dict())
def untag_resource(self) -> str:
resource_arn = unquote(self._get_param("resourceArn"))
tag_keys = self.__dict__["data"]["tagKeys"]
self.kafka_backend.untag_resource(
resource_arn=resource_arn,
tag_keys=tag_keys,
)
return json.dumps(dict())
def create_cluster(self) -> str:
broker_node_group_info = self._get_param("brokerNodeGroupInfo")
client_authentication = self._get_param("clientAuthentication")
cluster_name = self._get_param("clusterName")
configuration_info = self._get_param("configurationInfo")
encryption_info = self._get_param("encryptionInfo")
enhanced_monitoring = self._get_param("enhancedMonitoring")
open_monitoring = self._get_param("openMonitoring")
kafka_version = self._get_param("kafkaVersion")
logging_info = self._get_param("loggingInfo")
number_of_broker_nodes = self._get_param("numberOfBrokerNodes")
tags = self._get_param("tags")
storage_mode = self._get_param("storageMode")
cluster_arn, cluster_name, state = self.kafka_backend.create_cluster(
broker_node_group_info=broker_node_group_info,
client_authentication=client_authentication,
cluster_name=cluster_name,
configuration_info=configuration_info,
encryption_info=encryption_info,
enhanced_monitoring=enhanced_monitoring,
open_monitoring=open_monitoring,
kafka_version=kafka_version,
logging_info=logging_info,
number_of_broker_nodes=number_of_broker_nodes,
tags=tags,
storage_mode=storage_mode,
)
return json.dumps(
dict(clusterArn=cluster_arn, clusterName=cluster_name, state=state)
)
def describe_cluster(self) -> str:
cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
cluster_info = self.kafka_backend.describe_cluster(
cluster_arn=cluster_arn,
)
return json.dumps(dict(clusterInfo=cluster_info))
def delete_cluster(self) -> str:
cluster_arn = unquote(self.parsed_url.path.split("/clusters/")[-1])
current_version = self._get_param("currentVersion")
cluster_arn, state = self.kafka_backend.delete_cluster(
cluster_arn=cluster_arn,
current_version=current_version,
)
return json.dumps(dict(clusterArn=cluster_arn, state=state))
def list_clusters(self) -> str:
cluster_name_filter = self._get_param("clusterNameFilter")
max_results = self._get_param("maxResults")
next_token = self._get_param("nextToken")
cluster_info_list = self.kafka_backend.list_clusters(
cluster_name_filter=cluster_name_filter,
max_results=max_results,
next_token=next_token,
)
return json.dumps(dict(clusterInfoList=cluster_info_list, nextToken=next_token))