From 657eaf61703d42134cd7b5ccb5919162c96017d8 Mon Sep 17 00:00:00 2001
From: Julian Rother <julianr@fsmpi.rwth-aachen.de>
Date: Sun, 26 Nov 2017 03:39:34 +0100
Subject: [PATCH] Added transcode worker

---
 transcode.c | 311 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 311 insertions(+)
 create mode 100644 transcode.c

diff --git a/transcode.c b/transcode.c
new file mode 100644
index 0000000..cdeeac0
--- /dev/null
+++ b/transcode.c
@@ -0,0 +1,311 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <libavutil/opt.h>
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+#include <libavfilter/avfilter.h>
+#include <libavfilter/buffersink.h>
+#include <libavfilter/buffersrc.h>
+
+#include "util.h"
+#include "config.h"
+
+void assert_empty_opts(AVDictionary *opts)
+{
+	AVDictionaryEntry *e;
+	e = 0;
+	while (e = av_dict_get(opts, "", e, AV_DICT_IGNORE_SUFFIX))
+		job_failed("Unrecognized option: %s", e->key);
+}
+
+static void setup_input_stream(char *stream, AVFormatContext *demux,
+		AVCodecContext **decs, AVFilterGraph *fg, AVFilterContext **srcs,
+		AVFilterInOut **pads)
+{
+	char *p;
+	int idx, err;
+	enum AVMediaType type;
+	AVDictionary *opts;
+	AVFilterInOut *_inpads;
+	AVFilterInOut *_outpads;
+	type = AVMEDIA_TYPE_VIDEO;
+	if (!strcmp(jstr(jlookup(stream, "type"), ""), "audio"))
+		type = AVMEDIA_TYPE_AUDIO;
+	idx = av_find_best_stream(demux, type, jint(jlookup(stream, "index"), -1), -1, 0, 0);
+	if ((idx == -1 || decs[idx]) && jint(jlookup(stream, "index"), -1) == -1)
+		for (idx = 0; idx < demux->nb_streams
+				&& (decs[idx] || demux->streams[idx]->codecpar->codec_type == type); idx ++);
+	if (idx == -1 || decs[idx])
+		job_failed("Could not find suitable input stream");
+
+	/* Prepare Decoder */
+	decs[idx] = avcodec_alloc_context3(avcodec_find_decoder(demux->streams[idx]->codecpar->codec_id));
+	if (!decs[idx])
+		job_failed("Could not allocate decoder context");
+	avcodec_parameters_to_context(decs[idx], demux->streams[idx]->codecpar);
+	opts = 0;
+	parse_dict(&opts, jlookup(stream, "options"));
+	if ((err = avcodec_open2(decs[idx], decs[idx]->codec, &opts)) < 0)
+		job_failed("Opening decoder failed: %s", av_err2str(err));
+	assert_empty_opts(opts);
+
+	/* Prepare Filter Source */
+	if (type == AVMEDIA_TYPE_VIDEO)
+	{
+		p = mprintf("video_size=%dx%d:pix_fmt=%d:time_base=%d/%d:pixel_aspect=%d/%d",
+			decs[idx]->width, decs[idx]->height, decs[idx]->pix_fmt,
+			demux->streams[idx]->time_base.num, demux->streams[idx]->time_base.den,
+			decs[idx]->sample_aspect_ratio.num, decs[idx]->sample_aspect_ratio.den);
+		err = avfilter_graph_create_filter(&srcs[idx],
+				avfilter_get_by_name("buffer"), 0, p, 0, fg);
+		if (err < 0)
+			job_failed("Creating buffer source failed: %s", av_err2str(err));
+		free(p);
+	}
+	else if (type == AVMEDIA_TYPE_AUDIO)
+	{
+		p = mprintf("time_base=%d/%d:sample_rate=%d:sample_fmt=%d:channel_layout=%d:channels=%d",
+			demux->streams[idx]->time_base.num, demux->streams[idx]->time_base.den,
+			decs[idx]->sample_rate, decs[idx]->sample_fmt, decs[idx]->channel_layout,
+			decs[idx]->channels);
+		err = avfilter_graph_create_filter(&srcs[idx],
+				avfilter_get_by_name("abuffer"), 0, p, 0, fg);
+		if (err < 0)
+			job_failed("Creating abuffer source failed: %s", av_err2str(err));
+		free(p);
+	}
+	for (; *pads; pads = &(*pads)->next);
+	*pads = avfilter_inout_alloc();
+	(*pads)->filter_ctx = srcs[idx];
+	(*pads)->pad_idx = 0;
+	(*pads)->next = 0;
+	for (p = jenter(jlookup(stream, "filters")); p; p = jnext(p))
+	{
+		(*pads)->name = av_strdup("in");
+		if ((err = avfilter_graph_parse_ptr(fg, jstr(p, 0), 0, pads, 0)) < 0)
+			job_failed("Parsing filter string \"%s\" failed: %s", jstr(p, 0), av_err2str(err));
+		if ((*pads)->next)
+			job_failed("Unconnected filter pad\n");
+	}
+	(*pads)->name = av_strdup(jstr(jlookup(stream, "name"), "unknown"));
+}
+
+static void prepare_output_stream(char *stream, AVStream *st,
+		AVCodecContext **encs, AVFilterGraph *fg, AVFilterContext **sinks,
+		AVFilterInOut **pads)
+{
+	int idx, err;
+	char *p;
+	AVFilterInOut *_inpads;
+	AVFilterInOut *_outpads;
+	idx = st->index;
+	encs[idx] = avcodec_alloc_context3(avcodec_find_encoder_by_name(
+				jstr(jlookup(stream, "codec"), 0)));
+	if (!encs[idx])
+		job_failed("Could not allocate decoder context");
+
+	/* Prepare Filter Sink */
+	if (encs[idx]->codec_type == AVMEDIA_TYPE_VIDEO)
+	{
+		err = avfilter_graph_create_filter(&sinks[idx],
+				avfilter_get_by_name("buffersink"), 0, 0, 0, fg);
+		if (err < 0)
+			job_failed("Creating buffer sink failed: %s", av_err2str(err));
+		av_opt_set_int_list(sinks[idx], "pix_fmts", encs[idx]->codec->pix_fmts,
+				AV_PIX_FMT_NONE, AV_OPT_SEARCH_CHILDREN);
+	}
+	else if (encs[idx]->codec_type == AVMEDIA_TYPE_AUDIO)
+	{
+		err = avfilter_graph_create_filter(&sinks[idx],
+				avfilter_get_by_name("abuffersink"), 0, 0, 0, fg);
+		if (err < 0)
+			job_failed("Creating abuffer sink failed: %s", av_err2str(err));
+		av_opt_set_int_list(sinks[idx], "sample_fmts", encs[idx]->codec->sample_fmts,
+				AV_SAMPLE_FMT_NONE, AV_OPT_SEARCH_CHILDREN);
+		av_opt_set_int_list(sinks[idx], "sample_rates",
+				encs[idx]->codec->supported_samplerates, 0, AV_OPT_SEARCH_CHILDREN);
+		av_opt_set_int_list(sinks[idx], "channel_layouts",
+				encs[idx]->codec->channel_layouts, 0, AV_OPT_SEARCH_CHILDREN);
+	}
+	else
+		job_failed("Requested codec has unsupported type\n");
+	for (; *pads; pads = &(*pads)->next);
+	*pads = avfilter_inout_alloc();
+	(*pads)->filter_ctx = sinks[idx];
+	(*pads)->pad_idx = 0;
+	(*pads)->next = 0;
+	for (p = jenter(jlookup(stream, "filters")); p; p = jnext(p))
+	{
+		(*pads)->name = av_strdup("out");
+		if ((err = avfilter_graph_parse_ptr(fg, jstr(p, 0), pads, 0, 0)) < 0)
+			job_failed("Parsing filter string \"%s\" failed: %s", jstr(p, 0), av_err2str(err));
+		if ((*pads)->next)
+			job_failed("Unconnected filter pad\n");
+	}
+	(*pads)->name = av_strdup(jstr(jlookup(stream, "name"), "unknown"));
+}
+
+static void setup_output_stream(char *stream, AVStream *st, AVCodecContext *enc,
+		AVFilterContext *sink)
+{
+	int err;
+	AVFilterLink *l;
+	AVDictionary *opts;
+	l = sink->inputs[0];
+	if (enc->codec_type == AVMEDIA_TYPE_VIDEO)
+	{
+		/* Why not use av_buffersink_get_* here? */
+		enc->height = l->h;
+		enc->width = l->w;
+		enc->sample_aspect_ratio = l->sample_aspect_ratio;
+		enc->pix_fmt = l->format;
+	}
+	else if (enc->codec_type == AVMEDIA_TYPE_AUDIO)
+	{
+		enc->sample_rate = l->sample_rate;
+		enc->channel_layout = l->channel_layout;
+		enc->channels = l->channels;
+		enc->sample_fmt = l->format;
+	}
+	enc->time_base = l->time_base;
+	opts = 0;
+	parse_dict(&opts, jlookup(stream, "options"));
+	if ((err = avcodec_open2(enc, enc->codec, &opts)) < 0)
+		job_failed("Opening encoder failed: %s", av_err2str(err));
+	assert_empty_opts(opts);
+	avcodec_parameters_from_context(st->codecpar, enc);
+}
+
+int main(int argc, char *argv[])
+{
+	int err, i, progress, _progress;
+	char *p, *input, *output, *inpath, *outpath, *tmppath;
+	AVFormatContext *demux, *mux;
+	AVCodecContext **decs, **encs;
+	AVFilterContext **srcs, **sinks;
+	AVFilterGraph *fg;
+	AVFilterInOut *inpads, *outpads;
+	AVPacket pkt;
+	AVFrame *frame;
+	AVStream *stream;
+	AVDictionary *opts;
+	if (argc != 5)
+		return 1;
+	av_register_all();
+	avfilter_register_all();
+	init_avlogbuf();
+	memset(&pkt, 0, sizeof(pkt));
+	av_init_packet(&pkt);
+	if (!(frame = av_frame_alloc()))
+		return 99;
+	if (!(fg = avfilter_graph_alloc()))
+		return 99;
+
+	jobid = atoi(argv[1]);
+	input = jlookup(argv[4], "input");
+	inpath = mprintf("%s/%s", CONFIG_VIDEOS_RAW, jstr(jlookup(input, "path"), ""));
+	output = jlookup(argv[4], "output");
+	outpath = mprintf("%s/%s", CONFIG_VIDEOS_RELEASED, jstr(jlookup(output, "path"), ""));
+	tmppath = mprintf("%s/.tmp-%i", CONFIG_VIDEOS_TMP, jobid);
+
+	demux = 0;
+	opts = 0;
+	parse_dict(&opts, jlookup(input, "options"));
+	if (err = avformat_open_input(&demux, inpath, 0, &opts))
+		job_failed("Opening input file failed: %s", av_err2str(err));
+	assert_empty_opts(opts);
+	avformat_find_stream_info(demux, 0);
+	decs = xmalloc(sizeof(AVCodecContext *)*demux->nb_streams);
+	srcs = xmalloc(sizeof(AVFilterContext *)*demux->nb_streams);
+	inpads = 0;
+	for (p = jenter(jlookup(input, "streams")); p; p = jnext(p))
+		setup_input_stream(p, demux, decs, fg, srcs, &inpads);
+
+	err = avformat_alloc_output_context2(&mux, 0,
+			jstr(jlookup(output, "format"), 0), outpath);
+	if (err < 0)
+		job_failed("Error allocating muxer context: %s", av_err2str(err));
+	parse_dict(&mux->metadata, jlookup(output, "metadata"));
+	parse_chapters(mux, jlookup(output, "chapters"), demux->duration);
+	for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p));
+	sinks = xmalloc(sizeof(AVFilterContext *)*i);
+	encs = xmalloc(sizeof(AVCodecContext *)*i);
+	outpads = 0;
+	for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p))
+		prepare_output_stream(p, avformat_new_stream(mux, 0), encs, fg, sinks, &outpads);
+
+	/* TODO: Connect pads of same name before applying any filter strings */
+	for (p = jenter(jlookup(argv[4], "filters")); p; p = jnext(p))
+		if ((err = avfilter_graph_parse_ptr(fg, jstr(p, 0), &outpads, &inpads, 0)) < 0)
+			job_failed("Parsing filter string \"%s\" failed: %s", jstr(p, 0), av_err2str(err));
+	if (avfilter_graph_config(fg, 0) < 0)
+		job_failed("Error configuring filter graph: %s", av_err2str(err));
+
+	for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p))
+		setup_output_stream(p, mux->streams[i], encs[i], sinks[i]);
+	if ((err = avio_open(&mux->pb, tmppath, AVIO_FLAG_WRITE)) < 0)
+		job_failed("Opening temporary file \"%s\" failed: %s", tmppath, av_err2str(err));
+	opts = 0;
+	parse_dict(&opts, jlookup(output, "options"));
+	if ((err = avformat_write_header(mux, &opts)) < 0)
+		job_failed("Writing temporary file failed: %s", av_err2str(err));
+	assert_empty_opts(opts);
+
+	progress = 0;
+	ping_job(jobid, "running", 0);
+	while (!av_read_frame(demux, &pkt))
+	{
+		i = pkt.stream_index;
+		_progress = av_rescale_q(pkt.pts, demux->streams[i]->time_base, AV_TIME_BASE_Q)*100/demux->duration;
+		if (_progress > progress)
+		{
+			ping_job(jobid, "running", "{\"progress\": %i, \"log\": \"%s\"}", _progress,
+					jescape(get_avlogbuf()));
+			progress = _progress;
+		}
+		if (!decs[i])
+			continue;
+		avcodec_send_packet(decs[i], &pkt);
+		while (!avcodec_receive_frame(decs[i], frame))
+			if ((err = av_buffersrc_add_frame(srcs[i], frame)) < 0)
+				job_failed("Could not insert frame into filter graph: %s", av_err2str(err));
+		for (i = 0; i < mux->nb_streams; i ++)
+			while (av_buffersink_get_frame(sinks[i], frame) >= 0)
+			{
+				avcodec_send_frame(encs[i], frame);
+				av_frame_unref(frame);
+				while (!avcodec_receive_packet(encs[i], &pkt))
+				{
+					pkt.stream_index = i;
+					av_packet_rescale_ts(&pkt, sinks[i]->inputs[0]->time_base,
+							mux->streams[i]->time_base);
+					if (err = av_interleaved_write_frame(mux, &pkt))
+						job_failed("Could not write frame: %s", av_err2str(err));
+				}
+			}
+	}
+
+	avformat_close_input(&demux);
+	/* Flush */
+	for (i = 0; i < mux->nb_streams; i ++)
+	{
+		avcodec_send_frame(encs[i], 0);
+		while (!avcodec_receive_packet(encs[i], &pkt))
+		{
+			pkt.stream_index = i;
+			av_packet_rescale_ts(&pkt, sinks[i]->inputs[0]->time_base,
+					mux->streams[i]->time_base);
+			av_interleaved_write_frame(mux, &pkt);
+		}
+	}
+	av_interleaved_write_frame(mux, 0);
+	if (err = av_write_trailer(mux))
+		job_failed("Error writing trailer to temporary file", av_err2str(err));
+	avio_closep(&mux->pb);
+	if (rename(tmppath, outpath))
+		job_failed("Overwriting output file \"%s\" failed: %s", outpath, strerror(errno));
+	unlink(tmppath);
+	ping_job(jobid, "finished", "{\"hash\": \"%s\", \"duration\": %f, \"filesize\": %i, \"log\": \"%s\"}",
+			hashfile(outpath), fileduration(outpath), filesize(outpath), jescape(get_avlogbuf()));
+	return 0;
+}
-- 
GitLab