diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index a9bfb620e..84ad906a8 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-09-19, Merge [#4503](https://github.com/ossrs/srs/pull/4503): AI: Refine RTMP/SRT/RTC bridge. v7.0.90 (#4503) * v7.0, 2025-09-15, RTC2RTMP: Fix sequence number wraparound assertion crashes. v7.0.89 (#4491) * v7.0, 2025-09-14, Merge [#4489](https://github.com/ossrs/srs/pull/4489): Improve coverage for kernel. v7.0.88 (#4489) * v7.0, 2025-09-14, Merge [#4488](https://github.com/ossrs/srs/pull/4488): AI: Add utests for kernel and protocol. v7.0.87 (#4488) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 7e6f1926e..50e361cb8 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1205,8 +1205,11 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript source_->set_publish_stream(this); // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? - SrsSharedPtr live_source = _srs_sources->fetch(r); - if (live_source.get() && !live_source->can_publish(false)) { + SrsSharedPtr live_source; + if ((err = _srs_sources->fetch_or_create(r, live_source)) != srs_success) { + return srs_error_wrap(err, "create live source"); + } + if (!live_source->can_publish(false)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); } @@ -1224,29 +1227,34 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript } } - // Bridge to rtmp -#if defined(SRS_FFMPEG_FIT) + // Create the bridge for RTC. + SrsRtcBridge *bridge = new SrsRtcBridge(); + + // Bridge to RTMP. + // TODO: Support bridge to RTSP. bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost_); if (rtc_to_rtmp) { - if ((err = _srs_sources->fetch_or_create(r, live_source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - // Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync, // especially for stream merging. live_source->set_cache(false); - SrsCompositeBridge *bridge = new SrsCompositeBridge(); - bridge->append(new SrsFrameToRtmpBridge(live_source)); - - if ((err = bridge->initialize(r)) != srs_success) { - srs_freep(bridge); - return srs_error_wrap(err, "create bridge"); - } - - source_->set_bridge(bridge); + // Convert RTC to RTMP. + bridge->enable_rtc2rtmp(live_source); } -#endif + + if (bridge->empty()) { + srs_freep(bridge); + } else if ((err = bridge->initialize(r)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "create bridge"); + } + + if ((err = bridge->initialize(r)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "create bridge"); + } + + source_->set_bridge(bridge); return err; } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 89ac95834..b84fee9e8 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -396,10 +396,7 @@ SrsRtcSource::SrsRtcSource() stream_desc_ = NULL; req_ = NULL; - bridge_ = NULL; -#ifdef SRS_FFMPEG_FIT - frame_builder_ = NULL; -#endif + rtc_bridge_ = NULL; circuit_breaker_ = _srs_circuit_breaker; pli_for_rtmp_ = pli_elapsed_ = 0; @@ -412,10 +409,7 @@ SrsRtcSource::~SrsRtcSource() // for all consumers are auto free. consumers_.clear(); -#ifdef SRS_FFMPEG_FIT - srs_freep(frame_builder_); -#endif - srs_freep(bridge_); + srs_freep(rtc_bridge_); srs_freep(req_); srs_freep(stream_desc_); @@ -591,15 +585,10 @@ SrsContextId SrsRtcSource::pre_source_id() return _pre_source_id; } -void SrsRtcSource::set_bridge(ISrsStreamBridge *bridge) +void SrsRtcSource::set_bridge(ISrsRtcBridge *bridge) { - srs_freep(bridge_); - bridge_ = bridge; - -#ifdef SRS_FFMPEG_FIT - srs_freep(frame_builder_); - frame_builder_ = new SrsRtcFrameBuilder(bridge); -#endif + srs_freep(rtc_bridge_); + rtc_bridge_ = bridge; } srs_error_t SrsRtcSource::create_consumer(ISrsRtcConsumer *&consumer) @@ -678,31 +667,30 @@ srs_error_t SrsRtcSource::on_publish() return srs_error_wrap(err, "source id change"); } + // Setup the audio and video codec. + SrsAudioCodecId audio_codec = SrsAudioCodecIdOpus; + if (stream_desc_->audio_track_desc_ && stream_desc_->audio_track_desc_->media_) { + audio_codec = SrsAudioCodecId(stream_desc_->audio_track_desc_->media_->codec(false)); + } + + SrsVideoCodecId video_codec = SrsVideoCodecIdAVC; + if (stream_desc_->video_track_descs_.size() > 0) { + SrsRtcTrackDescription *track_desc = stream_desc_->video_track_descs_.at(0); + video_codec = SrsVideoCodecId(track_desc->media_->codec(true)); + } + // If bridge to other source, handle event and start timer to request PLI. - if (bridge_) { -#ifdef SRS_FFMPEG_FIT - SrsAudioCodecId audio_codec = SrsAudioCodecIdOpus; - if (stream_desc_->audio_track_desc_ && stream_desc_->audio_track_desc_->media_) { - audio_codec = SrsAudioCodecId(stream_desc_->audio_track_desc_->media_->codec(false)); + if (rtc_bridge_) { + if ((err = rtc_bridge_->initialize(req_)) != srs_success) { + return srs_error_wrap(err, "rtp bridge initialize"); } - SrsVideoCodecId video_codec = SrsVideoCodecIdAVC; - if (stream_desc_->video_track_descs_.size() > 0) { - SrsRtcTrackDescription *track_desc = stream_desc_->video_track_descs_.at(0); - video_codec = SrsVideoCodecId(track_desc->media_->codec(true)); + if ((err = rtc_bridge_->setup_codec(audio_codec, video_codec)) != srs_success) { + return srs_error_wrap(err, "rtp bridge setup codec"); } - if ((err = frame_builder_->initialize(req_, audio_codec, video_codec)) != srs_success) { - return srs_error_wrap(err, "frame builder initialize"); - } - - if ((err = frame_builder_->on_publish()) != srs_success) { - return srs_error_wrap(err, "frame builder on publish"); - } -#endif - - if ((err = bridge_->on_publish()) != srs_success) { - return srs_error_wrap(err, "bridge on publish"); + if ((err = rtc_bridge_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtp bridge on publish"); } // The PLI interval for RTC2RTMP. @@ -741,17 +729,12 @@ void SrsRtcSource::on_unpublish() } // free bridge resource - if (bridge_) { + if (rtc_bridge_) { // For SrsRtcSource::on_timer() _srs_shared_timer->timer100ms()->unsubscribe(this); -#ifdef SRS_FFMPEG_FIT - frame_builder_->on_unpublish(); - srs_freep(frame_builder_); -#endif - - bridge_->on_unpublish(); - srs_freep(bridge_); + rtc_bridge_->on_unpublish(); + srs_freep(rtc_bridge_); } SrsStatistic *stat = SrsStatistic::instance(); @@ -806,11 +789,9 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket *pkt) } } -#ifdef SRS_FFMPEG_FIT - if (frame_builder_ && (err = frame_builder_->on_rtp(pkt)) != srs_success) { - return srs_error_wrap(err, "frame builder consume packet"); + if (rtc_bridge_ && (err = rtc_bridge_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "rtp bridge consume packet"); } -#endif return err; } @@ -894,9 +875,9 @@ srs_error_t SrsRtcSource::on_timer(srs_utime_t interval) #ifdef SRS_FFMPEG_FIT -SrsRtcRtpBuilder::SrsRtcRtpBuilder(SrsFrameToRtcBridge *bridge, SrsSharedPtr source) +SrsRtcRtpBuilder::SrsRtcRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr source) { - bridge_ = bridge; + rtp_target_ = target; source_ = source; req_ = NULL; @@ -1155,7 +1136,7 @@ srs_error_t SrsRtcRtpBuilder::transcode(SrsParsedAudioPacket *audio) break; } - if ((err = bridge_->on_rtp(pkt.get())) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) { err = srs_error_wrap(err, "consume opus"); break; } @@ -1247,7 +1228,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsMediaPacket *msg) return srs_error_wrap(err, "package stap-a"); } - if ((err = bridge_->on_rtp(pkt.get())) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) { return srs_error_wrap(err, "consume sps/pps"); } } @@ -1383,7 +1364,7 @@ srs_error_t SrsRtcRtpBuilder::consume_packets(vector &pkts) // TODO: FIXME: Consume a range of packets. for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket *pkt = pkts[i]; - if ((err = bridge_->on_rtp(pkt)) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt)) != srs_success) { err = srs_error_wrap(err, "consume sps/pps"); break; } @@ -1758,9 +1739,9 @@ void SrsRtcFrameBuilderAudioPacketCache::clear_all() audio_buffer_.clear(); } -SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsStreamBridge *bridge) +SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsFrameTarget *target) { - bridge_ = bridge; + frame_target_ = target; is_first_audio_ = true; audio_transcoder_ = NULL; video_codec_ = SrsVideoCodecIdAVC; @@ -1892,7 +1873,7 @@ srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt) SrsMediaPacket msg; out_rtmp.to_msg(&msg); - if ((err = bridge_->on_frame(&msg)) != srs_success) { + if ((err = frame_target_->on_frame(&msg)) != srs_success) { return srs_error_wrap(err, "source on audio"); } @@ -1922,7 +1903,7 @@ srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt) SrsMediaPacket msg; out_rtmp.to_msg(&msg); - if ((err = bridge_->on_frame(&msg)) != srs_success) { + if ((err = frame_target_->on_frame(&msg)) != srs_success) { err = srs_error_wrap(err, "source on audio"); break; } @@ -2099,7 +2080,7 @@ srs_error_t SrsRtcFrameBuilder::do_packet_sequence_header_avc(SrsRtpPacket *pkt, SrsMediaPacket msg; rtmp.to_msg(&msg); - if ((err = bridge_->on_frame(&msg)) != srs_success) { + if ((err = frame_target_->on_frame(&msg)) != srs_success) { return err; } @@ -2196,7 +2177,7 @@ srs_error_t SrsRtcFrameBuilder::do_packet_sequence_header_hevc(SrsRtpPacket *pkt SrsMediaPacket msg; rtmp.to_msg(&msg); - if ((err = bridge_->on_frame(&msg)) != srs_success) { + if ((err = frame_target_->on_frame(&msg)) != srs_success) { return err; } @@ -2452,7 +2433,7 @@ srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const ui SrsMediaPacket msg; rtmp.to_msg(&msg); - if ((err = bridge_->on_frame(&msg)) != srs_success) { + if ((err = frame_target_->on_frame(&msg)) != srs_success) { srs_warn("fail to pack video frame: %s", srs_error_summary(err).c_str()); srs_freep(err); } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index db850ea5f..6a37fd4a1 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -29,7 +29,6 @@ class SrsMediaPacket; class SrsRtmpCommonMessage; class SrsMessageArray; class SrsRtcSource; -class SrsFrameToRtcBridge; class SrsAudioTranscoder; class SrsRtpPacket; class SrsNaluSample; @@ -60,6 +59,7 @@ const int SLIDING_WINDOW_SIZE = 10; // Maximum waiting time for out-of-order packets (in ms) const int MAX_AUDIO_WAIT_MS = 100; +// NTP time for RTC. class SrsNtp { public: @@ -157,6 +157,7 @@ public: void on_stream_change(SrsRtcSourceDescription *desc); }; +// The RTC source manager. class SrsRtcSourceManager : public ISrsHourGlass { private: @@ -203,6 +204,7 @@ public: virtual const SrsContextId &context_id() = 0; }; +// The event handler for RTC source. class ISrsRtcSourceEventHandler { public: @@ -217,8 +219,12 @@ public: }; // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. -class SrsRtcSource : public ISrsFastTimer, public ISrsRtcSourceForConsumer +class SrsRtcSource : public ISrsRtpTarget, public ISrsFastTimer, public ISrsRtcSourceForConsumer { +private: + // The RTP bridge, convert RTP packets to other protocols. + ISrsRtcBridge *rtc_bridge_; + private: // Circuit breaker for protecting server resources. ISrsCircuitBreaker *circuit_breaker_; @@ -234,14 +240,6 @@ private: // Steam description for this steam. SrsRtcSourceDescription *stream_desc_; -private: -#ifdef SRS_FFMPEG_FIT - // Collect and build WebRTC RTP packets to AV frames. - SrsRtcFrameBuilder *frame_builder_; -#endif - // The Source bridge, bridge stream to other source. - ISrsStreamBridge *bridge_; - private: // To delivery stream to clients. std::vector consumers_; @@ -289,7 +287,7 @@ public: virtual SrsContextId pre_source_id(); public: - void set_bridge(ISrsStreamBridge *bridge); + virtual void set_bridge(ISrsRtcBridge *bridge); public: // Create consumer @@ -313,19 +311,19 @@ public: public: // For event handler - void subscribe(ISrsRtcSourceEventHandler *h); - void unsubscribe(ISrsRtcSourceEventHandler *h); + virtual void subscribe(ISrsRtcSourceEventHandler *h); + virtual void unsubscribe(ISrsRtcSourceEventHandler *h); public: // Get and set the publisher, passed to consumer to process requests such as PLI. - ISrsRtcPublishStream *publish_stream(); - void set_publish_stream(ISrsRtcPublishStream *v); + virtual ISrsRtcPublishStream *publish_stream(); + virtual void set_publish_stream(ISrsRtcPublishStream *v); // Consume the shared RTP packet, user must free it. - srs_error_t on_rtp(SrsRtpPacket *pkt); + virtual srs_error_t on_rtp(SrsRtpPacket *pkt); // Set and get stream description for source - bool has_stream_desc(); - void set_stream_desc(SrsRtcSourceDescription *stream_desc); - std::vector get_track_desc(std::string type, std::string media_type); + virtual bool has_stream_desc(); + virtual void set_stream_desc(SrsRtcSourceDescription *stream_desc); + virtual std::vector get_track_desc(std::string type, std::string media_type); // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); @@ -338,7 +336,7 @@ class SrsRtcRtpBuilder { private: ISrsRequest *req_; - SrsFrameToRtcBridge *bridge_; + ISrsRtpTarget *rtp_target_; // The format, codec information. SrsRtmpFormat *format_; // The metadata cache. @@ -365,7 +363,7 @@ private: bool video_initialized_; public: - SrsRtcRtpBuilder(SrsFrameToRtcBridge *bridge, SrsSharedPtr source); + SrsRtcRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr source); virtual ~SrsRtcRtpBuilder(); private: @@ -494,7 +492,7 @@ public: class SrsRtcFrameBuilder { private: - ISrsStreamBridge *bridge_; + ISrsFrameTarget *frame_target_; private: bool is_first_audio_; @@ -517,7 +515,7 @@ private: SrsRtpPacket *obs_whip_pps_; public: - SrsRtcFrameBuilder(ISrsStreamBridge *bridge); + SrsRtcFrameBuilder(ISrsFrameTarget *target); virtual ~SrsRtcFrameBuilder(); public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 59d32ae97..94380a2d6 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1059,17 +1059,17 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr source) // Bridge to RTC streaming. // TODO: FIXME: Need to convert RTMP to SRT. - SrsCompositeBridge *bridge = new SrsCompositeBridge(); + SrsRtmpBridge *bridge = new SrsRtmpBridge(); #if defined(SRS_FFMPEG_FIT) if (rtc.get() && _srs_config->get_rtc_from_rtmp(req->vhost_)) { - bridge->append(new SrsFrameToRtcBridge(rtc)); + bridge->enable_rtmp2rtc(rtc); } #endif #ifdef SRS_RTSP if (rtsp.get() && _srs_config->get_rtsp_from_rtmp(req->vhost_)) { - bridge->append(new SrsFrameToRtspBridge(rtsp)); + bridge->enable_rtmp2rtsp(rtsp); } #endif diff --git a/trunk/src/app/srs_app_rtmp_source.cpp b/trunk/src/app/srs_app_rtmp_source.cpp index dffb83a34..d5feaaaa9 100644 --- a/trunk/src/app/srs_app_rtmp_source.cpp +++ b/trunk/src/app/srs_app_rtmp_source.cpp @@ -1706,7 +1706,7 @@ SrsLiveSource::SrsLiveSource() stream_die_at_ = 0; publisher_idle_at_ = 0; - bridge_ = NULL; + rtmp_bridge_ = NULL; play_edge_ = new SrsPlayEdge(); publish_edge_ = new SrsPublishEdge(); @@ -1740,7 +1740,7 @@ SrsLiveSource::~SrsLiveSource() srs_freep(gop_cache_); srs_freep(req_); - srs_freep(bridge_); + srs_freep(rtmp_bridge_); SrsContextId cid = _source_id; if (cid.empty()) @@ -1852,10 +1852,10 @@ srs_error_t SrsLiveSource::initialize(SrsSharedPtr wrapper, ISrsR return err; } -void SrsLiveSource::set_bridge(ISrsStreamBridge *v) +void SrsLiveSource::set_bridge(ISrsRtmpBridge *v) { - srs_freep(bridge_); - bridge_ = v; + srs_freep(rtmp_bridge_); + rtmp_bridge_ = v; } srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id) @@ -2049,7 +2049,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsMediaPacket *msg) } // For bridge to consume the message. - if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) { + if (rtmp_bridge_ && (err = rtmp_bridge_->on_frame(msg)) != srs_success) { return srs_error_wrap(err, "bridge consume audio"); } @@ -2170,7 +2170,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsMediaPacket *msg) } // For bridge to consume the message. - if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) { + if (rtmp_bridge_ && (err = rtmp_bridge_->on_frame(msg)) != srs_success) { return srs_error_wrap(err, "bridge consume video"); } @@ -2332,7 +2332,7 @@ srs_error_t SrsLiveSource::on_publish() return srs_error_wrap(err, "handle publish"); } - if (bridge_ && (err = bridge_->on_publish()) != srs_success) { + if (rtmp_bridge_ && (err = rtmp_bridge_->on_publish()) != srs_success) { return srs_error_wrap(err, "bridge publish"); } @@ -2383,9 +2383,9 @@ void SrsLiveSource::on_unpublish() handler->on_unpublish(req_); - if (bridge_) { - bridge_->on_unpublish(); - srs_freep(bridge_); + if (rtmp_bridge_) { + rtmp_bridge_->on_unpublish(); + srs_freep(rtmp_bridge_); } // no consumer, stream is die. diff --git a/trunk/src/app/srs_app_rtmp_source.hpp b/trunk/src/app/srs_app_rtmp_source.hpp index e4c432ca6..2a6414579 100644 --- a/trunk/src/app/srs_app_rtmp_source.hpp +++ b/trunk/src/app/srs_app_rtmp_source.hpp @@ -507,7 +507,7 @@ public: extern SrsLiveSourceManager *_srs_sources; // The live streaming source. -class SrsLiveSource : public ISrsReloadHandler +class SrsLiveSource : public ISrsReloadHandler, public ISrsFrameTarget { friend class SrsOriginHub; @@ -539,7 +539,7 @@ private: // The time of the packet we just got. int64_t last_packet_time_; // The source bridge for other source. - ISrsStreamBridge *bridge_; + ISrsRtmpBridge *rtmp_bridge_; // The edge control service SrsPlayEdge *play_edge_; SrsPublishEdge *publish_edge_; @@ -576,7 +576,7 @@ public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsSharedPtr wrapper, ISrsRequest *r); // Bridge to other source, forward packets to it. - void set_bridge(ISrsStreamBridge *v); + void set_bridge(ISrsRtmpBridge *v); public: // The source id changed. diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index 9ed18b01e..01bf75578 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -503,9 +503,9 @@ void SrsRtspSource::set_video_desc(SrsRtcTrackDescription *video_desc) video_desc_ = video_desc->copy(); } -SrsRtspRtpBuilder::SrsRtspRtpBuilder(SrsFrameToRtspBridge *bridge, SrsSharedPtr source) +SrsRtspRtpBuilder::SrsRtspRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr source) { - bridge_ = bridge; + rtp_target_ = target; source_ = source; req_ = NULL; @@ -746,7 +746,7 @@ srs_error_t SrsRtspRtpBuilder::on_audio(SrsMediaPacket *msg) return srs_error_new(ERROR_NOT_IMPLEMENTED, "codec %d not implemented", acodec); } - if ((err = bridge_->on_rtp(pkt.get())) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) { return srs_error_wrap(err, "consume audio packet"); } @@ -876,7 +876,7 @@ srs_error_t SrsRtspRtpBuilder::on_video(SrsMediaPacket *msg) return srs_error_wrap(err, "package stap-a"); } - if ((err = bridge_->on_rtp(pkt.get())) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) { return srs_error_wrap(err, "consume sps/pps"); } } @@ -975,7 +975,7 @@ srs_error_t SrsRtspRtpBuilder::consume_packets(vector &pkts) // TODO: FIXME: Consume a range of packets. for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket *pkt = pkts[i]; - if ((err = bridge_->on_rtp(pkt)) != srs_success) { + if ((err = rtp_target_->on_rtp(pkt)) != srs_success) { err = srs_error_wrap(err, "consume sps/pps"); break; } diff --git a/trunk/src/app/srs_app_rtsp_source.hpp b/trunk/src/app/srs_app_rtsp_source.hpp index 1ece97992..aec6cb850 100644 --- a/trunk/src/app/srs_app_rtsp_source.hpp +++ b/trunk/src/app/srs_app_rtsp_source.hpp @@ -26,7 +26,6 @@ class SrsRtcSourceDescription; class SrsResourceManager; class SrsRtspConnection; class SrsRtpVideoBuilder; -class SrsFrameToRtspBridge; // The RTSP stream consumer, consume packets from RTSP stream source. class SrsRtspConsumer @@ -103,7 +102,7 @@ extern SrsRtspSourceManager *_srs_rtsp_sources; extern SrsResourceManager *_srs_rtsp_manager; // A Source is a stream, to publish and to play with, binding to SrsRtspPlayStream. -class SrsRtspSource +class SrsRtspSource : public ISrsRtpTarget { private: // For publish, it's the publish client id. @@ -176,7 +175,7 @@ public: public: // Consume the shared RTP packet, user must free it. - srs_error_t on_rtp(SrsRtpPacket *pkt); + virtual srs_error_t on_rtp(SrsRtpPacket *pkt); public: SrsRtcTrackDescription *audio_desc(); @@ -190,7 +189,7 @@ class SrsRtspRtpBuilder { private: ISrsRequest *req_; - SrsFrameToRtspBridge *bridge_; + ISrsRtpTarget *rtp_target_; // The format, codec information. SrsRtmpFormat *format_; // The metadata cache. @@ -211,7 +210,7 @@ private: bool video_initialized_; public: - SrsRtspRtpBuilder(SrsFrameToRtspBridge *bridge, SrsSharedPtr source); + SrsRtspRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr source); virtual ~SrsRtspRtpBuilder(); private: diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 460c39193..ac11e2cd4 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -413,25 +413,26 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish() } } + // Bridge to RTMP and RTC streaming. + SrsSrtBridge *bridge = new SrsSrtBridge(); + if (_srs_config->get_srt_to_rtmp(req_->vhost_)) { - // Bridge to RTMP and RTC streaming. - SrsCompositeBridge *bridge = new SrsCompositeBridge(); - bridge->append(new SrsFrameToRtmpBridge(live_source)); - -#if defined(SRS_FFMPEG_FIT) - if (rtc.get() && _srs_config->get_rtc_from_rtmp(req_->vhost_)) { - bridge->append(new SrsFrameToRtcBridge(rtc)); - } -#endif - - if ((err = bridge->initialize(req_)) != srs_success) { - srs_freep(bridge); - return srs_error_wrap(err, "create bridge"); - } - - srt_source_->set_bridge(bridge); + bridge->enable_srt2rtmp(live_source); } + if (rtc.get() && _srs_config->get_rtc_from_rtmp(req_->vhost_)) { + bridge->enable_srt2rtc(rtc); + } + + if (bridge->empty()) { + srs_freep(bridge); + } else if ((err = bridge->initialize(req_)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "bridge init"); + } + + srt_source_->set_bridge(bridge); + if ((err = srt_source_->on_publish()) != srs_success) { return srs_error_wrap(err, "srt source publish"); } diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index f3e8a846d..5a8e2f00e 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -291,7 +291,7 @@ void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout) srs_cond_timedwait(mw_wait_, timeout); } -SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge *bridge) +SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsFrameTarget *target) { ts_ctx_ = new SrsTsContext(); @@ -300,7 +300,7 @@ SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge *bridge) pps_ = ""; req_ = NULL; - bridge_ = bridge; + frame_target_ = target; video_streamid_ = 1; audio_streamid_ = 2; @@ -510,7 +510,7 @@ srs_error_t SrsSrtFrameBuilder::check_sps_pps_change(SrsTsMessage *msg) SrsMediaPacket frame; rtmp.to_msg(&frame); - if ((err = bridge_->on_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp sps/pps"); } @@ -568,7 +568,7 @@ srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage *msg, vectoron_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt ts video to rtmp"); } @@ -703,7 +703,7 @@ srs_error_t SrsSrtFrameBuilder::check_vps_sps_pps_change(SrsTsMessage *msg) SrsMediaPacket frame; rtmp.to_msg(&frame); - if ((err = bridge_->on_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp vps/sps/pps"); } @@ -759,7 +759,7 @@ srs_error_t SrsSrtFrameBuilder::on_hevc_frame(SrsTsMessage *msg, vectoron_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt ts hevc video to rtmp"); } @@ -873,7 +873,7 @@ srs_error_t SrsSrtFrameBuilder::check_audio_sh_change(SrsTsMessage *msg, uint32_ SrsMediaPacket frame; rtmp.to_msg(&frame); - if ((err = bridge_->on_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp audio sh"); } @@ -901,7 +901,7 @@ srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage *msg, uint32_t pts, ch SrsMediaPacket frame; rtmp.to_msg(&frame); - if ((err = bridge_->on_frame(&frame)) != srs_success) { + if ((err = frame_target_->on_frame(&frame)) != srs_success) { return srs_error_wrap(err, "srt to rtmp audio sh"); } @@ -912,8 +912,7 @@ SrsSrtSource::SrsSrtSource() { req_ = NULL; can_publish_ = true; - frame_builder_ = NULL; - bridge_ = NULL; + srt_bridge_ = NULL; stream_die_at_ = 0; } @@ -923,8 +922,7 @@ SrsSrtSource::~SrsSrtSource() // for all consumers are auto free. consumers_.clear(); - srs_freep(frame_builder_); - srs_freep(bridge_); + srs_freep(srt_bridge_); srs_freep(req_); SrsContextId cid = _source_id; @@ -1013,13 +1011,10 @@ void SrsSrtSource::update_auth(ISrsRequest *r) req_->update_auth(r); } -void SrsSrtSource::set_bridge(ISrsStreamBridge *bridge) +void SrsSrtSource::set_bridge(ISrsSrtBridge *bridge) { - srs_freep(bridge_); - bridge_ = bridge; - - srs_freep(frame_builder_); - frame_builder_ = new SrsSrtFrameBuilder(bridge); + srs_freep(srt_bridge_); + srt_bridge_ = bridge; } srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer *&consumer) @@ -1073,18 +1068,8 @@ srs_error_t SrsSrtSource::on_publish() return srs_error_wrap(err, "source id change"); } - if (bridge_) { - if ((err = frame_builder_->initialize(req_)) != srs_success) { - return srs_error_wrap(err, "frame builder initialize"); - } - - if ((err = frame_builder_->on_publish()) != srs_success) { - return srs_error_wrap(err, "frame builder on publish"); - } - - if ((err = bridge_->on_publish()) != srs_success) { - return srs_error_wrap(err, "bridge on publish"); - } + if (srt_bridge_ && (err = srt_bridge_->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridge on publish"); } SrsStatistic *stat = SrsStatistic::instance(); @@ -1105,12 +1090,9 @@ void SrsSrtSource::on_unpublish() SrsStatistic *stat = SrsStatistic::instance(); stat->on_stream_close(req_); - if (bridge_) { - frame_builder_->on_unpublish(); - srs_freep(frame_builder_); - - bridge_->on_unpublish(); - srs_freep(bridge_); + if (srt_bridge_) { + srt_bridge_->on_unpublish(); + srs_freep(srt_bridge_); } // Destroy and cleanup source when no publishers and consumers. @@ -1130,7 +1112,7 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket *packet) } } - if (frame_builder_ && (err = frame_builder_->on_packet(packet)) != srs_success) { + if (srt_bridge_ && (err = srt_bridge_->on_packet(packet)) != srs_success) { return srs_error_wrap(err, "bridge consume message"); } diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index 3df46fa85..cda09c74a 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -117,7 +117,7 @@ public: class SrsSrtFrameBuilder : public ISrsTsHandler { public: - SrsSrtFrameBuilder(ISrsStreamBridge *bridge); + SrsSrtFrameBuilder(ISrsFrameTarget *target); virtual ~SrsSrtFrameBuilder(); public: @@ -143,7 +143,7 @@ private: srs_error_t on_hevc_frame(SrsTsMessage *msg, std::vector > &ipb_frames); private: - ISrsStreamBridge *bridge_; + ISrsFrameTarget *frame_target_; private: SrsTsContext *ts_ctx_; @@ -171,7 +171,8 @@ private: SrsAlonePithyPrint *pp_audio_duration_; }; -class SrsSrtSource +// A SRT source is a stream, to publish and to play with. +class SrsSrtSource : public ISrsSrtTarget { public: SrsSrtSource(); @@ -194,7 +195,7 @@ public: virtual void update_auth(ISrsRequest *r); public: - void set_bridge(ISrsStreamBridge *bridge); + void set_bridge(ISrsSrtBridge *bridge); public: // Create consumer @@ -226,8 +227,7 @@ private: srs_utime_t stream_die_at_; private: - SrsSrtFrameBuilder *frame_builder_; - ISrsStreamBridge *bridge_; + ISrsSrtBridge *srt_bridge_; }; #endif diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp index 58927921d..1b1fa20d7 100644 --- a/trunk/src/app/srs_app_stream_bridge.cpp +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,243 +17,434 @@ #ifdef SRS_RTSP #include #endif +#include #include using namespace std; -ISrsStreamBridge::ISrsStreamBridge() +ISrsFrameTarget::ISrsFrameTarget() { } -ISrsStreamBridge::~ISrsStreamBridge() +ISrsFrameTarget::~ISrsFrameTarget() { } -SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsSharedPtr source) -{ - source_ = source; -} - -SrsFrameToRtmpBridge::~SrsFrameToRtmpBridge() +ISrsRtpTarget::ISrsRtpTarget() { } -srs_error_t SrsFrameToRtmpBridge::initialize(ISrsRequest *r) +ISrsRtpTarget::~ISrsRtpTarget() { - return srs_success; } -srs_error_t SrsFrameToRtmpBridge::on_publish() +ISrsSrtTarget::ISrsSrtTarget() { - srs_error_t err = srs_success; - - // TODO: FIXME: Should sync with bridge? - if ((err = source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "source publish"); - } - - return err; } -void SrsFrameToRtmpBridge::on_unpublish() +ISrsSrtTarget::~ISrsSrtTarget() { - // TODO: FIXME: Should sync with bridge? - source_->on_unpublish(); } -srs_error_t SrsFrameToRtmpBridge::on_frame(SrsMediaPacket *frame) +ISrsRtmpBridge::ISrsRtmpBridge() { - return source_->on_frame(frame); } -SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsSharedPtr source) +ISrsRtmpBridge::~ISrsRtmpBridge() { - source_ = source; +} -#if defined(SRS_FFMPEG_FIT) - // Use lazy initialization - no need to determine codec/track parameters here - rtp_builder_ = new SrsRtcRtpBuilder(this, source); +SrsRtmpBridge::SrsRtmpBridge() +{ +#ifdef SRS_FFMPEG_FIT + rtp_builder_ = NULL; #endif +#ifdef SRS_RTSP + rtsp_builder_ = NULL; +#endif + rtc_target_ = NULL; + rtsp_target_ = NULL; } -SrsFrameToRtcBridge::~SrsFrameToRtcBridge() +SrsRtmpBridge::~SrsRtmpBridge() { #ifdef SRS_FFMPEG_FIT srs_freep(rtp_builder_); #endif -} - -srs_error_t SrsFrameToRtcBridge::initialize(ISrsRequest *r) -{ -#ifdef SRS_FFMPEG_FIT - return rtp_builder_->initialize(r); -#else - return srs_success; +#ifdef SRS_RTSP + srs_freep(rtsp_builder_); #endif + rtc_target_ = NULL; + rtsp_target_ = NULL; } -srs_error_t SrsFrameToRtcBridge::on_publish() +bool SrsRtmpBridge::empty() +{ + return !rtc_target_.get() || !rtsp_target_.get(); +} + +void SrsRtmpBridge::enable_rtmp2rtc(SrsSharedPtr rtc_source) +{ + rtc_target_ = rtc_source; +} + +void SrsRtmpBridge::enable_rtmp2rtsp(SrsSharedPtr rtsp_source) +{ + rtsp_target_ = rtsp_source; +} + +srs_error_t SrsRtmpBridge::initialize(ISrsRequest *r) { srs_error_t err = srs_success; - // TODO: FIXME: Should sync with bridge? - if ((err = source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "source publish"); - } - #ifdef SRS_FFMPEG_FIT - if ((err = rtp_builder_->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtp builder publish"); + if (rtc_target_.get()) { + srs_freep(rtp_builder_); + rtp_builder_ = new SrsRtcRtpBuilder(rtc_target_.get(), rtc_target_); + if ((err = rtp_builder_->initialize(r)) != srs_success) { + return srs_error_wrap(err, "rtp builder initialize"); + } } #endif - return err; -} - -void SrsFrameToRtcBridge::on_unpublish() -{ -#ifdef SRS_FFMPEG_FIT - rtp_builder_->on_unpublish(); -#endif - - // @remark This bridge might be disposed here, so never use it. - // TODO: FIXME: Should sync with bridge? - source_->on_unpublish(); -} - -srs_error_t SrsFrameToRtcBridge::on_frame(SrsMediaPacket *frame) -{ -#ifdef SRS_FFMPEG_FIT - return rtp_builder_->on_frame(frame); -#else - return srs_success; -#endif -} - -srs_error_t SrsFrameToRtcBridge::on_rtp(SrsRtpPacket *pkt) -{ - return source_->on_rtp(pkt); -} - #ifdef SRS_RTSP -SrsFrameToRtspBridge::SrsFrameToRtspBridge(SrsSharedPtr source) -{ - source_ = source; - - // Use lazy initialization - no need to determine codec/track parameters here - rtp_builder_ = new SrsRtspRtpBuilder(this, source); -} - -SrsFrameToRtspBridge::~SrsFrameToRtspBridge() -{ - srs_freep(rtp_builder_); -} - -srs_error_t SrsFrameToRtspBridge::initialize(ISrsRequest *r) -{ - return rtp_builder_->initialize(r); -} - -srs_error_t SrsFrameToRtspBridge::on_publish() -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Should sync with bridge? - if ((err = source_->on_publish()) != srs_success) { - return srs_error_wrap(err, "source publish"); + if (rtsp_target_.get()) { + srs_freep(rtsp_builder_); + rtsp_builder_ = new SrsRtspRtpBuilder(rtsp_target_.get(), rtsp_target_); + if ((err = rtsp_builder_->initialize(r)) != srs_success) { + return srs_error_wrap(err, "rtsp builder initialize"); + } } - - if ((err = rtp_builder_->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtp builder publish"); - } - - return err; -} - -void SrsFrameToRtspBridge::on_unpublish() -{ - rtp_builder_->on_unpublish(); - - // @remark This bridge might be disposed here, so never use it. - // TODO: FIXME: Should sync with bridge? - source_->on_unpublish(); -} - -srs_error_t SrsFrameToRtspBridge::on_frame(SrsMediaPacket *frame) -{ - return rtp_builder_->on_frame(frame); -} - -srs_error_t SrsFrameToRtspBridge::on_rtp(SrsRtpPacket *pkt) -{ - return source_->on_rtp(pkt); -} #endif -SrsCompositeBridge::SrsCompositeBridge() -{ + return err; } -SrsCompositeBridge::~SrsCompositeBridge() -{ - for (vector::iterator it = bridges_.begin(); it != bridges_.end(); ++it) { - ISrsStreamBridge *bridge = *it; - srs_freep(bridge); - } -} - -srs_error_t SrsCompositeBridge::initialize(ISrsRequest *r) +srs_error_t SrsRtmpBridge::on_publish() { srs_error_t err = srs_success; - for (vector::iterator it = bridges_.begin(); it != bridges_.end(); ++it) { - ISrsStreamBridge *bridge = *it; - if ((err = bridge->initialize(r)) != srs_success) { - return err; + // TODO: FIXME: Should sync with bridge? + if (rtc_target_.get()) { + if ((err = rtc_target_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtc target publish"); } + +#ifdef SRS_FFMPEG_FIT + if ((err = rtp_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtp builder publish"); + } +#endif + } + +#ifdef SRS_RTSP + // TODO: FIXME: Should sync with bridge? + if (rtsp_target_.get()) { + if ((err = rtsp_target_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtsp target publish"); + } + + if ((err = rtsp_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtsp builder publish"); + } + } +#endif + + return err; +} + +void SrsRtmpBridge::on_unpublish() +{ + if (rtc_target_.get()) { +#ifdef SRS_FFMPEG_FIT + rtp_builder_->on_unpublish(); +#endif + rtc_target_->on_unpublish(); + } + +#ifdef SRS_RTSP + if (rtsp_target_.get()) { + rtsp_builder_->on_unpublish(); + rtsp_target_->on_unpublish(); + } +#endif + + // Note that RTMP live source free this rtmp bridge, after on_unpublish() is called. + // So there is no need to free its components here. +} + +srs_error_t SrsRtmpBridge::on_frame(SrsMediaPacket *frame) +{ + srs_error_t err = srs_success; + +#ifdef SRS_FFMPEG_FIT + if (rtp_builder_ && (err = rtp_builder_->on_frame(frame)) != srs_success) { + return srs_error_wrap(err, "rtp builder on frame"); + } +#endif + +#ifdef SRS_RTSP + if (rtsp_builder_ && (err = rtsp_builder_->on_frame(frame)) != srs_success) { + return srs_error_wrap(err, "rtsp builder on frame"); + } +#endif + + return err; +} + +ISrsSrtBridge::ISrsSrtBridge() +{ +} + +ISrsSrtBridge::~ISrsSrtBridge() +{ +} + +SrsSrtBridge::SrsSrtBridge() +{ + frame_builder_ = new SrsSrtFrameBuilder(this); + rtmp_target_ = NULL; + +#ifdef SRS_FFMPEG_FIT + rtp_builder_ = NULL; +#endif + rtc_target_ = NULL; +} + +SrsSrtBridge::~SrsSrtBridge() +{ + rtmp_target_ = NULL; + srs_freep(frame_builder_); + + rtc_target_ = NULL; +#ifdef SRS_FFMPEG_FIT + srs_freep(rtp_builder_); +#endif +} + +bool SrsSrtBridge::empty() +{ + return !rtmp_target_.get() && !rtc_target_.get(); +} + +void SrsSrtBridge::enable_srt2rtmp(SrsSharedPtr rtmp_source) +{ + rtmp_target_ = rtmp_source; +} + +void SrsSrtBridge::enable_srt2rtc(SrsSharedPtr rtc_source) +{ + rtc_target_ = rtc_source; +} + +srs_error_t SrsSrtBridge::initialize(ISrsRequest *r) +{ + srs_error_t err = srs_success; + + if ((err = frame_builder_->initialize(r)) != srs_success) { + return srs_error_wrap(err, "frame builder initialize"); + } + +#ifdef SRS_FFMPEG_FIT + if (rtc_target_.get()) { + srs_freep(rtp_builder_); + rtp_builder_ = new SrsRtcRtpBuilder(rtc_target_.get(), rtc_target_); + if ((err = rtp_builder_->initialize(r)) != srs_success) { + return srs_error_wrap(err, "rtp builder initialize"); + } + } +#endif + + return err; +} + +srs_error_t SrsSrtBridge::on_publish() +{ + srs_error_t err = srs_success; + + if ((err = frame_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "frame builder publish"); + } + + // TODO: FIXME: Should sync with bridge? + if (rtmp_target_.get()) { + if ((err = rtmp_target_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtmp target publish"); + } + } + + if (rtc_target_.get()) { + if ((err = rtc_target_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtc target publish"); + } + +#ifdef SRS_FFMPEG_FIT + if ((err = rtp_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtp builder publish"); + } +#endif } return err; } -srs_error_t SrsCompositeBridge::on_publish() +void SrsSrtBridge::on_unpublish() +{ + frame_builder_->on_unpublish(); + + if (rtmp_target_.get()) { + rtmp_target_->on_unpublish(); + } + + if (rtc_target_.get()) { +#ifdef SRS_FFMPEG_FIT + rtp_builder_->on_unpublish(); +#endif + rtc_target_->on_unpublish(); + } + + // Note that SRT source free this srt bridge, after on_unpublish() is called. + // So there is no need to free its components here. +} + +srs_error_t SrsSrtBridge::on_packet(SrsSrtPacket *pkt) { srs_error_t err = srs_success; - for (vector::iterator it = bridges_.begin(); it != bridges_.end(); ++it) { - ISrsStreamBridge *bridge = *it; - if ((err = bridge->on_publish()) != srs_success) { - return err; - } + if ((err = frame_builder_->on_packet(pkt)) != srs_success) { + return srs_error_wrap(err, "frame builder on packet"); } return err; } -void SrsCompositeBridge::on_unpublish() -{ - for (vector::iterator it = bridges_.begin(); it != bridges_.end(); ++it) { - ISrsStreamBridge *bridge = *it; - bridge->on_unpublish(); - } -} - -srs_error_t SrsCompositeBridge::on_frame(SrsMediaPacket *frame) +srs_error_t SrsSrtBridge::on_frame(SrsMediaPacket *frame) { srs_error_t err = srs_success; - for (vector::iterator it = bridges_.begin(); it != bridges_.end(); ++it) { - ISrsStreamBridge *bridge = *it; - if ((err = bridge->on_frame(frame)) != srs_success) { - return err; - } + // Deliver frame to RTMP target + if (rtmp_target_.get() && (err = rtmp_target_->on_frame(frame)) != srs_success) { + return srs_error_wrap(err, "rtmp target on frame"); } + // Deliver frame to RTP builder, which delivers to RTC target +#ifdef SRS_FFMPEG_FIT + if (rtp_builder_ && (err = rtp_builder_->on_frame(frame)) != srs_success) { + return srs_error_wrap(err, "rtp builder on frame"); + } +#endif + return err; } -SrsCompositeBridge *SrsCompositeBridge::append(ISrsStreamBridge *bridge) +ISrsRtcBridge::ISrsRtcBridge() { - bridges_.push_back(bridge); - return this; +} + +ISrsRtcBridge::~ISrsRtcBridge() +{ +} + +SrsRtcBridge::SrsRtcBridge() +{ + req_ = NULL; +#ifdef SRS_FFMPEG_FIT + frame_builder_ = NULL; +#endif + rtmp_target_ = NULL; +} + +SrsRtcBridge::~SrsRtcBridge() +{ + srs_freep(req_); +#ifdef SRS_FFMPEG_FIT + srs_freep(frame_builder_); +#endif + rtmp_target_ = NULL; +} + +void SrsRtcBridge::enable_rtc2rtmp(SrsSharedPtr rtmp_target) +{ + rtmp_target_ = rtmp_target; +} + +bool SrsRtcBridge::empty() +{ + return !rtmp_target_.get(); +} + +srs_error_t SrsRtcBridge::initialize(ISrsRequest *r) +{ + srs_error_t err = srs_success; + + srs_freep(req_); + req_ = r->copy(); + +#ifdef SRS_FFMPEG_FIT + srs_assert(rtmp_target_.get()); + srs_freep(frame_builder_); + frame_builder_ = new SrsRtcFrameBuilder(rtmp_target_.get()); +#endif + + return err; +} + +srs_error_t SrsRtcBridge::setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec) +{ + srs_error_t err = srs_success; + +#ifdef SRS_FFMPEG_FIT + srs_assert(frame_builder_); + if ((err = frame_builder_->initialize(req_, acodec, vcodec)) != srs_success) { + return srs_error_wrap(err, "frame builder initialize"); + } +#endif + + return err; +} + +srs_error_t SrsRtcBridge::on_publish() +{ + srs_error_t err = srs_success; + + srs_assert(rtmp_target_.get()); + if ((err = rtmp_target_->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtmp target publish"); + } + +#ifdef SRS_FFMPEG_FIT + srs_assert(frame_builder_); + if ((err = frame_builder_->on_publish()) != srs_success) { + return srs_error_wrap(err, "frame builder on publish"); + } +#endif + + return err; +} + +void SrsRtcBridge::on_unpublish() +{ +#ifdef SRS_FFMPEG_FIT + srs_assert(frame_builder_); + frame_builder_->on_unpublish(); +#endif + + srs_assert(rtmp_target_.get()); + rtmp_target_->on_unpublish(); + + // Note that RTC source free this rtc bridge, after on_unpublish() is called. + // So there is no need to free its components here. +} + +srs_error_t SrsRtcBridge::on_rtp(SrsRtpPacket *pkt) +{ + srs_error_t err = srs_success; + +#ifdef SRS_FFMPEG_FIT + if (frame_builder_ && (err = frame_builder_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "frame builder on rtp"); + } +#endif + + return err; } diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp index b3aa26735..aead49ca2 100644 --- a/trunk/src/app/srs_app_stream_bridge.hpp +++ b/trunk/src/app/srs_app_stream_bridge.hpp @@ -24,113 +24,185 @@ class SrsRtpPacket; class SrsRtcRtpBuilder; class SrsRtspSource; class SrsRtspRtpBuilder; +class SrsRtcFrameBuilder; +class ISrsStreamBridge; +class SrsSrtFrameBuilder; +class SrsSrtPacket; -// A stream bridge is used to convert stream via different protocols, such as bridge for RTMP and RTC. Generally, we use -// frame as message for bridge. A frame is a audio or video frame, such as an I/B/P frame, a general frame for decoder. -// So you must assemble RTP or TS packets to a video frame if WebRTC or SRT. -class ISrsStreamBridge +// A target to feed AV frame, such as a RTMP live source, or a RTMP bridge +// that take frame and converts to RTC packets, or a SRT bridge that converts +// SRT packets to media frames then delivers to RTMP or RTC targets. +class ISrsFrameTarget { public: - ISrsStreamBridge(); - virtual ~ISrsStreamBridge(); + ISrsFrameTarget(); + virtual ~ISrsFrameTarget(); + +public: + virtual srs_error_t on_frame(SrsMediaPacket *frame) = 0; +}; + +// A target to feed RTP packets, such as a RTC source, or a RTC bridge that +// take RTP packets and converts to AV frames. +class ISrsRtpTarget +{ +public: + ISrsRtpTarget(); + virtual ~ISrsRtpTarget(); + +public: + virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0; +}; + +// A target to feed SRT packets, such as a SRT source, or a SRT bridge that +// take SRT packets and converts to AV frames. +class ISrsSrtTarget +{ +public: + ISrsSrtTarget(); + virtual ~ISrsSrtTarget(); + +public: + virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0; +}; + +// A RTMP bridge is used to convert RTMP stream to different protocols, +// such as bridge to RTC and RTSP. +class ISrsRtmpBridge : public ISrsFrameTarget +{ +public: + ISrsRtmpBridge(); + virtual ~ISrsRtmpBridge(); public: virtual srs_error_t initialize(ISrsRequest *r) = 0; virtual srs_error_t on_publish() = 0; - virtual srs_error_t on_frame(SrsMediaPacket *frame) = 0; virtual void on_unpublish() = 0; }; -// A bridge to feed AV frame to RTMP stream. -class SrsFrameToRtmpBridge : public ISrsStreamBridge +// A RTMP bridge to convert RTMP stream to different protocols, such as RTC and RTSP. +// First, it use a RTP builder to convert RTMP frame to RTP packets. +// Then, deliver the RTP packets to RTP target, which binds to a RTC/RTSP source. +class SrsRtmpBridge : public ISrsRtmpBridge { private: - SrsSharedPtr source_; - -public: - SrsFrameToRtmpBridge(SrsSharedPtr source); - virtual ~SrsFrameToRtmpBridge(); - -public: - srs_error_t initialize(ISrsRequest *r); - -public: - virtual srs_error_t on_publish(); - virtual void on_unpublish(); - -public: - virtual srs_error_t on_frame(SrsMediaPacket *frame); -}; - -// A bridge to covert AV frame to WebRTC stream. -class SrsFrameToRtcBridge : public ISrsStreamBridge -{ -private: - SrsSharedPtr source_; - -private: -#if defined(SRS_FFMPEG_FIT) +#ifdef SRS_FFMPEG_FIT SrsRtcRtpBuilder *rtp_builder_; #endif -public: - SrsFrameToRtcBridge(SrsSharedPtr source); - virtual ~SrsFrameToRtcBridge(); - -public: - virtual srs_error_t initialize(ISrsRequest *r); - virtual srs_error_t on_publish(); - virtual void on_unpublish(); - virtual srs_error_t on_frame(SrsMediaPacket *frame); - srs_error_t on_rtp(SrsRtpPacket *pkt); -}; - #ifdef SRS_RTSP -// A bridge to covert AV frame to RTSP stream. -class SrsFrameToRtspBridge : public ISrsStreamBridge -{ -private: - SrsSharedPtr source_; - -private: - SrsRtspRtpBuilder *rtp_builder_; + SrsRtspRtpBuilder *rtsp_builder_; +#endif + // The Source bridge, bridge stream to other source. + SrsSharedPtr rtc_target_; + SrsSharedPtr rtsp_target_; public: - SrsFrameToRtspBridge(SrsSharedPtr source); - virtual ~SrsFrameToRtspBridge(); + SrsRtmpBridge(); + virtual ~SrsRtmpBridge(); + +public: + bool empty(); + void enable_rtmp2rtc(SrsSharedPtr rtc_source); + void enable_rtmp2rtsp(SrsSharedPtr rtsp_source); public: virtual srs_error_t initialize(ISrsRequest *r); virtual srs_error_t on_publish(); virtual void on_unpublish(); virtual srs_error_t on_frame(SrsMediaPacket *frame); - srs_error_t on_rtp(SrsRtpPacket *pkt); }; -#endif -// A bridge chain, a set of bridges. -class SrsCompositeBridge : public ISrsStreamBridge +// A SRT bridge is used to convert SRT stream to different protocols, +// such as bridge to RTMP and RTC. +class ISrsSrtBridge : public ISrsSrtTarget { public: - SrsCompositeBridge(); - virtual ~SrsCompositeBridge(); + ISrsSrtBridge(); + virtual ~ISrsSrtBridge(); public: - bool empty() { return bridges_.empty(); } // SrsCompositeBridge::empty() -public: - srs_error_t initialize(ISrsRequest *r); + virtual srs_error_t initialize(ISrsRequest *r) = 0; + virtual srs_error_t on_publish() = 0; + virtual void on_unpublish() = 0; +}; + +// A SRT bridge to convert SRT stream to different protocols, such as RTMP and RTC. +// First, it use a frame builder to convert SRT TS packets to AV frames. +// Then, deliver the AV frames to frame target, which binds to a RTMP/RTC source. +class SrsSrtBridge : public ISrsSrtBridge, public ISrsFrameTarget +{ +private: + // Convert SRT TS packets to media frame packets. + SrsSrtFrameBuilder *frame_builder_; + // Deliver media frame packets to RTMP target. + SrsSharedPtr rtmp_target_; + // Convert media frame packets to RTP packets. +#ifdef SRS_FFMPEG_FIT + SrsRtcRtpBuilder *rtp_builder_; +#endif + // Deliver RTP packets to RTC target. + SrsSharedPtr rtc_target_; public: + SrsSrtBridge(); + virtual ~SrsSrtBridge(); + +public: + bool empty(); + void enable_srt2rtmp(SrsSharedPtr rtmp_source); + void enable_srt2rtc(SrsSharedPtr rtc_source); + +public: + virtual srs_error_t initialize(ISrsRequest *r); virtual srs_error_t on_publish(); virtual void on_unpublish(); - -public: + virtual srs_error_t on_packet(SrsSrtPacket *pkt); virtual srs_error_t on_frame(SrsMediaPacket *frame); +}; + +// A RTC RTP bridge is used to convert RTP packets to different protocols, +// such as bridge to RTMP. +class ISrsRtcBridge : public ISrsRtpTarget +{ +public: + ISrsRtcBridge(); + virtual ~ISrsRtcBridge(); public: - SrsCompositeBridge *append(ISrsStreamBridge *bridge); + virtual srs_error_t initialize(ISrsRequest *r) = 0; + virtual srs_error_t setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec) = 0; + virtual srs_error_t on_publish() = 0; + virtual void on_unpublish() = 0; +}; +// A RTC bridge to convert RTP packets to RTMP stream. +// First, it use a frame builder to convert RTP packets to RTMP frame packet. +// Then, deliver the RTMP frame packet to RTMP target, which binds to a live source. +class SrsRtcBridge : public ISrsRtcBridge +{ private: - std::vector bridges_; + ISrsRequest *req_; +#ifdef SRS_FFMPEG_FIT + // Collect and build WebRTC RTP packets to AV frames. + SrsRtcFrameBuilder *frame_builder_; +#endif + // The Source bridge, bridge stream to other source. + SrsSharedPtr rtmp_target_; + +public: + SrsRtcBridge(); + virtual ~SrsRtcBridge(); + +public: + bool empty(); + void enable_rtc2rtmp(SrsSharedPtr rtmp_target); + +public: + virtual srs_error_t initialize(ISrsRequest *r); + virtual srs_error_t setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec); + virtual srs_error_t on_publish(); + virtual void on_unpublish(); + virtual srs_error_t on_rtp(SrsRtpPacket *pkt); }; #endif diff --git a/trunk/src/core/srs_core_version7.hpp b/trunk/src/core/srs_core_version7.hpp index d2ca902e7..9e220dfa0 100644 --- a/trunk/src/core/srs_core_version7.hpp +++ b/trunk/src/core/srs_core_version7.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 7 #define VERSION_MINOR 0 -#define VERSION_REVISION 89 +#define VERSION_REVISION 90 #endif \ No newline at end of file diff --git a/trunk/src/utest/srs_utest_app2.cpp b/trunk/src/utest/srs_utest_app2.cpp index 77f67b8fe..05c5bccff 100644 --- a/trunk/src/utest/srs_utest_app2.cpp +++ b/trunk/src/utest/srs_utest_app2.cpp @@ -6,8 +6,8 @@ #include -#include #include +#include #include #include #include @@ -241,39 +241,6 @@ public: } }; -#ifdef SRS_FFMPEG_FIT -// Mock implementation of SrsRtcFrameBuilder for testing -class MockRtcFrameBuilder : public SrsRtcFrameBuilder -{ -public: - int on_rtp_count_; - srs_error_t on_rtp_error_; - - MockRtcFrameBuilder(ISrsStreamBridge *bridge) : SrsRtcFrameBuilder(bridge) - { - on_rtp_count_ = 0; - on_rtp_error_ = srs_success; - } - - virtual ~MockRtcFrameBuilder() - { - srs_freep(on_rtp_error_); - } - - virtual srs_error_t on_rtp(SrsRtpPacket *pkt) - { - on_rtp_count_++; - return srs_error_copy(on_rtp_error_); - } - - void set_on_rtp_error(srs_error_t err) - { - srs_freep(on_rtp_error_); - on_rtp_error_ = srs_error_copy(err); - } -}; -#endif - VOID TEST(AppTest2, AacRawAppendAdtsHeaderSequenceHeader) { srs_error_t err; @@ -1642,74 +1609,6 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamAlive) srs_freep(consumer); } -// Mock implementation of ISrsStreamBridge for testing SrsRtcSource::on_publish -class MockStreamBridge : public ISrsStreamBridge -{ -public: - int initialize_count_; - int on_publish_count_; - int on_unpublish_count_; - int on_frame_count_; - bool should_fail_initialize_; - bool should_fail_on_publish_; - ISrsRequest *last_initialize_request_; - - MockStreamBridge() - { - initialize_count_ = 0; - on_publish_count_ = 0; - on_unpublish_count_ = 0; - on_frame_count_ = 0; - should_fail_initialize_ = false; - should_fail_on_publish_ = false; - last_initialize_request_ = NULL; - } - - virtual ~MockStreamBridge() - { - } - - virtual srs_error_t initialize(ISrsRequest *r) - { - initialize_count_++; - last_initialize_request_ = r; - if (should_fail_initialize_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "mock initialize error"); - } - return srs_success; - } - - virtual srs_error_t on_publish() - { - on_publish_count_++; - if (should_fail_on_publish_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "mock on_publish error"); - } - return srs_success; - } - - virtual srs_error_t on_frame(SrsMediaPacket *frame) - { - on_frame_count_++; - return srs_success; - } - - virtual void on_unpublish() - { - on_unpublish_count_++; - } - - void set_initialize_should_fail(bool should_fail) - { - should_fail_initialize_ = should_fail; - } - - void set_on_publish_should_fail(bool should_fail) - { - should_fail_on_publish_ = should_fail; - } -}; - VOID TEST(AppTest2, RtcSourceOnPublishBasicSuccess) { srs_error_t err; @@ -1778,79 +1677,6 @@ VOID TEST(AppTest2, RtcSourceOnPublishWithConsumers) srs_freep(consumer2); } -VOID TEST(AppTest2, RtcSourceOnPublishWithBridge) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->host_ = "localhost"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock bridge - MockStreamBridge *bridge = new MockStreamBridge(); - source->set_bridge(bridge); - - // Verify initial bridge state - EXPECT_EQ(0, bridge->on_publish_count_); - - // Test on_publish with bridge - HELPER_EXPECT_SUCCESS(source->on_publish()); - - // Verify bridge was called - EXPECT_EQ(1, bridge->on_publish_count_); - - // Verify source state - EXPECT_TRUE(source->is_created_); - EXPECT_TRUE(source->is_delivering_packets_); - - // Clean up properly by calling on_unpublish to unsubscribe from timer - source->on_unpublish(); - - // Note: bridge is freed by source destructor, don't free manually -} - -VOID TEST(AppTest2, RtcSourceOnPublishBridgeError) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->host_ = "localhost"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock bridge that will fail on_publish - MockStreamBridge *bridge = new MockStreamBridge(); - bridge->set_on_publish_should_fail(true); - source->set_bridge(bridge); - - // Test on_publish with bridge error - should fail - HELPER_EXPECT_FAILED(source->on_publish()); - - // Verify bridge was called - EXPECT_EQ(1, bridge->on_publish_count_); - - // Note: Even though on_publish failed, we should still clean up properly - // The bridge subscription might have succeeded before the error - if (source->is_created_) { - source->on_unpublish(); - } - - // Note: bridge is freed by source destructor, don't free manually -} - VOID TEST(AppTest2, RtcSourceOnPublishConsumerError) { srs_error_t err; @@ -2005,109 +1831,6 @@ VOID TEST(AppTest2, RtcSourceOnPublishSourceIdChange) srs_freep(consumer); } -VOID TEST(AppTest2, RtcSourceOnPublishMultipleBridgeOperations) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->host_ = "localhost"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock bridge - MockStreamBridge *bridge = new MockStreamBridge(); - source->set_bridge(bridge); - - // Test multiple on_publish calls - HELPER_EXPECT_SUCCESS(source->on_publish()); - EXPECT_EQ(1, bridge->on_publish_count_); - - HELPER_EXPECT_SUCCESS(source->on_publish()); - EXPECT_EQ(2, bridge->on_publish_count_); - - HELPER_EXPECT_SUCCESS(source->on_publish()); - EXPECT_EQ(3, bridge->on_publish_count_); - - // Clean up properly by calling on_unpublish to unsubscribe from timer - source->on_unpublish(); - - // Note: bridge is freed by source destructor -} - -VOID TEST(AppTest2, RtcSourceOnPublishCompleteWorkflow) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->host_ = "localhost"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create stream description - SrsRtcSourceDescription *stream_desc = new SrsRtcSourceDescription(); - stream_desc->audio_track_desc_ = new SrsRtcTrackDescription(); - stream_desc->audio_track_desc_->type_ = "audio"; - stream_desc->audio_track_desc_->media_ = new SrsAudioPayload(111, "opus", 48000, 2); - source->set_stream_desc(stream_desc); - - // Create mock bridge - MockStreamBridge *bridge = new MockStreamBridge(); - source->set_bridge(bridge); - - // Create mock consumers - MockRtcConsumer *consumer1 = new MockRtcConsumer(); - MockRtcConsumer *consumer2 = new MockRtcConsumer(); - source->consumers_.push_back(consumer1); - source->consumers_.push_back(consumer2); - - // Create mock event handler - MockRtcSourceEventHandler *handler = new MockRtcSourceEventHandler(); - source->subscribe(handler); - - // Verify initial state - EXPECT_FALSE(source->is_created_); - EXPECT_FALSE(source->is_delivering_packets_); - EXPECT_EQ(0, bridge->on_publish_count_); - EXPECT_EQ(0, consumer1->update_source_id_count_); - EXPECT_EQ(0, consumer2->update_source_id_count_); - - // Test complete on_publish workflow - HELPER_EXPECT_SUCCESS(source->on_publish()); - - // Verify all components were properly handled - EXPECT_TRUE(source->is_created_); - EXPECT_TRUE(source->is_delivering_packets_); - EXPECT_EQ(1, bridge->on_publish_count_); - EXPECT_EQ(1, consumer1->update_source_id_count_); - EXPECT_EQ(1, consumer1->stream_change_count_); - EXPECT_EQ(1, consumer2->update_source_id_count_); - EXPECT_EQ(1, consumer2->stream_change_count_); - - // Verify source ID was set - EXPECT_FALSE(source->source_id().empty()); - - // Clean up properly by calling on_unpublish to unsubscribe from timer - source->on_unpublish(); - - // Clean up - srs_freep(consumer1); - srs_freep(consumer2); - srs_freep(handler); - srs_freep(stream_desc); -} - VOID TEST(AppTest2, RtcSourceSubscribeBasic) { srs_error_t err; @@ -2833,262 +2556,6 @@ VOID TEST(AppTest2, RtcSourceOnRtpConsumerEnqueueError) srs_freep(consumer3); } -#ifdef SRS_FFMPEG_FIT -VOID TEST(AppTest2, RtcSourceOnRtpWithFrameBuilder) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->ip_ = "127.0.0.1"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock consumer - MockRtcConsumer *consumer = new MockRtcConsumer(); - source->consumers_.push_back(consumer); - - // Create mock bridge and frame builder - MockStreamBridge *mock_bridge = new MockStreamBridge(); - MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge); - source->frame_builder_ = mock_frame_builder; - - // Create a test RTP packet - SrsUniquePtr pkt(new SrsRtpPacket()); - pkt->header_.set_sequence(100); - pkt->header_.set_timestamp(1000); - pkt->header_.set_ssrc(12345); - - // Set mock circuit breaker - MockCircuitBreaker mock_circuit_breaker; - source->circuit_breaker_ = &mock_circuit_breaker; - - // Test: on_rtp should call frame builder - HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get())); - - // Verify consumer received packet - EXPECT_EQ(1, consumer->enqueue_count_); - - // Verify frame builder was called - EXPECT_EQ(1, mock_frame_builder->on_rtp_count_); - - // Clean up (don't free frame_builder as it's owned by source) - source->frame_builder_ = NULL; // Prevent double free - srs_freep(mock_frame_builder); - srs_freep(mock_bridge); - srs_freep(consumer); -} - -VOID TEST(AppTest2, RtcSourceOnRtpFrameBuilderError) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->ip_ = "127.0.0.1"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock consumer - MockRtcConsumer *consumer = new MockRtcConsumer(); - source->consumers_.push_back(consumer); - - // Create mock bridge and frame builder that will fail - MockStreamBridge *mock_bridge = new MockStreamBridge(); - MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge); - srs_error_t test_error = srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "test frame builder error"); - mock_frame_builder->set_on_rtp_error(test_error); - srs_freep(test_error); - source->frame_builder_ = mock_frame_builder; - - // Create a test RTP packet - SrsUniquePtr pkt(new SrsRtpPacket()); - pkt->header_.set_sequence(100); - pkt->header_.set_timestamp(1000); - pkt->header_.set_ssrc(12345); - - // Set mock circuit breaker - MockCircuitBreaker mock_circuit_breaker; - source->circuit_breaker_ = &mock_circuit_breaker; - - // Test: on_rtp should fail when frame builder fails - HELPER_EXPECT_FAILED(source->on_rtp(pkt.get())); - - // Verify consumer still received packet (consumers are processed first) - EXPECT_EQ(1, consumer->enqueue_count_); - - // Verify frame builder was called - EXPECT_EQ(1, mock_frame_builder->on_rtp_count_); - - // Clean up - source->frame_builder_ = NULL; // Prevent double free - srs_freep(mock_frame_builder); - srs_freep(mock_bridge); - srs_freep(consumer); -} - -VOID TEST(AppTest2, RtcSourceOnRtpNoFrameBuilder) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->ip_ = "127.0.0.1"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create mock consumer - MockRtcConsumer *consumer = new MockRtcConsumer(); - source->consumers_.push_back(consumer); - - // Ensure frame_builder_ is NULL (default state) - EXPECT_TRUE(source->frame_builder_ == NULL); - - // Create a test RTP packet - SrsUniquePtr pkt(new SrsRtpPacket()); - pkt->header_.set_sequence(100); - pkt->header_.set_timestamp(1000); - pkt->header_.set_ssrc(12345); - - // Set mock circuit breaker - MockCircuitBreaker mock_circuit_breaker; - source->circuit_breaker_ = &mock_circuit_breaker; - - // Test: on_rtp should succeed even without frame builder - HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get())); - - // Verify consumer received packet - EXPECT_EQ(1, consumer->enqueue_count_); - - // Clean up - srs_freep(consumer); -} - -VOID TEST(AppTest2, RtcSourceOnRtpCompleteScenario) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->ip_ = "127.0.0.1"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create multiple mock consumers - MockRtcConsumer *consumer1 = new MockRtcConsumer(); - MockRtcConsumer *consumer2 = new MockRtcConsumer(); - MockRtcConsumer *consumer3 = new MockRtcConsumer(); - source->consumers_.push_back(consumer1); - source->consumers_.push_back(consumer2); - source->consumers_.push_back(consumer3); - - // Create mock bridge and frame builder - MockStreamBridge *mock_bridge = new MockStreamBridge(); - MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge); - source->frame_builder_ = mock_frame_builder; - - // Create a test RTP packet - SrsUniquePtr pkt(new SrsRtpPacket()); - pkt->header_.set_sequence(100); - pkt->header_.set_timestamp(1000); - pkt->header_.set_ssrc(12345); - - // Create mock circuit breaker - MockCircuitBreaker mock_circuit_breaker; - source->circuit_breaker_ = &mock_circuit_breaker; - - // Test 1: Normal operation - circuit breaker not dying - HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get())); - - // Verify all consumers received packet - EXPECT_EQ(1, consumer1->enqueue_count_); - EXPECT_EQ(1, consumer2->enqueue_count_); - EXPECT_EQ(1, consumer3->enqueue_count_); - - // Verify frame builder was called - EXPECT_EQ(1, mock_frame_builder->on_rtp_count_); - - // Test 2: Circuit breaker dying - should drop packet - mock_circuit_breaker.set_hybrid_dying_water_level(true); - - HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get())); - - // Verify consumers did not receive second packet (dropped due to circuit breaker) - EXPECT_EQ(1, consumer1->enqueue_count_); - EXPECT_EQ(1, consumer2->enqueue_count_); - EXPECT_EQ(1, consumer3->enqueue_count_); - - // Verify frame builder was not called again - EXPECT_EQ(1, mock_frame_builder->on_rtp_count_); - - // Clean up - source->frame_builder_ = NULL; // Prevent double free - srs_freep(mock_frame_builder); - srs_freep(mock_bridge); - srs_freep(consumer1); - srs_freep(consumer2); - srs_freep(consumer3); -} - -VOID TEST(AppTest2, RtcSourceOnRtpNullCircuitBreaker) -{ - srs_error_t err; - - // Create a mock request - SrsUniquePtr req(new SrsRequest()); - req->ip_ = "127.0.0.1"; - req->vhost_ = "test.vhost"; - req->app_ = "live"; - req->stream_ = "test"; - - // Create RTC source and initialize - SrsUniquePtr source(new SrsRtcSource()); - HELPER_EXPECT_SUCCESS(source->initialize(req.get())); - - // Create consumer - MockRtcConsumer *consumer = new MockRtcConsumer(); - source->consumers_.push_back(consumer); - - // Create a test RTP packet - SrsUniquePtr pkt(new SrsRtpPacket()); - pkt->header_.set_sequence(100); - pkt->header_.set_timestamp(1000); - pkt->header_.set_ssrc(12345); - - // Set circuit breaker to NULL - source->circuit_breaker_ = NULL; - - // Test: on_rtp should succeed even with NULL circuit breaker - HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get())); - - // Verify consumer received packet - EXPECT_EQ(1, consumer->enqueue_count_); - - // Clean up - srs_freep(consumer); -} -#endif - VOID TEST(AppTest2, RtcSourceGetTrackDescNoStreamDesc) { srs_error_t err; @@ -3571,4 +3038,3 @@ VOID TEST(AppTest2, RtcSourceGetTrackDescMultipleMatchingVideoTracks) EXPECT_EQ("video-h264-track-2", all_video_tracks[1]->id_); EXPECT_EQ("video-h265-track", all_video_tracks[2]->id_); } - diff --git a/trunk/src/utest/srs_utest_rtc2.cpp b/trunk/src/utest/srs_utest_rtc2.cpp index ff95ed0a8..d16dc3922 100644 --- a/trunk/src/utest/srs_utest_rtc2.cpp +++ b/trunk/src/utest/srs_utest_rtc2.cpp @@ -1515,123 +1515,6 @@ VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderVideoFrameDetectorNullPacketHandling } } -// Mock bridge for testing SrsRtcFrameBuilder -class MockRtcFrameBuilderBridge : public ISrsStreamBridge -{ -public: - srs_error_t last_error; - int frame_count; - - MockRtcFrameBuilderBridge() - { - last_error = NULL; - frame_count = 0; - } - - virtual ~MockRtcFrameBuilderBridge() - { - srs_freep(last_error); - } - - virtual srs_error_t initialize(ISrsRequest *r) - { - return srs_success; - } - - virtual srs_error_t on_publish() - { - return srs_success; - } - - virtual srs_error_t on_frame(SrsMediaPacket *frame) - { - frame_count++; - return srs_success; - } - - virtual void on_unpublish() - { - } -}; - -VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderPacketVideoRtmpNullPointerCrash) -{ - srs_error_t err; - - // Test reproducing the null pointer crash from issue #4450 fixed by PR #4451 - // - // ISSUE BACKGROUND: - // Before PR 4451, the packet_video_rtmp() function assumed that the packet at the - // 'start' sequence number would always be available in the cache. However, due to - // network packet loss or reordering, the packet at the start position could be missing. - // - // THE CRASH: - // The original code did: pkt = cache_video_pkts_[cache_index(start)].pkt; - // When pkt was NULL, calling pkt->get_avsync_time() caused a segmentation fault. - // - // THE FIX: - // PR #4451 added a loop to find the first non-null packet in the sequence range - // instead of blindly using the packet at the start position. - // - // This test simulates the crash scenario: packets exist at positions 101, 102, 103 - // but the start packet (100) is missing. With the fix, the function should use - // packet 101 (first available) instead of crashing on the missing packet 100. - if (true) { - MockRtcFrameBuilderBridge bridge; - SrsRtcFrameBuilder frame_builder(&bridge); - - // Skip initialization and directly set up the test scenario - // We only need to test the packet_video_rtmp function, not the full initialization - - // Manually populate the video cache to simulate the crash scenario - // We'll store packets at positions 101, 102, 103 but NOT at position 100 (start) - // This simulates network packet loss where the first packet is missing - - // Create test packets with payload to ensure nb_payload > 0 - SrsRtpPacket *pkt101 = mock_create_test_rtp_packet(101, 1000, false); - SrsRtpPacket *pkt102 = mock_create_test_rtp_packet(102, 1000, false); - SrsRtpPacket *pkt103 = mock_create_test_rtp_packet(103, 1000, true); // marker bit - - // Add some payload to ensure packets are not empty - char payload_data[] = "test_payload_data"; - SrsRtpRawPayload *payload101 = new SrsRtpRawPayload(); - payload101->payload_ = (char *)payload_data; - payload101->nn_payload_ = strlen(payload_data); - pkt101->set_payload(payload101, SrsRtpPacketPayloadTypeRaw); - - SrsRtpRawPayload *payload102 = new SrsRtpRawPayload(); - payload102->payload_ = (char *)payload_data; - payload102->nn_payload_ = strlen(payload_data); - pkt102->set_payload(payload102, SrsRtpPacketPayloadTypeRaw); - - SrsRtpRawPayload *payload103 = new SrsRtpRawPayload(); - payload103->payload_ = (char *)payload_data; - payload103->nn_payload_ = strlen(payload_data); - pkt103->set_payload(payload103, SrsRtpPacketPayloadTypeRaw); - - // Set the avsync time for the packets to avoid other null pointer issues - pkt101->set_avsync_time(1000); - pkt102->set_avsync_time(1000); - pkt103->set_avsync_time(1000); - - // Store packets in cache (but skip sequence 100 - this is the missing start packet) - frame_builder.video_cache_->store_packet(pkt101); - frame_builder.video_cache_->store_packet(pkt102); - frame_builder.video_cache_->store_packet(pkt103); - - // Before the fix in PR #4451, calling packet_video_rtmp(100, 103) would crash - // because it would try to access cache_video_pkts_[cache_index(100)].pkt which is NULL - // and then call pkt->get_avsync_time() causing a null pointer dereference - - // The fix ensures we find the first non-null packet (101) instead of assuming - // the start packet (100) exists - HELPER_EXPECT_SUCCESS(frame_builder.packet_video_rtmp(100, 103)); - - // Verify that a frame was successfully processed - EXPECT_EQ(1, bridge.frame_count); - } -} - VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderSequenceWrapAroundFix) { // Test for the sequence number wraparound assertion fix