import re
from collections import OrderedDict
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import (
camelcase_to_underscores,
iso_8601_datetime_with_milliseconds,
)
from moto.moto_api._internal import mock_random
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import ARN_PARTITION_REGEX, PARTITION_NAMES
from .exceptions import (
WAFNonexistentItemException,
WAFOptimisticLockException,
WAFV2DuplicateItemException,
WAFV2InsufficientInformationException,
)
from .utils import make_arn_for_ip_set, make_arn_for_rule_group, make_arn_for_wacl
if TYPE_CHECKING:
from moto.apigateway.models import Stage
US_EAST_1_REGION = "us-east-1"
GLOBAL_REGION = "global"
APIGATEWAY_REGEX = (
ARN_PARTITION_REGEX
+ r":apigateway:[a-zA-Z0-9-]+::/restapis/[a-zA-Z0-9]+/stages/[a-zA-Z0-9]+"
)
PAGINATION_MODEL = {
"list_ip_sets": {
"input_token": "next_marker",
"limit_key": "limit",
"limit_default": 100,
"unique_attribute": "arn",
},
"list_logging_configurations": {
"input_token": "next_marker",
"limit_key": "limit",
"limit_default": 100,
"unique_attribute": "arn",
},
"list_rule_groups": {
"input_token": "next_marker",
"limit_key": "limit",
"limit_default": 100,
"unique_attribute": "arn",
},
"list_tags_for_resource": {
"input_token": "next_marker",
"limit_key": "limit",
"limit_default": 100,
"unique_attribute": "Key",
},
"list_web_acls": {
"input_token": "next_marker",
"limit_key": "limit",
"limit_default": 100,
"unique_attribute": "arn",
},
}
class FakeRule(BaseModel):
def __init__(
self,
name: str,
priority: int,
statement: Dict[str, Any],
visibility_config: Dict[str, Union[str, bool]],
action: Optional[Dict[str, Any]] = None,
captcha_config: Optional[Dict[str, Dict[str, int]]] = None,
challenge_config: Optional[Dict[str, Dict[str, int]]] = None,
override_action: Optional[Dict[str, Any]] = None,
rule_labels: Optional[List[Dict[str, str]]] = None,
):
self.name = name
self.priority = priority
self.statement = statement
self.visibility_config = visibility_config
self.action = action
self.captcha_config = captcha_config
self.challenge_config = challenge_config
self.override_action = override_action
self.rule_labels = rule_labels
def get_consumed_label(self) -> Optional[str]:
return self.statement.get("LabelMatchStatement", {}).get("Key")
def get_available_labels(self) -> Optional[List[str]]:
return [r["Name"] for r in self.rule_labels] if self.rule_labels else None
def to_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Action": self.action,
"Priority": self.priority,
"Statement": self.statement,
"VisibilityConfig": self.visibility_config,
"CaptchaConfig": self.captcha_config,
"ChallengeConfig": self.challenge_config,
"OverrideAction": self.override_action,
"RuleLabels": self.rule_labels,
}
class FakeWebACL(BaseModel):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html
"""
def __init__(
self,
name: str,
account: str,
arn: str,
wacl_id: str,
visibility_config: Dict[str, Any],
default_action: Dict[str, Any],
description: Optional[str],
rules: Optional[List[FakeRule]],
association_config: Optional[Dict[str, Any]] = None,
captcha_config: Optional[Dict[str, Any]] = None,
challenge_config: Optional[Dict[str, Any]] = None,
custom_response_bodies: Optional[Dict[str, Any]] = None,
token_domains: Optional[List[str]] = None,
):
self.name = name
self.account = account
self.created_time = iso_8601_datetime_with_milliseconds()
self.id = wacl_id
self.arn = arn
self.description = description or ""
self.capacity = 3
self.rules = rules or []
self.visibility_config = visibility_config
self.default_action = default_action
self.lock_token = self._generate_lock_token()
self.associated_resources: List[str] = []
self.association_config = association_config
self.captcha_config = captcha_config
self.challenge_config = challenge_config
self.custom_response_bodies = custom_response_bodies
self.token_domains = token_domains
self.label_namespace = self._get_label_namespace()
def _generate_lock_token(self) -> str:
return str(mock_random.uuid4())
def _get_label_namespace(self) -> str:
return f"awswaf:{self.account}:webacl:{self.name}:"
def update(
self,
default_action: Optional[Dict[str, Any]],
rules: Optional[List[FakeRule]],
description: Optional[str],
visibility_config: Optional[Dict[str, Any]],
custom_response_bodies: Optional[Dict[str, Any]],
captcha_config: Optional[Dict[str, Any]],
challenge_config: Optional[Dict[str, Any]],
token_domains: Optional[List[str]],
association_config: Optional[Dict[str, Any]],
) -> None:
if default_action is not None:
self.default_action = default_action
if rules is not None:
self.rules = rules
if description is not None:
self.description = description
if visibility_config is not None:
self.visibility_config = visibility_config
if custom_response_bodies is not None:
self.custom_response_bodies = custom_response_bodies
if captcha_config is not None:
self.captcha_config = captcha_config
if challenge_config is not None:
self.challenge_config = challenge_config
if token_domains is not None:
self.token_domains = token_domains
if association_config is not None:
self.association_config = association_config
self.lock_token = self._generate_lock_token()
def to_short_dict(self) -> Dict[str, Any]:
# Format for summary https://docs.aws.amazon.com/waf/latest/APIReference/API_CreateWebACL.html (response syntax section)
return {
"ARN": self.arn,
"Description": self.description,
"Id": self.id,
"Name": self.name,
"LockToken": self.lock_token,
}
def to_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Id": self.id,
"ARN": self.arn,
"Description": self.description,
"Rules": [r.to_dict() for r in self.rules],
"DefaultAction": self.default_action,
"VisibilityConfig": self.visibility_config,
"Capacity": self.capacity,
"LockToken": self.lock_token,
"AssociationConfig": self.association_config,
"CaptchaConfig": self.captcha_config,
"ChallengeConfig": self.challenge_config,
"CustomResponseBodies": self.custom_response_bodies,
"TokenDomains": self.token_domains,
"LabelNamespace": self.label_namespace,
}
class FakeIPSet(BaseModel):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_IPSet.html
"""
def __init__(
self,
arn: str,
ip_set_id: str,
ip_address_version: str,
addresses: List[str],
name: str,
description: str,
scope: str,
):
self.name = name
self.ip_set_id = ip_set_id
self.arn = arn
self.addresses = addresses
self.description = description
self.ip_address_version = ip_address_version
self.scope = scope
self.lock_token = str(mock_random.uuid4())[0:6]
def update(self, description: Optional[str], addresses: List[str]) -> None:
if description is not None:
self.description = description
self.addresses = addresses
self.lock_token = str(mock_random.uuid4())[0:6]
def to_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Id": self.ip_set_id,
"ARN": self.arn,
"Description": self.description,
"IPAddressVersion": self.ip_address_version,
"Addresses": self.addresses,
"LockToken": self.lock_token,
}
class FakeLoggingConfiguration(BaseModel):
def __init__(
self,
arn: str,
log_destination_configs: List[str],
redacted_fields: Optional[Dict[str, Any]],
managed_gy_firewall_manager: Optional[bool],
logging_filter: Optional[Dict[str, Any]],
):
self.arn = arn
self.log_destination_configs = log_destination_configs
self.redacted_fields = redacted_fields
self.managed_by_firewall_manager = managed_gy_firewall_manager or False
self.logging_filter = logging_filter
def to_dict(self) -> Dict[str, Any]:
return {
"ResourceArn": self.arn,
"LogDestinationConfigs": self.log_destination_configs,
"RedactedFields": self.redacted_fields,
"ManagedByFirewallManager": self.managed_by_firewall_manager,
"LoggingFilter": self.logging_filter,
}
class FakeRuleGroup(BaseModel):
def __init__(
self,
account: str,
name: str,
id: str,
arn: str,
scope: str,
capacity: int,
visibility_config: Dict[str, Any],
description: Optional[str],
rules: Optional[List[FakeRule]],
custom_response_bodies: Optional[Dict[str, Any]] = None,
):
self.account = account
self.name = name
self.id = id
self.arn = arn
self.lock_token = self._generate_lock_token()
self.scope = scope
self.capacity = capacity
self.description = description or ""
self.rules = rules or []
self.visibility_config = visibility_config
self.custom_response_bodies = custom_response_bodies
self.label_namespace = self._get_label_namespace()
self.available_labels = self._get_available_labels()
self.consumed_labels = self._get_consumed_labels()
def _generate_lock_token(self) -> str:
return str(mock_random.uuid4())
def _get_available_labels(self) -> Optional[List[Dict[str, str]]]:
return (
[
{"Name": f"{self.label_namespace}{label}"}
for rule in self.rules
for label in (rule.get_available_labels() or [])
]
if self.rules
else None
)
def _get_consumed_labels(self) -> Optional[List[Dict[str, str]]]:
return (
[
{"Name": f"{self.label_namespace}{label}"}
for rule in self.rules
if (label := rule.get_consumed_label())
]
if self.rules
else None
)
def _get_label_namespace(self) -> str:
return f"awswaf:{self.account}:rulegroup:{self.name}:"
def update(
self,
description: Optional[str],
rules: Optional[List[FakeRule]],
visibility_config: Optional[Dict[str, Any]],
custom_response_bodies: Optional[Dict[str, Any]],
) -> str:
if description is not None:
self.description = description
if rules is not None:
self.rules = rules
if visibility_config is not None:
self.visibility_config = visibility_config
if custom_response_bodies is not None:
self.custom_response_bodies = custom_response_bodies
self.lock_token = self._generate_lock_token()
return self.lock_token
def to_short_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Id": self.id,
"Description": self.description,
"LockToken": self.lock_token,
"ARN": self.arn,
}
def to_dict(self) -> Dict[str, Any]:
return {
"Name": self.name,
"Id": self.id,
"Capacity": self.capacity,
"ARN": self.arn,
"Description": self.description,
"Rules": [r.to_dict() for r in self.rules],
"VisibilityConfig": self.visibility_config,
"LabelNamespace": self.label_namespace,
"AvailableLabels": self.available_labels,
"ConsumedLabels": self.consumed_labels,
"CustomResponseBodies": self.custom_response_bodies,
"LockToken": self.lock_token,
}
class WAFV2Backend(BaseBackend):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_Operations_AWS_WAFV2.html
"""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.wacls: Dict[str, FakeWebACL] = OrderedDict()
self.ip_sets: Dict[str, FakeIPSet] = OrderedDict()
self.logging_configurations: Dict[str, FakeLoggingConfiguration] = OrderedDict()
self.rule_groups: Dict[str, FakeRuleGroup] = OrderedDict()
self.tagging_service = TaggingService()
# TODO: self.load_balancers = OrderedDict()
def associate_web_acl(self, web_acl_arn: str, resource_arn: str) -> None:
web_acl = self.wacls.get(web_acl_arn)
if not web_acl:
raise WAFNonexistentItemException
web_acl.associated_resources.append(resource_arn)
# Special Case - APIGateway wants to know about the WebACL it's associated to
stage = self._find_apigw_stage(resource_arn)
if stage:
stage.web_acl_arn = web_acl_arn
def disassociate_web_acl(self, resource_arn: str) -> None:
for web_acl in self.wacls.values():
if resource_arn in web_acl.associated_resources:
web_acl.associated_resources.remove(resource_arn)
break
stage = self._find_apigw_stage(resource_arn)
if stage:
stage.web_acl_arn = None
def get_web_acl_for_resource(self, resource_arn: str) -> Optional[FakeWebACL]:
for wacl in self.wacls.values():
if resource_arn in wacl.associated_resources:
return wacl
return None
def _find_apigw_stage(self, resource_arn: str) -> Optional["Stage"]: # type: ignore
try:
if re.search(APIGATEWAY_REGEX, resource_arn):
region = resource_arn.split(":")[3]
rest_api_id = resource_arn.split("/")[-3]
stage_name = resource_arn.split("/")[-1]
from moto.apigateway import apigateway_backends
apigw = apigateway_backends[self.account_id][region]
return apigw.get_stage(rest_api_id, stage_name)
except: # noqa: E722 Do not use bare except
return None
def _generate_id(self) -> str:
return str(mock_random.uuid4())
def create_web_acl(
self,
name: str,
visibility_config: Dict[str, Any],
default_action: Dict[str, Any],
scope: str,
description: str,
tags: List[Dict[str, str]],
rules: List[Dict[str, Any]],
association_config: Optional[Dict[str, Any]],
captcha_config: Optional[Dict[str, Any]],
challenge_config: Optional[Dict[str, Any]],
custom_response_bodies: Optional[Dict[str, Any]],
token_domains: Optional[List[str]],
) -> FakeWebACL:
wacl_id = self._generate_id()
arn = make_arn_for_wacl(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=wacl_id,
scope=scope,
)
if arn in self.wacls or self._is_duplicate_name(name):
raise WAFV2DuplicateItemException()
rule_objs = [
FakeRule(**{camelcase_to_underscores(k): v for k, v in rule.items()})
for rule in rules
]
new_wacl = FakeWebACL(
name=name,
account=self.account_id,
arn=arn,
wacl_id=wacl_id,
visibility_config=visibility_config,
default_action=default_action,
description=description,
rules=rule_objs,
association_config=association_config,
captcha_config=captcha_config,
challenge_config=challenge_config,
custom_response_bodies=custom_response_bodies,
token_domains=token_domains,
)
self.wacls[arn] = new_wacl
self.tag_resource(arn, tags)
return new_wacl
def delete_web_acl(self, name: str, scope: str, _id: str, lock_token: str) -> None:
arn = make_arn_for_wacl(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)
if arn not in self.wacls:
raise WAFNonexistentItemException
wacl = self.wacls[arn]
if wacl.lock_token != lock_token:
raise WAFOptimisticLockException()
self.wacls.pop(arn)
def get_web_acl(self, name: str, _id: str) -> FakeWebACL:
for wacl in self.wacls.values():
if wacl.name == name and wacl.id == _id:
return wacl
raise WAFNonexistentItemException
@paginate(PAGINATION_MODEL) # type: ignore
def list_web_acls(self) -> List[FakeWebACL]:
return [wacl for wacl in self.wacls.values()]
def _is_duplicate_name(self, name: str) -> bool:
all_wacl_names = set(wacl.name for wacl in self.wacls.values())
return name in all_wacl_names
@paginate(PAGINATION_MODEL) # type: ignore
def list_rule_groups(self, scope: str) -> List[FakeRuleGroup]:
rule_groups = [
group for group in self.rule_groups.values() if group.scope == scope
]
return rule_groups
@paginate(PAGINATION_MODEL) # type: ignore
def list_tags_for_resource(self, arn: str) -> List[Dict[str, str]]:
return self.tagging_service.list_tags_for_resource(arn)["Tags"]
def tag_resource(self, arn: str, tags: List[Dict[str, str]]) -> None:
self.tagging_service.tag_resource(arn, tags)
def untag_resource(self, arn: str, tag_keys: List[str]) -> None:
self.tagging_service.untag_resource_using_names(arn, tag_keys)
def update_web_acl(
self,
name: str,
_id: str,
default_action: Optional[Dict[str, Any]],
rules: Optional[List[Dict[str, Any]]],
description: Optional[str],
visibility_config: Optional[Dict[str, Any]],
lock_token: str,
custom_response_bodies: Optional[Dict[str, Any]],
captcha_config: Optional[Dict[str, Any]],
challenge_config: Optional[Dict[str, Any]],
token_domains: Optional[List[str]],
association_config: Optional[Dict[str, Any]],
) -> str:
acl = self.get_web_acl(name, _id)
if acl.lock_token != lock_token:
raise WAFOptimisticLockException()
rule_objs = (
[
FakeRule(**{camelcase_to_underscores(k): v for k, v in rule.items()})
for rule in rules
]
if rules
else None
)
acl.update(
default_action,
rule_objs,
description,
visibility_config,
custom_response_bodies,
captcha_config,
challenge_config,
token_domains,
association_config,
)
return acl.lock_token
def create_ip_set(
self,
name: str,
scope: str,
description: str,
ip_address_version: str,
addresses: List[str],
tags: List[Dict[str, str]],
) -> FakeIPSet:
ip_set_id = self._generate_id()
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=ip_set_id,
scope=scope,
)
new_ip_set = FakeIPSet(
arn,
ip_set_id,
ip_address_version,
addresses,
name,
description,
scope,
)
self.ip_sets[arn] = new_ip_set
self.tag_resource(arn, tags)
return new_ip_set
def delete_ip_set(self, name: str, scope: str, _id: str, lock_token: str) -> None:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)
if arn not in self.ip_sets:
raise WAFNonexistentItemException()
if lock_token != self.ip_sets[arn].lock_token:
raise WAFOptimisticLockException()
self.ip_sets.pop(arn)
@paginate(PAGINATION_MODEL) # type: ignore
def list_ip_sets(self, scope: str) -> List[FakeIPSet]:
ip_sets = [
ip_set for arn, ip_set in self.ip_sets.items() if ip_set.scope == scope
]
return ip_sets
def get_ip_set(self, name: str, scope: str, _id: str) -> FakeIPSet:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)
if arn not in self.ip_sets:
raise WAFNonexistentItemException()
return self.ip_sets[arn]
def update_ip_set(
self,
name: str,
scope: str,
_id: str,
description: Optional[str],
addresses: List[str],
lock_token: str,
) -> FakeIPSet:
arn = make_arn_for_ip_set(
name=name,
account_id=self.account_id,
region_name=self.region_name,
_id=_id,
scope=scope,
)
if not (ip_set := self.ip_sets.get(arn)):
raise WAFNonexistentItemException()
if ip_set.lock_token != lock_token:
raise WAFOptimisticLockException()
ip_set.update(description, addresses)
return ip_set
def put_logging_configuration(
self,
arn: str,
log_destination_configs: List[str],
redacted_fields: Optional[Dict[str, Any]],
managed_gy_firewall_manager: bool,
logging_filter: Dict[str, Any],
) -> FakeLoggingConfiguration:
logging_configuration = FakeLoggingConfiguration(
arn,
log_destination_configs,
redacted_fields,
managed_gy_firewall_manager,
logging_filter,
)
self.logging_configurations[arn] = logging_configuration
return logging_configuration
def delete_logging_configuration(self, arn: str) -> None:
if not self.logging_configurations.get(arn):
raise WAFNonexistentItemException()
self.logging_configurations.pop(arn)
def get_logging_configuration(self, arn: str) -> FakeLoggingConfiguration:
if not (logging_configuration := self.logging_configurations.get(arn)):
raise WAFNonexistentItemException()
return logging_configuration
@paginate(PAGINATION_MODEL) # type: ignore
def list_logging_configurations(self, scope: str) -> List[FakeLoggingConfiguration]:
if scope == "CLOUDFRONT":
scope = "global"
else:
scope = self.region_name
return [
logging_configuration
for arn, logging_configuration in self.logging_configurations.items()
if f":{scope}:" in arn
]
def create_rule_group(
self,
name: str,
scope: str,
capacity: int,
description: Optional[str],
rules: Optional[List[Dict[str, Any]]],
visibility_config: Dict[str, Union[bool, str]],
tags: Optional[List[Dict[str, str]]],
custom_response_bodies: Optional[Dict[str, str]],
) -> FakeRuleGroup:
id = self._generate_id()
arn = make_arn_for_rule_group(
name, self.account_id, self.region_name, id, scope
)
if name in set(group.name for group in self.rule_groups.values()):
raise WAFV2DuplicateItemException()
rules_objs = (
[
FakeRule(**{camelcase_to_underscores(k): v for k, v in rule.items()})
for rule in rules
]
if rules
else None
)
rule_group = FakeRuleGroup(
name=name,
account=self.account_id,
id=id,
arn=arn,
scope=scope,
capacity=capacity,
visibility_config=visibility_config,
description=description,
rules=rules_objs,
custom_response_bodies=custom_response_bodies,
)
self.rule_groups[arn] = rule_group
self.tagging_service.tag_resource(arn, tags)
return rule_group
def update_rule_group(
self,
name: str,
scope: str,
id: str,
description: Optional[str],
rules: Optional[List[Dict[str, Any]]],
visibility_config: Dict[str, Any],
lock_token: str,
custom_response_bodies: Optional[Dict[str, Any]],
) -> FakeRuleGroup:
arn = make_arn_for_rule_group(
name, self.account_id, self.region_name, id, scope
)
if not (group := self.rule_groups.get(arn)):
raise WAFNonexistentItemException()
if group.lock_token != lock_token:
raise WAFOptimisticLockException()
rules_objs = (
[
FakeRule(**{camelcase_to_underscores(k): v for k, v in rule.items()})
for rule in rules
]
if rules
else None
)
group.update(description, rules_objs, visibility_config, custom_response_bodies)
return group
def delete_rule_group(
self, name: str, scope: str, id: str, lock_token: str
) -> None:
arn = make_arn_for_rule_group(
name, self.account_id, self.region_name, id, scope
)
if not (group := self.rule_groups.get(arn)):
raise WAFNonexistentItemException()
if group.lock_token != lock_token:
raise WAFOptimisticLockException()
self.rule_groups.pop(arn)
return
def get_rule_group(
self,
name: Optional[str],
scope: Optional[str],
id: Optional[str],
arn: Optional[str],
) -> FakeRuleGroup:
if not arn and not (name and scope and id):
raise WAFV2InsufficientInformationException(name, scope, id, arn)
else:
arn = arn or make_arn_for_rule_group(
name, # type: ignore[arg-type]
self.account_id,
self.region_name,
id, # type: ignore[arg-type]
scope, # type: ignore[arg-type]
)
if not (group := self.rule_groups.get(arn)):
raise WAFNonexistentItemException()
return group
wafv2_backends = BackendDict(WAFV2Backend, "wafv2", additional_regions=PARTITION_NAMES)