AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461)

Move global xpps statistics variables from `srs_app_server.cpp` to
`srs_kernel_kbps.cpp`.

Extract global shared timers from `SrsServer` into new `SrsSharedTimer`
class.

Extract WebRTC session management logic from `SrsServer` into dedicated
`SrsRtcSessionManager` class.

Extract PID file handling into dedicated  `SrsPidFileLocker` class.

---------

Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-08-31 19:14:34 -04:00 committed by GitHub
parent 3ca4f0a068
commit 728828e1dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1570 additions and 1328 deletions

View File

@ -181,7 +181,6 @@ Configure to generate Makefile.
Features:
--https=on|off Whether enable HTTPS client and server. Default: $(value2switch $SRS_HTTPS)
--utest=on|off Whether build the utest. Default: $(value2switch $SRS_UTEST)
--srt=on|off Whether build the SRT. Default: $(value2switch $SRS_SRT)
--rtsp=on|off Whether build the RTSP (requires RTC). Default: $(value2switch $SRS_RTSP)
--gb28181=on|off Whether build the GB28181. Default: $(value2switch $SRS_GB28181)
--ffmpeg-fit=on|off Whether enable the FFmpeg fit(source code). Default: $(value2switch $SRS_FFMPEG_FIT)
@ -252,6 +251,7 @@ Experts:
Deprecated:
--h265=on Always enable the build for the HEVC(H.265) support.
--rtc=on Always enable WebRTC support. Default: $(value2switch $SRS_RTC)
--srt=on|off Always enable SRT support. Default: $(value2switch $SRS_SRT)
--single-thread=on Always force single thread mode. Default: $(value2switch $SRS_SINGLE_THREAD)
--cross-build Enable cross-build, please set bellow Toolchain also. Default: $(value2switch $SRS_CROSS_BUILD)
--hds=on|off Whether build the hds streaming, mux RTMP to F4M/F4V files. Default: $(value2switch $SRS_HDS)

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-08-31, Merge [#4461](https://github.com/ossrs/srs/pull/4461): AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461)
* v7.0, 2025-08-31, Merge [#4460](https://github.com/ossrs/srs/pull/4460): AI: Always enable SRT protocol. v7.0.69 (#4460)
* v7.0, 2025-08-31, Merge [#4459](https://github.com/ossrs/srs/pull/4459): AI: Merge SRT and RTC servers into unified SrsServer. v7.0.68 (#4459)
* v7.0, 2025-08-29, Merge [#4457](https://github.com/ossrs/srs/pull/4457): Support IPv6 for all protocols: RTMP, HTTP/HTTPS, WebRTC, SRT, RTSP. v7.0.67 (#4457)

View File

@ -7,6 +7,7 @@
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_server.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_kbps.hpp>
@ -62,7 +63,7 @@ srs_error_t SrsCircuitBreaker::initialize()
// Update the water level for circuit breaker.
// @see SrsCircuitBreaker::on_timer()
_srs_server->timer1s()->subscribe(this);
_srs_shared_timer->timer1s()->subscribe(this);
srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_,
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,

View File

@ -1005,3 +1005,5 @@ srs_error_t SrsSslConnection::writev(const iovec *iov, int iov_size, ssize_t *nw
return err;
}
SrsResourceManager *_srs_conn_manager = NULL;

View File

@ -328,4 +328,7 @@ public:
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite);
};
// Manager for RTC connections.
extern SrsResourceManager *_srs_conn_manager;
#endif

View File

@ -408,12 +408,11 @@ std::string SrsGbSession::desc()
return "GBS";
}
SrsGbListener::SrsGbListener(ISrsHttpServeMux *http_api_mux)
SrsGbListener::SrsGbListener()
{
conf_ = NULL;
sip_listener_ = new SrsTcpListener(this);
media_listener_ = new SrsTcpListener(this);
http_api_mux_ = http_api_mux;
}
SrsGbListener::~SrsGbListener()
@ -470,7 +469,8 @@ srs_error_t SrsGbListener::listen_api()
{
srs_error_t err = srs_success;
if ((err = http_api_mux_->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
ISrsHttpServeMux *mux = _srs_server->api_server();
if ((err = mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

View File

@ -219,10 +219,9 @@ private:
SrsConfDirective *conf_;
SrsTcpListener *media_listener_;
SrsTcpListener *sip_listener_;
ISrsHttpServeMux *http_api_mux_;
public:
SrsGbListener(ISrsHttpServeMux *http_api_mux);
SrsGbListener();
virtual ~SrsGbListener();
public:

View File

@ -240,3 +240,77 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval)
return err;
}
SrsSharedTimer::SrsSharedTimer()
{
timer20ms_ = NULL;
timer100ms_ = NULL;
timer1s_ = NULL;
timer5s_ = NULL;
clock_monitor_ = NULL;
}
SrsSharedTimer::~SrsSharedTimer()
{
srs_freep(timer20ms_);
srs_freep(timer100ms_);
srs_freep(timer1s_);
srs_freep(timer5s_);
srs_freep(clock_monitor_);
}
srs_error_t SrsSharedTimer::initialize()
{
srs_error_t err = srs_success;
// Initialize global shared timers
timer20ms_ = new SrsFastTimer("shared", 20 * SRS_UTIME_MILLISECONDS);
timer100ms_ = new SrsFastTimer("shared", 100 * SRS_UTIME_MILLISECONDS);
timer1s_ = new SrsFastTimer("shared", 1 * SRS_UTIME_SECONDS);
timer5s_ = new SrsFastTimer("shared", 5 * SRS_UTIME_SECONDS);
clock_monitor_ = new SrsClockWallMonitor();
// Start all timers
if ((err = timer20ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer20ms");
}
if ((err = timer100ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer100ms");
}
if ((err = timer1s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer1s");
}
if ((err = timer5s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer5s");
}
// Register clock monitor to 20ms timer
timer20ms_->subscribe(clock_monitor_);
return err;
}
SrsFastTimer *SrsSharedTimer::timer20ms()
{
return timer20ms_;
}
SrsFastTimer *SrsSharedTimer::timer100ms()
{
return timer100ms_;
}
SrsFastTimer *SrsSharedTimer::timer1s()
{
return timer1s_;
}
SrsFastTimer *SrsSharedTimer::timer5s()
{
return timer5s_;
}
SrsSharedTimer *_srs_shared_timer = NULL;

View File

@ -144,4 +144,33 @@ private:
srs_error_t on_timer(srs_utime_t interval);
};
// Global shared timer manager
class SrsSharedTimer
{
private:
SrsFastTimer *timer20ms_;
SrsFastTimer *timer100ms_;
SrsFastTimer *timer1s_;
SrsFastTimer *timer5s_;
SrsClockWallMonitor *clock_monitor_;
public:
SrsSharedTimer();
virtual ~SrsSharedTimer();
public:
// Initialize and start all timers
srs_error_t initialize();
public:
// Access to global shared timers
SrsFastTimer *timer20ms();
SrsFastTimer *timer100ms();
SrsFastTimer *timer1s();
SrsFastTimer *timer5s();
};
// Global shared timer instance
extern SrsSharedTimer *_srs_shared_timer;
#endif

View File

@ -15,6 +15,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_server.hpp>
@ -60,13 +61,13 @@ void SrsHlsVirtualConn::expire()
SrsHlsStream::SrsHlsStream()
{
_srs_server->timer5s()->subscribe(this);
_srs_shared_timer->timer5s()->subscribe(this);
security_ = new SrsSecurity();
}
SrsHlsStream::~SrsHlsStream()
{
_srs_server->timer5s()->unsubscribe(this);
_srs_shared_timer->timer5s()->unsubscribe(this);
std::map<std::string, SrsHlsVirtualConn *>::iterator it;
for (it = map_ctx_info_.begin(); it != map_ctx_info_.end(); ++it) {

View File

@ -21,6 +21,7 @@ using namespace std;
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_http_hooks.hpp>
#include <srs_app_log.hpp>
@ -442,7 +443,7 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection *s, const SrsContextId &cid)
SrsRtcPlayStream::~SrsRtcPlayStream()
{
if (req_) {
session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
session_->exec_->exec_rtc_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
}
_srs_config->unsubscribe(this);
@ -927,12 +928,12 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream *p) : p_(p)
{
_srs_server->timer1s()->subscribe(this);
_srs_shared_timer->timer1s()->subscribe(this);
}
SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer()
{
_srs_server->timer1s()->unsubscribe(this);
_srs_shared_timer->timer1s()->unsubscribe(this);
}
srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
@ -963,12 +964,12 @@ srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream *p) : p_(p)
{
_srs_server->timer100ms()->subscribe(this);
_srs_shared_timer->timer100ms()->subscribe(this);
}
SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
{
_srs_server->timer100ms()->unsubscribe(this);
_srs_shared_timer->timer100ms()->unsubscribe(this);
}
srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
@ -1084,7 +1085,7 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection *session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
if (req_) {
session_->server_->exec_rtc_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
session_->exec_->exec_rtc_async_work(new SrsRtcAsyncCallOnUnpublish(cid_, req_));
}
srs_freep(timer_rtcp_);
@ -1722,12 +1723,12 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp &n
SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p)
{
_srs_server->timer20ms()->subscribe(this);
_srs_shared_timer->timer20ms()->subscribe(this);
}
SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer()
{
_srs_server->timer20ms()->unsubscribe(this);
_srs_shared_timer->timer20ms()->unsubscribe(this);
}
srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
@ -1759,12 +1760,20 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
return err;
}
SrsRtcConnection::SrsRtcConnection(SrsServer *s, const SrsContextId &cid)
ISrsExecRtcAsyncTask::ISrsExecRtcAsyncTask()
{
}
ISrsExecRtcAsyncTask::~ISrsExecRtcAsyncTask()
{
}
SrsRtcConnection::SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid)
{
req_ = NULL;
cid_ = cid;
server_ = s;
exec_ = exec;
networks_ = new SrsRtcNetworks(this);
cache_iov_ = new iovec();

View File

@ -461,6 +461,17 @@ private:
srs_error_t on_timer(srs_utime_t interval);
};
// The interface for RTC async task.
class ISrsExecRtcAsyncTask
{
public:
ISrsExecRtcAsyncTask();
virtual ~ISrsExecRtcAsyncTask();
public:
virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t) = 0;
};
// A RTC Peer Connection, SDP level object.
//
// For performance, we use non-public from resource,
@ -479,7 +490,7 @@ public:
bool disposing_;
private:
SrsServer *server_;
ISrsExecRtcAsyncTask *exec_;
private:
iovec *cache_iov_;
@ -529,7 +540,7 @@ private:
bool nack_enabled_;
public:
SrsRtcConnection(SrsServer *s, const SrsContextId &cid);
SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
virtual ~SrsRtcConnection();
// interface ISrsDisposingHandler
public:

View File

@ -12,6 +12,7 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_api.hpp>
@ -21,6 +22,7 @@ using namespace std;
#include <srs_app_rtc_source.hpp>
#include <srs_app_server.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
@ -306,3 +308,331 @@ SrsRtcUserConfig::~SrsRtcUserConfig()
{
srs_freep(req_);
}
SrsRtcSessionManager::SrsRtcSessionManager()
{
rtc_async_ = new SrsAsyncCallWorker();
}
SrsRtcSessionManager::~SrsRtcSessionManager()
{
rtc_async_->stop();
srs_freep(rtc_async_);
}
srs_error_t SrsRtcSessionManager::initialize()
{
srs_error_t err = srs_success;
if ((err = rtc_async_->start()) != srs_success) {
return srs_error_wrap(err, "start async worker");
}
return err;
}
SrsRtcConnection *SrsRtcSessionManager::find_rtc_session_by_username(const std::string &username)
{
ISrsResource *conn = _srs_conn_manager->find_by_name(username);
return dynamic_cast<SrsRtcConnection *>(conn);
}
srs_error_t SrsRtcSessionManager::create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession)
{
srs_error_t err = srs_success;
ISrsRequest *req = ruc->req_;
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if (ruc->publish_ && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
if (publish_token.get()) {
srs_trace("stream publish token acquired, type=rtc, url=%s", req->get_stream_url().c_str());
}
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if (ruc->publish_ && !source->can_publish()) {
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());
}
// TODO: FIXME: add do_create_session to error process.
SrsContextId cid = _srs_context->get_id();
SrsRtcConnection *session = new SrsRtcConnection(this, cid);
if ((err = do_create_rtc_session(ruc, local_sdp, session)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "create session");
}
*psession = session;
return err;
}
srs_error_t SrsRtcSessionManager::do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session)
{
srs_error_t err = srs_success;
ISrsRequest *req = ruc->req_;
// first add publisher/player for negotiate sdp media info
if (ruc->publish_) {
if ((err = session->add_publisher(ruc, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add publisher");
}
} else {
if ((err = session->add_player(ruc, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add player");
}
}
// All tracks default as inactive, so we must enable them.
session->set_all_tracks_status(req->get_stream_url(), ruc->publish_, true);
std::string local_pwd = ruc->req_->ice_pwd_.empty() ? srs_rand_gen_str(32) : ruc->req_->ice_pwd_;
std::string local_ufrag = ruc->req_->ice_ufrag_.empty() ? srs_rand_gen_str(8) : ruc->req_->ice_ufrag_;
// TODO: FIXME: Rename for a better name, it's not an username.
std::string username = "";
while (true) {
username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag();
if (!_srs_conn_manager->find_by_name(username)) {
break;
}
// Username conflict, regenerate a new one.
local_ufrag = srs_rand_gen_str(8);
}
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());
// We allows to mock the eip of server.
if (true) {
// TODO: Support multiple listen ports.
int udp_port = 0;
if (true) {
string udp_host;
string udp_hostport = _srs_config->get_rtc_server_listens().at(0);
srs_net_split_for_listener(udp_hostport, udp_host, udp_port);
}
int tcp_port = 0;
if (true) {
string tcp_host;
string tcp_hostport = _srs_config->get_rtc_server_tcp_listens().at(0);
srs_net_split_for_listener(tcp_hostport, tcp_host, tcp_port);
}
string protocol = _srs_config->get_rtc_server_protocol();
set<string> candidates = discover_candidates(ruc);
for (set<string>::iterator it = candidates.begin(); it != candidates.end(); ++it) {
string hostname;
int uport = udp_port;
srs_net_split_hostport(*it, hostname, uport);
int tport = tcp_port;
srs_net_split_hostport(*it, hostname, tport);
if (protocol == "udp") {
local_sdp.add_candidate("udp", hostname, uport, "host");
} else if (protocol == "tcp") {
local_sdp.add_candidate("tcp", hostname, tport, "host");
} else {
local_sdp.add_candidate("udp", hostname, uport, "host");
local_sdp.add_candidate("tcp", hostname, tport, "host");
}
}
vector<string> v = vector<string>(candidates.begin(), candidates.end());
srs_trace("RTC: Use candidates %s, protocol=%s, tcp_port=%d, udp_port=%d",
srs_strings_join(v, ", ").c_str(), protocol.c_str(), tcp_port, udp_port);
}
// Setup the negotiate DTLS by config.
local_sdp.session_negotiate_ = local_sdp.session_config_;
// Setup the negotiate DTLS role.
if (ruc->remote_sdp_.get_dtls_role() == "active") {
local_sdp.session_negotiate_.dtls_role = "passive";
} else if (ruc->remote_sdp_.get_dtls_role() == "passive") {
local_sdp.session_negotiate_.dtls_role = "active";
} else if (ruc->remote_sdp_.get_dtls_role() == "actpass") {
local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role;
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
// is 'active' in the offer and 'passive' in the answer.
local_sdp.session_negotiate_.dtls_role = "passive";
}
local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role);
session->set_remote_sdp(ruc->remote_sdp_);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state_as_waiting_stun();
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(req, ruc->dtls_, ruc->srtp_, username)) != srs_success) {
return srs_error_wrap(err, "init");
}
// We allows username is optional, but it never empty here.
_srs_conn_manager->add_with_name(username, session);
return err;
}
void SrsRtcSessionManager::srs_update_rtc_sessions()
{
// Alive RTC sessions, for stat.
int nn_rtc_conns = 0;
// Check all sessions and dispose the dead sessions.
for (int i = 0; i < (int)_srs_conn_manager->size(); i++) {
SrsRtcConnection *session = dynamic_cast<SrsRtcConnection *>(_srs_conn_manager->at(i));
// Ignore not session, or already disposing.
if (!session || session->disposing_) {
continue;
}
// Update stat if session is alive.
if (session->is_alive()) {
nn_rtc_conns++;
continue;
}
SrsContextRestore(_srs_context->get_id());
session->switch_to_context();
string username = session->username();
srs_trace("RTC: session destroy by timeout, username=%s", username.c_str());
// Use manager to free session and notify other objects.
_srs_conn_manager->remove(session);
}
// Ignore stats if no RTC connections.
if (!nn_rtc_conns) {
return;
}
static char buf[128];
string loss_desc;
SrsSnmpUdpStat *s = srs_get_udp_snmp_stat();
if (s->rcv_buf_errors_delta || s->snd_buf_errors_delta) {
snprintf(buf, sizeof(buf), ", loss=(r:%d,s:%d)", s->rcv_buf_errors_delta, s->snd_buf_errors_delta);
loss_desc = buf;
}
SrsKbsRtcStats stats;
srs_global_rtc_update(&stats);
srs_trace("RTC: Server conns=%u%s%s%s%s%s%s%s",
nn_rtc_conns,
stats.rpkts_desc.c_str(), stats.spkts_desc.c_str(), stats.rtcp_desc.c_str(), stats.snk_desc.c_str(),
stats.rnk_desc.c_str(), loss_desc.c_str(), stats.fid_desc.c_str());
}
srs_error_t SrsRtcSessionManager::exec_rtc_async_work(ISrsAsyncCallTask *t)
{
return rtc_async_->execute(t);
}
srs_error_t SrsRtcSessionManager::on_udp_packet(SrsUdpMuxSocket *skt)
{
srs_error_t err = srs_success;
SrsRtcConnection *session = NULL;
char *data = skt->data();
int size = skt->size();
bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t *)data, size);
bool is_rtcp = srs_is_rtcp((uint8_t *)data, size);
uint64_t fast_id = skt->fast_id();
// Try fast id first, if not found, search by long peer id.
if (fast_id) {
session = (SrsRtcConnection *)_srs_conn_manager->find_by_fast_id(fast_id);
}
if (!session) {
string peer_id = skt->peer_id();
session = (SrsRtcConnection *)_srs_conn_manager->find_by_id(peer_id);
}
if (session) {
// When got any packet, the session is alive now.
session->alive();
}
// For STUN, the peer address may change.
if (!is_rtp_or_rtcp && srs_is_stun((uint8_t *)data, size)) {
++_srs_pps_rstuns->sugar;
string peer_id = skt->peer_id();
// TODO: FIXME: Should support ICE renomination, to switch network between candidates.
SrsStunPacket ping;
if ((err = ping.decode(data, size)) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
if (!session) {
session = find_rtc_session_by_username(ping.get_username());
}
if (session) {
session->switch_to_context();
}
srs_info("recv stun packet from %s, fast=%" PRId64 ", use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
peer_id.c_str(), fast_id, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session) {
return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s, peer_id=%s, fast=%" PRId64,
ping.get_username().c_str(), peer_id.c_str(), fast_id);
}
// For each binding request, update the UDP socket.
if (ping.is_binding_request()) {
session->udp()->update_sendonly_socket(skt);
}
return session->udp()->on_stun(&ping, data, size);
}
// For DTLS, RTCP or RTP, which does not support peer address changing.
if (!session) {
string peer_id = skt->peer_id();
return srs_error_new(ERROR_RTC_STUN, "no session, peer_id=%s, fast=%" PRId64, peer_id.c_str(), fast_id);
}
// Note that we don't(except error) switch to the context of session, for performance issue.
if (is_rtp_or_rtcp && !is_rtcp) {
++_srs_pps_rrtps->sugar;
err = session->udp()->on_rtp(data, size);
if (err != srs_success) {
session->switch_to_context();
}
return err;
}
session->switch_to_context();
if (is_rtp_or_rtcp && is_rtcp) {
++_srs_pps_rrtcps->sugar;
return session->udp()->on_rtcp(data, size);
}
if (srs_is_dtls((uint8_t *)data, size)) {
++_srs_pps_rstuns->sugar;
return session->udp()->on_dtls(data, size);
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}

