import binascii
import struct
from typing import Any, Dict, List, Optional, Tuple
def parse_query(text_input: str, query: str) -> Tuple[List[Dict[str, Any]], int]:
from py_partiql_parser import S3SelectParser
parser = S3SelectParser(source_data=text_input)
result = parser.parse(query)
return result, parser.bytes_scanned
def _create_header(key: bytes, value: bytes) -> bytes:
return struct.pack("b", len(key)) + key + struct.pack("!bh", 7, len(value)) + value
def _create_message(
content_type: Optional[bytes], event_type: bytes, payload: bytes
) -> bytes:
headers = _create_header(b":message-type", b"event")
headers += _create_header(b":event-type", event_type)
if content_type is not None:
headers += _create_header(b":content-type", content_type)
headers_length = struct.pack("!I", len(headers))
total_length = struct.pack("!I", len(payload) + len(headers) + 16)
prelude = total_length + headers_length
prelude_crc = struct.pack("!I", binascii.crc32(total_length + headers_length))
message_crc = struct.pack(
"!I", binascii.crc32(prelude + prelude_crc + headers + payload)
)
return prelude + prelude_crc + headers + payload + message_crc
def _create_stats_message(bytes_scanned: int, bytes_returned: int) -> bytes:
stats = f"<Stats><BytesScanned>{bytes_scanned}</BytesScanned><BytesProcessed>{bytes_scanned}</BytesProcessed><BytesReturned>{bytes_returned}</BytesReturned></Stats>"
return _create_message(
content_type=b"text/xml", event_type=b"Stats", payload=stats.encode("utf-8")
)
def _create_data_message(payload: bytes) -> bytes:
# https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
return _create_message(
content_type=b"application/octet-stream", event_type=b"Records", payload=payload
)
def _create_end_message() -> bytes:
return _create_message(content_type=None, event_type=b"End", payload=b"")
def serialize_select(data_list: List[bytes], bytes_scanned: int) -> bytes:
bytes_returned = 0
all_data = b""
for data in data_list:
bytes_returned += len(data)
all_data += data
response = _create_data_message(all_data)
return (
response
+ _create_stats_message(bytes_scanned, bytes_returned)
+ _create_end_message()
)