AI: Add utest to cover app server module.

This commit is contained in:
OSSRS-AI 2025-10-03 10:15:00 -04:00 committed by winlin
parent 71302c4a77
commit cdfe82357e
36 changed files with 1967 additions and 418 deletions

3
trunk/configure vendored
View File

@ -383,7 +383,8 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_stream_token" "srs_utest_rtc_recv_track" "srs_utest_st2" "srs_utest_hevc_structs"
"srs_utest_coworkers" "srs_utest_pithy_print" "srs_utest_kernel3" "srs_utest_protocol4"
"srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4"
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9")
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9"
"srs_utest_app10")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@ -19,6 +19,14 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
{
}
ISrsAsyncCallWorker::ISrsAsyncCallWorker()
{
}
ISrsAsyncCallWorker::~ISrsAsyncCallWorker()
{
}
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
trd_ = new SrsDummyCoroutine();

View File

@ -35,10 +35,21 @@ public:
virtual std::string to_string() = 0;
};
// The async call worker, used to execute the task in async mode.
class ISrsAsyncCallWorker
{
public:
ISrsAsyncCallWorker();
virtual ~ISrsAsyncCallWorker();
public:
virtual srs_error_t start() = 0;
};
// The async callback for dvr, callback and other async worker.
// When worker call with the task, the worker will do it in isolate thread.
// That is, the task is execute/call in async mode.
class SrsAsyncCallWorker : public ISrsCoroutineHandler
class SrsAsyncCallWorker : public ISrsCoroutineHandler, public ISrsAsyncCallWorker
{
private:
ISrsCoroutine *trd_;

View File

@ -140,6 +140,31 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(ISrsListener *listener, srs_netfd_t s
return err;
}
srs_error_t SrsAppCasterFlv::start()
{
return manager_->start();
}
bool SrsAppCasterFlv::empty()
{
return manager_->empty();
}
size_t SrsAppCasterFlv::size()
{
return manager_->size();
}
void SrsAppCasterFlv::add(ISrsResource *conn, bool *exists)
{
manager_->add(conn, exists);
}
ISrsResource *SrsAppCasterFlv::at(int index)
{
return manager_->at(index);
}
void SrsAppCasterFlv::remove(ISrsResource *c)
{
ISrsConnection *conn = dynamic_cast<ISrsConnection *>(c);

View File

@ -70,6 +70,11 @@ public:
virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd);
// Interface ISrsResourceManager
public:
virtual srs_error_t start();
virtual bool empty();
virtual size_t size();
virtual void add(ISrsResource *conn, bool *exists = NULL);
virtual ISrsResource *at(int index);
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);

View File

@ -357,6 +357,7 @@ public:
public:
// Stats config
virtual bool get_stats_enabled() = 0;
virtual int get_stats_network() = 0;
public:
// Heartbeat config
@ -364,8 +365,66 @@ public:
virtual srs_utime_t get_heartbeat_interval() = 0;
public:
// RTMPS config
virtual std::string get_rtmps_ssl_cert() = 0;
virtual std::string get_rtmps_ssl_key() = 0;
public:
// Vhost config
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) = 0;
virtual bool get_vhost_enabled(std::string vhost) = 0;
virtual bool get_debug_srs_upnode(std::string vhost) = 0;
virtual int get_out_ack_size(std::string vhost) = 0;
virtual int get_in_ack_size(std::string vhost) = 0;
virtual int get_chunk_size(std::string vhost) = 0;
virtual bool get_gop_cache(std::string vhost) = 0;
virtual int get_gop_cache_max_frames(std::string vhost) = 0;
virtual bool get_tcp_nodelay(std::string vhost) = 0;
virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) = 0;
virtual srs_utime_t get_send_min_interval(std::string vhost) = 0;
virtual bool get_mr_enabled(std::string vhost) = 0;
virtual srs_utime_t get_mr_sleep(std::string vhost) = 0;
virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost) = 0;
virtual srs_utime_t get_publish_normal_timeout(std::string vhost) = 0;
virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost) = 0;
public:
// Refer config
virtual bool get_refer_enabled(std::string vhost) = 0;
virtual SrsConfDirective *get_refer_all(std::string vhost) = 0;
virtual SrsConfDirective *get_refer_play(std::string vhost) = 0;
virtual SrsConfDirective *get_refer_publish(std::string vhost) = 0;
public:
// Edge config
virtual bool get_vhost_origin_cluster(std::string vhost) = 0;
virtual std::vector<std::string> get_vhost_coworkers(std::string vhost) = 0;
virtual bool get_vhost_edge_token_traverse(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_edge_origin(std::string vhost) = 0;
public:
// HTTP hooks config
virtual bool get_vhost_http_hooks_enabled(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_on_connect(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_on_close(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_on_publish(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_on_play(std::string vhost) = 0;
virtual SrsConfDirective *get_vhost_on_stop(std::string vhost) = 0;
public:
// RTC config
virtual bool get_rtc_enabled(std::string vhost) = 0;
public:
// RTSP config
virtual bool get_rtsp_enabled(std::string vhost) = 0;
public:
// Stream bridge config
virtual bool get_rtc_from_rtmp(std::string vhost) = 0;
virtual bool get_rtsp_from_rtmp(std::string vhost) = 0;
public:
virtual bool get_rtc_nack_enabled(std::string vhost) = 0;
virtual bool get_rtc_nack_no_copy(std::string vhost) = 0;
virtual bool get_realtime_enabled(std::string vhost, bool is_rtc) = 0;

View File

@ -388,9 +388,6 @@ SrsEdgeIngester::SrsEdgeIngester()
source_ = NULL;
edge_ = NULL;
req_ = NULL;
#ifdef SRS_APM
span_main_ = NULL;
#endif
upstream_ = new SrsEdgeRtmpUpstream("");
lb_ = new SrsLbRoundRobin();
@ -401,9 +398,6 @@ SrsEdgeIngester::~SrsEdgeIngester()
{
stop();
#ifdef SRS_APM
srs_freep(span_main_);
#endif
srs_freep(upstream_);
srs_freep(lb_);
srs_freep(trd_);
@ -455,14 +449,6 @@ void SrsEdgeIngester::stop()
}
}
#ifdef SRS_APM
ISrsApmSpan *SrsEdgeIngester::span()
{
srs_assert(span_main_);
return span_main_;
}
#endif
// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_CIMS (3 * SRS_UTIME_SECONDS)
@ -477,23 +463,11 @@ srs_error_t SrsEdgeIngester::cycle()
return srs_error_wrap(err, "edge ingester");
}
#ifdef SRS_APM
srs_assert(span_main_);
ISrsApmSpan *start = _srs_apm->span("edge-start")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end();
srs_freep(start);
#endif
if ((err = do_cycle()) != srs_success) {
srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
#ifdef SRS_APM
srs_assert(span_main_);
ISrsApmSpan *stop = _srs_apm->span("edge-stop")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end();
srs_freep(stop);
#endif
// Check whether coroutine is stopped, see https://github.com/ossrs/srs/issues/2901
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "edge ingester");

View File

@ -13,6 +13,7 @@
#include <srs_kernel_ts.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_st.hpp>
#include <srs_kernel_hourglass.hpp>
SrsAppFactory::SrsAppFactory()
{
@ -54,6 +55,11 @@ ISrsOriginHub *SrsAppFactory::create_origin_hub()
return hub;
}
ISrsHourGlass *SrsAppFactory::create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval)
{
return new SrsHourGlass(name, handler, interval);
}
SrsFinalFactory::SrsFinalFactory()
{
}

View File

@ -16,6 +16,8 @@ class ISrsFileReader;
class SrsPath;
class SrsLiveSource;
class ISrsOriginHub;
class ISrsHourGlass;
class ISrsHourGlassHandler;
// The factory to create app objects.
class SrsAppFactory
@ -31,6 +33,7 @@ public:
virtual SrsPath *create_path();
virtual SrsLiveSource *create_live_source();
virtual ISrsOriginHub *create_origin_hub();
virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval);
};
extern SrsAppFactory *_srs_app_factory;

View File

