# Copyright (c) Microsoft Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import base64 import inspect import json import json as json_utils import mimetypes import re from collections import defaultdict from pathlib import Path from types import SimpleNamespace from typing import ( TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, Optional, TypedDict, Union, cast, ) from urllib import parse from playwright._impl._api_structures import ( ClientCertificate, Headers, HeadersArray, RemoteAddr, RequestSizes, ResourceTiming, SecurityDetails, ) from playwright._impl._connection import ( ChannelOwner, from_channel, from_nullable_channel, ) from playwright._impl._errors import Error from playwright._impl._event_context_manager import EventContextManagerImpl from playwright._impl._helper import ( URLMatch, WebSocketRouteHandlerCallback, async_readfile, locals_to_params, url_matches, ) from playwright._impl._str_utils import escape_regex_flags from playwright._impl._waiter import Waiter if TYPE_CHECKING: # pragma: no cover from playwright._impl._browser_context import BrowserContext from playwright._impl._fetch import APIResponse from playwright._impl._frame import Frame from playwright._impl._page import Page class FallbackOverrideParameters(TypedDict, total=False): url: Optional[str] method: Optional[str] headers: Optional[Dict[str, str]] postData: Optional[Union[str, bytes]] class SerializedFallbackOverrides: def __init__(self) -> None: self.url: Optional[str] = None self.method: Optional[str] = None self.headers: Optional[Dict[str, str]] = None self.post_data_buffer: Optional[bytes] = None def serialize_headers(headers: Dict[str, str]) -> HeadersArray: return [ {"name": name, "value": value} for name, value in headers.items() if value is not None ] async def to_client_certificates_protocol( clientCertificates: Optional[List[ClientCertificate]], ) -> Optional[List[Dict[str, str]]]: if not clientCertificates: return None out = [] for clientCertificate in clientCertificates: out_record = { "origin": clientCertificate["origin"], } if passphrase := clientCertificate.get("passphrase"): out_record["passphrase"] = passphrase if pfx := clientCertificate.get("pfx"): out_record["pfx"] = base64.b64encode(pfx).decode() if pfx_path := clientCertificate.get("pfxPath"): out_record["pfx"] = base64.b64encode( await async_readfile(pfx_path) ).decode() if cert := clientCertificate.get("cert"): out_record["cert"] = base64.b64encode(cert).decode() if cert_path := clientCertificate.get("certPath"): out_record["cert"] = base64.b64encode( await async_readfile(cert_path) ).decode() if key := clientCertificate.get("key"): out_record["key"] = base64.b64encode(key).decode() if key_path := clientCertificate.get("keyPath"): out_record["key"] = base64.b64encode( await async_readfile(key_path) ).decode() out.append(out_record) return out class Request(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._redirected_from: Optional["Request"] = from_nullable_channel( initializer.get("redirectedFrom") ) self._redirected_to: Optional["Request"] = None if self._redirected_from: self._redirected_from._redirected_to = self self._failure_text: Optional[str] = None self._timing: ResourceTiming = { "startTime": 0, "domainLookupStart": -1, "domainLookupEnd": -1, "connectStart": -1, "secureConnectionStart": -1, "connectEnd": -1, "requestStart": -1, "responseStart": -1, "responseEnd": -1, } self._provisional_headers = RawHeaders(self._initializer["headers"]) self._all_headers_future: Optional[asyncio.Future[RawHeaders]] = None self._fallback_overrides: SerializedFallbackOverrides = ( SerializedFallbackOverrides() ) def __repr__(self) -> str: return f"<Request url={self.url!r} method={self.method!r}>" def _apply_fallback_overrides(self, overrides: FallbackOverrideParameters) -> None: self._fallback_overrides.url = overrides.get( "url", self._fallback_overrides.url ) self._fallback_overrides.method = overrides.get( "method", self._fallback_overrides.method ) self._fallback_overrides.headers = overrides.get( "headers", self._fallback_overrides.headers ) post_data = overrides.get("postData") if isinstance(post_data, str): self._fallback_overrides.post_data_buffer = post_data.encode() elif isinstance(post_data, bytes): self._fallback_overrides.post_data_buffer = post_data elif post_data is not None: self._fallback_overrides.post_data_buffer = json.dumps(post_data).encode() @property def url(self) -> str: return cast(str, self._fallback_overrides.url or self._initializer["url"]) @property def resource_type(self) -> str: return self._initializer["resourceType"] @property def method(self) -> str: return cast(str, self._fallback_overrides.method or self._initializer["method"]) async def sizes(self) -> RequestSizes: response = await self.response() if not response: raise Error("Unable to fetch sizes for failed request") return await response._channel.send("sizes") @property def post_data(self) -> Optional[str]: data = self._fallback_overrides.post_data_buffer if data: return data.decode() base64_post_data = self._initializer.get("postData") if base64_post_data is not None: return base64.b64decode(base64_post_data).decode() return None @property def post_data_json(self) -> Optional[Any]: post_data = self.post_data if not post_data: return None content_type = self.headers["content-type"] if "application/x-www-form-urlencoded" in content_type: return dict(parse.parse_qsl(post_data)) try: return json.loads(post_data) except Exception: raise Error(f"POST data is not a valid JSON object: {post_data}") @property def post_data_buffer(self) -> Optional[bytes]: if self._fallback_overrides.post_data_buffer: return self._fallback_overrides.post_data_buffer if self._initializer.get("postData"): return base64.b64decode(self._initializer["postData"]) return None async def response(self) -> Optional["Response"]: return from_nullable_channel(await self._channel.send("response")) @property def frame(self) -> "Frame": if not self._initializer.get("frame"): raise Error("Service Worker requests do not have an associated frame.") frame = cast("Frame", from_channel(self._initializer["frame"])) if not frame._page: raise Error( "\n".join( [ "Frame for this navigation request is not available, because the request", "was issued before the frame is created. You can check whether the request", "is a navigation request by calling isNavigationRequest() method.", ] ) ) return frame def is_navigation_request(self) -> bool: return self._initializer["isNavigationRequest"] @property def redirected_from(self) -> Optional["Request"]: return self._redirected_from @property def redirected_to(self) -> Optional["Request"]: return self._redirected_to @property def failure(self) -> Optional[str]: return self._failure_text @property def timing(self) -> ResourceTiming: return self._timing def _set_response_end_timing(self, response_end_timing: float) -> None: self._timing["responseEnd"] = response_end_timing if self._timing["responseStart"] == -1: self._timing["responseStart"] = response_end_timing @property def headers(self) -> Headers: override = self._fallback_overrides.headers if override: return RawHeaders._from_headers_dict_lossy(override).headers() return self._provisional_headers.headers() async def all_headers(self) -> Headers: return (await self._actual_headers()).headers() async def headers_array(self) -> HeadersArray: return (await self._actual_headers()).headers_array() async def header_value(self, name: str) -> Optional[str]: return (await self._actual_headers()).get(name) async def _actual_headers(self) -> "RawHeaders": override = self._fallback_overrides.headers if override: return RawHeaders(serialize_headers(override)) if not self._all_headers_future: self._all_headers_future = asyncio.Future() headers = await self._channel.send("rawRequestHeaders") self._all_headers_future.set_result(RawHeaders(headers)) return await self._all_headers_future def _target_closed_future(self) -> asyncio.Future: frame = cast( Optional["Frame"], from_nullable_channel(self._initializer.get("frame")) ) if not frame: return asyncio.Future() page = frame._page if not page: return asyncio.Future() return page._closed_or_crashed_future def _safe_page(self) -> "Optional[Page]": frame = from_nullable_channel(self._initializer.get("frame")) if not frame: return None return cast("Frame", frame)._page class Route(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._channel.mark_as_internal_type() self._handling_future: Optional[asyncio.Future["bool"]] = None self._context: "BrowserContext" = cast("BrowserContext", None) self._did_throw = False def _start_handling(self) -> "asyncio.Future[bool]": self._handling_future = asyncio.Future() return self._handling_future def _report_handled(self, done: bool) -> None: chain = self._handling_future assert chain self._handling_future = None chain.set_result(done) def _check_not_handled(self) -> None: if not self._handling_future: raise Error("Route is already handled!") def __repr__(self) -> str: return f"<Route request={self.request}>" @property def request(self) -> Request: return from_channel(self._initializer["request"]) async def abort(self, errorCode: str = None) -> None: await self._handle_route( lambda: self._race_with_page_close( self._channel.send( "abort", { "errorCode": errorCode, }, ) ) ) async def fulfill( self, status: int = None, headers: Dict[str, str] = None, body: Union[str, bytes] = None, json: Any = None, path: Union[str, Path] = None, contentType: str = None, response: "APIResponse" = None, ) -> None: await self._handle_route( lambda: self._inner_fulfill( status, headers, body, json, path, contentType, response ) ) async def _inner_fulfill( self, status: int = None, headers: Dict[str, str] = None, body: Union[str, bytes] = None, json: Any = None, path: Union[str, Path] = None, contentType: str = None, response: "APIResponse" = None, ) -> None: params = locals_to_params(locals()) if json is not None: if body is not None: raise Error("Can specify either body or json parameters") body = json_utils.dumps(json) if response: del params["response"] params["status"] = ( params["status"] if params.get("status") else response.status ) params["headers"] = ( params["headers"] if params.get("headers") else response.headers ) from playwright._impl._fetch import APIResponse if body is None and path is None and isinstance(response, APIResponse): if response._request._connection is self._connection: params["fetchResponseUid"] = response._fetch_uid else: body = await response.body() length = 0 if isinstance(body, str): params["body"] = body params["isBase64"] = False length = len(body.encode()) elif isinstance(body, bytes): params["body"] = base64.b64encode(body).decode() params["isBase64"] = True length = len(body) elif path: del params["path"] file_content = Path(path).read_bytes() params["body"] = base64.b64encode(file_content).decode() params["isBase64"] = True length = len(file_content) headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()} if params.get("contentType"): headers["content-type"] = params["contentType"] elif json: headers["content-type"] = "application/json" elif path: headers["content-type"] = ( mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream" ) if length and "content-length" not in headers: headers["content-length"] = str(length) params["headers"] = serialize_headers(headers) await self._race_with_page_close(self._channel.send("fulfill", params)) async def _handle_route(self, callback: Callable) -> None: self._check_not_handled() try: await callback() self._report_handled(True) except Exception as e: self._did_throw = True raise e async def fetch( self, url: str = None, method: str = None, headers: Dict[str, str] = None, postData: Union[Any, str, bytes] = None, maxRedirects: int = None, maxRetries: int = None, timeout: float = None, ) -> "APIResponse": return await self._connection.wrap_api_call( lambda: self._context.request._inner_fetch( self.request, url, method, headers, postData, maxRedirects=maxRedirects, maxRetries=maxRetries, timeout=timeout, ) ) async def fallback( self, url: str = None, method: str = None, headers: Dict[str, str] = None, postData: Union[Any, str, bytes] = None, ) -> None: overrides = cast(FallbackOverrideParameters, locals_to_params(locals())) self._check_not_handled() self.request._apply_fallback_overrides(overrides) self._report_handled(False) async def continue_( self, url: str = None, method: str = None, headers: Dict[str, str] = None, postData: Union[Any, str, bytes] = None, ) -> None: overrides = cast(FallbackOverrideParameters, locals_to_params(locals())) async def _inner() -> None: self.request._apply_fallback_overrides(overrides) await self._inner_continue(False) return await self._handle_route(_inner) async def _inner_continue(self, is_fallback: bool = False) -> None: options = self.request._fallback_overrides await self._race_with_page_close( self._channel.send( "continue", { "url": options.url, "method": options.method, "headers": ( serialize_headers(options.headers) if options.headers else None ), "postData": ( base64.b64encode(options.post_data_buffer).decode() if options.post_data_buffer is not None else None ), "isFallback": is_fallback, }, ) ) async def _redirected_navigation_request(self, url: str) -> None: await self._handle_route( lambda: self._race_with_page_close( self._channel.send("redirectNavigationRequest", {"url": url}) ) ) async def _race_with_page_close(self, future: Coroutine) -> None: fut = asyncio.create_task(future) # Rewrite the user's stack to the new task which runs in the background. setattr( fut, "__pw_stack__", getattr(asyncio.current_task(self._loop), "__pw_stack__", inspect.stack()), ) target_closed_future = self.request._target_closed_future() await asyncio.wait( [fut, target_closed_future], return_when=asyncio.FIRST_COMPLETED, ) if fut.done() and fut.exception(): raise cast(BaseException, fut.exception()) if target_closed_future.done(): await asyncio.gather(fut, return_exceptions=True) def _create_task_and_ignore_exception( loop: asyncio.AbstractEventLoop, coro: Coroutine ) -> None: async def _ignore_exception() -> None: try: await coro except Exception: pass loop.create_task(_ignore_exception()) class ServerWebSocketRoute: def __init__(self, ws: "WebSocketRoute"): self._ws = ws def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None: self._ws._on_server_message = handler def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None: self._ws._on_server_close = handler def connect_to_server(self) -> None: raise NotImplementedError( "connectToServer must be called on the page-side WebSocketRoute" ) @property def url(self) -> str: return self._ws._initializer["url"] def close(self, code: int = None, reason: str = None) -> None: _create_task_and_ignore_exception( self._ws._loop, self._ws._channel.send( "closeServer", { "code": code, "reason": reason, "wasClean": True, }, ), ) def send(self, message: Union[str, bytes]) -> None: if isinstance(message, str): _create_task_and_ignore_exception( self._ws._loop, self._ws._channel.send( "sendToServer", {"message": message, "isBase64": False} ), ) else: _create_task_and_ignore_exception( self._ws._loop, self._ws._channel.send( "sendToServer", {"message": base64.b64encode(message).decode(), "isBase64": True}, ), ) class WebSocketRoute(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._channel.mark_as_internal_type() self._on_page_message: Optional[Callable[[Union[str, bytes]], Any]] = None self._on_page_close: Optional[Callable[[Optional[int], Optional[str]], Any]] = ( None ) self._on_server_message: Optional[Callable[[Union[str, bytes]], Any]] = None self._on_server_close: Optional[ Callable[[Optional[int], Optional[str]], Any] ] = None self._server = ServerWebSocketRoute(self) self._connected = False self._channel.on("messageFromPage", self._channel_message_from_page) self._channel.on("messageFromServer", self._channel_message_from_server) self._channel.on("closePage", self._channel_close_page) self._channel.on("closeServer", self._channel_close_server) def _channel_message_from_page(self, event: Dict) -> None: if self._on_page_message: self._on_page_message( base64.b64decode(event["message"]) if event["isBase64"] else event["message"] ) elif self._connected: _create_task_and_ignore_exception( self._loop, self._channel.send("sendToServer", event) ) def _channel_message_from_server(self, event: Dict) -> None: if self._on_server_message: self._on_server_message( base64.b64decode(event["message"]) if event["isBase64"] else event["message"] ) else: _create_task_and_ignore_exception( self._loop, self._channel.send("sendToPage", event) ) def _channel_close_page(self, event: Dict) -> None: if self._on_page_close: self._on_page_close(event["code"], event["reason"]) else: _create_task_and_ignore_exception( self._loop, self._channel.send("closeServer", event) ) def _channel_close_server(self, event: Dict) -> None: if self._on_server_close: self._on_server_close(event["code"], event["reason"]) else: _create_task_and_ignore_exception( self._loop, self._channel.send("closePage", event) ) @property def url(self) -> str: return self._initializer["url"] async def close(self, code: int = None, reason: str = None) -> None: try: await self._channel.send( "closePage", {"code": code, "reason": reason, "wasClean": True} ) except Exception: pass def connect_to_server(self) -> "WebSocketRoute": if self._connected: raise Error("Already connected to the server") self._connected = True asyncio.create_task(self._channel.send("connect")) return cast("WebSocketRoute", self._server) def send(self, message: Union[str, bytes]) -> None: if isinstance(message, str): _create_task_and_ignore_exception( self._loop, self._channel.send( "sendToPage", {"message": message, "isBase64": False} ), ) else: _create_task_and_ignore_exception( self._loop, self._channel.send( "sendToPage", { "message": base64.b64encode(message).decode(), "isBase64": True, }, ), ) def on_message(self, handler: Callable[[Union[str, bytes]], Any]) -> None: self._on_page_message = handler def on_close(self, handler: Callable[[Optional[int], Optional[str]], Any]) -> None: self._on_page_close = handler async def _after_handle(self) -> None: if self._connected: return # Ensure that websocket is "open" and can send messages without an actual server connection. await self._channel.send("ensureOpened") class WebSocketRouteHandler: def __init__( self, base_url: Optional[str], url: URLMatch, handler: WebSocketRouteHandlerCallback, ): self._base_url = base_url self.url = url self.handler = handler @staticmethod def prepare_interception_patterns( handlers: List["WebSocketRouteHandler"], ) -> List[dict]: patterns = [] all_urls = False for handler in handlers: if isinstance(handler.url, str): patterns.append({"glob": handler.url}) elif isinstance(handler.url, re.Pattern): patterns.append( { "regexSource": handler.url.pattern, "regexFlags": escape_regex_flags(handler.url), } ) else: all_urls = True if all_urls: return [{"glob": "**/*"}] return patterns def matches(self, ws_url: str) -> bool: return url_matches(self._base_url, ws_url, self.url) async def handle(self, websocket_route: "WebSocketRoute") -> None: coro_or_future = self.handler(websocket_route) if asyncio.iscoroutine(coro_or_future): await coro_or_future await websocket_route._after_handle() class Response(ChannelOwner): def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._request: Request = from_channel(self._initializer["request"]) timing = self._initializer["timing"] self._request._timing["startTime"] = timing["startTime"] self._request._timing["domainLookupStart"] = timing["domainLookupStart"] self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"] self._request._timing["connectStart"] = timing["connectStart"] self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"] self._request._timing["connectEnd"] = timing["connectEnd"] self._request._timing["requestStart"] = timing["requestStart"] self._request._timing["responseStart"] = timing["responseStart"] self._provisional_headers = RawHeaders( cast(HeadersArray, self._initializer["headers"]) ) self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None self._finished_future: asyncio.Future[bool] = asyncio.Future() def __repr__(self) -> str: return f"<Response url={self.url!r} request={self.request}>" @property def url(self) -> str: return self._initializer["url"] @property def ok(self) -> bool: # Status 0 is for file:// URLs return self._initializer["status"] == 0 or ( self._initializer["status"] >= 200 and self._initializer["status"] <= 299 ) @property def status(self) -> int: return self._initializer["status"] @property def status_text(self) -> str: return self._initializer["statusText"] @property def headers(self) -> Headers: return self._provisional_headers.headers() @property def from_service_worker(self) -> bool: return self._initializer["fromServiceWorker"] async def all_headers(self) -> Headers: return (await self._actual_headers()).headers() async def headers_array(self) -> HeadersArray: return (await self._actual_headers()).headers_array() async def header_value(self, name: str) -> Optional[str]: return (await self._actual_headers()).get(name) async def header_values(self, name: str) -> List[str]: return (await self._actual_headers()).get_all(name) async def _actual_headers(self) -> "RawHeaders": if not self._raw_headers_future: self._raw_headers_future = asyncio.Future() headers = cast(HeadersArray, await self._channel.send("rawResponseHeaders")) self._raw_headers_future.set_result(RawHeaders(headers)) return await self._raw_headers_future async def server_addr(self) -> Optional[RemoteAddr]: return await self._channel.send("serverAddr") async def security_details(self) -> Optional[SecurityDetails]: return await self._channel.send("securityDetails") async def finished(self) -> None: async def on_finished() -> None: await self._request._target_closed_future() raise Error("Target closed") on_finished_task = asyncio.create_task(on_finished()) await asyncio.wait( cast( List[Union[asyncio.Task, asyncio.Future]], [self._finished_future, on_finished_task], ), return_when=asyncio.FIRST_COMPLETED, ) if on_finished_task.done(): await on_finished_task async def body(self) -> bytes: binary = await self._channel.send("body") return base64.b64decode(binary) async def text(self) -> str: content = await self.body() return content.decode() async def json(self) -> Any: return json.loads(await self.text()) @property def request(self) -> Request: return self._request @property def frame(self) -> "Frame": return self._request.frame class WebSocket(ChannelOwner): Events = SimpleNamespace( Close="close", FrameReceived="framereceived", FrameSent="framesent", Error="socketerror", ) def __init__( self, parent: ChannelOwner, type: str, guid: str, initializer: Dict ) -> None: super().__init__(parent, type, guid, initializer) self._is_closed = False self._page = cast("Page", parent) self._channel.on( "frameSent", lambda params: self._on_frame_sent(params["opcode"], params["data"]), ) self._channel.on( "frameReceived", lambda params: self._on_frame_received(params["opcode"], params["data"]), ) self._channel.on( "socketError", lambda params: self.emit(WebSocket.Events.Error, params["error"]), ) self._channel.on("close", lambda params: self._on_close()) def __repr__(self) -> str: return f"<WebSocket url={self.url!r}>" @property def url(self) -> str: return self._initializer["url"] def expect_event( self, event: str, predicate: Callable = None, timeout: float = None, ) -> EventContextManagerImpl: if timeout is None: timeout = cast(Any, self._parent)._timeout_settings.timeout() waiter = Waiter(self, f"web_socket.expect_event({event})") waiter.reject_on_timeout( cast(float, timeout), f'Timeout {timeout}ms exceeded while waiting for event "{event}"', ) if event != WebSocket.Events.Close: waiter.reject_on_event(self, WebSocket.Events.Close, Error("Socket closed")) if event != WebSocket.Events.Error: waiter.reject_on_event(self, WebSocket.Events.Error, Error("Socket error")) waiter.reject_on_event( self._page, "close", lambda: self._page._close_error_with_reason() ) waiter.wait_for_event(self, event, predicate) return EventContextManagerImpl(waiter.result()) async def wait_for_event( self, event: str, predicate: Callable = None, timeout: float = None ) -> Any: async with self.expect_event(event, predicate, timeout) as event_info: pass return await event_info def _on_frame_sent(self, opcode: int, data: str) -> None: if opcode == 2: self.emit(WebSocket.Events.FrameSent, base64.b64decode(data)) elif opcode == 1: self.emit(WebSocket.Events.FrameSent, data) def _on_frame_received(self, opcode: int, data: str) -> None: if opcode == 2: self.emit(WebSocket.Events.FrameReceived, base64.b64decode(data)) elif opcode == 1: self.emit(WebSocket.Events.FrameReceived, data) def is_closed(self) -> bool: return self._is_closed def _on_close(self) -> None: self._is_closed = True self.emit(WebSocket.Events.Close, self) class RawHeaders: def __init__(self, headers: HeadersArray) -> None: self._headers_array = headers self._headers_map: Dict[str, Dict[str, bool]] = defaultdict(dict) for header in headers: self._headers_map[header["name"].lower()][header["value"]] = True @staticmethod def _from_headers_dict_lossy(headers: Dict[str, str]) -> "RawHeaders": return RawHeaders(serialize_headers(headers)) def get(self, name: str) -> Optional[str]: values = self.get_all(name) if not values: return None separator = "\n" if name.lower() == "set-cookie" else ", " return separator.join(values) def get_all(self, name: str) -> List[str]: return list(self._headers_map[name.lower()].keys()) def headers(self) -> Dict[str, str]: result = {} for name in self._headers_map.keys(): result[name] = cast(str, self.get(name)) return result def headers_array(self) -> HeadersArray: return self._headers_array
Memory