import json
from moto.core.common_types import TYPE_RESPONSE
from moto.core.config import default_user_config
from moto.core.responses import BaseResponse
from moto.core.utils import iso_8601_datetime_with_milliseconds
from .exceptions import ValidationException
from .models import StepFunctionBackend, stepfunctions_backends
from .parser.api import ExecutionStatus
class StepFunctionResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="stepfunctions")
@property
def stepfunction_backend(self) -> StepFunctionBackend:
if default_user_config.get("stepfunctions", {}).get(
"execute_state_machine", False
):
from .parser.models import stepfunctions_parser_backends
return stepfunctions_parser_backends[self.current_account][self.region]
else:
return stepfunctions_backends[self.current_account][self.region]
def create_state_machine(self) -> TYPE_RESPONSE:
name = self._get_param("name")
definition = self._get_param("definition")
roleArn = self._get_param("roleArn")
tags = self._get_param("tags")
publish = self._get_param("publish")
encryptionConfiguration = self._get_param("encryptionConfiguration")
loggingConfiguration = self._get_param("loggingConfiguration")
tracingConfiguration = self._get_param("tracingConfiguration")
version_description = self._get_param("versionDescription")
if version_description and not publish:
raise ValidationException(
"Version description can only be set when publish is true"
)
state_machine = self.stepfunction_backend.create_state_machine(
name=name,
definition=definition,
roleArn=roleArn,
tags=tags,
publish=publish,
loggingConfiguration=loggingConfiguration,
tracingConfiguration=tracingConfiguration,
encryptionConfiguration=encryptionConfiguration,
version_description=version_description,
)
response = {
"creationDate": state_machine.creation_date,
"stateMachineArn": state_machine.arn,
}
if state_machine.latest_version:
response["stateMachineVersionArn"] = state_machine.latest_version.arn
return 200, {}, json.dumps(response)
def list_state_machines(self) -> TYPE_RESPONSE:
max_results = self._get_int_param("maxResults")
next_token = self._get_param("nextToken")
results, next_token = self.stepfunction_backend.list_state_machines(
max_results=max_results, next_token=next_token
)
state_machines = [
{
"creationDate": sm.creation_date,
"name": sm.name,
"stateMachineArn": sm.arn,
}
for sm in results
]
response = {"stateMachines": state_machines}
if next_token:
response["nextToken"] = next_token
return 200, {}, json.dumps(response)
def describe_state_machine(self) -> TYPE_RESPONSE:
arn = self._get_param("stateMachineArn")
return self._describe_state_machine(arn)
def _describe_state_machine(self, state_machine_arn: str) -> TYPE_RESPONSE:
state_machine = self.stepfunction_backend.describe_state_machine(
state_machine_arn
)
response = {
"creationDate": state_machine.creation_date,
"stateMachineArn": state_machine.arn,
"definition": state_machine.definition,
"name": state_machine.name,
"roleArn": state_machine.roleArn,
"status": "ACTIVE",
"type": state_machine.type,
"encryptionConfiguration": state_machine.encryptionConfiguration,
"tracingConfiguration": state_machine.tracingConfiguration,
"loggingConfiguration": state_machine.loggingConfiguration,
}
if state_machine.description:
response["description"] = state_machine.description
return 200, {}, json.dumps(response)
def delete_state_machine(self) -> TYPE_RESPONSE:
arn = self._get_param("stateMachineArn")
self.stepfunction_backend.delete_state_machine(arn)
return 200, {}, json.dumps("{}")
def update_state_machine(self) -> TYPE_RESPONSE:
arn = self._get_param("stateMachineArn")
definition = self._get_param("definition")
role_arn = self._get_param("roleArn")
tracing_config = self._get_param("tracingConfiguration")
encryption_config = self._get_param("encryptionConfiguration")
logging_config = self._get_param("loggingConfiguration")
publish = self._get_param("publish")
version_description = self._get_param("versionDescription")
if version_description and not publish:
raise ValidationException(
"Version description can only be set when publish is true"
)
state_machine = self.stepfunction_backend.update_state_machine(
arn=arn,
definition=definition,
role_arn=role_arn,
tracing_configuration=tracing_config,
encryption_configuration=encryption_config,
logging_configuration=logging_config,
publish=publish,
version_description=version_description,
)
response = {"updateDate": state_machine.update_date}
if publish:
response["stateMachineVersionArn"] = state_machine.latest_version.arn # type: ignore
return 200, {}, json.dumps(response)
def list_tags_for_resource(self) -> TYPE_RESPONSE:
arn = self._get_param("resourceArn")
tags = self.stepfunction_backend.list_tags_for_resource(arn)
response = {"tags": tags}
return 200, {}, json.dumps(response)
def tag_resource(self) -> TYPE_RESPONSE:
arn = self._get_param("resourceArn")
tags = self._get_param("tags", [])
self.stepfunction_backend.tag_resource(arn, tags)
return 200, {}, json.dumps({})
def untag_resource(self) -> TYPE_RESPONSE:
arn = self._get_param("resourceArn")
tag_keys = self._get_param("tagKeys", [])
self.stepfunction_backend.untag_resource(arn, tag_keys)
return 200, {}, json.dumps({})
def start_execution(self) -> TYPE_RESPONSE:
arn = self._get_param("stateMachineArn")
name = self._get_param("name")
execution_input = self._get_param("input", if_none="{}")
execution = self.stepfunction_backend.start_execution(
arn, name, execution_input
)
response = {
"executionArn": execution.execution_arn,
"startDate": iso_8601_datetime_with_milliseconds(execution.start_date),
}
return 200, {}, json.dumps(response)
def list_executions(self) -> TYPE_RESPONSE:
max_results = self._get_int_param("maxResults")
next_token = self._get_param("nextToken")
arn = self._get_param("stateMachineArn")
status_filter = self._get_param("statusFilter")
state_machine = self.stepfunction_backend.describe_state_machine(arn)
results, next_token = self.stepfunction_backend.list_executions(
arn,
status_filter=status_filter,
max_results=max_results,
next_token=next_token,
)
executions = []
for execution in results:
result = {
"executionArn": execution.execution_arn,
"name": execution.name,
"startDate": iso_8601_datetime_with_milliseconds(execution.start_date),
"stateMachineArn": state_machine.arn,
"status": execution.status,
}
if execution.status in [
ExecutionStatus.SUCCEEDED,
ExecutionStatus.FAILED,
ExecutionStatus.ABORTED,
]:
result["stopDate"] = iso_8601_datetime_with_milliseconds(
execution.stop_date
)
executions.append(result)
response = {"executions": executions}
if next_token:
response["nextToken"] = next_token
return 200, {}, json.dumps(response)
def describe_execution(self) -> TYPE_RESPONSE:
arn = self._get_param("executionArn")
execution = self.stepfunction_backend.describe_execution(arn)
response = {
"executionArn": arn,
"input": json.dumps(execution.execution_input),
"name": execution.name,
"startDate": iso_8601_datetime_with_milliseconds(execution.start_date),
"stateMachineArn": execution.state_machine_arn,
"status": execution.status,
}
if execution.status in [
ExecutionStatus.SUCCEEDED,
ExecutionStatus.ABORTED,
ExecutionStatus.FAILED,
]:
response["stopDate"] = iso_8601_datetime_with_milliseconds(
execution.stop_date
)
if execution.status in [
ExecutionStatus.SUCCEEDED,
ExecutionStatus.SUCCEEDED.value,
]:
if isinstance(execution.output, str):
response["output"] = execution.output
elif execution.output is not None:
response["output"] = json.dumps(execution.output)
response["outputDetails"] = execution.output_details
if execution.error is not None:
response["error"] = execution.error
if execution.cause is not None:
response["cause"] = execution.cause
return 200, {}, json.dumps(response)
def describe_state_machine_for_execution(self) -> TYPE_RESPONSE:
arn = self._get_param("executionArn")
sm = self.stepfunction_backend.describe_state_machine_for_execution(arn)
return self._describe_state_machine(sm.arn)
def stop_execution(self) -> TYPE_RESPONSE:
arn = self._get_param("executionArn")
execution = self.stepfunction_backend.stop_execution(arn)
response = {
"stopDate": iso_8601_datetime_with_milliseconds(execution.stop_date)
}
return 200, {}, json.dumps(response)
def get_execution_history(self) -> TYPE_RESPONSE:
execution_arn = self._get_param("executionArn")
execution_history = self.stepfunction_backend.get_execution_history(
execution_arn
)
return 200, {}, json.dumps(execution_history)
def send_task_failure(self) -> TYPE_RESPONSE:
task_token = self._get_param("taskToken")
error = self._get_param("error")
self.stepfunction_backend.send_task_failure(task_token, error=error)
return 200, {}, "{}"
def send_task_heartbeat(self) -> TYPE_RESPONSE:
task_token = self._get_param("taskToken")
self.stepfunction_backend.send_task_heartbeat(task_token)
return 200, {}, "{}"
def send_task_success(self) -> TYPE_RESPONSE:
task_token = self._get_param("taskToken")
output = self._get_param("output")
self.stepfunction_backend.send_task_success(task_token, output)
return 200, {}, "{}"
def list_map_runs(self) -> TYPE_RESPONSE:
execution_arn = self._get_param("executionArn")
runs = self.stepfunction_backend.list_map_runs(execution_arn)
return 200, {}, json.dumps(runs)
def describe_map_run(self) -> TYPE_RESPONSE:
map_run_arn = self._get_param("mapRunArn")
run = self.stepfunction_backend.describe_map_run(map_run_arn)
return 200, {}, json.dumps(run)
def update_map_run(self) -> TYPE_RESPONSE:
map_run_arn = self._get_param("mapRunArn")
max_concurrency = self._get_param("maxConcurrency")
tolerated_failure_count = self._get_param("toleratedFailureCount")
tolerated_failure_percentage = self._get_param("toleratedFailurePercentage")
self.stepfunction_backend.update_map_run(
map_run_arn,
max_concurrency,
tolerated_failure_count=tolerated_failure_count,
tolerated_failure_percentage=tolerated_failure_percentage,
)
return 200, {}, "{}"