import copy import time from base64 import b64encode from datetime import datetime from moto.core.responses import ActionResult, BaseResponse from .models import ECRBackend, ecr_backends class ECRResponse(BaseResponse): def __init__(self) -> None: super().__init__(service_name="ecr") @property def ecr_backend(self) -> ECRBackend: return ecr_backends[self.current_account][self.region] def create_repository(self) -> ActionResult: repository_name = self._get_param("repositoryName") registry_id = self._get_param("registryId") encryption_config = self._get_param("encryptionConfiguration") image_scan_config = self._get_param("imageScanningConfiguration") image_tag_mutablility = self._get_param("imageTagMutability") tags = self._get_param("tags", []) repository = self.ecr_backend.create_repository( repository_name=repository_name, registry_id=registry_id, encryption_config=encryption_config, image_scan_config=image_scan_config, image_tag_mutablility=image_tag_mutablility, tags=tags, ) return ActionResult({"repository": repository}) def describe_repositories(self) -> ActionResult: describe_repositories_name = self._get_param("repositoryNames") registry_id = self._get_param("registryId") repositories = self.ecr_backend.describe_repositories( repository_names=describe_repositories_name, registry_id=registry_id ) return ActionResult({"repositories": repositories, "failures": []}) def delete_repository(self) -> ActionResult: repository_str = self._get_param("repositoryName") registry_id = self._get_param("registryId") force = self._get_param("force") repository = self.ecr_backend.delete_repository( repository_str, registry_id, force ) return ActionResult({"repository": repository}) def put_image(self) -> ActionResult: repository_str = self._get_param("repositoryName") image_manifest = self._get_param("imageManifest") image_tag = self._get_param("imageTag") image_manifest_media_type = self._get_param("imageManifestMediaType") digest = self._get_param("imageDigest") image = self.ecr_backend.put_image( repository_str, image_manifest, image_tag, image_manifest_media_type, digest ) return ActionResult({"image": image}) def list_images(self) -> ActionResult: repository_str = self._get_param("repositoryName") registry_id = self._get_param("registryId") images = self.ecr_backend.list_images(repository_str, registry_id) resp = [] for image in images: for tag in image.image_tags or [None]: # type: ignore resp.append({"imageDigest": image.image_digest, "imageTag": tag}) return ActionResult({"imageIds": resp}) def describe_images(self) -> ActionResult: repository_str = self._get_param("repositoryName") registry_id = self._get_param("registryId") image_ids = self._get_param("imageIds") images = self.ecr_backend.describe_images( repository_str, registry_id, image_ids ) # We have to do a bit of manipulation here because a real AWS ECR backend # does not serialize `imageTags` if it's an empty array (and we already # have a test that asserts this behavior). dto_images = [] for image in images: dto_image = copy.copy(image) if not dto_image.image_tags: del dto_image.image_tags dto_images.append(dto_image) return ActionResult({"imageDetails": dto_images}) def batch_check_layer_availability(self) -> None: self.error_on_dryrun() raise NotImplementedError( "ECR.batch_check_layer_availability is not yet implemented" ) def batch_delete_image(self) -> ActionResult: repository_str = self._get_param("repositoryName") registry_id = self._get_param("registryId") image_ids = self._get_param("imageIds") response = self.ecr_backend.batch_delete_image( repository_str, registry_id, image_ids ) return ActionResult(response) def batch_get_image(self) -> ActionResult: repository_str = self._get_param("repositoryName") registry_id = self._get_param("registryId") image_ids = self._get_param("imageIds") response = self.ecr_backend.batch_get_image( repository_str, registry_id, image_ids ) return ActionResult(response) def batch_get_repository_scanning_configuration(self) -> ActionResult: names = self._get_param("repositoryNames") configs, missing = self.ecr_backend.batch_get_repository_scanning_configuration( names ) return ActionResult( { "scanningConfigurations": configs, "failures": [ { "repositoryName": m, "failureCode": "REPOSITORY_NOT_FOUND", "failureReason": "REPOSITORY_NOT_FOUND", } for m in missing ], } ) def complete_layer_upload(self) -> None: self.error_on_dryrun() raise NotImplementedError("ECR.complete_layer_upload is not yet implemented") def delete_repository_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") return ActionResult( self.ecr_backend.delete_repository_policy( registry_id=registry_id, repository_name=repository_name ) ) def get_authorization_token(self) -> ActionResult: registry_ids = self._get_param("registryIds") if not registry_ids: registry_ids = [self.current_account] auth_data = [] for registry_id in registry_ids: password = f"{registry_id}-auth-token" auth_token = b64encode(f"AWS:{password}".encode("ascii")).decode() auth_data.append( { "authorizationToken": auth_token, "expiresAt": time.mktime(datetime(2015, 1, 1).timetuple()), "proxyEndpoint": f"https://{registry_id}.dkr.ecr.{self.region}.amazonaws.com", } ) return ActionResult({"authorizationData": auth_data}) def get_download_url_for_layer(self) -> None: self.error_on_dryrun() raise NotImplementedError( "ECR.get_download_url_for_layer is not yet implemented" ) def get_repository_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") return ActionResult( self.ecr_backend.get_repository_policy( registry_id=registry_id, repository_name=repository_name ) ) def initiate_layer_upload(self) -> None: self.error_on_dryrun() raise NotImplementedError("ECR.initiate_layer_upload is not yet implemented") def set_repository_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") policy_text = self._get_param("policyText") # this is usually a safety flag to prevent accidental repository lock outs # but this would need a much deeper validation of the provided policy # force = self._get_param("force") return ActionResult( self.ecr_backend.set_repository_policy( registry_id=registry_id, repository_name=repository_name, policy_text=policy_text, ) ) def upload_layer_part(self) -> None: self.error_on_dryrun() raise NotImplementedError("ECR.upload_layer_part is not yet implemented") def list_tags_for_resource(self) -> ActionResult: arn = self._get_param("resourceArn") return ActionResult(self.ecr_backend.list_tags_for_resource(arn)) def tag_resource(self) -> ActionResult: arn = self._get_param("resourceArn") tags = self._get_param("tags", []) self.ecr_backend.tag_resource(arn, tags) return ActionResult({}) def untag_resource(self) -> ActionResult: arn = self._get_param("resourceArn") tag_keys = self._get_param("tagKeys", []) self.ecr_backend.untag_resource(arn, tag_keys) return ActionResult({}) def put_image_tag_mutability(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") image_tag_mutability = self._get_param("imageTagMutability") return ActionResult( self.ecr_backend.put_image_tag_mutability( registry_id=registry_id, repository_name=repository_name, image_tag_mutability=image_tag_mutability, ) ) def put_image_scanning_configuration(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") image_scan_config = self._get_param("imageScanningConfiguration") return ActionResult( self.ecr_backend.put_image_scanning_configuration( registry_id=registry_id, repository_name=repository_name, image_scan_config=image_scan_config, ) ) def put_lifecycle_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") lifecycle_policy_text = self._get_param("lifecyclePolicyText") return ActionResult( self.ecr_backend.put_lifecycle_policy( registry_id=registry_id, repository_name=repository_name, lifecycle_policy_text=lifecycle_policy_text, ) ) def get_lifecycle_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") return ActionResult( self.ecr_backend.get_lifecycle_policy( registry_id=registry_id, repository_name=repository_name ) ) def delete_lifecycle_policy(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") return ActionResult( self.ecr_backend.delete_lifecycle_policy( registry_id=registry_id, repository_name=repository_name ) ) def put_registry_policy(self) -> ActionResult: policy_text = self._get_param("policyText") return ActionResult( self.ecr_backend.put_registry_policy(policy_text=policy_text) ) def get_registry_policy(self) -> ActionResult: return ActionResult(self.ecr_backend.get_registry_policy()) def delete_registry_policy(self) -> ActionResult: return ActionResult(self.ecr_backend.delete_registry_policy()) def start_image_scan(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") image_id = self._get_param("imageId") return ActionResult( self.ecr_backend.start_image_scan( registry_id=registry_id, repository_name=repository_name, image_id=image_id, ) ) def describe_image_scan_findings(self) -> ActionResult: registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") image_id = self._get_param("imageId") return ActionResult( self.ecr_backend.describe_image_scan_findings( registry_id=registry_id, repository_name=repository_name, image_id=image_id, ) ) def put_replication_configuration(self) -> ActionResult: replication_config = self._get_param("replicationConfiguration") return ActionResult( self.ecr_backend.put_replication_configuration( replication_config=replication_config ) ) def get_registry_scanning_configuration(self) -> ActionResult: registry_scanning_config = ( self.ecr_backend.get_registry_scanning_configuration() ) return ActionResult( { "registryId": self.current_account, "scanningConfiguration": registry_scanning_config, } ) def put_registry_scanning_configuration(self) -> ActionResult: scan_type = self._get_param("scanType") rules = self._get_param("rules") self.ecr_backend.put_registry_scanning_configuration(scan_type, rules) return ActionResult( {"registryScanningConfiguration": {"scanType": scan_type, "rules": rules}} ) def describe_registry(self) -> ActionResult: return ActionResult(self.ecr_backend.describe_registry())
Memory