import datetime
from os import getenv
from time import sleep
from typing import Any, Dict, List, Optional, Tuple

from moto.batch.exceptions import ClientException
from moto.batch.models import BatchBackend, Job, batch_backends
from moto.core.base_backend import BackendDict

# Attribute names that are always resolved on the simple backend itself.
_OWN_ATTRIBUTES = (
    "backend",
    "account_id",
    "region_name",
    "urls",
    "_url_module",
    "__class__",
    "url_bases",
)

# Methods for which this class supplies its own (non-Docker) implementation.
_OVERRIDDEN_METHODS = ("submit_job", "_mark_job_as_finished")


class BatchSimpleBackend(BatchBackend):
    """
    Batch backend that finishes jobs without running Docker containers.

    Submitted jobs are marked as succeeded by default.
    Set the environment variable MOTO_SIMPLE_BATCH_FAIL_AFTER=0 to fail jobs
    immediately, or set it to a positive integer to control after how many
    seconds the job fails.

    Annotate your tests with the
    `@mock_aws(config={"batch": {"use_docker": False}})`-decorator to use
    this Batch implementation.
    """

    @property
    def backend(self) -> BatchBackend:
        """Return the regular batch backend for this account and region."""
        return batch_backends[self.account_id][self.region_name]

    def __getattribute__(self, name: str) -> Any:
        """
        Make this class behave like a wrapper around the regular batch backend.

        A small set of attributes is resolved on this object directly.
        `submit_job` and `_mark_job_as_finished` are answered with a wrapper
        that calls the implementation defined on this class.
        Every other attribute lookup is sent through to the regular backend.
        """
        if name in _OWN_ATTRIBUTES:
            return object.__getattribute__(self, name)

        if name not in _OVERRIDDEN_METHODS:
            return object.__getattribute__(self.backend, name)

        def call_override(*args: Any, **kwargs: Any) -> Any:
            # The override is looked up when the wrapper is called,
            # not when the attribute is accessed.
            override = object.__getattribute__(self, name)
            return override(*args, **kwargs)

        return call_override

    def submit_job(
        self,
        job_name: str,
        job_def_id: str,
        job_queue: str,
        array_properties: Dict[str, Any],
        depends_on: Optional[List[Dict[str, str]]] = None,
        container_overrides: Optional[Dict[str, Any]] = None,
        timeout: Optional[Dict[str, int]] = None,
        parameters: Optional[Dict[str, str]] = None,
    ) -> Tuple[str, str, str]:
        """
        Create a job and immediately mark it as finished.

        When `array_properties` contains a "size" key, one child job per
        index is created with the id "<parent job id>:<index>", each child
        is marked as finished, and the children are attached to the parent.

        Returns a tuple of (job name, job id, job ARN).
        Raises ClientException when the job definition or the job queue
        cannot be found.
        """
        # Resolve the job definition first, then the queue
        job_def = self.get_job_definition(job_def_id)
        if job_def is None:
            raise ClientException(f"Job definition {job_def_id} does not exist")

        queue = self.get_job_queue(job_queue)
        if queue is None:
            raise ClientException(f"Job queue {job_queue} does not exist")

        # Keyword arguments that are identical for the parent and every child
        shared_job_kwargs: Dict[str, Any] = {
            "log_backend": self.logs_backend,
            "container_overrides": container_overrides,
            "depends_on": depends_on,
            "all_jobs": self._jobs,
            "timeout": timeout,
            "parameters": parameters,
        }

        job = Job(
            job_name,
            job_def,
            queue,
            array_properties=array_properties,
            **shared_job_kwargs,
        )

        if "size" not in array_properties:
            # Regular (non-array) job
            self._mark_job_as_finished(include_start_attempt=True, job=job)
            return job_name, job.job_id, job.arn

        children: List[Job] = []
        for index in range(array_properties["size"]):
            child = Job(
                job_name,
                job_def,
                queue,
                array_properties={"statusSummary": {}, "index": index},
                provided_job_id=f"{job.job_id}:{index}",
                **shared_job_kwargs,
            )
            self._mark_job_as_finished(include_start_attempt=True, job=child)
            children.append(child)

        # The parent is finished without a start attempt of its own
        self._mark_job_as_finished(include_start_attempt=False, job=job)
        job._child_jobs = children

        return job_name, job.job_id, job.arn

    def _mark_job_as_finished(self, include_start_attempt: bool, job: Job) -> None:
        """
        Register the job on the regular backend and mark it as stopped.

        The job is never actually run. It is marked as succeeded, unless the
        environment variable MOTO_SIMPLE_BATCH_FAIL_AFTER is set to a
        non-empty value, in which case it is marked as failed. If that value
        is an integer, we sleep that many seconds before failing the job.
        """
        self.backend._jobs[job.job_id] = job
        job.job_started_at = datetime.datetime.now()
        job.log_stream_name = job._stream_name
        if include_start_attempt:
            job._start_attempt()

        fail_after = getenv("MOTO_SIMPLE_BATCH_FAIL_AFTER")
        if not fail_after:
            job._mark_stopped(success=True)
            return

        try:
            sleep(int(fail_after))
        except ValueError:
            # Unable to use the value of MOTO_SIMPLE_BATCH_FAIL_AFTER as a
            # delay - fail the job without waiting
            pass

        job._mark_stopped(success=False)


batch_simple_backends = BackendDict(BatchSimpleBackend, "batch")
Memory