from av.audio.format cimport get_audio_format
from av.audio.layout cimport get_audio_layout
from av.audio.plane cimport AudioPlane
from av.error cimport err_check
from av.utils cimport check_ndarray
# Marker object handed to ``AudioFrame.__new__`` by ``alloc_audio_frame()``.
# ``AudioFrame.__cinit__`` compares its ``format`` argument against this object
# by identity and, on a match, returns immediately without calling ``_init``.
# NOTE(review): no value is assigned to it in the visible code — confirm what
# it holds at runtime.
cdef object _cinit_bypass_sentinel
# Lookup table from sample-format name to the NumPy dtype string used by
# ``AudioFrame.from_ndarray`` and ``AudioFrame.to_ndarray``.
#
# Every supported base format has a second name with a "p" suffix that maps
# to the same dtype, so the table is generated from the
# (base name, dtype) pairs instead of being spelled out entry by entry.
format_dtypes = {
    base_name + suffix: dtype_code
    for base_name, dtype_code in (
        ("dbl", "f8"),
        ("flt", "f4"),
        ("s16", "i2"),
        ("s32", "i4"),
        ("u8", "u1"),
    )
    for suffix in ("", "p")
}
cdef AudioFrame alloc_audio_frame():
    """Create an AudioFrame whose ``__cinit__`` body has been skipped.

    The bypass sentinel is passed as the only constructor argument, which
    makes ``AudioFrame.__cinit__`` return before it sets anything up.

    The caller MUST invoke ``AudioFrame._init(...)`` or
    ``AudioFrame._init_user_attributes()`` on the result before the frame
    is exposed to the user.
    """
    cdef AudioFrame bare_frame = AudioFrame.__new__(AudioFrame, _cinit_bypass_sentinel)
    return bare_frame