View File

@ -13,6 +13,7 @@
#include <srs_app_hourglass.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_sdp.hpp>
#include <srs_app_st.hpp>
@ -26,6 +27,7 @@ class ISrsRequest;
class SrsSdp;
class SrsRtcSource;
class SrsResourceManager;
class SrsAsyncCallWorker;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@ -84,10 +86,39 @@ public:
// Discover the candidates for RTC server.
extern std::set<std::string> discover_candidates(SrsRtcUserConfig *ruc);
// Manager for RTC connections.
extern SrsResourceManager *_srs_conn_manager;
// The dns resolve utility, return the resolved ip address.
extern std::string srs_dns_resolve(std::string host, int &family);
// RTC session manager to handle WebRTC session lifecycle and management.
class SrsRtcSessionManager : public ISrsExecRtcAsyncTask
{
private:
// WebRTC async call worker for non-blocking operations.
SrsAsyncCallWorker *rtc_async_;
public:
SrsRtcSessionManager();
virtual ~SrsRtcSessionManager();
public:
virtual srs_error_t initialize();
public:
virtual SrsRtcConnection *find_rtc_session_by_username(const std::string &ufrag);
virtual srs_error_t create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession);
private:
virtual srs_error_t do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session);
public:
virtual void srs_update_rtc_sessions();
// interface ISrsExecRtcAsyncTask
public:
virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t);
public:
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket *skt);
};
#endif

