import os import string from typing import Any, List from urllib.parse import urlparse import yaml from moto.moto_api._internal import mock_random as random from moto.utilities.utils import get_partition from .exceptions import ValidationError def generate_stack_id(stack_name: str, region: str, account: str) -> str: random_id = random.uuid4() return f"arn:{get_partition(region)}:cloudformation:{region}:{account}:stack/{stack_name}/{random_id}" def generate_changeset_id( changeset_name: str, region_name: str, account_id: str ) -> str: random_id = random.uuid4() return f"arn:{get_partition(region_name)}:cloudformation:{region_name}:{account_id}:changeSet/{changeset_name}/{random_id}" def generate_stackset_id(stackset_name: str) -> str: random_id = random.uuid4() return f"{stackset_name}:{random_id}" def generate_stackset_arn(stackset_id: str, region_name: str, account_id: str) -> str: return f"arn:{get_partition(region_name)}:cloudformation:{region_name}:{account_id}:stackset/{stackset_id}" def random_suffix() -> str: size = 12 chars = list(range(10)) + list(string.ascii_uppercase) return "".join(str(random.choice(chars)) for x in range(size)) def yaml_tag_constructor(loader: Any, tag: Any, node: Any) -> Any: """convert shorthand intrinsic function to full name""" def _f(loader: Any, tag: Any, node: Any) -> Any: if tag == "!GetAtt": if isinstance(node.value, list): return node.value return node.value.split(".") elif type(node) == yaml.SequenceNode: return loader.construct_sequence(node) else: return node.value if tag == "!Ref": key = "Ref" else: key = f"Fn::{tag[1:]}" return {key: _f(loader, tag, node)} def validate_template_cfn_lint(template: str) -> List[Any]: # Importing cfnlint adds a significant overhead, so we keep it local try: # Compatibility for cfn-lint 0.x # Fail fast with `cfnlint.core.configure_logging` which is removed in cfn-lint 1.x from cfnlint.core import configure_logging, get_rules, run_checks from cfnlint.decode import decode # Save the template to a temporary file -- cfn-lint requires a file filename = "file.tmp" with open(filename, "w") as file: file.write(template) abs_filename = os.path.abspath(filename) # decode handles both yaml and json try: template, matches = decode(abs_filename, False) except TypeError: # As of cfn-lint 0.39.0, the second argument (ignore_bad_template) was dropped # https://github.com/aws-cloudformation/cfn-python-lint/pull/1580 template, matches = decode(abs_filename) # Set cfn-lint to info configure_logging(None) # Initialize the ruleset to be applied (no overrules, no excludes) rules = get_rules([], [], []) # Use us-east-1 region (spec file) for validation regions = ["us-east-1"] # Process all the rules and gather the errors return run_checks(abs_filename, template, rules, regions) except ImportError: # Compatibility for cfn-lint 1.x from cfnlint.api import lint from cfnlint.config import configure_logging from cfnlint.core import get_rules # Set cfn-lint to info configure_logging(None, False) # Initialize the ruleset to be applied (no overrules, no excludes) rules = get_rules([], [], []) # Use us-east-1 region (spec file) for validation regions = ["us-east-1"] # Process all the rules and gather the errors return lint(template, rules, regions) def get_stack_from_s3_url(template_url: str, account_id: str, partition: str) -> str: from moto.s3.models import s3_backends template_url_parts = urlparse(template_url) if "localhost" in template_url: bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1) else: if template_url_parts.netloc.endswith( "amazonaws.com" ) and template_url_parts.netloc.startswith("s3"): # Handle when S3 url uses amazon url with bucket in path # Also handles getting region as technically s3 is region'd # region = template_url.netloc.split('.')[1] bucket_name, key_name = template_url_parts.path.lstrip("/").split("/", 1) else: bucket_name = template_url_parts.netloc.split(".")[0] key_name = template_url_parts.path.lstrip("/") key = s3_backends[account_id][partition].get_object(bucket_name, key_name) return key.value.decode("utf-8") # type: ignore[union-attr] def validate_create_change_set(change_set_name: str) -> None: if not (change_set_name and change_set_name[0].isalpha()): raise ValidationError(f"Invalid change set name: {change_set_name}") if not all(c.isalnum() or c == "-" for c in change_set_name): raise ValidationError(f"Invalid change set name: {change_set_name}") if len(change_set_name) > 128: raise ValidationError( f"Change set name exceeds 128 characters: {change_set_name}" ) # Additional validations can be added here later return
Memory