AI: Add utest to cover encoder module

This commit is contained in:
OSSRS-AI 2025-10-11 06:49:06 -04:00 committed by winlin
parent c6c6f38ed7
commit b239975458
21 changed files with 1884 additions and 94 deletions

View File

@ -24,4 +24,5 @@ fi
echo "Formatting source files in trunk directory..."
# Exclude the 3rdparty directory and format all .cpp, and .hpp
# Use -i to edit files in place
find trunk/src -name "*.*pp" | xargs clang-format -style=file -i
# Use xargs -P N to run N clang-format processes in parallel
find trunk/src -name "*.*pp" | xargs -P 16 -n 1 clang-format -style=file -i

View File

@ -395,6 +395,7 @@ public:
public:
// Vhost config
virtual void get_vhosts(std::vector<SrsConfDirective *> &vhosts) = 0;
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) = 0;
virtual bool get_vhost_enabled(std::string vhost) = 0;
virtual bool get_vhost_enabled(SrsConfDirective *conf) = 0;
@ -545,6 +546,45 @@ public:
virtual bool get_dvr_enabled(std::string vhost) = 0;
virtual SrsConfDirective *get_dvr_apply(std::string vhost) = 0;
virtual srs_utime_t get_dvr_duration(std::string vhost) = 0;
public:
// Ingest config
virtual std::vector<SrsConfDirective *> get_ingesters(std::string vhost) = 0;
virtual SrsConfDirective *get_ingest_by_id(std::string vhost, std::string ingest_id) = 0;
virtual bool get_ingest_enabled(SrsConfDirective *conf) = 0;
virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf) = 0;
virtual std::string get_ingest_input_type(SrsConfDirective *conf) = 0;
virtual std::string get_ingest_input_url(SrsConfDirective *conf) = 0;
public:
// FFmpeg log config
virtual bool get_ff_log_enabled() = 0;
virtual std::string get_ff_log_dir() = 0;
virtual std::string get_ff_log_level() = 0;
public:
// Transcode/Engine config
virtual std::vector<SrsConfDirective *> get_transcode_engines(SrsConfDirective *conf) = 0;
virtual bool get_engine_enabled(SrsConfDirective *conf) = 0;
virtual std::vector<std::string> get_engine_perfile(SrsConfDirective *conf) = 0;
virtual std::string get_engine_iformat(SrsConfDirective *conf) = 0;
virtual std::vector<std::string> get_engine_vfilter(SrsConfDirective *conf) = 0;
virtual std::string get_engine_vcodec(SrsConfDirective *conf) = 0;
virtual int get_engine_vbitrate(SrsConfDirective *conf) = 0;
virtual double get_engine_vfps(SrsConfDirective *conf) = 0;
virtual int get_engine_vwidth(SrsConfDirective *conf) = 0;
virtual int get_engine_vheight(SrsConfDirective *conf) = 0;
virtual int get_engine_vthreads(SrsConfDirective *conf) = 0;
virtual std::string get_engine_vprofile(SrsConfDirective *conf) = 0;
virtual std::string get_engine_vpreset(SrsConfDirective *conf) = 0;
virtual std::vector<std::string> get_engine_vparams(SrsConfDirective *conf) = 0;
virtual std::string get_engine_acodec(SrsConfDirective *conf) = 0;
virtual int get_engine_abitrate(SrsConfDirective *conf) = 0;
virtual int get_engine_asample_rate(SrsConfDirective *conf) = 0;
virtual int get_engine_achannels(SrsConfDirective *conf) = 0;
virtual std::vector<std::string> get_engine_aparams(SrsConfDirective *conf) = 0;
virtual std::string get_engine_oformat(SrsConfDirective *conf) = 0;
virtual std::string get_engine_output(SrsConfDirective *conf) = 0;
};
// The config service provider.

View File

@ -10,9 +10,12 @@
#include <srs_app_config.hpp>
#include <srs_app_dash.hpp>
#include <srs_app_dvr.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_fragment.hpp>
#include <srs_app_gb28181.hpp>
#include <srs_app_ingest.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_rtsp_source.hpp>
@ -25,7 +28,6 @@
#include <srs_kernel_utility.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_protocol_st.hpp>
#include <srs_app_rtc_conn.hpp>
ISrsAppFactory::ISrsAppFactory()
{
@ -173,6 +175,16 @@ ISrsRtcConnection *SrsAppFactory::create_rtc_connection(ISrsExecRtcAsyncTask *ex
return session;
}
ISrsFFMPEG *SrsAppFactory::create_ffmpeg(std::string ffmpeg_bin)
{
return new SrsFFMPEG(ffmpeg_bin);
}
ISrsIngesterFFMPEG *SrsAppFactory::create_ingester_ffmpeg()
{
return new SrsIngesterFFMPEG();
}
ISrsCoroutine *SrsAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return kernel_factory_->create_coroutine(name, handler, cid);

View File

@ -40,6 +40,8 @@ class ISrsIpListener;
class ISrsTcpHandler;
class ISrsRtcConnection;
class ISrsExecRtcAsyncTask;
class ISrsFFMPEG;
class ISrsIngesterFFMPEG;
// The factory to create app objects.
class ISrsAppFactory : public ISrsKernelFactory
@ -77,6 +79,8 @@ public:
virtual ISrsFragmentedMp4 *create_fragmented_mp4() = 0;
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler) = 0;
virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid) = 0;
virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin) = 0;
virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg() = 0;
};
// The factory to create app objects.
@ -118,6 +122,8 @@ public:
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin);
virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg();
public:
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);

