# This file was auto-generated by Fern from our API Definition. import typing import urllib.parse from json.decoder import JSONDecodeError from ...core.api_error import ApiError from ...core.client_wrapper import AsyncClientWrapper, SyncClientWrapper from ...core.jsonable_encoder import jsonable_encoder from ...core.pydantic_utilities import pydantic_v1 from ...core.remove_none_from_dict import remove_none_from_dict from ...core.request_options import RequestOptions from ..commons.errors.access_denied_error import AccessDeniedError from ..commons.errors.error import Error from ..commons.errors.method_not_allowed_error import MethodNotAllowedError from ..commons.errors.not_found_error import NotFoundError from ..commons.errors.unauthorized_error import UnauthorizedError from .types.ingestion_event import IngestionEvent from .types.ingestion_response import IngestionResponse # this is used as the default value for optional parameters OMIT = typing.cast(typing.Any, ...) class IngestionClient: def __init__(self, *, client_wrapper: SyncClientWrapper): self._client_wrapper = client_wrapper def batch( self, *, batch: typing.Sequence[IngestionEvent], metadata: typing.Optional[typing.Any] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> IngestionResponse: """ Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement. Notes: - Batch sizes are limited to 3.5 MB in total. You need to adjust the number of events per batch accordingly. - The API does not return a 4xx status code for input errors. Instead, it responds with a 207 status code, which includes a list of the encountered errors. Parameters: - batch: typing.Sequence[IngestionEvent]. Batch of tracing events to be ingested. Discriminated by attribute `type`. - metadata: typing.Optional[typing.Any]. Optional. Metadata field used by the Langfuse SDKs for debugging. - request_options: typing.Optional[RequestOptions]. Request-specific configuration. --- import datetime from finto import IngestionEvent_TraceCreate, TraceBody from finto.client import FernLangfuse client = FernLangfuse( x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME", x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION", x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY", username="YOUR_USERNAME", password="YOUR_PASSWORD", base_url="https://yourhost.com/path/to/api", ) client.ingestion.batch( batch=[ IngestionEvent_TraceCreate( body=TraceBody( id="string", timestamp=datetime.datetime.fromisoformat( "2024-01-15 09:30:00+00:00", ), name="string", user_id="string", input={"key": "value"}, output={"key": "value"}, session_id="string", release="string", version="string", metadata={"key": "value"}, tags=["string"], public=True, ), id="string", timestamp="string", metadata={"key": "value"}, ) ], metadata={"key": "value"}, ) """ _request: typing.Dict[str, typing.Any] = {"batch": batch} if metadata is not OMIT: _request["metadata"] = metadata _response = self._client_wrapper.httpx_client.request( "POST", urllib.parse.urljoin( f"{self._client_wrapper.get_base_url()}/", "api/public/ingestion" ), params=jsonable_encoder( request_options.get("additional_query_parameters") if request_options is not None else None ), json=jsonable_encoder(_request) if request_options is None or request_options.get("additional_body_parameters") is None else { **jsonable_encoder(_request), **( jsonable_encoder( remove_none_from_dict( request_options.get("additional_body_parameters", {}) ) ) ), }, headers=jsonable_encoder( remove_none_from_dict( { **self._client_wrapper.get_headers(), **( request_options.get("additional_headers", {}) if request_options is not None else {} ), } ) ), timeout=request_options.get("timeout_in_seconds") if request_options is not None and request_options.get("timeout_in_seconds") is not None else self._client_wrapper.get_timeout(), retries=0, max_retries=request_options.get("max_retries") if request_options is not None else 0, # type: ignore ) if 200 <= _response.status_code < 300: return pydantic_v1.parse_obj_as(IngestionResponse, _response.json()) # type: ignore if _response.status_code == 400: raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore if _response.status_code == 401: raise UnauthorizedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 403: raise AccessDeniedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 405: raise MethodNotAllowedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 404: raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore try: _response_json = _response.json() except JSONDecodeError: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) class AsyncIngestionClient: def __init__(self, *, client_wrapper: AsyncClientWrapper): self._client_wrapper = client_wrapper async def batch( self, *, batch: typing.Sequence[IngestionEvent], metadata: typing.Optional[typing.Any] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> IngestionResponse: """ Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement. Notes: - Batch sizes are limited to 3.5 MB in total. You need to adjust the number of events per batch accordingly. - The API does not return a 4xx status code for input errors. Instead, it responds with a 207 status code, which includes a list of the encountered errors. Parameters: - batch: typing.Sequence[IngestionEvent]. Batch of tracing events to be ingested. Discriminated by attribute `type`. - metadata: typing.Optional[typing.Any]. Optional. Metadata field used by the Langfuse SDKs for debugging. - request_options: typing.Optional[RequestOptions]. Request-specific configuration. --- import datetime from finto import IngestionEvent_TraceCreate, TraceBody from finto.client import AsyncFernLangfuse client = AsyncFernLangfuse( x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME", x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION", x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY", username="YOUR_USERNAME", password="YOUR_PASSWORD", base_url="https://yourhost.com/path/to/api", ) await client.ingestion.batch( batch=[ IngestionEvent_TraceCreate( body=TraceBody( id="string", timestamp=datetime.datetime.fromisoformat( "2024-01-15 09:30:00+00:00", ), name="string", user_id="string", input={"key": "value"}, output={"key": "value"}, session_id="string", release="string", version="string", metadata={"key": "value"}, tags=["string"], public=True, ), id="string", timestamp="string", metadata={"key": "value"}, ) ], metadata={"key": "value"}, ) """ _request: typing.Dict[str, typing.Any] = {"batch": batch} if metadata is not OMIT: _request["metadata"] = metadata _response = await self._client_wrapper.httpx_client.request( "POST", urllib.parse.urljoin( f"{self._client_wrapper.get_base_url()}/", "api/public/ingestion" ), params=jsonable_encoder( request_options.get("additional_query_parameters") if request_options is not None else None ), json=jsonable_encoder(_request) if request_options is None or request_options.get("additional_body_parameters") is None else { **jsonable_encoder(_request), **( jsonable_encoder( remove_none_from_dict( request_options.get("additional_body_parameters", {}) ) ) ), }, headers=jsonable_encoder( remove_none_from_dict( { **self._client_wrapper.get_headers(), **( request_options.get("additional_headers", {}) if request_options is not None else {} ), } ) ), timeout=request_options.get("timeout_in_seconds") if request_options is not None and request_options.get("timeout_in_seconds") is not None else self._client_wrapper.get_timeout(), retries=0, max_retries=request_options.get("max_retries") if request_options is not None else 0, # type: ignore ) if 200 <= _response.status_code < 300: return pydantic_v1.parse_obj_as(IngestionResponse, _response.json()) # type: ignore if _response.status_code == 400: raise Error(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore if _response.status_code == 401: raise UnauthorizedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 403: raise AccessDeniedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 405: raise MethodNotAllowedError( pydantic_v1.parse_obj_as(typing.Any, _response.json()) ) # type: ignore if _response.status_code == 404: raise NotFoundError(pydantic_v1.parse_obj_as(typing.Any, _response.json())) # type: ignore try: _response_json = _response.json() except JSONDecodeError: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json)
Memory