import json
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import GLOBAL_REGION, WAFV2Backend, wafv2_backends
class WAFV2Response(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="wafv2")
@property
def wafv2_backend(self) -> WAFV2Backend:
return wafv2_backends[self.current_account][self.region]
def associate_web_acl(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
web_acl_arn = body["WebACLArn"]
resource_arn = body["ResourceArn"]
self.wafv2_backend.associate_web_acl(web_acl_arn, resource_arn)
return 200, {}, "{}"
def disassociate_web_acl(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
resource_arn = body["ResourceArn"]
self.wafv2_backend.disassociate_web_acl(resource_arn)
return 200, {}, "{}"
def get_web_acl_for_resource(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
resource_arn = body["ResourceArn"]
web_acl = self.wafv2_backend.get_web_acl_for_resource(resource_arn)
response = {"WebACL": web_acl.to_dict() if web_acl else None}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def create_web_acl(self) -> TYPE_RESPONSE:
"""https://docs.aws.amazon.com/waf/latest/APIReference/API_CreateWebACL.html (response syntax section)"""
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
name = self._get_param("Name")
body = json.loads(self.body)
description = body.get("Description")
tags = body.get("Tags", [])
rules = body.get("Rules", [])
association_config = body.get("AssociationConfig")
captcha_config = body.get("CaptchaConfig")
challenge_config = body.get("ChallengeConfig")
custom_response_bodies = body.get("CustomResponseBodies")
token_domains = body.get("TokenDomains")
web_acl = self.wafv2_backend.create_web_acl(
name,
body["VisibilityConfig"],
body["DefaultAction"],
scope,
description,
tags,
rules,
association_config,
captcha_config,
challenge_config,
custom_response_bodies,
token_domains,
)
response = {"Summary": web_acl.to_short_dict()}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def delete_web_acl(self) -> TYPE_RESPONSE:
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
name = self._get_param("Name")
_id = self._get_param("Id")
lock_token = self._get_param("LockToken")
self.wafv2_backend.delete_web_acl(name, scope, _id, lock_token)
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, "{}"
def get_web_acl(self) -> TYPE_RESPONSE:
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
name = self._get_param("Name")
_id = self._get_param("Id")
web_acl = self.wafv2_backend.get_web_acl(name, _id)
response = {"WebACL": web_acl.to_dict(), "LockToken": web_acl.lock_token}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def list_web_ac_ls(self) -> TYPE_RESPONSE:
"""https://docs.aws.amazon.com/waf/latest/APIReference/API_ListWebACLs.html (response syntax section)"""
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
limit = self._get_int_param("Limit")
next_marker = self._get_param("NextMarker")
web_acls, next_marker = self.wafv2_backend.list_web_acls(
limit=limit, next_marker=next_marker
)
response = {
"NextMarker": next_marker,
"WebACLs": [web.to_short_dict() for web in web_acls],
}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def list_rule_groups(self) -> TYPE_RESPONSE:
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
limit = self._get_int_param("Limit")
next_marker = self._get_param("NextMarker")
rule_groups, next_marker = self.wafv2_backend.list_rule_groups(
scope, limit=limit, next_marker=next_marker
)
response = {
"RuleGroups": [rg.to_short_dict() for rg in rule_groups],
"NextMarker": next_marker,
}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def list_tags_for_resource(self) -> TYPE_RESPONSE:
arn = self._get_param("ResourceARN")
# select correct backend - ARN region is not indicative by itself in case of WAF for Cloudfront
scope = "CLOUDFRONT" if arn.split(":")[5].startswith("global") else "REGIONAL"
self.region = GLOBAL_REGION if scope == "CLOUDFRONT" else arn.split(":")[3]
limit = self._get_int_param("Limit")
next_marker = self._get_param("NextMarker")
tags, next_marker = self.wafv2_backend.list_tags_for_resource(
arn, limit=limit, next_marker=next_marker
)
response = {
"TagInfoForResource": {"ResourceARN": arn, "TagList": tags},
"NextMarker": next_marker,
}
response_headers = {"Content-Type": "application/json"}
return 200, response_headers, json.dumps(response)
def tag_resource(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
arn = body.get("ResourceARN")
# select correct backend - ARN region is not indicative by itself in case of WAF for Cloudfront
scope = "CLOUDFRONT" if arn.split(":")[5].startswith("global") else "REGIONAL"
self.region = GLOBAL_REGION if scope == "CLOUDFRONT" else arn.split(":")[3]
tags = body.get("Tags")
self.wafv2_backend.tag_resource(arn, tags)
return 200, {}, "{}"
def untag_resource(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
arn = body.get("ResourceARN")
# select correct backend - ARN region is not indicative by itself in case of WAF for Cloudfront
scope = "CLOUDFRONT" if arn.split(":")[5].startswith("global") else "REGIONAL"
self.region = GLOBAL_REGION if scope == "CLOUDFRONT" else arn.split(":")[3]
tag_keys = body.get("TagKeys")
self.wafv2_backend.untag_resource(arn, tag_keys)
return 200, {}, "{}"
def update_web_acl(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
name = body.get("Name")
_id = body.get("Id")
scope = body.get("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
default_action = body.get("DefaultAction")
rules = body.get("Rules")
description = body.get("Description")
visibility_config = body.get("VisibilityConfig")
lock_token = body.get("LockToken")
custom_response_bodies = body.get("CustomResponseBodies")
captcha_config = body.get("CaptchaConfig")
challenge_config = body.get("ChallengeConfig")
token_domains = body.get("TokenDomains")
association_config = body.get("AssociationConfig")
new_lock_token = self.wafv2_backend.update_web_acl(
name,
_id,
default_action,
rules,
description,
visibility_config,
lock_token,
custom_response_bodies,
captcha_config,
challenge_config,
token_domains,
association_config,
)
return 200, {}, json.dumps({"NextLockToken": new_lock_token})
def create_ip_set(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
name = body.get("Name")
scope = body.get("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
description = body.get("Description")
ip_address_version = body.get("IPAddressVersion")
addresses = body.get("Addresses")
tags = body.get("Tags")
ip_set = self.wafv2_backend.create_ip_set(
name, scope, description, ip_address_version, addresses, tags
)
return (
200,
{},
json.dumps(
{
"Summary": {
"Name": ip_set.name,
"Id": ip_set.ip_set_id,
"Description": ip_set.description,
"LockToken": ip_set.lock_token,
"ARN": ip_set.arn,
}
}
),
)
def delete_ip_set(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
name = body.get("Name")
scope = body.get("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
_id = body.get("Id")
lock_token = body.get("LockToken")
self.wafv2_backend.delete_ip_set(name, scope, _id, lock_token)
return 200, {}, "{}"
def list_ip_sets(self) -> str:
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
next_marker = self._get_param("NextMarker")
limit = self._get_int_param("Limit")
ip_sets, next_token = self.wafv2_backend.list_ip_sets(
scope, next_marker=next_marker, limit=limit
)
formatted_ip_sets = [
{
"Name": ip_set.name,
"Id": ip_set.ip_set_id,
"Description": ip_set.description,
"LockToken": ip_set.lock_token,
"ARN": ip_set.arn,
}
for ip_set in ip_sets
]
return json.dumps({"NextMarker": next_token, "IPSets": formatted_ip_sets})
def get_ip_set(self) -> TYPE_RESPONSE:
scope = self._get_param("Scope")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
ip_set = self.wafv2_backend.get_ip_set(
name=self._get_param("Name"), scope=scope, _id=self._get_param("Id")
)
dict_ip = ip_set.to_dict()
lock_token = dict_ip.pop("LockToken")
return 200, {}, json.dumps({"IPSet": dict_ip, "LockToken": lock_token})
def update_ip_set(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
name = body.get("Name")
scope = body.get("Scope")
_id = body.get("Id")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
description = body.get("Description")
addresses = body.get("Addresses")
lock_token = body.get("LockToken")
updated_ip_set = self.wafv2_backend.update_ip_set(
name, scope, _id, description, addresses, lock_token
)
return 200, {}, json.dumps({"NextLockToken": updated_ip_set.lock_token})
def put_logging_configuration(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
logging_configuration_parameter = body["LoggingConfiguration"]
resource_arn = logging_configuration_parameter["ResourceArn"]
log_destination_configs = logging_configuration_parameter[
"LogDestinationConfigs"
]
redacted_fields = logging_configuration_parameter.get("RedactedFields")
managed_by_firewall_manager = logging_configuration_parameter.get(
"ManagedByFirewallManager"
)
logging_filter = logging_configuration_parameter.get("LoggingFilter")
logging_configuration = self.wafv2_backend.put_logging_configuration(
resource_arn,
log_destination_configs,
redacted_fields,
managed_by_firewall_manager,
logging_filter,
)
return (
200,
{},
json.dumps(
{
"LoggingConfiguration": {
k: v
for k, v in logging_configuration.to_dict().items()
if v is not None
}
}
),
)
def get_logging_configuration(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
resource_arn = body["ResourceArn"]
logging_configuration = self.wafv2_backend.get_logging_configuration(
resource_arn
)
return (
200,
{},
json.dumps(
{
"LoggingConfiguration": {
k: v
for k, v in logging_configuration.to_dict().items()
if v is not None
}
}
),
)
def list_logging_configurations(self) -> str:
body = json.loads(self.body)
scope = body.get("Scope")
limit = self._get_int_param("Limit")
next_marker = self._get_param("NextMarker")
log_configs, next_marker = self.wafv2_backend.list_logging_configurations(
scope, limit=limit, next_marker=next_marker
)
formatted = [
{k: v for k, v in config.to_dict().items() if v is not None}
for config in log_configs
]
return json.dumps(
{"LoggingConfigurations": formatted, "NextMarker": next_marker}
)
def delete_logging_configuration(self) -> TYPE_RESPONSE:
body = json.loads(self.body)
resource_arn = body["ResourceArn"]
self.wafv2_backend.delete_logging_configuration(resource_arn)
return 200, {}, "{}"
def create_rule_group(self) -> TYPE_RESPONSE:
name = self._get_param("Name")
scope = self._get_param("Scope")
capacity = self._get_param("Capacity")
description = self._get_param("Description")
rules = self._get_param("Rules", [])
visibility_config = self._get_param("VisibilityConfig")
tags = self._get_param("Tags")
custom_response_bodies = self._get_param("CustomResponseBodies")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
group = self.wafv2_backend.create_rule_group(
name=name,
scope=scope,
capacity=capacity,
description=description,
rules=rules,
visibility_config=visibility_config,
tags=tags,
custom_response_bodies=custom_response_bodies,
)
return 200, {}, json.dumps(dict(Summary=group.to_short_dict()))
def update_rule_group(self) -> TYPE_RESPONSE:
name = self._get_param("Name")
scope = self._get_param("Scope")
id = self._get_param("Id")
description = self._get_param("Description")
rules = self._get_param("Rules")
visibility_config = self._get_param("VisibilityConfig")
lock_token = self._get_param("LockToken")
custom_response_bodies = self._get_param("CustomResponseBodies")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
updated_group = self.wafv2_backend.update_rule_group(
name=name,
scope=scope,
id=id,
description=description,
rules=rules,
visibility_config=visibility_config,
lock_token=lock_token,
custom_response_bodies=custom_response_bodies,
)
return 200, {}, json.dumps(dict(NextLockToken=updated_group.lock_token))
def delete_rule_group(self) -> TYPE_RESPONSE:
name = self._get_param("Name")
scope = self._get_param("Scope")
id = self._get_param("Id")
lock_token = self._get_param("LockToken")
if scope == "CLOUDFRONT":
self.region = GLOBAL_REGION
self.wafv2_backend.delete_rule_group(
name=name,
scope=scope,
id=id,
lock_token=lock_token,
)
return 200, {}, json.dumps(dict())
def get_rule_group(self) -> TYPE_RESPONSE:
name = self._get_param("Name")
scope = self._get_param("Scope")
id = self._get_param("Id")
arn = self._get_param("ARN")
if scope == "CLOUDFRONT" or (
isinstance(arn, str) and arn.split(":")[3] == GLOBAL_REGION
):
self.region = GLOBAL_REGION
rule_group = self.wafv2_backend.get_rule_group(
name=name,
scope=scope,
id=id,
arn=arn,
)
group_dict = rule_group.to_dict()
lock_token = group_dict.pop("LockToken")
return 200, {}, json.dumps(dict(RuleGroup=group_dict, LockToken=lock_token))
# notes about region and scope
# --scope = CLOUDFRONT is ALWAYS us-east-1 (but we use "global" instead to differentiate between REGIONAL us-east-1)
# --scope = REGIONAL defaults to us-east-1, but could be anything if specified with --region=<anyRegion>
# region is grabbed from the auth header, NOT from the body - even with --region flag
# The CLOUDFRONT wacls in aws console are located in us-east-1 but the us-east-1 REGIONAL wacls are not included