View File

@ -44,6 +44,14 @@ using namespace std;
#define SRS_RTMP_ENCODER_LIBAACPLUS "libaacplus"
#define SRS_RTMP_ENCODER_LIBFDKAAC "libfdk_aac"
ISrsFFMPEG::ISrsFFMPEG()
{
}
ISrsFFMPEG::~ISrsFFMPEG()
{
}
SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
{
ffmpeg_ = ffmpeg_bin;
@ -58,6 +66,8 @@ SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
achannels_ = 0;
process_ = new SrsProcess();
config_ = _srs_config;
}
SrsFFMPEG::~SrsFFMPEG()
@ -65,6 +75,8 @@ SrsFFMPEG::~SrsFFMPEG()
stop();
srs_freep(process_);
config_ = NULL;
}
void SrsFFMPEG::append_iparam(string iparam)
@ -97,24 +109,24 @@ srs_error_t SrsFFMPEG::initialize_transcode(SrsConfDirective *engine)
{
srs_error_t err = srs_success;
perfile_ = _srs_config->get_engine_perfile(engine);
iformat_ = _srs_config->get_engine_iformat(engine);
vfilter_ = _srs_config->get_engine_vfilter(engine);
vcodec_ = _srs_config->get_engine_vcodec(engine);
vbitrate_ = _srs_config->get_engine_vbitrate(engine);
vfps_ = _srs_config->get_engine_vfps(engine);
vwidth_ = _srs_config->get_engine_vwidth(engine);
vheight_ = _srs_config->get_engine_vheight(engine);
vthreads_ = _srs_config->get_engine_vthreads(engine);
vprofile_ = _srs_config->get_engine_vprofile(engine);
vpreset_ = _srs_config->get_engine_vpreset(engine);
vparams_ = _srs_config->get_engine_vparams(engine);
acodec_ = _srs_config->get_engine_acodec(engine);
abitrate_ = _srs_config->get_engine_abitrate(engine);
asample_rate_ = _srs_config->get_engine_asample_rate(engine);
achannels_ = _srs_config->get_engine_achannels(engine);
aparams_ = _srs_config->get_engine_aparams(engine);
oformat_ = _srs_config->get_engine_oformat(engine);
perfile_ = config_->get_engine_perfile(engine);
iformat_ = config_->get_engine_iformat(engine);
vfilter_ = config_->get_engine_vfilter(engine);
vcodec_ = config_->get_engine_vcodec(engine);
vbitrate_ = config_->get_engine_vbitrate(engine);
vfps_ = config_->get_engine_vfps(engine);
vwidth_ = config_->get_engine_vwidth(engine);
vheight_ = config_->get_engine_vheight(engine);
vthreads_ = config_->get_engine_vthreads(engine);
vprofile_ = config_->get_engine_vprofile(engine);
vpreset_ = config_->get_engine_vpreset(engine);
vparams_ = config_->get_engine_vparams(engine);
acodec_ = config_->get_engine_acodec(engine);
abitrate_ = config_->get_engine_abitrate(engine);
asample_rate_ = config_->get_engine_asample_rate(engine);
achannels_ = config_->get_engine_achannels(engine);
aparams_ = config_->get_engine_aparams(engine);
oformat_ = config_->get_engine_oformat(engine);
// ensure the size is even.
vwidth_ -= vwidth_ % 2;

View File

@ -16,13 +16,46 @@
class SrsConfDirective;
class SrsPithyPrint;
class ISrsPithyPrint;
class SrsProcess;
class ISrsProcess;
class ISrsAppConfig;
// The ffmpeg interface.
class ISrsFFMPEG
{
public:
ISrsFFMPEG();
virtual ~ISrsFFMPEG();
public:
virtual void append_iparam(std::string iparam) = 0;
virtual void set_oformat(std::string format) = 0;
virtual std::string output() = 0;
public:
virtual srs_error_t initialize(std::string in, std::string out, std::string log) = 0;
virtual srs_error_t initialize_transcode(SrsConfDirective *engine) = 0;
virtual srs_error_t initialize_copy() = 0;
public:
virtual srs_error_t start() = 0;
virtual srs_error_t cycle() = 0;
virtual void stop() = 0;
public:
virtual void fast_stop() = 0;
virtual void fast_kill() = 0;
};
// A transcode engine: ffmepg, used to transcode a stream to another.
class SrsFFMPEG
class SrsFFMPEG : public ISrsFFMPEG
{
private:
SrsProcess *process_;
ISrsAppConfig *config_;
private:
ISrsProcess *process_;
std::vector<std::string> params_;
private:

View File

@ -11,6 +11,7 @@ using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_dvr.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_http_client.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_st.hpp>
@ -22,7 +23,6 @@ using namespace std;
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_json.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_app_factory.hpp>
// The HTTP response body should be "0", see https://github.com/ossrs/srs/issues/3215#issuecomment-1319991512
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(0)

View File

@ -10,6 +10,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_error.hpp>
@ -18,6 +19,14 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
ISrsIngesterFFMPEG::ISrsIngesterFFMPEG()
{
}
ISrsIngesterFFMPEG::~ISrsIngesterFFMPEG()
{
}
SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
ffmpeg_ = NULL;
@ -29,7 +38,7 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
srs_freep(ffmpeg_);
}
srs_error_t SrsIngesterFFMPEG::initialize(SrsFFMPEG *ff, string v, string i)
srs_error_t SrsIngesterFFMPEG::initialize(ISrsFFMPEG *ff, string v, string i)
{
srs_error_t err = srs_success;
@ -86,24 +95,34 @@ void SrsIngesterFFMPEG::fast_kill()
ffmpeg_->fast_kill();
}
ISrsIngester::ISrsIngester()
{
}
ISrsIngester::~ISrsIngester()
{
}
SrsIngester::SrsIngester()
{
_srs_config->subscribe(this);
expired_ = false;
disposed_ = false;
trd_ = new SrsDummyCoroutine();
pprint_ = SrsPithyPrint::create_ingester();
app_factory_ = _srs_app_factory;
config_ = _srs_config;
}
SrsIngester::~SrsIngester()
{
_srs_config->unsubscribe(this);
srs_freep(trd_);
clear_engines();
srs_freep(pprint_);
app_factory_ = NULL;
config_ = NULL;
}
void SrsIngester::dispose()
@ -116,7 +135,8 @@ void SrsIngester::dispose()
// first, use fast stop to notice all FFMPEG to quit gracefully.
fast_stop();
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
SrsUniquePtr<ISrsTime> time(app_factory_->create_time());
time->usleep(100 * SRS_UTIME_MILLISECONDS);
// then, use fast kill to ensure FFMPEG quit.
fast_kill();
@ -136,7 +156,7 @@ srs_error_t SrsIngester::start()
// start thread to run all encoding engines.
srs_freep(trd_);
trd_ = new SrsSTCoroutine("ingest", this, _srs_context->get_id());
trd_ = app_factory_->create_coroutine("ingest", this, _srs_context->get_id());
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
@ -153,9 +173,9 @@ void SrsIngester::stop()
void SrsIngester::fast_stop()
{
std::vector<SrsIngesterFFMPEG *>::iterator it;
std::vector<ISrsIngesterFFMPEG *>::iterator it;
for (it = ingesters_.begin(); it != ingesters_.end(); ++it) {
SrsIngesterFFMPEG *ingester = *it;
ISrsIngesterFFMPEG *ingester = *it;
ingester->fast_stop();
}
@ -166,9 +186,9 @@ void SrsIngester::fast_stop()
void SrsIngester::fast_kill()
{
std::vector<SrsIngesterFFMPEG *>::iterator it;
std::vector<ISrsIngesterFFMPEG *>::iterator it;
for (it = ingesters_.begin(); it != ingesters_.end(); ++it) {
SrsIngesterFFMPEG *ingester = *it;
ISrsIngesterFFMPEG *ingester = *it;
ingester->fast_kill();
}
@ -185,6 +205,8 @@ srs_error_t SrsIngester::cycle()
{
srs_error_t err = srs_success;
SrsUniquePtr<ISrsTime> time(app_factory_->create_time());
while (!disposed_) {
// We always check status first.
// @see https://github.com/ossrs/srs/issues/1634#issuecomment-597571561
@ -197,7 +219,7 @@ srs_error_t SrsIngester::cycle()
srs_freep(err);
}
srs_usleep(SRS_INGESTER_CIMS);
time->usleep(SRS_INGESTER_CIMS);
}
return err;
@ -222,9 +244,9 @@ srs_error_t SrsIngester::do_cycle()
}
// cycle exists ingesters.
std::vector<SrsIngesterFFMPEG *>::iterator it;
std::vector<ISrsIngesterFFMPEG *>::iterator it;
for (it = ingesters_.begin(); it != ingesters_.end(); ++it) {
SrsIngesterFFMPEG *ingester = *it;
ISrsIngesterFFMPEG *ingester = *it;
// start all ffmpegs.
if ((err = ingester->start()) != srs_success) {
@ -245,10 +267,10 @@ srs_error_t SrsIngester::do_cycle()
void SrsIngester::clear_engines()
{
std::vector<SrsIngesterFFMPEG *>::iterator it;
std::vector<ISrsIngesterFFMPEG *>::iterator it;
for (it = ingesters_.begin(); it != ingesters_.end(); ++it) {
SrsIngesterFFMPEG *ingester = *it;
ISrsIngesterFFMPEG *ingester = *it;
srs_freep(ingester);
}
@ -261,7 +283,7 @@ srs_error_t SrsIngester::parse()
// parse ingesters
std::vector<SrsConfDirective *> vhosts;
_srs_config->get_vhosts(vhosts);
config_->get_vhosts(vhosts);
for (int i = 0; i < (int)vhosts.size(); i++) {
SrsConfDirective *vhost = vhosts[i];
@ -278,11 +300,11 @@ srs_error_t SrsIngester::parse_ingesters(SrsConfDirective *vhost)
srs_error_t err = srs_success;
// when vhost disabled, ignore any ingesters.
if (!_srs_config->get_vhost_enabled(vhost)) {
if (!config_->get_vhost_enabled(vhost)) {
return err;
}
std::vector<SrsConfDirective *> ingesters = _srs_config->get_ingesters(vhost->arg0());
std::vector<SrsConfDirective *> ingesters = config_->get_ingesters(vhost->arg0());
// create engine
for (int i = 0; i < (int)ingesters.size(); i++) {
@ -299,27 +321,27 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective
{
srs_error_t err = srs_success;
if (!_srs_config->get_ingest_enabled(ingest)) {
if (!config_->get_ingest_enabled(ingest)) {
return err;
}
std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
std::string ffmpeg_bin = config_->get_ingest_ffmpeg(ingest);
if (ffmpeg_bin.empty()) {
return srs_error_new(ERROR_ENCODER_PARSE, "parse ffmpeg");
}
// get all engines.
std::vector<SrsConfDirective *> engines = _srs_config->get_transcode_engines(ingest);
std::vector<SrsConfDirective *> engines = config_->get_transcode_engines(ingest);
// create ingesters without engines.
if (engines.empty()) {
SrsFFMPEG *ffmpeg = new SrsFFMPEG(ffmpeg_bin);
ISrsFFMPEG *ffmpeg = app_factory_->create_ffmpeg(ffmpeg_bin);
if ((err = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != srs_success) {
srs_freep(ffmpeg);
return srs_error_wrap(err, "init ffmpeg");
}
SrsIngesterFFMPEG *ingester = new SrsIngesterFFMPEG();
ISrsIngesterFFMPEG *ingester = app_factory_->create_ingester_ffmpeg();
if ((err = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != srs_success) {
srs_freep(ingester);
return srs_error_wrap(err, "init ingester");
@ -332,13 +354,13 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective
// create ingesters with engine
for (int i = 0; i < (int)engines.size(); i++) {
SrsConfDirective *engine = engines[i];
SrsFFMPEG *ffmpeg = new SrsFFMPEG(ffmpeg_bin);
ISrsFFMPEG *ffmpeg = app_factory_->create_ffmpeg(ffmpeg_bin);
if ((err = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != srs_success) {
srs_freep(ffmpeg);
return srs_error_wrap(err, "init ffmpeg");
}
SrsIngesterFFMPEG *ingester = new SrsIngesterFFMPEG();
ISrsIngesterFFMPEG *ingester = app_factory_->create_ingester_ffmpeg();
if ((err = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != srs_success) {
srs_freep(ingester);
return srs_error_wrap(err, "init ingester");
@ -350,13 +372,13 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective
return err;
}
srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine)
srs_error_t SrsIngester::initialize_ffmpeg(ISrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine)
{
srs_error_t err = srs_success;
int port;
if (true) {
std::vector<std::string> ip_ports = _srs_config->get_listens();
std::vector<std::string> ip_ports = config_->get_listens();
srs_assert(ip_ports.size() > 0);
std::string ip;
@ -364,7 +386,7 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *
srs_net_split_for_listener(ep, ip, port);
}
std::string output = _srs_config->get_engine_output(engine);
std::string output = config_->get_engine_output(engine);
// output stream, to other/self server
// ie. rtmp://localhost:1935/live/livestream_sd
output = srs_strings_replace(output, "[vhost]", vhost->arg0());
@ -390,8 +412,8 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *
std::string log_file = SRS_CONSTS_NULL_FILE; // disabled
// write ffmpeg info to log file.
if (_srs_config->get_ff_log_enabled()) {
log_file = _srs_config->get_ff_log_dir();
if (config_->get_ff_log_enabled()) {
log_file = config_->get_ff_log_dir();
log_file += "/";
log_file += "ffmpeg-ingest";
log_file += "-";
@ -403,20 +425,20 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *
log_file += ".log";
}
std::string log_level = _srs_config->get_ff_log_level();
std::string log_level = config_->get_ff_log_level();
if (!log_level.empty()) {
ffmpeg->append_iparam("-loglevel");
ffmpeg->append_iparam(log_level);
}
// input
std::string input_type = _srs_config->get_ingest_input_type(ingest);
std::string input_type = config_->get_ingest_input_type(ingest);
if (input_type.empty()) {
return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput type, ingest=%s", ingest->arg0().c_str());
}
if (srs_config_ingest_is_file(input_type)) {
std::string input_url = _srs_config->get_ingest_input_url(ingest);
std::string input_url = config_->get_ingest_input_url(ingest);
if (input_url.empty()) {
return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput url, ingest=%s", ingest->arg0().c_str());
}
@ -428,7 +450,7 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *
return srs_error_wrap(err, "init ffmpeg");
}
} else if (srs_config_ingest_is_stream(input_type)) {
std::string input_url = _srs_config->get_ingest_input_url(ingest);
std::string input_url = config_->get_ingest_input_url(ingest);
if (input_url.empty()) {
return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput url, ingest=%s", ingest->arg0().c_str());
}
@ -446,10 +468,10 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *
// set output format to flv for RTMP
ffmpeg->set_oformat("flv");
std::string vcodec = _srs_config->get_engine_vcodec(engine);
std::string acodec = _srs_config->get_engine_acodec(engine);
std::string vcodec = config_->get_engine_vcodec(engine);
std::string acodec = config_->get_engine_acodec(engine);
// whatever the engine config, use copy as default.
bool engine_disabled = !engine || !_srs_config->get_engine_enabled(engine);
bool engine_disabled = !engine || !config_->get_engine_enabled(engine);
if (engine_disabled || vcodec.empty() || acodec.empty()) {
if ((err = ffmpeg->initialize_copy()) != srs_success) {
return srs_error_wrap(err, "init ffmpeg");
@ -476,7 +498,7 @@ void SrsIngester::show_ingest_log_message()
// random choose one ingester to report.
SrsRand rand;
int index = rand.integer() % (int)ingesters_.size();
SrsIngesterFFMPEG *ingester = ingesters_.at(index);
ISrsIngesterFFMPEG *ingester = ingesters_.at(index);
// reportable
if (pprint_->can_print()) {

View File

@ -15,16 +15,44 @@
#include <srs_app_st.hpp>
class SrsFFMPEG;
class ISrsFFMPEG;
class SrsConfDirective;
class SrsPithyPrint;
class ISrsPithyPrint;
class ISrsAppFactory;
class ISrsAppConfig;
// The ingest ffmpeg interface.
class ISrsIngesterFFMPEG
{
public:
ISrsIngesterFFMPEG();
virtual ~ISrsIngesterFFMPEG();
public:
virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i) = 0;
// The ingest uri, [vhost]/[ingest id]
virtual std::string uri() = 0;
// The alive in srs_utime_t.
virtual srs_utime_t alive() = 0;
virtual bool equals(std::string v, std::string i) = 0;
virtual bool equals(std::string v) = 0;
public:
virtual srs_error_t start() = 0;
virtual void stop() = 0;
virtual srs_error_t cycle() = 0;
virtual void fast_stop() = 0;
virtual void fast_kill() = 0;
};
// Ingester ffmpeg object.
class SrsIngesterFFMPEG
class SrsIngesterFFMPEG : public ISrsIngesterFFMPEG
{
private:
std::string vhost_;
std::string id_;
SrsFFMPEG *ffmpeg_;
ISrsFFMPEG *ffmpeg_;
srs_utime_t starttime_;
public:
@ -32,7 +60,7 @@ public:
virtual ~SrsIngesterFFMPEG();
public:
virtual srs_error_t initialize(SrsFFMPEG *ff, std::string v, std::string i);
virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i);
// The ingest uri, [vhost]/[ingest id]
virtual std::string uri();
// The alive in srs_utime_t.
@ -44,22 +72,35 @@ public:
virtual srs_error_t start();
virtual void stop();
virtual srs_error_t cycle();
// @see SrsFFMPEG.fast_stop().
virtual void fast_stop();
virtual void fast_kill();
};
// The ingest interface.
class ISrsIngester
{
public:
ISrsIngester();
virtual ~ISrsIngester();
public:
};
// Ingest file/stream/device,
// encode with FFMPEG(optional),
// push to SRS(or any RTMP server) over RTMP.
class SrsIngester : public ISrsCoroutineHandler, public ISrsReloadHandler
class SrsIngester : public ISrsIngester, public ISrsCoroutineHandler
{
private:
std::vector<SrsIngesterFFMPEG *> ingesters_;
ISrsAppFactory *app_factory_;
ISrsAppConfig *config_;
private:
std::vector<ISrsIngesterFFMPEG *> ingesters_;
private:
ISrsCoroutine *trd_;
SrsPithyPrint *pprint_;
ISrsPithyPrint *pprint_;
// Whether the ingesters are expired, for example, the listen port changed,
// all ingesters must be restart.
bool expired_;
@ -94,7 +135,7 @@ private:
virtual srs_error_t parse();
virtual srs_error_t parse_ingesters(SrsConfDirective *vhost);
virtual srs_error_t parse_engines(SrsConfDirective *vhost, SrsConfDirective *ingest);
virtual srs_error_t initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine);
virtual srs_error_t initialize_ffmpeg(ISrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine);
virtual void show_ingest_log_message();
};

View File

@ -27,6 +27,14 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
ISrsProcess::ISrsProcess()
{
}
ISrsProcess::~ISrsProcess()
{
}
SrsProcess::SrsProcess()
{
is_started_ = false;

View File

@ -12,6 +12,36 @@
#include <string>
#include <vector>
// The process interface.
class ISrsProcess
{
public:
ISrsProcess();
virtual ~ISrsProcess();
public:
// Get pid of process.
virtual int get_pid() = 0;
// whether process is already started.
virtual bool started() = 0;
// Initialize the process with binary and argv.
virtual srs_error_t initialize(std::string binary, std::vector<std::string> argv) = 0;
public:
// Start the process, ignore when already started.
virtual srs_error_t start() = 0;
// cycle check the process, update the state of process.
virtual srs_error_t cycle() = 0;
// Send SIGTERM then SIGKILL to ensure the process stopped.
virtual void stop() = 0;
public:
// The fast stop is to send a SIGTERM.
virtual void fast_stop() = 0;
// Directly kill process, never use it except server quiting.
virtual void fast_kill() = 0;
};
// Start and stop a process. Call cycle to restart the process when terminated.
// The usage:
// // the binary is the process to fork.
@ -25,7 +55,7 @@
// if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; }
// process->fast_stop();
// process->stop();
class SrsProcess
class SrsProcess : public ISrsProcess
{
private:
bool is_started_;

View File

@ -12,6 +12,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_rtc_api.hpp>
#include <srs_app_rtc_conn.hpp>
@ -32,7 +33,6 @@ using namespace std;
#include <srs_protocol_log.hpp>
#include <srs_protocol_rtc_stun.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_factory.hpp>
extern SrsPps *_srs_pps_rpkts;
extern SrsPps *_srs_pps_rstuns;

View File

@ -3249,6 +3249,16 @@ ISrsRtcConnection *MockDvrAppFactory::create_rtc_connection(ISrsExecRtcAsyncTask
return NULL;
}
ISrsFFMPEG *MockDvrAppFactory::create_ffmpeg(std::string ffmpeg_bin)
{
return NULL;
}
ISrsIngesterFFMPEG *MockDvrAppFactory::create_ingester_ffmpeg()
{
return NULL;
}
ISrsCoroutine *MockDvrAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return NULL;

View File

@ -643,6 +643,8 @@ public:
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin);
virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg();
// ISrsKernelFactory interface methods
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();

View File

@ -2470,6 +2470,16 @@ ISrsRtcConnection *MockAppFactoryForGbPublish::create_rtc_connection(ISrsExecRtc
return NULL;
}
ISrsFFMPEG *MockAppFactoryForGbPublish::create_ffmpeg(std::string ffmpeg_bin)
{
return NULL;
}
ISrsIngesterFFMPEG *MockAppFactoryForGbPublish::create_ingester_ffmpeg()
{
return NULL;
}
ISrsCoroutine *MockAppFactoryForGbPublish::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid)
{
return NULL;

View File

@ -633,6 +633,8 @@ public:
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin);
virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg();
// ISrsKernelFactory interface methods
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();

View File

@ -4265,4 +4265,3 @@ VOID TEST(HttpHooksTest, OnForwardBackendSuccess)
hooks->stat_ = NULL;
mock_factory->mock_http_client_ = NULL;
}

File diff suppressed because it is too large Load Diff

View File

@ -12,13 +12,15 @@
*/
#include <srs_utest.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_ingest.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_protocol_srt.hpp>
#include <srs_utest_app10.hpp>
#include <srs_utest_app11.hpp>
#include <srs_utest_app6.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_rtc_conn.hpp>
// Mock ISrsSrtSocket for testing SrsSrtConnection
class MockSrtSocket : public ISrsSrtSocket
@ -454,4 +456,167 @@ public:
virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite);
};
// Mock ISrsFFMPEG for testing SrsIngesterFFMPEG
class MockFFMPEG : public ISrsFFMPEG
{
public:
bool start_called_;
bool stop_called_;
bool cycle_called_;
bool fast_stop_called_;
bool fast_kill_called_;
srs_error_t start_error_;
srs_error_t cycle_error_;
public:
MockFFMPEG();
virtual ~MockFFMPEG();
public:
virtual void append_iparam(std::string iparam);
virtual void set_oformat(std::string format);
virtual std::string output();
virtual srs_error_t initialize(std::string in, std::string out, std::string log);
virtual srs_error_t initialize_transcode(SrsConfDirective *engine);
virtual srs_error_t initialize_copy();
virtual srs_error_t start();
virtual srs_error_t cycle();
virtual void stop();
virtual void fast_stop();
virtual void fast_kill();
};
// Mock ISrsIngesterFFMPEG for testing SrsIngester
class MockIngesterFFMPEG : public ISrsIngesterFFMPEG
{
public:
bool fast_stop_called_;
bool fast_kill_called_;
std::string vhost_;
std::string id_;
public:
MockIngesterFFMPEG();
virtual ~MockIngesterFFMPEG();
public:
virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i);
virtual std::string uri();
virtual srs_utime_t alive();
virtual bool equals(std::string v, std::string i);
virtual bool equals(std::string v);
virtual srs_error_t start();
virtual void stop();
virtual srs_error_t cycle();
virtual void fast_stop();
virtual void fast_kill();
};
// Mock ISrsAppFactory for testing SrsIngester
class MockAppFactoryForIngester : public ISrsAppFactory
{
public:
MockSrtCoroutine *mock_coroutine_;
ISrsTime *mock_time_;
int create_coroutine_count_;
int create_time_count_;
public:
MockAppFactoryForIngester();
virtual ~MockAppFactoryForIngester();
public:
virtual ISrsFileWriter *create_file_writer();
virtual ISrsFileWriter *create_enc_file_writer();
virtual ISrsFileReader *create_file_reader();
virtual SrsPath *create_path();
virtual SrsLiveSource *create_live_source();
virtual ISrsOriginHub *create_origin_hub();
virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval);
virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto);
virtual ISrsHttpClient *create_http_client();
virtual ISrsFileReader *create_http_file_reader(ISrsHttpResponseReader *r);
virtual ISrsFlvDecoder *create_flv_decoder();
virtual ISrsRtspSendTrack *create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
virtual ISrsRtspSendTrack *create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
virtual ISrsFlvTransmuxer *create_flv_transmuxer();
virtual ISrsMp4Encoder *create_mp4_encoder();
virtual ISrsDvrSegmenter *create_dvr_flv_segmenter();
virtual ISrsDvrSegmenter *create_dvr_mp4_segmenter();
virtual ISrsGbMediaTcpConn *create_gb_media_tcp_conn();
virtual ISrsGbSession *create_gb_session();
virtual ISrsInitMp4 *create_init_mp4();
virtual ISrsFragmentWindow *create_fragment_window();
virtual ISrsFragmentedMp4 *create_fragmented_mp4();
virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler);
virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin);
virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg();
virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid);
virtual ISrsTime *create_time();
virtual ISrsConfig *create_config();
virtual ISrsCond *create_cond();
void reset();
};
// Mock ISrsAppConfig for testing SrsIngester
class MockAppConfigForIngester : public MockAppConfig
{
public:
std::vector<SrsConfDirective *> vhosts_;
public:
MockAppConfigForIngester();
virtual ~MockAppConfigForIngester();
public:
virtual void get_vhosts(std::vector<SrsConfDirective *> &vhosts);
virtual std::vector<std::string> get_listens();
virtual std::vector<SrsConfDirective *> get_ingesters(std::string vhost);
virtual bool get_ingest_enabled(SrsConfDirective *conf);
virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf);
virtual std::string get_ingest_input_type(SrsConfDirective *conf);
virtual std::string get_ingest_input_url(SrsConfDirective *conf);
virtual std::vector<SrsConfDirective *> get_transcode_engines(SrsConfDirective *conf);
virtual bool get_engine_enabled(SrsConfDirective *conf);
virtual std::string get_engine_output(SrsConfDirective *conf);
virtual std::string get_engine_vcodec(SrsConfDirective *conf);
virtual std::string get_engine_acodec(SrsConfDirective *conf);
virtual std::vector<std::string> get_engine_perfile(SrsConfDirective *conf);
virtual std::string get_engine_iformat(SrsConfDirective *conf);
virtual std::vector<std::string> get_engine_vfilter(SrsConfDirective *conf);
virtual int get_engine_vbitrate(SrsConfDirective *conf);
virtual double get_engine_vfps(SrsConfDirective *conf);
virtual int get_engine_vwidth(SrsConfDirective *conf);
virtual int get_engine_vheight(SrsConfDirective *conf);
virtual int get_engine_vthreads(SrsConfDirective *conf);
virtual std::string get_engine_vprofile(SrsConfDirective *conf);
virtual std::string get_engine_vpreset(SrsConfDirective *conf);
virtual std::vector<std::string> get_engine_vparams(SrsConfDirective *conf);
virtual int get_engine_abitrate(SrsConfDirective *conf);
virtual int get_engine_asample_rate(SrsConfDirective *conf);
virtual int get_engine_achannels(SrsConfDirective *conf);
virtual std::vector<std::string> get_engine_aparams(SrsConfDirective *conf);
virtual std::string get_engine_oformat(SrsConfDirective *conf);
virtual bool get_vhost_enabled(SrsConfDirective *conf);
void add_vhost(SrsConfDirective *vhost);
void clear_vhosts();
};
// Mock ISrsTime for testing SrsIngester
class MockTimeForIngester : public ISrsTime
{
public:
int usleep_count_;
srs_utime_t last_usleep_duration_;
public:
MockTimeForIngester();
virtual ~MockTimeForIngester();
public:
virtual void usleep(srs_utime_t duration);
void reset();
};
#endif

View File

@ -448,6 +448,40 @@ public:
virtual int get_dash_window_size(std::string vhost) { return 10; }
virtual bool get_dash_cleanup(std::string vhost) { return true; }
virtual srs_utime_t get_dash_dispose(std::string vhost) { return dash_dispose_; }
// Ingest config
virtual void get_vhosts(std::vector<SrsConfDirective *> &vhosts) {}
virtual std::vector<SrsConfDirective *> get_ingesters(std::string vhost) { return std::vector<SrsConfDirective *>(); }
virtual SrsConfDirective *get_ingest_by_id(std::string vhost, std::string ingest_id) { return NULL; }
virtual bool get_ingest_enabled(SrsConfDirective *conf) { return false; }
virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf) { return ""; }
virtual std::string get_ingest_input_type(SrsConfDirective *conf) { return ""; }
virtual std::string get_ingest_input_url(SrsConfDirective *conf) { return ""; }
// FFmpeg log config
virtual bool get_ff_log_enabled() { return false; }
virtual std::string get_ff_log_dir() { return ""; }
virtual std::string get_ff_log_level() { return ""; }
// Transcode/Engine config
virtual std::vector<SrsConfDirective *> get_transcode_engines(SrsConfDirective *conf) { return std::vector<SrsConfDirective *>(); }
virtual bool get_engine_enabled(SrsConfDirective *conf) { return false; }
virtual std::vector<std::string> get_engine_perfile(SrsConfDirective *conf) { return std::vector<std::string>(); }
virtual std::string get_engine_iformat(SrsConfDirective *conf) { return ""; }
virtual std::vector<std::string> get_engine_vfilter(SrsConfDirective *conf) { return std::vector<std::string>(); }
virtual std::string get_engine_vcodec(SrsConfDirective *conf) { return ""; }
virtual int get_engine_vbitrate(SrsConfDirective *conf) { return 0; }
virtual double get_engine_vfps(SrsConfDirective *conf) { return 0; }
virtual int get_engine_vwidth(SrsConfDirective *conf) { return 0; }
virtual int get_engine_vheight(SrsConfDirective *conf) { return 0; }
virtual int get_engine_vthreads(SrsConfDirective *conf) { return 0; }
virtual std::string get_engine_vprofile(SrsConfDirective *conf) { return ""; }
virtual std::string get_engine_vpreset(SrsConfDirective *conf) { return ""; }
virtual std::vector<std::string> get_engine_vparams(SrsConfDirective *conf) { return std::vector<std::string>(); }
virtual std::string get_engine_acodec(SrsConfDirective *conf) { return ""; }
virtual int get_engine_abitrate(SrsConfDirective *conf) { return 0; }
virtual int get_engine_asample_rate(SrsConfDirective *conf) { return 0; }
virtual int get_engine_achannels(SrsConfDirective *conf) { return 0; }
virtual std::vector<std::string> get_engine_aparams(SrsConfDirective *conf) { return std::vector<std::string>(); }
virtual std::string get_engine_oformat(SrsConfDirective *conf) { return ""; }
virtual std::string get_engine_output(SrsConfDirective *conf) { return ""; }
void set_http_hooks_enabled(bool enabled);
void set_on_stop_urls(const std::vector<std::string> &urls);
void clear_on_stop_directive();