# This file was auto-generated by Fern from our API Definition.
import datetime as dt
import typing
import urllib.parse
from json.decoder import JSONDecodeError
from ...core.api_error import ApiError
from ...core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from ...core.datetime_utils import serialize_datetime
from ...core.jsonable_encoder import jsonable_encoder
from ...core.pydantic_utilities import pydantic_v1
from ...core.remove_none_from_dict import remove_none_from_dict
from ...core.request_options import RequestOptions
from ..commons.errors.access_denied_error import AccessDeniedError
from ..commons.errors.error import Error
from ..commons.errors.method_not_allowed_error import MethodNotAllowedError
from ..commons.errors.not_found_error import NotFoundError
from ..commons.errors.unauthorized_error import UnauthorizedError
from ..commons.types.observations_view import ObservationsView
from .types.observations_views import ObservationsViews
class ObservationsClient:
def __init__(self, *, client_wrapper: SyncClientWrapper):
self._client_wrapper = client_wrapper
def get(
self,
observation_id: str,
*,
request_options: typing.Optional[RequestOptions] = None,
) -> ObservationsView:
"""
Get a observation
Parameters:
- observation_id: str. The unique langfuse identifier of an observation, can be an event, span or generation
- request_options: typing.Optional[RequestOptions]. Request-specific configuration.
---
from finto.client import FernLangfuse
client = FernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
client.observations.get(
observation_id="string",
)
"""
_response = self._client_wrapper.httpx_client.request(
"GET",
urllib.parse.urljoin(
f"{self._client_wrapper.get_base_url()}/",
f"api/public/observations/{jsonable_encoder(observation_id)}",
),
params=jsonable_encoder(
request_options.get("additional_query_parameters")
if request_options is not None
else None
),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self._client_wrapper.get_headers(),
**(
request_options.get("additional_headers", {})
if request_options is not None
else {}
),
}
)
),
timeout=request_options.get("timeout_in_seconds")
if request_options is not None
and request_options.get("timeout_in_seconds") is not None
else self._client_wrapper.get_timeout(),
retries=0,
max_retries=request_options.get("max_retries")
if request_options is not None
else 0, # type: ignore
)
if 200 <= _response.status_code < 300:
return pydantic_v1.parse_obj_as(ObservationsView, _response.json()) # type: ignore
if _response.status_code == 400:
raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
if _response.status_code == 401:
raise UnauthorizedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 403:
raise AccessDeniedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 405:
raise MethodNotAllowedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 404:
raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
try:
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
def get_many(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
type: typing.Optional[str] = None,
trace_id: typing.Optional[str] = None,
parent_observation_id: typing.Optional[str] = None,
from_start_time: typing.Optional[dt.datetime] = None,
to_start_time: typing.Optional[dt.datetime] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> ObservationsViews:
"""
Get a list of observations
Parameters:
- page: typing.Optional[int]. Page number, starts at 1.
- limit: typing.Optional[int]. Limit of items per page. If you encounter api issues due to too large page sizes, try to reduce the limit.
- name: typing.Optional[str].
- user_id: typing.Optional[str].
- type: typing.Optional[str].
- trace_id: typing.Optional[str].
- parent_observation_id: typing.Optional[str].
- from_start_time: typing.Optional[dt.datetime]. Retrieve only observations with a start_time or or after this datetime (ISO 8601).
- to_start_time: typing.Optional[dt.datetime]. Retrieve only observations with a start_time before this datetime (ISO 8601).
- request_options: typing.Optional[RequestOptions]. Request-specific configuration.
---
import datetime
from finto.client import FernLangfuse
client = FernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
client.observations.get_many(
page=1,
limit=1,
name="string",
user_id="string",
type="string",
trace_id="string",
parent_observation_id="string",
from_start_time=datetime.datetime.fromisoformat(
"2024-01-15 09:30:00+00:00",
),
to_start_time=datetime.datetime.fromisoformat(
"2024-01-15 09:30:00+00:00",
),
)
"""
_response = self._client_wrapper.httpx_client.request(
"GET",
urllib.parse.urljoin(
f"{self._client_wrapper.get_base_url()}/", "api/public/observations"
),
params=jsonable_encoder(
remove_none_from_dict(
{
"page": page,
"limit": limit,
"name": name,
"userId": user_id,
"type": type,
"traceId": trace_id,
"parentObservationId": parent_observation_id,
"fromStartTime": serialize_datetime(from_start_time)
if from_start_time is not None
else None,
"toStartTime": serialize_datetime(to_start_time)
if to_start_time is not None
else None,
**(
request_options.get("additional_query_parameters", {})
if request_options is not None
else {}
),
}
)
),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self._client_wrapper.get_headers(),
**(
request_options.get("additional_headers", {})
if request_options is not None
else {}
),
}
)
),
timeout=request_options.get("timeout_in_seconds")
if request_options is not None
and request_options.get("timeout_in_seconds") is not None
else self._client_wrapper.get_timeout(),
retries=0,
max_retries=request_options.get("max_retries")
if request_options is not None
else 0, # type: ignore
)
if 200 <= _response.status_code < 300:
return pydantic_v1.parse_obj_as(ObservationsViews, _response.json()) # type: ignore
if _response.status_code == 400:
raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
if _response.status_code == 401:
raise UnauthorizedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 403:
raise AccessDeniedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 405:
raise MethodNotAllowedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 404:
raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
try:
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
class AsyncObservationsClient:
def __init__(self, *, client_wrapper: AsyncClientWrapper):
self._client_wrapper = client_wrapper
async def get(
self,
observation_id: str,
*,
request_options: typing.Optional[RequestOptions] = None,
) -> ObservationsView:
"""
Get a observation
Parameters:
- observation_id: str. The unique langfuse identifier of an observation, can be an event, span or generation
- request_options: typing.Optional[RequestOptions]. Request-specific configuration.
---
from finto.client import AsyncFernLangfuse
client = AsyncFernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
await client.observations.get(
observation_id="string",
)
"""
_response = await self._client_wrapper.httpx_client.request(
"GET",
urllib.parse.urljoin(
f"{self._client_wrapper.get_base_url()}/",
f"api/public/observations/{jsonable_encoder(observation_id)}",
),
params=jsonable_encoder(
request_options.get("additional_query_parameters")
if request_options is not None
else None
),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self._client_wrapper.get_headers(),
**(
request_options.get("additional_headers", {})
if request_options is not None
else {}
),
}
)
),
timeout=request_options.get("timeout_in_seconds")
if request_options is not None
and request_options.get("timeout_in_seconds") is not None
else self._client_wrapper.get_timeout(),
retries=0,
max_retries=request_options.get("max_retries")
if request_options is not None
else 0, # type: ignore
)
if 200 <= _response.status_code < 300:
return pydantic_v1.parse_obj_as(ObservationsView, _response.json()) # type: ignore
if _response.status_code == 400:
raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
if _response.status_code == 401:
raise UnauthorizedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 403:
raise AccessDeniedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 405:
raise MethodNotAllowedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 404:
raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
try:
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
async def get_many(
self,
*,
page: typing.Optional[int] = None,
limit: typing.Optional[int] = None,
name: typing.Optional[str] = None,
user_id: typing.Optional[str] = None,
type: typing.Optional[str] = None,
trace_id: typing.Optional[str] = None,
parent_observation_id: typing.Optional[str] = None,
from_start_time: typing.Optional[dt.datetime] = None,
to_start_time: typing.Optional[dt.datetime] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> ObservationsViews:
"""
Get a list of observations
Parameters:
- page: typing.Optional[int]. Page number, starts at 1.
- limit: typing.Optional[int]. Limit of items per page. If you encounter api issues due to too large page sizes, try to reduce the limit.
- name: typing.Optional[str].
- user_id: typing.Optional[str].
- type: typing.Optional[str].
- trace_id: typing.Optional[str].
- parent_observation_id: typing.Optional[str].
- from_start_time: typing.Optional[dt.datetime]. Retrieve only observations with a start_time or or after this datetime (ISO 8601).
- to_start_time: typing.Optional[dt.datetime]. Retrieve only observations with a start_time before this datetime (ISO 8601).
- request_options: typing.Optional[RequestOptions]. Request-specific configuration.
---
import datetime
from finto.client import AsyncFernLangfuse
client = AsyncFernLangfuse(
x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
username="YOUR_USERNAME",
password="YOUR_PASSWORD",
base_url="https://yourhost.com/path/to/api",
)
await client.observations.get_many(
page=1,
limit=1,
name="string",
user_id="string",
type="string",
trace_id="string",
parent_observation_id="string",
from_start_time=datetime.datetime.fromisoformat(
"2024-01-15 09:30:00+00:00",
),
to_start_time=datetime.datetime.fromisoformat(
"2024-01-15 09:30:00+00:00",
),
)
"""
_response = await self._client_wrapper.httpx_client.request(
"GET",
urllib.parse.urljoin(
f"{self._client_wrapper.get_base_url()}/", "api/public/observations"
),
params=jsonable_encoder(
remove_none_from_dict(
{
"page": page,
"limit": limit,
"name": name,
"userId": user_id,
"type": type,
"traceId": trace_id,
"parentObservationId": parent_observation_id,
"fromStartTime": serialize_datetime(from_start_time)
if from_start_time is not None
else None,
"toStartTime": serialize_datetime(to_start_time)
if to_start_time is not None
else None,
**(
request_options.get("additional_query_parameters", {})
if request_options is not None
else {}
),
}
)
),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self._client_wrapper.get_headers(),
**(
request_options.get("additional_headers", {})
if request_options is not None
else {}
),
}
)
),
timeout=request_options.get("timeout_in_seconds")
if request_options is not None
and request_options.get("timeout_in_seconds") is not None
else self._client_wrapper.get_timeout(),
retries=0,
max_retries=request_options.get("max_retries")
if request_options is not None
else 0, # type: ignore
)
if 200 <= _response.status_code < 300:
return pydantic_v1.parse_obj_as(ObservationsViews, _response.json()) # type: ignore
if _response.status_code == 400:
raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
if _response.status_code == 401:
raise UnauthorizedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 403:
raise AccessDeniedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 405:
raise MethodNotAllowedError(
pydantic_v1.parse_obj_as(typing.Any, _response.json())
) # type: ignore
if _response.status_code == 404:
raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore
try:
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)