AI: Refine RTMP/SRT/RTC bridge. v7.0.90 (#4503)

This PR refactors the stream bridge architecture in SRS to improve code
organization, type safety, and maintainability by replacing the generic
ISrsStreamBridge interface with protocol-specific bridge classes and
target interfaces.

1. New Target Interface Architecture:

- Introduces  ISrsFrameTarget for AV frame consumers (RTMP sources)
- Introduces  ISrsRtpTarget for RTP packet consumers (RTC sources)
- Introduces ISrsSrtTarget for SRT packet consumers (SRT sources)

2. Protocol-Specific Bridge Classes:

- SrsRtmpBridge: Converts RTMP frames to RTC/RTSP protocols
-  SrsSrtBridge: Converts SRT packets to RTMP/RTC protocols
-  SrsRtcBridge: Converts RTC packets to RTMP protocol

3. Simplified Bridge Management:

- Removes the generic SrsCompositeBridge chain pattern
- Each source type now uses its appropriate bridge type directly

With this improvement, you are able to implement very complex bridge and
protocol converting, for example, you can bridge RTMP to RTC with opus
audio when you support enhanced RTMP with opus.

Another plan is to support bridging RTC to RTSP, directly without
converting RTP to media frame packet, but directly deliver RTP packet
from RTC source to RTSP source.

---------

Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-09-19 21:50:28 -04:00 committed by GitHub
parent e999de09ea
commit 20f6cd595c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 655 additions and 1072 deletions

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-09-19, Merge [#4503](https://github.com/ossrs/srs/pull/4503): AI: Refine RTMP/SRT/RTC bridge. v7.0.90 (#4503)
* v7.0, 2025-09-15, RTC2RTMP: Fix sequence number wraparound assertion crashes. v7.0.89 (#4491)
* v7.0, 2025-09-14, Merge [#4489](https://github.com/ossrs/srs/pull/4489): Improve coverage for kernel. v7.0.88 (#4489)
* v7.0, 2025-09-14, Merge [#4488](https://github.com/ossrs/srs/pull/4488): AI: Add utests for kernel and protocol. v7.0.87 (#4488)

View File

@ -1205,8 +1205,11 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript
source_->set_publish_stream(this);
// TODO: FIMXE: Check it in SrsRtcConnection::add_publisher?
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(r);
if (live_source.get() && !live_source->can_publish(false)) {
SrsSharedPtr<SrsLiveSource> live_source;
if ((err = _srs_sources->fetch_or_create(r, live_source)) != srs_success) {
return srs_error_wrap(err, "create live source");
}
if (!live_source->can_publish(false)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str());
}
@ -1224,29 +1227,34 @@ srs_error_t SrsRtcPublishStream::initialize(ISrsRequest *r, SrsRtcSourceDescript
}
}
// Bridge to rtmp
#if defined(SRS_FFMPEG_FIT)
// Create the bridge for RTC.
SrsRtcBridge *bridge = new SrsRtcBridge();
// Bridge to RTMP.
// TODO: Support bridge to RTSP.
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost_);
if (rtc_to_rtmp) {
if ((err = _srs_sources->fetch_or_create(r, live_source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
// Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync,
// especially for stream merging.
live_source->set_cache(false);
SrsCompositeBridge *bridge = new SrsCompositeBridge();
bridge->append(new SrsFrameToRtmpBridge(live_source));
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
source_->set_bridge(bridge);
// Convert RTC to RTMP.
bridge->enable_rtc2rtmp(live_source);
}
#endif
if (bridge->empty()) {
srs_freep(bridge);
} else if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
source_->set_bridge(bridge);
return err;
}

View File

@ -396,10 +396,7 @@ SrsRtcSource::SrsRtcSource()
stream_desc_ = NULL;
req_ = NULL;
bridge_ = NULL;
#ifdef SRS_FFMPEG_FIT
frame_builder_ = NULL;
#endif
rtc_bridge_ = NULL;
circuit_breaker_ = _srs_circuit_breaker;
pli_for_rtmp_ = pli_elapsed_ = 0;
@ -412,10 +409,7 @@ SrsRtcSource::~SrsRtcSource()
// for all consumers are auto free.
consumers_.clear();
#ifdef SRS_FFMPEG_FIT
srs_freep(frame_builder_);
#endif
srs_freep(bridge_);
srs_freep(rtc_bridge_);
srs_freep(req_);
srs_freep(stream_desc_);
@ -591,15 +585,10 @@ SrsContextId SrsRtcSource::pre_source_id()
return _pre_source_id;
}
void SrsRtcSource::set_bridge(ISrsStreamBridge *bridge)
void SrsRtcSource::set_bridge(ISrsRtcBridge *bridge)
{
srs_freep(bridge_);
bridge_ = bridge;
#ifdef SRS_FFMPEG_FIT
srs_freep(frame_builder_);
frame_builder_ = new SrsRtcFrameBuilder(bridge);
#endif
srs_freep(rtc_bridge_);
rtc_bridge_ = bridge;
}
srs_error_t SrsRtcSource::create_consumer(ISrsRtcConsumer *&consumer)
@ -678,31 +667,30 @@ srs_error_t SrsRtcSource::on_publish()
return srs_error_wrap(err, "source id change");
}
// Setup the audio and video codec.
SrsAudioCodecId audio_codec = SrsAudioCodecIdOpus;
if (stream_desc_->audio_track_desc_ && stream_desc_->audio_track_desc_->media_) {
audio_codec = SrsAudioCodecId(stream_desc_->audio_track_desc_->media_->codec(false));
}
SrsVideoCodecId video_codec = SrsVideoCodecIdAVC;
if (stream_desc_->video_track_descs_.size() > 0) {
SrsRtcTrackDescription *track_desc = stream_desc_->video_track_descs_.at(0);
video_codec = SrsVideoCodecId(track_desc->media_->codec(true));
}
// If bridge to other source, handle event and start timer to request PLI.
if (bridge_) {
#ifdef SRS_FFMPEG_FIT
SrsAudioCodecId audio_codec = SrsAudioCodecIdOpus;
if (stream_desc_->audio_track_desc_ && stream_desc_->audio_track_desc_->media_) {
audio_codec = SrsAudioCodecId(stream_desc_->audio_track_desc_->media_->codec(false));
if (rtc_bridge_) {
if ((err = rtc_bridge_->initialize(req_)) != srs_success) {
return srs_error_wrap(err, "rtp bridge initialize");
}
SrsVideoCodecId video_codec = SrsVideoCodecIdAVC;
if (stream_desc_->video_track_descs_.size() > 0) {
SrsRtcTrackDescription *track_desc = stream_desc_->video_track_descs_.at(0);
video_codec = SrsVideoCodecId(track_desc->media_->codec(true));
if ((err = rtc_bridge_->setup_codec(audio_codec, video_codec)) != srs_success) {
return srs_error_wrap(err, "rtp bridge setup codec");
}
if ((err = frame_builder_->initialize(req_, audio_codec, video_codec)) != srs_success) {
return srs_error_wrap(err, "frame builder initialize");
}
if ((err = frame_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "frame builder on publish");
}
#endif
if ((err = bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge on publish");
if ((err = rtc_bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtp bridge on publish");
}
// The PLI interval for RTC2RTMP.
@ -741,17 +729,12 @@ void SrsRtcSource::on_unpublish()
}
// free bridge resource
if (bridge_) {
if (rtc_bridge_) {
// For SrsRtcSource::on_timer()
_srs_shared_timer->timer100ms()->unsubscribe(this);
#ifdef SRS_FFMPEG_FIT
frame_builder_->on_unpublish();
srs_freep(frame_builder_);
#endif
bridge_->on_unpublish();
srs_freep(bridge_);
rtc_bridge_->on_unpublish();
srs_freep(rtc_bridge_);
}
SrsStatistic *stat = SrsStatistic::instance();
@ -806,11 +789,9 @@ srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket *pkt)
}
}
#ifdef SRS_FFMPEG_FIT
if (frame_builder_ && (err = frame_builder_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "frame builder consume packet");
if (rtc_bridge_ && (err = rtc_bridge_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "rtp bridge consume packet");
}
#endif
return err;
}
@ -894,9 +875,9 @@ srs_error_t SrsRtcSource::on_timer(srs_utime_t interval)
#ifdef SRS_FFMPEG_FIT
SrsRtcRtpBuilder::SrsRtcRtpBuilder(SrsFrameToRtcBridge *bridge, SrsSharedPtr<SrsRtcSource> source)
SrsRtcRtpBuilder::SrsRtcRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr<SrsRtcSource> source)
{
bridge_ = bridge;
rtp_target_ = target;
source_ = source;
req_ = NULL;
@ -1155,7 +1136,7 @@ srs_error_t SrsRtcRtpBuilder::transcode(SrsParsedAudioPacket *audio)
break;
}
if ((err = bridge_->on_rtp(pkt.get())) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) {
err = srs_error_wrap(err, "consume opus");
break;
}
@ -1247,7 +1228,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsMediaPacket *msg)
return srs_error_wrap(err, "package stap-a");
}
if ((err = bridge_->on_rtp(pkt.get())) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) {
return srs_error_wrap(err, "consume sps/pps");
}
}
@ -1383,7 +1364,7 @@ srs_error_t SrsRtcRtpBuilder::consume_packets(vector<SrsRtpPacket *> &pkts)
// TODO: FIXME: Consume a range of packets.
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket *pkt = pkts[i];
if ((err = bridge_->on_rtp(pkt)) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume sps/pps");
break;
}
@ -1758,9 +1739,9 @@ void SrsRtcFrameBuilderAudioPacketCache::clear_all()
audio_buffer_.clear();
}
SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsStreamBridge *bridge)
SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsFrameTarget *target)
{
bridge_ = bridge;
frame_target_ = target;
is_first_audio_ = true;
audio_transcoder_ = NULL;
video_codec_ = SrsVideoCodecIdAVC;
@ -1892,7 +1873,7 @@ srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt)
SrsMediaPacket msg;
out_rtmp.to_msg(&msg);
if ((err = bridge_->on_frame(&msg)) != srs_success) {
if ((err = frame_target_->on_frame(&msg)) != srs_success) {
return srs_error_wrap(err, "source on audio");
}
@ -1922,7 +1903,7 @@ srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt)
SrsMediaPacket msg;
out_rtmp.to_msg(&msg);
if ((err = bridge_->on_frame(&msg)) != srs_success) {
if ((err = frame_target_->on_frame(&msg)) != srs_success) {
err = srs_error_wrap(err, "source on audio");
break;
}
@ -2099,7 +2080,7 @@ srs_error_t SrsRtcFrameBuilder::do_packet_sequence_header_avc(SrsRtpPacket *pkt,
SrsMediaPacket msg;
rtmp.to_msg(&msg);
if ((err = bridge_->on_frame(&msg)) != srs_success) {
if ((err = frame_target_->on_frame(&msg)) != srs_success) {
return err;
}
@ -2196,7 +2177,7 @@ srs_error_t SrsRtcFrameBuilder::do_packet_sequence_header_hevc(SrsRtpPacket *pkt
SrsMediaPacket msg;
rtmp.to_msg(&msg);
if ((err = bridge_->on_frame(&msg)) != srs_success) {
if ((err = frame_target_->on_frame(&msg)) != srs_success) {
return err;
}
@ -2452,7 +2433,7 @@ srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const ui
SrsMediaPacket msg;
rtmp.to_msg(&msg);
if ((err = bridge_->on_frame(&msg)) != srs_success) {
if ((err = frame_target_->on_frame(&msg)) != srs_success) {
srs_warn("fail to pack video frame: %s", srs_error_summary(err).c_str());
srs_freep(err);
}

View File

@ -29,7 +29,6 @@ class SrsMediaPacket;
class SrsRtmpCommonMessage;
class SrsMessageArray;
class SrsRtcSource;
class SrsFrameToRtcBridge;
class SrsAudioTranscoder;
class SrsRtpPacket;
class SrsNaluSample;
@ -60,6 +59,7 @@ const int SLIDING_WINDOW_SIZE = 10;
// Maximum waiting time for out-of-order packets (in ms)
const int MAX_AUDIO_WAIT_MS = 100;
// NTP time for RTC.
class SrsNtp
{
public:
@ -157,6 +157,7 @@ public:
void on_stream_change(SrsRtcSourceDescription *desc);
};
// The RTC source manager.
class SrsRtcSourceManager : public ISrsHourGlass
{
private:
@ -203,6 +204,7 @@ public:
virtual const SrsContextId &context_id() = 0;
};
// The event handler for RTC source.
class ISrsRtcSourceEventHandler
{
public:
@ -217,8 +219,12 @@ public:
};
// A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream.
class SrsRtcSource : public ISrsFastTimer, public ISrsRtcSourceForConsumer
class SrsRtcSource : public ISrsRtpTarget, public ISrsFastTimer, public ISrsRtcSourceForConsumer
{
private:
// The RTP bridge, convert RTP packets to other protocols.
ISrsRtcBridge *rtc_bridge_;
private:
// Circuit breaker for protecting server resources.
ISrsCircuitBreaker *circuit_breaker_;
@ -234,14 +240,6 @@ private:
// Steam description for this steam.
SrsRtcSourceDescription *stream_desc_;
private:
#ifdef SRS_FFMPEG_FIT
// Collect and build WebRTC RTP packets to AV frames.
SrsRtcFrameBuilder *frame_builder_;
#endif
// The Source bridge, bridge stream to other source.
ISrsStreamBridge *bridge_;
private:
// To delivery stream to clients.
std::vector<ISrsRtcConsumer *> consumers_;
@ -289,7 +287,7 @@ public:
virtual SrsContextId pre_source_id();
public:
void set_bridge(ISrsStreamBridge *bridge);
virtual void set_bridge(ISrsRtcBridge *bridge);
public:
// Create consumer
@ -313,19 +311,19 @@ public:
public:
// For event handler
void subscribe(ISrsRtcSourceEventHandler *h);
void unsubscribe(ISrsRtcSourceEventHandler *h);
virtual void subscribe(ISrsRtcSourceEventHandler *h);
virtual void unsubscribe(ISrsRtcSourceEventHandler *h);
public:
// Get and set the publisher, passed to consumer to process requests such as PLI.
ISrsRtcPublishStream *publish_stream();
void set_publish_stream(ISrsRtcPublishStream *v);
virtual ISrsRtcPublishStream *publish_stream();
virtual void set_publish_stream(ISrsRtcPublishStream *v);
// Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket *pkt);
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
// Set and get stream description for source
bool has_stream_desc();
void set_stream_desc(SrsRtcSourceDescription *stream_desc);
std::vector<SrsRtcTrackDescription *> get_track_desc(std::string type, std::string media_type);
virtual bool has_stream_desc();
virtual void set_stream_desc(SrsRtcSourceDescription *stream_desc);
virtual std::vector<SrsRtcTrackDescription *> get_track_desc(std::string type, std::string media_type);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
@ -338,7 +336,7 @@ class SrsRtcRtpBuilder
{
private:
ISrsRequest *req_;
SrsFrameToRtcBridge *bridge_;
ISrsRtpTarget *rtp_target_;
// The format, codec information.
SrsRtmpFormat *format_;
// The metadata cache.
@ -365,7 +363,7 @@ private:
bool video_initialized_;
public:
SrsRtcRtpBuilder(SrsFrameToRtcBridge *bridge, SrsSharedPtr<SrsRtcSource> source);
SrsRtcRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr<SrsRtcSource> source);
virtual ~SrsRtcRtpBuilder();
private:
@ -494,7 +492,7 @@ public:
class SrsRtcFrameBuilder
{
private:
ISrsStreamBridge *bridge_;
ISrsFrameTarget *frame_target_;
private:
bool is_first_audio_;
@ -517,7 +515,7 @@ private:
SrsRtpPacket *obs_whip_pps_;
public:
SrsRtcFrameBuilder(ISrsStreamBridge *bridge);
SrsRtcFrameBuilder(ISrsFrameTarget *target);
virtual ~SrsRtcFrameBuilder();
public:

View File

@ -1059,17 +1059,17 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
// Bridge to RTC streaming.
// TODO: FIXME: Need to convert RTMP to SRT.
SrsCompositeBridge *bridge = new SrsCompositeBridge();
SrsRtmpBridge *bridge = new SrsRtmpBridge();
#if defined(SRS_FFMPEG_FIT)
if (rtc.get() && _srs_config->get_rtc_from_rtmp(req->vhost_)) {
bridge->append(new SrsFrameToRtcBridge(rtc));
bridge->enable_rtmp2rtc(rtc);
}
#endif
#ifdef SRS_RTSP
if (rtsp.get() && _srs_config->get_rtsp_from_rtmp(req->vhost_)) {
bridge->append(new SrsFrameToRtspBridge(rtsp));
bridge->enable_rtmp2rtsp(rtsp);
}
#endif

View File

@ -1706,7 +1706,7 @@ SrsLiveSource::SrsLiveSource()
stream_die_at_ = 0;
publisher_idle_at_ = 0;
bridge_ = NULL;
rtmp_bridge_ = NULL;
play_edge_ = new SrsPlayEdge();
publish_edge_ = new SrsPublishEdge();
@ -1740,7 +1740,7 @@ SrsLiveSource::~SrsLiveSource()
srs_freep(gop_cache_);
srs_freep(req_);
srs_freep(bridge_);
srs_freep(rtmp_bridge_);
SrsContextId cid = _source_id;
if (cid.empty())
@ -1852,10 +1852,10 @@ srs_error_t SrsLiveSource::initialize(SrsSharedPtr<SrsLiveSource> wrapper, ISrsR
return err;
}
void SrsLiveSource::set_bridge(ISrsStreamBridge *v)
void SrsLiveSource::set_bridge(ISrsRtmpBridge *v)
{
srs_freep(bridge_);
bridge_ = v;
srs_freep(rtmp_bridge_);
rtmp_bridge_ = v;
}
srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id)
@ -2049,7 +2049,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsMediaPacket *msg)
}
// For bridge to consume the message.
if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) {
if (rtmp_bridge_ && (err = rtmp_bridge_->on_frame(msg)) != srs_success) {
return srs_error_wrap(err, "bridge consume audio");
}
@ -2170,7 +2170,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsMediaPacket *msg)
}
// For bridge to consume the message.
if (bridge_ && (err = bridge_->on_frame(msg)) != srs_success) {
if (rtmp_bridge_ && (err = rtmp_bridge_->on_frame(msg)) != srs_success) {
return srs_error_wrap(err, "bridge consume video");
}
@ -2332,7 +2332,7 @@ srs_error_t SrsLiveSource::on_publish()
return srs_error_wrap(err, "handle publish");
}
if (bridge_ && (err = bridge_->on_publish()) != srs_success) {
if (rtmp_bridge_ && (err = rtmp_bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge publish");
}
@ -2383,9 +2383,9 @@ void SrsLiveSource::on_unpublish()
handler->on_unpublish(req_);
if (bridge_) {
bridge_->on_unpublish();
srs_freep(bridge_);
if (rtmp_bridge_) {
rtmp_bridge_->on_unpublish();
srs_freep(rtmp_bridge_);
}
// no consumer, stream is die.

View File

@ -507,7 +507,7 @@ public:
extern SrsLiveSourceManager *_srs_sources;
// The live streaming source.
class SrsLiveSource : public ISrsReloadHandler
class SrsLiveSource : public ISrsReloadHandler, public ISrsFrameTarget
{
friend class SrsOriginHub;
@ -539,7 +539,7 @@ private:
// The time of the packet we just got.
int64_t last_packet_time_;
// The source bridge for other source.
ISrsStreamBridge *bridge_;
ISrsRtmpBridge *rtmp_bridge_;
// The edge control service
SrsPlayEdge *play_edge_;
SrsPublishEdge *publish_edge_;
@ -576,7 +576,7 @@ public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> wrapper, ISrsRequest *r);
// Bridge to other source, forward packets to it.
void set_bridge(ISrsStreamBridge *v);
void set_bridge(ISrsRtmpBridge *v);
public:
// The source id changed.

View File

@ -503,9 +503,9 @@ void SrsRtspSource::set_video_desc(SrsRtcTrackDescription *video_desc)
video_desc_ = video_desc->copy();
}
SrsRtspRtpBuilder::SrsRtspRtpBuilder(SrsFrameToRtspBridge *bridge, SrsSharedPtr<SrsRtspSource> source)
SrsRtspRtpBuilder::SrsRtspRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr<SrsRtspSource> source)
{
bridge_ = bridge;
rtp_target_ = target;
source_ = source;
req_ = NULL;
@ -746,7 +746,7 @@ srs_error_t SrsRtspRtpBuilder::on_audio(SrsMediaPacket *msg)
return srs_error_new(ERROR_NOT_IMPLEMENTED, "codec %d not implemented", acodec);
}
if ((err = bridge_->on_rtp(pkt.get())) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) {
return srs_error_wrap(err, "consume audio packet");
}
@ -876,7 +876,7 @@ srs_error_t SrsRtspRtpBuilder::on_video(SrsMediaPacket *msg)
return srs_error_wrap(err, "package stap-a");
}
if ((err = bridge_->on_rtp(pkt.get())) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt.get())) != srs_success) {
return srs_error_wrap(err, "consume sps/pps");
}
}
@ -975,7 +975,7 @@ srs_error_t SrsRtspRtpBuilder::consume_packets(vector<SrsRtpPacket *> &pkts)
// TODO: FIXME: Consume a range of packets.
for (int i = 0; i < (int)pkts.size(); i++) {
SrsRtpPacket *pkt = pkts[i];
if ((err = bridge_->on_rtp(pkt)) != srs_success) {
if ((err = rtp_target_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume sps/pps");
break;
}

View File

@ -26,7 +26,6 @@ class SrsRtcSourceDescription;
class SrsResourceManager;
class SrsRtspConnection;
class SrsRtpVideoBuilder;
class SrsFrameToRtspBridge;
// The RTSP stream consumer, consume packets from RTSP stream source.
class SrsRtspConsumer
@ -103,7 +102,7 @@ extern SrsRtspSourceManager *_srs_rtsp_sources;
extern SrsResourceManager *_srs_rtsp_manager;
// A Source is a stream, to publish and to play with, binding to SrsRtspPlayStream.
class SrsRtspSource
class SrsRtspSource : public ISrsRtpTarget
{
private:
// For publish, it's the publish client id.
@ -176,7 +175,7 @@ public:
public:
// Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket *pkt);
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
public:
SrsRtcTrackDescription *audio_desc();
@ -190,7 +189,7 @@ class SrsRtspRtpBuilder
{
private:
ISrsRequest *req_;
SrsFrameToRtspBridge *bridge_;
ISrsRtpTarget *rtp_target_;
// The format, codec information.
SrsRtmpFormat *format_;
// The metadata cache.
@ -211,7 +210,7 @@ private:
bool video_initialized_;
public:
SrsRtspRtpBuilder(SrsFrameToRtspBridge *bridge, SrsSharedPtr<SrsRtspSource> source);
SrsRtspRtpBuilder(ISrsRtpTarget *target, SrsSharedPtr<SrsRtspSource> source);
virtual ~SrsRtspRtpBuilder();
private:

View File

@ -413,25 +413,26 @@ srs_error_t SrsMpegtsSrtConn::acquire_publish()
}
}
// Bridge to RTMP and RTC streaming.
SrsSrtBridge *bridge = new SrsSrtBridge();
if (_srs_config->get_srt_to_rtmp(req_->vhost_)) {
// Bridge to RTMP and RTC streaming.
SrsCompositeBridge *bridge = new SrsCompositeBridge();
bridge->append(new SrsFrameToRtmpBridge(live_source));
#if defined(SRS_FFMPEG_FIT)
if (rtc.get() && _srs_config->get_rtc_from_rtmp(req_->vhost_)) {
bridge->append(new SrsFrameToRtcBridge(rtc));
}
#endif
if ((err = bridge->initialize(req_)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
srt_source_->set_bridge(bridge);
bridge->enable_srt2rtmp(live_source);
}
if (rtc.get() && _srs_config->get_rtc_from_rtmp(req_->vhost_)) {
bridge->enable_srt2rtc(rtc);
}
if (bridge->empty()) {
srs_freep(bridge);
} else if ((err = bridge->initialize(req_)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "bridge init");
}
srt_source_->set_bridge(bridge);
if ((err = srt_source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "srt source publish");
}

View File

@ -291,7 +291,7 @@ void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
srs_cond_timedwait(mw_wait_, timeout);
}
SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge *bridge)
SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsFrameTarget *target)
{
ts_ctx_ = new SrsTsContext();
@ -300,7 +300,7 @@ SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge *bridge)
pps_ = "";
req_ = NULL;
bridge_ = bridge;
frame_target_ = target;
video_streamid_ = 1;
audio_streamid_ = 2;
@ -510,7 +510,7 @@ srs_error_t SrsSrtFrameBuilder::check_sps_pps_change(SrsTsMessage *msg)
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp sps/pps");
}
@ -568,7 +568,7 @@ srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage *msg, vector<pair<cha
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt ts video to rtmp");
}
@ -703,7 +703,7 @@ srs_error_t SrsSrtFrameBuilder::check_vps_sps_pps_change(SrsTsMessage *msg)
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp vps/sps/pps");
}
@ -759,7 +759,7 @@ srs_error_t SrsSrtFrameBuilder::on_hevc_frame(SrsTsMessage *msg, vector<pair<cha
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt ts hevc video to rtmp");
}
@ -873,7 +873,7 @@ srs_error_t SrsSrtFrameBuilder::check_audio_sh_change(SrsTsMessage *msg, uint32_
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp audio sh");
}
@ -901,7 +901,7 @@ srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage *msg, uint32_t pts, ch
SrsMediaPacket frame;
rtmp.to_msg(&frame);
if ((err = bridge_->on_frame(&frame)) != srs_success) {
if ((err = frame_target_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp audio sh");
}
@ -912,8 +912,7 @@ SrsSrtSource::SrsSrtSource()
{
req_ = NULL;
can_publish_ = true;
frame_builder_ = NULL;
bridge_ = NULL;
srt_bridge_ = NULL;
stream_die_at_ = 0;
}
@ -923,8 +922,7 @@ SrsSrtSource::~SrsSrtSource()
// for all consumers are auto free.
consumers_.clear();
srs_freep(frame_builder_);
srs_freep(bridge_);
srs_freep(srt_bridge_);
srs_freep(req_);
SrsContextId cid = _source_id;
@ -1013,13 +1011,10 @@ void SrsSrtSource::update_auth(ISrsRequest *r)
req_->update_auth(r);
}
void SrsSrtSource::set_bridge(ISrsStreamBridge *bridge)
void SrsSrtSource::set_bridge(ISrsSrtBridge *bridge)
{
srs_freep(bridge_);
bridge_ = bridge;
srs_freep(frame_builder_);
frame_builder_ = new SrsSrtFrameBuilder(bridge);
srs_freep(srt_bridge_);
srt_bridge_ = bridge;
}
srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer *&consumer)
@ -1073,18 +1068,8 @@ srs_error_t SrsSrtSource::on_publish()
return srs_error_wrap(err, "source id change");
}
if (bridge_) {
if ((err = frame_builder_->initialize(req_)) != srs_success) {
return srs_error_wrap(err, "frame builder initialize");
}
if ((err = frame_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "frame builder on publish");
}
if ((err = bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge on publish");
}
if (srt_bridge_ && (err = srt_bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge on publish");
}
SrsStatistic *stat = SrsStatistic::instance();
@ -1105,12 +1090,9 @@ void SrsSrtSource::on_unpublish()
SrsStatistic *stat = SrsStatistic::instance();
stat->on_stream_close(req_);
if (bridge_) {
frame_builder_->on_unpublish();
srs_freep(frame_builder_);
bridge_->on_unpublish();
srs_freep(bridge_);
if (srt_bridge_) {
srt_bridge_->on_unpublish();
srs_freep(srt_bridge_);
}
// Destroy and cleanup source when no publishers and consumers.
@ -1130,7 +1112,7 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket *packet)
}
}
if (frame_builder_ && (err = frame_builder_->on_packet(packet)) != srs_success) {
if (srt_bridge_ && (err = srt_bridge_->on_packet(packet)) != srs_success) {
return srs_error_wrap(err, "bridge consume message");
}

View File

@ -117,7 +117,7 @@ public:
class SrsSrtFrameBuilder : public ISrsTsHandler
{
public:
SrsSrtFrameBuilder(ISrsStreamBridge *bridge);
SrsSrtFrameBuilder(ISrsFrameTarget *target);
virtual ~SrsSrtFrameBuilder();
public:
@ -143,7 +143,7 @@ private:
srs_error_t on_hevc_frame(SrsTsMessage *msg, std::vector<std::pair<char *, int> > &ipb_frames);
private:
ISrsStreamBridge *bridge_;
ISrsFrameTarget *frame_target_;
private:
SrsTsContext *ts_ctx_;
@ -171,7 +171,8 @@ private:
SrsAlonePithyPrint *pp_audio_duration_;
};
class SrsSrtSource
// A SRT source is a stream, to publish and to play with.
class SrsSrtSource : public ISrsSrtTarget
{
public:
SrsSrtSource();
@ -194,7 +195,7 @@ public:
virtual void update_auth(ISrsRequest *r);
public:
void set_bridge(ISrsStreamBridge *bridge);
void set_bridge(ISrsSrtBridge *bridge);
public:
// Create consumer
@ -226,8 +227,7 @@ private:
srs_utime_t stream_die_at_;
private:
SrsSrtFrameBuilder *frame_builder_;
ISrsStreamBridge *bridge_;
ISrsSrtBridge *srt_bridge_;
};
#endif

View File

@ -9,6 +9,7 @@
#include <srs_app_config.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_protocol_format.hpp>
@ -16,243 +17,434 @@
#ifdef SRS_RTSP
#include <srs_app_rtsp_source.hpp>
#endif
#include <srs_app_rtc_source.hpp>
#include <vector>
using namespace std;
ISrsStreamBridge::ISrsStreamBridge()
ISrsFrameTarget::ISrsFrameTarget()
{
}
ISrsStreamBridge::~ISrsStreamBridge()
ISrsFrameTarget::~ISrsFrameTarget()
{
}
SrsFrameToRtmpBridge::SrsFrameToRtmpBridge(SrsSharedPtr<SrsLiveSource> source)
{
source_ = source;
}
SrsFrameToRtmpBridge::~SrsFrameToRtmpBridge()
ISrsRtpTarget::ISrsRtpTarget()
{
}
srs_error_t SrsFrameToRtmpBridge::initialize(ISrsRequest *r)
ISrsRtpTarget::~ISrsRtpTarget()
{
return srs_success;
}
srs_error_t SrsFrameToRtmpBridge::on_publish()
ISrsSrtTarget::ISrsSrtTarget()
{
srs_error_t err = srs_success;
// TODO: FIXME: Should sync with bridge?
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "source publish");
}
return err;
}
void SrsFrameToRtmpBridge::on_unpublish()
ISrsSrtTarget::~ISrsSrtTarget()
{
// TODO: FIXME: Should sync with bridge?
source_->on_unpublish();
}
srs_error_t SrsFrameToRtmpBridge::on_frame(SrsMediaPacket *frame)
ISrsRtmpBridge::ISrsRtmpBridge()
{
return source_->on_frame(frame);
}
SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsSharedPtr<SrsRtcSource> source)
ISrsRtmpBridge::~ISrsRtmpBridge()
{
source_ = source;
}
#if defined(SRS_FFMPEG_FIT)
// Use lazy initialization - no need to determine codec/track parameters here
rtp_builder_ = new SrsRtcRtpBuilder(this, source);
SrsRtmpBridge::SrsRtmpBridge()
{
#ifdef SRS_FFMPEG_FIT
rtp_builder_ = NULL;
#endif
#ifdef SRS_RTSP
rtsp_builder_ = NULL;
#endif
rtc_target_ = NULL;
rtsp_target_ = NULL;
}
SrsFrameToRtcBridge::~SrsFrameToRtcBridge()
SrsRtmpBridge::~SrsRtmpBridge()
{
#ifdef SRS_FFMPEG_FIT
srs_freep(rtp_builder_);
#endif
}
srs_error_t SrsFrameToRtcBridge::initialize(ISrsRequest *r)
{
#ifdef SRS_FFMPEG_FIT
return rtp_builder_->initialize(r);
#else
return srs_success;
#ifdef SRS_RTSP
srs_freep(rtsp_builder_);
#endif
rtc_target_ = NULL;
rtsp_target_ = NULL;
}
srs_error_t SrsFrameToRtcBridge::on_publish()
bool SrsRtmpBridge::empty()
{
return !rtc_target_.get() || !rtsp_target_.get();
}
void SrsRtmpBridge::enable_rtmp2rtc(SrsSharedPtr<SrsRtcSource> rtc_source)
{
rtc_target_ = rtc_source;
}
void SrsRtmpBridge::enable_rtmp2rtsp(SrsSharedPtr<SrsRtspSource> rtsp_source)
{
rtsp_target_ = rtsp_source;
}
srs_error_t SrsRtmpBridge::initialize(ISrsRequest *r)
{
srs_error_t err = srs_success;
// TODO: FIXME: Should sync with bridge?
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "source publish");
}
#ifdef SRS_FFMPEG_FIT
if ((err = rtp_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtp builder publish");
if (rtc_target_.get()) {
srs_freep(rtp_builder_);
rtp_builder_ = new SrsRtcRtpBuilder(rtc_target_.get(), rtc_target_);
if ((err = rtp_builder_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "rtp builder initialize");
}
}
#endif
return err;
}
void SrsFrameToRtcBridge::on_unpublish()
{
#ifdef SRS_FFMPEG_FIT
rtp_builder_->on_unpublish();
#endif
// @remark This bridge might be disposed here, so never use it.
// TODO: FIXME: Should sync with bridge?
source_->on_unpublish();
}
srs_error_t SrsFrameToRtcBridge::on_frame(SrsMediaPacket *frame)
{
#ifdef SRS_FFMPEG_FIT
return rtp_builder_->on_frame(frame);
#else
return srs_success;
#endif
}
srs_error_t SrsFrameToRtcBridge::on_rtp(SrsRtpPacket *pkt)
{
return source_->on_rtp(pkt);
}
#ifdef SRS_RTSP
SrsFrameToRtspBridge::SrsFrameToRtspBridge(SrsSharedPtr<SrsRtspSource> source)
{
source_ = source;
// Use lazy initialization - no need to determine codec/track parameters here
rtp_builder_ = new SrsRtspRtpBuilder(this, source);
}
SrsFrameToRtspBridge::~SrsFrameToRtspBridge()
{
srs_freep(rtp_builder_);
}
srs_error_t SrsFrameToRtspBridge::initialize(ISrsRequest *r)
{
return rtp_builder_->initialize(r);
}
srs_error_t SrsFrameToRtspBridge::on_publish()
{
srs_error_t err = srs_success;
// TODO: FIXME: Should sync with bridge?
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "source publish");
if (rtsp_target_.get()) {
srs_freep(rtsp_builder_);
rtsp_builder_ = new SrsRtspRtpBuilder(rtsp_target_.get(), rtsp_target_);
if ((err = rtsp_builder_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "rtsp builder initialize");
}
}
if ((err = rtp_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtp builder publish");
}
return err;
}
void SrsFrameToRtspBridge::on_unpublish()
{
rtp_builder_->on_unpublish();
// @remark This bridge might be disposed here, so never use it.
// TODO: FIXME: Should sync with bridge?
source_->on_unpublish();
}
srs_error_t SrsFrameToRtspBridge::on_frame(SrsMediaPacket *frame)
{
return rtp_builder_->on_frame(frame);
}
srs_error_t SrsFrameToRtspBridge::on_rtp(SrsRtpPacket *pkt)
{
return source_->on_rtp(pkt);
}
#endif
SrsCompositeBridge::SrsCompositeBridge()
{
return err;
}
SrsCompositeBridge::~SrsCompositeBridge()
{
for (vector<ISrsStreamBridge *>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
ISrsStreamBridge *bridge = *it;
srs_freep(bridge);
}
}
srs_error_t SrsCompositeBridge::initialize(ISrsRequest *r)
srs_error_t SrsRtmpBridge::on_publish()
{
srs_error_t err = srs_success;
for (vector<ISrsStreamBridge *>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
ISrsStreamBridge *bridge = *it;
if ((err = bridge->initialize(r)) != srs_success) {
return err;
// TODO: FIXME: Should sync with bridge?
if (rtc_target_.get()) {
if ((err = rtc_target_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtc target publish");
}
#ifdef SRS_FFMPEG_FIT
if ((err = rtp_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtp builder publish");
}
#endif
}
#ifdef SRS_RTSP
// TODO: FIXME: Should sync with bridge?
if (rtsp_target_.get()) {
if ((err = rtsp_target_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtsp target publish");
}
if ((err = rtsp_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtsp builder publish");
}
}
#endif
return err;
}
void SrsRtmpBridge::on_unpublish()
{
if (rtc_target_.get()) {
#ifdef SRS_FFMPEG_FIT
rtp_builder_->on_unpublish();
#endif
rtc_target_->on_unpublish();
}
#ifdef SRS_RTSP
if (rtsp_target_.get()) {
rtsp_builder_->on_unpublish();
rtsp_target_->on_unpublish();
}
#endif
// Note that RTMP live source free this rtmp bridge, after on_unpublish() is called.
// So there is no need to free its components here.
}
srs_error_t SrsRtmpBridge::on_frame(SrsMediaPacket *frame)
{
srs_error_t err = srs_success;
#ifdef SRS_FFMPEG_FIT
if (rtp_builder_ && (err = rtp_builder_->on_frame(frame)) != srs_success) {
return srs_error_wrap(err, "rtp builder on frame");
}
#endif
#ifdef SRS_RTSP
if (rtsp_builder_ && (err = rtsp_builder_->on_frame(frame)) != srs_success) {
return srs_error_wrap(err, "rtsp builder on frame");
}
#endif
return err;
}
ISrsSrtBridge::ISrsSrtBridge()
{
}
ISrsSrtBridge::~ISrsSrtBridge()
{
}
SrsSrtBridge::SrsSrtBridge()
{
frame_builder_ = new SrsSrtFrameBuilder(this);
rtmp_target_ = NULL;
#ifdef SRS_FFMPEG_FIT
rtp_builder_ = NULL;
#endif
rtc_target_ = NULL;
}
SrsSrtBridge::~SrsSrtBridge()
{
rtmp_target_ = NULL;
srs_freep(frame_builder_);
rtc_target_ = NULL;
#ifdef SRS_FFMPEG_FIT
srs_freep(rtp_builder_);
#endif
}
bool SrsSrtBridge::empty()
{
return !rtmp_target_.get() && !rtc_target_.get();
}
void SrsSrtBridge::enable_srt2rtmp(SrsSharedPtr<SrsLiveSource> rtmp_source)
{
rtmp_target_ = rtmp_source;
}
void SrsSrtBridge::enable_srt2rtc(SrsSharedPtr<SrsRtcSource> rtc_source)
{
rtc_target_ = rtc_source;
}
srs_error_t SrsSrtBridge::initialize(ISrsRequest *r)
{
srs_error_t err = srs_success;
if ((err = frame_builder_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "frame builder initialize");
}
#ifdef SRS_FFMPEG_FIT
if (rtc_target_.get()) {
srs_freep(rtp_builder_);
rtp_builder_ = new SrsRtcRtpBuilder(rtc_target_.get(), rtc_target_);
if ((err = rtp_builder_->initialize(r)) != srs_success) {
return srs_error_wrap(err, "rtp builder initialize");
}
}
#endif
return err;
}
srs_error_t SrsSrtBridge::on_publish()
{
srs_error_t err = srs_success;
if ((err = frame_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "frame builder publish");
}
// TODO: FIXME: Should sync with bridge?
if (rtmp_target_.get()) {
if ((err = rtmp_target_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp target publish");
}
}
if (rtc_target_.get()) {
if ((err = rtc_target_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtc target publish");
}
#ifdef SRS_FFMPEG_FIT
if ((err = rtp_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtp builder publish");
}
#endif
}
return err;
}
srs_error_t SrsCompositeBridge::on_publish()
void SrsSrtBridge::on_unpublish()
{
frame_builder_->on_unpublish();
if (rtmp_target_.get()) {
rtmp_target_->on_unpublish();
}
if (rtc_target_.get()) {
#ifdef SRS_FFMPEG_FIT
rtp_builder_->on_unpublish();
#endif
rtc_target_->on_unpublish();
}
// Note that SRT source free this srt bridge, after on_unpublish() is called.
// So there is no need to free its components here.
}
srs_error_t SrsSrtBridge::on_packet(SrsSrtPacket *pkt)
{
srs_error_t err = srs_success;
for (vector<ISrsStreamBridge *>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
ISrsStreamBridge *bridge = *it;
if ((err = bridge->on_publish()) != srs_success) {
return err;
}
if ((err = frame_builder_->on_packet(pkt)) != srs_success) {
return srs_error_wrap(err, "frame builder on packet");
}
return err;
}
void SrsCompositeBridge::on_unpublish()
{
for (vector<ISrsStreamBridge *>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
ISrsStreamBridge *bridge = *it;
bridge->on_unpublish();
}
}
srs_error_t SrsCompositeBridge::on_frame(SrsMediaPacket *frame)
srs_error_t SrsSrtBridge::on_frame(SrsMediaPacket *frame)
{
srs_error_t err = srs_success;
for (vector<ISrsStreamBridge *>::iterator it = bridges_.begin(); it != bridges_.end(); ++it) {
ISrsStreamBridge *bridge = *it;
if ((err = bridge->on_frame(frame)) != srs_success) {
return err;
}
// Deliver frame to RTMP target
if (rtmp_target_.get() && (err = rtmp_target_->on_frame(frame)) != srs_success) {
return srs_error_wrap(err, "rtmp target on frame");
}
// Deliver frame to RTP builder, which delivers to RTC target
#ifdef SRS_FFMPEG_FIT
if (rtp_builder_ && (err = rtp_builder_->on_frame(frame)) != srs_success) {
return srs_error_wrap(err, "rtp builder on frame");
}
#endif
return err;
}
SrsCompositeBridge *SrsCompositeBridge::append(ISrsStreamBridge *bridge)
ISrsRtcBridge::ISrsRtcBridge()
{
bridges_.push_back(bridge);
return this;
}
ISrsRtcBridge::~ISrsRtcBridge()
{
}
SrsRtcBridge::SrsRtcBridge()
{
req_ = NULL;
#ifdef SRS_FFMPEG_FIT
frame_builder_ = NULL;
#endif
rtmp_target_ = NULL;
}
SrsRtcBridge::~SrsRtcBridge()
{
srs_freep(req_);
#ifdef SRS_FFMPEG_FIT
srs_freep(frame_builder_);
#endif
rtmp_target_ = NULL;
}
void SrsRtcBridge::enable_rtc2rtmp(SrsSharedPtr<SrsLiveSource> rtmp_target)
{
rtmp_target_ = rtmp_target;
}
bool SrsRtcBridge::empty()
{
return !rtmp_target_.get();
}
srs_error_t SrsRtcBridge::initialize(ISrsRequest *r)
{
srs_error_t err = srs_success;
srs_freep(req_);
req_ = r->copy();
#ifdef SRS_FFMPEG_FIT
srs_assert(rtmp_target_.get());
srs_freep(frame_builder_);
frame_builder_ = new SrsRtcFrameBuilder(rtmp_target_.get());
#endif
return err;
}
srs_error_t SrsRtcBridge::setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec)
{
srs_error_t err = srs_success;
#ifdef SRS_FFMPEG_FIT
srs_assert(frame_builder_);
if ((err = frame_builder_->initialize(req_, acodec, vcodec)) != srs_success) {
return srs_error_wrap(err, "frame builder initialize");
}
#endif
return err;
}
srs_error_t SrsRtcBridge::on_publish()
{
srs_error_t err = srs_success;
srs_assert(rtmp_target_.get());
if ((err = rtmp_target_->on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp target publish");
}
#ifdef SRS_FFMPEG_FIT
srs_assert(frame_builder_);
if ((err = frame_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "frame builder on publish");
}
#endif
return err;
}
void SrsRtcBridge::on_unpublish()
{
#ifdef SRS_FFMPEG_FIT
srs_assert(frame_builder_);
frame_builder_->on_unpublish();
#endif
srs_assert(rtmp_target_.get());
rtmp_target_->on_unpublish();
// Note that RTC source free this rtc bridge, after on_unpublish() is called.
// So there is no need to free its components here.
}
srs_error_t SrsRtcBridge::on_rtp(SrsRtpPacket *pkt)
{
srs_error_t err = srs_success;
#ifdef SRS_FFMPEG_FIT
if (frame_builder_ && (err = frame_builder_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "frame builder on rtp");
}
#endif
return err;
}

View File

@ -24,113 +24,185 @@ class SrsRtpPacket;
class SrsRtcRtpBuilder;
class SrsRtspSource;
class SrsRtspRtpBuilder;
class SrsRtcFrameBuilder;
class ISrsStreamBridge;
class SrsSrtFrameBuilder;
class SrsSrtPacket;
// A stream bridge is used to convert stream via different protocols, such as bridge for RTMP and RTC. Generally, we use
// frame as message for bridge. A frame is a audio or video frame, such as an I/B/P frame, a general frame for decoder.
// So you must assemble RTP or TS packets to a video frame if WebRTC or SRT.
class ISrsStreamBridge
// A target to feed AV frame, such as a RTMP live source, or a RTMP bridge
// that take frame and converts to RTC packets, or a SRT bridge that converts
// SRT packets to media frames then delivers to RTMP or RTC targets.
class ISrsFrameTarget
{
public:
ISrsStreamBridge();
virtual ~ISrsStreamBridge();
ISrsFrameTarget();
virtual ~ISrsFrameTarget();
public:
virtual srs_error_t on_frame(SrsMediaPacket *frame) = 0;
};
// A target to feed RTP packets, such as a RTC source, or a RTC bridge that
// take RTP packets and converts to AV frames.
class ISrsRtpTarget
{
public:
ISrsRtpTarget();
virtual ~ISrsRtpTarget();
public:
virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0;
};
// A target to feed SRT packets, such as a SRT source, or a SRT bridge that
// take SRT packets and converts to AV frames.
class ISrsSrtTarget
{
public:
ISrsSrtTarget();
virtual ~ISrsSrtTarget();
public:
virtual srs_error_t on_packet(SrsSrtPacket *pkt) = 0;
};
// A RTMP bridge is used to convert RTMP stream to different protocols,
// such as bridge to RTC and RTSP.
class ISrsRtmpBridge : public ISrsFrameTarget
{
public:
ISrsRtmpBridge();
virtual ~ISrsRtmpBridge();
public:
virtual srs_error_t initialize(ISrsRequest *r) = 0;
virtual srs_error_t on_publish() = 0;
virtual srs_error_t on_frame(SrsMediaPacket *frame) = 0;
virtual void on_unpublish() = 0;
};
// A bridge to feed AV frame to RTMP stream.
class SrsFrameToRtmpBridge : public ISrsStreamBridge
// A RTMP bridge to convert RTMP stream to different protocols, such as RTC and RTSP.
// First, it use a RTP builder to convert RTMP frame to RTP packets.
// Then, deliver the RTP packets to RTP target, which binds to a RTC/RTSP source.
class SrsRtmpBridge : public ISrsRtmpBridge
{
private:
SrsSharedPtr<SrsLiveSource> source_;
public:
SrsFrameToRtmpBridge(SrsSharedPtr<SrsLiveSource> source);
virtual ~SrsFrameToRtmpBridge();
public:
srs_error_t initialize(ISrsRequest *r);
public:
virtual srs_error_t on_publish();
virtual void on_unpublish();
public:
virtual srs_error_t on_frame(SrsMediaPacket *frame);
};
// A bridge to covert AV frame to WebRTC stream.
class SrsFrameToRtcBridge : public ISrsStreamBridge
{
private:
SrsSharedPtr<SrsRtcSource> source_;
private:
#if defined(SRS_FFMPEG_FIT)
#ifdef SRS_FFMPEG_FIT
SrsRtcRtpBuilder *rtp_builder_;
#endif
public:
SrsFrameToRtcBridge(SrsSharedPtr<SrsRtcSource> source);
virtual ~SrsFrameToRtcBridge();
public:
virtual srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_frame(SrsMediaPacket *frame);
srs_error_t on_rtp(SrsRtpPacket *pkt);
};
#ifdef SRS_RTSP
// A bridge to covert AV frame to RTSP stream.
class SrsFrameToRtspBridge : public ISrsStreamBridge
{
private:
SrsSharedPtr<SrsRtspSource> source_;
private:
SrsRtspRtpBuilder *rtp_builder_;
SrsRtspRtpBuilder *rtsp_builder_;
#endif
// The Source bridge, bridge stream to other source.
SrsSharedPtr<SrsRtcSource> rtc_target_;
SrsSharedPtr<SrsRtspSource> rtsp_target_;
public:
SrsFrameToRtspBridge(SrsSharedPtr<SrsRtspSource> source);
virtual ~SrsFrameToRtspBridge();
SrsRtmpBridge();
virtual ~SrsRtmpBridge();
public:
bool empty();
void enable_rtmp2rtc(SrsSharedPtr<SrsRtcSource> rtc_source);
void enable_rtmp2rtsp(SrsSharedPtr<SrsRtspSource> rtsp_source);
public:
virtual srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_frame(SrsMediaPacket *frame);
srs_error_t on_rtp(SrsRtpPacket *pkt);
};
#endif
// A bridge chain, a set of bridges.
class SrsCompositeBridge : public ISrsStreamBridge
// A SRT bridge is used to convert SRT stream to different protocols,
// such as bridge to RTMP and RTC.
class ISrsSrtBridge : public ISrsSrtTarget
{
public:
SrsCompositeBridge();
virtual ~SrsCompositeBridge();
ISrsSrtBridge();
virtual ~ISrsSrtBridge();
public:
bool empty() { return bridges_.empty(); } // SrsCompositeBridge::empty()
public:
srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t initialize(ISrsRequest *r) = 0;
virtual srs_error_t on_publish() = 0;
virtual void on_unpublish() = 0;
};
// A SRT bridge to convert SRT stream to different protocols, such as RTMP and RTC.
// First, it use a frame builder to convert SRT TS packets to AV frames.
// Then, deliver the AV frames to frame target, which binds to a RTMP/RTC source.
class SrsSrtBridge : public ISrsSrtBridge, public ISrsFrameTarget
{
private:
// Convert SRT TS packets to media frame packets.
SrsSrtFrameBuilder *frame_builder_;
// Deliver media frame packets to RTMP target.
SrsSharedPtr<SrsLiveSource> rtmp_target_;
// Convert media frame packets to RTP packets.
#ifdef SRS_FFMPEG_FIT
SrsRtcRtpBuilder *rtp_builder_;
#endif
// Deliver RTP packets to RTC target.
SrsSharedPtr<SrsRtcSource> rtc_target_;
public:
SrsSrtBridge();
virtual ~SrsSrtBridge();
public:
bool empty();
void enable_srt2rtmp(SrsSharedPtr<SrsLiveSource> rtmp_source);
void enable_srt2rtc(SrsSharedPtr<SrsRtcSource> rtc_source);
public:
virtual srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
public:
virtual srs_error_t on_packet(SrsSrtPacket *pkt);
virtual srs_error_t on_frame(SrsMediaPacket *frame);
};
// A RTC RTP bridge is used to convert RTP packets to different protocols,
// such as bridge to RTMP.
class ISrsRtcBridge : public ISrsRtpTarget
{
public:
ISrsRtcBridge();
virtual ~ISrsRtcBridge();
public:
SrsCompositeBridge *append(ISrsStreamBridge *bridge);
virtual srs_error_t initialize(ISrsRequest *r) = 0;
virtual srs_error_t setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec) = 0;
virtual srs_error_t on_publish() = 0;
virtual void on_unpublish() = 0;
};
// A RTC bridge to convert RTP packets to RTMP stream.
// First, it use a frame builder to convert RTP packets to RTMP frame packet.
// Then, deliver the RTMP frame packet to RTMP target, which binds to a live source.
class SrsRtcBridge : public ISrsRtcBridge
{
private:
std::vector<ISrsStreamBridge *> bridges_;
ISrsRequest *req_;
#ifdef SRS_FFMPEG_FIT
// Collect and build WebRTC RTP packets to AV frames.
SrsRtcFrameBuilder *frame_builder_;
#endif
// The Source bridge, bridge stream to other source.
SrsSharedPtr<SrsLiveSource> rtmp_target_;
public:
SrsRtcBridge();
virtual ~SrsRtcBridge();
public:
bool empty();
void enable_rtc2rtmp(SrsSharedPtr<SrsLiveSource> rtmp_target);
public:
virtual srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t setup_codec(SrsAudioCodecId acodec, SrsVideoCodecId vcodec);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
};
#endif

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 89
#define VERSION_REVISION 90
#endif

View File

@ -6,8 +6,8 @@
#include <srs_utest_app2.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_circuit_breaker.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_codec.hpp>
@ -241,39 +241,6 @@ public:
}
};
#ifdef SRS_FFMPEG_FIT
// Mock implementation of SrsRtcFrameBuilder for testing
class MockRtcFrameBuilder : public SrsRtcFrameBuilder
{
public:
int on_rtp_count_;
srs_error_t on_rtp_error_;
MockRtcFrameBuilder(ISrsStreamBridge *bridge) : SrsRtcFrameBuilder(bridge)
{
on_rtp_count_ = 0;
on_rtp_error_ = srs_success;
}
virtual ~MockRtcFrameBuilder()
{
srs_freep(on_rtp_error_);
}
virtual srs_error_t on_rtp(SrsRtpPacket *pkt)
{
on_rtp_count_++;
return srs_error_copy(on_rtp_error_);
}
void set_on_rtp_error(srs_error_t err)
{
srs_freep(on_rtp_error_);
on_rtp_error_ = srs_error_copy(err);
}
};
#endif
VOID TEST(AppTest2, AacRawAppendAdtsHeaderSequenceHeader)
{
srs_error_t err;
@ -1642,74 +1609,6 @@ VOID TEST(AppTest2, RtcSourceOnConsumerDestroyStreamAlive)
srs_freep(consumer);
}
// Mock implementation of ISrsStreamBridge for testing SrsRtcSource::on_publish
class MockStreamBridge : public ISrsStreamBridge
{
public:
int initialize_count_;
int on_publish_count_;
int on_unpublish_count_;
int on_frame_count_;
bool should_fail_initialize_;
bool should_fail_on_publish_;
ISrsRequest *last_initialize_request_;
MockStreamBridge()
{
initialize_count_ = 0;
on_publish_count_ = 0;
on_unpublish_count_ = 0;
on_frame_count_ = 0;
should_fail_initialize_ = false;
should_fail_on_publish_ = false;
last_initialize_request_ = NULL;
}
virtual ~MockStreamBridge()
{
}
virtual srs_error_t initialize(ISrsRequest *r)
{
initialize_count_++;
last_initialize_request_ = r;
if (should_fail_initialize_) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "mock initialize error");
}
return srs_success;
}
virtual srs_error_t on_publish()
{
on_publish_count_++;
if (should_fail_on_publish_) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "mock on_publish error");
}
return srs_success;
}
virtual srs_error_t on_frame(SrsMediaPacket *frame)
{
on_frame_count_++;
return srs_success;
}
virtual void on_unpublish()
{
on_unpublish_count_++;
}
void set_initialize_should_fail(bool should_fail)
{
should_fail_initialize_ = should_fail;
}
void set_on_publish_should_fail(bool should_fail)
{
should_fail_on_publish_ = should_fail;
}
};
VOID TEST(AppTest2, RtcSourceOnPublishBasicSuccess)
{
srs_error_t err;
@ -1778,79 +1677,6 @@ VOID TEST(AppTest2, RtcSourceOnPublishWithConsumers)
srs_freep(consumer2);
}
VOID TEST(AppTest2, RtcSourceOnPublishWithBridge)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock bridge
MockStreamBridge *bridge = new MockStreamBridge();
source->set_bridge(bridge);
// Verify initial bridge state
EXPECT_EQ(0, bridge->on_publish_count_);
// Test on_publish with bridge
HELPER_EXPECT_SUCCESS(source->on_publish());
// Verify bridge was called
EXPECT_EQ(1, bridge->on_publish_count_);
// Verify source state
EXPECT_TRUE(source->is_created_);
EXPECT_TRUE(source->is_delivering_packets_);
// Clean up properly by calling on_unpublish to unsubscribe from timer
source->on_unpublish();
// Note: bridge is freed by source destructor, don't free manually
}
VOID TEST(AppTest2, RtcSourceOnPublishBridgeError)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock bridge that will fail on_publish
MockStreamBridge *bridge = new MockStreamBridge();
bridge->set_on_publish_should_fail(true);
source->set_bridge(bridge);
// Test on_publish with bridge error - should fail
HELPER_EXPECT_FAILED(source->on_publish());
// Verify bridge was called
EXPECT_EQ(1, bridge->on_publish_count_);
// Note: Even though on_publish failed, we should still clean up properly
// The bridge subscription might have succeeded before the error
if (source->is_created_) {
source->on_unpublish();
}
// Note: bridge is freed by source destructor, don't free manually
}
VOID TEST(AppTest2, RtcSourceOnPublishConsumerError)
{
srs_error_t err;
@ -2005,109 +1831,6 @@ VOID TEST(AppTest2, RtcSourceOnPublishSourceIdChange)
srs_freep(consumer);
}
VOID TEST(AppTest2, RtcSourceOnPublishMultipleBridgeOperations)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock bridge
MockStreamBridge *bridge = new MockStreamBridge();
source->set_bridge(bridge);
// Test multiple on_publish calls
HELPER_EXPECT_SUCCESS(source->on_publish());
EXPECT_EQ(1, bridge->on_publish_count_);
HELPER_EXPECT_SUCCESS(source->on_publish());
EXPECT_EQ(2, bridge->on_publish_count_);
HELPER_EXPECT_SUCCESS(source->on_publish());
EXPECT_EQ(3, bridge->on_publish_count_);
// Clean up properly by calling on_unpublish to unsubscribe from timer
source->on_unpublish();
// Note: bridge is freed by source destructor
}
VOID TEST(AppTest2, RtcSourceOnPublishCompleteWorkflow)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->host_ = "localhost";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create stream description
SrsRtcSourceDescription *stream_desc = new SrsRtcSourceDescription();
stream_desc->audio_track_desc_ = new SrsRtcTrackDescription();
stream_desc->audio_track_desc_->type_ = "audio";
stream_desc->audio_track_desc_->media_ = new SrsAudioPayload(111, "opus", 48000, 2);
source->set_stream_desc(stream_desc);
// Create mock bridge
MockStreamBridge *bridge = new MockStreamBridge();
source->set_bridge(bridge);
// Create mock consumers
MockRtcConsumer *consumer1 = new MockRtcConsumer();
MockRtcConsumer *consumer2 = new MockRtcConsumer();
source->consumers_.push_back(consumer1);
source->consumers_.push_back(consumer2);
// Create mock event handler
MockRtcSourceEventHandler *handler = new MockRtcSourceEventHandler();
source->subscribe(handler);
// Verify initial state
EXPECT_FALSE(source->is_created_);
EXPECT_FALSE(source->is_delivering_packets_);
EXPECT_EQ(0, bridge->on_publish_count_);
EXPECT_EQ(0, consumer1->update_source_id_count_);
EXPECT_EQ(0, consumer2->update_source_id_count_);
// Test complete on_publish workflow
HELPER_EXPECT_SUCCESS(source->on_publish());
// Verify all components were properly handled
EXPECT_TRUE(source->is_created_);
EXPECT_TRUE(source->is_delivering_packets_);
EXPECT_EQ(1, bridge->on_publish_count_);
EXPECT_EQ(1, consumer1->update_source_id_count_);
EXPECT_EQ(1, consumer1->stream_change_count_);
EXPECT_EQ(1, consumer2->update_source_id_count_);
EXPECT_EQ(1, consumer2->stream_change_count_);
// Verify source ID was set
EXPECT_FALSE(source->source_id().empty());
// Clean up properly by calling on_unpublish to unsubscribe from timer
source->on_unpublish();
// Clean up
srs_freep(consumer1);
srs_freep(consumer2);
srs_freep(handler);
srs_freep(stream_desc);
}
VOID TEST(AppTest2, RtcSourceSubscribeBasic)
{
srs_error_t err;
@ -2833,262 +2556,6 @@ VOID TEST(AppTest2, RtcSourceOnRtpConsumerEnqueueError)
srs_freep(consumer3);
}
#ifdef SRS_FFMPEG_FIT
VOID TEST(AppTest2, RtcSourceOnRtpWithFrameBuilder)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->ip_ = "127.0.0.1";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock consumer
MockRtcConsumer *consumer = new MockRtcConsumer();
source->consumers_.push_back(consumer);
// Create mock bridge and frame builder
MockStreamBridge *mock_bridge = new MockStreamBridge();
MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge);
source->frame_builder_ = mock_frame_builder;
// Create a test RTP packet
SrsUniquePtr<SrsRtpPacket> pkt(new SrsRtpPacket());
pkt->header_.set_sequence(100);
pkt->header_.set_timestamp(1000);
pkt->header_.set_ssrc(12345);
// Set mock circuit breaker
MockCircuitBreaker mock_circuit_breaker;
source->circuit_breaker_ = &mock_circuit_breaker;
// Test: on_rtp should call frame builder
HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get()));
// Verify consumer received packet
EXPECT_EQ(1, consumer->enqueue_count_);
// Verify frame builder was called
EXPECT_EQ(1, mock_frame_builder->on_rtp_count_);
// Clean up (don't free frame_builder as it's owned by source)
source->frame_builder_ = NULL; // Prevent double free
srs_freep(mock_frame_builder);
srs_freep(mock_bridge);
srs_freep(consumer);
}
VOID TEST(AppTest2, RtcSourceOnRtpFrameBuilderError)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->ip_ = "127.0.0.1";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock consumer
MockRtcConsumer *consumer = new MockRtcConsumer();
source->consumers_.push_back(consumer);
// Create mock bridge and frame builder that will fail
MockStreamBridge *mock_bridge = new MockStreamBridge();
MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge);
srs_error_t test_error = srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "test frame builder error");
mock_frame_builder->set_on_rtp_error(test_error);
srs_freep(test_error);
source->frame_builder_ = mock_frame_builder;
// Create a test RTP packet
SrsUniquePtr<SrsRtpPacket> pkt(new SrsRtpPacket());
pkt->header_.set_sequence(100);
pkt->header_.set_timestamp(1000);
pkt->header_.set_ssrc(12345);
// Set mock circuit breaker
MockCircuitBreaker mock_circuit_breaker;
source->circuit_breaker_ = &mock_circuit_breaker;
// Test: on_rtp should fail when frame builder fails
HELPER_EXPECT_FAILED(source->on_rtp(pkt.get()));
// Verify consumer still received packet (consumers are processed first)
EXPECT_EQ(1, consumer->enqueue_count_);
// Verify frame builder was called
EXPECT_EQ(1, mock_frame_builder->on_rtp_count_);
// Clean up
source->frame_builder_ = NULL; // Prevent double free
srs_freep(mock_frame_builder);
srs_freep(mock_bridge);
srs_freep(consumer);
}
VOID TEST(AppTest2, RtcSourceOnRtpNoFrameBuilder)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->ip_ = "127.0.0.1";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create mock consumer
MockRtcConsumer *consumer = new MockRtcConsumer();
source->consumers_.push_back(consumer);
// Ensure frame_builder_ is NULL (default state)
EXPECT_TRUE(source->frame_builder_ == NULL);
// Create a test RTP packet
SrsUniquePtr<SrsRtpPacket> pkt(new SrsRtpPacket());
pkt->header_.set_sequence(100);
pkt->header_.set_timestamp(1000);
pkt->header_.set_ssrc(12345);
// Set mock circuit breaker
MockCircuitBreaker mock_circuit_breaker;
source->circuit_breaker_ = &mock_circuit_breaker;
// Test: on_rtp should succeed even without frame builder
HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get()));
// Verify consumer received packet
EXPECT_EQ(1, consumer->enqueue_count_);
// Clean up
srs_freep(consumer);
}
VOID TEST(AppTest2, RtcSourceOnRtpCompleteScenario)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->ip_ = "127.0.0.1";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create multiple mock consumers
MockRtcConsumer *consumer1 = new MockRtcConsumer();
MockRtcConsumer *consumer2 = new MockRtcConsumer();
MockRtcConsumer *consumer3 = new MockRtcConsumer();
source->consumers_.push_back(consumer1);
source->consumers_.push_back(consumer2);
source->consumers_.push_back(consumer3);
// Create mock bridge and frame builder
MockStreamBridge *mock_bridge = new MockStreamBridge();
MockRtcFrameBuilder *mock_frame_builder = new MockRtcFrameBuilder(mock_bridge);
source->frame_builder_ = mock_frame_builder;
// Create a test RTP packet
SrsUniquePtr<SrsRtpPacket> pkt(new SrsRtpPacket());
pkt->header_.set_sequence(100);
pkt->header_.set_timestamp(1000);
pkt->header_.set_ssrc(12345);
// Create mock circuit breaker
MockCircuitBreaker mock_circuit_breaker;
source->circuit_breaker_ = &mock_circuit_breaker;
// Test 1: Normal operation - circuit breaker not dying
HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get()));
// Verify all consumers received packet
EXPECT_EQ(1, consumer1->enqueue_count_);
EXPECT_EQ(1, consumer2->enqueue_count_);
EXPECT_EQ(1, consumer3->enqueue_count_);
// Verify frame builder was called
EXPECT_EQ(1, mock_frame_builder->on_rtp_count_);
// Test 2: Circuit breaker dying - should drop packet
mock_circuit_breaker.set_hybrid_dying_water_level(true);
HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get()));
// Verify consumers did not receive second packet (dropped due to circuit breaker)
EXPECT_EQ(1, consumer1->enqueue_count_);
EXPECT_EQ(1, consumer2->enqueue_count_);
EXPECT_EQ(1, consumer3->enqueue_count_);
// Verify frame builder was not called again
EXPECT_EQ(1, mock_frame_builder->on_rtp_count_);
// Clean up
source->frame_builder_ = NULL; // Prevent double free
srs_freep(mock_frame_builder);
srs_freep(mock_bridge);
srs_freep(consumer1);
srs_freep(consumer2);
srs_freep(consumer3);
}
VOID TEST(AppTest2, RtcSourceOnRtpNullCircuitBreaker)
{
srs_error_t err;
// Create a mock request
SrsUniquePtr<SrsRequest> req(new SrsRequest());
req->ip_ = "127.0.0.1";
req->vhost_ = "test.vhost";
req->app_ = "live";
req->stream_ = "test";
// Create RTC source and initialize
SrsUniquePtr<SrsRtcSource> source(new SrsRtcSource());
HELPER_EXPECT_SUCCESS(source->initialize(req.get()));
// Create consumer
MockRtcConsumer *consumer = new MockRtcConsumer();
source->consumers_.push_back(consumer);
// Create a test RTP packet
SrsUniquePtr<SrsRtpPacket> pkt(new SrsRtpPacket());
pkt->header_.set_sequence(100);
pkt->header_.set_timestamp(1000);
pkt->header_.set_ssrc(12345);
// Set circuit breaker to NULL
source->circuit_breaker_ = NULL;
// Test: on_rtp should succeed even with NULL circuit breaker
HELPER_EXPECT_SUCCESS(source->on_rtp(pkt.get()));
// Verify consumer received packet
EXPECT_EQ(1, consumer->enqueue_count_);
// Clean up
srs_freep(consumer);
}
#endif
VOID TEST(AppTest2, RtcSourceGetTrackDescNoStreamDesc)
{
srs_error_t err;
@ -3571,4 +3038,3 @@ VOID TEST(AppTest2, RtcSourceGetTrackDescMultipleMatchingVideoTracks)
EXPECT_EQ("video-h264-track-2", all_video_tracks[1]->id_);
EXPECT_EQ("video-h265-track", all_video_tracks[2]->id_);
}

