AI: SRT: Fix player not exiting when publisher disconnects. v7.0.130 (#4591) (#4596)

When SRT publisher disconnects, player hangs indefinitely instead of
exiting after the configured peer_idle_timeout. This is because the
consumer wait() never checks if the publisher is still connected.

After fix, player waits for peer_idle_timeout (default 10s) then exits
gracefully when no packets arrive and publisher has disconnected.
This commit is contained in:
OSSRS-AI 2025-11-27 19:32:24 -05:00 committed by GitHub
parent 4101900daf
commit 18c30dc07b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 38 additions and 14 deletions

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-11-28, SRT: Fix player not exiting when publisher disconnects. v7.0.130 (#4591)
* v7.0, 2025-11-27, Merge [#4588](https://github.com/ossrs/srs/pull/4588): RTMP: Ignore FMLE start packet after flash publish. v7.0.129 (#4588)
* v7.0, 2025-11-18, AI: API: Change pagination default count to 10, minimum 1. v7.0.128
* v7.0, 2025-11-14, AI: Fix race condition causing immediate deletion of new sources. v7.0.127 (#4449)

View File

@ -371,6 +371,8 @@ public:
public:
// SRT config
virtual std::vector<std::string> get_srt_listens() = 0;
// Get the srt SRTO_PEERIDLETIMEO, peer idle timeout, default is 10000ms.
virtual srs_utime_t get_srto_peeridletimeout() = 0;
public:
// Stream caster config

View File

@ -619,6 +619,7 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
}
int nb_packets = 0;
srs_utime_t timeout = config_->get_srto_peeridletimeout();
while (true) {
// Check recv thread error first, so we can detect the client disconnecting event.
@ -634,8 +635,13 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
SrsSrtPacket *pkt_raw = NULL;
consumer->dump_packet(&pkt_raw);
if (!pkt_raw) {
// TODO: FIXME: We should check the quit event.
consumer->wait(1, 1000 * SRS_UTIME_MILLISECONDS);
// Wait for peer_idle_timeout. Note that enqueue() signals the cond, so we wake up
// immediately when packets arrive during normal playback. Only check publisher disconnect
// when no packets available after timeout. @see https://github.com/ossrs/srs/issues/4591
bool has_msgs = consumer->wait(1, timeout);
if (!has_msgs && srt_source_->can_publish()) {
return srs_error_new(ERROR_SRT_SOURCE_DISCONNECTED, "srt source disconnected");
}
continue;
}

View File

@ -293,20 +293,23 @@ srs_error_t SrsSrtConsumer::dump_packet(SrsSrtPacket **ppkt)
return err;
}
void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
bool SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
{
mw_min_msgs_ = nb_msgs;
// when duration ok, signal to flush.
if ((int)queue_.size() > mw_min_msgs_) {
return;
// When duration ok, signal to flush.
if ((int)queue_.size() >= mw_min_msgs_) {
return true;
}
// the enqueue will notify this cond.
// The enqueue will notify this cond.
mw_waiting_ = true;
// use cond block wait for high performance mode.
// Use cond block wait for high performance mode.
srs_cond_timedwait(mw_wait_, timeout);
// Return true if there are enough messages after wait.
return (int)queue_.size() >= mw_min_msgs_;
}
SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsFrameTarget *target)

View File

@ -113,7 +113,9 @@ public:
public:
virtual srs_error_t enqueue(SrsSrtPacket *packet) = 0;
virtual srs_error_t dump_packet(SrsSrtPacket **ppkt) = 0;
virtual void wait(int nb_msgs, srs_utime_t timeout) = 0;
// Wait for at-least some messages incoming in queue.
// @return true if there are enough messages, false if timeout.
virtual bool wait(int nb_msgs, srs_utime_t timeout) = 0;
};
// The SRT consumer, consume packets from SRT stream source.
@ -146,7 +148,8 @@ public:
// For SRT, we only got one packet, because there is not many packets in queue.
virtual srs_error_t dump_packet(SrsSrtPacket **ppkt);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs, srs_utime_t timeout);
// @return true if there are enough messages, false if timeout.
virtual bool wait(int nb_msgs, srs_utime_t timeout);
};
// The SRT format interface.
@ -271,6 +274,8 @@ public:
virtual SrsContextId source_id() = 0;
virtual SrsContextId pre_source_id() = 0;
virtual void on_consumer_destroy(ISrsSrtConsumer *consumer) = 0;
// Whether we can publish stream to the source, return true if no publisher.
virtual bool can_publish() = 0;
};
// A SRT source is a stream, to publish and to play with.

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 129
#define VERSION_REVISION 130
#endif

