AI: Fix race condition causing immediate deletion of new sources. v7.0.127 (#4449) (#4576)

**Problem**: Newly created sources (RTMP/SRT/RTC/RTSP) were being
immediately marked as "dead" and deleted by the cleanup timer before
publishers could connect, causing "new live source, dead=1" errors.

**Root Cause**: All source constructors initialized `stream_die_at_ =
0`, causing `stream_is_dead()` to return `true` immediately since
current time was always greater than `0 + 3 seconds`.

**Solution**: Changed all four source constructors to initialize
`stream_die_at_ = srs_time_now_cached()`, giving newly created sources a
proper 3-second grace period before cleanup.
This commit is contained in:
OSSRS-AI 2025-11-13 21:24:07 -05:00 committed by GitHub
parent 6e93dd73b5
commit a3a2fa5ceb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 230 additions and 21 deletions

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-11-14, AI: Fix race condition causing immediate deletion of new sources. v7.0.127 (#4449)
* v7.0, 2025-11-11, AI: WebRTC: Support optional msid attribute per RFC 8830. v7.0.126 (#4570)
* v7.0, 2025-11-11, AI: SRT: Stop TS parsing after codec detection. v7.0.125 (#4569)
* v7.0, 2025-11-09, AI: WebRTC: Support G.711 (PCMU/PCMA) audio codec for WebRTC. v7.0.124 (#4075)

View File

@ -34,6 +34,9 @@ function SrsRtcWhipWhepAsync() {
self.displayStream = null;
self.userStream = null;
// Store the WHIP session resource URL from Location header for cleanup.
self.resourceUrl = null;
// See https://datatracker.ietf.org/doc/draft-ietf-wish-whip/
// @url The WebRTC url to publish with, for example:
// http://localhost:1985/rtc/v1/whip/?app=live&stream=livestream
@ -111,6 +114,14 @@ function SrsRtcWhipWhepAsync() {
if (xhr.status !== 200 && xhr.status !== 201) return reject(xhr);
const data = xhr.responseText;
console.log("Got answer: ", data);
// Extract Location header for WHIP session resource URL.
const location = xhr.getResponseHeader('Location');
if (location) {
self.resourceUrl = new URL(location, url).href;
console.log(`WHIP session resource URL: ${self.resourceUrl}`);
}
return data.code ? reject(xhr) : resolve(data);
}
xhr.open('POST', url, true);
@ -159,6 +170,14 @@ function SrsRtcWhipWhepAsync() {
if (xhr.status !== 200 && xhr.status !== 201) return reject(xhr);
const data = xhr.responseText;
console.log("Got answer: ", data);
// Extract Location header for WHEP session resource URL.
const location = xhr.getResponseHeader('Location');
if (location) {
self.resourceUrl = new URL(location, url).href;
console.log(`WHEP session resource URL: ${self.resourceUrl}`);
}
return data.code ? reject(xhr) : resolve(data);
}
xhr.open('POST', url, true);
@ -190,6 +209,25 @@ function SrsRtcWhipWhepAsync() {
});
self.userStream = null;
}
// Send DELETE request to WHIP session resource URL to cleanup server resources.
if (self.resourceUrl) {
const xhr = new XMLHttpRequest();
xhr.open('DELETE', self.resourceUrl, true);
xhr.onload = function() {
if (xhr.readyState !== xhr.DONE) return;
if (xhr.status === 200) {
console.log(`WHIP session deleted: ${self.resourceUrl}`);
} else {
console.warn(`Failed to delete WHIP session: ${self.resourceUrl}, status: ${xhr.status}`);
}
};
xhr.onerror = function() {
console.warn(`Error deleting WHIP session: ${self.resourceUrl}`);
};
xhr.send();
self.resourceUrl = null;
}
};
// The callback when got local stream.

View File

@ -491,16 +491,26 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter *w, ISrsHtt
vcodec = r->query_get("codec");
}
string acodec = r->query_get("acodec");
// For client to specifies whether encrypt by SRTP.
string srtp = r->query_get("encrypt");
string dtls = r->query_get("dtls");
srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, vcodec=%s, acodec=%s",
srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, vcodec=%s, acodec=%s, srtp=%s, dtls=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app_.c_str(), ruc.req_->stream_.c_str(),
remote_sdp_str.length(), eip.c_str(), vcodec.c_str(), acodec.c_str());
remote_sdp_str.length(), eip.c_str(), vcodec.c_str(), acodec.c_str(), srtp.c_str(), dtls.c_str());
ruc.eip_ = eip;
ruc.vcodec_ = vcodec;
ruc.acodec_ = acodec;
ruc.publish_ = true;
ruc.dtls_ = ruc.srtp_ = true;
// For client to specifies whether encrypt by SRTP.
ruc.dtls_ = (dtls != "false");
if (srtp.empty()) {
ruc.srtp_ = config_->get_rtc_server_encrypt();
} else {
ruc.srtp_ = (srtp != "false");
}
// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
ruc.remote_sdp_str_ = remote_sdp_str;

