AI: Improve converage for app rtc module.

This commit is contained in:
winlin 2025-09-27 19:49:56 -04:00
parent c7821b4770
commit df3c776580
13 changed files with 2697 additions and 210 deletions

View File

@ -155,6 +155,16 @@ void SrsAppCasterFlv::remove(ISrsResource *c)
manager_->remove(c);
}
void SrsAppCasterFlv::subscribe(ISrsDisposingHandler *h)
{
manager_->subscribe(h);
}
void SrsAppCasterFlv::unsubscribe(ISrsDisposingHandler *h)
{
manager_->unsubscribe(h);
}
srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r)
{
SrsHttpMessage *msg = dynamic_cast<SrsHttpMessage *>(r);

View File

@ -71,6 +71,8 @@ public:
// Interface ISrsResourceManager
public:
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);
// Interface ISrsHttpHandler
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r);

View File

@ -294,6 +294,8 @@ public:
virtual bool get_srt_enabled() = 0;
virtual bool get_srt_enabled(std::string vhost) = 0;
virtual bool get_rtc_to_rtmp(std::string vhost) = 0;
virtual srs_utime_t get_rtc_stun_timeout(std::string vhost) = 0;
virtual bool get_rtc_stun_strict_check(std::string vhost) = 0;
};
// The config service provider.

View File

