import json
from urllib.parse import unquote, urlsplit
from moto.core.models import default_user_config
from moto.core.responses import BaseResponse
from .models import BatchBackend, batch_backends


class BatchResponse(BaseResponse):
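    """Dispatch AWS Batch API requests to the mocked Batch backend."""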

    def __init__(self) -> None:
super().__init__(service_name="batch")

    @property
def batch_backend(self) -> BatchBackend:
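        """Return the Batch backend for the current account and region.

        Returns the Docker-less batch_simple backend instead when the user
        config sets batch.use_docker to False.
        """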
if default_user_config.get("batch", {}).get("use_docker", True) is False:
from moto.batch_simple.models import batch_simple_backends
return batch_simple_backends[self.current_account][self.region]
else:
return batch_backends[self.current_account][self.region]

    def _get_action(self) -> str:
        # The action name is the path element immediately after /v1/
        # (e.g. "submitjob" for a request to /v1/submitjob)
return urlsplit(self.uri).path.lstrip("/").split("/")[1]

    def createcomputeenvironment(self) -> str:
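        """Handle the CreateComputeEnvironment API action."""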
compute_env_name = self._get_param("computeEnvironmentName")
compute_resource = self._get_param("computeResources")
service_role = self._get_param("serviceRole")
state = self._get_param("state")
_type = self._get_param("type")
env = self.batch_backend.create_compute_environment(
compute_environment_name=compute_env_name,
_type=_type,
state=state,
compute_resources=compute_resource,
service_role=service_role,
)
result = {"computeEnvironmentArn": env.arn, "computeEnvironmentName": env.name}
return json.dumps(result)

    def describecomputeenvironments(self) -> str:
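        """Handle the DescribeComputeEnvironments API action."""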
compute_environments = self._get_param("computeEnvironments")
envs = self.batch_backend.describe_compute_environments(compute_environments)
result = {"computeEnvironments": envs}
return json.dumps(result)

    def deletecomputeenvironment(self) -> str:
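        """Handle the DeleteComputeEnvironment API action."""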
compute_environment = self._get_param("computeEnvironment")
self.batch_backend.delete_compute_environment(compute_environment)
return ""

    def updatecomputeenvironment(self) -> str:
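        """Handle the UpdateComputeEnvironment API action."""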
compute_env_name = self._get_param("computeEnvironment")
compute_resource = self._get_param("computeResources")
service_role = self._get_param("serviceRole")
state = self._get_param("state")
name, arn = self.batch_backend.update_compute_environment(
compute_environment_name=compute_env_name,
compute_resources=compute_resource,
service_role=service_role,
state=state,
)
result = {"computeEnvironmentArn": arn, "computeEnvironmentName": name}
return json.dumps(result)

    def createjobqueue(self) -> str:
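        """Handle the CreateJobQueue API action."""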
compute_env_order = self._get_param("computeEnvironmentOrder")
queue_name = self._get_param("jobQueueName")
schedule_policy = self._get_param("schedulingPolicyArn")
priority = self._get_param("priority")
state = self._get_param("state")
tags = self._get_param("tags")
queue = self.batch_backend.create_job_queue(
queue_name=queue_name,
priority=priority,
schedule_policy=schedule_policy,
state=state,
compute_env_order=compute_env_order,
tags=tags,
)
result = {"jobQueueArn": queue.arn, "jobQueueName": queue.name}
return json.dumps(result)

    def describejobqueues(self) -> str:
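        """Handle the DescribeJobQueues API action."""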
job_queues = self._get_param("jobQueues")
queues = self.batch_backend.describe_job_queues(job_queues)
result = {"jobQueues": queues}
return json.dumps(result)

    def updatejobqueue(self) -> str:
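        """Handle the UpdateJobQueue API action."""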
compute_env_order = self._get_param("computeEnvironmentOrder")
queue_name = self._get_param("jobQueue")
schedule_policy = self._get_param("schedulingPolicyArn")
priority = self._get_param("priority")
state = self._get_param("state")
name, arn = self.batch_backend.update_job_queue(
queue_name=queue_name,
priority=priority,
state=state,
compute_env_order=compute_env_order,
schedule_policy=schedule_policy,
)
result = {"jobQueueArn": arn, "jobQueueName": name}
return json.dumps(result)

    def deletejobqueue(self) -> str:
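        """Handle the DeleteJobQueue API action."""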
queue_name = self._get_param("jobQueue")
self.batch_backend.delete_job_queue(queue_name)
return ""

    def registerjobdefinition(self) -> str:
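        """Handle the RegisterJobDefinition API action."""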
container_properties = self._get_param("containerProperties")
node_properties = self._get_param("nodeProperties")
def_name = self._get_param("jobDefinitionName")
parameters = self._get_param("parameters")
tags = self._get_param("tags")
retry_strategy = self._get_param("retryStrategy")
_type = self._get_param("type")
timeout = self._get_param("timeout")
platform_capabilities = self._get_param("platformCapabilities")
propagate_tags = self._get_param("propagateTags")
job_def = self.batch_backend.register_job_definition(
def_name=def_name,
parameters=parameters,
_type=_type,
tags=tags,
retry_strategy=retry_strategy,
container_properties=container_properties,
node_properties=node_properties,
timeout=timeout,
platform_capabilities=platform_capabilities,
propagate_tags=propagate_tags,
)
result = {
"jobDefinitionArn": job_def.arn,
"jobDefinitionName": job_def.name,
"revision": job_def.revision,
}
return json.dumps(result)

    def deregisterjobdefinition(self) -> str:
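        """Handle the DeregisterJobDefinition API action."""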
queue_name = self._get_param("jobDefinition")
self.batch_backend.deregister_job_definition(queue_name)
return ""

    def describejobdefinitions(self) -> str:
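        """Handle the DescribeJobDefinitions API action."""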
job_def_name = self._get_param("jobDefinitionName")
job_def_list = self._get_param("jobDefinitions")
status = self._get_param("status")
job_defs = self.batch_backend.describe_job_definitions(
job_def_name, job_def_list, status
)
result = {"jobDefinitions": [job.describe() for job in job_defs]}
return json.dumps(result)

    def submitjob(self) -> str:
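        """Handle the SubmitJob API action."""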
container_overrides = self._get_param("containerOverrides")
depends_on = self._get_param("dependsOn")
job_def = self._get_param("jobDefinition")
job_name = self._get_param("jobName")
job_queue = self._get_param("jobQueue")
timeout = self._get_param("timeout")
array_properties = self._get_param("arrayProperties", {})
parameters = self._get_param("parameters")
name, job_id, job_arn = self.batch_backend.submit_job(
job_name,
job_def,
job_queue,
depends_on=depends_on,
container_overrides=container_overrides,
timeout=timeout,
array_properties=array_properties,
parameters=parameters,
)
result = {"jobId": job_id, "jobName": name, "jobArn": job_arn}
return json.dumps(result)

    def describejobs(self) -> str:
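        """Handle the DescribeJobs API action."""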
jobs = self._get_param("jobs")
return json.dumps({"jobs": self.batch_backend.describe_jobs(jobs)})

    def listjobs(self) -> str:
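        """Handle the ListJobs API action."""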
job_queue = self._get_param("jobQueue")
job_status = self._get_param("jobStatus")
filters = self._get_param("filters")
array_job_id = self._get_param("arrayJobId")
jobs = self.batch_backend.list_jobs(
job_queue_name=job_queue,
array_job_id=array_job_id,
job_status=job_status,
filters=filters,
)
result = {"jobSummaryList": [job.describe_short() for job in jobs]}
return json.dumps(result)

    def terminatejob(self) -> str:
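        """Handle the TerminateJob API action."""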
job_id = self._get_param("jobId")
reason = self._get_param("reason")
self.batch_backend.terminate_job(job_id, reason)
return ""

    def canceljob(self) -> str:
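        """Handle the CancelJob API action."""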
job_id = self._get_param("jobId")
reason = self._get_param("reason")
self.batch_backend.cancel_job(job_id, reason)
return ""

    def tags(self) -> str:
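        """Dispatch by HTTP method: TagResource (POST),
        ListTagsForResource (GET) and UntagResource (DELETE)."""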
resource_arn = unquote(self.path).split("/v1/tags/")[-1]
tags = self._get_param("tags")
if self.method == "POST":
self.batch_backend.tag_resource(resource_arn, tags)
if self.method == "GET":
tags = self.batch_backend.list_tags_for_resource(resource_arn)
return json.dumps({"tags": tags})
if self.method == "DELETE":
tag_keys = self.querystring.get("tagKeys")
self.batch_backend.untag_resource(resource_arn, tag_keys) # type: ignore[arg-type]
return ""

    def createschedulingpolicy(self) -> str:
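        """Handle the CreateSchedulingPolicy API action."""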
body = json.loads(self.body)
name = body.get("name")
fairshare_policy = body.get("fairsharePolicy") or {}
tags = body.get("tags")
policy = self.batch_backend.create_scheduling_policy(
name, fairshare_policy, tags
)
return json.dumps(policy.to_dict(create=True))

    def describeschedulingpolicies(self) -> str:
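        """Handle the DescribeSchedulingPolicies API action."""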
body = json.loads(self.body)
arns = body.get("arns") or []
policies = self.batch_backend.describe_scheduling_policies(arns)
return json.dumps({"schedulingPolicies": [pol.to_dict() for pol in policies]})

    def listschedulingpolicies(self) -> str:
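        """Handle the ListSchedulingPolicies API action."""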
arns = self.batch_backend.list_scheduling_policies()
return json.dumps({"schedulingPolicies": [{"arn": arn} for arn in arns]})

    def deleteschedulingpolicy(self) -> str:
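        """Handle the DeleteSchedulingPolicy API action."""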
body = json.loads(self.body)
arn = body["arn"]
self.batch_backend.delete_scheduling_policy(arn)
return ""

    def updateschedulingpolicy(self) -> str:
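        """Handle the UpdateSchedulingPolicy API action."""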
body = json.loads(self.body)
arn = body.get("arn")
fairshare_policy = body.get("fairsharePolicy") or {}
self.batch_backend.update_scheduling_policy(arn, fairshare_policy)
return ""