View File

@ -12,6 +12,7 @@
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_config.hpp>
#include <srs_app_conn.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_log.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_conn.hpp>
@ -691,7 +692,7 @@ srs_error_t SrsRtcSource::on_publish()
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
// @see SrsRtcSource::on_timer()
_srs_server->timer100ms()->subscribe(this);
_srs_shared_timer->timer100ms()->subscribe(this);
}
SrsStatistic *stat = SrsStatistic::instance();
@ -725,7 +726,7 @@ void SrsRtcSource::on_unpublish()
// free bridge resource
if (bridge_) {
// For SrsRtcSource::on_timer()
_srs_server->timer100ms()->unsubscribe(this);
_srs_shared_timer->timer100ms()->unsubscribe(this);
#ifdef SRS_FFMPEG_FIT
frame_builder_->on_unpublish();

File diff suppressed because it is too large Load Diff

View File

@ -18,11 +18,11 @@
#include <srs_app_listener.hpp>
#include <srs_app_reload.hpp>
#include <srs_app_source.hpp>
#include <srs_app_st.hpp>
#include <srs_protocol_st.hpp>
#include <srs_app_srt_listener.hpp>
#include <srs_app_srt_server.hpp>
#include <srs_app_st.hpp>
#include <srs_protocol_srt.hpp>
#include <srs_protocol_st.hpp>
class SrsAsyncCallWorker;
class SrsUdpMuxListener;
@ -31,7 +31,7 @@ class SrsRtcUserConfig;
class SrsSdp;
class SrsRtcConnection;
class ISrsAsyncCallTask;
class SrsSignalManager;
class SrsServer;
class ISrsHttpServeMux;
class SrsHttpServer;
@ -54,82 +54,19 @@ class SrsRtmpTransport;
class SrsRtmpsTransport;
class SrsSrtAcceptor;
class SrsSrtEventLoop;
class SrsRtcSessionManager;
class SrsPidFileLocker;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
// Interface for SRT client acceptance
class ISrsSrtClientHandler
{
public:
ISrsSrtClientHandler();
virtual ~ISrsSrtClientHandler();
public:
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd);
};
// Convert signal to io,
// @see: st-1.9/docs/notes.html
class SrsSignalManager : public ISrsCoroutineHandler
{
private:
// Per-process pipe which is used as a signal queue.
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
int sig_pipe[2];
srs_netfd_t signal_read_stfd;
private:
SrsServer *server;
SrsCoroutine *trd;
public:
SrsSignalManager(SrsServer *s);
virtual ~SrsSignalManager();
public:
virtual srs_error_t initialize();
virtual srs_error_t start();
// Interface ISrsEndlessThreadHandler.
public:
virtual srs_error_t cycle();
private:
// Global singleton instance
static SrsSignalManager *instance;
// Signal catching function.
// Converts signal event to I/O event.
static void sig_catcher(int signo);
};
// Auto reload by inotify.
// @see https://github.com/ossrs/srs/issues/1635
class SrsInotifyWorker : public ISrsCoroutineHandler
{
private:
SrsServer *server;
SrsCoroutine *trd;
srs_netfd_t inotify_fd;
public:
SrsInotifyWorker(SrsServer *s);
virtual ~SrsInotifyWorker();
public:
virtual srs_error_t start();
// Interface ISrsEndlessThreadHandler.
public:
virtual srs_error_t cycle();
};
// SRS RTMP server, initialize and listen, start connection service thread, destroy client.
class SrsServer : public ISrsReloadHandler, // Reload framework for permormance optimization.
public ISrsLiveSourceHandler,
public ISrsTcpHandler,
public ISrsHourGlass,
public ISrsSrtClientHandler,
public ISrsUdpMuxHandler,
public ISrsFastTimer
public ISrsUdpMuxHandler
{
private:
// TODO: FIXME: Extract an HttpApiServer.
@ -142,19 +79,8 @@ private:
SrsHourGlass *timer_;
private:
// Global shared timers moved from SrsHybridServer
SrsFastTimer *timer20ms_;
SrsFastTimer *timer100ms_;
SrsFastTimer *timer1s_;
SrsFastTimer *timer5s_;
SrsClockWallMonitor *clock_monitor_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
// for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd_;
// PID file manager for process identification and locking.
SrsPidFileLocker *pid_file_locker_;
private:
// If reusing, HTTP API use the same port of HTTP server.
@ -196,8 +122,8 @@ private:
std::vector<SrsSrtAcceptor *> srt_acceptors_;
// WebRTC UDP listeners for RTC server functionality.
std::vector<SrsUdpMuxListener *> rtc_listeners_;
// WebRTC async call worker for non-blocking operations.
SrsAsyncCallWorker *rtc_async_;
// WebRTC session manager.
SrsRtcSessionManager *rtc_session_manager_;
private:
// Signal manager which convert gignal to io message.
@ -218,10 +144,6 @@ public:
virtual ~SrsServer();
private:
// The destroy is for gmc to analysis the memory leak,
// if not destroy global/static data, the gmc will warning memory leak.
// In service, server never destroy, directly exit when restart.
virtual void destroy();
// When SIGTERM, SRS should do cleanup, for example,
// to stop all ingesters, cleanup HLS and dvr.
virtual void dispose();
@ -229,16 +151,16 @@ private:
// then wait and quit when all connections finished.
virtual void gracefully_dispose();
public:
// Get the HTTP API server mux.
ISrsHttpServeMux *api_server();
// server startup workflow, @see run_master()
public:
// Initialize server with callback handler ch.
// @remark user must free the handler.
virtual srs_error_t initialize();
private:
// Require the PID file for the whole process.
virtual srs_error_t acquire_pid_file();
public:
srs_error_t run();
@ -295,6 +217,7 @@ private:
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd);
virtual srs_error_t srt_fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr);
private:
// WebRTC-related methods
virtual srs_error_t listen_rtc_udp();
@ -306,15 +229,11 @@ private:
virtual srs_error_t listen_rtc_api();
public:
virtual srs_error_t exec_rtc_async_work(ISrsAsyncCallTask *t);
virtual SrsRtcConnection *find_rtc_session_by_username(const std::string &ufrag);
virtual srs_error_t create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection **psession);
private:
virtual srs_error_t do_create_rtc_session(SrsRtcUserConfig *ruc, SrsSdp &local_sdp, SrsRtcConnection *session);
private:
virtual srs_error_t srs_update_rtc_sessions();
virtual srs_error_t srs_update_server_statistics();
// Interface ISrsTcpHandler
public:
@ -328,23 +247,82 @@ private:
public:
virtual srs_error_t on_publish(ISrsRequest *r);
virtual void on_unpublish(ISrsRequest *r);
public:
// Access to global shared timers
SrsFastTimer *timer20ms();
SrsFastTimer *timer100ms();
SrsFastTimer *timer1s();
SrsFastTimer *timer5s();
// interface ISrsFastTimer for statistics reporting
private:
virtual srs_error_t on_timer(srs_utime_t interval);
};
// @global main SRS server, for debugging
extern SrsServer *_srs_server;
// Manager for RTC connections.
extern SrsResourceManager *_srs_conn_manager;
// Convert signal to io,
// @see: st-1.9/docs/notes.html
class SrsSignalManager : public ISrsCoroutineHandler
{
private:
// Per-process pipe which is used as a signal queue.
// Up to PIPE_BUF/sizeof(int) signals can be queued up.
int sig_pipe[2];
srs_netfd_t signal_read_stfd;
private:
SrsServer *server;
SrsCoroutine *trd;
public:
SrsSignalManager(SrsServer *s);
virtual ~SrsSignalManager();
public:
virtual srs_error_t initialize();
virtual srs_error_t start();
// Interface ISrsEndlessThreadHandler.
public:
virtual srs_error_t cycle();
private:
// Global singleton instance
static SrsSignalManager *instance;
// Signal catching function.
// Converts signal event to I/O event.
static void sig_catcher(int signo);
};
// Auto reload by inotify.
// @see https://github.com/ossrs/srs/issues/1635
class SrsInotifyWorker : public ISrsCoroutineHandler
{
private:
SrsServer *server;
SrsCoroutine *trd;
srs_netfd_t inotify_fd;
public:
SrsInotifyWorker(SrsServer *s);
virtual ~SrsInotifyWorker();
public:
virtual srs_error_t start();
// Interface ISrsEndlessThreadHandler.
public:
virtual srs_error_t cycle();
};
// PID file manager for process identification and locking.
class SrsPidFileLocker
{
private:
int pid_fd_;
std::string pid_file_;
public:
SrsPidFileLocker();
virtual ~SrsPidFileLocker();
public:
// Acquire the PID file for the whole process.
virtual srs_error_t acquire();
private:
// Close the PID file descriptor.
virtual void close();
};
#endif

