import re from typing import Dict, Any, List, Optional, Tuple from ..exceptions import ParserException from .delete_parser import DeleteParser from .from_parser import DynamoDBFromParser, S3FromParser, FromParser from .insert_parser import InsertParser from .json_parser import JsonParser from .select_parser import DynamoDBSelectParser, S3SelectClauseParser from .update_parser import UpdateParser from .where_parser import DynamoDBWhereParser, S3WhereParser, WhereParser from .utils import QueryMetadata, CaseInsensitiveDict TYPE_RESPONSE = Tuple[ List[Dict[str, Any]], Dict[str, List[Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]]], ] class S3SelectParser: def __init__(self, source_data: str): # Source data is one or more json documents self.documents = source_data self.table_prefix = "s3object" self.bytes_scanned = 0 def parse(self, query: str) -> List[Dict[str, Any]]: query = query.replace("\n", " ") clauses = re.split("SELECT | FROM | WHERE ", query, flags=re.IGNORECASE) # First clause is whatever comes in front of SELECT - which should be nothing _ = clauses[0] from_parser = S3FromParser(from_clause=clauses[2]) table_prefix = self.table_prefix for alias_key, alias_value in from_parser.clauses.items(): if table_prefix == alias_value or f"{table_prefix}[*]" == alias_value: table_prefix = alias_key results: List[Any] = [] for doc, tokens_parsed in JsonParser.parse_with_tokens(self.documents): doc = from_parser.get_source_data(doc) self.bytes_scanned += tokens_parsed if len(clauses) > 3: where_clause = clauses[3] if not S3WhereParser.applies(doc, table_prefix, where_clause): continue select_clause = clauses[1] S3SelectClauseParser(table_prefix).parse( select_clause, from_parser.clauses, doc, results ) return results class DynamoDBStatementParser: def __init__(self, source_data: Dict[str, List[Dict[str, Any]]]): """ Source Data should be a list of DynamoDB documents, mapped to the table name { "table_name": [ { "hash_key": "..", "other_item": {"S": ".."}, .. }, .. ], .. } """ # Source data is in the format: {source: json} # Where 'json' is one or more json documents separated by a newline self.documents = { key: [CaseInsensitiveDict(v) for v in val] for key, val in source_data.items() } def parse( # type: ignore[return] self, query: str, parameters: Optional[List[Dict[str, Any]]] = None ) -> TYPE_RESPONSE: if query.lower().startswith("select"): return_data, updates = self._parse_select(query, parameters) for item in return_data: for key, val in item.items(): item[key] = val.get_regular() return return_data, updates if query.lower().startswith("update"): return self._parse_update(query, parameters) if query.lower().startswith("delete"): return self._parse_delete(query) if query.lower().startswith("insert"): return self._parse_insert(query) def _parse_select( self, query: str, parameters: Optional[List[Dict[str, Any]]] = None ) -> TYPE_RESPONSE: query = query.replace("\n", " ") clauses = re.split("SELECT | FROM | WHERE ", query, flags=re.IGNORECASE) # First clause is whatever comes in front of SELECT - which should be nothing _ = clauses[0] # FROM from_parser = DynamoDBFromParser(from_clause=clauses[2]) source_data = self.documents[list(from_parser.clauses.values())[0]] # WHERE if len(clauses) > 3: where_clause = clauses[3] source_data = DynamoDBWhereParser(source_data).parse( where_clause, parameters ) # SELECT select_clause = clauses[1] queried_data = DynamoDBSelectParser().parse( select_clause, from_parser.clauses, source_data ) updates: Dict[ str, List[Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] ] = {} return queried_data, updates def _parse_update( self, query: str, parameters: Optional[List[Dict[str, Any]]] = None ) -> TYPE_RESPONSE: query = query.replace("\n", " ") table_name, attrs_to_update, attrs_to_filter = UpdateParser().parse(query) parameters_requested = len( [_ for _, val in attrs_to_update + attrs_to_filter if val == "?"] ) if parameters_requested and len(parameters) != parameters_requested: # type: ignore raise ParserException( name="ValidationError", message="Number of parameters in request and statement don't match.", ) attrs_to_update = [ (key, parameters.pop(0) if val == "?" else val) # type: ignore for key, val in attrs_to_update ] attrs_to_filter = [ (key, parameters.pop(0) if val == "?" else val) # type: ignore for key, val in attrs_to_filter ] source_data = self.documents[table_name] updates_per_table: Dict[ str, List[Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] ] = {table_name: []} for item in source_data: if all([item.get(name) == val for name, val in attrs_to_filter]): new_item = item.copy() for attr_key, attr_value in attrs_to_update: if attr_value is None: new_item.pop(attr_key, None) else: new_item[attr_key] = attr_value updates_per_table[table_name].append( (item.get_regular(), new_item.get_regular()) ) return [], updates_per_table def _parse_delete(self, query: str) -> TYPE_RESPONSE: query = query.replace("\n", " ") table_name, attrs_to_filter = DeleteParser().parse(query) source_data = self.documents[table_name] deletes_per_table: Dict[ str, List[Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]] ] = {table_name: []} for item in source_data: if all([item.get(name) == val for name, val in attrs_to_filter]): deletes_per_table[table_name].append((item.get_regular(), None)) return [], deletes_per_table def _parse_insert(self, query: str) -> TYPE_RESPONSE: query = query.replace("\n", " ") table_name, new_item = InsertParser().parse(query) return [], {table_name: [(None, new_item)]} @classmethod def get_query_metadata(cls, query: str) -> QueryMetadata: query = query.replace("\n", " ") if query.lower().startswith("select"): clauses = re.split("SELECT | FROM | WHERE ", query, flags=re.IGNORECASE) from_parser = FromParser(clauses[2]) # WHERE if len(clauses) > 3: where_clause = clauses[3] where = WhereParser.parse_where_clause(where_clause) else: where = None return QueryMetadata( tables=from_parser.clauses, where_clause=where, is_select_query=True ) elif query.lower().startswith("update"): table_name, attrs_to_update, attrs_to_filter = UpdateParser().parse(query) return QueryMetadata(tables={table_name: table_name}, where_clause=None) elif query.lower().startswith("delete"): query = query.replace("\n", " ") table_name, attrs_to_filter = DeleteParser().parse(query) return QueryMetadata(tables={table_name: table_name}) elif query.lower().startswith("insert"): query = query.replace("\n", " ") table_name, new_item = InsertParser().parse(query) return QueryMetadata(tables={table_name: table_name}) raise Exception
Memory