import json
from typing import Any
from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from .models import GlacierBackend, glacier_backends
from .utils import vault_from_glacier_url
class GlacierResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="glacier")
def setup_class(
self, request: Any, full_url: str, headers: Any, use_raw_body: bool = False
) -> None:
super().setup_class(request, full_url, headers, use_raw_body=True)
@property
def glacier_backend(self) -> GlacierBackend:
return glacier_backends[self.current_account][self.region]
def list_vaults(self) -> TYPE_RESPONSE:
vaults = self.glacier_backend.list_vaults()
response = json.dumps(
{"Marker": None, "VaultList": [vault.to_dict() for vault in vaults]}
)
headers = {"content-type": "application/json"}
return 200, headers, response
def describe_vault(self) -> TYPE_RESPONSE:
vault_name = vault_from_glacier_url(self.uri)
vault = self.glacier_backend.describe_vault(vault_name)
headers = {"content-type": "application/json"}
return 200, headers, json.dumps(vault.to_dict())
def create_vault(self) -> TYPE_RESPONSE:
vault_name = vault_from_glacier_url(self.uri)
self.glacier_backend.create_vault(vault_name)
return 201, {"status": 201}, ""
def delete_vault(self) -> TYPE_RESPONSE:
vault_name = vault_from_glacier_url(self.uri)
self.glacier_backend.delete_vault(vault_name)
return 204, {"status": 204}, ""
def upload_archive(self) -> TYPE_RESPONSE:
description = self.headers.get("x-amz-archive-description") or ""
vault_name = self.parsed_url.path.split("/")[-2]
vault = self.glacier_backend.upload_archive(vault_name, self.body, description)
headers = {
"x-amz-archive-id": vault["archive_id"],
"x-amz-sha256-tree-hash": vault["sha256"],
"status": 201,
}
return 201, headers, ""
def delete_archive(self) -> TYPE_RESPONSE:
vault_name = self.parsed_url.path.split("/")[-3]
archive_id = self.parsed_url.path.split("/")[-1]
self.glacier_backend.delete_archive(vault_name, archive_id)
return 204, {"status": 204}, ""
def list_jobs(self) -> TYPE_RESPONSE:
vault_name = self.parsed_url.path.split("/")[-2]
jobs = self.glacier_backend.list_jobs(vault_name)
headers = {"content-type": "application/json"}
response = json.dumps(
{"JobList": [job.to_dict() for job in jobs], "Marker": None}
)
return 200, headers, response
def initiate_job(self) -> TYPE_RESPONSE:
account_id = self.uri.split("/")[1]
vault_name = self.parsed_url.path.split("/")[-2]
json_body = json.loads(self.body.decode("utf-8"))
job_type = json_body["Type"]
archive_id = json_body.get("ArchiveId")
tier = json_body.get("Tier") or "Standard"
job_id = self.glacier_backend.initiate_job(
vault_name, job_type, tier, archive_id
)
headers = {
"x-amz-job-id": job_id,
"Location": f"/{account_id}/vaults/{vault_name}/jobs/{job_id}",
"status": 202,
}
return 202, headers, ""
def describe_job(self) -> str:
vault_name = self.parsed_url.path.split("/")[-3]
archive_id = self.parsed_url.path.split("/")[-1]
job = self.glacier_backend.describe_job(vault_name, archive_id)
return json.dumps(job.to_dict()) # type: ignore
def get_job_output(self) -> TYPE_RESPONSE:
vault_name = self.parsed_url.path.split("/")[-4]
job_id = self.parsed_url.path.split("/")[-2]
output = self.glacier_backend.get_job_output(vault_name, job_id)
if output is None:
return 404, {"status": 404}, "404 Not Found"
if isinstance(output, dict):
headers = {"content-type": "application/json"}
return 200, headers, json.dumps(output)
else:
headers = {"content-type": "application/octet-stream"}
return 200, headers, output