View File

@ -397,7 +397,8 @@
XX(ERROR_SRT_SOURCE_BUSY, 6007, "SrtStreamBusy", "SRT stream already exists or busy") \
XX(ERROR_RTMP_TO_SRT, 6008, "SrtFromRtmp", "Covert RTMP to SRT failed") \
XX(ERROR_SRT_STATS, 6009, "SrtStats", "SRT get statistic data failed") \
XX(ERROR_SRT_TO_RTMP_EMPTY_SPS_PPS, 6010, "SrtToRtmpEmptySpsPps", "SRT to rtmp have empty sps or pps")
XX(ERROR_SRT_TO_RTMP_EMPTY_SPS_PPS, 6010, "SrtToRtmpEmptySpsPps", "SRT to rtmp have empty sps or pps") \
XX(ERROR_SRT_SOURCE_DISCONNECTED, 6011, "SrtSourceDisconnected", "SRT source publisher disconnected")
/**************************************************/
/* For user-define error. */

View File

@ -1425,8 +1425,9 @@ srs_error_t MockSrtConsumer::dump_packet(SrsSrtPacket **ppkt)
return srs_success;
}
void MockSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
bool MockSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
{
return true;
}
void MockSrtConsumer::set_enqueue_error(srs_error_t err)

View File

@ -107,7 +107,7 @@ public:
virtual ~MockSrtConsumer();
virtual srs_error_t enqueue(SrsSrtPacket *packet);
virtual srs_error_t dump_packet(SrsSrtPacket **ppkt);
virtual void wait(int nb_msgs, srs_utime_t timeout);
virtual bool wait(int nb_msgs, srs_utime_t timeout);
void set_enqueue_error(srs_error_t err);
void reset();
};

View File

@ -534,6 +534,7 @@ public:
virtual bool get_srt_enabled(std::string vhost) { return srt_enabled_; }
virtual std::string get_srt_default_streamid() { return "#!::r=live/livestream,m=request"; }
virtual bool get_srt_to_rtmp(std::string vhost) { return srt_to_rtmp_; }
virtual srs_utime_t get_srto_peeridletimeout() { return 10 * SRS_UTIME_SECONDS; }
virtual bool get_rtc_to_rtmp(std::string vhost) { return rtc_to_rtmp_; }
virtual srs_utime_t get_rtc_stun_timeout(std::string vhost) { return 30 * SRS_UTIME_SECONDS; }
virtual bool get_rtc_stun_strict_check(std::string vhost) { return false; }

View File

@ -174,6 +174,10 @@ VOID TEST(BasicWorkflowSrtConnTest, ManuallyVerifyForPlayer)
mock_srt_conn->streamid_ = "#!::h=127.0.0.1,r=live/livestream,m=request";
mock_srt_conn->srt_fd_ = 100;
// Simulate a publisher is connected (can_publish=false means publisher exists)
// @see https://github.com/ossrs/srs/issues/4591
mock_srt_sources->set_can_publish(false);
// Create SrsMpegtsSrtConn - it takes ownership of srt_conn
SrsUniquePtr<SrsMpegtsSrtConn> conn(new SrsMpegtsSrtConn(mock_manager.get(), 100, "192.168.1.100", 9000));