@ -187,6 +187,14 @@ SSL_CTX *srs_build_dtls_ctx(SrsDtlsVersion version, std::string role)
}
#pragma GCC diagnostic pop
ISrsDtlsCertificate::ISrsDtlsCertificate()
{
}
ISrsDtlsCertificate::~ISrsDtlsCertificate()
{
}
SrsDtlsCertificate::SrsDtlsCertificate()
{
ecdsa_mode_ = true;

View File

@ -19,7 +19,19 @@
class ISrsRequest;
class SrsDtlsCertificate
// The interface for DTLS certificate.
class ISrsDtlsCertificate
{
public:
ISrsDtlsCertificate();
virtual ~ISrsDtlsCertificate();
public:
virtual srs_error_t initialize() = 0;
};
// The DTLS certificate.
class SrsDtlsCertificate : public ISrsDtlsCertificate
{
private:
std::string fingerprint_;

View File

@ -62,20 +62,22 @@ using namespace std;
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm) : SrsBasicRtmpClient(u, ctm, stm)
{
config_ = _srs_config;
}
SrsSimpleRtmpClient::~SrsSimpleRtmpClient()
{
config_ = NULL;
}
srs_error_t SrsSimpleRtmpClient::connect_app()
{
SrsProtocolUtility utility;
std::vector<SrsIPAddress *> &ips = utility.local_ips();
srs_assert(_srs_config->get_stats_network() < (int)ips.size());
SrsIPAddress *local_ip = ips[_srs_config->get_stats_network()];
srs_assert(config_->get_stats_network() < (int)ips.size());
SrsIPAddress *local_ip = ips[config_->get_stats_network()];
bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req_->vhost_);
bool debug_srs_upnode = config_->get_debug_srs_upnode(req_->vhost_);
return do_connect_app(local_ip->ip_, debug_srs_upnode);
}
@ -148,11 +150,15 @@ int64_t SrsRtmpTransport::get_send_bytes()
SrsRtmpsTransport::SrsRtmpsTransport(srs_netfd_t c) : SrsRtmpTransport(c)
{
ssl_ = new SrsSslConnection(skt_);
config_ = _srs_config;
}
SrsRtmpsTransport::~SrsRtmpsTransport()
{
srs_freep(ssl_);
config_ = NULL;
}
ISrsProtocolReadWriter *SrsRtmpsTransport::io()
@ -162,8 +168,8 @@ ISrsProtocolReadWriter *SrsRtmpsTransport::io()
srs_error_t SrsRtmpsTransport::handshake()
{
string crt_file = _srs_config->get_rtmps_ssl_cert();
string key_file = _srs_config->get_rtmps_ssl_key();
string crt_file = config_->get_rtmps_ssl_cert();
string key_file = config_->get_rtmps_ssl_key();
srs_error_t err = ssl_->handshake(key_file, crt_file);
if (err != srs_success) {
return srs_error_wrap(err, "ssl handshake");
@ -185,7 +191,6 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip
server_ = svr;
transport_ = transport;
manager_ = _srs_conn_manager;
ip_ = cip;
port_ = cport;
create_time_ = srsu2ms(srs_time_now_cached());
@ -213,12 +218,25 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip
publish_1stpkt_timeout_ = 0;
publish_normal_timeout_ = 0;
_srs_config->subscribe(this);
config_ = _srs_config;
manager_ = _srs_conn_manager;
stream_publish_tokens_ = _srs_stream_publish_tokens;
live_sources_ = _srs_sources;
stat_ = _srs_stat;
hooks_ = _srs_hooks;
rtc_sources_ = _srs_rtc_sources;
srt_sources_ = _srs_srt_sources;
rtsp_sources_ = _srs_rtsp_sources;
}
void SrsRtmpConn::assemble()
{
config_->subscribe(this);
}
SrsRtmpConn::~SrsRtmpConn()
{
_srs_config->unsubscribe(this);
config_->unsubscribe(this);
trd_->interrupt();
// wakeup the handler which need to notice.
@ -235,11 +253,15 @@ SrsRtmpConn::~SrsRtmpConn()
srs_freep(rtmp_);
srs_freep(refer_);
srs_freep(security_);
#ifdef SRS_APM
srs_freep(span_main_);
srs_freep(span_connect_);
srs_freep(span_client_);
#endif
manager_ = NULL;
stream_publish_tokens_ = NULL;
live_sources_ = NULL;
stat_ = NULL;
hooks_ = NULL;
rtc_sources_ = NULL;
srt_sources_ = NULL;
rtsp_sources_ = NULL;
}
std::string SrsRtmpConn::desc()
@ -257,12 +279,7 @@ srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
#ifdef SRS_APM
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d, trace=%s, span=%s", transport_->transport_type(), ip.c_str(), port, srs_netfd_fileno(transport_->fd()),
span_main_->format_trace_id(), span_main_->format_span_id());
#else
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip_.c_str(), port_, srs_netfd_fileno(transport_->fd()));
#endif
if ((err = transport_->handshake()) != srs_success) {
return srs_error_wrap(err, "transport handshake");
@ -350,12 +367,12 @@ srs_error_t SrsRtmpConn::service_cycle()
ISrsRequest *req = info_->req_;
int out_ack_size = _srs_config->get_out_ack_size(req->vhost_);
int out_ack_size = config_->get_out_ack_size(req->vhost_);
if (out_ack_size && (err = rtmp_->set_window_ack_size(out_ack_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set out window ack size");
}
int in_ack_size = _srs_config->get_in_ack_size(req->vhost_);
int in_ack_size = config_->get_in_ack_size(req->vhost_);
if (in_ack_size && (err = rtmp_->set_in_window_ack_size(in_ack_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set in window ack size");
}
@ -370,7 +387,7 @@ srs_error_t SrsRtmpConn::service_cycle()
// set chunk size to larger.
// set the chunk size before any larger response greater than 128,
// to make OBS happy, @see https://github.com/ossrs/srs/issues/454
int chunk_size = _srs_config->get_chunk_size(req->vhost_);
int chunk_size = config_->get_chunk_size(req->vhost_);
if ((err = rtmp_->set_chunk_size(chunk_size)) != srs_success) {
return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size);
}
@ -380,11 +397,6 @@ srs_error_t SrsRtmpConn::service_cycle()
return srs_error_wrap(err, "rtmp: response connect app");
}
#ifdef SRS_APM
// Must be a connecting application span.
span_connect_->end();
#endif
if ((err = rtmp_->on_bw_done()) != srs_success) {
return srs_error_wrap(err, "rtmp: on bw down");
}
@ -462,7 +474,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
srs_client_type_string(info_->type_).c_str(), req->vhost_.c_str(), req->app_.c_str(), req->stream_.c_str(), req->param_.c_str(), srsu2msi(req->duration_));
// discovery vhost, resolve the vhost from config
SrsConfDirective *parsed_vhost = _srs_config->get_vhost(req->vhost_);
SrsConfDirective *parsed_vhost = config_->get_vhost(req->vhost_);
if (parsed_vhost) {
req->vhost_ = parsed_vhost->arg0();
}
@ -484,8 +496,8 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
// do token traverse before serve it.
// @see https://github.com/ossrs/srs/pull/239
if (true) {
info_->edge_ = _srs_config->get_vhost_is_edge(req->vhost_);
bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost_);
info_->edge_ = config_->get_vhost_is_edge(req->vhost_);
bool edge_traverse = config_->get_vhost_edge_token_traverse(req->vhost_);
if (info_->edge_ && edge_traverse) {
if ((err = check_edge_token_traverse_auth()) != srs_success) {
return srs_error_wrap(err, "rtmp: check token traverse");
@ -510,7 +522,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if (info_->type_ != SrsRtmpConnPlay && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) {
if (info_->type_ != SrsRtmpConnPlay && (err = stream_publish_tokens_->acquire_token(req, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
@ -521,13 +533,13 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
// find a source to serve.
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, live_source)) != srs_success) {
if ((err = live_sources_->fetch_or_create(req, live_source)) != srs_success) {
return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(live_source.get() != NULL);
bool enabled_cache = _srs_config->get_gop_cache(req->vhost_);
int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost_);
bool enabled_cache = config_->get_gop_cache(req->vhost_);
int gcmf = config_->get_gop_cache_max_frames(req->vhost_);
srs_trace("source url=%s, ip=%s, cache=%d/%d, is_edge=%d, source_id=%s/%s",
req->get_stream_url().c_str(), ip_.c_str(), enabled_cache, gcmf, info_->edge_, live_source->source_id().c_str(),
live_source->pre_source_id().c_str());
@ -542,8 +554,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
}
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic *stat = _srs_stat;
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) {
if ((err = stat_->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
@ -552,13 +563,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
return srs_error_wrap(err, "rtmp: callback on play");
}
#ifdef SRS_APM
// Must be a client span.
span_client_->set_name("play")->end();
// We end the connection span because it's a producer and only trace the established.
span_main_->end();
#endif
err = playing(live_source);
http_hooks_on_stop();
@ -569,13 +573,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
return srs_error_wrap(err, "rtmp: start FMLE publish");
}
#ifdef SRS_APM
// Must be a client span.
span_client_->set_name("publish")->end();
// We end the connection span because it's a producer and only trace the established.
span_main_->end();
#endif
return publishing(live_source);
}
case SrsRtmpConnHaivisionPublish: {
@ -583,13 +580,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
}
#ifdef SRS_APM
// Must be a client span.
span_client_->set_name("publish")->end();
// We end the connection span because it's a producer and only trace the established.
span_main_->end();
#endif
return publishing(live_source);
}
case SrsRtmpConnFlashPublish: {
@ -597,13 +587,6 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
return srs_error_wrap(err, "rtmp: start FLASH publish");
}
#ifdef SRS_APM
// Must be a client span.
span_client_->set_name("publish")->end();
// We end the connection span because it's a producer and only trace the established.
span_main_->end();
#endif
return publishing(live_source);
}
default: {
@ -621,12 +604,12 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
ISrsRequest *req = info_->req_;
srs_assert(req != NULL);
SrsConfDirective *vhost = _srs_config->get_vhost(req->vhost_, try_default_vhost);
SrsConfDirective *vhost = config_->get_vhost(req->vhost_, try_default_vhost);
if (vhost == NULL) {
return srs_error_new(ERROR_RTMP_VHOST_NOT_FOUND, "rtmp: no vhost %s", req->vhost_.c_str());
}
if (!_srs_config->get_vhost_enabled(req->vhost_)) {
if (!config_->get_vhost_enabled(req->vhost_)) {
return srs_error_new(ERROR_RTMP_VHOST_NOT_FOUND, "rtmp: vhost %s disabled", req->vhost_.c_str());
}
@ -635,8 +618,8 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
req->vhost_ = vhost->arg0();
}
if (_srs_config->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_all(req->vhost_))) != srs_success) {
if (config_->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, config_->get_refer_all(req->vhost_))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
@ -654,16 +637,16 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
// Check page referer of player.
ISrsRequest *req = info_->req_;
if (_srs_config->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_play(req->vhost_))) != srs_success) {
if (config_->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, config_->get_refer_play(req->vhost_))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
// When origin cluster enabled, try to redirect to the origin which is active.
// A active origin is a server which is delivering stream.
if (!info_->edge_ && _srs_config->get_vhost_origin_cluster(req->vhost_) && source->inactive()) {
vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost_);
if (!info_->edge_ && config_->get_vhost_origin_cluster(req->vhost_) && source->inactive()) {
vector<string> coworkers = config_->get_vhost_coworkers(req->vhost_);
for (int i = 0; i < (int)coworkers.size(); i++) {
// TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it.
string host;
@ -671,7 +654,7 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
string coworker = coworkers.at(i);
string url = "http://" + coworker + "/api/v1/clusters?" + "vhost=" + req->vhost_ + "&ip=" + req->host_ + "&app=" + req->app_ + "&stream=" + req->stream_ + "&coworker=" + coworker;
if ((err = _srs_hooks->discover_co_workers(url, host, port)) != srs_success) {
if ((err = hooks_->discover_co_workers(url, host, port)) != srs_success) {
// If failed to discovery stream in this coworker, we should request the next one util the last.
// @see https://github.com/ossrs/srs/issues/1223
if (i < (int)coworkers.size() - 1) {
@ -752,14 +735,14 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
int64_t starttime = -1;
// setup the realtime.
realtime_ = _srs_config->get_realtime_enabled(req->vhost_, false);
realtime_ = config_->get_realtime_enabled(req->vhost_, false);
// setup the mw config.
// when mw_sleep changed, resize the socket send buffer.
mw_msgs_ = _srs_config->get_mw_msgs(req->vhost_, realtime_, false);
mw_sleep_ = _srs_config->get_mw_sleep(req->vhost_);
mw_msgs_ = config_->get_mw_msgs(req->vhost_, realtime_, false);
mw_sleep_ = config_->get_mw_sleep(req->vhost_);
transport_->set_socket_buffer(mw_sleep_);
// initialize the send_min_interval
send_min_interval_ = _srs_config->get_send_min_interval(req->vhost_);
send_min_interval_ = config_->get_send_min_interval(req->vhost_);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval_), srsu2msi(mw_sleep_), mw_msgs_, realtime_, tcp_nodelay_);
@ -806,12 +789,6 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
(int)pprint->age(), count, kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m(), srsu2msi(mw_sleep_), mw_msgs_);
#ifdef SRS_APM
// TODO: Do not use pithy print for frame span.
ISrsApmSpan *sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())->attr("msgs", srs_fmt_sprintf("%d", count))->attr("kbps", srs_fmt_sprintf("%d", kbps_->get_send_kbps_30s()));
srs_freep(sample);
#endif
}
if (count <= 0) {
@ -871,15 +848,14 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
ISrsRequest *req = info_->req_;
if (_srs_config->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, _srs_config->get_refer_publish(req->vhost_))) != srs_success) {
if (config_->get_refer_enabled(req->vhost_)) {
if ((err = refer_->check(req->pageUrl_, config_->get_refer_publish(req->vhost_))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic *stat = _srs_stat;
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) {
if ((err = stat_->on_client(_srs_context->get_id().c_str(), req, this, info_->type_)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
@ -922,23 +898,19 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
}
// initialize the publish timeout.
publish_1stpkt_timeout_ = _srs_config->get_publish_1stpkt_timeout(req->vhost_);
publish_normal_timeout_ = _srs_config->get_publish_normal_timeout(req->vhost_);
srs_utime_t publish_kickoff_for_idle = _srs_config->get_publish_kickoff_for_idle(req->vhost_);
publish_1stpkt_timeout_ = config_->get_publish_1stpkt_timeout(req->vhost_);
publish_normal_timeout_ = config_->get_publish_normal_timeout(req->vhost_);
srs_utime_t publish_kickoff_for_idle = config_->get_publish_kickoff_for_idle(req->vhost_);
// set the sock options.
set_sock_options();
if (true) {
bool mr = _srs_config->get_mr_enabled(req->vhost_);
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost_);
bool mr = config_->get_mr_enabled(req->vhost_);
srs_utime_t mr_sleep = config_->get_mr_sleep(req->vhost_);
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout_), srsu2msi(publish_normal_timeout_), tcp_nodelay_);
}
#ifdef SRS_APM
SrsUniquePtr<ISrsApmSpan> span(_srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)->attr("timeout", srs_fmt_sprintf("%d", srsu2msi(publish_normal_timeout_)))->end());
#endif
// Response the start publishing message, let client start to publish messages.
if ((err = rtmp_->start_publishing(info_->res_->stream_id_)) != srs_success) {
return srs_error_wrap(err, "start publishing");
@ -980,8 +952,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
// Update the stat for video fps.
// @remark https://github.com/ossrs/srs/issues/851
SrsStatistic *stat = _srs_stat;
if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {
if ((err = stat_->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {
return srs_error_wrap(err, "rtmp: stat video frames");
}
nb_frames = rtrd->nb_video_frames();
@ -989,18 +960,12 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
// reportable
if (pprint->can_print()) {
kbps_->sample();
bool mr = _srs_config->get_mr_enabled(req->vhost_);
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost_);
bool mr = config_->get_mr_enabled(req->vhost_);
srs_utime_t mr_sleep = config_->get_mr_sleep(req->vhost_);
srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",
(int)pprint->age(), kbps_->get_send_kbps(), kbps_->get_send_kbps_30s(), kbps_->get_send_kbps_5m(),
kbps_->get_recv_kbps(), kbps_->get_recv_kbps_30s(), kbps_->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
srsu2msi(publish_1stpkt_timeout_), srsu2msi(publish_normal_timeout_));
#ifdef SRS_APM
// TODO: Do not use pithy print for frame span.
ISrsApmSpan *sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span.get())->attr("msgs", srs_fmt_sprintf("%" PRId64, nb_frames))->attr("kbps", srs_fmt_sprintf("%d", kbps_->get_recv_kbps_30s()));
srs_freep(sample);
#endif
}
}
@ -1020,9 +985,9 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
// Check whether RTC stream is busy.
SrsSharedPtr<SrsRtcSource> rtc;
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost_);
bool edge = _srs_config->get_vhost_is_edge(req->vhost_);
bool rtc_server_enabled = config_->get_rtc_server_enabled();
bool rtc_enabled = config_->get_rtc_enabled(req->vhost_);
bool edge = config_->get_vhost_is_edge(req->vhost_);
if (rtc_enabled && edge) {
rtc_enabled = false;
@ -1030,7 +995,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
}
if (rtc_server_enabled && rtc_enabled && !info_->edge_) {
if ((err = _srs_rtc_sources->fetch_or_create(req, rtc)) != srs_success) {
if ((err = rtc_sources_->fetch_or_create(req, rtc)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -1040,11 +1005,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
}
// Check whether SRT stream is busy.
bool srt_server_enabled = _srs_config->get_srt_enabled();
bool srt_enabled = _srs_config->get_srt_enabled(req->vhost_);
bool srt_server_enabled = config_->get_srt_enabled();
bool srt_enabled = config_->get_srt_enabled(req->vhost_);
if (srt_server_enabled && srt_enabled && !info_->edge_) {
SrsSharedPtr<SrsSrtSource> srt;
if ((err = _srs_srt_sources->fetch_or_create(req, srt)) != srs_success) {
if ((err = srt_sources_->fetch_or_create(req, srt)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -1056,10 +1021,10 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
#ifdef SRS_RTSP
// RTSP only support viewer, so we don't need to check it.
SrsSharedPtr<SrsRtspSource> rtsp;
bool rtsp_server_enabled = _srs_config->get_rtsp_server_enabled();
bool rtsp_enabled = _srs_config->get_rtsp_enabled(req->vhost_);
bool rtsp_server_enabled = config_->get_rtsp_server_enabled();
bool rtsp_enabled = config_->get_rtsp_enabled(req->vhost_);
if (rtsp_server_enabled && rtsp_enabled && !info_->edge_) {
if ((err = _srs_rtsp_sources->fetch_or_create(req, rtsp)) != srs_success) {
if ((err = rtsp_sources_->fetch_or_create(req, rtsp)) != srs_success) {
return srs_error_wrap(err, "create source");
}
}
@ -1070,7 +1035,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
SrsRtmpBridge *bridge = new SrsRtmpBridge();
#if defined(SRS_FFMPEG_FIT)
bool rtmp_to_rtc = _srs_config->get_rtc_from_rtmp(req->vhost_);
bool rtmp_to_rtc = config_->get_rtc_from_rtmp(req->vhost_);
if (rtmp_to_rtc && edge) {
rtmp_to_rtc = false;
srs_warn("disable RTMP to WebRTC for edge vhost=%s", req->vhost_.c_str());
@ -1082,7 +1047,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
#endif
#ifdef SRS_RTSP
if (rtsp.get() && _srs_config->get_rtsp_from_rtmp(req->vhost_)) {
if (rtsp.get() && config_->get_rtsp_from_rtmp(req->vhost_)) {
bridge->enable_rtmp2rtsp(rtsp);
}
#endif
@ -1277,7 +1242,7 @@ void SrsRtmpConn::set_sock_options()
{
ISrsRequest *req = info_->req_;
bool nvalue = _srs_config->get_tcp_nodelay(req->vhost_);
bool nvalue = config_->get_tcp_nodelay(req->vhost_);
if (nvalue != tcp_nodelay_) {
tcp_nodelay_ = nvalue;
@ -1296,7 +1261,7 @@ srs_error_t SrsRtmpConn::check_edge_token_traverse_auth()
ISrsRequest *req = info_->req_;
srs_assert(req);
vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost_)->args_;
vector<string> args = config_->get_vhost_edge_origin(req->vhost_)->args_;
if (args.empty()) {
return err;
}
@ -1364,7 +1329,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect()
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return err;
}
@ -1374,7 +1339,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_connect(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_connect(req->vhost_);
if (!conf) {
return err;
@ -1385,7 +1350,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_connect()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = _srs_hooks->on_connect(url, req)) != srs_success) {
if ((err = hooks_->on_connect(url, req)) != srs_success) {
return srs_error_wrap(err, "rtmp on_connect %s", url.c_str());
}
}
@ -1397,7 +1362,7 @@ void SrsRtmpConn::http_hooks_on_close()
{
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return;
}
@ -1407,7 +1372,7 @@ void SrsRtmpConn::http_hooks_on_close()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_close(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_close(req->vhost_);
if (!conf) {
return;
@ -1418,7 +1383,7 @@ void SrsRtmpConn::http_hooks_on_close()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
_srs_hooks->on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes());
hooks_->on_close(url, req, transport_->get_send_bytes(), transport_->get_recv_bytes());
}
}
@ -1428,7 +1393,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish()
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return err;
}
@ -1438,7 +1403,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_publish(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_publish(req->vhost_);
if (!conf) {
return err;
@ -1449,7 +1414,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_publish()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = _srs_hooks->on_publish(url, req)) != srs_success) {
if ((err = hooks_->on_publish(url, req)) != srs_success) {
return srs_error_wrap(err, "rtmp on_publish %s", url.c_str());
}
}
@ -1461,7 +1426,7 @@ void SrsRtmpConn::http_hooks_on_unpublish()
{
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return;
}
@ -1471,7 +1436,7 @@ void SrsRtmpConn::http_hooks_on_unpublish()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_unpublish(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_unpublish(req->vhost_);
if (!conf) {
return;
@ -1482,7 +1447,7 @@ void SrsRtmpConn::http_hooks_on_unpublish()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
_srs_hooks->on_unpublish(url, req);
hooks_->on_unpublish(url, req);
}
}
@ -1492,7 +1457,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play()
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return err;
}
@ -1502,7 +1467,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_play(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_play(req->vhost_);
if (!conf) {
return err;
@ -1513,7 +1478,7 @@ srs_error_t SrsRtmpConn::http_hooks_on_play()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = _srs_hooks->on_play(url, req)) != srs_success) {
if ((err = hooks_->on_play(url, req)) != srs_success) {
return srs_error_wrap(err, "rtmp on_play %s", url.c_str());
}
}
@ -1525,7 +1490,7 @@ void SrsRtmpConn::http_hooks_on_stop()
{
ISrsRequest *req = info_->req_;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return;
}
@ -1535,7 +1500,7 @@ void SrsRtmpConn::http_hooks_on_stop()
vector<string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_stop(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_stop(req->vhost_);
if (!conf) {
return;
@ -1546,7 +1511,7 @@ void SrsRtmpConn::http_hooks_on_stop()
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
_srs_hooks->on_stop(url, req);
hooks_->on_stop(url, req);
}
return;
@ -1571,9 +1536,8 @@ srs_error_t SrsRtmpConn::cycle()
err = do_cycle();
// Update statistic when done.
SrsStatistic *stat = _srs_stat;
stat->kbps_add_delta(get_id().c_str(), delta_);
stat->on_disconnect(get_id().c_str(), err);
stat_->kbps_add_delta(get_id().c_str(), delta_);
stat_->on_disconnect(get_id().c_str(), err);
// Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.

View File

@ -39,12 +39,23 @@ class ISrsWakable;
class SrsRtmpCommonMessage;
class SrsRtmpCommand;
class SrsNetworkDelta;
class ISrsAppConfig;
class SrsSslConnection;
class ISrsResourceManager;
class ISrsStreamPublishTokenManager;
class ISrsLiveSourceManager;
class ISrsStatistic;
class ISrsHttpHooks;
class ISrsRtcSourceManager;
class ISrsSrtSourceManager;
class ISrsRtspSourceManager;
// The simple rtmp client for SRS.
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
{
private:
ISrsAppConfig *config_;
public:
SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm);
virtual ~SrsSimpleRtmpClient();
@ -103,6 +114,9 @@ public:
// The SSL/TLS transport layer for RTMPS connections.
class SrsRtmpsTransport : public SrsRtmpTransport
{
private:
ISrsAppConfig *config_;
private:
SrsSslConnection *ssl_;
@ -124,6 +138,17 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo
// For the thread to directly access any field of connection.
friend class SrsPublishRecvThread;
private:
ISrsResourceManager *manager_;
ISrsAppConfig *config_;
ISrsStreamPublishTokenManager *stream_publish_tokens_;
ISrsLiveSourceManager *live_sources_;
ISrsStatistic *stat_;
ISrsHttpHooks *hooks_;
ISrsRtcSourceManager *rtc_sources_;
ISrsSrtSourceManager *srt_sources_;
ISrsRtspSourceManager *rtsp_sources_;
private:
SrsServer *server_;
SrsRtmpServer *rtmp_;
@ -158,8 +183,6 @@ private:
// Each connection start a green thread,
// when thread stop, the connection will be delete by server.
ISrsCoroutine *trd_;
// The manager object to manage the connection.
ISrsResourceManager *manager_;
// The ip and port of client.
std::string ip_;
int port_;
@ -172,6 +195,7 @@ private:
public:
SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, std::string cip, int port);
void assemble();
virtual ~SrsRtmpConn();
// Interface ISrsResource.
public:

View File

@ -529,6 +529,10 @@ public:
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps) = 0;
// Get the exists source, NULL when not exists.
virtual SrsSharedPtr<SrsLiveSource> fetch(ISrsRequest *r) = 0;
public:
virtual void dispose() = 0;
virtual srs_error_t initialize() = 0;
};
// The source manager to create and refresh all stream sources.

View File

@ -111,6 +111,14 @@ void SrsRtspConsumer::on_stream_change(SrsRtcSourceDescription *desc)
}
}
ISrsRtspSourceManager::ISrsRtspSourceManager()
{
}
ISrsRtspSourceManager::~ISrsRtspSourceManager()
{
}
SrsRtspSourceManager::SrsRtspSourceManager()
{
lock_ = srs_mutex_new();

View File

@ -67,7 +67,21 @@ public:
void on_stream_change(SrsRtcSourceDescription *desc);
};
class SrsRtspSourceManager : public ISrsHourGlassHandler
// The RTSP source manager interface.
class ISrsRtspSourceManager
{
public:
ISrsRtspSourceManager();
virtual ~ISrsRtspSourceManager();
public:
virtual srs_error_t initialize() = 0;
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsRtspSource> &pps) = 0;
virtual SrsSharedPtr<SrsRtspSource> fetch(ISrsRequest *r) = 0;
};
// The RTSP source manager.
class SrsRtspSourceManager : public ISrsHourGlassHandler, public ISrsRtspSourceManager
{
private:
srs_mutex_t lock_;

View File

@ -62,6 +62,7 @@ using namespace std;
#include <srs_app_rtsp_conn.hpp>
#include <srs_app_rtsp_source.hpp>
#endif
#include <srs_app_factory.hpp>
SrsServer *_srs_server = NULL;
@ -186,6 +187,22 @@ SrsServer::SrsServer()
rtc_session_manager_ = new SrsRtcSessionManager();
config_ = _srs_config;
live_sources_ = _srs_sources;
conn_manager_ = _srs_conn_manager;
rtc_dtls_certificate_ = _srs_rtc_dtls_certificate;
dvr_async_ = _srs_dvr_async;
circuit_breaker_ = _srs_circuit_breaker;
srt_sources_ = _srs_srt_sources;
rtc_sources_ = _srs_rtc_sources;
#ifdef SRS_RTSP
rtsp_sources_ = _srs_rtsp_sources;
#endif
#ifdef SRS_GB28181
gb_manager_ = _srs_gb_manager;
#endif
log_ = _srs_log;
stat_ = _srs_stat;
app_factory_ = _srs_app_factory;
}
SrsServer::~SrsServer()
@ -238,6 +255,20 @@ SrsServer::~SrsServer()
srs_freep(rtc_session_manager_);
config_ = NULL;
live_sources_ = NULL;
conn_manager_ = NULL;
rtc_dtls_certificate_ = NULL;
dvr_async_ = NULL;
circuit_breaker_ = NULL;
srt_sources_ = NULL;
rtc_sources_ = NULL;
rtsp_sources_ = NULL;
#ifdef SRS_GB28181
gb_manager_ = NULL;
#endif
log_ = NULL;
stat_ = NULL;
app_factory_ = NULL;
}
void SrsServer::dispose()
@ -267,7 +298,7 @@ void SrsServer::dispose()
ingester_->dispose();
// dispose the source for hls and dvr.
_srs_sources->dispose();
live_sources_->dispose();
// @remark don't dispose all connections, for too slow.
}
@ -307,17 +338,17 @@ void SrsServer::gracefully_dispose()
// Wait for connections to quit.
// While gracefully quiting, user can requires SRS to fast quit.
int wait_step = 1;
while (!_srs_conn_manager->empty() && !signal_fast_quit_) {
for (int i = 0; i < wait_step && !_srs_conn_manager->empty() && !signal_fast_quit_; i++) {
while (!conn_manager_->empty() && !signal_fast_quit_) {
for (int i = 0; i < wait_step && !conn_manager_->empty() && !signal_fast_quit_; i++) {
srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
}
wait_step = (wait_step * 2) % 33;
srs_trace("wait for %d conns to quit", (int)_srs_conn_manager->size());
srs_trace("wait for %d conns to quit", (int)conn_manager_->size());
}
// dispose the source for hls and dvr.
_srs_sources->dispose();
live_sources_->dispose();
srs_trace("source disposed");
srs_usleep(config_->get_grace_final_wait());
@ -355,12 +386,12 @@ srs_error_t SrsServer::initialize()
}
// Initialize WebRTC DTLS certificate
if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) {
if ((err = rtc_dtls_certificate_->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc dtls certificate initialize");
}
// Start the DVR async call.
if ((err = _srs_dvr_async->start()) != srs_success) {
if ((err = dvr_async_->start()) != srs_success) {
return srs_error_wrap(err, "dvr async");
}
@ -431,7 +462,7 @@ srs_error_t SrsServer::run()
srs_error_t err = srs_success;
// Circuit breaker to protect server, which depends on server.
if ((err = _srs_circuit_breaker->initialize()) != srs_success) {
if ((err = circuit_breaker_->initialize()) != srs_success) {
return srs_error_wrap(err, "init circuit breaker");
}
@ -460,20 +491,20 @@ srs_error_t SrsServer::run()
return srs_error_wrap(err, "ingest");
}
if ((err = _srs_sources->initialize()) != srs_success) {
if ((err = live_sources_->initialize()) != srs_success) {
return srs_error_wrap(err, "live sources");
}
if ((err = _srs_srt_sources->initialize()) != srs_success) {
if ((err = srt_sources_->initialize()) != srs_success) {
return srs_error_wrap(err, "srt sources");
}
if ((err = _srs_rtc_sources->initialize()) != srs_success) {
if ((err = rtc_sources_->initialize()) != srs_success) {
return srs_error_wrap(err, "rtc sources");
}
#ifdef SRS_RTSP
if ((err = _srs_rtsp_sources->initialize()) != srs_success) {
if ((err = rtsp_sources_->initialize()) != srs_success) {
return srs_error_wrap(err, "rtsp sources");
}
#endif
@ -483,7 +514,7 @@ srs_error_t SrsServer::run()
}
#ifdef SRS_GB28181
if ((err = _srs_gb_manager->start()) != srs_success) {
if ((err = gb_manager_->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
#endif
@ -659,7 +690,7 @@ srs_error_t SrsServer::listen()
return srs_error_wrap(err, "rtc udp listen");
}
if ((err = _srs_conn_manager->start()) != srs_success) {
if ((err = conn_manager_->start()) != srs_success) {
return srs_error_wrap(err, "connection manager");
}
@ -873,7 +904,7 @@ void SrsServer::on_signal(int signo)
#ifndef SRS_GPERF_MC
if (signo == SRS_SIGNAL_REOPEN_LOG) {
_srs_log->reopen();
log_->reopen();
srs_warn("reopen log file, signo=%d", signo);
return;
@ -924,6 +955,72 @@ srs_error_t _srs_reload_err;
SrsReloadState _srs_reload_state;
std::string _srs_reload_id;
srs_error_t SrsServer::do2_cycle()
{
srs_error_t err = srs_success;
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit_ || signal_gracefully_quit_) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit_, signal_gracefully_quit_);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_GPERF_MC
if (signal_gmc_stop_) {
srs_warn("gmc got singal to stop server.");
return err;
}
#endif
// do persistence config to file.
if (signal_persistence_config_) {
signal_persistence_config_ = false;
srs_info("get signal to persistence config to file.");
if ((err = config_->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload_) {
signal_reload_ = false;
srs_trace("starting reload config.");
SrsReloadState state = SrsReloadStateInit;
_srs_reload_state = SrsReloadStateInit;
srs_freep(_srs_reload_err);
SrsRand rand;
_srs_reload_id = rand.gen_str(7);
err = config_->reload(&state);
_srs_reload_state = state;
_srs_reload_err = srs_error_copy(err);
if (err != srs_success) {
// If the parsing and transformation of the configuration fail, we can tolerate it by simply
// ignoring the new configuration and continuing to use the current one. However, if the
// application of the new configuration fails, some configurations may be applied while
// others may not. For instance, the listening port may be closed when the configuration
// is set to listen on an unavailable port. In such cases, we should terminate the service.
if (state == SrsReloadStateApplying) {
return srs_error_wrap(err, "reload fatal error state=%d", state);
}
srs_warn("reload failed, state=%d, err %s", state, srs_error_desc(err).c_str());
srs_freep(err);
} else {
srs_trace("reload config success, state=%d.", state);
}
}
return err;
}
srs_error_t SrsServer::do_cycle()
{
srs_error_t err = srs_success;
@ -937,63 +1034,8 @@ srs_error_t SrsServer::do_cycle()
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid_, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM or SIGQUIT.
if (signal_fast_quit_ || signal_gracefully_quit_) {
srs_trace("cleanup for quit signal fast=%d, grace=%d", signal_fast_quit_, signal_gracefully_quit_);
return err;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_GPERF_MC
if (signal_gmc_stop_) {
srs_warn("gmc got singal to stop server.");
return err;
}
#endif
// do persistence config to file.
if (signal_persistence_config_) {
signal_persistence_config_ = false;
srs_info("get signal to persistence config to file.");
if ((err = config_->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
// do reload the config.
if (signal_reload_) {
signal_reload_ = false;
srs_trace("starting reload config.");
SrsReloadState state = SrsReloadStateInit;
_srs_reload_state = SrsReloadStateInit;
srs_freep(_srs_reload_err);
SrsRand rand;
_srs_reload_id = rand.gen_str(7);
err = config_->reload(&state);
_srs_reload_state = state;
_srs_reload_err = srs_error_copy(err);
if (err != srs_success) {
// If the parsing and transformation of the configuration fail, we can tolerate it by simply
// ignoring the new configuration and continuing to use the current one. However, if the
// application of the new configuration fails, some configurations may be applied while
// others may not. For instance, the listening port may be closed when the configuration
// is set to listen on an unavailable port. In such cases, we should terminate the service.
if (state == SrsReloadStateApplying) {
return srs_error_wrap(err, "reload fatal error state=%d", state);
}
srs_warn("reload failed, state=%d, err %s", state, srs_error_desc(err).c_str());
srs_freep(err);
} else {
srs_trace("reload config success, state=%d.", state);
}
if ((err = do2_cycle()) != srs_success) {
return srs_error_wrap(err, "cycle");
}
srs_usleep(1 * SRS_UTIME_SECONDS);
@ -1007,7 +1049,7 @@ srs_error_t SrsServer::setup_ticks()
srs_error_t err = srs_success;
srs_freep(timer_);
timer_ = new SrsHourGlass("srs", this, 1 * SRS_UTIME_SECONDS);
timer_ = app_factory_->create_hourglass("srs", this, 1 * SRS_UTIME_SECONDS);
if (config_->get_stats_enabled()) {
if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) {
@ -1098,47 +1140,45 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
void SrsServer::resample_kbps()
{
SrsStatistic *stat = _srs_stat;
// collect delta from all clients.
for (int i = 0; i < (int)_srs_conn_manager->size(); i++) {
ISrsResource *c = _srs_conn_manager->at(i);
for (int i = 0; i < (int)conn_manager_->size(); i++) {
ISrsResource *c = conn_manager_->at(i);
SrsRtmpConn *rtmp = dynamic_cast<SrsRtmpConn *>(c);
if (rtmp) {
stat->kbps_add_delta(c->get_id().c_str(), rtmp->delta());
stat_->kbps_add_delta(c->get_id().c_str(), rtmp->delta());
continue;
}
SrsHttpxConn *httpx = dynamic_cast<SrsHttpxConn *>(c);
if (httpx) {
stat->kbps_add_delta(c->get_id().c_str(), httpx->delta());
stat_->kbps_add_delta(c->get_id().c_str(), httpx->delta());
continue;
}
#ifdef SRS_RTSP
SrsRtspConnection *rtsp = dynamic_cast<SrsRtspConnection *>(c);
if (rtsp) {
stat->kbps_add_delta(c->get_id().c_str(), rtsp->delta());
stat_->kbps_add_delta(c->get_id().c_str(), rtsp->delta());
continue;
}
#endif
SrsRtcTcpConn *tcp = dynamic_cast<SrsRtcTcpConn *>(c);
if (tcp) {
stat->kbps_add_delta(c->get_id().c_str(), tcp->delta());
stat_->kbps_add_delta(c->get_id().c_str(), tcp->delta());
continue;
}
SrsMpegtsSrtConn *srt = dynamic_cast<SrsMpegtsSrtConn *>(c);
if (srt) {
stat->kbps_add_delta(c->get_id().c_str(), srt->delta());
stat_->kbps_add_delta(c->get_id().c_str(), srt->delta());
continue;
}
SrsRtcConnection *rtc = dynamic_cast<SrsRtcConnection *>(c);
if (rtc) {
stat->kbps_add_delta(c->get_id().c_str(), rtc->delta());
stat_->kbps_add_delta(c->get_id().c_str(), rtc->delta());
continue;
}
@ -1147,7 +1187,7 @@ void SrsServer::resample_kbps()
}
// Update the global server level statistics.
stat->kbps_sample();
stat_->kbps_sample();
}
srs_error_t SrsServer::listen_srt_mpegts()
@ -1212,9 +1252,9 @@ srs_error_t SrsServer::accept_srt_client(srs_srt_t srt_fd)
srs_assert(resource);
// directly enqueue, the cycle thread will remove the client.
_srs_conn_manager->add(resource);
conn_manager_->add(resource);
// Note that conn is managed by _srs_conn_manager, so we don't need to free it.
// Note that conn is managed by conn_manager_, so we don't need to free it.
ISrsStartable *conn = dynamic_cast<ISrsStartable *>(resource);
if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start srt conn coroutine");
@ -1242,7 +1282,7 @@ srs_error_t SrsServer::srt_fd_to_resource(srs_srt_t srt_fd, ISrsResource **pr)
SrsContextRestore(_srs_context->get_id());
// Convert to SRT connection.
*pr = new SrsMpegtsSrtConn(_srs_conn_manager, srt_fd, ip, port);
*pr = new SrsMpegtsSrtConn(conn_manager_, srt_fd, ip, port);
return err;
}
@ -1459,7 +1499,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
} else {
string key = listener == https_listener_ ? config_->get_https_stream_ssl_key() : "";
string cert = listener == https_listener_ ? config_->get_https_stream_ssl_cert() : "";
resource = new SrsHttpxConn(_srs_conn_manager, io, http_server_, ip, port, key, cert);
resource = new SrsHttpxConn(conn_manager_, io, http_server_, ip, port, key, cert);
}
}
@ -1467,27 +1507,31 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
if (!resource) {
if (listener == rtmp_listener_) {
SrsRtmpTransport *transport = new SrsRtmpTransport(stfd2);
resource = new SrsRtmpConn(this, transport, ip, port);
SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port);
conn->assemble();
resource = conn;
} else if (listener == rtmps_listener_) {
SrsRtmpTransport *transport = new SrsRtmpsTransport(stfd2);
resource = new SrsRtmpConn(this, transport, ip, port);
SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port);
conn->assemble();
resource = conn;
} else if (listener == api_listener_ || listener == apis_listener_) {
string key = listener == apis_listener_ ? config_->get_https_api_ssl_key() : "";
string cert = listener == apis_listener_ ? config_->get_https_api_ssl_cert() : "";
resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, key, cert);
resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, key, cert);
} else if (listener == http_listener_ || listener == https_listener_) {
string key = listener == https_listener_ ? config_->get_https_stream_ssl_key() : "";
string cert = listener == https_listener_ ? config_->get_https_stream_ssl_cert() : "";
resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_server_, ip, port, key, cert);
resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_server_, ip, port, key, cert);
} else if (listener == webrtc_listener_) {
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port);
#ifdef SRS_RTSP
} else if (listener == rtsp_listener_) {
resource = new SrsRtspConnection(_srs_conn_manager, new SrsTcpConnection(stfd2), ip, port);
resource = new SrsRtspConnection(conn_manager_, new SrsTcpConnection(stfd2), ip, port);
#endif
} else if (listener == exporter_listener_) {
// TODO: FIXME: Maybe should support https metrics.
resource = new SrsHttpxConn(_srs_conn_manager, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, "", "");
resource = new SrsHttpxConn(conn_manager_, new SrsTcpConnection(stfd2), http_api_mux_, ip, port, "", "");
} else {
srs_close_stfd(stfd2);
srs_warn("Close for invalid fd=%d, ip=%s:%d", fd, ip.c_str(), port);
@ -1499,7 +1543,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
SrsRtcTcpConn *raw_conn = dynamic_cast<SrsRtcTcpConn *>(resource);
if (raw_conn) {
SrsSharedResource<SrsRtcTcpConn> *conn = new SrsSharedResource<SrsRtcTcpConn>(raw_conn);
SrsExecutorCoroutine *executor = new SrsExecutorCoroutine(_srs_conn_manager, conn, raw_conn, raw_conn);
SrsExecutorCoroutine *executor = new SrsExecutorCoroutine(conn_manager_, conn, raw_conn, raw_conn);
raw_conn->setup_owner(conn, executor, executor);
if ((err = executor->start()) != srs_success) {
srs_freep(executor);
@ -1510,7 +1554,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
// Use connection manager to manage all the resources.
srs_assert(resource);
_srs_conn_manager->add(resource);
conn_manager_->add(resource);
// If connection is a resource to start, start a coroutine to handle it.
// Note that conn is managed by conn_manager, so we don't need to free it.
@ -1530,9 +1574,9 @@ srs_error_t SrsServer::on_before_connection(const char *label, int fd, const std
// Failed if exceed the connection limitation.
int max_connections = config_->get_max_connections();
if ((int)_srs_conn_manager->size() >= max_connections) {
if ((int)conn_manager_->size() >= max_connections) {
return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop %s fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits",
label, fd, ip.c_str(), port, max_connections, (int)_srs_conn_manager->size());
label, fd, ip.c_str(), port, max_connections, (int)conn_manager_->size());
}
return err;
@ -1697,12 +1741,16 @@ SrsInotifyWorker::SrsInotifyWorker(SrsServer *s)
server_ = s;
trd_ = new SrsSTCoroutine("inotify", this);
inotify_fd_ = NULL;
config_ = _srs_config;
}
SrsInotifyWorker::~SrsInotifyWorker()
{
srs_freep(trd_);
srs_close_stfd(inotify_fd_);
config_ = NULL;
}
srs_error_t SrsInotifyWorker::start()

View File

@ -57,6 +57,18 @@ class SrsSrtEventLoop;
class SrsRtcSessionManager;
class SrsPidFileLocker;
class ISrsAppConfig;
class ISrsLiveSourceManager;
class ISrsResourceManager;
class ISrsDtlsCertificate;
class ISrsAsyncCallWorker;
class ISrsCircuitBreaker;
class ISrsSrtSourceManager;
class ISrsRtcSourceManager;
class ISrsRtspSourceManager;
class ISrsLog;
class ISrsStatistic;
class ISrsHourGlass;
class SrsAppFactory;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
@ -73,6 +85,20 @@ class SrsServer : public ISrsReloadHandler, // Reload framework for permormance
{
private:
ISrsAppConfig *config_;
ISrsLiveSourceManager *live_sources_;
ISrsResourceManager *conn_manager_;
ISrsDtlsCertificate *rtc_dtls_certificate_;
ISrsAsyncCallWorker *dvr_async_;
ISrsCircuitBreaker *circuit_breaker_;
ISrsSrtSourceManager *srt_sources_;
ISrsRtcSourceManager *rtc_sources_;
ISrsRtspSourceManager *rtsp_sources_;
#ifdef SRS_GB28181
ISrsResourceManager *gb_manager_;
#endif
ISrsLog *log_;
ISrsStatistic *stat_;
SrsAppFactory *app_factory_;
private:
ISrsHttpServeMux *http_api_mux_;
@ -81,7 +107,7 @@ private:
private:
SrsHttpHeartbeat *http_heartbeat_;
SrsIngester *ingester_;
SrsHourGlass *timer_;
ISrsHourGlass *timer_;
private:
// PID file manager for process identification and locking.
@ -210,6 +236,7 @@ private:
// update the global static data, for instance, the current time,
// the cpu/mem/network statistic.
virtual srs_error_t do_cycle();
virtual srs_error_t do2_cycle();
// interface ISrsHourGlassHandler
private:
@ -298,6 +325,9 @@ private:
// @see https://github.com/ossrs/srs/issues/1635
class SrsInotifyWorker : public ISrsCoroutineHandler
{
private:
ISrsAppConfig *config_;
private:
SrsServer *server_;
ISrsCoroutine *trd_;
@ -319,7 +349,7 @@ class SrsPidFileLocker
{
private:
ISrsAppConfig *config_;
private:
int pid_fd_;
std::string pid_file_;

View File

@ -674,67 +674,6 @@ void SrsStatistic::dumps_hints_kv(std::stringstream &ss)
}
}
#ifdef SRS_APM
void SrsStatistic::dumps_cls_summaries(SrsClsSugar *sugar)
{
if (!vhosts_.empty()) {
sugar->kv("vhosts", srs_fmt_sprintf("%d", (int)vhosts_.size()));
}
if (!streams_.empty()) {
sugar->kv("streams", srs_fmt_sprintf("%d", (int)streams_.size()));
}
if (!clients_.empty()) {
sugar->kv("clients", srs_fmt_sprintf("%d", (int)clients_.size()));
}
}
void SrsStatistic::dumps_cls_streams(SrsClsSugars *sugars)
{
for (std::map<std::string, SrsStatisticStream *>::iterator it = streams_.begin(); it != streams_.end(); ++it) {
SrsStatisticStream *stream = it->second;
if (!stream->active_ || !stream->nb_clients_) {
continue;
}
SrsClsSugar *sugar = sugars->create();
sugar->kv("hint", "stream");
sugar->kv("version", RTMP_SIG_SRS_VERSION);
sugar->kv("pid", srs_fmt_sprintf("%d", getpid()));
sugar->kv("sid", stream->id_);
sugar->kv("url", stream->url_);
if (stream->frames_->r30s()) {
sugar->kv("fps", srs_fmt_sprintf("%d", stream->frames_->r30s()));
}
if (stream->width_) {
sugar->kv("width", srs_fmt_sprintf("%d", stream->width_));
}
if (stream->height_) {
sugar->kv("height", srs_fmt_sprintf("%d", stream->height_));
}
SrsStatisticClient *pub = find_client(stream->publisher_id_);
if (pub) {
if (pub->kbps_->get_recv_kbps_30s()) {
sugar->kv("recv", srs_fmt_sprintf("%d", pub->kbps_->get_recv_kbps_30s()));
}
if (pub->kbps_->get_send_kbps_30s()) {
sugar->kv("send", srs_fmt_sprintf("%d", pub->kbps_->get_send_kbps_30s()));
}
}
sugar->kv("clients", srs_fmt_sprintf("%d", stream->nb_clients_));
if (stream->kbps_->get_recv_kbps_30s()) {
sugar->kv("recv2", srs_fmt_sprintf("%d", stream->kbps_->get_recv_kbps_30s()));
}
if (stream->kbps_->get_send_kbps_30s()) {
sugar->kv("send2", srs_fmt_sprintf("%d", stream->kbps_->get_send_kbps_30s()));
}
}
}
#endif
SrsStatisticVhost *SrsStatistic::create_vhost(ISrsRequest *req)
{
SrsStatisticVhost *vhost = NULL;

View File

@ -145,6 +145,9 @@ public:
SrsAudioChannels asound_type, SrsAacObjectType aac_object) = 0;
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id) = 0;
virtual void on_stream_close(ISrsRequest *req) = 0;
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta) = 0;
virtual void kbps_sample() = 0;
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames) = 0;
};
// The global statistic instance.
@ -255,12 +258,6 @@ public:
virtual srs_error_t dumps_clients(SrsJsonArray *arr, int start, int count);
// Dumps the hints about SRS server.
void dumps_hints_kv(std::stringstream &ss);
#ifdef SRS_APM
public:
// Dumps the CLS summary.
void dumps_cls_summaries(SrsClsSugar *sugar);
void dumps_cls_streams(SrsClsSugars *sugars);
#endif
private:
virtual SrsStatisticVhost *create_vhost(ISrsRequest *req);
virtual SrsStatisticStream *create_stream(SrsStatisticVhost *vhost, ISrsRequest *req);

View File

@ -56,6 +56,14 @@ void SrsStreamPublishToken::set_publisher_cid(const SrsContextId &cid)
publisher_cid_ = cid;
}
ISrsStreamPublishTokenManager::ISrsStreamPublishTokenManager()
{
}
ISrsStreamPublishTokenManager::~ISrsStreamPublishTokenManager()
{
}
SrsStreamPublishTokenManager::SrsStreamPublishTokenManager()
{
mutex_ = srs_mutex_new();

View File

@ -53,10 +53,22 @@ public:
void set_publisher_cid(const SrsContextId &cid);
};
// The interface for stream publish token manager
class ISrsStreamPublishTokenManager
{
public:
ISrsStreamPublishTokenManager();
virtual ~ISrsStreamPublishTokenManager();
public:
virtual srs_error_t acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token) = 0;
virtual void release_token(const std::string &stream_url) = 0;
};
// The global stream publish token manager ensures only one publisher
// can acquire a token for a given stream URL at any time.
// This prevents race conditions across all protocols.
class SrsStreamPublishTokenManager
class SrsStreamPublishTokenManager : public ISrsStreamPublishTokenManager
{
private:
// Map of stream URL to token

View File

@ -82,6 +82,20 @@ public:
ISrsResourceManager();
virtual ~ISrsResourceManager();
public:
// Start the resource manager.
virtual srs_error_t start() = 0;
// Check if the resource manager is empty.
virtual bool empty() = 0;
// Get the number of resources.
virtual size_t size() = 0;
public:
// Add a resource to the manager.
virtual void add(ISrsResource *conn, bool *exists = NULL) = 0;
// Get resource at specified index.
virtual ISrsResource *at(int index) = 0;
public:
// Remove then free the specified connection. Note that the manager always free c resource,
// in the same coroutine or another coroutine. Some manager may support add c to a map, it

View File

@ -91,6 +91,9 @@ srs_error_t prepare_main()
// Prevent the output of srt logs in utest.
srt_setloghandler(NULL, srs_srt_utest_null_log_handler);
// Set SRT log level to FATAL to suppress ERROR and WARNING logs in unit tests.
// LOG_CRIT (2) is the highest level that suppresses most logs.
srt_setloglevel(LOG_CRIT);
_srt_eventloop = new SrsSrtEventLoop();
if ((err = _srt_eventloop->initialize()) != srs_success) {

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,296 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_APP10_HPP
#define SRS_UTEST_APP10_HPP
/*
#include <srs_utest_app10.hpp>
*/
#include <srs_utest.hpp>
#include <srs_utest_app6.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_kernel_hourglass.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_rtc_server.hpp>
#include <srs_app_heartbeat.hpp>
// Mock config for testing SrsServer::listen()
class MockAppConfigForServerListen : public MockAppConfig
{
public:
std::vector<std::string> rtmp_listens_;
bool rtmps_enabled_;
std::vector<std::string> rtmps_listens_;
bool http_api_enabled_;
std::vector<std::string> http_api_listens_;
bool https_api_enabled_;
std::vector<std::string> https_api_listens_;
bool http_stream_enabled_;
std::vector<std::string> http_stream_listens_;
bool https_stream_enabled_;
std::vector<std::string> https_stream_listens_;
bool rtc_server_tcp_enabled_;
std::vector<std::string> rtc_server_tcp_listens_;
std::string rtc_server_protocol_;
bool rtsp_server_enabled_;
std::vector<std::string> rtsp_server_listens_;
bool exporter_enabled_;
std::string exporter_listen_;
public:
MockAppConfigForServerListen();
virtual ~MockAppConfigForServerListen();
public:
virtual std::vector<std::string> get_listens();
virtual bool get_rtmps_enabled();
virtual std::vector<std::string> get_rtmps_listen();
virtual bool get_http_api_enabled();
virtual std::vector<std::string> get_http_api_listens();
virtual bool get_https_api_enabled();
virtual std::vector<std::string> get_https_api_listens();
virtual bool get_http_stream_enabled();
virtual std::vector<std::string> get_http_stream_listens();
virtual bool get_https_stream_enabled();
virtual std::vector<std::string> get_https_stream_listens();
virtual bool get_rtc_server_tcp_enabled();
virtual std::vector<std::string> get_rtc_server_tcp_listens();
virtual std::string get_rtc_server_protocol();
virtual bool get_rtsp_server_enabled();
virtual std::vector<std::string> get_rtsp_server_listens();
virtual bool get_exporter_enabled();
virtual std::string get_exporter_listen();
};
// Mock ISrsHttpServeMux for testing SrsServer::http_handle()
class MockHttpServeMux : public ISrsHttpServeMux
{
public:
int handle_count_;
std::vector<std::string> patterns_;
public:
MockHttpServeMux();
virtual ~MockHttpServeMux();
public:
virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler);
virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r);
};
// Mock ISrsLog for testing SrsServer::on_signal()
class MockLogForSignal : public ISrsLog
{
public:
int reopen_count_;
public:
MockLogForSignal();
virtual ~MockLogForSignal();
public:
virtual srs_error_t initialize();
virtual void reopen();
virtual void log(SrsLogLevel level, const char *tag, const SrsContextId &context_id, const char *fmt, va_list args);
};
// Mock ISrsAppConfig for testing SrsServer::on_signal()
class MockAppConfigForSignal : public MockAppConfig
{
public:
bool force_grace_quit_;
public:
MockAppConfigForSignal();
virtual ~MockAppConfigForSignal();
public:
virtual bool is_force_grace_quit();
};
// Mock ISrsAppConfig for testing SrsServer::do2_cycle()
class MockAppConfigForDo2Cycle : public MockAppConfig
{
public:
srs_error_t reload_error_;
srs_error_t persistence_error_;
int reload_count_;
int persistence_count_;
SrsReloadState reload_state_;
public:
MockAppConfigForDo2Cycle();
virtual ~MockAppConfigForDo2Cycle();
public:
virtual srs_error_t reload(SrsReloadState *pstate);
virtual srs_error_t persistence();
void reset();
};
// Mock ISrsHourGlass for testing SrsServer::setup_ticks()
class MockHourGlassForSetupTicks : public ISrsHourGlass
{
public:
int tick_count_;
int start_count_;
std::vector<int> tick_events_;
std::vector<srs_utime_t> tick_intervals_;
public:
MockHourGlassForSetupTicks();
virtual ~MockHourGlassForSetupTicks();
public:
virtual srs_error_t start();
virtual void stop();
virtual srs_error_t tick(srs_utime_t interval);
virtual srs_error_t tick(int event, srs_utime_t interval);
virtual void untick(int event);
};
// Mock SrsAppFactory for testing SrsServer::setup_ticks()
class MockAppFactoryForSetupTicks : public SrsAppFactory
{
public:
MockHourGlassForSetupTicks *mock_hourglass_;
public:
MockAppFactoryForSetupTicks();
virtual ~MockAppFactoryForSetupTicks();
public:
virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval);
};
// Mock ISrsAppConfig for testing SrsServer::setup_ticks()
class MockAppConfigForSetupTicks : public MockAppConfig
{
public:
bool stats_enabled_;
bool heartbeat_enabled_;
srs_utime_t heartbeat_interval_;
public:
MockAppConfigForSetupTicks();
virtual ~MockAppConfigForSetupTicks();
public:
virtual bool get_stats_enabled();
virtual bool get_heartbeat_enabled();
virtual srs_utime_t get_heartbeat_interval();
};
// Mock SrsRtcSessionManager for testing SrsServer::notify()
class MockRtcSessionManagerForNotify : public SrsRtcSessionManager
{
public:
int update_rtc_sessions_count_;
public:
MockRtcSessionManagerForNotify();
virtual ~MockRtcSessionManagerForNotify();
public:
virtual void srs_update_rtc_sessions();
};
// Mock SrsHttpHeartbeat for testing SrsServer::notify()
class MockHttpHeartbeatForNotify : public SrsHttpHeartbeat
{
public:
int heartbeat_count_;
public:
MockHttpHeartbeatForNotify();
virtual ~MockHttpHeartbeatForNotify();
public:
virtual void heartbeat();
};
// Mock connection manager for testing SrsServer::resample_kbps()
class MockConnectionManagerForResampleKbps : public ISrsResourceManager
{
public:
std::vector<ISrsResource *> connections_;
public:
MockConnectionManagerForResampleKbps();
virtual ~MockConnectionManagerForResampleKbps();
public:
virtual srs_error_t start();
virtual bool empty();
virtual size_t size();
virtual void add(ISrsResource *conn, bool *exists = NULL);
virtual ISrsResource *at(int index);
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);
};
// Mock statistic for testing SrsServer::resample_kbps()
class MockStatisticForResampleKbps : public ISrsStatistic
{
public:
int kbps_add_delta_count_;
int kbps_sample_count_;
public:
MockStatisticForResampleKbps();
virtual ~MockStatisticForResampleKbps();
public:
virtual void on_disconnect(std::string id, srs_error_t err);
virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type);
virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height);
virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object);
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id);
virtual void on_stream_close(ISrsRequest *req);
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta);
virtual void kbps_sample();
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames);
void reset();
};
// Mock config for testing SrsServer::on_before_connection()
class MockAppConfigForConnectionLimit : public MockAppConfig
{
public:
int max_connections_;
public:
MockAppConfigForConnectionLimit();
virtual ~MockAppConfigForConnectionLimit();
public:
virtual int get_max_connections();
};
// Mock connection manager for testing SrsServer::on_before_connection()
class MockConnectionManagerForConnectionLimit : public ISrsResourceManager
{
public:
size_t connection_count_;
public:
MockConnectionManagerForConnectionLimit();
virtual ~MockConnectionManagerForConnectionLimit();
public:
virtual srs_error_t start();
virtual bool empty();
virtual size_t size();
virtual void add(ISrsResource *conn, bool *exists = NULL);
virtual ISrsResource *at(int index);
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);
};
#endif

