from moto.core.responses import ActionResult, BaseResponse

from .models import DynamoDBStreamsBackend, dynamodbstreams_backends


class DynamoDBStreamsHandler(BaseResponse):
    """Response handler that maps DynamoDB Streams actions onto the backend.

    Each action method reads its request parameters with ``_get_param``,
    delegates to the account/region specific ``DynamoDBStreamsBackend`` and
    wraps the outcome in an ``ActionResult``.
    """

    def __init__(self) -> None:
        super().__init__(service_name="dynamodbstreams")

    @property
    def backend(self) -> DynamoDBStreamsBackend:
        """Backend registered for the current account and region."""
        account_backends = dynamodbstreams_backends[self.current_account]
        return account_backends[self.region]

    def describe_stream(self) -> ActionResult:
        """Describe the stream named by the ``StreamArn`` parameter."""
        stream_arn = self._get_param("StreamArn")
        description = self.backend.describe_stream(stream_arn)
        return ActionResult({"StreamDescription": description})

    def list_streams(self) -> ActionResult:
        """List streams, passing the ``TableName`` parameter to the backend."""
        table = self._get_param("TableName")
        return ActionResult({"Streams": self.backend.list_streams(table)})

    def get_shard_iterator(self) -> ActionResult:
        """Create a shard iterator and return its ARN as ``ShardIterator``."""
        stream_arn = self._get_param("StreamArn")
        shard = self._get_param("ShardId")
        iterator_type = self._get_param("ShardIteratorType")
        raw_sequence = self._get_param("SequenceNumber")
        # Per the documentation the sequence number arrives as a string; the
        # backend is handed an int in that case. Any other value (including
        # None when the parameter is absent) is passed through untouched.
        sequence = int(raw_sequence) if isinstance(raw_sequence, str) else raw_sequence
        shard_iterator = self.backend.get_shard_iterator(
            stream_arn, shard, iterator_type, sequence
        )
        return ActionResult({"ShardIterator": shard_iterator.arn})

    def get_records(self) -> ActionResult:
        """Fetch records for the iterator named by ``ShardIterator``.

        ``Limit`` falls back to 1000 only when the parameter is missing
        (``None``); any other supplied value is forwarded as-is.
        """
        iterator_arn = self._get_param("ShardIterator")
        requested_limit = self._get_param("Limit")
        effective_limit = 1000 if requested_limit is None else requested_limit
        return ActionResult(self.backend.get_records(iterator_arn, effective_limit))
Memory