"""@private""" from asyncio import Queue from datetime import date, datetime from dataclasses import is_dataclass, asdict import enum from json import JSONEncoder from typing import Any from uuid import UUID from collections.abc import Sequence from langfuse.api.core import serialize_datetime from pathlib import Path from pydantic import BaseModel # Attempt to import Serializable try: from langchain.load.serializable import Serializable except ImportError: # If Serializable is not available, set it to NoneType Serializable = type(None) class EventSerializer(JSONEncoder): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.seen = set() # Track seen objects to detect circular references def default(self, obj: Any): if isinstance(obj, (datetime)): # Timezone-awareness check return serialize_datetime(obj) if isinstance(obj, (Exception, KeyboardInterrupt)): return f"{type(obj).__name__}: {str(obj)}" # LlamaIndex StreamingAgentChatResponse and StreamingResponse is not serializable by default as it is a generator # Attention: These LlamaIndex objects are a also a dataclasses, so check for it first if "Streaming" in type(obj).__name__: return str(obj) if isinstance(obj, enum.Enum): return obj.value if isinstance(obj, Queue): return type(obj).__name__ if is_dataclass(obj): return asdict(obj) if isinstance(obj, UUID): return str(obj) if isinstance(obj, bytes): return obj.decode("utf-8") if isinstance(obj, (date)): return obj.isoformat() if isinstance(obj, BaseModel): return obj.dict() if isinstance(obj, Path): return str(obj) # if langchain is not available, the Serializable type is NoneType if Serializable is not None and isinstance(obj, Serializable): return obj.to_json() # Standard JSON-encodable types if isinstance(obj, (str, int, float, type(None))): return obj if isinstance(obj, (tuple, set, frozenset)): return list(obj) if isinstance(obj, dict): return {self.default(k): self.default(v) for k, v in obj.items()} if isinstance(obj, list): return [self.default(item) for item in obj] # Important: this needs to be always checked after str and bytes types # Useful for serializing protobuf messages if isinstance(obj, Sequence): return [self.default(item) for item in obj] if hasattr(obj, "__slots__"): return self.default( {slot: getattr(obj, slot, None) for slot in obj.__slots__} ) elif hasattr(obj, "__dict__"): obj_id = id(obj) if obj_id in self.seen: # Break on circular references return type(obj).__name__ else: self.seen.add(obj_id) result = {k: self.default(v) for k, v in vars(obj).items()} self.seen.remove(obj_id) return result else: # Return object type rather than JSONEncoder.default(obj) which simply raises a TypeError return f"<{type(obj).__name__}>" def encode(self, obj: Any) -> str: self.seen.clear() # Clear seen objects before each encode call try: return super().encode(obj) except Exception: return f'"<not serializable object of type: {type(obj).__name__}>"' # escaping the string to avoid JSON parsing errors
Memory