@ -69,6 +69,9 @@ extern SrsPps *_srs_pps_rnack2;
extern SrsPps *_srs_pps_pub;
extern SrsPps *_srs_pps_conn;
extern bool srs_sdp_has_h264_profile(const SrsMediaPayloadType &payload_type, const string &profile);
extern bool srs_sdp_has_h264_profile(const SrsSdp &sdp, const string &profile);
ISrsRtcTransport::ISrsRtcTransport()
{
}
@ -1001,6 +1004,8 @@ SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(ISrsRtcRtcpSender *sender) : send
{
lock_ = srs_mutex_new();
_srs_shared_timer->timer100ms()->subscribe(this);
circuit_breaker_ = _srs_circuit_breaker;
}
SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
@ -1010,6 +1015,8 @@ SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
_srs_shared_timer->timer100ms()->unsubscribe(this);
}
srs_mutex_destroy(lock_);
circuit_breaker_ = NULL;
}
srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
@ -1036,7 +1043,7 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
++_srs_pps_twcc->sugar_;
// If circuit-breaker is dropping packet, disable TWCC.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
if (circuit_breaker_->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar_;
return err;
}
@ -1814,7 +1821,15 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp &n
}
}
SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p)
ISrsRtcConnectionNackTimerHandler::ISrsRtcConnectionNackTimerHandler()
{
}
ISrsRtcConnectionNackTimerHandler::~ISrsRtcConnectionNackTimerHandler()
{
}
SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(ISrsRtcConnectionNackTimerHandler *handler) : handler_(handler)
{
lock_ = srs_mutex_new();
@ -1842,37 +1857,13 @@ srs_error_t SrsRtcConnectionNackTimer::initialize()
srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
// This is a very heavy function, and it may potentially cause a coroutine switch.
// Therefore, during this function, the 'this' pointer might become invalid because
// the object could be freed by another thread. As a result, we must lock the object
// to prevent it from being freed.
SrsLocker(&lock_);
if (!p_->nack_enabled_) {
return err;
}
++_srs_pps_conn->sugar_;
// If circuit-breaker is enabled, disable nack.
if (circuit_breaker_->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar_;
return err;
}
std::map<std::string, SrsRtcPublishStream *>::iterator it;
for (it = p_->publishers_.begin(); it != p_->publishers_.end(); it++) {
SrsRtcPublishStream *publisher = it->second;
if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}
return err;
return handler_->do_check_send_nacks();
}
ISrsExecRtcAsyncTask::ISrsExecRtcAsyncTask()
@ -1890,6 +1881,8 @@ SrsRtcConnection::SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextI
exec_ = exec;
networks_ = new SrsRtcNetworks(this);
publisher_negotiator_ = new SrsRtcPublisherNegotiator();
player_negotiator_ = new SrsRtcPlayerNegotiator();
cache_iov_ = new iovec();
cache_iov_->iov_base = new char[kRtpPacketSize];
@ -1907,12 +1900,20 @@ SrsRtcConnection::SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextI
nack_enabled_ = false;
timer_nack_ = new SrsRtcConnectionNackTimer(this);
_srs_conn_manager->subscribe(this);
circuit_breaker_ = _srs_circuit_breaker;
conn_manager_ = _srs_conn_manager;
rtc_sources_ = _srs_rtc_sources;
config_ = _srs_config;
}
void SrsRtcConnection::assemble()
{
conn_manager_->subscribe(this);
}
SrsRtcConnection::~SrsRtcConnection()
{
_srs_conn_manager->unsubscribe(this);
conn_manager_->unsubscribe(this);
srs_freep(timer_nack_);
@ -1934,6 +1935,8 @@ SrsRtcConnection::~SrsRtcConnection()
// Free network over UDP or TCP.
srs_freep(networks_);
srs_freep(publisher_negotiator_);
srs_freep(player_negotiator_);
if (true) {
char *iov_base = (char *)cache_iov_->iov_base;
@ -1947,6 +1950,10 @@ SrsRtcConnection::~SrsRtcConnection()
// Optional to release the publisher token.
publish_token_ = NULL;
circuit_breaker_ = NULL;
conn_manager_ = NULL;
rtc_sources_ = NULL;
config_ = NULL;
}
void SrsRtcConnection::on_before_dispose(ISrsResource *c)
@ -2032,7 +2039,7 @@ std::string SrsRtcConnection::desc()
void SrsRtcConnection::expire()
{
// TODO: FIXME: Should set session to expired and remove it by heartbeat checking. Should not remove it directly.
_srs_conn_manager->remove(this);
conn_manager_->remove(this);
}
void SrsRtcConnection::switch_to_context()
@ -2054,16 +2061,16 @@ srs_error_t SrsRtcConnection::add_publisher(SrsRtcUserConfig *ruc, SrsSdp &local
SrsUniquePtr<SrsRtcSourceDescription> stream_desc(new SrsRtcSourceDescription());
// TODO: FIXME: Change to api of stream desc.
if ((err = negotiate_publish_capability(ruc, stream_desc.get())) != srs_success) {
if ((err = publisher_negotiator_->negotiate_publish_capability(ruc, stream_desc.get())) != srs_success) {
return srs_error_wrap(err, "publish negotiate, offer=%s", srs_strings_replace(ruc->remote_sdp_str_.c_str(), "\r\n", "\\r\\n").c_str());
}
if ((err = generate_publish_local_sdp(req, local_sdp, stream_desc.get(), ruc->remote_sdp_.is_unified(), ruc->audio_before_video_)) != srs_success) {
if ((err = publisher_negotiator_->generate_publish_local_sdp(req, local_sdp, stream_desc.get(), ruc->remote_sdp_.is_unified(), ruc->audio_before_video_)) != srs_success) {
return srs_error_wrap(err, "generate local sdp");
}
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
if ((err = rtc_sources_->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -2093,7 +2100,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig *ruc, SrsSdp &local_sd
ISrsRequest *req = ruc->req_;
std::map<uint32_t, SrsRtcTrackDescription *> play_sub_relations;
if ((err = negotiate_play_capability(ruc, play_sub_relations)) != srs_success) {
if ((err = player_negotiator_->negotiate_play_capability(ruc, play_sub_relations)) != srs_success) {
return srs_error_wrap(err, "play negotiate, offer=%s", srs_strings_replace(ruc->remote_sdp_str_.c_str(), "\r\n", "\\r\\n").c_str());
}
@ -2117,7 +2124,7 @@ srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig *ruc, SrsSdp &local_sd
++it;
}
if ((err = generate_play_local_sdp(req, local_sdp, stream_desc.get(), ruc->remote_sdp_.is_unified(), ruc->audio_before_video_)) != srs_success) {
if ((err = player_negotiator_->generate_play_local_sdp(req, local_sdp, stream_desc.get(), ruc->remote_sdp_.is_unified(), ruc->audio_before_video_)) != srs_success) {
return srs_error_wrap(err, "generate local sdp");
}
@ -2142,15 +2149,23 @@ srs_error_t SrsRtcConnection::initialize(ISrsRequest *r, bool dtls, bool srtp, s
}
// TODO: FIXME: Support reload.
session_timeout_ = _srs_config->get_rtc_stun_timeout(req_->vhost_);
session_timeout_ = config_->get_rtc_stun_timeout(req_->vhost_);
last_stun_time_ = srs_time_now_cached();
nack_enabled_ = _srs_config->get_rtc_nack_enabled(req_->vhost_);
nack_enabled_ = config_->get_rtc_nack_enabled(req_->vhost_);
if ((err = timer_nack_->initialize()) != srs_success) {
return srs_error_wrap(err, "initialize timer nack");
}
if ((err = publisher_negotiator_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "initialize publisher negotiator");
}
if ((err = player_negotiator_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "initialize player negotiator");
}
srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms, nack=%d",
username.c_str(), r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role_.c_str(), cfg->dtls_version_.c_str(),
srsu2msi(session_timeout_), nack_enabled_);
@ -2379,7 +2394,7 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc)
switch_to_context();
srs_trace("RTC: session destroy by DTLS alert(%s %s), username=%s", type.c_str(), desc.c_str(), username_.c_str());
_srs_conn_manager->remove(this);
conn_manager_->remove(this);
}
return err;
@ -2425,12 +2440,14 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver *nack, uint32_t ssrc, uint32_t &sent_nacks, uint32_t &timeout_nacks)
{
srs_error_t err = srs_success;
++_srs_pps_snack->sugar_;
SrsRtcpNack rtcpNack(ssrc);
// If circuit-breaker is enabled, disable nack.
if (_srs_circuit_breaker->hybrid_high_water_level()) {
if (circuit_breaker_->hybrid_high_water_level()) {
++_srs_pps_snack4->sugar_;
} else {
rtcpNack.set_media_ssrc(ssrc);
@ -2448,10 +2465,14 @@ void SrsRtcConnection::check_send_nacks(SrsRtpNackForReceiver *nack, uint32_t ss
SrsBuffer stream(buf, sizeof(buf));
// TODO: FIXME: Check error.
rtcpNack.encode(&stream);
if ((err = rtcpNack.encode(&stream)) != srs_success) {
srs_freep(err);
return;
}
// TODO: FIXME: Check error.
send_rtcp(stream.data(), stream.pos());
err = send_rtcp(stream.data(), stream.pos());
srs_freep(err);
}
srs_error_t SrsRtcConnection::send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer *rtp_queue, const uint64_t &last_send_systime, const SrsNtp &last_send_ntp)
@ -2566,6 +2587,35 @@ srs_error_t SrsRtcConnection::send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId
return send_rtcp(stream.data(), stream.pos());
}
srs_error_t SrsRtcConnection::do_check_send_nacks()
{
srs_error_t err = srs_success;
if (!nack_enabled_) {
return err;
}
++_srs_pps_conn->sugar_;
// If circuit-breaker is enabled, disable nack.
if (circuit_breaker_->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar_;
return err;
}
std::map<std::string, SrsRtcPublishStream *>::iterator it;
for (it = publishers_.begin(); it != publishers_.end(); it++) {
SrsRtcPublishStream *publisher = it->second;
if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}
return err;
}
void SrsRtcConnection::simulate_nack_drop(int nn)
{
for (map<string, SrsRtcPublishStream *>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
@ -2663,7 +2713,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket *r, string &ice_p
++_srs_pps_sstuns->sugar_;
bool strict_check = _srs_config->get_rtc_stun_strict_check(req_->vhost_);
bool strict_check = config_->get_rtc_stun_strict_check(req_->vhost_);
if (strict_check && r->get_ice_controlled()) {
// @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1
// TODO: Send 487 (Role Conflict) error response.
@ -2676,6 +2726,184 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket *r, string &ice_p
return err;
}
srs_error_t SrsRtcConnection::create_player(ISrsRequest *req, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations)
{
srs_error_t err = srs_success;
// Ignore if exists.
if (players_.end() != players_.find(req->get_stream_url())) {
return err;
}
SrsRtcPlayStream *player = new SrsRtcPlayStream(exec_, this, this, _srs_context->get_id());
if ((err = player->initialize(req, sub_relations)) != srs_success) {
srs_freep(player);
return srs_error_wrap(err, "SrsRtcPlayStream init");
}
players_.insert(make_pair(req->get_stream_url(), player));
// make map between ssrc and player for fastly searching
for (map<uint32_t, SrsRtcTrackDescription *>::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) {
SrsRtcTrackDescription *track_desc = it->second;
map<uint32_t, SrsRtcPlayStream *>::iterator it_player = players_ssrc_map_.find(track_desc->ssrc_);
if ((players_ssrc_map_.end() != it_player) && (player != it_player->second)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->ssrc_] = player;
if (0 != track_desc->fec_ssrc_) {
if (players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->fec_ssrc_] = player;
}
if (0 != track_desc->rtx_ssrc_) {
if (players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->rtx_ssrc_] = player;
}
}
// TODO: FIXME: Support reload.
// The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC.
// Whatever the ext-map, we will disable GCC when config disable it.
int twcc_id = 0;
if (true) {
std::map<uint32_t, SrsRtcTrackDescription *>::iterator it = sub_relations.begin();
while (it != sub_relations.end()) {
if (it->second->type_ == "video") {
SrsRtcTrackDescription *track = it->second;
twcc_id = track->get_rtp_extension_id(kTWCCExt);
}
++it;
}
}
srs_trace("RTC connection player gcc=%d", twcc_id);
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the player. Because maybe create some players after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
return err;
}
srs_error_t SrsRtcConnection::create_publisher(ISrsRequest *req, SrsRtcSourceDescription *stream_desc)
{
srs_error_t err = srs_success;
srs_assert(stream_desc);
// Ignore if exists.
if (publishers_.end() != publishers_.find(req->get_stream_url())) {
return err;
}
SrsRtcPublishStream *publisher = new SrsRtcPublishStream(exec_, this, this, _srs_context->get_id());
if ((err = publisher->initialize(req, stream_desc)) != srs_success) {
srs_freep(publisher);
return srs_error_wrap(err, "rtc publisher init");
}
publishers_[req->get_stream_url()] = publisher;
if (NULL != stream_desc->audio_track_desc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
stream_desc->audio_track_desc_->ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->ssrc_] = publisher;
if (0 != stream_desc->audio_track_desc_->fec_ssrc_ && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->fec_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
stream_desc->audio_track_desc_->fec_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->fec_ssrc_] = publisher;
}
if (0 != stream_desc->audio_track_desc_->rtx_ssrc_ && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->rtx_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
stream_desc->audio_track_desc_->rtx_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->rtx_ssrc_] = publisher;
}
}
for (int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) {
SrsRtcTrackDescription *track_desc = stream_desc->video_track_descs_.at(i);
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->ssrc_] = publisher;
if (0 != track_desc->fec_ssrc_ && track_desc->ssrc_ != track_desc->fec_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->fec_ssrc_] = publisher;
}
if (0 != track_desc->rtx_ssrc_ && track_desc->ssrc_ != track_desc->rtx_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->rtx_ssrc_] = publisher;
}
}
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the publisher. Because maybe create some publishers after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
return err;
}
SrsRtcPublisherNegotiator::SrsRtcPublisherNegotiator()
{
req_ = NULL;
config_ = _srs_config;
}
SrsRtcPublisherNegotiator::~SrsRtcPublisherNegotiator()
{
srs_freep(req_);
config_ = NULL;
}
srs_error_t SrsRtcPublisherNegotiator::initialize(ISrsRequest *r)
{
req_ = r->copy();
return srs_success;
}
SrsRtcPlayerNegotiator::SrsRtcPlayerNegotiator()
{
req_ = NULL;
config_ = _srs_config;
rtc_sources_ = _srs_rtc_sources;
}
SrsRtcPlayerNegotiator::~SrsRtcPlayerNegotiator()
{
srs_freep(req_);
config_ = NULL;
rtc_sources_ = NULL;
}
srs_error_t SrsRtcPlayerNegotiator::initialize(ISrsRequest *r)
{
req_ = r->copy();
return srs_success;
}
bool srs_sdp_has_h264_profile(const SrsMediaPayloadType &payload_type, const string &profile)
{
srs_error_t err = srs_success;
@ -2767,7 +2995,7 @@ bool srs_sdp_has_h265_profile(const SrsSdp &sdp, const string &profile)
return false;
}
srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig *ruc, SrsRtcSourceDescription *stream_desc)
srs_error_t SrsRtcPublisherNegotiator::negotiate_publish_capability(SrsRtcUserConfig *ruc, SrsRtcSourceDescription *stream_desc)
{
srs_error_t err = srs_success;
@ -2778,8 +3006,8 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig *ruc
ISrsRequest *req = ruc->req_;
const SrsSdp &remote_sdp = ruc->remote_sdp_;
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost_);
bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost_);
bool nack_enabled = config_->get_rtc_nack_enabled(req->vhost_);
bool twcc_enabled = config_->get_rtc_twcc_enabled(req->vhost_);
// TODO: FIME: Should check packetization-mode=1 also.
bool has_42e01f = srs_sdp_has_h264_profile(remote_sdp, "42e01f");
@ -3068,7 +3296,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig *ruc
return err;
}
srs_error_t SrsRtcConnection::generate_publish_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video)
srs_error_t SrsRtcPublisherNegotiator::generate_publish_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video)
{
srs_error_t err = srs_success;
@ -3112,7 +3340,7 @@ srs_error_t SrsRtcConnection::generate_publish_local_sdp(ISrsRequest *req, SrsSd
return err;
}
srs_error_t SrsRtcConnection::generate_publish_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc)
srs_error_t SrsRtcPublisherNegotiator::generate_publish_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc)
{
srs_error_t err = srs_success;
@ -3153,7 +3381,7 @@ srs_error_t SrsRtcConnection::generate_publish_local_sdp_for_audio(SrsSdp &local
return err;
}
srs_error_t SrsRtcConnection::generate_publish_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan)
srs_error_t SrsRtcPublisherNegotiator::generate_publish_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan)
{
srs_error_t err = srs_success;
@ -3203,18 +3431,18 @@ srs_error_t SrsRtcConnection::generate_publish_local_sdp_for_video(SrsSdp &local
return err;
}
srs_error_t SrsRtcConnection::negotiate_play_capability(SrsRtcUserConfig *ruc, std::map<uint32_t, SrsRtcTrackDescription *> &sub_relations)
srs_error_t SrsRtcPlayerNegotiator::negotiate_play_capability(SrsRtcUserConfig *ruc, std::map<uint32_t, SrsRtcTrackDescription *> &sub_relations)
{
srs_error_t err = srs_success;
ISrsRequest *req = ruc->req_;
const SrsSdp &remote_sdp = ruc->remote_sdp_;
bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost_);
bool twcc_enabled = _srs_config->get_rtc_twcc_enabled(req->vhost_);
bool nack_enabled = config_->get_rtc_nack_enabled(req->vhost_);
bool twcc_enabled = config_->get_rtc_twcc_enabled(req->vhost_);
SrsSharedPtr<SrsRtcSource> source;
if ((err = _srs_rtc_sources->fetch_or_create(req, source)) != srs_success) {
if ((err = rtc_sources_->fetch_or_create(req, source)) != srs_success) {
return srs_error_wrap(err, "fetch rtc source");
}
@ -3446,7 +3674,7 @@ void video_track_generate_play_offer(SrsRtcTrackDescription *track, string mid,
}
}
srs_error_t SrsRtcConnection::generate_play_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video)
srs_error_t SrsRtcPlayerNegotiator::generate_play_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video)
{
srs_error_t err = srs_success;
@ -3492,7 +3720,7 @@ srs_error_t SrsRtcConnection::generate_play_local_sdp(ISrsRequest *req, SrsSdp &
return err;
}
srs_error_t SrsRtcConnection::generate_play_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, std::string cname)
srs_error_t SrsRtcPlayerNegotiator::generate_play_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, std::string cname)
{
srs_error_t err = srs_success;
@ -3558,7 +3786,7 @@ srs_error_t SrsRtcConnection::generate_play_local_sdp_for_audio(SrsSdp &local_sd
return err;
}
srs_error_t SrsRtcConnection::generate_play_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, std::string cname)
srs_error_t SrsRtcPlayerNegotiator::generate_play_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, std::string cname)
{
srs_error_t err = srs_success;
@ -3599,143 +3827,3 @@ srs_error_t SrsRtcConnection::generate_play_local_sdp_for_video(SrsSdp &local_sd
return err;
}
srs_error_t SrsRtcConnection::create_player(ISrsRequest *req, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations)
{
srs_error_t err = srs_success;
// Ignore if exists.
if (players_.end() != players_.find(req->get_stream_url())) {
return err;
}
SrsRtcPlayStream *player = new SrsRtcPlayStream(exec_, this, this, _srs_context->get_id());
if ((err = player->initialize(req, sub_relations)) != srs_success) {
srs_freep(player);
return srs_error_wrap(err, "SrsRtcPlayStream init");
}
players_.insert(make_pair(req->get_stream_url(), player));
// make map between ssrc and player for fastly searching
for (map<uint32_t, SrsRtcTrackDescription *>::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) {
SrsRtcTrackDescription *track_desc = it->second;
map<uint32_t, SrsRtcPlayStream *>::iterator it_player = players_ssrc_map_.find(track_desc->ssrc_);
if ((players_ssrc_map_.end() != it_player) && (player != it_player->second)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->ssrc_] = player;
if (0 != track_desc->fec_ssrc_) {
if (players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->fec_ssrc_] = player;
}
if (0 != track_desc->rtx_ssrc_) {
if (players_ssrc_map_.end() != players_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, "duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
players_ssrc_map_[track_desc->rtx_ssrc_] = player;
}
}
// TODO: FIXME: Support reload.
// The TWCC ID is the ext-map ID in local SDP, and we set to enable GCC.
// Whatever the ext-map, we will disable GCC when config disable it.
int twcc_id = 0;
if (true) {
std::map<uint32_t, SrsRtcTrackDescription *>::iterator it = sub_relations.begin();
while (it != sub_relations.end()) {
if (it->second->type_ == "video") {
SrsRtcTrackDescription *track = it->second;
twcc_id = track->get_rtp_extension_id(kTWCCExt);
}
++it;
}
}
srs_trace("RTC connection player gcc=%d", twcc_id);
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the player. Because maybe create some players after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
return err;
}
srs_error_t SrsRtcConnection::create_publisher(ISrsRequest *req, SrsRtcSourceDescription *stream_desc)
{
srs_error_t err = srs_success;
srs_assert(stream_desc);
// Ignore if exists.
if (publishers_.end() != publishers_.find(req->get_stream_url())) {
return err;
}
SrsRtcPublishStream *publisher = new SrsRtcPublishStream(exec_, this, this, _srs_context->get_id());
if ((err = publisher->initialize(req, stream_desc)) != srs_success) {
srs_freep(publisher);
return srs_error_wrap(err, "rtc publisher init");
}
publishers_[req->get_stream_url()] = publisher;
if (NULL != stream_desc->audio_track_desc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
stream_desc->audio_track_desc_->ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->ssrc_] = publisher;
if (0 != stream_desc->audio_track_desc_->fec_ssrc_ && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->fec_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
stream_desc->audio_track_desc_->fec_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->fec_ssrc_] = publisher;
}
if (0 != stream_desc->audio_track_desc_->rtx_ssrc_ && stream_desc->audio_track_desc_->ssrc_ != stream_desc->audio_track_desc_->rtx_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(stream_desc->audio_track_desc_->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
stream_desc->audio_track_desc_->rtx_ssrc_, stream_desc->audio_track_desc_->id_.c_str());
}
publishers_ssrc_map_[stream_desc->audio_track_desc_->rtx_ssrc_] = publisher;
}
}
for (int i = 0; i < (int)stream_desc->video_track_descs_.size(); ++i) {
SrsRtcTrackDescription *track_desc = stream_desc->video_track_descs_.at(i);
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate ssrc %d, track id: %s",
track_desc->ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->ssrc_] = publisher;
if (0 != track_desc->fec_ssrc_ && track_desc->ssrc_ != track_desc->fec_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->fec_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate fec ssrc %d, track id: %s",
track_desc->fec_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->fec_ssrc_] = publisher;
}
if (0 != track_desc->rtx_ssrc_ && track_desc->ssrc_ != track_desc->rtx_ssrc_) {
if (publishers_ssrc_map_.end() != publishers_ssrc_map_.find(track_desc->rtx_ssrc_)) {
return srs_error_new(ERROR_RTC_DUPLICATED_SSRC, " duplicate rtx ssrc %d, track id: %s",
track_desc->rtx_ssrc_, track_desc->id_.c_str());
}
publishers_ssrc_map_[track_desc->rtx_ssrc_] = publisher;
}
}
// TODO: Start player when DTLS done. Removed it because we don't support single PC now.
// If DTLS done, start the publisher. Because maybe create some publishers after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
return err;
}

