import json
import logging
from threading import Thread

import backoff
import monotonic

from posthog.request import APIError, DatetimeSerializer, batch_post

try:
    from queue import Empty
except ImportError:
    from Queue import Empty

# Any single event larger than this is dropped before batching.
MAX_MSG_SIZE = 900 * 1024  # 900KiB per event

# The maximum request body size is currently 20MiB, let's be conservative
# in case we want to lower it in the future.
BATCH_SIZE_LIMIT = 5 * 1024 * 1024


class Consumer(Thread):
    """Consumes the messages from the client's queue."""

    log = logging.getLogger("posthog")

    def __init__(
        self,
        queue,
        api_key,
        flush_at=100,
        host=None,
        on_error=None,
        flush_interval=0.5,
        gzip=False,
        retries=10,
        timeout=15,
        historical_migration=False,
    ):
        """Create a consumer thread.

        :param queue: the shared queue.Queue the client enqueues events onto
        :param api_key: PostHog project API key used for batch uploads
        :param flush_at: max number of events per uploaded batch
        :param host: API host (None lets batch_post use its default)
        :param on_error: optional callback(exc, batch) invoked on upload failure
        :param flush_interval: max seconds to wait while accumulating a batch
        :param gzip: whether to gzip-compress request bodies
        :param retries: number of retries on top of the initial attempt
        :param timeout: per-request timeout in seconds
        :param historical_migration: passed through to batch_post
        """
        Thread.__init__(self)
        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self.flush_at = flush_at
        self.flush_interval = flush_interval
        self.api_key = api_key
        self.host = host
        self.on_error = on_error
        self.queue = queue
        self.gzip = gzip
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self.retries = retries
        self.timeout = timeout
        self.historical_migration = historical_migration

    def run(self):
        """Runs the consumer."""
        self.log.debug("consumer is running...")
        while self.running:
            self.upload()
        self.log.debug("consumer exited.")

    def pause(self):
        """Pause the consumer."""
        self.running = False

    def upload(self):
        """Upload the next batch of items, return whether successful."""
        success = False
        batch = self.next()
        if len(batch) == 0:
            return False

        try:
            self.request(batch)
            success = True
        except Exception as e:
            self.log.error("error uploading: %s", e)
            success = False
            if self.on_error:
                self.on_error(e, batch)
        finally:
            # mark items as acknowledged from queue
            # NOTE(review): returning inside finally also suppresses any
            # exception raised by the on_error callback above — preserved
            # as-is since the run() loop relies on upload() not raising.
            for item in batch:
                self.queue.task_done()
            return success

    def next(self):
        """Return the next batch of items to upload.

        Collects up to flush_at items, waiting at most flush_interval
        seconds overall, and stops early once the serialized batch reaches
        BATCH_SIZE_LIMIT. Oversized individual items are dropped.
        """
        queue = self.queue
        items = []

        start_time = monotonic.monotonic()
        total_size = 0

        while len(items) < self.flush_at:
            elapsed = monotonic.monotonic() - start_time
            if elapsed >= self.flush_interval:
                break
            try:
                item = queue.get(block=True, timeout=self.flush_interval - elapsed)
                # Size the item as it will be serialized on the wire.
                item_size = len(json.dumps(item, cls=DatetimeSerializer).encode())
                if item_size > MAX_MSG_SIZE:
                    self.log.error("Item exceeds 900kib limit, dropping. (%s)", str(item))
                    # BUG FIX: the dropped item was already consumed via
                    # queue.get(), so it must still be acknowledged here —
                    # otherwise queue.join() would block forever waiting on
                    # an unfinished task.
                    self.queue.task_done()
                    continue
                items.append(item)
                total_size += item_size
                if total_size >= BATCH_SIZE_LIMIT:
                    self.log.debug("hit batch size limit (size: %d)", total_size)
                    break

            except Empty:
                break

        return items

    def request(self, batch):
        """Attempt to upload the batch and retry before raising an error"""

        def fatal_exception(exc):
            # giveup predicate for backoff: True means "do not retry".
            if isinstance(exc, APIError):
                # retry on server errors and client errors
                # with 429 status code (rate limited),
                # don't retry on other client errors
                if exc.status == "N/A":
                    return False
                return (400 <= exc.status < 500) and exc.status != 429
            else:
                # retry on all other errors (eg. network)
                return False

        @backoff.on_exception(backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception)
        def send_request():
            batch_post(
                self.api_key,
                self.host,
                gzip=self.gzip,
                timeout=self.timeout,
                batch=batch,
                historical_migration=self.historical_migration,
            )

        send_request()