import sys from enum import IntEnum from libc.stdint cimport uint8_t from av.error cimport err_check from av.sidedata.sidedata cimport get_display_rotation from av.utils cimport check_ndarray from av.video.format cimport get_pix_fmt, get_video_format from av.video.plane cimport VideoPlane cdef object _cinit_bypass_sentinel # `pix_fmt`s supported by Frame.to_ndarray() and Frame.from_ndarray() supported_np_pix_fmts = { "abgr", "argb", "bayer_bggr16be", "bayer_bggr16le", "bayer_bggr8", "bayer_gbrg16be", "bayer_gbrg16le", "bayer_gbrg8", "bayer_grbg16be", "bayer_grbg16le", "bayer_grbg8", "bayer_rggb16be", "bayer_rggb16le", "bayer_rggb8", "bgr24", "bgr8", "bgra", "gbrapf32be", "gbrapf32le", "gbrp", "gbrp10be", "gbrp10le", "gbrp12be", "gbrp12le", "gbrp14be", "gbrp14le", "gbrp16be", "gbrp16le", "gbrpf32be", "gbrpf32le", "gray", "gray16be", "gray16le", "gray8", "grayf32be", "grayf32le", "nv12", "pal8", "rgb24", "rgb48be", "rgb48le", "rgb8", "rgba", "rgba64be", "rgba64le", "yuv420p", "yuv420p10le", "yuv422p10le", "yuv444p", "yuv444p16be", "yuv444p16le", "yuva444p16be", "yuva444p16le", "yuvj420p", "yuvj444p", "yuyv422", } cdef VideoFrame alloc_video_frame(): """Get a mostly uninitialized VideoFrame. You MUST call VideoFrame._init(...) or VideoFrame._init_user_attributes() before exposing to the user. """ return VideoFrame.__new__(VideoFrame, _cinit_bypass_sentinel) class PictureType(IntEnum): NONE = lib.AV_PICTURE_TYPE_NONE # Undefined I = lib.AV_PICTURE_TYPE_I # Intra P = lib.AV_PICTURE_TYPE_P # Predicted B = lib.AV_PICTURE_TYPE_B # Bi-directional predicted S = lib.AV_PICTURE_TYPE_S # S(GMC)-VOP MPEG-4 SI = lib.AV_PICTURE_TYPE_SI # Switching intra SP = lib.AV_PICTURE_TYPE_SP # Switching predicted BI = lib.AV_PICTURE_TYPE_BI # BI type cdef byteswap_array(array, bint big_endian): if (sys.byteorder == "big") != big_endian: return array.byteswap() else: return array cdef copy_bytes_to_plane(img_bytes, VideoPlane plane, unsigned int bytes_per_pixel, bint flip_horizontal, bint flip_vertical): cdef const uint8_t[:] i_buf = img_bytes cdef size_t i_pos = 0 cdef size_t i_stride = plane.width * bytes_per_pixel cdef uint8_t[:] o_buf = plane cdef size_t o_pos = 0 cdef size_t o_stride = abs(plane.line_size) cdef int start_row, end_row, step if flip_vertical: start_row = plane.height - 1 end_row = -1 step = -1 else: start_row = 0 end_row = plane.height step = 1 cdef int i, j for row in range(start_row, end_row, step): i_pos = row * i_stride if flip_horizontal: for i in range(0, i_stride, bytes_per_pixel): for j in range(bytes_per_pixel): o_buf[o_pos + i + j] = i_buf[i_pos + i_stride - i - bytes_per_pixel + j] else: o_buf[o_pos:o_pos + i_stride] = i_buf[i_pos:i_pos + i_stride] o_pos += o_stride cdef copy_array_to_plane(array, VideoPlane plane, unsigned int bytes_per_pixel): cdef bytes imgbytes = array.tobytes() copy_bytes_to_plane(imgbytes, plane, bytes_per_pixel, False, False) cdef useful_array(VideoPlane plane, unsigned int bytes_per_pixel=1, str dtype="uint8"): """ Return the useful part of the VideoPlane as a single dimensional array. We are simply discarding any padding which was added for alignment. """ import numpy as np cdef size_t total_line_size = abs(plane.line_size) cdef size_t useful_line_size = plane.width * bytes_per_pixel arr = np.frombuffer(plane, np.uint8) if total_line_size != useful_line_size: arr = arr.reshape(-1, total_line_size)[:, 0:useful_line_size].reshape(-1) return arr.view(np.dtype(dtype)) cdef check_ndarray_shape(object array, bint ok): if not ok: raise ValueError(f"Unexpected numpy array shape `{array.shape}`") cdef class VideoFrame(Frame): def __cinit__(self, width=0, height=0, format="yuv420p"): if width is _cinit_bypass_sentinel: return cdef lib.AVPixelFormat c_format = get_pix_fmt(format) self._init(c_format, width, height) cdef _init(self, lib.AVPixelFormat format, unsigned int width, unsigned int height): cdef int res = 0 with nogil: self.ptr.width = width self.ptr.height = height self.ptr.format = format # We enforce aligned buffers, otherwise `sws_scale` can perform # poorly or even cause out-of-bounds reads and writes. if width and height: res = lib.av_image_alloc( self.ptr.data, self.ptr.linesize, width, height, format, 16 ) self._buffer = self.ptr.data[0] if res: err_check(res) self._init_user_attributes() cdef _init_user_attributes(self): self.format = get_video_format(<lib.AVPixelFormat>self.ptr.format, self.ptr.width, self.ptr.height) def __dealloc__(self): # The `self._buffer` member is only set if *we* allocated the buffer in `_init`, # as opposed to a buffer allocated by a decoder. lib.av_freep(&self._buffer) # Let go of the reference from the numpy buffers if we made one self._np_buffer = None def __repr__(self): return ( f"<av.{self.__class__.__name__}, pts={self.pts} {self.format.name} " f"{self.width}x{self.height} at 0x{id(self):x}>" ) @property def planes(self): """ A tuple of :class:`.VideoPlane` objects. """ # We need to detect which planes actually exist, but also contrain # ourselves to the maximum plane count (as determined only by VideoFrames # so far), in case the library implementation does not set the last # plane to NULL. cdef int max_plane_count = 0 for i in range(self.format.ptr.nb_components): count = self.format.ptr.comp[i].plane + 1 if max_plane_count < count: max_plane_count = count if self.format.name == "pal8": max_plane_count = 2 cdef int plane_count = 0 while plane_count < max_plane_count and self.ptr.extended_data[plane_count]: plane_count += 1 return tuple([VideoPlane(self, i) for i in range(plane_count)]) @property def width(self): """Width of the image, in pixels.""" return self.ptr.width @property def height(self): """Height of the image, in pixels.""" return self.ptr.height @property def rotation(self): """The rotation component of the `DISPLAYMATRIX` transformation matrix. Returns: int: The angle (in degrees) by which the transformation rotates the frame counterclockwise. The angle will be in range [-180, 180]. """ return get_display_rotation(self) @property def interlaced_frame(self): """Is this frame an interlaced or progressive?""" return bool(self.ptr.flags & lib.AV_FRAME_FLAG_INTERLACED) @property def pict_type(self): """Returns an integer that corresponds to the PictureType enum. Wraps :ffmpeg:`AVFrame.pict_type` :type: int """ return self.ptr.pict_type @pict_type.setter def pict_type(self, value): self.ptr.pict_type = value @property def colorspace(self): """Colorspace of frame. Wraps :ffmpeg:`AVFrame.colorspace`. """ return self.ptr.colorspace @colorspace.setter def colorspace(self, value): self.ptr.colorspace = value @property def color_range(self): """Color range of frame. Wraps :ffmpeg:`AVFrame.color_range`. """ return self.ptr.color_range @color_range.setter def color_range(self, value): self.ptr.color_range = value def reformat(self, *args, **kwargs): """reformat(width=None, height=None, format=None, src_colorspace=None, dst_colorspace=None, interpolation=None) Create a new :class:`VideoFrame` with the given width/height/format/colorspace. .. seealso:: :meth:`.VideoReformatter.reformat` for arguments. """ if not self.reformatter: self.reformatter = VideoReformatter() return self.reformatter.reformat(self, *args, **kwargs) def to_rgb(self, **kwargs): """Get an RGB version of this frame. Any ``**kwargs`` are passed to :meth:`.VideoReformatter.reformat`. >>> frame = VideoFrame(1920, 1080) >>> frame.format.name 'yuv420p' >>> frame.to_rgb().format.name 'rgb24' """ return self.reformat(format="rgb24", **kwargs) def to_image(self, **kwargs): """Get an RGB ``PIL.Image`` of this frame. Any ``**kwargs`` are passed to :meth:`.VideoReformatter.reformat`. .. note:: PIL or Pillow must be installed. """ from PIL import Image cdef VideoPlane plane = self.reformat(format="rgb24", **kwargs).planes[0] cdef const uint8_t[:] i_buf = plane cdef size_t i_pos = 0 cdef size_t i_stride = plane.line_size cdef size_t o_pos = 0 cdef size_t o_stride = plane.width * 3 cdef size_t o_size = plane.height * o_stride cdef bytearray o_buf = bytearray(o_size) while o_pos < o_size: o_buf[o_pos:o_pos + o_stride] = i_buf[i_pos:i_pos + o_stride] i_pos += i_stride o_pos += o_stride return Image.frombytes("RGB", (plane.width, plane.height), bytes(o_buf), "raw", "RGB", 0, 1) def to_ndarray(self, channel_last=False, **kwargs): """Get a numpy array of this frame. Any ``**kwargs`` are passed to :meth:`.VideoReformatter.reformat`. The array returned is generally of dimension (height, width, channels). :param bool channel_last: If True, the shape of array will be (height, width, channels) rather than (channels, height, width) for the "yuv444p" and "yuvj444p" formats. .. note:: Numpy must be installed. .. note:: For formats which return an array of ``uint16`` or ``float32``, the samples will be in the system's native byte order. .. note:: For ``pal8``, an ``(image, palette)`` tuple will be returned, with the palette being in ARGB (PyAV will swap bytes if needed). .. note:: For ``gbrp`` formats, channels are flipped to RGB order. """ cdef VideoFrame frame = self.reformat(**kwargs) import numpy as np # check size if frame.format.name in {"yuv420p", "yuvj420p", "yuyv422", "yuv420p10le", "yuv422p10le"}: assert frame.width % 2 == 0, "the width has to be even for this pixel format" assert frame.height % 2 == 0, "the height has to be even for this pixel format" # cases planes are simply concatenated in shape (height, width, channels) itemsize, dtype = { "abgr": (4, "uint8"), "argb": (4, "uint8"), "bayer_bggr8": (1, "uint8"), "bayer_gbrg8": (1, "uint8"), "bayer_grbg8": (1, "uint8"), "bayer_rggb8": (1, "uint8"), "bayer_bggr16le": (2, "uint16"), "bayer_bggr16be": (2, "uint16"), "bayer_gbrg16le": (2, "uint16"), "bayer_gbrg16be": (2, "uint16"), "bayer_grbg16le": (2, "uint16"), "bayer_grbg16be": (2, "uint16"), "bayer_rggb16le": (2, "uint16"), "bayer_rggb16be": (2, "uint16"), "bgr24": (3, "uint8"), "bgr8": (1, "uint8"), "bgra": (4, "uint8"), "gbrapf32be": (4, "float32"), "gbrapf32le": (4, "float32"), "gbrp": (1, "uint8"), "gbrp10be": (2, "uint16"), "gbrp10le": (2, "uint16"), "gbrp12be": (2, "uint16"), "gbrp12le": (2, "uint16"), "gbrp14be": (2, "uint16"), "gbrp14le": (2, "uint16"), "gbrp16be": (2, "uint16"), "gbrp16le": (2, "uint16"), "gbrpf32be": (4, "float32"), "gbrpf32le": (4, "float32"), "gray": (1, "uint8"), "gray16be": (2, "uint16"), "gray16le": (2, "uint16"), "gray8": (1, "uint8"), "grayf32be": (4, "float32"), "grayf32le": (4, "float32"), "rgb24": (3, "uint8"), "rgb48be": (6, "uint16"), "rgb48le": (6, "uint16"), "rgb8": (1, "uint8"), "rgba": (4, "uint8"), "rgba64be": (8, "uint16"), "rgba64le": (8, "uint16"), "yuv444p": (1, "uint8"), "yuv444p16be": (2, "uint16"), "yuv444p16le": (2, "uint16"), "yuva444p16be": (2, "uint16"), "yuva444p16le": (2, "uint16"), "yuvj444p": (1, "uint8"), "yuyv422": (2, "uint8"), }.get(frame.format.name, (None, None)) if itemsize is not None: layers = [ useful_array(plan, itemsize, dtype) .reshape(frame.height, frame.width, -1) for plan in frame.planes ] if len(layers) == 1: # shortcut, avoid memory copy array = layers[0] else: # general case array = np.concatenate(layers, axis=2) array = byteswap_array(array, frame.format.name.endswith("be")) if array.shape[2] == 1: # skip last channel for gray images return array.squeeze(2) if frame.format.name.startswith("gbr"): # gbr -> rgb buffer = array[:, :, 0].copy() array[:, :, 0] = array[:, :, 2] array[:, :, 2] = array[:, :, 1] array[:, :, 1] = buffer if not channel_last and frame.format.name in {"yuv444p", "yuvj444p"}: array = np.moveaxis(array, 2, 0) return array # special cases if frame.format.name in {"yuv420p", "yuvj420p"}: return np.hstack([ useful_array(frame.planes[0]), useful_array(frame.planes[1]), useful_array(frame.planes[2]), ]).reshape(-1, frame.width) if frame.format.name == "yuv420p10le": # Read planes as uint16: y = useful_array(frame.planes[0], 2, "uint16").reshape(frame.height, frame.width) u = useful_array(frame.planes[1], 2, "uint16").reshape(frame.height // 2, frame.width // 2) v = useful_array(frame.planes[2], 2, "uint16").reshape(frame.height // 2, frame.width // 2) u_full = np.repeat(np.repeat(u, 2, axis=1), 2, axis=0) v_full = np.repeat(np.repeat(u, 2, axis=1), 2, axis=0) if channel_last: return np.stack([y, u_full, v_full], axis=2) return np.stack([y, u_full, v_full], axis=0) if frame.format.name == "yuv422p10le": # Read planes as uint16 at their original width y = useful_array(frame.planes[0], 2, "uint16").reshape(frame.height, frame.width) u = useful_array(frame.planes[1], 2, "uint16").reshape(frame.height, frame.width // 2) v = useful_array(frame.planes[2], 2, "uint16").reshape(frame.height, frame.width // 2) # Double the width of U and V by repeating each value u_full = np.repeat(u, 2, axis=1) v_full = np.repeat(v, 2, axis=1) if channel_last: return np.stack([y, u_full, v_full], axis=2) return np.stack([y, u_full, v_full], axis=0) if frame.format.name == "pal8": image = useful_array(frame.planes[0]).reshape(frame.height, frame.width) palette = np.frombuffer(frame.planes[1], "i4").astype(">i4").reshape(-1, 1).view(np.uint8) return image, palette if frame.format.name == "nv12": return np.hstack([ useful_array(frame.planes[0]), useful_array(frame.planes[1], 2), ]).reshape(-1, frame.width) raise ValueError( f"Conversion to numpy array with format `{frame.format.name}` is not yet supported" ) @staticmethod def from_image(img): """ Construct a frame from a ``PIL.Image``. """ if img.mode != "RGB": img = img.convert("RGB") cdef VideoFrame frame = VideoFrame(img.size[0], img.size[1], "rgb24") copy_array_to_plane(img, frame.planes[0], 3) return frame @staticmethod def from_numpy_buffer(array, format="rgb24", width=0): # Usually the width of the array is the same as the width of the image. But sometimes # this is not possible, for example with yuv420p images that have padding. These are # awkward because the UV rows at the bottom have padding bytes in the middle of the # row as well as at the end. To cope with these, callers need to be able to pass the # actual width to us. height = array.shape[0] if not width: width = array.shape[1] if format in ("rgb24", "bgr24"): check_ndarray(array, "uint8", 3) check_ndarray_shape(array, array.shape[2] == 3) if array.strides[1:] != (3, 1): raise ValueError("provided array does not have C_CONTIGUOUS rows") linesizes = (array.strides[0], ) elif format in ("rgba", "bgra"): check_ndarray(array, "uint8", 3) check_ndarray_shape(array, array.shape[2] == 4) if array.strides[1:] != (4, 1): raise ValueError("provided array does not have C_CONTIGUOUS rows") linesizes = (array.strides[0], ) elif format in ("gray", "gray8", "rgb8", "bgr8"): check_ndarray(array, "uint8", 2) if array.strides[1] != 1: raise ValueError("provided array does not have C_CONTIGUOUS rows") linesizes = (array.strides[0], ) elif format in ("yuv420p", "yuvj420p", "nv12"): check_ndarray(array, "uint8", 2) check_ndarray_shape(array, array.shape[0] % 3 == 0) check_ndarray_shape(array, array.shape[1] % 2 == 0) height = height // 6 * 4 if array.strides[1] != 1: raise ValueError("provided array does not have C_CONTIGUOUS rows") if format in ("yuv420p", "yuvj420p"): # For YUV420 planar formats, the UV plane stride is always half the Y stride. linesizes = (array.strides[0], array.strides[0] // 2, array.strides[0] // 2) else: # Planes where U and V are interleaved have the same stride as Y. linesizes = (array.strides[0], array.strides[0]) elif format in {"bayer_bggr8", "bayer_rggb8", "bayer_gbrg8", "bayer_grbg8","bayer_bggr16le", "bayer_rggb16le", "bayer_gbrg16le", "bayer_grbg16le","bayer_bggr16be", "bayer_rggb16be", "bayer_gbrg16be", "bayer_grbg16be"}: check_ndarray(array, "uint8" if format.endswith("8") else "uint16", 2) if array.strides[1] != (1 if format.endswith("8") else 2): raise ValueError("provided array does not have C_CONTIGUOUS rows") linesizes = (array.strides[0],) else: raise ValueError(f"Conversion from numpy array with format `{format}` is not yet supported") frame = alloc_video_frame() frame._image_fill_pointers_numpy(array, width, height, linesizes, format) return frame def _image_fill_pointers_numpy(self, buffer, width, height, linesizes, format): cdef lib.AVPixelFormat c_format cdef uint8_t * c_ptr cdef size_t c_data # If you want to use the numpy notation # then you need to include the following two lines at the top of the file # cimport numpy as cnp # cnp.import_array() # And add the numpy include directories to the setup.py files # hint np.get_include() # cdef cnp.ndarray[ # dtype=cnp.uint8_t, ndim=1, # negative_indices=False, mode='c'] c_buffer # c_buffer = buffer.reshape(-1) # c_ptr = &c_buffer[0] # c_ptr = <uint8_t*> (<void*>(buffer.ctypes.data)) # Using buffer.ctypes.data helps avoid any kind of # usage of the c-api from numpy, which avoid the need to add numpy # as a compile time dependency # Without this double cast, you get an error that looks like # c_ptr = <uint8_t*> (buffer.ctypes.data) # TypeError: expected bytes, int found c_data = buffer.ctypes.data c_ptr = <uint8_t*> (c_data) c_format = get_pix_fmt(format) lib.av_freep(&self._buffer) # Hold on to a reference for the numpy buffer # so that it doesn't get accidentally garbage collected self._np_buffer = buffer self.ptr.format = c_format self.ptr.width = width self.ptr.height = height for i, linesize in enumerate(linesizes): self.ptr.linesize[i] = linesize res = lib.av_image_fill_pointers( self.ptr.data, <lib.AVPixelFormat>self.ptr.format, self.ptr.height, c_ptr, self.ptr.linesize, ) if res: err_check(res) self._init_user_attributes() @staticmethod def from_ndarray(array, format="rgb24", channel_last=False): """ Construct a frame from a numpy array. :param bool channel_last: If False (default), the shape for the yuv444p and yuvj444p is given by (channels, height, width) rather than (height, width, channels). .. note:: For formats which expect an array of ``uint16``, the samples must be in the system's native byte order. .. note:: for ``pal8``, an ``(image, palette)`` pair must be passed. `palette` must have shape (256, 4) and is given in ARGB format (PyAV will swap bytes if needed). .. note:: for ``gbrp`` formats, channels are assumed to be given in RGB order. """ import numpy as np # case layers are concatenated channels, itemsize, dtype = { "yuv444p": (3, 1, "uint8"), "yuvj444p": (3, 1, "uint8"), "gbrp": (3, 1, "uint8"), "gbrp10be": (3, 2, "uint16"), "gbrp12be": (3, 2, "uint16"), "gbrp14be": (3, 2, "uint16"), "gbrp16be": (3, 2, "uint16"), "gbrp10le": (3, 2, "uint16"), "gbrp12le": (3, 2, "uint16"), "gbrp14le": (3, 2, "uint16"), "gbrp16le": (3, 2, "uint16"), "gbrpf32be": (3, 4, "float32"), "gbrpf32le": (3, 4, "float32"), "gray": (1, 1, "uint8"), "gray8": (1, 1, "uint8"), "rgb8": (1, 1, "uint8"), "bgr8": (1, 1, "uint8"), "gray16be": (1, 2, "uint16"), "gray16le": (1, 2, "uint16"), "grayf32be": (1, 4, "float32"), "grayf32le": (1, 4, "float32"), "gbrapf32be": (4, 4, "float32"), "gbrapf32le": (4, 4, "float32"), "yuv444p16be": (3, 2, "uint16"), "yuv444p16le": (3, 2, "uint16"), "yuva444p16be": (4, 2, "uint16"), "yuva444p16le": (4, 2, "uint16"), "bayer_bggr8": (1, 1, "uint8"), "bayer_rggb8": (1, 1, "uint8"), "bayer_grbg8": (1, 1, "uint8"), "bayer_gbrg8": (1, 1, "uint8"), "bayer_bggr16be": (1, 2, "uint16"), "bayer_bggr16le": (1, 2, "uint16"), "bayer_rggb16be": (1, 2, "uint16"), "bayer_rggb16le": (1, 2, "uint16"), "bayer_grbg16be": (1, 2, "uint16"), "bayer_grbg16le": (1, 2, "uint16"), "bayer_gbrg16be": (1, 2, "uint16"), "bayer_gbrg16le": (1, 2, "uint16"), }.get(format, (None, None, None)) if channels is not None: if array.ndim == 2: # (height, width) -> (height, width, 1) array = array[:, :, None] check_ndarray(array, dtype, 3) if not channel_last and format in {"yuv444p", "yuvj444p"}: array = np.moveaxis(array, 0, 2) # (channels, h, w) -> (h, w, channels) check_ndarray_shape(array, array.shape[2] == channels) array = byteswap_array(array, format.endswith("be")) frame = VideoFrame(array.shape[1], array.shape[0], format) if frame.format.name.startswith("gbr"): # rgb -> gbr array = np.concatenate([ # not inplace to avoid bad surprises array[:, :, 1:3], array[:, :, 0:1], array[:, :, 3:], ], axis=2) for i in range(channels): copy_array_to_plane(array[:, :, i], frame.planes[i], itemsize) return frame # other cases if format == "pal8": array, palette = array check_ndarray(array, "uint8", 2) check_ndarray(palette, "uint8", 2) check_ndarray_shape(palette, palette.shape == (256, 4)) frame = VideoFrame(array.shape[1], array.shape[0], format) copy_array_to_plane(array, frame.planes[0], 1) frame.planes[1].update(palette.view(">i4").astype("i4").tobytes()) return frame elif format in {"yuv420p", "yuvj420p"}: check_ndarray(array, "uint8", 2) check_ndarray_shape(array, array.shape[0] % 3 == 0) check_ndarray_shape(array, array.shape[1] % 2 == 0) frame = VideoFrame(array.shape[1], (array.shape[0] * 2) // 3, format) u_start = frame.width * frame.height v_start = 5 * u_start // 4 flat = array.reshape(-1) copy_array_to_plane(flat[0:u_start], frame.planes[0], 1) copy_array_to_plane(flat[u_start:v_start], frame.planes[1], 1) copy_array_to_plane(flat[v_start:], frame.planes[2], 1) return frame elif format == "yuv420p10le": if not isinstance(array, np.ndarray) or array.dtype != np.uint16: raise ValueError("Array must be uint16 type") # Convert to channel-first if needed: if channel_last and array.shape[2] == 3: array = np.moveaxis(array, 2, 0) elif not (array.shape[0] == 3): raise ValueError("Array must have shape (3, height, width) or (height, width, 3)") height, width = array.shape[1:] if width % 2 != 0 or height % 2 != 0: raise ValueError("Width and height must be even") frame = VideoFrame(width, height, format) copy_array_to_plane(array[0], frame.planes[0], 2) # Subsample U and V by taking every other row and column: u = array[1, ::2, ::2].copy() # Need copy to ensure C-contiguous v = array[2, ::2, ::2].copy() # Need copy to ensure C-contiguous copy_array_to_plane(u, frame.planes[1], 2) copy_array_to_plane(v, frame.planes[2], 2) return frame elif format == "yuv422p10le": if not isinstance(array, np.ndarray) or array.dtype != np.uint16: raise ValueError("Array must be uint16 type") # Convert to channel-first if needed if channel_last and array.shape[2] == 3: array = np.moveaxis(array, 2, 0) elif not (array.shape[0] == 3): raise ValueError("Array must have shape (3, height, width) or (height, width, 3)") height, width = array.shape[1:] if width % 2 != 0 or height % 2 != 0: raise ValueError("Width and height must be even") frame = VideoFrame(width, height, format) copy_array_to_plane(array[0], frame.planes[0], 2) # Subsample U and V by taking every other column u = array[1, :, ::2].copy() # Need copy to ensure C-contiguous v = array[2, :, ::2].copy() # Need copy to ensure C-contiguous copy_array_to_plane(u, frame.planes[1], 2) copy_array_to_plane(v, frame.planes[2], 2) return frame elif format == "yuyv422": check_ndarray(array, "uint8", 3) check_ndarray_shape(array, array.shape[0] % 2 == 0) check_ndarray_shape(array, array.shape[1] % 2 == 0) check_ndarray_shape(array, array.shape[2] == 2) elif format in {"rgb24", "bgr24"}: check_ndarray(array, "uint8", 3) check_ndarray_shape(array, array.shape[2] == 3) elif format in {"argb", "rgba", "abgr", "bgra"}: check_ndarray(array, "uint8", 3) check_ndarray_shape(array, array.shape[2] == 4) elif format in {"rgb48be", "rgb48le"}: check_ndarray(array, "uint16", 3) check_ndarray_shape(array, array.shape[2] == 3) frame = VideoFrame(array.shape[1], array.shape[0], format) copy_array_to_plane(byteswap_array(array, format.endswith("be")), frame.planes[0], 6) return frame elif format in {"rgba64be", "rgba64le"}: check_ndarray(array, "uint16", 3) check_ndarray_shape(array, array.shape[2] == 4) frame = VideoFrame(array.shape[1], array.shape[0], format) copy_array_to_plane(byteswap_array(array, format.endswith("be")), frame.planes[0], 8) return frame elif format == "nv12": check_ndarray(array, "uint8", 2) check_ndarray_shape(array, array.shape[0] % 3 == 0) check_ndarray_shape(array, array.shape[1] % 2 == 0) frame = VideoFrame(array.shape[1], (array.shape[0] * 2) // 3, format) uv_start = frame.width * frame.height flat = array.reshape(-1) copy_array_to_plane(flat[:uv_start], frame.planes[0], 1) copy_array_to_plane(flat[uv_start:], frame.planes[1], 2) return frame else: raise ValueError(f"Conversion from numpy array with format `{format}` is not yet supported") frame = VideoFrame(array.shape[1], array.shape[0], format) copy_array_to_plane(array, frame.planes[0], 1 if array.ndim == 2 else array.shape[2]) return frame @staticmethod def from_bytes(img_bytes: bytes, width: int, height: int, format="rgba", flip_horizontal=False, flip_vertical=False): frame = VideoFrame(width, height, format) if format == "rgba": copy_bytes_to_plane(img_bytes, frame.planes[0], 4, flip_horizontal, flip_vertical) elif format in ("bayer_bggr8", "bayer_rggb8", "bayer_gbrg8", "bayer_grbg8","bayer_bggr16le", "bayer_rggb16le", "bayer_gbrg16le", "bayer_grbg16le","bayer_bggr16be", "bayer_rggb16be", "bayer_gbrg16be", "bayer_grbg16be"): copy_bytes_to_plane(img_bytes, frame.planes[0], 1 if format.endswith("8") else 2, flip_horizontal, flip_vertical) else: raise NotImplementedError(f"Format '{format}' is not supported.") return frame
Memory