View File

@ -62,6 +62,8 @@ class ISrsStatistic;
class ISrsExecRtcAsyncTask;
class ISrsSrtSourceManager;
class ISrsLiveSourceManager;
class SrsRtcPublisherNegotiator;
class SrsRtcPlayerNegotiator;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -354,6 +356,9 @@ private:
// A fast timer for publish stream, for TWCC feedback.
class SrsRtcPublishTwccTimer : public ISrsFastTimerHandler
{
private:
ISrsCircuitBreaker *circuit_breaker_;
private:
ISrsRtcRtcpSender *sender_;
srs_mutex_t lock_;
@ -502,6 +507,17 @@ private:
void update_send_report_time(uint32_t ssrc, const SrsNtp &ntp, uint32_t rtp_time);
};
// The handler for RTC connection nack timer.
class ISrsRtcConnectionNackTimerHandler
{
public:
ISrsRtcConnectionNackTimerHandler();
virtual ~ISrsRtcConnectionNackTimerHandler();
public:
virtual srs_error_t do_check_send_nacks() = 0;
};
// A fast timer for conntion, for NACK feedback.
class SrsRtcConnectionNackTimer : public ISrsFastTimerHandler
{
@ -510,11 +526,11 @@ private:
ISrsCircuitBreaker *circuit_breaker_;
private:
SrsRtcConnection *p_;
ISrsRtcConnectionNackTimerHandler *handler_;
srs_mutex_t lock_;
public:
SrsRtcConnectionNackTimer(SrsRtcConnection *p);
SrsRtcConnectionNackTimer(ISrsRtcConnectionNackTimerHandler *handler);
virtual ~SrsRtcConnectionNackTimer();
public:
@ -540,20 +556,25 @@ public:
//
// For performance, we use non-public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire, public ISrsRtcPacketSender, public ISrsRtcPacketReceiver
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire, public ISrsRtcPacketSender, public ISrsRtcPacketReceiver, public ISrsRtcConnectionNackTimerHandler
{
friend class SrsSecurityTransport;
private:
friend class SrsRtcConnectionNackTimer;
ISrsCircuitBreaker *circuit_breaker_;
ISrsResourceManager *conn_manager_;
ISrsRtcSourceManager *rtc_sources_;
ISrsAppConfig *config_;
private:
SrsRtcConnectionNackTimer *timer_nack_;
ISrsExecRtcAsyncTask *exec_;
SrsRtcPublisherNegotiator *publisher_negotiator_;
SrsRtcPlayerNegotiator *player_negotiator_;
public:
bool disposing_;
private:
ISrsExecRtcAsyncTask *exec_;
private:
iovec *cache_iov_;
SrsBuffer *cache_buffer_;
@ -604,7 +625,9 @@ private:
public:
SrsRtcConnection(ISrsExecRtcAsyncTask *exec, const SrsContextId &cid);
void assemble(); // Construct object, to avoid call function in constructor.
virtual ~SrsRtcConnection();
// interface ISrsDisposingHandler
public:
virtual void on_before_dispose(ISrsResource *c);
@ -681,6 +704,10 @@ public:
srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc);
srs_error_t send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId &cid_of_subscriber);
// interface ISrsRtcConnectionNackTimerHandler
public:
virtual srs_error_t do_check_send_nacks();
public:
// Simulate the NACK to drop nn packets.
void simulate_nack_drop(int nn);
@ -694,19 +721,50 @@ public:
srs_error_t on_binding_request(SrsStunPacket *r, std::string &ice_pwd);
private:
srs_error_t create_player(ISrsRequest *request, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations);
srs_error_t create_publisher(ISrsRequest *request, SrsRtcSourceDescription *stream_desc);
};
// Negotiate via SDP exchange for WebRTC publisher.
class SrsRtcPublisherNegotiator
{
private:
ISrsRequest *req_;
ISrsAppConfig *config_;
public:
SrsRtcPublisherNegotiator();
virtual ~SrsRtcPublisherNegotiator();
public:
virtual srs_error_t initialize(ISrsRequest *r);
// publish media capabilitiy negotiate
srs_error_t negotiate_publish_capability(SrsRtcUserConfig *ruc, SrsRtcSourceDescription *stream_desc);
srs_error_t generate_publish_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video);
srs_error_t generate_publish_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc);
srs_error_t generate_publish_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan);
};
// Negotiate via SDP exchange for WebRTC player.
class SrsRtcPlayerNegotiator
{
private:
ISrsRequest *req_;
ISrsAppConfig *config_;
ISrsRtcSourceManager *rtc_sources_;
public:
SrsRtcPlayerNegotiator();
virtual ~SrsRtcPlayerNegotiator();
public:
virtual srs_error_t initialize(ISrsRequest *r);
// play media capabilitiy negotiate
// TODO: Use StreamDescription to negotiate and remove first negotiate_play_capability function
srs_error_t negotiate_play_capability(SrsRtcUserConfig *ruc, std::map<uint32_t, SrsRtcTrackDescription *> &sub_relations);
srs_error_t generate_play_local_sdp(ISrsRequest *req, SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, bool audio_before_video);
srs_error_t generate_play_local_sdp_for_audio(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, std::string cname);
srs_error_t generate_play_local_sdp_for_video(SrsSdp &local_sdp, SrsRtcSourceDescription *stream_desc, bool unified_plan, std::string cname);
srs_error_t create_player(ISrsRequest *request, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations);
srs_error_t create_publisher(ISrsRequest *request, SrsRtcSourceDescription *stream_desc);
};
#endif

