# This file was auto-generated by Fern from our API Definition.
import typing
import httpx
from .core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from .resources.dataset_items.client import AsyncDatasetItemsClient, DatasetItemsClient
from .resources.dataset_run_items.client import (
AsyncDatasetRunItemsClient,
DatasetRunItemsClient,
)
from .resources.datasets.client import AsyncDatasetsClient, DatasetsClient
from .resources.health.client import AsyncHealthClient, HealthClient
from .resources.ingestion.client import AsyncIngestionClient, IngestionClient
from .resources.metrics.client import AsyncMetricsClient, MetricsClient
from .resources.models.client import AsyncModelsClient, ModelsClient
from .resources.observations.client import AsyncObservationsClient, ObservationsClient
from .resources.projects.client import AsyncProjectsClient, ProjectsClient
from .resources.prompts.client import AsyncPromptsClient, PromptsClient
from .resources.score.client import AsyncScoreClient, ScoreClient
from .resources.score_configs.client import AsyncScoreConfigsClient, ScoreConfigsClient
from .resources.sessions.client import AsyncSessionsClient, SessionsClient
from .resources.trace.client import AsyncTraceClient, TraceClient
class FernLangfuse:
"""
Use this class to access the different functions within the SDK. You can instantiate any number of clients with different configuration that will propogate to these functions.
Parameters:
- base_url: str. The base url to use for requests from the client.
- x_langfuse_sdk_name: typing.Optional[str].
- x_langfuse_sdk_version: typing.Optional[str].
- x_langfuse_public_key: typing.Optional[str].
- username: typing.Optional[typing.Union[str, typing.Callable[[], str]]].
- password: typing.Optional[typing.Union[str, typing.Callable[[], str]]].
- timeout: typing.Optional[float]. The timeout to be used, in seconds, for requests by default the timeout is 60 seconds, unless a custom httpx client is used, in which case a default is not set.
- follow_redirects: typing.Optional[bool]. Whether the default httpx client follows redirects or not, this is irrelevant if a custom httpx client is passed in.
- httpx_client: typing.Optional[httpx.Client]. The httpx client to use for making requests, a preconfigured client is used by default, however this is useful should you want to pass in any custom httpx configuration.
---
from finto.client import FernLangfuse
client = FernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
"""
def __init__(
self,
*,
base_url: str,
x_langfuse_sdk_name: typing.Optional[str] = None,
x_langfuse_sdk_version: typing.Optional[str] = None,
x_langfuse_public_key: typing.Optional[str] = None,
username: typing.Optional[typing.Union[str, typing.Callable[[], str]]] = None,
password: typing.Optional[typing.Union[str, typing.Callable[[], str]]] = None,
timeout: typing.Optional[float] = None,
follow_redirects: typing.Optional[bool] = None,
httpx_client: typing.Optional[httpx.Client] = None,
):
_defaulted_timeout = (
timeout if timeout is not None else 60 if httpx_client is None else None
)
self._client_wrapper = SyncClientWrapper(
base_url=base_url,
x_langfuse_sdk_name=x_langfuse_sdk_name,
x_langfuse_sdk_version=x_langfuse_sdk_version,
x_langfuse_public_key=x_langfuse_public_key,
username=username,
password=password,
httpx_client=httpx_client
if httpx_client is not None
else httpx.Client(
timeout=_defaulted_timeout, follow_redirects=follow_redirects
)
if follow_redirects is not None
else httpx.Client(timeout=_defaulted_timeout),
timeout=_defaulted_timeout,
)
self.dataset_items = DatasetItemsClient(client_wrapper=self._client_wrapper)
self.dataset_run_items = DatasetRunItemsClient(
client_wrapper=self._client_wrapper
)
self.datasets = DatasetsClient(client_wrapper=self._client_wrapper)
self.health = HealthClient(client_wrapper=self._client_wrapper)
self.ingestion = IngestionClient(client_wrapper=self._client_wrapper)
self.metrics = MetricsClient(client_wrapper=self._client_wrapper)
self.models = ModelsClient(client_wrapper=self._client_wrapper)
self.observations = ObservationsClient(client_wrapper=self._client_wrapper)
self.projects = ProjectsClient(client_wrapper=self._client_wrapper)
self.prompts = PromptsClient(client_wrapper=self._client_wrapper)
self.score_configs = ScoreConfigsClient(client_wrapper=self._client_wrapper)
self.score = ScoreClient(client_wrapper=self._client_wrapper)
self.sessions = SessionsClient(client_wrapper=self._client_wrapper)
self.trace = TraceClient(client_wrapper=self._client_wrapper)
class AsyncFernLangfuse:
"""
Use this class to access the different functions within the SDK. You can instantiate any number of clients with different configuration that will propogate to these functions.
Parameters:
- base_url: str. The base url to use for requests from the client.
- x_langfuse_sdk_name: typing.Optional[str].
- x_langfuse_sdk_version: typing.Optional[str].
- x_langfuse_public_key: typing.Optional[str].
- username: typing.Optional[typing.Union[str, typing.Callable[[], str]]].
- password: typing.Optional[typing.Union[str, typing.Callable[[], str]]].
- timeout: typing.Optional[float]. The timeout to be used, in seconds, for requests by default the timeout is 60 seconds, unless a custom httpx client is used, in which case a default is not set.
- follow_redirects: typing.Optional[bool]. Whether the default httpx client follows redirects or not, this is irrelevant if a custom httpx client is passed in.
- httpx_client: typing.Optional[httpx.AsyncClient]. The httpx client to use for making requests, a preconfigured client is used by default, however this is useful should you want to pass in any custom httpx configuration.
---
from finto.client import AsyncFernLangfuse
client = AsyncFernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
"""
def __init__(
self,
*,
base_url: str,
x_langfuse_sdk_name: typing.Optional[str] = None,
x_langfuse_sdk_version: typing.Optional[str] = None,
x_langfuse_public_key: typing.Optional[str] = None,
username: typing.Optional[typing.Union[str, typing.Callable[[], str]]] = None,
password: typing.Optional[typing.Union[str, typing.Callable[[], str]]] = None,
timeout: typing.Optional[float] = None,
follow_redirects: typing.Optional[bool] = None,
httpx_client: typing.Optional[httpx.AsyncClient] = None,
):
_defaulted_timeout = (
timeout if timeout is not None else 60 if httpx_client is None else None
)
self._client_wrapper = AsyncClientWrapper(
base_url=base_url,
x_langfuse_sdk_name=x_langfuse_sdk_name,
x_langfuse_sdk_version=x_langfuse_sdk_version,
x_langfuse_public_key=x_langfuse_public_key,
username=username,
password=password,
httpx_client=httpx_client
if httpx_client is not None
else httpx.AsyncClient(
timeout=_defaulted_timeout, follow_redirects=follow_redirects
)
if follow_redirects is not None
else httpx.AsyncClient(timeout=_defaulted_timeout),
timeout=_defaulted_timeout,
)
self.dataset_items = AsyncDatasetItemsClient(
client_wrapper=self._client_wrapper
)
self.dataset_run_items = AsyncDatasetRunItemsClient(
client_wrapper=self._client_wrapper
)
self.datasets = AsyncDatasetsClient(client_wrapper=self._client_wrapper)
self.health = AsyncHealthClient(client_wrapper=self._client_wrapper)
self.ingestion = AsyncIngestionClient(client_wrapper=self._client_wrapper)
self.metrics = AsyncMetricsClient(client_wrapper=self._client_wrapper)
self.models = AsyncModelsClient(client_wrapper=self._client_wrapper)
self.observations = AsyncObservationsClient(client_wrapper=self._client_wrapper)
self.projects = AsyncProjectsClient(client_wrapper=self._client_wrapper)
self.prompts = AsyncPromptsClient(client_wrapper=self._client_wrapper)
self.score_configs = AsyncScoreConfigsClient(
client_wrapper=self._client_wrapper
)
self.score = AsyncScoreClient(client_wrapper=self._client_wrapper)
self.sessions = AsyncSessionsClient(client_wrapper=self._client_wrapper)
self.trace = AsyncTraceClient(client_wrapper=self._client_wrapper)