from av.audio.frame cimport alloc_audio_frame from av.error cimport err_check cdef class AudioFifo: """A simple audio sample FIFO (First In First Out) buffer.""" def __repr__(self): try: result = ( f"<av.{self.__class__.__name__} {self.samples} samples of " f"{self.sample_rate}hz {self.layout} {self.format} at 0x{id(self):x}>" ) except AttributeError: result = ( f"<av.{self.__class__.__name__} uninitialized, use fifo.write(frame)," f" at 0x{id(self):x}>" ) return result def __dealloc__(self): if self.ptr: lib.av_audio_fifo_free(self.ptr) cpdef write(self, AudioFrame frame): """write(frame) Push a frame of samples into the queue. :param AudioFrame frame: The frame of samples to push. The FIFO will remember the attributes from the first frame, and use those to populate all output frames. If there is a :attr:`~.Frame.pts` and :attr:`~.Frame.time_base` and :attr:`~.AudioFrame.sample_rate`, then the FIFO will assert that the incoming timestamps are continuous. """ if frame is None: raise TypeError("AudioFifo must be given an AudioFrame.") if not frame.ptr.nb_samples: return if not self.ptr: # Hold onto a copy of the attributes of the first frame to populate # output frames with. self.template = alloc_audio_frame() self.template._copy_internal_attributes(frame) self.template._init_user_attributes() # Figure out our "time_base". if frame._time_base.num and frame.ptr.sample_rate: self.pts_per_sample = frame._time_base.den / float(frame._time_base.num) self.pts_per_sample /= frame.ptr.sample_rate else: self.pts_per_sample = 0 self.ptr = lib.av_audio_fifo_alloc( <lib.AVSampleFormat>frame.ptr.format, frame.layout.nb_channels, frame.ptr.nb_samples * 2, # Just a default number of samples; it will adjust. ) if not self.ptr: raise RuntimeError("Could not allocate AVAudioFifo.") # Make sure nothing changed. elif ( frame.ptr.format != self.template.ptr.format or # TODO: frame.ptr.ch_layout != self.template.ptr.ch_layout or frame.ptr.sample_rate != self.template.ptr.sample_rate or (frame._time_base.num and self.template._time_base.num and ( frame._time_base.num != self.template._time_base.num or frame._time_base.den != self.template._time_base.den )) ): raise ValueError("Frame does not match AudioFifo parameters.") # Assert that the PTS are what we expect. cdef int64_t expected_pts if self.pts_per_sample and frame.ptr.pts != lib.AV_NOPTS_VALUE: expected_pts = <int64_t>(self.pts_per_sample * self.samples_written) if frame.ptr.pts != expected_pts: raise ValueError( "Frame.pts (%d) != expected (%d); fix or set to None." % (frame.ptr.pts, expected_pts) ) err_check(lib.av_audio_fifo_write( self.ptr, <void **>frame.ptr.extended_data, frame.ptr.nb_samples, )) self.samples_written += frame.ptr.nb_samples cpdef read(self, int samples=0, bint partial=False): """read(samples=0, partial=False) Read samples from the queue. :param int samples: The number of samples to pull; 0 gets all. :param bool partial: Allow returning less than requested. :returns: New :class:`AudioFrame` or ``None`` (if empty). If the incoming frames had valid a :attr:`~.Frame.time_base`, :attr:`~.AudioFrame.sample_rate` and :attr:`~.Frame.pts`, the returned frames will have accurate timing. """ if not self.ptr: return cdef int buffered_samples = lib.av_audio_fifo_size(self.ptr) if buffered_samples < 1: return samples = samples or buffered_samples if buffered_samples < samples: if partial: samples = buffered_samples else: return cdef AudioFrame frame = alloc_audio_frame() frame._copy_internal_attributes(self.template) frame._init( <lib.AVSampleFormat>self.template.ptr.format, <lib.AVChannelLayout>self.template.ptr.ch_layout, samples, 1, # Align? ) err_check(lib.av_audio_fifo_read( self.ptr, <void **>frame.ptr.extended_data, samples, )) if self.pts_per_sample: frame.ptr.pts = <uint64_t>(self.pts_per_sample * self.samples_read) else: frame.ptr.pts = lib.AV_NOPTS_VALUE self.samples_read += samples return frame cpdef read_many(self, int samples, bint partial=False): """read_many(samples, partial=False) Read as many frames as we can. :param int samples: How large for the frames to be. :param bool partial: If we should return a partial frame. :returns: A ``list`` of :class:`AudioFrame`. """ cdef AudioFrame frame frames = [] while True: frame = self.read(samples, partial=partial) if frame is not None: frames.append(frame) else: break return frames @property def format(self): """The :class:`.AudioFormat` of this FIFO.""" return self.template.format @property def layout(self): """The :class:`.AudioLayout` of this FIFO.""" return self.template.layout @property def sample_rate(self): return self.template.sample_rate @property def samples(self): """Number of audio samples (per channel) in the buffer.""" return lib.av_audio_fifo_size(self.ptr) if self.ptr else 0
Memory