View File

@ -365,6 +365,8 @@ srs_error_t SrsRtcSessionManager::create_rtc_session(SrsRtcUserConfig *ruc, SrsS
// TODO: FIXME: add do_create_session to error process.
SrsContextId cid = _srs_context->get_id();
SrsRtcConnection *session = new SrsRtcConnection(this, cid);
session->assemble();
if ((err = do_create_rtc_session(ruc, local_sdp, session)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "create session");

View File

@ -87,6 +87,11 @@ public:
// in the same coroutine or another coroutine. Some manager may support add c to a map, it
// should always free it even if it's in the map.
virtual void remove(ISrsResource *c) = 0;
public:
// Subscribe the handler to be notified when before-dispose and disposing.
virtual void subscribe(ISrsDisposingHandler *h) = 0;
virtual void unsubscribe(ISrsDisposingHandler *h) = 0;
};
// The resource manager remove resource and delete it asynchronously.

View File

@ -2230,6 +2230,16 @@ bool MockAppConfig::get_rtc_to_rtmp(std::string vhost)
return rtc_to_rtmp_;
}
srs_utime_t MockAppConfig::get_rtc_stun_timeout(std::string vhost)
{
return 30 * SRS_UTIME_SECONDS; // Default 30 seconds timeout
}
bool MockAppConfig::get_rtc_stun_strict_check(std::string vhost)
{
return false; // Default to non-strict mode
}
void MockAppConfig::set_http_hooks_enabled(bool enabled)
{
http_hooks_enabled_ = enabled;
@ -3793,17 +3803,20 @@ VOID TEST(SrsRtcPublishTwccTimerTest, OnTimer)
// Test 4: Circuit breaker in critical state - should return early without calling send_periodic_twcc
if (true) {
mock_sender->reset();
mock_sender->set_sender_started(true);
mock_sender->set_sender_twcc_enabled(true);
// Mock circuit breaker to be in critical state
// Mock circuit breaker to be in critical state - must be set BEFORE creating timer
MockCircuitBreaker mock_circuit_breaker;
mock_circuit_breaker.hybrid_critical_water_level_ = true;
ISrsCircuitBreaker *original_circuit_breaker = _srs_circuit_breaker;
_srs_circuit_breaker = &mock_circuit_breaker;
HELPER_EXPECT_SUCCESS(timer->on_timer(100 * SRS_UTIME_MILLISECONDS));
// Create new timer with the mock circuit breaker
SrsUniquePtr<SrsRtcPublishTwccTimer> timer_with_mock_cb(new SrsRtcPublishTwccTimer(mock_sender));
mock_sender->reset();
mock_sender->set_sender_started(true);
mock_sender->set_sender_twcc_enabled(true);
HELPER_EXPECT_SUCCESS(timer_with_mock_cb->on_timer(100 * SRS_UTIME_MILLISECONDS));
// Verify send_periodic_twcc was not called due to circuit breaker
EXPECT_EQ(0, mock_sender->send_periodic_twcc_count_);

View File

@ -256,6 +256,8 @@ public:
virtual bool get_srt_enabled();
virtual bool get_srt_enabled(std::string vhost);
virtual bool get_rtc_to_rtmp(std::string vhost);
virtual srs_utime_t get_rtc_stun_timeout(std::string vhost);
virtual bool get_rtc_stun_strict_check(std::string vhost);
void set_http_hooks_enabled(bool enabled);
void set_on_stop_urls(const std::vector<std::string> &urls);
void clear_on_stop_directive();

File diff suppressed because it is too large Load Diff

View File

@ -14,7 +14,11 @@
#include <srs_app_rtc_conn.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_kernel_rtc_queue.hpp>
#include <srs_utest_app6.hpp>
#include <srs_utest_app2.hpp>
#include <srs_utest_service.hpp>
#include <srs_utest_kernel3.hpp>
// Mock video recv track for testing check_send_nacks
class MockRtcVideoRecvTrackForNack : public SrsRtcVideoRecvTrack
@ -50,4 +54,109 @@ public:
void reset();
};
// Mock NACK timer handler for testing SrsRtcConnectionNackTimer
class MockRtcConnectionNackTimerHandler : public ISrsRtcConnectionNackTimerHandler
{
public:
srs_error_t do_check_send_nacks_error_;
int do_check_send_nacks_count_;
public:
MockRtcConnectionNackTimerHandler();
virtual ~MockRtcConnectionNackTimerHandler();
public:
virtual srs_error_t do_check_send_nacks();
void set_do_check_send_nacks_error(srs_error_t err);
void reset();
};
// Mock RTC connection for testing on_before_dispose
class MockRtcConnectionForDispose : public ISrsResource
{
public:
SrsContextId cid_;
std::string desc_;
bool disposing_;
public:
MockRtcConnectionForDispose();
virtual ~MockRtcConnectionForDispose();
public:
virtual const SrsContextId &get_id();
virtual std::string desc();
void set_disposing(bool disposing);
};
// Mock connection manager for testing expire functionality
class MockConnectionManagerForExpire : public ISrsResourceManager
{
public:
ISrsResource *removed_resource_;
int remove_count_;
public:
MockConnectionManagerForExpire();
virtual ~MockConnectionManagerForExpire();
public:
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);
void reset();
};
// Mock NACK receiver for testing check_send_nacks
class MockRtpNackForReceiver : public SrsRtpNackForReceiver
{
public:
uint32_t timeout_nacks_to_return_;
std::vector<uint16_t> nack_seqs_to_add_;
int get_nack_seqs_count_;
public:
MockRtpNackForReceiver(SrsRtpRingBuffer *rtp, size_t queue_size);
virtual ~MockRtpNackForReceiver();
public:
virtual void get_nack_seqs(SrsRtcpNack &seqs, uint32_t &timeout_nacks);
void set_timeout_nacks(uint32_t timeout_nacks);
void add_nack_seq(uint16_t seq);
void reset();
};
// Mock RTC connection for testing check_send_nacks
class MockRtcConnectionForNack : public SrsRtcConnection
{
public:
srs_error_t send_rtcp_error_;
int send_rtcp_count_;
std::vector<std::string> sent_rtcp_data_;
public:
MockRtcConnectionForNack(ISrsExecRtcAsyncTask *async, const SrsContextId &cid);
virtual ~MockRtcConnectionForNack();
public:
virtual srs_error_t send_rtcp(char *data, int nb_data);
void set_send_rtcp_error(srs_error_t err);
void reset();
};
// Mock request class for testing SrsRtcConnection::create_publisher
class MockRtcConnectionRequest : public ISrsRequest
{
public:
MockRtcConnectionRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test");
virtual ~MockRtcConnectionRequest();
virtual ISrsRequest *copy();
virtual std::string get_stream_url();
virtual void update_auth(ISrsRequest *req);
virtual void strip();
virtual ISrsRequest *as_http();
};
#endif

View File

@ -1354,6 +1354,14 @@ void MockConnectionManager::remove(ISrsResource * /*c*/)
{
}
void MockConnectionManager::subscribe(ISrsDisposingHandler * /*h*/)
{
}
void MockConnectionManager::unsubscribe(ISrsDisposingHandler * /*h*/)
{
}
void MockConnectionManager::dispose()
{
}

View File

@ -89,6 +89,8 @@ public:
MockConnectionManager();
virtual ~MockConnectionManager();
virtual void remove(ISrsResource *c);
virtual void subscribe(ISrsDisposingHandler *h);
virtual void unsubscribe(ISrsDisposingHandler *h);
virtual void dispose();
};