diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 93904fe92..2eb333f41 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-11-14, AI: Fix race condition causing immediate deletion of new sources. v7.0.127 (#4449) * v7.0, 2025-11-11, AI: WebRTC: Support optional msid attribute per RFC 8830. v7.0.126 (#4570) * v7.0, 2025-11-11, AI: SRT: Stop TS parsing after codec detection. v7.0.125 (#4569) * v7.0, 2025-11-09, AI: WebRTC: Support G.711 (PCMU/PCMA) audio codec for WebRTC. v7.0.124 (#4075) diff --git a/trunk/research/players/js/srs.sdk.js b/trunk/research/players/js/srs.sdk.js index d007ea297..c236ac725 100644 --- a/trunk/research/players/js/srs.sdk.js +++ b/trunk/research/players/js/srs.sdk.js @@ -34,6 +34,9 @@ function SrsRtcWhipWhepAsync() { self.displayStream = null; self.userStream = null; + // Store the WHIP session resource URL from Location header for cleanup. + self.resourceUrl = null; + // See https://datatracker.ietf.org/doc/draft-ietf-wish-whip/ // @url The WebRTC url to publish with, for example: // http://localhost:1985/rtc/v1/whip/?app=live&stream=livestream @@ -111,6 +114,14 @@ function SrsRtcWhipWhepAsync() { if (xhr.status !== 200 && xhr.status !== 201) return reject(xhr); const data = xhr.responseText; console.log("Got answer: ", data); + + // Extract Location header for WHIP session resource URL. + const location = xhr.getResponseHeader('Location'); + if (location) { + self.resourceUrl = new URL(location, url).href; + console.log(`WHIP session resource URL: ${self.resourceUrl}`); + } + return data.code ? reject(xhr) : resolve(data); } xhr.open('POST', url, true); @@ -159,6 +170,14 @@ function SrsRtcWhipWhepAsync() { if (xhr.status !== 200 && xhr.status !== 201) return reject(xhr); const data = xhr.responseText; console.log("Got answer: ", data); + + // Extract Location header for WHEP session resource URL. + const location = xhr.getResponseHeader('Location'); + if (location) { + self.resourceUrl = new URL(location, url).href; + console.log(`WHEP session resource URL: ${self.resourceUrl}`); + } + return data.code ? reject(xhr) : resolve(data); } xhr.open('POST', url, true); @@ -190,6 +209,25 @@ function SrsRtcWhipWhepAsync() { }); self.userStream = null; } + + // Send DELETE request to WHIP session resource URL to cleanup server resources. + if (self.resourceUrl) { + const xhr = new XMLHttpRequest(); + xhr.open('DELETE', self.resourceUrl, true); + xhr.onload = function() { + if (xhr.readyState !== xhr.DONE) return; + if (xhr.status === 200) { + console.log(`WHIP session deleted: ${self.resourceUrl}`); + } else { + console.warn(`Failed to delete WHIP session: ${self.resourceUrl}, status: ${xhr.status}`); + } + }; + xhr.onerror = function() { + console.warn(`Error deleting WHIP session: ${self.resourceUrl}`); + }; + xhr.send(); + self.resourceUrl = null; + } }; // The callback when got local stream. diff --git a/trunk/src/app/srs_app_rtc_api.cpp b/trunk/src/app/srs_app_rtc_api.cpp index 2d6410eb9..a82d2af4f 100644 --- a/trunk/src/app/srs_app_rtc_api.cpp +++ b/trunk/src/app/srs_app_rtc_api.cpp @@ -491,16 +491,26 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter *w, ISrsHtt vcodec = r->query_get("codec"); } string acodec = r->query_get("acodec"); + // For client to specifies whether encrypt by SRTP. + string srtp = r->query_get("encrypt"); + string dtls = r->query_get("dtls"); - srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, vcodec=%s, acodec=%s", + srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, vcodec=%s, acodec=%s, srtp=%s, dtls=%s", streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app_.c_str(), ruc.req_->stream_.c_str(), - remote_sdp_str.length(), eip.c_str(), vcodec.c_str(), acodec.c_str()); + remote_sdp_str.length(), eip.c_str(), vcodec.c_str(), acodec.c_str(), srtp.c_str(), dtls.c_str()); ruc.eip_ = eip; ruc.vcodec_ = vcodec; ruc.acodec_ = acodec; ruc.publish_ = true; - ruc.dtls_ = ruc.srtp_ = true; + + // For client to specifies whether encrypt by SRTP. + ruc.dtls_ = (dtls != "false"); + if (srtp.empty()) { + ruc.srtp_ = config_->get_rtc_server_encrypt(); + } else { + ruc.srtp_ = (srtp != "false"); + } // TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information. ruc.remote_sdp_str_ = remote_sdp_str; diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 1b7d07f06..db8f98900 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -335,7 +335,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr source = SrsSharedPtr(new SrsRtcSource()); - srs_trace("new rtc source, stream_url=%s", stream_url.c_str()); + srs_trace("new rtc source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead()); pps = source; pool_[stream_url] = source; @@ -399,7 +399,10 @@ SrsRtcSource::SrsRtcSource() circuit_breaker_ = _srs_circuit_breaker; pli_for_rtmp_ = pli_elapsed_ = 0; - stream_die_at_ = 0; + // Initialize stream_die_at_ to current time to prevent newly created sources + // from being immediately considered dead by stream_is_dead() check. + // @see https://github.com/ossrs/srs/issues/4449 + stream_die_at_ = srs_time_now_cached(); app_factory_ = _srs_app_factory; } diff --git a/trunk/src/app/srs_app_rtmp_source.cpp b/trunk/src/app/srs_app_rtmp_source.cpp index 07e058c49..724fc49f7 100644 --- a/trunk/src/app/srs_app_rtmp_source.cpp +++ b/trunk/src/app/srs_app_rtmp_source.cpp @@ -1659,7 +1659,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr source = app_factory_->create_live_source(); - srs_trace("new live source, stream_url=%s", stream_url.c_str()); + srs_trace("new live source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead()); pps = source; // Callback to notify request of source creation @@ -1781,7 +1781,10 @@ SrsLiveSource::SrsLiveSource() mix_queue_ = new SrsMixQueue(); can_publish_ = true; - stream_die_at_ = 0; + // Initialize stream_die_at_ to current time to prevent newly created sources + // from being immediately considered dead by stream_is_dead() check. + // @see https://github.com/ossrs/srs/issues/4449 + stream_die_at_ = srs_time_now_cached(); publisher_idle_at_ = 0; rtmp_bridge_ = NULL; diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index dfcb22da3..3dd0d601a 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -194,7 +194,7 @@ srs_error_t SrsRtspSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr source = SrsSharedPtr(new SrsRtspSource()); - srs_trace("new rtsp source, stream_url=%s", stream_url.c_str()); + srs_trace("new rtsp source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead()); pps = source; pool_[stream_url] = source; @@ -249,7 +249,10 @@ SrsRtspSource::SrsRtspSource() req_ = NULL; - stream_die_at_ = 0; + // Initialize stream_die_at_ to current time to prevent newly created sources + // from being immediately considered dead by stream_is_dead() check. + // @see https://github.com/ossrs/srs/issues/4449 + stream_die_at_ = srs_time_now_cached(); stat_ = _srs_stat; circuit_breaker_ = _srs_circuit_breaker; diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index d8179b778..3ee893d98 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -179,7 +179,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr source(new SrsSrtSource()); - srs_trace("new srt source, stream_url=%s", stream_url.c_str()); + srs_trace("new srt source, stream_url=%s, dead=%d", stream_url.c_str(), source->stream_is_dead()); pps = source; pool_[stream_url] = source; @@ -1230,7 +1230,10 @@ SrsSrtSource::SrsSrtSource() req_ = NULL; can_publish_ = true; srt_bridge_ = NULL; - stream_die_at_ = 0; + // Initialize stream_die_at_ to current time to prevent newly created sources + // from being immediately considered dead by stream_is_dead() check. + // @see https://github.com/ossrs/srs/issues/4449 + stream_die_at_ = srs_time_now_cached(); stat_ = _srs_stat; format_ = new SrsSrtFormat(); diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index 5abca4f16..65afefc46 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 126 +#define VERSION_REVISION 127 #endif \ No newline at end of file diff --git a/trunk/src/utest/srs_utest_ai07.cpp b/trunk/src/utest/srs_utest_ai07.cpp index 3b19d53d6..a38d50cf7 100644 --- a/trunk/src/utest/srs_utest_ai07.cpp +++ b/trunk/src/utest/srs_utest_ai07.cpp @@ -8,6 +8,11 @@ #include #include +#include +#include +#ifdef SRS_RTSP +#include +#endif #include #include #include @@ -1790,7 +1795,6 @@ VOID TEST(AppTest2, RtcSourceManagerFetchOrCreateInitializeFailure) mock_source->set_initialize_error(initialize_error_); } SrsSharedPtr source = SrsSharedPtr(mock_source); - srs_trace("new rtc source, stream_url=%s", stream_url.c_str()); pps = source; pool_[stream_url] = source; @@ -1870,7 +1874,6 @@ VOID TEST(AppTest2, RtcSourceManagerFetchOrCreateErrorWrapping) srs_freep(init_error); SrsSharedPtr source = SrsSharedPtr(mock_source); - srs_trace("new rtc source, stream_url=%s", stream_url.c_str()); pps = source; pool_[stream_url] = source; @@ -2176,14 +2179,16 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamDeath) // Verify initial state - stream is not created (no publisher) EXPECT_FALSE(source->is_created_); - EXPECT_EQ(0, source->stream_die_at_); + // After fix #4449: stream_die_at_ is initialized to current time, not 0 + srs_utime_t initial_die_at = source->stream_die_at_; + EXPECT_GT(initial_die_at, 0); - // Remove consumer when stream is not created - should set stream_die_at_ + // Remove consumer when stream is not created - should update stream_die_at_ srs_utime_t before_time = srs_time_now_cached(); source->on_consumer_destroy(consumer); srs_utime_t after_time = srs_time_now_cached(); - // Verify stream death time was set + // Verify stream death time was updated to current time EXPECT_TRUE(source->stream_die_at_ >= before_time); EXPECT_TRUE(source->stream_die_at_ <= after_time); @@ -2215,13 +2220,15 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamAlive) // Verify initial state EXPECT_TRUE(source->is_created_); - EXPECT_EQ(0, source->stream_die_at_); + // After fix #4449: stream_die_at_ is initialized to current time, not 0 + srs_utime_t initial_die_at = source->stream_die_at_; + EXPECT_GT(initial_die_at, 0); - // Remove consumer when stream is created - should NOT set stream_die_at_ + // Remove consumer when stream is created - should NOT update stream_die_at_ source->on_consumer_destroy(consumer); - // Verify stream death time was NOT set - EXPECT_EQ(0, source->stream_die_at_); + // Verify stream death time was NOT changed (still has initial value) + EXPECT_EQ(initial_die_at, source->stream_die_at_); // Clean up srs_freep(consumer); @@ -4598,3 +4605,144 @@ VOID TEST(AppTest2, RtcSourceGetTrackDescMultipleMatchingVideoTracks) EXPECT_EQ("video-h264-track-2", all_video_tracks[1]->id_); EXPECT_EQ("video-h265-track", all_video_tracks[2]->id_); } + +// Reproduce issue 4449: Newly created source is immediately considered dead +// When a new source is created with stream_die_at_=0, if notify() timer fires +// before a publisher connects, the source gets deleted because stream_is_dead() +// returns true. This causes "new live source, dead=1" in logs. +VOID TEST(ReproduceIssue4449, RtmpLiveSourceNotifyDeletesNewlyCreatedSource) +{ + srs_error_t err; + + // Create a source manager + SrsUniquePtr manager(new SrsLiveSourceManager()); + HELPER_EXPECT_SUCCESS(manager->initialize()); + + // Create a mock request + SrsUniquePtr req(new SrsRequest()); + req->host_ = "localhost"; + req->vhost_ = "test.vhost"; + req->app_ = "live"; + req->stream_ = "thegobot"; + + // Fetch or create source (this creates a new source) + SrsSharedPtr source; + HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source)); + + // After fix: newly created source should NOT be dead + EXPECT_FALSE(source->stream_is_dead()); + EXPECT_EQ(1, (int)manager->pool_.size()); + + // Simulate timer firing - call notify() + int pool_size_before = (int)manager->pool_.size(); + HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0)); + int pool_size_after = (int)manager->pool_.size(); + + // After fix: the newly created source should NOT be deleted by notify + EXPECT_EQ(pool_size_before, pool_size_after); + EXPECT_EQ(1, pool_size_after); +} + +// Test SRT source for the same issue +VOID TEST(ReproduceIssue4449, SrtSourceNotifyDeletesNewlyCreatedSource) +{ + srs_error_t err; + + // Create a SRT source manager + SrsUniquePtr manager(new SrsSrtSourceManager()); + HELPER_EXPECT_SUCCESS(manager->initialize()); + + // Create a mock request + SrsUniquePtr req(new SrsRequest()); + req->host_ = "localhost"; + req->vhost_ = "test.vhost"; + req->app_ = "live"; + req->stream_ = "thegobot"; + + // Fetch or create source (this creates a new source) + SrsSharedPtr source; + HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source)); + + // After fix: newly created source should NOT be dead + EXPECT_FALSE(source->stream_is_dead()); + EXPECT_EQ(1, (int)manager->pool_.size()); + + // Simulate timer firing - call notify() + int pool_size_before = (int)manager->pool_.size(); + HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0)); + int pool_size_after = (int)manager->pool_.size(); + + // After fix: the newly created source should NOT be deleted by notify + EXPECT_EQ(pool_size_before, pool_size_after); + EXPECT_EQ(1, pool_size_after); +} + +// Test RTC source for the same issue +VOID TEST(ReproduceIssue4449, RtcSourceNotifyDeletesNewlyCreatedSource) +{ + srs_error_t err; + + // Create a RTC source manager + SrsUniquePtr manager(new SrsRtcSourceManager()); + HELPER_EXPECT_SUCCESS(manager->initialize()); + + // Create a mock request + SrsUniquePtr req(new SrsRequest()); + req->host_ = "localhost"; + req->vhost_ = "test.vhost"; + req->app_ = "live"; + req->stream_ = "thegobot"; + + // Fetch or create source (this creates a new source) + SrsSharedPtr source; + HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source)); + + // After fix: newly created source should NOT be dead + EXPECT_FALSE(source->stream_is_dead()); + EXPECT_EQ(1, (int)manager->pool_.size()); + + // Simulate timer firing - call notify() + int pool_size_before = (int)manager->pool_.size(); + HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0)); + int pool_size_after = (int)manager->pool_.size(); + + // After fix: the newly created source should NOT be deleted by notify + EXPECT_EQ(pool_size_before, pool_size_after); + EXPECT_EQ(1, pool_size_after); +} + +#ifdef SRS_RTSP +// Test RTSP source for the same issue +VOID TEST(ReproduceIssue4449, RtspSourceNotifyDeletesNewlyCreatedSource) +{ + srs_error_t err; + + // Create a RTSP source manager + SrsUniquePtr manager(new SrsRtspSourceManager()); + HELPER_EXPECT_SUCCESS(manager->initialize()); + + // Create a mock request + SrsUniquePtr req(new SrsRequest()); + req->host_ = "localhost"; + req->vhost_ = "test.vhost"; + req->app_ = "live"; + req->stream_ = "thegobot"; + + // Fetch or create source (this creates a new source) + SrsSharedPtr source; + HELPER_EXPECT_SUCCESS(manager->fetch_or_create(req.get(), source)); + + // After fix: newly created source should NOT be dead + EXPECT_FALSE(source->stream_is_dead()); + EXPECT_EQ(1, (int)manager->pool_.size()); + + // Simulate timer firing - call notify() + int pool_size_before = (int)manager->pool_.size(); + HELPER_EXPECT_SUCCESS(manager->notify(0, 0, 0)); + int pool_size_after = (int)manager->pool_.size(); + + // After fix: the newly created source should NOT be deleted by notify + EXPECT_EQ(pool_size_before, pool_size_after); + EXPECT_EQ(1, pool_size_after); +} +#endif