"""WorkSpacesWebBackend class with methods for supported APIs."""
import datetime
import uuid
from typing import Any, Dict, List, Optional, Tuple
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.utils import get_partition
from ..utilities.tagging_service import TaggingService
class FakeUserSettings(BaseModel):
def __init__(
self,
additional_encryption_context: Any,
client_token: str,
cookie_synchronization_configuration: str,
copy_allowed: bool,
customer_managed_key: str,
deep_link_allowed: bool,
disconnect_timeout_in_minutes: int,
download_allowed: bool,
idle_disconnect_timeout_in_minutes: int,
paste_allowed: bool,
print_allowed: bool,
upload_allowed: bool,
region_name: str,
account_id: str,
):
self.user_settings_id = str(uuid.uuid4())
self.arn = self.arn_formatter(self.user_settings_id, account_id, region_name)
self.additional_encryption_context = additional_encryption_context
self.client_token = client_token
self.cookie_synchronization_configuration = cookie_synchronization_configuration
self.copy_allowed = copy_allowed if copy_allowed else "Disabled"
self.customer_managed_key = customer_managed_key
self.deep_link_allowed = deep_link_allowed if deep_link_allowed else "Disabled"
self.disconnect_timeout_in_minutes = disconnect_timeout_in_minutes
self.download_allowed = download_allowed if download_allowed else "Disabled"
self.idle_disconnect_timeout_in_minutes = idle_disconnect_timeout_in_minutes
self.paste_allowed = paste_allowed if paste_allowed else "Disabled"
self.print_allowed = print_allowed if print_allowed else "Disabled"
self.upload_allowed = upload_allowed if upload_allowed else "Disabled"
self.associated_portal_arns: List[str] = []
def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str:
return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:user-settings/{_id}"
def to_dict(self) -> Dict[str, Any]:
return {
"associatedPortalArns": self.associated_portal_arns,
"additionalEncryptionContext": self.additional_encryption_context,
"clientToken": self.client_token,
"cookieSynchronizationConfiguration": self.cookie_synchronization_configuration,
"copyAllowed": self.copy_allowed,
"customerManagedKey": self.customer_managed_key,
"deepLinkAllowed": self.deep_link_allowed,
"disconnectTimeoutInMinutes": self.disconnect_timeout_in_minutes,
"downloadAllowed": self.download_allowed,
"idleDisconnectTimeoutInMinutes": self.idle_disconnect_timeout_in_minutes,
"pasteAllowed": self.paste_allowed,
"printAllowed": self.print_allowed,
"uploadAllowed": self.upload_allowed,
"userSettingsArn": self.arn,
}
class FakeUserAccessLoggingSettings(BaseModel):
def __init__(
self,
client_token: str,
kinesis_stream_arn: str,
region_name: str,
account_id: str,
):
self.user_access_logging_settings_id = str(uuid.uuid4())
self.arn = self.arn_formatter(
self.user_access_logging_settings_id, account_id, region_name
)
self.client_token = client_token
self.kinesis_stream_arn = kinesis_stream_arn
self.associated_portal_arns: List[str] = []
def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str:
return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:user-access-logging-settings/{_id}"
def to_dict(self) -> Dict[str, Any]:
return {
"associatedPortalArns": self.associated_portal_arns,
"kinesisStreamArn": self.kinesis_stream_arn,
"userAccessLoggingSettingsArn": self.arn,
}
class FakeNetworkSettings(BaseModel):
def __init__(
self,
security_group_ids: List[str],
subnet_ids: List[str],
vpc_id: str,
region_name: str,
account_id: str,
):
self.network_settings_id = str(uuid.uuid4())
self.arn = self.arn_formatter(self.network_settings_id, account_id, region_name)
self.security_group_ids = security_group_ids
self.subnet_ids = subnet_ids
self.vpc_id = vpc_id
self.associated_portal_arns: List[str] = []
def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str:
return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:network-settings/{_id}"
def to_dict(self) -> Dict[str, Any]:
return {
"associatedPortalArns": self.associated_portal_arns,
"networkSettingsArn": self.arn,
"securityGroupIds": self.security_group_ids,
"subnetIds": self.subnet_ids,
"vpcId": self.vpc_id,
}
class FakeBrowserSettings(BaseModel):
def __init__(
self,
additional_encryption_context: Any,
browser_policy: str,
client_token: str,
customer_managed_key: str,
region_name: str,
account_id: str,
):
self.browser_settings_id = str(uuid.uuid4())
self.arn = self.arn_formatter(self.browser_settings_id, account_id, region_name)
self.additional_encryption_context = additional_encryption_context
self.browser_policy = browser_policy
self.client_token = client_token
self.customer_managed_key = customer_managed_key
self.associated_portal_arns: List[str] = []
def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str:
return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:browser-settings/{_id}"
def to_dict(self) -> Dict[str, Any]:
return {
"associatedPortalArns": self.associated_portal_arns,
"browserSettingsArn": self.arn,
"additionalEncryptionContext": self.additional_encryption_context,
"browserPolicy": self.browser_policy,
"customerManagedKey": self.customer_managed_key,
}
class FakePortal(BaseModel):
def __init__(
self,
additional_encryption_context: Any,
authentication_type: str,
client_token: str,
customer_managed_key: str,
display_name: str,
instance_type: str,
max_concurrent_sessions: str,
region_name: str,
account_id: str,
):
self.portal_id = str(uuid.uuid4())
self.arn = self.arn_formatter(self.portal_id, account_id, region_name)
self.additional_encryption_context = additional_encryption_context
self.authentication_type = authentication_type
self.client_token = client_token
self.customer_managed_key = customer_managed_key
self.display_name = display_name
self.instance_type = instance_type
self.max_concurrent_sessions = max_concurrent_sessions
self.portal_endpoint = f"{self.portal_id}.portal.aws"
self.browser_type = "Chrome"
self.creation_time = datetime.datetime.now().isoformat()
self.status = "CREATED"
self.renderer_type = "AppStream"
self.status_reason = "TestStatusReason"
self.browser_settings_arn: Optional[str] = None
self.network_settings_arn: Optional[str] = None
self.trust_store_arn: Optional[str] = None
self.ip_access_settings_arn: Optional[str] = None
self.user_access_logging_settings_arn: Optional[str] = None
self.user_settings_arn: Optional[str] = None
def arn_formatter(self, _id: str, account_id: str, region_name: str) -> str:
return f"arn:{get_partition(region_name)}:workspaces-web:{region_name}:{account_id}:portal/{_id}"
def to_dict(self) -> Dict[str, Any]:
return {
"additionalEncryptionContext": self.additional_encryption_context,
"authenticationType": self.authentication_type,
"browserSettingsArn": self.browser_settings_arn,
"browserType": self.browser_type,
"creationDate": self.creation_time,
"customerManagedKey": self.customer_managed_key,
"displayName": self.display_name,
"instanceType": self.instance_type,
"ipAccessSettingsArn": self.ip_access_settings_arn,
"maxConcurrentSessions": self.max_concurrent_sessions,
"networkSettingsArn": self.network_settings_arn,
"portalArn": self.arn,
"portalEndpoint": self.portal_endpoint,
"portalStatus": self.status,
"rendererType": self.renderer_type,
"statusReason": self.status_reason,
"trustStoreArn": self.trust_store_arn,
"userAccessLoggingSettingsArn": self.user_access_logging_settings_arn,
"userSettingsArn": self.user_settings_arn,
}
class WorkSpacesWebBackend(BaseBackend):
"""Implementation of WorkSpacesWeb APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.network_settings: Dict[str, FakeNetworkSettings] = {}
self.browser_settings: Dict[str, FakeBrowserSettings] = {}
self.user_settings: Dict[str, FakeUserSettings] = {}
self.user_access_logging_settings: Dict[str, FakeUserAccessLoggingSettings] = {}
self.portals: Dict[str, FakePortal] = {}
self.tagger = TaggingService()
def create_network_settings(
self,
security_group_ids: List[str],
subnet_ids: List[str],
tags: Dict[str, str],
vpc_id: str,
) -> str:
network_settings_object = FakeNetworkSettings(
security_group_ids,
subnet_ids,
vpc_id,
self.region_name,
self.account_id,
)
self.network_settings[network_settings_object.arn] = network_settings_object
if tags:
self.tag_resource("TEMP_CLIENT_TOKEN", network_settings_object.arn, tags)
return network_settings_object.arn
def list_network_settings(self) -> List[Dict[str, str]]:
return [
{"networkSettingsArn": network_setting.arn, "vpcId": network_setting.vpc_id}
for network_setting in self.network_settings.values()
]
def get_network_settings(self, network_settings_arn: str) -> Dict[str, Any]:
return self.network_settings[network_settings_arn].to_dict()
def delete_network_settings(self, network_settings_arn: str) -> None:
self.network_settings.pop(network_settings_arn)
def create_browser_settings(
self,
additional_encryption_context: Any,
browser_policy: str,
client_token: str,
customer_managed_key: str,
tags: Optional[List[Dict[str, str]]] = None,
) -> str:
browser_settings_object = FakeBrowserSettings(
additional_encryption_context,
browser_policy,
client_token,
customer_managed_key,
self.region_name,
self.account_id,
)
self.browser_settings[browser_settings_object.arn] = browser_settings_object
if tags:
self.tag_resource(client_token, browser_settings_object.arn, tags)
return browser_settings_object.arn
def list_browser_settings(self) -> List[Dict[str, str]]:
return [
{"browserSettingsArn": browser_setting.arn}
for browser_setting in self.browser_settings.values()
]
def get_browser_settings(self, browser_settings_arn: str) -> Dict[str, Any]:
return self.browser_settings[browser_settings_arn].to_dict()
def delete_browser_settings(self, browser_settings_arn: str) -> None:
self.browser_settings.pop(browser_settings_arn)
def create_portal(
self,
additional_encryption_context: Any,
authentication_type: str,
client_token: str,
customer_managed_key: str,
display_name: str,
instance_type: str,
max_concurrent_sessions: str,
tags: Optional[List[Dict[str, str]]] = None,
) -> Tuple[str, str]:
portal_object = FakePortal(
additional_encryption_context,
authentication_type,
client_token,
customer_managed_key,
display_name,
instance_type,
max_concurrent_sessions,
self.region_name,
self.account_id,
)
self.portals[portal_object.arn] = portal_object
if tags:
self.tag_resource(client_token, portal_object.arn, tags)
return portal_object.arn, portal_object.portal_endpoint
def list_portals(self) -> List[Dict[str, Any]]:
return [
{
"authenticationType": portal.authentication_type,
"browserSettingsArn": portal.browser_settings_arn,
"browserType": portal.browser_type,
"creationDate": portal.creation_time,
"customerManagedKey": portal.customer_managed_key,
"displayName": portal.display_name,
"instanceType": portal.instance_type,
"ipAccessSettingsArn": portal.ip_access_settings_arn,
"maxConcurrentSessions": portal.max_concurrent_sessions,
"networkSettingsArn": portal.network_settings_arn,
"portalArn": portal.arn,
"portalEndpoint": portal.portal_endpoint,
"portalStatus": portal.status,
"rendererType": portal.renderer_type,
"statusReason": portal.status_reason,
"trustStoreArn": portal.trust_store_arn,
"userAccessLoggingSettingsArn": portal.user_access_logging_settings_arn,
"userSettingsArn": portal.user_settings_arn,
}
for portal in self.portals.values()
]
def get_portal(self, portal_arn: str) -> Dict[str, Any]:
return self.portals[portal_arn].to_dict()
def delete_portal(self, portal_arn: str) -> None:
self.portals.pop(portal_arn)
def associate_browser_settings(
self, browser_settings_arn: str, portal_arn: str
) -> Tuple[str, str]:
browser_settings_object = self.browser_settings[browser_settings_arn]
portal_object = self.portals[portal_arn]
browser_settings_object.associated_portal_arns.append(portal_arn)
portal_object.browser_settings_arn = browser_settings_arn
return browser_settings_arn, portal_arn
def associate_network_settings(
self, network_settings_arn: str, portal_arn: str
) -> Tuple[str, str]:
network_settings_object = self.network_settings[network_settings_arn]
portal_object = self.portals[portal_arn]
network_settings_object.associated_portal_arns.append(portal_arn)
portal_object.network_settings_arn = network_settings_arn
return network_settings_arn, portal_arn
def create_user_settings(
self,
additional_encryption_context: Any,
client_token: Any,
cookie_synchronization_configuration: Any,
copy_allowed: Any,
customer_managed_key: Any,
deep_link_allowed: Any,
disconnect_timeout_in_minutes: Any,
download_allowed: Any,
idle_disconnect_timeout_in_minutes: Any,
paste_allowed: Any,
print_allowed: Any,
tags: Any,
upload_allowed: Any,
) -> str:
user_settings_object = FakeUserSettings(
additional_encryption_context,
client_token,
cookie_synchronization_configuration,
copy_allowed,
customer_managed_key,
deep_link_allowed,
disconnect_timeout_in_minutes,
download_allowed,
idle_disconnect_timeout_in_minutes,
paste_allowed,
print_allowed,
upload_allowed,
self.region_name,
self.account_id,
)
self.user_settings[user_settings_object.arn] = user_settings_object
if tags:
self.tag_resource(client_token, user_settings_object.arn, tags)
return user_settings_object.arn
def get_user_settings(self, user_settings_arn: str) -> Dict[str, Any]:
return self.user_settings[user_settings_arn].to_dict()
def delete_user_settings(self, user_settings_arn: str) -> None:
self.user_settings.pop(user_settings_arn)
def create_user_access_logging_settings(
self, client_token: Any, kinesis_stream_arn: Any, tags: Any
) -> str:
user_access_logging_settings_object = FakeUserAccessLoggingSettings(
client_token, kinesis_stream_arn, self.region_name, self.account_id
)
self.user_access_logging_settings[user_access_logging_settings_object.arn] = (
user_access_logging_settings_object
)
if tags:
self.tag_resource(
client_token, user_access_logging_settings_object.arn, tags
)
return user_access_logging_settings_object.arn
def get_user_access_logging_settings(
self, user_access_logging_settings_arn: str
) -> Dict[str, Any]:
return self.user_access_logging_settings[
user_access_logging_settings_arn
].to_dict()
def delete_user_access_logging_settings(
self, user_access_logging_settings_arn: str
) -> None:
self.user_access_logging_settings.pop(user_access_logging_settings_arn)
def associate_user_settings(
self, portal_arn: str, user_settings_arn: str
) -> Tuple[str, str]:
user_settings_object = self.user_settings[user_settings_arn]
portal_object = self.portals[portal_arn]
user_settings_object.associated_portal_arns.append(portal_arn)
portal_object.user_settings_arn = user_settings_arn
return portal_arn, user_settings_arn
def associate_user_access_logging_settings(
self, portal_arn: str, user_access_logging_settings_arn: str
) -> Tuple[str, str]:
user_access_logging_settings_object = self.user_access_logging_settings[
user_access_logging_settings_arn
]
portal_object = self.portals[portal_arn]
user_access_logging_settings_object.associated_portal_arns.append(portal_arn)
portal_object.user_access_logging_settings_arn = (
user_access_logging_settings_arn
)
return portal_arn, user_access_logging_settings_arn
def list_user_settings(self) -> List[Dict[str, str]]:
return [
{"userSettingsArn": user_settings.arn}
for user_settings in self.user_settings.values()
]
def list_user_access_logging_settings(self) -> List[Dict[str, str]]:
return [
{"userAccessLoggingSettingsArn": user_access_logging_settings.arn}
for user_access_logging_settings in self.user_access_logging_settings.values()
]
def tag_resource(self, client_token: str, resource_arn: str, tags: Any) -> None:
self.tagger.tag_resource(resource_arn, tags)
def untag_resource(self, resource_arn: str, tag_keys: Any) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
def list_tags_for_resource(self, resource_arn: str) -> List[Dict[str, str]]:
tags = self.tagger.get_tag_dict_for_resource(resource_arn)
Tags = []
for key, value in tags.items():
Tags.append({"Key": key, "Value": value})
return Tags
workspacesweb_backends = BackendDict(WorkSpacesWebBackend, "workspaces-web")