View File

@ -2690,6 +2690,21 @@ void MockRtcStatistic::on_stream_close(ISrsRequest *req)
// Do nothing in mock
}
void MockRtcStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta *delta)
{
// Do nothing in mock
}
void MockRtcStatistic::kbps_sample()
{
// Do nothing in mock
}
srs_error_t MockRtcStatistic::on_video_frames(ISrsRequest *req, int nb_frames)
{
return srs_success;
}
// Unit tests for SrsRtcAsyncCallOnStop::call()
VOID TEST(RtcAsyncCallOnStopTest, CallWithHttpHooksDisabled)
{
@ -4303,6 +4318,17 @@ SrsSharedPtr<SrsLiveSource> MockLiveSourceManager::fetch(ISrsRequest *r)
return mock_source_;
}
void MockLiveSourceManager::dispose()
{
// Mock implementation - no-op for testing
}
srs_error_t MockLiveSourceManager::initialize()
{
// Mock implementation - always succeeds
return srs_success;
}
void MockLiveSourceManager::set_fetch_or_create_error(srs_error_t err)
{
srs_freep(fetch_or_create_error_);

View File

@ -288,8 +288,43 @@ public:
virtual bool get_exporter_enabled() { return false; }
virtual std::string get_exporter_listen() { return ""; }
virtual bool get_stats_enabled() { return false; }
virtual int get_stats_network() { return 0; }
virtual bool get_heartbeat_enabled() { return false; }
virtual srs_utime_t get_heartbeat_interval() { return 0; }
virtual std::string get_rtmps_ssl_cert() { return ""; }
virtual std::string get_rtmps_ssl_key() { return ""; }
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) { return NULL; }
virtual bool get_vhost_enabled(std::string vhost) { return true; }
virtual bool get_debug_srs_upnode(std::string vhost) { return true; }
virtual int get_out_ack_size(std::string vhost) { return 2500000; }
virtual int get_in_ack_size(std::string vhost) { return 2500000; }
virtual int get_chunk_size(std::string vhost) { return 60000; }
virtual bool get_gop_cache(std::string vhost) { return true; }
virtual int get_gop_cache_max_frames(std::string vhost) { return 2500; }
virtual bool get_tcp_nodelay(std::string vhost) { return false; }
virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) { return 350 * SRS_UTIME_MILLISECONDS; }
virtual srs_utime_t get_send_min_interval(std::string vhost) { return 0; }
virtual bool get_mr_enabled(std::string vhost) { return false; }
virtual srs_utime_t get_mr_sleep(std::string vhost) { return 350 * SRS_UTIME_MILLISECONDS; }
virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost) { return 20000 * SRS_UTIME_MILLISECONDS; }
virtual srs_utime_t get_publish_normal_timeout(std::string vhost) { return 5000 * SRS_UTIME_MILLISECONDS; }
virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost) { return 0; }
virtual bool get_refer_enabled(std::string vhost) { return false; }
virtual SrsConfDirective *get_refer_all(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_refer_play(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_refer_publish(std::string vhost) { return NULL; }
virtual bool get_vhost_origin_cluster(std::string vhost) { return false; }
virtual std::vector<std::string> get_vhost_coworkers(std::string vhost) { return std::vector<std::string>(); }
virtual bool get_vhost_edge_token_traverse(std::string vhost) { return false; }
virtual SrsConfDirective *get_vhost_edge_origin(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_vhost_on_connect(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_vhost_on_close(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_vhost_on_publish(std::string vhost) { return NULL; }
virtual SrsConfDirective *get_vhost_on_play(std::string vhost) { return NULL; }
virtual bool get_rtc_enabled(std::string vhost) { return false; }
virtual bool get_rtsp_enabled(std::string vhost) { return false; }
virtual bool get_rtc_from_rtmp(std::string vhost) { return false; }
virtual bool get_rtsp_from_rtmp(std::string vhost) { return false; }
// ISrsAppConfig methods
virtual bool get_vhost_http_hooks_enabled(std::string vhost);
virtual SrsConfDirective *get_vhost_on_stop(std::string vhost);
@ -421,6 +456,9 @@ public:
virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object);
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id);
virtual void on_stream_close(ISrsRequest *req);
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta);
virtual void kbps_sample();
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames);
void set_on_client_error(srs_error_t err);
void reset();
};
@ -653,6 +691,8 @@ public:
virtual ~MockLiveSourceManager();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps);
virtual SrsSharedPtr<SrsLiveSource> fetch(ISrsRequest *r);
virtual void dispose();
virtual srs_error_t initialize();
void set_fetch_or_create_error(srs_error_t err);
void set_can_publish(bool can_publish);
void reset();

