srs/trunk/src/app/srs_app_encoder.cpp
Haibo Chen(陈海博) 133866a944
Transcode: Bugfix: Fix loop transcoding with host. #3516. v6.0.168 v7.0.41 (#4325)
#### What issue has been resolved?
for issue: https://github.com/ossrs/srs/issues/3516
https://github.com/ossrs/srs/issues/4055
https://github.com/ossrs/srs/pull/3618

#### What is the root cause of the problem?
The issue arises from a mismatch between the `input` and `output`
formats within the
[`SrsEncoder::initialize_ffmpeg`](https://github.com/ossrs/srs/pull/4325/files#diff-a3dd7c498fc26d36def2e8c2c3b7edfe1bf78f0620b1a838aefa70ba119cad03L241-L254)
function.

For example:
Input: `rtmp://127.0.0.1:1935/live?vhost=__defaultVhost__/livestream_ff`
Output:
`rtmp://127.0.0.1:1935/live/livestream_ff?vhost=__defaultVhost__`

This may result in the failure of the [code
segment](https://github.com/ossrs/srs/pull/4325/files#diff-a3dd7c498fc26d36def2e8c2c3b7edfe1bf78f0620b1a838aefa70ba119cad03L292-L298)
responsible for determining whether to loop.

#### What is the approach to solving this issue?
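The fix swaps the order of `stream` and `vhost` when building the `input` URL, so that it follows the same `app/stream?vhost=...` layout as the `output` URL and the two can be compared.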

#### How was the issue introduced?
The commit introducing this bug is:
7d47017a00
The order of [parameters in the configuration
file](7d47017a00 (diff-428de168925d659dae72bb49273c3b048ed2800906c6848560badae854250126L26-R26))
has been modified to address the `ingest` issue.

#### Outstanding issues
Please note that this PR does not entirely resolve the issue; for
example, modifying the `output` format in configuration still results in
exceptions. To comprehensively address this problem, extensive code
modifications would be required.

However, strictly adhering to the configuration file format can
effectively prevent this issue.

---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
Co-authored-by: john <hondaxiao@tencent.com>
Co-authored-by: winlin <winlinvip@gmail.com>
2025-06-04 10:11:58 -04:00

322 lines
8.9 KiB
C++

//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_app_encoder.hpp>
#include <algorithm>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_config.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
// Registry of the output URLs of the transcode engines set up in this file,
// used to detect a transcoding dead loop: initialize_ffmpeg() refuses an engine
// whose input URL is already listed here (the stream is itself a transcode
// output), and clear_engines() removes the entries of the engines it frees.
// URLs are compared as exact strings.
static std::vector<std::string> _transcoded_url;
// Create an idle encoder: no engine is configured and nothing is running yet.
SrsEncoder::SrsEncoder()
{
    // Placeholder coroutine so that trd is always a valid object:
    // on_unpublish() calls trd->stop() unconditionally, even when
    // on_publish() never replaced it with a real coroutine.
    this->trd = new SrsDummyCoroutine();

    // Gate for the periodic log line of show_encode_log_message().
    this->pprint = SrsPithyPrint::create_encoder();
}
// Stop the transcoding, drop all engines, then release the owned helpers.
SrsEncoder::~SrsEncoder()
{
    // Has to run before trd is freed below, because it calls trd->stop().
    this->on_unpublish();

    srs_freep(this->trd);
    srs_freep(this->pprint);
}
// Called when a stream starts publishing: collect the transcode engines that
// apply to it and, when there is at least one, start the coroutine that drives
// them.
// @param req The publish request; its vhost/app/stream select the engines.
// @return srs_success when the coroutine was started or when there is nothing
//      to transcode (no engine, or a loop was detected); otherwise the error
//      from parsing the engines or from starting the coroutine.
srs_error_t SrsEncoder::on_publish(SrsRequest* req)
{
    // Gather the engines of the vhost, app and stream scopes.
    srs_error_t err = parse_scope_engines(req);

    // A loop is not a failure: the stream being published is the output of a
    // transcode engine, so drop every engine and do not transcode it again.
    if (srs_error_code(err) == ERROR_ENCODER_LOOP) {
        clear_engines();
        srs_error_reset(err);
    }

    // Any other parse error is reported to the caller as is.
    if (err != srs_success) {
        return err;
    }

    // Nothing to transcode, so there is nothing to start.
    if (ffmpegs.empty()) {
        return srs_success;
    }

    // Replace the current coroutine by a real one running all the engines.
    srs_freep(trd);
    trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id());

    err = trd->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "start encoder");
    }

    return srs_success;
}
void SrsEncoder::on_unpublish()
{
trd->stop();
clear_engines();
}
// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_CIMS (3 * SRS_UTIME_SECONDS)

// Coroutine body: run do_cycle() every SRS_RTMP_ENCODER_CIMS until the
// coroutine is asked to quit, then stop every engine.
// @return The wrapped error reported by trd->pull() that ended the loop.
//      Errors of do_cycle() are only logged and never returned.
srs_error_t SrsEncoder::cycle()
{
    srs_error_t err = srs_success;

    for (;;) {
        // We always check status first.
        // @see https://github.com/ossrs/srs/issues/1634#issuecomment-597571561
        err = trd->pull();
        if (err != srs_success) {
            err = srs_error_wrap(err, "encoder");
            break;
        }

        // A failing round is not fatal: log it, discard it and retry after
        // the sleep below.
        err = do_cycle();
        if (err != srs_success) {
            srs_warn("Encoder: Ignore error, %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }

        srs_usleep(SRS_RTMP_ENCODER_CIMS);
    }

    // The loop is over: stop every engine of this encoder.
    for (size_t i = 0; i < ffmpegs.size(); i++) {
        ffmpegs[i]->stop();
    }

    return err;
}
// One round of the encoder: for every engine in order, call start() and then
// cycle(); finally emit the periodic log line.
// @return srs_success when every engine passed; otherwise the wrapped error of
//      the first failing engine. The remaining engines are skipped for this
//      round and no log line is printed.
srs_error_t SrsEncoder::do_cycle()
{
    for (size_t i = 0; i < ffmpegs.size(); i++) {
        SrsFFMPEG* engine = ffmpegs[i];

        // start() is invoked on every round, for every engine.
        srs_error_t err = engine->start();
        if (err != srs_success) {
            return srs_error_wrap(err, "ffmpeg start");
        }

        // Let the engine check its own status.
        err = engine->cycle();
        if (err != srs_success) {
            return srs_error_wrap(err, "ffmpeg cycle");
        }
    }

    // pithy print
    show_encode_log_message();

    return srs_success;
}
void SrsEncoder::clear_engines()
{
std::vector<SrsFFMPEG*>::iterator it;
for (it = ffmpegs.begin(); it != ffmpegs.end(); ++it) {
SrsFFMPEG* ffmpeg = *it;
std::string output = ffmpeg->output();
std::vector<std::string>::iterator tu_it;
tu_it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output);
if (tu_it != _transcoded_url.end()) {
_transcoded_url.erase(tu_it);
}
srs_freep(ffmpeg);
}
ffmpegs.clear();
}
// Get the engine at the given position of the engine list.
// @param index Zero-based position, in the order the engines were parsed.
// @return The engine, still owned by the encoder; NULL when index is negative
//      or not less than the number of engines.
SrsFFMPEG* SrsEncoder::at(int index)
{
    // Indexing the vector out of range is undefined behavior, so reject it.
    if (index < 0 || index >= (int)ffmpegs.size()) {
        return NULL;
    }

    return ffmpegs[index];
}
// Collect the transcode engines that apply to the published stream, looking up
// the transcode config of three scopes in this order: the whole vhost (empty
// scope), the app, and the "app/stream".
// @param req The publish request providing vhost, app and stream.
// @return srs_success when all matching scopes were parsed; otherwise the
//      wrapped error of the first scope that failed, the later scopes being
//      skipped. Engines added before the failure stay in the engine list.
srs_error_t SrsEncoder::parse_scope_engines(SrsRequest* req)
{
    // Scopes, from the widest to the narrowest.
    std::string scopes[3];
    scopes[0] = "";
    scopes[1] = req->app;
    scopes[2] = scopes[1];
    scopes[2] += "/";
    scopes[2] += req->stream;

    for (int i = 0; i < 3; i++) {
        // A scope without transcode config contributes no engine.
        SrsConfDirective* conf = _srs_config->get_transcode(req->vhost, scopes[i]);
        if (conf == NULL) {
            continue;
        }

        srs_error_t err = parse_ffmpeg(req, conf);
        if (err != srs_success) {
            return srs_error_wrap(err, "parse ffmpeg");
        }
    }

    return srs_success;
}
// Create one SrsFFMPEG per enabled engine of a transcode directive and append
// it to the engine list.
// @param req The publish request, forwarded to initialize_ffmpeg().
// @param conf The transcode directive to parse; must not be NULL.
// @return srs_success when the directive is skipped (disabled, no ffmpeg
//      binary, no engine) or when all enabled engines were added; otherwise
//      the wrapped error of the first engine that failed to initialize. That
//      engine is freed, the engines added before it are kept.
srs_error_t SrsEncoder::parse_ffmpeg(SrsRequest* req, SrsConfDirective* conf)
{
    srs_assert(conf);

    // The whole transcode directive may be switched off.
    if (!_srs_config->get_transcode_enabled(conf)) {
        srs_trace("ignore the disabled transcode: %s", conf->arg0().c_str());
        return srs_success;
    }

    // Without a ffmpeg binary there is nothing to run.
    const std::string ffmpeg_bin = _srs_config->get_transcode_ffmpeg(conf);
    if (ffmpeg_bin.empty()) {
        srs_trace("ignore the empty ffmpeg transcode: %s", conf->arg0().c_str());
        return srs_success;
    }

    // Neither is there without any engine.
    std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(conf);
    if (engines.empty()) {
        srs_trace("ignore the empty transcode engine: %s", conf->arg0().c_str());
        return srs_success;
    }

    // One SrsFFMPEG per enabled engine, in config order.
    std::vector<SrsConfDirective*>::iterator it;
    for (it = engines.begin(); it != engines.end(); ++it) {
        SrsConfDirective* engine = *it;

        if (!_srs_config->get_engine_enabled(engine)) {
            srs_trace("ignore the diabled transcode engine: %s %s", conf->arg0().c_str(), engine->arg0().c_str());
            continue;
        }

        SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);

        srs_error_t err = initialize_ffmpeg(ffmpeg, req, engine);
        if (err != srs_success) {
            // Not in the list yet, so it must be freed here.
            srs_freep(ffmpeg);
            return srs_error_wrap(err, "init ffmpeg");
        }

        ffmpegs.push_back(ffmpeg);
    }

    return srs_success;
}
// Configure one engine: build its input URL (the published stream, pulled from
// the local server), its output URL (the engine's configured output with the
// placeholders substituted) and its log file, refuse it when it would create a
// transcoding loop, then initialize the SrsFFMPEG object.
// @param ffmpeg The engine object to initialize; ownership stays with the caller.
// @param req The publish request providing port, vhost, app, stream and param.
// @param engine The engine directive providing the output template and the
//      transcode settings.
// @return srs_success on success; ERROR_ENCODER_LOOP when the input URL is the
//      output of a registered engine; otherwise the wrapped error of
//      ffmpeg->initialize() or ffmpeg->initialize_transcode().
// @remark The output URL is added to the loop-detection registry only when the
//      engine was fully initialized. On failure the caller frees the engine
//      without adding it to the engine list, so clear_engines() could never
//      remove an entry registered earlier, and the stale URL would make later
//      publishes of that stream look like a loop.
srs_error_t SrsEncoder::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsRequest* req, SrsConfDirective* engine)
{
    srs_error_t err = srs_success;

    // input stream, from local.
    // ie. rtmp://<localhost>:1935/live/livestream?vhost=__defaultVhost__
    // The app/stream?vhost= layout must match the one of the configured
    // output, because the loop check below compares the URLs as plain strings.
    std::string input = "rtmp://";
    input += SRS_CONSTS_LOCALHOST;
    input += ":";
    input += srs_int2str(req->port);
    input += "/";
    input += req->app;
    input += "/";
    input += req->stream;
    input += "?vhost=";
    input += req->vhost;

    // stream name: vhost/app/stream for print
    input_stream_name = req->vhost;
    input_stream_name += "/";
    input_stream_name += req->app;
    input_stream_name += "/";
    input_stream_name += req->stream;

    // output stream, to other/self server
    // ie. rtmp://<localhost>:1935/live/livestream_sd?vhost=__defaultVhost__
    std::string output = _srs_config->get_engine_output(engine);
    output = srs_string_replace(output, "[vhost]", req->vhost);
    output = srs_string_replace(output, "[port]", srs_int2str(req->port));
    output = srs_string_replace(output, "[app]", req->app);
    output = srs_string_replace(output, "[stream]", req->stream);
    output = srs_string_replace(output, "[param]", req->param);
    output = srs_string_replace(output, "[engine]", engine->arg0());
    output = srs_path_build_timestamp(output);

    // write ffmpeg info to log file, or to the null file when disabled.
    std::string log_file = SRS_CONSTS_NULL_FILE; // disabled
    if (_srs_config->get_ff_log_enabled()) {
        // <log dir>/ffmpeg-encoder-<vhost>-<app>-<stream>[-<engine>].log
        log_file = _srs_config->get_ff_log_dir();
        log_file += "/";
        log_file += "ffmpeg-encoder";
        log_file += "-";
        log_file += req->vhost;
        log_file += "-";
        log_file += req->app;
        log_file += "-";
        log_file += req->stream;
        if (!engine->args.empty()) {
            log_file += "-";
            log_file += engine->arg0();
        }
        log_file += ".log";
    }

    // important: loop check, donot transcode again.
    // The input is the output of a registered engine, so transcoding it would
    // feed the encoder with its own result.
    if (std::find(_transcoded_url.begin(), _transcoded_url.end(), input) != _transcoded_url.end()) {
        return srs_error_new(ERROR_ENCODER_LOOP, "detect a loop cycle, input=%s, output=%s", input.c_str(), output.c_str());
    }

    if ((err = ffmpeg->initialize(input, output, log_file)) != srs_success) {
        return srs_error_wrap(err, "init ffmpeg");
    }

    if ((err = ffmpeg->initialize_transcode(engine)) != srs_success) {
        return srs_error_wrap(err, "init transcode");
    }

    // Register the output only now that the engine is usable, see @remark.
    _transcoded_url.push_back(output);

    return err;
}
// Print the periodic status line of the encoder (age, number of engines and
// input stream name) whenever the pithy printer allows it.
void SrsEncoder::show_encode_log_message()
{
    // Advance the printer on every call, even when nothing gets printed.
    pprint->elapse();

    // Not reportable yet.
    if (!pprint->can_print()) {
        return;
    }

    // TODO: FIXME: show more info.
    srs_trace("-> " SRS_CONSTS_LOG_ENCODER " time=%" PRId64 ", encoders=%d, input=%s",
        pprint->age(), (int)ffmpegs.size(), input_stream_name.c_str());
}