"""Handles incoming s3tables requests, invokes methods, returns responses.""" import json from typing import Any, Dict from urllib.parse import unquote from moto.core.common_types import TYPE_RESPONSE from moto.core.responses import BaseResponse from .models import S3TablesBackend, s3tables_backends class S3TablesResponse(BaseResponse): """Handler for S3Tables requests and responses.""" def __init__(self) -> None: super().__init__(service_name="s3tables") self.default_response_headers = {"Content-Type": "application/json"} @property def s3tables_backend(self) -> S3TablesBackend: """Return backend instance specific for this region.""" return s3tables_backends[self.current_account][self.region] def create_table_bucket(self) -> TYPE_RESPONSE: name = json.loads(self.body)["name"] bucket = self.s3tables_backend.create_table_bucket( name=name, ) return 200, self.default_response_headers, json.dumps(dict(arn=bucket.arn)) def list_table_buckets(self) -> TYPE_RESPONSE: params = self._get_params() prefix = params.get("prefix") continuation_token = params.get("continuationToken") max_buckets = params.get("maxBuckets") table_buckets, continuation_token = self.s3tables_backend.list_table_buckets( prefix=prefix, continuation_token=continuation_token, max_buckets=int(max_buckets) if max_buckets else None, ) body: Dict[str, Any] = { "tableBuckets": [ { "arn": b.arn, "name": b.name, "ownerAccountId": b.account_id, "createdAt": b.creation_date.isoformat(), } for b in table_buckets ] } if continuation_token: body.update(continuationToken=continuation_token) return 200, self.default_response_headers, json.dumps(body) def get_table_bucket(self) -> TYPE_RESPONSE: _, table_bucket_arn = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) bucket = self.s3tables_backend.get_table_bucket( table_bucket_arn=table_bucket_arn, ) return ( 200, self.default_response_headers, json.dumps( dict( arn=bucket.arn, name=bucket.name, ownerAccountId=bucket.account_id, createdAt=bucket.creation_date.isoformat(), ) ), ) def delete_table_bucket(self) -> TYPE_RESPONSE: _, table_bucket_arn = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) self.s3tables_backend.delete_table_bucket( table_bucket_arn=table_bucket_arn, ) return 204, {}, "" def create_namespace(self) -> TYPE_RESPONSE: _, table_bucket_arn = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) name = json.loads(self.body)["namespace"][0] namespace = self.s3tables_backend.create_namespace( table_bucket_arn=table_bucket_arn, namespace=name, ) return ( 200, self.default_response_headers, json.dumps( dict(tableBucketArn=table_bucket_arn, namespace=[namespace.name]) ), ) def list_namespaces(self) -> TYPE_RESPONSE: _, table_bucket_arn = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) params = self._get_params() continuation_token = params.get("continuationToken") max_namespaces = params.get("maxNamespaces") prefix = params.get("prefix") namespaces, continuation_token = self.s3tables_backend.list_namespaces( table_bucket_arn=table_bucket_arn, prefix=prefix, continuation_token=continuation_token, max_namespaces=int(max_namespaces) if max_namespaces else None, ) body: Dict[str, Any] = { "namespaces": [ { "namespace": [ns.name], "createdAt": ns.creation_date.isoformat(), "createdBy": ns.created_by, "ownerAccountId": ns.account_id, } for ns in namespaces ] } if continuation_token: body.update(continuationToken=continuation_token) return 200, self.default_response_headers, json.dumps(body) def get_namespace(self) -> TYPE_RESPONSE: _, table_bucket_arn, name = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) namespace = self.s3tables_backend.get_namespace( table_bucket_arn=table_bucket_arn, namespace=name, ) return ( 200, self.default_response_headers, json.dumps( dict( namespace=[namespace.name], createdAt=namespace.creation_date.isoformat(), createdBy=namespace.created_by, ownerAccountId=namespace.account_id, ) ), ) def delete_namespace(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) self.s3tables_backend.delete_namespace( table_bucket_arn=table_bucket_arn, namespace=namespace, ) return 204, self.default_response_headers, "" def create_table(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) body = json.loads(self.body) name = body["name"] format = body["format"] table = self.s3tables_backend.create_table( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, format=format, ) return ( 200, self.default_response_headers, json.dumps(dict(tableARN=table.arn, versionToken=table.version_token)), ) def get_table(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace, name = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) table = self.s3tables_backend.get_table( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, ) return ( 200, self.default_response_headers, json.dumps( dict( name=table.name, type=table.type, tableARN=table.arn, namespace=[namespace], versionToken=table.version_token, metadataLocation=table.metadata_location, warehouseLocation=table.warehouse_location, createdAt=table.creation_date.isoformat(), createdBy=table.account_id, managedByService=table.managed_by_service, modifiedAt=table.last_modified.isoformat(), modifiedBy=table.modified_by, ownerAccountId=table.account_id, format=table.format, ) ), ) def list_tables(self) -> TYPE_RESPONSE: _, table_bucket_arn = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) params = self._get_params() namespace = params.get("namespace") prefix = params.get("prefix") continuation_token = params.get("continuationToken") max_tables = params.get("maxTables") tables, continuation_token = self.s3tables_backend.list_tables( table_bucket_arn=table_bucket_arn, namespace=namespace, prefix=prefix, continuation_token=continuation_token, max_tables=int(max_tables) if max_tables else None, ) body: Dict[str, Any] = { "tables": [ { "namespace": [table.namespace], "name": table.name, "createdAt": table.creation_date.isoformat(), "modifiedAt": table.last_modified.isoformat(), } for table in tables ] } if continuation_token: body.update(continuationToken=continuation_token) return 200, self.default_response_headers, json.dumps(body) def delete_table(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace, name = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) params = self._get_params() version_token = params.get("versionToken") self.s3tables_backend.delete_table( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, version_token=version_token, ) return 204, {}, "" def get_table_metadata_location(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace, name, _ = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) table = self.s3tables_backend.get_table( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, ) return ( 200, self.default_response_headers, json.dumps( dict( versionToken=table.version_token, metadataLocation=table.metadata_location, warehouseLocation=table.warehouse_location, ) ), ) def update_table_metadata_location(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace, name, _ = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) body = json.loads(self.body) metadata_location = body["metadataLocation"] version_token = body["versionToken"] table = self.s3tables_backend.update_table_metadata_location( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, version_token=version_token, metadata_location=metadata_location, ) return ( 200, self.default_response_headers, json.dumps( dict( name=table.name, tableArn=table.arn, namespace=namespace, versionToken=table.version_token, metadataLocation=table.metadata_location, ) ), ) def rename_table(self) -> TYPE_RESPONSE: _, table_bucket_arn, namespace, name, _ = self.raw_path.lstrip("/").split("/") table_bucket_arn = unquote(table_bucket_arn) body = json.loads(self.body) version_token = body.get("versionToken") new_namespace_name = body.get("newNamespaceName") new_name = body.get("newName") self.s3tables_backend.rename_table( table_bucket_arn=table_bucket_arn, namespace=namespace, name=name, new_namespace_name=new_namespace_name, new_name=new_name, version_token=version_token, ) return 200, {}, ""
Memory