import datetime import email import json from email.encoders import encode_7or8bit from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.utils import formataddr, getaddresses, parseaddr from typing import Any, Dict, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import utcnow from moto.sns.models import sns_backends from .exceptions import ( AlreadyExists, ConfigurationSetAlreadyExists, ConfigurationSetDoesNotExist, EventDestinationAlreadyExists, InvalidParameterValue, InvalidRenderingParameterException, MessageRejectedError, RuleDoesNotExist, RuleSetDoesNotExist, TemplateDoesNotExist, TemplateNameAlreadyExists, ValidationError, ) from .feedback import BOUNCE, COMMON_MAIL, COMPLAINT, DELIVERY from .template import parse_template from .utils import get_random_message_id, is_valid_address RECIPIENT_LIMIT = 50 PAGINATION_MODEL = { "list_configuration_sets": { "input_token": "next_token", "limit_key": "max_items", "limit_default": 100, "unique_attribute": ["configuration_set_name"], }, } class SESFeedback(BaseModel): BOUNCE = "Bounce" COMPLAINT = "Complaint" DELIVERY = "Delivery" SUCCESS_ADDR = "success" BOUNCE_ADDR = "bounce" COMPLAINT_ADDR = "complaint" FEEDBACK_SUCCESS_MSG = {"test": "success"} FEEDBACK_BOUNCE_MSG = {"test": "bounce"} FEEDBACK_COMPLAINT_MSG = {"test": "complaint"} FORWARDING_ENABLED = "feedback_forwarding_enabled" @staticmethod def generate_message(account_id: str, msg_type: str) -> Dict[str, Any]: # type: ignore[misc] msg: Dict[str, Any] = dict(COMMON_MAIL) msg["mail"]["sendingAccountId"] = account_id if msg_type == SESFeedback.BOUNCE: msg["bounce"] = BOUNCE elif msg_type == SESFeedback.COMPLAINT: msg["complaint"] = COMPLAINT elif msg_type == SESFeedback.DELIVERY: msg["delivery"] = DELIVERY return msg class Message(BaseModel): def __init__( self, message_id: str, source: str, subject: str, body: str, destinations: Dict[str, List[str]], ): self.id = message_id self.source = source self.subject = subject self.body = body self.destinations = destinations class TemplateMessage(BaseModel): def __init__( self, message_id: str, source: str, template: List[str], template_data: List[str], destinations: Any, ): self.id = message_id self.source = source self.template = template self.template_data = template_data self.destinations = destinations class BulkTemplateMessage(BaseModel): def __init__( self, message_ids: List[str], source: str, template: List[str], template_data: List[str], destinations: Any, ): self.ids = message_ids self.source = source self.template = template self.template_data = template_data self.destinations = destinations class RawMessage(BaseModel): def __init__( self, message_id: str, source: str, destinations: List[str], raw_data: str ): self.id = message_id self.source = source self.destinations = destinations self.raw_data = raw_data class SESQuota(BaseModel): def __init__(self, sent: int): self.sent = sent @property def sent_past_24(self) -> int: return self.sent class ConfigurationSet(BaseModel): def __init__( self, configuration_set_name: str, tracking_options: Optional[Dict[str, str]] = {}, delivery_options: Optional[Dict[str, Any]] = {}, reputation_options: Optional[Dict[str, Any]] = {}, sending_options: Optional[Dict[str, bool]] = {}, tags: Optional[List[Dict[str, str]]] = [], suppression_options: Optional[Dict[str, List[str]]] = {}, vdm_options: Optional[Dict[str, Dict[str, str]]] = {}, ) -> None: # Shared between SES and SESv2 self.configuration_set_name = configuration_set_name self.tracking_options = tracking_options self.delivery_options = delivery_options self.reputation_options = reputation_options self.enabled = sending_options # Enabled in v1, SendingOptions in v2 # SESv2 specific fields self.tags = tags self.suppression_options = suppression_options self.vdm_options = vdm_options def to_dict_v2(self) -> Dict[str, Any]: return { "ConfigurationSetName": self.configuration_set_name, "TrackingOptions": self.tracking_options, "DeliveryOptions": self.delivery_options, "ReputationOptions": self.reputation_options, "SendingOptions": {"SendingEnabled": self.enabled}, "Tags": self.tags, "SuppressionOptions": self.suppression_options, "VdmOptions": self.vdm_options, } class SESBackend(BaseBackend): """ Responsible for mocking calls to SES. Sent messages are persisted in the backend. If you need to verify that a message was sent successfully, you can use the internal API to check: .. sourcecode:: python from moto.core import DEFAULT_ACCOUNT_ID from moto.ses import ses_backends ses_backend = ses_backends[DEFAULT_ACCOUNT_ID][region] messages = ses_backend.sent_messages # sent_messages is a List of Message objects Note that, as this is an internal API, the exact format may differ per versions. """ def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.addresses: List[str] = [] self.email_addresses: List[str] = [] self.domains: List[str] = [] self.sent_messages: List[Any] = [] self.sent_message_count = 0 self.rejected_messages_count = 0 self.sns_topics: Dict[str, Dict[str, Any]] = {} self.config_sets: Dict[str, ConfigurationSet] = {} self.config_set_event_destination: Dict[str, Dict[str, Any]] = {} self.event_destinations: Dict[str, int] = {} self.identity_mail_from_domains: Dict[str, Dict[str, Any]] = {} self.templates: Dict[str, Dict[str, str]] = {} self.receipt_rule_set: Dict[str, List[Dict[str, Any]]] = {} def _is_verified_address(self, source: str) -> bool: _, address = parseaddr(source) if address in self.addresses: return True if address in self.email_addresses: return True host = address.split("@", 1)[-1] return host in self.domains def verify_email_identity(self, address: str) -> None: _, address = parseaddr(address) if address not in self.addresses: self.addresses.append(address) def verify_email_address(self, address: str) -> None: _, address = parseaddr(address) self.email_addresses.append(address) def verify_domain(self, domain: str) -> None: if domain.lower() not in self.domains: self.domains.append(domain.lower()) def list_identities(self, identity_type: str) -> List[str]: if identity_type == "Domain": return self.domains if identity_type == "EmailAddress": return self.addresses return self.domains + self.addresses def list_verified_email_addresses(self) -> List[str]: return self.email_addresses def delete_identity(self, identity: str) -> None: if "@" in identity: self.addresses.remove(identity) else: self.domains.remove(identity) def send_email( self, source: str, subject: str, body: str, destinations: Dict[str, List[str]] ) -> Message: recipient_count = sum(map(len, destinations.values())) if recipient_count > RECIPIENT_LIMIT: raise MessageRejectedError("Too many recipients.") if not self._is_verified_address(source): self.rejected_messages_count += 1 raise MessageRejectedError(f"Email address not verified {source}") destination_addresses = [ address for addresses in destinations.values() for address in addresses ] for address in [source, *destination_addresses]: msg = is_valid_address(address) if msg is not None: raise InvalidParameterValue(msg) self.__process_sns_feedback__(source, destinations) message_id = get_random_message_id() message = Message(message_id, source, subject, body, destinations) self.sent_messages.append(message) self.sent_message_count += recipient_count return message def send_bulk_templated_email( self, source: str, template: List[str], template_data: List[str], destinations: List[Dict[str, Dict[str, List[str]]]], ) -> BulkTemplateMessage: recipient_count = len(destinations) if recipient_count > RECIPIENT_LIMIT: raise MessageRejectedError("Too many destinations.") total_recipient_count = sum( map(lambda d: sum(map(len, d["Destination"].values())), destinations) ) if total_recipient_count > RECIPIENT_LIMIT: raise MessageRejectedError("Too many destinations.") if not self._is_verified_address(source): self.rejected_messages_count += 1 raise MessageRejectedError(f"Email address not verified {source}") if not self.templates.get(template[0]): raise TemplateDoesNotExist(f"Template ({template[0]}) does not exist") self.__process_sns_feedback__(source, destinations) message_id = get_random_message_id() message = TemplateMessage( message_id, source, template, template_data, destinations ) self.sent_messages.append(message) self.sent_message_count += total_recipient_count ids = list(map(lambda x: get_random_message_id(), range(len(destinations)))) return BulkTemplateMessage(ids, source, template, template_data, destinations) def send_templated_email( self, source: str, template: List[str], template_data: List[str], destinations: Dict[str, List[str]], ) -> TemplateMessage: recipient_count = sum(map(len, destinations.values())) if recipient_count > RECIPIENT_LIMIT: raise MessageRejectedError("Too many recipients.") if not self._is_verified_address(source): self.rejected_messages_count += 1 raise MessageRejectedError(f"Email address not verified {source}") destination_addresses = [ address for addresses in destinations.values() for address in addresses ] for address in [source, *destination_addresses]: msg = is_valid_address(address) if msg is not None: raise InvalidParameterValue(msg) if not self.templates.get(template[0]): raise TemplateDoesNotExist(f"Template ({template[0]}) does not exist") self.__process_sns_feedback__(source, destinations) message_id = get_random_message_id() message = TemplateMessage( message_id, source, template, template_data, destinations ) self.sent_messages.append(message) self.sent_message_count += recipient_count return message def __type_of_message__(self, destinations: Any) -> Optional[str]: """Checks the destination for any special address that could indicate delivery, complaint or bounce like in SES simulator""" if isinstance(destinations, list): alladdress = destinations else: alladdress = ( destinations.get("ToAddresses", []) + destinations.get("CcAddresses", []) + destinations.get("BccAddresses", []) ) for addr in alladdress: if SESFeedback.SUCCESS_ADDR in addr: return SESFeedback.DELIVERY elif SESFeedback.COMPLAINT_ADDR in addr: return SESFeedback.COMPLAINT elif SESFeedback.BOUNCE_ADDR in addr: return SESFeedback.BOUNCE return None def __generate_feedback__(self, msg_type: str) -> Dict[str, Any]: """Generates the SNS message for the feedback""" return SESFeedback.generate_message(self.account_id, msg_type) def __process_sns_feedback__(self, source: str, destinations: Any) -> None: domain = str(source) if "@" in domain: domain = domain.split("@")[1] if domain in self.sns_topics: msg_type = self.__type_of_message__(destinations) if msg_type is not None: sns_topic = self.sns_topics[domain].get(msg_type, None) if sns_topic is not None: message = self.__generate_feedback__(msg_type) if message: sns_backends[self.account_id][self.region_name].publish( message, # type: ignore[arg-type] arn=sns_topic, ) def send_raw_email( self, source: str, destinations: List[str], raw_data: str ) -> RawMessage: if source is not None: _, source_email_address = parseaddr(source) if not self._is_verified_address(source_email_address): raise MessageRejectedError( f"Did not have authority to send from email {source_email_address}" ) message = email.message_from_string(raw_data) if source is None: if message["from"] is None: raise MessageRejectedError("Source not specified") _, source = parseaddr(message["from"]) if not self._is_verified_address(source): raise MessageRejectedError( f"Did not have authority to send from email {source}" ) fieldvalues = [ message[header] for header in ["TO", "CC", "BCC"] if header in message ] destinations += [ formataddr((realname, email_address)) for realname, email_address in getaddresses(fieldvalues) if email_address ] if len(destinations) > RECIPIENT_LIMIT: raise MessageRejectedError("Too many recipients.") for address in [addr for addr in [source, *destinations] if addr is not None]: msg = is_valid_address(address) if msg is not None: raise InvalidParameterValue(msg) self.__process_sns_feedback__(source, destinations) self.sent_message_count += len(destinations) message_id = get_random_message_id() raw_message = RawMessage(message_id, source, destinations, raw_data) self.sent_messages.append(raw_message) return raw_message def get_send_quota(self) -> SESQuota: return SESQuota(self.sent_message_count) def get_identity_notification_attributes( self, identities: List[str] ) -> Dict[str, Dict[str, Any]]: response: Dict[str, Dict[str, Any]] = {} for identity in identities: response[identity] = self.sns_topics.get(identity, {}) return response def set_identity_feedback_forwarding_enabled( self, identity: str, enabled: bool ) -> None: identity_sns_topics = self.sns_topics.get(identity, {}) identity_sns_topics[SESFeedback.FORWARDING_ENABLED] = enabled self.sns_topics[identity] = identity_sns_topics def set_identity_notification_topic( self, identity: str, notification_type: str, sns_topic: Optional[str] ) -> None: identity_sns_topics = self.sns_topics.get(identity, {}) if sns_topic is None: del identity_sns_topics[notification_type] else: identity_sns_topics[notification_type] = sns_topic self.sns_topics[identity] = identity_sns_topics def create_configuration_set(self, configuration_set_name: str) -> None: if configuration_set_name in self.config_sets: raise ConfigurationSetAlreadyExists( f"Configuration set <{configuration_set_name}> already exists" ) config_set = ConfigurationSet(configuration_set_name=configuration_set_name) self.config_sets[configuration_set_name] = config_set def create_configuration_set_v2( self, configuration_set_name: str, tracking_options: Dict[str, str], delivery_options: Dict[str, Any], reputation_options: Dict[str, Any], sending_options: Dict[str, bool], tags: List[Dict[str, str]], suppression_options: Dict[str, List[str]], vdm_options: Dict[str, Dict[str, str]], ) -> None: if configuration_set_name in self.config_sets: raise ConfigurationSetAlreadyExists( f"Configuration set <{configuration_set_name}> already exists" ) new_config_set = ConfigurationSet( configuration_set_name=configuration_set_name, tracking_options=tracking_options, delivery_options=delivery_options, reputation_options=reputation_options, sending_options=sending_options, tags=tags, suppression_options=suppression_options, vdm_options=vdm_options, ) self.config_sets[configuration_set_name] = new_config_set def describe_configuration_set( self, configuration_set_name: str ) -> ConfigurationSet: if configuration_set_name not in self.config_sets: raise ConfigurationSetDoesNotExist( f"Configuration set <{configuration_set_name}> does not exist" ) return self.config_sets[configuration_set_name] def delete_configuration_set(self, configuration_set_name: str) -> None: self.config_sets.pop(configuration_set_name) def list_configuration_sets( self, next_token: Optional[str], max_items: Optional[int] ) -> List[str]: return list(self.config_sets.keys()) def create_configuration_set_event_destination( self, configuration_set_name: str, event_destination: Dict[str, Any] ) -> None: if self.config_sets.get(configuration_set_name) is None: raise ConfigurationSetDoesNotExist("Invalid Configuration Set Name.") if self.event_destinations.get(event_destination["Name"]): raise EventDestinationAlreadyExists("Duplicate Event destination Name.") self.config_set_event_destination[configuration_set_name] = event_destination self.event_destinations[event_destination["Name"]] = 1 def get_send_statistics(self) -> Dict[str, Any]: return { "DeliveryAttempts": self.sent_message_count, "Rejects": self.rejected_messages_count, "Complaints": 0, "Bounces": 0, "Timestamp": utcnow(), } def add_template(self, template_info: Dict[str, str]) -> None: template_name = template_info["template_name"] if not template_name: raise ValidationError( "1 validation error detected: " "Value null at 'template.templateName'" "failed to satisfy constraint: Member must not be null" ) if self.templates.get(template_name, None): raise TemplateNameAlreadyExists("Duplicate Template Name.") template_subject = template_info["subject_part"] if not template_subject: raise InvalidParameterValue("The subject must be specified.") self.templates[template_name] = template_info def update_template(self, template_info: Dict[str, str]) -> None: template_name = template_info["template_name"] if not template_name: raise ValidationError( "1 validation error detected: " "Value null at 'template.templateName'" "failed to satisfy constraint: Member must not be null" ) if not self.templates.get(template_name, None): raise TemplateDoesNotExist("Invalid Template Name.") template_subject = template_info["subject_part"] if not template_subject: raise InvalidParameterValue("The subject must be specified.") self.templates[template_name] = template_info def get_template(self, template_name: str) -> Dict[str, str]: if not self.templates.get(template_name, None): raise TemplateDoesNotExist("Invalid Template Name.") return self.templates[template_name] def list_templates(self) -> List[Dict[str, str]]: return list(self.templates.values()) def render_template(self, render_data: Dict[str, Any]) -> str: template_name = render_data.get("name", "") template = self.templates.get(template_name, None) if not template: raise TemplateDoesNotExist("Invalid Template Name.") template_data = render_data.get("data") try: template_data = json.loads(template_data) # type: ignore except ValueError: raise InvalidRenderingParameterException( "Template rendering data is invalid" ) subject_part = template["subject_part"] text_part = template["text_part"] html_part = template["html_part"] subject_part = parse_template(str(subject_part), template_data) text_part = parse_template(str(text_part), template_data) html_part = parse_template(str(html_part), template_data) email_obj = MIMEMultipart("alternative") mime_text = MIMEBase("text", "plain;charset=UTF-8") mime_text.set_payload(text_part.encode("utf-8")) encode_7or8bit(mime_text) email_obj.attach(mime_text) mime_html = MIMEBase("text", "html;charset=UTF-8") mime_html.set_payload(html_part.encode("utf-8")) encode_7or8bit(mime_html) email_obj.attach(mime_html) now = datetime.datetime.now().isoformat() rendered_template = ( f"Date: {now}\r\nSubject: {subject_part}\r\n{email_obj.as_string()}" ) return rendered_template def delete_template(self, name: str) -> None: self.templates.pop(name) def create_receipt_rule_set(self, rule_set_name: str) -> None: if self.receipt_rule_set.get(rule_set_name) is not None: raise AlreadyExists(f"Rule set already exists: {rule_set_name}") self.receipt_rule_set[rule_set_name] = [] def create_receipt_rule(self, rule_set_name: str, rule: Dict[str, Any]) -> None: rule_set = self.receipt_rule_set.get(rule_set_name) if rule_set is None: raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") if rule in rule_set: raise AlreadyExists(f"Rule already exists: {rule['name']}") rule_set.append(rule) self.receipt_rule_set[rule_set_name] = rule_set def describe_receipt_rule_set(self, rule_set_name: str) -> List[Dict[str, Any]]: rule_set = self.receipt_rule_set.get(rule_set_name) if rule_set is None: raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") return rule_set def describe_receipt_rule( self, rule_set_name: str, rule_name: str ) -> Dict[str, Any]: rule_set = self.receipt_rule_set.get(rule_set_name) if rule_set is None: raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") for receipt_rule in rule_set: if receipt_rule["name"] == rule_name: return receipt_rule raise RuleDoesNotExist(f"Rule does not exist: {rule_name}") def update_receipt_rule(self, rule_set_name: str, rule: Dict[str, Any]) -> None: rule_set = self.receipt_rule_set.get(rule_set_name) if rule_set is None: raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") for i, receipt_rule in enumerate(rule_set): if receipt_rule["name"] == rule["name"]: rule_set[i] = rule break else: raise RuleDoesNotExist(f"Rule does not exist: {rule['name']}") def set_identity_mail_from_domain( self, identity: str, mail_from_domain: Optional[str] = None, behavior_on_mx_failure: Optional[str] = None, ) -> None: if not self._is_verified_address(identity): raise InvalidParameterValue(f"Identity '{identity}' does not exist.") if mail_from_domain is None: self.identity_mail_from_domains.pop(identity) return if not mail_from_domain.endswith(identity.split("@")[-1]): raise InvalidParameterValue( f"Provided MAIL-FROM domain '{mail_from_domain}' is not subdomain of " f"the domain of the identity '{identity.split('@')[-1]}'." ) if behavior_on_mx_failure not in (None, "RejectMessage", "UseDefaultValue"): raise ValidationError( "1 validation error detected: " f"Value '{behavior_on_mx_failure}' at 'behaviorOnMXFailure'" "failed to satisfy constraint: Member must satisfy enum value set: " "[RejectMessage, UseDefaultValue]" ) self.identity_mail_from_domains[identity] = { "mail_from_domain": mail_from_domain, "behavior_on_mx_failure": behavior_on_mx_failure, } def get_identity_mail_from_domain_attributes( self, identities: Optional[List[str]] = None ) -> Dict[str, Dict[str, str]]: if identities is None: identities = [] attributes_by_identity = {} for identity in identities: if identity in (self.domains + self.addresses): attributes_by_identity[identity] = self.identity_mail_from_domains.get( identity ) or {"behavior_on_mx_failure": "UseDefaultValue"} return attributes_by_identity def get_identity_verification_attributes( self, identities: Optional[List[str]] = None ) -> Dict[str, str]: if identities is None: identities = [] attributes_by_identity: Dict[str, str] = {} for identity in identities: if identity in (self.domains + self.addresses): attributes_by_identity[identity] = "Success" return attributes_by_identity ses_backends = BackendDict(SESBackend, "ses")
Memory