import base64
import json
import re
from typing import Any, Dict, List, Optional, Union

# Matches a path segment that addresses a list item, e.g. 'level1[1]':
# group(1) is the attribute name, group(2) the numeric index.
_LIST_EXPRESSION = re.compile(r"(.+)\[(\d+)\]$")


class DynamoJsonEncoder(json.JSONEncoder):
    """JSON encoder that supports objects exposing ``to_json`` and raw bytes."""

    def default(self, o: Any) -> Any:
        """
        Convert an object the stock encoder cannot serialise.

        Objects with a ``to_json`` attribute are replaced by the result of
        calling it; ``bytes`` are base64-encoded and returned as a UTF-8 string.
        """
        if hasattr(o, "to_json"):
            return o.to_json()
        elif isinstance(o, bytes):
            return base64.b64encode(o).decode("utf-8")
        # NOTE(review): every other type falls through and yields None, so it
        # is emitted as JSON null instead of raising TypeError. Kept as-is
        # because callers may rely on it - confirm whether this is intended.
        return None


def dynamo_json_dump(dynamo_object: Any) -> str:
    """Serialise ``dynamo_object`` to a JSON string using DynamoJsonEncoder."""
    return json.dumps(dynamo_object, cls=DynamoJsonEncoder)


def bytesize(val: Union[str, bytes]) -> int:
    """
    Return the size of ``val`` in bytes.

    ``bytes`` are measured directly; strings are measured by the length of
    their UTF-8 encoding.
    """
    return len(val if isinstance(val, bytes) else val.encode("utf-8"))


def _descend(result: Dict[str, Any], processed_keys: List[str]) -> Any:
    """Walk ``result`` along ``processed_keys`` and return the innermost container."""
    node: Any = result
    for key in processed_keys:
        # A list in the result only ever holds the single dict we appended
        # for the addressed list item, hence the fixed index 0.
        node = node[0][key] if isinstance(node, list) else node[key]
    return node


def _attach(container: Any, key: str, value: Any) -> None:
    """Store ``value`` under ``key``: appended as a dict to a list, or set on a dict."""
    if isinstance(container, list):
        container.append({key: value})
    else:
        container[key] = value


def find_nested_key(
    keys: List[str],
    dct: Dict[str, Any],
    processed_keys: Optional[List[str]] = None,
    result: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Extract the value at a nested path, keeping the surrounding structure.

    keys   : A list of keys that may be present in the provided dictionary
             ["level1", "level2"]
             A key can refer to a list-item: ["level1[1]", "level2"]
    dct    : A dictionary that we want to inspect
             {"level1": {"level2": "val", "irrelevant": ..}
    processed_keys:
             Should not be set by the caller, only by recursive invocations.
             Example value: ["level1"]
    result:
             Should not be set by the caller, only by recursive invocations
             Example value: {"level1": {}}

    returns: {"level1": {"level2": "val"}}
             An empty dictionary is returned when any part of the path does
             not exist, including a list index that is out of range.
             A list-item is returned wrapped in a single-element list:
             keys=["a[1]"], dct={"a": [1, 2]} -> {"a": [2]}
    """
    if result is None:
        result = {}
    if processed_keys is None:
        processed_keys = []

    # Resolve 'current_key' (and 'idx' for list-items),
    # or return an empty dictionary if the key does not exist in our dictionary
    list_match = _LIST_EXPRESSION.match(keys[0])
    if list_match:
        current_key = list_match.group(1)
        idx = int(list_match.group(2))
        # idx must be strictly smaller than the length; 'idx == len' is
        # already out of range and would raise IndexError further down.
        if not (
            current_key in dct
            and isinstance(dct[current_key], list)
            and len(dct[current_key]) > idx
        ):
            return {}
    elif keys[0] in dct:
        current_key = keys[0]
    else:
        return {}

    # We may have already processed some keys
    # Dig into the result to find the container the next level belongs in
    #
    # result: {'level1': {'level2': {}}}
    # processed_keys: ['level1', 'level2']
    # --> container: the inner {} of 'level2'
    container = _descend(result, processed_keys)

    if len(keys) == 1:
        # Last key: store the actual value and we're done
        value = [dct[current_key][idx]] if list_match else dct[current_key]
        _attach(container, current_key, value)
        return result

    # More keys to go: add an empty placeholder for this level
    # {'level1': {}} --> {'level1': {current_key: {}}}
    _attach(container, current_key, [] if list_match else {})
    remaining_dct = dct[current_key][idx] if list_match else dct[current_key]
    return find_nested_key(
        keys[1:],
        remaining_dct,
        processed_keys=processed_keys + [current_key],
        result=result,
    )
Memory