View File

@ -335,7 +335,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<Sr
pps = source;
} else {
SrsSharedPtr<SrsRtcSource> source = SrsSharedPtr<SrsRtcSource>(new SrsRtcSource());
srs_trace("new rtc source, stream_url=%s", stream_url.c_str());
srs_trace("new rtc source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead());
pps = source;
pool_[stream_url] = source;
@ -399,7 +399,10 @@ SrsRtcSource::SrsRtcSource()
circuit_breaker_ = _srs_circuit_breaker;
pli_for_rtmp_ = pli_elapsed_ = 0;
stream_die_at_ = 0;
// Initialize stream_die_at_ to current time to prevent newly created sources
// from being immediately considered dead by stream_is_dead() check.
// @see https://github.com/ossrs/srs/issues/4449
stream_die_at_ = srs_time_now_cached();
app_factory_ = _srs_app_factory;
}

View File

@ -1659,7 +1659,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<S
pps = source;
} else {
SrsSharedPtr<SrsLiveSource> source = app_factory_->create_live_source();
srs_trace("new live source, stream_url=%s", stream_url.c_str());
srs_trace("new live source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead());
pps = source;
// Callback to notify request of source creation
@ -1781,7 +1781,10 @@ SrsLiveSource::SrsLiveSource()
mix_queue_ = new SrsMixQueue();
can_publish_ = true;
stream_die_at_ = 0;
// Initialize stream_die_at_ to current time to prevent newly created sources
// from being immediately considered dead by stream_is_dead() check.
// @see https://github.com/ossrs/srs/issues/4449
stream_die_at_ = srs_time_now_cached();
publisher_idle_at_ = 0;
rtmp_bridge_ = NULL;

View File

@ -194,7 +194,7 @@ srs_error_t SrsRtspSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<S
pps = source;
} else {
SrsSharedPtr<SrsRtspSource> source = SrsSharedPtr<SrsRtspSource>(new SrsRtspSource());
srs_trace("new rtsp source, stream_url=%s", stream_url.c_str());
srs_trace("new rtsp source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead());
pps = source;
pool_[stream_url] = source;
@ -249,7 +249,10 @@ SrsRtspSource::SrsRtspSource()
req_ = NULL;
stream_die_at_ = 0;
// Initialize stream_die_at_ to current time to prevent newly created sources
// from being immediately considered dead by stream_is_dead() check.
// @see https://github.com/ossrs/srs/issues/4449
stream_die_at_ = srs_time_now_cached();
stat_ = _srs_stat;
circuit_breaker_ = _srs_circuit_breaker;

View File

@ -179,7 +179,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<Sr
pps = source;
} else {
SrsSharedPtr<SrsSrtSource> source(new SrsSrtSource());
srs_trace("new srt source, stream_url=%s", stream_url.c_str());
srs_trace("new srt source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead());
pps = source;
pool_[stream_url] = source;
@ -1230,7 +1230,10 @@ SrsSrtSource::SrsSrtSource()
req_ = NULL;
can_publish_ = true;
srt_bridge_ = NULL;
stream_die_at_ = 0;
// Initialize stream_die_at_ to current time to prevent newly created sources
// from being immediately considered dead by stream_is_dead() check.
// @see https://github.com/ossrs/srs/issues/4449
stream_die_at_ = srs_time_now_cached();
stat_ = _srs_stat;
format_ = new SrsSrtFormat();

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 126
#define VERSION_REVISION 127
#endif

View File

@ -8,6 +8,11 @@
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_srt_source.hpp>
#ifdef SRS_RTSP
#include <srs_app_rtsp_source.hpp>
#endif
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_codec.hpp>
@ -1790,7 +1795,6 @@ VOID TEST(AppTest2, RtcSourceManagerFetchOrCreateInitializeFailure)
mock_source->set_initialize_error(initialize_error_);
}
SrsSharedPtr<SrsRtcSource> source = SrsSharedPtr<SrsRtcSource>(mock_source);
srs_trace("new rtc source, stream_url=%s", stream_url.c_str());
pps = source;
pool_[stream_url] = source;
@ -1870,7 +1874,6 @@ VOID TEST(AppTest2, RtcSourceManagerFetchOrCreateErrorWrapping)
srs_freep(init_error);
SrsSharedPtr<SrsRtcSource> source = SrsSharedPtr<SrsRtcSource>(mock_source);
srs_trace("new rtc source, stream_url=%s", stream_url.c_str());
pps = source;
pool_[stream_url] = source;
@ -2176,14 +2179,16 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamDeath)
// Verify initial state - stream is not created (no publisher)
EXPECT_FALSE(source->is_created_);
EXPECT_EQ(0, source->stream_die_at_);
// After fix #4449: stream_die_at_ is initialized to current time, not 0
srs_utime_t initial_die_at = source->stream_die_at_;
EXPECT_GT(initial_die_at, 0);
// Remove consumer when stream is not created - should set stream_die_at_
// Remove consumer when stream is not created - should update stream_die_at_
srs_utime_t before_time = srs_time_now_cached();
source->on_consumer_destroy(consumer);
srs_utime_t after_time = srs_time_now_cached();
// Verify stream death time was set
// Verify stream death time was updated to current time
EXPECT_TRUE(source->stream_die_at_ >= before_time);
EXPECT_TRUE(source->stream_die_at_ <= after_time);
@ -2215,13 +2220,15 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamAlive)
// Verify initial state
EXPECT_TRUE(source->is_created_);
EXPECT_EQ(0, source->stream_die_at_);
// After fix #4449: stream_die_at_ is initialized to current time, not 0
srs_utime_t initial_die_at = source->stream_die_at_;
EXPECT_GT(initial_die_at, 0);
// Remove consumer when stream is created - should NOT set stream_die_at_
// Remove consumer when stream is created - should NOT update stream_die_at_
source->on_consumer_destroy(consumer);
// Verify stream death time was NOT set
EXPECT_EQ(0, source->stream_die_at_);
// Verify stream death time was NOT changed (still has initial value)
EXPECT_EQ(initial_die_at, source->stream_die_at_);
// Clean up
srs_freep(consumer);
@ -4598,3 +4605,144 @@ VOID TEST(AppTest2, RtcSourceGetTrackDescMultipleMatchingVideoTracks)
EXPECT_EQ("video-h264-track-2", all_video_tracks[1]->id_);
EXPECT_EQ("video-h265-track", all_video_tracks[2]->id_);
}
// Reproduce issue 4449: Newly created source is immediately considered dead
// When a new source is created with stream_die_at_=0, if notify() timer fires
// before a publisher connects, the source gets deleted because stream_is_dead()
// returns true. This causes "new live source, dead=1" in logs.
VOID TEST(ReproduceIssue4449, RtmpLiveSourceNotifyDeletesNewlyCreatedSource)
{
srs_error_t err;
// Create a source manager
SrsUniquePtr<SrsLiveSourceManager> manager(new SrsLiveSourceManager());
HELPER_EXPECT_SUCCESS(manager->initialize());
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "thegobot";
// Fetch or create source (this creates a new source)
SrsSharedPtr<SrsLiveSource> source;
HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source));
// After fix: newly created source should NOT be dead
EXPECT_FALSE(source->stream_is_dead());
EXPECT_EQ(1, (int)manager->pool_.size());
// Simulate timer firing - call notify()
int pool_size_before = (int)manager->pool_.size();
HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0));
int pool_size_after = (int)manager->pool_.size();
// After fix: the newly created source should NOT be deleted by notify
EXPECT_EQ(pool_size_before, pool_size_after);
EXPECT_EQ(1, pool_size_after);
}
// Test SRT source for the same issue
VOID TEST(ReproduceIssue4449, SrtSourceNotifyDeletesNewlyCreatedSource)
{
srs_error_t err;
// Create a SRT source manager
SrsUniquePtr<SrsSrtSourceManager> manager(new SrsSrtSourceManager());
HELPER_EXPECT_SUCCESS(manager->initialize());
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "thegobot";
// Fetch or create source (this creates a new source)
SrsSharedPtr<SrsSrtSource> source;
HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source));
// After fix: newly created source should NOT be dead
EXPECT_FALSE(source->stream_is_dead());
EXPECT_EQ(1, (int)manager->pool_.size());
// Simulate timer firing - call notify()
int pool_size_before = (int)manager->pool_.size();
HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0));
int pool_size_after = (int)manager->pool_.size();
// After fix: the newly created source should NOT be deleted by notify
EXPECT_EQ(pool_size_before, pool_size_after);
EXPECT_EQ(1, pool_size_after);
}
// Test RTC source for the same issue
VOID TEST(ReproduceIssue4449, RtcSourceNotifyDeletesNewlyCreatedSource)
{
srs_error_t err;
// Create a RTC source manager
SrsUniquePtr<SrsRtcSourceManager> manager(new SrsRtcSourceManager());
HELPER_EXPECT_SUCCESS(manager->initialize());
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "thegobot";
// Fetch or create source (this creates a new source)
SrsSharedPtr<SrsRtcSource> source;
HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source));
// After fix: newly created source should NOT be dead
EXPECT_FALSE(source->stream_is_dead());
EXPECT_EQ(1, (int)manager->pool_.size());
// Simulate timer firing - call notify()
int pool_size_before = (int)manager->pool_.size();
HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0));
int pool_size_after = (int)manager->pool_.size();
// After fix: the newly created source should NOT be deleted by notify
EXPECT_EQ(pool_size_before, pool_size_after);
EXPECT_EQ(1, pool_size_after);
}
#ifdef SRS_RTSP
// Test RTSP source for the same issue
VOID TEST(ReproduceIssue4449, RtspSourceNotifyDeletesNewlyCreatedSource)
{
srs_error_t err;
// Create a RTSP source manager
SrsUniquePtr<SrsRtspSourceManager> manager(new SrsRtspSourceManager());
HELPER_EXPECT_SUCCESS(manager->initialize());
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "thegobot";
// Fetch or create source (this creates a new source)
SrsSharedPtr<SrsRtspSource> source;
HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source));
// After fix: newly created source should NOT be dead
EXPECT_FALSE(source->stream_is_dead());
EXPECT_EQ(1, (int)manager->pool_.size());
// Simulate timer firing - call notify()
int pool_size_before = (int)manager->pool_.size();
HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0));
int pool_size_after = (int)manager->pool_.size();
// After fix: the newly created source should NOT be deleted by notify
EXPECT_EQ(pool_size_before, pool_size_after);
EXPECT_EQ(1, pool_size_after);
}
#endif