AI: Implement stream publish token system to prevent race conditions across all protocols. v7.0.62 (#4452)

This PR introduces a comprehensive stream publish token system that
prevents race conditions when multiple publishers attempt to publish to
the same stream URL simultaneously across different protocols (RTMP,
WebRTC, SRT).

* Race Condition Issue: Multiple publishers could create duplicate
sources for the same stream when context switches occurred during source
initialization in SRS's coroutine-based architecture
* Cross-Protocol Conflicts: Different protocols (RTMP, RTC, SRT) could
simultaneously publish to the same stream URL without coordination
* Resource Management: No centralized mechanism to ensure exclusive
stream publishing access

---------

Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-08-26 10:27:53 -04:00 committed by GitHub
parent 1b6f97bd2d
commit 72ddc28d97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1017 additions and 3 deletions

6
trunk/configure vendored
View File

@ -319,7 +319,8 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc
"srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec"
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker")
"srs_app_coworkers" "srs_app_hybrid" "srs_app_circuit_breaker"
"srs_app_stream_token")
if [[ $SRS_SRT == YES ]]; then
MODULE_FILES+=("srs_app_srt_server" "srs_app_srt_listener" "srs_app_srt_conn" "srs_app_srt_utility" "srs_app_srt_source")
fi
@ -439,7 +440,8 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload"
"srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc" "srs_utest_config2"
"srs_utest_protocol" "srs_utest_protocol2" "srs_utest_kernel2" "srs_utest_protocol3"
"srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock")
"srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock"
"srs_utest_stream_token")
if [[ $SRS_SRT == YES ]]; then
MODULE_FILES+=("srs_utest_srt")
fi

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-08-25, Merge [#4452](https://github.com/ossrs/srs/pull/4452): AI: Implement stream publish token system to prevent race conditions across all protocols. v7.0.62 (#4452)
* v7.0, 2025-08-22, Merge [#4449](https://github.com/ossrs/srs/pull/4449): Refine source lock. v7.0.61 (#4449)
* v7.0, 2025-08-21, Merge [#4447](https://github.com/ossrs/srs/pull/4447): AI: Always enable WebRTC and enforce C++98 compatibility. v7.0.60 (#4447)
* v7.0, 2025-08-20, Merge [#4445](https://github.com/ossrs/srs/pull/4445): AI: Remove multi-threading support and change to single-thread architecture. v7.0.59 (#4445)

View File

@ -185,6 +185,12 @@ void SrsMpdWriter::dispose()
}
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsMpdWriter::initialize(ISrsRequest *r)
{
req = r;
@ -394,6 +400,12 @@ void SrsDashController::dispose()
srs_trace("gracefully dispose dash %s", req ? req->get_stream_url().c_str() : "");
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDashController::initialize(ISrsRequest *r)
{
srs_error_t err = srs_success;
@ -737,6 +749,12 @@ srs_utime_t SrsDash::cleanup_delay()
return _srs_config->get_dash_dispose(req->vhost) * 1.1;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDash::initialize(SrsOriginHub *h, ISrsRequest *r)
{
srs_error_t err = srs_success;

View File

@ -51,6 +51,12 @@ SrsDvrSegmenter::~SrsDvrSegmenter()
srs_freep(fs);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDvrSegmenter::initialize(SrsDvrPlan *p, ISrsRequest *r)
{
req = r;
@ -581,6 +587,12 @@ SrsDvrPlan::~SrsDvrPlan()
srs_freep(req);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDvrPlan::initialize(SrsOriginHub *h, SrsDvrSegmenter *s, ISrsRequest *r)
{
srs_error_t err = srs_success;
@ -666,6 +678,12 @@ srs_error_t SrsDvrPlan::on_reap_segment()
return err;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDvrPlan::create_plan(string vhost, SrsDvrPlan **pplan)
{
std::string plan = _srs_config->get_dvr_plan(vhost);
@ -932,6 +950,12 @@ SrsDvr::~SrsDvr()
srs_freep(req);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsDvr::initialize(SrsOriginHub *h, ISrsRequest *r)
{
srs_error_t err = srs_success;

View File

@ -417,6 +417,12 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(trd);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge *e, ISrsRequest *r)
{
// Because source references to this object, so we should directly use the source ptr.
@ -750,6 +756,12 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
return queue->set_queue_size(queue_size);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge *e, ISrsRequest *r)
{
// Because source references to this object, so we should directly use the source ptr.
@ -965,6 +977,12 @@ SrsPlayEdge::~SrsPlayEdge()
srs_freep(ingester);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsPlayEdge::initialize(SrsSharedPtr<SrsLiveSource> source, ISrsRequest *req)
{
srs_error_t err = srs_success;
@ -1058,6 +1076,12 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
return forwarder->set_queue_size(queue_size);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsPublishEdge::initialize(SrsSharedPtr<SrsLiveSource> source, ISrsRequest *req)
{
srs_error_t err = srs_success;

View File

@ -465,6 +465,12 @@ void SrsHlsFmp4Muxer::set_latest_vcodec(SrsVideoCodecId v)
latest_vcodec_ = v;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsHlsFmp4Muxer::initialize(int v_tid, int a_tid)
{
video_track_id_ = v_tid;
@ -1172,6 +1178,12 @@ void SrsHlsMuxer::set_latest_vcodec(SrsVideoCodecId v)
latest_vcodec_ = v;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsHlsMuxer::initialize()
{
return srs_success;
@ -1901,6 +1913,12 @@ SrsHlsController::~SrsHlsController()
srs_freep(tsmc);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsHlsController::initialize()
{
srs_error_t err = muxer->initialize();
@ -2224,6 +2242,12 @@ SrsHlsMp4Controller::~SrsHlsMp4Controller()
srs_freep(muxer_);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsHlsMp4Controller::initialize()
{
srs_error_t err = srs_success;
@ -2514,6 +2538,12 @@ srs_utime_t SrsHls::cleanup_delay()
return _srs_config->get_hls_dispose(req->vhost) * 1.1;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsHls::initialize(SrsOriginHub *h, ISrsRequest *r)
{
srs_error_t err = srs_success;

View File

@ -20,6 +20,7 @@
#include <srs_app_rtc_source.hpp>
#include <srs_app_server.hpp>
#include <srs_app_source.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_tencentcloud.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_error.hpp>
@ -694,6 +695,9 @@ srs_error_t srs_global_initialize()
_srs_rtc_sources = new SrsRtcSourceManager();
_srs_blackhole = new SrsRtcBlackhole();
// Initialize stream publish token manager
_srs_stream_publish_tokens = new SrsStreamPublishTokenManager();
_srs_rtc_manager = new SrsResourceManager("RTC", true);
_srs_rtc_dtls_certificate = new SrsDtlsCertificate();
#ifdef SRS_RTSP

View File

@ -33,6 +33,7 @@ using namespace std;
#include <srs_app_source.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
@ -1194,6 +1195,16 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript
track->set_nack_no_copy(nack_no_copy_);
}
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if ((err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
if (publish_token.get()) {
srs_trace("stream publish token acquired, type=rtc, url=%s", req_->get_stream_url().c_str());
}
// Setup the publish stream in source to enable PLI as such.
if ((err = _srs_rtc_sources->fetch_or_create(req_, source_)) != srs_success) {
return srs_error_wrap(err, "create source");

View File

@ -451,6 +451,12 @@ bool SrsRtcSource::stream_is_dead()
return true;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
void SrsRtcSource::init_for_play_before_publishing()
{
// If the stream description has already been setup by RTC publisher,

View File

@ -28,6 +28,7 @@ using namespace std;
#include <srs_app_srt_source.hpp>
#include <srs_app_st.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_tencentcloud.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
@ -655,6 +656,17 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if (info->type != SrsRtmpConnPlay && (err = _srs_stream_publish_tokens->acquire_token(req, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
if (publish_token.get()) {
srs_trace("stream publish token acquired, type=%s, url=%s",
srs_client_type_string(info->type).c_str(), req->get_stream_url().c_str());
}
// find a source to serve.
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(req, live_source)) != srs_success) {

View File

@ -33,6 +33,7 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_kernel_error.hpp>

View File

@ -859,6 +859,12 @@ SrsOriginHub::~SrsOriginHub()
#endif
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsOriginHub::initialize(SrsSharedPtr<SrsLiveSource> s, ISrsRequest *r)
{
srs_error_t err = srs_success;

View File

@ -15,6 +15,7 @@ using namespace std;
#include <srs_app_srt_server.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_flv.hpp>
@ -283,6 +284,16 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s",
streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str());
// Acquire stream publish token to prevent race conditions across all protocols.
SrsStreamPublishToken *publish_token_raw = NULL;
if ((err = _srs_stream_publish_tokens->acquire_token(req_, publish_token_raw)) != srs_success) {
return srs_error_wrap(err, "acquire stream publish token");
}
SrsUniquePtr<SrsStreamPublishToken> publish_token(publish_token_raw);
if (publish_token.get()) {
srs_trace("stream publish token acquired, type=srt, url=%s", req_->get_stream_url().c_str());
}
if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) {
return srs_error_wrap(err, "fetch srt source");
}

View File

@ -0,0 +1,131 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_app_stream_token.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_st.hpp>
// Global instance
SrsStreamPublishTokenManager *_srs_stream_publish_tokens = NULL;
SrsStreamPublishToken::SrsStreamPublishToken(const std::string &stream_url, SrsStreamPublishTokenManager *manager)
{
stream_url_ = stream_url;
acquired_ = false;
manager_ = manager;
publisher_cid_ = SrsContextId();
}
SrsStreamPublishToken::~SrsStreamPublishToken()
{
// Automatically release the token when destroyed
if (acquired_ && manager_) {
manager_->release_token(stream_url_);
}
}
std::string SrsStreamPublishToken::stream_url()
{
return stream_url_;
}
bool SrsStreamPublishToken::is_acquired()
{
return acquired_;
}
void SrsStreamPublishToken::set_acquired(bool acquired)
{
acquired_ = acquired;
}
const SrsContextId &SrsStreamPublishToken::publisher_cid()
{
return publisher_cid_;
}
void SrsStreamPublishToken::set_publisher_cid(const SrsContextId &cid)
{
publisher_cid_ = cid;
}
SrsStreamPublishTokenManager::SrsStreamPublishTokenManager()
{
mutex_ = srs_mutex_new();
}
SrsStreamPublishTokenManager::~SrsStreamPublishTokenManager()
{
// Clean up all remaining tokens. Each token's destructor automatically calls
// release_token() which removes it from tokens_ map, so we use while loop
// to avoid iterator invalidation issues.
while (!tokens_.empty()) {
SrsStreamPublishToken *token = tokens_.begin()->second;
// Token destructor will call release_token() and remove this entry from map
srs_freep(token);
}
srs_mutex_destroy(mutex_);
srs_trace("stream publish token manager destroyed");
}
srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token)
{
srs_error_t err = srs_success;
std::string stream_url = req->get_stream_url();
SrsContextId current_cid = _srs_context->get_id();
SrsLocker(mutex_);
// Get or create token for this stream
SrsStreamPublishToken *stream_token = NULL;
std::map<std::string, SrsStreamPublishToken *>::iterator it = tokens_.find(stream_url);
if (it != tokens_.end()) {
stream_token = it->second;
} else {
stream_token = new SrsStreamPublishToken(stream_url, this);
tokens_[stream_url] = stream_token;
}
// Check if token is already acquired by another publisher
if (stream_token->is_acquired()) {
SrsContextId existing_cid = stream_token->publisher_cid();
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY,
"stream %s is busy, acquired by cid=%s, current cid=%s",
stream_url.c_str(), existing_cid.c_str(), current_cid.c_str());
} else {
stream_token->set_acquired(true);
stream_token->set_publisher_cid(current_cid);
}
// Return the token from the map (caller will manage its lifetime)
token = stream_token;
return err;
}
void SrsStreamPublishTokenManager::release_token(const std::string &stream_url)
{
SrsLocker(mutex_);
// Find and erase the token from the map
std::map<std::string, SrsStreamPublishToken *>::iterator it = tokens_.find(stream_url);
srs_assert(it != tokens_.end());
SrsStreamPublishToken *token = it->second;
token->set_acquired(false);
srs_trace("stream publish token released and deleted, url=%s", stream_url.c_str());
// Erase from map first, then delete the token
tokens_.erase(it);
}

View File

@ -0,0 +1,87 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_APP_STREAM_TOKEN_HPP
#define SRS_APP_STREAM_TOKEN_HPP
#include <srs_core.hpp>
#include <map>
#include <string>
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
#include <srs_protocol_st.hpp>
class ISrsRequest;
class SrsStreamPublishTokenManager;
// The stream publish token represents exclusive access to publish a stream.
// Only one publisher can hold a token for a given stream URL at any time.
// This prevents race conditions across all protocols (RTMP, RTC, SRT, etc.).
class SrsStreamPublishToken
{
private:
// The stream URL this token is for
std::string stream_url_;
// Whether this token is currently acquired
bool acquired_;
// The token manager that created this token
SrsStreamPublishTokenManager *manager_;
// The context ID of the publisher that acquired this token
SrsContextId publisher_cid_;
public:
SrsStreamPublishToken(const std::string &stream_url, SrsStreamPublishTokenManager *manager);
virtual ~SrsStreamPublishToken();
public:
// Get the stream URL this token is for
std::string stream_url();
// Check if this token is currently acquired
bool is_acquired();
void set_acquired(bool acquired);
// Get the publisher context ID that acquired this token
const SrsContextId &publisher_cid();
void set_publisher_cid(const SrsContextId &cid);
};
// The global stream publish token manager ensures only one publisher
// can acquire a token for a given stream URL at any time.
// This prevents race conditions across all protocols.
class SrsStreamPublishTokenManager
{
private:
// Map of stream URL to token
std::map<std::string, SrsStreamPublishToken *> tokens_;
// Mutex to protect the tokens map
srs_mutex_t mutex_;
public:
SrsStreamPublishTokenManager();
virtual ~SrsStreamPublishTokenManager();
public:
// Acquire a publish token for the given stream URL.
// Returns success if token was acquired, error if stream is already being published.
// @param req The request containing stream information
// @param token Output parameter for the acquired token (will be set to NULL on error)
srs_error_t acquire_token(ISrsRequest *req, SrsStreamPublishToken *&token);
// Release a publish token for the given stream URL.
// This is called automatically when the token is destroyed.
// @param stream_url The stream URL to release the token for
void release_token(const std::string &stream_url);
};
// Global instance accessor
extern SrsStreamPublishTokenManager *_srs_stream_publish_tokens;
#endif

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 61
#define VERSION_REVISION 62
#endif

View File

@ -1014,6 +1014,12 @@ SrsFormat::~SrsFormat()
srs_freep(vcodec);
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
// in the fetch_or_create pattern (see PR 4449).
//
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsFormat::initialize()
{
if (!vcodec) {

View File

@ -0,0 +1,625 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_utest_stream_token.hpp>
using namespace std;
#include <srs_app_st.hpp>
#include <srs_app_stream_token.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_st.hpp>
// Mock request for testing stream publish tokens
class MockStreamTokenRequest : public SrsRequest
{
public:
string mock_stream_url;
MockStreamTokenRequest(const string &url = "/live/livestream")
{
mock_stream_url = url;
}
virtual ~MockStreamTokenRequest()
{
}
virtual string get_stream_url()
{
return mock_stream_url;
}
};
VOID TEST(StreamTokenTest, TokenBasicProperties)
{
SrsStreamPublishTokenManager manager;
SrsStreamPublishToken token("/live/livestream", &manager);
// Test basic properties
EXPECT_STREQ("/live/livestream", token.stream_url().c_str());
EXPECT_FALSE(token.is_acquired());
// Test setting acquired state
token.set_acquired(true);
EXPECT_TRUE(token.is_acquired());
token.set_acquired(false);
EXPECT_FALSE(token.is_acquired());
// Test setting publisher context ID
SrsContextId test_cid;
test_cid.set_value("test-context-id");
token.set_publisher_cid(test_cid);
EXPECT_STREQ("test-context-id", token.publisher_cid().c_str());
}
VOID TEST(StreamTokenTest, SingleTokenAcquisition)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
SrsStreamPublishToken *token = NULL;
// First acquisition should succeed
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
EXPECT_STREQ("/live/stream1", token->stream_url().c_str());
// Clean up
srs_freep(token);
}
VOID TEST(StreamTokenTest, DuplicateTokenRejection)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
SrsStreamPublishToken *token1 = NULL;
SrsStreamPublishToken *token2 = NULL;
// First acquisition should succeed
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1));
EXPECT_TRUE(token1 != NULL);
EXPECT_TRUE(token1->is_acquired());
// Second acquisition for same stream should fail
HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 == NULL);
// Clean up
srs_freep(token1);
}
VOID TEST(StreamTokenTest, DifferentStreamsAllowed)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req1("/live/stream1");
MockStreamTokenRequest req2("/live/stream2");
SrsStreamPublishToken *token1 = NULL;
SrsStreamPublishToken *token2 = NULL;
// Both acquisitions should succeed for different streams
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req1, token1));
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req2, token2));
EXPECT_TRUE(token1 != NULL);
EXPECT_TRUE(token2 != NULL);
EXPECT_TRUE(token1->is_acquired());
EXPECT_TRUE(token2->is_acquired());
EXPECT_STREQ("/live/stream1", token1->stream_url().c_str());
EXPECT_STREQ("/live/stream2", token2->stream_url().c_str());
// Clean up
srs_freep(token1);
srs_freep(token2);
}
VOID TEST(StreamTokenTest, TokenAutoRelease)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
// Acquire token in a scope
if (true) {
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
// Token will be automatically released when destroyed
srs_freep(token);
}
// Now we should be able to acquire the same stream again
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 != NULL);
EXPECT_TRUE(token2->is_acquired());
// Clean up
srs_freep(token2);
}
VOID TEST(StreamTokenTest, ContextIdTracking)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
SrsStreamPublishToken *token = NULL;
// Get current context ID
SrsContextId current_cid = _srs_context->generate_id();
_srs_context->set_id(current_cid);
// Acquire token
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
// Token should track the current context ID
EXPECT_STREQ(current_cid.c_str(), token->publisher_cid().c_str());
// Clean up
srs_freep(token);
}
VOID TEST(StreamTokenTest, ErrorCodeVerification)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
SrsStreamPublishToken *token1 = NULL;
SrsStreamPublishToken *token2 = NULL;
// First acquisition should succeed
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1));
EXPECT_TRUE(token1 != NULL);
// Second acquisition should fail with specific error code
err = manager.acquire_token(&req, token2);
EXPECT_TRUE(err != srs_success);
EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(err));
EXPECT_TRUE(token2 == NULL);
// Clean up error and token
srs_freep(err);
srs_freep(token1);
}
VOID TEST(StreamTokenTest, MultipleStreamsConcurrent)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
// Test multiple different streams can be acquired simultaneously
vector<MockStreamTokenRequest *> requests;
vector<SrsStreamPublishToken *> tokens;
for (int i = 0; i < 10; i++) {
string stream_url = "/live/stream" + srs_int2str(i);
MockStreamTokenRequest *req = new MockStreamTokenRequest(stream_url);
requests.push_back(req);
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
EXPECT_STREQ(stream_url.c_str(), token->stream_url().c_str());
tokens.push_back(token);
}
// Clean up
for (size_t i = 0; i < requests.size(); i++) {
srs_freep(requests[i]);
srs_freep(tokens[i]);
}
}
VOID TEST(StreamTokenTest, TokenReleaseAndReacquire)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
// Acquire, release, and reacquire the same stream multiple times
for (int i = 0; i < 5; i++) {
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
EXPECT_STREQ("/live/stream1", token->stream_url().c_str());
// Release token
srs_freep(token);
// Should be able to acquire again immediately
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 != NULL);
EXPECT_TRUE(token2->is_acquired());
// Clean up
srs_freep(token2);
}
}
VOID TEST(StreamTokenTest, EmptyStreamUrl)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("");
SrsStreamPublishToken *token = NULL;
// Should be able to acquire token even with empty stream URL
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
EXPECT_STREQ("", token->stream_url().c_str());
// Second acquisition should fail
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 == NULL);
// Clean up
srs_freep(token);
}
VOID TEST(StreamTokenTest, TokenDestructorBehavior)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
// Test that token destructor properly releases the token
if (true) {
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
// Manually call destructor
srs_freep(token);
}
// Should be able to acquire the same stream again
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 != NULL);
EXPECT_TRUE(token2->is_acquired());
// Clean up
srs_freep(token2);
}
VOID TEST(StreamTokenTest, ManagerDestructorCleanup)
{
srs_error_t err;
// Test that manager destructor properly cleans up all tokens
if (true) {
SrsStreamPublishTokenManager manager;
// Acquire multiple tokens
for (int i = 0; i < 5; i++) {
string stream_url = "/live/stream" + srs_int2str(i);
MockStreamTokenRequest req(stream_url);
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
}
// Don't manually free tokens - let manager destructor handle cleanup
// Manager destructor should clean up all tokens automatically
}
// If we reach here without crashes, the cleanup worked correctly
EXPECT_TRUE(true);
}
// Mock coroutine class to simulate race conditions
class MockTokenCoroutine : public ISrsCoroutineHandler
{
public:
SrsStreamPublishTokenManager *manager;
MockStreamTokenRequest *req;
SrsStreamPublishToken *token;
srs_error_t result;
bool completed;
int delay_ms;
MockTokenCoroutine(SrsStreamPublishTokenManager *mgr, MockStreamTokenRequest *request, int delay = 0)
{
manager = mgr;
req = request;
token = NULL;
result = srs_success;
completed = false;
delay_ms = delay;
}
virtual ~MockTokenCoroutine()
{
srs_freep(result);
srs_freep(token);
}
virtual srs_error_t cycle()
{
srs_error_t err = srs_success;
// Add delay to simulate timing variations
if (delay_ms > 0) {
srs_usleep(delay_ms * SRS_UTIME_MILLISECONDS);
}
// Try to acquire token
result = manager->acquire_token(req, token);
completed = true;
return err;
}
};
VOID TEST(StreamTokenTest, RaceConditionPrevention)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
// Create multiple coroutines trying to acquire the same stream
MockStreamTokenRequest req("/live/stream1");
vector<MockTokenCoroutine *> coroutines;
vector<SrsSTCoroutine *> threads;
// Create 5 coroutines with different delays
for (int i = 0; i < 5; i++) {
MockTokenCoroutine *handler = new MockTokenCoroutine(&manager, &req, 1);
coroutines.push_back(handler);
SrsSTCoroutine *trd = new SrsSTCoroutine("token-test", handler, _srs_context->get_id());
threads.push_back(trd);
HELPER_ASSERT_SUCCESS(trd->start());
}
// Wait a bit for completion
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Wait for all coroutines to complete
for (size_t i = 0; i < threads.size(); i++) {
threads[i]->stop();
threads[i]->interrupt();
}
// Wait a bit for completion
srs_usleep(10 * SRS_UTIME_MILLISECONDS);
// Check results - exactly one should succeed, others should fail
int success_count = 0;
int failure_count = 0;
SrsStreamPublishToken *acquired_token = NULL;
for (size_t i = 0; i < coroutines.size(); i++) {
if (coroutines[i]->completed) {
if (coroutines[i]->result == srs_success) {
success_count++;
acquired_token = coroutines[i]->token;
} else {
failure_count++;
EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(coroutines[i]->result));
}
}
}
// Exactly one should succeed
EXPECT_EQ(1, success_count);
EXPECT_EQ(4, failure_count);
EXPECT_TRUE(acquired_token != NULL);
// Clean up
for (size_t i = 0; i < coroutines.size(); i++) {
srs_freep(coroutines[i]);
srs_freep(threads[i]);
}
}
VOID TEST(StreamTokenTest, SequentialAcquisitionAfterRelease)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
// Test sequential acquisition and release pattern
for (int round = 0; round < 10; round++) {
SrsStreamPublishToken *token = NULL;
// Acquire token
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
// Verify we can't acquire again
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 == NULL);
// Release token
srs_freep(token);
// Should be able to acquire again immediately
SrsStreamPublishToken *token3 = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token3));
EXPECT_TRUE(token3 != NULL);
EXPECT_TRUE(token3->is_acquired());
// Clean up
srs_freep(token3);
}
}
VOID TEST(StreamTokenTest, TokenManagerStressTest)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
// Stress test with many streams
const int num_streams = 100;
vector<MockStreamTokenRequest *> requests;
vector<SrsStreamPublishToken *> tokens;
// Acquire tokens for many streams
for (int i = 0; i < num_streams; i++) {
string stream_url = "/live/stress_stream_" + srs_int2str(i);
MockStreamTokenRequest *req = new MockStreamTokenRequest(stream_url);
requests.push_back(req);
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(req, token));
EXPECT_TRUE(token != NULL);
EXPECT_TRUE(token->is_acquired());
tokens.push_back(token);
}
// Verify all tokens are unique and properly acquired
for (int i = 0; i < num_streams; i++) {
EXPECT_TRUE(tokens[i]->is_acquired());
string expected_url = "/live/stress_stream_" + srs_int2str(i);
EXPECT_STREQ(expected_url.c_str(), tokens[i]->stream_url().c_str());
// Verify we can't acquire the same stream again
SrsStreamPublishToken *duplicate_token = NULL;
HELPER_EXPECT_FAILED(manager.acquire_token(requests[i], duplicate_token));
EXPECT_TRUE(duplicate_token == NULL);
}
// Clean up
for (int i = 0; i < num_streams; i++) {
srs_freep(requests[i]);
srs_freep(tokens[i]);
}
}
VOID TEST(StreamTokenTest, TokenWithDifferentContextIds)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
MockStreamTokenRequest req("/live/stream1");
SrsContextId first_cid = _srs_context->get_id();
SrsStreamPublishToken *token1 = NULL;
// Acquire token with first context
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token1));
EXPECT_TRUE(token1 != NULL);
EXPECT_STREQ(first_cid.c_str(), token1->publisher_cid().c_str());
// Change context ID
_srs_context->set_id(SrsContextId());
SrsContextId second_cid = _srs_context->get_id();
// Should still fail to acquire same stream with different context
SrsStreamPublishToken *token2 = NULL;
HELPER_EXPECT_FAILED(manager.acquire_token(&req, token2));
EXPECT_TRUE(token2 == NULL);
// Clean up
srs_freep(token1);
}
VOID TEST(StreamTokenTest, TokenUrlCaseSensitivity)
{
srs_error_t err;
SrsStreamPublishTokenManager manager;
// Test case sensitivity of stream URLs
MockStreamTokenRequest req1("/live/Stream1");
MockStreamTokenRequest req2("/live/stream1");
MockStreamTokenRequest req3("/LIVE/STREAM1");
SrsStreamPublishToken *token1 = NULL;
SrsStreamPublishToken *token2 = NULL;
SrsStreamPublishToken *token3 = NULL;
// All should be treated as different streams (case sensitive)
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req1, token1));
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req2, token2));
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req3, token3));
EXPECT_TRUE(token1 != NULL);
EXPECT_TRUE(token2 != NULL);
EXPECT_TRUE(token3 != NULL);
EXPECT_STREQ("/live/Stream1", token1->stream_url().c_str());
EXPECT_STREQ("/live/stream1", token2->stream_url().c_str());
EXPECT_STREQ("/LIVE/STREAM1", token3->stream_url().c_str());
// Clean up
srs_freep(token1);
srs_freep(token2);
srs_freep(token3);
}
VOID TEST(StreamTokenTest, TokenManagerMemoryLeakPrevention)
{
srs_error_t err;
// Test that repeated acquire/release cycles don't cause memory leaks
for (int cycle = 0; cycle < 10; cycle++) {
SrsStreamPublishTokenManager manager;
// Acquire and release multiple tokens
for (int i = 0; i < 20; i++) {
string stream_url = "/live/leak_test_" + srs_int2str(i);
MockStreamTokenRequest req(stream_url);
SrsStreamPublishToken *token = NULL;
HELPER_EXPECT_SUCCESS(manager.acquire_token(&req, token));
EXPECT_TRUE(token != NULL);
// Immediately release
srs_freep(token);
}
// Manager destructor should clean up everything
}
// If we reach here without crashes or excessive memory usage, test passes
EXPECT_TRUE(true);
}

View File

@ -0,0 +1,15 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_STREAM_TOKEN_HPP
#define SRS_UTEST_STREAM_TOKEN_HPP
/*
#include <srs_utest_stream_token.hpp>
*/
#include <srs_utest.hpp>
#endif