diff --git a/trunk/configure b/trunk/configure index 596d68e0e..d0231b2fb 100755 --- a/trunk/configure +++ b/trunk/configure @@ -319,7 +319,8 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" - "srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker") + "srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker" + "srs_app_stream_token") if [[ $SRS_SRT == YES ]]; then MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source") fi @@ -439,7 +440,8 @@ if [[ $SRS_UTEST == YES ]]; then "srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload" "srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc" "srs_utest_config2" "srs_utest_protocol" "srs_utest_protocol2" "srs_utest_kernel2" "srs_utest_protocol3" - "srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock") + "srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock" + "srs_utest_stream_token") if [[ $SRS_SRT == YES ]]; then MODULE_FILES+=("srs_utest_srt") fi diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 3133325e4..8a234a6b1 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-08-25, Merge [#4452](https://github.com/ossrs/srs/pull/4452): AI: Implement stream publish token system to prevent race conditions across all protocols. v7.0.62 (#4452) * v7.0, 2025-08-22, Merge [#4449](https://github.com/ossrs/srs/pull/4449): Refine source lock. v7.0.61 (#4449) * v7.0, 2025-08-21, Merge [#4447](https://github.com/ossrs/srs/pull/4447): AI: Always enable WebRTC and enforce C++98 compatibility. v7.0.60 (#4447) * v7.0, 2025-08-20, Merge [#4445](https://github.com/ossrs/srs/pull/4445): AI: Remove multi-threading support and change to single-thread architecture. v7.0.59 (#4445) diff --git a/trunk/src/app/srs_app_dash.cpp b/trunk/src/app/srs_app_dash.cpp index 9adc4405a..d2ed16c61 100644 --- a/trunk/src/app/srs_app_dash.cpp +++ b/trunk/src/app/srs_app_dash.cpp @@ -185,6 +185,12 @@ void SrsMpdWriter::dispose() } } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsMpdWriter::initialize(ISrsRequest *r) { req = r; @@ -394,6 +400,12 @@ void SrsDashController::dispose() srs_trace("gracefully dispose dash %s", req ? req->get_stream_url().c_str() : ""); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDashController::initialize(ISrsRequest *r) { srs_error_t err = srs_success; @@ -737,6 +749,12 @@ srs_utime_t SrsDash::cleanup_delay() return _srs_config->get_dash_dispose(req->vhost) * 1.1; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDash::initialize(SrsOriginHub *h, ISrsRequest *r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 647baab9b..7013e2f3a 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -51,6 +51,12 @@ SrsDvrSegmenter::~SrsDvrSegmenter() srs_freep(fs); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDvrSegmenter::initialize(SrsDvrPlan *p, ISrsRequest *r) { req = r; @@ -581,6 +587,12 @@ SrsDvrPlan::~SrsDvrPlan() srs_freep(req); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDvrPlan::initialize(SrsOriginHub *h, SrsDvrSegmenter *s, ISrsRequest *r) { srs_error_t err = srs_success; @@ -666,6 +678,12 @@ srs_error_t SrsDvrPlan::on_reap_segment() return err; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDvrPlan::create_plan(string vhost, SrsDvrPlan **pplan) { std::string plan = _srs_config->get_dvr_plan(vhost); @@ -932,6 +950,12 @@ SrsDvr::~SrsDvr() srs_freep(req); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsDvr::initialize(SrsOriginHub *h, ISrsRequest *r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index c405f01c2..33368adef 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -417,6 +417,12 @@ SrsEdgeIngester::~SrsEdgeIngester() srs_freep(trd); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr s, SrsPlayEdge *e, ISrsRequest *r) { // Because source references to this object, so we should directly use the source ptr. @@ -750,6 +756,12 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size) return queue->set_queue_size(queue_size); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr s, SrsPublishEdge *e, ISrsRequest *r) { // Because source references to this object, so we should directly use the source ptr. @@ -965,6 +977,12 @@ SrsPlayEdge::~SrsPlayEdge() srs_freep(ingester); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsPlayEdge::initialize(SrsSharedPtr source, ISrsRequest *req) { srs_error_t err = srs_success; @@ -1058,6 +1076,12 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size) return forwarder->set_queue_size(queue_size); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsPublishEdge::initialize(SrsSharedPtr source, ISrsRequest *req) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index d490ed07f..6bf1cdf42 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -465,6 +465,12 @@ void SrsHlsFmp4Muxer::set_latest_vcodec(SrsVideoCodecId v) latest_vcodec_ = v; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsHlsFmp4Muxer::initialize(int v_tid, int a_tid) { video_track_id_ = v_tid; @@ -1172,6 +1178,12 @@ void SrsHlsMuxer::set_latest_vcodec(SrsVideoCodecId v) latest_vcodec_ = v; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsHlsMuxer::initialize() { return srs_success; @@ -1901,6 +1913,12 @@ SrsHlsController::~SrsHlsController() srs_freep(tsmc); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsHlsController::initialize() { srs_error_t err = muxer->initialize(); @@ -2224,6 +2242,12 @@ SrsHlsMp4Controller::~SrsHlsMp4Controller() srs_freep(muxer_); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsHlsMp4Controller::initialize() { srs_error_t err = srs_success; @@ -2514,6 +2538,12 @@ srs_utime_t SrsHls::cleanup_delay() return _srs_config->get_hls_dispose(req->vhost) * 1.1; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsHls::initialize(SrsOriginHub *h, ISrsRequest *r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 5cd50d775..6304ec680 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -694,6 +695,9 @@ srs_error_t srs_global_initialize() _srs_rtc_sources = new SrsRtcSourceManager(); _srs_blackhole = new SrsRtcBlackhole(); + // Initialize stream publish token manager + _srs_stream_publish_tokens = new SrsStreamPublishTokenManager(); + _srs_rtc_manager = new SrsResourceManager("RTC", true); _srs_rtc_dtls_certificate = new SrsDtlsCertificate(); #ifdef SRS_RTSP diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e1ddcbe48..077274776 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -33,6 +33,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -1194,6 +1195,16 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript track->set_nack_no_copy(nack_no_copy_); } + // Acquire stream publish token to prevent race conditions across all protocols. + SrsStreamPublishToken *publish_token_raw = NULL; + if ((err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) { + return srs_error_wrap(err, "acquire stream publish token"); + } + SrsUniquePtr publish_token(publish_token_raw); + if (publish_token.get()) { + srs_trace("stream publish token acquired, type=rtc, url=%s", req_->get_stream_url().c_str()); + } + // Setup the publish stream in source to enable PLI as such. if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) { return srs_error_wrap(err, "create source"); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 8cb977127..916b41107 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -451,6 +451,12 @@ bool SrsRtcSource::stream_is_dead() return true; } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. void SrsRtcSource::init_for_play_before_publishing() { // If the stream description has already been setup by RTC publisher, diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 87a880124..85dde9422 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -28,6 +28,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -655,6 +656,17 @@ srs_error_t SrsRtmpConn::stream_service_cycle() rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); + // Acquire stream publish token to prevent race conditions across all protocols. + SrsStreamPublishToken *publish_token_raw = NULL; + if (info->type != SrsRtmpConnPlay && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) { + return srs_error_wrap(err, "acquire stream publish token"); + } + SrsUniquePtr publish_token(publish_token_raw); + if (publish_token.get()) { + srs_trace("stream publish token acquired, type=%s, url=%s", + srs_client_type_string(info->type).c_str(), req->get_stream_url().c_str()); + } + // find a source to serve. SrsSharedPtr live_source; if ((err = _srs_sources->fetch_or_create(req, live_source)) != srs_success) { diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c5a39115b..9131b03e3 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -33,6 +33,7 @@ using namespace std; #include #include #include +#include #include #include #include diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index da6da16da..61f9e1ca2 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -859,6 +859,12 @@ SrsOriginHub::~SrsOriginHub() #endif } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsOriginHub::initialize(SrsSharedPtr s, ISrsRequest *r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 9ac18dd4d..f59e34e2b 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -15,6 +15,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -283,6 +284,16 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s", streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str()); + // Acquire stream publish token to prevent race conditions across all protocols. + SrsStreamPublishToken *publish_token_raw = NULL; + if ((err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) { + return srs_error_wrap(err, "acquire stream publish token"); + } + SrsUniquePtr publish_token(publish_token_raw); + if (publish_token.get()) { + srs_trace("stream publish token acquired, type=srt, url=%s", req_->get_stream_url().c_str()); + } + if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) { return srs_error_wrap(err, "fetch srt source"); } diff --git a/trunk/src/app/srs_app_stream_token.cpp b/trunk/src/app/srs_app_stream_token.cpp new file mode 100644 index 000000000..6650750bf --- /dev/null +++ b/trunk/src/app/srs_app_stream_token.cpp @@ -0,0 +1,131 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#include + +#include +#include +#include +#include +#include + +// Global instance +SrsStreamPublishTokenManager *_srs_stream_publish_tokens = NULL; + +SrsStreamPublishToken::SrsStreamPublishToken(const std::string &stream_url, SrsStreamPublishTokenManager *manager) +{ + stream_url_ = stream_url; + acquired_ = false; + manager_ = manager; + publisher_cid_ = SrsContextId(); +} + +SrsStreamPublishToken::~SrsStreamPublishToken() +{ + // Automatically release the token when destroyed + if (acquired_ && manager_) { + manager_->release_token(stream_url_); + } +} + +std::string SrsStreamPublishToken::stream_url() +{ + return stream_url_; +} + +bool SrsStreamPublishToken::is_acquired() +{ + return acquired_; +} + +void SrsStreamPublishToken::set_acquired(bool acquired) +{ + acquired_ = acquired; +} + +const SrsContextId &SrsStreamPublishToken::publisher_cid() +{ + return publisher_cid_; +} + +void SrsStreamPublishToken::set_publisher_cid(const SrsContextId &cid) +{ + publisher_cid_ = cid; +} + +SrsStreamPublishTokenManager::SrsStreamPublishTokenManager() +{ + mutex_ = srs_mutex_new(); +} + +SrsStreamPublishTokenManager::~SrsStreamPublishTokenManager() +{ + // Clean up all remaining tokens. Each token's destructor automatically calls + // release_token() which removes it from tokens_ map, so we use while loop + // to avoid iterator invalidation issues. + while (!tokens_.empty()) { + SrsStreamPublishToken *token = tokens_.begin()->second; + + // Token destructor will call release_token() and remove this entry from map + srs_freep(token); + } + + srs_mutex_destroy(mutex_); + srs_trace("stream publish token manager destroyed"); +} + +srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token) +{ + srs_error_t err = srs_success; + + std::string stream_url = req->get_stream_url(); + SrsContextId current_cid = _srs_context->get_id(); + + SrsLocker(mutex_); + + // Get or create token for this stream + SrsStreamPublishToken *stream_token = NULL; + + std::map::iterator it = tokens_.find(stream_url); + if (it != tokens_.end()) { + stream_token = it->second; + } else { + stream_token = new SrsStreamPublishToken(stream_url, this); + tokens_[stream_url] = stream_token; + } + + // Check if token is already acquired by another publisher + if (stream_token->is_acquired()) { + SrsContextId existing_cid = stream_token->publisher_cid(); + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, + "stream %s is busy, acquired by cid=%s, current cid=%s", + stream_url.c_str(), existing_cid.c_str(), current_cid.c_str()); + } else { + stream_token->set_acquired(true); + stream_token->set_publisher_cid(current_cid); + } + + // Return the token from the map (caller will manage its lifetime) + token = stream_token; + + return err; +} + +void SrsStreamPublishTokenManager::release_token(const std::string &stream_url) +{ + SrsLocker(mutex_); + + // Find and erase the token from the map + std::map::iterator it = tokens_.find(stream_url); + srs_assert(it != tokens_.end()); + + SrsStreamPublishToken *token = it->second; + token->set_acquired(false); + srs_trace("stream publish token released and deleted, url=%s", stream_url.c_str()); + + // Erase from map first, then delete the token + tokens_.erase(it); +} diff --git a/trunk/src/app/srs_app_stream_token.hpp b/trunk/src/app/srs_app_stream_token.hpp new file mode 100644 index 000000000..2e56ab8b6 --- /dev/null +++ b/trunk/src/app/srs_app_stream_token.hpp @@ -0,0 +1,87 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_APP_STREAM_TOKEN_HPP +#define SRS_APP_STREAM_TOKEN_HPP + +#include + +#include +#include + +#include +#include +#include + +class ISrsRequest; +class SrsStreamPublishTokenManager; + +// The stream publish token represents exclusive access to publish a stream. +// Only one publisher can hold a token for a given stream URL at any time. +// This prevents race conditions across all protocols (RTMP, RTC, SRT, etc.). +class SrsStreamPublishToken +{ +private: + // The stream URL this token is for + std::string stream_url_; + // Whether this token is currently acquired + bool acquired_; + // The token manager that created this token + SrsStreamPublishTokenManager *manager_; + // The context ID of the publisher that acquired this token + SrsContextId publisher_cid_; + +public: + SrsStreamPublishToken(const std::string &stream_url, SrsStreamPublishTokenManager *manager); + virtual ~SrsStreamPublishToken(); + +public: + // Get the stream URL this token is for + std::string stream_url(); + + // Check if this token is currently acquired + bool is_acquired(); + + void set_acquired(bool acquired); + + // Get the publisher context ID that acquired this token + const SrsContextId &publisher_cid(); + + void set_publisher_cid(const SrsContextId &cid); +}; + +// The global stream publish token manager ensures only one publisher +// can acquire a token for a given stream URL at any time. +// This prevents race conditions across all protocols. +class SrsStreamPublishTokenManager +{ +private: + // Map of stream URL to token + std::map tokens_; + // Mutex to protect the tokens map + srs_mutex_t mutex_; + +public: + SrsStreamPublishTokenManager(); + virtual ~SrsStreamPublishTokenManager(); + +public: + // Acquire a publish token for the given stream URL. + // Returns success if token was acquired, error if stream is already being published. + // @param req The request containing stream information + // @param token Output parameter for the acquired token (will be set to NULL on error) + srs_error_t acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token); + + // Release a publish token for the given stream URL. + // This is called automatically when the token is destroyed. + // @param stream_url The stream URL to release the token for + void release_token(const std::string &stream_url); +}; + +// Global instance accessor +extern SrsStreamPublishTokenManager *_srs_stream_publish_tokens; + +#endif diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index 56b0e01d5..81028ea37 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 61 +#define VERSION_REVISION 62 #endif \ No newline at end of file diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index 632a102dc..50604f0e9 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -1014,6 +1014,12 @@ SrsFormat::~SrsFormat() srs_freep(vcodec); } +// CRITICAL: This method is called AFTER the source has been added to the source pool +// in the fetch_or_create pattern (see PR 4449). +// +// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches. +// This prevents the race condition where multiple coroutines could create duplicate sources +// for the same stream when context switches occurred during initialization. srs_error_t SrsFormat::initialize() { if (!vcodec) { diff --git a/trunk/src/utest/srs_utest_stream_token.cpp b/trunk/src/utest/srs_utest_stream_token.cpp new file mode 100644 index 000000000..25a4379c5 --- /dev/null +++ b/trunk/src/utest/srs_utest_stream_token.cpp @@ -0,0 +1,625 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include + +// Mock request for testing stream publish tokens +class MockStreamTokenRequest : public SrsRequest +{ +public: + string mock_stream_url; + + MockStreamTokenRequest(const string &url = "/live/livestream") + { + mock_stream_url = url; + } + + virtual ~MockStreamTokenRequest() + { + } + + virtual string get_stream_url() + { + return mock_stream_url; + } +}; + +VOID TEST(StreamTokenTest, TokenBasicProperties) +{ + SrsStreamPublishTokenManager manager; + SrsStreamPublishToken token("/live/livestream", &manager); + + // Test basic properties + EXPECT_STREQ("/live/livestream", token.stream_url().c_str()); + EXPECT_FALSE(token.is_acquired()); + + // Test setting acquired state + token.set_acquired(true); + EXPECT_TRUE(token.is_acquired()); + + token.set_acquired(false); + EXPECT_FALSE(token.is_acquired()); + + // Test setting publisher context ID + SrsContextId test_cid; + test_cid.set_value("test-context-id"); + token.set_publisher_cid(test_cid); + EXPECT_STREQ("test-context-id", token.publisher_cid().c_str()); +} + +VOID TEST(StreamTokenTest, SingleTokenAcquisition) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + SrsStreamPublishToken *token = NULL; + + // First acquisition should succeed + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + EXPECT_STREQ("/live/stream1", token->stream_url().c_str()); + + // Clean up + srs_freep(token); +} + +VOID TEST(StreamTokenTest, DuplicateTokenRejection) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + SrsStreamPublishToken *token1 = NULL; + SrsStreamPublishToken *token2 = NULL; + + // First acquisition should succeed + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1)); + EXPECT_TRUE(token1 != NULL); + EXPECT_TRUE(token1->is_acquired()); + + // Second acquisition for same stream should fail + HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 == NULL); + + // Clean up + srs_freep(token1); +} + +VOID TEST(StreamTokenTest, DifferentStreamsAllowed) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req1("/live/stream1"); + MockStreamTokenRequest req2("/live/stream2"); + SrsStreamPublishToken *token1 = NULL; + SrsStreamPublishToken *token2 = NULL; + + // Both acquisitions should succeed for different streams + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req1, token1)); + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req2, token2)); + + EXPECT_TRUE(token1 != NULL); + EXPECT_TRUE(token2 != NULL); + EXPECT_TRUE(token1->is_acquired()); + EXPECT_TRUE(token2->is_acquired()); + EXPECT_STREQ("/live/stream1", token1->stream_url().c_str()); + EXPECT_STREQ("/live/stream2", token2->stream_url().c_str()); + + // Clean up + srs_freep(token1); + srs_freep(token2); +} + +VOID TEST(StreamTokenTest, TokenAutoRelease) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + + // Acquire token in a scope + if (true) { + SrsStreamPublishToken *token = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + + // Token will be automatically released when destroyed + srs_freep(token); + } + + // Now we should be able to acquire the same stream again + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 != NULL); + EXPECT_TRUE(token2->is_acquired()); + + // Clean up + srs_freep(token2); +} + +VOID TEST(StreamTokenTest, ContextIdTracking) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + SrsStreamPublishToken *token = NULL; + + // Get current context ID + SrsContextId current_cid = _srs_context->generate_id(); + _srs_context->set_id(current_cid); + + // Acquire token + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + + // Token should track the current context ID + EXPECT_STREQ(current_cid.c_str(), token->publisher_cid().c_str()); + + // Clean up + srs_freep(token); +} + +VOID TEST(StreamTokenTest, ErrorCodeVerification) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + SrsStreamPublishToken *token1 = NULL; + SrsStreamPublishToken *token2 = NULL; + + // First acquisition should succeed + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1)); + EXPECT_TRUE(token1 != NULL); + + // Second acquisition should fail with specific error code + err = manager.acquire_token(&req, token2); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(err)); + EXPECT_TRUE(token2 == NULL); + + // Clean up error and token + srs_freep(err); + srs_freep(token1); +} + +VOID TEST(StreamTokenTest, MultipleStreamsConcurrent) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + // Test multiple different streams can be acquired simultaneously + vector requests; + vector tokens; + + for (int i = 0; i < 10; i++) { + string stream_url = "/live/stream" + srs_int2str(i); + MockStreamTokenRequest *req = new MockStreamTokenRequest(stream_url); + requests.push_back(req); + + SrsStreamPublishToken *token = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + EXPECT_STREQ(stream_url.c_str(), token->stream_url().c_str()); + tokens.push_back(token); + } + + // Clean up + for (size_t i = 0; i < requests.size(); i++) { + srs_freep(requests[i]); + srs_freep(tokens[i]); + } +} + +VOID TEST(StreamTokenTest, TokenReleaseAndReacquire) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + + // Acquire, release, and reacquire the same stream multiple times + for (int i = 0; i < 5; i++) { + SrsStreamPublishToken *token = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + EXPECT_STREQ("/live/stream1", token->stream_url().c_str()); + + // Release token + srs_freep(token); + + // Should be able to acquire again immediately + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 != NULL); + EXPECT_TRUE(token2->is_acquired()); + + // Clean up + srs_freep(token2); + } +} + +VOID TEST(StreamTokenTest, EmptyStreamUrl) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req(""); + SrsStreamPublishToken *token = NULL; + + // Should be able to acquire token even with empty stream URL + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + EXPECT_STREQ("", token->stream_url().c_str()); + + // Second acquisition should fail + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 == NULL); + + // Clean up + srs_freep(token); +} + +VOID TEST(StreamTokenTest, TokenDestructorBehavior) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + + // Test that token destructor properly releases the token + if (true) { + SrsStreamPublishToken *token = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + + // Manually call destructor + srs_freep(token); + } + + // Should be able to acquire the same stream again + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 != NULL); + EXPECT_TRUE(token2->is_acquired()); + + // Clean up + srs_freep(token2); +} + +VOID TEST(StreamTokenTest, ManagerDestructorCleanup) +{ + srs_error_t err; + + // Test that manager destructor properly cleans up all tokens + if (true) { + SrsStreamPublishTokenManager manager; + + // Acquire multiple tokens + for (int i = 0; i < 5; i++) { + string stream_url = "/live/stream" + srs_int2str(i); + MockStreamTokenRequest req(stream_url); + SrsStreamPublishToken *token = NULL; + + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + } + + // Don't manually free tokens - let manager destructor handle cleanup + // Manager destructor should clean up all tokens automatically + } + + // If we reach here without crashes, the cleanup worked correctly + EXPECT_TRUE(true); +} + +// Mock coroutine class to simulate race conditions +class MockTokenCoroutine : public ISrsCoroutineHandler +{ +public: + SrsStreamPublishTokenManager *manager; + MockStreamTokenRequest *req; + SrsStreamPublishToken *token; + srs_error_t result; + bool completed; + int delay_ms; + + MockTokenCoroutine(SrsStreamPublishTokenManager *mgr, MockStreamTokenRequest *request, int delay = 0) + { + manager = mgr; + req = request; + token = NULL; + result = srs_success; + completed = false; + delay_ms = delay; + } + + virtual ~MockTokenCoroutine() + { + srs_freep(result); + srs_freep(token); + } + + virtual srs_error_t cycle() + { + srs_error_t err = srs_success; + + // Add delay to simulate timing variations + if (delay_ms > 0) { + srs_usleep(delay_ms * SRS_UTIME_MILLISECONDS); + } + + // Try to acquire token + result = manager->acquire_token(req, token); + completed = true; + + return err; + } +}; + +VOID TEST(StreamTokenTest, RaceConditionPrevention) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + // Create multiple coroutines trying to acquire the same stream + MockStreamTokenRequest req("/live/stream1"); + vector coroutines; + vector threads; + + // Create 5 coroutines with different delays + for (int i = 0; i < 5; i++) { + MockTokenCoroutine *handler = new MockTokenCoroutine(&manager, &req, 1); + coroutines.push_back(handler); + + SrsSTCoroutine *trd = new SrsSTCoroutine("token-test", handler, _srs_context->get_id()); + threads.push_back(trd); + + HELPER_ASSERT_SUCCESS(trd->start()); + } + + // Wait a bit for completion + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Wait for all coroutines to complete + for (size_t i = 0; i < threads.size(); i++) { + threads[i]->stop(); + threads[i]->interrupt(); + } + + // Wait a bit for completion + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + + // Check results - exactly one should succeed, others should fail + int success_count = 0; + int failure_count = 0; + SrsStreamPublishToken *acquired_token = NULL; + + for (size_t i = 0; i < coroutines.size(); i++) { + if (coroutines[i]->completed) { + if (coroutines[i]->result == srs_success) { + success_count++; + acquired_token = coroutines[i]->token; + } else { + failure_count++; + EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(coroutines[i]->result)); + } + } + } + + // Exactly one should succeed + EXPECT_EQ(1, success_count); + EXPECT_EQ(4, failure_count); + EXPECT_TRUE(acquired_token != NULL); + + // Clean up + for (size_t i = 0; i < coroutines.size(); i++) { + srs_freep(coroutines[i]); + srs_freep(threads[i]); + } +} + +VOID TEST(StreamTokenTest, SequentialAcquisitionAfterRelease) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + + // Test sequential acquisition and release pattern + for (int round = 0; round < 10; round++) { + SrsStreamPublishToken *token = NULL; + + // Acquire token + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + + // Verify we can't acquire again + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 == NULL); + + // Release token + srs_freep(token); + + // Should be able to acquire again immediately + SrsStreamPublishToken *token3 = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token3)); + EXPECT_TRUE(token3 != NULL); + EXPECT_TRUE(token3->is_acquired()); + + // Clean up + srs_freep(token3); + } +} + +VOID TEST(StreamTokenTest, TokenManagerStressTest) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + // Stress test with many streams + const int num_streams = 100; + vector requests; + vector tokens; + + // Acquire tokens for many streams + for (int i = 0; i < num_streams; i++) { + string stream_url = "/live/stress_stream_" + srs_int2str(i); + MockStreamTokenRequest *req = new MockStreamTokenRequest(stream_url); + requests.push_back(req); + + SrsStreamPublishToken *token = NULL; + HELPER_EXPECT_SUCCESS(manager.acquire_token(req, token)); + EXPECT_TRUE(token != NULL); + EXPECT_TRUE(token->is_acquired()); + tokens.push_back(token); + } + + // Verify all tokens are unique and properly acquired + for (int i = 0; i < num_streams; i++) { + EXPECT_TRUE(tokens[i]->is_acquired()); + string expected_url = "/live/stress_stream_" + srs_int2str(i); + EXPECT_STREQ(expected_url.c_str(), tokens[i]->stream_url().c_str()); + + // Verify we can't acquire the same stream again + SrsStreamPublishToken *duplicate_token = NULL; + HELPER_EXPECT_FAILED(manager.acquire_token(requests[i], duplicate_token)); + EXPECT_TRUE(duplicate_token == NULL); + } + + // Clean up + for (int i = 0; i < num_streams; i++) { + srs_freep(requests[i]); + srs_freep(tokens[i]); + } +} + +VOID TEST(StreamTokenTest, TokenWithDifferentContextIds) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + MockStreamTokenRequest req("/live/stream1"); + + SrsContextId first_cid = _srs_context->get_id(); + SrsStreamPublishToken *token1 = NULL; + + // Acquire token with first context + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1)); + EXPECT_TRUE(token1 != NULL); + EXPECT_STREQ(first_cid.c_str(), token1->publisher_cid().c_str()); + + // Change context ID + _srs_context->set_id(SrsContextId()); + SrsContextId second_cid = _srs_context->get_id(); + + // Should still fail to acquire same stream with different context + SrsStreamPublishToken *token2 = NULL; + HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2)); + EXPECT_TRUE(token2 == NULL); + + // Clean up + srs_freep(token1); +} + +VOID TEST(StreamTokenTest, TokenUrlCaseSensitivity) +{ + srs_error_t err; + + SrsStreamPublishTokenManager manager; + + // Test case sensitivity of stream URLs + MockStreamTokenRequest req1("/live/Stream1"); + MockStreamTokenRequest req2("/live/stream1"); + MockStreamTokenRequest req3("/LIVE/STREAM1"); + + SrsStreamPublishToken *token1 = NULL; + SrsStreamPublishToken *token2 = NULL; + SrsStreamPublishToken *token3 = NULL; + + // All should be treated as different streams (case sensitive) + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req1, token1)); + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req2, token2)); + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req3, token3)); + + EXPECT_TRUE(token1 != NULL); + EXPECT_TRUE(token2 != NULL); + EXPECT_TRUE(token3 != NULL); + + EXPECT_STREQ("/live/Stream1", token1->stream_url().c_str()); + EXPECT_STREQ("/live/stream1", token2->stream_url().c_str()); + EXPECT_STREQ("/LIVE/STREAM1", token3->stream_url().c_str()); + + // Clean up + srs_freep(token1); + srs_freep(token2); + srs_freep(token3); +} + +VOID TEST(StreamTokenTest, TokenManagerMemoryLeakPrevention) +{ + srs_error_t err; + + // Test that repeated acquire/release cycles don't cause memory leaks + for (int cycle = 0; cycle < 10; cycle++) { + SrsStreamPublishTokenManager manager; + + // Acquire and release multiple tokens + for (int i = 0; i < 20; i++) { + string stream_url = "/live/leak_test_" + srs_int2str(i); + MockStreamTokenRequest req(stream_url); + SrsStreamPublishToken *token = NULL; + + HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token)); + EXPECT_TRUE(token != NULL); + + // Immediately release + srs_freep(token); + } + + // Manager destructor should clean up everything + } + + // If we reach here without crashes or excessive memory usage, test passes + EXPECT_TRUE(true); +} diff --git a/trunk/src/utest/srs_utest_stream_token.hpp b/trunk/src/utest/srs_utest_stream_token.hpp new file mode 100644 index 000000000..b43134b99 --- /dev/null +++ b/trunk/src/utest/srs_utest_stream_token.hpp @@ -0,0 +1,15 @@ +// +// Copyright (c) 2013-2025 The SRS Authors +// +// SPDX-License-Identifier: MIT +// + +#ifndef SRS_UTEST_STREAM_TOKEN_HPP +#define SRS_UTEST_STREAM_TOKEN_HPP + +/* +#include +*/ +#include + +#endif