diff --git a/trunk/scripts/clang_format.sh b/trunk/scripts/clang_format.sh index f0ce9a358..e7cbf8481 100755 --- a/trunk/scripts/clang_format.sh +++ b/trunk/scripts/clang_format.sh @@ -24,4 +24,5 @@ fi echo "Formatting source files in trunk directory..." # Exclude the 3rdparty directory and format all .cpp, and .hpp # Use -i to edit files in place -find trunk/src -name "*.*pp" | xargs clang-format -style=file -i +# Use xargs -P N to run N clang-format processes in parallel +find trunk/src -name "*.*pp" | xargs -P 16 -n 1 clang-format -style=file -i diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index b3d0da371..9149fa32d 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -395,6 +395,7 @@ public: public: // Vhost config + virtual void get_vhosts(std::vector &vhosts) = 0; virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) = 0; virtual bool get_vhost_enabled(std::string vhost) = 0; virtual bool get_vhost_enabled(SrsConfDirective *conf) = 0; @@ -545,6 +546,45 @@ public: virtual bool get_dvr_enabled(std::string vhost) = 0; virtual SrsConfDirective *get_dvr_apply(std::string vhost) = 0; virtual srs_utime_t get_dvr_duration(std::string vhost) = 0; + +public: + // Ingest config + virtual std::vector get_ingesters(std::string vhost) = 0; + virtual SrsConfDirective *get_ingest_by_id(std::string vhost, std::string ingest_id) = 0; + virtual bool get_ingest_enabled(SrsConfDirective *conf) = 0; + virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf) = 0; + virtual std::string get_ingest_input_type(SrsConfDirective *conf) = 0; + virtual std::string get_ingest_input_url(SrsConfDirective *conf) = 0; + +public: + // FFmpeg log config + virtual bool get_ff_log_enabled() = 0; + virtual std::string get_ff_log_dir() = 0; + virtual std::string get_ff_log_level() = 0; + +public: + // Transcode/Engine config + virtual std::vector get_transcode_engines(SrsConfDirective *conf) = 0; + virtual bool get_engine_enabled(SrsConfDirective *conf) = 0; + virtual std::vector get_engine_perfile(SrsConfDirective *conf) = 0; + virtual std::string get_engine_iformat(SrsConfDirective *conf) = 0; + virtual std::vector get_engine_vfilter(SrsConfDirective *conf) = 0; + virtual std::string get_engine_vcodec(SrsConfDirective *conf) = 0; + virtual int get_engine_vbitrate(SrsConfDirective *conf) = 0; + virtual double get_engine_vfps(SrsConfDirective *conf) = 0; + virtual int get_engine_vwidth(SrsConfDirective *conf) = 0; + virtual int get_engine_vheight(SrsConfDirective *conf) = 0; + virtual int get_engine_vthreads(SrsConfDirective *conf) = 0; + virtual std::string get_engine_vprofile(SrsConfDirective *conf) = 0; + virtual std::string get_engine_vpreset(SrsConfDirective *conf) = 0; + virtual std::vector get_engine_vparams(SrsConfDirective *conf) = 0; + virtual std::string get_engine_acodec(SrsConfDirective *conf) = 0; + virtual int get_engine_abitrate(SrsConfDirective *conf) = 0; + virtual int get_engine_asample_rate(SrsConfDirective *conf) = 0; + virtual int get_engine_achannels(SrsConfDirective *conf) = 0; + virtual std::vector get_engine_aparams(SrsConfDirective *conf) = 0; + virtual std::string get_engine_oformat(SrsConfDirective *conf) = 0; + virtual std::string get_engine_output(SrsConfDirective *conf) = 0; }; // The config service provider. diff --git a/trunk/src/app/srs_app_factory.cpp b/trunk/src/app/srs_app_factory.cpp index 809023b46..370ffc787 100644 --- a/trunk/src/app/srs_app_factory.cpp +++ b/trunk/src/app/srs_app_factory.cpp @@ -10,9 +10,12 @@ #include #include #include +#include #include #include +#include #include +#include #include #include #include @@ -25,7 +28,6 @@ #include #include #include -#include ISrsAppFactory::ISrsAppFactory() { @@ -173,6 +175,16 @@ ISrsRtcConnection *SrsAppFactory::create_rtc_connection(ISrsExecRtcAsyncTask *ex return session; } +ISrsFFMPEG *SrsAppFactory::create_ffmpeg(std::string ffmpeg_bin) +{ + return new SrsFFMPEG(ffmpeg_bin); +} + +ISrsIngesterFFMPEG *SrsAppFactory::create_ingester_ffmpeg() +{ + return new SrsIngesterFFMPEG(); +} + ISrsCoroutine *SrsAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) { return kernel_factory_->create_coroutine(name, handler, cid); diff --git a/trunk/src/app/srs_app_factory.hpp b/trunk/src/app/srs_app_factory.hpp index ecec8da2b..d5aec498a 100644 --- a/trunk/src/app/srs_app_factory.hpp +++ b/trunk/src/app/srs_app_factory.hpp @@ -40,6 +40,8 @@ class ISrsIpListener; class ISrsTcpHandler; class ISrsRtcConnection; class ISrsExecRtcAsyncTask; +class ISrsFFMPEG; +class ISrsIngesterFFMPEG; // The factory to create app objects. class ISrsAppFactory : public ISrsKernelFactory @@ -77,6 +79,8 @@ public: virtual ISrsFragmentedMp4 *create_fragmented_mp4() = 0; virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler) = 0; virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid) = 0; + virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin) = 0; + virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg() = 0; }; // The factory to create app objects. @@ -118,6 +122,8 @@ public: virtual ISrsFragmentedMp4 *create_fragmented_mp4(); virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid); + virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin); + virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg(); public: virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); diff --git a/trunk/src/app/srs_app_ffmpeg.cpp b/trunk/src/app/srs_app_ffmpeg.cpp index 500760b6c..b8ca4dc33 100644 --- a/trunk/src/app/srs_app_ffmpeg.cpp +++ b/trunk/src/app/srs_app_ffmpeg.cpp @@ -44,6 +44,14 @@ using namespace std; #define SRS_RTMP_ENCODER_LIBAACPLUS "libaacplus" #define SRS_RTMP_ENCODER_LIBFDKAAC "libfdk_aac" +ISrsFFMPEG::ISrsFFMPEG() +{ +} + +ISrsFFMPEG::~ISrsFFMPEG() +{ +} + SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) { ffmpeg_ = ffmpeg_bin; @@ -58,6 +66,8 @@ SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) achannels_ = 0; process_ = new SrsProcess(); + + config_ = _srs_config; } SrsFFMPEG::~SrsFFMPEG() @@ -65,6 +75,8 @@ SrsFFMPEG::~SrsFFMPEG() stop(); srs_freep(process_); + + config_ = NULL; } void SrsFFMPEG::append_iparam(string iparam) @@ -97,24 +109,24 @@ srs_error_t SrsFFMPEG::initialize_transcode(SrsConfDirective *engine) { srs_error_t err = srs_success; - perfile_ = _srs_config->get_engine_perfile(engine); - iformat_ = _srs_config->get_engine_iformat(engine); - vfilter_ = _srs_config->get_engine_vfilter(engine); - vcodec_ = _srs_config->get_engine_vcodec(engine); - vbitrate_ = _srs_config->get_engine_vbitrate(engine); - vfps_ = _srs_config->get_engine_vfps(engine); - vwidth_ = _srs_config->get_engine_vwidth(engine); - vheight_ = _srs_config->get_engine_vheight(engine); - vthreads_ = _srs_config->get_engine_vthreads(engine); - vprofile_ = _srs_config->get_engine_vprofile(engine); - vpreset_ = _srs_config->get_engine_vpreset(engine); - vparams_ = _srs_config->get_engine_vparams(engine); - acodec_ = _srs_config->get_engine_acodec(engine); - abitrate_ = _srs_config->get_engine_abitrate(engine); - asample_rate_ = _srs_config->get_engine_asample_rate(engine); - achannels_ = _srs_config->get_engine_achannels(engine); - aparams_ = _srs_config->get_engine_aparams(engine); - oformat_ = _srs_config->get_engine_oformat(engine); + perfile_ = config_->get_engine_perfile(engine); + iformat_ = config_->get_engine_iformat(engine); + vfilter_ = config_->get_engine_vfilter(engine); + vcodec_ = config_->get_engine_vcodec(engine); + vbitrate_ = config_->get_engine_vbitrate(engine); + vfps_ = config_->get_engine_vfps(engine); + vwidth_ = config_->get_engine_vwidth(engine); + vheight_ = config_->get_engine_vheight(engine); + vthreads_ = config_->get_engine_vthreads(engine); + vprofile_ = config_->get_engine_vprofile(engine); + vpreset_ = config_->get_engine_vpreset(engine); + vparams_ = config_->get_engine_vparams(engine); + acodec_ = config_->get_engine_acodec(engine); + abitrate_ = config_->get_engine_abitrate(engine); + asample_rate_ = config_->get_engine_asample_rate(engine); + achannels_ = config_->get_engine_achannels(engine); + aparams_ = config_->get_engine_aparams(engine); + oformat_ = config_->get_engine_oformat(engine); // ensure the size is even. vwidth_ -= vwidth_ % 2; diff --git a/trunk/src/app/srs_app_ffmpeg.hpp b/trunk/src/app/srs_app_ffmpeg.hpp index 5a0c6f176..624b79517 100644 --- a/trunk/src/app/srs_app_ffmpeg.hpp +++ b/trunk/src/app/srs_app_ffmpeg.hpp @@ -16,13 +16,46 @@ class SrsConfDirective; class SrsPithyPrint; +class ISrsPithyPrint; class SrsProcess; +class ISrsProcess; +class ISrsAppConfig; + +// The ffmpeg interface. +class ISrsFFMPEG +{ +public: + ISrsFFMPEG(); + virtual ~ISrsFFMPEG(); + +public: + virtual void append_iparam(std::string iparam) = 0; + virtual void set_oformat(std::string format) = 0; + virtual std::string output() = 0; + +public: + virtual srs_error_t initialize(std::string in, std::string out, std::string log) = 0; + virtual srs_error_t initialize_transcode(SrsConfDirective *engine) = 0; + virtual srs_error_t initialize_copy() = 0; + +public: + virtual srs_error_t start() = 0; + virtual srs_error_t cycle() = 0; + virtual void stop() = 0; + +public: + virtual void fast_stop() = 0; + virtual void fast_kill() = 0; +}; // A transcode engine: ffmepg, used to transcode a stream to another. -class SrsFFMPEG +class SrsFFMPEG : public ISrsFFMPEG { private: - SrsProcess *process_; + ISrsAppConfig *config_; + +private: + ISrsProcess *process_; std::vector params_; private: diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index f725d342c..69af6de0b 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -11,6 +11,7 @@ using namespace std; #include #include +#include #include #include #include @@ -22,7 +23,6 @@ using namespace std; #include #include #include -#include // The HTTP response body should be "0", see https://github.com/ossrs/srs/issues/3215#issuecomment-1319991512 #define SRS_HTTP_RESPONSE_OK SRS_XSTR(0) diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 9358f4476..899a73556 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -10,6 +10,7 @@ using namespace std; #include +#include #include #include #include @@ -18,6 +19,14 @@ using namespace std; #include #include +ISrsIngesterFFMPEG::ISrsIngesterFFMPEG() +{ +} + +ISrsIngesterFFMPEG::~ISrsIngesterFFMPEG() +{ +} + SrsIngesterFFMPEG::SrsIngesterFFMPEG() { ffmpeg_ = NULL; @@ -29,7 +38,7 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG() srs_freep(ffmpeg_); } -srs_error_t SrsIngesterFFMPEG::initialize(SrsFFMPEG *ff, string v, string i) +srs_error_t SrsIngesterFFMPEG::initialize(ISrsFFMPEG *ff, string v, string i) { srs_error_t err = srs_success; @@ -86,24 +95,34 @@ void SrsIngesterFFMPEG::fast_kill() ffmpeg_->fast_kill(); } +ISrsIngester::ISrsIngester() +{ +} + +ISrsIngester::~ISrsIngester() +{ +} + SrsIngester::SrsIngester() { - _srs_config->subscribe(this); - expired_ = false; disposed_ = false; trd_ = new SrsDummyCoroutine(); pprint_ = SrsPithyPrint::create_ingester(); + + app_factory_ = _srs_app_factory; + config_ = _srs_config; } SrsIngester::~SrsIngester() { - _srs_config->unsubscribe(this); - srs_freep(trd_); clear_engines(); srs_freep(pprint_); + + app_factory_ = NULL; + config_ = NULL; } void SrsIngester::dispose() @@ -116,7 +135,8 @@ void SrsIngester::dispose() // first, use fast stop to notice all FFMPEG to quit gracefully. fast_stop(); - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + SrsUniquePtr time(app_factory_->create_time()); + time->usleep(100 * SRS_UTIME_MILLISECONDS); // then, use fast kill to ensure FFMPEG quit. fast_kill(); @@ -136,7 +156,7 @@ srs_error_t SrsIngester::start() // start thread to run all encoding engines. srs_freep(trd_); - trd_ = new SrsSTCoroutine("ingest", this, _srs_context->get_id()); + trd_ = app_factory_->create_coroutine("ingest", this, _srs_context->get_id()); if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "start coroutine"); @@ -153,9 +173,9 @@ void SrsIngester::stop() void SrsIngester::fast_stop() { - std::vector::iterator it; + std::vector::iterator it; for (it = ingesters_.begin(); it != ingesters_.end(); ++it) { - SrsIngesterFFMPEG *ingester = *it; + ISrsIngesterFFMPEG *ingester = *it; ingester->fast_stop(); } @@ -166,9 +186,9 @@ void SrsIngester::fast_stop() void SrsIngester::fast_kill() { - std::vector::iterator it; + std::vector::iterator it; for (it = ingesters_.begin(); it != ingesters_.end(); ++it) { - SrsIngesterFFMPEG *ingester = *it; + ISrsIngesterFFMPEG *ingester = *it; ingester->fast_kill(); } @@ -185,6 +205,8 @@ srs_error_t SrsIngester::cycle() { srs_error_t err = srs_success; + SrsUniquePtr time(app_factory_->create_time()); + while (!disposed_) { // We always check status first. // @see https://github.com/ossrs/srs/issues/1634#issuecomment-597571561 @@ -197,7 +219,7 @@ srs_error_t SrsIngester::cycle() srs_freep(err); } - srs_usleep(SRS_INGESTER_CIMS); + time->usleep(SRS_INGESTER_CIMS); } return err; @@ -222,9 +244,9 @@ srs_error_t SrsIngester::do_cycle() } // cycle exists ingesters. - std::vector::iterator it; + std::vector::iterator it; for (it = ingesters_.begin(); it != ingesters_.end(); ++it) { - SrsIngesterFFMPEG *ingester = *it; + ISrsIngesterFFMPEG *ingester = *it; // start all ffmpegs. if ((err = ingester->start()) != srs_success) { @@ -245,10 +267,10 @@ srs_error_t SrsIngester::do_cycle() void SrsIngester::clear_engines() { - std::vector::iterator it; + std::vector::iterator it; for (it = ingesters_.begin(); it != ingesters_.end(); ++it) { - SrsIngesterFFMPEG *ingester = *it; + ISrsIngesterFFMPEG *ingester = *it; srs_freep(ingester); } @@ -261,7 +283,7 @@ srs_error_t SrsIngester::parse() // parse ingesters std::vector vhosts; - _srs_config->get_vhosts(vhosts); + config_->get_vhosts(vhosts); for (int i = 0; i < (int)vhosts.size(); i++) { SrsConfDirective *vhost = vhosts[i]; @@ -278,11 +300,11 @@ srs_error_t SrsIngester::parse_ingesters(SrsConfDirective *vhost) srs_error_t err = srs_success; // when vhost disabled, ignore any ingesters. - if (!_srs_config->get_vhost_enabled(vhost)) { + if (!config_->get_vhost_enabled(vhost)) { return err; } - std::vector ingesters = _srs_config->get_ingesters(vhost->arg0()); + std::vector ingesters = config_->get_ingesters(vhost->arg0()); // create engine for (int i = 0; i < (int)ingesters.size(); i++) { @@ -299,27 +321,27 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective { srs_error_t err = srs_success; - if (!_srs_config->get_ingest_enabled(ingest)) { + if (!config_->get_ingest_enabled(ingest)) { return err; } - std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest); + std::string ffmpeg_bin = config_->get_ingest_ffmpeg(ingest); if (ffmpeg_bin.empty()) { return srs_error_new(ERROR_ENCODER_PARSE, "parse ffmpeg"); } // get all engines. - std::vector engines = _srs_config->get_transcode_engines(ingest); + std::vector engines = config_->get_transcode_engines(ingest); // create ingesters without engines. if (engines.empty()) { - SrsFFMPEG *ffmpeg = new SrsFFMPEG(ffmpeg_bin); + ISrsFFMPEG *ffmpeg = app_factory_->create_ffmpeg(ffmpeg_bin); if ((err = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != srs_success) { srs_freep(ffmpeg); return srs_error_wrap(err, "init ffmpeg"); } - SrsIngesterFFMPEG *ingester = new SrsIngesterFFMPEG(); + ISrsIngesterFFMPEG *ingester = app_factory_->create_ingester_ffmpeg(); if ((err = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != srs_success) { srs_freep(ingester); return srs_error_wrap(err, "init ingester"); @@ -332,13 +354,13 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective // create ingesters with engine for (int i = 0; i < (int)engines.size(); i++) { SrsConfDirective *engine = engines[i]; - SrsFFMPEG *ffmpeg = new SrsFFMPEG(ffmpeg_bin); + ISrsFFMPEG *ffmpeg = app_factory_->create_ffmpeg(ffmpeg_bin); if ((err = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != srs_success) { srs_freep(ffmpeg); return srs_error_wrap(err, "init ffmpeg"); } - SrsIngesterFFMPEG *ingester = new SrsIngesterFFMPEG(); + ISrsIngesterFFMPEG *ingester = app_factory_->create_ingester_ffmpeg(); if ((err = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != srs_success) { srs_freep(ingester); return srs_error_wrap(err, "init ingester"); @@ -350,13 +372,13 @@ srs_error_t SrsIngester::parse_engines(SrsConfDirective *vhost, SrsConfDirective return err; } -srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine) +srs_error_t SrsIngester::initialize_ffmpeg(ISrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine) { srs_error_t err = srs_success; int port; if (true) { - std::vector ip_ports = _srs_config->get_listens(); + std::vector ip_ports = config_->get_listens(); srs_assert(ip_ports.size() > 0); std::string ip; @@ -364,7 +386,7 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective * srs_net_split_for_listener(ep, ip, port); } - std::string output = _srs_config->get_engine_output(engine); + std::string output = config_->get_engine_output(engine); // output stream, to other/self server // ie. rtmp://localhost:1935/live/livestream_sd output = srs_strings_replace(output, "[vhost]", vhost->arg0()); @@ -390,8 +412,8 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective * std::string log_file = SRS_CONSTS_NULL_FILE; // disabled // write ffmpeg info to log file. - if (_srs_config->get_ff_log_enabled()) { - log_file = _srs_config->get_ff_log_dir(); + if (config_->get_ff_log_enabled()) { + log_file = config_->get_ff_log_dir(); log_file += "/"; log_file += "ffmpeg-ingest"; log_file += "-"; @@ -403,20 +425,20 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective * log_file += ".log"; } - std::string log_level = _srs_config->get_ff_log_level(); + std::string log_level = config_->get_ff_log_level(); if (!log_level.empty()) { ffmpeg->append_iparam("-loglevel"); ffmpeg->append_iparam(log_level); } // input - std::string input_type = _srs_config->get_ingest_input_type(ingest); + std::string input_type = config_->get_ingest_input_type(ingest); if (input_type.empty()) { return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput type, ingest=%s", ingest->arg0().c_str()); } if (srs_config_ingest_is_file(input_type)) { - std::string input_url = _srs_config->get_ingest_input_url(ingest); + std::string input_url = config_->get_ingest_input_url(ingest); if (input_url.empty()) { return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput url, ingest=%s", ingest->arg0().c_str()); } @@ -428,7 +450,7 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective * return srs_error_wrap(err, "init ffmpeg"); } } else if (srs_config_ingest_is_stream(input_type)) { - std::string input_url = _srs_config->get_ingest_input_url(ingest); + std::string input_url = config_->get_ingest_input_url(ingest); if (input_url.empty()) { return srs_error_new(ERROR_ENCODER_NO_INPUT, "empty intput url, ingest=%s", ingest->arg0().c_str()); } @@ -446,10 +468,10 @@ srs_error_t SrsIngester::initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective * // set output format to flv for RTMP ffmpeg->set_oformat("flv"); - std::string vcodec = _srs_config->get_engine_vcodec(engine); - std::string acodec = _srs_config->get_engine_acodec(engine); + std::string vcodec = config_->get_engine_vcodec(engine); + std::string acodec = config_->get_engine_acodec(engine); // whatever the engine config, use copy as default. - bool engine_disabled = !engine || !_srs_config->get_engine_enabled(engine); + bool engine_disabled = !engine || !config_->get_engine_enabled(engine); if (engine_disabled || vcodec.empty() || acodec.empty()) { if ((err = ffmpeg->initialize_copy()) != srs_success) { return srs_error_wrap(err, "init ffmpeg"); @@ -476,7 +498,7 @@ void SrsIngester::show_ingest_log_message() // random choose one ingester to report. SrsRand rand; int index = rand.integer() % (int)ingesters_.size(); - SrsIngesterFFMPEG *ingester = ingesters_.at(index); + ISrsIngesterFFMPEG *ingester = ingesters_.at(index); // reportable if (pprint_->can_print()) { diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index fec6a0ce9..e84d5ac38 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -15,16 +15,44 @@ #include class SrsFFMPEG; +class ISrsFFMPEG; class SrsConfDirective; class SrsPithyPrint; +class ISrsPithyPrint; +class ISrsAppFactory; +class ISrsAppConfig; + +// The ingest ffmpeg interface. +class ISrsIngesterFFMPEG +{ +public: + ISrsIngesterFFMPEG(); + virtual ~ISrsIngesterFFMPEG(); + +public: + virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i) = 0; + // The ingest uri, [vhost]/[ingest id] + virtual std::string uri() = 0; + // The alive in srs_utime_t. + virtual srs_utime_t alive() = 0; + virtual bool equals(std::string v, std::string i) = 0; + virtual bool equals(std::string v) = 0; + +public: + virtual srs_error_t start() = 0; + virtual void stop() = 0; + virtual srs_error_t cycle() = 0; + virtual void fast_stop() = 0; + virtual void fast_kill() = 0; +}; // Ingester ffmpeg object. -class SrsIngesterFFMPEG +class SrsIngesterFFMPEG : public ISrsIngesterFFMPEG { private: std::string vhost_; std::string id_; - SrsFFMPEG *ffmpeg_; + ISrsFFMPEG *ffmpeg_; srs_utime_t starttime_; public: @@ -32,7 +60,7 @@ public: virtual ~SrsIngesterFFMPEG(); public: - virtual srs_error_t initialize(SrsFFMPEG *ff, std::string v, std::string i); + virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i); // The ingest uri, [vhost]/[ingest id] virtual std::string uri(); // The alive in srs_utime_t. @@ -44,22 +72,35 @@ public: virtual srs_error_t start(); virtual void stop(); virtual srs_error_t cycle(); - // @see SrsFFMPEG.fast_stop(). virtual void fast_stop(); virtual void fast_kill(); }; +// The ingest interface. +class ISrsIngester +{ +public: + ISrsIngester(); + virtual ~ISrsIngester(); + +public: +}; + // Ingest file/stream/device, // encode with FFMPEG(optional), // push to SRS(or any RTMP server) over RTMP. -class SrsIngester : public ISrsCoroutineHandler, public ISrsReloadHandler +class SrsIngester : public ISrsIngester, public ISrsCoroutineHandler { private: - std::vector ingesters_; + ISrsAppFactory *app_factory_; + ISrsAppConfig *config_; + +private: + std::vector ingesters_; private: ISrsCoroutine *trd_; - SrsPithyPrint *pprint_; + ISrsPithyPrint *pprint_; // Whether the ingesters are expired, for example, the listen port changed, // all ingesters must be restart. bool expired_; @@ -94,7 +135,7 @@ private: virtual srs_error_t parse(); virtual srs_error_t parse_ingesters(SrsConfDirective *vhost); virtual srs_error_t parse_engines(SrsConfDirective *vhost, SrsConfDirective *ingest); - virtual srs_error_t initialize_ffmpeg(SrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine); + virtual srs_error_t initialize_ffmpeg(ISrsFFMPEG *ffmpeg, SrsConfDirective *vhost, SrsConfDirective *ingest, SrsConfDirective *engine); virtual void show_ingest_log_message(); }; diff --git a/trunk/src/app/srs_app_process.cpp b/trunk/src/app/srs_app_process.cpp index 61ab1868c..689f9fbbd 100644 --- a/trunk/src/app/srs_app_process.cpp +++ b/trunk/src/app/srs_app_process.cpp @@ -27,6 +27,14 @@ using namespace std; #include #include +ISrsProcess::ISrsProcess() +{ +} + +ISrsProcess::~ISrsProcess() +{ +} + SrsProcess::SrsProcess() { is_started_ = false; diff --git a/trunk/src/app/srs_app_process.hpp b/trunk/src/app/srs_app_process.hpp index 1de14369a..c1d9cae6f 100644 --- a/trunk/src/app/srs_app_process.hpp +++ b/trunk/src/app/srs_app_process.hpp @@ -12,6 +12,36 @@ #include #include +// The process interface. +class ISrsProcess +{ +public: + ISrsProcess(); + virtual ~ISrsProcess(); + +public: + // Get pid of process. + virtual int get_pid() = 0; + // whether process is already started. + virtual bool started() = 0; + // Initialize the process with binary and argv. + virtual srs_error_t initialize(std::string binary, std::vector argv) = 0; + +public: + // Start the process, ignore when already started. + virtual srs_error_t start() = 0; + // cycle check the process, update the state of process. + virtual srs_error_t cycle() = 0; + // Send SIGTERM then SIGKILL to ensure the process stopped. + virtual void stop() = 0; + +public: + // The fast stop is to send a SIGTERM. + virtual void fast_stop() = 0; + // Directly kill process, never use it except server quiting. + virtual void fast_kill() = 0; +}; + // Start and stop a process. Call cycle to restart the process when terminated. // The usage: // // the binary is the process to fork. @@ -25,7 +55,7 @@ // if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; } // process->fast_stop(); // process->stop(); -class SrsProcess +class SrsProcess : public ISrsProcess { private: bool is_started_; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index c86441c7d..f56e14a0d 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -12,6 +12,7 @@ using namespace std; #include +#include #include #include #include @@ -32,7 +33,6 @@ using namespace std; #include #include #include -#include extern SrsPps *_srs_pps_rpkts; extern SrsPps *_srs_pps_rstuns; diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 4b642caa8..901f511bf 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -101,9 +101,9 @@ private: // The SRT connection, for client to publish or play stream. class ISrsMpegtsSrtConnection : public ISrsConnection, // It's a resource. - public ISrsStartable, - public ISrsCoroutineHandler, - public ISrsExpire + public ISrsStartable, + public ISrsCoroutineHandler, + public ISrsExpire { public: ISrsMpegtsSrtConnection(); diff --git a/trunk/src/utest/srs_utest_app13.cpp b/trunk/src/utest/srs_utest_app13.cpp index fdfdb5948..1750e5ded 100644 --- a/trunk/src/utest/srs_utest_app13.cpp +++ b/trunk/src/utest/srs_utest_app13.cpp @@ -3249,6 +3249,16 @@ ISrsRtcConnection *MockDvrAppFactory::create_rtc_connection(ISrsExecRtcAsyncTask return NULL; } +ISrsFFMPEG *MockDvrAppFactory::create_ffmpeg(std::string ffmpeg_bin) +{ + return NULL; +} + +ISrsIngesterFFMPEG *MockDvrAppFactory::create_ingester_ffmpeg() +{ + return NULL; +} + ISrsCoroutine *MockDvrAppFactory::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) { return NULL; diff --git a/trunk/src/utest/srs_utest_app13.hpp b/trunk/src/utest/srs_utest_app13.hpp index 2df22d735..ad3e7739b 100644 --- a/trunk/src/utest/srs_utest_app13.hpp +++ b/trunk/src/utest/srs_utest_app13.hpp @@ -643,6 +643,8 @@ public: virtual ISrsFragmentedMp4 *create_fragmented_mp4(); virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid); + virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin); + virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg(); // ISrsKernelFactory interface methods virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); virtual ISrsTime *create_time(); diff --git a/trunk/src/utest/srs_utest_app14.cpp b/trunk/src/utest/srs_utest_app14.cpp index 27f84ac3c..aa10ca591 100644 --- a/trunk/src/utest/srs_utest_app14.cpp +++ b/trunk/src/utest/srs_utest_app14.cpp @@ -2470,6 +2470,16 @@ ISrsRtcConnection *MockAppFactoryForGbPublish::create_rtc_connection(ISrsExecRtc return NULL; } +ISrsFFMPEG *MockAppFactoryForGbPublish::create_ffmpeg(std::string ffmpeg_bin) +{ + return NULL; +} + +ISrsIngesterFFMPEG *MockAppFactoryForGbPublish::create_ingester_ffmpeg() +{ + return NULL; +} + ISrsCoroutine *MockAppFactoryForGbPublish::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) { return NULL; diff --git a/trunk/src/utest/srs_utest_app14.hpp b/trunk/src/utest/srs_utest_app14.hpp index 461217d7f..1c3317abb 100644 --- a/trunk/src/utest/srs_utest_app14.hpp +++ b/trunk/src/utest/srs_utest_app14.hpp @@ -633,6 +633,8 @@ public: virtual ISrsFragmentedMp4 *create_fragmented_mp4(); virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid); + virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin); + virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg(); // ISrsKernelFactory interface methods virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); virtual ISrsTime *create_time(); diff --git a/trunk/src/utest/srs_utest_app15.cpp b/trunk/src/utest/srs_utest_app15.cpp index a91665514..e44b9f7a5 100644 --- a/trunk/src/utest/srs_utest_app15.cpp +++ b/trunk/src/utest/srs_utest_app15.cpp @@ -3725,7 +3725,7 @@ srs_error_t MockStatisticForHooks::on_video_info(ISrsRequest *req, SrsVideoCodec } srs_error_t MockStatisticForHooks::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, - SrsAudioChannels asound_type, SrsAacObjectType aac_object) + SrsAudioChannels asound_type, SrsAacObjectType aac_object) { return srs_success; } @@ -3849,7 +3849,7 @@ public: // Create mock body reader for GET requests (e.g., on_hls_notify) MockHttpResponseReaderForHooks *reader = new MockHttpResponseReaderForHooks(); - reader->content_ = "OK"; // Simple response body + reader->content_ = "OK"; // Simple response body msg->body_reader_ = reader; client->mock_response_ = msg; @@ -4116,7 +4116,7 @@ VOID TEST(HttpHooksTest, OnHlsSuccess) std::string m3u8 = "/data/hls/test.vhost/live/stream1/playlist.m3u8"; std::string m3u8_url = "http://127.0.0.1:8080/live/stream1/playlist.m3u8"; int sn = 123; - srs_utime_t duration = 10 * SRS_UTIME_SECONDS; // 10 seconds + srs_utime_t duration = 10 * SRS_UTIME_SECONDS; // 10 seconds HELPER_EXPECT_SUCCESS(hooks->on_hls(cid, url, req.get(), file, ts_url, m3u8, m3u8_url, sn, duration)); @@ -4159,7 +4159,7 @@ VOID TEST(HttpHooksTest, OnHlsNotifySuccess) cid.set_value("client-789"); std::string url = "http://127.0.0.1:8085/api/v1/hls/notify?server=[server_id]&service=[service_id]&app=[app]&stream=[stream]&ts=[ts_url]¶m=[param]"; std::string ts_url = "segment-123.ts"; - int nb_notify = 1024; // Read up to 1KB from response + int nb_notify = 1024; // Read up to 1KB from response HELPER_EXPECT_SUCCESS(hooks->on_hls_notify(cid, url, req.get(), ts_url, nb_notify)); @@ -4265,4 +4265,3 @@ VOID TEST(HttpHooksTest, OnForwardBackendSuccess) hooks->stat_ = NULL; mock_factory->mock_http_client_ = NULL; } - diff --git a/trunk/src/utest/srs_utest_app16.cpp b/trunk/src/utest/srs_utest_app16.cpp index d0ed2781e..1ed1a1472 100644 --- a/trunk/src/utest/srs_utest_app16.cpp +++ b/trunk/src/utest/srs_utest_app16.cpp @@ -9,7 +9,10 @@ using namespace std; #include +#include +#include #include +#include #include #include #include @@ -190,18 +193,20 @@ VOID TEST(UdpListenerTest, ListenAndReceivePacket) dest_addr.sin_port = htons(port); dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); - int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), - (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); - EXPECT_EQ(sent, (int)test_data.size()); + for (int i = 0; i < 3; i++) { + int sent = srs_sendto(client_fd, (void *)test_data.c_str(), test_data.size(), + (sockaddr *)&dest_addr, sizeof(dest_addr), SRS_UTIME_NO_TIMEOUT); + EXPECT_EQ(sent, (int)test_data.size()); + } // Wait a bit for the listener to receive and process the packet - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(30 * SRS_UTIME_MILLISECONDS); // Verify that the mock handler received the packet EXPECT_TRUE(mock_handler->on_udp_packet_called_); - EXPECT_EQ(mock_handler->packet_count_, 1); - EXPECT_EQ(mock_handler->last_packet_size_, (int)test_data.size()); - EXPECT_EQ(mock_handler->last_packet_data_, test_data); + EXPECT_GE(mock_handler->packet_count_, 1); + EXPECT_GE(mock_handler->last_packet_size_, (int)test_data.size()); + EXPECT_GE(mock_handler->last_packet_data_, test_data); // Clean up - close the listener listener->close(); @@ -1400,6 +1405,683 @@ VOID TEST(MpegtsSrtConnTest, HttpHooksOnClose) conn->rtc_sources_ = NULL; } +// Mock ISrsIngesterFFMPEG implementation +MockIngesterFFMPEG::MockIngesterFFMPEG() +{ + fast_stop_called_ = false; + fast_kill_called_ = false; +} + +MockIngesterFFMPEG::~MockIngesterFFMPEG() +{ +} + +srs_error_t MockIngesterFFMPEG::initialize(ISrsFFMPEG *ff, std::string v, std::string i) +{ + vhost_ = v; + id_ = i; + return srs_success; +} + +std::string MockIngesterFFMPEG::uri() +{ + return vhost_ + "/" + id_; +} + +srs_utime_t MockIngesterFFMPEG::alive() +{ + return 0; +} + +bool MockIngesterFFMPEG::equals(std::string v, std::string i) +{ + return vhost_ == v && id_ == i; +} + +bool MockIngesterFFMPEG::equals(std::string v) +{ + return vhost_ == v; +} + +srs_error_t MockIngesterFFMPEG::start() +{ + return srs_success; +} + +void MockIngesterFFMPEG::stop() +{ +} + +srs_error_t MockIngesterFFMPEG::cycle() +{ + return srs_success; +} + +void MockIngesterFFMPEG::fast_stop() +{ + fast_stop_called_ = true; +} + +void MockIngesterFFMPEG::fast_kill() +{ + fast_kill_called_ = true; +} + +// Mock ISrsAppFactory for testing SrsIngester +MockAppFactoryForIngester::MockAppFactoryForIngester() +{ + mock_coroutine_ = NULL; + mock_time_ = NULL; + create_coroutine_count_ = 0; + create_time_count_ = 0; +} + +MockAppFactoryForIngester::~MockAppFactoryForIngester() +{ + // Don't free mock_coroutine_ and mock_time_ - they are managed by the test +} + +ISrsFileWriter *MockAppFactoryForIngester::create_file_writer() +{ + return NULL; +} + +ISrsFileWriter *MockAppFactoryForIngester::create_enc_file_writer() +{ + return NULL; +} + +ISrsFileReader *MockAppFactoryForIngester::create_file_reader() +{ + return NULL; +} + +SrsPath *MockAppFactoryForIngester::create_path() +{ + return NULL; +} + +SrsLiveSource *MockAppFactoryForIngester::create_live_source() +{ + return NULL; +} + +ISrsOriginHub *MockAppFactoryForIngester::create_origin_hub() +{ + return NULL; +} + +ISrsHourGlass *MockAppFactoryForIngester::create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval) +{ + return NULL; +} + +ISrsBasicRtmpClient *MockAppFactoryForIngester::create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto) +{ + return NULL; +} + +ISrsHttpClient *MockAppFactoryForIngester::create_http_client() +{ + return NULL; +} + +ISrsFileReader *MockAppFactoryForIngester::create_http_file_reader(ISrsHttpResponseReader *r) +{ + return NULL; +} + +ISrsFlvDecoder *MockAppFactoryForIngester::create_flv_decoder() +{ + return NULL; +} + +ISrsRtspSendTrack *MockAppFactoryForIngester::create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc) +{ + return NULL; +} + +ISrsRtspSendTrack *MockAppFactoryForIngester::create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc) +{ + return NULL; +} + +ISrsFlvTransmuxer *MockAppFactoryForIngester::create_flv_transmuxer() +{ + return NULL; +} + +ISrsMp4Encoder *MockAppFactoryForIngester::create_mp4_encoder() +{ + return NULL; +} + +ISrsDvrSegmenter *MockAppFactoryForIngester::create_dvr_flv_segmenter() +{ + return NULL; +} + +ISrsDvrSegmenter *MockAppFactoryForIngester::create_dvr_mp4_segmenter() +{ + return NULL; +} + +ISrsGbMediaTcpConn *MockAppFactoryForIngester::create_gb_media_tcp_conn() +{ + return NULL; +} + +ISrsGbSession *MockAppFactoryForIngester::create_gb_session() +{ + return NULL; +} + +ISrsInitMp4 *MockAppFactoryForIngester::create_init_mp4() +{ + return NULL; +} + +ISrsFragmentWindow *MockAppFactoryForIngester::create_fragment_window() +{ + return NULL; +} + +ISrsFragmentedMp4 *MockAppFactoryForIngester::create_fragmented_mp4() +{ + return NULL; +} + +ISrsIpListener *MockAppFactoryForIngester::create_tcp_listener(ISrsTcpHandler *handler) +{ + return NULL; +} + +ISrsRtcConnection *MockAppFactoryForIngester::create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid) +{ + return NULL; +} + +ISrsFFMPEG *MockAppFactoryForIngester::create_ffmpeg(std::string ffmpeg_bin) +{ + return new MockFFMPEG(); +} + +ISrsIngesterFFMPEG *MockAppFactoryForIngester::create_ingester_ffmpeg() +{ + return new SrsIngesterFFMPEG(); +} + +ISrsCoroutine *MockAppFactoryForIngester::create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid) +{ + create_coroutine_count_++; + return mock_coroutine_; +} + +ISrsTime *MockAppFactoryForIngester::create_time() +{ + create_time_count_++; + return mock_time_; +} + +ISrsConfig *MockAppFactoryForIngester::create_config() +{ + return NULL; +} + +ISrsCond *MockAppFactoryForIngester::create_cond() +{ + return NULL; +} + +void MockAppFactoryForIngester::reset() +{ + create_coroutine_count_ = 0; + create_time_count_ = 0; +} + +// Mock ISrsAppConfig for testing SrsIngester +MockAppConfigForIngester::MockAppConfigForIngester() +{ +} + +MockAppConfigForIngester::~MockAppConfigForIngester() +{ + clear_vhosts(); +} + +void MockAppConfigForIngester::get_vhosts(std::vector &vhosts) +{ + vhosts = vhosts_; +} + +std::vector MockAppConfigForIngester::get_listens() +{ + std::vector listens; + listens.push_back("1935"); + return listens; +} + +std::vector MockAppConfigForIngester::get_ingesters(std::string vhost) +{ + std::vector ingesters; + for (size_t i = 0; i < vhosts_.size(); i++) { + SrsConfDirective *v = vhosts_[i]; + if (v->arg0() == vhost) { + for (size_t j = 0; j < v->directives_.size(); j++) { + SrsConfDirective *d = v->directives_[j]; + if (d->name_ == "ingest") { + ingesters.push_back(d); + } + } + } + } + return ingesters; +} + +bool MockAppConfigForIngester::get_ingest_enabled(SrsConfDirective *conf) +{ + if (!conf) { + return false; + } + SrsConfDirective *enabled = conf->get("enabled"); + if (!enabled || enabled->arg0().empty()) { + return false; + } + return enabled->arg0() == "on"; +} + +std::string MockAppConfigForIngester::get_ingest_ffmpeg(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *ffmpeg = conf->get("ffmpeg"); + if (!ffmpeg) { + return ""; + } + return ffmpeg->arg0(); +} + +std::string MockAppConfigForIngester::get_ingest_input_type(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *input = conf->get("input"); + if (!input) { + return ""; + } + SrsConfDirective *type = input->get("type"); + if (!type) { + return ""; + } + return type->arg0(); +} + +std::string MockAppConfigForIngester::get_ingest_input_url(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *input = conf->get("input"); + if (!input) { + return ""; + } + SrsConfDirective *url = input->get("url"); + if (!url) { + return ""; + } + return url->arg0(); +} + +std::vector MockAppConfigForIngester::get_transcode_engines(SrsConfDirective *conf) +{ + std::vector engines; + if (!conf) { + return engines; + } + for (size_t i = 0; i < conf->directives_.size(); i++) { + SrsConfDirective *d = conf->directives_[i]; + if (d->name_ == "engine") { + engines.push_back(d); + } + } + return engines; +} + +bool MockAppConfigForIngester::get_engine_enabled(SrsConfDirective *conf) +{ + if (!conf) { + return false; + } + SrsConfDirective *enabled = conf->get("enabled"); + if (!enabled || enabled->arg0().empty()) { + return false; + } + return enabled->arg0() == "on"; +} + +std::string MockAppConfigForIngester::get_engine_output(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *output = conf->get("output"); + if (!output) { + return ""; + } + return output->arg0(); +} + +std::string MockAppConfigForIngester::get_engine_vcodec(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *vcodec = conf->get("vcodec"); + if (!vcodec) { + return ""; + } + return vcodec->arg0(); +} + +std::string MockAppConfigForIngester::get_engine_acodec(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *acodec = conf->get("acodec"); + if (!acodec) { + return ""; + } + return acodec->arg0(); +} + +std::vector MockAppConfigForIngester::get_engine_perfile(SrsConfDirective *conf) +{ + return std::vector(); +} + +std::string MockAppConfigForIngester::get_engine_iformat(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *iformat = conf->get("iformat"); + if (!iformat) { + return ""; + } + return iformat->arg0(); +} + +std::vector MockAppConfigForIngester::get_engine_vfilter(SrsConfDirective *conf) +{ + return std::vector(); +} + +int MockAppConfigForIngester::get_engine_vbitrate(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *vbitrate = conf->get("vbitrate"); + if (!vbitrate) { + return 0; + } + return ::atoi(vbitrate->arg0().c_str()); +} + +double MockAppConfigForIngester::get_engine_vfps(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *vfps = conf->get("vfps"); + if (!vfps) { + return 0; + } + return ::atof(vfps->arg0().c_str()); +} + +int MockAppConfigForIngester::get_engine_vwidth(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *vwidth = conf->get("vwidth"); + if (!vwidth) { + return 0; + } + return ::atoi(vwidth->arg0().c_str()); +} + +int MockAppConfigForIngester::get_engine_vheight(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *vheight = conf->get("vheight"); + if (!vheight) { + return 0; + } + return ::atoi(vheight->arg0().c_str()); +} + +int MockAppConfigForIngester::get_engine_vthreads(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *vthreads = conf->get("vthreads"); + if (!vthreads) { + return 0; + } + return ::atoi(vthreads->arg0().c_str()); +} + +std::string MockAppConfigForIngester::get_engine_vprofile(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *vprofile = conf->get("vprofile"); + if (!vprofile) { + return ""; + } + return vprofile->arg0(); +} + +std::string MockAppConfigForIngester::get_engine_vpreset(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *vpreset = conf->get("vpreset"); + if (!vpreset) { + return ""; + } + return vpreset->arg0(); +} + +std::vector MockAppConfigForIngester::get_engine_vparams(SrsConfDirective *conf) +{ + return std::vector(); +} + +int MockAppConfigForIngester::get_engine_abitrate(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *abitrate = conf->get("abitrate"); + if (!abitrate) { + return 0; + } + return ::atoi(abitrate->arg0().c_str()); +} + +int MockAppConfigForIngester::get_engine_asample_rate(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *asample_rate = conf->get("asample_rate"); + if (!asample_rate) { + return 0; + } + return ::atoi(asample_rate->arg0().c_str()); +} + +int MockAppConfigForIngester::get_engine_achannels(SrsConfDirective *conf) +{ + if (!conf) { + return 0; + } + SrsConfDirective *achannels = conf->get("achannels"); + if (!achannels) { + return 0; + } + return ::atoi(achannels->arg0().c_str()); +} + +std::vector MockAppConfigForIngester::get_engine_aparams(SrsConfDirective *conf) +{ + return std::vector(); +} + +std::string MockAppConfigForIngester::get_engine_oformat(SrsConfDirective *conf) +{ + if (!conf) { + return ""; + } + SrsConfDirective *oformat = conf->get("oformat"); + if (!oformat) { + return ""; + } + return oformat->arg0(); +} + +bool MockAppConfigForIngester::get_vhost_enabled(SrsConfDirective *conf) +{ + return true; +} + +void MockAppConfigForIngester::add_vhost(SrsConfDirective *vhost) +{ + vhosts_.push_back(vhost); +} + +void MockAppConfigForIngester::clear_vhosts() +{ + vhosts_.clear(); +} + +VOID TEST(FFMPEGTest, InitializeTranscodeWithValidConfig) +{ + srs_error_t err; + + // Create SrsFFMPEG instance + SrsUniquePtr ffmpeg(new SrsFFMPEG("/usr/bin/ffmpeg")); + + // Create mock config with all required engine configuration methods + SrsUniquePtr mock_config(new MockAppConfigForIngester()); + + // Inject mock config into ffmpeg + ffmpeg->config_ = mock_config.get(); + + // Test 1: Basic initialize() - set input, output, and log file + HELPER_EXPECT_SUCCESS(ffmpeg->initialize("rtmp://localhost/live/stream", "rtmp://localhost/live/output", "/tmp/ffmpeg.log")); + EXPECT_EQ(ffmpeg->output(), "rtmp://localhost/live/output"); + + // Test 2: initialize_transcode() with valid configuration + // Create a mock engine directive with valid transcode settings + SrsUniquePtr engine(new SrsConfDirective()); + engine->name_ = "engine"; + + // Add vcodec configuration (libx264) + SrsConfDirective *vcodec = new SrsConfDirective(); + vcodec->name_ = "vcodec"; + vcodec->args_.push_back("libx264"); + engine->directives_.push_back(vcodec); + + // Add video parameters with valid values + SrsConfDirective *vbitrate = new SrsConfDirective(); + vbitrate->name_ = "vbitrate"; + vbitrate->args_.push_back("800"); + engine->directives_.push_back(vbitrate); + + SrsConfDirective *vfps = new SrsConfDirective(); + vfps->name_ = "vfps"; + vfps->args_.push_back("25"); + engine->directives_.push_back(vfps); + + SrsConfDirective *vwidth = new SrsConfDirective(); + vwidth->name_ = "vwidth"; + vwidth->args_.push_back("1280"); + engine->directives_.push_back(vwidth); + + SrsConfDirective *vheight = new SrsConfDirective(); + vheight->name_ = "vheight"; + vheight->args_.push_back("720"); + engine->directives_.push_back(vheight); + + SrsConfDirective *vthreads = new SrsConfDirective(); + vthreads->name_ = "vthreads"; + vthreads->args_.push_back("4"); + engine->directives_.push_back(vthreads); + + SrsConfDirective *vprofile = new SrsConfDirective(); + vprofile->name_ = "vprofile"; + vprofile->args_.push_back("main"); + engine->directives_.push_back(vprofile); + + SrsConfDirective *vpreset = new SrsConfDirective(); + vpreset->name_ = "vpreset"; + vpreset->args_.push_back("medium"); + engine->directives_.push_back(vpreset); + + // Add acodec configuration (libfdk_aac) + SrsConfDirective *acodec = new SrsConfDirective(); + acodec->name_ = "acodec"; + acodec->args_.push_back("libfdk_aac"); + engine->directives_.push_back(acodec); + + // Add audio parameters with valid values + SrsConfDirective *abitrate = new SrsConfDirective(); + abitrate->name_ = "abitrate"; + abitrate->args_.push_back("64"); + engine->directives_.push_back(abitrate); + + SrsConfDirective *asample_rate = new SrsConfDirective(); + asample_rate->name_ = "asample_rate"; + asample_rate->args_.push_back("44100"); + engine->directives_.push_back(asample_rate); + + SrsConfDirective *achannels = new SrsConfDirective(); + achannels->name_ = "achannels"; + achannels->args_.push_back("2"); + engine->directives_.push_back(achannels); + + SrsConfDirective *oformat = new SrsConfDirective(); + oformat->name_ = "oformat"; + oformat->args_.push_back("flv"); + engine->directives_.push_back(oformat); + + // Call initialize_transcode with the mock engine directive + HELPER_EXPECT_SUCCESS(ffmpeg->initialize_transcode(engine.get())); + + // Test 3: initialize_copy() - copy mode without transcoding + SrsUniquePtr ffmpeg_copy(new SrsFFMPEG("/usr/bin/ffmpeg")); + ffmpeg_copy->config_ = mock_config.get(); + HELPER_EXPECT_SUCCESS(ffmpeg_copy->initialize("rtmp://localhost/live/stream", "rtmp://localhost/live/copy", "/tmp/ffmpeg_copy.log")); + HELPER_EXPECT_SUCCESS(ffmpeg_copy->initialize_copy()); + EXPECT_EQ(ffmpeg_copy->output(), "rtmp://localhost/live/copy"); + + // Clean up - set to NULL to avoid double-free + ffmpeg->config_ = NULL; + ffmpeg_copy->config_ = NULL; +} + VOID TEST(MpegtsSrtConnTest, HttpHooksOnPublish) { srs_error_t err; @@ -1787,7 +2469,7 @@ VOID TEST(RtcServerTest, DiscoverCandidates_AutoDetectIPv4) { // Create mock utility with multiple network interfaces SrsUniquePtr mock_utility(new MockProtocolUtility()); - mock_utility->add_ip("127.0.0.1", "lo", true, true, false); // loopback + mock_utility->add_ip("127.0.0.1", "lo", true, true, false); // loopback mock_utility->add_ip("192.168.1.100", "eth0", true, false, false); // private IPv4 mock_utility->add_ip("10.0.0.50", "eth1", true, false, false); // private IPv4 mock_utility->add_ip("fe80::1", "eth0", false, false, false); // IPv6 @@ -2583,7 +3265,7 @@ VOID TEST(RtcSessionManagerTest, OnUdpPacket_NoSessionFound) char rtp_packet[20]; memset(rtp_packet, 0, sizeof(rtp_packet)); rtp_packet[0] = (char)0x80; // V=2 (10000000) - rtp_packet[1] = 96; // PT=96 (RTP payload type) + rtp_packet[1] = 96; // PT=96 (RTP payload type) // Create mock UDP socket SrsUniquePtr mock_socket(new MockUdpMuxSocket()); @@ -2602,6 +3284,687 @@ VOID TEST(RtcSessionManagerTest, OnUdpPacket_NoSessionFound) session_manager->conn_manager_ = NULL; } -// Note: The remaining tests for RTP, RTCP, DTLS, and STUN packets require more complex setup -// including proper mock UDP networks and session initialization. The above test covers the -// basic error path when no session is found, which is a key scenario in on_udp_packet. +// Mock ISrsFFMPEG implementation +MockFFMPEG::MockFFMPEG() +{ + start_called_ = false; + stop_called_ = false; + cycle_called_ = false; + fast_stop_called_ = false; + fast_kill_called_ = false; + start_error_ = srs_success; + cycle_error_ = srs_success; +} + +MockFFMPEG::~MockFFMPEG() +{ + srs_freep(start_error_); + srs_freep(cycle_error_); +} + +void MockFFMPEG::append_iparam(std::string iparam) +{ +} + +void MockFFMPEG::set_oformat(std::string format) +{ +} + +std::string MockFFMPEG::output() +{ + return ""; +} + +srs_error_t MockFFMPEG::initialize(std::string in, std::string out, std::string log) +{ + return srs_success; +} + +srs_error_t MockFFMPEG::initialize_transcode(SrsConfDirective *engine) +{ + return srs_success; +} + +srs_error_t MockFFMPEG::initialize_copy() +{ + return srs_success; +} + +srs_error_t MockFFMPEG::start() +{ + start_called_ = true; + if (start_error_ != srs_success) { + return srs_error_copy(start_error_); + } + return srs_success; +} + +srs_error_t MockFFMPEG::cycle() +{ + cycle_called_ = true; + if (cycle_error_ != srs_success) { + return srs_error_copy(cycle_error_); + } + return srs_success; +} + +void MockFFMPEG::stop() +{ + stop_called_ = true; +} + +void MockFFMPEG::fast_stop() +{ + fast_stop_called_ = true; +} + +void MockFFMPEG::fast_kill() +{ + fast_kill_called_ = true; +} + +// Test SrsIngesterFFMPEG::uri() - returns vhost/id +VOID TEST(IngesterFFMPEGTest, Uri) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize with vhost and id + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test uri() returns vhost/id + EXPECT_STREQ("test.vhost/ingest1", ingester->uri().c_str()); +} + +// Test SrsIngesterFFMPEG::alive() - returns time since start +VOID TEST(IngesterFFMPEGTest, Alive) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test alive() returns non-negative duration (just initialized) + srs_utime_t alive_time = ingester->alive(); + EXPECT_TRUE(alive_time >= 0); +} + +// Test SrsIngesterFFMPEG::equals(vhost) - single parameter version +VOID TEST(IngesterFFMPEGTest, EqualsVhost) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize with vhost and id + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test equals with matching vhost + EXPECT_TRUE(ingester->equals("test.vhost")); + + // Test equals with non-matching vhost + EXPECT_FALSE(ingester->equals("other.vhost")); +} + +// Test SrsIngesterFFMPEG::equals(vhost, id) - two parameter version +VOID TEST(IngesterFFMPEGTest, EqualsVhostAndId) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize with vhost and id + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test equals with matching vhost and id + EXPECT_TRUE(ingester->equals("test.vhost", "ingest1")); + + // Test equals with matching vhost but different id + EXPECT_FALSE(ingester->equals("test.vhost", "ingest2")); + + // Test equals with different vhost but matching id + EXPECT_FALSE(ingester->equals("other.vhost", "ingest1")); + + // Test equals with both different + EXPECT_FALSE(ingester->equals("other.vhost", "ingest2")); +} + +// Test SrsIngesterFFMPEG::start() - delegates to ffmpeg +VOID TEST(IngesterFFMPEGTest, Start) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test start() calls ffmpeg->start() + HELPER_EXPECT_SUCCESS(ingester->start()); + EXPECT_TRUE(mock_ffmpeg->start_called_); +} + +// Test SrsIngesterFFMPEG::stop() - delegates to ffmpeg +VOID TEST(IngesterFFMPEGTest, Stop) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test stop() calls ffmpeg->stop() + ingester->stop(); + EXPECT_TRUE(mock_ffmpeg->stop_called_); +} + +// Test SrsIngesterFFMPEG::cycle() - delegates to ffmpeg +VOID TEST(IngesterFFMPEGTest, Cycle) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test cycle() calls ffmpeg->cycle() + HELPER_EXPECT_SUCCESS(ingester->cycle()); + EXPECT_TRUE(mock_ffmpeg->cycle_called_); +} + +// Test SrsIngesterFFMPEG::fast_stop() - delegates to ffmpeg +VOID TEST(IngesterFFMPEGTest, FastStop) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test fast_stop() calls ffmpeg->fast_stop() + ingester->fast_stop(); + EXPECT_TRUE(mock_ffmpeg->fast_stop_called_); +} + +// Test SrsIngesterFFMPEG::fast_kill() - delegates to ffmpeg +VOID TEST(IngesterFFMPEGTest, FastKill) +{ + srs_error_t err = srs_success; + + // Create mock FFMPEG + MockFFMPEG *mock_ffmpeg = new MockFFMPEG(); + + // Create SrsIngesterFFMPEG + SrsUniquePtr ingester(new SrsIngesterFFMPEG()); + + // Initialize + HELPER_EXPECT_SUCCESS(ingester->initialize(mock_ffmpeg, "test.vhost", "ingest1")); + + // Test fast_kill() calls ffmpeg->fast_kill() + ingester->fast_kill(); + EXPECT_TRUE(mock_ffmpeg->fast_kill_called_); +} + +VOID TEST(IngesterTest, Dispose) +{ + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock ingesters + MockIngesterFFMPEG *mock_ingester1 = new MockIngesterFFMPEG(); + MockIngesterFFMPEG *mock_ingester2 = new MockIngesterFFMPEG(); + + // Add mock ingesters to the ingester (cast to interface type) + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester1); + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester2); + + // Create mock time (managed by test, not by factory) + MockTimeForIngester *mock_time = new MockTimeForIngester(); + + // Create mock app factory + SrsUniquePtr mock_factory(new MockAppFactoryForIngester()); + mock_factory->mock_time_ = mock_time; + + // Inject mock factory + ingester->app_factory_ = mock_factory.get(); + + // Test dispose() - should call fast_stop(), sleep, then fast_kill() + ingester->dispose(); + + // Verify fast_stop was called on all ingesters + EXPECT_TRUE(mock_ingester1->fast_stop_called_); + EXPECT_TRUE(mock_ingester2->fast_stop_called_); + + // Verify fast_kill was called on all ingesters + EXPECT_TRUE(mock_ingester1->fast_kill_called_); + EXPECT_TRUE(mock_ingester2->fast_kill_called_); + + // Verify disposed flag is set + EXPECT_TRUE(ingester->disposed_); + + // Test dispose() again - should return early without doing anything + mock_ingester1->fast_stop_called_ = false; + mock_ingester1->fast_kill_called_ = false; + ingester->dispose(); + EXPECT_FALSE(mock_ingester1->fast_stop_called_); + EXPECT_FALSE(mock_ingester1->fast_kill_called_); + + // Clean up - set to NULL to avoid double-free + ingester->app_factory_ = NULL; + ingester->config_ = NULL; + // Note: mock_time is deleted by dispose() call via create_time()->usleep() +} + +VOID TEST(IngesterTest, Start) +{ + srs_error_t err; + + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForIngester()); + + // Create mock coroutine + MockSrtCoroutine *mock_coroutine = new MockSrtCoroutine(); + + // Create mock app factory + SrsUniquePtr mock_factory(new MockAppFactoryForIngester()); + mock_factory->mock_coroutine_ = mock_coroutine; + + // Inject mocks + ingester->app_factory_ = mock_factory.get(); + ingester->config_ = mock_config.get(); + + // Test start() - should parse config and start coroutine + HELPER_EXPECT_SUCCESS(ingester->start()); + + // Verify coroutine was created and started + EXPECT_EQ(mock_factory->create_coroutine_count_, 1); + EXPECT_TRUE(mock_coroutine->started_); + + // Clean up - set to NULL to avoid double-free + ingester->trd_ = NULL; + srs_freep(mock_coroutine); + ingester->app_factory_ = NULL; + ingester->config_ = NULL; +} + +VOID TEST(IngesterTest, Stop) +{ + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock ingesters + MockIngesterFFMPEG *mock_ingester1 = new MockIngesterFFMPEG(); + MockIngesterFFMPEG *mock_ingester2 = new MockIngesterFFMPEG(); + + // Add mock ingesters to the ingester (cast to interface type) + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester1); + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester2); + + // Create mock coroutine + MockSrtCoroutine *mock_coroutine = new MockSrtCoroutine(); + + // Replace the thread with mock + srs_freep(ingester->trd_); + ingester->trd_ = mock_coroutine; + + // Test stop() - should stop coroutine and clear engines + ingester->stop(); + + // Verify ingesters were cleared + EXPECT_TRUE(ingester->ingesters_.empty()); + + // Clean up - set to NULL to avoid double-free + ingester->trd_ = NULL; + srs_freep(mock_coroutine); + ingester->app_factory_ = NULL; + ingester->config_ = NULL; +} + +VOID TEST(IngesterTest, FastStop) +{ + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock ingesters + MockIngesterFFMPEG *mock_ingester1 = new MockIngesterFFMPEG(); + MockIngesterFFMPEG *mock_ingester2 = new MockIngesterFFMPEG(); + + // Add mock ingesters to the ingester (cast to interface type) + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester1); + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester2); + + // Test fast_stop() - should call fast_stop on all ingesters + ingester->fast_stop(); + + // Verify fast_stop was called on all ingesters + EXPECT_TRUE(mock_ingester1->fast_stop_called_); + EXPECT_TRUE(mock_ingester2->fast_stop_called_); + + // Clean up - set to NULL to avoid double-free + ingester->app_factory_ = NULL; + ingester->config_ = NULL; +} + +VOID TEST(IngesterTest, FastKill) +{ + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock ingesters + MockIngesterFFMPEG *mock_ingester1 = new MockIngesterFFMPEG(); + MockIngesterFFMPEG *mock_ingester2 = new MockIngesterFFMPEG(); + + // Add mock ingesters to the ingester (cast to interface type) + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester1); + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester2); + + // Test fast_kill() - should call fast_kill on all ingesters + ingester->fast_kill(); + + // Verify fast_kill was called on all ingesters + EXPECT_TRUE(mock_ingester1->fast_kill_called_); + EXPECT_TRUE(mock_ingester2->fast_kill_called_); + + // Clean up - set to NULL to avoid double-free + ingester->app_factory_ = NULL; + ingester->config_ = NULL; +} + +// Mock ISrsTime implementation for testing SrsIngester +MockTimeForIngester::MockTimeForIngester() +{ + usleep_count_ = 0; + last_usleep_duration_ = 0; +} + +MockTimeForIngester::~MockTimeForIngester() +{ +} + +void MockTimeForIngester::usleep(srs_utime_t duration) +{ + usleep_count_++; + last_usleep_duration_ = duration; +} + +void MockTimeForIngester::reset() +{ + usleep_count_ = 0; + last_usleep_duration_ = 0; +} + +VOID TEST(IngesterTest, Cycle) +{ + srs_error_t err; + + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock coroutine that returns error after 2 successful pulls + // (MockSrtCoroutine is designed to return success for first 2 calls) + MockSrtCoroutine *mock_coroutine = new MockSrtCoroutine(); + mock_coroutine->pull_error_ = srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "test exit"); + + // Create mock app factory + SrsUniquePtr mock_factory(new MockAppFactoryForIngester()); + // Use mock time that doesn't actually sleep + mock_factory->mock_time_ = new MockTimeForIngester(); + + // Replace the thread with mock + srs_freep(ingester->trd_); + ingester->trd_ = mock_coroutine; + + // Inject mock factory + ingester->app_factory_ = mock_factory.get(); + + // Test cycle() - should check thread status, call do_cycle, sleep, and exit on pull error + // Note: cycle() will create and free the mock time internally, so we can't verify usleep count + err = ingester->cycle(); + HELPER_EXPECT_FAILED(err); + + // Verify pull was called 3 times (MockSrtCoroutine returns error on 3rd call) + EXPECT_EQ(mock_coroutine->pull_count_, 3); + + // Verify time was created (mock time doesn't actually sleep, so test runs fast) + EXPECT_EQ(mock_factory->create_time_count_, 1); + + // Clean up - set to NULL to avoid double-free + ingester->trd_ = NULL; + srs_freep(mock_coroutine); + ingester->app_factory_ = NULL; + // mock_time_ was already freed by SrsUniquePtr in cycle(), so just set to NULL + mock_factory->mock_time_ = NULL; + ingester->config_ = NULL; +} + +VOID TEST(IngesterTest, DoCycle) +{ + srs_error_t err; + + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock ingesters + MockIngesterFFMPEG *mock_ingester1 = new MockIngesterFFMPEG(); + MockIngesterFFMPEG *mock_ingester2 = new MockIngesterFFMPEG(); + + // Add mock ingesters to the ingester (cast to interface type) + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester1); + ingester->ingesters_.push_back((ISrsIngesterFFMPEG *)mock_ingester2); + + // Test do_cycle() - should start and cycle all ingesters + HELPER_EXPECT_SUCCESS(ingester->do_cycle()); + + // Verify start and cycle were called on all ingesters (implicitly through success) + // The mock ingesters return success for start() and cycle() + + // Clean up - set to NULL to avoid double-free + ingester->app_factory_ = NULL; + ingester->config_ = NULL; +} + +// Test the major use scenario for SrsIngester parsing: +// - parse() iterates through vhosts and calls parse_ingesters() +// - parse_ingesters() checks vhost enabled and calls parse_engines() +// - parse_engines() checks ingest enabled, gets ffmpeg binary, and creates ingesters +// - initialize_ffmpeg() sets up ffmpeg with input/output configuration +// - clear_engines() cleans up all created ingesters +VOID TEST(IngesterTest, ParseIngestersWithEngine) +{ + srs_error_t err; + + // Create SrsIngester + SrsUniquePtr ingester(new SrsIngester()); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForIngester()); + + // Create mock app factory + SrsUniquePtr mock_factory(new MockAppFactoryForIngester()); + + // Create vhost directive + SrsConfDirective *vhost = new SrsConfDirective(); + vhost->name_ = "vhost"; + vhost->args_.push_back("test.vhost"); + + // Create ingest directive + SrsConfDirective *ingest = new SrsConfDirective(); + ingest->name_ = "ingest"; + ingest->args_.push_back("livestream"); + + // Add enabled directive + SrsConfDirective *enabled = new SrsConfDirective(); + enabled->name_ = "enabled"; + enabled->args_.push_back("on"); + ingest->directives_.push_back(enabled); + + // Add ffmpeg directive + SrsConfDirective *ffmpeg = new SrsConfDirective(); + ffmpeg->name_ = "ffmpeg"; + ffmpeg->args_.push_back("/usr/bin/ffmpeg"); + ingest->directives_.push_back(ffmpeg); + + // Add input directive + SrsConfDirective *input = new SrsConfDirective(); + input->name_ = "input"; + ingest->directives_.push_back(input); + + // Add input type + SrsConfDirective *input_type = new SrsConfDirective(); + input_type->name_ = "type"; + input_type->args_.push_back("file"); + input->directives_.push_back(input_type); + + // Add input url + SrsConfDirective *input_url = new SrsConfDirective(); + input_url->name_ = "url"; + input_url->args_.push_back("./test.flv"); + input->directives_.push_back(input_url); + + // Add engine directive + SrsConfDirective *engine = new SrsConfDirective(); + engine->name_ = "engine"; + ingest->directives_.push_back(engine); + + // Add engine enabled + SrsConfDirective *engine_enabled = new SrsConfDirective(); + engine_enabled->name_ = "enabled"; + engine_enabled->args_.push_back("on"); + engine->directives_.push_back(engine_enabled); + + // Add engine output + SrsConfDirective *engine_output = new SrsConfDirective(); + engine_output->name_ = "output"; + engine_output->args_.push_back("rtmp://127.0.0.1:[port]/live/[stream]?vhost=[vhost]"); + engine->directives_.push_back(engine_output); + + // Add engine vcodec + SrsConfDirective *engine_vcodec = new SrsConfDirective(); + engine_vcodec->name_ = "vcodec"; + engine_vcodec->args_.push_back("copy"); + engine->directives_.push_back(engine_vcodec); + + // Add engine acodec + SrsConfDirective *engine_acodec = new SrsConfDirective(); + engine_acodec->name_ = "acodec"; + engine_acodec->args_.push_back("copy"); + engine->directives_.push_back(engine_acodec); + + // Add ingest to vhost + vhost->directives_.push_back(ingest); + + // Add vhost to mock config + mock_config->add_vhost(vhost); + + // Override mock config methods to return proper values + mock_config->http_hooks_enabled_ = false; + + // Inject mock dependencies + ingester->config_ = mock_config.get(); + ingester->app_factory_ = mock_factory.get(); + + // Test parse() - should parse vhost, ingest, and engine, creating one ingester + HELPER_EXPECT_SUCCESS(ingester->parse()); + + // Verify one ingester was created + EXPECT_EQ((int)ingester->ingesters_.size(), 1); + + // Verify the ingester was initialized with correct vhost and id + ISrsIngesterFFMPEG *created_ingester = ingester->ingesters_[0]; + EXPECT_EQ(created_ingester->uri(), "test.vhost/livestream"); + + // Clean up + ingester->clear_engines(); + ingester->config_ = NULL; + ingester->app_factory_ = NULL; + srs_freep(vhost); +} + +VOID TEST(ProcessTest, InitializeWithRedirections) +{ + srs_error_t err; + + // Create SrsProcess instance + SrsUniquePtr process(new SrsProcess()); + + // Test major use scenario: FFmpeg command with various redirection patterns + // Simulates: ffmpeg -i input.flv -c copy 1>stdout.log 2>stderr.log output.flv + std::string binary = "/usr/bin/ffmpeg"; + std::vector argv; + argv.push_back("/usr/bin/ffmpeg"); + argv.push_back("-i"); + argv.push_back("input.flv"); + argv.push_back("-c"); + argv.push_back("copy"); + argv.push_back("1>stdout.log"); + argv.push_back("2>stderr.log"); + argv.push_back("output.flv"); + + // Initialize process + HELPER_EXPECT_SUCCESS(process->initialize(binary, argv)); + + // Verify binary is set correctly + EXPECT_EQ(process->bin_, binary); + + // Verify stdout redirection is parsed correctly + EXPECT_EQ(process->stdout_file_, "stdout.log"); + + // Verify stderr redirection is parsed correctly + EXPECT_EQ(process->stderr_file_, "stderr.log"); + + // Verify params_ contains only actual command parameters (no redirections) + EXPECT_EQ((int)process->params_.size(), 6); + EXPECT_EQ(process->params_[0], "/usr/bin/ffmpeg"); + EXPECT_EQ(process->params_[1], "-i"); + EXPECT_EQ(process->params_[2], "input.flv"); + EXPECT_EQ(process->params_[3], "-c"); + EXPECT_EQ(process->params_[4], "copy"); + EXPECT_EQ(process->params_[5], "output.flv"); + + // Verify actual_cli_ contains command without redirections + EXPECT_EQ(process->actual_cli_, "/usr/bin/ffmpeg -i input.flv -c copy output.flv"); + + // Verify cli_ contains full original command with redirections + EXPECT_EQ(process->cli_, "/usr/bin/ffmpeg -i input.flv -c copy 1>stdout.log 2>stderr.log output.flv"); +} diff --git a/trunk/src/utest/srs_utest_app16.hpp b/trunk/src/utest/srs_utest_app16.hpp index 4cf61f789..961d22809 100644 --- a/trunk/src/utest/srs_utest_app16.hpp +++ b/trunk/src/utest/srs_utest_app16.hpp @@ -12,13 +12,15 @@ */ #include +#include +#include #include +#include +#include #include #include #include #include -#include -#include // Mock ISrsSrtSocket for testing SrsSrtConnection class MockSrtSocket : public ISrsSrtSocket @@ -454,4 +456,167 @@ public: virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite); }; +// Mock ISrsFFMPEG for testing SrsIngesterFFMPEG +class MockFFMPEG : public ISrsFFMPEG +{ +public: + bool start_called_; + bool stop_called_; + bool cycle_called_; + bool fast_stop_called_; + bool fast_kill_called_; + srs_error_t start_error_; + srs_error_t cycle_error_; + +public: + MockFFMPEG(); + virtual ~MockFFMPEG(); + +public: + virtual void append_iparam(std::string iparam); + virtual void set_oformat(std::string format); + virtual std::string output(); + virtual srs_error_t initialize(std::string in, std::string out, std::string log); + virtual srs_error_t initialize_transcode(SrsConfDirective *engine); + virtual srs_error_t initialize_copy(); + virtual srs_error_t start(); + virtual srs_error_t cycle(); + virtual void stop(); + virtual void fast_stop(); + virtual void fast_kill(); +}; + +// Mock ISrsIngesterFFMPEG for testing SrsIngester +class MockIngesterFFMPEG : public ISrsIngesterFFMPEG +{ +public: + bool fast_stop_called_; + bool fast_kill_called_; + std::string vhost_; + std::string id_; + +public: + MockIngesterFFMPEG(); + virtual ~MockIngesterFFMPEG(); + +public: + virtual srs_error_t initialize(ISrsFFMPEG *ff, std::string v, std::string i); + virtual std::string uri(); + virtual srs_utime_t alive(); + virtual bool equals(std::string v, std::string i); + virtual bool equals(std::string v); + virtual srs_error_t start(); + virtual void stop(); + virtual srs_error_t cycle(); + virtual void fast_stop(); + virtual void fast_kill(); +}; + +// Mock ISrsAppFactory for testing SrsIngester +class MockAppFactoryForIngester : public ISrsAppFactory +{ +public: + MockSrtCoroutine *mock_coroutine_; + ISrsTime *mock_time_; + int create_coroutine_count_; + int create_time_count_; + +public: + MockAppFactoryForIngester(); + virtual ~MockAppFactoryForIngester(); + +public: + virtual ISrsFileWriter *create_file_writer(); + virtual ISrsFileWriter *create_enc_file_writer(); + virtual ISrsFileReader *create_file_reader(); + virtual SrsPath *create_path(); + virtual SrsLiveSource *create_live_source(); + virtual ISrsOriginHub *create_origin_hub(); + virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval); + virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto); + virtual ISrsHttpClient *create_http_client(); + virtual ISrsFileReader *create_http_file_reader(ISrsHttpResponseReader *r); + virtual ISrsFlvDecoder *create_flv_decoder(); + virtual ISrsRtspSendTrack *create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc); + virtual ISrsRtspSendTrack *create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc); + virtual ISrsFlvTransmuxer *create_flv_transmuxer(); + virtual ISrsMp4Encoder *create_mp4_encoder(); + virtual ISrsDvrSegmenter *create_dvr_flv_segmenter(); + virtual ISrsDvrSegmenter *create_dvr_mp4_segmenter(); + virtual ISrsGbMediaTcpConn *create_gb_media_tcp_conn(); + virtual ISrsGbSession *create_gb_session(); + virtual ISrsInitMp4 *create_init_mp4(); + virtual ISrsFragmentWindow *create_fragment_window(); + virtual ISrsFragmentedMp4 *create_fragmented_mp4(); + virtual ISrsIpListener *create_tcp_listener(ISrsTcpHandler *handler); + virtual ISrsRtcConnection *create_rtc_connection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid); + virtual ISrsFFMPEG *create_ffmpeg(std::string ffmpeg_bin); + virtual ISrsIngesterFFMPEG *create_ingester_ffmpeg(); + virtual ISrsCoroutine *create_coroutine(const std::string &name, ISrsCoroutineHandler *handler, SrsContextId cid); + virtual ISrsTime *create_time(); + virtual ISrsConfig *create_config(); + virtual ISrsCond *create_cond(); + void reset(); +}; + +// Mock ISrsAppConfig for testing SrsIngester +class MockAppConfigForIngester : public MockAppConfig +{ +public: + std::vector vhosts_; + +public: + MockAppConfigForIngester(); + virtual ~MockAppConfigForIngester(); + +public: + virtual void get_vhosts(std::vector &vhosts); + virtual std::vector get_listens(); + virtual std::vector get_ingesters(std::string vhost); + virtual bool get_ingest_enabled(SrsConfDirective *conf); + virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf); + virtual std::string get_ingest_input_type(SrsConfDirective *conf); + virtual std::string get_ingest_input_url(SrsConfDirective *conf); + virtual std::vector get_transcode_engines(SrsConfDirective *conf); + virtual bool get_engine_enabled(SrsConfDirective *conf); + virtual std::string get_engine_output(SrsConfDirective *conf); + virtual std::string get_engine_vcodec(SrsConfDirective *conf); + virtual std::string get_engine_acodec(SrsConfDirective *conf); + virtual std::vector get_engine_perfile(SrsConfDirective *conf); + virtual std::string get_engine_iformat(SrsConfDirective *conf); + virtual std::vector get_engine_vfilter(SrsConfDirective *conf); + virtual int get_engine_vbitrate(SrsConfDirective *conf); + virtual double get_engine_vfps(SrsConfDirective *conf); + virtual int get_engine_vwidth(SrsConfDirective *conf); + virtual int get_engine_vheight(SrsConfDirective *conf); + virtual int get_engine_vthreads(SrsConfDirective *conf); + virtual std::string get_engine_vprofile(SrsConfDirective *conf); + virtual std::string get_engine_vpreset(SrsConfDirective *conf); + virtual std::vector get_engine_vparams(SrsConfDirective *conf); + virtual int get_engine_abitrate(SrsConfDirective *conf); + virtual int get_engine_asample_rate(SrsConfDirective *conf); + virtual int get_engine_achannels(SrsConfDirective *conf); + virtual std::vector get_engine_aparams(SrsConfDirective *conf); + virtual std::string get_engine_oformat(SrsConfDirective *conf); + virtual bool get_vhost_enabled(SrsConfDirective *conf); + void add_vhost(SrsConfDirective *vhost); + void clear_vhosts(); +}; + +// Mock ISrsTime for testing SrsIngester +class MockTimeForIngester : public ISrsTime +{ +public: + int usleep_count_; + srs_utime_t last_usleep_duration_; + +public: + MockTimeForIngester(); + virtual ~MockTimeForIngester(); + +public: + virtual void usleep(srs_utime_t duration); + void reset(); +}; + #endif diff --git a/trunk/src/utest/srs_utest_app6.hpp b/trunk/src/utest/srs_utest_app6.hpp index 018b11341..497bdeb53 100644 --- a/trunk/src/utest/srs_utest_app6.hpp +++ b/trunk/src/utest/srs_utest_app6.hpp @@ -448,6 +448,40 @@ public: virtual int get_dash_window_size(std::string vhost) { return 10; } virtual bool get_dash_cleanup(std::string vhost) { return true; } virtual srs_utime_t get_dash_dispose(std::string vhost) { return dash_dispose_; } + // Ingest config + virtual void get_vhosts(std::vector &vhosts) {} + virtual std::vector get_ingesters(std::string vhost) { return std::vector(); } + virtual SrsConfDirective *get_ingest_by_id(std::string vhost, std::string ingest_id) { return NULL; } + virtual bool get_ingest_enabled(SrsConfDirective *conf) { return false; } + virtual std::string get_ingest_ffmpeg(SrsConfDirective *conf) { return ""; } + virtual std::string get_ingest_input_type(SrsConfDirective *conf) { return ""; } + virtual std::string get_ingest_input_url(SrsConfDirective *conf) { return ""; } + // FFmpeg log config + virtual bool get_ff_log_enabled() { return false; } + virtual std::string get_ff_log_dir() { return ""; } + virtual std::string get_ff_log_level() { return ""; } + // Transcode/Engine config + virtual std::vector get_transcode_engines(SrsConfDirective *conf) { return std::vector(); } + virtual bool get_engine_enabled(SrsConfDirective *conf) { return false; } + virtual std::vector get_engine_perfile(SrsConfDirective *conf) { return std::vector(); } + virtual std::string get_engine_iformat(SrsConfDirective *conf) { return ""; } + virtual std::vector get_engine_vfilter(SrsConfDirective *conf) { return std::vector(); } + virtual std::string get_engine_vcodec(SrsConfDirective *conf) { return ""; } + virtual int get_engine_vbitrate(SrsConfDirective *conf) { return 0; } + virtual double get_engine_vfps(SrsConfDirective *conf) { return 0; } + virtual int get_engine_vwidth(SrsConfDirective *conf) { return 0; } + virtual int get_engine_vheight(SrsConfDirective *conf) { return 0; } + virtual int get_engine_vthreads(SrsConfDirective *conf) { return 0; } + virtual std::string get_engine_vprofile(SrsConfDirective *conf) { return ""; } + virtual std::string get_engine_vpreset(SrsConfDirective *conf) { return ""; } + virtual std::vector get_engine_vparams(SrsConfDirective *conf) { return std::vector(); } + virtual std::string get_engine_acodec(SrsConfDirective *conf) { return ""; } + virtual int get_engine_abitrate(SrsConfDirective *conf) { return 0; } + virtual int get_engine_asample_rate(SrsConfDirective *conf) { return 0; } + virtual int get_engine_achannels(SrsConfDirective *conf) { return 0; } + virtual std::vector get_engine_aparams(SrsConfDirective *conf) { return std::vector(); } + virtual std::string get_engine_oformat(SrsConfDirective *conf) { return ""; } + virtual std::string get_engine_output(SrsConfDirective *conf) { return ""; } void set_http_hooks_enabled(bool enabled); void set_on_stop_urls(const std::vector &urls); void clear_on_stop_directive();