cdef class AudioFrame(Frame):
    """A frame of audio.

    :param format: Sample format; passed to ``AudioFormat(...)``.
        Defaults to ``"s16"``.
    :param layout: Channel layout; passed to ``AudioLayout(...)``.
        Defaults to ``"stereo"``.
    :param int samples: Number of samples per channel. When this is zero
        (the default) no sample buffer is allocated.
    :param int align: Buffer alignment forwarded to
        ``av_samples_get_buffer_size`` and ``avcodec_fill_audio_frame``.
    """

    def __cinit__(self, format="s16", layout="stereo", samples=0, align=1):
        # ``alloc_audio_frame()`` passes the sentinel to obtain a frame that
        # is initialised later by the caller.
        if format is _cinit_bypass_sentinel:
            return

        cdef AudioFormat cy_format = AudioFormat(format)
        cdef AudioLayout cy_layout = AudioLayout(layout)
        self._init(cy_format.sample_fmt, cy_layout.layout, samples, align)

    cdef _init(self, lib.AVSampleFormat format, lib.AVChannelLayout layout, unsigned int nb_samples, unsigned int align):
        """Populate the underlying frame and (re)allocate its sample buffer.

        The sample count, sample format and channel layout are written to the
        wrapped frame, the Python-level ``layout``/``format`` attributes are
        refreshed, and, when there is at least one channel and one sample,
        a buffer of the required size is allocated and attached to the frame.

        :raises MemoryError: if the sample buffer cannot be allocated.
        """
        self.ptr.nb_samples = nb_samples
        self.ptr.format = <int>format
        self.ptr.ch_layout = layout

        # Sometimes this is called twice. Oh well.
        self._init_user_attributes()

        if self.layout.nb_channels != 0 and nb_samples:
            # Cleanup the old buffer.
            lib.av_freep(&self._buffer)

            # Get a new one.
            self._buffer_size = err_check(lib.av_samples_get_buffer_size(
                NULL,
                self.layout.nb_channels,
                nb_samples,
                format,
                align
            ))
            self._buffer = <uint8_t *>lib.av_malloc(self._buffer_size)
            if not self._buffer:
                raise MemoryError("cannot allocate AudioFrame buffer")

            # Connect the data pointers to the buffer.
            err_check(lib.avcodec_fill_audio_frame(
                self.ptr,
                self.layout.nb_channels,
                <lib.AVSampleFormat>self.ptr.format,
                self._buffer,
                self._buffer_size,
                align
            ))

    def __dealloc__(self):
        # Release the sample buffer allocated in ``_init`` (if any).
        lib.av_freep(&self._buffer)

    cdef _init_user_attributes(self):
        """Refresh ``layout`` and ``format`` from the wrapped frame's fields."""
        self.layout = get_audio_layout(self.ptr.ch_layout)
        self.format = get_audio_format(<lib.AVSampleFormat>self.ptr.format)

    def __repr__(self):
        """Return a ``<av.AudioFrame ...>`` summary of this frame."""
        # The closing ">" balances the opening "<av."; it was previously
        # missing from the output.
        return (
            f"<av.{self.__class__.__name__} pts={self.pts}, {self.samples} "
            f"samples at {self.rate}Hz, {self.layout.name}, {self.format.name} at 0x{id(self):x}>"
        )

    @staticmethod
    def from_ndarray(array, format="s16", layout="stereo"):
        """
        Construct a frame from a numpy array.

        The array must be two-dimensional with the dtype that matches
        ``format``. For planar formats its shape is
        ``(nb_channels, samples)``; for packed formats it is
        ``(1, samples * nb_channels)``.

        :param array: Source samples.
        :param str format: Sample format name; must be a key of
            ``format_dtypes``.
        :param layout: Channel layout; passed to ``AudioLayout(...)``.
        :raises ValueError: if ``format`` has no numpy mapping or the first
            dimension of ``array`` does not match the expected plane count.
        :rtype: AudioFrame

        .. note:: Numpy must be installed.
        """
        import numpy as np

        # map avcodec type to numpy type
        try:
            dtype = np.dtype(format_dtypes[format])
        except KeyError:
            raise ValueError(
                f"Conversion from numpy array with format `{format}` is not yet supported"
            )

        # check input format
        nb_channels = AudioLayout(layout).nb_channels
        check_ndarray(array, dtype, 2)
        if AudioFormat(format).is_planar:
            # One row per channel.
            if array.shape[0] != nb_channels:
                raise ValueError(f"Expected planar `array.shape[0]` to equal `{nb_channels}` but got `{array.shape[0]}`")
            samples = array.shape[1]
        else:
            # A single row holding every channel's samples.
            if array.shape[0] != 1:
                raise ValueError(f"Expected packed `array.shape[0]` to equal `1` but got `{array.shape[0]}`")
            samples = array.shape[1] // nb_channels

        frame = AudioFrame(format=format, layout=layout, samples=samples)
        for i, plane in enumerate(frame.planes):
            plane.update(array[i, :])
        return frame

    @property
    def planes(self):
        """
        A tuple of :class:`~av.audio.plane.AudioPlane`.

        :type: tuple
        """
        # Planar formats carry one plane per channel; packed formats keep
        # every channel in a single plane. The pointer array is not
        # NULL-terminated, so the scan must be bounded by this count rather
        # than relying on finding a NULL entry.
        cdef int max_planes = self.layout.nb_channels if self.format.is_planar else 1
        cdef int plane_count = 0

        if self.ptr.extended_data != NULL:
            # Still stop early at a NULL pointer: a frame without an
            # allocated buffer reports no planes.
            while plane_count < max_planes and self.ptr.extended_data[plane_count] != NULL:
                plane_count += 1

        return tuple([AudioPlane(self, i) for i in range(plane_count)])

    @property
    def samples(self):
        """
        Number of audio samples (per channel).

        :type: int
        """
        return self.ptr.nb_samples

    @property
    def sample_rate(self):
        """
        Sample rate of the audio data, in samples per second.

        :type: int
        """
        return self.ptr.sample_rate

    @sample_rate.setter
    def sample_rate(self, value):
        self.ptr.sample_rate = value

    @property
    def rate(self):
        """Another name for :attr:`sample_rate`."""
        return self.ptr.sample_rate

    @rate.setter
    def rate(self, value):
        self.ptr.sample_rate = value

    def to_ndarray(self):
        """Get a numpy array of this frame.

        The result stacks one row per plane. Each row holds ``samples``
        values for planar formats and ``samples * nb_channels`` values for
        packed formats.

        :raises ValueError: if the frame's format has no entry in
            ``format_dtypes``.

        .. note:: Numpy must be installed.
        """
        import numpy as np

        try:
            dtype = np.dtype(format_dtypes[self.format.name])
        except KeyError:
            raise ValueError(f"Conversion from {self.format.name!r} format to numpy array is not supported.")

        if self.format.is_planar:
            count = self.samples
        else:
            count = self.samples * self.layout.nb_channels

        return np.vstack([np.frombuffer(x, dtype=dtype, count=count) for x in self.planes])