"""Handles Directory Service requests, invokes methods, returns responses."""
import json
from moto.core.exceptions import InvalidToken
from moto.core.responses import BaseResponse
from moto.ds.exceptions import InvalidNextTokenException
from moto.ds.models import DirectoryServiceBackend, ds_backends
class DirectoryServiceResponse(BaseResponse):
"""Handler for DirectoryService requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="ds")
@property
def ds_backend(self) -> DirectoryServiceBackend:
"""Return backend instance specific for this region."""
return ds_backends[self.current_account][self.region]
def connect_directory(self) -> str:
"""Create an AD Connector to connect to a self-managed directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
size = self._get_param("Size")
connect_settings = self._get_param("ConnectSettings")
tags = self._get_param("Tags", [])
directory_id = self.ds_backend.connect_directory(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
size=size,
connect_settings=connect_settings,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def create_directory(self) -> str:
"""Create a Simple AD directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
size = self._get_param("Size")
vpc_settings = self._get_param("VpcSettings")
tags = self._get_param("Tags", [])
directory_id = self.ds_backend.create_directory(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
size=size,
vpc_settings=vpc_settings,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def create_alias(self) -> str:
"""Create an alias and assign the alias to the directory."""
directory_id = self._get_param("DirectoryId")
alias = self._get_param("Alias")
response = self.ds_backend.create_alias(directory_id, alias)
return json.dumps(response)
def create_microsoft_ad(self) -> str:
"""Create a Microsoft AD directory."""
name = self._get_param("Name")
short_name = self._get_param("ShortName")
password = self._get_param("Password")
description = self._get_param("Description")
vpc_settings = self._get_param("VpcSettings")
edition = self._get_param("Edition")
tags = self._get_param("Tags", [])
directory_id = self.ds_backend.create_microsoft_ad(
region=self.region,
name=name,
short_name=short_name,
password=password,
description=description,
vpc_settings=vpc_settings,
edition=edition,
tags=tags,
)
return json.dumps({"DirectoryId": directory_id})
def delete_directory(self) -> str:
"""Delete a Directory Service directory."""
directory_id_arg = self._get_param("DirectoryId")
directory_id = self.ds_backend.delete_directory(directory_id_arg)
return json.dumps({"DirectoryId": directory_id})
def describe_directories(self) -> str:
"""Return directory info for the given IDs or all IDs."""
directory_ids = self._get_param("DirectoryIds")
next_token = self._get_param("NextToken")
limit = self._get_int_param("Limit")
try:
(directories, next_token) = self.ds_backend.describe_directories(
directory_ids, next_token=next_token, limit=limit
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {"DirectoryDescriptions": [x.to_dict() for x in directories]}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def disable_sso(self) -> str:
"""Disable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName")
password = self._get_param("Password")
self.ds_backend.disable_sso(directory_id, username, password)
return ""
def enable_sso(self) -> str:
"""Enable single-sign on for a directory."""
directory_id = self._get_param("DirectoryId")
username = self._get_param("UserName")
password = self._get_param("Password")
self.ds_backend.enable_sso(directory_id, username, password)
return ""
def get_directory_limits(self) -> str:
"""Return directory limit information for the current region."""
limits = self.ds_backend.get_directory_limits()
return json.dumps({"DirectoryLimits": limits})
def add_tags_to_resource(self) -> str:
"""Add or overwrite on or more tags for specified directory."""
resource_id = self._get_param("ResourceId")
tags = self._get_param("Tags")
self.ds_backend.add_tags_to_resource(resource_id=resource_id, tags=tags)
return ""
def remove_tags_from_resource(self) -> str:
"""Removes tags from a directory."""
resource_id = self._get_param("ResourceId")
tag_keys = self._get_param("TagKeys")
self.ds_backend.remove_tags_from_resource(
resource_id=resource_id, tag_keys=tag_keys
)
return ""
def list_tags_for_resource(self) -> str:
"""Lists all tags on a directory."""
resource_id = self._get_param("ResourceId")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
try:
tags, next_token = self.ds_backend.list_tags_for_resource(
resource_id=resource_id, next_token=next_token, limit=limit
)
except InvalidToken as exc:
raise InvalidNextTokenException() from exc
response = {"Tags": tags}
if next_token:
response["NextToken"] = next_token
return json.dumps(response)
def create_trust(self) -> str:
directory_id = self._get_param("DirectoryId")
remote_domain_name = self._get_param("RemoteDomainName")
trust_password = self._get_param("TrustPassword")
trust_direction = self._get_param("TrustDirection")
trust_type = self._get_param("TrustType")
conditional_forwarder_ip_addrs = self._get_param("ConditionalForwarderIpAddrs")
selective_auth = self._get_param("SelectiveAuth")
trust_id = self.ds_backend.create_trust(
directory_id=directory_id,
remote_domain_name=remote_domain_name,
trust_password=trust_password,
trust_direction=trust_direction,
trust_type=trust_type,
conditional_forwarder_ip_addrs=conditional_forwarder_ip_addrs,
selective_auth=selective_auth,
)
return json.dumps(dict(TrustId=trust_id))
def describe_trusts(self) -> str:
directory_id = self._get_param("DirectoryId")
trust_ids = self._get_param("TrustIds")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
trusts, next_token = self.ds_backend.describe_trusts(
directory_id=directory_id,
trust_ids=trust_ids,
next_token=next_token,
limit=limit,
)
trust_list = [trust.to_dict() for trust in trusts]
return json.dumps(dict(Trusts=trust_list, nextToken=next_token))
def delete_trust(self) -> str:
trust_id = self._get_param("TrustId")
delete_associated_conditional_forwarder = self._get_param(
"DeleteAssociatedConditionalForwarder"
)
trust_id = self.ds_backend.delete_trust(
trust_id=trust_id,
delete_associated_conditional_forwarder=delete_associated_conditional_forwarder,
)
return json.dumps(dict(TrustId=trust_id))
def describe_ldaps_settings(self) -> str:
directory_id = self._get_param("DirectoryId")
type = self._get_param("Type")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
ldaps_settings_info, next_token = self.ds_backend.describe_ldaps_settings(
directory_id=directory_id,
type=type,
next_token=next_token,
limit=limit,
)
ldaps = [ldap.to_dict() for ldap in ldaps_settings_info]
return json.dumps(dict(LDAPSSettingsInfo=ldaps, nextToken=next_token))
def enable_ldaps(self) -> str:
directory_id = self._get_param("DirectoryId")
type = self._get_param("Type")
self.ds_backend.enable_ldaps(
directory_id=directory_id,
type=type,
)
return ""
def disable_ldaps(self) -> str:
directory_id = self._get_param("DirectoryId")
type = self._get_param("Type")
self.ds_backend.disable_ldaps(
directory_id=directory_id,
type=type,
)
return ""
def describe_settings(self) -> str:
directory_id = self._get_param("DirectoryId")
status = self._get_param("Status")
next_token = self._get_param("NextToken")
setting_entries, next_token = self.ds_backend.describe_settings(
directory_id=directory_id,
status=status,
next_token=next_token,
)
return json.dumps(
dict(
DirectoryId=directory_id,
SettingEntries=setting_entries,
NextToken=next_token,
)
)
def update_settings(self) -> str:
directory_id = self._get_param("DirectoryId")
settings = self._get_param("Settings")
directory_id = self.ds_backend.update_settings(
directory_id=directory_id,
settings=settings,
)
return json.dumps(dict(DirectoryId=directory_id))
def create_log_subscription(self) -> str:
directory_id = self._get_param("DirectoryId")
log_group_name = self._get_param("LogGroupName")
self.ds_backend.create_log_subscription(
directory_id=directory_id,
log_group_name=log_group_name,
)
return json.dumps(dict())
def list_log_subscriptions(self) -> str:
directory_id = self._get_param("DirectoryId")
next_token = self._get_param("NextToken")
limit = self._get_param("Limit")
log_subscriptions, next_token = self.ds_backend.list_log_subscriptions(
directory_id=directory_id,
next_token=next_token,
limit=limit,
)
list_subscriptions = [sub.to_dict() for sub in log_subscriptions]
return json.dumps(
dict(LogSubscriptions=list_subscriptions, NextToken=next_token)
)
def delete_log_subscription(self) -> str:
directory_id = self._get_param("DirectoryId")
self.ds_backend.delete_log_subscription(
directory_id=directory_id,
)
return json.dumps(dict())