import datetime from collections import deque from typing import Deque, Optional from moto.stepfunctions.parser.api import ( ActivityListItem, Arn, DescribeActivityOutput, Name, Timestamp, ) class ActivityTask: task_input: str task_token: str def __init__(self, task_token: str, task_input: str): self.task_token = task_token self.task_input = task_input class Activity: arn: Arn name: Name creation_date: Timestamp _tasks: Deque[ActivityTask] def __init__(self, arn: Arn, name: Name, creation_date: Optional[Timestamp] = None): self.arn = arn self.name = name self.creation_date = creation_date or datetime.datetime.now( tz=datetime.timezone.utc ) self._tasks = deque() def add_task(self, task: ActivityTask): self._tasks.append(task) def get_task(self) -> Optional[ActivityTask]: return self._tasks.popleft() def to_describe_activity_output(self) -> DescribeActivityOutput: return DescribeActivityOutput( activityArn=self.arn, name=self.name, creationDate=self.creation_date ) def to_activity_list_item(self) -> ActivityListItem: return ActivityListItem( activityArn=self.arn, name=self.name, creationDate=self.creation_date )
Memory