View File

@ -1002,6 +1002,30 @@ MockConnectionManagerForExpire::~MockConnectionManagerForExpire()
{
}
srs_error_t MockConnectionManagerForExpire::start()
{
return srs_success;
}
bool MockConnectionManagerForExpire::empty()
{
return true;
}
size_t MockConnectionManagerForExpire::size()
{
return 0;
}
void MockConnectionManagerForExpire::add(ISrsResource * /*conn*/, bool * /*exists*/)
{
}
ISrsResource *MockConnectionManagerForExpire::at(int /*index*/)
{
return NULL;
}
void MockConnectionManagerForExpire::remove(ISrsResource *c)
{
removed_resource_ = c;

View File

@ -101,6 +101,11 @@ public:
virtual ~MockConnectionManagerForExpire();
public:
virtual srs_error_t start();
virtual bool empty();
virtual size_t size();
virtual void add(ISrsResource *conn, bool *exists = NULL);
virtual ISrsResource *at(int index);
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);

View File

@ -1901,6 +1901,21 @@ void MockStatisticForOriginHub::on_stream_close(ISrsRequest *req)
{
}
void MockStatisticForOriginHub::kbps_add_delta(std::string id, ISrsKbpsDelta *delta)
{
// Do nothing in mock
}
void MockStatisticForOriginHub::kbps_sample()
{
// Do nothing in mock
}
srs_error_t MockStatisticForOriginHub::on_video_frames(ISrsRequest *req, int nb_frames)
{
return srs_success;
}
// Mock ISrsNgExec implementation
MockNgExecForOriginHub::MockNgExecForOriginHub()
{

View File

@ -198,6 +198,9 @@ public:
virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object);
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id);
virtual void on_stream_close(ISrsRequest *req);
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta);
virtual void kbps_sample();
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames);
};
// Mock ISrsNgExec for testing SrsOriginHub::on_publish

