#include <stdio.h> #include <unistd.h> #include <sys/stat.h> #include <libgen.h> #include <libavutil/opt.h> #include <libavcodec/avcodec.h> #include <libavformat/avformat.h> #include <libavfilter/avfilter.h> #include <libavfilter/buffersink.h> #include <libavfilter/buffersrc.h> #include "util.h" void assert_empty_opts(AVDictionary *opts) { AVDictionaryEntry *e; e = 0; while (e = av_dict_get(opts, "", e, AV_DICT_IGNORE_SUFFIX)) job_failed("Unrecognized option: %s", e->key); } static void setup_input_stream(char *stream, AVFormatContext *demux, AVCodecContext **decs, AVFilterGraph *fg, AVFilterContext **srcs, AVFilterInOut **pads) { char *p, *tmp; int idx, err; enum AVMediaType type; AVDictionary *opts; AVFilterInOut *_inpads; AVFilterInOut *_outpads; type = AVMEDIA_TYPE_VIDEO; if (!strcmp(jstr(jlookup(stream, "type"), ""), "audio")) type = AVMEDIA_TYPE_AUDIO; idx = av_find_best_stream(demux, type, jint(jlookup(stream, "index"), -1), -1, 0, 0); if ((idx == -1 || decs[idx]) && jint(jlookup(stream, "index"), -1) == -1) for (idx = 0; idx < demux->nb_streams && (decs[idx] || demux->streams[idx]->codecpar->codec_type == type); idx ++); if (idx == -1 || decs[idx]) job_failed("Could not find suitable input stream"); /* Prepare Decoder */ decs[idx] = avcodec_alloc_context3(avcodec_find_decoder(demux->streams[idx]->codecpar->codec_id)); if (!decs[idx]) job_failed("Could not allocate decoder context"); avcodec_parameters_to_context(decs[idx], demux->streams[idx]->codecpar); opts = 0; parse_dict(&opts, jlookup(stream, "options")); if ((err = avcodec_open2(decs[idx], decs[idx]->codec, &opts)) < 0) job_failed("Opening decoder failed: %s", av_err2str(err)); assert_empty_opts(opts); /* Prepare Filter Source */ if (type == AVMEDIA_TYPE_VIDEO) { p = mprintf("video_size=%dx%d:pix_fmt=%d:time_base=%d/%d:pixel_aspect=%d/%d", decs[idx]->width, decs[idx]->height, decs[idx]->pix_fmt, demux->streams[idx]->time_base.num, demux->streams[idx]->time_base.den, decs[idx]->sample_aspect_ratio.num, decs[idx]->sample_aspect_ratio.den); err = avfilter_graph_create_filter(&srcs[idx], avfilter_get_by_name("buffer"), 0, p, 0, fg); if (err < 0) job_failed("Creating buffer source failed: %s", av_err2str(err)); free(p); } else if (type == AVMEDIA_TYPE_AUDIO) { // Set channel layout to default if not set if(!decs[idx]->channel_layout) decs[idx]->channel_layout = av_get_default_channel_layout(decs[idx]->channels); /*if(decs[idx]->ch_layout.order == AV_CHANNEL_ORDER_UNSPEC) av_channel_layout_default(&decs[idx]->ch_layout, decs[idx]->ch_layout.nb_channels); char ch_layout[64]; err = av_channel_layout_describe(&decs[idx]->ch_layout, ch_layout, sizeof(ch_layout)); if (err < 0) job_failed("Could not describe channel layout: %s", av_err2str(err));*/ p = mprintf("time_base=%d/%d:sample_rate=%d:sample_fmt=%d:channel_layout=%d:channels=%d", demux->streams[idx]->time_base.num, demux->streams[idx]->time_base.den, decs[idx]->sample_rate, decs[idx]->sample_fmt, decs[idx]->channel_layout, decs[idx]->channels); err = avfilter_graph_create_filter(&srcs[idx], avfilter_get_by_name("abuffer"), 0, p, 0, fg); if (err < 0) job_failed("Creating abuffer source failed: %s", av_err2str(err)); free(p); } for (; *pads; pads = &(*pads)->next); *pads = avfilter_inout_alloc(); (*pads)->filter_ctx = srcs[idx]; (*pads)->pad_idx = 0; (*pads)->next = 0; for (p = jenter(jlookup(stream, "filters")); p; p = jnext(p)) { (*pads)->name = av_strdup("in"); tmp = eval_vars(jstr(p, "ERROR")); if ((err = avfilter_graph_parse_ptr(fg, tmp, 0, pads, 0)) < 0) job_failed("Parsing filter string \"%s\" failed: %s", tmp, av_err2str(err)); if ((*pads)->next) job_failed("Unconnected filter pad\n"); } (*pads)->name = av_strdup(jstr(jlookup(stream, "name"), "unknown")); } static void prepare_output_stream(char *stream, AVStream *st, AVCodecContext **encs, AVFilterGraph *fg, AVFilterContext **sinks, AVFilterInOut **pads) { int idx, err; char *p, *tmp; AVFilterInOut *_inpads; AVFilterInOut *_outpads; idx = st->index; encs[idx] = avcodec_alloc_context3(avcodec_find_encoder_by_name( jstr(jlookup(stream, "codec"), 0))); if (!encs[idx]) job_failed("Could not allocate decoder context"); /* Prepare Filter Sink */ if (encs[idx]->codec_type == AVMEDIA_TYPE_VIDEO) { err = avfilter_graph_create_filter(&sinks[idx], avfilter_get_by_name("buffersink"), 0, 0, 0, fg); if (err < 0) job_failed("Creating buffer sink failed: %s", av_err2str(err)); av_opt_set_int_list(sinks[idx], "pix_fmts", encs[idx]->codec->pix_fmts, AV_PIX_FMT_NONE, AV_OPT_SEARCH_CHILDREN); } else if (encs[idx]->codec_type == AVMEDIA_TYPE_AUDIO) { err = avfilter_graph_create_filter(&sinks[idx], avfilter_get_by_name("abuffersink"), 0, 0, 0, fg); if (err < 0) job_failed("Creating abuffer sink failed: %s", av_err2str(err)); av_opt_set_int_list(sinks[idx], "sample_fmts", encs[idx]->codec->sample_fmts, AV_SAMPLE_FMT_NONE, AV_OPT_SEARCH_CHILDREN); av_opt_set_int_list(sinks[idx], "sample_rates", encs[idx]->codec->supported_samplerates, 0, AV_OPT_SEARCH_CHILDREN); av_opt_set_int_list(sinks[idx], "channel_layouts", encs[idx]->codec->channel_layouts, 0, AV_OPT_SEARCH_CHILDREN); } else job_failed("Requested codec has unsupported type\n"); for (; *pads; pads = &(*pads)->next); *pads = avfilter_inout_alloc(); (*pads)->filter_ctx = sinks[idx]; (*pads)->pad_idx = 0; (*pads)->next = 0; for (p = jenter(jlookup(stream, "filters")); p; p = jnext(p)) { (*pads)->name = av_strdup("out"); tmp = eval_vars(jstr(p, "ERROR")); if ((err = avfilter_graph_parse_ptr(fg, tmp, pads, 0, 0)) < 0) job_failed("Parsing filter string \"%s\" failed: %s", tmp, av_err2str(err)); if ((*pads)->next) job_failed("Unconnected filter pad\n"); } (*pads)->name = av_strdup(jstr(jlookup(stream, "name"), "unknown")); } static void setup_output_stream(char *stream, AVStream *st, AVCodecContext *enc, AVFilterContext *sink) { int err; AVDictionary *opts; if (enc->codec_type == AVMEDIA_TYPE_VIDEO) { /* Why not use av_buffersink_get_* here? */ enc->height = av_buffersink_get_h(sink); enc->width = av_buffersink_get_w(sink); enc->sample_aspect_ratio = av_buffersink_get_sample_aspect_ratio(sink); enc->pix_fmt = av_buffersink_get_format(sink); enc->framerate = av_buffersink_get_frame_rate(sink); } else if (enc->codec_type == AVMEDIA_TYPE_AUDIO) { enc->sample_rate = av_buffersink_get_sample_rate(sink); enc->channel_layout = av_buffersink_get_channel_layout(sink); enc->channels = av_buffersink_get_channels(sink); enc->sample_fmt = av_buffersink_get_format(sink); } enc->time_base = av_buffersink_get_time_base(sink); opts = 0; parse_dict(&opts, jlookup(stream, "options")); if ((err = avcodec_open2(enc, enc->codec, &opts)) < 0) job_failed("Opening encoder failed: %s", av_err2str(err)); if (enc->codec_type == AVMEDIA_TYPE_AUDIO && !(enc->codec->capabilities & AV_CODEC_CAP_VARIABLE_FRAME_SIZE && enc->frame_size)) av_buffersink_set_frame_size(sink, enc->frame_size); assert_empty_opts(opts); avcodec_parameters_from_context(st->codecpar, enc); } static void flush_encoder(AVFormatContext *mux, AVFilterContext *sink, AVCodecContext *enc, int idx) { int err; AVPacket pkt; pkt.data = 0; pkt.size = 0; av_init_packet(&pkt); while (!avcodec_receive_packet(enc, &pkt)) { pkt.stream_index = idx; av_packet_rescale_ts(&pkt, enc->time_base, mux->streams[idx]->time_base); if (err = av_interleaved_write_frame(mux, &pkt)) job_failed("Could not write frame: %s", av_err2str(err)); } } static void flush_filtergraph(AVFormatContext *mux, AVFilterContext **sinks, AVCodecContext **encs) { int i; AVFrame *frame; if (!(frame = av_frame_alloc())) exit(99); for (i = 0; i < mux->nb_streams; i ++) while (av_buffersink_get_frame(sinks[i], frame) >= 0) { frame->pict_type = AV_PICTURE_TYPE_NONE; avcodec_send_frame(encs[i], frame); av_frame_unref(frame); flush_encoder(mux, sinks[i], encs[i], i); } av_frame_free(&frame); } static void filtergraph_send(AVFilterContext *src, AVCodecContext *dec, AVPacket *pkt) { int err; AVFrame *frame; if (!dec || !src) return; if (!(frame = av_frame_alloc())) exit(99); avcodec_send_packet(dec, pkt); while (!avcodec_receive_frame(dec, frame)) if ((err = av_buffersrc_add_frame(src, frame)) < 0) job_failed("Could not insert frame into filter graph: %s", av_err2str(err)); av_frame_free(&frame); } void connect_pads(AVFilterInOut **ins, AVFilterInOut **outs) { int err; AVFilterInOut *in, *out; /* TODO: Really remove matched pads from ins/outs */ for (in = *ins; in; in = in->next) { if (!in->name) continue; for (out = *outs; out && (!out->name || strcmp(in->name, out->name)); out = out->next); if (!out) continue; if ((err = avfilter_link(in->filter_ctx, in->pad_idx, out->filter_ctx, out->pad_idx)) < 0) job_failed("Could not connect remaining filter pads named \"%s\": %s", in->name, av_err2str(err)); out->name = 0; } } int main(int argc, char *argv[]) { int err, i, progress, _progress, canceled; char *p, *input, *output, *inpath, *outdir, *outpath, *tmppath, *oldsrcpath; AVFormatContext *demux, *mux; AVCodecContext **decs, **encs; AVFilterContext **srcs, **sinks; AVFilterGraph *fg; AVFilterInOut *inpads, *outpads; AVPacket pkt; AVStream *stream; AVDictionary *opts; if (argc != 5) return 1; init_env(); umask(S_IWOTH); init_avlogbuf(); pkt.data = 0; pkt.size = 0; av_init_packet(&pkt); if (!(fg = avfilter_graph_alloc())) return 99; jobid = atoi(argv[1]); ping_job(jobid, "running", 0); input = jlookup(argv[4], "input"); inpath = buildpath(getenv(WORKER_RAW), jstr(jlookup(input, "path"), 0)); output = jlookup(argv[4], "output"); outdir = buildpath(getenv(WORKER_RELEASED), dirname(jstr(jlookup(output, "path"), 0))); if (access(outdir, F_OK) && mkdir(outdir, 02775)) job_failed("Could not create target directory \"%s\": %s", outdir, strerror(errno)); outpath = buildpath(getenv(WORKER_RELEASED), jstr(jlookup(output, "path"), 0)); tmppath = mprintf("%s/.tmp-%i", getenv(WORKER_TMP), jobid); if (oldsrcpath = jstr(jlookup(argv[4], "srcpath"), 0)) oldsrcpath = buildpath(getenv(WORKER_RAW), oldsrcpath); else oldsrcpath = "/var/empty/nosource"; overwrite_check(outpath, oldsrcpath, jstr(jlookup(argv[4], "srchash"), "")); demux = 0; opts = 0; parse_dict(&opts, jlookup(input, "options")); if (err = avformat_open_input(&demux, inpath, 0, &opts)) job_failed("Opening input file failed: %s", av_err2str(err)); assert_empty_opts(opts); avformat_find_stream_info(demux, 0); decs = xmalloc(sizeof(AVCodecContext *)*demux->nb_streams); srcs = xmalloc(sizeof(AVFilterContext *)*demux->nb_streams); inpads = 0; for (p = jenter(jlookup(input, "streams")); p; p = jnext(p)) setup_input_stream(p, demux, decs, fg, srcs, &inpads); err = avformat_alloc_output_context2(&mux, av_guess_format(jstr(jlookup(output, "format"), 0), outpath, 0), 0, tmppath); if (err < 0) job_failed("Error allocating muxer context: %s", av_err2str(err)); parse_dict(&mux->metadata, jlookup(output, "metadata")); parse_chapters(mux, jlookup(output, "chapters"), demux->duration); for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p)); sinks = xmalloc(sizeof(AVFilterContext *)*i); encs = xmalloc(sizeof(AVCodecContext *)*i); outpads = 0; for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p)) prepare_output_stream(p, avformat_new_stream(mux, 0), encs, fg, sinks, &outpads); for (p = jenter(jlookup(argv[4], "filters")); p; p = jnext(p)) if ((err = avfilter_graph_parse_ptr(fg, jstr(p, "ERROR"), &outpads, &inpads, 0)) < 0) job_failed("Parsing filter string \"%s\" failed: %s", jstr(p, 0), av_err2str(err)); connect_pads(&inpads, &outpads); if (avfilter_graph_config(fg, 0) < 0) job_failed("Error configuring filter graph: %s", av_err2str(err)); for (i = 0, p = jenter(jlookup(output, "streams")); p; i ++, p = jnext(p)) setup_output_stream(p, mux->streams[i], encs[i], sinks[i]); if ((err = avio_open(&mux->pb, tmppath, AVIO_FLAG_WRITE)) < 0) job_failed("Opening temporary file \"%s\" failed: %s", tmppath, av_err2str(err)); opts = 0; parse_dict(&opts, jlookup(output, "options")); if ((err = avformat_write_header(mux, &opts)) < 0) job_failed("Writing temporary file failed: %s", av_err2str(err)); assert_empty_opts(opts); progress = 0; ping_job(jobid, "running", 0); while (!av_read_frame(demux, &pkt)) { i = pkt.stream_index; _progress = av_rescale_q(pkt.pts, demux->streams[i]->time_base, AV_TIME_BASE_Q)*100/demux->duration; if (_progress > progress || !checktime(30)) { canceled = ping_job(jobid, "running", "{\"progress\": %i, \"log\": \"%s\"}", _progress, jescape(get_avlogbuf())); if (canceled == 1) job_failed("Job canceled"); progress = _progress; } filtergraph_send(srcs[i], decs[i], &pkt); flush_filtergraph(mux, sinks, encs); av_packet_unref(&pkt); } /* Flush */ for (i = 0; i < demux->nb_streams; i ++) { if (!decs[i] || !srcs[i]) continue; filtergraph_send(srcs[i], decs[i], 0); !av_buffersrc_add_frame(srcs[i], 0); } flush_filtergraph(mux, sinks, encs); for (i = 0; i < mux->nb_streams; i ++) { avcodec_send_frame(encs[i], 0); flush_encoder(mux, sinks[i], encs[i], i); } av_interleaved_write_frame(mux, 0); avformat_close_input(&demux); if (err = av_write_trailer(mux)) job_failed("Error writing trailer to temporary file", av_err2str(err)); avio_closep(&mux->pb); if (!filesize(tmppath)) job_failed("Sanity check failed: Output file is empty"); overwrite_check(outpath, oldsrcpath, jstr(jlookup(argv[4], "srchash"), "")); if (rename(tmppath, outpath)) job_failed("Overwriting output file \"%s\" failed: %s", outpath, strerror(errno)); unlink(tmppath); ping_job(jobid, "finished", "{%s, \"log\": \"%s\"}", json_fileinfo(outpath), jescape(get_avlogbuf())); return 0; }