import datetime import enum import re import time import typing from collections import OrderedDict from typing import Any, Dict, List, Optional, Set, Tuple from joserfc import jwk, jwt from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel from moto.core.utils import utcnow from moto.moto_api._internal import mock_random as random from moto.utilities.paginator import paginate from moto.utilities.utils import get_partition, load_resource, md5_hash from ..settings import get_cognito_idp_user_pool_id_strategy from .exceptions import ( AliasExistsException, ExpiredCodeException, GroupExistsException, InvalidParameterException, InvalidPasswordException, NotAuthorizedError, ResourceNotFoundError, UsernameExistsException, UserNotConfirmedException, UserNotFoundError, ) from .utils import ( PAGINATION_MODEL, check_secret_hash, create_id, expand_attrs, flatten_attrs, generate_id, validate_username_format, ) class UserStatus(str, enum.Enum): FORCE_CHANGE_PASSWORD = "FORCE_CHANGE_PASSWORD" CONFIRMED = "CONFIRMED" UNCONFIRMED = "UNCONFIRMED" RESET_REQUIRED = "RESET_REQUIRED" class AuthFlow(str, enum.Enum): # Order follows AWS' order ADMIN_NO_SRP_AUTH = "ADMIN_NO_SRP_AUTH" ADMIN_USER_PASSWORD_AUTH = "ADMIN_USER_PASSWORD_AUTH" USER_SRP_AUTH = "USER_SRP_AUTH" REFRESH_TOKEN_AUTH = "REFRESH_TOKEN_AUTH" REFRESH_TOKEN = "REFRESH_TOKEN" CUSTOM_AUTH = "CUSTOM_AUTH" USER_PASSWORD_AUTH = "USER_PASSWORD_AUTH" @classmethod def list(cls) -> List[str]: return [e.value for e in cls] class CognitoIdpUserPoolAttribute(BaseModel): STANDARD_SCHEMA = { "sub": { "AttributeDataType": "String", "Mutable": False, "Required": True, "StringAttributeConstraints": {"MinLength": "1", "MaxLength": "2048"}, }, "name": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "given_name": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "family_name": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "middle_name": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "nickname": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "preferred_username": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "profile": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "picture": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "website": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "email": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "email_verified": { "AttributeDataType": "Boolean", "Mutable": True, "Required": False, }, "gender": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "birthdate": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "10", "MaxLength": "10"}, }, "zoneinfo": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "locale": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "phone_number": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "phone_number_verified": { "AttributeDataType": "Boolean", "Mutable": True, "Required": False, }, "address": { "AttributeDataType": "String", "Mutable": True, "Required": False, "StringAttributeConstraints": {"MinLength": "0", "MaxLength": "2048"}, }, "updated_at": { "AttributeDataType": "Number", "Mutable": True, "Required": False, "NumberAttributeConstraints": {"MinValue": "0"}, }, } ATTRIBUTE_DATA_TYPES = {"Boolean", "DateTime", "String", "Number"} def __init__(self, name: str, custom: bool, schema: Dict[str, Any]): self.name = name self.custom = custom attribute_data_type = schema.get("AttributeDataType", None) if ( attribute_data_type and attribute_data_type not in CognitoIdpUserPoolAttribute.ATTRIBUTE_DATA_TYPES ): raise InvalidParameterException( f"Validation error detected: Value '{attribute_data_type}' failed to satisfy constraint: Member must satisfy enum value set: [Boolean, Number, String, DateTime]" ) if self.custom: self._init_custom(schema) else: self._init_standard(schema) def _init_custom(self, schema: Dict[str, Any]) -> None: self.name = "custom:" + self.name attribute_data_type = schema.get("AttributeDataType", None) if not attribute_data_type: raise InvalidParameterException( "Invalid AttributeDataType input, consider using the provided AttributeDataType enum." ) self.data_type = attribute_data_type self.developer_only = schema.get("DeveloperOnlyAttribute", False) if self.developer_only: self.name = "dev:" + self.name self.mutable = schema.get("Mutable", True) if schema.get("Required", False): raise InvalidParameterException( "Required custom attributes are not supported currently." ) self.required = False self._init_constraints(schema, None, show_empty_constraints=True) def _init_standard(self, schema: Dict[str, Any]) -> None: attribute_data_type = schema.get("AttributeDataType", None) default_attribute_data_type = CognitoIdpUserPoolAttribute.STANDARD_SCHEMA[ self.name ]["AttributeDataType"] if attribute_data_type and attribute_data_type != default_attribute_data_type: raise InvalidParameterException( f"You can not change AttributeDataType or set developerOnlyAttribute for standard schema attribute {self.name}" ) self.data_type = default_attribute_data_type if schema.get("DeveloperOnlyAttribute", False): raise InvalidParameterException( f"You can not change AttributeDataType or set developerOnlyAttribute for standard schema attribute {self.name}" ) else: self.developer_only = False self.mutable = schema.get( "Mutable", CognitoIdpUserPoolAttribute.STANDARD_SCHEMA[self.name]["Mutable"] ) self.required = schema.get( "Required", CognitoIdpUserPoolAttribute.STANDARD_SCHEMA[self.name]["Required"], ) constraints_key = None if self.data_type == "Number": constraints_key = "NumberAttributeConstraints" elif self.data_type == "String": constraints_key = "StringAttributeConstraints" default_constraints = ( None if not constraints_key else CognitoIdpUserPoolAttribute.STANDARD_SCHEMA[self.name][constraints_key] ) self._init_constraints(schema, default_constraints) def _init_constraints( self, schema: Dict[str, Any], default_constraints: Any, show_empty_constraints: bool = False, ) -> None: def numeric_limit(num: Optional[str], constraint_type: str) -> Optional[int]: if not num: return # type: ignore[return-value] parsed = None try: parsed = int(num) except ValueError: pass if parsed is None or parsed < 0: raise InvalidParameterException( f"Invalid {constraint_type} for schema attribute {self.name}" ) return parsed self.string_constraints: Optional[Dict[str, Any]] = ( {} if show_empty_constraints else None ) self.number_constraints = None if "AttributeDataType" in schema: # Quirk - schema is set/validated only if AttributeDataType is specified if self.data_type == "String": string_constraints = schema.get( "StringAttributeConstraints", default_constraints ) if not string_constraints: return min_len = numeric_limit( string_constraints.get("MinLength", None), "StringAttributeConstraints", ) max_len = numeric_limit( string_constraints.get("MaxLength", None), "StringAttributeConstraints", ) if (min_len and min_len > 2048) or (max_len and max_len > 2048): raise InvalidParameterException( f"user.{self.name}: String attributes cannot have a length of more than 2048" ) if min_len and max_len and min_len > max_len: raise InvalidParameterException( f"user.{self.name}: Max length cannot be less than min length." ) self.string_constraints = string_constraints self.number_constraints = None elif self.data_type == "Number": number_constraints = schema.get( "NumberAttributeConstraints", default_constraints ) if not number_constraints: return # No limits on either min or max value min_val = numeric_limit( number_constraints.get("MinValue", None), "NumberAttributeConstraints", ) max_val = numeric_limit( number_constraints.get("MaxValue", None), "NumberAttributeConstraints", ) if min_val and max_val and min_val > max_val: raise InvalidParameterException( f"user.{self.name}: Max value cannot be less than min value." ) self.number_constraints = number_constraints self.string_constraints = None else: self.number_constraints = None self.string_constraints = None def to_json(self) -> Dict[str, Any]: return { "Name": self.name, "AttributeDataType": self.data_type, "DeveloperOnlyAttribute": self.developer_only, "Mutable": self.mutable, "Required": self.required, "NumberAttributeConstraints": self.number_constraints, "StringAttributeConstraints": self.string_constraints, } DEFAULT_USER_POOL_CONFIG: Dict[str, Any] = { "Policies": { "PasswordPolicy": { "MinimumLength": 8, "RequireUppercase": True, "RequireLowercase": True, "RequireNumbers": True, "RequireSymbols": True, "TemporaryPasswordValidityDays": 7, } }, "AdminCreateUserConfig": { "AllowAdminCreateUserOnly": False, "UnusedAccountValidityDays": 7, "InviteMessageTemplate": { "SMSMessage": "Your username is {username} and temporary password is {####}. ", "EmailMessage": "Your username is {username} and temporary password is {####}. ", "EmailSubject": "Your temporary password", }, }, "EmailConfiguration": {"EmailSendingAccount": "COGNITO_DEFAULT"}, "VerificationMessageTemplate": { "SmsMessage": "Your verification code is {####}. ", "EmailMessage": "Your verification code is {####}. ", "EmailSubject": "Your verification code", "DefaultEmailOption": "CONFIRM_WITH_CODE", }, "AccountRecoverySetting": { "RecoveryMechanisms": [ {"Priority": 1, "Name": "verified_email"}, {"Priority": 2, "Name": "verified_phone_number"}, ] }, } class CognitoIdpUserPool(BaseModel): MAX_ID_LENGTH = 55 def __init__( self, account_id: str, region: str, name: str, extended_config: Dict[str, Any] ): self.account_id = account_id self.region = region user_pool_id = generate_id( get_cognito_idp_user_pool_id_strategy(), region, name, extended_config ) self.id = f"{self.region}_{user_pool_id}"[: self.MAX_ID_LENGTH] self.arn = f"arn:{get_partition(region)}:cognito-idp:{region}:{account_id}:userpool/{self.id}" self.name = name self.status = None self.update_extended_config(extended_config) self.creation_date = utcnow() self.last_modified_date = utcnow() self.mfa_config = extended_config.get("MfaConfiguration") or "OFF" self.sms_mfa_config: Optional[Dict[str, Any]] = None self.token_mfa_config: Optional[Dict[str, bool]] = None self.schema_attributes = {} for schema in self.extended_config.pop("Schema", {}): attribute = CognitoIdpUserPoolAttribute( schema["Name"], schema["Name"] not in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA, schema, ) self.schema_attributes[attribute.name] = attribute # If we do not have custom attributes, use the standard schema if not self.schema_attributes: for ( standard_attribute_name, standard_attribute_schema, ) in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA.items(): self.schema_attributes[standard_attribute_name] = ( CognitoIdpUserPoolAttribute( standard_attribute_name, False, standard_attribute_schema ) ) self.clients: Dict[str, CognitoIdpUserPoolClient] = OrderedDict() self.identity_providers: Dict[str, CognitoIdpIdentityProvider] = OrderedDict() self.groups: Dict[str, CognitoIdpGroup] = OrderedDict() self.users: Dict[str, CognitoIdpUser] = OrderedDict() self.resource_servers: Dict[str, CognitoResourceServer] = OrderedDict() self.refresh_tokens: Dict[str, Optional[Tuple[str, str, str]]] = {} self.access_tokens: Dict[str, Tuple[str, str]] = {} self.id_tokens: Dict[str, Tuple[str, str]] = {} jwks_file = load_resource(__name__, "resources/jwks-private.json") self.json_web_key = jwk.RSAKey.import_key(jwks_file) @property def backend(self) -> "CognitoIdpBackend": return cognitoidp_backends[self.account_id][self.region] @property def domain(self) -> Optional["CognitoIdpUserPoolDomain"]: return next( ( upd for upd in self.backend.user_pool_domains.values() if upd.user_pool_id == self.id ), None, ) def update_extended_config(self, extended_config: Dict[str, Any]) -> None: self.extended_config = DEFAULT_USER_POOL_CONFIG.copy() self.extended_config.update(extended_config or {}) message_template = self.extended_config.get("VerificationMessageTemplate") if message_template and "SmsVerificationMessage" not in extended_config: self.extended_config["SmsVerificationMessage"] = message_template.get( "SmsMessage" ) if message_template and "EmailVerificationSubject" not in extended_config: self.extended_config["EmailVerificationSubject"] = message_template.get( "EmailSubject" ) if message_template and "EmailVerificationMessage" not in extended_config: self.extended_config["EmailVerificationMessage"] = message_template.get( "EmailMessage" ) def _base_json(self) -> Dict[str, Any]: return { "Id": self.id, "Arn": self.arn, "Name": self.name, "Status": self.status, "CreationDate": time.mktime(self.creation_date.timetuple()), "LastModifiedDate": time.mktime(self.last_modified_date.timetuple()), "MfaConfiguration": self.mfa_config, "EstimatedNumberOfUsers": len(self.users), } def to_json(self, extended: bool = False) -> Dict[str, Any]: user_pool_json = self._base_json() if extended: user_pool_json.update(self.extended_config) user_pool_json.update( { "SchemaAttributes": [ att.to_json() for att in self.schema_attributes.values() ] } ) else: user_pool_json["LambdaConfig"] = ( self.extended_config.get("LambdaConfig") or {} ) if self.domain: user_pool_json["Domain"] = self.domain.domain return user_pool_json def _get_user(self, username: str) -> "CognitoIdpUser": """Find a user within a user pool by Username or any UsernameAttributes (`email` or `phone_number` or both)""" if self.extended_config.get("UsernameAttributes"): attribute_types = self.extended_config["UsernameAttributes"] for user in self.users.values(): if username in [ flatten_attrs(user.attributes).get(attribute_type) for attribute_type in attribute_types ]: return user return self.users.get(username) # type: ignore[return-value] def create_jwt( self, client_id: str, username: str, token_use: str, expires_in: int = 60 * 60, extra_data: Optional[Dict[str, Any]] = None, ) -> Tuple[str, int]: now = int(time.time()) payload = { "iss": f"https://cognito-idp.{self.region}.amazonaws.com/{self.id}", "sub": self._get_user(username).id, "client_id" if token_use == "access" else "aud": client_id, "token_use": token_use, "auth_time": now, "exp": now + expires_in, "jti": str(random.uuid4()), } username_is_email = "email" in self.extended_config.get( "UsernameAttributes", [] ) if token_use == "access": if username_is_email: payload["username"] = payload["sub"] else: payload["username"] = username if token_use == "id": if username_is_email: payload["cognito:username"] = payload["sub"] payload["email"] = username else: payload["cognito:username"] = username payload.update(extra_data or {}) headers = {"kid": "dummy", "alg": "RS256"} # KID as present in jwks-public.json return ( jwt.encode(headers, payload, self.json_web_key), expires_in, ) def add_custom_attributes(self, custom_attributes: List[Dict[str, str]]) -> None: attributes = [] for attribute_schema in custom_attributes: base_name = attribute_schema["Name"] target_name = "custom:" + base_name if attribute_schema.get("DeveloperOnlyAttribute", False): target_name = "dev:" + target_name if target_name in self.schema_attributes: raise InvalidParameterException( f"custom:{base_name}: Existing attribute already has name {target_name}." ) attribute = CognitoIdpUserPoolAttribute(base_name, True, attribute_schema) attributes.append(attribute) for attribute in attributes: self.schema_attributes[attribute.name] = attribute def create_id_token( self, client_id: str, username: str, origin_jti: str ) -> Tuple[str, int]: """ :returns: (id_token, expires_in) """ extra_data = self.get_user_extra_data_by_client_id(client_id, username) extra_data["origin_jti"] = origin_jti user = self._get_user(username) for attr in user.attributes: if attr["Name"].startswith("custom:"): extra_data[attr["Name"]] = attr["Value"] if len(user.groups) > 0: extra_data["cognito:groups"] = [group.group_name for group in user.groups] id_token, expires_in = self.create_jwt( client_id, username, "id", extra_data=extra_data ) self.id_tokens[id_token] = (client_id, username) return id_token, expires_in def create_refresh_token(self, client_id: str, username: str) -> Tuple[str, str]: """ :returns: (refresh_token, origin_jti) """ refresh_token = str(random.uuid4()) origin_jti = str(random.uuid4()) self.refresh_tokens[refresh_token] = (client_id, username, origin_jti) return refresh_token, origin_jti def create_access_token( self, client_id: str, username: str, origin_jti: str ) -> Tuple[str, int]: """ :returns: (access_token, expires_in) """ extra_data: Dict[str, Any] = { "origin_jti": origin_jti, } user = self._get_user(username) if len(user.groups) > 0: extra_data["cognito:groups"] = [group.group_name for group in user.groups] access_token, expires_in = self.create_jwt( client_id, username, "access", extra_data=extra_data ) self.access_tokens[access_token] = (client_id, username) return access_token, expires_in def create_tokens_from_refresh_token( self, refresh_token: str ) -> Tuple[str, str, int]: res = self.refresh_tokens[refresh_token] if res is None: raise NotAuthorizedError(refresh_token) client_id, username, origin_jti = res if not username: raise NotAuthorizedError(refresh_token) access_token, expires_in = self.create_access_token( client_id, username, origin_jti=origin_jti ) id_token, _ = self.create_id_token(client_id, username, origin_jti=origin_jti) return access_token, id_token, expires_in def get_user_extra_data_by_client_id( self, client_id: str, username: str ) -> Dict[str, Any]: extra_data = {} current_client = self.clients.get(client_id, None) if current_client: for readable_field in current_client.get_readable_fields(): attribute = list( filter( lambda f: f["Name"] == readable_field, self._get_user(username).attributes, ) ) if len(attribute) > 0: extra_data.update({attribute[0]["Name"]: attribute[0]["Value"]}) return extra_data def sign_out(self, username: str) -> None: for token, token_tuple in list(self.refresh_tokens.items()): if token_tuple is None: continue _, logged_in_user, _ = token_tuple if username == logged_in_user: self.refresh_tokens[token] = None for access_token, (_, logged_in_user) in list(self.access_tokens.items()): if username == logged_in_user: self.access_tokens.pop(access_token) class CognitoIdpUserPoolDomain(BaseModel): def __init__( self, user_pool_id: str, domain: str, custom_domain_config: Optional[Dict[str, Any]] = None, ): self.user_pool_id = user_pool_id self.domain = domain self.custom_domain_config = custom_domain_config or {} def _distribution_name(self) -> str: if self.custom_domain_config and "CertificateArn" in self.custom_domain_config: unique_hash = md5_hash( self.custom_domain_config["CertificateArn"].encode("utf-8") ).hexdigest() return f"{unique_hash[:16]}.cloudfront.net" unique_hash = md5_hash(self.user_pool_id.encode("utf-8")).hexdigest() return f"{unique_hash[:16]}.amazoncognito.com" def to_json(self, extended: bool = True) -> Dict[str, Any]: distribution = self._distribution_name() if extended: return { "UserPoolId": self.user_pool_id, "AWSAccountId": str(random.uuid4()), "CloudFrontDistribution": distribution, "Domain": self.domain, "S3Bucket": None, "Status": "ACTIVE", "Version": None, } else: return {"CloudFrontDomain": distribution} class CognitoIdpUserPoolClient(BaseModel): def __init__( self, user_pool_id: str, generate_secret: bool, extended_config: Optional[Dict[str, Any]], ): self.user_pool_id = user_pool_id self.id = create_id() self.secret = str(random.uuid4()) self.generate_secret = generate_secret or False # Some default values - may be overridden by the user self.extended_config: Dict[str, Any] = { "AllowedOAuthFlowsUserPoolClient": False, "AuthSessionValidity": 3, "EnablePropagateAdditionalUserContextData": False, "EnableTokenRevocation": True, "RefreshTokenValidity": 30, } self.extended_config.update(extended_config or {}) def _base_json(self) -> Dict[str, Any]: return { "ClientId": self.id, "ClientName": self.extended_config.get("ClientName"), "UserPoolId": self.user_pool_id, } def to_json(self, extended: bool = False) -> Dict[str, Any]: user_pool_client_json = self._base_json() if self.generate_secret: user_pool_client_json.update({"ClientSecret": self.secret}) if extended: user_pool_client_json.update(self.extended_config) return user_pool_client_json def get_readable_fields(self) -> List[str]: return self.extended_config.get("ReadAttributes", []) class CognitoIdpIdentityProvider(BaseModel): def __init__(self, name: str, extended_config: Optional[Dict[str, Any]]): self.name = name self.extended_config = extended_config or {} self.creation_date = utcnow() self.last_modified_date = utcnow() if "AttributeMapping" not in self.extended_config: self.extended_config["AttributeMapping"] = {"username": "sub"} def _base_json(self) -> Dict[str, Any]: return { "ProviderName": self.name, "ProviderType": self.extended_config.get("ProviderType"), "CreationDate": time.mktime(self.creation_date.timetuple()), "LastModifiedDate": time.mktime(self.last_modified_date.timetuple()), } def to_json(self, extended: bool = False) -> Dict[str, Any]: identity_provider_json = self._base_json() if extended: identity_provider_json.update(self.extended_config) return identity_provider_json class CognitoIdpGroup(BaseModel): def __init__( self, user_pool_id: str, group_name: str, description: str, role_arn: str, precedence: int, ): self.user_pool_id = user_pool_id self.group_name = group_name self.description = description or "" self.role_arn = role_arn self.precedence = precedence self.last_modified_date = datetime.datetime.now() self.creation_date = self.last_modified_date # Users who are members of this group. # Note that these links are bidirectional. self.users: Set[CognitoIdpUser] = set() def update( self, description: Optional[str], role_arn: Optional[str], precedence: Optional[int], ) -> None: if description is not None: self.description = description if role_arn is not None: self.role_arn = role_arn if precedence is not None: self.precedence = precedence self.last_modified_date = datetime.datetime.now() def to_json(self) -> Dict[str, Any]: return { "GroupName": self.group_name, "UserPoolId": self.user_pool_id, "Description": self.description, "RoleArn": self.role_arn, "Precedence": self.precedence, "LastModifiedDate": time.mktime(self.last_modified_date.timetuple()), "CreationDate": time.mktime(self.creation_date.timetuple()), } class CognitoIdpUser(BaseModel): def __init__( self, user_pool_id: str, username: Optional[str], password: Optional[str], status: str, attributes: List[Dict[str, str]], ): self.id = str(random.uuid4()) self.user_pool_id = user_pool_id # Username is None when users sign up with an email or phone_number, # and should be given the value of the internal id generate (sub) self.username = username if username else self.id self.password = password self.status = status self.enabled = True self.attributes = attributes self.attribute_lookup = flatten_attrs(attributes) self.create_date = utcnow() self.last_modified_date = utcnow() self.sms_mfa_enabled = False self.software_token_mfa_enabled = False self.token_verified = False self.confirmation_code: Optional[str] = None self.preferred_mfa_setting: Optional[str] = None # Groups this user is a member of. # Note that these links are bidirectional. self.groups: Set[CognitoIdpGroup] = set() self.update_attributes([{"Name": "sub", "Value": self.id}]) def _base_json(self) -> Dict[str, Any]: return { "UserPoolId": self.user_pool_id, "Username": self.username, "UserStatus": self.status, "UserCreateDate": time.mktime(self.create_date.timetuple()), "UserLastModifiedDate": time.mktime(self.last_modified_date.timetuple()), } # list_users brings back "Attributes" while admin_get_user brings back "UserAttributes". def to_json( self, extended: bool = False, attributes_key: str = "Attributes", attributes_to_get: Optional[List[str]] = None, ) -> Dict[str, Any]: user_mfa_setting_list = [] if self.software_token_mfa_enabled: user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA") if self.sms_mfa_enabled: user_mfa_setting_list.append("SMS_MFA") user_json = self._base_json() if extended: attrs = [ attr for attr in self.attributes if not attributes_to_get or attr["Name"] in attributes_to_get ] user_json.update( { "Enabled": self.enabled, attributes_key: attrs, "MFAOptions": [], "UserMFASettingList": user_mfa_setting_list, "PreferredMfaSetting": self.preferred_mfa_setting or "", } ) return user_json def update_attributes(self, new_attributes: List[Dict[str, Any]]) -> None: flat_attributes = flatten_attrs(self.attributes) flat_attributes.update(flatten_attrs(new_attributes)) self.attribute_lookup = flat_attributes self.attributes = expand_attrs(flat_attributes) self.last_modified_date = utcnow() def delete_attributes(self, attrs_to_delete: List[str]) -> None: flat_attributes = flatten_attrs(self.attributes) wrong_attrs = [] for attr in attrs_to_delete: try: flat_attributes.pop(attr) except KeyError: wrong_attrs.append(attr) if wrong_attrs: raise InvalidParameterException( "Invalid user attributes: " + "\n".join( [ f"user.{w}: Attribute does not exist in the schema." for w in wrong_attrs ] ) + "\n" ) self.attribute_lookup = flat_attributes self.attributes = expand_attrs(flat_attributes) self.last_modified_date = utcnow() class CognitoResourceServer(BaseModel): def __init__( self, user_pool_id: str, identifier: str, name: str, scopes: List[Dict[str, str]], ): self.user_pool_id = user_pool_id self.identifier = identifier self.name = name self.scopes = scopes def to_json(self) -> Dict[str, Any]: res: Dict[str, Any] = { "UserPoolId": self.user_pool_id, "Identifier": self.identifier, "Name": self.name, } if self.scopes: res.update({"Scopes": self.scopes}) return res class CognitoIdpBackend(BaseBackend): """ Moto mocks the JWK uris. If you're using decorators, you can retrieve this information by making a call to `https://cognito-idp.us-west-2.amazonaws.com/someuserpoolid/.well-known/jwks.json`. Call `http://localhost:5000/userpoolid/.well-known/jwks.json` instead of you're running Moto in ServerMode or Docker. Because Moto cannot determine this is a CognitoIDP-request based on the URL alone, you have to add an Authorization-header instead: `Authorization: AWS4-HMAC-SHA256 Credential=mock_access_key/20220524/us-east-1/cognito-idp/aws4_request, SignedHeaders=content-length;content-type;host;x-amz-date, Signature=asdf` In some cases, you need to have reproducible IDs for the user pool. For example, a single initialization before the start of integration tests. This behavior can be enabled by passing the environment variable: MOTO_COGNITO_IDP_USER_POOL_ID_STRATEGY=HASH. """ def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.user_pools: Dict[str, CognitoIdpUserPool] = OrderedDict() self.user_pool_domains: Dict[str, CognitoIdpUserPoolDomain] = OrderedDict() self.sessions: Dict[str, Tuple[str, CognitoIdpUserPool]] = {} # User pool def create_user_pool( self, name: str, extended_config: Dict[str, Any] ) -> CognitoIdpUserPool: user_pool = CognitoIdpUserPool( self.account_id, self.region_name, name, extended_config ) self.user_pools[user_pool.id] = user_pool return user_pool def set_user_pool_mfa_config( self, user_pool_id: str, sms_config: Dict[str, Any], token_config: Dict[str, bool], mfa_config: str, ) -> Dict[str, Any]: user_pool = self.describe_user_pool(user_pool_id) user_pool.mfa_config = mfa_config user_pool.sms_mfa_config = sms_config user_pool.token_mfa_config = token_config return self.get_user_pool_mfa_config(user_pool_id) def get_user_pool_mfa_config(self, user_pool_id: str) -> Dict[str, Any]: user_pool = self.describe_user_pool(user_pool_id) return { "SmsMfaConfiguration": user_pool.sms_mfa_config, "SoftwareTokenMfaConfiguration": user_pool.token_mfa_config, "MfaConfiguration": user_pool.mfa_config, } @paginate(pagination_model=PAGINATION_MODEL) def list_user_pools(self) -> List[CognitoIdpUserPool]: return list(self.user_pools.values()) def describe_user_pool(self, user_pool_id: str) -> CognitoIdpUserPool: user_pool = self.user_pools.get(user_pool_id) if not user_pool: raise ResourceNotFoundError(f"User pool {user_pool_id} does not exist.") return user_pool def update_user_pool( self, user_pool_id: str, extended_config: Dict[str, Any] ) -> None: user_pool = self.describe_user_pool(user_pool_id) user_pool.update_extended_config(extended_config) def delete_user_pool(self, user_pool_id: str) -> None: self.describe_user_pool(user_pool_id) del self.user_pools[user_pool_id] # User pool domain def create_user_pool_domain( self, user_pool_id: str, domain: str, custom_domain_config: Optional[Dict[str, str]] = None, ) -> CognitoIdpUserPoolDomain: self.describe_user_pool(user_pool_id) user_pool_domain = CognitoIdpUserPoolDomain( user_pool_id, domain, custom_domain_config=custom_domain_config ) self.user_pool_domains[domain] = user_pool_domain return user_pool_domain def describe_user_pool_domain( self, domain: str ) -> Optional[CognitoIdpUserPoolDomain]: if domain not in self.user_pool_domains: return None return self.user_pool_domains[domain] def delete_user_pool_domain(self, domain: str) -> None: if domain not in self.user_pool_domains: raise ResourceNotFoundError(domain) del self.user_pool_domains[domain] def update_user_pool_domain( self, domain: str, custom_domain_config: Dict[str, str] ) -> CognitoIdpUserPoolDomain: if domain not in self.user_pool_domains: raise ResourceNotFoundError(domain) user_pool_domain = self.user_pool_domains[domain] user_pool_domain.custom_domain_config = custom_domain_config return user_pool_domain # User pool client def create_user_pool_client( self, user_pool_id: str, generate_secret: bool, extended_config: Dict[str, str] ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) user_pool_client = CognitoIdpUserPoolClient( user_pool_id, generate_secret, extended_config ) user_pool.clients[user_pool_client.id] = user_pool_client return user_pool_client @paginate(pagination_model=PAGINATION_MODEL) def list_user_pool_clients( self, user_pool_id: str ) -> List[CognitoIdpUserPoolClient]: user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.clients.values()) def describe_user_pool_client( self, user_pool_id: str, client_id: str ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) if not client: raise ResourceNotFoundError(client_id) return client def update_user_pool_client( self, user_pool_id: str, client_id: str, extended_config: Dict[str, str] ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) if not client: raise ResourceNotFoundError(client_id) client.extended_config.update(extended_config) return client def delete_user_pool_client(self, user_pool_id: str, client_id: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if client_id not in user_pool.clients: raise ResourceNotFoundError(client_id) del user_pool.clients[client_id] # Identity provider def create_identity_provider( self, user_pool_id: str, name: str, extended_config: Dict[str, str] ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = CognitoIdpIdentityProvider(name, extended_config) user_pool.identity_providers[name] = identity_provider return identity_provider @paginate(pagination_model=PAGINATION_MODEL) def list_identity_providers( self, user_pool_id: str ) -> List[CognitoIdpIdentityProvider]: user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.identity_providers.values()) def describe_identity_provider( self, user_pool_id: str, name: str ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = user_pool.identity_providers.get(name) if not identity_provider: raise ResourceNotFoundError(name) return identity_provider def update_identity_provider( self, user_pool_id: str, name: str, extended_config: Dict[str, str] ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = user_pool.identity_providers.get(name) if not identity_provider: raise ResourceNotFoundError(name) identity_provider.extended_config.update(extended_config) return identity_provider def delete_identity_provider(self, user_pool_id: str, name: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if name not in user_pool.identity_providers: raise ResourceNotFoundError(name) del user_pool.identity_providers[name] # Group def create_group( self, user_pool_id: str, group_name: str, description: str, role_arn: str, precedence: int, ) -> CognitoIdpGroup: user_pool = self.describe_user_pool(user_pool_id) group = CognitoIdpGroup( user_pool_id, group_name, description, role_arn, precedence ) if group.group_name in user_pool.groups: raise GroupExistsException("A group with the name already exists") user_pool.groups[group.group_name] = group return group def get_group(self, user_pool_id: str, group_name: str) -> CognitoIdpGroup: user_pool = self.describe_user_pool(user_pool_id) if group_name not in user_pool.groups: raise ResourceNotFoundError(group_name) return user_pool.groups[group_name] @paginate(pagination_model=PAGINATION_MODEL) def list_groups(self, user_pool_id: str) -> List[CognitoIdpGroup]: user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.groups.values()) def delete_group(self, user_pool_id: str, group_name: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if group_name not in user_pool.groups: raise ResourceNotFoundError(group_name) group = user_pool.groups[group_name] for user in group.users: user.groups.remove(group) del user_pool.groups[group_name] def update_group( self, user_pool_id: str, group_name: str, description: str, role_arn: str, precedence: int, ) -> CognitoIdpGroup: group = self.get_group(user_pool_id, group_name) group.update(description, role_arn, precedence) return group def admin_add_user_to_group( self, user_pool_id: str, group_name: str, username: str ) -> None: group = self.get_group(user_pool_id, group_name) user = self.admin_get_user(user_pool_id, username) group.users.add(user) user.groups.add(group) @paginate(pagination_model=PAGINATION_MODEL) def list_users_in_group( self, user_pool_id: str, group_name: str ) -> List[CognitoIdpUser]: user_pool = self.describe_user_pool(user_pool_id) group = self.get_group(user_pool_id, group_name) return list(filter(lambda user: user in group.users, user_pool.users.values())) @paginate(pagination_model=PAGINATION_MODEL) def admin_list_groups_for_user( self, user_pool_id: str, username: str ) -> List[CognitoIdpGroup]: user = self.admin_get_user(user_pool_id, username) return list(user.groups) def admin_remove_user_from_group( self, user_pool_id: str, group_name: str, username: str ) -> None: group = self.get_group(user_pool_id, group_name) user = self.admin_get_user(user_pool_id, username) group.users.discard(user) user.groups.discard(group) def admin_reset_user_password(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) if not user.enabled: raise NotAuthorizedError("User is disabled") if user.status is UserStatus.RESET_REQUIRED: return if user.status is not UserStatus.CONFIRMED: raise NotAuthorizedError( "User password cannot be reset in the current state." ) if ( user.attribute_lookup.get("email_verified", "false") == "false" and user.attribute_lookup.get("phone_number_verified", "false") == "false" ): raise InvalidParameterException( "Cannot reset password for the user as there is no registered/verified email or phone_number" ) user.status = UserStatus.RESET_REQUIRED # User def admin_create_user( self, user_pool_id: str, username: str, message_action: str, temporary_password: str, attributes: List[Dict[str, str]], ) -> CognitoIdpUser: user_pool = self.describe_user_pool(user_pool_id) if message_action and message_action == "RESEND": self.admin_get_user(user_pool_id, username) elif user_pool._get_user(username): raise UsernameExistsException(username) # UsernameAttributes are attributes (either `email` or `phone_number` # or both) than can be used in the place of a unique username. If the # user provides an email or phone number when signing up, the user pool # performs the following steps: # 1. populates the correct field (email, phone_number) with the value # supplied for Username # 2. generates a persistent GUID for the user that will be returned as # the value of `Username` in the `get-user` and `list-users` # operations, as well as the value of `sub` in `IdToken` and # `AccessToken` # # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings has_username_attrs = user_pool.extended_config.get("UsernameAttributes") if has_username_attrs: username_attributes = user_pool.extended_config["UsernameAttributes"] # attribute_type should be one of `email`, `phone_number` or both for attribute_type in username_attributes: # check if provided username matches one of the attribute types in # `UsernameAttributes` if attribute_type in username_attributes and validate_username_format( username, _format=attribute_type ): # insert provided username into new user's attributes under the # correct key flattened_attrs = flatten_attrs(attributes or []) flattened_attrs.update({attribute_type: username}) attributes = expand_attrs(flattened_attrs) # once the username has been validated against a username attribute # type, there is no need to attempt validation against the other # type(s) break # The provided username has not matched the required format for any # of the possible attributes else: raise InvalidParameterException( "Username should be either an email or a phone number." ) user = CognitoIdpUser( user_pool_id, # set username to None so that it will be default to the internal GUID # when them user gets created None if has_username_attrs else username, temporary_password, UserStatus.FORCE_CHANGE_PASSWORD, attributes, ) user_pool.users[user.username] = user return user def admin_confirm_sign_up(self, user_pool_id: str, username: str) -> str: user = self.admin_get_user(user_pool_id, username) user.status = UserStatus["CONFIRMED"] return "" def admin_get_user(self, user_pool_id: str, username: str) -> CognitoIdpUser: user_pool = self.describe_user_pool(user_pool_id) user = user_pool._get_user(username) if not user: raise UserNotFoundError("User does not exist.") return user def get_user(self, access_token: str) -> CognitoIdpUser: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] user = self.admin_get_user(user_pool.id, username) if ( not user or not user.enabled or user.status is not UserStatus.CONFIRMED ): raise NotAuthorizedError("username") return user raise NotAuthorizedError("Invalid token") @paginate(pagination_model=PAGINATION_MODEL) def list_users(self, user_pool_id: str, filt: str) -> List[CognitoIdpUser]: user_pool = self.describe_user_pool(user_pool_id) users = list(user_pool.users.values()) if filt: inherent_attributes: Dict[str, Any] = { "cognito:user_status": lambda u: u.status, "status": lambda u: "Enabled" if u.enabled else "Disabled", "username": lambda u: u.username, } comparisons: Dict[str, Any] = { "=": lambda x, y: x == y, "^=": lambda x, y: x.startswith(y), } allowed_attributes = [ "username", "email", "phone_number", "name", "given_name", "family_name", "preferred_username", "cognito:user_status", "status", "sub", ] match = re.match(r"([\w:]+)\s*(=|\^=)\s*\"(.*)\"", filt) if match: name, op, value = match.groups() else: raise InvalidParameterException("Error while parsing filter") if name not in allowed_attributes: raise InvalidParameterException(f"Invalid search attribute: {name}") compare = comparisons[op] users = [ user for user in users if [ attr for attr in user.attributes if attr["Name"] == name and compare(attr["Value"], value) ] or ( name in inherent_attributes and compare(inherent_attributes[name](user), value) ) ] return users def admin_disable_user(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) user.enabled = False def admin_enable_user(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) user.enabled = True def admin_delete_user(self, user_pool_id: str, username: str) -> None: user_pool = self.describe_user_pool(user_pool_id) user = self.admin_get_user(user_pool_id, username) for group in user.groups: group.users.remove(user) # use internal username del user_pool.users[user.username] def _log_user_in( self, user_pool: CognitoIdpUserPool, client: CognitoIdpUserPoolClient, username: str, ) -> Dict[str, Dict[str, Any]]: refresh_token, _ = user_pool.create_refresh_token(client.id, username) access_token, id_token, expires_in = user_pool.create_tokens_from_refresh_token( refresh_token ) return { "ChallengeParameters": {}, "AuthenticationResult": { "IdToken": id_token, "AccessToken": access_token, "RefreshToken": refresh_token, "ExpiresIn": expires_in, "TokenType": "Bearer", }, } def _validate_auth_flow( self, auth_flow: str, valid_flows: typing.List[AuthFlow] ) -> AuthFlow: """validate auth_flow value and convert auth_flow to enum""" try: auth_flow = AuthFlow[auth_flow] except KeyError: raise InvalidParameterException( f"1 validation error detected: Value '{auth_flow}' at 'authFlow' failed to satisfy constraint: " f"Member must satisfy enum value set: " f"{AuthFlow.list()}" ) if auth_flow not in valid_flows: raise InvalidParameterException("Initiate Auth method not supported") return auth_flow def admin_initiate_auth( self, user_pool_id: str, client_id: str, auth_flow: str, auth_parameters: Dict[str, str], ) -> Dict[str, Any]: admin_auth_flows = [ AuthFlow.ADMIN_NO_SRP_AUTH, AuthFlow.ADMIN_USER_PASSWORD_AUTH, AuthFlow.REFRESH_TOKEN_AUTH, AuthFlow.REFRESH_TOKEN, ] auth_flow = self._validate_auth_flow( auth_flow=auth_flow, valid_flows=admin_auth_flows ) user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) if not client: raise ResourceNotFoundError(client_id) if auth_flow in (AuthFlow.ADMIN_USER_PASSWORD_AUTH, AuthFlow.ADMIN_NO_SRP_AUTH): username: str = auth_parameters.get("USERNAME") # type: ignore[assignment] password: str = auth_parameters.get("PASSWORD") # type: ignore[assignment] user = self.admin_get_user(user_pool_id, username) if not user.enabled: raise NotAuthorizedError("User is disabled.") if user.password != password: raise NotAuthorizedError(username) if user.status in [ UserStatus.FORCE_CHANGE_PASSWORD, UserStatus.RESET_REQUIRED, ]: session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return { "ChallengeName": "NEW_PASSWORD_REQUIRED", "ChallengeParameters": {}, "Session": session, } if ( user.software_token_mfa_enabled and user.preferred_mfa_setting == "SOFTWARE_TOKEN_MFA" ): session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return { "ChallengeName": "SOFTWARE_TOKEN_MFA", "ChallengeParameters": {}, "Session": session, } if user.sms_mfa_enabled and user.preferred_mfa_setting == "SMS_MFA": session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return { "ChallengeName": "SMS_MFA", "ChallengeParameters": {}, "Session": session, } return self._log_user_in(user_pool, client, username) elif auth_flow in (AuthFlow.REFRESH_TOKEN, AuthFlow.REFRESH_TOKEN_AUTH): refresh_token: str = auth_parameters.get("REFRESH_TOKEN") # type: ignore[assignment] ( access_token, id_token, expires_in, ) = user_pool.create_tokens_from_refresh_token(refresh_token) return { "AuthenticationResult": { "IdToken": id_token, "AccessToken": access_token, "ExpiresIn": expires_in, "TokenType": "Bearer", } } else: # We shouldn't get here due to enum validation of auth_flow return None # type: ignore[return-value] def admin_respond_to_auth_challenge( self, session: str, client_id: str, challenge_name: str, challenge_responses: Dict[str, str], ) -> Dict[str, Any]: # Responds to an authentication challenge, as an administrator. # The only differences between this admin endpoint and public endpoint are not relevant and so we can safely call # the public endpoint to do the work: # - The admin endpoint requires a user pool id along with a session; the public endpoint searches across all pools # - ContextData is passed in; we don't use it return self.respond_to_auth_challenge( session, client_id, challenge_name, challenge_responses ) def respond_to_auth_challenge( self, session: str, client_id: str, challenge_name: str, challenge_responses: Dict[str, str], ) -> Dict[str, Any]: if challenge_name == "PASSWORD_VERIFIER": session = challenge_responses.get("PASSWORD_CLAIM_SECRET_BLOCK") # type: ignore[assignment] if session not in self.sessions: raise ResourceNotFoundError(session) _, user_pool = self.sessions[session] client = user_pool.clients.get(client_id) if not client: raise ResourceNotFoundError(client_id) if challenge_name == "NEW_PASSWORD_REQUIRED": username: str = challenge_responses.get("USERNAME") # type: ignore[assignment] new_password = challenge_responses.get("NEW_PASSWORD") if not new_password: raise InvalidPasswordException() self._validate_password(user_pool.id, new_password) user = self.admin_get_user(user_pool.id, username) user.password = new_password user.status = UserStatus.CONFIRMED if user_pool.mfa_config == "ON": mfas_can_setup = [] if user_pool.token_mfa_config == {"Enabled": True}: mfas_can_setup.append("SOFTWARE_TOKEN_MFA") if user_pool.sms_mfa_config: mfas_can_setup.append("SMS_MFA") if ( mfas_can_setup and not user.software_token_mfa_enabled and not user.sms_mfa_enabled ): return { "ChallengeName": "MFA_SETUP", "ChallengeParameters": {"MFAS_CAN_SETUP": mfas_can_setup}, "Session": session, } del self.sessions[session] return self._log_user_in(user_pool, client, username) elif challenge_name == "PASSWORD_VERIFIER": username: str = challenge_responses.get("USERNAME") # type: ignore[no-redef] user = self.admin_get_user(user_pool.id, username) password_claim_signature = challenge_responses.get( "PASSWORD_CLAIM_SIGNATURE" ) if not password_claim_signature: raise ResourceNotFoundError(password_claim_signature) password_claim_secret_block = challenge_responses.get( "PASSWORD_CLAIM_SECRET_BLOCK" ) if not password_claim_secret_block: raise ResourceNotFoundError(password_claim_secret_block) timestamp = challenge_responses.get("TIMESTAMP") if not timestamp: raise ResourceNotFoundError(timestamp) if user.status == UserStatus.FORCE_CHANGE_PASSWORD: return { "ChallengeName": "NEW_PASSWORD_REQUIRED", "ChallengeParameters": { "USERNAME": username, }, "Session": session, } if user_pool.mfa_config == "ON" and not user.token_verified: mfas_can_setup = [] if user_pool.token_mfa_config == {"Enabled": True}: mfas_can_setup.append("SOFTWARE_TOKEN_MFA") if user_pool.sms_mfa_config: mfas_can_setup.append("SMS_MFA") return { "ChallengeName": "MFA_SETUP", "ChallengeParameters": {"MFAS_CAN_SETUP": mfas_can_setup}, "Session": session, } if user.software_token_mfa_enabled or ( user_pool.token_mfa_config == {"Enabled": True} and user.token_verified ): return { "ChallengeName": "SOFTWARE_TOKEN_MFA", "Session": session, "ChallengeParameters": {}, } if user.sms_mfa_enabled: return { "ChallengeName": "SMS_MFA", "Session": session, "ChallengeParameters": {}, } del self.sessions[session] return self._log_user_in(user_pool, client, username) elif challenge_name == "SOFTWARE_TOKEN_MFA" or challenge_name == "SMS_MFA": username: str = challenge_responses.get("USERNAME") # type: ignore[no-redef] self.admin_get_user(user_pool.id, username) mfa_code = challenge_responses.get(f"{challenge_name}_CODE") if not mfa_code: raise ResourceNotFoundError(mfa_code) if client.generate_secret: secret_hash = challenge_responses.get("SECRET_HASH") if not check_secret_hash( client.secret, client.id, username, secret_hash ): raise NotAuthorizedError(secret_hash) del self.sessions[session] return self._log_user_in(user_pool, client, username) elif challenge_name == "MFA_SETUP": username, user_pool = self.sessions[session] return self._log_user_in(user_pool, client, username) else: return {} def confirm_forgot_password( self, client_id: str, username: str, password: str, confirmation_code: str ) -> None: for user_pool in self.user_pools.values(): if client_id in user_pool.clients and user_pool._get_user(username): user = user_pool._get_user(username) if ( confirmation_code.startswith("moto-confirmation-code:") and user.confirmation_code != confirmation_code ): raise ExpiredCodeException( "Invalid code provided, please request a code again." ) user.password = password user.confirmation_code = None break else: raise ResourceNotFoundError(client_id) def forgot_password( self, client_id: str, username: str ) -> Tuple[Optional[str], Dict[str, Any]]: """ The ForgotPassword operation is partially broken in AWS. If the input is 100% correct it works fine. Otherwise you get semi-random garbage and HTTP 200 OK, for example: - recovery for username which is not registered in any cognito pool - recovery for username belonging to a different user pool than the client id is registered to - phone-based recovery for a user without phone_number / phone_number_verified attributes - same as above, but email / email_verified """ for user_pool in self.user_pools.values(): if client_id in user_pool.clients: recovery_settings = user_pool.extended_config["AccountRecoverySetting"] user = user_pool._get_user(username) break else: raise ResourceNotFoundError("Username/client id combination not found.") confirmation_code: Optional[str] = None if user: # An unfortunate bit of magic - confirmation_code is opt-in, as it's returned # via a "x-moto-forgot-password-confirmation-code" http header, which is not the AWS way (should be SES, SNS, Cognito built-in email) # Verification of user.confirmation_code vs received code will be performed only for codes # beginning with 'moto-confirmation-code' prefix. All other codes are considered VALID. confirmation_code = ( f"moto-confirmation-code:{random.randint(100_000, 999_999)}" ) user.confirmation_code = confirmation_code code_delivery_details = self._get_code_delivery_details( recovery_settings, user, username ) return confirmation_code, {"CodeDeliveryDetails": code_delivery_details} def _get_code_delivery_details( self, recovery_settings: Any, user: Optional[CognitoIdpUser], username: str ) -> Dict[str, str]: selected_recovery = min( recovery_settings["RecoveryMechanisms"], key=lambda recovery_mechanism: recovery_mechanism["Priority"], ) if selected_recovery["Name"] == "admin_only": raise NotAuthorizedError("Contact administrator to reset password.") if selected_recovery["Name"] == "verified_phone_number": number = "+*******9934" if user and "phone_number" in user.attribute_lookup: number = user.attribute_lookup["phone_number"] return { "Destination": number, "DeliveryMedium": "SMS", "AttributeName": "phone_number", } else: email = username + "@h***.com" if user and "email" in user.attribute_lookup: first, second = user.attribute_lookup["email"].split("@") email = f"{first[0]}***@{second[0]}***" return { "Destination": email, "DeliveryMedium": "EMAIL", "AttributeName": "email", } def change_password( self, access_token: str, previous_password: str, proposed_password: str ) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: self._validate_password( user_pool_id=user_pool.id, password=proposed_password ) _, username = user_pool.access_tokens[access_token] user = self.admin_get_user(user_pool.id, username) if user.password != previous_password: raise NotAuthorizedError(username) user.password = proposed_password if user.status in [ UserStatus.FORCE_CHANGE_PASSWORD, UserStatus.RESET_REQUIRED, ]: user.status = UserStatus.CONFIRMED break else: raise NotAuthorizedError(access_token) def admin_update_user_attributes( self, user_pool_id: str, username: str, attributes: List[Dict[str, str]] ) -> None: user = self.admin_get_user(user_pool_id, username) email = self._find_attr("email", attributes) self._verify_email_is_not_used(user_pool_id, email) user.update_attributes(attributes) def admin_delete_user_attributes( self, user_pool_id: str, username: str, attributes: List[str] ) -> None: self.admin_get_user(user_pool_id, username).delete_attributes(attributes) def admin_user_global_sign_out(self, user_pool_id: str, username: str) -> None: user_pool = self.describe_user_pool(user_pool_id) self.admin_get_user(user_pool_id, username) user_pool.sign_out(username) def global_sign_out(self, access_token: str) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] user_pool.sign_out(username) return raise NotAuthorizedError(access_token) def create_resource_server( self, user_pool_id: str, identifier: str, name: str, scopes: List[Dict[str, str]], ) -> CognitoResourceServer: user_pool = self.describe_user_pool(user_pool_id) if identifier in user_pool.resource_servers: raise InvalidParameterException( f"{identifier} already exists in user pool {user_pool_id}." ) resource_server = CognitoResourceServer(user_pool_id, identifier, name, scopes) user_pool.resource_servers[identifier] = resource_server return resource_server def describe_resource_server( self, user_pool_id: str, identifier: str ) -> CognitoResourceServer: user_pool = self.user_pools.get(user_pool_id) if not user_pool: raise ResourceNotFoundError(f"User pool {user_pool_id} does not exist.") resource_server = user_pool.resource_servers.get(identifier) if not resource_server: raise ResourceNotFoundError(f"Resource server {identifier} does not exist.") return resource_server @paginate(pagination_model=PAGINATION_MODEL) def list_resource_servers(self, user_pool_id: str) -> List[CognitoResourceServer]: user_pool = self.user_pools[user_pool_id] resource_servers = list(user_pool.resource_servers.values()) return resource_servers def sign_up( self, client_id: str, username: str, password: str, attributes: List[Dict[str, str]], ) -> Tuple[CognitoIdpUser, Any]: user_pool = None for p in self.user_pools.values(): if client_id in p.clients: user_pool = p if user_pool is None: raise ResourceNotFoundError(client_id) elif user_pool._get_user(username): raise UsernameExistsException(username) # UsernameAttributes are attributes (either `email` or `phone_number` # or both) than can be used in the place of a unique username. If the # user provides an email or phone number when signing up, the user pool # performs the following steps: # 1. populates the correct field (email, phone_number) with the value # supplied for Username # 2. generates a persistent GUID for the user that will be returned as # the value of `Username` in the `get-user` and `list-users` # operations, as well as the value of `sub` in `IdToken` and # `AccessToken` # # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings has_username_attrs = user_pool.extended_config.get("UsernameAttributes") if has_username_attrs: username_attributes = user_pool.extended_config["UsernameAttributes"] # attribute_type should be one of `email`, `phone_number` or both for attribute_type in username_attributes: # check if provided username matches one of the attribute types in # `UsernameAttributes` if attribute_type in username_attributes and validate_username_format( username, _format=attribute_type ): # insert provided username into new user's attributes under the # correct key flattened_attrs = flatten_attrs(attributes or []) flattened_attrs.update({attribute_type: username}) attributes = expand_attrs(flattened_attrs) # once the username has been validated against a username attribute # type, there is no need to attempt validation against the other # type(s) break else: # The provided username has not matched the required format for any # of the possible attributes raise InvalidParameterException( "Username should be either an email or a phone number." ) self._validate_password(user_pool.id, password) user = CognitoIdpUser( user_pool_id=user_pool.id, # set username to None so that it will be default to the internal GUID # when them user gets created username=None if has_username_attrs else username, password=password, attributes=attributes, status=UserStatus.UNCONFIRMED, ) user_pool.users[user.username] = user has_email = "email" in user.attribute_lookup has_phone = "phone_number" in user.attribute_lookup verified_attributes = user_pool.extended_config.get( "AutoVerifiedAttributes", [] ) email_verified = ( "email_verified" in user.attribute_lookup or "email" in verified_attributes ) phone_verified = ( "phone_number_verified" in user.attribute_lookup or "phone_number" in verified_attributes ) if (has_email and email_verified) or (has_phone and phone_verified): recovery_settings = user_pool.extended_config["AccountRecoverySetting"] details = self._get_code_delivery_details( recovery_settings=recovery_settings, user=user, username=user.username ) return user, details else: return user, None def confirm_sign_up(self, client_id: str, username: str) -> str: user_pool = None for p in self.user_pools.values(): if client_id in p.clients: user_pool = p if user_pool is None: raise ResourceNotFoundError(client_id) user = self.admin_get_user(user_pool.id, username) user.status = UserStatus.CONFIRMED return "" def initiate_auth( self, client_id: str, auth_flow: str, auth_parameters: Dict[str, str] ) -> Dict[str, Any]: user_auth_flows = [ AuthFlow.USER_SRP_AUTH, AuthFlow.REFRESH_TOKEN_AUTH, AuthFlow.REFRESH_TOKEN, AuthFlow.CUSTOM_AUTH, AuthFlow.USER_PASSWORD_AUTH, ] auth_flow = self._validate_auth_flow( auth_flow=auth_flow, valid_flows=user_auth_flows ) user_pool: Optional[CognitoIdpUserPool] = None client: CognitoIdpUserPoolClient = None # type: ignore[assignment] for p in self.user_pools.values(): if client_id in p.clients: user_pool = p client = p.clients[client_id] if user_pool is None: raise ResourceNotFoundError(client_id) if auth_flow is AuthFlow.USER_SRP_AUTH: username: str = auth_parameters.get("USERNAME") # type: ignore[assignment] srp_a = auth_parameters.get("SRP_A") if not srp_a: raise ResourceNotFoundError(srp_a) if client.generate_secret: secret_hash: str = auth_parameters.get("SECRET_HASH") # type: ignore[assignment] if not check_secret_hash( client.secret, client.id, username, secret_hash ): raise NotAuthorizedError(secret_hash) user = self.admin_get_user(user_pool.id, username) if not user.enabled: raise NotAuthorizedError("User is disabled.") if user.status is UserStatus.UNCONFIRMED: raise UserNotConfirmedException("User is not confirmed.") session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return { "ChallengeName": "PASSWORD_VERIFIER", "Session": session, "ChallengeParameters": { "SALT": random.uuid4().hex, "SRP_B": random.uuid4().hex, "USERNAME": user.username, "USER_ID_FOR_SRP": user.id, "SECRET_BLOCK": session, }, } elif auth_flow is AuthFlow.USER_PASSWORD_AUTH: username: str = auth_parameters.get("USERNAME") # type: ignore[no-redef] password: str = auth_parameters.get("PASSWORD") # type: ignore[assignment] user = self.admin_get_user(user_pool.id, username) if not user: raise UserNotFoundError(username) if not user.enabled: raise NotAuthorizedError("User is disabled.") if user.password != password: raise NotAuthorizedError("Incorrect username or password.") if user.status is UserStatus.UNCONFIRMED: raise UserNotConfirmedException("User is not confirmed.") session = str(random.uuid4()) self.sessions[session] = (username, user_pool) if user.status is UserStatus.FORCE_CHANGE_PASSWORD: return { "ChallengeName": "NEW_PASSWORD_REQUIRED", "ChallengeParameters": {"USERNAME": user.username}, "Session": session, } if ( user.software_token_mfa_enabled and user.preferred_mfa_setting == "SOFTWARE_TOKEN_MFA" ) or (user_pool.mfa_config == "ON" and user.token_verified): return { "ChallengeName": "SOFTWARE_TOKEN_MFA", "ChallengeParameters": {}, "Session": session, } if user.sms_mfa_enabled and user.preferred_mfa_setting == "SMS_MFA": return { "ChallengeName": "SMS_MFA", "ChallengeParameters": {}, "Session": session, } new_refresh_token, origin_jti = user_pool.create_refresh_token( client_id, username ) access_token, expires_in = user_pool.create_access_token( client_id, username, origin_jti=origin_jti ) id_token, _ = user_pool.create_id_token( client_id, username, origin_jti=origin_jti ) return { "AuthenticationResult": { "IdToken": id_token, "AccessToken": access_token, "ExpiresIn": expires_in, "RefreshToken": new_refresh_token, "TokenType": "Bearer", } } elif auth_flow in (AuthFlow.REFRESH_TOKEN, AuthFlow.REFRESH_TOKEN_AUTH): refresh_token = auth_parameters.get("REFRESH_TOKEN") if not refresh_token: raise ResourceNotFoundError(refresh_token) res = user_pool.refresh_tokens.get(refresh_token) if res is None: raise NotAuthorizedError("Refresh Token has been revoked") client_id, username, _ = res if not username: raise ResourceNotFoundError(username) if client.generate_secret: secret_hash: str = auth_parameters.get("SECRET_HASH") # type: ignore[no-redef] if not check_secret_hash( client.secret, client.id, username, secret_hash ): raise NotAuthorizedError(secret_hash) ( access_token, id_token, expires_in, ) = user_pool.create_tokens_from_refresh_token(refresh_token) return { "AuthenticationResult": { "IdToken": id_token, "AccessToken": access_token, "ExpiresIn": expires_in, "TokenType": "Bearer", } } else: # We shouldn't get here due to enum validation of auth_flow return None # type: ignore[return-value] def associate_software_token( self, access_token: str, session: str ) -> Dict[str, str]: secret_code = "asdfasdfasdf" if session: if session in self.sessions: return {"SecretCode": secret_code, "Session": session} raise NotAuthorizedError(session) for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] self.admin_get_user(user_pool.id, username) return {"SecretCode": secret_code} raise NotAuthorizedError(access_token) def verify_software_token(self, access_token: str, session: str) -> Dict[str, str]: """ The parameter UserCode has not yet been implemented """ if session: if session not in self.sessions: raise ResourceNotFoundError(session) username, user_pool = self.sessions[session] user = self.admin_get_user(user_pool.id, username) user.token_verified = True session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return {"Status": "SUCCESS", "Session": session} for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] user = self.admin_get_user(user_pool.id, username) user.token_verified = True session = str(random.uuid4()) self.sessions[session] = (username, user_pool) return {"Status": "SUCCESS", "Session": session} raise NotAuthorizedError(access_token) def set_user_mfa_preference( self, access_token: str, software_token_mfa_settings: Dict[str, bool], sms_mfa_settings: Dict[str, bool], ) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] return self.admin_set_user_mfa_preference( user_pool.id, username, software_token_mfa_settings, sms_mfa_settings, ) raise NotAuthorizedError(access_token) def admin_set_user_mfa_preference( self, user_pool_id: str, username: str, software_token_mfa_settings: Dict[str, bool], sms_mfa_settings: Dict[str, bool], ) -> None: user = self.admin_get_user(user_pool_id, username) if software_token_mfa_settings: if software_token_mfa_settings.get("Enabled"): if user.token_verified: user.software_token_mfa_enabled = True else: raise InvalidParameterException( "User has not verified software token mfa" ) else: user.software_token_mfa_enabled = False if software_token_mfa_settings.get("PreferredMfa"): user.preferred_mfa_setting = "SOFTWARE_TOKEN_MFA" elif user.preferred_mfa_setting != "SMS_MFA": user.preferred_mfa_setting = "" if sms_mfa_settings: if sms_mfa_settings.get("Enabled"): user.sms_mfa_enabled = True else: user.sms_mfa_enabled = False if sms_mfa_settings.get("PreferredMfa"): user.preferred_mfa_setting = "SMS_MFA" elif user.preferred_mfa_setting != "SOFTWARE_TOKEN_MFA": user.preferred_mfa_setting = "" return None def _validate_password(self, user_pool_id: str, password: str) -> None: user_pool = self.describe_user_pool(user_pool_id) password_policy = user_pool.extended_config.get("Policies", {}).get( "PasswordPolicy", {} ) minimum = password_policy.get("MinimumLength", 5) maximum = password_policy.get("MaximumLength", 99) require_uppercase = password_policy.get("RequireUppercase", True) require_lowercase = password_policy.get("RequireLowercase", True) require_numbers = password_policy.get("RequireNumbers", True) require_symbols = password_policy.get("RequireSymbols", True) flagl = minimum <= len(password) < maximum flagn = not require_numbers or bool(re.search(r"\d", password)) # If we require symbols, we assume False - and check a symbol is present # If we don't require symbols, we assume True - and we could technically skip the for-loop flag_sc = not require_symbols sc = "^ $ * . [ ] { } ( ) ? \" ! @ # % & / \\ , > < ' : ; | _ ~ ` = + -" for i in password: if i in sc: flag_sc = True flag_u = not require_uppercase or bool(re.search(r"[A-Z]+", password)) flag_lo = not require_lowercase or bool(re.search(r"[a-z]+", password)) if not (flagl and flagn and flag_sc and flag_u and flag_lo): raise InvalidPasswordException() def admin_set_user_password( self, user_pool_id: str, username: str, password: str, permanent: bool ) -> None: user = self.admin_get_user(user_pool_id, username) # user.password = password self._validate_password(user_pool_id, password) user.password = password if permanent: user.status = UserStatus.CONFIRMED else: user.status = UserStatus.FORCE_CHANGE_PASSWORD def add_custom_attributes( self, user_pool_id: str, custom_attributes: List[Dict[str, Any]] ) -> None: user_pool = self.describe_user_pool(user_pool_id) user_pool.add_custom_attributes(custom_attributes) def update_user_attributes( self, access_token: str, attributes: List[Dict[str, str]] ) -> None: """ The parameter ClientMetadata has not yet been implemented. No CodeDeliveryDetails are returned. """ for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] user = self.admin_get_user(user_pool.id, username) email = self._find_attr("email", attributes) self._verify_email_is_not_used(user_pool.id, email) user.update_attributes(attributes) return raise NotAuthorizedError(access_token) def _find_attr(self, name: str, attrs: List[Dict[str, str]]) -> Optional[str]: return next((a["Value"] for a in attrs if a["Name"] == name), None) def _verify_email_is_not_used( self, user_pool_id: str, email: Optional[str] ) -> None: if not email: # We're not updating emails return user_pool = self.describe_user_pool(user_pool_id) if "email" not in user_pool.extended_config.get("UsernameAttributes", []): # email is not used as a username - duplicate emails are allowed return for user in user_pool.users.values(): if user.attribute_lookup.get("email", "") == email: raise AliasExistsException class RegionAgnosticBackend: # Some operations are unauthenticated # Without authentication-header, we lose the context of which region the request was send to # This backend will cycle through all backends as a workaround def __init__(self, account_id: str, region_name: str): self.account_id = account_id self.region_name = region_name def _find_backend_by_access_token(self, access_token: str) -> CognitoIdpBackend: for account_specific_backends in cognitoidp_backends.values(): for region, backend in account_specific_backends.items(): if region == "global": continue for p in backend.user_pools.values(): if access_token in p.access_tokens: return backend return cognitoidp_backends[self.account_id][self.region_name] def _find_backend_by_access_token_or_session( self, access_token: str, session: str ) -> CognitoIdpBackend: for account_specific_backends in cognitoidp_backends.values(): for region, backend in account_specific_backends.items(): if region == "global": continue if session and session in backend.sessions: return backend for p in backend.user_pools.values(): if access_token and access_token in p.access_tokens: return backend return cognitoidp_backends[self.account_id][self.region_name] def _find_backend_for_clientid(self, client_id: str) -> CognitoIdpBackend: for account_specific_backends in cognitoidp_backends.values(): for region, backend in account_specific_backends.items(): if region == "global": continue for p in backend.user_pools.values(): if client_id in p.clients: return backend return cognitoidp_backends[self.account_id][self.region_name] def sign_up( self, client_id: str, username: str, password: str, attributes: List[Dict[str, str]], ) -> Tuple[CognitoIdpUser, Any]: backend = self._find_backend_for_clientid(client_id) return backend.sign_up(client_id, username, password, attributes) def initiate_auth( self, client_id: str, auth_flow: str, auth_parameters: Dict[str, str] ) -> Dict[str, Any]: backend = self._find_backend_for_clientid(client_id) return backend.initiate_auth(client_id, auth_flow, auth_parameters) def confirm_sign_up(self, client_id: str, username: str) -> str: backend = self._find_backend_for_clientid(client_id) return backend.confirm_sign_up(client_id, username) def get_user(self, access_token: str) -> CognitoIdpUser: backend = self._find_backend_by_access_token(access_token) return backend.get_user(access_token) def admin_respond_to_auth_challenge( self, session: str, client_id: str, challenge_name: str, challenge_responses: Dict[str, str], ) -> Dict[str, Any]: backend = self._find_backend_for_clientid(client_id) return backend.admin_respond_to_auth_challenge( session, client_id, challenge_name, challenge_responses ) def respond_to_auth_challenge( self, session: str, client_id: str, challenge_name: str, challenge_responses: Dict[str, str], ) -> Dict[str, Any]: backend = self._find_backend_for_clientid(client_id) return backend.respond_to_auth_challenge( session, client_id, challenge_name, challenge_responses ) def associate_software_token( self, access_token: str, session: str ) -> Dict[str, str]: backend = self._find_backend_by_access_token_or_session(access_token, session) return backend.associate_software_token(access_token, session) def verify_software_token(self, access_token: str, session: str) -> Dict[str, str]: backend = self._find_backend_by_access_token_or_session(access_token, session) return backend.verify_software_token(access_token, session) def set_user_mfa_preference( self, access_token: str, software_token_mfa_settings: Dict[str, bool], sms_mfa_settings: Dict[str, bool], ) -> None: backend = self._find_backend_by_access_token(access_token) return backend.set_user_mfa_preference( access_token, software_token_mfa_settings, sms_mfa_settings ) def update_user_attributes( self, access_token: str, attributes: List[Dict[str, str]] ) -> None: backend = self._find_backend_by_access_token(access_token) return backend.update_user_attributes(access_token, attributes) cognitoidp_backends = BackendDict(CognitoIdpBackend, "cognito-idp") # Hack to help moto-server process requests on localhost, where the region isn't # specified in the host header. Some endpoints (change password, confirm forgot # password) have no authorization header from which to extract the region. def find_account_region_by_value( key: str, value: str, fallback: Tuple[str, str] ) -> Tuple[str, str]: for account_id, account_specific_backend in cognitoidp_backends.items(): for region, backend in account_specific_backend.items(): for user_pool in backend.user_pools.values(): if key == "client_id" and value in user_pool.clients: return account_id, region if key == "access_token" and value in user_pool.access_tokens: return account_id, region # If we can't find the `client_id` or `access_token`, we just pass # back a default backend region, which will raise the appropriate # error message (e.g. NotAuthorized or NotFound). return fallback
Memory