from threading import Thread
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from uuid import uuid4

from moto.core.utils import gzip_decompress
from moto.utilities.utils import get_partition

if TYPE_CHECKING:
    from moto.dynamodb.models import DynamoDBBackend
    from moto.dynamodb.models.table import Table
    from moto.s3.models import FakeBucket, S3Backend


class TableImport(Thread):
    """Background job that emulates DynamoDB's ImportTable operation:
    it creates the target table, then loads DynamoDB-JSON objects from
    an S3 bucket into it, tracking progress and failure metadata."""

    def __init__(
        self,
        account_id: str,
        s3_source: Dict[str, str],
        region_name: str,
        table_name: str,
        billing_mode: str,
        throughput: Optional[Dict[str, int]],
        key_schema: List[Dict[str, str]],
        global_indexes: Optional[List[Dict[str, Any]]],
        attrs: List[Dict[str, str]],
        compression_type: Optional[str],
    ):
        super().__init__()
        self.partition = get_partition(region_name)
        self.arn = f"arn:{self.partition}:dynamodb:{region_name}:{account_id}:table/{table_name}/import/{str(uuid4()).replace('-', '')}"
        self.status = "IN_PROGRESS"
        self.account_id = account_id
        self.s3_source = s3_source
        self.region_name = region_name
        self.table_name = table_name
        self.billing_mode = billing_mode
        self.throughput = throughput
        self.key_schema = key_schema
        self.global_indexes = global_indexes
        self.attrs = attrs
        self.compression_type = compression_type

        self.failure_code: Optional[str] = None
        self.failure_message: Optional[str] = None

        self.table: Optional["Table"] = None
        self.table_arn = f"arn:{self.partition}:dynamodb:{self.region_name}:{self.account_id}:table/{table_name}"

        self.processed_count = 0
        self.processed_bytes = 0
        self.error_count = 0
        self.imported_count = 0

    def run(self) -> None:
        s3_bucket_name = self.s3_source["S3Bucket"]
        try:
            from moto.s3.models import s3_backends

            s3_backend = s3_backends[self.account_id][self.partition]
            bucket = s3_backend.buckets[s3_bucket_name]
        except KeyError:
            self.status = "FAILED"
            self.failure_code = "S3NoSuchBucket"
            self.failure_message = "The specified bucket does not exist"
            return

        try:
            self._process_s3_files(s3_backend, bucket)
        except Exception as e:
            self.status = "FAILED"
            self.failure_code = "UNKNOWN"
            self.failure_message = str(e)

    def _process_s3_files(self, s3_backend: "S3Backend", bucket: "FakeBucket") -> None:
        # Create the target table before loading any data into it
        from moto.dynamodb.models import dynamodb_backends

        dynamo: DynamoDBBackend = dynamodb_backends[self.account_id][self.region_name]

        self.table = dynamo.create_table(
            name=self.table_name,
            billing_mode=self.billing_mode,
            throughput=self.throughput,
            schema=self.key_schema,
            global_indexes=self.global_indexes,
            indexes=None,
            attr=self.attrs,
            sse_specification=None,
            streams=None,
            tags=[],
            deletion_protection_enabled=False,
        )

        # Load the data from S3
        keys, _, _, _ = s3_backend.list_objects(
            bucket,
            prefix=self.s3_source.get("S3KeyPrefix"),
            delimiter=None,
            marker=None,
            max_keys=None,
        )

        from py_partiql_parser import JsonParser

        for key in keys:
            if self.compression_type == "GZIP":
                content = gzip_decompress(key.value).decode("utf-8")
            else:
                content = key.value.decode("utf-8")

            result = JsonParser.parse(original=content)
            for json_object in result:
                try:
                    self.processed_count += 1
                    self.processed_bytes += len(json_object)
                    self.table.put_item(item_attrs=json_object["Item"])
                    self.imported_count += 1
                except Exception as e:
                    self.failure_message = str(e)
                    self.error_count += 1

        self.status = "COMPLETED" if self.error_count == 0 else "FAILED"

    def response(self) -> Dict[str, Any]:
        return {
            "ImportArn": self.arn,
            "ImportStatus": self.status,
            "TableArn": self.table_arn,
            "FailureCode": self.failure_code,
            "FailureMessage": self.failure_message,
            "ProcessedItemCount": self.processed_count,
            "ProcessedSizeBytes": self.processed_bytes,
            "ErrorCount": self.error_count,
            "ImportedItemCount": self.imported_count,
        }