# mypy: ignore-errors from collections import OrderedDict from collections.abc import Mapping, MutableMapping from botocore import xform_name from botocore.utils import parse_timestamp UNDEFINED = object() # Sentinel to signal the absence of a field in the input class QueryParser: TIMESTAMP_FORMAT = "iso8601" MAP_TYPE = dict def __init__(self, timestamp_parser=None, map_type=None): if timestamp_parser is None: timestamp_parser = parse_timestamp self._timestamp_parser = timestamp_parser if map_type is not None: self.MAP_TYPE = map_type def parse(self, request_dict, operation_model): shape = operation_model.input_shape parsed = self._do_parse(request_dict, shape) return parsed if parsed is not UNDEFINED else {} def _do_parse(self, request_dict, shape): parsed = self._parse_shape(shape, request_dict["query_params"]) return parsed if parsed is not UNDEFINED else {} def _parse_shape(self, shape, node, prefix=""): handler = getattr(self, "_handle_%s" % shape.type_name, self._default_handle) return handler(shape, node, prefix) def _gonna_recurse(self, query_params, prefix): if prefix == "": return False return not any([param_key.startswith(prefix) for param_key in query_params]) def _handle_structure(self, shape, query_params, prefix=""): if self._gonna_recurse(query_params, prefix): return UNDEFINED parsed = self.MAP_TYPE() members = shape.members for member_name in members: member_shape = members[member_name] member_prefix = self._get_serialized_name(member_shape, member_name) if prefix: member_prefix = "%s.%s" % (prefix, member_prefix) value = self._parse_shape(member_shape, query_params, member_prefix) parsed_key = self._parsed_key_name(member_name) if value is not UNDEFINED: parsed[parsed_key] = value return parsed if parsed != {} else UNDEFINED def _handle_list(self, shape, node, prefix=""): # The query protocol serializes empty lists as an empty string. value = self._parse_shape(shape.member, node, prefix) if value == "": return [] list_name = shape.member.serialization.get("name", "member") list_prefix = f"{prefix}.{list_name}" parsed_list = [] i = 1 while True: element_name = f"{list_prefix}.{i}" element_shape = shape.member value = self._parse_shape(element_shape, node, element_name) if value is UNDEFINED: break parsed_list.append(value) i += 1 return parsed_list if parsed_list != [] else UNDEFINED def _handle_timestamp(self, shape, query_params, prefix=""): value = self._default_handle(shape, query_params, prefix) return value if value is UNDEFINED else self._timestamp_parser(value) def _handle_boolean(self, shape, query_params, prefix=""): value = self._default_handle(shape, query_params, prefix) try: return value.lower() == "true" except AttributeError: pass return UNDEFINED def _handle_integer(self, shape, query_params, prefix=""): value = self._default_handle(shape, query_params, prefix) return value if value is UNDEFINED else int(value) def _handle_float(self, shape, query_params, prefix=""): value = self._default_handle(shape, query_params, prefix) return value if value is UNDEFINED else float(value) _handle_double = _handle_float _handle_long = _handle_integer def _default_handle(self, shape, value, prefix=""): default_value = shape.metadata.get("default", UNDEFINED) return value.get(prefix, default_value) def _get_serialized_name(self, shape, default_name): return shape.serialization.get("name", default_name) def _parsed_key_name(self, member_name): key_name = member_name return key_name class XFormedDict(MutableMapping): """ A Pascal/Snake case-insensitive ``dict``-like object. xfd = XFormedDict() xfd['DBInstanceIdentifier'] = 'identifier' xfd['DBInstanceIdentifier'] == 'identifier' # True xfd['db_instance_identifier'] == 'identifier' # True list(xfd) == ['db_instance_identifier'] # True """ def __init__(self, data=None, **kwargs): self._xform_cache = {} self._store = OrderedDict() if data is None: data = {} self.update(data, **kwargs) def _xformed(self, key: str): return xform_name(key, _xform_cache=self._xform_cache) def __setitem__(self, key, value): # Use the xformed key for lookups, but store the actual # key alongside the value. self._store[self._xformed(key)] = (key, value) def __getitem__(self, key: str): return self._store[self._xformed(key)][1] def __delitem__(self, key): del self._store[self._xformed(key)] def __iter__(self): return (key for key in self._store.keys()) def __len__(self): return len(self._store) def original_items(self): """Like iteritems(), but with all PascalCase keys.""" return ((keyval[0], keyval[1]) for (_, keyval) in self._store.items()) def __eq__(self, other): if isinstance(other, Mapping): other = XFormedDict(other) else: return NotImplemented # Compare xformed return dict(self.items()) == dict(other.items()) def copy(self): return XFormedDict(self._store.values()) def __repr__(self): return "%s(%r)" % (self.__class__.__name__, dict(self.items()))
Memory