"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" # pyright: reportReturnType = false import asyncio from concurrent.futures import ThreadPoolExecutor from typing_extensions import Protocol, runtime_checkable import httpx from typing import Any, Optional, Union @runtime_checkable class HttpClient(Protocol): def send( self, request: httpx.Request, *, stream: bool = False, auth: Union[ httpx._types.AuthTypes, httpx._client.UseClientDefault, None ] = httpx.USE_CLIENT_DEFAULT, follow_redirects: Union[ bool, httpx._client.UseClientDefault ] = httpx.USE_CLIENT_DEFAULT, ) -> httpx.Response: pass def build_request( self, method: str, url: httpx._types.URLTypes, *, content: Optional[httpx._types.RequestContent] = None, data: Optional[httpx._types.RequestData] = None, files: Optional[httpx._types.RequestFiles] = None, json: Optional[Any] = None, params: Optional[httpx._types.QueryParamTypes] = None, headers: Optional[httpx._types.HeaderTypes] = None, cookies: Optional[httpx._types.CookieTypes] = None, timeout: Union[ httpx._types.TimeoutTypes, httpx._client.UseClientDefault ] = httpx.USE_CLIENT_DEFAULT, extensions: Optional[httpx._types.RequestExtensions] = None, ) -> httpx.Request: pass def close(self) -> None: pass @runtime_checkable class AsyncHttpClient(Protocol): async def send( self, request: httpx.Request, *, stream: bool = False, auth: Union[ httpx._types.AuthTypes, httpx._client.UseClientDefault, None ] = httpx.USE_CLIENT_DEFAULT, follow_redirects: Union[ bool, httpx._client.UseClientDefault ] = httpx.USE_CLIENT_DEFAULT, ) -> httpx.Response: pass def build_request( self, method: str, url: httpx._types.URLTypes, *, content: Optional[httpx._types.RequestContent] = None, data: Optional[httpx._types.RequestData] = None, files: Optional[httpx._types.RequestFiles] = None, json: Optional[Any] = None, params: Optional[httpx._types.QueryParamTypes] = None, headers: Optional[httpx._types.HeaderTypes] = None, cookies: Optional[httpx._types.CookieTypes] = None, timeout: Union[ httpx._types.TimeoutTypes, httpx._client.UseClientDefault ] = httpx.USE_CLIENT_DEFAULT, extensions: Optional[httpx._types.RequestExtensions] = None, ) -> httpx.Request: pass async def aclose(self) -> None: pass class ClientOwner(Protocol): client: Union[HttpClient, None] async_client: Union[AsyncHttpClient, None] def close_clients( owner: ClientOwner, sync_client: Union[HttpClient, None], sync_client_supplied: bool, async_client: Union[AsyncHttpClient, None], async_client_supplied: bool, ) -> None: """ A finalizer function that is meant to be used with weakref.finalize to close httpx clients used by an SDK so that underlying resources can be garbage collected. """ # Unset the client/async_client properties so there are no more references # to them from the owning SDK instance and they can be reaped. owner.client = None owner.async_client = None if sync_client is not None and not sync_client_supplied: try: sync_client.close() except Exception: pass if async_client is not None and not async_client_supplied: is_async = False try: asyncio.get_running_loop() is_async = True except RuntimeError: pass try: # If this function is called in an async loop then start another # loop in a separate thread to close the async http client. if is_async: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(asyncio.run, async_client.aclose()) future.result() else: asyncio.run(async_client.aclose()) except Exception: pass
Memory