diff --git a/README.md b/README.md index 1c490f621..3e25a39f4 100755 --- a/README.md +++ b/README.md @@ -159,6 +159,8 @@ For previous versions, please read: ## V4 changes +* v4.0, 2020-05-14, For [#307][bug #307], refine core structure, RTMP base on frame, RTC base on RTP. 4.0.26 +* v4.0, 2020-05-11, For [#307][bug #307], refine RTC publisher structure. 4.0.25 * v4.0, 2020-04-30, For [#307][bug #307], support publish RTC with passing opus. 4.0.24 * v4.0, 2020-04-14, For [#307][bug #307], support sendmmsg, GSO and reuseport. 4.0.23 * v4.0, 2020-04-05, For [#307][bug #307], SRTP ASM only works with openssl-1.0, auto detect it. 4.0.22 diff --git a/trunk/configure b/trunk/configure index f34c47e30..210107426 100755 --- a/trunk/configure +++ b/trunk/configure @@ -279,8 +279,8 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid") if [[ $SRS_RTC == YES ]]; then - MODULE_FILES+=("srs_app_rtc" "srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp" - "srs_app_rtc_queue" "srs_app_rtc_server") + MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_codec" "srs_app_rtc_sdp" + "srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source") fi if [[ $SRS_GB28181 == YES ]]; then MODULE_FILES+=("srs_app_gb28181" "srs_app_gb28181_sip") diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 0603b7a4b..db2a0103a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4879,7 +4879,9 @@ bool SrsConfig::get_rtc_server_gso() } #elif LINUX_VERSION_CODE < KERNEL_VERSION(4,18,0) if (v) { - utsname un = {0}; + utsname un; + memset((void*)&un, 0, sizeof(utsname)); + int r0 = uname(&un); if (r0 || strcmp(un.release, "4.18.0") < 0) { gso_disabled = true; diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp deleted file mode 100644 index 2e05100b8..000000000 --- a/trunk/src/app/srs_app_rtc.cpp +++ /dev/null @@ -1,370 +0,0 @@ -/** - * The MIT License (MIT) - * - * Copyright (c) 2013-2020 John - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -using namespace std; - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// TODO: Add this function into SrsRtpMux class. -srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf) -{ - srs_error_t err = srs_success; - - if (format->is_aac_sequence_header()) { - return err; - } - - if (format->audio->nb_samples != 1) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "adts"); - } - - int nb_buf = format->audio->samples[0].size + 7; - char* buf = new char[nb_buf]; - SrsBuffer stream(buf, nb_buf); - - // TODO: Add comment. - stream.write_1bytes(0xFF); - stream.write_1bytes(0xF9); - stream.write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2)); - stream.write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03)); - stream.write_1bytes((nb_buf >> 3) & 0xFF); - stream.write_1bytes(((nb_buf & 0x07) << 5) | 0x1F); - stream.write_1bytes(0xFC); - - stream.write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size); - - *pbuf = buf; - *pnn_buf = nb_buf; - - return err; -} - -SrsRtpH264Muxer::SrsRtpH264Muxer() -{ - discard_bframe = false; -} - -SrsRtpH264Muxer::~SrsRtpH264Muxer() -{ -} - -srs_error_t SrsRtpH264Muxer::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format) -{ - srs_error_t err = srs_success; - - // If IDR, we will insert SPS/PPS before IDR frame. - if (format->video && format->video->has_idr) { - shared_frame->set_has_idr(true); - } - - // Update samples to shared frame. - for (int i = 0; i < format->video->nb_samples; ++i) { - SrsSample* sample = &format->video->samples[i]; - - // Because RTC does not support B-frame, so we will drop them. - // TODO: Drop B-frame in better way, which not cause picture corruption. - if (discard_bframe) { - if ((err = sample->parse_bframe()) != srs_success) { - return srs_error_wrap(err, "parse bframe"); - } - if (sample->bframe) { - continue; - } - } - } - - if (format->video->nb_samples <= 0) { - return err; - } - - shared_frame->set_samples(format->video->samples, format->video->nb_samples); - - return err; -} - -SrsRtpOpusMuxer::SrsRtpOpusMuxer() -{ - codec = NULL; -} - -SrsRtpOpusMuxer::~SrsRtpOpusMuxer() -{ - srs_freep(codec); -} - -srs_error_t SrsRtpOpusMuxer::initialize() -{ - srs_error_t err = srs_success; - - codec = new SrsAudioRecode(kChannel, kSamplerate); - if (!codec) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "SrsAacOpus init failed"); - } - - if ((err = codec->initialize()) != srs_success) { - return srs_error_wrap(err, "init codec"); - } - - return err; -} - -// An AAC packet may be transcoded to many OPUS packets. -const int kMaxOpusPackets = 8; -// The max size for each OPUS packet. -const int kMaxOpusPacketSize = 4096; - -srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio) -{ - srs_error_t err = srs_success; - - // Opus packet cache. - static char* opus_payloads[kMaxOpusPackets]; - - static bool initialized = false; - if (!initialized) { - initialized = true; - - static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize]; - opus_payloads[0] = &opus_packets_cache[0][0]; - for (int i = 1; i < kMaxOpusPackets; i++) { - opus_payloads[i] = opus_packets_cache[i]; - } - } - - // Transcode an aac packet to many opus packets. - SrsSample aac; - aac.bytes = adts_audio; - aac.size = nn_adts_audio; - - int nn_opus_packets = 0; - int opus_sizes[kMaxOpusPackets]; - if ((err = codec->recode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) { - return srs_error_wrap(err, "recode error"); - } - - // Save OPUS packets in shared message. - if (nn_opus_packets <= 0) { - return err; - } - - int nn_max_extra_payload = 0; - SrsSample samples[nn_opus_packets]; - for (int i = 0; i < nn_opus_packets; i++) { - SrsSample* p = samples + i; - p->size = opus_sizes[i]; - p->bytes = new char[p->size]; - memcpy(p->bytes, opus_payloads[i], p->size); - - nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size); - } - - shared_audio->set_extra_payloads(samples, nn_opus_packets); - shared_audio->set_max_extra_payload(nn_max_extra_payload); - - return err; -} - -SrsRtc::SrsRtc() -{ - req = NULL; - hub = NULL; - - enabled = false; - disposable = false; - last_update_time = 0; - - discard_aac = false; -} - -SrsRtc::~SrsRtc() -{ - srs_freep(rtp_h264_muxer); -} - -void SrsRtc::dispose() -{ - if (enabled) { - on_unpublish(); - } -} - -// TODO: FIXME: Dead code? -srs_error_t SrsRtc::cycle() -{ - srs_error_t err = srs_success; - - return err; -} - -srs_error_t SrsRtc::initialize(SrsOriginHub* h, SrsRequest* r) -{ - srs_error_t err = srs_success; - - hub = h; - req = r; - - rtp_h264_muxer = new SrsRtpH264Muxer(); - rtp_h264_muxer->discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost); - // TODO: FIXME: Support reload and log it. - discard_aac = _srs_config->get_rtc_aac_discard(req->vhost); - - rtp_opus_muxer = new SrsRtpOpusMuxer(); - if (!rtp_opus_muxer) { - return srs_error_wrap(err, "rtp_opus_muxer nullptr"); - } - - return rtp_opus_muxer->initialize(); -} - -srs_error_t SrsRtc::on_publish() -{ - srs_error_t err = srs_success; - - // update the hls time, for hls_dispose. - last_update_time = srs_get_system_time(); - - // support multiple publish. - if (enabled) { - return err; - } - - if (!_srs_config->get_rtc_enabled(req->vhost)) { - return err; - } - - // if enabled, open the muxer. - enabled = true; - - // ok, the hls can be dispose, or need to be dispose. - disposable = true; - - return err; -} - -void SrsRtc::on_unpublish() -{ - // support multiple unpublish. - if (!enabled) { - return; - } - - enabled = false; -} - -srs_error_t SrsRtc::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) -{ - srs_error_t err = srs_success; - - if (!enabled) { - return err; - } - - // Ignore if no format->acodec, it means the codec is not parsed, or unknown codec. - // @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474 - if (!format->acodec) { - return err; - } - - // update the hls time, for hls_dispose. - last_update_time = srs_get_system_time(); - - // ts support audio codec: aac/mp3 - SrsAudioCodecId acodec = format->acodec->id; - if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) { - return err; - } - - // When drop aac audio packet, never transcode. - if (discard_aac && acodec == SrsAudioCodecIdAAC) { - return err; - } - - // ignore sequence header - srs_assert(format->audio); - - char* adts_audio = NULL; - int nn_adts_audio = 0; - // TODO: FIXME: Reserve 7 bytes header when create shared message. - if ((err = aac_raw_append_adts_header(shared_audio, format, &adts_audio, &nn_adts_audio)) != srs_success) { - return srs_error_wrap(err, "aac append header"); - } - - if (adts_audio) { - err = rtp_opus_muxer->transcode(shared_audio, adts_audio, nn_adts_audio); - srs_freep(adts_audio); - } - - return err; -} - -srs_error_t SrsRtc::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Maybe it should config on vhost level. - if (!enabled) { - return err; - } - - // Ignore if no format->vcodec, it means the codec is not parsed, or unknown codec. - // @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474 - if (!format->vcodec) { - return err; - } - - // update the hls time, for hls_dispose. - last_update_time = srs_get_system_time(); - - // ignore info frame, - // @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909 - srs_assert(format->video); - return rtp_h264_muxer->filter(shared_video, format); -} diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp deleted file mode 100644 index df3f85306..000000000 --- a/trunk/src/app/srs_app_rtc.hpp +++ /dev/null @@ -1,105 +0,0 @@ -/** - * The MIT License (MIT) - * - * Copyright (c) 2013-2020 John - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#ifndef SRS_APP_RTC_HPP -#define SRS_APP_RTC_HPP - -#include - -#include -#include -#include - -class SrsFormat; -class SrsSample; -class SrsSharedPtrMessage; -class SrsRequest; -class SrsOriginHub; -class SrsAudioRecode; -class SrsBuffer; - -// The RTP packet max size, should never exceed this size. -const int kRtpPacketSize = 1500; - -// Payload type will rewrite in srs_app_rtc_conn.cpp when send to client. -const uint8_t kOpusPayloadType = 111; -const uint8_t kH264PayloadType = 102; - -const int kChannel = 2; -const int kSamplerate = 48000; - -// SSRC will rewrite in srs_app_rtc_conn.cpp when send to client. -const uint32_t kAudioSSRC = 1; -const uint32_t kVideoSSRC = 2; - -// TODO: Define interface class like ISrsRtpMuxer -class SrsRtpH264Muxer -{ -public: - bool discard_bframe; -public: - SrsRtpH264Muxer(); - virtual ~SrsRtpH264Muxer(); -public: - srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format); -}; - -// TODO: FIXME: It's not a muxer, but a transcoder. -class SrsRtpOpusMuxer -{ -private: - SrsAudioRecode* codec; -public: - SrsRtpOpusMuxer(); - virtual ~SrsRtpOpusMuxer(); - virtual srs_error_t initialize(); -public: - srs_error_t transcode(SrsSharedPtrMessage* shared_audio, char* adts_audio, int nn_adts_audio); -}; - -class SrsRtc -{ -private: - SrsRequest* req; - bool enabled; - bool disposable; - bool discard_aac; - srs_utime_t last_update_time; - SrsRtpH264Muxer* rtp_h264_muxer; - SrsRtpOpusMuxer* rtp_opus_muxer; - SrsOriginHub* hub; -public: - SrsRtc(); - virtual ~SrsRtc(); -public: - virtual void dispose(); - virtual srs_error_t cycle(); -public: - virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r); - virtual srs_error_t on_publish(); - virtual void on_unpublish(); - virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); - virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); -}; - -#endif diff --git a/trunk/src/app/srs_app_rtc_codec.cpp b/trunk/src/app/srs_app_rtc_codec.cpp index c703b425a..7c0db4454 100644 --- a/trunk/src/app/srs_app_rtc_codec.cpp +++ b/trunk/src/app/srs_app_rtc_codec.cpp @@ -384,8 +384,7 @@ srs_error_t SrsAudioRecode::initialize() return err; } -// TODO: FIXME: Rename to transcode. -srs_error_t SrsAudioRecode::recode(SrsSample *pkt, char **buf, int *buf_len, int &n) +srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int &n) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_codec.hpp b/trunk/src/app/srs_app_rtc_codec.hpp index d236a2304..a37f6525f 100644 --- a/trunk/src/app/srs_app_rtc_codec.hpp +++ b/trunk/src/app/srs_app_rtc_codec.hpp @@ -121,7 +121,7 @@ public: SrsAudioRecode(int channels, int samplerate); virtual ~SrsAudioRecode(); srs_error_t initialize(); - virtual srs_error_t recode(SrsSample *pkt, char **buf, int *buf_len, int &n); + virtual srs_error_t transcode(SrsSample *pkt, char **buf, int *buf_len, int &n); }; #endif /* SRS_APP_AUDIO_RECODE_HPP */ diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b4352f236..5ccac9172 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -53,7 +53,6 @@ using namespace std; #include #include #include -#include #include #include #include @@ -64,12 +63,7 @@ using namespace std; #include #include #include - -// The RTP payload max size, reserved some paddings for SRTP as such: -// kRtpPacketSize = kRtpMaxPayloadSize + paddings -// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, -// which reserves 100 bytes for SRTP or paddings. -const int kRtpMaxPayloadSize = kRtpPacketSize - 200; +#include string gen_random_str(int len) { @@ -475,86 +469,23 @@ srs_error_t SrsRtcDtls::unprotect_rtcp(char* out_buf, const char* in_buf, int& n return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed"); } -SrsRtcOutgoingPackets::SrsRtcOutgoingPackets(int nn_cache_max) +SrsRtcOutgoingInfo::SrsRtcOutgoingInfo() { #if defined(SRS_DEBUG) debug_id = 0; #endif use_gso = false; - should_merge_nalus = false; - nn_rtp_pkts = 0; nn_audios = nn_extras = 0; nn_videos = nn_samples = 0; nn_bytes = nn_rtp_bytes = 0; nn_padding_bytes = nn_paddings = 0; nn_dropped = 0; - - cursor = 0; - nn_cache = nn_cache_max; - // TODO: FIXME: We should allocate a smaller cache, and increase it when exhausted. - cache = new SrsRtpPacket2[nn_cache]; } -SrsRtcOutgoingPackets::~SrsRtcOutgoingPackets() +SrsRtcOutgoingInfo::~SrsRtcOutgoingInfo() { - srs_freepa(cache); - nn_cache = 0; -} - -void SrsRtcOutgoingPackets::reset(bool gso, bool merge_nalus) -{ - for (int i = 0; i < cursor; i++) { - SrsRtpPacket2* packet = cache + i; - packet->reset(); - } - -#if defined(SRS_DEBUG) - debug_id++; -#endif - - use_gso = gso; - should_merge_nalus = merge_nalus; - - nn_rtp_pkts = 0; - nn_audios = nn_extras = 0; - nn_videos = nn_samples = 0; - nn_bytes = nn_rtp_bytes = 0; - nn_padding_bytes = nn_paddings = 0; - nn_dropped = 0; - - cursor = 0; -} - -SrsRtpPacket2* SrsRtcOutgoingPackets::fetch() -{ - if (cursor >= nn_cache) { - return NULL; - } - return cache + (cursor++); -} - -SrsRtpPacket2* SrsRtcOutgoingPackets::back() -{ - srs_assert(cursor > 0); - return cache + cursor - 1; -} - -int SrsRtcOutgoingPackets::size() -{ - return cursor; -} - -int SrsRtcOutgoingPackets::capacity() -{ - return nn_cache; -} - -SrsRtpPacket2* SrsRtcOutgoingPackets::at(int index) -{ - srs_assert(index < cursor); - return cache + index; } SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) @@ -565,7 +496,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) session_ = s; gso = false; - merge_nalus = false; max_padding = 0; audio_timestamp = 0; @@ -573,7 +503,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) video_sequence = 0; - mw_sleep = 0; mw_msgs = 0; realtime = true; @@ -607,12 +536,11 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr audio_payload_type = a_pt; gso = _srs_config->get_rtc_server_gso(); - merge_nalus = _srs_config->get_rtc_server_merge_nalus(); max_padding = _srs_config->get_rtc_server_padding(); // TODO: FIXME: Support reload. nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost); - srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d, nack=%d", - video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding, nack_enabled_); + srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), gso=%d, padding=%d, nack=%d", + video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, max_padding, nack_enabled_); return err; } @@ -620,10 +548,9 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr srs_error_t SrsRtcPlayer::on_reload_rtc_server() { gso = _srs_config->get_rtc_server_gso(); - merge_nalus = _srs_config->get_rtc_server_merge_nalus(); max_padding = _srs_config->get_rtc_server_padding(); - srs_trace("Reload rtc_server gso=%d, merge_nalus=%d, max_padding=%d", gso, merge_nalus, max_padding); + srs_trace("Reload rtc_server gso=%d, max_padding=%d", gso, max_padding); return srs_success; } @@ -638,9 +565,8 @@ srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost) realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); - mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); - srs_trace("Reload play realtime=%d, mw_msgs=%d, mw_sleep=%d", realtime, mw_msgs, mw_sleep); + srs_trace("Reload play realtime=%d, mw_msgs=%d", realtime, mw_msgs); return srs_success; } @@ -683,35 +609,25 @@ srs_error_t SrsRtcPlayer::cycle() { srs_error_t err = srs_success; - SrsSource* source = NULL; + SrsRtcSource* source = NULL; SrsRequest* req = session_->req; - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } - SrsConsumer* consumer = NULL; - SrsAutoFree(SrsConsumer, consumer); - if ((err = source->create_consumer(NULL, consumer)) != srs_success) { + SrsRtcConsumer* consumer = NULL; + SrsAutoFree(SrsRtcConsumer, consumer); + if ((err = source->create_consumer(consumer)) != srs_success) { return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } - // For RTC, we enable pass-timestamp mode, ignore the timestamp in queue, never depends on the duration, - // because RTC allows the audio and video has its own timebase, that is the audio timestamp and video timestamp - // maybe not monotonically increase. - // In this mode, we use mw_msgs to set the delay. We never shrink the consumer queue, instead, we dumps the - // messages and drop them if the shared sender queue is full. - consumer->enable_pass_timestamp(); - // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames. if ((err = source->consumer_dumps(consumer)) != srs_success) { return srs_error_wrap(err, "dumps consumer, source url=%s", req->get_stream_url().c_str()); } realtime = _srs_config->get_realtime_enabled(req->vhost, true); - mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); // We merged write more messages, so we need larger queue. @@ -722,11 +638,8 @@ srs_error_t SrsRtcPlayer::cycle() sender->set_extra_ratio(80); } - srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), - ::getpid(), source->source_id(), session_->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); - - SrsMessageArray msgs(SRS_PERF_MW_MSGS); - SrsRtcOutgoingPackets pkts(SRS_PERF_RTC_RTP_PACKETS); + srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_msgs=%d", req->get_stream_url().c_str(), + ::getpid(), source->source_id(), session_->encrypt, realtime, mw_msgs); SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play(); SrsAutoFree(SrsPithyPrint, pprint); @@ -735,41 +648,35 @@ srs_error_t SrsRtcPlayer::cycle() bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); SrsStatistic* stat = SrsStatistic::instance(); + // TODO: FIXME: Use cache for performance? + vector pkts; + SrsRtcOutgoingInfo info; + while (true) { if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "rtc sender thread"); } -#ifdef SRS_PERF_QUEUE_COND_WAIT - // Wait for amount of messages or a duration. - consumer->wait(mw_msgs, mw_sleep); -#endif + // Wait for amount of packets. + consumer->wait(mw_msgs); - // Try to read some messages. - int msg_count = 0; - if ((err = consumer->dump_packets(&msgs, msg_count)) != srs_success) { + // TODO: FIXME: Handle error. + consumer->dump_packets(pkts); + + int msg_count = (int)pkts.size(); + if (!msg_count) { continue; } - if (msg_count <= 0) { -#ifndef SRS_PERF_QUEUE_COND_WAIT - srs_usleep(mw_sleep); -#endif - continue; - } + // Send-out all RTP packets and do cleanup. + // TODO: FIXME: Handle error. + send_messages(source, pkts, info); - // Transmux and send out messages. - pkts.reset(gso, merge_nalus); - - if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) { - srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); - } - - // Do cleanup messages. for (int i = 0; i < msg_count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - srs_freep(msg); + SrsRtpPacket2* pkt = pkts[i]; + srs_freep(pkt); } + pkts.clear(); // Stat for performance analysis. if (!stat_enabled) { @@ -779,36 +686,29 @@ srs_error_t SrsRtcPlayer::cycle() // Stat the original RAW AV frame, maybe h264+aac. stat->perf_on_msgs(msg_count); // Stat the RTC packets, RAW AV frame, maybe h.264+opus. - int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; + int nn_rtc_packets = srs_max(info.nn_audios, info.nn_extras) + info.nn_videos; stat->perf_on_rtc_packets(nn_rtc_packets); // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(pkts.size()); + stat->perf_on_rtp_packets(msg_count); // Stat the RTP packets going into kernel. - stat->perf_on_gso_packets(pkts.nn_rtp_pkts); + stat->perf_on_gso_packets(info.nn_rtp_pkts); // Stat the bytes and paddings. - stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); + stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes); // Stat the messages and dropped count. - stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); - -#if defined(SRS_DEBUG) - srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes", - msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, - pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); -#endif + stat->perf_on_dropped(msg_count, nn_rtc_packets, info.nn_dropped); pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", - msg_count, pkts.nn_dropped, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, - pkts.nn_rtp_bytes, pkts.nn_padding_bytes, pkts.nn_paddings, pkts.size(), pkts.capacity()); + msg_count, info.nn_dropped, msg_count, info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes, + info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, msg_count, msg_count); } } } -srs_error_t SrsRtcPlayer::send_messages( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets -) { +srs_error_t SrsRtcPlayer::send_messages(SrsRtcSource* source, vector& pkts, SrsRtcOutgoingInfo& info) +{ srs_error_t err = srs_success; // If DTLS is not OK, drop all messages. @@ -817,15 +717,16 @@ srs_error_t SrsRtcPlayer::send_messages( } // Covert kernel messages to RTP packets. - if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { + if ((err = messages_to_packets(source, pkts, info)) != srs_success) { return srs_error_wrap(err, "messages to packets"); } #ifndef SRS_OSX // If enabled GSO, send out some packets in a msghdr. // @remark When NACK simulator is on, we don't use GSO. - if (packets.use_gso && !nn_simulate_nack_drop) { - if ((err = send_packets_gso(packets)) != srs_success) { + // TODO: FIXME: Support GSO. + if (info.use_gso && !nn_simulate_nack_drop) { + if ((err = send_packets_gso(pkts, info)) != srs_success) { return srs_error_wrap(err, "gso send"); } return err; @@ -833,99 +734,84 @@ srs_error_t SrsRtcPlayer::send_messages( #endif // By default, we send packets by sendmmsg. - if ((err = send_packets(packets)) != srs_success) { + if ((err = send_packets(pkts, info)) != srs_success) { return srs_error_wrap(err, "raw send"); } return err; } -srs_error_t SrsRtcPlayer::messages_to_packets( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets -) { +srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, vector& pkts, SrsRtcOutgoingInfo& info) +{ srs_error_t err = srs_success; ISrsUdpSender* sender = session_->sendonly_skt->sender(); - for (int i = 0; i < nb_msgs; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + for (int i = 0; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts[i]; // If overflow, drop all messages. if (sender->overflow()) { - packets.nn_dropped += nb_msgs - i; + info.nn_dropped += (int)pkts.size() - i; return err; } // Update stats. - packets.nn_bytes += msg->size; - - int nn_extra_payloads = msg->nn_extra_payloads(); - packets.nn_extras += nn_extra_payloads; - - int nn_samples = msg->nn_samples(); - packets.nn_samples += nn_samples; + info.nn_bytes += pkt->nb_bytes(); // For audio, we transcoded AAC to opus in extra payloads. - if (msg->is_audio()) { - packets.nn_audios++; + if (pkt->is_audio()) { + info.nn_audios++; - for (int i = 0; i < nn_extra_payloads; i++) { - SrsSample* sample = msg->extra_payloads() + i; - if ((err = package_opus(sample, packets, msg->nn_max_extra_payloads())) != srs_success) { - return srs_error_wrap(err, "opus package"); - } + if ((err = package_opus(pkt)) != srs_success) { + return srs_error_wrap(err, "package opus"); } continue; } // For video, we should process all NALUs in samples. - packets.nn_videos++; + info.nn_videos++; - // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. - if (msg->has_idr()) { - if ((err = package_stap_a(source, msg, packets)) != srs_success) { - return srs_error_wrap(err, "packet stap-a"); - } - } - - // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. - if (packets.should_merge_nalus && nn_samples > 1) { - if ((err = package_nalus(msg, packets)) != srs_success) { - return srs_error_wrap(err, "packet stap-a"); - } - continue; - } - - // By default, we package each NALU(sample) to a RTP or FUA packet. - for (int i = 0; i < nn_samples; i++) { - SrsSample* sample = msg->samples() + i; - - // We always ignore bframe here, if config to discard bframe, - // the bframe flag will not be set. - if (sample->bframe) { - continue; - } - - if (sample->size <= kRtpMaxPayloadSize) { - if ((err = package_single_nalu(msg, sample, packets)) != srs_success) { - return srs_error_wrap(err, "packet single nalu"); - } - } else { - if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { - return srs_error_wrap(err, "packet fu-a"); - } - } - - if (i == nn_samples - 1) { - packets.back()->rtp_header.set_marker(true); - } + // For video, we should set the RTP packet informations about this consumer. + if ((err = package_video(pkt)) != srs_success) { + return srs_error_wrap(err, "package video"); } } return err; } -srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) +srs_error_t SrsRtcPlayer::package_opus(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->rtp_header.set_timestamp(audio_timestamp); + pkt->rtp_header.set_sequence(audio_sequence++); + pkt->rtp_header.set_ssrc(audio_ssrc); + pkt->rtp_header.set_payload_type(audio_payload_type); + + // TODO: FIXME: Padding audio to the max payload in RTP packets. + if (max_padding > 0) { + } + + // TODO: FIXME: Why 960? Need Refactoring? + audio_timestamp += 960; + + return err; +} + +srs_error_t SrsRtcPlayer::package_video(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + pkt->rtp_header.set_sequence(video_sequence++); + pkt->rtp_header.set_ssrc(video_ssrc); + pkt->rtp_header.set_payload_type(video_payload_type); + + return err; +} + +srs_error_t SrsRtcPlayer::send_packets(std::vector& pkts, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; @@ -933,9 +819,8 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) bool encrypt = session_->encrypt; ISrsUdpSender* sender = session_->sendonly_skt->sender(); - int nn_packets = packets.size(); - for (int i = 0; i < nn_packets; i++) { - SrsRtpPacket2* packet = packets.at(i); + for (int i = 0; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts.at(i); // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -956,7 +841,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) // Marshal packet to bytes in iovec. if (true) { SrsBuffer stream((char*)iov->iov_base, iov->iov_len); - if ((err = packet->encode(&stream)) != srs_success) { + if ((err = pkt->encode(&stream)) != srs_success) { return srs_error_wrap(err, "encode packet"); } iov->iov_len = stream.pos(); @@ -974,11 +859,12 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) // Put final RTP packet to NACK/ARQ queue. if (nack_enabled_) { SrsRtpPacket2* nack = new SrsRtpPacket2(); - nack->rtp_header = packet->rtp_header; - nack->padding = packet->padding; + nack->rtp_header = pkt->rtp_header; // TODO: FIXME: Should avoid memory copying. - SrsRtpRawPayload* payload = nack->reuse_raw(); + SrsRtpRawPayload* payload = new SrsRtpRawPayload(); + nack->payload = payload; + payload->nn_payload = (int)iov->iov_len; payload->payload = new char[payload->nn_payload]; memcpy((void*)payload->payload, iov->iov_base, iov->iov_len); @@ -990,7 +876,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) } } - packets.nn_rtp_bytes += (int)iov->iov_len; + info.nn_rtp_bytes += (int)iov->iov_len; // Set the address and control information. sockaddr_in* addr = (sockaddr_in*)session_->sendonly_skt->peer_addr(); @@ -1001,11 +887,11 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) mhdr->msg_hdr.msg_controllen = 0; // When we send out a packet, increase the stat counter. - packets.nn_rtp_pkts++; + info.nn_rtp_pkts++; // For NACK simulator, drop packet. if (nn_simulate_nack_drop) { - simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len); + simulate_drop_packet(&pkt->rtp_header, (int)iov->iov_len); iov->iov_len = 0; continue; } @@ -1019,7 +905,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) } // TODO: FIXME: We can gather and pad audios, because they have similar size. -srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) +srs_error_t SrsRtcPlayer::send_packets_gso(vector& pkts, SrsRtcOutgoingInfo& info) { srs_error_t err = srs_success; @@ -1034,9 +920,9 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) // The message will marshal in iovec. iovec* iov = NULL; - int nn_packets = packets.size(); + int nn_packets = pkts.size(); for (int i = 0; i < nn_packets; i++) { - SrsRtpPacket2* packet = packets.at(i); + SrsRtpPacket2* packet = pkts.at(i); int nn_packet = packet->nb_bytes(); int padding = 0; @@ -1044,7 +930,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) int nn_next_packet = 0; if (max_padding > 0) { if (i < nn_packets - 1) { - next_packet = (i < nn_packets - 1)? packets.at(i + 1):NULL; + next_packet = (i < nn_packets - 1)? pkts.at(i + 1):NULL; nn_next_packet = next_packet? next_packet->nb_bytes() : 0; } @@ -1069,13 +955,13 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) if (padding > 0) { #if defined(SRS_DEBUG) - srs_trace("#%d, Padding %d bytes %d=>%d, packets %d, max_padding %d", packets.debug_id, + srs_trace("#%d, Padding %d bytes %d=>%d, packets %d, max_padding %d", info.debug_id, padding, nn_packet, nn_packet + padding, nn_packets, max_padding); #endif packet->add_padding(padding); nn_packet += padding; - packets.nn_paddings++; - packets.nn_padding_bytes += padding; + info.nn_paddings++; + info.nn_padding_bytes += padding; } } } @@ -1148,10 +1034,11 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) if (nack_enabled_) { SrsRtpPacket2* nack = new SrsRtpPacket2(); nack->rtp_header = packet->rtp_header; - nack->padding = packet->padding; // TODO: FIXME: Should avoid memory copying. - SrsRtpRawPayload* payload = nack->reuse_raw(); + SrsRtpRawPayload* payload = new SrsRtpRawPayload(); + nack->payload = payload; + payload->nn_payload = (int)iov->iov_len; payload->payload = new char[payload->nn_payload]; memcpy((void*)payload->payload, iov->iov_base, iov->iov_len); @@ -1163,7 +1050,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) } } - packets.nn_rtp_bytes += (int)iov->iov_len; + info.nn_rtp_bytes += (int)iov->iov_len; // If GSO, they must has same size, except the final one. if (using_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) { @@ -1184,12 +1071,12 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) #if defined(SRS_DEBUG) bool is_video = packet->rtp_header.get_payload_type() == video_payload_type; - srs_trace("#%d, Packet %s SSRC=%d, SN=%d, %d/%d bytes", packets.debug_id, is_video? "Video":"Audio", + srs_trace("#%d, Packet %s SSRC=%d, SN=%d, %d/%d bytes", info.debug_id, is_video? "Video":"Audio", packet->rtp_header.get_ssrc(), packet->rtp_header.get_sequence(), nn_packet - padding, padding); if (do_send) { for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { iovec* iov = mhdr->msg_hdr.msg_iov + j; - srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", packets.debug_id, (using_gso? "GSO":"RAW"), j, + srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", info.debug_id, (using_gso? "GSO":"RAW"), j, gso_cursor + 1, mhdr->msg_hdr.msg_iovlen, iov->iov_len, padding, gso_size, gso_encrypt); } } @@ -1220,7 +1107,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) #endif // When we send out a packet, we commit a RTP packet. - packets.nn_rtp_pkts++; + info.nn_rtp_pkts++; if ((err = sender->sendmmsg(mhdr)) != srs_success) { return srs_error_wrap(err, "send msghdr"); @@ -1233,256 +1120,14 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) } #if defined(SRS_DEBUG) - srs_trace("#%d, RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d/%d/%d", packets.debug_id, packets.size(), - packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras, packets.nn_paddings, - packets.nn_padding_bytes, packets.nn_rtp_bytes); + srs_trace("#%d, RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d/%d/%d", info.debug_id, pkts.size(), + info.nn_rtp_pkts, info.nn_videos, info.nn_samples, info.nn_audios, info.nn_extras, info.nn_paddings, + info.nn_padding_bytes, info.nn_rtp_bytes); #endif return err; } -srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets) -{ - srs_error_t err = srs_success; - - SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); - - for (int i = 0; i < msg->nn_samples(); i++) { - SrsSample* sample = msg->samples() + i; - - // We always ignore bframe here, if config to discard bframe, - // the bframe flag will not be set. - if (sample->bframe) { - continue; - } - - raw->push_back(sample->copy()); - } - - // Ignore empty. - int nn_bytes = raw->nb_bytes(); - if (nn_bytes <= 0) { - srs_freep(raw); - return err; - } - - if (nn_bytes < kRtpMaxPayloadSize) { - // Package NALUs in a single RTP packet. - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - srs_freep(raw); - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - - packet->rtp_header.set_timestamp(msg->timestamp * 90); - packet->rtp_header.set_sequence(video_sequence++); - packet->rtp_header.set_ssrc(video_ssrc); - packet->rtp_header.set_payload_type(video_payload_type); - - packet->payload = raw; - } else { - // We must free it, should never use RTP packets to free it, - // because more than one RTP packet will refer to it. - SrsAutoFree(SrsRtpRawNALUs, raw); - - // Package NALUs in FU-A RTP packets. - int fu_payload_size = kRtpMaxPayloadSize; - - // The first byte is store in FU-A header. - uint8_t header = raw->skip_first_byte(); - uint8_t nal_type = header & kNalTypeMask; - int nb_left = nn_bytes - 1; - - int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size; - for (int i = 0; i < num_of_packet; ++i) { - int packet_size = srs_min(nb_left, fu_payload_size); - - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - srs_freep(raw); - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - - packet->rtp_header.set_timestamp(msg->timestamp * 90); - packet->rtp_header.set_sequence(video_sequence++); - packet->rtp_header.set_ssrc(video_ssrc); - packet->rtp_header.set_payload_type(video_payload_type); - - SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); - packet->payload = fua; - - fua->nri = (SrsAvcNaluType)header; - fua->nalu_type = (SrsAvcNaluType)nal_type; - fua->start = bool(i == 0); - fua->end = bool(i == num_of_packet - 1); - - if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { - return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); - } - - nb_left -= packet_size; - } - } - - if (packets.size() > 0) { - packets.back()->rtp_header.set_marker(true); - } - - return err; -} - -srs_error_t SrsRtcPlayer::package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload) -{ - srs_error_t err = srs_success; - - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - packet->rtp_header.set_marker(true); - packet->rtp_header.set_timestamp(audio_timestamp); - packet->rtp_header.set_sequence(audio_sequence++); - packet->rtp_header.set_ssrc(audio_ssrc); - packet->rtp_header.set_payload_type(audio_payload_type); - - SrsRtpRawPayload* raw = packet->reuse_raw(); - raw->payload = sample->bytes; - raw->nn_payload = sample->size; - - if (max_padding > 0) { - if (sample->size < nn_max_payload && nn_max_payload - sample->size < max_padding) { - int padding = nn_max_payload - sample->size; - packet->set_padding(padding); - -#if defined(SRS_DEBUG) - srs_trace("#%d, Fast Padding %d bytes %d=>%d, SN=%d, max_payload %d, max_padding %d", packets.debug_id, - padding, sample->size, sample->size + padding, packet->rtp_header.get_sequence(), nn_max_payload, max_padding); -#endif - } - } - - // TODO: FIXME: Why 960? Need Refactoring? - audio_timestamp += 960; - - return err; -} - -srs_error_t SrsRtcPlayer::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets) -{ - srs_error_t err = srs_success; - - char* p = sample->bytes + 1; - int nb_left = sample->size - 1; - uint8_t header = sample->bytes[0]; - uint8_t nal_type = header & kNalTypeMask; - - int num_of_packet = 1 + (sample->size - 1) / fu_payload_size; - for (int i = 0; i < num_of_packet; ++i) { - int packet_size = srs_min(nb_left, fu_payload_size); - - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - - packet->rtp_header.set_timestamp(msg->timestamp * 90); - packet->rtp_header.set_sequence(video_sequence++); - packet->rtp_header.set_ssrc(video_ssrc); - packet->rtp_header.set_payload_type(video_payload_type); - - SrsRtpFUAPayload2* fua = packet->reuse_fua(); - - fua->nri = (SrsAvcNaluType)header; - fua->nalu_type = (SrsAvcNaluType)nal_type; - fua->start = bool(i == 0); - fua->end = bool(i == num_of_packet - 1); - - fua->payload = p; - fua->size = packet_size; - - p += packet_size; - nb_left -= packet_size; - } - - return err; -} - -// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets) -{ - srs_error_t err = srs_success; - - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - packet->rtp_header.set_timestamp(msg->timestamp * 90); - packet->rtp_header.set_sequence(video_sequence++); - packet->rtp_header.set_ssrc(video_ssrc); - packet->rtp_header.set_payload_type(video_payload_type); - - SrsRtpRawPayload* raw = packet->reuse_raw(); - raw->payload = sample->bytes; - raw->nn_payload = sample->size; - - return err; -} - -srs_error_t SrsRtcPlayer::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets) -{ - srs_error_t err = srs_success; - - SrsMetaCache* meta = source->cached_meta(); - if (!meta) { - return err; - } - - SrsFormat* format = meta->vsh_format(); - if (!format || !format->vcodec) { - return err; - } - - const vector& sps = format->vcodec->sequenceParameterSetNALUnit; - const vector& pps = format->vcodec->pictureParameterSetNALUnit; - if (sps.empty() || pps.empty()) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); - } - - SrsRtpPacket2* packet = packets.fetch(); - if (!packet) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); - } - packet->rtp_header.set_marker(false); - packet->rtp_header.set_timestamp(msg->timestamp * 90); - packet->rtp_header.set_sequence(video_sequence++); - packet->rtp_header.set_ssrc(video_ssrc); - packet->rtp_header.set_payload_type(video_payload_type); - - SrsRtpSTAPPayload* stap = new SrsRtpSTAPPayload(); - packet->payload = stap; - - uint8_t header = sps[0]; - stap->nri = (SrsAvcNaluType)header; - - if (true) { - SrsSample* sample = new SrsSample(); - sample->bytes = (char*)&sps[0]; - sample->size = (int)sps.size(); - stap->nalus.push_back(sample); - } - - if (true) { - SrsSample* sample = new SrsSample(); - sample->bytes = (char*)&pps[0]; - sample->size = (int)pps.size(); - stap->nalus.push_back(sample); - } - - srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", packet->rtp_header.get_sequence(), sps.size(), pps.size()); - - return err; -} - void SrsRtcPlayer::nack_fetch(vector& pkts, uint32_t ssrc, uint16_t seq) { SrsRtpPacket2* pkt = NULL; @@ -1783,9 +1428,7 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque return srs_error_wrap(err, "start report_timer"); } - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -2008,7 +1651,8 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) SrsRtpPacket2* pkt = new SrsRtpPacket2(); pkt->set_decode_handler(this); - pkt->original_bytes = buf; + pkt->shared_msg = new SrsSharedPtrMessage(); + pkt->shared_msg->wrap(buf, nb_buf); SrsBuffer b(buf, nb_buf); if ((err = pkt->decode(&b)) != srs_success) { @@ -2033,7 +1677,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) } } -void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) +void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) { // No payload, ignore. if (buf->empty()) { @@ -2042,88 +1686,31 @@ void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* bu uint32_t ssrc = pkt->rtp_header.get_ssrc(); if (ssrc == audio_ssrc) { - *ppayload = pkt->reuse_raw(); + *ppayload = new SrsRtpRawPayload(); } else if (ssrc == video_ssrc) { uint8_t v = (uint8_t)pkt->nalu_type; if (v == kStapA) { *ppayload = new SrsRtpSTAPPayload(); } else if (v == kFuA) { - *ppayload = pkt->reuse_fua(); + *ppayload = new SrsRtpFUAPayload2(); } else { - *ppayload = pkt->reuse_raw(); + *ppayload = new SrsRtpRawPayload(); } } } -srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data) -{ - srs_error_t err = srs_success; - - char* ph = data; - int nb_left = nb_data; - while (nb_left) { - uint8_t payload_type = ph[1]; - uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - - int length = (length_4bytes + 1) * 4; - - if (length > nb_data) { - return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length); - } - - srs_verbose("on rtcp, payload_type=%u", payload_type); - - switch (payload_type) { - case kSR: { - err = on_rtcp_sr(ph, length); - break; - } - case kRR: { - err = on_rtcp_rr(ph, length); - break; - } - case kSDES: { - break; - } - case kBye: { - break; - } - case kApp: { - break; - } - case kRtpFb: { - err = on_rtcp_feedback(ph, length); - break; - } - case kPsFb: { - err = on_rtcp_ps_feedback(ph, length); - break; - } - case kXR: { - err = on_rtcp_xr(ph, length); - break; - } - default:{ - return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); - break; - } - } - - if (err != srs_success) { - return srs_error_wrap(err, "rtcp"); - } - - ph += length; - nb_left -= length; - } - - return err; -} - srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; + pkt->frame_type = SrsFrameTypeAudio; + + // TODO: FIXME: Error check. + source->on_rtp(pkt); + + return err; + + // TODO: FIXME: Directly dispatch to consumer for performance? std::vector frames; if (nack_enabled_) { @@ -2146,50 +1733,13 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) SrsRtpPacket2* frame = frames[i]; // TODO: FIXME: Check error. - on_audio_frame(frame); + source->on_rtp(frame); - srs_freep(frame); - } - - return err; -} - -srs_error_t SrsRtcPublisher::on_audio_frame(SrsRtpPacket2* frame) -{ - srs_error_t err = srs_success; - - SrsRtpRawPayload* payload = dynamic_cast(frame->payload); - - if (!payload) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "OPUS payload"); - } - - // TODO: FIXME: Transcode OPUS to AAC. - if (!payload->nn_payload) { - return err; - } - - SrsMessageHeader header; - header.message_type = RTMP_MSG_AudioMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = frame->rtp_header.get_timestamp() / 90; - - SrsSharedPtrMessage msg; - // TODO: FIXME: Check error. - msg.create(&header, NULL, 0); - - SrsSample sample; - sample.size = payload->nn_payload; - sample.bytes = new char[sample.size]; - memcpy((void*)sample.bytes, payload->payload, sample.size); - msg.set_extra_payloads(&sample, 1); - - // TODO: FIXME: Check error. - source->on_rtc_audio(&msg); - - if (nn_audio_frames++ == 0) { - SrsRtpHeader* h = &frame->rtp_header; - srs_trace("RTC got Opus seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), payload->nn_payload); + if (nn_audio_frames++ == 0) { + SrsRtpHeader* h = &frame->rtp_header; + SrsRtpRawPayload* payload = dynamic_cast(frame->payload); + srs_trace("RTC got Opus seq=%u, ssrc=%u, ts=%u, %d bytes", h->get_sequence(), h->get_ssrc(), h->get_timestamp(), payload->nn_payload); + } } return err; @@ -2197,6 +1747,20 @@ srs_error_t SrsRtcPublisher::on_audio_frame(SrsRtpPacket2* frame) srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) { + srs_error_t err = srs_success; + + pkt->frame_type = SrsFrameTypeVideo; + + // TODO: FIXME: Error check. + source->on_rtp(pkt); + + if (video_queue_->should_request_key_frame()) { + // TODO: FIXME: Check error. + send_rtcp_fb_pli(video_ssrc); + } + + return err; + std::vector frames; if (nack_enabled_) { @@ -2338,6 +1902,71 @@ srs_error_t SrsRtcPublisher::on_video_frame(SrsRtpPacket2* frame) return source->on_video(shared_video); } +srs_error_t SrsRtcPublisher::on_rtcp(char* data, int nb_data) +{ + srs_error_t err = srs_success; + + char* ph = data; + int nb_left = nb_data; + while (nb_left) { + uint8_t payload_type = ph[1]; + uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; + + int length = (length_4bytes + 1) * 4; + + if (length > nb_data) { + return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length); + } + + srs_verbose("on rtcp, payload_type=%u", payload_type); + + switch (payload_type) { + case kSR: { + err = on_rtcp_sr(ph, length); + break; + } + case kRR: { + err = on_rtcp_rr(ph, length); + break; + } + case kSDES: { + break; + } + case kBye: { + break; + } + case kApp: { + break; + } + case kRtpFb: { + err = on_rtcp_feedback(ph, length); + break; + } + case kPsFb: { + err = on_rtcp_ps_feedback(ph, length); + break; + } + case kXR: { + err = on_rtcp_xr(ph, length); + break; + } + default:{ + return srs_error_new(ERROR_RTC_RTCP_CHECK, "unknown rtcp type=%u", payload_type); + break; + } + } + + if (err != srs_success) { + return srs_error_wrap(err, "rtcp"); + } + + ph += length; + nb_left -= length; + } + + return err; +} + srs_error_t SrsRtcPublisher::on_rtcp_sr(char* buf, int nb_buf) { srs_error_t err = srs_success; @@ -2751,7 +2380,7 @@ int SrsRtcSession::context_id() return cid; } -srs_error_t SrsRtcSession::initialize(SrsSource* source, SrsRequest* r, bool is_publisher, string username, int context_id) +srs_error_t SrsRtcSession::initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, string username, int context_id) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index b47dc3091..f598a5e14 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -50,7 +50,7 @@ class SrsStunPacket; class SrsRtcServer; class SrsRtcSession; class SrsSharedPtrMessage; -class SrsSource; +class SrsRtcSource; class SrsRtpPacket2; class ISrsUdpSender; class SrsRtpQueue; @@ -149,11 +149,11 @@ private: }; // A group of RTP packets for outgoing(send to players). -class SrsRtcOutgoingPackets +// TODO: FIXME: Rename to stat for RTP packets. +class SrsRtcOutgoingInfo { public: bool use_gso; - bool should_merge_nalus; public: #if defined(SRS_DEBUG) // Debug id. @@ -171,9 +171,11 @@ public: // one msghdr by GSO, it's only one RTP packet, because we only send once. int nn_rtp_pkts; // For video, the samples or NALUs. + // TODO: FIXME: Remove it because we may don't know. int nn_samples; // For audio, the generated extra audio packets. // For example, when transcoding AAC to opus, may many extra payloads for a audio. + // TODO: FIXME: Remove it because we may don't know. int nn_extras; // The original audio messages. int nn_audios; @@ -183,20 +185,9 @@ public: int nn_paddings; // The number of dropped messages. int nn_dropped; -private: - int cursor; - int nn_cache; - SrsRtpPacket2* cache; public: - SrsRtcOutgoingPackets(int nn_cache_max); - virtual ~SrsRtcOutgoingPackets(); -public: - void reset(bool gso, bool merge_nalus); - SrsRtpPacket2* fetch(); - SrsRtpPacket2* back(); - int size(); - int capacity(); - SrsRtpPacket2* at(int index); + SrsRtcOutgoingInfo(); + virtual ~SrsRtcOutgoingInfo(); }; class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler @@ -223,11 +214,9 @@ private: int nn_simulate_nack_drop; private: // For merged-write and GSO. - bool merge_nalus; bool gso; int max_padding; // For merged-write messages. - srs_utime_t mw_sleep; int mw_msgs; bool realtime; // Whether enabled nack. @@ -251,17 +240,12 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); - srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcOutgoingPackets& packets); - srs_error_t send_packets(SrsRtcOutgoingPackets& packets); - srs_error_t send_packets_gso(SrsRtcOutgoingPackets& packets); -private: - srs_error_t package_opus(SrsSample* sample, SrsRtcOutgoingPackets& packets, int nn_max_payload); -private: - srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcOutgoingPackets& packets); - srs_error_t package_nalus(SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); - srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcOutgoingPackets& packets); - srs_error_t package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcOutgoingPackets& packets); + srs_error_t send_messages(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingInfo& info); + srs_error_t messages_to_packets(SrsRtcSource* source, std::vector& pkts, SrsRtcOutgoingInfo& info); + srs_error_t package_opus(SrsRtpPacket2* pkt); + srs_error_t package_video(SrsRtpPacket2* pkt); + srs_error_t send_packets(std::vector& pkts, SrsRtcOutgoingInfo& info); + srs_error_t send_packets_gso(std::vector& pkts, SrsRtcOutgoingInfo& info); public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); void simulate_nack_drop(int nn); @@ -293,7 +277,7 @@ private: SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; - SrsSource* source; + SrsRtcSource* source; // Whether enabled nack. bool nack_enabled_; // Simulators. @@ -313,13 +297,14 @@ private: srs_error_t send_rtcp_fb_pli(uint32_t ssrc); public: srs_error_t on_rtp(char* buf, int nb_buf); - virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload); - srs_error_t on_rtcp(char* data, int nb_data); + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload); private: srs_error_t on_audio(SrsRtpPacket2* pkt); - srs_error_t on_audio_frame(SrsRtpPacket2* frame); srs_error_t on_video(SrsRtpPacket2* pkt); srs_error_t on_video_frame(SrsRtpPacket2* frame); +public: + srs_error_t on_rtcp(char* data, int nb_data); +private: srs_error_t on_rtcp_sr(char* buf, int nb_buf); srs_error_t on_rtcp_xr(char* buf, int nb_buf); srs_error_t on_rtcp_feedback(char* data, int nb_data); @@ -365,7 +350,7 @@ private: // TODO: FIXME: Support reload. bool encrypt; SrsRequest* req; - SrsSource* source_; + SrsRtcSource* source_; SrsSdp remote_sdp; SrsSdp local_sdp; private: @@ -390,7 +375,7 @@ public: void switch_to_context(); int context_id(); public: - srs_error_t initialize(SrsSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id); + srs_error_t initialize(SrsRtcSource* source, SrsRequest* r, bool is_publisher, std::string username, int context_id); // The peer address may change, we can identify that by STUN messages. srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); srs_error_t on_dtls(char* data, int nb_data); diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index 65da0f5ff..a610e31b1 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -620,12 +620,13 @@ void SrsRtpVideoQueue::covert_frame(std::vector& frame, SrsR // TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying. SrsRtpPacket2* pkt = new SrsRtpPacket2(); pkt->rtp_header = head->rtp_header; - pkt->padding = head->padding; SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); pkt->nalu_type = head_payload->nalu_type; - SrsRtpRawPayload* payload = pkt->reuse_raw(); + SrsRtpRawPayload* payload = new SrsRtpRawPayload(); + pkt->payload = payload; + payload->nn_payload = nn_nalus + 1; payload->payload = new char[payload->nn_payload]; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index f4d3dcd5a..ea5056528 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -38,6 +37,7 @@ #include #include #include +#include using namespace std; @@ -569,11 +569,8 @@ srs_error_t SrsRtcServer::create_session( ) { srs_error_t err = srs_success; - SrsSource* source = NULL; - - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + SrsRtcSource* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } @@ -663,11 +660,8 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req return err; } - SrsSource* source = NULL; - - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + SrsRtcSource* source = NULL; + if ((err = _srs_rtc_sources->fetch_or_create(req, &source)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp new file mode 100644 index 000000000..e37685bf5 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -0,0 +1,1041 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const int kChannel = 2; +const int kSamplerate = 48000; + +// An AAC packet may be transcoded to many OPUS packets. +const int kMaxOpusPackets = 8; +// The max size for each OPUS packet. +const int kMaxOpusPacketSize = 4096; + +// The RTP payload max size, reserved some paddings for SRTP as such: +// kRtpPacketSize = kRtpMaxPayloadSize + paddings +// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, +// which reserves 100 bytes for SRTP or paddings. +const int kRtpMaxPayloadSize = kRtpPacketSize - 200; + +using namespace std; + +// TODO: Add this function into SrsRtpMux class. +srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf) +{ + srs_error_t err = srs_success; + + if (format->is_aac_sequence_header()) { + return err; + } + + if (format->audio->nb_samples != 1) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "adts"); + } + + int nb_buf = format->audio->samples[0].size + 7; + char* buf = new char[nb_buf]; + SrsBuffer stream(buf, nb_buf); + + // TODO: Add comment. + stream.write_1bytes(0xFF); + stream.write_1bytes(0xF9); + stream.write_1bytes(((format->acodec->aac_object - 1) << 6) | ((format->acodec->aac_sample_rate & 0x0F) << 2) | ((format->acodec->aac_channels & 0x04) >> 2)); + stream.write_1bytes(((format->acodec->aac_channels & 0x03) << 6) | ((nb_buf >> 11) & 0x03)); + stream.write_1bytes((nb_buf >> 3) & 0xFF); + stream.write_1bytes(((nb_buf & 0x07) << 5) | 0x1F); + stream.write_1bytes(0xFC); + + stream.write_bytes(format->audio->samples[0].bytes, format->audio->samples[0].size); + + *pbuf = buf; + *pnn_buf = nb_buf; + + return err; +} + +SrsRtcConsumer::SrsRtcConsumer(SrsRtcSource* s) +{ + source = s; + should_update_source_id = false; + + mw_wait = srs_cond_new(); + mw_min_msgs = 0; + mw_waiting = false; +} + +SrsRtcConsumer::~SrsRtcConsumer() +{ + source->on_consumer_destroy(this); + + vector::iterator it; + for (it = queue.begin(); it != queue.end(); ++it) { + SrsRtpPacket2* pkt = *it; + srs_freep(pkt); + } + + srs_cond_destroy(mw_wait); +} + +void SrsRtcConsumer::update_source_id() +{ + should_update_source_id = true; +} + +srs_error_t SrsRtcConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) +{ + srs_error_t err = srs_success; + return err; +} + +srs_error_t SrsRtcConsumer::enqueue2(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + queue.push_back(pkt); + + if (mw_waiting) { + if ((int)queue.size() > mw_min_msgs) { + srs_cond_signal(mw_wait); + mw_waiting = false; + return err; + } + } + + return err; +} + +srs_error_t SrsRtcConsumer::dump_packets(std::vector& pkts) +{ + srs_error_t err = srs_success; + + if (should_update_source_id) { + srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id()); + should_update_source_id = false; + } + + queue.swap(pkts); + + return err; +} + +void SrsRtcConsumer::wait(int nb_msgs) +{ + mw_min_msgs = nb_msgs; + + // when duration ok, signal to flush. + if ((int)queue.size() > mw_min_msgs) { + return; + } + + // the enqueue will notify this cond. + mw_waiting = true; + + // use cond block wait for high performance mode. + srs_cond_wait(mw_wait); +} + +SrsRtcSourceManager::SrsRtcSourceManager() +{ + lock = NULL; +} + +SrsRtcSourceManager::~SrsRtcSourceManager() +{ + srs_mutex_destroy(lock); +} + +srs_error_t SrsRtcSourceManager::fetch_or_create(SrsRequest* r, SrsRtcSource** pps) +{ + srs_error_t err = srs_success; + + // Lazy create lock, because ST is not ready in SrsRtcSourceManager constructor. + if (!lock) { + lock = srs_mutex_new(); + } + + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + SrsLocker(lock); + + SrsRtcSource* source = NULL; + if ((source = fetch(r)) != NULL) { + *pps = source; + return err; + } + + string stream_url = r->get_stream_url(); + string vhost = r->vhost; + + // should always not exists for create a source. + srs_assert (pool.find(stream_url) == pool.end()); + + srs_trace("new source, stream_url=%s", stream_url.c_str()); + + source = new SrsRtcSource(); + if ((err = source->initialize(r)) != srs_success) { + return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + } + + pool[stream_url] = source; + + *pps = source; + + return err; +} + +SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) +{ + SrsRtcSource* source = NULL; + + string stream_url = r->get_stream_url(); + if (pool.find(stream_url) == pool.end()) { + return NULL; + } + + source = pool[stream_url]; + + // we always update the request of resource, + // for origin auth is on, the token in request maybe invalid, + // and we only need to update the token of request, it's simple. + source->update_auth(r); + + return source; +} + +SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager(); + +SrsRtcSource::SrsRtcSource() +{ + _source_id = _pre_source_id = -1; + _can_publish = true; + rtc_publisher_ = NULL; + + req = NULL; + bridger_ = new SrsRtcFromRtmpBridger(this); + format = new SrsRtmpFormat(); + meta = new SrsMetaCache(); +} + +SrsRtcSource::~SrsRtcSource() +{ + // never free the consumers, + // for all consumers are auto free. + consumers.clear(); + + srs_freep(req); + srs_freep(bridger_); + srs_freep(format); + srs_freep(meta); +} + +srs_error_t SrsRtcSource::initialize(SrsRequest* r) +{ + srs_error_t err = srs_success; + + req = r->copy(); + + if ((err = bridger_->initialize(req)) != srs_success) { + return srs_error_wrap(err, "bridge initialize"); + } + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + return err; +} + +void SrsRtcSource::update_auth(SrsRequest* r) +{ + req->update_auth(r); +} + +srs_error_t SrsRtcSource::on_source_id_changed(int id) +{ + srs_error_t err = srs_success; + + if (_source_id == id) { + return err; + } + + if (_pre_source_id == -1) { + _pre_source_id = id; + } else if (_pre_source_id != _source_id) { + _pre_source_id = _source_id; + } + + _source_id = id; + + // notice all consumer + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsRtcConsumer* consumer = *it; + consumer->update_source_id(); + } + + return err; +} + +int SrsRtcSource::source_id() +{ + return _source_id; +} + +int SrsRtcSource::pre_source_id() +{ + return _pre_source_id; +} + +ISrsSourceBridger* SrsRtcSource::bridger() +{ + return bridger_; +} + +SrsMetaCache* SrsRtcSource::cached_meta() +{ + return meta; +} + +srs_error_t SrsRtcSource::create_consumer(SrsRtcConsumer*& consumer) +{ + srs_error_t err = srs_success; + + consumer = new SrsRtcConsumer(this); + consumers.push_back(consumer); + + // TODO: FIXME: Implements edge cluster. + + return err; +} + +srs_error_t SrsRtcSource::consumer_dumps(SrsRtcConsumer* consumer, bool ds, bool dm, bool dg) +{ + srs_error_t err = srs_success; + + // print status. + srs_trace("create consumer, no gop cache"); + + return err; +} + +void SrsRtcSource::on_consumer_destroy(SrsRtcConsumer* consumer) +{ + std::vector::iterator it; + it = std::find(consumers.begin(), consumers.end(), consumer); + if (it != consumers.end()) { + consumers.erase(it); + } +} + +bool SrsRtcSource::can_publish(bool is_edge) +{ + return _can_publish; +} + +srs_error_t SrsRtcSource::on_publish() +{ + srs_error_t err = srs_success; + + // update the request object. + srs_assert(req); + + _can_publish = false; + + // whatever, the publish thread is the source or edge source, + // save its id to srouce id. + if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) { + return srs_error_wrap(err, "source id change"); + } + + // Reset the metadata cache, to make VLC happy when disable/enable stream. + // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 + meta->clear(); + + // TODO: FIXME: Handle by statistic. + + return err; +} + +void SrsRtcSource::on_unpublish() +{ + // ignore when already unpublished. + if (_can_publish) { + return; + } + + // Reset the metadata cache, to make VLC happy when disable/enable stream. + // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 + meta->update_previous_vsh(); + meta->update_previous_ash(); + + srs_trace("cleanup when unpublish"); + + _can_publish = true; + _source_id = -1; + + // TODO: FIXME: Handle by statistic. +} + +SrsRtcPublisher* SrsRtcSource::rtc_publisher() +{ + return rtc_publisher_; +} + +void SrsRtcSource::set_rtc_publisher(SrsRtcPublisher* v) +{ + rtc_publisher_ = v; +} + +srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + SrsAutoFree(SrsRtpPacket2, pkt); + + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue2(pkt->copy())) != srs_success) { + return srs_error_wrap(err, "consume message"); + } + } + + return err; +} + +srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "consume message"); + } + } + + return err; +} + +srs_error_t SrsRtcSource::on_video(SrsCommonMessage* shared_video) +{ + srs_error_t err = srs_success; + + // convert shared_video to msg, user should not use shared_video again. + // the payload is transfer to msg, and set to NULL in shared_video. + SrsSharedPtrMessage msg; + if ((err = msg.create(shared_video)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + bool is_sequence_header = SrsFlvVideo::sh(msg.payload, msg.size); + if (is_sequence_header && (err = meta->update_vsh(&msg)) != srs_success) { + return srs_error_wrap(err, "meta update video"); + } + + // user can disable the sps parse to workaround when parse sps failed. + // @see https://github.com/ossrs/srs/issues/474 + if (is_sequence_header) { + format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost); + } + + if ((err = format->on_video(&msg)) != srs_success) { + return srs_error_wrap(err, "format consume video"); + } + + if ((err = filter(&msg, format)) != srs_success) { + return srs_error_wrap(err, "filter video"); + } + + // directly process the video message. + return on_video_imp(&msg); +} + +srs_error_t SrsRtcSource::on_video_imp(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + // copy to all consumer + for (int i = 0; i < (int)consumers.size(); i++) { + SrsRtcConsumer* consumer = consumers.at(i); + if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "consume video"); + } + } + + return err; +} + +srs_error_t SrsRtcSource::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format) +{ + srs_error_t err = srs_success; + + // If IDR, we will insert SPS/PPS before IDR frame. + if (format->video && format->video->has_idr) { + shared_frame->set_has_idr(true); + } + + // Update samples to shared frame. + for (int i = 0; i < format->video->nb_samples; ++i) { + SrsSample* sample = &format->video->samples[i]; + + // Because RTC does not support B-frame, so we will drop them. + // TODO: Drop B-frame in better way, which not cause picture corruption. + if (true) { + if ((err = sample->parse_bframe()) != srs_success) { + return srs_error_wrap(err, "parse bframe"); + } + if (sample->bframe) { + continue; + } + } + } + + if (format->video->nb_samples <= 0) { + return err; + } + + shared_frame->set_samples(format->video->samples, format->video->nb_samples); + + return err; +} + +SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source) +{ + req = NULL; + source_ = source; + format = new SrsRtmpFormat(); + codec = new SrsAudioRecode(kChannel, kSamplerate); + discard_aac = false; + discard_bframe = false; + merge_nalus = false; +} + +SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger() +{ + srs_freep(format); + srs_freep(codec); +} + +srs_error_t SrsRtcFromRtmpBridger::initialize(SrsRequest* r) +{ + srs_error_t err = srs_success; + + req = r; + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + if ((err = codec->initialize()) != srs_success) { + return srs_error_wrap(err, "init codec"); + } + + // TODO: FIXME: Support reload. + discard_aac = _srs_config->get_rtc_aac_discard(req->vhost); + discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost); + merge_nalus = _srs_config->get_rtc_server_merge_nalus(); + srs_trace("RTC bridge from RTMP, discard_aac=%d, discard_bframe=%d, merge_nalus=%d", + discard_aac, discard_bframe, merge_nalus); + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::on_publish() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Should sync with bridger? + if ((err = source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +void SrsRtcFromRtmpBridger::on_unpublish() +{ + // TODO: FIXME: Should sync with bridger? + source_->on_unpublish(); +} + +srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Support parsing OPUS for RTC. + if ((err = format->on_audio(msg)) != srs_success) { + return srs_error_wrap(err, "format consume audio"); + } + + // Ignore if no format->acodec, it means the codec is not parsed, or unknown codec. + // @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474 + if (!format->acodec) { + return err; + } + + // ts support audio codec: aac/mp3 + SrsAudioCodecId acodec = format->acodec->id; + if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) { + return err; + } + + // When drop aac audio packet, never transcode. + if (discard_aac && acodec == SrsAudioCodecIdAAC) { + return err; + } + + // ignore sequence header + srs_assert(format->audio); + + char* adts_audio = NULL; + int nn_adts_audio = 0; + // TODO: FIXME: Reserve 7 bytes header when create shared message. + if ((err = aac_raw_append_adts_header(msg, format, &adts_audio, &nn_adts_audio)) != srs_success) { + return srs_error_wrap(err, "aac append header"); + } + + if (adts_audio) { + err = transcode(adts_audio, nn_adts_audio); + srs_freep(adts_audio); + } + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio) +{ + srs_error_t err = srs_success; + + // Opus packet cache. + static char* opus_payloads[kMaxOpusPackets]; + + static bool initialized = false; + if (!initialized) { + initialized = true; + + static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize]; + opus_payloads[0] = &opus_packets_cache[0][0]; + for (int i = 1; i < kMaxOpusPackets; i++) { + opus_payloads[i] = opus_packets_cache[i]; + } + } + + // Transcode an aac packet to many opus packets. + SrsSample aac; + aac.bytes = adts_audio; + aac.size = nn_adts_audio; + + int nn_opus_packets = 0; + int opus_sizes[kMaxOpusPackets]; + if ((err = codec->transcode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) { + return srs_error_wrap(err, "recode error"); + } + + // Save OPUS packets in shared message. + if (nn_opus_packets <= 0) { + return err; + } + + int nn_max_extra_payload = 0; + for (int i = 0; i < nn_opus_packets; i++) { + char* data = (char*)opus_payloads[i]; + int size = (int)opus_sizes[i]; + + // TODO: FIXME: Use it to padding audios. + nn_max_extra_payload = srs_max(nn_max_extra_payload, size); + + SrsRtpPacket2* pkt = NULL; + if ((err = package_opus(data, size, &pkt)) != srs_success) { + return srs_error_wrap(err, "package opus"); + } + + if ((err = source_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "consume opus"); + } + } + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacket2** ppkt) +{ + srs_error_t err = srs_success; + + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeAudio; + pkt->rtp_header.set_marker(true); + + SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + pkt->payload = raw; + + raw->payload = new char[size]; + raw->nn_payload = size; + memcpy(raw->payload, data, size); + + pkt->shared_msg = new SrsSharedPtrMessage(); + pkt->shared_msg->wrap(raw->payload, size); + + *ppkt = pkt; + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg) +{ + srs_error_t err = srs_success; + + // cache the sequence header if h264 + bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + SrsMetaCache* meta = source_->cached_meta(); + if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { + return srs_error_wrap(err, "meta update video"); + } + + if ((err = format->on_video(msg)) != srs_success) { + return srs_error_wrap(err, "format consume video"); + } + + if ((err = filter(msg, format)) != srs_success) { + return srs_error_wrap(err, "filter video"); + } + + return source_->on_video_imp(msg); +} + +srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format) +{ + srs_error_t err = srs_success; + + // If IDR, we will insert SPS/PPS before IDR frame. + if (format->video && format->video->has_idr) { + msg->set_has_idr(true); + } + + // Update samples to shared frame. + for (int i = 0; i < format->video->nb_samples; ++i) { + SrsSample* sample = &format->video->samples[i]; + + // Because RTC does not support B-frame, so we will drop them. + // TODO: Drop B-frame in better way, which not cause picture corruption. + if (discard_bframe) { + if ((err = sample->parse_bframe()) != srs_success) { + return srs_error_wrap(err, "parse bframe"); + } + if (sample->bframe) { + continue; + } + } + } + + if (format->video->nb_samples <= 0) { + return err; + } + + // TODO: FIXME: Directly covert samples to RTP packets. + msg->set_samples(format->video->samples, format->video->nb_samples); + int nn_samples = format->video->nb_samples; + + // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. + if (msg->has_idr()) { + SrsRtpPacket2* pkt = NULL; + if ((err = package_stap_a(source_, msg, &pkt)) != srs_success) { + return srs_error_wrap(err, "package stap-a"); + } + + if ((err = source_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "consume sps/pps"); + } + } + + // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. + vector pkts; + if (merge_nalus && nn_samples > 1) { + if ((err = package_nalus(msg, pkts)) != srs_success) { + return srs_error_wrap(err, "package nalus as one"); + } + } + + // By default, we package each NALU(sample) to a RTP or FUA packet. + for (int i = 0; i < nn_samples; i++) { + SrsSample* sample = msg->samples() + i; + + // We always ignore bframe here, if config to discard bframe, + // the bframe flag will not be set. + if (sample->bframe) { + continue; + } + + if (sample->size <= kRtpMaxPayloadSize) { + if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) { + return srs_error_wrap(err, "package single nalu"); + } + } else { + if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) { + return srs_error_wrap(err, "package fu-a"); + } + } + } + + if (pkts.size() > 0) { + pkts.back()->rtp_header.set_marker(true); + } + + return consume_packets(pkts); +} + +srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt) +{ + srs_error_t err = srs_success; + + SrsMetaCache* meta = source->cached_meta(); + if (!meta) { + return err; + } + + SrsFormat* format = meta->vsh_format(); + if (!format || !format->vcodec) { + return err; + } + + // Note that the sps/pps may change, so we should copy it. + const vector& sps = format->vcodec->sequenceParameterSetNALUnit; + const vector& pps = format->vcodec->pictureParameterSetNALUnit; + if (sps.empty() || pps.empty()) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); + } + + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeVideo; + pkt->rtp_header.set_marker(false); + pkt->rtp_header.set_timestamp(msg->timestamp * 90); + + SrsRtpSTAPPayload* stap = new SrsRtpSTAPPayload(); + pkt->payload = stap; + + uint8_t header = sps[0]; + stap->nri = (SrsAvcNaluType)header; + + // Copy the SPS/PPS bytes, because it may change. + int size = (int)(sps.size() + pps.size()); + char* payload = new char[size]; + pkt->shared_msg = new SrsSharedPtrMessage(); + pkt->shared_msg->wrap(payload, size); + + if (true) { + SrsSample* sample = new SrsSample(); + sample->bytes = payload; + sample->size = (int)sps.size(); + stap->nalus.push_back(sample); + + memcpy(payload, (char*)&sps[0], sps.size()); + payload += (int)sps.size(); + } + + if (true) { + SrsSample* sample = new SrsSample(); + sample->bytes = payload; + sample->size = (int)pps.size(); + stap->nalus.push_back(sample); + + memcpy(payload, (char*)&pps[0], pps.size()); + payload += (int)pps.size(); + } + + *ppkt = pkt; + srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", pkt->rtp_header.get_sequence(), sps.size(), pps.size()); + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, vector& pkts) +{ + srs_error_t err = srs_success; + + SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); + + for (int i = 0; i < msg->nn_samples(); i++) { + SrsSample* sample = msg->samples() + i; + + // We always ignore bframe here, if config to discard bframe, + // the bframe flag will not be set. + if (sample->bframe) { + continue; + } + + raw->push_back(sample->copy()); + } + + // Ignore empty. + int nn_bytes = raw->nb_bytes(); + if (nn_bytes <= 0) { + srs_freep(raw); + return err; + } + + if (nn_bytes < kRtpMaxPayloadSize) { + // Package NALUs in a single RTP packet. + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeVideo; + pkt->rtp_header.set_timestamp(msg->timestamp * 90); + pkt->payload = raw; + pkt->shared_msg = msg->copy(); + pkts.push_back(pkt); + } else { + // We must free it, should never use RTP packets to free it, + // because more than one RTP packet will refer to it. + SrsAutoFree(SrsRtpRawNALUs, raw); + + // Package NALUs in FU-A RTP packets. + int fu_payload_size = kRtpMaxPayloadSize; + + // The first byte is store in FU-A header. + uint8_t header = raw->skip_first_byte(); + uint8_t nal_type = header & kNalTypeMask; + int nb_left = nn_bytes - 1; + + int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size; + for (int i = 0; i < num_of_packet; ++i) { + int packet_size = srs_min(nb_left, fu_payload_size); + + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { + srs_freep(fua); + return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); + } + + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeVideo; + pkt->rtp_header.set_timestamp(msg->timestamp * 90); + + fua->nri = (SrsAvcNaluType)header; + fua->nalu_type = (SrsAvcNaluType)nal_type; + fua->start = bool(i == 0); + fua->end = bool(i == num_of_packet - 1); + + pkt->payload = fua; + pkt->shared_msg = msg->copy(); + pkts.push_back(pkt); + + nb_left -= packet_size; + } + } + + return err; +} + +// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 +srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector& pkts) +{ + srs_error_t err = srs_success; + + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeVideo; + pkt->rtp_header.set_timestamp(msg->timestamp * 90); + + SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + pkt->payload = raw; + + raw->payload = sample->bytes; + raw->nn_payload = sample->size; + + pkt->shared_msg = msg->copy(); + pkts.push_back(pkt); + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& pkts) +{ + srs_error_t err = srs_success; + + char* p = sample->bytes + 1; + int nb_left = sample->size - 1; + uint8_t header = sample->bytes[0]; + uint8_t nal_type = header & kNalTypeMask; + + int num_of_packet = 1 + (sample->size - 1) / fu_payload_size; + for (int i = 0; i < num_of_packet; ++i) { + int packet_size = srs_min(nb_left, fu_payload_size); + + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->frame_type = SrsFrameTypeVideo; + pkt->rtp_header.set_timestamp(msg->timestamp * 90); + + SrsRtpFUAPayload2* fua = new SrsRtpFUAPayload2(); + pkt->payload = fua; + + fua->nri = (SrsAvcNaluType)header; + fua->nalu_type = (SrsAvcNaluType)nal_type; + fua->start = bool(i == 0); + fua->end = bool(i == num_of_packet - 1); + + fua->payload = p; + fua->size = packet_size; + + pkt->shared_msg = msg->copy(); + pkts.push_back(pkt); + + p += packet_size; + nb_left -= packet_size; + } + + return err; +} + +srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector& pkts) +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Consume a range of packets. + int i = 0; + for (; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts[i]; + + if ((err = source_->on_rtp(pkt)) != srs_success) { + err = srs_error_wrap(err, "consume sps/pps"); + break; + } + } + + for (; i < (int)pkts.size(); i++) { + SrsRtpPacket2* pkt = pkts[i]; + srs_freep(pkt); + } + + return err; +} + diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp new file mode 100644 index 000000000..ea1c5ac48 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -0,0 +1,201 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 John + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_RTC_SOURCE_HPP +#define SRS_APP_RTC_SOURCE_HPP + +#include + +#include +#include + +#include +#include + +class SrsRequest; +class SrsConnection; +class SrsMetaCache; +class SrsRtcPublisher; +class SrsSharedPtrMessage; +class SrsCommonMessage; +class SrsMessageArray; +class SrsRtcSource; +class SrsRtcFromRtmpBridger; +class SrsAudioRecode; +class SrsRtpPacket2; +class SrsSample; + +class SrsRtcConsumer : public ISrsConsumerQueue +{ +private: + SrsRtcSource* source; + std::vector queue; + // when source id changed, notice all consumers + bool should_update_source_id; + // The cond wait for mw. + // @see https://github.com/ossrs/srs/issues/251 + srs_cond_t mw_wait; + bool mw_waiting; + int mw_min_msgs; +public: + SrsRtcConsumer(SrsRtcSource* s); + virtual ~SrsRtcConsumer(); +public: + // When source id changed, notice client to print. + virtual void update_source_id(); + // Put or get RTP packet in queue. + virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag); + srs_error_t enqueue2(SrsRtpPacket2* pkt); + virtual srs_error_t dump_packets(std::vector& pkts); + // Wait for at-least some messages incoming in queue. + virtual void wait(int nb_msgs); +}; + +class SrsRtcSourceManager +{ +private: + srs_mutex_t lock; + std::map pool; +public: + SrsRtcSourceManager(); + virtual ~SrsRtcSourceManager(); +public: + // create source when fetch from cache failed. + // @param r the client request. + // @param pps the matched source, if success never be NULL. + virtual srs_error_t fetch_or_create(SrsRequest* r, SrsRtcSource** pps); +private: + // Get the exists source, NULL when not exists. + // update the request and return the exists source. + virtual SrsRtcSource* fetch(SrsRequest* r); +}; + +// Global singleton instance. +extern SrsRtcSourceManager* _srs_rtc_sources; + +class SrsRtcSource +{ +private: + // For publish, it's the publish client id. + // For edge, it's the edge ingest id. + // when source id changed, for example, the edge reconnect, + // invoke the on_source_id_changed() to let all clients know. + int _source_id; + // previous source id. + int _pre_source_id; + SrsRequest* req; + SrsRtcPublisher* rtc_publisher_; + // Transmux RTMP to RTC. + SrsRtcFromRtmpBridger* bridger_; + // The metadata cache. + SrsMetaCache* meta; +private: + // To delivery stream to clients. + std::vector consumers; + // Whether source is avaiable for publishing. + bool _can_publish; +public: + SrsRtcSource(); + virtual ~SrsRtcSource(); +public: + virtual srs_error_t initialize(SrsRequest* r); + // Update the authentication information in request. + virtual void update_auth(SrsRequest* r); + // The source id changed. + virtual srs_error_t on_source_id_changed(int id); + // Get current source id. + virtual int source_id(); + virtual int pre_source_id(); + // Get the bridger. + ISrsSourceBridger* bridger(); + // For RTC, we need to package SPS/PPS(in cached meta) before each IDR. + SrsMetaCache* cached_meta(); +public: + // Create consumer + // @param consumer, output the create consumer. + virtual srs_error_t create_consumer(SrsRtcConsumer*& consumer); + // Dumps packets in cache to consumer. + // @param ds, whether dumps the sequence header. + // @param dm, whether dumps the metadata. + // @param dg, whether dumps the gop cache. + virtual srs_error_t consumer_dumps(SrsRtcConsumer* consumer, bool ds = true, bool dm = true, bool dg = true); + virtual void on_consumer_destroy(SrsRtcConsumer* consumer); + // TODO: FIXME: Remove the param is_edge. + virtual bool can_publish(bool is_edge); + // When start publish stream. + virtual srs_error_t on_publish(); + // When stop publish stream. + virtual void on_unpublish(); +public: + // Get and set the publisher, passed to consumer to process requests such as PLI. + SrsRtcPublisher* rtc_publisher(); + void set_rtc_publisher(SrsRtcPublisher* v); + srs_error_t on_rtp(SrsRtpPacket2* pkt); + virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio); + // When got RTC audio message, which is encoded in opus. + // TODO: FIXME: Merge with on_audio. + virtual srs_error_t on_video(SrsCommonMessage* video); + virtual srs_error_t on_video_imp(SrsSharedPtrMessage* video); +private: + // The format, codec information. + // TODO: FIXME: Remove it. + SrsRtmpFormat* format; + srs_error_t filter(SrsSharedPtrMessage* shared_video, SrsFormat* format); +}; + +class SrsRtcFromRtmpBridger : public ISrsSourceBridger +{ +private: + SrsRequest* req; + SrsRtcSource* source_; + // The format, codec information. + SrsRtmpFormat* format; +private: + bool discard_aac; + SrsAudioRecode* codec; + bool discard_bframe; + bool merge_nalus; +public: + SrsRtcFromRtmpBridger(SrsRtcSource* source); + virtual ~SrsRtcFromRtmpBridger(); +public: + virtual srs_error_t initialize(SrsRequest* r); + virtual srs_error_t on_publish(); + virtual void on_unpublish(); + virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); +private: + srs_error_t transcode(char* adts_audio, int nn_adts_audio); + srs_error_t package_opus(char* data, int size, SrsRtpPacket2** ppkt); +public: + virtual srs_error_t on_video(SrsSharedPtrMessage* msg); +private: + srs_error_t filter(SrsSharedPtrMessage* msg, SrsFormat* format); + srs_error_t package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt); + srs_error_t package_nalus(SrsSharedPtrMessage* msg, std::vector& pkts); + srs_error_t package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& pkts); + srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& pkts); + srs_error_t consume_packets(std::vector& pkts); +}; + +#endif + diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d881ef140..6c3e888f7 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -50,10 +50,7 @@ using namespace std; #include #include #include -#ifdef SRS_RTC -#include -#include -#endif +#include #define CONST_MAX_JITTER_MS 250 #define CONST_MAX_JITTER_MS_NEG -250 @@ -270,17 +267,11 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size) max_queue_size = queue_size; } -srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp) +srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) { srs_error_t err = srs_success; msgs.push_back(msg); - - // For RTC, we never care about the timestamp and duration, so we never shrink queue here, - // but we will drop messages in each consumer coroutine. - if (pass_timestamp) { - return err; - } if (msg->is_av()) { if (av_start_time == -1) { @@ -289,6 +280,10 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } + + if (max_queue_size <= 0) { + return err; + } while (av_end_time - av_start_time > max_queue_size) { // notice the caller queue already overflow and shrinked. @@ -302,7 +297,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow return err; } -srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp) +srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { srs_error_t err = srs_success; @@ -317,13 +312,9 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p SrsSharedPtrMessage** omsgs = msgs.data(); memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*)); - // For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration, - // so we do not have to update the start time here. - if (!pass_timestamp) { - SrsSharedPtrMessage* last = omsgs[count - 1]; - av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); - } - + SrsSharedPtrMessage* last = omsgs[count - 1]; + av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); + if (count >= nb_msgs) { // the pmsgs is big enough and clear msgs at most time. msgs.clear(); @@ -427,6 +418,14 @@ ISrsWakable::~ISrsWakable() { } +ISrsConsumerQueue::ISrsConsumerQueue() +{ +} + +ISrsConsumerQueue::~ISrsConsumerQueue() +{ +} + SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) { source = s; @@ -442,8 +441,6 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) mw_duration = 0; mw_waiting = false; #endif - - pass_timestamp = false; } SrsConsumer::~SrsConsumer() @@ -457,11 +454,6 @@ SrsConsumer::~SrsConsumer() #endif } -void SrsConsumer::enable_pass_timestamp() -{ - pass_timestamp = true; -} - void SrsConsumer::set_queue_size(srs_utime_t queue_size) { queue->set_queue_size(queue_size); @@ -483,33 +475,19 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR SrsSharedPtrMessage* msg = shared_msg->copy(); - // For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of - // timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here. - if (!pass_timestamp && !atc) { + if (!atc) { if ((err = jitter->correct(msg, ag)) != srs_success) { return srs_error_wrap(err, "consume message"); } } - // Put message in queue, here we may enable pass_timestamp mode. - if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) { + if ((err = queue->enqueue(msg, NULL)) != srs_success) { return srs_error_wrap(err, "enqueue message"); } #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { - // For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue, - // so we only check the messages in queue. - if (pass_timestamp) { - if (queue->size() > mw_min_msgs) { - srs_cond_signal(mw_wait); - mw_waiting = false; - return err; - } - return err; - } - // For RTMP, we wait for messages and duration. srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; @@ -560,7 +538,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) } // pump msgs from queue. - if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) { + if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) { return srs_error_wrap(err, "dump packets"); } @@ -859,9 +837,6 @@ SrsOriginHub::SrsOriginHub() dash = new SrsDash(); dvr = new SrsDvr(); encoder = new SrsEncoder(); -#ifdef SRS_RTC - rtc = new SrsRtc(); -#endif #ifdef SRS_HDS hds = new SrsHds(); #endif @@ -905,12 +880,6 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r) if ((err = format->initialize()) != srs_success) { return srs_error_wrap(err, "format initialize"); } - -#ifdef SRS_RTC - if ((err = rtc->initialize(this, req)) != srs_success) { - return srs_error_wrap(err, "rtc initialize"); - } -#endif if ((err = hls->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "hls initialize"); @@ -1010,15 +979,6 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); } - -#ifdef SRS_RTC - // TODO: FIXME: Support parsing OPUS for RTC. - if ((err = rtc->on_audio(msg, format)) != srs_success) { - srs_warn("rtc: ignore audio error %s", srs_error_desc(err).c_str()); - srs_error_reset(err); - rtc->on_unpublish(); - } -#endif if ((err = hls->on_audio(msg, format)) != srs_success) { // apply the error strategy for hls. @@ -1112,16 +1072,6 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se if (format->vcodec && !format->vcodec->is_avc_codec_ok()) { return err; } - -#ifdef SRS_RTC - // Parse RTMP message to RTP packets, in FU-A if too large. - if ((err = rtc->on_video(msg, format)) != srs_success) { - // TODO: We should support more strategies. - srs_warn("rtc: ignore video error %s", srs_error_desc(err).c_str()); - srs_error_reset(err); - rtc->on_unpublish(); - } -#endif if ((err = hls->on_video(msg, format)) != srs_success) { // TODO: We should support more strategies. @@ -1191,12 +1141,6 @@ srs_error_t SrsOriginHub::on_publish() return srs_error_wrap(err, "encoder publish"); } -#ifdef SRS_RTC - if ((err = rtc->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtc publish"); - } -#endif - if ((err = hls->on_publish()) != srs_success) { return srs_error_wrap(err, "hls publish"); } @@ -1234,9 +1178,6 @@ void SrsOriginHub::on_unpublish() destroy_forwarders(); encoder->on_unpublish(); -#ifdef SRS_RTC - rtc->on_unpublish(); -#endif hls->on_unpublish(); dash->on_unpublish(); dvr->on_unpublish(); @@ -1629,7 +1570,7 @@ SrsFormat* SrsMetaCache::ash_format() return aformat; } -srs_error_t SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) +srs_error_t SrsMetaCache::dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) { srs_error_t err = srs_success; @@ -1775,6 +1716,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 + // TODO: FIXME: Use smaller lock. SrsLocker(lock); SrsSource* source = NULL; @@ -1789,17 +1731,41 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); +#ifdef SRS_RTC + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); + + // Get the RTC source and bridger. + SrsRtcSource* rtc = NULL; + if (rtc_server_enabled && rtc_enabled) { + if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { + err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); + goto failed; + } + } +#endif srs_trace("new source, stream_url=%s", stream_url.c_str()); - + source = new SrsSource(); if ((err = source->initialize(r, h)) != srs_success) { - return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + goto failed; } + +#ifdef SRS_RTC + // If rtc enabled, bridge RTMP source to RTC, + // all RTMP packets will be forwarded to RTC source. + if (source && rtc) { + source->bridge_to(rtc->bridger()); + } +#endif pool[stream_url] = source; - *pps = source; - + return err; + +failed: + srs_freep(source); return err; } @@ -1892,6 +1858,14 @@ void SrsSourceManager::destroy() pool.clear(); } +ISrsSourceBridger::ISrsSourceBridger() +{ +} + +ISrsSourceBridger::~ISrsSourceBridger() +{ +} + SrsSource::SrsSource() { req = NULL; @@ -1902,6 +1876,9 @@ SrsSource::SrsSource() _can_publish = true; _pre_source_id = _source_id = -1; die_at = 0; + + handler = NULL; + bridger = NULL; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1914,10 +1891,6 @@ SrsSource::SrsSource() _srs_config->subscribe(this); atc = false; - -#ifdef SRS_RTC - rtc_publisher_ = NULL; -#endif } SrsSource::~SrsSource() @@ -2012,6 +1985,11 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return err; } +void SrsSource::bridge_to(ISrsSourceBridger* v) +{ + bridger = v; +} + srs_error_t SrsSource::on_reload_vhost_play(string vhost) { srs_error_t err = srs_success; @@ -2262,6 +2240,11 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "consume audio"); } + // For bridger to consume the message. + if (bridger && (err = bridger->on_audio(msg)) != srs_success) { + return srs_error_wrap(err, "bridger consume audio"); + } + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -2290,7 +2273,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume audio"); } - + // if atc, update the sequence header to abs time. if (atc) { if (meta->ash()) { @@ -2387,6 +2370,11 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "hub consume video"); } + // For bridger to consume the message. + if (bridger && (err = bridger->on_video(msg)) != srs_success) { + return srs_error_wrap(err, "bridger consume video"); + } + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -2546,6 +2534,11 @@ srs_error_t SrsSource::on_publish() if ((err = handler->on_publish(this, req)) != srs_success) { return srs_error_wrap(err, "handle publish"); } + + if (bridger && (err = bridger->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridger publish"); + } + SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_publish(req, _source_id); @@ -2581,7 +2574,12 @@ void SrsSource::on_unpublish() srs_assert(handler); SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_close(req); + handler->on_unpublish(this, req); + + if (bridger) { + bridger->on_unpublish(); + } // no consumer, stream is die. if (consumers.empty()) { @@ -2695,26 +2693,3 @@ string SrsSource::get_curr_origin() return play_edge->get_curr_origin(); } -#ifdef SRS_RTC -SrsMetaCache* SrsSource::cached_meta() -{ - return meta; -} - -SrsRtcPublisher* SrsSource::rtc_publisher() -{ - return rtc_publisher_; -} - -void SrsSource::set_rtc_publisher(SrsRtcPublisher* v) -{ - rtc_publisher_ = v; -} - -srs_error_t SrsSource::on_rtc_audio(SrsSharedPtrMessage* audio) -{ - // TODO: FIXME: Merge with on_audio. - // TODO: FIXME: Print key information. - return on_audio_imp(audio); -} -#endif diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 5aba081d1..be84b1328 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -62,9 +62,6 @@ class SrsBuffer; #ifdef SRS_HDS class SrsHds; #endif -#ifdef SRS_RTC -class SrsRtcPublisher; -#endif // The time jitter algorithm: // 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. @@ -153,13 +150,12 @@ public: // Enqueue the message, the timestamp always monotonically. // @param msg, the msg to enqueue, user never free it whatever the return code. // @param is_overflow, whether overflow and shrinked. NULL to ignore. - // @remark If pass_timestamp, we never shrink and never care about the timestamp or duration. - virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL, bool pass_timestamp = false); + virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); // Get packets in consumer queue. // @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. // @count the count in array, output param. // @max_count the max count to dequeue, must be positive. - virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp = false); + virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); // Dumps packets to consumer, use specified args. // @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue(). virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag); @@ -186,8 +182,18 @@ public: virtual void wakeup() = 0; }; +// Enqueue the packet to consumer. +class ISrsConsumerQueue +{ +public: + ISrsConsumerQueue(); + virtual ~ISrsConsumerQueue(); +public: + virtual srs_error_t enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) = 0; +}; + // The consumer for SrsSource, that is a play client. -class SrsConsumer : public ISrsWakable +class SrsConsumer : virtual public ISrsWakable, virtual public ISrsConsumerQueue { private: SrsRtmpJitter* jitter; @@ -206,17 +212,10 @@ private: int mw_min_msgs; srs_utime_t mw_duration; #endif -private: - // For RTC, we never use jitter to correct timestamp. - // But we should not change the atc or time_jitter for source or RTMP. - // @remark In this mode, we also never check the queue by timstamp, but only by count. - bool pass_timestamp; public: SrsConsumer(SrsSource* s, SrsConnection* c); virtual ~SrsConsumer(); public: - // Use pass timestamp mode. - void enable_pass_timestamp(); // Set the size of queue. virtual void set_queue_size(srs_utime_t queue_size); // when source id changed, notice client to print. @@ -347,10 +346,6 @@ private: private: // The format, codec information. SrsRtmpFormat* format; -#ifdef SRS_RTC - // rtc handler - SrsRtc* rtc; -#endif // hls handler. SrsHls* hls; // The DASH encoder. @@ -450,7 +445,7 @@ public: // Dumps cached metadata to consumer. // @param dm Whether dumps the metadata. // @param ds Whether dumps the sequence header. - virtual srs_error_t dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); + virtual srs_error_t dumps(ISrsConsumerQueue* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); public: // Previous exists sequence header. virtual SrsSharedPtrMessage* previous_vsh(); @@ -493,7 +488,7 @@ public: private: virtual srs_error_t do_cycle(); public: - // when system exit, destroy the sources, + // when system exit, destroy th`e sources, // For gmc to analysis mem leaks. virtual void destroy(); }; @@ -501,6 +496,19 @@ public: // Global singleton instance. extern SrsSourceManager* _srs_sources; +// For two sources to bridge with each other. +class ISrsSourceBridger +{ +public: + ISrsSourceBridger(); + virtual ~ISrsSourceBridger(); +public: + virtual srs_error_t on_publish() = 0; + virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0; + virtual srs_error_t on_video(SrsSharedPtrMessage* video) = 0; + virtual void on_unpublish() = 0; +}; + // live streaming source. class SrsSource : public ISrsReloadHandler { @@ -534,6 +542,8 @@ private: int64_t last_packet_time; // The event handler. ISrsSourceHandler* handler; + // The source bridger for other source. + ISrsSourceBridger* bridger; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -549,10 +559,6 @@ private: // The last die time, when all consumers quit and no publisher, // We will remove the source when source die. srs_utime_t die_at; -#ifdef SRS_RTC -private: - SrsRtcPublisher* rtc_publisher_; -#endif public: SrsSource(); virtual ~SrsSource(); @@ -564,6 +570,8 @@ public: public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); + // Bridge to other source, forward packets to it. + void bridge_to(ISrsSourceBridger* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); @@ -619,17 +627,6 @@ public: virtual void on_edge_proxy_unpublish(); public: virtual std::string get_curr_origin(); -#ifdef SRS_RTC -public: - // For RTC, we need to package SPS/PPS(in cached meta) before each IDR. - SrsMetaCache* cached_meta(); - // Get and set the publisher, passed to consumer to process requests such as PLI. - SrsRtcPublisher* rtc_publisher(); - void set_rtc_publisher(SrsRtcPublisher* v); - // When got RTC audio message, which is encoded in opus. - // TODO: FIXME: Merge with on_audio. - srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio); -#endif }; #endif diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 39493646a..3a426c3c7 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -125,6 +125,7 @@ * @remark this improve performance for large connectios. * @see https://github.com/ossrs/srs/issues/251 */ +// TODO: FIXME: Should always enable it. #define SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT // For RTMP, use larger wait queue. @@ -212,6 +213,7 @@ #define SRS_PERF_RTC_GSO_MAX 64 // For RTC, the max count of RTP packets we process in one loop. +// TODO: FIXME: Remove it. #define SRS_PERF_RTC_RTP_PACKETS 1024 #endif diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index a5eb4d404..55033b466 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 25 +#define SRS_VERSION4_REVISION 26 #endif diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index e104790e0..e3d3ca4a3 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -414,6 +414,7 @@ SrsSample* SrsSample::copy() SrsSample* p = new SrsSample(); p->bytes = bytes; p->size = size; + p->bframe = bframe; return p; } diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 9c74e1387..a5bb960cb 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -636,6 +636,7 @@ public: SrsAvcLevel avc_level; // lengthSizeMinusOne, ISO_IEC_14496-15-AVC-format-2012.pdf, page 16 int8_t NAL_unit_length; + // Note that we may resize the vector, so the under-layer bytes may change. std::vector sequenceParameterSetNALUnit; std::vector pictureParameterSetNALUnit; public: diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index ff8be065c..76d808a44 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -303,6 +303,18 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload return err; } +void SrsSharedPtrMessage::wrap(char* payload, int size) +{ + srs_assert(!ptr); + ptr = new SrsSharedPtrPayload(); + + ptr->payload = payload; + ptr->size = size; + + this->payload = ptr->payload; + this->size = ptr->size; +} + int SrsSharedPtrMessage::count() { srs_assert(ptr); diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 91faaa2a0..60d8ee1d3 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -304,6 +304,7 @@ private: // The reference count int shared_count; #ifdef SRS_RTC + // TODO: FIXME: Remove it. public: // For RTC video, we need to know the NALU structures, // because the RTP STAP-A or FU-A based on NALU. @@ -340,6 +341,9 @@ public: // @remark user should never free the payload. // @param pheader, the header to copy to the message. NULL to ignore. virtual srs_error_t create(SrsMessageHeader* pheader, char* payload, int size); + // Create shared ptr message from RAW payload. + // @remark Note that the header is set to zero. + virtual void wrap(char* payload, int size); // Get current reference count. // when this object created, count set to 0. // if copy() this object, count increase 1. diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index c880b6fc0..bdec386c9 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -31,10 +31,10 @@ using namespace std; #include #include #include +#include SrsRtpHeader::SrsRtpHeader() { - padding = false; padding_length = 0; extension = false; cc = 0; @@ -46,17 +46,6 @@ SrsRtpHeader::SrsRtpHeader() extension_length = 0; } -void SrsRtpHeader::reset() -{ - // We only reset the optional fields, the required field such as ssrc - // will always be set by user. - padding = false; - extension = false; - cc = 0; - marker = false; - extension_length = 0; -} - SrsRtpHeader::~SrsRtpHeader() { } @@ -85,7 +74,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* buf) */ uint8_t first = buf->read_1bytes(); - padding = (first & 0x20); + bool padding = (first & 0x20); extension = (first & 0x10); cc = (first & 0x0F); @@ -141,7 +130,7 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* buf) // The version, padding, extension and cc, total 1 byte. uint8_t v = 0x80 | cc; - if (padding) { + if (padding_length > 0) { v |= 0x20; } if (extension) { @@ -249,21 +238,24 @@ uint32_t SrsRtpHeader::get_ssrc() const return ssrc; } -void SrsRtpHeader::set_padding(bool v) -{ - padding = v; -} - -void SrsRtpHeader::set_padding_length(uint8_t v) +void SrsRtpHeader::set_padding(uint8_t v) { padding_length = v; } -uint8_t SrsRtpHeader::get_padding_length() const +uint8_t SrsRtpHeader::get_padding() const { return padding_length; } +ISrsRtpPayloader::ISrsRtpPayloader() +{ +} + +ISrsRtpPayloader::~ISrsRtpPayloader() +{ +} + ISrsRtpPacketDecodeHandler::ISrsRtpPacketDecodeHandler() { } @@ -274,75 +266,35 @@ ISrsRtpPacketDecodeHandler::~ISrsRtpPacketDecodeHandler() SrsRtpPacket2::SrsRtpPacket2() { - padding = 0; payload = NULL; decode_handler = NULL; nalu_type = SrsAvcNaluTypeReserved; - original_bytes = NULL; - - cache_raw = new SrsRtpRawPayload(); - cache_fua = new SrsRtpFUAPayload2(); - cache_payload = 0; + shared_msg = NULL; + frame_type = SrsFrameTypeReserved; + cached_payload_size = 0; } SrsRtpPacket2::~SrsRtpPacket2() { - // We may use the cache as payload. - if (payload == cache_raw || payload == cache_fua) { - payload = NULL; - } - srs_freep(payload); - srs_freep(cache_raw); - srs_freep(cache_fua); - - srs_freepa(original_bytes); + srs_freep(shared_msg); } void SrsRtpPacket2::set_padding(int size) { - rtp_header.set_padding(size > 0); - rtp_header.set_padding_length(size); - if (cache_payload) { - cache_payload += size - padding; + rtp_header.set_padding(size); + if (cached_payload_size) { + cached_payload_size += size - rtp_header.get_padding(); } - padding = size; } void SrsRtpPacket2::add_padding(int size) { - rtp_header.set_padding(padding + size > 0); - rtp_header.set_padding_length(rtp_header.get_padding_length() + size); - if (cache_payload) { - cache_payload += size; + rtp_header.set_padding(rtp_header.get_padding() + size); + if (cached_payload_size) { + cached_payload_size += size; } - padding += size; -} - -void SrsRtpPacket2::reset() -{ - rtp_header.reset(); - padding = 0; - cache_payload = 0; - - // We may use the cache as payload. - if (payload == cache_raw || payload == cache_fua) { - payload = NULL; - } - srs_freep(payload); -} - -SrsRtpRawPayload* SrsRtpPacket2::reuse_raw() -{ - payload = cache_raw; - return cache_raw; -} - -SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua() -{ - payload = cache_fua; - return cache_fua; } void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h) @@ -350,12 +302,35 @@ void SrsRtpPacket2::set_decode_handler(ISrsRtpPacketDecodeHandler* h) decode_handler = h; } +bool SrsRtpPacket2::is_audio() +{ + return frame_type == SrsFrameTypeAudio; +} + +SrsRtpPacket2* SrsRtpPacket2::copy() +{ + SrsRtpPacket2* cp = new SrsRtpPacket2(); + + cp->rtp_header = rtp_header; + cp->payload = payload? payload->copy():NULL; + + cp->nalu_type = nalu_type; + cp->shared_msg = shared_msg? shared_msg->copy():NULL; + cp->frame_type = frame_type; + + cp->cached_payload_size = cached_payload_size; + cp->decode_handler = decode_handler; + + return cp; +} + int SrsRtpPacket2::nb_bytes() { - if (!cache_payload) { - cache_payload = rtp_header.nb_bytes() + (payload? payload->nb_bytes():0) + padding; + if (!cached_payload_size) { + int nn_payload = (payload? payload->nb_bytes():0); + cached_payload_size = rtp_header.nb_bytes() + nn_payload + rtp_header.get_padding(); } - return cache_payload; + return cached_payload_size; } srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) @@ -370,7 +345,8 @@ srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) return srs_error_wrap(err, "rtp payload"); } - if (padding > 0) { + if (rtp_header.get_padding() > 0) { + uint8_t padding = rtp_header.get_padding(); if (!buf->require(padding)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", padding); } @@ -390,7 +366,7 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) } // We must skip the padding bytes before parsing payload. - padding = rtp_header.get_padding_length(); + uint8_t padding = rtp_header.get_padding(); if (!buf->require(padding)) { return srs_error_wrap(err, "requires padding %d bytes", padding); } @@ -408,7 +384,7 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) // By default, we always use the RAW payload. if (!payload) { - payload = reuse_raw(); + payload = new SrsRtpRawPayload(); } if ((err = payload->decode(buf)) != srs_success) { @@ -460,6 +436,16 @@ srs_error_t SrsRtpRawPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpRawPayload::copy() +{ + SrsRtpRawPayload* cp = new SrsRtpRawPayload(); + + cp->payload = payload; + cp->nn_payload = nn_payload; + + return cp; +} + SrsRtpRawNALUs::SrsRtpRawNALUs() { cursor = 0; @@ -582,6 +568,22 @@ srs_error_t SrsRtpRawNALUs::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpRawNALUs::copy() +{ + SrsRtpRawNALUs* cp = new SrsRtpRawNALUs(); + + cp->nn_bytes = nn_bytes; + cp->cursor = cursor; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpSTAPPayload::SrsRtpSTAPPayload() { nri = (SrsAvcNaluType)0; @@ -706,6 +708,21 @@ srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpSTAPPayload::copy() +{ + SrsRtpSTAPPayload* cp = new SrsRtpSTAPPayload(); + + cp->nri = nri; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpFUAPayload::SrsRtpFUAPayload() { start = end = false; @@ -800,6 +817,24 @@ srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf) return srs_success; } +ISrsRtpPayloader* SrsRtpFUAPayload::copy() +{ + SrsRtpFUAPayload* cp = new SrsRtpFUAPayload(); + + cp->nri = nri; + cp->start = start; + cp->end = end; + cp->nalu_type = nalu_type; + + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + cp->nalus.push_back(p->copy()); + } + + return cp; +} + SrsRtpFUAPayload2::SrsRtpFUAPayload2() { start = end = false; @@ -877,3 +912,17 @@ srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf) return srs_success; } + +ISrsRtpPayloader* SrsRtpFUAPayload2::copy() +{ + SrsRtpFUAPayload2* cp = new SrsRtpFUAPayload2(); + + cp->nri = nri; + cp->start = start; + cp->end = end; + cp->nalu_type = nalu_type; + cp->payload = payload; + cp->size = size; + + return cp; +} diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index 77a7fb5b3..3c8da5cca 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -33,8 +33,11 @@ class SrsRtpPacket2; -const int kRtpHeaderFixedSize = 12; -const uint8_t kRtpMarker = 0x80; +// The RTP packet max size, should never exceed this size. +const int kRtpPacketSize = 1500; + +const int kRtpHeaderFixedSize = 12; +const uint8_t kRtpMarker = 0x80; // H.264 nalu header type mask. const uint8_t kNalTypeMask = 0x1F; @@ -53,11 +56,11 @@ const uint8_t kEnd = 0x40; // Fu-header end bit class SrsBuffer; class SrsRtpRawPayload; class SrsRtpFUAPayload2; +class SrsSharedPtrMessage; class SrsRtpHeader { private: - bool padding; uint8_t padding_length; bool extension; uint8_t cc; @@ -72,7 +75,6 @@ private: public: SrsRtpHeader(); virtual ~SrsRtpHeader(); - void reset(); public: virtual srs_error_t decode(SrsBuffer* buf); virtual srs_error_t encode(SrsBuffer* buf); @@ -88,9 +90,17 @@ public: uint32_t get_timestamp() const; void set_ssrc(uint32_t v); uint32_t get_ssrc() const; - void set_padding(bool v); - void set_padding_length(uint8_t v); - uint8_t get_padding_length() const; + void set_padding(uint8_t v); + uint8_t get_padding() const; +}; + +class ISrsRtpPayloader : public ISrsCodec +{ +public: + ISrsRtpPayloader(); + virtual ~ISrsRtpPayloader(); +public: + virtual ISrsRtpPayloader* copy() = 0; }; class ISrsRtpPacketDecodeHandler @@ -100,7 +110,7 @@ public: virtual ~ISrsRtpPacketDecodeHandler(); public: // We don't know the actual payload, so we depends on external handler. - virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload) = 0; + virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload) = 0; }; class SrsRtpPacket2 @@ -109,21 +119,19 @@ class SrsRtpPacket2 public: // TODO: FIXME: Rename to header. SrsRtpHeader rtp_header; - ISrsCodec* payload; - // TODO: FIXME: Merge into rtp_header. - int padding; -// Decoder helper. + ISrsRtpPayloader* payload; +// Helper fields. public: // The first byte as nalu type, for video decoder only. SrsAvcNaluType nalu_type; - // The original bytes for decoder only, we will free it. - char* original_bytes; + // The original shared message, all RTP packets can refer to its data. + SrsSharedPtrMessage* shared_msg; + // The frame type, for RTMP bridger or SFU source. + SrsFrameType frame_type; // Fast cache for performance. private: - // Cache frequently used payload for performance. - SrsRtpRawPayload* cache_raw; - SrsRtpFUAPayload2* cache_fua; - int cache_payload; + // The cached payload size for packet. + int cached_payload_size; // The helper handler for decoder, use RAW payload if NULL. ISrsRtpPacketDecodeHandler* decode_handler; public: @@ -134,14 +142,12 @@ public: void set_padding(int size); // Increase the padding of RTP packet. void add_padding(int size); - // Reset RTP packet. - void reset(); - // Reuse the cached raw message as payload. - SrsRtpRawPayload* reuse_raw(); - // Reuse the cached fua message as payload. - SrsRtpFUAPayload2* reuse_fua(); // Set the decode handler. void set_decode_handler(ISrsRtpPacketDecodeHandler* h); + // Whether the packet is Audio packet. + bool is_audio(); + // Copy the RTP packet. + SrsRtpPacket2* copy(); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -150,7 +156,7 @@ public: }; // Single payload data. -class SrsRtpRawPayload : public ISrsCodec +class SrsRtpRawPayload : public ISrsRtpPayloader { public: // The RAW payload, directly point to the shared memory. @@ -160,15 +166,16 @@ public: public: SrsRtpRawPayload(); virtual ~SrsRtpRawPayload(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // Multiple NALUs, automatically insert 001 between NALUs. -class SrsRtpRawNALUs : public ISrsCodec +class SrsRtpRawNALUs : public ISrsRtpPayloader { private: // We will manage the samples, but the sample itself point to the shared memory. @@ -184,15 +191,16 @@ public: uint8_t skip_first_byte(); // We will manage the returned samples, if user want to manage it, please copy it. srs_error_t read_samples(std::vector& samples, int packet_size); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // STAP-A, for multiple NALUs. -class SrsRtpSTAPPayload : public ISrsCodec +class SrsRtpSTAPPayload : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -206,16 +214,17 @@ public: public: SrsSample* get_sps(); SrsSample* get_pps(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // FU-A, for one NALU with multiple fragments. // With more than one payload. -class SrsRtpFUAPayload : public ISrsCodec +class SrsRtpFUAPayload : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -230,16 +239,17 @@ public: public: SrsRtpFUAPayload(); virtual ~SrsRtpFUAPayload(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; // FU-A, for one NALU with multiple fragments. // With only one payload. -class SrsRtpFUAPayload2 : public ISrsCodec +class SrsRtpFUAPayload2 : public ISrsRtpPayloader { public: // The NRI in NALU type. @@ -254,11 +264,12 @@ public: public: SrsRtpFUAPayload2(); virtual ~SrsRtpFUAPayload2(); -// interface ISrsEncoder +// interface ISrsRtpPayloader public: virtual int nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); + virtual ISrsRtpPayloader* copy(); }; #endif