View File

@ -1301,6 +1301,10 @@ VOID TEST(KernelErrorTest, AsanReportCallback)
{
#ifdef SRS_SANITIZER_LOG
// Test asan_report_callback function with various input formats
// Temporarily disable log output to avoid cluttering test results
MockEmptyLog* mock_log = dynamic_cast<MockEmptyLog*>(_srs_log);
SrsLogLevel original_level = mock_log->level_;
mock_log->level_ = SrsLogLevelDisabled;
// Test with simple backtrace line
const char *simple_trace = " #0 0x555555555820 in foo /path/to/file.cpp:123";
@ -1349,6 +1353,9 @@ VOID TEST(KernelErrorTest, AsanReportCallback)
asan_report_callback(malformed_trace);
// Function should not crash with malformed input
EXPECT_TRUE(true);
// Restore original log level
mock_log->level_ = original_level;
#else
// On builds without SRS_SANITIZER_LOG, just pass the test
EXPECT_TRUE(true);

View File

@ -1166,83 +1166,6 @@ VOID TEST(TCPConnectionTest, BufferedReadWriterWriteMethods)
}
}
VOID TEST(TCPConnectionTest, TcpConnectionTimeoutAndStats)
{
srs_error_t err;
// Test SrsTcpConnection timeout and statistics methods
if (true) {
SrsTestTcpServer server("127.0.0.1");
HELPER_ASSERT_SUCCESS(server.start());
// Give server time to start
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
SrsTestTcpClient client("127.0.0.1", server.get_port());
HELPER_ASSERT_SUCCESS(client.connect());
// Give time for connection to be established
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
SrsTcpConnection *server_conn = server.get_connection();
SrsTcpConnection *client_conn = client.get_connection();
ASSERT_TRUE(server_conn != NULL);
ASSERT_TRUE(client_conn != NULL);
// Test timeout methods
srs_utime_t original_recv_timeout = server_conn->get_recv_timeout();
// Set new receive timeout
srs_utime_t new_recv_timeout = 10 * SRS_UTIME_SECONDS;
server_conn->set_recv_timeout(new_recv_timeout);
// Verify timeout was set correctly
EXPECT_EQ(new_recv_timeout, server_conn->get_recv_timeout());
// Test statistics methods - initial values
int64_t initial_recv_bytes = server_conn->get_recv_bytes();
int64_t initial_send_bytes = server_conn->get_send_bytes();
// Client sends data to server to test recv statistics
string test_msg = "Hello TCP Connection Statistics!";
ssize_t nwrite = 0;
HELPER_ASSERT_SUCCESS(client_conn->write((void *)test_msg.data(), test_msg.length(), &nwrite));
EXPECT_EQ((ssize_t)test_msg.length(), nwrite);
// Server reads data using read_fully
char read_buf[64];
ssize_t nread = 0;
HELPER_ASSERT_SUCCESS(server_conn->read_fully(read_buf, test_msg.length(), &nread));
EXPECT_EQ((ssize_t)test_msg.length(), nread);
EXPECT_STREQ(test_msg.c_str(), string(read_buf, nread).c_str());
// Check that recv bytes increased
int64_t final_recv_bytes = server_conn->get_recv_bytes();
EXPECT_GT(final_recv_bytes, initial_recv_bytes);
// Server sends response to test send statistics
string response_msg = "Response from server!";
ssize_t nwrite_resp = 0;
HELPER_ASSERT_SUCCESS(server_conn->write((void *)response_msg.data(), response_msg.length(), &nwrite_resp));
EXPECT_EQ((ssize_t)response_msg.length(), nwrite_resp);
// Check that send bytes increased
int64_t final_send_bytes = server_conn->get_send_bytes();
EXPECT_GT(final_send_bytes, initial_send_bytes);
// Client reads response to verify
char response_buf[32];
ssize_t nread_resp = 0;
HELPER_ASSERT_SUCCESS(client_conn->read_fully(response_buf, response_msg.length(), &nread_resp));
EXPECT_EQ((ssize_t)response_msg.length(), nread_resp);
EXPECT_STREQ(response_msg.c_str(), string(response_buf, nread_resp).c_str());
// Restore original timeout
server_conn->set_recv_timeout(original_recv_timeout);
}
}
VOID TEST(TCPConnectionTest, TcpConnectionReadFully)
{
srs_error_t err;

View File

@ -1352,6 +1352,30 @@ MockConnectionManager::~MockConnectionManager()
{
}
srs_error_t MockConnectionManager::start()
{
return srs_success;
}
bool MockConnectionManager::empty()
{
return true;
}
size_t MockConnectionManager::size()
{
return 0;
}
void MockConnectionManager::add(ISrsResource * /*conn*/, bool * /*exists*/)
{
}
ISrsResource *MockConnectionManager::at(int /*index*/)
{
return NULL;
}
void MockConnectionManager::remove(ISrsResource * /*c*/)
{
}

View File

@ -88,6 +88,11 @@ class MockConnectionManager : public ISrsResourceManager
public:
MockConnectionManager();
virtual ~MockConnectionManager();
virtual srs_error_t start();
virtual bool empty();
virtual size_t size();
virtual void add(ISrsResource *conn, bool *exists = NULL);
virtual ISrsResource *at(int index);
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);