import json from threading import Thread from typing import TYPE_CHECKING, Any, Dict, Optional from uuid import uuid4 from moto.core.utils import gzip_compress from moto.utilities.utils import get_partition if TYPE_CHECKING: from moto.dynamodb.models import Table from moto.s3.models import S3Backend class TableExport(Thread): def __init__( self, s3_bucket: str, s3_prefix: str, region_name: str, account_id: str, table: "Table", export_format: str, export_type: str, ): super().__init__() self.partition = get_partition(region_name) self.table = table self.arn = f"arn:{self.partition}:dynamodb:{region_name}:{account_id}:table/{table.table_arn}/import/{str(uuid4()).replace('-', '')}" self.s3_bucket = s3_bucket self.s3_prefix = s3_prefix self.status = "IN_PROGRESS" self.export_format = export_format self.export_type = export_type self.account_id = account_id self.region_name = region_name self.failure_code: Optional[str] = None self.failure_message: Optional[str] = None self.item_count = 0 self.processed_bytes = 0 self.error_count = 0 def run(self) -> None: try: from moto.s3.models import s3_backends s3_backend = s3_backends[self.account_id][self.partition] s3_backend.buckets[self.s3_bucket] except KeyError: self.status = "FAILED" self.failure_code = "S3NoSuchBucket" self.failure_message = "The specified bucket does not exist" return try: self._backup_to_s3_file(s3_backend) except Exception as e: self.status = "FAILED" self.failure_code = "UNKNOWN" self.failure_message = str(e) def _backup_to_s3_file(self, s3_backend: "S3Backend") -> None: backup = "" for item in self.table.all_items(): json_item = item.to_json(root_attr_name="Item") backup += json.dumps(json_item) + "\n" self.processed_bytes += len(json_item) self.item_count += 1 content = gzip_compress(backup.encode("utf-8")) s3_backend.put_object( bucket_name=self.s3_bucket, key_name=f"{self.s3_prefix}/AWSDynamoDB/{str(uuid4())}/data/{str(uuid4())}.gz", value=content, ) self.status = "COMPLETED" if self.error_count == 0 else "FAILED" def response(self) -> Dict[str, Any]: return { "ExportArn": self.arn, "ExportStatus": self.status, "FailureCode": self.failure_code, "FailureMessage": self.failure_message, "ExportFormat": self.export_format, "ExportType": self.export_type, "ItemCount": self.item_count, "BilledSizeBytes": self.processed_bytes, }
Memory