"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" from .basesdk import BaseSDK from .httpclient import AsyncHttpClient, ClientOwner, HttpClient, close_clients from .sdkconfiguration import SDKConfiguration from .utils.logger import Logger, get_default_logger from .utils.retries import RetryConfig import httpx from typing import Any, Callable, Dict, Optional, Union, cast from unstructured_client import utils from unstructured_client._hooks import SDKHooks from unstructured_client.destinations import Destinations from unstructured_client.general import General from unstructured_client.jobs import Jobs from unstructured_client.models import shared from unstructured_client.sources import Sources from unstructured_client.types import OptionalNullable, UNSET from unstructured_client.workflows import Workflows import weakref class UnstructuredClient(BaseSDK): destinations: Destinations jobs: Jobs sources: Sources workflows: Workflows general: General def __init__( self, api_key_auth: Optional[ Union[Optional[str], Callable[[], Optional[str]]] ] = None, server: Optional[str] = None, server_url: Optional[str] = None, url_params: Optional[Dict[str, str]] = None, client: Optional[HttpClient] = None, async_client: Optional[AsyncHttpClient] = None, retry_config: OptionalNullable[RetryConfig] = UNSET, timeout_ms: Optional[int] = None, debug_logger: Optional[Logger] = None, ) -> None: r"""Instantiates the SDK configuring it with the provided parameters. :param api_key_auth: The api_key_auth required for authentication :param server: The server by name to use for all methods :param server_url: The server URL to use for all methods :param url_params: Parameters to optionally template the server URL with :param client: The HTTP client to use for all synchronous methods :param async_client: The Async HTTP client to use for all asynchronous methods :param retry_config: The retry configuration to use for all supported methods :param timeout_ms: Optional request timeout applied to each operation in milliseconds """ client_supplied = True if client is None: client = httpx.Client() client_supplied = False assert issubclass( type(client), HttpClient ), "The provided client must implement the HttpClient protocol." async_client_supplied = True if async_client is None: async_client = httpx.AsyncClient() async_client_supplied = False if debug_logger is None: debug_logger = get_default_logger() assert issubclass( type(async_client), AsyncHttpClient ), "The provided async_client must implement the AsyncHttpClient protocol." security: Any = None if callable(api_key_auth): # pylint: disable=unnecessary-lambda-assignment security = lambda: shared.Security(api_key_auth=api_key_auth()) else: security = shared.Security(api_key_auth=api_key_auth) if server_url is not None: if url_params is not None: server_url = utils.template_url(server_url, url_params) BaseSDK.__init__( self, SDKConfiguration( client=client, client_supplied=client_supplied, async_client=async_client, async_client_supplied=async_client_supplied, security=security, server_url=server_url, server=server, retry_config=retry_config, timeout_ms=timeout_ms, debug_logger=debug_logger, ), ) hooks = SDKHooks() current_server_url, *_ = self.sdk_configuration.get_server_details() server_url, self.sdk_configuration.client = hooks.sdk_init( current_server_url, client ) if current_server_url != server_url: self.sdk_configuration.server_url = server_url # pylint: disable=protected-access self.sdk_configuration.__dict__["_hooks"] = hooks weakref.finalize( self, close_clients, cast(ClientOwner, self.sdk_configuration), self.sdk_configuration.client, self.sdk_configuration.client_supplied, self.sdk_configuration.async_client, self.sdk_configuration.async_client_supplied, ) self._init_sdks() def _init_sdks(self): self.destinations = Destinations(self.sdk_configuration) self.jobs = Jobs(self.sdk_configuration) self.sources = Sources(self.sdk_configuration) self.workflows = Workflows(self.sdk_configuration) self.general = General(self.sdk_configuration) def __enter__(self): return self async def __aenter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if ( self.sdk_configuration.client is not None and not self.sdk_configuration.client_supplied ): self.sdk_configuration.client.close() self.sdk_configuration.client = None async def __aexit__(self, exc_type, exc_val, exc_tb): if ( self.sdk_configuration.async_client is not None and not self.sdk_configuration.async_client_supplied ): await self.sdk_configuration.async_client.aclose() self.sdk_configuration.async_client = None
Memory