From c534a265e58b5fcfb42abfa739468e21b00de506 Mon Sep 17 00:00:00 2001 From: Winlin Date: Mon, 1 Sep 2025 14:00:31 -0400 Subject: [PATCH] AI: Update RTMP message memory management with shared pointers. v7.0.73 (#4464) This PR modernizes the memory management architecture in SRS by refactoring RTMP message handling to use shared pointers (SrsSharedPtr) instead of manual memory management. This change improves memory safety, reduces the risk of memory leaks, and provides a cleaner abstraction for message payload handling. * Introduced `SrsMemoryBlock`: A dedicated class for managing memory buffers with size information * Replaced manual memory management: `SrsCommonMessage` and `SrsSharedPtrMessage` now use `SrsSharedPtr` instead of raw pointers * Updated `SrsRtpPacket`: Now uses `SrsSharedPtr` for shared buffer management --------- Co-authored-by: OSSRS-AI --- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_dash.cpp | 2 +- trunk/src/app/srs_app_dvr.cpp | 14 +- trunk/src/app/srs_app_edge.cpp | 4 +- trunk/src/app/srs_app_forward.cpp | 4 +- trunk/src/app/srs_app_gb28181.cpp | 2 +- trunk/src/app/srs_app_hds.cpp | 12 +- trunk/src/app/srs_app_http_stream.cpp | 10 +- trunk/src/app/srs_app_mpegts_udp.cpp | 2 +- trunk/src/app/srs_app_rtc_source.cpp | 10 +- trunk/src/app/srs_app_rtsp_source.cpp | 2 +- trunk/src/app/srs_app_source.cpp | 51 +++--- trunk/src/app/srs_app_srt_source.cpp | 26 ++- trunk/src/core/srs_core_version7.hpp | 2 +- trunk/src/kernel/srs_kernel_buffer.cpp | 54 ++++++ trunk/src/kernel/srs_kernel_buffer.hpp | 39 +++++ trunk/src/kernel/srs_kernel_flv.cpp | 163 ++++++------------ trunk/src/kernel/srs_kernel_flv.hpp | 96 +++-------- trunk/src/kernel/srs_kernel_kbps.cpp | 6 + trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 37 ++-- trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 13 +- trunk/src/protocol/srs_protocol_format.cpp | 8 +- .../src/protocol/srs_protocol_rtmp_stack.cpp | 37 ++-- .../src/protocol/srs_protocol_rtmp_stack.hpp | 2 + trunk/src/protocol/srs_protocol_rtp.cpp | 8 +- trunk/src/utest/srs_utest_fmp4.cpp | 4 +- trunk/src/utest/srs_utest_kernel.cpp | 98 ++++++++++- trunk/src/utest/srs_utest_protocol.cpp | 21 +-- trunk/src/utest/srs_utest_rtmp.cpp | 16 +- 29 files changed, 404 insertions(+), 340 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 976b26b10..20551f544 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-09-01, Merge [#4464](https://github.com/ossrs/srs/pull/4464): AI: Use shared ptr in RTMP message. v7.0.73 (#4464) * v7.0, 2025-09-01, Merge [#4463](https://github.com/ossrs/srs/pull/4463): AI: Use SrsHttpUri for URL parsing and add legacy RTMP URL conversion. v7.0.72 (#4463) * v7.0, 2025-09-01, Merge [#4462](https://github.com/ossrs/srs/pull/4462): HTTP: Rename HTTP hijack to dynamic match for better clarity. v7.0.71 (#4462) * v7.0, 2025-08-31, Merge [#4461](https://github.com/ossrs/srs/pull/4461): AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461) diff --git a/trunk/src/app/srs_app_dash.cpp b/trunk/src/app/srs_app_dash.cpp index 31e7d7781..1aaab172f 100644 --- a/trunk/src/app/srs_app_dash.cpp +++ b/trunk/src/app/srs_app_dash.cpp @@ -645,7 +645,7 @@ srs_error_t SrsDashController::refresh_init_mp4(SrsSharedPtrMessage *msg, SrsFor { srs_error_t err = srs_success; - if (msg->size <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) { + if (msg->size() <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) { srs_warn("DASH: Ignore empty sequence header."); return err; } diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index dd7a3440f..db5b994b5 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -319,7 +319,7 @@ srs_error_t SrsDvrFlvSegmenter::encode_metadata(SrsSharedPtrMessage *metadata) return err; } - SrsBuffer stream(metadata->payload, metadata->size); + SrsBuffer stream(metadata->payload(), metadata->size()); SrsUniquePtr name(SrsAmf0Any::str()); if ((err = name->read(&stream)) != srs_success) { @@ -370,8 +370,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_audio(SrsSharedPtrMessage *audio, SrsForm { srs_error_t err = srs_success; - char *payload = audio->payload; - int size = audio->size; + char *payload = audio->payload(); + int size = audio->size(); if ((err = enc->write_audio(audio->timestamp, payload, size)) != srs_success) { return srs_error_wrap(err, "write audio"); } @@ -383,8 +383,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_video(SrsSharedPtrMessage *video, SrsForm { srs_error_t err = srs_success; - char *payload = video->payload; - int size = video->size; + char *payload = video->payload(); + int size = video->size(); bool sh = (format->video->avc_packet_type == SrsVideoAvcFrameTraitSequenceHeader); bool keyframe = (!sh && format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame); @@ -870,8 +870,8 @@ srs_error_t SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage *msg) return err; } - char *payload = msg->payload; - int size = msg->size; + char *payload = msg->payload(); + int size = msg->size(); bool codec_ok = SrsFlvVideo::h264(payload, size); codec_ok = codec_ok ? true : SrsFlvVideo::hevc(payload, size); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index e7ece8ece..b17585eab 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -338,7 +338,7 @@ srs_error_t SrsEdgeFlvUpstream::decode_message(SrsCommonMessage *msg, SrsPacket srs_error_t err = srs_success; SrsPacket *packet = NULL; - SrsBuffer stream(msg->payload, msg->size); + SrsBuffer stream(msg->payload(), msg->size()); SrsMessageHeader &header = msg->header; if (header.is_amf0_data() || header.is_amf3_data()) { @@ -924,7 +924,7 @@ srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage *msg) // the msg is auto free by source, // so we just ignore, or copy then send it. - if (msg->size <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) { + if (msg->size() <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) { return err; } diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index d9058f0c1..6e50df3aa 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -127,7 +127,7 @@ srs_error_t SrsForwarder::on_audio(SrsSharedPtrMessage *shared_audio) return srs_error_wrap(err, "jitter"); } - if (SrsFlvAudio::sh(msg->payload, msg->size)) { + if (SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(sh_audio); sh_audio = msg->copy(); } @@ -150,7 +150,7 @@ srs_error_t SrsForwarder::on_video(SrsSharedPtrMessage *shared_video) return srs_error_wrap(err, "jitter"); } - if (SrsFlvVideo::sh(msg->payload, msg->size)) { + if (SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(sh_video); sh_video = msg->copy(); } diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 6deac3b7f..18d4a7df2 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -2129,7 +2129,7 @@ srs_error_t SrsGbMuxer::rtmp_write_packet(char type, uint32_t timestamp, char *d srs_trace("Muxer: send msg %s age=%d, dts=%" PRId64 ", size=%d", msg->is_audio() ? "A" : msg->is_video() ? "V" : "N", - pprint_->age(), msg->timestamp, msg->size); + pprint_->age(), msg->timestamp, msg->size()); } // send out encoded msg. diff --git a/trunk/src/app/srs_app_hds.cpp b/trunk/src/app/srs_app_hds.cpp index 4f42027c1..378c8a2b3 100644 --- a/trunk/src/app/srs_app_hds.cpp +++ b/trunk/src/app/srs_app_hds.cpp @@ -40,7 +40,7 @@ char flv_header[] = {'F', 'L', 'V', string serialFlv(SrsSharedPtrMessage *msg) { - int size = 15 + msg->size; + int size = 15 + msg->size(); char *byte = new char[size]; SrsUniquePtr stream(new SrsBuffer(byte, size)); @@ -50,14 +50,14 @@ string serialFlv(SrsSharedPtrMessage *msg) char type = msg->is_video() ? 0x09 : 0x08; stream->write_1bytes(type); - stream->write_3bytes(msg->size); + stream->write_3bytes(msg->size()); stream->write_3bytes((int32_t)dts); stream->write_1bytes(dts >> 24 & 0xFF); stream->write_3bytes(0); - stream->write_bytes(msg->payload, msg->size); + stream->write_bytes(msg->payload(), msg->size()); // pre tag size - int preTagSize = msg->size + 11; + int preTagSize = msg->size() + 11; stream->write_4bytes(preTagSize); string ret(stream->data(), stream->size()); @@ -298,7 +298,7 @@ srs_error_t SrsHds::on_video(SrsSharedPtrMessage *msg) return err; } - if (SrsFlvVideo::sh(msg->payload, msg->size)) { + if (SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(video_sh); video_sh = msg->copy(); } @@ -348,7 +348,7 @@ srs_error_t SrsHds::on_audio(SrsSharedPtrMessage *msg) return err; } - if (SrsFlvAudio::sh(msg->payload, msg->size)) { + if (SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(audio_sh); audio_sh = msg->copy(); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 0b5d3a8ec..64019beb1 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -392,11 +392,11 @@ srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage **msgs, int coun for (int i = 0; i < count; i++) { SrsSharedPtrMessage *msg = msgs[i]; if (msg->is_video()) { - if (!SrsFlvVideo::sh(msg->payload, msg->size)) + if (!SrsFlvVideo::sh(msg->payload(), msg->size())) nn_video_frames++; has_video = true; } else if (msg->is_audio()) { - if (!SrsFlvAudio::sh(msg->payload, msg->size)) + if (!SrsFlvAudio::sh(msg->payload(), msg->size())) nn_audio_frames++; has_audio = true; } @@ -958,11 +958,11 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder *enc, SrsSh SrsSharedPtrMessage *msg = msgs[i]; if (msg->is_audio()) { - err = enc->write_audio(msg->timestamp, msg->payload, msg->size); + err = enc->write_audio(msg->timestamp, msg->payload(), msg->size()); } else if (msg->is_video()) { - err = enc->write_video(msg->timestamp, msg->payload, msg->size); + err = enc->write_video(msg->timestamp, msg->payload(), msg->size()); } else { - err = enc->write_metadata(msg->timestamp, msg->payload, msg->size); + err = enc->write_metadata(msg->timestamp, msg->payload(), msg->size()); } if (err != srs_success) { diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index ff526b505..6b74ee061 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -626,7 +626,7 @@ srs_error_t SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, c srs_trace("mpegts: send msg %s age=%d, dts=%" PRId64 ", size=%d", msg->is_audio() ? "A" : msg->is_video() ? "V" : "N", - pprint->age(), msg->timestamp, msg->size); + pprint->age(), msg->timestamp, msg->size()); } // send out encoded msg. diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 2ce2f8a4d..bd73043b6 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -78,7 +78,7 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage *shared_audio, SrsFor // If no audio RAW frame, or not parsed for no sequence header, drop the packet. if (format->audio->nb_samples == 0) { - srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size); + srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size()); return err; } @@ -1186,7 +1186,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsSharedPtrMessage *msg) srs_error_t err = srs_success; // cache the sequence header if h264 - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { return srs_error_wrap(err, "meta update video"); } @@ -1919,7 +1919,7 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len int rtmp_len = len + 2; audio->header.initialize_audio(rtmp_len, pts, 1); audio->create_payload(rtmp_len); - SrsBuffer stream(audio->payload, rtmp_len); + SrsBuffer stream(audio->payload(), rtmp_len); uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo; stream.write_1bytes(aac_flag); if (is_header) { @@ -1928,7 +1928,6 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len stream.write_1bytes(1); } stream.write_bytes(data, len); - audio->size = rtmp_len; } srs_error_t SrsRtcFrameBuilder::packet_video(SrsRtpPacket *pkt) @@ -2406,8 +2405,7 @@ srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const ui SrsCommonMessage rtmp; rtmp.header.initialize_video(nb_payload, pkt->get_avsync_time(), 1); rtmp.create_payload(nb_payload); - rtmp.size = nb_payload; - SrsBuffer payload(rtmp.payload, rtmp.size); + SrsBuffer payload(rtmp.payload(), rtmp.size()); if (video_codec_ == SrsVideoCodecIdHEVC) { // @see: https://veovera.org/docs/enhanced/enhanced-rtmp-v1.pdf, page 8 payload.write_1bytes(SRS_FLV_IS_EX_HEADER | (frame_type << 4) | SrsVideoHEVCFrameTraitPacketTypeCodedFramesX); diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index b3c3aa832..7912c8cc2 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -832,7 +832,7 @@ srs_error_t SrsRtspRtpBuilder::on_video(SrsSharedPtrMessage *msg) srs_error_t err = srs_success; // cache the sequence header if h264 - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { return srs_error_wrap(err, "meta update video"); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 94afc9c26..4b99120a1 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -345,11 +345,11 @@ void SrsMessageQueue::shrink() for (int i = 0; i < (int)msgs.size(); i++) { SrsSharedPtrMessage *msg = msgs.at(i); - if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) { + if (msg->is_video() && SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(video_sh); video_sh = msg; continue; - } else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) { + } else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(audio_sh); audio_sh = msg; continue; @@ -620,8 +620,8 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg) // got video, update the video count if acceptable if (msg->is_video()) { // Drop video when not h.264 or h.265. - bool codec_ok = SrsFlvVideo::h264(msg->payload, msg->size); - codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload, msg->size); + bool codec_ok = SrsFlvVideo::h264(msg->payload(), msg->size()); + codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload(), msg->size()); if (!codec_ok) return err; @@ -647,7 +647,7 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg) } // clear gop cache when got key frame - if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) { + if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload(), msg->size())) { clear(); // curent msg is video frame, so we set to 1. @@ -737,7 +737,7 @@ bool srs_hls_can_continue(int ret, SrsSharedPtrMessage *sh, SrsSharedPtrMessage // when video size equals to sequence header, // the video actually maybe a sequence header, // continue to make ffmpeg happy. - if (sh && sh->size == msg->size) { + if (sh && sh->size() == msg->size()) { srs_warn("the msg is actually a sequence header, ignore this packet."); return true; } @@ -965,11 +965,11 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage *shared_audio) if (format->acodec->id == SrsAudioCodecIdMP3) { srs_trace("%dB audio sh, codec(%d, %dbits, %dchannels, %dHZ)", - msg->size, c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], + msg->size(), c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); } else { srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)", - msg->size, c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels, + msg->size(), c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels, c->audio_data_rate / 1000, srs_aac_srates[c->aac_sample_rate], flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); @@ -1047,12 +1047,12 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage *shared_video, bool is_se if (c->id == SrsVideoCodecIdAVC) { err = stat->on_video_info(req_, c->id, c->avc_profile, c->avc_level, c->width, c->height); srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)", - msg->size, c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(), + msg->size(), c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(), c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration); } else if (c->id == SrsVideoCodecIdHEVC) { err = stat->on_video_info(req_, c->id, c->hevc_profile, c->hevc_level, c->width, c->height); srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)", - msg->size, c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(), + msg->size(), c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(), c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration); } if (err != srs_success) { @@ -1942,7 +1942,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage *msg, SrsOnMetaDataPack bool drop_for_reduce = false; if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) { drop_for_reduce = true; - srs_warn("drop for reduce sh metadata, size=%d", msg->size); + srs_warn("drop for reduce sh metadata, size=%d", msg->size()); } // copy to all consumer @@ -2038,9 +2038,9 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage *msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (meta->previous_ash()->size == msg->size) { - drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload, msg->payload, msg->size); - srs_warn("drop for reduce sh audio, size=%d", msg->size); + if (meta->previous_ash()->size() == msg->size()) { + drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload(), msg->payload(), msg->size()); + srs_warn("drop for reduce sh audio, size=%d", msg->size()); } } @@ -2110,13 +2110,13 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage *shared_video) // drop any unknown header video. // @see https://github.com/ossrs/srs/issues/421 - if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { + if (!SrsFlvVideo::acceptable(shared_video->payload(), shared_video->size())) { char b0 = 0x00; - if (shared_video->size > 0) { - b0 = shared_video->payload[0]; + if (shared_video->size() > 0) { + b0 = shared_video->payload()[0]; } - srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size(), b0); return err; } @@ -2134,7 +2134,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg) { srs_error_t err = srs_success; - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); // user can disable the sps parse to workaround when parse sps failed. // @see https://github.com/ossrs/srs/issues/474 @@ -2155,9 +2155,9 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (meta->previous_vsh()->size == msg->size) { - drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload, msg->payload, msg->size); - srs_warn("drop for reduce sh video, size=%d", msg->size); + if (meta->previous_vsh()->size() == msg->size()) { + drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload(), msg->payload(), msg->size()); + srs_warn("drop for reduce sh video, size=%d", msg->size()); } } @@ -2214,7 +2214,7 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg) { srs_error_t err = srs_success; - SrsUniquePtr stream(new SrsBuffer(msg->payload, msg->size)); + SrsUniquePtr stream(new SrsBuffer(msg->payload(), msg->size())); // the aggregate message always use abs time. int delta = -1; @@ -2274,9 +2274,8 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg) o.header.prefer_cid = msg->header.prefer_cid; if (data_size > 0) { - o.size = data_size; - o.payload = new char[o.size]; - stream->read_bytes(o.payload, o.size); + o.create_payload(data_size); + stream->read_bytes(o.payload(), data_size); } if (!stream->require(4)) { diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index b2e715063..e9e1c6c5c 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -40,8 +40,8 @@ char *SrsSrtPacket::wrap(int size) actual_buffer_size_ = size; // If the buffer is large enough, reuse it. - if (shared_buffer_ && shared_buffer_->size >= size) { - return shared_buffer_->payload; + if (shared_buffer_ && shared_buffer_->size() >= size) { + return shared_buffer_->payload(); } // Create a large enough message, with under-layer buffer. @@ -51,7 +51,7 @@ char *SrsSrtPacket::wrap(int size) char *buf = new char[size]; shared_buffer_->wrap(buf, size); - return shared_buffer_->payload; + return shared_buffer_->payload(); } char *SrsSrtPacket::wrap(char *data, int size) @@ -70,9 +70,9 @@ char *SrsSrtPacket::wrap(SrsSharedPtrMessage *msg) // Copy from the new message. shared_buffer_ = msg->copy(); // If we wrap a message, the size of packet equals to the message size. - actual_buffer_size_ = shared_buffer_->size; + actual_buffer_size_ = shared_buffer_->size(); - return msg->payload; + return msg->payload(); } SrsSrtPacket *SrsSrtPacket::copy() @@ -87,12 +87,12 @@ SrsSrtPacket *SrsSrtPacket::copy() char *SrsSrtPacket::data() { - return shared_buffer_->payload; + return shared_buffer_->payload(); } int SrsSrtPacket::size() { - return shared_buffer_->size; + return shared_buffer_->size(); } SrsSrtSourceManager::SrsSrtSourceManager() @@ -546,8 +546,7 @@ srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage *msg, vector= 0); + + // Free existing payload + srs_freepa(payload_); + + // Allocate new buffer + if (size > 0) { + payload_ = new char[size]; + memset(payload_, 0, size); + size_ = size; + } else { + payload_ = NULL; + size_ = 0; + } +} + +void SrsMemoryBlock::create(char *data, int size) +{ + srs_assert(size >= 0); + + create(size); + + // Copy data if provided + if (data && size > 0) { + memcpy(payload_, data, size); + size_ = size; + } +} + +void SrsMemoryBlock::attach(char *data, int size) +{ + srs_assert(size >= 0); + + // Free existing payload + srs_freepa(payload_); + + // Attach new buffer + payload_ = data; + size_ = size; +} diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 59a98180f..05e6059f5 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -195,4 +195,43 @@ public: srs_error_t read_bits_se(int32_t &v); }; +// Memory block for shared payload data. +// This class encapsulates a memory buffer with size information, +// designed to be used with SrsSharedPtr for efficient memory sharing. +class SrsMemoryBlock +{ +private: + // The current message parsed size, + // size <= allocated buffer size + // For the payload maybe sent in multiple chunks. + int size_; + // The payload of message, the SrsMemoryBlock manages the memory lifecycle. + // @remark, not all message payload can be decoded to packet. for example, + // video/audio packet use raw bytes, no video/audio packet. + char *payload_; + +public: + SrsMemoryBlock(); + virtual ~SrsMemoryBlock(); + +public: + char *payload() { return payload_; } + int size() { return size_; } + +public: + // Create memory block with specified size. + // @param size, the size of memory to allocate. Must be non-negative. + virtual void create(int size); + // Create memory block from existing buffer. + // @param data, the buffer to copy from. + // @param size, the size of buffer. Must be non-negative. + // @remark, this method will copy the data. + virtual void create(char *data, int size); + // Attach existing buffer to memory block. + // @param data, the buffer to attach, memory block takes ownership. + // @param size, the size of buffer. Must be non-negative. + // @remark, this method takes ownership of data, caller should not free it. + virtual void attach(char *data, int size); +}; + #endif diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index dc622af63..390a211ef 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -265,89 +265,62 @@ void SrsMessageHeader::initialize_video(int size, uint32_t time, int stream) SrsCommonMessage::SrsCommonMessage() { - payload = NULL; - size = 0; + payload_ = SrsSharedPtr(NULL); } SrsCommonMessage::~SrsCommonMessage() { - srs_freepa(payload); + // payload_ automatically cleaned up by SrsSharedPtr } void SrsCommonMessage::create_payload(int size) { - srs_freepa(payload); - - payload = new char[size]; + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->create(size); srs_verbose("create payload for RTMP message. size=%d", size); } srs_error_t SrsCommonMessage::create(SrsMessageHeader *pheader, char *body, int size) { - // drop previous payload. - srs_freepa(payload); + srs_error_t err = srs_success; - this->header = *pheader; - this->payload = body; - this->size = size; + if (size < 0) { + return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size); + } - return srs_success; + // Create new memory block and attach the body + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(body, size); + + if (pheader) { + this->header = *pheader; + } + + return err; } -SrsSharedMessageHeader::SrsSharedMessageHeader() +SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), message_type(0), prefer_cid(RTMP_CID_OverConnection) { - payload_length = 0; - message_type = 0; - prefer_cid = 0; -} - -SrsSharedMessageHeader::~SrsSharedMessageHeader() -{ -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() -{ - payload = NULL; - size = 0; - shared_count = 0; -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() -{ - srs_freepa(payload); -} - -SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL) -{ - ptr = NULL; + payload_ = SrsSharedPtr(NULL); ++_srs_pps_objs_msgs->sugar; } SrsSharedPtrMessage::~SrsSharedPtrMessage() { - if (ptr) { - if (ptr->shared_count == 0) { - srs_freep(ptr); - } else { - ptr->shared_count--; - } - } + // payload_ automatically cleaned up by SrsSharedPtr } srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage *msg) { srs_error_t err = srs_success; - if ((err = create(&msg->header, msg->payload, msg->size)) != srs_success) { - return srs_error_wrap(err, "create message"); - } - - // to prevent double free of payload: - // initialize already attach the payload of msg, - // detach the payload to transfer the owner to shared ptr. - msg->payload = NULL; - msg->size = 0; + // Share the memory block from the common message + payload_ = msg->payload_; + this->timestamp = msg->header.timestamp; + this->stream_id = msg->header.stream_id; + this->message_type = msg->header.message_type; + this->prefer_cid = msg->header.prefer_cid; return err; } @@ -360,58 +333,35 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader *pheader, char *payload return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size); } - srs_assert(!ptr); - ptr = new SrsSharedPtrPayload(); + // Create new memory block and attach the payload + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(payload, size); - // direct attach the data. + // Set header information if (pheader) { - ptr->header.message_type = pheader->message_type; - ptr->header.payload_length = size; - ptr->header.prefer_cid = pheader->prefer_cid; this->timestamp = pheader->timestamp; this->stream_id = pheader->stream_id; + this->message_type = pheader->message_type; + this->prefer_cid = pheader->prefer_cid; } - ptr->payload = payload; - ptr->size = size; - - // message can access it. - this->payload = ptr->payload; - this->size = ptr->size; return err; } void SrsSharedPtrMessage::wrap(char *payload, int size) { - srs_assert(!ptr); - ptr = new SrsSharedPtrPayload(); - - ptr->payload = payload; - ptr->size = size; - - this->payload = ptr->payload; - this->size = ptr->size; -} - -int SrsSharedPtrMessage::count() -{ - return ptr ? ptr->shared_count : 0; + // Create new memory block and wrap the payload + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(payload, size); } bool SrsSharedPtrMessage::check(int stream_id) { // Ignore error when message has no payload. - if (!ptr) { + if (!payload_.get()) { return true; } - // we donot use the complex basic header, - // ensure the basic header is 1bytes. - if (ptr->header.prefer_cid < 2 || ptr->header.prefer_cid > 63) { - srs_info("change the chunk_id=%d to default=%d", ptr->header.prefer_cid, RTMP_CID_ProtocolControl); - ptr->header.prefer_cid = RTMP_CID_ProtocolControl; - } - // we assume that the stream_id in a group must be the same. if (this->stream_id == stream_id) { return true; @@ -423,38 +373,40 @@ bool SrsSharedPtrMessage::check(int stream_id) bool SrsSharedPtrMessage::is_av() { - return ptr->header.message_type == RTMP_MSG_AudioMessage || ptr->header.message_type == RTMP_MSG_VideoMessage; + return message_type == RTMP_MSG_AudioMessage || message_type == RTMP_MSG_VideoMessage; } bool SrsSharedPtrMessage::is_audio() { - return ptr->header.message_type == RTMP_MSG_AudioMessage; + return message_type == RTMP_MSG_AudioMessage; } bool SrsSharedPtrMessage::is_video() { - return ptr->header.message_type == RTMP_MSG_VideoMessage; + return message_type == RTMP_MSG_VideoMessage; } int SrsSharedPtrMessage::chunk_header(char *cache, int nb_cache, bool c0) { + int payload_length = payload_.get() ? payload_->size() : 0; + if (c0) { - return srs_chunk_header_c0(ptr->header.prefer_cid, (uint32_t)timestamp, - ptr->header.payload_length, ptr->header.message_type, stream_id, cache, nb_cache); + return srs_chunk_header_c0(prefer_cid, (uint32_t)timestamp, + payload_length, message_type, stream_id, cache, nb_cache); } else { - return srs_chunk_header_c3(ptr->header.prefer_cid, (uint32_t)timestamp, + return srs_chunk_header_c3(prefer_cid, (uint32_t)timestamp, cache, nb_cache); } } SrsSharedPtrMessage *SrsSharedPtrMessage::copy() { - srs_assert(ptr); - SrsSharedPtrMessage *copy = copy2(); copy->timestamp = timestamp; copy->stream_id = stream_id; + copy->message_type = message_type; + copy->prefer_cid = prefer_cid; return copy; } @@ -463,15 +415,8 @@ SrsSharedPtrMessage *SrsSharedPtrMessage::copy2() { SrsSharedPtrMessage *copy = new SrsSharedPtrMessage(); - // We got an object from cache, the ptr might exists, so unwrap it. - // srs_assert(!copy->ptr); - - // Reference to this message instead. - copy->ptr = ptr; - ptr->shared_count++; - - copy->payload = ptr->payload; - copy->size = ptr->size; + // Share the memory block + copy->payload_ = payload_; return copy; } @@ -665,23 +610,23 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage **msgs, int count) if (msg->is_audio()) { if (drop_if_not_match_ && !has_audio_) continue; // Ignore audio packets if no audio stream. - cache_audio(msg->timestamp, msg->payload, msg->size, cache); + cache_audio(msg->timestamp, msg->payload(), msg->size(), cache); } else if (msg->is_video()) { if (drop_if_not_match_ && !has_video_) continue; // Ignore video packets if no video stream. - cache_video(msg->timestamp, msg->payload, msg->size, cache); + cache_video(msg->timestamp, msg->payload(), msg->size(), cache); } else { - cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache); + cache_metadata(SrsFrameTypeScript, msg->payload(), msg->size(), cache); } // Cache FLV pts. - cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts); + cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size(), pts); // Set cache to iovec. iovs[0].iov_base = cache; iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE; - iovs[1].iov_base = msg->payload; - iovs[1].iov_len = msg->size; + iovs[1].iov_base = msg->payload(); + iovs[1].iov_len = msg->size(); iovs[2].iov_base = pts; iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE; diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index dac4a572f..9a05bb1fc 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -12,6 +12,9 @@ #include #include +#include +#include + // For srs-librtmp, @see https://github.com/ossrs/srs/issues/213 #ifndef _WIN32 #include @@ -192,25 +195,23 @@ public: // while the shared ptr message used to copy and send. class SrsCommonMessage { +public: // 4.1. Message Header -public: SrsMessageHeader header; - // 4.2. Message Payload + public: - // The current message parsed size, - // size <= header.payload_length - // For the payload maybe sent in multiple chunks. - int size; - // The payload of message, the SrsCommonMessage never know about the detail of payload, - // user must use SrsProtocol.decode_message to get concrete packet. - // @remark, not all message payload can be decoded to packet. for example, - // video/audio packet use raw bytes, no video/audio packet. - char *payload; + // 4.2. Message Payload + SrsSharedPtr payload_; public: SrsCommonMessage(); virtual ~SrsCommonMessage(); +public: + // Backward compatibility accessors + char *payload() { return payload_.get() ? payload_->payload() : NULL; } + int size() { return payload_.get() ? payload_->size() : 0; } + public: // Alloc the payload to specified size of bytes. virtual void create_payload(int size); @@ -223,30 +224,6 @@ public: virtual srs_error_t create(SrsMessageHeader *pheader, char *body, int size); }; -// The message header for shared ptr message. -// only the message for all msgs are same. -class SrsSharedMessageHeader -{ -public: - // 3bytes. - // Three-byte field that represents the size of the payload in bytes. - // It is set in big-endian format. - int32_t payload_length; - // 1byte. - // One byte field to represent the message type. A range of type IDs - // (1-7) are reserved for protocol control messages. - // For example, RTMP_MSG_AudioMessage or RTMP_MSG_VideoMessage. - int8_t message_type; - // Get the prefered cid(chunk stream id) which sendout over. - // set at decoding, and canbe used for directly send message, - // For example, dispatch to all connections. - int prefer_cid; - -public: - SrsSharedMessageHeader(); - virtual ~SrsSharedMessageHeader(); -}; - // The shared ptr message. // For audio/video/data message that need less memory copy. // and only for output. @@ -257,8 +234,6 @@ class SrsSharedPtrMessage { // 4.1. Message Header public: - // The header can shared, only set the timestamp and stream id. - // SrsSharedMessageHeader header; // Four-byte field that contains a timestamp of the message. // The 4 bytes are packed in the big-endian order. // @remark, used as calc timestamp when decode and encode time. @@ -268,41 +243,24 @@ public: // Four-byte field that identifies the stream of the message. These // bytes are set in big-endian format. int32_t stream_id; - // 4.2. Message Payload + // Message type for determining audio/video/data + int8_t message_type; + // Preferred chunk ID for RTMP chunking + int prefer_cid; + public: - // The current message parsed size, - // size <= header.payload_length - // For the payload maybe sent in multiple chunks. - int size; - // The payload of message, the SrsCommonMessage never know about the detail of payload, - // user must use SrsProtocol.decode_message to get concrete packet. - // @remark, not all message payload can be decoded to packet. for example, - // video/audio packet use raw bytes, no video/audio packet. - char *payload; - -private: - class SrsSharedPtrPayload - { - public: - // The shared message header. - SrsSharedMessageHeader header; - // The actual shared payload. - char *payload; - // The size of payload. - int size; - // The reference count - int shared_count; - - public: - SrsSharedPtrPayload(); - virtual ~SrsSharedPtrPayload(); - }; - SrsSharedPtrPayload *ptr; + // 4.2. Message Payload + SrsSharedPtr payload_; public: SrsSharedPtrMessage(); virtual ~SrsSharedPtrMessage(); +public: + // Backward compatibility accessors + char *payload() { return payload_.get() ? payload_->payload() : NULL; } + int size() { return payload_.get() ? payload_->size() : 0; } + public: // Create shared ptr message, // copy header, manage the payload of msg, @@ -318,12 +276,6 @@ public: // Create shared ptr message from RAW payload. // @remark Note that the header is set to zero. virtual void wrap(char *payload, int size); - // Get current reference count. - // when this object created, count set to 0. - // if copy() this object, count increase 1. - // if this or copy deleted, free payload when count is 0, or count--. - // @remark, assert object is created. - virtual int count(); // check prefer cid and stream id. // @return whether stream id already set. virtual bool check(int stream_id); diff --git a/trunk/src/kernel/srs_kernel_kbps.cpp b/trunk/src/kernel/srs_kernel_kbps.cpp index ec906ff5e..85bc91b5c 100644 --- a/trunk/src/kernel/srs_kernel_kbps.cpp +++ b/trunk/src/kernel/srs_kernel_kbps.cpp @@ -366,6 +366,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) } string &recvfrom_desc = stats->recvfrom_desc; + (void)recvfrom_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_recvfrom->update(_st_stat_recvfrom); _srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain); @@ -378,6 +379,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &io_desc = stats->io_desc; + (void)io_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_read->update(_st_stat_read); _srs_pps_read_eagain->update(_st_stat_read_eagain); @@ -392,6 +394,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &msg_desc = stats->msg_desc; + (void)msg_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_recvmsg->update(_st_stat_recvmsg); _srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain); @@ -404,6 +407,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &epoll_desc = stats->epoll_desc; + (void)epoll_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_epoll->update(_st_stat_epoll); _srs_pps_epoll_zero->update(_st_stat_epoll_zero); @@ -416,6 +420,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &sched_desc = stats->sched_desc; + (void)sched_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_sched_160ms->update(_st_stat_sched_160ms); _srs_pps_sched_s->update(_st_stat_sched_s); @@ -448,6 +453,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) } string &thread_desc = stats->thread_desc; + (void)thread_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_thread_run->update(_st_stat_thread_run); _srs_pps_thread_idle->update(_st_stat_thread_idle); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index 69d5a1185..9150acf3f 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -761,7 +761,7 @@ SrsRtpPacket::SrsRtpPacket() { payload_ = NULL; payload_type_ = SrsRtpPacketPayloadTypeUnknown; - shared_buffer_ = NULL; + shared_buffer_ = SrsSharedPtr(NULL); actual_buffer_size_ = 0; nalu_type = 0; @@ -776,7 +776,7 @@ SrsRtpPacket::SrsRtpPacket() SrsRtpPacket::~SrsRtpPacket() { srs_freep(payload_); - srs_freep(shared_buffer_); + // shared_buffer_ automatically cleaned up by SrsSharedPtr } char *SrsRtpPacket::wrap(int size) @@ -785,23 +785,22 @@ char *SrsRtpPacket::wrap(int size) actual_buffer_size_ = size; // If the buffer is large enough, reuse it. - if (shared_buffer_ && shared_buffer_->size >= size) { - return shared_buffer_->payload; + if (shared_buffer_.get() && shared_buffer_->size() >= size) { + return shared_buffer_->payload(); } - // Create a large enough message, with under-layer buffer. - srs_freep(shared_buffer_); - shared_buffer_ = new SrsSharedPtrMessage(); + // Create a large enough memory block, with under-layer buffer. + shared_buffer_ = SrsSharedPtr(new SrsMemoryBlock()); - // Create under-layer buffer for new message + // Create under-layer buffer for new memory block // For RTC, we use larger under-layer buffer for each packet. int nb_buffer = srs_max(size, kRtpPacketSize); char *buf = new char[nb_buffer]; - shared_buffer_->wrap(buf, nb_buffer); + shared_buffer_->attach(buf, nb_buffer); ++_srs_pps_objs_rbuf->sugar; - return shared_buffer_->payload; + return shared_buffer_->payload(); } char *SrsRtpPacket::wrap(char *data, int size) @@ -811,18 +810,14 @@ char *SrsRtpPacket::wrap(char *data, int size) return buf; } -char *SrsRtpPacket::wrap(SrsSharedPtrMessage *msg) +char *SrsRtpPacket::wrap(SrsSharedPtr block) { - // Generally, the wrap(msg) is used for RTMP to RTC, where the msg - // is not generated by RTC. - srs_freep(shared_buffer_); + // Copy the shared memory block. + shared_buffer_ = block; + // If we wrap a memory block, the size of packet equals to the block size. + actual_buffer_size_ = shared_buffer_.get() ? shared_buffer_->size() : 0; - // Copy from the new message. - shared_buffer_ = msg->copy(); - // If we wrap a message, the size of packet equals to the message size. - actual_buffer_size_ = shared_buffer_->size; - - return msg->payload; + return shared_buffer_.get() ? shared_buffer_->payload() : NULL; } SrsRtpPacket *SrsRtpPacket::copy() @@ -834,7 +829,7 @@ SrsRtpPacket *SrsRtpPacket::copy() cp->payload_type_ = payload_type_; cp->nalu_type = nalu_type; - cp->shared_buffer_ = shared_buffer_ ? shared_buffer_->copy2() : NULL; + cp->shared_buffer_ = shared_buffer_; // Copy shared pointer cp->actual_buffer_size_ = actual_buffer_size_; cp->frame_type = frame_type; diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index db14678e4..3cb3175d7 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -9,6 +9,7 @@ #include +#include #include #include @@ -27,6 +28,7 @@ #define SRS_NACK_DEBUG_DROP_PACKET_N 3 class SrsRtpPacket; +class SrsMemoryBlock; // The RTP packet max size, should never exceed this size. const int kRtpPacketSize = 1500; @@ -64,7 +66,6 @@ const uint8_t kEnd = 0x40; // Fu-header end bit class SrsBuffer; class SrsRtpRawPayload; class SrsRtpFUAPayload2; -class SrsSharedPtrMessage; class SrsRtpExtensionTypes; // Fast parse the SSRC from RTP packet. Return 0 if invalid. @@ -329,11 +330,11 @@ private: SrsRtpPacketPayloadType payload_type_; private: - // The original shared message, all RTP packets can refer to its data. - // Note that the size of shared msg, is not the packet size, it's a larger aligned buffer. + // The original shared memory block, all RTP packets can refer to its data. + // Note that the size of shared memory block, is not the packet size, it's a larger aligned buffer. // @remark Note that it may point to the whole RTP packet(for RTP parser, which decode RTP packet from buffer), // and it may point to the RTP payload(for RTMP to RTP, which build RTP header and payload). - SrsSharedPtrMessage *shared_buffer_; + SrsSharedPtr shared_buffer_; // The size of RTP packet or RTP payload. int actual_buffer_size_; // Helper fields. @@ -360,8 +361,8 @@ public: // Wrap buffer to shared_message, which is managed by us. char *wrap(int size); char *wrap(char *data, int size); - // Wrap the shared message, we copy it. - char *wrap(SrsSharedPtrMessage *msg); + // Wrap the shared memory block, we copy it. + char *wrap(SrsSharedPtr block); // Copy the RTP packet. virtual SrsRtpPacket *copy(); diff --git a/trunk/src/protocol/srs_protocol_format.cpp b/trunk/src/protocol/srs_protocol_format.cpp index 72ab42d9f..8b37f6b93 100644 --- a/trunk/src/protocol/srs_protocol_format.cpp +++ b/trunk/src/protocol/srs_protocol_format.cpp @@ -30,8 +30,8 @@ srs_error_t SrsRtmpFormat::on_metadata(SrsOnMetaDataPacket *meta) srs_error_t SrsRtmpFormat::on_audio(SrsSharedPtrMessage *shared_audio) { SrsSharedPtrMessage *msg = shared_audio; - char *data = msg->payload; - int size = msg->size; + char *data = msg->payload(); + int size = msg->size(); return SrsFormat::on_audio(msg->timestamp, data, size); } @@ -44,8 +44,8 @@ srs_error_t SrsRtmpFormat::on_audio(int64_t timestamp, char *data, int size) srs_error_t SrsRtmpFormat::on_video(SrsSharedPtrMessage *shared_video) { SrsSharedPtrMessage *msg = shared_video; - char *data = msg->payload; - int size = msg->size; + char *data = msg->payload(); + int size = msg->size(); return SrsFormat::on_video(msg->timestamp, data, size); } diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp index 1cdafe166..8626a73c7 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp @@ -341,7 +341,7 @@ srs_error_t SrsProtocol::recv_message(SrsCommonMessage **pmsg) continue; } - if (msg->size <= 0 || msg->header.payload_length <= 0) { + if (msg->size() <= 0 || msg->header.payload_length <= 0) { srs_trace("ignore empty message(type=%d, size=%d, time=%" PRId64 ", sid=%d).", msg->header.message_type, msg->header.payload_length, msg->header.timestamp, msg->header.stream_id); @@ -368,10 +368,10 @@ srs_error_t SrsProtocol::decode_message(SrsCommonMessage *msg, SrsPacket **ppack srs_error_t err = srs_success; srs_assert(msg != NULL); - srs_assert(msg->payload != NULL); - srs_assert(msg->size > 0); + srs_assert(msg->payload() != NULL); + srs_assert(msg->size() > 0); - SrsBuffer stream(msg->payload, msg->size); + SrsBuffer stream(msg->payload(), msg->size()); // decode the packet. SrsPacket *packet = NULL; @@ -407,20 +407,20 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage **msgs, int nb_msg } // ignore empty message. - if (!msg->payload || msg->size <= 0) { + if (!msg->payload() || msg->size() <= 0) { continue; } // p set to current write position, // it's ok when payload is NULL and size is 0. - char *p = msg->payload; - char *pend = msg->payload + msg->size; + char *p = msg->payload(); + char *pend = msg->payload() + msg->size(); // always write the header event payload is empty. while (p < pend) { // always has header int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index; - int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload); + int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload()); srs_assert(nbh > 0); // header iov @@ -985,6 +985,7 @@ srs_error_t SrsProtocol::read_message_header(SrsChunkStream *chunk, char fmt) // create msg when new chunk stream start if (!chunk->msg) { chunk->msg = new SrsCommonMessage(); + chunk->writing_pos_ = chunk->msg->payload(); } // read message header from socket to buffer. @@ -1199,32 +1200,39 @@ srs_error_t SrsProtocol::read_message_payload(SrsChunkStream *chunk, SrsCommonMe chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); *pmsg = chunk->msg; + chunk->msg = NULL; + chunk->writing_pos_ = NULL; return err; } srs_assert(chunk->header.payload_length > 0); // the chunk payload size. - int payload_size = chunk->header.payload_length - chunk->msg->size; - payload_size = srs_min(payload_size, in_chunk_size); + int nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload()); + int payload_size = chunk->header.payload_length - nn_written; // Left bytes to read. + payload_size = srs_min(payload_size, in_chunk_size); // Restrict to chunk size. // create msg payload if not initialized - if (!chunk->msg->payload) { + if (!chunk->msg->payload()) { chunk->msg->create_payload(chunk->header.payload_length); + chunk->writing_pos_ = chunk->msg->payload(); } // read payload to buffer if ((err = in_buffer->grow(skt, payload_size)) != srs_success) { return srs_error_wrap(err, "read %d bytes payload", payload_size); } - memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size); - chunk->msg->size += payload_size; + memcpy(chunk->writing_pos_, in_buffer->read_slice(payload_size), payload_size); + chunk->writing_pos_ += payload_size; // got entire RTMP message? - if (chunk->header.payload_length == chunk->msg->size) { + nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload()); + if (nn_written == chunk->header.payload_length) { *pmsg = chunk->msg; + chunk->msg = NULL; + chunk->writing_pos_ = NULL; return err; } @@ -1450,6 +1458,7 @@ SrsChunkStream::SrsChunkStream(int _cid) cid = _cid; has_extended_timestamp = false; msg = NULL; + writing_pos_ = NULL; msg_count = 0; extended_timestamp = 0; } diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp index bff00f86f..ea0306d8e 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp @@ -395,6 +395,8 @@ public: bool has_extended_timestamp; // The partially read message. SrsCommonMessage *msg; + // Current writing position of message. + char *writing_pos_; // Decoded msg count, to identify whether the chunk stream is fresh. int64_t msg_count; // Because the extended timestamp may be a delta timestamp, it can differ diff --git a/trunk/src/protocol/srs_protocol_rtp.cpp b/trunk/src/protocol/srs_protocol_rtp.cpp index d4f05dfc0..a6390d12b 100644 --- a/trunk/src/protocol/srs_protocol_rtp.cpp +++ b/trunk/src/protocol/srs_protocol_rtp.cpp @@ -148,7 +148,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve pkt->header.set_sequence(video_sequence_++); pkt->header.set_timestamp(msg->timestamp * 90); pkt->set_payload(raw_raw, SrsRtpPacketPayloadTypeNALU); - pkt->wrap(msg); + pkt->wrap(msg->payload_); } else { // We must free it, should never use RTP packets to free it, // because more than one RTP packet will refer to it. @@ -202,7 +202,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve pkt->set_payload(fua, SrsRtpPacketPayloadTypeFUA); } - pkt->wrap(msg); + pkt->wrap(msg->payload_); nb_left -= packet_size; } @@ -231,7 +231,7 @@ srs_error_t SrsRtpVideoBuilder::package_single_nalu(SrsSharedPtrMessage *msg, Sr raw->payload = sample->bytes; raw->nn_payload = sample->size; - pkt->wrap(msg); + pkt->wrap(msg->payload_); return err; } @@ -292,7 +292,7 @@ srs_error_t SrsRtpVideoBuilder::package_fu_a(SrsSharedPtrMessage *msg, SrsSample fua->size = packet_size; } - pkt->wrap(msg); + pkt->wrap(msg->payload_); p += packet_size; nb_left -= packet_size; diff --git a/trunk/src/utest/srs_utest_fmp4.cpp b/trunk/src/utest/srs_utest_fmp4.cpp index 69c15bda9..d102cc205 100644 --- a/trunk/src/utest/srs_utest_fmp4.cpp +++ b/trunk/src/utest/srs_utest_fmp4.cpp @@ -67,9 +67,9 @@ public: SrsSharedPtrMessage::wrap(payload, 1024); if (is_video_msg) { - ptr->header.message_type = RTMP_MSG_VideoMessage; + message_type = RTMP_MSG_VideoMessage; } else { - ptr->header.message_type = RTMP_MSG_AudioMessage; + message_type = RTMP_MSG_AudioMessage; } } virtual ~MockSrsSharedPtrMessage() {} diff --git a/trunk/src/utest/srs_utest_kernel.cpp b/trunk/src/utest/srs_utest_kernel.cpp index f735c83fd..3408331e4 100644 --- a/trunk/src/utest/srs_utest_kernel.cpp +++ b/trunk/src/utest/srs_utest_kernel.cpp @@ -5336,14 +5336,6 @@ VOID TEST(KernelFLVTest, CoverSharedPtrMessage) HELPER_EXPECT_FAILED(m.create(&h, NULL, -1)); } - if (true) { - SrsCommonMessage cm; - cm.size = -1; - - SrsSharedPtrMessage m; - HELPER_EXPECT_FAILED(m.create(&cm)); - } - if (true) { SrsMessageHeader h; h.prefer_cid = 1; @@ -6926,3 +6918,93 @@ VOID TEST(KernelUtilityTest, Base64Decode) EXPECT_STRNE("admin:admin", plaintext.c_str()); } } + +VOID TEST(KernelMemoryBlockTest, MemoryBlockBasic) +{ + + // Test basic construction and destruction + if (true) { + SrsMemoryBlock block; + EXPECT_EQ(0, block.size()); + EXPECT_EQ(NULL, block.payload()); + } + + // Test create with size + if (true) { + SrsMemoryBlock block; + block.create(1024); + EXPECT_EQ(1024, block.size()); + EXPECT_NE((char *)NULL, block.payload()); + } + + // Test create with data + if (true) { + SrsMemoryBlock block; + char test_data[] = "Hello, World!"; + int test_size = strlen(test_data); + + block.create(test_data, test_size); + EXPECT_EQ(test_size, block.size()); + EXPECT_NE((char *)NULL, block.payload()); + EXPECT_EQ(0, memcmp(block.payload(), test_data, test_size)); + } + + // Test attach + if (true) { + SrsMemoryBlock block; + char *test_data = new char[100]; + memset(test_data, 0x42, 100); + + block.attach(test_data, 100); + EXPECT_EQ(100, block.size()); + EXPECT_EQ(test_data, block.payload()); + + // Memory will be freed by block destructor + } +} + +VOID TEST(KernelMemoryBlockTest, SharedMemoryBlock) +{ + + // Test basic shared memory block usage + if (true) { + SrsSharedPtr shared_block(new SrsMemoryBlock()); + shared_block->create(1024); + + EXPECT_EQ(1024, shared_block->size()); + EXPECT_NE((char *)NULL, shared_block->payload()); + + // Test sharing + SrsSharedPtr shared_copy = shared_block; + EXPECT_EQ(shared_block->payload(), shared_copy->payload()); + EXPECT_EQ(shared_block->size(), shared_copy->size()); + } + + // Test multiple references + if (true) { + SrsSharedPtr original(new SrsMemoryBlock()); + char test_data[] = "Shared memory test data"; + original->create(test_data, strlen(test_data)); + + // Create multiple references + SrsSharedPtr copy1 = original; + SrsSharedPtr copy2 = original; + SrsSharedPtr copy3 = copy1; + + // All should point to the same memory + EXPECT_EQ(original->payload(), copy1->payload()); + EXPECT_EQ(original->payload(), copy2->payload()); + EXPECT_EQ(original->payload(), copy3->payload()); + + // All should have the same size + EXPECT_EQ(original->size(), copy1->size()); + EXPECT_EQ(original->size(), copy2->size()); + EXPECT_EQ(original->size(), copy3->size()); + + // Verify data integrity + EXPECT_EQ(0, memcmp(original->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy1->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy2->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy3->payload(), test_data, strlen(test_data))); + } +} diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index 9cc8cb5d3..d28a7b39a 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -899,7 +899,6 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsSharedPtrMessage msg; char *payload = new char[1024]; HELPER_EXPECT_SUCCESS(msg.create(&header, payload, 1024)); - EXPECT_EQ(0, msg.count()); if (true) { SrsMessageArray arr(3); @@ -908,15 +907,11 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsUniquePtr parr2(parr, srs_utest_free_message_array); arr.msgs[0] = msg.copy(); - EXPECT_EQ(1, msg.count()); arr.msgs[1] = msg.copy(); - EXPECT_EQ(2, msg.count()); arr.msgs[2] = msg.copy(); - EXPECT_EQ(3, msg.count()); } - EXPECT_EQ(0, msg.count()); if (true) { SrsMessageArray arr(3); @@ -925,12 +920,9 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsUniquePtr parr2(parr, srs_utest_free_message_array); arr.msgs[0] = msg.copy(); - EXPECT_EQ(1, msg.count()); arr.msgs[2] = msg.copy(); - EXPECT_EQ(2, msg.count()); } - EXPECT_EQ(0, msg.count()); } /** @@ -4402,9 +4394,8 @@ VOID TEST(ProtocolStackTest, ProtocolSendVMessage) SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->size = sizeof(data); - msg->payload = new char[msg->size]; - memcpy(msg->payload, data, msg->size); + msg->create_payload(sizeof(data)); + memcpy(msg->payload(), data, sizeof(data)); SrsSharedPtrMessage m; HELPER_ASSERT_SUCCESS(m.create(msg)); @@ -4964,8 +4955,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow) if (true) { SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->header.payload_length = msg->size = 4096; - msg->payload = new char[msg->size]; + msg->create_payload(4096); + msg->header.payload_length = 4096; msg->header.message_type = 9; EXPECT_TRUE(msg->header.is_video()); @@ -5014,8 +5005,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow) if (true) { SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->header.payload_length = msg->size = 4096; - msg->payload = new char[msg->size]; + msg->header.payload_length = 4096; + msg->create_payload(4096); msg->header.message_type = 9; EXPECT_TRUE(msg->header.is_video()); diff --git a/trunk/src/utest/srs_utest_rtmp.cpp b/trunk/src/utest/srs_utest_rtmp.cpp index ff346bd54..0f9109337 100644 --- a/trunk/src/utest/srs_utest_rtmp.cpp +++ b/trunk/src/utest/srs_utest_rtmp.cpp @@ -250,7 +250,6 @@ VOID TEST(ProtocolRTMPTest, SendPacketsError) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -346,7 +345,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -362,7 +360,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -384,7 +381,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -412,7 +408,6 @@ VOID TEST(ProtocolRTMPTest, DecodeMessages) SrsCommonMessage msg; msg.header.initialize_amf0_script(1, 1); msg.create_payload(1); - msg.size = 1; SrsPacket *pkt; HELPER_EXPECT_FAILED(p.decode_message(&msg, &pkt)); @@ -455,8 +450,7 @@ SrsCommonMessage *_create_amf0(char *bytes, int size, int stream_id) SrsCommonMessage *msg = new SrsCommonMessage(); msg->header.initialize_amf0_script(size, stream_id); msg->create_payload(size); - memcpy(msg->payload, bytes, size); - msg->size = size; + memcpy(msg->payload(), bytes, size); return msg; } @@ -2949,7 +2943,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeScript, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } @@ -2957,7 +2951,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeVideo, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } @@ -2965,13 +2959,13 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } if (true) { SrsCommonMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } }