from functools import wraps from typing import TYPE_CHECKING if TYPE_CHECKING: # pragma: no cover import os from typing import Any from typing import BinaryIO from typing import Callable from typing import Dict from typing import List from typing import Optional from typing import Type from typing import Union from responses import FirstMatchRegistry from responses import HTTPAdapter from responses import PreparedRequest from responses import models from responses import _F from responses import BaseResponse from io import TextIOWrapper import yaml from responses import RequestsMock from responses import Response from responses import _real_send from responses.registries import OrderedRegistry def _remove_nones(d: "Any") -> "Any": if isinstance(d, dict): return {k: _remove_nones(v) for k, v in d.items() if v is not None} if isinstance(d, list): return [_remove_nones(i) for i in d] return d def _remove_default_headers(data: "Any") -> "Any": """ It would be too verbose to store these headers in the file generated by the record functionality. """ if isinstance(data, dict): keys_to_remove = [ "Content-Length", "Content-Type", "Date", "Server", "Connection", "Content-Encoding", ] for i, response in enumerate(data["responses"]): for key in keys_to_remove: if key in response["response"]["headers"]: del data["responses"][i]["response"]["headers"][key] if not response["response"]["headers"]: del data["responses"][i]["response"]["headers"] return data def _dump( registered: "List[BaseResponse]", destination: "Union[BinaryIO, TextIOWrapper]", dumper: "Callable[[Union[Dict[Any, Any], List[Any]], Union[BinaryIO, TextIOWrapper]], Any]", ) -> None: data: Dict[str, Any] = {"responses": []} for rsp in registered: try: content_length = rsp.auto_calculate_content_length # type: ignore[attr-defined] data["responses"].append( { "response": { "method": rsp.method, "url": rsp.url, "body": rsp.body, "status": rsp.status, "headers": rsp.headers, "content_type": rsp.content_type, "auto_calculate_content_length": content_length, } } ) except AttributeError as exc: # pragma: no cover raise AttributeError( "Cannot dump response object." "Probably you use custom Response object that is missing required attributes" ) from exc dumper(_remove_default_headers(_remove_nones(data)), destination) class Recorder(RequestsMock): def __init__( self, *, target: str = "requests.adapters.HTTPAdapter.send", registry: "Type[FirstMatchRegistry]" = OrderedRegistry, ) -> None: super().__init__(target=target, registry=registry) def reset(self) -> None: self._registry = OrderedRegistry() def record( self, *, file_path: "Union[str, bytes, os.PathLike[Any]]" = "response.yaml" ) -> "Union[Callable[[_F], _F], _F]": def deco_record(function: "_F") -> "Callable[..., Any]": @wraps(function) def wrapper(*args: "Any", **kwargs: "Any") -> "Any": # type: ignore[misc] with self: ret = function(*args, **kwargs) self.dump_to_file( file_path=file_path, registered=self.get_registry().registered ) return ret return wrapper return deco_record def dump_to_file( self, file_path: "Union[str, bytes, os.PathLike[Any]]", *, registered: "Optional[List[BaseResponse]]" = None, ) -> None: """Dump the recorded responses to a file.""" if registered is None: registered = self.get_registry().registered with open(file_path, "w") as file: _dump(registered, file, yaml.dump) def _on_request( self, adapter: "HTTPAdapter", request: "PreparedRequest", **kwargs: "Any", ) -> "models.Response": # add attributes params and req_kwargs to 'request' object for further match comparison # original request object does not have these attributes request.params = self._parse_request_params(request.path_url) # type: ignore[attr-defined] request.req_kwargs = kwargs # type: ignore[attr-defined] requests_response = _real_send(adapter, request, **kwargs) headers_values = { key: value for key, value in requests_response.headers.items() } responses_response = Response( method=str(request.method), url=str(requests_response.request.url), status=requests_response.status_code, body=requests_response.text, headers=headers_values, ) self._registry.add(responses_response) return requests_response def stop(self, allow_assert: bool = True) -> None: super().stop(allow_assert=False) recorder = Recorder() record = recorder.record
Memory