View File

@ -1515,123 +1515,6 @@ VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderVideoFrameDetectorNullPacketHandling
}
}
// Mock bridge for testing SrsRtcFrameBuilder
class MockRtcFrameBuilderBridge : public ISrsStreamBridge
{
public:
srs_error_t last_error;
int frame_count;
MockRtcFrameBuilderBridge()
{
last_error = NULL;
frame_count = 0;
}
virtual ~MockRtcFrameBuilderBridge()
{
srs_freep(last_error);
}
virtual srs_error_t initialize(ISrsRequest *r)
{
return srs_success;
}
virtual srs_error_t on_publish()
{
return srs_success;
}
virtual srs_error_t on_frame(SrsMediaPacket *frame)
{
frame_count++;
return srs_success;
}
virtual void on_unpublish()
{
}
};
VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderPacketVideoRtmpNullPointerCrash)
{
srs_error_t err;
// Test reproducing the null pointer crash from issue #4450 fixed by PR #4451
//
// ISSUE BACKGROUND:
// Before PR 4451, the packet_video_rtmp() function assumed that the packet at the
// 'start' sequence number would always be available in the cache. However, due to
// network packet loss or reordering, the packet at the start position could be missing.
//
// THE CRASH:
// The original code did: pkt = cache_video_pkts_[cache_index(start)].pkt;
// When pkt was NULL, calling pkt->get_avsync_time() caused a segmentation fault.
//
// THE FIX:
// PR #4451 added a loop to find the first non-null packet in the sequence range
// instead of blindly using the packet at the start position.
//
// This test simulates the crash scenario: packets exist at positions 101, 102, 103
// but the start packet (100) is missing. With the fix, the function should use
// packet 101 (first available) instead of crashing on the missing packet 100.
if (true) {
MockRtcFrameBuilderBridge bridge;
SrsRtcFrameBuilder frame_builder(&bridge);
// Skip initialization and directly set up the test scenario
// We only need to test the packet_video_rtmp function, not the full initialization
// Manually populate the video cache to simulate the crash scenario
// We'll store packets at positions 101, 102, 103 but NOT at position 100 (start)
// This simulates network packet loss where the first packet is missing
// Create test packets with payload to ensure nb_payload > 0
SrsRtpPacket *pkt101 = mock_create_test_rtp_packet(101, 1000, false);
SrsRtpPacket *pkt102 = mock_create_test_rtp_packet(102, 1000, false);
SrsRtpPacket *pkt103 = mock_create_test_rtp_packet(103, 1000, true); // marker bit
// Add some payload to ensure packets are not empty
char payload_data[] = "test_payload_data";
SrsRtpRawPayload *payload101 = new SrsRtpRawPayload();
payload101->payload_ = (char *)payload_data;
payload101->nn_payload_ = strlen(payload_data);
pkt101->set_payload(payload101, SrsRtpPacketPayloadTypeRaw);
SrsRtpRawPayload *payload102 = new SrsRtpRawPayload();
payload102->payload_ = (char *)payload_data;
payload102->nn_payload_ = strlen(payload_data);
pkt102->set_payload(payload102, SrsRtpPacketPayloadTypeRaw);
SrsRtpRawPayload *payload103 = new SrsRtpRawPayload();
payload103->payload_ = (char *)payload_data;
payload103->nn_payload_ = strlen(payload_data);
pkt103->set_payload(payload103, SrsRtpPacketPayloadTypeRaw);
// Set the avsync time for the packets to avoid other null pointer issues
pkt101->set_avsync_time(1000);
pkt102->set_avsync_time(1000);
pkt103->set_avsync_time(1000);
// Store packets in cache (but skip sequence 100 - this is the missing start packet)
frame_builder.video_cache_->store_packet(pkt101);
frame_builder.video_cache_->store_packet(pkt102);
frame_builder.video_cache_->store_packet(pkt103);
// Before the fix in PR #4451, calling packet_video_rtmp(100, 103) would crash
// because it would try to access cache_video_pkts_[cache_index(100)].pkt which is NULL
// and then call pkt->get_avsync_time() causing a null pointer dereference
// The fix ensures we find the first non-null packet (101) instead of assuming
// the start packet (100) exists
HELPER_EXPECT_SUCCESS(frame_builder.packet_video_rtmp(100, 103));
// Verify that a frame was successfully processed
EXPECT_EQ(1, bridge.frame_count);
}
}
VOID TEST(KernelRTC2Test, SrsRtcFrameBuilderSequenceWrapAroundFix)
{
// Test for the sequence number wraparound assertion fix