diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 63b758046..d55bffafb 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -171,6 +171,11 @@ void SrsAppCasterFlv::add_with_fast_id(uint64_t id, ISrsResource *conn) manager_->add_with_fast_id(id, conn); } +void SrsAppCasterFlv::add_with_name(const std::string &name, ISrsResource *conn) +{ + manager_->add_with_name(name, conn); +} + ISrsResource *SrsAppCasterFlv::at(int index) { return manager_->at(index); diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 95dcb37f7..8630176ba 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -76,6 +76,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 507367477..b3d0da371 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -347,6 +347,12 @@ public: virtual std::vector get_rtc_server_listens() = 0; virtual int get_rtc_server_reuseport() = 0; virtual bool get_rtc_server_encrypt() = 0; + virtual bool get_api_as_candidates() = 0; + virtual bool get_resolve_api_domain() = 0; + virtual bool get_keep_api_domain() = 0; + virtual std::string get_rtc_server_candidates() = 0; + virtual bool get_use_auto_detect_network_ip() = 0; + virtual std::string get_rtc_server_ip_family() = 0; public: // RTSP config @@ -453,6 +459,8 @@ public: virtual bool get_rtc_twcc_enabled(std::string vhost) = 0; virtual bool get_srt_enabled() = 0; virtual bool get_srt_enabled(std::string vhost) = 0; + virtual std::string get_srt_default_streamid() = 0; + virtual bool get_srt_to_rtmp(std::string vhost) = 0; virtual bool get_rtc_to_rtmp(std::string vhost) = 0; virtual srs_utime_t get_rtc_stun_timeout(std::string vhost) = 0; virtual bool get_rtc_stun_strict_check(std::string vhost) = 0; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index bcbca58e9..d9c2d1577 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -177,11 +177,11 @@ bool srs_is_rtcp(const uint8_t *data, size_t len) return (len >= 12) && (data[0] & 0x80) && (data[1] >= 192 && data[1] <= 223); } -srs_error_t api_server_as_candidates(string api, set &candidate_ips) +srs_error_t api_server_as_candidates(ISrsAppConfig *config, string api, set &candidate_ips) { srs_error_t err = srs_success; - if (api.empty() || !_srs_config->get_api_as_candidates()) { + if (api.empty() || !config->get_api_as_candidates()) { return err; } @@ -194,12 +194,12 @@ srs_error_t api_server_as_candidates(string api, set &candidate_ips) } // Whether add domain name. - if (!srs_net_is_ipv4(hostname) && _srs_config->get_keep_api_domain()) { + if (!srs_net_is_ipv4(hostname) && config->get_keep_api_domain()) { candidate_ips.insert(hostname); } // Try to parse the domain name if not IP. - if (!srs_net_is_ipv4(hostname) && _srs_config->get_resolve_api_domain()) { + if (!srs_net_is_ipv4(hostname) && config->get_resolve_api_domain()) { int family = 0; string ip = srs_dns_resolve(hostname, family); if (ip.empty() || ip == SRS_CONSTS_LOCALHOST || ip == SRS_CONSTS_LOOPBACK || ip == SRS_CONSTS_LOOPBACK6) { @@ -218,7 +218,7 @@ srs_error_t api_server_as_candidates(string api, set &candidate_ips) return err; } -set discover_candidates(SrsRtcUserConfig *ruc) +set discover_candidates(SrsProtocolUtility *utility, ISrsAppConfig *config, SrsRtcUserConfig *ruc) { srs_error_t err = srs_success; @@ -229,29 +229,28 @@ set discover_candidates(SrsRtcUserConfig *ruc) } // Try to discover from api of request, if api_as_candidates enabled. - if ((err = api_server_as_candidates(ruc->req_->host_, candidate_ips)) != srs_success) { + if ((err = api_server_as_candidates(config, ruc->req_->host_, candidate_ips)) != srs_success) { srs_warn("ignore discovering ip from api %s, err %s", ruc->req_->host_.c_str(), srs_error_summary(err).c_str()); srs_freep(err); } // If not * or 0.0.0.0, use the candidate as exposed IP. - string candidate = _srs_config->get_rtc_server_candidates(); + string candidate = config->get_rtc_server_candidates(); if (candidate != "*" && candidate != "0.0.0.0") { candidate_ips.insert(candidate); return candidate_ips; } // All automatically detected IP list. - SrsProtocolUtility utility; - vector &ips = utility.local_ips(); + vector &ips = utility->local_ips(); if (ips.empty()) { return candidate_ips; } // Discover from local network interface addresses. - if (_srs_config->get_use_auto_detect_network_ip()) { + if (config->get_use_auto_detect_network_ip()) { // We try to find the best match candidates, no loopback. - string family = _srs_config->get_rtc_server_ip_family(); + string family = config->get_rtc_server_ip_family(); for (int i = 0; i < (int)ips.size(); ++i) { SrsIPAddress *ip = ips[i]; if (ip->is_loopback_) { @@ -313,12 +312,16 @@ SrsRtcUserConfig::~SrsRtcUserConfig() SrsRtcSessionManager::SrsRtcSessionManager() { rtc_async_ = new SrsAsyncCallWorker(); + + conn_manager_ = _srs_conn_manager; } SrsRtcSessionManager::~SrsRtcSessionManager() { rtc_async_->stop(); srs_freep(rtc_async_); + + conn_manager_ = NULL; } srs_error_t SrsRtcSessionManager::initialize() @@ -334,7 +337,7 @@ srs_error_t SrsRtcSessionManager::initialize() SrsRtcConnection *SrsRtcSessionManager::find_rtc_session_by_username(const std::string &username) { - ISrsResource *conn = _srs_conn_manager->find_by_name(username); + ISrsResource *conn = conn_manager_->find_by_name(username); return dynamic_cast(conn); } @@ -410,7 +413,7 @@ srs_error_t SrsRtcSessionManager::do_create_rtc_session(SrsRtcUserConfig *ruc, S std::string username = ""; while (true) { username = local_ufrag + ":" + ruc->remote_sdp_.get_ice_ufrag(); - if (!_srs_conn_manager->find_by_name(username)) { + if (!conn_manager_->find_by_name(username)) { break; } @@ -442,7 +445,8 @@ srs_error_t SrsRtcSessionManager::do_create_rtc_session(SrsRtcUserConfig *ruc, S string protocol = _srs_config->get_rtc_server_protocol(); - set candidates = discover_candidates(ruc); + SrsProtocolUtility utility; + set candidates = discover_candidates(&utility, _srs_config, ruc); for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { string hostname; int uport = udp_port; @@ -494,7 +498,7 @@ srs_error_t SrsRtcSessionManager::do_create_rtc_session(SrsRtcUserConfig *ruc, S } // We allows username is optional, but it never empty here. - _srs_conn_manager->add_with_name(username, session); + conn_manager_->add_with_name(username, session); return err; } @@ -505,8 +509,8 @@ void SrsRtcSessionManager::srs_update_rtc_sessions() int nn_rtc_conns = 0; // Check all sessions and dispose the dead sessions. - for (int i = 0; i < (int)_srs_conn_manager->size(); i++) { - SrsRtcConnection *session = dynamic_cast(_srs_conn_manager->at(i)); + for (int i = 0; i < (int)conn_manager_->size(); i++) { + SrsRtcConnection *session = dynamic_cast(conn_manager_->at(i)); // Ignore not session, or already disposing. if (!session || session->disposing_) { continue; @@ -525,7 +529,7 @@ void SrsRtcSessionManager::srs_update_rtc_sessions() srs_trace("RTC: session destroy by timeout, username=%s", username.c_str()); // Use manager to free session and notify other objects. - _srs_conn_manager->remove(session); + conn_manager_->remove(session); } // Ignore stats if no RTC connections. @@ -569,11 +573,11 @@ srs_error_t SrsRtcSessionManager::on_udp_packet(ISrsUdpMuxSocket *skt) uint64_t fast_id = skt->fast_id(); // Try fast id first, if not found, search by long peer id. if (fast_id) { - session = (SrsRtcConnection *)_srs_conn_manager->find_by_fast_id(fast_id); + session = (SrsRtcConnection *)conn_manager_->find_by_fast_id(fast_id); } if (!session) { string peer_id = skt->peer_id(); - session = (SrsRtcConnection *)_srs_conn_manager->find_by_id(peer_id); + session = (SrsRtcConnection *)conn_manager_->find_by_id(peer_id); } if (session) { diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index c14a3e858..7b980499c 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -29,6 +29,7 @@ class SrsRtcSource; class SrsResourceManager; class SrsAsyncCallWorker; class ISrsUdpMuxSocket; +class ISrsResourceManager; // The UDP black hole, for developer to use wireshark to catch plaintext packets. // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, @@ -85,7 +86,7 @@ public: }; // Discover the candidates for RTC server. -extern std::set discover_candidates(SrsRtcUserConfig *ruc); +extern std::set discover_candidates(SrsProtocolUtility *utility, ISrsAppConfig *config, SrsRtcUserConfig *ruc); // The dns resolve utility, return the resolved ip address. extern std::string srs_dns_resolve(std::string host, int &family); @@ -93,6 +94,9 @@ extern std::string srs_dns_resolve(std::string host, int &family); // RTC session manager to handle WebRTC session lifecycle and management. class SrsRtcSessionManager : public ISrsExecRtcAsyncTask { +private: + ISrsResourceManager *conn_manager_; + private: // WebRTC async call worker for non-blocking operations. SrsAsyncCallWorker *rtc_async_; @@ -122,4 +126,8 @@ public: virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); }; +// Helper function to discover candidate IPs from API server hostname +// Used by WebRTC ICE candidate discovery +extern srs_error_t api_server_as_candidates(ISrsAppConfig *config, std::string api, std::set &candidate_ips); + #endif diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 6e2285cfa..d4c149b7d 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -92,7 +92,15 @@ srs_error_t SrsSrtConnection::writev(const iovec *iov, int iov_size, ssize_t *nw return srs_error_new(ERROR_SRT_CONN, "unsupport method"); } -SrsSrtRecvThread::SrsSrtRecvThread(SrsSrtConnection *srt_conn) +ISrsSrtRecvThread::ISrsSrtRecvThread() +{ +} + +ISrsSrtRecvThread::~ISrsSrtRecvThread() +{ +} + +SrsSrtRecvThread::SrsSrtRecvThread(ISrsProtocolReadWriter *srt_conn) { srt_conn_ = srt_conn; trd_ = new SrsSTCoroutine("srt-recv", this, _srs_context->get_id()); @@ -153,6 +161,14 @@ srs_error_t SrsSrtRecvThread::get_recv_err() return srs_error_copy(recv_err_); } +ISrsMpegtsSrtConnection::ISrsMpegtsSrtConnection() +{ +} + +ISrsMpegtsSrtConnection::~ISrsMpegtsSrtConnection() +{ +} + SrsMpegtsSrtConn::SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource()) { // Create a identify for this client. @@ -176,6 +192,14 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_sr req_->ip_ = ip; security_ = new SrsSecurity(); + + stat_ = _srs_stat; + config_ = _srs_config; + stream_publish_tokens_ = _srs_stream_publish_tokens; + srt_sources_ = _srs_srt_sources; + live_sources_ = _srs_sources; + rtc_sources_ = _srs_rtc_sources; + hooks_ = _srs_hooks; } SrsMpegtsSrtConn::~SrsMpegtsSrtConn() @@ -187,6 +211,14 @@ SrsMpegtsSrtConn::~SrsMpegtsSrtConn() srs_freep(srt_conn_); srs_freep(req_); srs_freep(security_); + + stat_ = NULL; + config_ = NULL; + stream_publish_tokens_ = NULL; + srt_sources_ = NULL; + live_sources_ = NULL; + rtc_sources_ = NULL; + hooks_ = NULL; } std::string SrsMpegtsSrtConn::desc() @@ -230,9 +262,8 @@ srs_error_t SrsMpegtsSrtConn::cycle() srs_error_t err = do_cycle(); // Update statistic when done. - SrsStatistic *stat = _srs_stat; - stat->kbps_add_delta(get_id().c_str(), delta_); - stat->on_disconnect(get_id().c_str(), err); + stat_->kbps_add_delta(get_id().c_str(), delta_); + stat_->on_disconnect(get_id().c_str(), err); // Notify manager to remove it. // Note that we create this object, so we use manager to remove it. @@ -262,7 +293,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() // If streamid empty, using default streamid instead. if (streamid.empty()) { - streamid = _srs_config->get_srt_default_streamid(); + streamid = config_->get_srt_default_streamid(); srs_warn("srt get empty streamid, using default streamid %s instead", streamid.c_str()); } @@ -273,13 +304,13 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() } // discovery vhost, resolve the vhost from config - SrsConfDirective *parsed_vhost = _srs_config->get_vhost(req_->vhost_); + SrsConfDirective *parsed_vhost = config_->get_vhost(req_->vhost_); if (parsed_vhost) { req_->vhost_ = parsed_vhost->arg0(); } - bool srt_enabled = _srs_config->get_srt_enabled(req_->vhost_); - bool edge = _srs_config->get_vhost_is_edge(req_->vhost_); + bool srt_enabled = config_->get_srt_enabled(req_->vhost_); + bool edge = config_->get_vhost_is_edge(req_->vhost_); if (srt_enabled && edge) { srt_enabled = false; @@ -295,7 +326,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() // Acquire stream publish token to prevent race conditions across all protocols. SrsStreamPublishToken *publish_token_raw = NULL; - if (mode == SrtModePush && (err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) { + if (mode == SrtModePush && (err = stream_publish_tokens_->acquire_token(req_, publish_token_raw)) != srs_success) { return srs_error_wrap(err, "acquire stream publish token"); } SrsUniquePtr publish_token(publish_token_raw); @@ -303,7 +334,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() srs_trace("stream publish token acquired, type=srt, url=%s", req_->get_stream_url().c_str()); } - if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) { + if ((err = srt_sources_->fetch_or_create(req_, srt_source_)) != srs_success) { return srs_error_wrap(err, "fetch srt source"); } @@ -327,8 +358,7 @@ srs_error_t SrsMpegtsSrtConn::publishing() srs_error_t err = srs_success; // We must do stat the client before hooks, because hooks depends on it. - SrsStatistic *stat = _srs_stat; - if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) { + if ((err = stat_->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) { return srs_error_wrap(err, "srt: stat client"); } @@ -356,8 +386,7 @@ srs_error_t SrsMpegtsSrtConn::playing() srs_error_t err = srs_success; // We must do stat the client before hooks, because hooks depends on it. - SrsStatistic *stat = _srs_stat; - if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) { + if ((err = stat_->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) { return srs_error_wrap(err, "srt: stat client"); } @@ -387,19 +416,19 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() } // Check rtmp stream is busy. - SrsSharedPtr live_source = _srs_sources->fetch(req_); + SrsSharedPtr live_source = live_sources_->fetch(req_); if (live_source.get() && !live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "live_source stream %s busy", req_->get_stream_url().c_str()); } - if ((err = _srs_sources->fetch_or_create(req_, live_source)) != srs_success) { + if ((err = live_sources_->fetch_or_create(req_, live_source)) != srs_success) { return srs_error_wrap(err, "create source"); } srs_assert(live_source.get() != NULL); - bool enabled_cache = _srs_config->get_gop_cache(req_->vhost_); - int gcmf = _srs_config->get_gop_cache_max_frames(req_->vhost_); + bool enabled_cache = config_->get_gop_cache(req_->vhost_); + int gcmf = config_->get_gop_cache_max_frames(req_->vhost_); live_source->set_cache(enabled_cache); live_source->set_gop_cache_max_frames(gcmf); @@ -408,9 +437,9 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() // Check whether RTC stream is busy. SrsSharedPtr rtc; - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(req_->vhost_); - bool edge = _srs_config->get_vhost_is_edge(req_->vhost_); + bool rtc_server_enabled = config_->get_rtc_server_enabled(); + bool rtc_enabled = config_->get_rtc_enabled(req_->vhost_); + bool edge = config_->get_vhost_is_edge(req_->vhost_); if (rtc_enabled && edge) { rtc_enabled = false; @@ -418,7 +447,7 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() } if (rtc_server_enabled && rtc_enabled) { - if ((err = _srs_rtc_sources->fetch_or_create(req_, rtc)) != srs_success) { + if ((err = rtc_sources_->fetch_or_create(req_, rtc)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -430,7 +459,7 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() // Bridge to RTMP and RTC streaming. SrsSrtBridge *bridge = new SrsSrtBridge(); - bool srt_to_rtmp = _srs_config->get_srt_to_rtmp(req_->vhost_); + bool srt_to_rtmp = config_->get_srt_to_rtmp(req_->vhost_); if (srt_to_rtmp && edge) { srt_to_rtmp = false; srs_warn("disable SRT to RTMP for edge vhost=%s", req_->vhost_.c_str()); @@ -440,7 +469,7 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() bridge->enable_srt2rtmp(live_source); } - bool rtmp_to_rtc = _srs_config->get_rtc_from_rtmp(req_->vhost_); + bool rtmp_to_rtc = config_->get_rtc_from_rtmp(req_->vhost_); if (rtmp_to_rtc && edge) { rtmp_to_rtc = false; srs_warn("disable RTMP to WebRTC for edge vhost=%s", req_->vhost_.c_str()); @@ -632,7 +661,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_connect() { srs_error_t err = srs_success; - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return err; } @@ -642,7 +671,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_connect() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_connect(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_connect(req_->vhost_); if (!conf) { return err; @@ -653,7 +682,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_connect() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_connect(url, req_)) != srs_success) { + if ((err = hooks_->on_connect(url, req_)) != srs_success) { return srs_error_wrap(err, "srt on_connect %s", url.c_str()); } } @@ -663,7 +692,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_connect() void SrsMpegtsSrtConn::http_hooks_on_close() { - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return; } @@ -673,7 +702,7 @@ void SrsMpegtsSrtConn::http_hooks_on_close() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_close(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_close(req_->vhost_); if (!conf) { return; @@ -684,7 +713,7 @@ void SrsMpegtsSrtConn::http_hooks_on_close() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_close(url, req_, srt_conn_->get_send_bytes(), srt_conn_->get_recv_bytes()); + hooks_->on_close(url, req_, srt_conn_->get_send_bytes(), srt_conn_->get_recv_bytes()); } } @@ -692,7 +721,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_publish() { srs_error_t err = srs_success; - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return err; } @@ -702,7 +731,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_publish() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_publish(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_publish(req_->vhost_); if (!conf) { return err; @@ -713,7 +742,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_publish() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_publish(url, req_)) != srs_success) { + if ((err = hooks_->on_publish(url, req_)) != srs_success) { return srs_error_wrap(err, "srt on_publish %s", url.c_str()); } } @@ -723,7 +752,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_publish() void SrsMpegtsSrtConn::http_hooks_on_unpublish() { - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return; } @@ -733,7 +762,7 @@ void SrsMpegtsSrtConn::http_hooks_on_unpublish() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_unpublish(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_unpublish(req_->vhost_); if (!conf) { return; @@ -744,7 +773,7 @@ void SrsMpegtsSrtConn::http_hooks_on_unpublish() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_unpublish(url, req_); + hooks_->on_unpublish(url, req_); } } @@ -752,7 +781,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_play() { srs_error_t err = srs_success; - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return err; } @@ -762,7 +791,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_play() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_play(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_play(req_->vhost_); if (!conf) { return err; @@ -773,7 +802,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_play() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - if ((err = _srs_hooks->on_play(url, req_)) != srs_success) { + if ((err = hooks_->on_play(url, req_)) != srs_success) { return srs_error_wrap(err, "srt on_play %s", url.c_str()); } } @@ -783,7 +812,7 @@ srs_error_t SrsMpegtsSrtConn::http_hooks_on_play() void SrsMpegtsSrtConn::http_hooks_on_stop() { - if (!_srs_config->get_vhost_http_hooks_enabled(req_->vhost_)) { + if (!config_->get_vhost_http_hooks_enabled(req_->vhost_)) { return; } @@ -793,7 +822,7 @@ void SrsMpegtsSrtConn::http_hooks_on_stop() vector hooks; if (true) { - SrsConfDirective *conf = _srs_config->get_vhost_on_stop(req_->vhost_); + SrsConfDirective *conf = config_->get_vhost_on_stop(req_->vhost_); if (!conf) { return; @@ -804,7 +833,7 @@ void SrsMpegtsSrtConn::http_hooks_on_stop() for (int i = 0; i < (int)hooks.size(); i++) { std::string url = hooks.at(i); - _srs_hooks->on_stop(url, req_); + hooks_->on_stop(url, req_); } return; diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index c041d1811..4b642caa8 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -23,6 +23,17 @@ class SrsLiveSource; class SrsSrtSource; class SrsSrtServer; class SrsNetworkDelta; +class ISrsNetworkDelta; +class ISrsSrtSocket; +class SrsNetworkKbps; +class ISrsSecurity; +class ISrsStatistic; +class ISrsAppConfig; +class ISrsStreamPublishTokenManager; +class ISrsSrtSourceManager; +class ISrsLiveSourceManager; +class ISrsRtcSourceManager; +class ISrsHttpHooks; // The basic connection of SRS, for SRT based protocols, // all srt connections accept from srt listener must extends from this base class, @@ -52,13 +63,24 @@ private: // The underlayer srt fd handler. srs_srt_t srt_fd_; // The underlayer srt socket. - SrsSrtSocket *srt_skt_; + ISrsSrtSocket *srt_skt_; }; -class SrsSrtRecvThread : public ISrsCoroutineHandler +// The recv thread for SRT connection. +class ISrsSrtRecvThread : public ISrsCoroutineHandler { public: - SrsSrtRecvThread(SrsSrtConnection *srt_conn); + ISrsSrtRecvThread(); + virtual ~ISrsSrtRecvThread(); + +public: +}; + +// The recv thread for SRT connection. +class SrsSrtRecvThread : public ISrsSrtRecvThread +{ +public: + SrsSrtRecvThread(ISrsProtocolReadWriter *srt_conn); ~SrsSrtRecvThread(); // Interface ISrsCoroutineHandler public: @@ -72,17 +94,36 @@ public: srs_error_t get_recv_err(); private: - SrsSrtConnection *srt_conn_; + ISrsProtocolReadWriter *srt_conn_; ISrsCoroutine *trd_; srs_error_t recv_err_; }; // The SRT connection, for client to publish or play stream. -class SrsMpegtsSrtConn : public ISrsConnection, // It's a resource. +class ISrsMpegtsSrtConnection : public ISrsConnection, // It's a resource. public ISrsStartable, public ISrsCoroutineHandler, public ISrsExpire { +public: + ISrsMpegtsSrtConnection(); + virtual ~ISrsMpegtsSrtConnection(); + +public: +}; + +// The SRT connection, for client to publish or play stream. +class SrsMpegtsSrtConn : public ISrsMpegtsSrtConnection +{ +private: + ISrsStatistic *stat_; + ISrsAppConfig *config_; + ISrsStreamPublishTokenManager *stream_publish_tokens_; + ISrsSrtSourceManager *srt_sources_; + ISrsLiveSourceManager *live_sources_; + ISrsRtcSourceManager *rtc_sources_; + ISrsHttpHooks *hooks_; + public: SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_srt_t srt_fd, std::string ip, int port); virtual ~SrsMpegtsSrtConn(); @@ -131,8 +172,8 @@ private: private: ISrsResourceManager *resource_manager_; srs_srt_t srt_fd_; - SrsSrtConnection *srt_conn_; - SrsNetworkDelta *delta_; + ISrsProtocolReadWriter *srt_conn_; + ISrsNetworkDelta *delta_; SrsNetworkKbps *kbps_; std::string ip_; int port_; @@ -140,7 +181,7 @@ private: ISrsRequest *req_; SrsSharedPtr srt_source_; - SrsSecurity *security_; + ISrsSecurity *security_; }; #endif diff --git a/trunk/src/kernel/srs_kernel_resource.hpp b/trunk/src/kernel/srs_kernel_resource.hpp index dc29b3359..d1aeccce2 100644 --- a/trunk/src/kernel/srs_kernel_resource.hpp +++ b/trunk/src/kernel/srs_kernel_resource.hpp @@ -105,6 +105,8 @@ public: virtual ISrsResource *find_by_fast_id(uint64_t id) = 0; // Find resource by name. virtual ISrsResource *find_by_name(std::string name) = 0; + // Add a resource with name to the manager. + virtual void add_with_name(const std::string &name, ISrsResource *conn) = 0; public: // Remove then free the specified connection. Note that the manager always free c resource, diff --git a/trunk/src/protocol/srs_protocol_srt.cpp b/trunk/src/protocol/srs_protocol_srt.cpp index a956ae208..d88d86e65 100644 --- a/trunk/src/protocol/srs_protocol_srt.cpp +++ b/trunk/src/protocol/srs_protocol_srt.cpp @@ -698,6 +698,14 @@ ISrsSrtPoller *srs_srt_poller_new() return new SrsSrtPoller(); } +ISrsSrtSocket::ISrsSrtSocket() +{ +} + +ISrsSrtSocket::~ISrsSrtSocket() +{ +} + SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller *srt_poller, srs_srt_t srt_fd) { srt_poller_ = srt_poller; diff --git a/trunk/src/protocol/srs_protocol_srt.hpp b/trunk/src/protocol/srs_protocol_srt.hpp index c6cff3c30..5e532129a 100644 --- a/trunk/src/protocol/srs_protocol_srt.hpp +++ b/trunk/src/protocol/srs_protocol_srt.hpp @@ -115,8 +115,26 @@ public: }; ISrsSrtPoller *srs_srt_poller_new(); +// Srt socket interface. +class ISrsSrtSocket +{ +public: + ISrsSrtSocket(); + virtual ~ISrsSrtSocket(); + +public: + virtual srs_error_t recvmsg(void *buf, size_t size, ssize_t *nread) = 0; + virtual srs_error_t sendmsg(void *buf, size_t size, ssize_t *nwrite) = 0; + virtual void set_recv_timeout(srs_utime_t tm) = 0; + virtual void set_send_timeout(srs_utime_t tm) = 0; + virtual srs_utime_t get_send_timeout() = 0; + virtual srs_utime_t get_recv_timeout() = 0; + virtual int64_t get_send_bytes() = 0; + virtual int64_t get_recv_bytes() = 0; +}; + // Srt ST socket, wrap SRT io and make it adapt to ST-thread. -class SrsSrtSocket +class SrsSrtSocket : public ISrsSrtSocket { public: SrsSrtSocket(ISrsSrtPoller *srt_poller, srs_srt_t srt_fd); diff --git a/trunk/src/utest/srs_utest_app10.cpp b/trunk/src/utest/srs_utest_app10.cpp index 7a67d698c..2d899d8fd 100644 --- a/trunk/src/utest/srs_utest_app10.cpp +++ b/trunk/src/utest/srs_utest_app10.cpp @@ -707,6 +707,10 @@ void MockConnectionManagerForResampleKbps::add_with_fast_id(uint64_t /*id*/, ISr { } +void MockConnectionManagerForResampleKbps::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockConnectionManagerForResampleKbps::at(int index) { if (index < 0 || index >= (int)connections_.size()) { @@ -1002,6 +1006,10 @@ void MockConnectionManagerForConnectionLimit::add_with_fast_id(uint64_t /*id*/, { } +void MockConnectionManagerForConnectionLimit::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockConnectionManagerForConnectionLimit::at(int index) { return NULL; diff --git a/trunk/src/utest/srs_utest_app10.hpp b/trunk/src/utest/srs_utest_app10.hpp index 8307fa82e..8d9f33d07 100644 --- a/trunk/src/utest/srs_utest_app10.hpp +++ b/trunk/src/utest/srs_utest_app10.hpp @@ -233,6 +233,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); @@ -308,6 +309,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); diff --git a/trunk/src/utest/srs_utest_app14.cpp b/trunk/src/utest/srs_utest_app14.cpp index f15c556fc..50c596151 100644 --- a/trunk/src/utest/srs_utest_app14.cpp +++ b/trunk/src/utest/srs_utest_app14.cpp @@ -859,6 +859,10 @@ void MockResourceManagerForBindSession::add_with_fast_id(uint64_t id, ISrsResour { } +void MockResourceManagerForBindSession::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockResourceManagerForBindSession::at(int index) { return NULL; @@ -2135,6 +2139,10 @@ void MockResourceManagerForGbPublish::add_with_fast_id(uint64_t id, ISrsResource fast_id_map_[id] = conn; } +void MockResourceManagerForGbPublish::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockResourceManagerForGbPublish::at(int index) { return NULL; @@ -3245,6 +3253,10 @@ void MockResourceManagerForUdpNetwork::add_with_fast_id(uint64_t id, ISrsResourc fast_id_map_[id] = conn; } +void MockResourceManagerForUdpNetwork::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockResourceManagerForUdpNetwork::at(int index) { return NULL; @@ -3918,6 +3930,10 @@ void MockResourceManagerForTcpConnHandshake::add_with_fast_id(uint64_t id, ISrsR { } +void MockResourceManagerForTcpConnHandshake::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockResourceManagerForTcpConnHandshake::at(int index) { return NULL; diff --git a/trunk/src/utest/srs_utest_app14.hpp b/trunk/src/utest/srs_utest_app14.hpp index 9996f2def..2ce3f1646 100644 --- a/trunk/src/utest/srs_utest_app14.hpp +++ b/trunk/src/utest/srs_utest_app14.hpp @@ -309,6 +309,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); @@ -531,6 +532,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); @@ -700,6 +702,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); @@ -796,6 +799,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); diff --git a/trunk/src/utest/srs_utest_app16.cpp b/trunk/src/utest/srs_utest_app16.cpp index c5828fea0..a69c5ea4f 100644 --- a/trunk/src/utest/srs_utest_app16.cpp +++ b/trunk/src/utest/srs_utest_app16.cpp @@ -10,12 +10,97 @@ using namespace std; #include #include +#include +#include +#include #include #include #include #include #include +// Mock ISrsSrtSocket implementation +MockSrtSocket::MockSrtSocket() +{ + recv_timeout_ = 1 * SRS_UTIME_SECONDS; + send_timeout_ = 1 * SRS_UTIME_SECONDS; + recv_bytes_ = 0; + send_bytes_ = 0; + recvmsg_error_ = srs_success; + sendmsg_error_ = srs_success; + recvmsg_called_count_ = 0; + sendmsg_called_count_ = 0; + last_recv_data_ = ""; + last_send_data_ = ""; +} + +MockSrtSocket::~MockSrtSocket() +{ + srs_freep(recvmsg_error_); + srs_freep(sendmsg_error_); +} + +srs_error_t MockSrtSocket::recvmsg(void *buf, size_t size, ssize_t *nread) +{ + recvmsg_called_count_++; + if (recvmsg_error_ != srs_success) { + return srs_error_copy(recvmsg_error_); + } + + // Simulate receiving data + string test_data = "test data"; + size_t copy_size = srs_min(size, test_data.size()); + memcpy(buf, test_data.c_str(), copy_size); + *nread = copy_size; + recv_bytes_ += copy_size; + last_recv_data_ = string((char *)buf, copy_size); + return srs_success; +} + +srs_error_t MockSrtSocket::sendmsg(void *buf, size_t size, ssize_t *nwrite) +{ + sendmsg_called_count_++; + if (sendmsg_error_ != srs_success) { + return srs_error_copy(sendmsg_error_); + } + + // Simulate sending data + *nwrite = size; + send_bytes_ += size; + last_send_data_ = string((char *)buf, size); + return srs_success; +} + +void MockSrtSocket::set_recv_timeout(srs_utime_t tm) +{ + recv_timeout_ = tm; +} + +void MockSrtSocket::set_send_timeout(srs_utime_t tm) +{ + send_timeout_ = tm; +} + +srs_utime_t MockSrtSocket::get_send_timeout() +{ + return send_timeout_; +} + +srs_utime_t MockSrtSocket::get_recv_timeout() +{ + return recv_timeout_; +} + +int64_t MockSrtSocket::get_send_bytes() +{ + return send_bytes_; +} + +int64_t MockSrtSocket::get_recv_bytes() +{ + return recv_bytes_; +} + // Mock ISrsUdpHandler implementation MockUdpHandler::MockUdpHandler() { @@ -109,7 +194,7 @@ VOID TEST(UdpListenerTest, ListenAndReceivePacket) EXPECT_EQ(sent, (int)test_data.size()); // Wait a bit for the listener to receive and process the packet - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Verify that the mock handler received the packet EXPECT_TRUE(mock_handler->on_udp_packet_called_); @@ -229,7 +314,7 @@ VOID TEST(UdpMuxListenerTest, SetSocketBuffer) EXPECT_GT(fd, 0); // Wait a bit for the cycle() to start and call set_socket_buffer() - srs_usleep(100 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Verify SO_SNDBUF is set - should be greater than default int sndbuf = 0; @@ -274,7 +359,7 @@ VOID TEST(UdpMuxListenerTest, ReceivePacketFromClient) EXPECT_GT(listener->fd(), 0); // Yield to allow the listener coroutine to start and initialize - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Create a UDP client socket to send test packets srs_netfd_t client_fd = NULL; @@ -297,7 +382,7 @@ VOID TEST(UdpMuxListenerTest, ReceivePacketFromClient) EXPECT_EQ(sent, (int)test_data.size()); // Yield to allow the listener coroutine to start and initialize - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Verify that the mock handler received the packet via SrsUdpMuxSocket EXPECT_TRUE(mock_handler->on_udp_packet_called_); @@ -312,7 +397,7 @@ VOID TEST(UdpMuxListenerTest, ReceivePacketFromClient) EXPECT_EQ(sent, (int)test_data2.size()); // Yield to allow the listener coroutine to start and initialize - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Verify the second packet was received EXPECT_EQ(mock_handler->packet_count_, 2); @@ -362,10 +447,10 @@ VOID TEST(UdpMuxSocketTest, SendtoReplyToClient) EXPECT_EQ(sent, (int)test_data.size()); // Yield to allow packet to arrive - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Receive the packet with server socket - this populates from_ and fromlen_ - int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + int nread = server_socket->recvfrom(1 * SRS_UTIME_MILLISECONDS); EXPECT_EQ(nread, (int)test_data.size()); EXPECT_EQ(string(server_socket->data(), nread), test_data); @@ -381,13 +466,13 @@ VOID TEST(UdpMuxSocketTest, SendtoReplyToClient) HELPER_EXPECT_SUCCESS(server_socket->sendto((void *)reply_data.c_str(), reply_data.size(), SRS_UTIME_NO_TIMEOUT)); // Yield to allow the packet to be sent - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Receive the reply on the client side char recv_buf[1024]; sockaddr_in from_addr; int from_len = sizeof(from_addr); - nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 100 * SRS_UTIME_MILLISECONDS); + nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 1 * SRS_UTIME_MILLISECONDS); // Verify the reply was received EXPECT_EQ(nread, (int)reply_data.size()); @@ -404,10 +489,10 @@ VOID TEST(UdpMuxSocketTest, SendtoReplyToClient) } // Verify at least some packets were received (may not be all due to UDP nature) - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); int received_count = 0; while (received_count < 25) { - nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 10 * SRS_UTIME_MILLISECONDS); + nread = srs_recvfrom(client_fd, recv_buf, sizeof(recv_buf), (sockaddr *)&from_addr, &from_len, 1 * SRS_UTIME_MILLISECONDS); if (nread <= 0) break; received_count++; @@ -458,10 +543,10 @@ VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching) EXPECT_EQ(sent, (int)test_data.size()); // Yield to allow packet to arrive - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Receive the packet with server socket - this sets address_changed_ to true - int nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + int nread = server_socket->recvfrom(1 * SRS_UTIME_MILLISECONDS); EXPECT_EQ(nread, (int)test_data.size()); EXPECT_EQ(string(server_socket->data(), nread), test_data); @@ -489,10 +574,10 @@ VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching) EXPECT_EQ(sent, (int)test_data2.size()); // Yield to allow packet to arrive - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Receive the second packet - this should set address_changed_ to true again - nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + nread = server_socket->recvfrom(1 * SRS_UTIME_MILLISECONDS); EXPECT_EQ(nread, (int)test_data2.size()); // Call peer_id() again - should regenerate but return the same value (same client) @@ -521,10 +606,10 @@ VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching) EXPECT_EQ(sent, (int)test_data3.size()); // Yield to allow packet to arrive - srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_usleep(1 * SRS_UTIME_MILLISECONDS); // Receive packet from second client - nread = server_socket->recvfrom(100 * SRS_UTIME_MILLISECONDS); + nread = server_socket->recvfrom(1 * SRS_UTIME_MILLISECONDS); EXPECT_EQ(nread, (int)test_data3.size()); // Call peer_id() - should generate new peer ID with different port @@ -541,3 +626,1232 @@ VOID TEST(UdpMuxSocketTest, PeerIdGenerationAndCaching) EXPECT_EQ(server_socket->get_peer_port(), client_port2); EXPECT_EQ(server_socket->get_peer_ip(), "127.0.0.1"); } + +VOID TEST(SrtConnectionTest, ReadWriteAndTimeouts) +{ + srs_error_t err; + + // Create SrsSrtConnection with a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + SrsUniquePtr conn(new SrsSrtConnection(dummy_fd)); + + // Create mock SRT socket + MockSrtSocket *mock_socket = new MockSrtSocket(); + + // Inject mock socket into the connection + conn->srt_skt_ = mock_socket; + + // Test initialize - should return success + HELPER_EXPECT_SUCCESS(conn->initialize()); + + // Test set_recv_timeout and get_recv_timeout + srs_utime_t recv_timeout = 1 * SRS_UTIME_SECONDS; + conn->set_recv_timeout(recv_timeout); + EXPECT_EQ(conn->get_recv_timeout(), recv_timeout); + EXPECT_EQ(mock_socket->recv_timeout_, recv_timeout); + + // Test set_send_timeout and get_send_timeout + srs_utime_t send_timeout = 1 * SRS_UTIME_SECONDS; + conn->set_send_timeout(send_timeout); + EXPECT_EQ(conn->get_send_timeout(), send_timeout); + EXPECT_EQ(mock_socket->send_timeout_, send_timeout); + + // Test read - should call mock socket's recvmsg + char read_buf[1024]; + ssize_t nread = 0; + HELPER_EXPECT_SUCCESS(conn->read(read_buf, sizeof(read_buf), &nread)); + EXPECT_EQ(mock_socket->recvmsg_called_count_, 1); + EXPECT_GT(nread, 0); + EXPECT_EQ(mock_socket->recv_bytes_, nread); + + // Test get_recv_bytes + EXPECT_EQ(conn->get_recv_bytes(), mock_socket->recv_bytes_); + + // Test write - should call mock socket's sendmsg + string test_data = "Hello SRT"; + ssize_t nwrite = 0; + HELPER_EXPECT_SUCCESS(conn->write((void *)test_data.c_str(), test_data.size(), &nwrite)); + EXPECT_EQ(mock_socket->sendmsg_called_count_, 1); + EXPECT_EQ(nwrite, (ssize_t)test_data.size()); + EXPECT_EQ(mock_socket->send_bytes_, (int64_t)test_data.size()); + EXPECT_EQ(mock_socket->last_send_data_, test_data); + + // Test get_send_bytes + EXPECT_EQ(conn->get_send_bytes(), mock_socket->send_bytes_); + + // Test read_fully - should return error (unsupported method) + HELPER_EXPECT_FAILED(conn->read_fully(read_buf, sizeof(read_buf), &nread)); + + // Test writev - should return error (unsupported method) + iovec iov[1]; + iov[0].iov_base = (void *)test_data.c_str(); + iov[0].iov_len = test_data.size(); + HELPER_EXPECT_FAILED(conn->writev(iov, 1, &nwrite)); + + // Clean up - set to NULL to avoid double-free + conn->srt_skt_ = NULL; + srs_freep(mock_socket); +} + +// Mock ISrsProtocolReadWriter implementation for SrsSrtRecvThread +MockSrtProtocolReadWriter::MockSrtProtocolReadWriter() +{ + read_error_ = srs_success; + read_count_ = 0; + simulate_timeout_ = false; + test_data_ = "test srt data"; + recv_timeout_ = 1 * SRS_UTIME_SECONDS; + send_timeout_ = 1 * SRS_UTIME_SECONDS; + recv_bytes_ = 0; + send_bytes_ = 0; +} + +MockSrtProtocolReadWriter::~MockSrtProtocolReadWriter() +{ + srs_freep(read_error_); +} + +srs_error_t MockSrtProtocolReadWriter::read(void *buf, size_t size, ssize_t *nread) +{ + read_count_++; + + // Simulate timeout error + if (simulate_timeout_) { + return srs_error_new(ERROR_SRT_TIMEOUT, "srt timeout"); + } + + // Return error if set + if (read_error_ != srs_success) { + return srs_error_copy(read_error_); + } + + // Simulate reading data + size_t copy_size = srs_min(size, test_data_.size()); + memcpy(buf, test_data_.c_str(), copy_size); + *nread = copy_size; + recv_bytes_ += copy_size; + return srs_success; +} + +srs_error_t MockSrtProtocolReadWriter::read_fully(void *buf, size_t size, ssize_t *nread) +{ + return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); +} + +srs_error_t MockSrtProtocolReadWriter::write(void *buf, size_t size, ssize_t *nwrite) +{ + *nwrite = size; + send_bytes_ += size; + return srs_success; +} + +srs_error_t MockSrtProtocolReadWriter::writev(const iovec *iov, int iov_size, ssize_t *nwrite) +{ + return srs_error_new(ERROR_NOT_SUPPORTED, "not supported"); +} + +void MockSrtProtocolReadWriter::set_recv_timeout(srs_utime_t tm) +{ + recv_timeout_ = tm; +} + +srs_utime_t MockSrtProtocolReadWriter::get_recv_timeout() +{ + return recv_timeout_; +} + +int64_t MockSrtProtocolReadWriter::get_recv_bytes() +{ + return recv_bytes_; +} + +void MockSrtProtocolReadWriter::set_send_timeout(srs_utime_t tm) +{ + send_timeout_ = tm; +} + +srs_utime_t MockSrtProtocolReadWriter::get_send_timeout() +{ + return send_timeout_; +} + +int64_t MockSrtProtocolReadWriter::get_send_bytes() +{ + return send_bytes_; +} + +// Mock ISrsCoroutine implementation for SrsSrtRecvThread +MockSrtCoroutine::MockSrtCoroutine() +{ + pull_error_ = srs_success; + pull_count_ = 0; + started_ = false; +} + +MockSrtCoroutine::~MockSrtCoroutine() +{ + srs_freep(pull_error_); +} + +srs_error_t MockSrtCoroutine::start() +{ + started_ = true; + return srs_success; +} + +void MockSrtCoroutine::stop() +{ +} + +void MockSrtCoroutine::interrupt() +{ +} + +srs_error_t MockSrtCoroutine::pull() +{ + pull_count_++; + + // Return error after 2 calls to allow at least one read iteration + if (pull_error_ != srs_success && pull_count_ > 2) { + return srs_error_copy(pull_error_); + } + + return srs_success; +} + +const SrsContextId &MockSrtCoroutine::cid() +{ + return cid_; +} + +void MockSrtCoroutine::set_cid(const SrsContextId &cid) +{ + cid_ = cid; +} + +VOID TEST(SrtRecvThreadTest, StartAndReadData) +{ + srs_error_t err; + + // Create mock SRT connection + MockSrtProtocolReadWriter *mock_conn = new MockSrtProtocolReadWriter(); + + // Create SrsSrtRecvThread with mock connection + SrsUniquePtr recv_thread(new SrsSrtRecvThread(mock_conn)); + + // Create mock coroutine + MockSrtCoroutine *mock_trd = new MockSrtCoroutine(); + + // Inject mock coroutine into recv thread + srs_freep(recv_thread->trd_); + recv_thread->trd_ = mock_trd; + + // Test start - should call mock coroutine's start() + HELPER_EXPECT_SUCCESS(recv_thread->start()); + EXPECT_TRUE(mock_trd->started_); + + // Test the major use scenario: reading data successfully with timeout handling + // The do_cycle() method has an infinite loop, so we need to make pull() return error after a few iterations + // to break the loop. We'll simulate: timeout -> timeout -> pull error (thread quit) + + // Set pull to return error after 3 calls to break the infinite loop + mock_trd->pull_error_ = srs_error_new(ERROR_THREAD_INTERRUPED, "thread interrupted"); + + // Simulate timeout on first read + mock_conn->simulate_timeout_ = true; + + // Call cycle() which calls do_cycle() - should handle timeout and then exit on pull error + HELPER_EXPECT_FAILED(recv_thread->cycle()); + + // Verify that pull was called and read was called + EXPECT_GT(mock_trd->pull_count_, 0); + EXPECT_GT(mock_conn->read_count_, 0); + + // Verify that recv_err_ was set by cycle() when do_cycle() failed + HELPER_EXPECT_FAILED(recv_thread->get_recv_err()); + + // Clean up - set to NULL to avoid double-free + recv_thread->trd_ = NULL; + srs_freep(mock_trd); + srs_freep(mock_conn); +} + +VOID TEST(MpegtsSrtConnTest, BasicConnectionInfo) +{ + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Test desc() - should return "srt-ts-conn" + EXPECT_EQ(conn->desc(), "srt-ts-conn"); + + // Test delta() - should return non-NULL delta_ member + ISrsKbpsDelta *delta = conn->delta(); + EXPECT_TRUE(delta != NULL); + + // Test remote_ip() - should return the IP address passed in constructor + EXPECT_EQ(conn->remote_ip(), test_ip); + + // Test get_id() - should return the context ID from the thread + const SrsContextId &cid = conn->get_id(); + EXPECT_TRUE(!cid.empty()); + + // Clean up - set members to NULL to avoid double-free of global references + conn->stat_ = NULL; + conn->config_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; + conn->hooks_ = NULL; +} + +// Mock SrsSrtSource implementation for testing on_srt_packet +MockSrtSourceForPacket::MockSrtSourceForPacket() +{ + on_packet_called_count_ = 0; + on_packet_error_ = srs_success; + last_packet_ = NULL; +} + +MockSrtSourceForPacket::~MockSrtSourceForPacket() +{ + srs_freep(on_packet_error_); + srs_freep(last_packet_); +} + +srs_error_t MockSrtSourceForPacket::on_packet(SrsSrtPacket *packet) +{ + on_packet_called_count_++; + + // Store a copy of the packet for verification + srs_freep(last_packet_); + last_packet_ = packet->copy(); + + if (on_packet_error_ != srs_success) { + return srs_error_copy(on_packet_error_); + } + + return srs_success; +} + +VOID TEST(MpegtsSrtConnTest, OnSrtPacketValidTsPacket) +{ + srs_error_t err; + + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock SRT source + MockSrtSourceForPacket *mock_source = new MockSrtSourceForPacket(); + + // Inject mock source into the connection + conn->srt_source_ = SrsSharedPtr(mock_source); + + // Test 1: Valid TS packet with correct size (188 bytes) and sync byte (0x47) + char valid_ts_packet[188]; + memset(valid_ts_packet, 0, sizeof(valid_ts_packet)); + valid_ts_packet[0] = 0x47; // TS sync byte + + HELPER_EXPECT_SUCCESS(conn->on_srt_packet(valid_ts_packet, 188)); + EXPECT_EQ(mock_source->on_packet_called_count_, 1); + EXPECT_TRUE(mock_source->last_packet_ != NULL); + EXPECT_EQ(mock_source->last_packet_->size(), 188); + EXPECT_EQ(mock_source->last_packet_->data()[0], 0x47); + + // Test 2: Valid TS packet with multiple TS packets (376 bytes = 2 * 188) + char valid_ts_packets[376]; + memset(valid_ts_packets, 0, sizeof(valid_ts_packets)); + valid_ts_packets[0] = 0x47; // First TS packet sync byte + valid_ts_packets[188] = 0x47; // Second TS packet sync byte + + HELPER_EXPECT_SUCCESS(conn->on_srt_packet(valid_ts_packets, 376)); + EXPECT_EQ(mock_source->on_packet_called_count_, 2); + EXPECT_TRUE(mock_source->last_packet_ != NULL); + EXPECT_EQ(mock_source->last_packet_->size(), 376); + + // Test 3: Invalid length (0 bytes) - should return success but not call on_packet + int prev_count = mock_source->on_packet_called_count_; + HELPER_EXPECT_SUCCESS(conn->on_srt_packet(valid_ts_packet, 0)); + EXPECT_EQ(mock_source->on_packet_called_count_, prev_count); // No change + + // Test 4: Invalid length (negative) - should return success but not call on_packet + HELPER_EXPECT_SUCCESS(conn->on_srt_packet(valid_ts_packet, -1)); + EXPECT_EQ(mock_source->on_packet_called_count_, prev_count); // No change + + // Test 5: Invalid size (not multiple of 188) - should return error + HELPER_EXPECT_FAILED(conn->on_srt_packet(valid_ts_packet, 100)); + EXPECT_EQ(mock_source->on_packet_called_count_, prev_count); // No change + + // Test 6: Invalid sync byte (not 0x47) - should return error + char invalid_sync_packet[188]; + memset(invalid_sync_packet, 0, sizeof(invalid_sync_packet)); + invalid_sync_packet[0] = 0x48; // Wrong sync byte + + HELPER_EXPECT_FAILED(conn->on_srt_packet(invalid_sync_packet, 188)); + EXPECT_EQ(mock_source->on_packet_called_count_, prev_count); // No change + + // Test 7: Source returns error - should propagate error + mock_source->on_packet_error_ = srs_error_new(ERROR_SRT_CONN, "mock error"); + HELPER_EXPECT_FAILED(conn->on_srt_packet(valid_ts_packet, 188)); + + // Clean up - set members to NULL to avoid double-free of global references + conn->stat_ = NULL; + conn->config_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; + conn->hooks_ = NULL; +} + +// Mock ISrsAppConfig for SRT HTTP hooks implementation +MockAppConfigForSrtHooks::MockAppConfigForSrtHooks() +{ + on_connect_directive_ = NULL; + on_close_directive_ = NULL; + on_publish_directive_ = NULL; + on_unpublish_directive_ = NULL; + on_play_directive_ = NULL; + on_stop_directive_ = NULL; +} + +MockAppConfigForSrtHooks::~MockAppConfigForSrtHooks() +{ + clear_on_connect_directive(); + clear_on_close_directive(); + clear_on_publish_directive(); + clear_on_unpublish_directive(); + clear_on_play_directive(); + clear_on_stop_directive(); +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_connect(std::string vhost) +{ + return on_connect_directive_; +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_close(std::string vhost) +{ + return on_close_directive_; +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_publish(std::string vhost) +{ + return on_publish_directive_; +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_unpublish(std::string vhost) +{ + return on_unpublish_directive_; +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_play(std::string vhost) +{ + return on_play_directive_; +} + +SrsConfDirective *MockAppConfigForSrtHooks::get_vhost_on_stop(std::string vhost) +{ + return on_stop_directive_; +} + +void MockAppConfigForSrtHooks::set_on_connect_urls(const std::vector &urls) +{ + clear_on_connect_directive(); + if (!urls.empty()) { + on_connect_directive_ = new SrsConfDirective(); + on_connect_directive_->name_ = "on_connect"; + on_connect_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::set_on_close_urls(const std::vector &urls) +{ + clear_on_close_directive(); + if (!urls.empty()) { + on_close_directive_ = new SrsConfDirective(); + on_close_directive_->name_ = "on_close"; + on_close_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::set_on_publish_urls(const std::vector &urls) +{ + clear_on_publish_directive(); + if (!urls.empty()) { + on_publish_directive_ = new SrsConfDirective(); + on_publish_directive_->name_ = "on_publish"; + on_publish_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::set_on_unpublish_urls(const std::vector &urls) +{ + clear_on_unpublish_directive(); + if (!urls.empty()) { + on_unpublish_directive_ = new SrsConfDirective(); + on_unpublish_directive_->name_ = "on_unpublish"; + on_unpublish_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::set_on_play_urls(const std::vector &urls) +{ + clear_on_play_directive(); + if (!urls.empty()) { + on_play_directive_ = new SrsConfDirective(); + on_play_directive_->name_ = "on_play"; + on_play_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::set_on_stop_urls(const std::vector &urls) +{ + clear_on_stop_directive(); + if (!urls.empty()) { + on_stop_directive_ = new SrsConfDirective(); + on_stop_directive_->name_ = "on_stop"; + on_stop_directive_->args_ = urls; + } +} + +void MockAppConfigForSrtHooks::clear_on_connect_directive() +{ + srs_freep(on_connect_directive_); +} + +void MockAppConfigForSrtHooks::clear_on_close_directive() +{ + srs_freep(on_close_directive_); +} + +void MockAppConfigForSrtHooks::clear_on_publish_directive() +{ + srs_freep(on_publish_directive_); +} + +void MockAppConfigForSrtHooks::clear_on_unpublish_directive() +{ + srs_freep(on_unpublish_directive_); +} + +void MockAppConfigForSrtHooks::clear_on_play_directive() +{ + srs_freep(on_play_directive_); +} + +void MockAppConfigForSrtHooks::clear_on_stop_directive() +{ + srs_freep(on_stop_directive_); +} + +// Mock ISrsHttpHooks for SRT implementation +MockHttpHooksForSrt::MockHttpHooksForSrt() +{ + on_connect_count_ = 0; + on_connect_error_ = srs_success; + on_close_count_ = 0; + on_publish_count_ = 0; + on_publish_error_ = srs_success; + on_unpublish_count_ = 0; + on_play_count_ = 0; + on_play_error_ = srs_success; + on_stop_count_ = 0; +} + +MockHttpHooksForSrt::~MockHttpHooksForSrt() +{ + srs_freep(on_connect_error_); + srs_freep(on_publish_error_); + srs_freep(on_play_error_); + clear_calls(); +} + +srs_error_t MockHttpHooksForSrt::on_connect(std::string url, ISrsRequest *req) +{ + on_connect_count_++; + on_connect_calls_.push_back(std::make_pair(url, req)); + if (on_connect_error_ != srs_success) { + return srs_error_copy(on_connect_error_); + } + return srs_success; +} + +void MockHttpHooksForSrt::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) +{ + on_close_count_++; + on_close_calls_.push_back(std::make_tuple(url, req, send_bytes, recv_bytes)); +} + +srs_error_t MockHttpHooksForSrt::on_publish(std::string url, ISrsRequest *req) +{ + on_publish_count_++; + on_publish_calls_.push_back(std::make_pair(url, req)); + if (on_publish_error_ != srs_success) { + return srs_error_copy(on_publish_error_); + } + return srs_success; +} + +void MockHttpHooksForSrt::on_unpublish(std::string url, ISrsRequest *req) +{ + on_unpublish_count_++; + on_unpublish_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooksForSrt::on_play(std::string url, ISrsRequest *req) +{ + on_play_count_++; + on_play_calls_.push_back(std::make_pair(url, req)); + if (on_play_error_ != srs_success) { + return srs_error_copy(on_play_error_); + } + return srs_success; +} + +void MockHttpHooksForSrt::on_stop(std::string url, ISrsRequest *req) +{ + on_stop_count_++; + on_stop_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooksForSrt::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForSrt::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForSrt::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForSrt::discover_co_workers(std::string url, std::string &host, int &port) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForSrt::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) +{ + return srs_success; +} + +void MockHttpHooksForSrt::clear_calls() +{ + on_connect_calls_.clear(); + on_connect_count_ = 0; + on_close_calls_.clear(); + on_close_count_ = 0; + on_publish_calls_.clear(); + on_publish_count_ = 0; + on_unpublish_calls_.clear(); + on_unpublish_count_ = 0; + on_play_calls_.clear(); + on_play_count_ = 0; + on_stop_calls_.clear(); + on_stop_count_ = 0; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnConnect) +{ + srs_error_t err; + + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + + // Test 1: HTTP hooks disabled - should return success without calling hooks + mock_config->set_http_hooks_enabled(false); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(mock_hooks->on_connect_count_, 0); + + // Test 2: HTTP hooks enabled but no on_connect URLs configured - should return success + mock_config->set_http_hooks_enabled(true); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(mock_hooks->on_connect_count_, 0); + + // Test 3: HTTP hooks enabled with on_connect URLs - should call hooks + std::vector urls; + urls.push_back("http://localhost:8080/on_connect"); + urls.push_back("http://localhost:8080/on_connect2"); + mock_config->set_on_connect_urls(urls); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(mock_hooks->on_connect_count_, 2); + EXPECT_EQ(mock_hooks->on_connect_calls_.size(), 2); + EXPECT_EQ(mock_hooks->on_connect_calls_[0].first, "http://localhost:8080/on_connect"); + EXPECT_EQ(mock_hooks->on_connect_calls_[0].second, mock_req.get()); + EXPECT_EQ(mock_hooks->on_connect_calls_[1].first, "http://localhost:8080/on_connect2"); + + // Test 4: Hook returns error - should propagate error + mock_hooks->clear_calls(); + mock_hooks->on_connect_error_ = srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "hook failed"); + HELPER_EXPECT_FAILED(conn->http_hooks_on_connect()); + + // Clean up - set to NULL to avoid double-free + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnClose) +{ + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Create mock SRT protocol read/writer to track bytes + MockSrtProtocolReadWriter *mock_srt_conn = new MockSrtProtocolReadWriter(); + mock_srt_conn->send_bytes_ = 1000; + mock_srt_conn->recv_bytes_ = 2000; + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + conn->srt_conn_ = mock_srt_conn; + + // Test 1: HTTP hooks disabled - should not call hooks + mock_config->set_http_hooks_enabled(false); + conn->http_hooks_on_close(); + EXPECT_EQ(mock_hooks->on_close_count_, 0); + + // Test 2: HTTP hooks enabled but no on_close URLs configured - should not call hooks + mock_config->set_http_hooks_enabled(true); + conn->http_hooks_on_close(); + EXPECT_EQ(mock_hooks->on_close_count_, 0); + + // Test 3: HTTP hooks enabled with on_close URLs - should call hooks with byte counts + std::vector urls; + urls.push_back("http://localhost:8080/on_close"); + urls.push_back("http://localhost:8080/on_close2"); + mock_config->set_on_close_urls(urls); + + conn->http_hooks_on_close(); + EXPECT_EQ(mock_hooks->on_close_count_, 2); + EXPECT_EQ(mock_hooks->on_close_calls_.size(), 2); + EXPECT_EQ(std::get<0>(mock_hooks->on_close_calls_[0]), "http://localhost:8080/on_close"); + EXPECT_EQ(std::get<1>(mock_hooks->on_close_calls_[0]), mock_req.get()); + EXPECT_EQ(std::get<2>(mock_hooks->on_close_calls_[0]), 1000); // send_bytes + EXPECT_EQ(std::get<3>(mock_hooks->on_close_calls_[0]), 2000); // recv_bytes + + // Clean up - set to NULL to avoid double-free + conn->srt_conn_ = NULL; + srs_freep(mock_srt_conn); + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnPublish) +{ + srs_error_t err; + + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + + // Test 1: HTTP hooks disabled - should return success without calling hooks + mock_config->set_http_hooks_enabled(false); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_publish()); + EXPECT_EQ(mock_hooks->on_publish_count_, 0); + + // Test 2: HTTP hooks enabled but no on_publish URLs configured - should return success + mock_config->set_http_hooks_enabled(true); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_publish()); + EXPECT_EQ(mock_hooks->on_publish_count_, 0); + + // Test 3: HTTP hooks enabled with on_publish URLs - should call hooks + std::vector urls; + urls.push_back("http://localhost:8080/on_publish"); + urls.push_back("http://localhost:8080/on_publish2"); + mock_config->set_on_publish_urls(urls); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_publish()); + EXPECT_EQ(mock_hooks->on_publish_count_, 2); + EXPECT_EQ(mock_hooks->on_publish_calls_.size(), 2); + EXPECT_EQ(mock_hooks->on_publish_calls_[0].first, "http://localhost:8080/on_publish"); + EXPECT_EQ(mock_hooks->on_publish_calls_[0].second, mock_req.get()); + + // Test 4: Hook returns error - should propagate error + mock_hooks->clear_calls(); + mock_hooks->on_publish_error_ = srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "publish hook failed"); + HELPER_EXPECT_FAILED(conn->http_hooks_on_publish()); + + // Clean up - set to NULL to avoid double-free + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnUnpublish) +{ + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + + // Test 1: HTTP hooks disabled - should not call hooks + mock_config->set_http_hooks_enabled(false); + conn->http_hooks_on_unpublish(); + EXPECT_EQ(mock_hooks->on_unpublish_count_, 0); + + // Test 2: HTTP hooks enabled but no on_unpublish URLs configured - should not call hooks + mock_config->set_http_hooks_enabled(true); + conn->http_hooks_on_unpublish(); + EXPECT_EQ(mock_hooks->on_unpublish_count_, 0); + + // Test 3: HTTP hooks enabled with on_unpublish URLs - should call hooks + std::vector urls; + urls.push_back("http://localhost:8080/on_unpublish"); + urls.push_back("http://localhost:8080/on_unpublish2"); + mock_config->set_on_unpublish_urls(urls); + + conn->http_hooks_on_unpublish(); + EXPECT_EQ(mock_hooks->on_unpublish_count_, 2); + EXPECT_EQ(mock_hooks->on_unpublish_calls_.size(), 2); + EXPECT_EQ(mock_hooks->on_unpublish_calls_[0].first, "http://localhost:8080/on_unpublish"); + EXPECT_EQ(mock_hooks->on_unpublish_calls_[0].second, mock_req.get()); + + // Clean up - set to NULL to avoid double-free + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnPlay) +{ + srs_error_t err; + + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + + // Test 1: HTTP hooks disabled - should return success without calling hooks + mock_config->set_http_hooks_enabled(false); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_play()); + EXPECT_EQ(mock_hooks->on_play_count_, 0); + + // Test 2: HTTP hooks enabled but no on_play URLs configured - should return success + mock_config->set_http_hooks_enabled(true); + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_play()); + EXPECT_EQ(mock_hooks->on_play_count_, 0); + + // Test 3: HTTP hooks enabled with on_play URLs - should call hooks + std::vector urls; + urls.push_back("http://localhost:8080/on_play"); + urls.push_back("http://localhost:8080/on_play2"); + mock_config->set_on_play_urls(urls); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_play()); + EXPECT_EQ(mock_hooks->on_play_count_, 2); + EXPECT_EQ(mock_hooks->on_play_calls_.size(), 2); + EXPECT_EQ(mock_hooks->on_play_calls_[0].first, "http://localhost:8080/on_play"); + EXPECT_EQ(mock_hooks->on_play_calls_[0].second, mock_req.get()); + + // Test 4: Hook returns error - should propagate error + mock_hooks->clear_calls(); + mock_hooks->on_play_error_ = srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "play hook failed"); + HELPER_EXPECT_FAILED(conn->http_hooks_on_play()); + + // Clean up - set to NULL to avoid double-free + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(MpegtsSrtConnTest, HttpHooksOnStop) +{ + // Create a dummy SRT file descriptor + srs_srt_t dummy_fd = 1; + std::string test_ip = "192.168.1.100"; + int test_port = 9000; + + // Create SrsMpegtsSrtConn with test parameters + SrsUniquePtr conn(new SrsMpegtsSrtConn(NULL, dummy_fd, test_ip, test_port)); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForSrtHooks()); + + // Create mock hooks + SrsUniquePtr mock_hooks(new MockHttpHooksForSrt()); + + // Create mock request + SrsUniquePtr mock_req(new MockRtcAsyncCallRequest("test.vhost", "live", "stream1")); + + // Inject mocks into connection + conn->config_ = mock_config.get(); + conn->hooks_ = mock_hooks.get(); + conn->req_ = mock_req.get(); + + // Test 1: HTTP hooks disabled - should not call hooks + mock_config->set_http_hooks_enabled(false); + conn->http_hooks_on_stop(); + EXPECT_EQ(mock_hooks->on_stop_count_, 0); + + // Test 2: HTTP hooks enabled but no on_stop URLs configured - should not call hooks + mock_config->set_http_hooks_enabled(true); + conn->http_hooks_on_stop(); + EXPECT_EQ(mock_hooks->on_stop_count_, 0); + + // Test 3: HTTP hooks enabled with on_stop URLs - should call hooks + std::vector urls; + urls.push_back("http://localhost:8080/on_stop"); + urls.push_back("http://localhost:8080/on_stop2"); + mock_config->set_on_stop_urls(urls); + + conn->http_hooks_on_stop(); + EXPECT_EQ(mock_hooks->on_stop_count_, 2); + EXPECT_EQ(mock_hooks->on_stop_calls_.size(), 2); + EXPECT_EQ(mock_hooks->on_stop_calls_[0].first, "http://localhost:8080/on_stop"); + EXPECT_EQ(mock_hooks->on_stop_calls_[0].second, mock_req.get()); + + // Clean up - set to NULL to avoid double-free + conn->config_ = NULL; + conn->hooks_ = NULL; + conn->req_ = NULL; + conn->stat_ = NULL; + conn->stream_publish_tokens_ = NULL; + conn->srt_sources_ = NULL; + conn->live_sources_ = NULL; + conn->rtc_sources_ = NULL; +} + +VOID TEST(ApiServerAsCandidatesTest, MajorUseScenario) +{ + srs_error_t err; + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfig()); + + // Test 1: API as candidates disabled - should return success without adding candidates + mock_config->set_api_as_candidates(false); + set candidate_ips; + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.100", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + // Test 2: Empty API string - should return success without adding candidates + mock_config->set_api_as_candidates(true); + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + // Test 3: Localhost name - should return success without adding candidates + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "localhost", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + // Test 4: Loopback addresses - should return success without adding candidates + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "127.0.0.1", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "0.0.0.0", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "::", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 0); + + // Test 5: Valid IPv4 address - should add to candidates + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.100", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 1); + EXPECT_TRUE(candidate_ips.find("192.168.1.100") != candidate_ips.end()); + + // Test 6: Domain name with keep_api_domain enabled - should add domain to candidates + mock_config->set_keep_api_domain(true); + mock_config->set_resolve_api_domain(false); + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "example.com", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 1); + EXPECT_TRUE(candidate_ips.find("example.com") != candidate_ips.end()); + + // Test 7: Domain name with resolve_api_domain enabled - should resolve and add IP + // Note: DNS resolution may fail in test environment, so we test with a domain that resolves to loopback + // which should be filtered out, resulting in no candidates added + mock_config->set_keep_api_domain(false); + mock_config->set_resolve_api_domain(true); + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "localhost", candidate_ips)); + // localhost resolves to 127.0.0.1 which is filtered out + EXPECT_EQ(candidate_ips.size(), 0); + + // Test 8: Multiple calls should accumulate candidates in the set + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.100", candidate_ips)); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.101", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 2); + EXPECT_TRUE(candidate_ips.find("192.168.1.100") != candidate_ips.end()); + EXPECT_TRUE(candidate_ips.find("192.168.1.101") != candidate_ips.end()); + + // Test 9: Duplicate IPs should not be added twice (set behavior) + candidate_ips.clear(); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.100", candidate_ips)); + HELPER_EXPECT_SUCCESS(api_server_as_candidates(mock_config.get(), "192.168.1.100", candidate_ips)); + EXPECT_EQ(candidate_ips.size(), 1); + EXPECT_TRUE(candidate_ips.find("192.168.1.100") != candidate_ips.end()); +} + +// Mock SrsProtocolUtility implementation +MockProtocolUtility::MockProtocolUtility() +{ +} + +MockProtocolUtility::~MockProtocolUtility() +{ + clear_ips(); +} + +vector &MockProtocolUtility::local_ips() +{ + return mock_ips_; +} + +void MockProtocolUtility::add_ip(string ip, string ifname, bool is_ipv4, bool is_loopback, bool is_internet) +{ + SrsIPAddress *addr = new SrsIPAddress(); + addr->ip_ = ip; + addr->ifname_ = ifname; + addr->is_ipv4_ = is_ipv4; + addr->is_loopback_ = is_loopback; + addr->is_internet_ = is_internet; + mock_ips_.push_back(addr); +} + +void MockProtocolUtility::clear_ips() +{ + for (size_t i = 0; i < mock_ips_.size(); i++) { + srs_freep(mock_ips_[i]); + } + mock_ips_.clear(); +} + +// Mock ISrsAppConfig for discover_candidates implementation +MockAppConfigForDiscoverCandidates::MockAppConfigForDiscoverCandidates() +{ + rtc_server_candidates_ = "*"; + use_auto_detect_network_ip_ = true; + rtc_server_ip_family_ = "ipv4"; + api_as_candidates_ = false; + resolve_api_domain_ = false; + keep_api_domain_ = false; +} + +MockAppConfigForDiscoverCandidates::~MockAppConfigForDiscoverCandidates() +{ +} + +string MockAppConfigForDiscoverCandidates::get_rtc_server_candidates() +{ + return rtc_server_candidates_; +} + +bool MockAppConfigForDiscoverCandidates::get_use_auto_detect_network_ip() +{ + return use_auto_detect_network_ip_; +} + +string MockAppConfigForDiscoverCandidates::get_rtc_server_ip_family() +{ + return rtc_server_ip_family_; +} + +VOID TEST(RtcServerTest, DiscoverCandidates_AutoDetectIPv4) +{ + // Create mock utility with multiple network interfaces + SrsUniquePtr mock_utility(new MockProtocolUtility()); + mock_utility->add_ip("127.0.0.1", "lo", true, true, false); // loopback + mock_utility->add_ip("192.168.1.100", "eth0", true, false, false); // private IPv4 + mock_utility->add_ip("10.0.0.50", "eth1", true, false, false); // private IPv4 + mock_utility->add_ip("fe80::1", "eth0", false, false, false); // IPv6 + + // Create mock config with auto-detect enabled + SrsUniquePtr mock_config(new MockAppConfigForDiscoverCandidates()); + mock_config->rtc_server_candidates_ = "*"; + mock_config->use_auto_detect_network_ip_ = true; + mock_config->rtc_server_ip_family_ = "ipv4"; + + // Create RTC user config + SrsUniquePtr ruc(new SrsRtcUserConfig()); + ruc->req_->host_ = "example.com"; + + // Test: Auto-detect should return non-loopback IPv4 addresses + set candidates = discover_candidates(mock_utility.get(), mock_config.get(), ruc.get()); + + EXPECT_EQ(candidates.size(), 2); + EXPECT_TRUE(candidates.find("192.168.1.100") != candidates.end()); + EXPECT_TRUE(candidates.find("10.0.0.50") != candidates.end()); + EXPECT_TRUE(candidates.find("127.0.0.1") == candidates.end()); // loopback excluded + EXPECT_TRUE(candidates.find("fe80::1") == candidates.end()); // IPv6 excluded when family=ipv4 +} + +VOID TEST(RtcServerTest, DiscoverCandidates_ExplicitCandidate) +{ + // Create mock utility with network interfaces + SrsUniquePtr mock_utility(new MockProtocolUtility()); + mock_utility->add_ip("192.168.1.100", "eth0", true, false, false); + + // Create mock config with explicit candidate + SrsUniquePtr mock_config(new MockAppConfigForDiscoverCandidates()); + mock_config->rtc_server_candidates_ = "203.0.113.10"; // explicit public IP + mock_config->use_auto_detect_network_ip_ = true; + + // Create RTC user config + SrsUniquePtr ruc(new SrsRtcUserConfig()); + ruc->req_->host_ = "example.com"; + + // Test: Explicit candidate should override auto-detection + set candidates = discover_candidates(mock_utility.get(), mock_config.get(), ruc.get()); + + EXPECT_EQ(candidates.size(), 1); + EXPECT_TRUE(candidates.find("203.0.113.10") != candidates.end()); + EXPECT_TRUE(candidates.find("192.168.1.100") == candidates.end()); // auto-detected IP not included +} + +VOID TEST(RtcServerTest, DiscoverCandidates_EipOverride) +{ + // Create mock utility + SrsUniquePtr mock_utility(new MockProtocolUtility()); + mock_utility->add_ip("192.168.1.100", "eth0", true, false, false); + + // Create mock config + SrsUniquePtr mock_config(new MockAppConfigForDiscoverCandidates()); + mock_config->rtc_server_candidates_ = "*"; + + // Create RTC user config with eip specified + SrsUniquePtr ruc(new SrsRtcUserConfig()); + ruc->req_->host_ = "example.com"; + ruc->eip_ = "198.51.100.20"; // user-specified external IP + + // Test: User-specified eip should be included in candidates + set candidates = discover_candidates(mock_utility.get(), mock_config.get(), ruc.get()); + + EXPECT_TRUE(candidates.find("198.51.100.20") != candidates.end()); + EXPECT_TRUE(candidates.find("192.168.1.100") != candidates.end()); +} diff --git a/trunk/src/utest/srs_utest_app16.hpp b/trunk/src/utest/srs_utest_app16.hpp index bfa2176dd..0766e35bb 100644 --- a/trunk/src/utest/srs_utest_app16.hpp +++ b/trunk/src/utest/srs_utest_app16.hpp @@ -13,10 +13,41 @@ #include #include +#include #include #include #include +// Mock ISrsSrtSocket for testing SrsSrtConnection +class MockSrtSocket : public ISrsSrtSocket +{ +public: + srs_utime_t recv_timeout_; + srs_utime_t send_timeout_; + int64_t recv_bytes_; + int64_t send_bytes_; + srs_error_t recvmsg_error_; + srs_error_t sendmsg_error_; + int recvmsg_called_count_; + int sendmsg_called_count_; + std::string last_recv_data_; + std::string last_send_data_; + +public: + MockSrtSocket(); + virtual ~MockSrtSocket(); + +public: + virtual srs_error_t recvmsg(void *buf, size_t size, ssize_t *nread); + virtual srs_error_t sendmsg(void *buf, size_t size, ssize_t *nwrite); + virtual void set_recv_timeout(srs_utime_t tm); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual srs_utime_t get_recv_timeout(); + virtual int64_t get_send_bytes(); + virtual int64_t get_recv_bytes(); +}; + // Mock ISrsUdpHandler for testing SrsUdpListener class MockUdpHandler : public ISrsUdpHandler { @@ -53,4 +84,172 @@ public: virtual srs_error_t on_udp_packet(ISrsUdpMuxSocket *skt); }; +// Mock ISrsProtocolReadWriter for testing SrsSrtRecvThread +class MockSrtProtocolReadWriter : public ISrsProtocolReadWriter +{ +public: + srs_error_t read_error_; + int read_count_; + bool simulate_timeout_; + std::string test_data_; + srs_utime_t recv_timeout_; + srs_utime_t send_timeout_; + int64_t recv_bytes_; + int64_t send_bytes_; + +public: + MockSrtProtocolReadWriter(); + virtual ~MockSrtProtocolReadWriter(); + +public: + virtual srs_error_t read(void *buf, size_t size, ssize_t *nread); + virtual srs_error_t read_fully(void *buf, size_t size, ssize_t *nread); + virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite); + virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite); + virtual void set_recv_timeout(srs_utime_t tm); + virtual srs_utime_t get_recv_timeout(); + virtual int64_t get_recv_bytes(); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_utime_t get_send_timeout(); + virtual int64_t get_send_bytes(); +}; + +// Mock ISrsCoroutine for testing SrsSrtRecvThread +class MockSrtCoroutine : public ISrsCoroutine +{ +public: + srs_error_t pull_error_; + int pull_count_; + bool started_; + SrsContextId cid_; + +public: + MockSrtCoroutine(); + virtual ~MockSrtCoroutine(); + +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void interrupt(); + virtual srs_error_t pull(); + virtual const SrsContextId &cid(); + virtual void set_cid(const SrsContextId &cid); +}; + +// Mock SrsSrtSource for testing SrsMpegtsSrtConn::on_srt_packet +class MockSrtSourceForPacket : public SrsSrtSource +{ +public: + int on_packet_called_count_; + srs_error_t on_packet_error_; + SrsSrtPacket *last_packet_; + +public: + MockSrtSourceForPacket(); + virtual ~MockSrtSourceForPacket(); + virtual srs_error_t on_packet(SrsSrtPacket *packet); +}; + +// Mock ISrsAppConfig for testing SrsMpegtsSrtConn HTTP hooks +class MockAppConfigForSrtHooks : public MockAppConfig +{ +public: + SrsConfDirective *on_connect_directive_; + SrsConfDirective *on_close_directive_; + SrsConfDirective *on_publish_directive_; + SrsConfDirective *on_unpublish_directive_; + SrsConfDirective *on_play_directive_; + SrsConfDirective *on_stop_directive_; + +public: + MockAppConfigForSrtHooks(); + virtual ~MockAppConfigForSrtHooks(); + virtual SrsConfDirective *get_vhost_on_connect(std::string vhost); + virtual SrsConfDirective *get_vhost_on_close(std::string vhost); + virtual SrsConfDirective *get_vhost_on_publish(std::string vhost); + virtual SrsConfDirective *get_vhost_on_unpublish(std::string vhost); + virtual SrsConfDirective *get_vhost_on_play(std::string vhost); + virtual SrsConfDirective *get_vhost_on_stop(std::string vhost); + void set_on_connect_urls(const std::vector &urls); + void set_on_close_urls(const std::vector &urls); + void set_on_publish_urls(const std::vector &urls); + void set_on_unpublish_urls(const std::vector &urls); + void set_on_play_urls(const std::vector &urls); + void set_on_stop_urls(const std::vector &urls); + void clear_on_connect_directive(); + void clear_on_close_directive(); + void clear_on_publish_directive(); + void clear_on_unpublish_directive(); + void clear_on_play_directive(); + void clear_on_stop_directive(); +}; + +// Mock ISrsHttpHooks for testing SrsMpegtsSrtConn HTTP hooks +class MockHttpHooksForSrt : public ISrsHttpHooks +{ +public: + std::vector > on_connect_calls_; + int on_connect_count_; + srs_error_t on_connect_error_; + std::vector > on_close_calls_; + int on_close_count_; + std::vector > on_publish_calls_; + int on_publish_count_; + srs_error_t on_publish_error_; + std::vector > on_unpublish_calls_; + int on_unpublish_count_; + std::vector > on_play_calls_; + int on_play_count_; + srs_error_t on_play_error_; + std::vector > on_stop_calls_; + int on_stop_count_; + +public: + MockHttpHooksForSrt(); + virtual ~MockHttpHooksForSrt(); + virtual srs_error_t on_connect(std::string url, ISrsRequest *req); + virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); + virtual srs_error_t on_publish(std::string url, ISrsRequest *req); + virtual void on_unpublish(std::string url, ISrsRequest *req); + virtual srs_error_t on_play(std::string url, ISrsRequest *req); + virtual void on_stop(std::string url, ISrsRequest *req); + virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); + virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); + virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); + virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); + virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); + void clear_calls(); +}; + +// Mock SrsProtocolUtility for testing discover_candidates +class MockProtocolUtility : public SrsProtocolUtility +{ +public: + std::vector mock_ips_; + +public: + MockProtocolUtility(); + virtual ~MockProtocolUtility(); + virtual std::vector &local_ips(); + void add_ip(std::string ip, std::string ifname, bool is_ipv4, bool is_loopback, bool is_internet); + void clear_ips(); +}; + +// Mock ISrsAppConfig for testing discover_candidates +class MockAppConfigForDiscoverCandidates : public MockAppConfig +{ +public: + std::string rtc_server_candidates_; + bool use_auto_detect_network_ip_; + std::string rtc_server_ip_family_; + +public: + MockAppConfigForDiscoverCandidates(); + virtual ~MockAppConfigForDiscoverCandidates(); + virtual std::string get_rtc_server_candidates(); + virtual bool get_use_auto_detect_network_ip(); + virtual std::string get_rtc_server_ip_family(); +}; + #endif diff --git a/trunk/src/utest/srs_utest_app6.cpp b/trunk/src/utest/srs_utest_app6.cpp index 2fb1c68f0..e2b6c3f59 100644 --- a/trunk/src/utest/srs_utest_app6.cpp +++ b/trunk/src/utest/srs_utest_app6.cpp @@ -2183,6 +2183,9 @@ MockAppConfig::MockAppConfig() rtc_to_rtmp_ = false; dash_dispose_ = 0; dash_enabled_ = false; + api_as_candidates_ = true; + resolve_api_domain_ = true; + keep_api_domain_ = false; } MockAppConfig::~MockAppConfig() @@ -2271,6 +2274,16 @@ bool MockAppConfig::get_srt_enabled(std::string vhost) return srt_enabled_; } +std::string MockAppConfig::get_srt_default_streamid() +{ + return "#!::r=live/livestream,m=request"; +} + +bool MockAppConfig::get_srt_to_rtmp(std::string vhost) +{ + return true; +} + bool MockAppConfig::get_rtc_to_rtmp(std::string vhost) { return rtc_to_rtmp_; @@ -2571,6 +2584,21 @@ void MockAppConfig::set_rtc_to_rtmp(bool enabled) rtc_to_rtmp_ = enabled; } +void MockAppConfig::set_api_as_candidates(bool enabled) +{ + api_as_candidates_ = enabled; +} + +void MockAppConfig::set_resolve_api_domain(bool enabled) +{ + resolve_api_domain_ = enabled; +} + +void MockAppConfig::set_keep_api_domain(bool enabled) +{ + keep_api_domain_ = enabled; +} + // Mock request implementation MockRtcAsyncCallRequest::MockRtcAsyncCallRequest(std::string vhost, std::string app, std::string stream) { diff --git a/trunk/src/utest/srs_utest_app6.hpp b/trunk/src/utest/srs_utest_app6.hpp index 7a38d4658..018b11341 100644 --- a/trunk/src/utest/srs_utest_app6.hpp +++ b/trunk/src/utest/srs_utest_app6.hpp @@ -244,6 +244,9 @@ public: bool rtc_to_rtmp_; srs_utime_t dash_dispose_; bool dash_enabled_; + bool api_as_candidates_; + bool resolve_api_domain_; + bool keep_api_domain_; public: MockAppConfig(); @@ -295,6 +298,12 @@ public: virtual std::vector get_rtc_server_listens() { return std::vector(); } virtual int get_rtc_server_reuseport() { return 1; } virtual bool get_rtc_server_encrypt() { return false; } + virtual bool get_api_as_candidates() { return api_as_candidates_; } + virtual bool get_resolve_api_domain() { return resolve_api_domain_; } + virtual bool get_keep_api_domain() { return keep_api_domain_; } + virtual std::string get_rtc_server_candidates() { return "*"; } + virtual bool get_use_auto_detect_network_ip() { return true; } + virtual std::string get_rtc_server_ip_family() { return "ipv4"; } virtual bool get_rtsp_server_enabled() { return false; } virtual std::vector get_rtsp_server_listens() { return std::vector(); } virtual std::vector get_srt_listens() { return std::vector(); } @@ -358,6 +367,8 @@ public: virtual bool get_rtc_twcc_enabled(std::string vhost); virtual bool get_srt_enabled(); virtual bool get_srt_enabled(std::string vhost); + virtual std::string get_srt_default_streamid(); + virtual bool get_srt_to_rtmp(std::string vhost); virtual bool get_rtc_to_rtmp(std::string vhost); virtual srs_utime_t get_rtc_stun_timeout(std::string vhost); virtual bool get_rtc_stun_strict_check(std::string vhost); @@ -448,6 +459,9 @@ public: void set_rtc_twcc_enabled(bool enabled); void set_srt_enabled(bool enabled); void set_rtc_to_rtmp(bool enabled); + void set_api_as_candidates(bool enabled); + void set_resolve_api_domain(bool enabled); + void set_keep_api_domain(bool enabled); }; // Mock request for testing SrsRtcAsyncCallOnStop diff --git a/trunk/src/utest/srs_utest_app7.cpp b/trunk/src/utest/srs_utest_app7.cpp index 28b3e101f..ed9b50125 100644 --- a/trunk/src/utest/srs_utest_app7.cpp +++ b/trunk/src/utest/srs_utest_app7.cpp @@ -1029,6 +1029,10 @@ void MockConnectionManagerForExpire::add_with_fast_id(uint64_t /*id*/, ISrsResou { } +void MockConnectionManagerForExpire::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockConnectionManagerForExpire::at(int /*index*/) { return NULL; diff --git a/trunk/src/utest/srs_utest_app7.hpp b/trunk/src/utest/srs_utest_app7.hpp index 88e0e9960..39c19be6d 100644 --- a/trunk/src/utest/srs_utest_app7.hpp +++ b/trunk/src/utest/srs_utest_app7.hpp @@ -107,6 +107,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id); diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 3f8e3ead9..13b5b963e 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -1379,6 +1379,10 @@ void MockConnectionManager::add_with_fast_id(uint64_t /*id*/, ISrsResource * /*c { } +void MockConnectionManager::add_with_name(const std::string & /*name*/, ISrsResource * /*conn*/) +{ +} + ISrsResource *MockConnectionManager::at(int /*index*/) { return NULL; diff --git a/trunk/src/utest/srs_utest_service.hpp b/trunk/src/utest/srs_utest_service.hpp index 562f7017e..21db40361 100644 --- a/trunk/src/utest/srs_utest_service.hpp +++ b/trunk/src/utest/srs_utest_service.hpp @@ -94,6 +94,7 @@ public: virtual void add(ISrsResource *conn, bool *exists = NULL); virtual void add_with_id(const std::string &id, ISrsResource *conn); virtual void add_with_fast_id(uint64_t id, ISrsResource *conn); + virtual void add_with_name(const std::string &name, ISrsResource *conn); virtual ISrsResource *at(int index); virtual ISrsResource *find_by_id(std::string id); virtual ISrsResource *find_by_fast_id(uint64_t id);