View File

@ -12,6 +12,7 @@ using namespace std;
#include <srs_app_http_hooks.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_source.hpp>
#include <srs_app_srt_server.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_app_statistic.hpp>

View File

@ -9,6 +9,8 @@
using namespace std;
#include <srs_app_config.hpp>
#include <srs_app_server.hpp>
#include <srs_app_source.hpp>
#include <srs_app_srt_conn.hpp>
#include <srs_app_statistic.hpp>
#include <srs_kernel_log.hpp>
@ -17,6 +19,14 @@ using namespace std;
SrsSrtEventLoop *_srt_eventloop = NULL;
ISrsSrtClientHandler::ISrsSrtClientHandler()
{
}
ISrsSrtClientHandler::~ISrsSrtClientHandler()
{
}
SrsSrtAcceptor::SrsSrtAcceptor(ISrsSrtClientHandler *srt_handler)
{
port_ = 0;

View File

@ -9,7 +9,6 @@
#include <srs_core.hpp>
#include <srs_app_server.hpp>
#include <srs_app_srt_listener.hpp>
#include <srs_protocol_srt.hpp>
@ -17,6 +16,17 @@ class SrsSrtServer;
class SrsHourGlass;
class ISrsSrtClientHandler;
// Interface for SRT client acceptance
class ISrsSrtClientHandler
{
public:
ISrsSrtClientHandler();
virtual ~ISrsSrtClientHandler();
public:
virtual srs_error_t accept_srt_client(srs_srt_t srt_fd) = 0;
};
// A common srt acceptor, for SRT server.
class SrsSrtAcceptor : public ISrsSrtHandler
{

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 69
#define VERSION_REVISION 70
#endif

View File

@ -9,6 +9,10 @@
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <cstdio>
#include <string>
using namespace std;
SrsRateSample::SrsRateSample()
{
total = time = -1;
@ -112,14 +116,58 @@ srs_utime_t SrsWallClock::now()
SrsWallClock *_srs_clock = NULL;
// Global SrsPps statistics variables implementations
// I/O operations statistics
SrsPps *_srs_pps_recvfrom = NULL;
SrsPps *_srs_pps_recvfrom_eagain = NULL;
SrsPps *_srs_pps_sendto = NULL;
SrsPps *_srs_pps_sendto_eagain = NULL;
SrsPps *_srs_pps_read = NULL;
SrsPps *_srs_pps_read_eagain = NULL;
SrsPps *_srs_pps_readv = NULL;
SrsPps *_srs_pps_readv_eagain = NULL;
SrsPps *_srs_pps_writev = NULL;
SrsPps *_srs_pps_writev_eagain = NULL;
SrsPps *_srs_pps_recvmsg = NULL;
SrsPps *_srs_pps_recvmsg_eagain = NULL;
SrsPps *_srs_pps_sendmsg = NULL;
SrsPps *_srs_pps_sendmsg_eagain = NULL;
// Clock and timing statistics
SrsPps *_srs_pps_clock_15ms = NULL;
SrsPps *_srs_pps_clock_20ms = NULL;
SrsPps *_srs_pps_clock_25ms = NULL;
SrsPps *_srs_pps_clock_30ms = NULL;
SrsPps *_srs_pps_clock_35ms = NULL;
SrsPps *_srs_pps_clock_40ms = NULL;
SrsPps *_srs_pps_clock_80ms = NULL;
SrsPps *_srs_pps_clock_160ms = NULL;
SrsPps *_srs_pps_timer_s = NULL;
// WebRTC packet statistics (only the ones originally in srs_app_server.cpp)
SrsPps *_srs_pps_rstuns = NULL;
SrsPps *_srs_pps_rrtps = NULL;
SrsPps *_srs_pps_rrtcps = NULL;
// NACK and loss statistics (only _srs_pps_aloss2 was originally in srs_app_server.cpp)
SrsPps *_srs_pps_aloss2 = NULL;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
// Debug thread statistics
SrsPps *_srs_pps_thread_run = NULL;
SrsPps *_srs_pps_thread_idle = NULL;
SrsPps *_srs_pps_thread_yield = NULL;
SrsPps *_srs_pps_thread_yield2 = NULL;
// Debug epoll statistics
SrsPps *_srs_pps_epoll = NULL;
SrsPps *_srs_pps_epoll_zero = NULL;
SrsPps *_srs_pps_epoll_shake = NULL;
SrsPps *_srs_pps_epoll_spin = NULL;
SrsPps *_srs_pps_sched_160ms = NULL;
SrsPps *_srs_pps_sched_s = NULL;
// Debug scheduler statistics
SrsPps *_srs_pps_sched_15ms = NULL;
SrsPps *_srs_pps_sched_20ms = NULL;
SrsPps *_srs_pps_sched_25ms = NULL;
@ -127,5 +175,366 @@ SrsPps *_srs_pps_sched_30ms = NULL;
SrsPps *_srs_pps_sched_35ms = NULL;
SrsPps *_srs_pps_sched_40ms = NULL;
SrsPps *_srs_pps_sched_80ms = NULL;
SrsPps *_srs_pps_sched_160ms = NULL;
SrsPps *_srs_pps_sched_s = NULL;
#endif
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
extern "C" {
// External ST statistics
extern __thread unsigned long long _st_stat_recvfrom;
extern __thread unsigned long long _st_stat_recvfrom_eagain;
extern __thread unsigned long long _st_stat_sendto;
extern __thread unsigned long long _st_stat_sendto_eagain;
extern __thread unsigned long long _st_stat_read;
extern __thread unsigned long long _st_stat_read_eagain;
extern __thread unsigned long long _st_stat_readv;
extern __thread unsigned long long _st_stat_readv_eagain;
extern __thread unsigned long long _st_stat_writev;
extern __thread unsigned long long _st_stat_writev_eagain;
extern __thread unsigned long long _st_stat_recvmsg;
extern __thread unsigned long long _st_stat_recvmsg_eagain;
extern __thread unsigned long long _st_stat_sendmsg;
extern __thread unsigned long long _st_stat_sendmsg_eagain;
extern __thread unsigned long long _st_stat_epoll;
extern __thread unsigned long long _st_stat_epoll_zero;
extern __thread unsigned long long _st_stat_epoll_shake;
extern __thread unsigned long long _st_stat_epoll_spin;
extern __thread unsigned long long _st_stat_sched_15ms;
extern __thread unsigned long long _st_stat_sched_20ms;
extern __thread unsigned long long _st_stat_sched_25ms;
extern __thread unsigned long long _st_stat_sched_30ms;
extern __thread unsigned long long _st_stat_sched_35ms;
extern __thread unsigned long long _st_stat_sched_40ms;
extern __thread unsigned long long _st_stat_sched_80ms;
extern __thread unsigned long long _st_stat_sched_160ms;
extern __thread unsigned long long _st_stat_sched_s;
extern __thread int _st_active_count;
extern __thread int _st_num_free_stacks;
extern __thread unsigned long long _st_stat_thread_run;
extern __thread unsigned long long _st_stat_thread_idle;
extern __thread unsigned long long _st_stat_thread_yield;
extern __thread unsigned long long _st_stat_thread_yield2;
}
#endif
srs_error_t srs_global_kbps_initialize()
{
srs_error_t err = srs_success;
// The clock wall object.
_srs_clock = new SrsWallClock();
// Initialize global pps, which depends on _srs_clock
_srs_pps_ids = new SrsPps();
_srs_pps_fids = new SrsPps();
_srs_pps_fids_level0 = new SrsPps();
_srs_pps_dispose = new SrsPps();
_srs_pps_timer = new SrsPps();
_srs_pps_conn = new SrsPps();
_srs_pps_pub = new SrsPps();
_srs_pps_snack = new SrsPps();
_srs_pps_snack2 = new SrsPps();
_srs_pps_snack3 = new SrsPps();
_srs_pps_snack4 = new SrsPps();
_srs_pps_sanack = new SrsPps();
_srs_pps_svnack = new SrsPps();
_srs_pps_rnack = new SrsPps();
_srs_pps_rnack2 = new SrsPps();
_srs_pps_rhnack = new SrsPps();
_srs_pps_rmnack = new SrsPps();
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvfrom = new SrsPps();
_srs_pps_recvfrom_eagain = new SrsPps();
_srs_pps_sendto = new SrsPps();
_srs_pps_sendto_eagain = new SrsPps();
_srs_pps_read = new SrsPps();
_srs_pps_read_eagain = new SrsPps();
_srs_pps_readv = new SrsPps();
_srs_pps_readv_eagain = new SrsPps();
_srs_pps_writev = new SrsPps();
_srs_pps_writev_eagain = new SrsPps();
_srs_pps_recvmsg = new SrsPps();
_srs_pps_recvmsg_eagain = new SrsPps();
_srs_pps_sendmsg = new SrsPps();
_srs_pps_sendmsg_eagain = new SrsPps();
_srs_pps_epoll = new SrsPps();
_srs_pps_epoll_zero = new SrsPps();
_srs_pps_epoll_shake = new SrsPps();
_srs_pps_epoll_spin = new SrsPps();
_srs_pps_sched_15ms = new SrsPps();
_srs_pps_sched_20ms = new SrsPps();
_srs_pps_sched_25ms = new SrsPps();
_srs_pps_sched_30ms = new SrsPps();
_srs_pps_sched_35ms = new SrsPps();
_srs_pps_sched_40ms = new SrsPps();
_srs_pps_sched_80ms = new SrsPps();
_srs_pps_sched_160ms = new SrsPps();
_srs_pps_sched_s = new SrsPps();
#endif
_srs_pps_clock_15ms = new SrsPps();
_srs_pps_clock_20ms = new SrsPps();
_srs_pps_clock_25ms = new SrsPps();
_srs_pps_clock_30ms = new SrsPps();
_srs_pps_clock_35ms = new SrsPps();
_srs_pps_clock_40ms = new SrsPps();
_srs_pps_clock_80ms = new SrsPps();
_srs_pps_clock_160ms = new SrsPps();
_srs_pps_timer_s = new SrsPps();
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_thread_run = new SrsPps();
_srs_pps_thread_idle = new SrsPps();
_srs_pps_thread_yield = new SrsPps();
_srs_pps_thread_yield2 = new SrsPps();
#endif
_srs_pps_rpkts = new SrsPps();
_srs_pps_addrs = new SrsPps();
_srs_pps_fast_addrs = new SrsPps();
_srs_pps_spkts = new SrsPps();
_srs_pps_objs_msgs = new SrsPps();
_srs_pps_sstuns = new SrsPps();
_srs_pps_srtcps = new SrsPps();
_srs_pps_srtps = new SrsPps();
_srs_pps_rstuns = new SrsPps();
_srs_pps_rrtps = new SrsPps();
_srs_pps_rrtcps = new SrsPps();
_srs_pps_aloss2 = new SrsPps();
_srs_pps_pli = new SrsPps();
_srs_pps_twcc = new SrsPps();
_srs_pps_rr = new SrsPps();
_srs_pps_objs_rtps = new SrsPps();
_srs_pps_objs_rraw = new SrsPps();
_srs_pps_objs_rfua = new SrsPps();
_srs_pps_objs_rbuf = new SrsPps();
_srs_pps_objs_rothers = new SrsPps();
// The pps cids depends by st init.
_srs_pps_cids_get = new SrsPps();
_srs_pps_cids_set = new SrsPps();
return err;
}
void srs_global_kbps_update(SrsKbpsStats *stats)
{
static char buf[128];
string &cid_desc = stats->cid_desc;
_srs_pps_cids_get->update();
_srs_pps_cids_set->update();
if (_srs_pps_cids_get->r10s() || _srs_pps_cids_set->r10s()) {
snprintf(buf, sizeof(buf), ", cid=%d,%d", _srs_pps_cids_get->r10s(), _srs_pps_cids_set->r10s());
cid_desc = buf;
}
string &timer_desc = stats->timer_desc;
_srs_pps_timer->update();
_srs_pps_pub->update();
_srs_pps_conn->update();
if (_srs_pps_timer->r10s() || _srs_pps_pub->r10s() || _srs_pps_conn->r10s()) {
snprintf(buf, sizeof(buf), ", timer=%d,%d,%d", _srs_pps_timer->r10s(), _srs_pps_pub->r10s(), _srs_pps_conn->r10s());
timer_desc = buf;
}
string &free_desc = stats->free_desc;
_srs_pps_dispose->update();
if (_srs_pps_dispose->r10s()) {
snprintf(buf, sizeof(buf), ", free=%d", _srs_pps_dispose->r10s());
free_desc = buf;
}
string &recvfrom_desc = stats->recvfrom_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvfrom->update(_st_stat_recvfrom);
_srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain);
_srs_pps_sendto->update(_st_stat_sendto);
_srs_pps_sendto_eagain->update(_st_stat_sendto_eagain);
if (_srs_pps_recvfrom->r10s() || _srs_pps_recvfrom_eagain->r10s() || _srs_pps_sendto->r10s() || _srs_pps_sendto_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", udp=%d,%d,%d,%d", _srs_pps_recvfrom->r10s(), _srs_pps_recvfrom_eagain->r10s(), _srs_pps_sendto->r10s(), _srs_pps_sendto_eagain->r10s());
recvfrom_desc = buf;
}
#endif
string &io_desc = stats->io_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_read->update(_st_stat_read);
_srs_pps_read_eagain->update(_st_stat_read_eagain);
_srs_pps_readv->update(_st_stat_readv);
_srs_pps_readv_eagain->update(_st_stat_readv_eagain);
_srs_pps_writev->update(_st_stat_writev);
_srs_pps_writev_eagain->update(_st_stat_writev_eagain);
if (_srs_pps_read->r10s() || _srs_pps_read_eagain->r10s() || _srs_pps_readv->r10s() || _srs_pps_readv_eagain->r10s() || _srs_pps_writev->r10s() || _srs_pps_writev_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", io=%d,%d,%d,%d,%d,%d", _srs_pps_read->r10s(), _srs_pps_read_eagain->r10s(), _srs_pps_readv->r10s(), _srs_pps_readv_eagain->r10s(), _srs_pps_writev->r10s(), _srs_pps_writev_eagain->r10s());
io_desc = buf;
}
#endif
string &msg_desc = stats->msg_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvmsg->update(_st_stat_recvmsg);
_srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain);
_srs_pps_sendmsg->update(_st_stat_sendmsg);
_srs_pps_sendmsg_eagain->update(_st_stat_sendmsg_eagain);
if (_srs_pps_recvmsg->r10s() || _srs_pps_recvmsg_eagain->r10s() || _srs_pps_sendmsg->r10s() || _srs_pps_sendmsg_eagain->r10s()) {
snprintf(buf, sizeof(buf), ", msg=%d,%d,%d,%d", _srs_pps_recvmsg->r10s(), _srs_pps_recvmsg_eagain->r10s(), _srs_pps_sendmsg->r10s(), _srs_pps_sendmsg_eagain->r10s());
msg_desc = buf;
}
#endif
string &epoll_desc = stats->epoll_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_epoll->update(_st_stat_epoll);
_srs_pps_epoll_zero->update(_st_stat_epoll_zero);
_srs_pps_epoll_shake->update(_st_stat_epoll_shake);
_srs_pps_epoll_spin->update(_st_stat_epoll_spin);
if (_srs_pps_epoll->r10s() || _srs_pps_epoll_zero->r10s() || _srs_pps_epoll_shake->r10s() || _srs_pps_epoll_spin->r10s()) {
snprintf(buf, sizeof(buf), ", epoll=%d,%d,%d,%d", _srs_pps_epoll->r10s(), _srs_pps_epoll_zero->r10s(), _srs_pps_epoll_shake->r10s(), _srs_pps_epoll_spin->r10s());
epoll_desc = buf;
}
#endif
string &sched_desc = stats->sched_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_sched_160ms->update(_st_stat_sched_160ms);
_srs_pps_sched_s->update(_st_stat_sched_s);
_srs_pps_sched_15ms->update(_st_stat_sched_15ms);
_srs_pps_sched_20ms->update(_st_stat_sched_20ms);
_srs_pps_sched_25ms->update(_st_stat_sched_25ms);
_srs_pps_sched_30ms->update(_st_stat_sched_30ms);
_srs_pps_sched_35ms->update(_st_stat_sched_35ms);
_srs_pps_sched_40ms->update(_st_stat_sched_40ms);
_srs_pps_sched_80ms->update(_st_stat_sched_80ms);
if (_srs_pps_sched_160ms->r10s() || _srs_pps_sched_s->r10s() || _srs_pps_sched_15ms->r10s() || _srs_pps_sched_20ms->r10s() || _srs_pps_sched_25ms->r10s() || _srs_pps_sched_30ms->r10s() || _srs_pps_sched_35ms->r10s() || _srs_pps_sched_40ms->r10s() || _srs_pps_sched_80ms->r10s()) {
snprintf(buf, sizeof(buf), ", sched=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_sched_15ms->r10s(), _srs_pps_sched_20ms->r10s(), _srs_pps_sched_25ms->r10s(), _srs_pps_sched_30ms->r10s(), _srs_pps_sched_35ms->r10s(), _srs_pps_sched_40ms->r10s(), _srs_pps_sched_80ms->r10s(), _srs_pps_sched_160ms->r10s(), _srs_pps_sched_s->r10s());
sched_desc = buf;
}
#endif
string &clock_desc = stats->clock_desc;
_srs_pps_clock_15ms->update();
_srs_pps_clock_20ms->update();
_srs_pps_clock_25ms->update();
_srs_pps_clock_30ms->update();
_srs_pps_clock_35ms->update();
_srs_pps_clock_40ms->update();
_srs_pps_clock_80ms->update();
_srs_pps_clock_160ms->update();
_srs_pps_timer_s->update();
if (_srs_pps_clock_15ms->r10s() || _srs_pps_timer_s->r10s() || _srs_pps_clock_20ms->r10s() || _srs_pps_clock_25ms->r10s() || _srs_pps_clock_30ms->r10s() || _srs_pps_clock_35ms->r10s() || _srs_pps_clock_40ms->r10s() || _srs_pps_clock_80ms->r10s() || _srs_pps_clock_160ms->r10s()) {
snprintf(buf, sizeof(buf), ", clock=%d,%d,%d,%d,%d,%d,%d,%d,%d", _srs_pps_clock_15ms->r10s(), _srs_pps_clock_20ms->r10s(), _srs_pps_clock_25ms->r10s(), _srs_pps_clock_30ms->r10s(), _srs_pps_clock_35ms->r10s(), _srs_pps_clock_40ms->r10s(), _srs_pps_clock_80ms->r10s(), _srs_pps_clock_160ms->r10s(), _srs_pps_timer_s->r10s());
clock_desc = buf;
}
string &thread_desc = stats->thread_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_thread_run->update(_st_stat_thread_run);
_srs_pps_thread_idle->update(_st_stat_thread_idle);
_srs_pps_thread_yield->update(_st_stat_thread_yield);
_srs_pps_thread_yield2->update(_st_stat_thread_yield2);
if (_st_active_count > 0 || _st_num_free_stacks > 0 || _srs_pps_thread_run->r10s() || _srs_pps_thread_idle->r10s() || _srs_pps_thread_yield->r10s() || _srs_pps_thread_yield2->r10s()) {
snprintf(buf, sizeof(buf), ", co=%d,%d,%d, stk=%d, yield=%d,%d", _st_active_count, _srs_pps_thread_run->r10s(), _srs_pps_thread_idle->r10s(), _st_num_free_stacks, _srs_pps_thread_yield->r10s(), _srs_pps_thread_yield2->r10s());
thread_desc = buf;
}
#endif
string &objs_desc = stats->objs_desc;
_srs_pps_objs_rtps->update();
_srs_pps_objs_rraw->update();
_srs_pps_objs_rfua->update();
_srs_pps_objs_rbuf->update();
_srs_pps_objs_msgs->update();
_srs_pps_objs_rothers->update();
if (_srs_pps_objs_rtps->r10s() || _srs_pps_objs_rraw->r10s() || _srs_pps_objs_rfua->r10s() || _srs_pps_objs_rbuf->r10s() || _srs_pps_objs_msgs->r10s() || _srs_pps_objs_rothers->r10s()) {
snprintf(buf, sizeof(buf), ", objs=(pkt:%d,raw:%d,fua:%d,msg:%d,oth:%d,buf:%d)",
_srs_pps_objs_rtps->r10s(), _srs_pps_objs_rraw->r10s(), _srs_pps_objs_rfua->r10s(),
_srs_pps_objs_msgs->r10s(), _srs_pps_objs_rothers->r10s(), _srs_pps_objs_rbuf->r10s());
objs_desc = buf;
}
}
void srs_global_rtc_update(SrsKbsRtcStats *stats)
{
static char buf[128];
string &rpkts_desc = stats->rpkts_desc;
_srs_pps_rpkts->update();
_srs_pps_rrtps->update();
_srs_pps_rstuns->update();
_srs_pps_rrtcps->update();
if (_srs_pps_rpkts->r10s() || _srs_pps_rrtps->r10s() || _srs_pps_rstuns->r10s() || _srs_pps_rrtcps->r10s()) {
snprintf(buf, sizeof(buf), ", rpkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_rpkts->r10s(), _srs_pps_rrtps->r10s(), _srs_pps_rstuns->r10s(), _srs_pps_rrtcps->r10s());
rpkts_desc = buf;
}
string &spkts_desc = stats->spkts_desc;
_srs_pps_spkts->update();
_srs_pps_srtps->update();
_srs_pps_sstuns->update();
_srs_pps_srtcps->update();
if (_srs_pps_spkts->r10s() || _srs_pps_srtps->r10s() || _srs_pps_sstuns->r10s() || _srs_pps_srtcps->r10s()) {
snprintf(buf, sizeof(buf), ", spkts=(%d,rtp:%d,stun:%d,rtcp:%d)", _srs_pps_spkts->r10s(), _srs_pps_srtps->r10s(), _srs_pps_sstuns->r10s(), _srs_pps_srtcps->r10s());
spkts_desc = buf;
}
string &rtcp_desc = stats->rtcp_desc;
_srs_pps_pli->update();
_srs_pps_twcc->update();
_srs_pps_rr->update();
if (_srs_pps_pli->r10s() || _srs_pps_twcc->r10s() || _srs_pps_rr->r10s()) {
snprintf(buf, sizeof(buf), ", rtcp=(pli:%d,twcc:%d,rr:%d)", _srs_pps_pli->r10s(), _srs_pps_twcc->r10s(), _srs_pps_rr->r10s());
rtcp_desc = buf;
}
string &snk_desc = stats->snk_desc;
_srs_pps_snack->update();
_srs_pps_snack2->update();
_srs_pps_sanack->update();
_srs_pps_svnack->update();
if (_srs_pps_snack->r10s() || _srs_pps_sanack->r10s() || _srs_pps_svnack->r10s() || _srs_pps_snack2->r10s()) {
snprintf(buf, sizeof(buf), ", snk=(%d,a:%d,v:%d,h:%d)", _srs_pps_snack->r10s(), _srs_pps_sanack->r10s(), _srs_pps_svnack->r10s(), _srs_pps_snack2->r10s());
snk_desc = buf;
}
string &rnk_desc = stats->rnk_desc;
_srs_pps_rnack->update();
_srs_pps_rnack2->update();
_srs_pps_rhnack->update();
_srs_pps_rmnack->update();
if (_srs_pps_rnack->r10s() || _srs_pps_rnack2->r10s() || _srs_pps_rhnack->r10s() || _srs_pps_rmnack->r10s()) {
snprintf(buf, sizeof(buf), ", rnk=(%d,%d,h:%d,m:%d)", _srs_pps_rnack->r10s(), _srs_pps_rnack2->r10s(), _srs_pps_rhnack->r10s(), _srs_pps_rmnack->r10s());
rnk_desc = buf;
}
string &fid_desc = stats->fid_desc;
_srs_pps_ids->update();
_srs_pps_fids->update();
_srs_pps_fids_level0->update();
_srs_pps_addrs->update();
_srs_pps_fast_addrs->update();
if (_srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s()) {
snprintf(buf, sizeof(buf), ", fid=(id:%d,fid:%d,ffid:%d,addr:%d,faddr:%d)", _srs_pps_ids->r10s(), _srs_pps_fids->r10s(), _srs_pps_fids_level0->r10s(), _srs_pps_addrs->r10s(), _srs_pps_fast_addrs->r10s());
fid_desc = buf;
}
}

View File

@ -11,6 +11,8 @@
#include <srs_kernel_kbps.hpp>
#include <string>
class SrsWallClock;
// A sample for rate-based stat, such as kbps or kps.
@ -82,4 +84,148 @@ public:
// The global clock.
extern SrsWallClock *_srs_clock;
// Global SrsPps statistics variables
// I/O operations statistics
extern SrsPps *_srs_pps_recvfrom;
extern SrsPps *_srs_pps_recvfrom_eagain;
extern SrsPps *_srs_pps_sendto;
extern SrsPps *_srs_pps_sendto_eagain;
extern SrsPps *_srs_pps_read;
extern SrsPps *_srs_pps_read_eagain;
extern SrsPps *_srs_pps_readv;
extern SrsPps *_srs_pps_readv_eagain;
extern SrsPps *_srs_pps_writev;
extern SrsPps *_srs_pps_writev_eagain;
extern SrsPps *_srs_pps_recvmsg;
extern SrsPps *_srs_pps_recvmsg_eagain;
extern SrsPps *_srs_pps_sendmsg;
extern SrsPps *_srs_pps_sendmsg_eagain;
// Clock and timing statistics
extern SrsPps *_srs_pps_clock_15ms;
extern SrsPps *_srs_pps_clock_20ms;
extern SrsPps *_srs_pps_clock_25ms;
extern SrsPps *_srs_pps_clock_30ms;
extern SrsPps *_srs_pps_clock_35ms;
extern SrsPps *_srs_pps_clock_40ms;
extern SrsPps *_srs_pps_clock_80ms;
extern SrsPps *_srs_pps_clock_160ms;
extern SrsPps *_srs_pps_timer_s;
// WebRTC packet statistics
extern SrsPps *_srs_pps_rpkts;
extern SrsPps *_srs_pps_rstuns;
extern SrsPps *_srs_pps_rrtps;
extern SrsPps *_srs_pps_rrtcps;
extern SrsPps *_srs_pps_addrs;
extern SrsPps *_srs_pps_fast_addrs;
extern SrsPps *_srs_pps_spkts;
extern SrsPps *_srs_pps_sstuns;
extern SrsPps *_srs_pps_srtcps;
extern SrsPps *_srs_pps_srtps;
// Object and resource statistics
extern SrsPps *_srs_pps_ids;
extern SrsPps *_srs_pps_fids;
extern SrsPps *_srs_pps_fids_level0;
extern SrsPps *_srs_pps_dispose;
extern SrsPps *_srs_pps_timer;
extern SrsPps *_srs_pps_pub;
extern SrsPps *_srs_pps_conn;
extern SrsPps *_srs_pps_cids_get;
extern SrsPps *_srs_pps_cids_set;
// NACK and loss statistics
extern SrsPps *_srs_pps_snack;
extern SrsPps *_srs_pps_snack2;
extern SrsPps *_srs_pps_snack3;
extern SrsPps *_srs_pps_snack4;
extern SrsPps *_srs_pps_sanack;
extern SrsPps *_srs_pps_svnack;
extern SrsPps *_srs_pps_aloss2;
extern SrsPps *_srs_pps_rnack;
extern SrsPps *_srs_pps_rnack2;
extern SrsPps *_srs_pps_rhnack;
extern SrsPps *_srs_pps_rmnack;
// WebRTC control statistics
extern SrsPps *_srs_pps_pli;
extern SrsPps *_srs_pps_twcc;
extern SrsPps *_srs_pps_rr;
// Object statistics
extern SrsPps *_srs_pps_objs_rtps;
extern SrsPps *_srs_pps_objs_rraw;
extern SrsPps *_srs_pps_objs_rfua;
extern SrsPps *_srs_pps_objs_rbuf;
extern SrsPps *_srs_pps_objs_msgs;
extern SrsPps *_srs_pps_objs_rothers;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
// Debug thread statistics
extern SrsPps *_srs_pps_thread_run;
extern SrsPps *_srs_pps_thread_idle;
extern SrsPps *_srs_pps_thread_yield;
extern SrsPps *_srs_pps_thread_yield2;
// Debug epoll statistics
extern SrsPps *_srs_pps_epoll;
extern SrsPps *_srs_pps_epoll_zero;
extern SrsPps *_srs_pps_epoll_shake;
extern SrsPps *_srs_pps_epoll_spin;
// Debug scheduler statistics
extern SrsPps *_srs_pps_sched_15ms;
extern SrsPps *_srs_pps_sched_20ms;
extern SrsPps *_srs_pps_sched_25ms;
extern SrsPps *_srs_pps_sched_30ms;
extern SrsPps *_srs_pps_sched_35ms;
extern SrsPps *_srs_pps_sched_40ms;
extern SrsPps *_srs_pps_sched_80ms;
extern SrsPps *_srs_pps_sched_160ms;
extern SrsPps *_srs_pps_sched_s;
#endif
// Initialize the global kbps statistics variables
srs_error_t srs_global_kbps_initialize();
class SrsKbpsStats
{
public:
std::string cid_desc;
std::string timer_desc;
std::string free_desc;
std::string recvfrom_desc;
std::string io_desc;
std::string msg_desc;
std::string epoll_desc;
std::string sched_desc;
std::string clock_desc;
std::string thread_desc;
std::string objs_desc;
};
// Update the global kbps statistics variables
void srs_global_kbps_update(SrsKbpsStats *stats);
class SrsKbsRtcStats
{
public:
std::string rpkts_desc;
std::string spkts_desc;
std::string rtcp_desc;
std::string snk_desc;
std::string rnk_desc;
std::string fid_desc;
};
// Update the global rtc statistics variables
void srs_global_rtc_update(SrsKbsRtcStats *stats);
#endif

View File

@ -42,6 +42,7 @@ using namespace std;
#include <srs_core_performance.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_file.hpp>
#include <srs_kernel_kbps.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_srt.hpp>
@ -73,9 +74,6 @@ bool _srs_in_docker = false;
extern void asan_report_callback(const char *str);
#endif
extern SrsPps *_srs_pps_cids_get;
extern SrsPps *_srs_pps_cids_set;
/**
* main entrance.
*/