import logging
import os
from fractions import Fraction
cimport libav as lib
from av.codec.codec cimport Codec
from av.codec.context cimport CodecContext, wrap_codec_context
from av.container.streams cimport StreamContainer
from av.dictionary cimport _Dictionary
from av.error cimport err_check
from av.packet cimport Packet
from av.stream cimport Stream, wrap_stream
from av.utils cimport dict_to_avdict, to_avrational
from av.dictionary import Dictionary
log = logging.getLogger(__name__)
cdef close_output(OutputContainer self):
self.streams = StreamContainer()
if self._started and not self._done:
# We must only ever call av_write_trailer *once*, otherwise we get a
# segmentation fault. Therefore no matter whether it succeeds or not
# we must absolutely set self._done.
try:
self.err_check(lib.av_write_trailer(self.ptr))
finally:
if self.file is None and not (self.ptr.oformat.flags & lib.AVFMT_NOFILE):
lib.avio_closep(&self.ptr.pb)
self._done = True
cdef class OutputContainer(Container):
def __cinit__(self, *args, **kwargs):
self.streams = StreamContainer()
self.metadata = {}
with nogil:
self.packet_ptr = lib.av_packet_alloc()
def __dealloc__(self):
close_output(self)
with nogil:
lib.av_packet_free(&self.packet_ptr)
def add_stream(self, codec_name, rate=None, dict options=None, **kwargs):
"""add_stream(codec_name, rate=None)
Creates a new stream from a codec name and returns it.
Supports video, audio, and subtitle streams.
:param codec_name: The name of a codec.
:type codec_name: str
:param dict options: Stream options.
:param \\**kwargs: Set attributes for the stream.
:rtype: The new :class:`~av.stream.Stream`.
"""
cdef Codec codec_obj = Codec(codec_name, "w")
cdef const lib.AVCodec *codec = codec_obj.ptr
# Assert that this format supports the requested codec.
if not lib.avformat_query_codec(self.ptr.oformat, codec.id, lib.FF_COMPLIANCE_NORMAL):
raise ValueError(
f"{self.format.name!r} format does not support {codec_obj.name!r} codec"
)
# Create new stream in the AVFormatContext, set AVCodecContext values.
cdef lib.AVStream *stream = lib.avformat_new_stream(self.ptr, codec)
cdef lib.AVCodecContext *codec_context = lib.avcodec_alloc_context3(codec)
# Now lets set some more sane video defaults
if codec.type == lib.AVMEDIA_TYPE_VIDEO:
codec_context.pix_fmt = lib.AV_PIX_FMT_YUV420P
codec_context.width = kwargs.pop("width", 640)
codec_context.height = kwargs.pop("height", 480)
codec_context.bit_rate = kwargs.pop("bit_rate", 0)
codec_context.bit_rate_tolerance = kwargs.pop("bit_rate_tolerance", 128000)
try:
to_avrational(kwargs.pop("time_base"), &codec_context.time_base)
except KeyError:
pass
to_avrational(rate or 24, &codec_context.framerate)
stream.avg_frame_rate = codec_context.framerate
stream.time_base = codec_context.time_base
# Some sane audio defaults
elif codec.type == lib.AVMEDIA_TYPE_AUDIO:
codec_context.sample_fmt = codec.sample_fmts[0]
codec_context.bit_rate = kwargs.pop("bit_rate", 0)
codec_context.bit_rate_tolerance = kwargs.pop("bit_rate_tolerance", 32000)
try:
to_avrational(kwargs.pop("time_base"), &codec_context.time_base)
except KeyError:
pass
if rate is None:
codec_context.sample_rate = 48000
elif type(rate) is int:
codec_context.sample_rate = rate
else:
raise TypeError("audio stream `rate` must be: int | None")
stream.time_base = codec_context.time_base
lib.av_channel_layout_default(&codec_context.ch_layout, 2)
# Some formats want stream headers to be separate
if self.ptr.oformat.flags & lib.AVFMT_GLOBALHEADER:
codec_context.flags |= lib.AV_CODEC_FLAG_GLOBAL_HEADER
# Initialise stream codec parameters to populate the codec type.
#
# Subsequent changes to the codec context will be applied just before
# encoding starts in `start_encoding()`.
err_check(lib.avcodec_parameters_from_context(stream.codecpar, codec_context))
# Construct the user-land stream
cdef CodecContext py_codec_context = wrap_codec_context(codec_context, codec, None)
cdef Stream py_stream = wrap_stream(self, stream, py_codec_context)
self.streams.add_stream(py_stream)
if options:
py_stream.options.update(options)
for k, v in kwargs.items():
setattr(py_stream, k, v)
return py_stream
def add_stream_from_template(self, Stream template not None, opaque=None, **kwargs):
"""
Creates a new stream from a template. Supports video, audio, and subtitle streams.
:param template: Copy codec from another :class:`~av.stream.Stream` instance.
:param opaque: If True, copy opaque data from the template's codec context.
:param \\**kwargs: Set attributes for the stream.
:rtype: The new :class:`~av.stream.Stream`.
"""
cdef const lib.AVCodec *codec
cdef Codec codec_obj
if opaque is None:
opaque = template.type != "video"
if opaque: # Copy ctx from template.
codec_obj = template.codec_context.codec
else: # Construct new codec object.
codec_obj = Codec(template.codec_context.codec.name, "w")
codec = codec_obj.ptr
# Assert that this format supports the requested codec.
if not lib.avformat_query_codec(self.ptr.oformat, codec.id, lib.FF_COMPLIANCE_NORMAL):
raise ValueError(
f"{self.format.name!r} format does not support {codec_obj.name!r} codec"
)
# Create new stream in the AVFormatContext, set AVCodecContext values.
cdef lib.AVStream *stream = lib.avformat_new_stream(self.ptr, codec)
cdef lib.AVCodecContext *codec_context = lib.avcodec_alloc_context3(codec)
err_check(lib.avcodec_parameters_to_context(codec_context, template.ptr.codecpar))
# Reset the codec tag assuming we are remuxing.
codec_context.codec_tag = 0
# Some formats want stream headers to be separate
if self.ptr.oformat.flags & lib.AVFMT_GLOBALHEADER:
codec_context.flags |= lib.AV_CODEC_FLAG_GLOBAL_HEADER
# Initialize stream codec parameters to populate the codec type. Subsequent changes to
# the codec context will be applied just before encoding starts in `start_encoding()`.
err_check(lib.avcodec_parameters_from_context(stream.codecpar, codec_context))
# Construct the user-land stream
cdef CodecContext py_codec_context = wrap_codec_context(codec_context, codec, None)
cdef Stream py_stream = wrap_stream(self, stream, py_codec_context)
self.streams.add_stream(py_stream)
for k, v in kwargs.items():
setattr(py_stream, k, v)
return py_stream
def add_data_stream(self, codec_name=None, dict options=None):
"""add_data_stream(codec_name=None)
Creates a new data stream and returns it.
:param codec_name: Optional name of the data codec (e.g. 'klv')
:type codec_name: str | None
:param dict options: Stream options.
:rtype: The new :class:`~av.data.stream.DataStream`.
"""
cdef const lib.AVCodec *codec = NULL
if codec_name is not None:
codec = lib.avcodec_find_encoder_by_name(codec_name.encode())
if codec == NULL:
raise ValueError(f"Unknown data codec: {codec_name}")
# Assert that this format supports the requested codec
if not lib.avformat_query_codec(self.ptr.oformat, codec.id, lib.FF_COMPLIANCE_NORMAL):
raise ValueError(
f"{self.format.name!r} format does not support {codec_name!r} codec"
)
# Create new stream in the AVFormatContext
cdef lib.AVStream *stream = lib.avformat_new_stream(self.ptr, codec)
if stream == NULL:
raise MemoryError("Could not allocate stream")
# Set up codec context if we have a codec
cdef lib.AVCodecContext *codec_context = NULL
if codec != NULL:
codec_context = lib.avcodec_alloc_context3(codec)
if codec_context == NULL:
raise MemoryError("Could not allocate codec context")
# Some formats want stream headers to be separate
if self.ptr.oformat.flags & lib.AVFMT_GLOBALHEADER:
codec_context.flags |= lib.AV_CODEC_FLAG_GLOBAL_HEADER
# Initialize stream codec parameters
err_check(lib.avcodec_parameters_from_context(stream.codecpar, codec_context))
else:
# For raw data streams, just set the codec type
stream.codecpar.codec_type = lib.AVMEDIA_TYPE_DATA
# Construct the user-land stream
cdef CodecContext py_codec_context = None
if codec_context != NULL:
py_codec_context = wrap_codec_context(codec_context, codec, None)
cdef Stream py_stream = wrap_stream(self, stream, py_codec_context)
self.streams.add_stream(py_stream)
if options:
py_stream.options.update(options)
return py_stream
cpdef start_encoding(self):
"""Write the file header! Called automatically."""
if self._started:
return
# TODO: This does NOT handle options coming from 3 sources.
# This is only a rough approximation of what would be cool to do.
used_options = set()
# Finalize and open all streams.
cdef Stream stream
for stream in self.streams:
ctx = stream.codec_context
# Skip codec context handling for data streams without codecs
if ctx is None:
if stream.type != "data":
raise ValueError(f"Stream {stream.index} has no codec context")
continue
if not ctx.is_open:
for k, v in self.options.items():
ctx.options.setdefault(k, v)
ctx.open()
# Track option consumption.
for k in self.options:
if k not in ctx.options:
used_options.add(k)
stream._finalize_for_output()
# Open the output file, if needed.
cdef bytes name_obj = os.fsencode(self.name if self.file is None else "")
cdef char *name = name_obj
if self.ptr.pb == NULL and not self.ptr.oformat.flags & lib.AVFMT_NOFILE:
err_check(lib.avio_open(&self.ptr.pb, name, lib.AVIO_FLAG_WRITE))
# Copy the metadata dict.
dict_to_avdict(
&self.ptr.metadata, self.metadata,
encoding=self.metadata_encoding,
errors=self.metadata_errors
)
cdef _Dictionary all_options = Dictionary(self.options, self.container_options)
cdef _Dictionary options = all_options.copy()
self.err_check(lib.avformat_write_header(self.ptr, &options.ptr))
# Track option usage...
for k in all_options:
if k not in options:
used_options.add(k)
# ... and warn if any weren't used.
unused_options = {k: v for k, v in self.options.items() if k not in used_options}
if unused_options:
log.warning("Some options were not used: %s" % unused_options)
self._started = True
@property
def supported_codecs(self):
"""
Returns a set of all codecs this format supports.
"""
result = set()
cdef const lib.AVCodec *codec = NULL
cdef void *opaque = NULL
while True:
codec = lib.av_codec_iterate(&opaque)
if codec == NULL:
break
if lib.avformat_query_codec(self.ptr.oformat, codec.id, lib.FF_COMPLIANCE_NORMAL) == 1:
result.add(codec.name)
return result
@property
def default_video_codec(self):
"""
Returns the default video codec this container recommends.
"""
return lib.avcodec_get_name(self.format.optr.video_codec)
@property
def default_audio_codec(self):
"""
Returns the default audio codec this container recommends.
"""
return lib.avcodec_get_name(self.format.optr.audio_codec)
@property
def default_subtitle_codec(self):
"""
Returns the default subtitle codec this container recommends.
"""
return lib.avcodec_get_name(self.format.optr.subtitle_codec)
def close(self):
close_output(self)
def mux(self, packets):
# We accept either a Packet, or a sequence of packets. This should
# smooth out the transition to the new encode API which returns a
# sequence of packets.
if isinstance(packets, Packet):
self.mux_one(packets)
else:
for packet in packets:
self.mux_one(packet)
def mux_one(self, Packet packet not None):
self.start_encoding()
# Assert the packet is in stream time.
if packet.ptr.stream_index < 0 or <unsigned int>packet.ptr.stream_index >= self.ptr.nb_streams:
raise ValueError("Bad Packet stream_index.")
cdef lib.AVStream *stream = self.ptr.streams[packet.ptr.stream_index]
packet._rebase_time(stream.time_base)
# Make another reference to the packet, as av_interleaved_write_frame
# takes ownership of the reference.
self.err_check(lib.av_packet_ref(self.packet_ptr, packet.ptr))
cdef int ret
with nogil:
ret = lib.av_interleaved_write_frame(self.ptr, self.packet_ptr)
self.err_check(ret)