diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 129f3d3d1..19bedc1ff 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -181,7 +181,6 @@ Configure to generate Makefile. Features: --https=on|off Whether enable HTTPS client and server. Default: $(value2switch $SRS_HTTPS) --utest=on|off Whether build the utest. Default: $(value2switch $SRS_UTEST) - --srt=on|off Whether build the SRT. Default: $(value2switch $SRS_SRT) --rtsp=on|off Whether build the RTSP (requires RTC). Default: $(value2switch $SRS_RTSP) --gb28181=on|off Whether build the GB28181. Default: $(value2switch $SRS_GB28181) --ffmpeg-fit=on|off Whether enable the FFmpeg fit(source code). Default: $(value2switch $SRS_FFMPEG_FIT) @@ -252,6 +251,7 @@ Experts: Deprecated: --h265=on Always enable the build for the HEVC(H.265) support. --rtc=on Always enable WebRTC support. Default: $(value2switch $SRS_RTC) + --srt=on|off Always enable SRT support. Default: $(value2switch $SRS_SRT) --single-thread=on Always force single thread mode. Default: $(value2switch $SRS_SINGLE_THREAD) --cross-build Enable cross-build, please set bellow Toolchain also. Default: $(value2switch $SRS_CROSS_BUILD) --hds=on|off Whether build the hds streaming, mux RTMP to F4M/F4V files. Default: $(value2switch $SRS_HDS) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 833620fdb..51d5e4375 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-08-31, Merge [#4461](https://github.com/ossrs/srs/pull/4461): AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461) * v7.0, 2025-08-31, Merge [#4460](https://github.com/ossrs/srs/pull/4460): AI: Always enable SRT protocol. v7.0.69 (#4460) * v7.0, 2025-08-31, Merge [#4459](https://github.com/ossrs/srs/pull/4459): AI: Merge SRT and RTC servers into unified SrsServer. v7.0.68 (#4459) * v7.0, 2025-08-29, Merge [#4457](https://github.com/ossrs/srs/pull/4457): Support IPv6 for all protocols: RTMP, HTTP/HTTPS, WebRTC, SRT, RTSP. v7.0.67 (#4457) diff --git a/trunk/src/app/srs_app_circuit_breaker.cpp b/trunk/src/app/srs_app_circuit_breaker.cpp index 520362eef..ca92e485d 100644 --- a/trunk/src/app/srs_app_circuit_breaker.cpp +++ b/trunk/src/app/srs_app_circuit_breaker.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -62,7 +63,7 @@ srs_error_t SrsCircuitBreaker::initialize() // Update the water level for circuit breaker. // @see SrsCircuitBreaker::on_timer() - _srs_server->timer1s()->subscribe(this); + _srs_shared_timer->timer1s()->subscribe(this); srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_, high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index a3960dd41..fd86ba936 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -1005,3 +1005,5 @@ srs_error_t SrsSslConnection::writev(const iovec *iov, int iov_size, ssize_t *nw return err; } + +SrsResourceManager *_srs_conn_manager = NULL; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 3bab31f5e..e38bcb457 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -328,4 +328,7 @@ public: virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); }; +// Manager for RTC connections. +extern SrsResourceManager *_srs_conn_manager; + #endif diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index fa4424391..6deac3b7f 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -408,12 +408,11 @@ std::string SrsGbSession::desc() return "GBS"; } -SrsGbListener::SrsGbListener(ISrsHttpServeMux *http_api_mux) +SrsGbListener::SrsGbListener() { conf_ = NULL; sip_listener_ = new SrsTcpListener(this); media_listener_ = new SrsTcpListener(this); - http_api_mux_ = http_api_mux; } SrsGbListener::~SrsGbListener() @@ -470,7 +469,8 @@ srs_error_t SrsGbListener::listen_api() { srs_error_t err = srs_success; - if ((err = http_api_mux_->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) { + ISrsHttpServeMux *mux = _srs_server->api_server(); + if ((err = mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) { return srs_error_wrap(err, "handle publish"); } diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index 080e5a70f..2b1ebbe99 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -219,10 +219,9 @@ private: SrsConfDirective *conf_; SrsTcpListener *media_listener_; SrsTcpListener *sip_listener_; - ISrsHttpServeMux *http_api_mux_; public: - SrsGbListener(ISrsHttpServeMux *http_api_mux); + SrsGbListener(); virtual ~SrsGbListener(); public: diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index e2e3c5cfd..342bb2386 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -240,3 +240,77 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval) return err; } + +SrsSharedTimer::SrsSharedTimer() +{ + timer20ms_ = NULL; + timer100ms_ = NULL; + timer1s_ = NULL; + timer5s_ = NULL; + clock_monitor_ = NULL; +} + +SrsSharedTimer::~SrsSharedTimer() +{ + srs_freep(timer20ms_); + srs_freep(timer100ms_); + srs_freep(timer1s_); + srs_freep(timer5s_); + srs_freep(clock_monitor_); +} + +srs_error_t SrsSharedTimer::initialize() +{ + srs_error_t err = srs_success; + + // Initialize global shared timers + timer20ms_ = new SrsFastTimer("shared", 20 * SRS_UTIME_MILLISECONDS); + timer100ms_ = new SrsFastTimer("shared", 100 * SRS_UTIME_MILLISECONDS); + timer1s_ = new SrsFastTimer("shared", 1 * SRS_UTIME_SECONDS); + timer5s_ = new SrsFastTimer("shared", 5 * SRS_UTIME_SECONDS); + clock_monitor_ = new SrsClockWallMonitor(); + + // Start all timers + if ((err = timer20ms_->start()) != srs_success) { + return srs_error_wrap(err, "start timer20ms"); + } + + if ((err = timer100ms_->start()) != srs_success) { + return srs_error_wrap(err, "start timer100ms"); + } + + if ((err = timer1s_->start()) != srs_success) { + return srs_error_wrap(err, "start timer1s"); + } + + if ((err = timer5s_->start()) != srs_success) { + return srs_error_wrap(err, "start timer5s"); + } + + // Register clock monitor to 20ms timer + timer20ms_->subscribe(clock_monitor_); + + return err; +} + +SrsFastTimer *SrsSharedTimer::timer20ms() +{ + return timer20ms_; +} + +SrsFastTimer *SrsSharedTimer::timer100ms() +{ + return timer100ms_; +} + +SrsFastTimer *SrsSharedTimer::timer1s() +{ + return timer1s_; +} + +SrsFastTimer *SrsSharedTimer::timer5s() +{ + return timer5s_; +} + +SrsSharedTimer *_srs_shared_timer = NULL; diff --git a/trunk/src/app/srs_app_hourglass.hpp b/trunk/src/app/srs_app_hourglass.hpp index 44d07edc3..c13f3ade0 100644 --- a/trunk/src/app/srs_app_hourglass.hpp +++ b/trunk/src/app/srs_app_hourglass.hpp @@ -144,4 +144,33 @@ private: srs_error_t on_timer(srs_utime_t interval); }; +// Global shared timer manager +class SrsSharedTimer +{ +private: + SrsFastTimer *timer20ms_; + SrsFastTimer *timer100ms_; + SrsFastTimer *timer1s_; + SrsFastTimer *timer5s_; + SrsClockWallMonitor *clock_monitor_; + +public: + SrsSharedTimer(); + virtual ~SrsSharedTimer(); + +public: + // Initialize and start all timers + srs_error_t initialize(); + +public: + // Access to global shared timers + SrsFastTimer *timer20ms(); + SrsFastTimer *timer100ms(); + SrsFastTimer *timer1s(); + SrsFastTimer *timer5s(); +}; + +// Global shared timer instance +extern SrsSharedTimer *_srs_shared_timer; + #endif diff --git a/trunk/src/app/srs_app_http_static.cpp b/trunk/src/app/srs_app_http_static.cpp index da5e6505f..45c5bc314 100644 --- a/trunk/src/app/srs_app_http_static.cpp +++ b/trunk/src/app/srs_app_http_static.cpp @@ -15,6 +15,7 @@ using namespace std; #include +#include #include #include #include @@ -60,13 +61,13 @@ void SrsHlsVirtualConn::expire() SrsHlsStream::SrsHlsStream() { - _srs_server->timer5s()->subscribe(this); + _srs_shared_timer->timer5s()->subscribe(this); security_ = new SrsSecurity(); } SrsHlsStream::~SrsHlsStream() { - _srs_server->timer5s()->unsubscribe(this); + _srs_shared_timer->timer5s()->unsubscribe(this); std::map::iterator it; for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) { diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c444457d4..0f6622c2f 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -21,6 +21,7 @@ using namespace std; #include #include +#include #include #include #include @@ -442,7 +443,7 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection *s, const SrsContextId &cid) SrsRtcPlayStream::~SrsRtcPlayStream() { if (req_) { - session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnStop(cid_, req_)); + session_->exec_->exec_rtc_async_work(new SrsRtcAsyncCallOnStop(cid_, req_)); } _srs_config->unsubscribe(this); @@ -927,12 +928,12 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream *p) : p_(p) { - _srs_server->timer1s()->subscribe(this); + _srs_shared_timer->timer1s()->subscribe(this); } SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer() { - _srs_server->timer1s()->unsubscribe(this); + _srs_shared_timer->timer1s()->unsubscribe(this); } srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval) @@ -963,12 +964,12 @@ srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval) SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream *p) : p_(p) { - _srs_server->timer100ms()->subscribe(this); + _srs_shared_timer->timer100ms()->subscribe(this); } SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer() { - _srs_server->timer100ms()->unsubscribe(this); + _srs_shared_timer->timer100ms()->unsubscribe(this); } srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval) @@ -1084,7 +1085,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection *session, const SrsCon SrsRtcPublishStream::~SrsRtcPublishStream() { if (req_) { - session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_)); + session_->exec_->exec_rtc_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_)); } srs_freep(timer_rtcp_); @@ -1722,12 +1723,12 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp &n SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p) { - _srs_server->timer20ms()->subscribe(this); + _srs_shared_timer->timer20ms()->subscribe(this); } SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer() { - _srs_server->timer20ms()->unsubscribe(this); + _srs_shared_timer->timer20ms()->unsubscribe(this); } srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval) @@ -1759,12 +1760,20 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval) return err; } -SrsRtcConnection::SrsRtcConnection(SrsServer *s, const SrsContextId &cid) +ISrsExecRtcAsyncTask::ISrsExecRtcAsyncTask() +{ +} + +ISrsExecRtcAsyncTask::~ISrsExecRtcAsyncTask() +{ +} + +SrsRtcConnection::SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid) { req_ = NULL; cid_ = cid; - server_ = s; + exec_ = exec; networks_ = new SrsRtcNetworks(this); cache_iov_ = new iovec(); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 62a017d00..40b3ce927 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -461,6 +461,17 @@ private: srs_error_t on_timer(srs_utime_t interval); }; +// The interface for RTC async task. +class ISrsExecRtcAsyncTask +{ +public: + ISrsExecRtcAsyncTask(); + virtual ~ISrsExecRtcAsyncTask(); + +public: + virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t) = 0; +}; + // A RTC Peer Connection, SDP level object. // // For performance, we use non-public from resource, @@ -479,7 +490,7 @@ public: bool disposing_; private: - SrsServer *server_; + ISrsExecRtcAsyncTask *exec_; private: iovec *cache_iov_; @@ -529,7 +540,7 @@ private: bool nack_enabled_; public: - SrsRtcConnection(SrsServer *s, const SrsContextId &cid); + SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid); virtual ~SrsRtcConnection(); // interface ISrsDisposingHandler public: diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index aaba59f3c..35ecc4b86 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -12,6 +12,7 @@ using namespace std; #include +#include #include #include #include @@ -21,6 +22,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -306,3 +308,331 @@ SrsRtcUserConfig::~SrsRtcUserConfig() { srs_freep(req_); } + +SrsRtcSessionManager::SrsRtcSessionManager() +{ + rtc_async_ = new SrsAsyncCallWorker(); +} + +SrsRtcSessionManager::~SrsRtcSessionManager() +{ + rtc_async_->stop(); + srs_freep(rtc_async_); +} + +srs_error_t SrsRtcSessionManager::initialize() +{ + srs_error_t err = srs_success; + + if ((err = rtc_async_->start()) != srs_success) { + return srs_error_wrap(err, "start async worker"); + } + + return err; +} + +SrsRtcConnection *SrsRtcSessionManager::find_rtc_session_by_username(const std::string &username) +{ + ISrsResource *conn = _srs_conn_manager->find_by_name(username); + return dynamic_cast(conn); +} + +srs_error_t SrsRtcSessionManager::create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession) +{ + srs_error_t err = srs_success; + + ISrsRequest *req = ruc->req_; + + // Acquire stream publish token to prevent race conditions across all protocols. + SrsStreamPublishToken *publish_token_raw = NULL; + if (ruc->publish_ && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) { + return srs_error_wrap(err, "acquire stream publish token"); + } + SrsUniquePtr publish_token(publish_token_raw); + if (publish_token.get()) { + srs_trace("stream publish token acquired, type=rtc, url=%s", req->get_stream_url().c_str()); + } + + SrsSharedPtr source; + if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (ruc->publish_ && !source->can_publish()) { + return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); + } + + // TODO: FIXME: add do_create_session to error process. + SrsContextId cid = _srs_context->get_id(); + SrsRtcConnection *session = new SrsRtcConnection(this, cid); + if ((err = do_create_rtc_session(ruc, local_sdp, session)) != srs_success) { + srs_freep(session); + return srs_error_wrap(err, "create session"); + } + + *psession = session; + + return err; +} + +srs_error_t SrsRtcSessionManager::do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session) +{ + srs_error_t err = srs_success; + + ISrsRequest *req = ruc->req_; + + // first add publisher/player for negotiate sdp media info + if (ruc->publish_) { + if ((err = session->add_publisher(ruc, local_sdp)) != srs_success) { + return srs_error_wrap(err, "add publisher"); + } + } else { + if ((err = session->add_player(ruc, local_sdp)) != srs_success) { + return srs_error_wrap(err, "add player"); + } + } + + // All tracks default as inactive, so we must enable them. + session->set_all_tracks_status(req->get_stream_url(), ruc->publish_, true); + + std::string local_pwd = ruc->req_->ice_pwd_.empty() ? srs_rand_gen_str(32) : ruc->req_->ice_pwd_; + std::string local_ufrag = ruc->req_->ice_ufrag_.empty() ? srs_rand_gen_str(8) : ruc->req_->ice_ufrag_; + // TODO: FIXME: Rename for a better name, it's not an username. + std::string username = ""; + while (true) { + username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag(); + if (!_srs_conn_manager->find_by_name(username)) { + break; + } + + // Username conflict, regenerate a new one. + local_ufrag = srs_rand_gen_str(8); + } + + local_sdp.set_ice_ufrag(local_ufrag); + local_sdp.set_ice_pwd(local_pwd); + local_sdp.set_fingerprint_algo("sha-256"); + local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint()); + + // We allows to mock the eip of server. + if (true) { + // TODO: Support multiple listen ports. + int udp_port = 0; + if (true) { + string udp_host; + string udp_hostport = _srs_config->get_rtc_server_listens().at(0); + srs_net_split_for_listener(udp_hostport, udp_host, udp_port); + } + + int tcp_port = 0; + if (true) { + string tcp_host; + string tcp_hostport = _srs_config->get_rtc_server_tcp_listens().at(0); + srs_net_split_for_listener(tcp_hostport, tcp_host, tcp_port); + } + + string protocol = _srs_config->get_rtc_server_protocol(); + + set candidates = discover_candidates(ruc); + for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { + string hostname; + int uport = udp_port; + srs_net_split_hostport(*it, hostname, uport); + int tport = tcp_port; + srs_net_split_hostport(*it, hostname, tport); + + if (protocol == "udp") { + local_sdp.add_candidate("udp", hostname, uport, "host"); + } else if (protocol == "tcp") { + local_sdp.add_candidate("tcp", hostname, tport, "host"); + } else { + local_sdp.add_candidate("udp", hostname, uport, "host"); + local_sdp.add_candidate("tcp", hostname, tport, "host"); + } + } + + vector v = vector(candidates.begin(), candidates.end()); + srs_trace("RTC: Use candidates %s, protocol=%s, tcp_port=%d, udp_port=%d", + srs_strings_join(v, ", ").c_str(), protocol.c_str(), tcp_port, udp_port); + } + + // Setup the negotiate DTLS by config. + local_sdp.session_negotiate_ = local_sdp.session_config_; + + // Setup the negotiate DTLS role. + if (ruc->remote_sdp_.get_dtls_role() == "active") { + local_sdp.session_negotiate_.dtls_role = "passive"; + } else if (ruc->remote_sdp_.get_dtls_role() == "passive") { + local_sdp.session_negotiate_.dtls_role = "active"; + } else if (ruc->remote_sdp_.get_dtls_role() == "actpass") { + local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role; + } else { + // @see: https://tools.ietf.org/html/rfc4145#section-4.1 + // The default value of the setup attribute in an offer/answer exchange + // is 'active' in the offer and 'passive' in the answer. + local_sdp.session_negotiate_.dtls_role = "passive"; + } + local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role); + + session->set_remote_sdp(ruc->remote_sdp_); + // We must setup the local SDP, then initialize the session object. + session->set_local_sdp(local_sdp); + session->set_state_as_waiting_stun(); + + // Before session initialize, we must setup the local SDP. + if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) { + return srs_error_wrap(err, "init"); + } + + // We allows username is optional, but it never empty here. + _srs_conn_manager->add_with_name(username, session); + + return err; +} + +void SrsRtcSessionManager::srs_update_rtc_sessions() +{ + // Alive RTC sessions, for stat. + int nn_rtc_conns = 0; + + // Check all sessions and dispose the dead sessions. + for (int i = 0; i < (int)_srs_conn_manager->size(); i++) { + SrsRtcConnection *session = dynamic_cast(_srs_conn_manager->at(i)); + // Ignore not session, or already disposing. + if (!session || session->disposing_) { + continue; + } + + // Update stat if session is alive. + if (session->is_alive()) { + nn_rtc_conns++; + continue; + } + + SrsContextRestore(_srs_context->get_id()); + session->switch_to_context(); + + string username = session->username(); + srs_trace("RTC: session destroy by timeout, username=%s", username.c_str()); + + // Use manager to free session and notify other objects. + _srs_conn_manager->remove(session); + } + + // Ignore stats if no RTC connections. + if (!nn_rtc_conns) { + return; + } + + static char buf[128]; + + string loss_desc; + SrsSnmpUdpStat *s = srs_get_udp_snmp_stat(); + if (s->rcv_buf_errors_delta || s->snd_buf_errors_delta) { + snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d)", s->rcv_buf_errors_delta, s->snd_buf_errors_delta); + loss_desc = buf; + } + + SrsKbsRtcStats stats; + srs_global_rtc_update(&stats); + + srs_trace("RTC: Server conns=%u%s%s%s%s%s%s%s", + nn_rtc_conns, + stats.rpkts_desc.c_str(), stats.spkts_desc.c_str(), stats.rtcp_desc.c_str(), stats.snk_desc.c_str(), + stats.rnk_desc.c_str(), loss_desc.c_str(), stats.fid_desc.c_str()); +} + +srs_error_t SrsRtcSessionManager::exec_rtc_async_work(ISrsAsyncCallTask *t) +{ + return rtc_async_->execute(t); +} + +srs_error_t SrsRtcSessionManager::on_udp_packet(SrsUdpMuxSocket *skt) +{ + srs_error_t err = srs_success; + + SrsRtcConnection *session = NULL; + char *data = skt->data(); + int size = skt->size(); + bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t *)data, size); + bool is_rtcp = srs_is_rtcp((uint8_t *)data, size); + + uint64_t fast_id = skt->fast_id(); + // Try fast id first, if not found, search by long peer id. + if (fast_id) { + session = (SrsRtcConnection *)_srs_conn_manager->find_by_fast_id(fast_id); + } + if (!session) { + string peer_id = skt->peer_id(); + session = (SrsRtcConnection *)_srs_conn_manager->find_by_id(peer_id); + } + + if (session) { + // When got any packet, the session is alive now. + session->alive(); + } + + // For STUN, the peer address may change. + if (!is_rtp_or_rtcp && srs_is_stun((uint8_t *)data, size)) { + ++_srs_pps_rstuns->sugar; + string peer_id = skt->peer_id(); + + // TODO: FIXME: Should support ICE renomination, to switch network between candidates. + SrsStunPacket ping; + if ((err = ping.decode(data, size)) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); + } + if (!session) { + session = find_rtc_session_by_username(ping.get_username()); + } + if (session) { + session->switch_to_context(); + } + + srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d", + peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); + + // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. + if (!session) { + return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64, + ping.get_username().c_str(), peer_id.c_str(), fast_id); + } + + // For each binding request, update the UDP socket. + if (ping.is_binding_request()) { + session->udp()->update_sendonly_socket(skt); + } + + return session->udp()->on_stun(&ping, data, size); + } + + // For DTLS, RTCP or RTP, which does not support peer address changing. + if (!session) { + string peer_id = skt->peer_id(); + return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id); + } + + // Note that we don't(except error) switch to the context of session, for performance issue. + if (is_rtp_or_rtcp && !is_rtcp) { + ++_srs_pps_rrtps->sugar; + + err = session->udp()->on_rtp(data, size); + if (err != srs_success) { + session->switch_to_context(); + } + return err; + } + + session->switch_to_context(); + if (is_rtp_or_rtcp && is_rtcp) { + ++_srs_pps_rrtcps->sugar; + + return session->udp()->on_rtcp(data, size); + } + if (srs_is_dtls((uint8_t *)data, size)) { + ++_srs_pps_rstuns->sugar; + + return session->udp()->on_dtls(data, size); + } + return srs_error_new(ERROR_RTC_UDP, "unknown packet"); +} diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 887616932..43b97260e 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -26,6 +27,7 @@ class ISrsRequest; class SrsSdp; class SrsRtcSource; class SrsResourceManager; +class SrsAsyncCallWorker; // The UDP black hole, for developer to use wireshark to catch plaintext packets. // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, @@ -84,10 +86,39 @@ public: // Discover the candidates for RTC server. extern std::set discover_candidates(SrsRtcUserConfig *ruc); -// Manager for RTC connections. -extern SrsResourceManager *_srs_conn_manager; - // The dns resolve utility, return the resolved ip address. extern std::string srs_dns_resolve(std::string host, int &family); +// RTC session manager to handle WebRTC session lifecycle and management. +class SrsRtcSessionManager : public ISrsExecRtcAsyncTask +{ +private: + // WebRTC async call worker for non-blocking operations. + SrsAsyncCallWorker *rtc_async_; + +public: + SrsRtcSessionManager(); + virtual ~SrsRtcSessionManager(); + +public: + virtual srs_error_t initialize(); + +public: + virtual SrsRtcConnection *find_rtc_session_by_username(const std::string &ufrag); + virtual srs_error_t create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession); + +private: + virtual srs_error_t do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session); + +public: + virtual void srs_update_rtc_sessions(); + + // interface ISrsExecRtcAsyncTask +public: + virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t); + +public: + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt); +}; + #endif diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index ccfa9b316..2ce2f8a4d 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -691,7 +692,7 @@ srs_error_t SrsRtcSource::on_publish() pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost); // @see SrsRtcSource::on_timer() - _srs_server->timer100ms()->subscribe(this); + _srs_shared_timer->timer100ms()->subscribe(this); } SrsStatistic *stat = SrsStatistic::instance(); @@ -725,7 +726,7 @@ void SrsRtcSource::on_unpublish() // free bridge resource if (bridge_) { // For SrsRtcSource::on_timer() - _srs_server->timer100ms()->unsubscribe(this); + _srs_shared_timer->timer100ms()->unsubscribe(this); #ifdef SRS_FFMPEG_FIT frame_builder_->on_unpublish(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 8e8273a03..d144c1137 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -24,6 +24,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -46,6 +47,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -65,200 +67,12 @@ SrsServer *_srs_server = NULL; SrsAsyncCallWorker *_srs_dvr_async = NULL; -SrsPps *_srs_pps_recvfrom = NULL; -SrsPps *_srs_pps_recvfrom_eagain = NULL; -SrsPps *_srs_pps_sendto = NULL; -SrsPps *_srs_pps_sendto_eagain = NULL; - -SrsPps *_srs_pps_read = NULL; -SrsPps *_srs_pps_read_eagain = NULL; -SrsPps *_srs_pps_readv = NULL; -SrsPps *_srs_pps_readv_eagain = NULL; -SrsPps *_srs_pps_writev = NULL; -SrsPps *_srs_pps_writev_eagain = NULL; - -SrsPps *_srs_pps_recvmsg = NULL; -SrsPps *_srs_pps_recvmsg_eagain = NULL; -SrsPps *_srs_pps_sendmsg = NULL; -SrsPps *_srs_pps_sendmsg_eagain = NULL; - -SrsPps *_srs_pps_clock_15ms = NULL; -SrsPps *_srs_pps_clock_20ms = NULL; -SrsPps *_srs_pps_clock_25ms = NULL; -SrsPps *_srs_pps_clock_30ms = NULL; -SrsPps *_srs_pps_clock_35ms = NULL; -SrsPps *_srs_pps_clock_40ms = NULL; -SrsPps *_srs_pps_clock_80ms = NULL; -SrsPps *_srs_pps_clock_160ms = NULL; -SrsPps *_srs_pps_timer_s = NULL; - -// External declarations for WebRTC functions and variables -extern bool srs_is_stun(const uint8_t *data, size_t size); -extern bool srs_is_dtls(const uint8_t *data, size_t len); -extern bool srs_is_rtp_or_rtcp(const uint8_t *data, size_t len); -extern bool srs_is_rtcp(const uint8_t *data, size_t len); - -extern SrsPps *_srs_pps_rpkts; -SrsPps *_srs_pps_rstuns = NULL; -SrsPps *_srs_pps_rrtps = NULL; -SrsPps *_srs_pps_rrtcps = NULL; -extern SrsPps *_srs_pps_addrs; -extern SrsPps *_srs_pps_fast_addrs; - -extern SrsPps *_srs_pps_spkts; -extern SrsPps *_srs_pps_sstuns; -extern SrsPps *_srs_pps_srtcps; -extern SrsPps *_srs_pps_srtps; - -extern SrsPps *_srs_pps_ids; -extern SrsPps *_srs_pps_fids; -extern SrsPps *_srs_pps_fids_level0; -extern SrsPps *_srs_pps_dispose; - -extern SrsPps *_srs_pps_timer; -extern SrsPps *_srs_pps_pub; -extern SrsPps *_srs_pps_conn; - -extern SrsPps *_srs_pps_cids_get; -extern SrsPps *_srs_pps_cids_set; - -extern SrsPps *_srs_pps_snack3; -extern SrsPps *_srs_pps_snack4; -extern SrsPps *_srs_pps_aloss2; - extern SrsStageManager *_srs_stages; extern srs_error_t _srs_reload_err; extern SrsReloadState _srs_reload_state; extern std::string _srs_reload_id; -// Clock and timing statistics -extern SrsPps *_srs_pps_clock_15ms; -extern SrsPps *_srs_pps_clock_20ms; -extern SrsPps *_srs_pps_clock_25ms; -extern SrsPps *_srs_pps_clock_30ms; -extern SrsPps *_srs_pps_clock_35ms; -extern SrsPps *_srs_pps_clock_40ms; -extern SrsPps *_srs_pps_clock_80ms; -extern SrsPps *_srs_pps_clock_160ms; -extern SrsPps *_srs_pps_timer_s; - -// Object statistics -extern SrsPps *_srs_pps_objs_rtps; -extern SrsPps *_srs_pps_objs_rraw; -extern SrsPps *_srs_pps_objs_rfua; -extern SrsPps *_srs_pps_objs_rbuf; -extern SrsPps *_srs_pps_objs_msgs; -extern SrsPps *_srs_pps_objs_rothers; - -SrsPps *_srs_pps_aloss2 = NULL; - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) -SrsPps *_srs_pps_thread_run = NULL; -SrsPps *_srs_pps_thread_idle = NULL; -SrsPps *_srs_pps_thread_yield = NULL; -SrsPps *_srs_pps_thread_yield2 = NULL; - -// Debug statistics for I/O operations -extern SrsPps *_srs_pps_recvfrom; -extern SrsPps *_srs_pps_recvfrom_eagain; -extern SrsPps *_srs_pps_sendto; -extern SrsPps *_srs_pps_sendto_eagain; - -extern SrsPps *_srs_pps_read; -extern SrsPps *_srs_pps_read_eagain; -extern SrsPps *_srs_pps_readv; -extern SrsPps *_srs_pps_readv_eagain; -extern SrsPps *_srs_pps_writev; -extern SrsPps *_srs_pps_writev_eagain; - -extern SrsPps *_srs_pps_recvmsg; -extern SrsPps *_srs_pps_recvmsg_eagain; -extern SrsPps *_srs_pps_sendmsg; -extern SrsPps *_srs_pps_sendmsg_eagain; - -extern SrsPps *_srs_pps_epoll; -extern SrsPps *_srs_pps_epoll_zero; -extern SrsPps *_srs_pps_epoll_shake; -extern SrsPps *_srs_pps_epoll_spin; - -extern SrsPps *_srs_pps_sched_160ms; -extern SrsPps *_srs_pps_sched_s; -extern SrsPps *_srs_pps_sched_15ms; -extern SrsPps *_srs_pps_sched_20ms; -extern SrsPps *_srs_pps_sched_25ms; -extern SrsPps *_srs_pps_sched_30ms; -extern SrsPps *_srs_pps_sched_35ms; -extern SrsPps *_srs_pps_sched_40ms; -extern SrsPps *_srs_pps_sched_80ms; - -extern SrsPps *_srs_pps_thread_run; -extern SrsPps *_srs_pps_thread_idle; -extern SrsPps *_srs_pps_thread_yield; -extern SrsPps *_srs_pps_thread_yield2; - -// External ST statistics -extern __thread unsigned long long _st_stat_recvfrom; -extern __thread unsigned long long _st_stat_recvfrom_eagain; -extern __thread unsigned long long _st_stat_sendto; -extern __thread unsigned long long _st_stat_sendto_eagain; - -extern __thread unsigned long long _st_stat_read; -extern __thread unsigned long long _st_stat_read_eagain; -extern __thread unsigned long long _st_stat_readv; -extern __thread unsigned long long _st_stat_readv_eagain; -extern __thread unsigned long long _st_stat_writev; -extern __thread unsigned long long _st_stat_writev_eagain; - -extern __thread unsigned long long _st_stat_recvmsg; -extern __thread unsigned long long _st_stat_recvmsg_eagain; -extern __thread unsigned long long _st_stat_sendmsg; -extern __thread unsigned long long _st_stat_sendmsg_eagain; - -extern __thread unsigned long long _st_stat_epoll; -extern __thread unsigned long long _st_stat_epoll_zero; -extern __thread unsigned long long _st_stat_epoll_shake; -extern __thread unsigned long long _st_stat_epoll_spin; - -extern __thread unsigned long long _st_stat_sched_15ms; -extern __thread unsigned long long _st_stat_sched_20ms; -extern __thread unsigned long long _st_stat_sched_25ms; -extern __thread unsigned long long _st_stat_sched_30ms; -extern __thread unsigned long long _st_stat_sched_35ms; -extern __thread unsigned long long _st_stat_sched_40ms; -extern __thread unsigned long long _st_stat_sched_80ms; -extern __thread unsigned long long _st_stat_sched_160ms; -extern __thread unsigned long long _st_stat_sched_s; - -extern __thread int _st_active_count; -extern __thread int _st_num_free_stacks; - -extern __thread unsigned long long _st_stat_thread_run; -extern __thread unsigned long long _st_stat_thread_idle; -extern __thread unsigned long long _st_stat_thread_yield; -extern __thread unsigned long long _st_stat_thread_yield2; -#endif - -extern SrsPps *_srs_pps_pli; -extern SrsPps *_srs_pps_twcc; -extern SrsPps *_srs_pps_rr; - -extern SrsPps *_srs_pps_snack; -extern SrsPps *_srs_pps_snack2; -extern SrsPps *_srs_pps_sanack; -extern SrsPps *_srs_pps_svnack; - -extern SrsPps *_srs_pps_rnack; -extern SrsPps *_srs_pps_rnack2; -extern SrsPps *_srs_pps_rhnack; -extern SrsPps *_srs_pps_rmnack; - -extern SrsPps *_srs_pps_sstuns; -extern SrsPps *_srs_pps_srtcps; -extern SrsPps *_srs_pps_srtps; - -SrsResourceManager *_srs_conn_manager = NULL; - // External WebRTC global variables extern SrsRtcBlackhole *_srs_blackhole; extern SrsDtlsCertificate *_srs_rtc_dtls_certificate; @@ -272,18 +86,22 @@ srs_error_t srs_global_initialize() _srs_context = new SrsThreadContext(); _srs_config = new SrsConfig(); - // The clock wall object. - _srs_clock = new SrsWallClock(); - - // The pps cids depends by st init. - _srs_pps_cids_get = new SrsPps(); - _srs_pps_cids_set = new SrsPps(); + // Initialize the global kbps statistics variables + if ((err = srs_global_kbps_initialize()) != srs_success) { + return srs_error_wrap(err, "global kbps initialize"); + } // Initialize ST, which depends on pps cids. if ((err = srs_st_init()) != srs_success) { return srs_error_wrap(err, "initialize st failed"); } + // Initialize global shared timer, which depends on ST + _srs_shared_timer = new SrsSharedTimer(); + if ((err = _srs_shared_timer->initialize()) != srs_success) { + return srs_error_wrap(err, "initialize shared timer"); + } + // The global objects which depends on ST. // Initialize _srs_stages first as it's needed by SrsServer constructor _srs_stages = new SrsStageManager(); @@ -309,106 +127,6 @@ srs_error_t srs_global_initialize() _srs_gb_manager = new SrsResourceManager("GB", true); #endif - // Initialize global pps, which depends on _srs_clock - _srs_pps_ids = new SrsPps(); - _srs_pps_fids = new SrsPps(); - _srs_pps_fids_level0 = new SrsPps(); - _srs_pps_dispose = new SrsPps(); - - _srs_pps_timer = new SrsPps(); - _srs_pps_conn = new SrsPps(); - _srs_pps_pub = new SrsPps(); - - _srs_pps_snack = new SrsPps(); - _srs_pps_snack2 = new SrsPps(); - _srs_pps_snack3 = new SrsPps(); - _srs_pps_snack4 = new SrsPps(); - _srs_pps_sanack = new SrsPps(); - _srs_pps_svnack = new SrsPps(); - - _srs_pps_rnack = new SrsPps(); - _srs_pps_rnack2 = new SrsPps(); - _srs_pps_rhnack = new SrsPps(); - _srs_pps_rmnack = new SrsPps(); - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_recvfrom = new SrsPps(); - _srs_pps_recvfrom_eagain = new SrsPps(); - _srs_pps_sendto = new SrsPps(); - _srs_pps_sendto_eagain = new SrsPps(); - - _srs_pps_read = new SrsPps(); - _srs_pps_read_eagain = new SrsPps(); - _srs_pps_readv = new SrsPps(); - _srs_pps_readv_eagain = new SrsPps(); - _srs_pps_writev = new SrsPps(); - _srs_pps_writev_eagain = new SrsPps(); - - _srs_pps_recvmsg = new SrsPps(); - _srs_pps_recvmsg_eagain = new SrsPps(); - _srs_pps_sendmsg = new SrsPps(); - _srs_pps_sendmsg_eagain = new SrsPps(); - - _srs_pps_epoll = new SrsPps(); - _srs_pps_epoll_zero = new SrsPps(); - _srs_pps_epoll_shake = new SrsPps(); - _srs_pps_epoll_spin = new SrsPps(); - - _srs_pps_sched_15ms = new SrsPps(); - _srs_pps_sched_20ms = new SrsPps(); - _srs_pps_sched_25ms = new SrsPps(); - _srs_pps_sched_30ms = new SrsPps(); - _srs_pps_sched_35ms = new SrsPps(); - _srs_pps_sched_40ms = new SrsPps(); - _srs_pps_sched_80ms = new SrsPps(); - _srs_pps_sched_160ms = new SrsPps(); - _srs_pps_sched_s = new SrsPps(); -#endif - - _srs_pps_clock_15ms = new SrsPps(); - _srs_pps_clock_20ms = new SrsPps(); - _srs_pps_clock_25ms = new SrsPps(); - _srs_pps_clock_30ms = new SrsPps(); - _srs_pps_clock_35ms = new SrsPps(); - _srs_pps_clock_40ms = new SrsPps(); - _srs_pps_clock_80ms = new SrsPps(); - _srs_pps_clock_160ms = new SrsPps(); - _srs_pps_timer_s = new SrsPps(); - -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_thread_run = new SrsPps(); - _srs_pps_thread_idle = new SrsPps(); - _srs_pps_thread_yield = new SrsPps(); - _srs_pps_thread_yield2 = new SrsPps(); -#endif - - _srs_pps_rpkts = new SrsPps(); - _srs_pps_addrs = new SrsPps(); - _srs_pps_fast_addrs = new SrsPps(); - - _srs_pps_spkts = new SrsPps(); - _srs_pps_objs_msgs = new SrsPps(); - - _srs_pps_sstuns = new SrsPps(); - _srs_pps_srtcps = new SrsPps(); - _srs_pps_srtps = new SrsPps(); - - _srs_pps_rstuns = new SrsPps(); - _srs_pps_rrtps = new SrsPps(); - _srs_pps_rrtcps = new SrsPps(); - - _srs_pps_aloss2 = new SrsPps(); - - _srs_pps_pli = new SrsPps(); - _srs_pps_twcc = new SrsPps(); - _srs_pps_rr = new SrsPps(); - - _srs_pps_objs_rtps = new SrsPps(); - _srs_pps_objs_rraw = new SrsPps(); - _srs_pps_objs_rfua = new SrsPps(); - _srs_pps_objs_rbuf = new SrsPps(); - _srs_pps_objs_rothers = new SrsPps(); - // Create global async worker for DVR. _srs_dvr_async = new SrsAsyncCallWorker(); @@ -419,292 +137,6 @@ srs_error_t srs_global_initialize() return err; } -ISrsSrtClientHandler::ISrsSrtClientHandler() -{ -} - -ISrsSrtClientHandler::~ISrsSrtClientHandler() -{ -} - -srs_error_t ISrsSrtClientHandler::accept_srt_client(srs_srt_t srt_fd) -{ - return srs_success; -} - -SrsSignalManager *SrsSignalManager::instance = NULL; - -SrsSignalManager::SrsSignalManager(SrsServer *s) -{ - SrsSignalManager::instance = this; - - server = s; - sig_pipe[0] = sig_pipe[1] = -1; - trd = new SrsSTCoroutine("signal", this, _srs_context->get_id()); - signal_read_stfd = NULL; -} - -SrsSignalManager::~SrsSignalManager() -{ - srs_freep(trd); - - srs_close_stfd(signal_read_stfd); - - if (sig_pipe[0] > 0) { - ::close(sig_pipe[0]); - } - if (sig_pipe[1] > 0) { - ::close(sig_pipe[1]); - } -} - -srs_error_t SrsSignalManager::initialize() -{ - /* Create signal pipe */ - if (pipe(sig_pipe) < 0) { - return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe"); - } - - if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) { - return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe"); - } - - return srs_success; -} - -srs_error_t SrsSignalManager::start() -{ - srs_error_t err = srs_success; - - /** - * Note that if multiple processes are used (see below), - * the signal pipe should be initialized after the fork(2) call - * so that each process has its own private pipe. - */ - struct sigaction sa; - - /* Install sig_catcher() as a signal handler */ - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SRS_SIGNAL_RELOAD, &sa, NULL); - - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SRS_SIGNAL_FAST_QUIT, &sa, NULL); - - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SRS_SIGNAL_GRACEFULLY_QUIT, &sa, NULL); - - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SRS_SIGNAL_ASSERT_ABORT, &sa, NULL); - - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SIGINT, &sa, NULL); - - sa.sa_handler = SrsSignalManager::sig_catcher; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SRS_SIGNAL_REOPEN_LOG, &sa, NULL); - - srs_trace("signal installed, reload=%d, reopen=%d, fast_quit=%d, grace_quit=%d", - SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_FAST_QUIT, SRS_SIGNAL_GRACEFULLY_QUIT); - - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "signal manager"); - } - - return err; -} - -srs_error_t SrsSignalManager::cycle() -{ - srs_error_t err = srs_success; - - while (true) { - if ((err = trd->pull()) != srs_success) { - return srs_error_wrap(err, "signal manager"); - } - - int signo; - - /* Read the next signal from the pipe */ - srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT); - - /* Process signal synchronously */ - server->on_signal(signo); - } - - return err; -} - -void SrsSignalManager::sig_catcher(int signo) -{ - int err; - - /* Save errno to restore it after the write() */ - err = errno; - - /* write() is reentrant/async-safe */ - int fd = SrsSignalManager::instance->sig_pipe[1]; - write(fd, &signo, sizeof(int)); - - errno = err; -} - -// Whether we are in docker, defined in main module. -extern bool _srs_in_docker; - -SrsInotifyWorker::SrsInotifyWorker(SrsServer *s) -{ - server = s; - trd = new SrsSTCoroutine("inotify", this); - inotify_fd = NULL; -} - -SrsInotifyWorker::~SrsInotifyWorker() -{ - srs_freep(trd); - srs_close_stfd(inotify_fd); -} - -srs_error_t SrsInotifyWorker::start() -{ - srs_error_t err = srs_success; - -#if !defined(SRS_OSX) && !defined(SRS_CYGWIN64) - // Whether enable auto reload config. - bool auto_reload = _srs_config->inotify_auto_reload(); - if (!auto_reload && _srs_in_docker && _srs_config->auto_reload_for_docker()) { - srs_warn("enable auto reload for docker"); - auto_reload = true; - } - - if (!auto_reload) { - return err; - } - - // Create inotify to watch config file. - int fd = ::inotify_init1(IN_NONBLOCK); - if (fd < 0) { - return srs_error_new(ERROR_INOTIFY_CREATE, "create inotify"); - } - - // Open as stfd to read by ST. - if ((inotify_fd = srs_netfd_open(fd)) == NULL) { - ::close(fd); - return srs_error_new(ERROR_INOTIFY_OPENFD, "open fd=%d", fd); - } - - if (((err = srs_fd_closeexec(fd))) != srs_success) { - return srs_error_wrap(err, "closeexec fd=%d", fd); - } - - // /* the following are legal, implemented events that user-space can watch for */ - // #define IN_ACCESS 0x00000001 /* File was accessed */ - // #define IN_MODIFY 0x00000002 /* File was modified */ - // #define IN_ATTRIB 0x00000004 /* Metadata changed */ - // #define IN_CLOSE_WRITE 0x00000008 /* Writtable file was closed */ - // #define IN_CLOSE_NOWRITE 0x00000010 /* Unwrittable file closed */ - // #define IN_OPEN 0x00000020 /* File was opened */ - // #define IN_MOVED_FROM 0x00000040 /* File was moved from X */ - // #define IN_MOVED_TO 0x00000080 /* File was moved to Y */ - // #define IN_CREATE 0x00000100 /* Subfile was created */ - // #define IN_DELETE 0x00000200 /* Subfile was deleted */ - // #define IN_DELETE_SELF 0x00000400 /* Self was deleted */ - // #define IN_MOVE_SELF 0x00000800 /* Self was moved */ - // - // /* the following are legal events. they are sent as needed to any watch */ - // #define IN_UNMOUNT 0x00002000 /* Backing fs was unmounted */ - // #define IN_Q_OVERFLOW 0x00004000 /* Event queued overflowed */ - // #define IN_IGNORED 0x00008000 /* File was ignored */ - // - // /* helper events */ - // #define IN_CLOSE (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE) /* close */ - // #define IN_MOVE (IN_MOVED_FROM | IN_MOVED_TO) /* moves */ - // - // /* special flags */ - // #define IN_ONLYDIR 0x01000000 /* only watch the path if it is a directory */ - // #define IN_DONT_FOLLOW 0x02000000 /* don't follow a sym link */ - // #define IN_EXCL_UNLINK 0x04000000 /* exclude events on unlinked objects */ - // #define IN_MASK_ADD 0x20000000 /* add to the mask of an already existing watch */ - // #define IN_ISDIR 0x40000000 /* event occurred against dir */ - // #define IN_ONESHOT 0x80000000 /* only send event once */ - - // Watch the config directory events. - string config_dir = srs_path_filepath_dir(_srs_config->config()); - uint32_t mask = IN_MODIFY | IN_CREATE | IN_MOVED_TO; - int watch_conf = 0; - if ((watch_conf = ::inotify_add_watch(fd, config_dir.c_str(), mask)) < 0) { - return srs_error_new(ERROR_INOTIFY_WATCH, "watch file=%s, fd=%d, watch=%d, mask=%#x", - config_dir.c_str(), fd, watch_conf, mask); - } - srs_trace("auto reload watching fd=%d, watch=%d, file=%s", fd, watch_conf, config_dir.c_str()); - - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "inotify"); - } -#endif - - return err; -} - -srs_error_t SrsInotifyWorker::cycle() -{ - srs_error_t err = srs_success; - -#if !defined(SRS_OSX) && !defined(SRS_CYGWIN64) - string config_path = _srs_config->config(); - string config_file = srs_path_filepath_base(config_path); - string k8s_file = "..data"; - - while (true) { - char buf[4096]; - ssize_t nn = srs_read(inotify_fd, buf, (size_t)sizeof(buf), SRS_UTIME_NO_TIMEOUT); - if (nn < 0) { - srs_warn("inotify ignore read failed, nn=%d", (int)nn); - break; - } - - // Whether config file changed. - bool do_reload = false; - - // Parse all inotify events. - inotify_event *ie = NULL; - for (char *ptr = buf; ptr < buf + nn; ptr += sizeof(inotify_event) + ie->len) { - ie = (inotify_event *)ptr; - - if (!ie->len || !ie->name) { - continue; - } - - string name = ie->name; - if ((name == k8s_file || name == config_file) && ie->mask & (IN_MODIFY | IN_CREATE | IN_MOVED_TO)) { - do_reload = true; - } - - srs_trace("inotify event wd=%d, mask=%#x, len=%d, name=%s, reload=%d", ie->wd, ie->mask, ie->len, ie->name, do_reload); - } - - // Notify server to do reload. - if (do_reload && srs_path_exists(config_path)) { - server->on_signal(SRS_SIGNAL_RELOAD); - } - - srs_usleep(3000 * SRS_UTIME_MILLISECONDS); - } -#endif - - return err; -} - SrsServer::SrsServer() { signal_reload_ = false; @@ -712,7 +144,8 @@ SrsServer::SrsServer() signal_gmc_stop_ = false; signal_fast_quit_ = false; signal_gracefully_quit_ = false; - pid_fd_ = -1; + + pid_file_locker_ = new SrsPidFileLocker(); signal_manager_ = new SrsSignalManager(this); latest_version_ = new SrsLatestVersion(); @@ -734,7 +167,7 @@ SrsServer::SrsServer() stream_caster_mpegts_ = new SrsUdpCasterListener(); exporter_listener_ = new SrsTcpListener(this); #ifdef SRS_GB28181 - stream_caster_gb28181_ = new SrsGbListener(http_api_mux_); + stream_caster_gb28181_ = new SrsGbListener(); #endif http_server_ = new SrsHttpServer(this); @@ -745,33 +178,14 @@ SrsServer::SrsServer() ingester_ = new SrsIngester(); timer_ = NULL; - // Initialize global shared timers moved from SrsHybridServer - timer20ms_ = new SrsFastTimer("server", 20 * SRS_UTIME_MILLISECONDS); - timer100ms_ = new SrsFastTimer("server", 100 * SRS_UTIME_MILLISECONDS); - timer1s_ = new SrsFastTimer("server", 1 * SRS_UTIME_SECONDS); - timer5s_ = new SrsFastTimer("server", 5 * SRS_UTIME_SECONDS); - clock_monitor_ = new SrsClockWallMonitor(); - // Initialize WebRTC components - rtc_async_ = new SrsAsyncCallWorker(); + rtc_session_manager_ = new SrsRtcSessionManager(); } SrsServer::~SrsServer() -{ - destroy(); -} - -void SrsServer::destroy() { srs_freep(timer_); - // Free global shared timers - srs_freep(timer20ms_); - srs_freep(timer100ms_); - srs_freep(timer1s_); - srs_freep(timer5s_); - srs_freep(clock_monitor_); - dispose(); // If api reuse the same port of server, they're the same object. @@ -783,10 +197,7 @@ void SrsServer::destroy() srs_freep(http_heartbeat_); srs_freep(ingester_); - if (pid_fd_ > 0) { - ::close(pid_fd_); - pid_fd_ = -1; - } + srs_freep(pid_file_locker_); srs_freep(signal_manager_); srs_freep(latest_version_); @@ -818,10 +229,7 @@ void SrsServer::destroy() rtc_listeners_.clear(); } - if (rtc_async_) { - rtc_async_->stop(); - srs_freep(rtc_async_); - } + srs_freep(rtc_session_manager_); } void SrsServer::dispose() @@ -908,6 +316,11 @@ void SrsServer::gracefully_dispose() srs_trace("final wait for %dms", srsu2msi(_srs_config->get_grace_final_wait())); } +ISrsHttpServeMux *SrsServer::api_server() +{ + return http_api_mux_; +} + srs_error_t SrsServer::initialize() { srs_error_t err = srs_success; @@ -915,7 +328,7 @@ srs_error_t SrsServer::initialize() srs_trace("SRS server initialized in single thread mode"); // Initialize the server. - if ((err = acquire_pid_file()) != srs_success) { + if ((err = pid_file_locker_->acquire()) != srs_success) { return srs_error_wrap(err, "init server"); } @@ -998,90 +411,13 @@ srs_error_t SrsServer::initialize() } // Start WebRTC async worker - rtc_async_->start(); - - // Start global shared timers - if ((err = timer20ms_->start()) != srs_success) { - return srs_error_wrap(err, "start timer20ms"); + if ((err = rtc_session_manager_->initialize()) != srs_success) { + return srs_error_wrap(err, "rtc session manager"); } - if ((err = timer100ms_->start()) != srs_success) { - return srs_error_wrap(err, "start timer100ms"); - } - - if ((err = timer1s_->start()) != srs_success) { - return srs_error_wrap(err, "start timer1s"); - } - - if ((err = timer5s_->start()) != srs_success) { - return srs_error_wrap(err, "start timer5s"); - } - - // Register clock monitor to 20ms timer and statistics reporting to 5s timer - timer20ms_->subscribe(clock_monitor_); - timer5s_->subscribe(this); - return err; } -srs_error_t SrsServer::acquire_pid_file() -{ - std::string pid_file = _srs_config->get_pid_file(); - - // -rw-r--r-- - // 644 - int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; - - int fd; - // open pid file - if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) { - return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str()); - } - - // require write lock - struct flock lock; - - lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK - lock.l_start = 0; // type offset, relative to l_whence - lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END - lock.l_len = 0; - - if (fcntl(fd, F_SETLK, &lock) == -1) { - if (errno == EACCES || errno == EAGAIN) { - ::close(fd); - srs_error("srs is already running!"); - return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running"); - } - return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str()); - } - - // truncate file - if (ftruncate(fd, 0) != 0) { - return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str()); - } - - // write the pid - string pid = srs_strconv_format_int(getpid()); - if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) { - return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str()); - } - - // auto close when fork child process. - int val; - if ((val = fcntl(fd, F_GETFD, 0)) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd); - } - val |= FD_CLOEXEC; - if (fcntl(fd, F_SETFD, val) < 0) { - return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd); - } - - srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str()); - pid_fd_ = fd; - - return srs_success; -} - srs_error_t SrsServer::run() { srs_error_t err = srs_success; @@ -1692,6 +1028,10 @@ srs_error_t SrsServer::setup_ticks() if ((err = timer_->tick(11, 5 * SRS_UTIME_SECONDS)) != srs_success) { return srs_error_wrap(err, "tick"); } + + if ((err = timer_->tick(12, 5 * SRS_UTIME_SECONDS)) != srs_success) { + return srs_error_wrap(err, "tick"); + } } if (_srs_config->get_heartbeat_enabled()) { @@ -1737,7 +1077,10 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) srs_update_udp_snmp_statistic(); break; case 11: - srs_update_rtc_sessions(); + rtc_session_manager_->srs_update_rtc_sessions(); + break; + case 12: + srs_update_server_statistics(); break; } @@ -1947,92 +1290,7 @@ srs_error_t SrsServer::listen_rtc_udp() srs_error_t SrsServer::on_udp_packet(SrsUdpMuxSocket *skt) { - srs_error_t err = srs_success; - - SrsRtcConnection *session = NULL; - char *data = skt->data(); - int size = skt->size(); - bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t *)data, size); - bool is_rtcp = srs_is_rtcp((uint8_t *)data, size); - - uint64_t fast_id = skt->fast_id(); - // Try fast id first, if not found, search by long peer id. - if (fast_id) { - session = (SrsRtcConnection *)_srs_conn_manager->find_by_fast_id(fast_id); - } - if (!session) { - string peer_id = skt->peer_id(); - session = (SrsRtcConnection *)_srs_conn_manager->find_by_id(peer_id); - } - - if (session) { - // When got any packet, the session is alive now. - session->alive(); - } - - // For STUN, the peer address may change. - if (!is_rtp_or_rtcp && srs_is_stun((uint8_t *)data, size)) { - ++_srs_pps_rstuns->sugar; - string peer_id = skt->peer_id(); - - // TODO: FIXME: Should support ICE renomination, to switch network between candidates. - SrsStunPacket ping; - if ((err = ping.decode(data, size)) != srs_success) { - return srs_error_wrap(err, "decode stun packet failed"); - } - if (!session) { - session = find_rtc_session_by_username(ping.get_username()); - } - if (session) { - session->switch_to_context(); - } - - srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d", - peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); - - // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. - if (!session) { - return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64, - ping.get_username().c_str(), peer_id.c_str(), fast_id); - } - - // For each binding request, update the UDP socket. - if (ping.is_binding_request()) { - session->udp()->update_sendonly_socket(skt); - } - - return session->udp()->on_stun(&ping, data, size); - } - - // For DTLS, RTCP or RTP, which does not support peer address changing. - if (!session) { - string peer_id = skt->peer_id(); - return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id); - } - - // Note that we don't(except error) switch to the context of session, for performance issue. - if (is_rtp_or_rtcp && !is_rtcp) { - ++_srs_pps_rrtps->sugar; - - err = session->udp()->on_rtp(data, size); - if (err != srs_success) { - session->switch_to_context(); - } - return err; - } - - session->switch_to_context(); - if (is_rtp_or_rtcp && is_rtcp) { - ++_srs_pps_rrtcps->sugar; - - return session->udp()->on_rtcp(data, size); - } - if (srs_is_dtls((uint8_t *)data, size)) { - ++_srs_pps_rstuns->sugar; - - return session->udp()->on_dtls(data, size); - } - return srs_error_new(ERROR_RTC_UDP, "unknown packet"); + return rtc_session_manager_->on_udp_packet(skt); } srs_error_t SrsServer::listen_rtc_api() @@ -2071,15 +1329,9 @@ srs_error_t SrsServer::listen_rtc_api() return err; } -srs_error_t SrsServer::exec_rtc_async_work(ISrsAsyncCallTask *t) -{ - return rtc_async_->execute(t); -} - SrsRtcConnection *SrsServer::find_rtc_session_by_username(const std::string &username) { - ISrsResource *conn = _srs_conn_manager->find_by_name(username); - return dynamic_cast(conn); + return rtc_session_manager_->find_rtc_session_by_username(username); } srs_error_t SrsServer::create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession) @@ -2094,260 +1346,27 @@ srs_error_t SrsServer::create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_s return srs_error_wrap(err, "check"); } - // Acquire stream publish token to prevent race conditions across all protocols. - SrsStreamPublishToken *publish_token_raw = NULL; - if (ruc->publish_ && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) { - return srs_error_wrap(err, "acquire stream publish token"); - } - SrsUniquePtr publish_token(publish_token_raw); - if (publish_token.get()) { - srs_trace("stream publish token acquired, type=rtc, url=%s", req->get_stream_url().c_str()); - } - - SrsSharedPtr source; - if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - - if (ruc->publish_ && !source->can_publish()) { - return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); - } - - // TODO: FIXME: add do_create_session to error process. - SrsContextId cid = _srs_context->get_id(); - SrsRtcConnection *session = new SrsRtcConnection(this, cid); - if ((err = do_create_rtc_session(ruc, local_sdp, session)) != srs_success) { - srs_freep(session); - return srs_error_wrap(err, "create session"); - } - - *psession = session; - - return err; + return rtc_session_manager_->create_rtc_session(ruc, local_sdp, psession); } -srs_error_t SrsServer::do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session) +srs_error_t SrsServer::srs_update_server_statistics() { srs_error_t err = srs_success; - ISrsRequest *req = ruc->req_; + // Show statistics for RTC server. + SrsProcSelfStat *u = srs_get_self_proc_stat(); + // Resident Set Size: number of pages the process has in real memory. + int memory = (int)(u->rss * 4 / 1024); - // first add publisher/player for negotiate sdp media info - if (ruc->publish_) { - if ((err = session->add_publisher(ruc, local_sdp)) != srs_success) { - return srs_error_wrap(err, "add publisher"); - } - } else { - if ((err = session->add_player(ruc, local_sdp)) != srs_success) { - return srs_error_wrap(err, "add player"); - } - } + SrsKbpsStats stats; + srs_global_kbps_update(&stats); - // All tracks default as inactive, so we must enable them. - session->set_all_tracks_status(req->get_stream_url(), ruc->publish_, true); - - std::string local_pwd = ruc->req_->ice_pwd_.empty() ? srs_rand_gen_str(32) : ruc->req_->ice_pwd_; - std::string local_ufrag = ruc->req_->ice_ufrag_.empty() ? srs_rand_gen_str(8) : ruc->req_->ice_ufrag_; - // TODO: FIXME: Rename for a better name, it's not an username. - std::string username = ""; - while (true) { - username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag(); - if (!_srs_conn_manager->find_by_name(username)) { - break; - } - - // Username conflict, regenerate a new one. - local_ufrag = srs_rand_gen_str(8); - } - - local_sdp.set_ice_ufrag(local_ufrag); - local_sdp.set_ice_pwd(local_pwd); - local_sdp.set_fingerprint_algo("sha-256"); - local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint()); - - // We allows to mock the eip of server. - if (true) { - // TODO: Support multiple listen ports. - int udp_port = 0; - if (true) { - string udp_host; - string udp_hostport = _srs_config->get_rtc_server_listens().at(0); - srs_net_split_for_listener(udp_hostport, udp_host, udp_port); - } - - int tcp_port = 0; - if (true) { - string tcp_host; - string tcp_hostport = _srs_config->get_rtc_server_tcp_listens().at(0); - srs_net_split_for_listener(tcp_hostport, tcp_host, tcp_port); - } - - string protocol = _srs_config->get_rtc_server_protocol(); - - set candidates = discover_candidates(ruc); - for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { - string hostname; - int uport = udp_port; - srs_net_split_hostport(*it, hostname, uport); - int tport = tcp_port; - srs_net_split_hostport(*it, hostname, tport); - - if (protocol == "udp") { - local_sdp.add_candidate("udp", hostname, uport, "host"); - } else if (protocol == "tcp") { - local_sdp.add_candidate("tcp", hostname, tport, "host"); - } else { - local_sdp.add_candidate("udp", hostname, uport, "host"); - local_sdp.add_candidate("tcp", hostname, tport, "host"); - } - } - - vector v = vector(candidates.begin(), candidates.end()); - srs_trace("RTC: Use candidates %s, protocol=%s, tcp_port=%d, udp_port=%d", - srs_strings_join(v, ", ").c_str(), protocol.c_str(), tcp_port, udp_port); - } - - // Setup the negotiate DTLS by config. - local_sdp.session_negotiate_ = local_sdp.session_config_; - - // Setup the negotiate DTLS role. - if (ruc->remote_sdp_.get_dtls_role() == "active") { - local_sdp.session_negotiate_.dtls_role = "passive"; - } else if (ruc->remote_sdp_.get_dtls_role() == "passive") { - local_sdp.session_negotiate_.dtls_role = "active"; - } else if (ruc->remote_sdp_.get_dtls_role() == "actpass") { - local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role; - } else { - // @see: https://tools.ietf.org/html/rfc4145#section-4.1 - // The default value of the setup attribute in an offer/answer exchange - // is 'active' in the offer and 'passive' in the answer. - local_sdp.session_negotiate_.dtls_role = "passive"; - } - local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role); - - session->set_remote_sdp(ruc->remote_sdp_); - // We must setup the local SDP, then initialize the session object. - session->set_local_sdp(local_sdp); - session->set_state_as_waiting_stun(); - - // Before session initialize, we must setup the local SDP. - if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) { - return srs_error_wrap(err, "init"); - } - - // We allows username is optional, but it never empty here. - _srs_conn_manager->add_with_name(username, session); - - return err; -} - -srs_error_t SrsServer::srs_update_rtc_sessions() -{ - srs_error_t err = srs_success; - - // Alive RTC sessions, for stat. - int nn_rtc_conns = 0; - - // Check all sessions and dispose the dead sessions. - for (int i = 0; i < (int)_srs_conn_manager->size(); i++) { - SrsRtcConnection *session = dynamic_cast(_srs_conn_manager->at(i)); - // Ignore not session, or already disposing. - if (!session || session->disposing_) { - continue; - } - - // Update stat if session is alive. - if (session->is_alive()) { - nn_rtc_conns++; - continue; - } - - SrsContextRestore(_srs_context->get_id()); - session->switch_to_context(); - - string username = session->username(); - srs_trace("RTC: session destroy by timeout, username=%s", username.c_str()); - - // Use manager to free session and notify other objects. - _srs_conn_manager->remove(session); - } - - // Ignore stats if no RTC connections. - if (!nn_rtc_conns) { - return err; - } - static char buf[128]; - - string rpkts_desc; - _srs_pps_rpkts->update(); - _srs_pps_rrtps->update(); - _srs_pps_rstuns->update(); - _srs_pps_rrtcps->update(); - if (_srs_pps_rpkts->r10s() || _srs_pps_rrtps->r10s() || _srs_pps_rstuns->r10s() || _srs_pps_rrtcps->r10s()) { - snprintf(buf, sizeof(buf), ", rpkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_rpkts->r10s(), _srs_pps_rrtps->r10s(), _srs_pps_rstuns->r10s(), _srs_pps_rrtcps->r10s()); - rpkts_desc = buf; - } - - string spkts_desc; - _srs_pps_spkts->update(); - _srs_pps_srtps->update(); - _srs_pps_sstuns->update(); - _srs_pps_srtcps->update(); - if (_srs_pps_spkts->r10s() || _srs_pps_srtps->r10s() || _srs_pps_sstuns->r10s() || _srs_pps_srtcps->r10s()) { - snprintf(buf, sizeof(buf), ", spkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_spkts->r10s(), _srs_pps_srtps->r10s(), _srs_pps_sstuns->r10s(), _srs_pps_srtcps->r10s()); - spkts_desc = buf; - } - - string rtcp_desc; - _srs_pps_pli->update(); - _srs_pps_twcc->update(); - _srs_pps_rr->update(); - if (_srs_pps_pli->r10s() || _srs_pps_twcc->r10s() || _srs_pps_rr->r10s()) { - snprintf(buf, sizeof(buf), ", rtcp=(pli:%d,twcc:%d,rr:%d)", _srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s()); - rtcp_desc = buf; - } - - string snk_desc; - _srs_pps_snack->update(); - _srs_pps_snack2->update(); - _srs_pps_sanack->update(); - _srs_pps_svnack->update(); - if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) { - snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s()); - snk_desc = buf; - } - - string rnk_desc; - _srs_pps_rnack->update(); - _srs_pps_rnack2->update(); - _srs_pps_rhnack->update(); - _srs_pps_rmnack->update(); - if (_srs_pps_rnack->r10s() || _srs_pps_rnack2->r10s() || _srs_pps_rhnack->r10s() || _srs_pps_rmnack->r10s()) { - snprintf(buf, sizeof(buf), ", rnk=(%d,%d,h:%d,m:%d)", _srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s(), _srs_pps_rhnack->r10s(), _srs_pps_rmnack->r10s()); - rnk_desc = buf; - } - - string loss_desc; - SrsSnmpUdpStat *s = srs_get_udp_snmp_stat(); - if (s->rcv_buf_errors_delta || s->snd_buf_errors_delta) { - snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d)", s->rcv_buf_errors_delta, s->snd_buf_errors_delta); - loss_desc = buf; - } - - string fid_desc; - _srs_pps_ids->update(); - _srs_pps_fids->update(); - _srs_pps_fids_level0->update(); - _srs_pps_addrs->update(); - _srs_pps_fast_addrs->update(); - if (_srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()) { - snprintf(buf, sizeof(buf), ", fid=(id:%d,fid:%d,ffid:%d,addr:%d,faddr:%d)", _srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()); - fid_desc = buf; - } - - srs_trace("RTC: Server conns=%u%s%s%s%s%s%s%s", - nn_rtc_conns, - rpkts_desc.c_str(), spkts_desc.c_str(), rtcp_desc.c_str(), snk_desc.c_str(), rnk_desc.c_str(), loss_desc.c_str(), fid_desc.c_str()); + srs_trace("SRS: cpu=%.2f%%,%dMB%s%s%s%s%s%s%s%s%s%s%s", + u->percent * 100, memory, + stats.cid_desc.c_str(), stats.timer_desc.c_str(), + stats.recvfrom_desc.c_str(), stats.io_desc.c_str(), stats.msg_desc.c_str(), + stats.epoll_desc.c_str(), stats.sched_desc.c_str(), stats.clock_desc.c_str(), + stats.thread_desc.c_str(), stats.free_desc.c_str(), stats.objs_desc.c_str()); return err; } @@ -2534,174 +1553,353 @@ void SrsServer::on_unpublish(ISrsRequest *r) coworkers->on_unpublish(r); } -SrsFastTimer *SrsServer::timer20ms() +SrsSignalManager *SrsSignalManager::instance = NULL; + +SrsSignalManager::SrsSignalManager(SrsServer *s) { - return timer20ms_; + SrsSignalManager::instance = this; + + server = s; + sig_pipe[0] = sig_pipe[1] = -1; + trd = new SrsSTCoroutine("signal", this, _srs_context->get_id()); + signal_read_stfd = NULL; } -SrsFastTimer *SrsServer::timer100ms() +SrsSignalManager::~SrsSignalManager() { - return timer100ms_; + srs_freep(trd); + + srs_close_stfd(signal_read_stfd); + + if (sig_pipe[0] > 0) { + ::close(sig_pipe[0]); + } + if (sig_pipe[1] > 0) { + ::close(sig_pipe[1]); + } } -SrsFastTimer *SrsServer::timer1s() +srs_error_t SrsSignalManager::initialize() { - return timer1s_; + /* Create signal pipe */ + if (pipe(sig_pipe) < 0) { + return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe"); + } + + if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) { + return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe"); + } + + return srs_success; } -SrsFastTimer *SrsServer::timer5s() -{ - return timer5s_; -} - -srs_error_t SrsServer::on_timer(srs_utime_t interval) +srs_error_t SrsSignalManager::start() { srs_error_t err = srs_success; - // Show statistics for RTC server. - SrsProcSelfStat *u = srs_get_self_proc_stat(); - // Resident Set Size: number of pages the process has in real memory. - int memory = (int)(u->rss * 4 / 1024); + /** + * Note that if multiple processes are used (see below), + * the signal pipe should be initialized after the fork(2) call + * so that each process has its own private pipe. + */ + struct sigaction sa; - static char buf[128]; + /* Install sig_catcher() as a signal handler */ + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SRS_SIGNAL_RELOAD, &sa, NULL); - string cid_desc; - _srs_pps_cids_get->update(); - _srs_pps_cids_set->update(); - if (_srs_pps_cids_get->r10s() || _srs_pps_cids_set->r10s()) { - snprintf(buf, sizeof(buf), ", cid=%d,%d", _srs_pps_cids_get->r10s(), _srs_pps_cids_set->r10s()); - cid_desc = buf; + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SRS_SIGNAL_FAST_QUIT, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SRS_SIGNAL_GRACEFULLY_QUIT, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SRS_SIGNAL_ASSERT_ABORT, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SRS_SIGNAL_REOPEN_LOG, &sa, NULL); + + srs_trace("signal installed, reload=%d, reopen=%d, fast_quit=%d, grace_quit=%d", + SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_FAST_QUIT, SRS_SIGNAL_GRACEFULLY_QUIT); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "signal manager"); } - string timer_desc; - _srs_pps_timer->update(); - _srs_pps_pub->update(); - _srs_pps_conn->update(); - if (_srs_pps_timer->r10s() || _srs_pps_pub->r10s() || _srs_pps_conn->r10s()) { - snprintf(buf, sizeof(buf), ", timer=%d,%d,%d", _srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s()); - timer_desc = buf; - } - - string free_desc; - _srs_pps_dispose->update(); - if (_srs_pps_dispose->r10s()) { - snprintf(buf, sizeof(buf), ", free=%d", _srs_pps_dispose->r10s()); - free_desc = buf; - } - - string recvfrom_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_recvfrom->update(_st_stat_recvfrom); - _srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain); - _srs_pps_sendto->update(_st_stat_sendto); - _srs_pps_sendto_eagain->update(_st_stat_sendto_eagain); - if (_srs_pps_recvfrom->r10s() || _srs_pps_recvfrom_eagain->r10s() || _srs_pps_sendto->r10s() || _srs_pps_sendto_eagain->r10s()) { - snprintf(buf, sizeof(buf), ", udp=%d,%d,%d,%d", _srs_pps_recvfrom->r10s(), _srs_pps_recvfrom_eagain->r10s(), _srs_pps_sendto->r10s(), _srs_pps_sendto_eagain->r10s()); - recvfrom_desc = buf; - } -#endif - - string io_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_read->update(_st_stat_read); - _srs_pps_read_eagain->update(_st_stat_read_eagain); - _srs_pps_readv->update(_st_stat_readv); - _srs_pps_readv_eagain->update(_st_stat_readv_eagain); - _srs_pps_writev->update(_st_stat_writev); - _srs_pps_writev_eagain->update(_st_stat_writev_eagain); - if (_srs_pps_read->r10s() || _srs_pps_read_eagain->r10s() || _srs_pps_readv->r10s() || _srs_pps_readv_eagain->r10s() || _srs_pps_writev->r10s() || _srs_pps_writev_eagain->r10s()) { - snprintf(buf, sizeof(buf), ", io=%d,%d,%d,%d,%d,%d", _srs_pps_read->r10s(), _srs_pps_read_eagain->r10s(), _srs_pps_readv->r10s(), _srs_pps_readv_eagain->r10s(), _srs_pps_writev->r10s(), _srs_pps_writev_eagain->r10s()); - io_desc = buf; - } -#endif - - string msg_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_recvmsg->update(_st_stat_recvmsg); - _srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain); - _srs_pps_sendmsg->update(_st_stat_sendmsg); - _srs_pps_sendmsg_eagain->update(_st_stat_sendmsg_eagain); - if (_srs_pps_recvmsg->r10s() || _srs_pps_recvmsg_eagain->r10s() || _srs_pps_sendmsg->r10s() || _srs_pps_sendmsg_eagain->r10s()) { - snprintf(buf, sizeof(buf), ", msg=%d,%d,%d,%d", _srs_pps_recvmsg->r10s(), _srs_pps_recvmsg_eagain->r10s(), _srs_pps_sendmsg->r10s(), _srs_pps_sendmsg_eagain->r10s()); - msg_desc = buf; - } -#endif - - string epoll_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_epoll->update(_st_stat_epoll); - _srs_pps_epoll_zero->update(_st_stat_epoll_zero); - _srs_pps_epoll_shake->update(_st_stat_epoll_shake); - _srs_pps_epoll_spin->update(_st_stat_epoll_spin); - if (_srs_pps_epoll->r10s() || _srs_pps_epoll_zero->r10s() || _srs_pps_epoll_shake->r10s() || _srs_pps_epoll_spin->r10s()) { - snprintf(buf, sizeof(buf), ", epoll=%d,%d,%d,%d", _srs_pps_epoll->r10s(), _srs_pps_epoll_zero->r10s(), _srs_pps_epoll_shake->r10s(), _srs_pps_epoll_spin->r10s()); - epoll_desc = buf; - } -#endif - - string sched_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_sched_160ms->update(_st_stat_sched_160ms); - _srs_pps_sched_s->update(_st_stat_sched_s); - _srs_pps_sched_15ms->update(_st_stat_sched_15ms); - _srs_pps_sched_20ms->update(_st_stat_sched_20ms); - _srs_pps_sched_25ms->update(_st_stat_sched_25ms); - _srs_pps_sched_30ms->update(_st_stat_sched_30ms); - _srs_pps_sched_35ms->update(_st_stat_sched_35ms); - _srs_pps_sched_40ms->update(_st_stat_sched_40ms); - _srs_pps_sched_80ms->update(_st_stat_sched_80ms); - if (_srs_pps_sched_160ms->r10s() || _srs_pps_sched_s->r10s() || _srs_pps_sched_15ms->r10s() || _srs_pps_sched_20ms->r10s() || _srs_pps_sched_25ms->r10s() || _srs_pps_sched_30ms->r10s() || _srs_pps_sched_35ms->r10s() || _srs_pps_sched_40ms->r10s() || _srs_pps_sched_80ms->r10s()) { - snprintf(buf, sizeof(buf), ", sched=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_sched_15ms->r10s(), _srs_pps_sched_20ms->r10s(), _srs_pps_sched_25ms->r10s(), _srs_pps_sched_30ms->r10s(), _srs_pps_sched_35ms->r10s(), _srs_pps_sched_40ms->r10s(), _srs_pps_sched_80ms->r10s(), _srs_pps_sched_160ms->r10s(), _srs_pps_sched_s->r10s()); - sched_desc = buf; - } -#endif - - string clock_desc; - _srs_pps_clock_15ms->update(); - _srs_pps_clock_20ms->update(); - _srs_pps_clock_25ms->update(); - _srs_pps_clock_30ms->update(); - _srs_pps_clock_35ms->update(); - _srs_pps_clock_40ms->update(); - _srs_pps_clock_80ms->update(); - _srs_pps_clock_160ms->update(); - _srs_pps_timer_s->update(); - if (_srs_pps_clock_15ms->r10s() || _srs_pps_timer_s->r10s() || _srs_pps_clock_20ms->r10s() || _srs_pps_clock_25ms->r10s() || _srs_pps_clock_30ms->r10s() || _srs_pps_clock_35ms->r10s() || _srs_pps_clock_40ms->r10s() || _srs_pps_clock_80ms->r10s() || _srs_pps_clock_160ms->r10s()) { - snprintf(buf, sizeof(buf), ", clock=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_clock_15ms->r10s(), _srs_pps_clock_20ms->r10s(), _srs_pps_clock_25ms->r10s(), _srs_pps_clock_30ms->r10s(), _srs_pps_clock_35ms->r10s(), _srs_pps_clock_40ms->r10s(), _srs_pps_clock_80ms->r10s(), _srs_pps_clock_160ms->r10s(), _srs_pps_timer_s->r10s()); - clock_desc = buf; - } - - string thread_desc; -#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) - _srs_pps_thread_run->update(_st_stat_thread_run); - _srs_pps_thread_idle->update(_st_stat_thread_idle); - _srs_pps_thread_yield->update(_st_stat_thread_yield); - _srs_pps_thread_yield2->update(_st_stat_thread_yield2); - if (_st_active_count > 0 || _st_num_free_stacks > 0 || _srs_pps_thread_run->r10s() || _srs_pps_thread_idle->r10s() || _srs_pps_thread_yield->r10s() || _srs_pps_thread_yield2->r10s()) { - snprintf(buf, sizeof(buf), ", co=%d,%d,%d, stk=%d, yield=%d,%d", _st_active_count, _srs_pps_thread_run->r10s(), _srs_pps_thread_idle->r10s(), _st_num_free_stacks, _srs_pps_thread_yield->r10s(), _srs_pps_thread_yield2->r10s()); - thread_desc = buf; - } -#endif - - string objs_desc; - _srs_pps_objs_rtps->update(); - _srs_pps_objs_rraw->update(); - _srs_pps_objs_rfua->update(); - _srs_pps_objs_rbuf->update(); - _srs_pps_objs_msgs->update(); - _srs_pps_objs_rothers->update(); - if (_srs_pps_objs_rtps->r10s() || _srs_pps_objs_rraw->r10s() || _srs_pps_objs_rfua->r10s() || _srs_pps_objs_rbuf->r10s() || _srs_pps_objs_msgs->r10s() || _srs_pps_objs_rothers->r10s()) { - snprintf(buf, sizeof(buf), ", objs=(pkt:%d,raw:%d,fua:%d,msg:%d,oth:%d,buf:%d)", - _srs_pps_objs_rtps->r10s(), _srs_pps_objs_rraw->r10s(), _srs_pps_objs_rfua->r10s(), - _srs_pps_objs_msgs->r10s(), _srs_pps_objs_rothers->r10s(), _srs_pps_objs_rbuf->r10s()); - objs_desc = buf; - } - - srs_trace("Hybrid cpu=%.2f%%,%dMB%s%s%s%s%s%s%s%s%s%s%s", - u->percent * 100, memory, - cid_desc.c_str(), timer_desc.c_str(), - recvfrom_desc.c_str(), io_desc.c_str(), msg_desc.c_str(), - epoll_desc.c_str(), sched_desc.c_str(), clock_desc.c_str(), - thread_desc.c_str(), free_desc.c_str(), objs_desc.c_str()); return err; } + +srs_error_t SrsSignalManager::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "signal manager"); + } + + int signo; + + /* Read the next signal from the pipe */ + srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT); + + /* Process signal synchronously */ + server->on_signal(signo); + } + + return err; +} + +void SrsSignalManager::sig_catcher(int signo) +{ + int err; + + /* Save errno to restore it after the write() */ + err = errno; + + /* write() is reentrant/async-safe */ + int fd = SrsSignalManager::instance->sig_pipe[1]; + write(fd, &signo, sizeof(int)); + + errno = err; +} + +// Whether we are in docker, defined in main module. +extern bool _srs_in_docker; + +SrsInotifyWorker::SrsInotifyWorker(SrsServer *s) +{ + server = s; + trd = new SrsSTCoroutine("inotify", this); + inotify_fd = NULL; +} + +SrsInotifyWorker::~SrsInotifyWorker() +{ + srs_freep(trd); + srs_close_stfd(inotify_fd); +} + +srs_error_t SrsInotifyWorker::start() +{ + srs_error_t err = srs_success; + +#if !defined(SRS_OSX) && !defined(SRS_CYGWIN64) + // Whether enable auto reload config. + bool auto_reload = _srs_config->inotify_auto_reload(); + if (!auto_reload && _srs_in_docker && _srs_config->auto_reload_for_docker()) { + srs_warn("enable auto reload for docker"); + auto_reload = true; + } + + if (!auto_reload) { + return err; + } + + // Create inotify to watch config file. + int fd = ::inotify_init1(IN_NONBLOCK); + if (fd < 0) { + return srs_error_new(ERROR_INOTIFY_CREATE, "create inotify"); + } + + // Open as stfd to read by ST. + if ((inotify_fd = srs_netfd_open(fd)) == NULL) { + ::close(fd); + return srs_error_new(ERROR_INOTIFY_OPENFD, "open fd=%d", fd); + } + + if (((err = srs_fd_closeexec(fd))) != srs_success) { + return srs_error_wrap(err, "closeexec fd=%d", fd); + } + + // /* the following are legal, implemented events that user-space can watch for */ + // #define IN_ACCESS 0x00000001 /* File was accessed */ + // #define IN_MODIFY 0x00000002 /* File was modified */ + // #define IN_ATTRIB 0x00000004 /* Metadata changed */ + // #define IN_CLOSE_WRITE 0x00000008 /* Writtable file was closed */ + // #define IN_CLOSE_NOWRITE 0x00000010 /* Unwrittable file closed */ + // #define IN_OPEN 0x00000020 /* File was opened */ + // #define IN_MOVED_FROM 0x00000040 /* File was moved from X */ + // #define IN_MOVED_TO 0x00000080 /* File was moved to Y */ + // #define IN_CREATE 0x00000100 /* Subfile was created */ + // #define IN_DELETE 0x00000200 /* Subfile was deleted */ + // #define IN_DELETE_SELF 0x00000400 /* Self was deleted */ + // #define IN_MOVE_SELF 0x00000800 /* Self was moved */ + // + // /* the following are legal events. they are sent as needed to any watch */ + // #define IN_UNMOUNT 0x00002000 /* Backing fs was unmounted */ + // #define IN_Q_OVERFLOW 0x00004000 /* Event queued overflowed */ + // #define IN_IGNORED 0x00008000 /* File was ignored */ + // + // /* helper events */ + // #define IN_CLOSE (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE) /* close */ + // #define IN_MOVE (IN_MOVED_FROM | IN_MOVED_TO) /* moves */ + // + // /* special flags */ + // #define IN_ONLYDIR 0x01000000 /* only watch the path if it is a directory */ + // #define IN_DONT_FOLLOW 0x02000000 /* don't follow a sym link */ + // #define IN_EXCL_UNLINK 0x04000000 /* exclude events on unlinked objects */ + // #define IN_MASK_ADD 0x20000000 /* add to the mask of an already existing watch */ + // #define IN_ISDIR 0x40000000 /* event occurred against dir */ + // #define IN_ONESHOT 0x80000000 /* only send event once */ + + // Watch the config directory events. + string config_dir = srs_path_filepath_dir(_srs_config->config()); + uint32_t mask = IN_MODIFY | IN_CREATE | IN_MOVED_TO; + int watch_conf = 0; + if ((watch_conf = ::inotify_add_watch(fd, config_dir.c_str(), mask)) < 0) { + return srs_error_new(ERROR_INOTIFY_WATCH, "watch file=%s, fd=%d, watch=%d, mask=%#x", + config_dir.c_str(), fd, watch_conf, mask); + } + srs_trace("auto reload watching fd=%d, watch=%d, file=%s", fd, watch_conf, config_dir.c_str()); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "inotify"); + } +#endif + + return err; +} + +srs_error_t SrsInotifyWorker::cycle() +{ + srs_error_t err = srs_success; + +#if !defined(SRS_OSX) && !defined(SRS_CYGWIN64) + string config_path = _srs_config->config(); + string config_file = srs_path_filepath_base(config_path); + string k8s_file = "..data"; + + while (true) { + char buf[4096]; + ssize_t nn = srs_read(inotify_fd, buf, (size_t)sizeof(buf), SRS_UTIME_NO_TIMEOUT); + if (nn < 0) { + srs_warn("inotify ignore read failed, nn=%d", (int)nn); + break; + } + + // Whether config file changed. + bool do_reload = false; + + // Parse all inotify events. + inotify_event *ie = NULL; + for (char *ptr = buf; ptr < buf + nn; ptr += sizeof(inotify_event) + ie->len) { + ie = (inotify_event *)ptr; + + if (!ie->len || !ie->name) { + continue; + } + + string name = ie->name; + if ((name == k8s_file || name == config_file) && ie->mask & (IN_MODIFY | IN_CREATE | IN_MOVED_TO)) { + do_reload = true; + } + + srs_trace("inotify event wd=%d, mask=%#x, len=%d, name=%s, reload=%d", ie->wd, ie->mask, ie->len, ie->name, do_reload); + } + + // Notify server to do reload. + if (do_reload && srs_path_exists(config_path)) { + server->on_signal(SRS_SIGNAL_RELOAD); + } + + srs_usleep(3000 * SRS_UTIME_MILLISECONDS); + } +#endif + + return err; +} + +SrsPidFileLocker::SrsPidFileLocker() +{ + pid_fd_ = -1; +} + +SrsPidFileLocker::~SrsPidFileLocker() +{ + close(); +} + +srs_error_t SrsPidFileLocker::acquire() +{ + srs_error_t err = srs_success; + + pid_file_ = _srs_config->get_pid_file(); + + // -rw-r--r-- + // 644 + int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + + int fd; + // open pid file + if ((fd = ::open(pid_file_.c_str(), O_WRONLY | O_CREAT, mode)) == -1) { + return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file_.c_str()); + } + + // require write lock + struct flock lock; + + lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK + lock.l_start = 0; // type offset, relative to l_whence + lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END + lock.l_len = 0; + + if (fcntl(fd, F_SETLK, &lock) == -1) { + if (errno == EACCES || errno == EAGAIN) { + ::close(fd); + srs_error("srs is already running!"); + return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running"); + } + return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file_.c_str()); + } + + // truncate file + if (ftruncate(fd, 0) != 0) { + return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file_.c_str()); + } + + // write the pid + std::string pid = srs_strconv_format_int(getpid()); + if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) { + return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file_.c_str()); + } + + // auto close when fork child process. + int val; + if ((val = fcntl(fd, F_GETFD, 0)) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd); + } + val |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, val) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file_.c_str(), fd); + } + + srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file_.c_str()); + pid_fd_ = fd; + + return err; +} + +void SrsPidFileLocker::close() +{ + if (pid_fd_ > 0) { + ::close(pid_fd_); + pid_fd_ = -1; + } +} diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 32678b326..3cfb4ede3 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -18,11 +18,11 @@ #include #include #include -#include -#include - #include +#include +#include #include +#include class SrsAsyncCallWorker; class SrsUdpMuxListener; @@ -31,7 +31,7 @@ class SrsRtcUserConfig; class SrsSdp; class SrsRtcConnection; class ISrsAsyncCallTask; - +class SrsSignalManager; class SrsServer; class ISrsHttpServeMux; class SrsHttpServer; @@ -54,82 +54,19 @@ class SrsRtmpTransport; class SrsRtmpsTransport; class SrsSrtAcceptor; class SrsSrtEventLoop; +class SrsRtcSessionManager; +class SrsPidFileLocker; // Initialize global shared variables cross all threads. extern srs_error_t srs_global_initialize(); -// Interface for SRT client acceptance -class ISrsSrtClientHandler -{ -public: - ISrsSrtClientHandler(); - virtual ~ISrsSrtClientHandler(); - -public: - virtual srs_error_t accept_srt_client(srs_srt_t srt_fd); -}; - -// Convert signal to io, -// @see: st-1.9/docs/notes.html -class SrsSignalManager : public ISrsCoroutineHandler -{ -private: - // Per-process pipe which is used as a signal queue. - // Up to PIPE_BUF/sizeof(int) signals can be queued up. - int sig_pipe[2]; - srs_netfd_t signal_read_stfd; - -private: - SrsServer *server; - SrsCoroutine *trd; - -public: - SrsSignalManager(SrsServer *s); - virtual ~SrsSignalManager(); - -public: - virtual srs_error_t initialize(); - virtual srs_error_t start(); - // Interface ISrsEndlessThreadHandler. -public: - virtual srs_error_t cycle(); - -private: - // Global singleton instance - static SrsSignalManager *instance; - // Signal catching function. - // Converts signal event to I/O event. - static void sig_catcher(int signo); -}; - -// Auto reload by inotify. -// @see https://github.com/ossrs/srs/issues/1635 -class SrsInotifyWorker : public ISrsCoroutineHandler -{ -private: - SrsServer *server; - SrsCoroutine *trd; - srs_netfd_t inotify_fd; - -public: - SrsInotifyWorker(SrsServer *s); - virtual ~SrsInotifyWorker(); - -public: - virtual srs_error_t start(); - // Interface ISrsEndlessThreadHandler. -public: - virtual srs_error_t cycle(); -}; - // SRS RTMP server, initialize and listen, start connection service thread, destroy client. class SrsServer : public ISrsReloadHandler, // Reload framework for permormance optimization. public ISrsLiveSourceHandler, public ISrsTcpHandler, public ISrsHourGlass, public ISrsSrtClientHandler, - public ISrsUdpMuxHandler, - public ISrsFastTimer + public ISrsUdpMuxHandler { private: // TODO: FIXME: Extract an HttpApiServer. @@ -142,19 +79,8 @@ private: SrsHourGlass *timer_; private: - // Global shared timers moved from SrsHybridServer - SrsFastTimer *timer20ms_; - SrsFastTimer *timer100ms_; - SrsFastTimer *timer1s_; - SrsFastTimer *timer5s_; - SrsClockWallMonitor *clock_monitor_; - -private: - // The pid file fd, lock the file write when server is running. - // @remark the init.d script should cleanup the pid file, when stop service, - // for the server never delete the file; when system startup, the pid in pid file - // maybe valid but the process is not SRS, the init.d script will never start server. - int pid_fd_; + // PID file manager for process identification and locking. + SrsPidFileLocker *pid_file_locker_; private: // If reusing, HTTP API use the same port of HTTP server. @@ -196,8 +122,8 @@ private: std::vector srt_acceptors_; // WebRTC UDP listeners for RTC server functionality. std::vector rtc_listeners_; - // WebRTC async call worker for non-blocking operations. - SrsAsyncCallWorker *rtc_async_; + // WebRTC session manager. + SrsRtcSessionManager *rtc_session_manager_; private: // Signal manager which convert gignal to io message. @@ -218,10 +144,6 @@ public: virtual ~SrsServer(); private: - // The destroy is for gmc to analysis the memory leak, - // if not destroy global/static data, the gmc will warning memory leak. - // In service, server never destroy, directly exit when restart. - virtual void destroy(); // When SIGTERM, SRS should do cleanup, for example, // to stop all ingesters, cleanup HLS and dvr. virtual void dispose(); @@ -229,16 +151,16 @@ private: // then wait and quit when all connections finished. virtual void gracefully_dispose(); +public: + // Get the HTTP API server mux. + ISrsHttpServeMux *api_server(); + // server startup workflow, @see run_master() public: // Initialize server with callback handler ch. // @remark user must free the handler. virtual srs_error_t initialize(); -private: - // Require the PID file for the whole process. - virtual srs_error_t acquire_pid_file(); - public: srs_error_t run(); @@ -295,6 +217,7 @@ private: virtual srs_error_t accept_srt_client(srs_srt_t srt_fd); virtual srs_error_t srt_fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr); +private: // WebRTC-related methods virtual srs_error_t listen_rtc_udp(); @@ -306,15 +229,11 @@ private: virtual srs_error_t listen_rtc_api(); public: - virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t); virtual SrsRtcConnection *find_rtc_session_by_username(const std::string &ufrag); virtual srs_error_t create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession); private: - virtual srs_error_t do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session); - -private: - virtual srs_error_t srs_update_rtc_sessions(); + virtual srs_error_t srs_update_server_statistics(); // Interface ISrsTcpHandler public: @@ -328,23 +247,82 @@ private: public: virtual srs_error_t on_publish(ISrsRequest *r); virtual void on_unpublish(ISrsRequest *r); - -public: - // Access to global shared timers - SrsFastTimer *timer20ms(); - SrsFastTimer *timer100ms(); - SrsFastTimer *timer1s(); - SrsFastTimer *timer5s(); - - // interface ISrsFastTimer for statistics reporting -private: - virtual srs_error_t on_timer(srs_utime_t interval); }; // @global main SRS server, for debugging extern SrsServer *_srs_server; -// Manager for RTC connections. -extern SrsResourceManager *_srs_conn_manager; +// Convert signal to io, +// @see: st-1.9/docs/notes.html +class SrsSignalManager : public ISrsCoroutineHandler +{ +private: + // Per-process pipe which is used as a signal queue. + // Up to PIPE_BUF/sizeof(int) signals can be queued up. + int sig_pipe[2]; + srs_netfd_t signal_read_stfd; + +private: + SrsServer *server; + SrsCoroutine *trd; + +public: + SrsSignalManager(SrsServer *s); + virtual ~SrsSignalManager(); + +public: + virtual srs_error_t initialize(); + virtual srs_error_t start(); + // Interface ISrsEndlessThreadHandler. +public: + virtual srs_error_t cycle(); + +private: + // Global singleton instance + static SrsSignalManager *instance; + // Signal catching function. + // Converts signal event to I/O event. + static void sig_catcher(int signo); +}; + +// Auto reload by inotify. +// @see https://github.com/ossrs/srs/issues/1635 +class SrsInotifyWorker : public ISrsCoroutineHandler +{ +private: + SrsServer *server; + SrsCoroutine *trd; + srs_netfd_t inotify_fd; + +public: + SrsInotifyWorker(SrsServer *s); + virtual ~SrsInotifyWorker(); + +public: + virtual srs_error_t start(); + // Interface ISrsEndlessThreadHandler. +public: + virtual srs_error_t cycle(); +}; + +// PID file manager for process identification and locking. +class SrsPidFileLocker +{ +private: + int pid_fd_; + std::string pid_file_; + +public: + SrsPidFileLocker(); + virtual ~SrsPidFileLocker(); + +public: + // Acquire the PID file for the whole process. + virtual srs_error_t acquire(); + +private: + // Close the PID file descriptor. + virtual void close(); +}; #endif diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 529fd7374..baa7bf0b8 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -12,6 +12,7 @@ using namespace std; #include #include #include +#include #include #include #include diff --git a/trunk/src/app/srs_app_srt_server.cpp b/trunk/src/app/srs_app_srt_server.cpp index eaa83f57c..fefa1a7b3 100644 --- a/trunk/src/app/srs_app_srt_server.cpp +++ b/trunk/src/app/srs_app_srt_server.cpp @@ -9,6 +9,8 @@ using namespace std; #include +#include +#include #include #include #include @@ -17,6 +19,14 @@ using namespace std; SrsSrtEventLoop *_srt_eventloop = NULL; +ISrsSrtClientHandler::ISrsSrtClientHandler() +{ +} + +ISrsSrtClientHandler::~ISrsSrtClientHandler() +{ +} + SrsSrtAcceptor::SrsSrtAcceptor(ISrsSrtClientHandler *srt_handler) { port_ = 0; diff --git a/trunk/src/app/srs_app_srt_server.hpp b/trunk/src/app/srs_app_srt_server.hpp index 3881a2e80..4a85caba1 100644 --- a/trunk/src/app/srs_app_srt_server.hpp +++ b/trunk/src/app/srs_app_srt_server.hpp @@ -9,7 +9,6 @@ #include -#include #include #include @@ -17,6 +16,17 @@ class SrsSrtServer; class SrsHourGlass; class ISrsSrtClientHandler; +// Interface for SRT client acceptance +class ISrsSrtClientHandler +{ +public: + ISrsSrtClientHandler(); + virtual ~ISrsSrtClientHandler(); + +public: + virtual srs_error_t accept_srt_client(srs_srt_t srt_fd) = 0; +}; + // A common srt acceptor, for SRT server. class SrsSrtAcceptor : public ISrsSrtHandler { diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index 9396e81cd..e064f5b00 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 69 +#define VERSION_REVISION 70 #endif \ No newline at end of file diff --git a/trunk/src/kernel/srs_kernel_kbps.cpp b/trunk/src/kernel/srs_kernel_kbps.cpp index 6a7b4d985..ec906ff5e 100644 --- a/trunk/src/kernel/srs_kernel_kbps.cpp +++ b/trunk/src/kernel/srs_kernel_kbps.cpp @@ -9,6 +9,10 @@ #include #include +#include +#include +using namespace std; + SrsRateSample::SrsRateSample() { total = time = -1; @@ -112,14 +116,58 @@ srs_utime_t SrsWallClock::now() SrsWallClock *_srs_clock = NULL; +// Global SrsPps statistics variables implementations +// I/O operations statistics +SrsPps *_srs_pps_recvfrom = NULL; +SrsPps *_srs_pps_recvfrom_eagain = NULL; +SrsPps *_srs_pps_sendto = NULL; +SrsPps *_srs_pps_sendto_eagain = NULL; + +SrsPps *_srs_pps_read = NULL; +SrsPps *_srs_pps_read_eagain = NULL; +SrsPps *_srs_pps_readv = NULL; +SrsPps *_srs_pps_readv_eagain = NULL; +SrsPps *_srs_pps_writev = NULL; +SrsPps *_srs_pps_writev_eagain = NULL; + +SrsPps *_srs_pps_recvmsg = NULL; +SrsPps *_srs_pps_recvmsg_eagain = NULL; +SrsPps *_srs_pps_sendmsg = NULL; +SrsPps *_srs_pps_sendmsg_eagain = NULL; + +// Clock and timing statistics +SrsPps *_srs_pps_clock_15ms = NULL; +SrsPps *_srs_pps_clock_20ms = NULL; +SrsPps *_srs_pps_clock_25ms = NULL; +SrsPps *_srs_pps_clock_30ms = NULL; +SrsPps *_srs_pps_clock_35ms = NULL; +SrsPps *_srs_pps_clock_40ms = NULL; +SrsPps *_srs_pps_clock_80ms = NULL; +SrsPps *_srs_pps_clock_160ms = NULL; +SrsPps *_srs_pps_timer_s = NULL; + +// WebRTC packet statistics (only the ones originally in srs_app_server.cpp) +SrsPps *_srs_pps_rstuns = NULL; +SrsPps *_srs_pps_rrtps = NULL; +SrsPps *_srs_pps_rrtcps = NULL; + +// NACK and loss statistics (only _srs_pps_aloss2 was originally in srs_app_server.cpp) +SrsPps *_srs_pps_aloss2 = NULL; + #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) +// Debug thread statistics +SrsPps *_srs_pps_thread_run = NULL; +SrsPps *_srs_pps_thread_idle = NULL; +SrsPps *_srs_pps_thread_yield = NULL; +SrsPps *_srs_pps_thread_yield2 = NULL; + +// Debug epoll statistics SrsPps *_srs_pps_epoll = NULL; SrsPps *_srs_pps_epoll_zero = NULL; SrsPps *_srs_pps_epoll_shake = NULL; SrsPps *_srs_pps_epoll_spin = NULL; -SrsPps *_srs_pps_sched_160ms = NULL; -SrsPps *_srs_pps_sched_s = NULL; +// Debug scheduler statistics SrsPps *_srs_pps_sched_15ms = NULL; SrsPps *_srs_pps_sched_20ms = NULL; SrsPps *_srs_pps_sched_25ms = NULL; @@ -127,5 +175,366 @@ SrsPps *_srs_pps_sched_30ms = NULL; SrsPps *_srs_pps_sched_35ms = NULL; SrsPps *_srs_pps_sched_40ms = NULL; SrsPps *_srs_pps_sched_80ms = NULL; +SrsPps *_srs_pps_sched_160ms = NULL; +SrsPps *_srs_pps_sched_s = NULL; #endif +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) +extern "C" { +// External ST statistics +extern __thread unsigned long long _st_stat_recvfrom; +extern __thread unsigned long long _st_stat_recvfrom_eagain; +extern __thread unsigned long long _st_stat_sendto; +extern __thread unsigned long long _st_stat_sendto_eagain; + +extern __thread unsigned long long _st_stat_read; +extern __thread unsigned long long _st_stat_read_eagain; +extern __thread unsigned long long _st_stat_readv; +extern __thread unsigned long long _st_stat_readv_eagain; +extern __thread unsigned long long _st_stat_writev; +extern __thread unsigned long long _st_stat_writev_eagain; + +extern __thread unsigned long long _st_stat_recvmsg; +extern __thread unsigned long long _st_stat_recvmsg_eagain; +extern __thread unsigned long long _st_stat_sendmsg; +extern __thread unsigned long long _st_stat_sendmsg_eagain; + +extern __thread unsigned long long _st_stat_epoll; +extern __thread unsigned long long _st_stat_epoll_zero; +extern __thread unsigned long long _st_stat_epoll_shake; +extern __thread unsigned long long _st_stat_epoll_spin; + +extern __thread unsigned long long _st_stat_sched_15ms; +extern __thread unsigned long long _st_stat_sched_20ms; +extern __thread unsigned long long _st_stat_sched_25ms; +extern __thread unsigned long long _st_stat_sched_30ms; +extern __thread unsigned long long _st_stat_sched_35ms; +extern __thread unsigned long long _st_stat_sched_40ms; +extern __thread unsigned long long _st_stat_sched_80ms; +extern __thread unsigned long long _st_stat_sched_160ms; +extern __thread unsigned long long _st_stat_sched_s; + +extern __thread int _st_active_count; +extern __thread int _st_num_free_stacks; + +extern __thread unsigned long long _st_stat_thread_run; +extern __thread unsigned long long _st_stat_thread_idle; +extern __thread unsigned long long _st_stat_thread_yield; +extern __thread unsigned long long _st_stat_thread_yield2; +} +#endif + +srs_error_t srs_global_kbps_initialize() +{ + srs_error_t err = srs_success; + + // The clock wall object. + _srs_clock = new SrsWallClock(); + + // Initialize global pps, which depends on _srs_clock + _srs_pps_ids = new SrsPps(); + _srs_pps_fids = new SrsPps(); + _srs_pps_fids_level0 = new SrsPps(); + _srs_pps_dispose = new SrsPps(); + + _srs_pps_timer = new SrsPps(); + _srs_pps_conn = new SrsPps(); + _srs_pps_pub = new SrsPps(); + + _srs_pps_snack = new SrsPps(); + _srs_pps_snack2 = new SrsPps(); + _srs_pps_snack3 = new SrsPps(); + _srs_pps_snack4 = new SrsPps(); + _srs_pps_sanack = new SrsPps(); + _srs_pps_svnack = new SrsPps(); + + _srs_pps_rnack = new SrsPps(); + _srs_pps_rnack2 = new SrsPps(); + _srs_pps_rhnack = new SrsPps(); + _srs_pps_rmnack = new SrsPps(); + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_recvfrom = new SrsPps(); + _srs_pps_recvfrom_eagain = new SrsPps(); + _srs_pps_sendto = new SrsPps(); + _srs_pps_sendto_eagain = new SrsPps(); + + _srs_pps_read = new SrsPps(); + _srs_pps_read_eagain = new SrsPps(); + _srs_pps_readv = new SrsPps(); + _srs_pps_readv_eagain = new SrsPps(); + _srs_pps_writev = new SrsPps(); + _srs_pps_writev_eagain = new SrsPps(); + + _srs_pps_recvmsg = new SrsPps(); + _srs_pps_recvmsg_eagain = new SrsPps(); + _srs_pps_sendmsg = new SrsPps(); + _srs_pps_sendmsg_eagain = new SrsPps(); + + _srs_pps_epoll = new SrsPps(); + _srs_pps_epoll_zero = new SrsPps(); + _srs_pps_epoll_shake = new SrsPps(); + _srs_pps_epoll_spin = new SrsPps(); + + _srs_pps_sched_15ms = new SrsPps(); + _srs_pps_sched_20ms = new SrsPps(); + _srs_pps_sched_25ms = new SrsPps(); + _srs_pps_sched_30ms = new SrsPps(); + _srs_pps_sched_35ms = new SrsPps(); + _srs_pps_sched_40ms = new SrsPps(); + _srs_pps_sched_80ms = new SrsPps(); + _srs_pps_sched_160ms = new SrsPps(); + _srs_pps_sched_s = new SrsPps(); +#endif + + _srs_pps_clock_15ms = new SrsPps(); + _srs_pps_clock_20ms = new SrsPps(); + _srs_pps_clock_25ms = new SrsPps(); + _srs_pps_clock_30ms = new SrsPps(); + _srs_pps_clock_35ms = new SrsPps(); + _srs_pps_clock_40ms = new SrsPps(); + _srs_pps_clock_80ms = new SrsPps(); + _srs_pps_clock_160ms = new SrsPps(); + _srs_pps_timer_s = new SrsPps(); + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_thread_run = new SrsPps(); + _srs_pps_thread_idle = new SrsPps(); + _srs_pps_thread_yield = new SrsPps(); + _srs_pps_thread_yield2 = new SrsPps(); +#endif + + _srs_pps_rpkts = new SrsPps(); + _srs_pps_addrs = new SrsPps(); + _srs_pps_fast_addrs = new SrsPps(); + + _srs_pps_spkts = new SrsPps(); + _srs_pps_objs_msgs = new SrsPps(); + + _srs_pps_sstuns = new SrsPps(); + _srs_pps_srtcps = new SrsPps(); + _srs_pps_srtps = new SrsPps(); + + _srs_pps_rstuns = new SrsPps(); + _srs_pps_rrtps = new SrsPps(); + _srs_pps_rrtcps = new SrsPps(); + + _srs_pps_aloss2 = new SrsPps(); + + _srs_pps_pli = new SrsPps(); + _srs_pps_twcc = new SrsPps(); + _srs_pps_rr = new SrsPps(); + + _srs_pps_objs_rtps = new SrsPps(); + _srs_pps_objs_rraw = new SrsPps(); + _srs_pps_objs_rfua = new SrsPps(); + _srs_pps_objs_rbuf = new SrsPps(); + _srs_pps_objs_rothers = new SrsPps(); + + // The pps cids depends by st init. + _srs_pps_cids_get = new SrsPps(); + _srs_pps_cids_set = new SrsPps(); + + return err; +} + +void srs_global_kbps_update(SrsKbpsStats *stats) +{ + static char buf[128]; + + string &cid_desc = stats->cid_desc; + _srs_pps_cids_get->update(); + _srs_pps_cids_set->update(); + if (_srs_pps_cids_get->r10s() || _srs_pps_cids_set->r10s()) { + snprintf(buf, sizeof(buf), ", cid=%d,%d", _srs_pps_cids_get->r10s(), _srs_pps_cids_set->r10s()); + cid_desc = buf; + } + string &timer_desc = stats->timer_desc; + _srs_pps_timer->update(); + _srs_pps_pub->update(); + _srs_pps_conn->update(); + if (_srs_pps_timer->r10s() || _srs_pps_pub->r10s() || _srs_pps_conn->r10s()) { + snprintf(buf, sizeof(buf), ", timer=%d,%d,%d", _srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s()); + timer_desc = buf; + } + + string &free_desc = stats->free_desc; + _srs_pps_dispose->update(); + if (_srs_pps_dispose->r10s()) { + snprintf(buf, sizeof(buf), ", free=%d", _srs_pps_dispose->r10s()); + free_desc = buf; + } + + string &recvfrom_desc = stats->recvfrom_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_recvfrom->update(_st_stat_recvfrom); + _srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain); + _srs_pps_sendto->update(_st_stat_sendto); + _srs_pps_sendto_eagain->update(_st_stat_sendto_eagain); + if (_srs_pps_recvfrom->r10s() || _srs_pps_recvfrom_eagain->r10s() || _srs_pps_sendto->r10s() || _srs_pps_sendto_eagain->r10s()) { + snprintf(buf, sizeof(buf), ", udp=%d,%d,%d,%d", _srs_pps_recvfrom->r10s(), _srs_pps_recvfrom_eagain->r10s(), _srs_pps_sendto->r10s(), _srs_pps_sendto_eagain->r10s()); + recvfrom_desc = buf; + } +#endif + + string &io_desc = stats->io_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_read->update(_st_stat_read); + _srs_pps_read_eagain->update(_st_stat_read_eagain); + _srs_pps_readv->update(_st_stat_readv); + _srs_pps_readv_eagain->update(_st_stat_readv_eagain); + _srs_pps_writev->update(_st_stat_writev); + _srs_pps_writev_eagain->update(_st_stat_writev_eagain); + if (_srs_pps_read->r10s() || _srs_pps_read_eagain->r10s() || _srs_pps_readv->r10s() || _srs_pps_readv_eagain->r10s() || _srs_pps_writev->r10s() || _srs_pps_writev_eagain->r10s()) { + snprintf(buf, sizeof(buf), ", io=%d,%d,%d,%d,%d,%d", _srs_pps_read->r10s(), _srs_pps_read_eagain->r10s(), _srs_pps_readv->r10s(), _srs_pps_readv_eagain->r10s(), _srs_pps_writev->r10s(), _srs_pps_writev_eagain->r10s()); + io_desc = buf; + } +#endif + + string &msg_desc = stats->msg_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_recvmsg->update(_st_stat_recvmsg); + _srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain); + _srs_pps_sendmsg->update(_st_stat_sendmsg); + _srs_pps_sendmsg_eagain->update(_st_stat_sendmsg_eagain); + if (_srs_pps_recvmsg->r10s() || _srs_pps_recvmsg_eagain->r10s() || _srs_pps_sendmsg->r10s() || _srs_pps_sendmsg_eagain->r10s()) { + snprintf(buf, sizeof(buf), ", msg=%d,%d,%d,%d", _srs_pps_recvmsg->r10s(), _srs_pps_recvmsg_eagain->r10s(), _srs_pps_sendmsg->r10s(), _srs_pps_sendmsg_eagain->r10s()); + msg_desc = buf; + } +#endif + + string &epoll_desc = stats->epoll_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_epoll->update(_st_stat_epoll); + _srs_pps_epoll_zero->update(_st_stat_epoll_zero); + _srs_pps_epoll_shake->update(_st_stat_epoll_shake); + _srs_pps_epoll_spin->update(_st_stat_epoll_spin); + if (_srs_pps_epoll->r10s() || _srs_pps_epoll_zero->r10s() || _srs_pps_epoll_shake->r10s() || _srs_pps_epoll_spin->r10s()) { + snprintf(buf, sizeof(buf), ", epoll=%d,%d,%d,%d", _srs_pps_epoll->r10s(), _srs_pps_epoll_zero->r10s(), _srs_pps_epoll_shake->r10s(), _srs_pps_epoll_spin->r10s()); + epoll_desc = buf; + } +#endif + + string &sched_desc = stats->sched_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_sched_160ms->update(_st_stat_sched_160ms); + _srs_pps_sched_s->update(_st_stat_sched_s); + _srs_pps_sched_15ms->update(_st_stat_sched_15ms); + _srs_pps_sched_20ms->update(_st_stat_sched_20ms); + _srs_pps_sched_25ms->update(_st_stat_sched_25ms); + _srs_pps_sched_30ms->update(_st_stat_sched_30ms); + _srs_pps_sched_35ms->update(_st_stat_sched_35ms); + _srs_pps_sched_40ms->update(_st_stat_sched_40ms); + _srs_pps_sched_80ms->update(_st_stat_sched_80ms); + if (_srs_pps_sched_160ms->r10s() || _srs_pps_sched_s->r10s() || _srs_pps_sched_15ms->r10s() || _srs_pps_sched_20ms->r10s() || _srs_pps_sched_25ms->r10s() || _srs_pps_sched_30ms->r10s() || _srs_pps_sched_35ms->r10s() || _srs_pps_sched_40ms->r10s() || _srs_pps_sched_80ms->r10s()) { + snprintf(buf, sizeof(buf), ", sched=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_sched_15ms->r10s(), _srs_pps_sched_20ms->r10s(), _srs_pps_sched_25ms->r10s(), _srs_pps_sched_30ms->r10s(), _srs_pps_sched_35ms->r10s(), _srs_pps_sched_40ms->r10s(), _srs_pps_sched_80ms->r10s(), _srs_pps_sched_160ms->r10s(), _srs_pps_sched_s->r10s()); + sched_desc = buf; + } +#endif + + string &clock_desc = stats->clock_desc; + _srs_pps_clock_15ms->update(); + _srs_pps_clock_20ms->update(); + _srs_pps_clock_25ms->update(); + _srs_pps_clock_30ms->update(); + _srs_pps_clock_35ms->update(); + _srs_pps_clock_40ms->update(); + _srs_pps_clock_80ms->update(); + _srs_pps_clock_160ms->update(); + _srs_pps_timer_s->update(); + if (_srs_pps_clock_15ms->r10s() || _srs_pps_timer_s->r10s() || _srs_pps_clock_20ms->r10s() || _srs_pps_clock_25ms->r10s() || _srs_pps_clock_30ms->r10s() || _srs_pps_clock_35ms->r10s() || _srs_pps_clock_40ms->r10s() || _srs_pps_clock_80ms->r10s() || _srs_pps_clock_160ms->r10s()) { + snprintf(buf, sizeof(buf), ", clock=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_clock_15ms->r10s(), _srs_pps_clock_20ms->r10s(), _srs_pps_clock_25ms->r10s(), _srs_pps_clock_30ms->r10s(), _srs_pps_clock_35ms->r10s(), _srs_pps_clock_40ms->r10s(), _srs_pps_clock_80ms->r10s(), _srs_pps_clock_160ms->r10s(), _srs_pps_timer_s->r10s()); + clock_desc = buf; + } + + string &thread_desc = stats->thread_desc; +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) + _srs_pps_thread_run->update(_st_stat_thread_run); + _srs_pps_thread_idle->update(_st_stat_thread_idle); + _srs_pps_thread_yield->update(_st_stat_thread_yield); + _srs_pps_thread_yield2->update(_st_stat_thread_yield2); + if (_st_active_count > 0 || _st_num_free_stacks > 0 || _srs_pps_thread_run->r10s() || _srs_pps_thread_idle->r10s() || _srs_pps_thread_yield->r10s() || _srs_pps_thread_yield2->r10s()) { + snprintf(buf, sizeof(buf), ", co=%d,%d,%d, stk=%d, yield=%d,%d", _st_active_count, _srs_pps_thread_run->r10s(), _srs_pps_thread_idle->r10s(), _st_num_free_stacks, _srs_pps_thread_yield->r10s(), _srs_pps_thread_yield2->r10s()); + thread_desc = buf; + } +#endif + + string &objs_desc = stats->objs_desc; + _srs_pps_objs_rtps->update(); + _srs_pps_objs_rraw->update(); + _srs_pps_objs_rfua->update(); + _srs_pps_objs_rbuf->update(); + _srs_pps_objs_msgs->update(); + _srs_pps_objs_rothers->update(); + if (_srs_pps_objs_rtps->r10s() || _srs_pps_objs_rraw->r10s() || _srs_pps_objs_rfua->r10s() || _srs_pps_objs_rbuf->r10s() || _srs_pps_objs_msgs->r10s() || _srs_pps_objs_rothers->r10s()) { + snprintf(buf, sizeof(buf), ", objs=(pkt:%d,raw:%d,fua:%d,msg:%d,oth:%d,buf:%d)", + _srs_pps_objs_rtps->r10s(), _srs_pps_objs_rraw->r10s(), _srs_pps_objs_rfua->r10s(), + _srs_pps_objs_msgs->r10s(), _srs_pps_objs_rothers->r10s(), _srs_pps_objs_rbuf->r10s()); + objs_desc = buf; + } +} + +void srs_global_rtc_update(SrsKbsRtcStats *stats) +{ + static char buf[128]; + + string &rpkts_desc = stats->rpkts_desc; + _srs_pps_rpkts->update(); + _srs_pps_rrtps->update(); + _srs_pps_rstuns->update(); + _srs_pps_rrtcps->update(); + if (_srs_pps_rpkts->r10s() || _srs_pps_rrtps->r10s() || _srs_pps_rstuns->r10s() || _srs_pps_rrtcps->r10s()) { + snprintf(buf, sizeof(buf), ", rpkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_rpkts->r10s(), _srs_pps_rrtps->r10s(), _srs_pps_rstuns->r10s(), _srs_pps_rrtcps->r10s()); + rpkts_desc = buf; + } + + string &spkts_desc = stats->spkts_desc; + _srs_pps_spkts->update(); + _srs_pps_srtps->update(); + _srs_pps_sstuns->update(); + _srs_pps_srtcps->update(); + if (_srs_pps_spkts->r10s() || _srs_pps_srtps->r10s() || _srs_pps_sstuns->r10s() || _srs_pps_srtcps->r10s()) { + snprintf(buf, sizeof(buf), ", spkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_spkts->r10s(), _srs_pps_srtps->r10s(), _srs_pps_sstuns->r10s(), _srs_pps_srtcps->r10s()); + spkts_desc = buf; + } + + string &rtcp_desc = stats->rtcp_desc; + _srs_pps_pli->update(); + _srs_pps_twcc->update(); + _srs_pps_rr->update(); + if (_srs_pps_pli->r10s() || _srs_pps_twcc->r10s() || _srs_pps_rr->r10s()) { + snprintf(buf, sizeof(buf), ", rtcp=(pli:%d,twcc:%d,rr:%d)", _srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s()); + rtcp_desc = buf; + } + + string &snk_desc = stats->snk_desc; + _srs_pps_snack->update(); + _srs_pps_snack2->update(); + _srs_pps_sanack->update(); + _srs_pps_svnack->update(); + if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) { + snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s()); + snk_desc = buf; + } + + string &rnk_desc = stats->rnk_desc; + _srs_pps_rnack->update(); + _srs_pps_rnack2->update(); + _srs_pps_rhnack->update(); + _srs_pps_rmnack->update(); + if (_srs_pps_rnack->r10s() || _srs_pps_rnack2->r10s() || _srs_pps_rhnack->r10s() || _srs_pps_rmnack->r10s()) { + snprintf(buf, sizeof(buf), ", rnk=(%d,%d,h:%d,m:%d)", _srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s(), _srs_pps_rhnack->r10s(), _srs_pps_rmnack->r10s()); + rnk_desc = buf; + } + + string &fid_desc = stats->fid_desc; + _srs_pps_ids->update(); + _srs_pps_fids->update(); + _srs_pps_fids_level0->update(); + _srs_pps_addrs->update(); + _srs_pps_fast_addrs->update(); + if (_srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()) { + snprintf(buf, sizeof(buf), ", fid=(id:%d,fid:%d,ffid:%d,addr:%d,faddr:%d)", _srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()); + fid_desc = buf; + } +} diff --git a/trunk/src/kernel/srs_kernel_kbps.hpp b/trunk/src/kernel/srs_kernel_kbps.hpp index 482bf6be5..f7e2bef08 100644 --- a/trunk/src/kernel/srs_kernel_kbps.hpp +++ b/trunk/src/kernel/srs_kernel_kbps.hpp @@ -11,6 +11,8 @@ #include +#include + class SrsWallClock; // A sample for rate-based stat, such as kbps or kps. @@ -82,4 +84,148 @@ public: // The global clock. extern SrsWallClock *_srs_clock; +// Global SrsPps statistics variables +// I/O operations statistics +extern SrsPps *_srs_pps_recvfrom; +extern SrsPps *_srs_pps_recvfrom_eagain; +extern SrsPps *_srs_pps_sendto; +extern SrsPps *_srs_pps_sendto_eagain; + +extern SrsPps *_srs_pps_read; +extern SrsPps *_srs_pps_read_eagain; +extern SrsPps *_srs_pps_readv; +extern SrsPps *_srs_pps_readv_eagain; +extern SrsPps *_srs_pps_writev; +extern SrsPps *_srs_pps_writev_eagain; + +extern SrsPps *_srs_pps_recvmsg; +extern SrsPps *_srs_pps_recvmsg_eagain; +extern SrsPps *_srs_pps_sendmsg; +extern SrsPps *_srs_pps_sendmsg_eagain; + +// Clock and timing statistics +extern SrsPps *_srs_pps_clock_15ms; +extern SrsPps *_srs_pps_clock_20ms; +extern SrsPps *_srs_pps_clock_25ms; +extern SrsPps *_srs_pps_clock_30ms; +extern SrsPps *_srs_pps_clock_35ms; +extern SrsPps *_srs_pps_clock_40ms; +extern SrsPps *_srs_pps_clock_80ms; +extern SrsPps *_srs_pps_clock_160ms; +extern SrsPps *_srs_pps_timer_s; + +// WebRTC packet statistics +extern SrsPps *_srs_pps_rpkts; +extern SrsPps *_srs_pps_rstuns; +extern SrsPps *_srs_pps_rrtps; +extern SrsPps *_srs_pps_rrtcps; +extern SrsPps *_srs_pps_addrs; +extern SrsPps *_srs_pps_fast_addrs; + +extern SrsPps *_srs_pps_spkts; +extern SrsPps *_srs_pps_sstuns; +extern SrsPps *_srs_pps_srtcps; +extern SrsPps *_srs_pps_srtps; + +// Object and resource statistics +extern SrsPps *_srs_pps_ids; +extern SrsPps *_srs_pps_fids; +extern SrsPps *_srs_pps_fids_level0; +extern SrsPps *_srs_pps_dispose; + +extern SrsPps *_srs_pps_timer; +extern SrsPps *_srs_pps_pub; +extern SrsPps *_srs_pps_conn; + +extern SrsPps *_srs_pps_cids_get; +extern SrsPps *_srs_pps_cids_set; + +// NACK and loss statistics +extern SrsPps *_srs_pps_snack; +extern SrsPps *_srs_pps_snack2; +extern SrsPps *_srs_pps_snack3; +extern SrsPps *_srs_pps_snack4; +extern SrsPps *_srs_pps_sanack; +extern SrsPps *_srs_pps_svnack; +extern SrsPps *_srs_pps_aloss2; + +extern SrsPps *_srs_pps_rnack; +extern SrsPps *_srs_pps_rnack2; +extern SrsPps *_srs_pps_rhnack; +extern SrsPps *_srs_pps_rmnack; + +// WebRTC control statistics +extern SrsPps *_srs_pps_pli; +extern SrsPps *_srs_pps_twcc; +extern SrsPps *_srs_pps_rr; + +// Object statistics +extern SrsPps *_srs_pps_objs_rtps; +extern SrsPps *_srs_pps_objs_rraw; +extern SrsPps *_srs_pps_objs_rfua; +extern SrsPps *_srs_pps_objs_rbuf; +extern SrsPps *_srs_pps_objs_msgs; +extern SrsPps *_srs_pps_objs_rothers; + +#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) +// Debug thread statistics +extern SrsPps *_srs_pps_thread_run; +extern SrsPps *_srs_pps_thread_idle; +extern SrsPps *_srs_pps_thread_yield; +extern SrsPps *_srs_pps_thread_yield2; + +// Debug epoll statistics +extern SrsPps *_srs_pps_epoll; +extern SrsPps *_srs_pps_epoll_zero; +extern SrsPps *_srs_pps_epoll_shake; +extern SrsPps *_srs_pps_epoll_spin; + +// Debug scheduler statistics +extern SrsPps *_srs_pps_sched_15ms; +extern SrsPps *_srs_pps_sched_20ms; +extern SrsPps *_srs_pps_sched_25ms; +extern SrsPps *_srs_pps_sched_30ms; +extern SrsPps *_srs_pps_sched_35ms; +extern SrsPps *_srs_pps_sched_40ms; +extern SrsPps *_srs_pps_sched_80ms; +extern SrsPps *_srs_pps_sched_160ms; +extern SrsPps *_srs_pps_sched_s; +#endif + +// Initialize the global kbps statistics variables +srs_error_t srs_global_kbps_initialize(); + +class SrsKbpsStats +{ +public: + std::string cid_desc; + std::string timer_desc; + std::string free_desc; + std::string recvfrom_desc; + std::string io_desc; + std::string msg_desc; + std::string epoll_desc; + std::string sched_desc; + std::string clock_desc; + std::string thread_desc; + std::string objs_desc; +}; + +// Update the global kbps statistics variables +void srs_global_kbps_update(SrsKbpsStats *stats); + +class SrsKbsRtcStats +{ +public: + std::string rpkts_desc; + std::string spkts_desc; + std::string rtcp_desc; + std::string snk_desc; + std::string rnk_desc; + std::string fid_desc; +}; + +// Update the global rtc statistics variables +void srs_global_rtc_update(SrsKbsRtcStats *stats); + #endif diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index fcf53d464..94086f91b 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -42,6 +42,7 @@ using namespace std; #include #include #include +#include #include #include @@ -73,9 +74,6 @@ bool _srs_in_docker = false; extern void asan_report_callback(const char *str); #endif -extern SrsPps *_srs_pps_cids_get; -extern SrsPps *_srs_pps_cids_set; - /** * main entrance. */