A "gstreamer-like" C++ framework for ffmpeg/libav graphs with support of async processing and lock-free queue

amagimedia/avplumber


avplumber is a graph-based real-time processing framework. The graph can be reconfigured on the fly using a text API. Most nodes are based on FFmpeg's libavcodec, libavformat & libavfilter. You can create an entire transcoding & filtering chain in it, replacing FFmpeg in many use cases.

avplumber was created because we were experienced with FFmpeg and wanted to have its features, plus more flexibility. For example, it is possible to:

  • encode once and send encoded packets to multiple outputs.
  • filter video (using FFmpeg's filter graph syntax) in multiple threads. It is possible since FFmpeg 6.0, but we needed this feature long before its release.
  • maintain output timestamps continuity and audio-video synchronization even when input timestamps jump.
  • insert fallback slate ("we'll be back shortly") when input stream breaks.

Furthermore, it was designed to allow easy prototyping of new video & audio processing blocks (nodes in the graph) without the boilerplate code required by libavfilter or GStreamer.

So does it replace FFmpeg in all use cases? Not at all. It is targeted at live use - currently it can't seek the input at all. Also, subtitles aren't supported due to limitations of the underlying library - avcpp.

Quick start

Note: be sure to check other branches (tree view) if you want to test the latest features.

Make sure to clone this repo with the --recursive option.

git clone --recursive https://github.com/amagimedia/avplumber
docker build -t avplumber .
docker run -p 20200:20200 avplumber -p 20200

or if you don't want to use Docker but have Ubuntu:

apt install git gcc pkg-config make cmake libavcodec-dev libavdevice-dev libavfilter-dev libavformat-dev libavutil-dev libswresample-dev libcurl4-openssl-dev libboost-thread-dev libboost-system-dev libssl-dev
make -j`nproc`
./avplumber

and in a different terminal:

nc localhost 20200

and you can type some commands (see Control protocol) or paste a script (e.g. from the examples/ directory).

Development on Windows

Development on Windows can be done using Docker and VSCode Dev Containers.

  1. Enable symbolic links by following these steps.
  2. Clone this repo: git clone --recursive https://github.com/amagimedia/avplumber
  3. Open it in VSCode
  4. Open the Command Palette and run the Dev Containers: Reopen in Container command

The development container comes with all required dependencies and clangd installed.

Demo

To quickly run a demo with the FFmpeg test source, use the provided Docker Compose file:

script=remux_analyze_audio.avplumber docker compose -f examples/compose/rtmp_test_source.yml up

After Docker pulls and builds everything, you should see stream statistics JSON lines, once per second.

The output stream will be available at rtmp://localhost/live/output

Change script to complicated_transcoder.avplumber to test transcoding.

This demo uses MediaMTX as the streaming server.

Running Docker on recent macOS versions

brew install docker docker-compose colima
colima start

Using as a library

avplumber can be built as a static library: make static_library produces libavplumber.a, which your app or library can link against. library_examples/obs-avplumber-source/CMakeLists.txt is an example of CMake integration.

The public API is contained in src/avplumber.hpp.

Example: library_examples/obs-avplumber-source - a source plugin for OBS supporting direct VRAM copy from the video decoder to a texture.

Developing custom nodes

See doc/developing_nodes.md

Graph

An avplumber instance consists of a directed acyclic graph of interconnected nodes.

Edges = queues

Nodes in the graph are connected by edges. An edge is implemented as a queue. queue.plan_capacity can be used to change its size. The type of data inside a queue is determined automatically when the queue is created.

Data types:

Some nodes support multiple input/output types - they work like templates/generics in programming languages (and are implemented this way). If the data type can be deduced from source or sink edges, there is no need to provide it explicitly. But if it can't be, use template syntax in the type field of the node JSON object:

node_type<data_type>

for example:

split<av::VideoFrame>

Topology

Some nodes require that another node implementing specific features (an interface) is placed before (up) or after (down) them:

  • input/input_rec before demux
  • mux before output
  • video format metadata source before enc_video. It can be dec_video, assume_video_format, rescale_video or filter_video
  • FPS metadata source before enc_video, extract_timestamps and filter_video. It can be dec_video, force_fps, filter_video or sentinel_video
  • audio metadata source before enc_audio and sentinel_audio. It can be dec_audio, assume_audio_format or filter_audio
  • time base source before bsf, enc_video, enc_audio, extract_timestamps, filter_video, filter_audio, sentinel_video, sentinel_audio. It can be assume_video_format, assume_audio_format, dec_video, dec_audio, filter_video, filter_audio, force_fps, packet_relay or resample_audio
  • encoder (enc_video/enc_audio), bsf or packet_relay before mux

Control methods

avplumber is controlled using text commands over a TCP socket, so it can be controlled manually using netcat or telnet. The --port argument specifies the port to listen on.

The --script argument specifies commands to execute on startup.

Control protocol

The control protocol loosely follows MVCP.

On a new connection, the server (avplumber) sends a line: 100 VTR READY

The client issues a command followed by arguments separated by spaces and ended with a newline character. The last argument may contain spaces.

The server (avplumber) responds with a line containing a status code and information:

  • 200 OK - command accepted, empty response
  • 201 OK - command accepted, a response will follow; an empty line marks the end of the response
  • 400 Unknown command: ...
  • 500 ERROR: ...
  • BYE and connection close - special response for the bye command

Commands:

hello

Replies with HELLO

version

Replies with app version and build date

bye

Closes the connection

Nodes management & control

node.add { ...json object... }

Add node

node.add_create { ...json object... }

Add and create node (without starting it right now)

node.add_start { ...json object... }

Add, create and start node

node.delete name

Delete node

node.start name

Start node

node.stop name

Stop node (auto_restart action is inhibited)

node.stop_wait name

Stop node, return the reply when the node has really stopped

node.auto_restart name

Stop node and trigger its auto_restart action. This command is probably most useful for restarting an input after changing its URL, with confidence that it will be handled the same way as if the previous input stream had finished. And unlike retry group.restart ..., nothing blocks! See also the more experimental node.interrupt.

node.interrupt name

Stop the node even if it is being constructed right now. Also, bypass any locks. Currently only the input node supports this command. After interruption, the auto_restart action will be triggered.

node.param.set node_name param_name new_json_value

Change node parameter.

node.param.get node_name

Get whole node JSON object, for example:

node.param.get encode720
{"codec":"libx264","dst":"venc1","group":"out","name":"encode720","options":{"b":"4M","bufsize":"8M","flags":"+cgop","g":25,"level":"3.2","maxrate":"5M","minrate":"3M","muxdelay":0,"muxrate":0,"preset":"ultrafast","profile":"baseline","sc_threshold":0,"x264opts":"no-scenecut"},"src":"vs1o","type":"enc_video"}
OK

node.param.get node_name param_name

Get single parameter as JSON. Example:

node.param.get encode720 group
"out"
OK

node.object.get node_name object_name

Get object from node. Example:

node.object.get input streams
201 OK
[{"codec":"h264","index":0,"type":"V"},{"codec":"h264","index":1,"type":"V"},{"codec":"h264","index":2,"type":"V"},{"codec":"aac","index":3,"type":"A"},{"codec":"aac","index":4,"type":"A"},{"codec":"aac","index":5,"type":"A"}]

node.object.get input programs
201 OK
[{"index":0,"streams":[0,3,4,5]},{"index":1,"streams":[1,3,4,5]},{"index":2,"streams":[2,3,4,5]}]

node.object.set node_name object_name object_value

Set object in node. Example:

node.object.set input stream-limits {"start": "5:20", "stop": "5:30", "loop": false }
200 OK

Closed Captions control

cc.pause

Pause Closed Captions processing. This command returns an error when no extract_cc_data node is present or when there are multiple extract_cc_data nodes connected in the graph.

cc.resume

Resume Closed Captions processing. This command returns an error when no extract_cc_data node is present or when there are multiple extract_cc_data nodes connected in the graph.

Queues (edges)

queue.plan_capacity queue_name capacity

Plan the capacity of a queue (which must be created after issuing this command). capacity is a positive integer. Warning: moodycamel::ReaderWriterQueue (the library we use for queues) forces the queue size to be the smallest 2^n-1 (n = integer) larger than or equal to the specified capacity.

The default capacity can also be changed - use * as queue_name, e.g. queue.plan_capacity * 7
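The rounding rule above can be modelled as follows (an illustrative sketch, not avplumber code; the queue name is hypothetical):

```python
def effective_capacity(requested):
    """Smallest 2**n - 1 (n integer) greater than or equal to the
    requested capacity, mirroring the ReaderWriterQueue rule above."""
    n = 1
    while (1 << n) - 1 < requested:
        n += 1
    return (1 << n) - 1

# queue.plan_capacity myqueue 7   -> actual capacity 7
# queue.plan_capacity myqueue 8   -> actual capacity 15
# queue.plan_capacity myqueue 100 -> actual capacity 127
```

So asking for a capacity just above a 2^n-1 boundary nearly doubles the real queue size.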

queues.stats

Print (human-readable) statistics of queues (graph edges). Example:

[#...................]  1821.1  videoin
[....................]  243265  aenc1
[....................]  243265  aenc0
[....................]  243265  aenc2
[....................]  243265  mux2
[#####...............]  243265  aenc
[....................]  1821.03  audioin
 ^                      ^         ^
 queue fill             last PTS  queue name

queue.drain queue_name

Wait until queue is empty.

Groups

group.restart group

group.stop group

group.start group

Raw outputs

output.start output_group

output.stop output_group

realtime nodes

realtime.team.reset team

Manually trigger reset of a realtime team.

Playback control

pause team_name now

Tell all pause nodes in team team_name to pause (stop passing packets)

pause team_name at timestamp

Tell all pause nodes in team team_name to pause (stop passing packets) at the specified timestamp. The timestamp may be expressed in many forms, like: 01:02:03 (hh:mm:ss), 01:00.150 (mm:ss.millis), 12000 (time expressed in ms), 2024-10-03T08:12:44.100 (wallclock time, ISO 8601 format with optional milliseconds).
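The timestamp forms listed above (also accepted by the seek commands) can be modelled with a small parser sketch; this is an illustration of the formats, not avplumber's actual parser:

```python
from datetime import datetime

def parse_timestamp(text):
    """Model of the accepted timestamp forms. Returns milliseconds for
    relative forms, or a datetime for the wallclock (ISO 8601) form."""
    if "T" in text:                        # 2024-10-03T08:12:44.100
        return datetime.fromisoformat(text)
    parts = text.split(":")
    if len(parts) == 3:                    # hh:mm:ss
        h, m, s = parts
        return (int(h) * 3600 + int(m) * 60 + float(s)) * 1000
    if len(parts) == 2:                    # mm:ss.millis
        m, s = parts
        return (int(m) * 60 + float(s)) * 1000
    return float(text)                     # already milliseconds
```

For example, 01:02:03 corresponds to 3723000 ms and 01:00.150 to 60150 ms.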

resume team_name

Tell all pause nodes in team team_name to resume playback

seek team_name live

Flush all queues between the input_rec nodes and the nodes in team team_name and seek to live video. Live video is playback from a recording which is still being recorded. It is delayed 10 seconds behind the current time.

seek team_name now timestamp

Flush all queues between the input_rec nodes and the nodes in team team_name and seek to the given timestamp. The timestamp may be expressed in many forms, like: 01:02:03 (hh:mm:ss), 01:00.150 (mm:ss.millis), 12000 (time expressed in ms), 2024-10-03T08:12:44.100 (wallclock time, ISO 8601 format with optional milliseconds). When the timestamp is preceded with a + sign, it will seek to <current time> + <timestamp> (like seek team +0:10 - seek 10 s forward from now). When the timestamp is preceded with a - sign, it will seek to <current time> - <timestamp> (like seek team -0:10 - seek 10 s backward from now). When there is no sign before the timestamp, it will seek to the absolute timestamp.

seek team_name frame <frame number>

Flush all queues between the input_rec nodes and the nodes in team team_name and seek to the given frame number. When the frame number is preceded with a + sign, it will seek to <current frame> + <frame number> (like seek team frame +10 - seek 10 frames forward from now). When the frame number is preceded with a - sign, it will seek to <current frame> - <frame number> (like seek team frame -10 - seek 10 frames backward from now). When there is no sign before the frame number, it will seek to the absolute frame number (starting from 0).

seek team_name at timestamp_when timestamp_to

Flush all queues between the input_rec nodes and the nodes in team team_name and seek to the given timestamp_to when playback time reaches timestamp_when. Multiple seek at commands may be specified and they will be executed in the order of adding. The timestamp may be expressed in many forms, like: 01:02:03 (hh:mm:ss), 01:00.150 (mm:ss.millis), 12000 (time expressed in ms), 2024-10-03T08:12:44.100 (wallclock time, ISO 8601 format with optional milliseconds).

seek team_name clear

Clears the seek at queue.

speed.set team_name speed

Set the speed of the speed nodes belonging to team team_name to speed (float).

speed.get team_name

Get the current playback speed of team team_name.

Hardware acceleration

hwaccel.init { "name": "name", "type": "type" }

Initialize a hardware accelerator which may be used for encoding, decoding or filtering video frames.

  • name - identifier, supports global objects syntax (@). If an accelerator with the given name already exists, it isn't touched and no error is returned.
  • type - currently only cuda is supported

Statistics

stats.subscribe { ... json object ... }

Subscribe to statistics. JSON object structure:

{
  "url": "",            // URL to put statistics using HTTP POST. Empty to write them to console.
  "name": "qwerty",     // opaque
  "interval": 1,        // refresh interval, in seconds
  "streams": {
    "audio": [
      {
        "q_pre_dec": "audioin",    // name of queue before decoder (kbitrate, speed, AV_diff are taken from it)
        "q_post_dec": "a10",       // name of queue after decoder (fps, width, height, pix_fmt, field_order, samplerate, samplerate_md, channel_layout are taken from it)
        "decoder": "Audio_Decode"  // name of decoder node (codec, type are taken from it)
      }
    ],
    "video": [
      {
        "q_pre_dec": "videoin",
        "q_post_dec": "v05",
        "decoder": "Video_Decode"
      }
    ]
  },
  "sentinel": "Video_Sentinel"     // name of sentinel node (card flag is taken from it)
}

Events

event.on.node.finished event_name node_name

When the node finishes, signal (wake up) event event_name

event.wait event_name

Wait for event event_name

Special commands

retry command arguments ...

Repeat the command until it succeeds. Intended for startup scripts (--script). Not recommended for remote control.

detach command arguments ...

Return 200 OK immediately and run the command in a background thread.

Node object

Each node is described by a JSON object consisting of the following fields:

  • name (string without spaces) - optional, specifies identifier that can be later used for controlling the node
    • if specified, must be unique within the instance
    • if unspecified, the string type@memory_address will be generated and used
  • type (string) - mandatory
  • group (string) - used for grouping together nearby nodes. Example: a transcoder may have separate input and output groups so that when the input URL is changed, only the demuxer and decoders are restarted, not the encoders and muxer.
  • auto_restart (string) - optional:
    • off (default) - let the node stop without restarting
    • on - restart single node when it finishes/crashes
    • group - restart the whole group to which the node belongs
    • panic - when the node finishes/crashes, shutdown the whole avplumber instance
  • src (string for single-input nodes, list of strings for multi-input nodes) - source edge
  • dst (string for single-output nodes, list of strings for multi-output nodes) - sink edge
  • optional (bool) - optional, controls what happens when creating the node fails:
    • true - ignore exceptions (return 20x) and pretend nothing bad happened
    • false (default) - fail the whole operation (e.g. starting a group)

Most nodes also have their own specific parameters, which are specified on the same level as the fields above.
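Putting the fields above together, a complete node definition can be sent with node.add. The sketch below (Python, for illustration; the node name and the v_decoded queue name are hypothetical, dec_video is a real node type described later) builds such a command line:

```python
import json

# Common fields from the list above; values chosen for illustration.
node = {
    "name": "decode_v",        # optional, must be unique within the instance
    "type": "dec_video",       # mandatory
    "group": "in",             # restart demuxer + decoders together
    "auto_restart": "group",   # restart the whole group on finish/crash
    "src": "videoin",          # source edge (queue)
    "dst": "v_decoded",        # sink edge (queue)
}

command = "node.add " + json.dumps(node)
print(command)
```

Node-specific parameters (e.g. a decoder's options) would go into the same object, alongside the common fields.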

Non-blocking nodes

Some node types are non-blocking, which means that there is no separate thread to run the node, but it processes data in an event-based manner, which is configurable using the following fields:

  • event_loop (string, name of instance-shared object) - name of the event loop; if not specified, the default event loop will be used. Each event loop works in a separate thread.
  • tick_source (string, name of instance-shared object) - name of the tick source. If not specified, the node will work in a tickless manner, waking up only when necessary (e.g. a node above in the graph has put some data into a queue). On the other hand, if this field is specified, the tick source will wake up the node at regular intervals synchronized to some external clock. This reduces latency and jitter. Currently useful only in the OBS avplumber plugin - specify obs as a tick_source to synchronize a non-blocking node to the video mixer's FPS.

The tick source has its own event loop (or may even bypass it and call the node in its own thread to reduce latency) so you can't specify both event_loop and tick_source.

Example JSON syntax for fields

  • string: "string"
  • string of URL: "protocol://domain/path"
  • string of rational: "30000/1001" (so-called 29.97 fps)
  • list of strings: ["string1", "string2", "string3"]
  • dictionary (also known as map): {"key1":"value1", "key2":"value2"}
  • bool: true or false
  • int: 31337
  • float: 1337.42
  • name of an instance-shared object: "object"
  • name of a global instance-shared object: "@global_object"

Node types

input

1 output: av::Packet

  • url (string of URL)
  • options (dictionary) - options for libavformat
  • timeout (float, seconds) - packet read timeout
  • initial_timeout (float, seconds) - URL open timeout

input_rec

1 output: av::Packet

This node is a special case of the input node with some changes required to play & seek recording files. It has the same parameters as the input node, plus some additional ones:

  • url (string of URL)
  • options (dictionary) - options for libavformat
  • timeout (float, seconds) - packet read timeout
  • initial_timeout (float, seconds) - URL open timeout
  • live_delay (float, seconds) - time to delay between the latest available packet and the currently displayed one in live mode (default 1)
  • start_ts (string) - start timestamp
  • stop_ts (string) - stop timestamp
  • loop (bool) - if set to true, video will be played in a loop
  • stop_delay (int, milliseconds) - time to hold on the last read frame before reporting end of processing
  • seek_table (string of URL) - file with fast-seek offsets (may be generated by the output node)
  • ts_offsets (string of URL) - file with timestamp offsets (may be generated by the sentinel node)
  • timestamp_source (string) - timestamp source used to synchronize multiple videos. Possible values are wallclock and input. Both values are calculated from the timestamp offsets file (ts_offsets)
  • preseek (float, seconds, default 0) - how many seconds to preseek back to increase the chance of finding a keyframe, when a seek to timestamp (seek command) is requested
  • team (string, name of instance-shared object) - if specified, seeks on the input nodes may be synchronized inside the specified team

realtime

Rate limit output packets/frames to wallclock. This way, DTS (in packets) or PTS (in frames) differences will equal wallclock differences at this node's sink.

Also allows inter-stream synchronization (as long as timestamps in them are synchronized) and automatic flushing if too much data is buffered in queues before this node.

This node is non-blocking.

1 input, 1 output: anything

  • leak_after (float, seconds) - if specified, bypass rate limiting after having input packets available immediately (in other words, at least one packet was always enqueued) for the specified time. Intended for segmented inputs for which the realtime node is useful to prevent bursts, while we don't want clock drift problems and missed segments in long running streams.
  • speed (float) - default 1, implemented by scaling wallclock's timebase (millisecond precision) so values between ~0.9995 and ~1.0006 are treated as 1.
  • tick_period (string of rational, seconds) - if specified and tick_source is also specified, an anti-jitter filter will be enabled, assuming that the tick source emits a tick every tick_period. Generally should be set to 1/FPS, e.g. 1/60. The filter maintains its own clock independent of wallclock, but will resync to the wallclock if it drifts too much. If unspecified, wallclock will be used.
  • set_pts (bool, default false) - set PTS to wallclock timestamps corresponding to the time when packets are outputted (or, more precisely, when they would be outputted if there was no jitter)

Input tolerance parameters:

  • negative_time_tolerance (float, seconds) - default 0.25. Do not resync if a newly arrived packet should have been emitted at most that much time in the past. 0 to disable and always resync in such a situation - effectively increasing latency until sufficient buffering for smooth output is achieved.
  • negative_time_discard (float, seconds) - if specified: if a newly arrived packet should have been emitted at least that much time in the past, discard this packet. Discarding has lower priority than resyncing (negative_time_tolerance), so the value must be less than negative_time_tolerance to make sense; equal or higher values disable discarding.
  • discontinuity_threshold (float, seconds) - default 1. If we need to wait for more than the specified time, treat it as a discontinuity and resync. The default value may be unsuitable (too small) for multiple-source synchronization in case more data is buffered.
  • jitter_margin (float, seconds) - default 0. When (re)syncing, add this value to the time to be waited. This prevents frequent resyncing and visible jitter at the cost of higher latency. It makes sense only with unbuffered output (e.g. display)
  • initial_jitter_margin (float, seconds) - default = jitter_margin. The jitter_margin to use for the first frame received after node start, after a discontinuity or after a leak_after-triggered bypass, but not after "negative time to wait (...), resyncing"

Inter-stream synchronization parameters:

  • team (string, name of instance-shared object) - if specified, realtime nodes with the same team will cooperate to have their output synchronized. Use only if timestamps are synchronized.
  • master (bool) - default true. Only masters are allowed to resync in case of discontinuity. A team can have multiple masters.

Automatic flushing parameters (experimental):

  • input_ts_queue (string, name of queue) - which queue should be treated as the beginning of the buffering chain. Usually should be set to the output of the demuxer. If unspecified, automatic flushing is disabled.
  • intermediate_queues (list of strings) - list of intermediate queues that will be examined whether they contain packets
  • max_buffered (float, seconds, default 5.5) - start flushing when the buffering chain has more than this amount buffered (calculated as the difference between the timestamp of the last packet inserted into the input_ts_queue and the timestamp of the frame coming to this node)
  • min_buffered (float, seconds, default 0.5) - stop flushing when the buffering chain has less than this amount buffered, or input_ts_queue and intermediate_queues are all empty

For each passing packet, the time to wait is computed (how long we should sleep before outputting that packet, to maintain realtime output rate) and different actions are performed based on its value:

  • if timeToWait <= 0:
    • if timeToWait < -negative_time_tolerance:
      • resync and emit packet
    • else if timeToWait < -negative_time_discard:
      • discard this packet
    • else: /* -negative_time_discard <= timeToWait <= 0 */
      • emit packet immediately
  • else: /* timeToWait > 0 */
    • if timeToWait < discontinuity_threshold:
      • normal behavior - wait timeToWait and emit packet
    • else: /* timeToWait >= discontinuity_threshold */
      • resync and emit packet

Note: This pseudocode omits leak_after logic.
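The same decision tree can be written as a small Python model (parameter names follow this section; as in the pseudocode, leak_after is omitted, and None for negative_time_discard means discarding is disabled):

```python
def realtime_action(time_to_wait, negative_time_tolerance=0.25,
                    negative_time_discard=None, discontinuity_threshold=1.0):
    """Return the action the realtime node takes for one packet,
    mirroring the pseudocode above (leak_after logic omitted)."""
    if time_to_wait <= 0:
        if time_to_wait < -negative_time_tolerance:
            return "resync and emit"
        if negative_time_discard is not None and time_to_wait < -negative_time_discard:
            return "discard"
        return "emit immediately"        # -negative_time_discard <= t <= 0
    if time_to_wait < discontinuity_threshold:
        return "wait then emit"          # normal behavior
    return "resync and emit"             # discontinuity
```

For example, with the defaults a packet that is 0.5 s late triggers a resync, while one due in 0.5 s is simply waited for.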

Recommended options for encoding from segmented input (HLS, DASH):

  • speed: 1.01
  • leak_after: segment length * 2

Recommended options for displaying live video:

  • negative_time_tolerance: 0.001 - 0.02 depending on clock precision of your system
  • jitter_margin: 0.1 or some more

demux

1 input, many outputs: av::Packet

  • streams_filter (string): if present, filter input streams according to ffmpeg syntax (for example "p:1" to select Program 1 - useful for HLSes) before parsing routing keys
  • routing (dictionary of string => string): stream mapping, keys are streams in the input file, values are queues, like { "v:0": "videoin", "a:0": "audioin" }, used instead of dst
    • keys may be prefixed with ? to indicate optional input (ignore the route if the stream is not found)
  • wait_for_keyframe (bool): default false, discard packets until a keyframe appears in any video stream

dec_video,dec_audio

1 input: av::Packet, 1 output: av::VideoFrame or av::AudioSamples respectively

  • codec (string) - optional, codec name, auto-detected by libavcodec if not specified
  • codec_map (dictionary of string => string) - optional, use the specified decoder for a matching stream's codec, for example to use cuvid for h264 and hevc streams: "codec_map": {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}
  • pixel_format (string) - optional, if unspecified, libavcodec and/or the codec will select the best possible pixel format for the given input stream. As seen in the pix_fmt field of FFmpeg's AVPacket, so it doesn't have to be any real pixel format (e.g. yuv420p) but can also be a hardware acceleration specification (e.g. cuda)
    • if it starts with ?, prefer the specified pixel format, e.g. ?cuda, but allow use of any
    • if it doesn't start with ?, force the specified pixel format, fail if it's incompatible with the codec or stream
  • hwaccel (string, name of instance-shared object) - optional, name of a hwaccel previously created with hwaccel.init
  • hwaccel_only_for_codecs (list of strings) - use hwaccel only for the specified input stream codecs, useful because apparently setting hw_device_ctx in normally-software libavcodec decoders triggers frame corruption bugs
  • options (dictionary) - optional, options passed to libavcodec

extract_timestamps

Set PTS to timecode from video frame's side data.

1 input, 1 output: av::VideoFrame

  • team (string, name of instance-shared object) - if specified, multiple extract_timestamps and extract_timestamps_slave nodes within the same team will share the same offset, so streams not containing timecode side data (e.g. audio) will be synchronized to the same timecode, too (as long as their PTSes are synchronized to each other).

  • passthrough_before_available (bool) - default false, meaning that processing will be blocked (and the input queue will grow) as long as the timecode isn't available. If true, frames will be passed through with PTS unchanged in such a case.

  • drop_before_available (bool) - discard incoming packets before the timecode is available. Disabled by default. Has lower priority than passthrough_before_available.

  • timecodes (list of strings), default ["S12M"] - side data to get timecodes from. If the specified timecode doesn't exist, the next one in the list is tried. Possible items:

    • S12M.1 or S12M - SMPTE 12M = SEI
    • S12M.2
    • S12M.3
    • GOP
  • liveu (bool, default false) - workaround for LiveU-encoded SMPTE 12M. Treat the drop bit as a part of the frames field.

  • frame_rate_source (string) - timecodes have frame numbers, so to calculate PTS from them, the number of frames per second must be known. Possible values:

    • fps - default. Use FPS from the nearest node implementing IFrameRateSource (decoder, filter or force_fps)
    • timebase - use 1/timebase from the nearest node implementing ITimeBaseSource (decoder, filter or force_fps)

    Both values may yield the same or different results depending on the input stream. Some video streams are generated by skipping every second frame of a higher-FPS stream, with S12M side data preserved in the remaining frames. In such cases fps is wrong and timebase is correct.

extract_timestamps_slave

Set PTS to the timecode extracted by an extract_timestamps node.

1 input, 1 output: av::VideoFrame or av::AudioSamples

Supports parameters working the same as in the extract_timestamps node:

  • team - mandatory
  • passthrough_before_available
  • drop_before_available

filter_video,filter_audio

1 input, 1 output: av::VideoFrame or av::AudioSamples, respectively

  • graph (string) - FFmpeg filtergraph
  • hwaccel (string, name of instance-shared object) - optional (mandatory for some filters), name of a hwaccel previously created with hwaccel.init

speed_video,speed_audio

Change timestamps so that playback speed changes in real-time.

1 input, 1 output: av::VideoFrame or av::AudioSamples, respectively

  • team (string, name of instance-shared object, default "default") - team name that will be used for changing the speed using the speed.set command
  • discard_when_speed_changed (bool, default false) - discard frames when speed isn't equal to 1. Intended for audio streams.
  • timebase (string of rational) - optional; if specified, will scale incoming timestamps to this timebase. Otherwise the original timebase will be preserved. To get as smooth output as possible, set it to your native output timebase (1/fps).
  • speed (float) - initial speed, default: 1
  • sync_team (string, name of instance-shared object) - realtime team name. Required for smooth playback direction changes (e.g. from forward to backward) to flush all the queues between the input and the realtime nodes.

pause

Stop passing through frames/packets and resume on request (pause, resume commands).

1 input, 1 output: anything

  • team (string, name of instance-shared object, default "default") - team name that will be used for control
  • paused (bool) - sets the team initially in the paused state

force_fps

Duplicate and drop frames to achieve requested FPS

1 input, 1 output: av::VideoFrame

  • fps (string of rational) - target FPS as a string, e.g. 25 or 30000/1001

assume_video_format /assume_audio_format

Set initial metadata to allow nodes that rely on it to start when real metadata isn't available yet.

1 input, 1 output: av::VideoFrame or av::AudioSamples

Parameters for video:

  • width (int) - default 1920
  • height (int) - default 1080
  • pixel_format (string) - default yuv420p
  • real_pixel_format (string) - specify only if pixel_format is hardware-accelerated (e.g. cuda)

Parameters for audio:

  • sample_rate (int) - default 48000
  • sample_format (string) - default s32p
  • channel_layout (string) - default stereo

sentinel_video /sentinel_audio

A sentinel

  • guards streams against wild timestamps - PTSes jumping forward or backward, or repeating timestamps
  • inserts backup frames when input signal is not available for specified time:
    • in audio stream: silence
    • in video stream, for at most the duration of the freeze parameter: repeated last frame
    • in video stream: custom slate, can be used for adding "we'll be back shortly" card

Sentinel's output has "ideal" timestamps with tolerance specified in sentinel's parameters. In other words, it ensures output stream continuity.

1 input, 1 output: av::VideoFrame / av::AudioSamples

  • timeout (float, seconds) - default 1, seconds to wait for an input frame before inserting a frozen or backup frame

  • correction_group (string, name of instance-shared object) - optional, defaults to "default", used for sharing the clock between streams

  • forward_start_shift (bool):

    • true: if input streams are present when starting the sentinels, forward the relative shifts of their first packets (i.e. A-V offset) to the output.
    • false (default): start all output streams at exactly PTS = 10 seconds (hardcoded in the PTSCorrectorCommon class)
  • max_streams_diff (float, seconds) - default 0.001, tolerance in seconds

  • start_ts (float, seconds) - default 10, first output timestamp

  • lock_timeshift (bool) - after receiving first PTS, maintain constant input-output PTS difference. Disabled by default. Enable only if you're sure that input timestamps are synchronized to real-time clock.

  • reporting_url (optional, string of URL) - if specified, correction time shift changes will be reported to this URL as HTTP POST with JSON body:

    {"changed_at":128.1,"input_pts_offset":126.75999999999999,"output_pts_offset":10.0}

    • changed_at - output timestamp of the change relative to first output PTS (output_pts_offset)
    • input_pts_offset - what sentinel needs to add to the input timestamp to achieve output PTS, minus output_pts_offset
    • output_pts_offset - first output PTS, constant through processing, changeable using the `start_ts` parameter

For video only:

  • freeze (float) - default 5, seconds to duplicate last good frame before outputting backup frame
  • backup_frame (string of URL) - backup frame (slate) image
  • backup_picture_buffer (string, name of instance-shared object) - read backup frame (slate) from this buffer. Usepicture_buffer_sink to write frame to the buffer. Sentinel will reload the slate from the picture buffer every 64 frames and on every input signal break triggering slate insertion.
  • initial_picture_buffer (string, name of instance-shared object) - initialize last frame buffer with this buffer, so that at the beginning of the stream it will be used for at most the `freeze` duration. Useful to insert a black frame instead of the slate at the beginning when `forward_start_shift` is set to false. If unspecified, the regular `backup_frame` or `backup_picture_buffer` will be used.

For `sentinel_video`, either `backup_frame` or `backup_picture_buffer` must be provided.
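The `reporting_url` field descriptions above imply simple arithmetic for interpreting a report. A minimal sketch in Python (the helper names are ours for illustration, not part of avplumber):

```python
# Example report body, as documented for the reporting_url option:
report = {
    "changed_at": 128.1,                     # relative to output_pts_offset
    "input_pts_offset": 126.75999999999999,
    "output_pts_offset": 10.0,
}

def absolute_change_time(report):
    """Absolute output timestamp at which the time shift took effect."""
    return report["changed_at"] + report["output_pts_offset"]

def output_pts(input_pts, report):
    """Map an input PTS to the corresponding output PTS after the change."""
    return input_pts + report["input_pts_offset"] + report["output_pts_offset"]

print(absolute_change_time(report))  # ~138.1
print(output_pts(5.0, report))       # ~141.76
```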

rescale_video

Dynamic video scaler, maintains constant output dimensions and pixel format even if the input stream changes parameters. Does not resample FPS (see the `force_fps` node).

1 input, 1 output: `av::VideoFrame`

resample_audio

Dynamic audio resampler, maintains continuity of the output stream even if the input stream changes parameters.

1 input, 1 output: `av::AudioSamples`

  • dst_sample_rate (int)
  • dst_channel_layout (string)
  • dst_sample_format (string)
  • compensation (float)
    • 0 (default) means brutal compensation using avplumber's built-in sample dropping algorithm
    • any negative value means brutal compensation using libswresample, may not work correctly
    • positive value between 0 and 1 means soft compensation using libswresample, value means fraction of samples to compensate, may not work correctly

split

1 input, multi outputs: anything

  • drop (bool) - drop packets if output queue is full, disabled by default

force_keyframe

Set keyframe flag in frame to make the encoder output a keyframe. Unlike the `-g` encoder option in FFmpeg, works with non-integer FPS.

1 input, 1 output: `av::VideoFrame`

  • interval_sec (int / float / string of rational) - keyframe interval, in seconds

enc_video, enc_audio

Encodes video or audio frames.

1 input: `av::VideoFrame` or `av::AudioSamples`, 1 output: `av::Packet`

  • codec (string) - mandatory
  • options (dictionary) - options passed to libavcodec
  • hwaccel (string, name of instance-shared object) - optional (mandatory for some encoders), name of hwaccel previously created with `hwaccel.init`
  • timestamps_passthrough (bool) - default `false`, intended for codecs that don't buffer data (otherwise bad things like repeated timestamps may happen), replaces PTS & DTS in the outgoing packet with the incoming PTS

packet_relay

Insert it between demuxer and muxer to remux packets without transcoding.

1 input, 1 output: `av::Packet`

no parameters

bsf

BitStream filter

1 input, 1 output: `av::Packet`

  • bsf (string) - name of bsf to use

mux

multiple inputs, 1 output: `av::Packet`

  • fix_timestamps (bool) - shift PTSes and DTSes so that DTSes are always increasing and (PTS >= DTS). Disabled by default.
  • ts_sort_wait (float, seconds) - default 2.5, maximum time to wait for all streams to select the packet with least DTS. Set to 0 to emit packets as soon as they arrive.

output

1 input: `av::Packet`

  • format (string) - mandatory
  • url (string of URL) - mandatory
  • options (dictionary) - format options that will be passed to libavformat
  • seek_table (string of file name) - if specified, binary file with native endianness with seek table will be written.
  • seek_table_text (string of file name) - if specified, text file with seek table will be written.

Format of binary seek table: 16-byte records containing:

  • timestamp of the frame in milliseconds: int64_t
  • bytes offset in the output file: uint64_t

Format of text seek table: values as above separated by space, each record in one line
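As a sketch, the binary record layout above can be parsed with Python's struct module (using `"="` for native byte order, which assumes the table is read on the same kind of machine that wrote it):

```python
import struct

# Each 16-byte record: int64_t timestamp in milliseconds,
# followed by uint64_t byte offset in the output file.
RECORD = struct.Struct("=qQ")

def parse_seek_table(data: bytes):
    """Yield (timestamp_ms, byte_offset) tuples from a binary seek table."""
    usable = len(data) - len(data) % RECORD.size
    for off in range(0, usable, RECORD.size):
        yield RECORD.unpack_from(data, off)

# Round-trip a tiny two-record table in memory:
table = RECORD.pack(0, 0) + RECORD.pack(2000, 131072)
print(list(parse_seek_table(table)))  # [(0, 0), (2000, 131072)]
```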

jack_sink

1 input: `av::AudioSamples` (sample format must be `fltp`, sample rate must be equal to JACK's)

Output audio to a JACK Audio Connection Kit server. Can be used for both inter-app routing and outputting audio to the sound card. Note that JACK has its own clock (soundcard clock or, in case of a dummy device, the OS monotonic clock), so in long running streams underruns or overruns may occur, unless the stream's clock and JACK clock are synchronized (for example, input is AES67 RTP and the hardware audio interface is synchronized to the AES67 master clock using wordclock or S/PDIF).

Both sample format and sample rate must match the configuration of the JACK server (sample format is always `fltp` in JACK). Use the `resample_audio` node to convert.

This node has an internal buffer to ensure that the JACK thread can run in real time. In case of bursty streams (e.g. coming from the Internet) this buffering may be insufficient. The `realtime` node with a small `negative_time_tolerance` (below JACK period size) will help in such cases.

  • channels_count (int, default 2) - number of JACK ports to create. If the incoming audio stream has fewer channels, the remaining ones will be filled with silence. If more, the excess channels will be discarded.
  • port_prefix (string, default empty string) - if specified, will append channel index to this string and it'll become the JACK port name. If unspecified, JACK port name will be only the channel index.
  • connect_port_prefix (string) - if specified, will append port number to this string and try to connect to an input port with that name in the JACK graph
  • client_name (string) - mandatory, name of the JACK client. If multiple `jack_sink` nodes are created with the same client name, ports will belong to the same JACK client (make sure to set `port_prefix` in such cases). If they have different client names, multiple JACK clients will be created. Which is better depends on the use case - multiple JACK clients allow parallel processing but put more overhead on the CPU.

raw_output

Write a stream of raw packets or frames to a file or pipe. Unlike the `output` node, can be used anywhere in the graph. Outputs whatever will be thrown on it. Use the `rescale_video` or `resample_audio` nodes to convert to the required format.

1 input: anything

  • path (string) - file/pipe name, not URL, must be reachable via the Unix file structure; libavformat protocols like `tcp://` aren't supported
  • output_group (string) - default "default". Name of the output group for control (see `output.start` and `output.stop` commands) and synchronization. Since raw output doesn't have PTSes, avplumber will try to synchronize audio with video by cutting the first audio frame to make it start together with the first video frame.

picture_buffer_sink

Take a frame and write it to a picture buffer that can be later used by `sentinel_video`.

1 input: `av::VideoFrame`

  • buffer (string, name of instance-shared object) - mandatory, picture buffer name
  • once (bool) - default true, finish after getting a single frame

null_sink

Discards incoming packets just like/dev/null.

1 input: anything

no parameters

ipc_cuda_source

Get video frames from CUDA IPC memory. Frame pointer and parameters are read from a named pipe. See `src/nodes/cuda/ipc_cuda_source.cpp` for the structure.

1 output: `av::VideoFrame` (frame's pixel format will always be `cuda`)

  • pipe (string) - mandatory, path to named pipe
  • hwaccel (string, name of instance-shared object) - mandatory, name of hwaccel previously created with `hwaccel.init`

ipc_audio_source

Get audio frames from a named pipe. See `src/nodes/ipc_audio_source.cpp` for the header structure. The header must be followed by interleaved audio samples.

1 output: `av::AudioSamples`

  • pipe (string) - mandatory, path to named pipe

parse_scte35

Parse SCTE35 `SPLICE_INSERT` command.

1 input: `av::Packet`, must be connected to a SCTE35 data stream (e.g. `d:0` key in the routing map of the `demux` node)

  • url (string of URL) - if specified, JSON object with splice insert data will be HTTP POSTed to this url. If unspecified, the object will be printed to the log for debugging purposes.
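For debugging on the receiving side, a minimal sketch of a server accepting the POSTed JSON, using only Python's standard library. The port and the exact JSON structure are assumptions (the structure depends on the splice command), so the handler just decodes and prints whatever arrives:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def decode_splice_report(raw: bytes):
    """Decode the JSON body of a splice insert report (empty body -> {})."""
    return json.loads(raw.decode("utf-8") or "{}")

class SpliceHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        report = decode_splice_report(self.rfile.read(length))
        print("SCTE35 splice insert:", report)
        self.send_response(204)  # acknowledge without a response body
        self.end_headers()

# Point parse_scte35's url parameter at this server, e.g. http://localhost:8035/
# HTTPServer(("", 8035), SpliceHandler).serve_forever()
```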

extract_cc_data

1 input: `av::VideoFrame`, 1 output: `av::Packet`

Extract ATSC A53 Part 4 Closed Captions data from the video frame. The subtitle codec is usually EIA-708 or 608 in such side data. When outputted to UDP with libavformat's `specialdata` 'muxer' (see `examples/extract_cc_data.avplumber`), subtitles can be parsed using CCExtractor or GStreamer (YMMV).

no parameters

jittergen

Enabled only if avplumber is compiled with `BUILD_TYPE=Debug`. Delays packets or frames for a random time. Timestamps aren't modified. The delay will be gradually decreased down to 1 ms if congestion is detected.

1 input, 1 output: anything

delaygen

Enabled only if avplumber is compiled with `BUILD_TYPE=Debug`. Delays packets or frames for a specified time. Timestamps aren't modified.

1 input, 1 output: anything

  • delay (float) - mandatory, delay in seconds

Instance-shared objects

Some nodes (sentinel, realtime) can have shared state. It's stored in instance-shared objects. Other nodes (encoder, filter) need the instance-shared object created (`hwaccel.init`) before it's used in them.

If the name of an instance-shared object starts with `@`, it is global in the process address space. If not, its scope is limited to the avplumber instance.

In case of avplumber launched as a standalone process, instance == process and using global objects doesn't have any benefit.

In case of avplumber used as a library, each AVPlumber object is an avplumber instance. Global objects can be used to share state between nodes of different instances as long as they're within the same operating system process.

Seeking infrastructure & playback control (experimental)

Although the architecture was initially designed solely for handling live streams, the latest updates to avplumber bring playback control support.

Seeking is complicated because queues need to be flushed to ensure that the user doesn't have to wait for them to drain after requesting a seek. Also, we want to display a frame after the seek even when the player is paused. That's why seek commands (`seek`) need the name of the downmost node in the graph that limits output speed (in a video player it would be `realtime`). The graph is walked up, passing needed requests to decoder nodes and issuing the actual seek request to the `input_rec` node.

See `examples/video_player.avplumber` for a typical graph with playback control including seeking. Example control commands compatible with it:

  • `seek rtsync now 30000` - seek to DTS=30s
  • `pause p`, `resume p`
  • `speed.set s 0.25` - set speed to 4 times slower than realtime
  • `speed.set s 2` - set speed to 2 times faster than realtime
  • `speed.set s -1` - set speed to reverse (1x)

Fast seek

If you want seeking to be as fast as possible, you'll need a specially encoded file. You can make it with avplumber, too.

  • Use an intra-frame-only codec for `enc_video`
  • Specify the `seek_table` option of the `output` node

In your application controlling the player, parse the generated seek table and find byte offset corresponding to the timestamp you want to seek to. Then issue the command:

seek rtsync now <timestamp>

Make sure that `preseek` is set to 0 (or unspecified) in the player's `input_rec` node.
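Looking up the record for a target timestamp in the text seek table can be sketched like this (the helper names are illustrative, not part of avplumber):

```python
import bisect

def parse_text_seek_table(text):
    """Parse a text seek table: one "<timestamp_ms> <byte_offset>" per line."""
    entries = []
    for line in text.splitlines():
        if line.strip():
            ts, off = line.split()
            entries.append((int(ts), int(off)))
    return entries

def nearest_entry(entries, target_ms):
    """Last record with timestamp <= target_ms (entries are sorted)."""
    i = bisect.bisect_right([ts for ts, _ in entries], target_ms)
    return entries[max(i - 1, 0)]

table = parse_text_seek_table("0 0\n1000 524288\n2000 1048576\n")
print(nearest_entry(table, 1500))  # (1000, 524288)
```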

Tips & tricks

How to quickly change input on the fly

node.interrupt input
node.param.set input url "rtmp://new.stream/url"

Important: Execute the second command immediately after the first.

The first command stops input close to immediately (even if it's being restarted right now). Input (if configured properly by the `auto_restart` policy) will restart itself (or the whole group) after a second. So we issue the second command within that second, before the internal lock on the nodes manager is acquired.

Note that if input is running normally (i.e. not starting right now),the following commands will do effectively the same:

node.param.set input url "rtmp://new.stream/url"
node.auto_restart input

Dump avplumber config from log

sed -e 's/^.\+\[control\] Executing: \(.\+\)$/\1/; t; d' < log

Show nodes graph based on add node commands from log

./tools/graph_from_log_to_dot log > graph.dot
dot -Tsvg graph.dot -o graph.svg
xdg-open graph.svg

(may not work correctly with `detach` or `retry` commands, will not work with dangling edges, pull requests welcome!)

View log without HLS muxer's spam

grep -Ev '^(EXT-X-MEDIA-SEQUENCE:[0-9]+|\[AVIOContext @ 0x[a-f0-9]+\] Statistics: [0-9]+ seeks, [0-9]+ writeouts|\[hls @ 0x[a-f0-9]+\] Opening '\''.+\.tmp'\'' for writing)$' logfile | less

Watch queues fill in real time

watch -n0.1 "echo 'queues.stats' | nc localhost 20200"

In some versions of netcat it doesn't work. Try this:

watch -n0.1 "echo 'queues.stats\nbye\n\n' | nc localhost 20200"

If you have big queues, they may occupy multiple lines in terminal. To make them shorter:

while true; do echo -e 'queues.stats\nbye\n\n' | nc localhost 20200 | sed -E 's/#{16}/\$/g; s/\.{16}/,/g' ; sleep 0.1; done

Find non-empty queues

open the log file in less, press `/` or `?` and use this regular expression:

[1-9]0?/[0-9]{1,3},

License and acknowledgements

Created by Teodor Wozniak <teodor.wozniak@amagi.com>, https://lumifaza.org

Copyright (c) 2018-2024 Amagi Media Labs Pvt. Ltd, https://amagi.com

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program. If not, see https://www.gnu.org/licenses/.

FFmpeg

This program uses FFmpeg libraries.

The FFmpeg codebase is mainly LGPL-licensed with optional components licensed under GPL. Please refer to its LICENSE file for detailed information.

AvCpp

This program uses AvCpp - a C++ wrapper for FFmpeg dual-licensed under the GNU Lesser General Public License, version 2.1, or a BSD-style license.

C++ Requests

This program uses C++ Requests (cpr) library.

Copyright (c) 2017-2021 Huu Nguyen

Copyright (c) 2022 libcpr and many other contributors

MIT License

Flags.hh

This program uses Flags.hh command line parser header.

Copyright (c) 2015, Song Gao

BSD-3-Clause license

ReaderWriterQueue

This program uses ReaderWriterQueue.

Copyright (c) 2013-2021, Cameron Desrochers

Simplified BSD License

nlohmann::json

This program uses the JSON for Modern C++ library licensed under the MIT License.

Copyright © 2013-2022 Niels Lohmann

CUDA

This program uses a CUDA loader taken from NVIDIA's CUDA samples.

Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.

BSD-3-Clause license
