From cdfe82357eb4f8d8ae72de3f38b63d1d397678c8 Mon Sep 17 00:00:00 2001 From: OSSRS-AI Date: Fri, 3 Oct 2025 10:15:00 -0400 Subject: [PATCH] AI: Add utest to cover app server module. --- trunk/configure | 3 +- trunk/src/app/srs_app_async_call.cpp | 8 + trunk/src/app/srs_app_async_call.hpp | 13 +- trunk/src/app/srs_app_caster_flv.cpp | 25 + trunk/src/app/srs_app_caster_flv.hpp | 5 + trunk/src/app/srs_app_config.hpp | 59 ++ trunk/src/app/srs_app_edge.cpp | 26 - trunk/src/app/srs_app_factory.cpp | 6 + trunk/src/app/srs_app_factory.hpp | 3 + trunk/src/app/srs_app_rtc_dtls.cpp | 8 + trunk/src/app/srs_app_rtc_dtls.hpp | 14 +- trunk/src/app/srs_app_rtmp_conn.cpp | 244 +++--- trunk/src/app/srs_app_rtmp_conn.hpp | 30 +- trunk/src/app/srs_app_rtmp_source.hpp | 4 + trunk/src/app/srs_app_rtsp_source.cpp | 8 + trunk/src/app/srs_app_rtsp_source.hpp | 16 +- trunk/src/app/srs_app_server.cpp | 244 +++--- trunk/src/app/srs_app_server.hpp | 34 +- trunk/src/app/srs_app_statistic.cpp | 61 -- trunk/src/app/srs_app_statistic.hpp | 9 +- trunk/src/app/srs_app_stream_token.cpp | 8 + trunk/src/app/srs_app_stream_token.hpp | 14 +- trunk/src/kernel/srs_kernel_resource.hpp | 14 + trunk/src/utest/srs_utest.cpp | 3 + trunk/src/utest/srs_utest_app10.cpp | 1004 ++++++++++++++++++++++ trunk/src/utest/srs_utest_app10.hpp | 296 +++++++ trunk/src/utest/srs_utest_app6.cpp | 26 + trunk/src/utest/srs_utest_app6.hpp | 40 + trunk/src/utest/srs_utest_app7.cpp | 24 + trunk/src/utest/srs_utest_app7.hpp | 5 + trunk/src/utest/srs_utest_app9.cpp | 15 + trunk/src/utest/srs_utest_app9.hpp | 3 + trunk/src/utest/srs_utest_kernel3.cpp | 7 + trunk/src/utest/srs_utest_protocol4.cpp | 77 -- trunk/src/utest/srs_utest_service.cpp | 24 + trunk/src/utest/srs_utest_service.hpp | 5 + 36 files changed, 1967 insertions(+), 418 deletions(-) create mode 100644 trunk/src/utest/srs_utest_app10.cpp create mode 100644 trunk/src/utest/srs_utest_app10.hpp diff --git a/trunk/configure b/trunk/configure index c515a2deb..908c8d16f 100755 --- a/trunk/configure +++ b/trunk/configure @@ -383,7 +383,8 @@ if [[ $SRS_UTEST == YES ]]; then "srs_utest_stream_token" "srs_utest_rtc_recv_track" "srs_utest_st2" "srs_utest_hevc_structs" "srs_utest_coworkers" "srs_utest_pithy_print" "srs_utest_kernel3" "srs_utest_protocol4" "srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4" - "srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9") + "srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9" + "srs_utest_app10") # Always include SRT utest MODULE_FILES+=("srs_utest_srt") if [[ $SRS_GB28181 == YES ]]; then diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index f632d54d6..2609ad3fe 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -19,6 +19,14 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() { } +ISrsAsyncCallWorker::ISrsAsyncCallWorker() +{ +} + +ISrsAsyncCallWorker::~ISrsAsyncCallWorker() +{ +} + SrsAsyncCallWorker::SrsAsyncCallWorker() { trd_ = new SrsDummyCoroutine(); diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 7f6c04fae..560a6770f 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -35,10 +35,21 @@ public: virtual std::string to_string() = 0; }; +// The async call worker, used to execute the task in async mode. +class ISrsAsyncCallWorker +{ +public: + ISrsAsyncCallWorker(); + virtual ~ISrsAsyncCallWorker(); + +public: + virtual srs_error_t start() = 0; +}; + // The async callback for dvr, callback and other async worker. // When worker call with the task, the worker will do it in isolate thread. // That is, the task is execute/call in async mode. -class SrsAsyncCallWorker : public ISrsCoroutineHandler +class SrsAsyncCallWorker : public ISrsCoroutineHandler, public ISrsAsyncCallWorker { private: ISrsCoroutine *trd_; diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 8ab5458f7..f78faaedc 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -140,6 +140,31 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(ISrsListener *listener, srs_netfd_t s return err; } +srs_error_t SrsAppCasterFlv::start() +{ + return manager_->start(); +} + +bool SrsAppCasterFlv::empty() +{ + return manager_->empty(); +} + +size_t SrsAppCasterFlv::size() +{ + return manager_->size(); +} + +void SrsAppCasterFlv::add(ISrsResource *conn, bool *exists) +{ + manager_->add(conn, exists); +} + +ISrsResource *SrsAppCasterFlv::at(int index) +{ + return manager_->at(index); +} + void SrsAppCasterFlv::remove(ISrsResource *c) { ISrsConnection *conn = dynamic_cast(c); diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 649c4fe05..94249274f 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -70,6 +70,11 @@ public: virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd); // Interface ISrsResourceManager public: + virtual srs_error_t start(); + virtual bool empty(); + virtual size_t size(); + virtual void add(ISrsResource *conn, bool *exists = NULL); + virtual ISrsResource *at(int index); virtual void remove(ISrsResource *c); virtual void subscribe(ISrsDisposingHandler *h); virtual void unsubscribe(ISrsDisposingHandler *h); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index c02083a33..d852eafe3 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -357,6 +357,7 @@ public: public: // Stats config virtual bool get_stats_enabled() = 0; + virtual int get_stats_network() = 0; public: // Heartbeat config @@ -364,8 +365,66 @@ public: virtual srs_utime_t get_heartbeat_interval() = 0; public: + // RTMPS config + virtual std::string get_rtmps_ssl_cert() = 0; + virtual std::string get_rtmps_ssl_key() = 0; + +public: + // Vhost config + virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) = 0; + virtual bool get_vhost_enabled(std::string vhost) = 0; + virtual bool get_debug_srs_upnode(std::string vhost) = 0; + virtual int get_out_ack_size(std::string vhost) = 0; + virtual int get_in_ack_size(std::string vhost) = 0; + virtual int get_chunk_size(std::string vhost) = 0; + virtual bool get_gop_cache(std::string vhost) = 0; + virtual int get_gop_cache_max_frames(std::string vhost) = 0; + virtual bool get_tcp_nodelay(std::string vhost) = 0; + virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) = 0; + virtual srs_utime_t get_send_min_interval(std::string vhost) = 0; + virtual bool get_mr_enabled(std::string vhost) = 0; + virtual srs_utime_t get_mr_sleep(std::string vhost) = 0; + virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost) = 0; + virtual srs_utime_t get_publish_normal_timeout(std::string vhost) = 0; + virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost) = 0; + +public: + // Refer config + virtual bool get_refer_enabled(std::string vhost) = 0; + virtual SrsConfDirective *get_refer_all(std::string vhost) = 0; + virtual SrsConfDirective *get_refer_play(std::string vhost) = 0; + virtual SrsConfDirective *get_refer_publish(std::string vhost) = 0; + +public: + // Edge config + virtual bool get_vhost_origin_cluster(std::string vhost) = 0; + virtual std::vector get_vhost_coworkers(std::string vhost) = 0; + virtual bool get_vhost_edge_token_traverse(std::string vhost) = 0; + virtual SrsConfDirective *get_vhost_edge_origin(std::string vhost) = 0; + +public: + // HTTP hooks config virtual bool get_vhost_http_hooks_enabled(std::string vhost) = 0; + virtual SrsConfDirective *get_vhost_on_connect(std::string vhost) = 0; + virtual SrsConfDirective *get_vhost_on_close(std::string vhost) = 0; + virtual SrsConfDirective *get_vhost_on_publish(std::string vhost) = 0; + virtual SrsConfDirective *get_vhost_on_play(std::string vhost) = 0; virtual SrsConfDirective *get_vhost_on_stop(std::string vhost) = 0; + +public: + // RTC config + virtual bool get_rtc_enabled(std::string vhost) = 0; + +public: + // RTSP config + virtual bool get_rtsp_enabled(std::string vhost) = 0; + +public: + // Stream bridge config + virtual bool get_rtc_from_rtmp(std::string vhost) = 0; + virtual bool get_rtsp_from_rtmp(std::string vhost) = 0; + +public: virtual bool get_rtc_nack_enabled(std::string vhost) = 0; virtual bool get_rtc_nack_no_copy(std::string vhost) = 0; virtual bool get_realtime_enabled(std::string vhost, bool is_rtc) = 0; diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index ee9d7d6d5..1c393ec2a 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -388,9 +388,6 @@ SrsEdgeIngester::SrsEdgeIngester() source_ = NULL; edge_ = NULL; req_ = NULL; -#ifdef SRS_APM - span_main_ = NULL; -#endif upstream_ = new SrsEdgeRtmpUpstream(""); lb_ = new SrsLbRoundRobin(); @@ -401,9 +398,6 @@ SrsEdgeIngester::~SrsEdgeIngester() { stop(); -#ifdef SRS_APM - srs_freep(span_main_); -#endif srs_freep(upstream_); srs_freep(lb_); srs_freep(trd_); @@ -455,14 +449,6 @@ void SrsEdgeIngester::stop() } } -#ifdef SRS_APM -ISrsApmSpan *SrsEdgeIngester::span() -{ - srs_assert(span_main_); - return span_main_; -} -#endif - // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_CIMS (3 * SRS_UTIME_SECONDS) @@ -477,23 +463,11 @@ srs_error_t SrsEdgeIngester::cycle() return srs_error_wrap(err, "edge ingester"); } -#ifdef SRS_APM - srs_assert(span_main_); - ISrsApmSpan *start = _srs_apm->span("edge-start")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end(); - srs_freep(start); -#endif - if ((err = do_cycle()) != srs_success) { srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str()); srs_freep(err); } -#ifdef SRS_APM - srs_assert(span_main_); - ISrsApmSpan *stop = _srs_apm->span("edge-stop")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end(); - srs_freep(stop); -#endif - // Check whether coroutine is stopped, see https://github.com/ossrs/srs/issues/2901 if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "edge ingester"); diff --git a/trunk/src/app/srs_app_factory.cpp b/trunk/src/app/srs_app_factory.cpp index 499b8b6ab..af12cbb48 100644 --- a/trunk/src/app/srs_app_factory.cpp +++ b/trunk/src/app/srs_app_factory.cpp @@ -13,6 +13,7 @@ #include #include #include +#include SrsAppFactory::SrsAppFactory() { @@ -54,6 +55,11 @@ ISrsOriginHub *SrsAppFactory::create_origin_hub() return hub; } +ISrsHourGlass *SrsAppFactory::create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval) +{ + return new SrsHourGlass(name, handler, interval); +} + SrsFinalFactory::SrsFinalFactory() { } diff --git a/trunk/src/app/srs_app_factory.hpp b/trunk/src/app/srs_app_factory.hpp index 8d29e46e8..9afa2e43c 100644 --- a/trunk/src/app/srs_app_factory.hpp +++ b/trunk/src/app/srs_app_factory.hpp @@ -16,6 +16,8 @@ class ISrsFileReader; class SrsPath; class SrsLiveSource; class ISrsOriginHub; +class ISrsHourGlass; +class ISrsHourGlassHandler; // The factory to create app objects. class SrsAppFactory @@ -31,6 +33,7 @@ public: virtual SrsPath *create_path(); virtual SrsLiveSource *create_live_source(); virtual ISrsOriginHub *create_origin_hub(); + virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval); }; extern SrsAppFactory *_srs_app_factory; diff --git a/trunk/src/app/srs_app_rtc_dtls.cpp b/trunk/src/app/srs_app_rtc_dtls.cpp index 2530e66f4..a7d371ae7 100644 --- a/trunk/src/app/srs_app_rtc_dtls.cpp +++ b/trunk/src/app/srs_app_rtc_dtls.cpp @@ -187,6 +187,14 @@ SSL_CTX *srs_build_dtls_ctx(SrsDtlsVersion version, std::string role) } #pragma GCC diagnostic pop +ISrsDtlsCertificate::ISrsDtlsCertificate() +{ +} + +ISrsDtlsCertificate::~ISrsDtlsCertificate() +{ +} + SrsDtlsCertificate::SrsDtlsCertificate() { ecdsa_mode_ = true; diff --git a/trunk/src/app/srs_app_rtc_dtls.hpp b/trunk/src/app/srs_app_rtc_dtls.hpp index f79d9c953..82b099c57 100644 --- a/trunk/src/app/srs_app_rtc_dtls.hpp +++ b/trunk/src/app/srs_app_rtc_dtls.hpp @@ -19,7 +19,19 @@ class ISrsRequest; -class SrsDtlsCertificate +// The interface for DTLS certificate. +class ISrsDtlsCertificate +{ +public: + ISrsDtlsCertificate(); + virtual ~ISrsDtlsCertificate(); + +public: + virtual srs_error_t initialize() = 0; +}; + +// The DTLS certificate. +class SrsDtlsCertificate : public ISrsDtlsCertificate { private: std::string fingerprint_; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 0c1285160..dc54a37c2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -62,20 +62,22 @@ using namespace std; SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm) : SrsBasicRtmpClient(u, ctm, stm) { + config_ = _srs_config; } SrsSimpleRtmpClient::~SrsSimpleRtmpClient() { + config_ = NULL; } srs_error_t SrsSimpleRtmpClient::connect_app() { SrsProtocolUtility utility; std::vector &ips = utility.local_ips(); - srs_assert(_srs_config->get_stats_network() < (int)ips.size()); - SrsIPAddress *local_ip = ips[_srs_config->get_stats_network()]; + srs_assert(config_->get_stats_network() < (int)ips.size()); + SrsIPAddress *local_ip = ips[config_->get_stats_network()]; - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req_->vhost_); + bool debug_srs_upnode = config_->get_debug_srs_upnode(req_->vhost_); return do_connect_app(local_ip->ip_, debug_srs_upnode); } @@ -148,11 +150,15 @@ int64_t SrsRtmpTransport::get_send_bytes() SrsRtmpsTransport::SrsRtmpsTransport(srs_netfd_t c) : SrsRtmpTransport(c) { ssl_ = new SrsSslConnection(skt_); + + config_ = _srs_config; } SrsRtmpsTransport::~SrsRtmpsTransport() { srs_freep(ssl_); + + config_ = NULL; } ISrsProtocolReadWriter *SrsRtmpsTransport::io() @@ -162,8 +168,8 @@ ISrsProtocolReadWriter *SrsRtmpsTransport::io() srs_error_t SrsRtmpsTransport::handshake() { - string crt_file = _srs_config->get_rtmps_ssl_cert(); - string key_file = _srs_config->get_rtmps_ssl_key(); + string crt_file = config_->get_rtmps_ssl_cert(); + string key_file = config_->get_rtmps_ssl_key(); srs_error_t err = ssl_->handshake(key_file, crt_file); if (err != srs_success) { return srs_error_wrap(err, "ssl handshake"); @@ -185,7 +191,6 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip server_ = svr; transport_ = transport; - manager_ = _srs_conn_manager; ip_ = cip; port_ = cport; create_time_ = srsu2ms(srs_time_now_cached()); @@ -213,12 +218,25 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip publish_1stpkt_timeout_ = 0; publish_normal_timeout_ = 0; - _srs_config->subscribe(this); + config_ = _srs_config; + manager_ = _srs_conn_manager; + stream_publish_tokens_ = _srs_stream_publish_tokens; + live_sources_ = _srs_sources; + stat_ = _srs_stat; + hooks_ = _srs_hooks; + rtc_sources_ = _srs_rtc_sources; + srt_sources_ = _srs_srt_sources; + rtsp_sources_ = _srs_rtsp_sources; +} + +void SrsRtmpConn::assemble() +{ + config_->subscribe(this); } SrsRtmpConn::~SrsRtmpConn() { - _srs_config->unsubscribe(this); + config_->unsubscribe(this); trd_->interrupt(); // wakeup the handler which need to notice. @@ -235,11 +253,15 @@ SrsRtmpConn::~SrsRtmpConn() srs_freep(rtmp_); srs_freep(refer_); srs_freep(security_); -#ifdef SRS_APM - srs_freep(span_main_); - srs_freep(span_connect_); - srs_freep(span_client_); -#endif + + manager_ = NULL; + stream_publish_tokens_ = NULL; + live_sources_ = NULL; + stat_ = NULL; + hooks_ = NULL; + rtc_sources_ = NULL; + srt_sources_ = NULL; + rtsp_sources_ = NULL; } std::string SrsRtmpConn::desc() @@ -257,12 +279,7 @@ srs_error_t SrsRtmpConn::do_cycle() { srs_error_t err = srs_success; -#ifdef SRS_APM - srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d, trace=%s, span=%s", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd()), - span_main_->format_trace_id(), span_main_->format_span_id()); -#else srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip_.c_str(), port_, srs_netfd_fileno(transport_->fd())); -#endif if ((err = transport_->handshake()) != srs_success) { return srs_error_wrap(err, "transport handshake"); @@ -350,12 +367,12 @@ srs_error_t SrsRtmpConn::service_cycle() ISrsRequest *req = info_->req_; - int out_ack_size = _srs_config->get_out_ack_size(req->vhost_); + int out_ack_size = config_->get_out_ack_size(req->vhost_); if (out_ack_size && (err = rtmp_->set_window_ack_size(out_ack_size)) != srs_success) { return srs_error_wrap(err, "rtmp: set out window ack size"); } - int in_ack_size = _srs_config->get_in_ack_size(req->vhost_); + int in_ack_size = config_->get_in_ack_size(req->vhost_); if (in_ack_size && (err = rtmp_->set_in_window_ack_size(in_ack_size)) != srs_success) { return srs_error_wrap(err, "rtmp: set in window ack size"); } @@ -370,7 +387,7 @@ srs_error_t SrsRtmpConn::service_cycle() // set chunk size to larger. // set the chunk size before any larger response greater than 128, // to make OBS happy, @see https://github.com/ossrs/srs/issues/454 - int chunk_size = _srs_config->get_chunk_size(req->vhost_); + int chunk_size = config_->get_chunk_size(req->vhost_); if ((err = rtmp_->set_chunk_size(chunk_size)) != srs_success) { return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size); } @@ -380,11 +397,6 @@ srs_error_t SrsRtmpConn::service_cycle() return srs_error_wrap(err, "rtmp: response connect app"); } -#ifdef SRS_APM - // Must be a connecting application span. - span_connect_->end(); -#endif - if ((err = rtmp_->on_bw_done()) != srs_success) { return srs_error_wrap(err, "rtmp: on bw down"); } @@ -462,7 +474,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() srs_client_type_string(info_->type_).c_str(), req->vhost_.c_str(), req->app_.c_str(), req->stream_.c_str(), req->param_.c_str(), srsu2msi(req->duration_)); // discovery vhost, resolve the vhost from config - SrsConfDirective *parsed_vhost = _srs_config->get_vhost(req->vhost_); + SrsConfDirective *parsed_vhost = config_->get_vhost(req->vhost_); if (parsed_vhost) { req->vhost_ = parsed_vhost->arg0(); } @@ -484,8 +496,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle() // do token traverse before serve it. // @see https://github.com/ossrs/srs/pull/239 if (true) { - info_->edge_ = _srs_config->get_vhost_is_edge(req->vhost_); - bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost_); + info_->edge_ = config_->get_vhost_is_edge(req->vhost_); + bool edge_traverse = config_->get_vhost_edge_token_traverse(req->vhost_); if (info_->edge_ && edge_traverse) { if ((err = check_edge_token_traverse_auth()) != srs_success) { return srs_error_wrap(err, "rtmp: check token traverse"); @@ -510,7 +522,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() // Acquire stream publish token to prevent race conditions across all protocols. SrsStreamPublishToken *publish_token_raw = NULL; - if (info_->type_ != SrsRtmpConnPlay && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) { + if (info_->type_ != SrsRtmpConnPlay && (err = stream_publish_tokens_->acquire_token(req, publish_token_raw)) != srs_success) { return srs_error_wrap(err, "acquire stream publish token"); } SrsUniquePtr publish_token(publish_token_raw); @@ -521,13 +533,13 @@ srs_error_t SrsRtmpConn::stream_service_cycle() // find a source to serve. SrsSharedPtr live_source; - if ((err = _srs_sources->fetch_or_create(req, live_source)) != srs_success) { + if ((err = live_sources_->fetch_or_create(req, live_source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } srs_assert(live_source.get() != NULL); - bool enabled_cache = _srs_config->get_gop_cache(req->vhost_); - int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost_); + bool enabled_cache = config_->get_gop_cache(req->vhost_); + int gcmf = config_->get_gop_cache_max_frames(req->vhost_); srs_trace("source url=%s, ip=%s, cache=%d/%d, is_edge=%d, source_id=%s/%s", req->get_stream_url().c_str(), ip_.c_str(), enabled_cache, gcmf, info_->edge_, live_source->source_id().c_str(), live_source->pre_source_id().c_str()); @@ -542,8 +554,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle() } // We must do stat the client before hooks, because hooks depends on it. - SrsStatistic *stat = _srs_stat; - if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) { + if ((err = stat_->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) { return srs_error_wrap(err, "rtmp: stat client"); } @@ -552,13 +563,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle() return srs_error_wrap(err, "rtmp: callback on play"); } -#ifdef SRS_APM - // Must be a client span. - span_client_->set_name("play")->end(); - // We end the connection span because it's a producer and only trace the established. - span_main_->end(); -#endif - err = playing(live_source); http_hooks_on_stop(); @@ -569,13 +573,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle() return srs_error_wrap(err, "rtmp: start FMLE publish"); } -#ifdef SRS_APM - // Must be a client span. - span_client_->set_name("publish")->end(); - // We end the connection span because it's a producer and only trace the established. - span_main_->end(); -#endif - return publishing(live_source); } case SrsRtmpConnHaivisionPublish: { @@ -583,13 +580,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle() return srs_error_wrap(err, "rtmp: start HAIVISION publish"); } -#ifdef SRS_APM - // Must be a client span. - span_client_->set_name("publish")->end(); - // We end the connection span because it's a producer and only trace the established. - span_main_->end(); -#endif - return publishing(live_source); } case SrsRtmpConnFlashPublish: { @@ -597,13 +587,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle() return srs_error_wrap(err, "rtmp: start FLASH publish"); } -#ifdef SRS_APM - // Must be a client span. - span_client_->set_name("publish")->end(); - // We end the connection span because it's a producer and only trace the established. - span_main_->end(); -#endif - return publishing(live_source); } default: { @@ -621,12 +604,12 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost) ISrsRequest *req = info_->req_; srs_assert(req != NULL); - SrsConfDirective *vhost = _srs_config->get_vhost(req->vhost_, try_default_vhost); + SrsConfDirective *vhost = config_->get_vhost(req->vhost_, try_default_vhost); if (vhost == NULL) { return srs_error_new(ERROR_RTMP_VHOST_NOT_FOUND, "rtmp: no vhost %s", req->vhost_.c_str()); } - if (!_srs_config->get_vhost_enabled(req->vhost_)) { + if (!config_->get_vhost_enabled(req->vhost_)) { return srs_error_new(ERROR_RTMP_VHOST_NOT_FOUND, "rtmp: vhost %s disabled", req->vhost_.c_str()); } @@ -635,8 +618,8 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost) req->vhost_ = vhost->arg0(); } - if (_srs_config->get_refer_enabled(req->vhost_)) { - if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_all(req->vhost_))) != srs_success) { + if (config_->get_refer_enabled(req->vhost_)) { + if ((err = refer_->check(req->pageUrl_, config_->get_refer_all(req->vhost_))) != srs_success) { return srs_error_wrap(err, "rtmp: referer check"); } } @@ -654,16 +637,16 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr source) // Check page referer of player. ISrsRequest *req = info_->req_; - if (_srs_config->get_refer_enabled(req->vhost_)) { - if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_play(req->vhost_))) != srs_success) { + if (config_->get_refer_enabled(req->vhost_)) { + if ((err = refer_->check(req->pageUrl_, config_->get_refer_play(req->vhost_))) != srs_success) { return srs_error_wrap(err, "rtmp: referer check"); } } // When origin cluster enabled, try to redirect to the origin which is active. // A active origin is a server which is delivering stream. - if (!info_->edge_ && _srs_config->get_vhost_origin_cluster(req->vhost_) && source->inactive()) { - vector coworkers = _srs_config->get_vhost_coworkers(req->vhost_); + if (!info_->edge_ && config_->get_vhost_origin_cluster(req->vhost_) && source->inactive()) { + vector coworkers = config_->get_vhost_coworkers(req->vhost_); for (int i = 0; i < (int)coworkers.size(); i++) { // TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it. string host; @@ -671,7 +654,7 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr source) string coworker = coworkers.at(i); string url = "http://" + coworker + "/api/v1/clusters?" + "vhost=" + req->vhost_ + "&ip=" + req->host_ + "&app=" + req->app_ + "&stream=" + req->stream_ + "&coworker=" + coworker; - if ((err = _srs_hooks->discover_co_workers(url, host, port)) != srs_success) { + if ((err = hooks_->discover_co_workers(url, host, port)) != srs_success) { // If failed to discovery stream in this coworker, we should request the next one util the last. // @see https://github.com/ossrs/srs/issues/1223 if (i < (int)coworkers.size() - 1) { @@ -752,14 +735,14 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr source, SrsLiveC int64_t starttime = -1; // setup the realtime. - realtime_ = _srs_config->get_realtime_enabled(req->vhost_, false); + realtime_ = config_->get_realtime_enabled(req->vhost_, false); // setup the mw config. // when mw_sleep changed, resize the socket send buffer. - mw_msgs_ = _srs_config->get_mw_msgs(req->vhost_, realtime_, false); - mw_sleep_ = _srs_config->get_mw_sleep(req->vhost_); + mw_msgs_ = config_->get_mw_msgs(req->vhost_, realtime_, false); + mw_sleep_ = config_->get_mw_sleep(req->vhost_); transport_->set_socket_buffer(mw_sleep_); // initialize the send_min_interval - send_min_interval_ = _srs_config->get_send_min_interval(req->vhost_); + send_min_interval_ = config_->get_send_min_interval(req->vhost_); srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d", srsu2msi(send_min_interval_), srsu2msi(mw_sleep_), mw_msgs_, realtime_, tcp_nodelay_); @@ -806,12 +789,6 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr source, SrsLiveC srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d", (int)pprint->age(), count, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m(), srsu2msi(mw_sleep_), mw_msgs_); - -#ifdef SRS_APM - // TODO: Do not use pithy print for frame span. - ISrsApmSpan *sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())->attr("msgs", srs_fmt_sprintf("%d", count))->attr("kbps", srs_fmt_sprintf("%d", kbps_->get_send_kbps_30s())); - srs_freep(sample); -#endif } if (count <= 0) { @@ -871,15 +848,14 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) ISrsRequest *req = info_->req_; - if (_srs_config->get_refer_enabled(req->vhost_)) { - if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_publish(req->vhost_))) != srs_success) { + if (config_->get_refer_enabled(req->vhost_)) { + if ((err = refer_->check(req->pageUrl_, config_->get_refer_publish(req->vhost_))) != srs_success) { return srs_error_wrap(err, "rtmp: referer check"); } } // We must do stat the client before hooks, because hooks depends on it. - SrsStatistic *stat = _srs_stat; - if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) { + if ((err = stat_->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) { return srs_error_wrap(err, "rtmp: stat client"); } @@ -922,23 +898,19 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr source, SrsPu } // initialize the publish timeout. - publish_1stpkt_timeout_ = _srs_config->get_publish_1stpkt_timeout(req->vhost_); - publish_normal_timeout_ = _srs_config->get_publish_normal_timeout(req->vhost_); - srs_utime_t publish_kickoff_for_idle = _srs_config->get_publish_kickoff_for_idle(req->vhost_); + publish_1stpkt_timeout_ = config_->get_publish_1stpkt_timeout(req->vhost_); + publish_normal_timeout_ = config_->get_publish_normal_timeout(req->vhost_); + srs_utime_t publish_kickoff_for_idle = config_->get_publish_kickoff_for_idle(req->vhost_); // set the sock options. set_sock_options(); if (true) { - bool mr = _srs_config->get_mr_enabled(req->vhost_); - srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost_); + bool mr = config_->get_mr_enabled(req->vhost_); + srs_utime_t mr_sleep = config_->get_mr_sleep(req->vhost_); srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout_), srsu2msi(publish_normal_timeout_), tcp_nodelay_); } -#ifdef SRS_APM - SrsUniquePtr span(_srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)->attr("timeout", srs_fmt_sprintf("%d", srsu2msi(publish_normal_timeout_)))->end()); -#endif - // Response the start publishing message, let client start to publish messages. if ((err = rtmp_->start_publishing(info_->res_->stream_id_)) != srs_success) { return srs_error_wrap(err, "start publishing"); @@ -980,8 +952,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr source, SrsPu // Update the stat for video fps. // @remark https://github.com/ossrs/srs/issues/851 - SrsStatistic *stat = _srs_stat; - if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) { + if ((err = stat_->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) { return srs_error_wrap(err, "rtmp: stat video frames"); } nb_frames = rtrd->nb_video_frames(); @@ -989,18 +960,12 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr source, SrsPu // reportable if (pprint->can_print()) { kbps_->sample(); - bool mr = _srs_config->get_mr_enabled(req->vhost_); - srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost_); + bool mr = config_->get_mr_enabled(req->vhost_); + srs_utime_t mr_sleep = config_->get_mr_sleep(req->vhost_); srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d", (int)pprint->age(), kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(), kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout_), srsu2msi(publish_normal_timeout_)); - -#ifdef SRS_APM - // TODO: Do not use pithy print for frame span. - ISrsApmSpan *sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())->attr("msgs", srs_fmt_sprintf("%" PRId64, nb_frames))->attr("kbps", srs_fmt_sprintf("%d", kbps_->get_recv_kbps_30s())); - srs_freep(sample); -#endif } } @@ -1020,9 +985,9 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) // Check whether RTC stream is busy. SrsSharedPtr rtc; - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost_); - bool edge = _srs_config->get_vhost_is_edge(req->vhost_); + bool rtc_server_enabled = config_->get_rtc_server_enabled(); + bool rtc_enabled = config_->get_rtc_enabled(req->vhost_); + bool edge = config_->get_vhost_is_edge(req->vhost_); if (rtc_enabled && edge) { rtc_enabled = false; @@ -1030,7 +995,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) } if (rtc_server_enabled && rtc_enabled && !info_->edge_) { - if ((err = _srs_rtc_sources->fetch_or_create(req, rtc)) != srs_success) { + if ((err = rtc_sources_->fetch_or_create(req, rtc)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -1040,11 +1005,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) } // Check whether SRT stream is busy. - bool srt_server_enabled = _srs_config->get_srt_enabled(); - bool srt_enabled = _srs_config->get_srt_enabled(req->vhost_); + bool srt_server_enabled = config_->get_srt_enabled(); + bool srt_enabled = config_->get_srt_enabled(req->vhost_); if (srt_server_enabled && srt_enabled && !info_->edge_) { SrsSharedPtr srt; - if ((err = _srs_srt_sources->fetch_or_create(req, srt)) != srs_success) { + if ((err = srt_sources_->fetch_or_create(req, srt)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -1056,10 +1021,10 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) #ifdef SRS_RTSP // RTSP only support viewer, so we don't need to check it. SrsSharedPtr rtsp; - bool rtsp_server_enabled = _srs_config->get_rtsp_server_enabled(); - bool rtsp_enabled = _srs_config->get_rtsp_enabled(req->vhost_); + bool rtsp_server_enabled = config_->get_rtsp_server_enabled(); + bool rtsp_enabled = config_->get_rtsp_enabled(req->vhost_); if (rtsp_server_enabled && rtsp_enabled && !info_->edge_) { - if ((err = _srs_rtsp_sources->fetch_or_create(req, rtsp)) != srs_success) { + if ((err = rtsp_sources_->fetch_or_create(req, rtsp)) != srs_success) { return srs_error_wrap(err, "create source"); } } @@ -1070,7 +1035,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) SrsRtmpBridge *bridge = new SrsRtmpBridge(); #if defined(SRS_FFMPEG_FIT) - bool rtmp_to_rtc = _srs_config->get_rtc_from_rtmp(req->vhost_); + bool rtmp_to_rtc = config_->get_rtc_from_rtmp(req->vhost_); if (rtmp_to_rtc && edge) { rtmp_to_rtc = false; srs_warn("disable RTMP to WebRTC for edge vhost=%s", req->vhost_.c_str()); @@ -1082,7 +1047,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) #endif #ifdef SRS_RTSP - if (rtsp.get() && _srs_config->get_rtsp_from_rtmp(req->vhost_)) { + if (rtsp.get() && config_->get_rtsp_from_rtmp(req->vhost_)) { bridge->enable_rtmp2rtsp(rtsp); } #endif @@ -1277,7 +1242,7 @@ void SrsRtmpConn::set_sock_options() { ISrsRequest *req = info_->req_; - bool nvalue = _srs_config->get_tcp_nodelay(req->vhost_); + bool nvalue = config_->get_tcp_nodelay(req->vhost_); if (nvalue != tcp_nodelay_) { tcp_nodelay_ = nvalue; @@ -1296,7 +1261,7 @@ srs_error_t SrsRtmpConn::check_edge_token_traverse_auth() ISrsRequest *req = info_->req_; srs_assert(req); - vector args = _srs_config->get_vhost_edge_origin(req->vhost_)->args_; + vector args = config_->get_vhost_edge_origin(req->vhost_)->args_; if (args.empty()) { return err; } @@ -1364,7 +1329,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect() ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return err; } @@ -1374,7 +1339,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_connect(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_connect(req->vhost_); if (!conf) { return err; @@ -1385,7 +1350,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_connect(url, req)) != srs_success) { + if ((err = hooks_->on_connect(url, req)) != srs_success) { return srs_error_wrap(err, "rtmp on_connect %s", url.c_str()); } } @@ -1397,7 +1362,7 @@ void SrsRtmpConn::http_hooks_on_close() { ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return; } @@ -1407,7 +1372,7 @@ void SrsRtmpConn::http_hooks_on_close() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_close(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_close(req->vhost_); if (!conf) { return; @@ -1418,7 +1383,7 @@ void SrsRtmpConn::http_hooks_on_close() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes()); + hooks_->on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes()); } } @@ -1428,7 +1393,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish() ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return err; } @@ -1438,7 +1403,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_publish(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_publish(req->vhost_); if (!conf) { return err; @@ -1449,7 +1414,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_publish(url, req)) != srs_success) { + if ((err = hooks_->on_publish(url, req)) != srs_success) { return srs_error_wrap(err, "rtmp on_publish %s", url.c_str()); } } @@ -1461,7 +1426,7 @@ void SrsRtmpConn::http_hooks_on_unpublish() { ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return; } @@ -1471,7 +1436,7 @@ void SrsRtmpConn::http_hooks_on_unpublish() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_unpublish(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_unpublish(req->vhost_); if (!conf) { return; @@ -1482,7 +1447,7 @@ void SrsRtmpConn::http_hooks_on_unpublish() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_unpublish(url, req); + hooks_->on_unpublish(url, req); } } @@ -1492,7 +1457,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play() ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return err; } @@ -1502,7 +1467,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_play(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_play(req->vhost_); if (!conf) { return err; @@ -1513,7 +1478,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_play(url, req)) != srs_success) { + if ((err = hooks_->on_play(url, req)) != srs_success) { return srs_error_wrap(err, "rtmp on_play %s", url.c_str()); } } @@ -1525,7 +1490,7 @@ void SrsRtmpConn::http_hooks_on_stop() { ISrsRequest *req = info_->req_; - if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) { return; } @@ -1535,7 +1500,7 @@ void SrsRtmpConn::http_hooks_on_stop() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_stop(req->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_stop(req->vhost_); if (!conf) { return; @@ -1546,7 +1511,7 @@ void SrsRtmpConn::http_hooks_on_stop() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_stop(url, req); + hooks_->on_stop(url, req); } return; @@ -1571,9 +1536,8 @@ srs_error_t SrsRtmpConn::cycle() err = do_cycle(); // Update statistic when done. - SrsStatistic *stat = _srs_stat; - stat->kbps_add_delta(get_id().c_str(), delta_); - stat->on_disconnect(get_id().c_str(), err); + stat_->kbps_add_delta(get_id().c_str(), delta_); + stat_->on_disconnect(get_id().c_str(), err); // Notify manager to remove it. // Note that we create this object, so we use manager to remove it. diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 8310cc911..0a3ec4a65 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -39,12 +39,23 @@ class ISrsWakable; class SrsRtmpCommonMessage; class SrsRtmpCommand; class SrsNetworkDelta; - +class ISrsAppConfig; class SrsSslConnection; +class ISrsResourceManager; +class ISrsStreamPublishTokenManager; +class ISrsLiveSourceManager; +class ISrsStatistic; +class ISrsHttpHooks; +class ISrsRtcSourceManager; +class ISrsSrtSourceManager; +class ISrsRtspSourceManager; // The simple rtmp client for SRS. class SrsSimpleRtmpClient : public SrsBasicRtmpClient { +private: + ISrsAppConfig *config_; + public: SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm); virtual ~SrsSimpleRtmpClient(); @@ -103,6 +114,9 @@ public: // The SSL/TLS transport layer for RTMPS connections. class SrsRtmpsTransport : public SrsRtmpTransport { +private: + ISrsAppConfig *config_; + private: SrsSslConnection *ssl_; @@ -124,6 +138,17 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo // For the thread to directly access any field of connection. friend class SrsPublishRecvThread; +private: + ISrsResourceManager *manager_; + ISrsAppConfig *config_; + ISrsStreamPublishTokenManager *stream_publish_tokens_; + ISrsLiveSourceManager *live_sources_; + ISrsStatistic *stat_; + ISrsHttpHooks *hooks_; + ISrsRtcSourceManager *rtc_sources_; + ISrsSrtSourceManager *srt_sources_; + ISrsRtspSourceManager *rtsp_sources_; + private: SrsServer *server_; SrsRtmpServer *rtmp_; @@ -158,8 +183,6 @@ private: // Each connection start a green thread, // when thread stop, the connection will be delete by server. ISrsCoroutine *trd_; - // The manager object to manage the connection. - ISrsResourceManager *manager_; // The ip and port of client. std::string ip_; int port_; @@ -172,6 +195,7 @@ private: public: SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, std::string cip, int port); + void assemble(); virtual ~SrsRtmpConn(); // Interface ISrsResource. public: diff --git a/trunk/src/app/srs_app_rtmp_source.hpp b/trunk/src/app/srs_app_rtmp_source.hpp index 1fa40bbfd..35f492a8f 100644 --- a/trunk/src/app/srs_app_rtmp_source.hpp +++ b/trunk/src/app/srs_app_rtmp_source.hpp @@ -529,6 +529,10 @@ public: virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr &pps) = 0; // Get the exists source, NULL when not exists. virtual SrsSharedPtr fetch(ISrsRequest *r) = 0; + +public: + virtual void dispose() = 0; + virtual srs_error_t initialize() = 0; }; // The source manager to create and refresh all stream sources. diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index 3058cde5a..21bb5698f 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -111,6 +111,14 @@ void SrsRtspConsumer::on_stream_change(SrsRtcSourceDescription *desc) } } +ISrsRtspSourceManager::ISrsRtspSourceManager() +{ +} + +ISrsRtspSourceManager::~ISrsRtspSourceManager() +{ +} + SrsRtspSourceManager::SrsRtspSourceManager() { lock_ = srs_mutex_new(); diff --git a/trunk/src/app/srs_app_rtsp_source.hpp b/trunk/src/app/srs_app_rtsp_source.hpp index 1aad6dc1d..8a074aa54 100644 --- a/trunk/src/app/srs_app_rtsp_source.hpp +++ b/trunk/src/app/srs_app_rtsp_source.hpp @@ -67,7 +67,21 @@ public: void on_stream_change(SrsRtcSourceDescription *desc); }; -class SrsRtspSourceManager : public ISrsHourGlassHandler +// The RTSP source manager interface. +class ISrsRtspSourceManager +{ +public: + ISrsRtspSourceManager(); + virtual ~ISrsRtspSourceManager(); + +public: + virtual srs_error_t initialize() = 0; + virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr &pps) = 0; + virtual SrsSharedPtr fetch(ISrsRequest *r) = 0; +}; + +// The RTSP source manager. +class SrsRtspSourceManager : public ISrsHourGlassHandler, public ISrsRtspSourceManager { private: srs_mutex_t lock_; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 06cc1d015..63503fc3f 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -62,6 +62,7 @@ using namespace std; #include #include #endif +#include SrsServer *_srs_server = NULL; @@ -186,6 +187,22 @@ SrsServer::SrsServer() rtc_session_manager_ = new SrsRtcSessionManager(); config_ = _srs_config; + live_sources_ = _srs_sources; + conn_manager_ = _srs_conn_manager; + rtc_dtls_certificate_ = _srs_rtc_dtls_certificate; + dvr_async_ = _srs_dvr_async; + circuit_breaker_ = _srs_circuit_breaker; + srt_sources_ = _srs_srt_sources; + rtc_sources_ = _srs_rtc_sources; +#ifdef SRS_RTSP + rtsp_sources_ = _srs_rtsp_sources; +#endif +#ifdef SRS_GB28181 + gb_manager_ = _srs_gb_manager; +#endif + log_ = _srs_log; + stat_ = _srs_stat; + app_factory_ = _srs_app_factory; } SrsServer::~SrsServer() @@ -238,6 +255,20 @@ SrsServer::~SrsServer() srs_freep(rtc_session_manager_); config_ = NULL; + live_sources_ = NULL; + conn_manager_ = NULL; + rtc_dtls_certificate_ = NULL; + dvr_async_ = NULL; + circuit_breaker_ = NULL; + srt_sources_ = NULL; + rtc_sources_ = NULL; + rtsp_sources_ = NULL; +#ifdef SRS_GB28181 + gb_manager_ = NULL; +#endif + log_ = NULL; + stat_ = NULL; + app_factory_ = NULL; } void SrsServer::dispose() @@ -267,7 +298,7 @@ void SrsServer::dispose() ingester_->dispose(); // dispose the source for hls and dvr. - _srs_sources->dispose(); + live_sources_->dispose(); // @remark don't dispose all connections, for too slow. } @@ -307,17 +338,17 @@ void SrsServer::gracefully_dispose() // Wait for connections to quit. // While gracefully quiting, user can requires SRS to fast quit. int wait_step = 1; - while (!_srs_conn_manager->empty() && !signal_fast_quit_) { - for (int i = 0; i < wait_step && !_srs_conn_manager->empty() && !signal_fast_quit_; i++) { + while (!conn_manager_->empty() && !signal_fast_quit_) { + for (int i = 0; i < wait_step && !conn_manager_->empty() && !signal_fast_quit_; i++) { srs_usleep(1000 * SRS_UTIME_MILLISECONDS); } wait_step = (wait_step * 2) % 33; - srs_trace("wait for %d conns to quit", (int)_srs_conn_manager->size()); + srs_trace("wait for %d conns to quit", (int)conn_manager_->size()); } // dispose the source for hls and dvr. - _srs_sources->dispose(); + live_sources_->dispose(); srs_trace("source disposed"); srs_usleep(config_->get_grace_final_wait()); @@ -355,12 +386,12 @@ srs_error_t SrsServer::initialize() } // Initialize WebRTC DTLS certificate - if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) { + if ((err = rtc_dtls_certificate_->initialize()) != srs_success) { return srs_error_wrap(err, "rtc dtls certificate initialize"); } // Start the DVR async call. - if ((err = _srs_dvr_async->start()) != srs_success) { + if ((err = dvr_async_->start()) != srs_success) { return srs_error_wrap(err, "dvr async"); } @@ -431,7 +462,7 @@ srs_error_t SrsServer::run() srs_error_t err = srs_success; // Circuit breaker to protect server, which depends on server. - if ((err = _srs_circuit_breaker->initialize()) != srs_success) { + if ((err = circuit_breaker_->initialize()) != srs_success) { return srs_error_wrap(err, "init circuit breaker"); } @@ -460,20 +491,20 @@ srs_error_t SrsServer::run() return srs_error_wrap(err, "ingest"); } - if ((err = _srs_sources->initialize()) != srs_success) { + if ((err = live_sources_->initialize()) != srs_success) { return srs_error_wrap(err, "live sources"); } - if ((err = _srs_srt_sources->initialize()) != srs_success) { + if ((err = srt_sources_->initialize()) != srs_success) { return srs_error_wrap(err, "srt sources"); } - if ((err = _srs_rtc_sources->initialize()) != srs_success) { + if ((err = rtc_sources_->initialize()) != srs_success) { return srs_error_wrap(err, "rtc sources"); } #ifdef SRS_RTSP - if ((err = _srs_rtsp_sources->initialize()) != srs_success) { + if ((err = rtsp_sources_->initialize()) != srs_success) { return srs_error_wrap(err, "rtsp sources"); } #endif @@ -483,7 +514,7 @@ srs_error_t SrsServer::run() } #ifdef SRS_GB28181 - if ((err = _srs_gb_manager->start()) != srs_success) { + if ((err = gb_manager_->start()) != srs_success) { return srs_error_wrap(err, "start manager"); } #endif @@ -659,7 +690,7 @@ srs_error_t SrsServer::listen() return srs_error_wrap(err, "rtc udp listen"); } - if ((err = _srs_conn_manager->start()) != srs_success) { + if ((err = conn_manager_->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); } @@ -873,7 +904,7 @@ void SrsServer::on_signal(int signo) #ifndef SRS_GPERF_MC if (signo == SRS_SIGNAL_REOPEN_LOG) { - _srs_log->reopen(); + log_->reopen(); srs_warn("reopen log file, signo=%d", signo); return; @@ -924,6 +955,72 @@ srs_error_t _srs_reload_err; SrsReloadState _srs_reload_state; std::string _srs_reload_id; +srs_error_t SrsServer::do2_cycle() +{ + srs_error_t err = srs_success; + + // gracefully quit for SIGINT or SIGTERM or SIGQUIT. + if (signal_fast_quit_ || signal_gracefully_quit_) { + srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit_, signal_gracefully_quit_); + return err; + } + + // for gperf heap checker, + // @see: research/gperftools/heap-checker/heap_checker.cc + // if user interrupt the program, exit to check mem leak. + // but, if gperf, use reload to ensure main return normally, + // because directly exit will cause core-dump. +#ifdef SRS_GPERF_MC + if (signal_gmc_stop_) { + srs_warn("gmc got singal to stop server."); + return err; + } +#endif + + // do persistence config to file. + if (signal_persistence_config_) { + signal_persistence_config_ = false; + srs_info("get signal to persistence config to file."); + + if ((err = config_->persistence()) != srs_success) { + return srs_error_wrap(err, "config persistence to file"); + } + srs_trace("persistence config to file success."); + } + + // do reload the config. + if (signal_reload_) { + signal_reload_ = false; + srs_trace("starting reload config."); + + SrsReloadState state = SrsReloadStateInit; + _srs_reload_state = SrsReloadStateInit; + srs_freep(_srs_reload_err); + SrsRand rand; + _srs_reload_id = rand.gen_str(7); + err = config_->reload(&state); + _srs_reload_state = state; + _srs_reload_err = srs_error_copy(err); + if (err != srs_success) { + // If the parsing and transformation of the configuration fail, we can tolerate it by simply + // ignoring the new configuration and continuing to use the current one. However, if the + // application of the new configuration fails, some configurations may be applied while + // others may not. For instance, the listening port may be closed when the configuration + // is set to listen on an unavailable port. In such cases, we should terminate the service. + if (state == SrsReloadStateApplying) { + return srs_error_wrap(err, "reload fatal error state=%d", state); + } + + srs_warn("reload failed, state=%d, err %s", state, srs_error_desc(err).c_str()); + srs_freep(err); + } else { + srs_trace("reload config success, state=%d.", state); + } + } + + return err; +} + srs_error_t SrsServer::do_cycle() { srs_error_t err = srs_success; @@ -937,63 +1034,8 @@ srs_error_t SrsServer::do_cycle() return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid_, ::getppid()); } - // gracefully quit for SIGINT or SIGTERM or SIGQUIT. - if (signal_fast_quit_ || signal_gracefully_quit_) { - srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit_, signal_gracefully_quit_); - return err; - } - - // for gperf heap checker, - // @see: research/gperftools/heap-checker/heap_checker.cc - // if user interrupt the program, exit to check mem leak. - // but, if gperf, use reload to ensure main return normally, - // because directly exit will cause core-dump. -#ifdef SRS_GPERF_MC - if (signal_gmc_stop_) { - srs_warn("gmc got singal to stop server."); - return err; - } -#endif - - // do persistence config to file. - if (signal_persistence_config_) { - signal_persistence_config_ = false; - srs_info("get signal to persistence config to file."); - - if ((err = config_->persistence()) != srs_success) { - return srs_error_wrap(err, "config persistence to file"); - } - srs_trace("persistence config to file success."); - } - - // do reload the config. - if (signal_reload_) { - signal_reload_ = false; - srs_trace("starting reload config."); - - SrsReloadState state = SrsReloadStateInit; - _srs_reload_state = SrsReloadStateInit; - srs_freep(_srs_reload_err); - SrsRand rand; - _srs_reload_id = rand.gen_str(7); - err = config_->reload(&state); - _srs_reload_state = state; - _srs_reload_err = srs_error_copy(err); - if (err != srs_success) { - // If the parsing and transformation of the configuration fail, we can tolerate it by simply - // ignoring the new configuration and continuing to use the current one. However, if the - // application of the new configuration fails, some configurations may be applied while - // others may not. For instance, the listening port may be closed when the configuration - // is set to listen on an unavailable port. In such cases, we should terminate the service. - if (state == SrsReloadStateApplying) { - return srs_error_wrap(err, "reload fatal error state=%d", state); - } - - srs_warn("reload failed, state=%d, err %s", state, srs_error_desc(err).c_str()); - srs_freep(err); - } else { - srs_trace("reload config success, state=%d.", state); - } + if ((err = do2_cycle()) != srs_success) { + return srs_error_wrap(err, "cycle"); } srs_usleep(1 * SRS_UTIME_SECONDS); @@ -1007,7 +1049,7 @@ srs_error_t SrsServer::setup_ticks() srs_error_t err = srs_success; srs_freep(timer_); - timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS); + timer_ = app_factory_->create_hourglass("srs", this, 1 * SRS_UTIME_SECONDS); if (config_->get_stats_enabled()) { if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) { @@ -1098,47 +1140,45 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) void SrsServer::resample_kbps() { - SrsStatistic *stat = _srs_stat; - // collect delta from all clients. - for (int i = 0; i < (int)_srs_conn_manager->size(); i++) { - ISrsResource *c = _srs_conn_manager->at(i); + for (int i = 0; i < (int)conn_manager_->size(); i++) { + ISrsResource *c = conn_manager_->at(i); SrsRtmpConn *rtmp = dynamic_cast(c); if (rtmp) { - stat->kbps_add_delta(c->get_id().c_str(), rtmp->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), rtmp->delta()); continue; } SrsHttpxConn *httpx = dynamic_cast(c); if (httpx) { - stat->kbps_add_delta(c->get_id().c_str(), httpx->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), httpx->delta()); continue; } #ifdef SRS_RTSP SrsRtspConnection *rtsp = dynamic_cast(c); if (rtsp) { - stat->kbps_add_delta(c->get_id().c_str(), rtsp->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), rtsp->delta()); continue; } #endif SrsRtcTcpConn *tcp = dynamic_cast(c); if (tcp) { - stat->kbps_add_delta(c->get_id().c_str(), tcp->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), tcp->delta()); continue; } SrsMpegtsSrtConn *srt = dynamic_cast(c); if (srt) { - stat->kbps_add_delta(c->get_id().c_str(), srt->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), srt->delta()); continue; } SrsRtcConnection *rtc = dynamic_cast(c); if (rtc) { - stat->kbps_add_delta(c->get_id().c_str(), rtc->delta()); + stat_->kbps_add_delta(c->get_id().c_str(), rtc->delta()); continue; } @@ -1147,7 +1187,7 @@ void SrsServer::resample_kbps() } // Update the global server level statistics. - stat->kbps_sample(); + stat_->kbps_sample(); } srs_error_t SrsServer::listen_srt_mpegts() @@ -1212,9 +1252,9 @@ srs_error_t SrsServer::accept_srt_client(srs_srt_t srt_fd) srs_assert(resource); // directly enqueue, the cycle thread will remove the client. - _srs_conn_manager->add(resource); + conn_manager_->add(resource); - // Note that conn is managed by _srs_conn_manager, so we don't need to free it. + // Note that conn is managed by conn_manager_, so we don't need to free it. ISrsStartable *conn = dynamic_cast(resource); if ((err = conn->start()) != srs_success) { return srs_error_wrap(err, "start srt conn coroutine"); @@ -1242,7 +1282,7 @@ srs_error_t SrsServer::srt_fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr) SrsContextRestore(_srs_context->get_id()); // Convert to SRT connection. - *pr = new SrsMpegtsSrtConn(_srs_conn_manager, srt_fd, ip, port); + *pr = new SrsMpegtsSrtConn(conn_manager_, srt_fd, ip, port); return err; } @@ -1459,7 +1499,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf } else { string key = listener == https_listener_ ? config_->get_https_stream_ssl_key() : ""; string cert = listener == https_listener_ ? config_->get_https_stream_ssl_cert() : ""; - resource = new SrsHttpxConn(_srs_conn_manager, io, http_server_, ip, port, key, cert); + resource = new SrsHttpxConn(conn_manager_, io, http_server_, ip, port, key, cert); } } @@ -1467,27 +1507,31 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf if (!resource) { if (listener == rtmp_listener_) { SrsRtmpTransport *transport = new SrsRtmpTransport(stfd2); - resource = new SrsRtmpConn(this, transport, ip, port); + SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port); + conn->assemble(); + resource = conn; } else if (listener == rtmps_listener_) { SrsRtmpTransport *transport = new SrsRtmpsTransport(stfd2); - resource = new SrsRtmpConn(this, transport, ip, port); + SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port); + conn->assemble(); + resource = conn; } else if (listener == api_listener_ || listener == apis_listener_) { string key = listener == apis_listener_ ? config_->get_https_api_ssl_key() : ""; string cert = listener == apis_listener_ ? config_->get_https_api_ssl_cert() : ""; - resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, key, cert); + resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, key, cert); } else if (listener == http_listener_ || listener == https_listener_) { string key = listener == https_listener_ ? config_->get_https_stream_ssl_key() : ""; string cert = listener == https_listener_ ? config_->get_https_stream_ssl_cert() : ""; - resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_server_, ip, port, key, cert); + resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_server_, ip, port, key, cert); } else if (listener == webrtc_listener_) { resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port); #ifdef SRS_RTSP } else if (listener == rtsp_listener_) { - resource = new SrsRtspConnection(_srs_conn_manager, new SrsTcpConnection(stfd2), ip, port); + resource = new SrsRtspConnection(conn_manager_, new SrsTcpConnection(stfd2), ip, port); #endif } else if (listener == exporter_listener_) { // TODO: FIXME: Maybe should support https metrics. - resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, "", ""); + resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, "", ""); } else { srs_close_stfd(stfd2); srs_warn("Close for invalid fd=%d, ip=%s:%d", fd, ip.c_str(), port); @@ -1499,7 +1543,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf SrsRtcTcpConn *raw_conn = dynamic_cast(resource); if (raw_conn) { SrsSharedResource *conn = new SrsSharedResource(raw_conn); - SrsExecutorCoroutine *executor = new SrsExecutorCoroutine(_srs_conn_manager, conn, raw_conn, raw_conn); + SrsExecutorCoroutine *executor = new SrsExecutorCoroutine(conn_manager_, conn, raw_conn, raw_conn); raw_conn->setup_owner(conn, executor, executor); if ((err = executor->start()) != srs_success) { srs_freep(executor); @@ -1510,7 +1554,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf // Use connection manager to manage all the resources. srs_assert(resource); - _srs_conn_manager->add(resource); + conn_manager_->add(resource); // If connection is a resource to start, start a coroutine to handle it. // Note that conn is managed by conn_manager, so we don't need to free it. @@ -1530,9 +1574,9 @@ srs_error_t SrsServer::on_before_connection(const char *label, int fd, const std // Failed if exceed the connection limitation. int max_connections = config_->get_max_connections(); - if ((int)_srs_conn_manager->size() >= max_connections) { + if ((int)conn_manager_->size() >= max_connections) { return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop %s fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits", - label, fd, ip.c_str(), port, max_connections, (int)_srs_conn_manager->size()); + label, fd, ip.c_str(), port, max_connections, (int)conn_manager_->size()); } return err; @@ -1697,12 +1741,16 @@ SrsInotifyWorker::SrsInotifyWorker(SrsServer *s) server_ = s; trd_ = new SrsSTCoroutine("inotify", this); inotify_fd_ = NULL; + + config_ = _srs_config; } SrsInotifyWorker::~SrsInotifyWorker() { srs_freep(trd_); srs_close_stfd(inotify_fd_); + + config_ = NULL; } srs_error_t SrsInotifyWorker::start() diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index c75029a62..19ebd74ba 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -57,6 +57,18 @@ class SrsSrtEventLoop; class SrsRtcSessionManager; class SrsPidFileLocker; class ISrsAppConfig; +class ISrsLiveSourceManager; +class ISrsResourceManager; +class ISrsDtlsCertificate; +class ISrsAsyncCallWorker; +class ISrsCircuitBreaker; +class ISrsSrtSourceManager; +class ISrsRtcSourceManager; +class ISrsRtspSourceManager; +class ISrsLog; +class ISrsStatistic; +class ISrsHourGlass; +class SrsAppFactory; // Initialize global shared variables cross all threads. extern srs_error_t srs_global_initialize(); @@ -73,6 +85,20 @@ class SrsServer : public ISrsReloadHandler, // Reload framework for permormance { private: ISrsAppConfig *config_; + ISrsLiveSourceManager *live_sources_; + ISrsResourceManager *conn_manager_; + ISrsDtlsCertificate *rtc_dtls_certificate_; + ISrsAsyncCallWorker *dvr_async_; + ISrsCircuitBreaker *circuit_breaker_; + ISrsSrtSourceManager *srt_sources_; + ISrsRtcSourceManager *rtc_sources_; + ISrsRtspSourceManager *rtsp_sources_; +#ifdef SRS_GB28181 + ISrsResourceManager *gb_manager_; +#endif + ISrsLog *log_; + ISrsStatistic *stat_; + SrsAppFactory *app_factory_; private: ISrsHttpServeMux *http_api_mux_; @@ -81,7 +107,7 @@ private: private: SrsHttpHeartbeat *http_heartbeat_; SrsIngester *ingester_; - SrsHourGlass *timer_; + ISrsHourGlass *timer_; private: // PID file manager for process identification and locking. @@ -210,6 +236,7 @@ private: // update the global static data, for instance, the current time, // the cpu/mem/network statistic. virtual srs_error_t do_cycle(); + virtual srs_error_t do2_cycle(); // interface ISrsHourGlassHandler private: @@ -298,6 +325,9 @@ private: // @see https://github.com/ossrs/srs/issues/1635 class SrsInotifyWorker : public ISrsCoroutineHandler { +private: + ISrsAppConfig *config_; + private: SrsServer *server_; ISrsCoroutine *trd_; @@ -319,7 +349,7 @@ class SrsPidFileLocker { private: ISrsAppConfig *config_; - + private: int pid_fd_; std::string pid_file_; diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 6e762a039..3ef88dc60 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -674,67 +674,6 @@ void SrsStatistic::dumps_hints_kv(std::stringstream &ss) } } -#ifdef SRS_APM -void SrsStatistic::dumps_cls_summaries(SrsClsSugar *sugar) -{ - if (!vhosts_.empty()) { - sugar->kv("vhosts", srs_fmt_sprintf("%d", (int)vhosts_.size())); - } - if (!streams_.empty()) { - sugar->kv("streams", srs_fmt_sprintf("%d", (int)streams_.size())); - } - if (!clients_.empty()) { - sugar->kv("clients", srs_fmt_sprintf("%d", (int)clients_.size())); - } -} - -void SrsStatistic::dumps_cls_streams(SrsClsSugars *sugars) -{ - for (std::map::iterator it = streams_.begin(); it != streams_.end(); ++it) { - SrsStatisticStream *stream = it->second; - if (!stream->active_ || !stream->nb_clients_) { - continue; - } - - SrsClsSugar *sugar = sugars->create(); - sugar->kv("hint", "stream"); - sugar->kv("version", RTMP_SIG_SRS_VERSION); - sugar->kv("pid", srs_fmt_sprintf("%d", getpid())); - - sugar->kv("sid", stream->id_); - sugar->kv("url", stream->url_); - - if (stream->frames_->r30s()) { - sugar->kv("fps", srs_fmt_sprintf("%d", stream->frames_->r30s())); - } - if (stream->width_) { - sugar->kv("width", srs_fmt_sprintf("%d", stream->width_)); - } - if (stream->height_) { - sugar->kv("height", srs_fmt_sprintf("%d", stream->height_)); - } - - SrsStatisticClient *pub = find_client(stream->publisher_id_); - if (pub) { - if (pub->kbps_->get_recv_kbps_30s()) { - sugar->kv("recv", srs_fmt_sprintf("%d", pub->kbps_->get_recv_kbps_30s())); - } - if (pub->kbps_->get_send_kbps_30s()) { - sugar->kv("send", srs_fmt_sprintf("%d", pub->kbps_->get_send_kbps_30s())); - } - } - - sugar->kv("clients", srs_fmt_sprintf("%d", stream->nb_clients_)); - if (stream->kbps_->get_recv_kbps_30s()) { - sugar->kv("recv2", srs_fmt_sprintf("%d", stream->kbps_->get_recv_kbps_30s())); - } - if (stream->kbps_->get_send_kbps_30s()) { - sugar->kv("send2", srs_fmt_sprintf("%d", stream->kbps_->get_send_kbps_30s())); - } - } -} -#endif - SrsStatisticVhost *SrsStatistic::create_vhost(ISrsRequest *req) { SrsStatisticVhost *vhost = NULL; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index d285290d6..85d5cee23 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -145,6 +145,9 @@ public: SrsAudioChannels asound_type, SrsAacObjectType aac_object) = 0; virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id) = 0; virtual void on_stream_close(ISrsRequest *req) = 0; + virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta) = 0; + virtual void kbps_sample() = 0; + virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames) = 0; }; // The global statistic instance. @@ -255,12 +258,6 @@ public: virtual srs_error_t dumps_clients(SrsJsonArray *arr, int start, int count); // Dumps the hints about SRS server. void dumps_hints_kv(std::stringstream &ss); -#ifdef SRS_APM -public: - // Dumps the CLS summary. - void dumps_cls_summaries(SrsClsSugar *sugar); - void dumps_cls_streams(SrsClsSugars *sugars); -#endif private: virtual SrsStatisticVhost *create_vhost(ISrsRequest *req); virtual SrsStatisticStream *create_stream(SrsStatisticVhost *vhost, ISrsRequest *req); diff --git a/trunk/src/app/srs_app_stream_token.cpp b/trunk/src/app/srs_app_stream_token.cpp index 2c0b2588e..9ef24c9f6 100644 --- a/trunk/src/app/srs_app_stream_token.cpp +++ b/trunk/src/app/srs_app_stream_token.cpp @@ -56,6 +56,14 @@ void SrsStreamPublishToken::set_publisher_cid(const SrsContextId &cid) publisher_cid_ = cid; } +ISrsStreamPublishTokenManager::ISrsStreamPublishTokenManager() +{ +} + +ISrsStreamPublishTokenManager::~ISrsStreamPublishTokenManager() +{ +} + SrsStreamPublishTokenManager::SrsStreamPublishTokenManager() { mutex_ = srs_mutex_new(); diff --git a/trunk/src/app/srs_app_stream_token.hpp b/trunk/src/app/srs_app_stream_token.hpp index 2e56ab8b6..af8867fa5 100644 --- a/trunk/src/app/srs_app_stream_token.hpp +++ b/trunk/src/app/srs_app_stream_token.hpp @@ -53,10 +53,22 @@ public: void set_publisher_cid(const SrsContextId &cid); }; +// The interface for stream publish token manager +class ISrsStreamPublishTokenManager +{ +public: + ISrsStreamPublishTokenManager(); + virtual ~ISrsStreamPublishTokenManager(); + +public: + virtual srs_error_t acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token) = 0; + virtual void release_token(const std::string &stream_url) = 0; +}; + // The global stream publish token manager ensures only one publisher // can acquire a token for a given stream URL at any time. // This prevents race conditions across all protocols. -class SrsStreamPublishTokenManager +class SrsStreamPublishTokenManager : public ISrsStreamPublishTokenManager { private: // Map of stream URL to token diff --git a/trunk/src/kernel/srs_kernel_resource.hpp b/trunk/src/kernel/srs_kernel_resource.hpp index e64ca8ebc..28058fe44 100644 --- a/trunk/src/kernel/srs_kernel_resource.hpp +++ b/trunk/src/kernel/srs_kernel_resource.hpp @@ -82,6 +82,20 @@ public: ISrsResourceManager(); virtual ~ISrsResourceManager(); +public: + // Start the resource manager. + virtual srs_error_t start() = 0; + // Check if the resource manager is empty. + virtual bool empty() = 0; + // Get the number of resources. + virtual size_t size() = 0; + +public: + // Add a resource to the manager. + virtual void add(ISrsResource *conn, bool *exists = NULL) = 0; + // Get resource at specified index. + virtual ISrsResource *at(int index) = 0; + public: // Remove then free the specified connection. Note that the manager always free c resource, // in the same coroutine or another coroutine. Some manager may support add c to a map, it diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index f047a4bb5..5e866a625 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -91,6 +91,9 @@ srs_error_t prepare_main() // Prevent the output of srt logs in utest. srt_setloghandler(NULL, srs_srt_utest_null_log_handler); + // Set SRT log level to FATAL to suppress ERROR and WARNING logs in unit tests. + // LOG_CRIT (2) is the highest level that suppresses most logs. + srt_setloglevel(LOG_CRIT); _srt_eventloop = new SrsSrtEventLoop(); if ((err = _srt_eventloop->initialize()) != srs_success) { diff --git a/trunk/src/utest/srs_utest_app10.cpp b/trunk/src/utest/srs_utest_app10.cpp new file mode 100644 index 000000000..8075964ac --- /dev/null +++ b/trunk/src/utest/srs_utest_app10.cpp @@ -0,0 +1,1004 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include + +// Mock config implementation for SrsServer::listen() testing +MockAppConfigForServerListen::MockAppConfigForServerListen() +{ + rtmps_enabled_ = false; + http_api_enabled_ = false; + https_api_enabled_ = false; + http_stream_enabled_ = false; + https_stream_enabled_ = false; + rtc_server_tcp_enabled_ = false; + rtc_server_protocol_ = "udp"; + rtsp_server_enabled_ = false; + exporter_enabled_ = false; +} + +MockAppConfigForServerListen::~MockAppConfigForServerListen() +{ +} + +std::vector MockAppConfigForServerListen::get_listens() +{ + return rtmp_listens_; +} + +bool MockAppConfigForServerListen::get_rtmps_enabled() +{ + return rtmps_enabled_; +} + +std::vector MockAppConfigForServerListen::get_rtmps_listen() +{ + return rtmps_listens_; +} + +bool MockAppConfigForServerListen::get_http_api_enabled() +{ + return http_api_enabled_; +} + +std::vector MockAppConfigForServerListen::get_http_api_listens() +{ + return http_api_listens_; +} + +bool MockAppConfigForServerListen::get_https_api_enabled() +{ + return https_api_enabled_; +} + +std::vector MockAppConfigForServerListen::get_https_api_listens() +{ + return https_api_listens_; +} + +bool MockAppConfigForServerListen::get_http_stream_enabled() +{ + return http_stream_enabled_; +} + +std::vector MockAppConfigForServerListen::get_http_stream_listens() +{ + return http_stream_listens_; +} + +bool MockAppConfigForServerListen::get_https_stream_enabled() +{ + return https_stream_enabled_; +} + +std::vector MockAppConfigForServerListen::get_https_stream_listens() +{ + return https_stream_listens_; +} + +bool MockAppConfigForServerListen::get_rtc_server_tcp_enabled() +{ + return rtc_server_tcp_enabled_; +} + +std::vector MockAppConfigForServerListen::get_rtc_server_tcp_listens() +{ + return rtc_server_tcp_listens_; +} + +std::string MockAppConfigForServerListen::get_rtc_server_protocol() +{ + return rtc_server_protocol_; +} + +bool MockAppConfigForServerListen::get_rtsp_server_enabled() +{ + return rtsp_server_enabled_; +} + +std::vector MockAppConfigForServerListen::get_rtsp_server_listens() +{ + return rtsp_server_listens_; +} + +bool MockAppConfigForServerListen::get_exporter_enabled() +{ + return exporter_enabled_; +} + +std::string MockAppConfigForServerListen::get_exporter_listen() +{ + return exporter_listen_; +} + +// Test SrsServer constructor and destructor to ensure proper initialization +// and cleanup of all server components including listeners, managers, and +// WebRTC session management. +VOID TEST(SrsServerTest, ConstructorAndDestructor) +{ + // Create SrsServer instance - tests constructor initialization + SrsUniquePtr server(new SrsServer()); + + // Verify that the server object was created successfully + EXPECT_TRUE(server.get() != NULL); + + // The destructor will be called automatically when server goes out of scope + // This tests proper cleanup of all components: + // - Signal manager + // - HTTP API mux + // - All protocol listeners (RTMP, RTMPS, HTTP, HTTPS, WebRTC, RTSP, etc.) + // - Stream casters + // - HTTP server and heartbeat + // - Ingester and timer + // - WebRTC session manager + // - PID file locker +} + +// Test SrsServer::initialize() method to verify proper initialization sequence +// including PID file locking, SRT event loop, DTLS certificate, DVR async worker, +// config subscription, HTTP API/server initialization, blackhole, and WebRTC +// session manager. This test covers the major use scenario of server initialization. +VOID TEST(SrsServerTest, InitializeSuccess) +{ + srs_error_t err = srs_success; + + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Call initialize() - this is the main test + // This will initialize all server components in the correct sequence: + // 1. PID file locker acquisition + // 2. SRT log and event loop initialization + // 3. WebRTC DTLS certificate initialization + // 4. DVR async call worker start + // 5. Config subscription + // 6. HTTP stream/API configuration check and port reuse logic + // 7. HTTP API mux initialization (if not reusing) + // 8. HTTP server initialization + // 9. WebRTC blackhole initialization + // 10. WebRTC session manager initialization + HELPER_EXPECT_SUCCESS(server->initialize()); + + // Verify that the server was initialized successfully + // The fact that initialize() returned success means all components + // were initialized properly without errors + EXPECT_TRUE(server.get() != NULL); +} + +// Test SrsServer::listen() method to verify proper listener setup for RTMP protocol. +// This test covers the major use scenario of starting RTMP listener on a random port. +VOID TEST(SrsServerTest, ListenRtmpSuccess) +{ + srs_error_t err = srs_success; + + // Generate random port in range [30000, 60000] + SrsRand rand; + int port = rand.integer(30000, 60000); + std::string listen_addr = srs_strconv_format_int(port); + + // Create mock config with RTMP listening enabled + MockAppConfigForServerListen mock_config; + mock_config.rtmp_listens_.push_back(listen_addr); + + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Inject mock config + server->config_ = &mock_config; + + // Initialize server first (required before listen) + HELPER_EXPECT_SUCCESS(server->initialize()); + + // Call listen() - this is the main test + // This will: + // 1. Create RTMP listener on the configured port + // 2. Start listening for incoming RTMP connections + // 3. Start connection manager + HELPER_EXPECT_SUCCESS(server->listen()); + + // Verify that the server is listening + // The fact that listen() returned success means: + // - RTMP listener was created successfully + // - Socket binding succeeded on the random port + // - Connection manager started successfully + EXPECT_TRUE(server.get() != NULL); + + // Cleanup: restore original config to avoid side effects + server->config_ = _srs_config; +} + +MockHttpServeMux::MockHttpServeMux() +{ + handle_count_ = 0; +} + +MockHttpServeMux::~MockHttpServeMux() +{ +} + +srs_error_t MockHttpServeMux::handle(std::string pattern, ISrsHttpHandler *handler) +{ + handle_count_++; + patterns_.push_back(pattern); + // Free the handler since we're not actually using it + srs_freep(handler); + return srs_success; +} + +srs_error_t MockHttpServeMux::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r) +{ + return srs_success; +} + +// Test SrsServer::http_handle() method to verify proper HTTP API handler registration. +// This test covers the major use scenario of registering all HTTP API endpoints including +// root API, versioning, summaries, rusages, stats, meminfos, authors, features, vhosts, +// streams, clients, raw API, clusters, test endpoints, metrics, console, and WebRTC APIs. +VOID TEST(SrsServerTest, HttpHandleSuccess) +{ + srs_error_t err = srs_success; + + // Create mock HTTP API mux + MockHttpServeMux* mock_mux = new MockHttpServeMux(); + + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Inject mock HTTP API mux + ISrsHttpServeMux* original_mux = server->http_api_mux_; + server->http_api_mux_ = mock_mux; + + // Set reuse_api_over_server_ to false to test all handler registrations + server->reuse_api_over_server_ = false; + + // Call http_handle() - this is the main test + // This will register all HTTP API handlers: + // 1. Root API (/) + // 2. API version (/api/v1/versions) + // 3. API endpoints (/api/, /api/v1/) + // 4. Server stats (/api/v1/summaries, /api/v1/rusages, etc.) + // 5. System info (/api/v1/meminfos, /api/v1/authors, /api/v1/features) + // 6. Stream management (/api/v1/vhosts/, /api/v1/streams/, /api/v1/clients/) + // 7. Raw API (/api/v1/raw) + // 8. Cluster API (/api/v1/clusters) + // 9. Test endpoints (/api/v1/tests/*) + // 10. Metrics (/metrics) + // 11. Console (/console/) + // 12. WebRTC APIs (via listen_rtc_api()) + HELPER_EXPECT_SUCCESS(server->http_handle()); + + // Verify that handlers were registered + // The exact count depends on conditional compilation flags (SRS_GPERF, SRS_VALGRIND, SRS_SIGNAL_API) + // but should be at least 20 handlers (basic APIs + WebRTC APIs) + EXPECT_TRUE(mock_mux->handle_count_ >= 20); + + // Verify some key patterns were registered + bool has_root = false; + bool has_api = false; + bool has_summaries = false; + bool has_metrics = false; + bool has_rtc_play = false; + + for (size_t i = 0; i < mock_mux->patterns_.size(); i++) { + if (mock_mux->patterns_[i] == "/") has_root = true; + if (mock_mux->patterns_[i] == "/api/") has_api = true; + if (mock_mux->patterns_[i] == "/api/v1/summaries") has_summaries = true; + if (mock_mux->patterns_[i] == "/metrics") has_metrics = true; + if (mock_mux->patterns_[i] == "/rtc/v1/play/") has_rtc_play = true; + } + + EXPECT_TRUE(has_root); + EXPECT_TRUE(has_api); + EXPECT_TRUE(has_summaries); + EXPECT_TRUE(has_metrics); + EXPECT_TRUE(has_rtc_play); + + // Cleanup: restore original mux to avoid side effects + server->http_api_mux_ = original_mux; + srs_freep(mock_mux); +} + +MockLogForSignal::MockLogForSignal() +{ + reopen_count_ = 0; +} + +MockLogForSignal::~MockLogForSignal() +{ +} + +srs_error_t MockLogForSignal::initialize() +{ + return srs_success; +} + +void MockLogForSignal::reopen() +{ + reopen_count_++; +} + +void MockLogForSignal::log(SrsLogLevel level, const char *tag, const SrsContextId &context_id, const char *fmt, va_list args) +{ + // Do nothing for mock +} + +MockAppConfigForSignal::MockAppConfigForSignal() +{ + force_grace_quit_ = false; +} + +MockAppConfigForSignal::~MockAppConfigForSignal() +{ +} + +bool MockAppConfigForSignal::is_force_grace_quit() +{ + return force_grace_quit_; +} + +VOID TEST(ServerTest, OnSignalHandling) +{ + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Create and inject mock config + MockAppConfigForSignal* mock_config = new MockAppConfigForSignal(); + ISrsAppConfig* original_config = server->config_; + server->config_ = mock_config; + + // Create and inject mock log + MockLogForSignal* mock_log = new MockLogForSignal(); + ISrsLog* original_log = server->log_; + server->log_ = mock_log; + + // Test 1: SRS_SIGNAL_RELOAD should set signal_reload_ flag + server->signal_reload_ = false; + server->on_signal(SRS_SIGNAL_RELOAD); + EXPECT_TRUE(server->signal_reload_); + + // Test 2: SRS_SIGNAL_REOPEN_LOG should call log_->reopen() + EXPECT_EQ(0, mock_log->reopen_count_); + server->on_signal(SRS_SIGNAL_REOPEN_LOG); + EXPECT_EQ(1, mock_log->reopen_count_); + + // Test 3: SRS_SIGNAL_PERSISTENCE_CONFIG should set signal_persistence_config_ flag + server->signal_persistence_config_ = false; + server->on_signal(SRS_SIGNAL_PERSISTENCE_CONFIG); + EXPECT_TRUE(server->signal_persistence_config_); + + // Test 4: SRS_SIGNAL_FAST_QUIT should set signal_fast_quit_ flag + server->signal_fast_quit_ = false; + server->on_signal(SRS_SIGNAL_FAST_QUIT); + EXPECT_TRUE(server->signal_fast_quit_); + + // Test 5: SIGINT should set signal_fast_quit_ flag + server->signal_fast_quit_ = false; + server->on_signal(SIGINT); + EXPECT_TRUE(server->signal_fast_quit_); + + // Test 6: SRS_SIGNAL_GRACEFULLY_QUIT should set signal_gracefully_quit_ flag + server->signal_gracefully_quit_ = false; + server->on_signal(SRS_SIGNAL_GRACEFULLY_QUIT); + EXPECT_TRUE(server->signal_gracefully_quit_); + + // Test 7: SRS_SIGNAL_FAST_QUIT with force_grace_quit enabled should convert to gracefully quit + mock_config->force_grace_quit_ = true; + server->signal_gracefully_quit_ = false; + server->signal_fast_quit_ = false; + server->on_signal(SRS_SIGNAL_FAST_QUIT); + EXPECT_TRUE(server->signal_gracefully_quit_); + EXPECT_FALSE(server->signal_fast_quit_); + + // Test 8: Repeated signals should not change already-set flags + server->signal_fast_quit_ = true; + server->on_signal(SRS_SIGNAL_FAST_QUIT); + EXPECT_TRUE(server->signal_fast_quit_); + + server->signal_gracefully_quit_ = true; + server->on_signal(SRS_SIGNAL_GRACEFULLY_QUIT); + EXPECT_TRUE(server->signal_gracefully_quit_); + + // Cleanup: restore original config and log + server->config_ = original_config; + server->log_ = original_log; + srs_freep(mock_config); + srs_freep(mock_log); +} + +// Mock config implementation for SrsServer::do2_cycle() testing +MockAppConfigForDo2Cycle::MockAppConfigForDo2Cycle() +{ + reload_error_ = srs_success; + persistence_error_ = srs_success; + reload_count_ = 0; + persistence_count_ = 0; + reload_state_ = SrsReloadStateInit; +} + +MockAppConfigForDo2Cycle::~MockAppConfigForDo2Cycle() +{ + srs_freep(reload_error_); + srs_freep(persistence_error_); +} + +srs_error_t MockAppConfigForDo2Cycle::reload(SrsReloadState *pstate) +{ + reload_count_++; + if (pstate) { + *pstate = reload_state_; + } + return srs_error_copy(reload_error_); +} + +srs_error_t MockAppConfigForDo2Cycle::persistence() +{ + persistence_count_++; + return srs_error_copy(persistence_error_); +} + +void MockAppConfigForDo2Cycle::reset() +{ + srs_freep(reload_error_); + srs_freep(persistence_error_); + reload_error_ = srs_success; + persistence_error_ = srs_success; + reload_count_ = 0; + persistence_count_ = 0; + reload_state_ = SrsReloadStateInit; +} + +VOID TEST(ServerTest, Do2CycleReloadSuccess) +{ + srs_error_t err; + + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Create and inject mock config + MockAppConfigForDo2Cycle* mock_config = new MockAppConfigForDo2Cycle(); + ISrsAppConfig* original_config = server->config_; + server->config_ = mock_config; + + // Test major use scenario: signal_reload_ triggers config reload with success + // This covers the main code path where reload signal is processed successfully + server->signal_reload_ = true; + server->signal_fast_quit_ = false; + server->signal_gracefully_quit_ = false; + mock_config->reload_state_ = SrsReloadStateFinished; + HELPER_EXPECT_SUCCESS(server->do2_cycle()); + EXPECT_FALSE(server->signal_reload_); // Flag should be cleared after processing + EXPECT_EQ(1, mock_config->reload_count_); // Config reload should be called once + + // Cleanup: restore original config + server->config_ = original_config; + srs_freep(mock_config); +} + +// Mock hourglass implementation for SrsServer::setup_ticks() testing +MockHourGlassForSetupTicks::MockHourGlassForSetupTicks() +{ + tick_count_ = 0; + start_count_ = 0; +} + +MockHourGlassForSetupTicks::~MockHourGlassForSetupTicks() +{ +} + +srs_error_t MockHourGlassForSetupTicks::start() +{ + start_count_++; + return srs_success; +} + +void MockHourGlassForSetupTicks::stop() +{ +} + +srs_error_t MockHourGlassForSetupTicks::tick(srs_utime_t interval) +{ + return tick(0, interval); +} + +srs_error_t MockHourGlassForSetupTicks::tick(int event, srs_utime_t interval) +{ + tick_count_++; + tick_events_.push_back(event); + tick_intervals_.push_back(interval); + return srs_success; +} + +void MockHourGlassForSetupTicks::untick(int event) +{ +} + +// Mock app factory implementation for SrsServer::setup_ticks() testing +MockAppFactoryForSetupTicks::MockAppFactoryForSetupTicks() +{ + mock_hourglass_ = new MockHourGlassForSetupTicks(); +} + +MockAppFactoryForSetupTicks::~MockAppFactoryForSetupTicks() +{ + // Do NOT free mock_hourglass_ here because it's owned by SrsServer::timer_ + // and will be freed by SrsServer destructor + mock_hourglass_ = NULL; +} + +ISrsHourGlass *MockAppFactoryForSetupTicks::create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval) +{ + return mock_hourglass_; +} + +// Mock config implementation for SrsServer::setup_ticks() testing +MockAppConfigForSetupTicks::MockAppConfigForSetupTicks() +{ + stats_enabled_ = false; + heartbeat_enabled_ = false; + heartbeat_interval_ = 1 * SRS_UTIME_MILLISECONDS; +} + +MockAppConfigForSetupTicks::~MockAppConfigForSetupTicks() +{ +} + +bool MockAppConfigForSetupTicks::get_stats_enabled() +{ + return stats_enabled_; +} + +bool MockAppConfigForSetupTicks::get_heartbeat_enabled() +{ + return heartbeat_enabled_; +} + +srs_utime_t MockAppConfigForSetupTicks::get_heartbeat_interval() +{ + return heartbeat_interval_; +} + +VOID TEST(ServerTest, SetupTicksWithStatsAndHeartbeat) +{ + srs_error_t err; + + // Create SrsServer instance + SrsUniquePtr server(new SrsServer()); + EXPECT_TRUE(server.get() != NULL); + + // Create and inject mock config + MockAppConfigForSetupTicks* mock_config = new MockAppConfigForSetupTicks(); + ISrsAppConfig* original_config = server->config_; + server->config_ = mock_config; + + // Create and inject mock app factory + MockAppFactoryForSetupTicks* mock_factory = new MockAppFactoryForSetupTicks(); + SrsAppFactory* original_factory = server->app_factory_; + server->app_factory_ = mock_factory; + + // Test major use scenario: setup_ticks with stats and heartbeat enabled + // This covers the main code path where both stats and heartbeat are enabled + mock_config->stats_enabled_ = true; + mock_config->heartbeat_enabled_ = true; + mock_config->heartbeat_interval_ = 1 * SRS_UTIME_MILLISECONDS; + + HELPER_EXPECT_SUCCESS(server->setup_ticks()); + + // Verify that timer was created and started + EXPECT_EQ(1, mock_factory->mock_hourglass_->start_count_); + + // Verify that ticks were registered + // When stats_enabled: events 2,4,5,6,7,8,10,11,12 (9 ticks) + // When heartbeat_enabled: event 9 (1 tick) + // Total: 10 ticks + EXPECT_EQ(10, mock_factory->mock_hourglass_->tick_count_); + + // Verify specific tick events are registered + std::vector &events = mock_factory->mock_hourglass_->tick_events_; + EXPECT_TRUE(std::find(events.begin(), events.end(), 2) != events.end()); // rusage + EXPECT_TRUE(std::find(events.begin(), events.end(), 4) != events.end()); // disk + EXPECT_TRUE(std::find(events.begin(), events.end(), 5) != events.end()); // meminfo + EXPECT_TRUE(std::find(events.begin(), events.end(), 6) != events.end()); // platform + EXPECT_TRUE(std::find(events.begin(), events.end(), 7) != events.end()); // network + EXPECT_TRUE(std::find(events.begin(), events.end(), 8) != events.end()); // kbps + EXPECT_TRUE(std::find(events.begin(), events.end(), 9) != events.end()); // heartbeat + EXPECT_TRUE(std::find(events.begin(), events.end(), 10) != events.end()); // udp snmp + EXPECT_TRUE(std::find(events.begin(), events.end(), 11) != events.end()); // rtc sessions + EXPECT_TRUE(std::find(events.begin(), events.end(), 12) != events.end()); // server stats + + // Cleanup: restore original config and factory + server->config_ = original_config; + server->app_factory_ = original_factory; + srs_freep(mock_config); + srs_freep(mock_factory); +} + +MockRtcSessionManagerForNotify::MockRtcSessionManagerForNotify() +{ + update_rtc_sessions_count_ = 0; +} + +MockRtcSessionManagerForNotify::~MockRtcSessionManagerForNotify() +{ +} + +void MockRtcSessionManagerForNotify::srs_update_rtc_sessions() +{ + update_rtc_sessions_count_++; +} + +MockHttpHeartbeatForNotify::MockHttpHeartbeatForNotify() +{ + heartbeat_count_ = 0; +} + +MockHttpHeartbeatForNotify::~MockHttpHeartbeatForNotify() +{ +} + +void MockHttpHeartbeatForNotify::heartbeat() +{ + heartbeat_count_++; +} + +MockConnectionManagerForResampleKbps::MockConnectionManagerForResampleKbps() +{ +} + +MockConnectionManagerForResampleKbps::~MockConnectionManagerForResampleKbps() +{ +} + +srs_error_t MockConnectionManagerForResampleKbps::start() +{ + return srs_success; +} + +bool MockConnectionManagerForResampleKbps::empty() +{ + return connections_.empty(); +} + +size_t MockConnectionManagerForResampleKbps::size() +{ + return connections_.size(); +} + +void MockConnectionManagerForResampleKbps::add(ISrsResource *conn, bool *exists) +{ + connections_.push_back(conn); +} + +ISrsResource *MockConnectionManagerForResampleKbps::at(int index) +{ + if (index < 0 || index >= (int)connections_.size()) { + return NULL; + } + return connections_[index]; +} + +void MockConnectionManagerForResampleKbps::remove(ISrsResource *c) +{ +} + +void MockConnectionManagerForResampleKbps::subscribe(ISrsDisposingHandler *h) +{ +} + +void MockConnectionManagerForResampleKbps::unsubscribe(ISrsDisposingHandler *h) +{ +} + +MockStatisticForResampleKbps::MockStatisticForResampleKbps() +{ + kbps_add_delta_count_ = 0; + kbps_sample_count_ = 0; +} + +MockStatisticForResampleKbps::~MockStatisticForResampleKbps() +{ +} + +void MockStatisticForResampleKbps::on_disconnect(std::string id, srs_error_t err) +{ +} + +srs_error_t MockStatisticForResampleKbps::on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type) +{ + return srs_success; +} + +srs_error_t MockStatisticForResampleKbps::on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height) +{ + return srs_success; +} + +srs_error_t MockStatisticForResampleKbps::on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object) +{ + return srs_success; +} + +void MockStatisticForResampleKbps::on_stream_publish(ISrsRequest *req, std::string publisher_id) +{ +} + +void MockStatisticForResampleKbps::on_stream_close(ISrsRequest *req) +{ +} + +void MockStatisticForResampleKbps::kbps_add_delta(std::string id, ISrsKbpsDelta *delta) +{ + kbps_add_delta_count_++; +} + +void MockStatisticForResampleKbps::kbps_sample() +{ + kbps_sample_count_++; +} + +srs_error_t MockStatisticForResampleKbps::on_video_frames(ISrsRequest *req, int nb_frames) +{ + return srs_success; +} + +void MockStatisticForResampleKbps::reset() +{ + kbps_add_delta_count_ = 0; + kbps_sample_count_ = 0; +} + +VOID TEST(SrsServerTest, NotifyEventDispatch) +{ + srs_error_t err = srs_success; + + // Create server instance + SrsUniquePtr server(new SrsServer()); + + // Create mock objects + MockRtcSessionManagerForNotify* mock_rtc_manager = new MockRtcSessionManagerForNotify(); + MockHttpHeartbeatForNotify* mock_heartbeat = new MockHttpHeartbeatForNotify(); + + // Save original pointers + SrsRtcSessionManager* original_rtc_manager = server->rtc_session_manager_; + SrsHttpHeartbeat* original_heartbeat = server->http_heartbeat_; + + // Inject mock objects (no cast needed since they inherit from the base classes) + server->rtc_session_manager_ = mock_rtc_manager; + server->http_heartbeat_ = mock_heartbeat; + + // Test event 11 - RTC session update + HELPER_EXPECT_SUCCESS(server->notify(11, 0, 0)); + EXPECT_EQ(1, mock_rtc_manager->update_rtc_sessions_count_); + + // Test event 9 - HTTP heartbeat + HELPER_EXPECT_SUCCESS(server->notify(9, 0, 0)); + EXPECT_EQ(1, mock_heartbeat->heartbeat_count_); + + // Test event 8 - resample_kbps (should not crash) + HELPER_EXPECT_SUCCESS(server->notify(8, 0, 0)); + + // Test multiple calls to same event + HELPER_EXPECT_SUCCESS(server->notify(11, 0, 0)); + EXPECT_EQ(2, mock_rtc_manager->update_rtc_sessions_count_); + + HELPER_EXPECT_SUCCESS(server->notify(9, 0, 0)); + EXPECT_EQ(2, mock_heartbeat->heartbeat_count_); + + // Test other events (should not crash) + HELPER_EXPECT_SUCCESS(server->notify(2, 0, 0)); // srs_update_system_rusage + HELPER_EXPECT_SUCCESS(server->notify(4, 0, 0)); // srs_update_disk_stat + HELPER_EXPECT_SUCCESS(server->notify(5, 0, 0)); // srs_update_meminfo + HELPER_EXPECT_SUCCESS(server->notify(6, 0, 0)); // srs_update_platform_info + HELPER_EXPECT_SUCCESS(server->notify(7, 0, 0)); // srs_update_network_devices + HELPER_EXPECT_SUCCESS(server->notify(10, 0, 0)); // srs_update_udp_snmp_statistic + HELPER_EXPECT_SUCCESS(server->notify(12, 0, 0)); // srs_update_server_statistics + + // Restore original pointers + server->rtc_session_manager_ = original_rtc_manager; + server->http_heartbeat_ = original_heartbeat; + + // Cleanup mock objects + srs_freep(mock_rtc_manager); + srs_freep(mock_heartbeat); +} + +VOID TEST(SrsServerTest, ResampleKbps) +{ + // Create server instance + SrsUniquePtr server(new SrsServer()); + + // Create mock objects + MockConnectionManagerForResampleKbps *mock_conn_manager = new MockConnectionManagerForResampleKbps(); + MockStatisticForResampleKbps *mock_stat = new MockStatisticForResampleKbps(); + + // Save original pointers + ISrsResourceManager *original_conn_manager = server->conn_manager_; + ISrsStatistic *original_stat = server->stat_; + + // Inject mock objects + server->conn_manager_ = mock_conn_manager; + server->stat_ = mock_stat; + + // Test case: Empty connection manager - verifies kbps_sample is called + // This is the major use scenario: resample_kbps() should always call + // stat_->kbps_sample() to update global server statistics, even when + // there are no connections. + if (true) { + server->resample_kbps(); + EXPECT_EQ(0, mock_stat->kbps_add_delta_count_); + EXPECT_EQ(1, mock_stat->kbps_sample_count_); + } + + // Restore original pointers + server->conn_manager_ = original_conn_manager; + server->stat_ = original_stat; + + // Cleanup mock objects + srs_freep(mock_conn_manager); + srs_freep(mock_stat); +} + +MockAppConfigForConnectionLimit::MockAppConfigForConnectionLimit() +{ + max_connections_ = 10; +} + +MockAppConfigForConnectionLimit::~MockAppConfigForConnectionLimit() +{ +} + +int MockAppConfigForConnectionLimit::get_max_connections() +{ + return max_connections_; +} + +MockConnectionManagerForConnectionLimit::MockConnectionManagerForConnectionLimit() +{ + connection_count_ = 0; +} + +MockConnectionManagerForConnectionLimit::~MockConnectionManagerForConnectionLimit() +{ +} + +srs_error_t MockConnectionManagerForConnectionLimit::start() +{ + return srs_success; +} + +bool MockConnectionManagerForConnectionLimit::empty() +{ + return connection_count_ == 0; +} + +size_t MockConnectionManagerForConnectionLimit::size() +{ + return connection_count_; +} + +void MockConnectionManagerForConnectionLimit::add(ISrsResource *conn, bool *exists) +{ +} + +ISrsResource *MockConnectionManagerForConnectionLimit::at(int index) +{ + return NULL; +} + +void MockConnectionManagerForConnectionLimit::remove(ISrsResource *c) +{ +} + +void MockConnectionManagerForConnectionLimit::subscribe(ISrsDisposingHandler *h) +{ +} + +void MockConnectionManagerForConnectionLimit::unsubscribe(ISrsDisposingHandler *h) +{ +} + +VOID TEST(SrsServerTest, OnBeforeConnectionExceedLimit) +{ + srs_error_t err = srs_success; + + // Create server instance + SrsUniquePtr server(new SrsServer()); + + // Create mock objects + MockAppConfigForConnectionLimit *mock_config = new MockAppConfigForConnectionLimit(); + MockConnectionManagerForConnectionLimit *mock_conn_manager = new MockConnectionManagerForConnectionLimit(); + + // Save original pointers + ISrsAppConfig *original_config = server->config_; + ISrsResourceManager *original_conn_manager = server->conn_manager_; + + // Inject mock objects + server->config_ = mock_config; + server->conn_manager_ = mock_conn_manager; + + // Test case: Connection limit exceeded + // This is the major use scenario: when current connections >= max_connections, + // on_before_connection() should return ERROR_EXCEED_CONNECTIONS error. + if (true) { + // Set max connections to 10 + mock_config->max_connections_ = 10; + // Set current connections to 10 (at limit) + mock_conn_manager->connection_count_ = 10; + + // Try to accept a new connection - should fail + err = server->on_before_connection("RTMP", 100, "192.168.1.100", 1935); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_EXCEED_CONNECTIONS, srs_error_code(err)); + srs_freep(err); + } + + // Test case: Connection limit not exceeded + if (true) { + // Set max connections to 10 + mock_config->max_connections_ = 10; + // Set current connections to 9 (below limit) + mock_conn_manager->connection_count_ = 9; + + // Try to accept a new connection - should succeed + HELPER_EXPECT_SUCCESS(server->on_before_connection("RTMP", 101, "192.168.1.101", 1935)); + } + + // Restore original pointers + server->config_ = original_config; + server->conn_manager_ = original_conn_manager; + + // Cleanup mock objects + srs_freep(mock_config); + srs_freep(mock_conn_manager); +} + +VOID TEST(SrsRtmpTransportTest, BasicOperations) +{ + srs_error_t err = srs_success; + + // Create a dummy file descriptor (cast from int for testing) + // Note: We won't actually use this for I/O, just testing the wrapper methods + srs_netfd_t dummy_fd = (srs_netfd_t)((void*)0x1234); + + // Create SrsRtmpTransport instance + SrsUniquePtr transport(new SrsRtmpTransport(dummy_fd)); + + // Test fd() - should return the file descriptor we passed + EXPECT_EQ(dummy_fd, transport->fd()); + + // Test io() - should return the socket interface (not NULL) + EXPECT_TRUE(transport->io() != NULL); + + // Test handshake() - should return success (no-op for plain RTMP) + HELPER_EXPECT_SUCCESS(transport->handshake()); + + // Test transport_type() - should return "plaintext" + EXPECT_STREQ("plaintext", transport->transport_type()); + + // Test get_recv_bytes() - should return 0 for new connection + EXPECT_EQ(0, transport->get_recv_bytes()); + + // Test get_send_bytes() - should return 0 for new connection + EXPECT_EQ(0, transport->get_send_bytes()); + + // Prevent the destructor from trying to close the dummy fd + // by setting the internal stfd_ to NULL before destruction + transport->skt_->stfd_ = NULL; +} diff --git a/trunk/src/utest/srs_utest_app10.hpp b/trunk/src/utest/srs_utest_app10.hpp new file mode 100644 index 000000000..a7a10dff3 --- /dev/null +++ b/trunk/src/utest/srs_utest_app10.hpp @@ -0,0 +1,296 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_UTEST_APP10_HPP +#define SRS_UTEST_APP10_HPP + +/* +#include +*/ +#include +#include +#include +#include +#include +#include +#include + +// Mock config for testing SrsServer::listen() +class MockAppConfigForServerListen : public MockAppConfig +{ +public: + std::vector rtmp_listens_; + bool rtmps_enabled_; + std::vector rtmps_listens_; + bool http_api_enabled_; + std::vector http_api_listens_; + bool https_api_enabled_; + std::vector https_api_listens_; + bool http_stream_enabled_; + std::vector http_stream_listens_; + bool https_stream_enabled_; + std::vector https_stream_listens_; + bool rtc_server_tcp_enabled_; + std::vector rtc_server_tcp_listens_; + std::string rtc_server_protocol_; + bool rtsp_server_enabled_; + std::vector rtsp_server_listens_; + bool exporter_enabled_; + std::string exporter_listen_; + +public: + MockAppConfigForServerListen(); + virtual ~MockAppConfigForServerListen(); + +public: + virtual std::vector get_listens(); + virtual bool get_rtmps_enabled(); + virtual std::vector get_rtmps_listen(); + virtual bool get_http_api_enabled(); + virtual std::vector get_http_api_listens(); + virtual bool get_https_api_enabled(); + virtual std::vector get_https_api_listens(); + virtual bool get_http_stream_enabled(); + virtual std::vector get_http_stream_listens(); + virtual bool get_https_stream_enabled(); + virtual std::vector get_https_stream_listens(); + virtual bool get_rtc_server_tcp_enabled(); + virtual std::vector get_rtc_server_tcp_listens(); + virtual std::string get_rtc_server_protocol(); + virtual bool get_rtsp_server_enabled(); + virtual std::vector get_rtsp_server_listens(); + virtual bool get_exporter_enabled(); + virtual std::string get_exporter_listen(); +}; + +// Mock ISrsHttpServeMux for testing SrsServer::http_handle() +class MockHttpServeMux : public ISrsHttpServeMux +{ +public: + int handle_count_; + std::vector patterns_; + +public: + MockHttpServeMux(); + virtual ~MockHttpServeMux(); + +public: + virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler); + virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); +}; + +// Mock ISrsLog for testing SrsServer::on_signal() +class MockLogForSignal : public ISrsLog +{ +public: + int reopen_count_; + +public: + MockLogForSignal(); + virtual ~MockLogForSignal(); + +public: + virtual srs_error_t initialize(); + virtual void reopen(); + virtual void log(SrsLogLevel level, const char *tag, const SrsContextId &context_id, const char *fmt, va_list args); +}; + +// Mock ISrsAppConfig for testing SrsServer::on_signal() +class MockAppConfigForSignal : public MockAppConfig +{ +public: + bool force_grace_quit_; + +public: + MockAppConfigForSignal(); + virtual ~MockAppConfigForSignal(); + +public: + virtual bool is_force_grace_quit(); +}; + +// Mock ISrsAppConfig for testing SrsServer::do2_cycle() +class MockAppConfigForDo2Cycle : public MockAppConfig +{ +public: + srs_error_t reload_error_; + srs_error_t persistence_error_; + int reload_count_; + int persistence_count_; + SrsReloadState reload_state_; + +public: + MockAppConfigForDo2Cycle(); + virtual ~MockAppConfigForDo2Cycle(); + +public: + virtual srs_error_t reload(SrsReloadState *pstate); + virtual srs_error_t persistence(); + void reset(); +}; + +// Mock ISrsHourGlass for testing SrsServer::setup_ticks() +class MockHourGlassForSetupTicks : public ISrsHourGlass +{ +public: + int tick_count_; + int start_count_; + std::vector tick_events_; + std::vector tick_intervals_; + +public: + MockHourGlassForSetupTicks(); + virtual ~MockHourGlassForSetupTicks(); + +public: + virtual srs_error_t start(); + virtual void stop(); + virtual srs_error_t tick(srs_utime_t interval); + virtual srs_error_t tick(int event, srs_utime_t interval); + virtual void untick(int event); +}; + +// Mock SrsAppFactory for testing SrsServer::setup_ticks() +class MockAppFactoryForSetupTicks : public SrsAppFactory +{ +public: + MockHourGlassForSetupTicks *mock_hourglass_; + +public: + MockAppFactoryForSetupTicks(); + virtual ~MockAppFactoryForSetupTicks(); + +public: + virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval); +}; + +// Mock ISrsAppConfig for testing SrsServer::setup_ticks() +class MockAppConfigForSetupTicks : public MockAppConfig +{ +public: + bool stats_enabled_; + bool heartbeat_enabled_; + srs_utime_t heartbeat_interval_; + +public: + MockAppConfigForSetupTicks(); + virtual ~MockAppConfigForSetupTicks(); + +public: + virtual bool get_stats_enabled(); + virtual bool get_heartbeat_enabled(); + virtual srs_utime_t get_heartbeat_interval(); +}; + +// Mock SrsRtcSessionManager for testing SrsServer::notify() +class MockRtcSessionManagerForNotify : public SrsRtcSessionManager +{ +public: + int update_rtc_sessions_count_; + +public: + MockRtcSessionManagerForNotify(); + virtual ~MockRtcSessionManagerForNotify(); + +public: + virtual void srs_update_rtc_sessions(); +}; + +// Mock SrsHttpHeartbeat for testing SrsServer::notify() +class MockHttpHeartbeatForNotify : public SrsHttpHeartbeat +{ +public: + int heartbeat_count_; + +public: + MockHttpHeartbeatForNotify(); + virtual ~MockHttpHeartbeatForNotify(); + +public: + virtual void heartbeat(); +}; + +// Mock connection manager for testing SrsServer::resample_kbps() +class MockConnectionManagerForResampleKbps : public ISrsResourceManager +{ +public: + std::vector connections_; + +public: + MockConnectionManagerForResampleKbps(); + virtual ~MockConnectionManagerForResampleKbps(); + +public: + virtual srs_error_t start(); + virtual bool empty(); + virtual size_t size(); + virtual void add(ISrsResource *conn, bool *exists = NULL); + virtual ISrsResource *at(int index); + virtual void remove(ISrsResource *c); + virtual void subscribe(ISrsDisposingHandler *h); + virtual void unsubscribe(ISrsDisposingHandler *h); +}; + +// Mock statistic for testing SrsServer::resample_kbps() +class MockStatisticForResampleKbps : public ISrsStatistic +{ +public: + int kbps_add_delta_count_; + int kbps_sample_count_; + +public: + MockStatisticForResampleKbps(); + virtual ~MockStatisticForResampleKbps(); + +public: + virtual void on_disconnect(std::string id, srs_error_t err); + virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type); + virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height); + virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object); + virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id); + virtual void on_stream_close(ISrsRequest *req); + virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta); + virtual void kbps_sample(); + virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames); + void reset(); +}; + +// Mock config for testing SrsServer::on_before_connection() +class MockAppConfigForConnectionLimit : public MockAppConfig +{ +public: + int max_connections_; + +public: + MockAppConfigForConnectionLimit(); + virtual ~MockAppConfigForConnectionLimit(); + +public: + virtual int get_max_connections(); +}; + +// Mock connection manager for testing SrsServer::on_before_connection() +class MockConnectionManagerForConnectionLimit : public ISrsResourceManager +{ +public: + size_t connection_count_; + +public: + MockConnectionManagerForConnectionLimit(); + virtual ~MockConnectionManagerForConnectionLimit(); + +public: + virtual srs_error_t start(); + virtual bool empty(); + virtual size_t size(); + virtual void add(ISrsResource *conn, bool *exists = NULL); + virtual ISrsResource *at(int index); + virtual void remove(ISrsResource *c); + virtual void subscribe(ISrsDisposingHandler *h); + virtual void unsubscribe(ISrsDisposingHandler *h); +}; + +#endif diff --git a/trunk/src/utest/srs_utest_app6.cpp b/trunk/src/utest/srs_utest_app6.cpp index 9d511e4c2..0ece1c19d 100644 --- a/trunk/src/utest/srs_utest_app6.cpp +++ b/trunk/src/utest/srs_utest_app6.cpp @@ -2690,6 +2690,21 @@ void MockRtcStatistic::on_stream_close(ISrsRequest *req) // Do nothing in mock } +void MockRtcStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta) +{ + // Do nothing in mock +} + +void MockRtcStatistic::kbps_sample() +{ + // Do nothing in mock +} + +srs_error_t MockRtcStatistic::on_video_frames(ISrsRequest *req, int nb_frames) +{ + return srs_success; +} + // Unit tests for SrsRtcAsyncCallOnStop::call() VOID TEST(RtcAsyncCallOnStopTest, CallWithHttpHooksDisabled) { @@ -4303,6 +4318,17 @@ SrsSharedPtr MockLiveSourceManager::fetch(ISrsRequest *r) return mock_source_; } +void MockLiveSourceManager::dispose() +{ + // Mock implementation - no-op for testing +} + +srs_error_t MockLiveSourceManager::initialize() +{ + // Mock implementation - always succeeds + return srs_success; +} + void MockLiveSourceManager::set_fetch_or_create_error(srs_error_t err) { srs_freep(fetch_or_create_error_); diff --git a/trunk/src/utest/srs_utest_app6.hpp b/trunk/src/utest/srs_utest_app6.hpp index 7e67d053e..9977cdbb3 100644 --- a/trunk/src/utest/srs_utest_app6.hpp +++ b/trunk/src/utest/srs_utest_app6.hpp @@ -288,8 +288,43 @@ public: virtual bool get_exporter_enabled() { return false; } virtual std::string get_exporter_listen() { return ""; } virtual bool get_stats_enabled() { return false; } + virtual int get_stats_network() { return 0; } virtual bool get_heartbeat_enabled() { return false; } virtual srs_utime_t get_heartbeat_interval() { return 0; } + virtual std::string get_rtmps_ssl_cert() { return ""; } + virtual std::string get_rtmps_ssl_key() { return ""; } + virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) { return NULL; } + virtual bool get_vhost_enabled(std::string vhost) { return true; } + virtual bool get_debug_srs_upnode(std::string vhost) { return true; } + virtual int get_out_ack_size(std::string vhost) { return 2500000; } + virtual int get_in_ack_size(std::string vhost) { return 2500000; } + virtual int get_chunk_size(std::string vhost) { return 60000; } + virtual bool get_gop_cache(std::string vhost) { return true; } + virtual int get_gop_cache_max_frames(std::string vhost) { return 2500; } + virtual bool get_tcp_nodelay(std::string vhost) { return false; } + virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) { return 350 * SRS_UTIME_MILLISECONDS; } + virtual srs_utime_t get_send_min_interval(std::string vhost) { return 0; } + virtual bool get_mr_enabled(std::string vhost) { return false; } + virtual srs_utime_t get_mr_sleep(std::string vhost) { return 350 * SRS_UTIME_MILLISECONDS; } + virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost) { return 20000 * SRS_UTIME_MILLISECONDS; } + virtual srs_utime_t get_publish_normal_timeout(std::string vhost) { return 5000 * SRS_UTIME_MILLISECONDS; } + virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost) { return 0; } + virtual bool get_refer_enabled(std::string vhost) { return false; } + virtual SrsConfDirective *get_refer_all(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_refer_play(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_refer_publish(std::string vhost) { return NULL; } + virtual bool get_vhost_origin_cluster(std::string vhost) { return false; } + virtual std::vector get_vhost_coworkers(std::string vhost) { return std::vector(); } + virtual bool get_vhost_edge_token_traverse(std::string vhost) { return false; } + virtual SrsConfDirective *get_vhost_edge_origin(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_vhost_on_connect(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_vhost_on_close(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_vhost_on_publish(std::string vhost) { return NULL; } + virtual SrsConfDirective *get_vhost_on_play(std::string vhost) { return NULL; } + virtual bool get_rtc_enabled(std::string vhost) { return false; } + virtual bool get_rtsp_enabled(std::string vhost) { return false; } + virtual bool get_rtc_from_rtmp(std::string vhost) { return false; } + virtual bool get_rtsp_from_rtmp(std::string vhost) { return false; } // ISrsAppConfig methods virtual bool get_vhost_http_hooks_enabled(std::string vhost); virtual SrsConfDirective *get_vhost_on_stop(std::string vhost); @@ -421,6 +456,9 @@ public: virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object); virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id); virtual void on_stream_close(ISrsRequest *req); + virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta); + virtual void kbps_sample(); + virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames); void set_on_client_error(srs_error_t err); void reset(); }; @@ -653,6 +691,8 @@ public: virtual ~MockLiveSourceManager(); virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr &pps); virtual SrsSharedPtr fetch(ISrsRequest *r); + virtual void dispose(); + virtual srs_error_t initialize(); void set_fetch_or_create_error(srs_error_t err); void set_can_publish(bool can_publish); void reset(); diff --git a/trunk/src/utest/srs_utest_app7.cpp b/trunk/src/utest/srs_utest_app7.cpp index 733c8653a..8ed2d2d85 100644 --- a/trunk/src/utest/srs_utest_app7.cpp +++ b/trunk/src/utest/srs_utest_app7.cpp @@ -1002,6 +1002,30 @@ MockConnectionManagerForExpire::~MockConnectionManagerForExpire() { } +srs_error_t MockConnectionManagerForExpire::start() +{ + return srs_success; +} + +bool MockConnectionManagerForExpire::empty() +{ + return true; +} + +size_t MockConnectionManagerForExpire::size() +{ + return 0; +} + +void MockConnectionManagerForExpire::add(ISrsResource * /*conn*/, bool * /*exists*/) +{ +} + +ISrsResource *MockConnectionManagerForExpire::at(int /*index*/) +{ + return NULL; +} + void MockConnectionManagerForExpire::remove(ISrsResource *c) { removed_resource_ = c; diff --git a/trunk/src/utest/srs_utest_app7.hpp b/trunk/src/utest/srs_utest_app7.hpp index 508aa278d..fd18bfa86 100644 --- a/trunk/src/utest/srs_utest_app7.hpp +++ b/trunk/src/utest/srs_utest_app7.hpp @@ -101,6 +101,11 @@ public: virtual ~MockConnectionManagerForExpire(); public: + virtual srs_error_t start(); + virtual bool empty(); + virtual size_t size(); + virtual void add(ISrsResource *conn, bool *exists = NULL); + virtual ISrsResource *at(int index); virtual void remove(ISrsResource *c); virtual void subscribe(ISrsDisposingHandler *h); virtual void unsubscribe(ISrsDisposingHandler *h); diff --git a/trunk/src/utest/srs_utest_app9.cpp b/trunk/src/utest/srs_utest_app9.cpp index 4e88beffd..5e6beea36 100644 --- a/trunk/src/utest/srs_utest_app9.cpp +++ b/trunk/src/utest/srs_utest_app9.cpp @@ -1901,6 +1901,21 @@ void MockStatisticForOriginHub::on_stream_close(ISrsRequest *req) { } +void MockStatisticForOriginHub::kbps_add_delta(std::string id, ISrsKbpsDelta *delta) +{ + // Do nothing in mock +} + +void MockStatisticForOriginHub::kbps_sample() +{ + // Do nothing in mock +} + +srs_error_t MockStatisticForOriginHub::on_video_frames(ISrsRequest *req, int nb_frames) +{ + return srs_success; +} + // Mock ISrsNgExec implementation MockNgExecForOriginHub::MockNgExecForOriginHub() { diff --git a/trunk/src/utest/srs_utest_app9.hpp b/trunk/src/utest/srs_utest_app9.hpp index 220f72555..2c0d8dad6 100644 --- a/trunk/src/utest/srs_utest_app9.hpp +++ b/trunk/src/utest/srs_utest_app9.hpp @@ -198,6 +198,9 @@ public: virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object); virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id); virtual void on_stream_close(ISrsRequest *req); + virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta); + virtual void kbps_sample(); + virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames); }; // Mock ISrsNgExec for testing SrsOriginHub::on_publish diff --git a/trunk/src/utest/srs_utest_kernel3.cpp b/trunk/src/utest/srs_utest_kernel3.cpp index a14dcc0ef..a180bc89d 100644 --- a/trunk/src/utest/srs_utest_kernel3.cpp +++ b/trunk/src/utest/srs_utest_kernel3.cpp @@ -1301,6 +1301,10 @@ VOID TEST(KernelErrorTest, AsanReportCallback) { #ifdef SRS_SANITIZER_LOG // Test asan_report_callback function with various input formats + // Temporarily disable log output to avoid cluttering test results + MockEmptyLog* mock_log = dynamic_cast(_srs_log); + SrsLogLevel original_level = mock_log->level_; + mock_log->level_ = SrsLogLevelDisabled; // Test with simple backtrace line const char *simple_trace = " #0 0x555555555820 in foo /path/to/file.cpp:123"; @@ -1349,6 +1353,9 @@ VOID TEST(KernelErrorTest, AsanReportCallback) asan_report_callback(malformed_trace); // Function should not crash with malformed input EXPECT_TRUE(true); + + // Restore original log level + mock_log->level_ = original_level; #else // On builds without SRS_SANITIZER_LOG, just pass the test EXPECT_TRUE(true); diff --git a/trunk/src/utest/srs_utest_protocol4.cpp b/trunk/src/utest/srs_utest_protocol4.cpp index a55580267..223500b50 100644 --- a/trunk/src/utest/srs_utest_protocol4.cpp +++ b/trunk/src/utest/srs_utest_protocol4.cpp @@ -1166,83 +1166,6 @@ VOID TEST(TCPConnectionTest, BufferedReadWriterWriteMethods) } } -VOID TEST(TCPConnectionTest, TcpConnectionTimeoutAndStats) -{ - srs_error_t err; - - // Test SrsTcpConnection timeout and statistics methods - if (true) { - SrsTestTcpServer server("127.0.0.1"); - HELPER_ASSERT_SUCCESS(server.start()); - - // Give server time to start - srs_usleep(1 * SRS_UTIME_MILLISECONDS); - - SrsTestTcpClient client("127.0.0.1", server.get_port()); - HELPER_ASSERT_SUCCESS(client.connect()); - - // Give time for connection to be established - srs_usleep(1 * SRS_UTIME_MILLISECONDS); - - SrsTcpConnection *server_conn = server.get_connection(); - SrsTcpConnection *client_conn = client.get_connection(); - - ASSERT_TRUE(server_conn != NULL); - ASSERT_TRUE(client_conn != NULL); - - // Test timeout methods - srs_utime_t original_recv_timeout = server_conn->get_recv_timeout(); - - // Set new receive timeout - srs_utime_t new_recv_timeout = 10 * SRS_UTIME_SECONDS; - server_conn->set_recv_timeout(new_recv_timeout); - - // Verify timeout was set correctly - EXPECT_EQ(new_recv_timeout, server_conn->get_recv_timeout()); - - // Test statistics methods - initial values - int64_t initial_recv_bytes = server_conn->get_recv_bytes(); - int64_t initial_send_bytes = server_conn->get_send_bytes(); - - // Client sends data to server to test recv statistics - string test_msg = "Hello TCP Connection Statistics!"; - ssize_t nwrite = 0; - HELPER_ASSERT_SUCCESS(client_conn->write((void *)test_msg.data(), test_msg.length(), &nwrite)); - EXPECT_EQ((ssize_t)test_msg.length(), nwrite); - - // Server reads data using read_fully - char read_buf[64]; - ssize_t nread = 0; - HELPER_ASSERT_SUCCESS(server_conn->read_fully(read_buf, test_msg.length(), &nread)); - EXPECT_EQ((ssize_t)test_msg.length(), nread); - EXPECT_STREQ(test_msg.c_str(), string(read_buf, nread).c_str()); - - // Check that recv bytes increased - int64_t final_recv_bytes = server_conn->get_recv_bytes(); - EXPECT_GT(final_recv_bytes, initial_recv_bytes); - - // Server sends response to test send statistics - string response_msg = "Response from server!"; - ssize_t nwrite_resp = 0; - HELPER_ASSERT_SUCCESS(server_conn->write((void *)response_msg.data(), response_msg.length(), &nwrite_resp)); - EXPECT_EQ((ssize_t)response_msg.length(), nwrite_resp); - - // Check that send bytes increased - int64_t final_send_bytes = server_conn->get_send_bytes(); - EXPECT_GT(final_send_bytes, initial_send_bytes); - - // Client reads response to verify - char response_buf[32]; - ssize_t nread_resp = 0; - HELPER_ASSERT_SUCCESS(client_conn->read_fully(response_buf, response_msg.length(), &nread_resp)); - EXPECT_EQ((ssize_t)response_msg.length(), nread_resp); - EXPECT_STREQ(response_msg.c_str(), string(response_buf, nread_resp).c_str()); - - // Restore original timeout - server_conn->set_recv_timeout(original_recv_timeout); - } -} - VOID TEST(TCPConnectionTest, TcpConnectionReadFully) { srs_error_t err; diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index ccfcb30a4..5eed8ccf6 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -1352,6 +1352,30 @@ MockConnectionManager::~MockConnectionManager() { } +srs_error_t MockConnectionManager::start() +{ + return srs_success; +} + +bool MockConnectionManager::empty() +{ + return true; +} + +size_t MockConnectionManager::size() +{ + return 0; +} + +void MockConnectionManager::add(ISrsResource * /*conn*/, bool * /*exists*/) +{ +} + +ISrsResource *MockConnectionManager::at(int /*index*/) +{ + return NULL; +} + void MockConnectionManager::remove(ISrsResource * /*c*/) { } diff --git a/trunk/src/utest/srs_utest_service.hpp b/trunk/src/utest/srs_utest_service.hpp index 3f7f729d1..e79b1cdff 100644 --- a/trunk/src/utest/srs_utest_service.hpp +++ b/trunk/src/utest/srs_utest_service.hpp @@ -88,6 +88,11 @@ class MockConnectionManager : public ISrsResourceManager public: MockConnectionManager(); virtual ~MockConnectionManager(); + virtual srs_error_t start(); + virtual bool empty(); + virtual size_t size(); + virtual void add(ISrsResource *conn, bool *exists = NULL); + virtual ISrsResource *at(int index); virtual void remove(ISrsResource *c); virtual void subscribe(ISrsDisposingHandler *h); virtual void unsubscribe(ISrsDisposingHandler *h);