from time import sleep from typing import Any, Dict, List, Optional from moto.core.base_backend import BackendDict, BaseBackend from ..exceptions import ( SWFDomainAlreadyExistsFault, SWFDomainDeprecatedFault, SWFTypeAlreadyExistsFault, SWFTypeDeprecatedFault, SWFUnknownResourceFault, SWFValidationException, ) from .activity_task import ActivityTask # noqa from .activity_type import ActivityType # noqa from .decision_task import DecisionTask # noqa from .domain import Domain # noqa from .generic_type import GenericType, TGenericType # noqa from .history_event import HistoryEvent # noqa from .timeout import Timeout # noqa from .timer import Timer # noqa from .workflow_execution import WorkflowExecution # noqa from .workflow_type import WorkflowType # noqa KNOWN_SWF_TYPES = {"activity": ActivityType, "workflow": WorkflowType} class SWFBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.domains: List[Domain] = [] def _get_domain(self, name: str, ignore_empty: bool = False) -> Domain: matching = [domain for domain in self.domains if domain.name == name] if not matching and not ignore_empty: raise SWFUnknownResourceFault("domain", name) if matching: return matching[0] return None # type: ignore def _process_timeouts(self) -> None: for domain in self.domains: for wfe in domain.workflow_executions: wfe._process_timeouts() def list_domains( self, status: str, reverse_order: Optional[bool] = None ) -> List[Domain]: domains = [domain for domain in self.domains if domain.status == status] domains = sorted(domains, key=lambda domain: domain.name) if reverse_order: domains = reversed(domains) # type: ignore[assignment] return domains def list_open_workflow_executions( self, domain_name: str, maximum_page_size: int, tag_filter: Dict[str, str], reverse_order: bool, ) -> List[WorkflowExecution]: self._process_timeouts() domain = self._get_domain(domain_name) if domain.status == "DEPRECATED": raise SWFDomainDeprecatedFault(domain_name) open_wfes = [ wfe for wfe in domain.workflow_executions if wfe.execution_status == "OPEN" ] if tag_filter: for open_wfe in open_wfes: if tag_filter["tag"] not in open_wfe.tag_list: open_wfes.remove(open_wfe) if reverse_order: open_wfes = reversed(open_wfes) # type: ignore[assignment] return open_wfes[0:maximum_page_size] def list_closed_workflow_executions( self, domain_name: str, tag_filter: Dict[str, str], close_status_filter: Dict[str, str], maximum_page_size: int, reverse_order: bool, ) -> List[WorkflowExecution]: self._process_timeouts() domain = self._get_domain(domain_name) if domain.status == "DEPRECATED": raise SWFDomainDeprecatedFault(domain_name) closed_wfes = [ wfe for wfe in domain.workflow_executions if wfe.execution_status == "CLOSED" ] if tag_filter: for closed_wfe in closed_wfes: if tag_filter["tag"] not in closed_wfe.tag_list: closed_wfes.remove(closed_wfe) if close_status_filter: for closed_wfe in closed_wfes: if close_status_filter != closed_wfe.close_status: # type: ignore closed_wfes.remove(closed_wfe) if reverse_order: closed_wfes = reversed(closed_wfes) # type: ignore[assignment] return closed_wfes[0:maximum_page_size] def register_domain( self, name: str, workflow_execution_retention_period_in_days: int, description: Optional[str] = None, ) -> None: if self._get_domain(name, ignore_empty=True): raise SWFDomainAlreadyExistsFault(name) domain = Domain( name, workflow_execution_retention_period_in_days, account_id=self.account_id, region_name=self.region_name, description=description, ) self.domains.append(domain) def deprecate_domain(self, name: str) -> None: domain = self._get_domain(name) if domain.status == "DEPRECATED": raise SWFDomainDeprecatedFault(name) domain.status = "DEPRECATED" def undeprecate_domain(self, name: str) -> None: domain = self._get_domain(name) if domain.status == "REGISTERED": raise SWFDomainAlreadyExistsFault(name) domain.status = "REGISTERED" def describe_domain(self, name: str) -> Optional[Domain]: return self._get_domain(name) def list_types( self, kind: str, domain_name: str, status: str, reverse_order: Optional[bool] = None, ) -> List[GenericType]: domain = self._get_domain(domain_name) _types: List[GenericType] = domain.find_types(kind, status) _types = sorted(_types, key=lambda domain: domain.name) if reverse_order: _types = reversed(_types) # type: ignore return _types def register_type( self, kind: str, domain_name: str, name: str, version: str, **kwargs: Any ) -> None: domain = self._get_domain(domain_name) _type: GenericType = domain.get_type(kind, name, version, ignore_empty=True) if _type: raise SWFTypeAlreadyExistsFault(_type) _class = KNOWN_SWF_TYPES[kind] _type = _class(name, version, **kwargs) domain.add_type(_type) def deprecate_type( self, kind: str, domain_name: str, name: str, version: str ) -> None: domain = self._get_domain(domain_name) _type: GenericType = domain.get_type(kind, name, version) if _type.status == "DEPRECATED": raise SWFTypeDeprecatedFault(_type) _type.status = "DEPRECATED" def undeprecate_type( self, kind: str, domain_name: str, name: str, version: str ) -> None: domain = self._get_domain(domain_name) _type: GenericType = domain.get_type(kind, name, version) if _type.status == "REGISTERED": raise SWFTypeAlreadyExistsFault(_type) _type.status = "REGISTERED" def describe_type( self, kind: str, domain_name: str, name: str, version: str ) -> GenericType: domain = self._get_domain(domain_name) return domain.get_type(kind, name, version) def start_workflow_execution( self, domain_name: str, workflow_id: str, workflow_name: str, workflow_version: str, tag_list: Optional[Dict[str, str]] = None, workflow_input: Optional[str] = None, **kwargs: Any, ) -> WorkflowExecution: domain = self._get_domain(domain_name) wf_type: WorkflowType = domain.get_type( "workflow", workflow_name, workflow_version ) # type: ignore if wf_type.status == "DEPRECATED": raise SWFTypeDeprecatedFault(wf_type) wfe = WorkflowExecution( domain, wf_type, workflow_id, tag_list=tag_list, workflow_input=workflow_input, **kwargs, ) domain.add_workflow_execution(wfe) wfe.start() return wfe def describe_workflow_execution( self, domain_name: str, run_id: str, workflow_id: str ) -> Optional[WorkflowExecution]: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) return domain.get_workflow_execution(workflow_id, run_id=run_id) def poll_for_decision_task( self, domain_name: str, task_list: List[str], identity: Optional[str] = None ) -> Optional[DecisionTask]: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) # Real SWF cases: # - case 1: there's a decision task to return, return it # - case 2: there's no decision task to return, so wait for timeout # and if a new decision is schedule, start and return it # - case 3: timeout reached, no decision, return an empty decision # (e.g. a decision with an empty "taskToken") # # For the sake of simplicity, we forget case 2 for now, so either # there's a DecisionTask to return, either we return a blank one. # # SWF client libraries should cope with that easily as long as tests # aren't distributed. # # TODO: handle long polling (case 2) for decision tasks candidates = [] # Collect candidate scheduled tasks from open workflow executions # matching the selected task list. # # If another decision task is already started, then no candidates # will be produced for that workflow execution. This is because only one # decision task can be started at any given time. # See https://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dev-tasks.html for wfe in domain.workflow_executions: if wfe.task_list == task_list and wfe.open: wfe_candidates = [] found_started = False for task in wfe.decision_tasks: if task.state == "STARTED": found_started = True break elif task.state == "SCHEDULED": wfe_candidates.append(task) if not found_started: candidates += wfe_candidates if any(candidates): # TODO: handle task priorities (but not supported by boto for now) task = min(candidates, key=lambda d: d.scheduled_at) wfe = task.workflow_execution wfe.start_decision_task(task.task_token, identity=identity) return task else: # Sleeping here will prevent clients that rely on the timeout from # entering in a busy waiting loop. sleep(1) return None def count_pending_decision_tasks( self, domain_name: str, task_list: List[str] ) -> int: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) count = 0 for wfe in domain.workflow_executions: if wfe.task_list == task_list: count += wfe.open_counts["openDecisionTasks"] return count def respond_decision_task_completed( self, task_token: str, decisions: Optional[List[Dict[str, Any]]] = None, execution_context: Optional[str] = None, ) -> None: # process timeouts on all objects self._process_timeouts() # let's find decision task decision_task = None for domain in self.domains: for wfe in domain.workflow_executions: for dt in wfe.decision_tasks: if dt.task_token == task_token: decision_task = dt # no decision task found if not decision_task: # In the real world, SWF distinguishes an obviously invalid token and a # token that has no corresponding decision task. For the latter it seems # to wait until a task with that token comes up (which looks like a smart # choice in an eventually-consistent system). The call doesn't seem to # timeout shortly, it takes 3 or 4 minutes to result in: # BotoServerError: 500 Internal Server Error # {"__type":"com.amazon.coral.service#InternalFailure"} # This behavior is not documented clearly in SWF docs and we'll ignore it # in moto, as there is no obvious reason to rely on it in tests. raise SWFValidationException("Invalid token") # decision task found, but WorflowExecution is CLOSED wfe = decision_task.workflow_execution if not wfe.open: raise SWFUnknownResourceFault( "execution", f"WorkflowExecution=[workflowId={wfe.workflow_id}, runId={wfe.run_id}]", ) # decision task found, but already completed if decision_task.state != "STARTED": if decision_task.state == "COMPLETED": raise SWFUnknownResourceFault( f"decision task, scheduledEventId = {decision_task.scheduled_event_id}" ) else: raise ValueError( "This shouldn't happen: you have to PollForDecisionTask to get a token, " "which changes DecisionTask status to 'STARTED' ; then it can only change " "to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably " "a bug in moto, please report it, thanks!" ) # everything's good if decision_task: wfe = decision_task.workflow_execution wfe.complete_decision_task( decision_task.task_token, decisions=decisions, execution_context=execution_context, ) def poll_for_activity_task( self, domain_name: str, task_list: List[str], identity: Optional[str] = None ) -> Optional[ActivityTask]: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) # Real SWF cases: # - case 1: there's an activity task to return, return it # - case 2: there's no activity task to return, so wait for timeout # and if a new activity is scheduled, return it # - case 3: timeout reached, no activity task, return an empty response # (e.g. a response with an empty "taskToken") # # For the sake of simplicity, we forget case 2 for now, so either # there's an ActivityTask to return, either we return a blank one. # # SWF client libraries should cope with that easily as long as tests # aren't distributed. # # TODO: handle long polling (case 2) for activity tasks candidates = [] for _task_list, tasks in domain.activity_task_lists.items(): if _task_list == task_list: candidates += [t for t in tasks if t.state == "SCHEDULED"] if any(candidates): # TODO: handle task priorities (but not supported by boto for now) task = min(candidates, key=lambda d: d.scheduled_at) wfe = task.workflow_execution wfe.start_activity_task(task.task_token, identity=identity) return task else: # Sleeping here will prevent clients that rely on the timeout from # entering in a busy waiting loop. sleep(1) return None def count_pending_activity_tasks( self, domain_name: str, task_list: List[str] ) -> int: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) count = 0 for _task_list, tasks in domain.activity_task_lists.items(): if _task_list == task_list: pending = [t for t in tasks if t.state in ["SCHEDULED", "STARTED"]] count += len(pending) return count def _find_activity_task_from_token(self, task_token: str) -> ActivityTask: activity_task = None for domain in self.domains: for wfe in domain.workflow_executions: for task in wfe.activity_tasks: if task.task_token == task_token: activity_task = task # no task found if not activity_task: # Same as for decision tasks, we raise an invalid token BOTH for clearly # wrong SWF tokens and OK tokens but not used correctly. This should not # be a problem in moto. raise SWFValidationException("Invalid token") # activity task found, but WorflowExecution is CLOSED wfe = activity_task.workflow_execution if not wfe.open: raise SWFUnknownResourceFault( "execution", f"WorkflowExecution=[workflowId={wfe.workflow_id}, runId={wfe.run_id}]", ) # activity task found, but already completed if activity_task.state != "STARTED": if activity_task.state == "COMPLETED": raise SWFUnknownResourceFault( f"activity, scheduledEventId = {activity_task.scheduled_event_id}" ) else: raise ValueError( "This shouldn't happen: you have to PollForActivityTask to get a token, " "which changes ActivityTask status to 'STARTED' ; then it can only change " "to 'COMPLETED'. If you didn't hack moto/swf internals, this is probably " "a bug in moto, please report it, thanks!" ) # everything's good return activity_task def respond_activity_task_completed( self, task_token: str, result: Any = None ) -> None: # process timeouts on all objects self._process_timeouts() activity_task = self._find_activity_task_from_token(task_token) wfe = activity_task.workflow_execution wfe.complete_activity_task(activity_task.task_token, result=result) def respond_activity_task_failed( self, task_token: str, reason: Optional[str] = None, details: Any = None ) -> None: # process timeouts on all objects self._process_timeouts() activity_task = self._find_activity_task_from_token(task_token) wfe = activity_task.workflow_execution wfe.fail_activity_task(activity_task.task_token, reason=reason, details=details) def terminate_workflow_execution( self, domain_name: str, workflow_id: str, child_policy: Any = None, details: Any = None, reason: Optional[str] = None, run_id: Optional[str] = None, ) -> None: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) wfe = domain.get_workflow_execution( workflow_id, run_id=run_id, raise_if_closed=True ) wfe.terminate(child_policy=child_policy, details=details, reason=reason) # type: ignore[union-attr] def record_activity_task_heartbeat( self, task_token: str, details: Any = None ) -> None: # process timeouts on all objects self._process_timeouts() activity_task = self._find_activity_task_from_token(task_token) activity_task.reset_heartbeat_clock() if details: activity_task.details = details def signal_workflow_execution( self, domain_name: str, signal_name: str, workflow_id: str, workflow_input: Any = None, run_id: Optional[str] = None, ) -> None: # process timeouts on all objects self._process_timeouts() domain = self._get_domain(domain_name) wfe = domain.get_workflow_execution( workflow_id, run_id=run_id, raise_if_closed=True ) wfe.signal(signal_name, workflow_input) # type: ignore[union-attr] swf_backends = BackendDict(SWFBackend, "swf")
Memory