from functools import wraps
from typing import TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover
import os
from typing import Any
from typing import BinaryIO
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import Union
from responses import FirstMatchRegistry
from responses import HTTPAdapter
from responses import PreparedRequest
from responses import models
from responses import _F
from responses import BaseResponse
from io import TextIOWrapper
import yaml
from responses import RequestsMock
from responses import Response
from responses import _real_send
from responses.registries import OrderedRegistry
def _remove_nones(d: "Any") -> "Any":
if isinstance(d, dict):
return {k: _remove_nones(v) for k, v in d.items() if v is not None}
if isinstance(d, list):
return [_remove_nones(i) for i in d]
return d
def _remove_default_headers(data: "Any") -> "Any":
"""
It would be too verbose to store these headers in the file generated by the
record functionality.
"""
if isinstance(data, dict):
keys_to_remove = [
"Content-Length",
"Content-Type",
"Date",
"Server",
"Connection",
"Content-Encoding",
]
for i, response in enumerate(data["responses"]):
for key in keys_to_remove:
if key in response["response"]["headers"]:
del data["responses"][i]["response"]["headers"][key]
if not response["response"]["headers"]:
del data["responses"][i]["response"]["headers"]
return data
def _dump(
registered: "List[BaseResponse]",
destination: "Union[BinaryIO, TextIOWrapper]",
dumper: "Callable[[Union[Dict[Any, Any], List[Any]], Union[BinaryIO, TextIOWrapper]], Any]",
) -> None:
data: Dict[str, Any] = {"responses": []}
for rsp in registered:
try:
content_length = rsp.auto_calculate_content_length # type: ignore[attr-defined]
data["responses"].append(
{
"response": {
"method": rsp.method,
"url": rsp.url,
"body": rsp.body,
"status": rsp.status,
"headers": rsp.headers,
"content_type": rsp.content_type,
"auto_calculate_content_length": content_length,
}
}
)
except AttributeError as exc: # pragma: no cover
raise AttributeError(
"Cannot dump response object."
"Probably you use custom Response object that is missing required attributes"
) from exc
dumper(_remove_default_headers(_remove_nones(data)), destination)
class Recorder(RequestsMock):
    """``RequestsMock`` variant that performs real requests and records them.

    Each intercepted request is sent with ``_real_send``; the answer is stored
    in the registry as a ``Response`` object so that the collected responses
    can be written to a YAML file afterwards.
    """

    def __init__(
        self,
        *,
        target: str = "requests.adapters.HTTPAdapter.send",
        registry: "Type[FirstMatchRegistry]" = OrderedRegistry,
    ) -> None:
        """Forward ``target`` and ``registry`` to ``RequestsMock.__init__``."""
        super().__init__(target=target, registry=registry)

    def reset(self) -> None:
        """Swap the current registry for a fresh ``OrderedRegistry``."""
        self._registry = OrderedRegistry()

    def record(
        self, *, file_path: "Union[str, bytes, os.PathLike[Any]]" = "response.yaml"
    ) -> "Union[Callable[[_F], _F], _F]":
        """Build a decorator that records responses and dumps them to a file.

        The decorated function runs with this recorder active; once it has
        returned, the registered responses are written to ``file_path`` and
        the function's return value is passed through.
        """

        def decorator(function: "_F") -> "Callable[..., Any]":
            @wraps(function)
            def recorded_call(*args: "Any", **kwargs: "Any") -> "Any":  # type: ignore[misc]
                with self:
                    result = function(*args, **kwargs)
                    # NOTE(review): the dump is done before the context is
                    # left; presumably exiting resets the registry - confirm
                    # against RequestsMock.__exit__.
                    recorded = self.get_registry().registered
                    self.dump_to_file(file_path=file_path, registered=recorded)
                    return result

            return recorded_call

        return decorator

    def dump_to_file(
        self,
        file_path: "Union[str, bytes, os.PathLike[Any]]",
        *,
        registered: "Optional[List[BaseResponse]]" = None,
    ) -> None:
        """Dump the recorded responses to a file.

        When ``registered`` is ``None`` the responses currently held by the
        registry are used. The file is opened in text write mode and filled
        with ``yaml.dump``.
        """
        to_dump = (
            registered if registered is not None else self.get_registry().registered
        )
        with open(file_path, "w") as stream:
            _dump(to_dump, stream, yaml.dump)

    def _on_request(
        self,
        adapter: "HTTPAdapter",
        request: "PreparedRequest",
        **kwargs: "Any",
    ) -> "models.Response":
        """Send ``request`` for real and register the received response."""
        # The plain request object has no 'params' / 'req_kwargs' attributes;
        # they are attached here for further match comparison.
        request.params = self._parse_request_params(request.path_url)  # type: ignore[attr-defined]
        request.req_kwargs = kwargs  # type: ignore[attr-defined]

        real_response = _real_send(adapter, request, **kwargs)

        # Copy the received headers into a plain dict.
        plain_headers = dict(real_response.headers.items())
        recorded = Response(
            method=str(request.method),
            url=str(real_response.request.url),
            status=real_response.status_code,
            body=real_response.text,
            headers=plain_headers,
        )
        self._registry.add(recorded)
        return real_response

    def stop(self, allow_assert: bool = True) -> None:
        """Stop the mock; ``allow_assert`` is ignored and ``False`` is always forwarded."""
        super().stop(allow_assert=False)
# Module-level default ``Recorder`` instance.
recorder = Recorder()
# ``record`` method of the default instance, exposed as a module-level name.
record = recorder.record