from datetime import datetime from typing import Any, Dict, Iterable, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import unix_time from moto.moto_api._internal import mock_random from moto.utilities.tagging_service import TaggingService from moto.utilities.utils import get_partition from .exceptions import ApplicationNotFound, EventStreamNotFound class App(BaseModel): def __init__(self, account_id: str, region_name: str, name: str): self.application_id = str(mock_random.uuid4()).replace("-", "") self.arn = f"arn:{get_partition(region_name)}:mobiletargeting:{region_name}:{account_id}:apps/{self.application_id}" self.name = name self.created = unix_time() self.settings = AppSettings() self.event_stream: Optional[EventStream] = None def get_settings(self) -> "AppSettings": return self.settings def update_settings(self, settings: Dict[str, Any]) -> "AppSettings": self.settings.update(settings) return self.settings def delete_event_stream(self) -> "EventStream": stream = self.event_stream self.event_stream = None return stream # type: ignore def get_event_stream(self) -> "EventStream": if self.event_stream is None: raise EventStreamNotFound() return self.event_stream def put_event_stream(self, stream_arn: str, role_arn: str) -> "EventStream": self.event_stream = EventStream(stream_arn, role_arn) return self.event_stream def to_json(self) -> Dict[str, Any]: return { "Arn": self.arn, "Id": self.application_id, "Name": self.name, "CreationDate": self.created, } class AppSettings(BaseModel): def __init__(self) -> None: self.settings: Dict[str, Any] = dict() self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") def update(self, settings: Dict[str, Any]) -> None: self.settings = settings self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") def to_json(self) -> Dict[str, Any]: return { "CampaignHook": self.settings.get("CampaignHook", {}), "CloudWatchMetricsEnabled": self.settings.get( "CloudWatchMetricsEnabled", False ), "LastModifiedDate": self.last_modified, "Limits": self.settings.get("Limits", {}), "QuietTime": self.settings.get("QuietTime", {}), } class EventStream(BaseModel): def __init__(self, stream_arn: str, role_arn: str): self.stream_arn = stream_arn self.role_arn = role_arn self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") def to_json(self) -> Dict[str, Any]: return { "DestinationStreamArn": self.stream_arn, "RoleArn": self.role_arn, "LastModifiedDate": self.last_modified, } class PinpointBackend(BaseBackend): """Implementation of Pinpoint APIs.""" def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.apps: Dict[str, App] = {} self.tagger = TaggingService() def create_app(self, name: str, tags: Dict[str, str]) -> App: app = App(self.account_id, self.region_name, name) self.apps[app.application_id] = app tag_list = self.tagger.convert_dict_to_tags_input(tags) self.tagger.tag_resource(app.arn, tag_list) return app def delete_app(self, application_id: str) -> App: self.get_app(application_id) return self.apps.pop(application_id) def get_app(self, application_id: str) -> App: if application_id not in self.apps: raise ApplicationNotFound() return self.apps[application_id] def get_apps(self) -> Iterable[App]: """ Pagination is not yet implemented """ return self.apps.values() def update_application_settings( self, application_id: str, settings: Dict[str, Any] ) -> AppSettings: app = self.get_app(application_id) return app.update_settings(settings) def get_application_settings(self, application_id: str) -> AppSettings: app = self.get_app(application_id) return app.get_settings() def list_tags_for_resource(self, resource_arn: str) -> Dict[str, Dict[str, str]]: tags = self.tagger.get_tag_dict_for_resource(resource_arn) return {"tags": tags} def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None: tag_list = TaggingService.convert_dict_to_tags_input(tags) self.tagger.tag_resource(resource_arn, tag_list) def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: self.tagger.untag_resource_using_names(resource_arn, tag_keys) def put_event_stream( self, application_id: str, stream_arn: str, role_arn: str ) -> EventStream: app = self.get_app(application_id) return app.put_event_stream(stream_arn, role_arn) def get_event_stream(self, application_id: str) -> EventStream: app = self.get_app(application_id) return app.get_event_stream() def delete_event_stream(self, application_id: str) -> EventStream: app = self.get_app(application_id) return app.delete_event_stream() pinpoint_backends = BackendDict(PinpointBackend, "pinpoint")
Memory