diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 976b26b10..20551f544 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 7.0 Changelog +* v7.0, 2025-09-01, Merge [#4464](https://github.com/ossrs/srs/pull/4464): AI: Use shared ptr in RTMP message. v7.0.73 (#4464) * v7.0, 2025-09-01, Merge [#4463](https://github.com/ossrs/srs/pull/4463): AI: Use SrsHttpUri for URL parsing and add legacy RTMP URL conversion. v7.0.72 (#4463) * v7.0, 2025-09-01, Merge [#4462](https://github.com/ossrs/srs/pull/4462): HTTP: Rename HTTP hijack to dynamic match for better clarity. v7.0.71 (#4462) * v7.0, 2025-08-31, Merge [#4461](https://github.com/ossrs/srs/pull/4461): AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461) diff --git a/trunk/src/app/srs_app_dash.cpp b/trunk/src/app/srs_app_dash.cpp index 31e7d7781..1aaab172f 100644 --- a/trunk/src/app/srs_app_dash.cpp +++ b/trunk/src/app/srs_app_dash.cpp @@ -645,7 +645,7 @@ srs_error_t SrsDashController::refresh_init_mp4(SrsSharedPtrMessage *msg, SrsFor { srs_error_t err = srs_success; - if (msg->size <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) { + if (msg->size() <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) { srs_warn("DASH: Ignore empty sequence header."); return err; } diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index dd7a3440f..db5b994b5 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -319,7 +319,7 @@ srs_error_t SrsDvrFlvSegmenter::encode_metadata(SrsSharedPtrMessage *metadata) return err; } - SrsBuffer stream(metadata->payload, metadata->size); + SrsBuffer stream(metadata->payload(), metadata->size()); SrsUniquePtr name(SrsAmf0Any::str()); if ((err = name->read(&stream)) != srs_success) { @@ -370,8 +370,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_audio(SrsSharedPtrMessage *audio, SrsForm { srs_error_t err = srs_success; - char *payload = audio->payload; - int size = audio->size; + char *payload = audio->payload(); + int size = audio->size(); if ((err = enc->write_audio(audio->timestamp, payload, size)) != srs_success) { return srs_error_wrap(err, "write audio"); } @@ -383,8 +383,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_video(SrsSharedPtrMessage *video, SrsForm { srs_error_t err = srs_success; - char *payload = video->payload; - int size = video->size; + char *payload = video->payload(); + int size = video->size(); bool sh = (format->video->avc_packet_type == SrsVideoAvcFrameTraitSequenceHeader); bool keyframe = (!sh && format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame); @@ -870,8 +870,8 @@ srs_error_t SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage *msg) return err; } - char *payload = msg->payload; - int size = msg->size; + char *payload = msg->payload(); + int size = msg->size(); bool codec_ok = SrsFlvVideo::h264(payload, size); codec_ok = codec_ok ? true : SrsFlvVideo::hevc(payload, size); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index e7ece8ece..b17585eab 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -338,7 +338,7 @@ srs_error_t SrsEdgeFlvUpstream::decode_message(SrsCommonMessage *msg, SrsPacket srs_error_t err = srs_success; SrsPacket *packet = NULL; - SrsBuffer stream(msg->payload, msg->size); + SrsBuffer stream(msg->payload(), msg->size()); SrsMessageHeader &header = msg->header; if (header.is_amf0_data() || header.is_amf3_data()) { @@ -924,7 +924,7 @@ srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage *msg) // the msg is auto free by source, // so we just ignore, or copy then send it. - if (msg->size <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) { + if (msg->size() <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) { return err; } diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index d9058f0c1..6e50df3aa 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -127,7 +127,7 @@ srs_error_t SrsForwarder::on_audio(SrsSharedPtrMessage *shared_audio) return srs_error_wrap(err, "jitter"); } - if (SrsFlvAudio::sh(msg->payload, msg->size)) { + if (SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(sh_audio); sh_audio = msg->copy(); } @@ -150,7 +150,7 @@ srs_error_t SrsForwarder::on_video(SrsSharedPtrMessage *shared_video) return srs_error_wrap(err, "jitter"); } - if (SrsFlvVideo::sh(msg->payload, msg->size)) { + if (SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(sh_video); sh_video = msg->copy(); } diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 6deac3b7f..18d4a7df2 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -2129,7 +2129,7 @@ srs_error_t SrsGbMuxer::rtmp_write_packet(char type, uint32_t timestamp, char *d srs_trace("Muxer: send msg %s age=%d, dts=%" PRId64 ", size=%d", msg->is_audio() ? "A" : msg->is_video() ? "V" : "N", - pprint_->age(), msg->timestamp, msg->size); + pprint_->age(), msg->timestamp, msg->size()); } // send out encoded msg. diff --git a/trunk/src/app/srs_app_hds.cpp b/trunk/src/app/srs_app_hds.cpp index 4f42027c1..378c8a2b3 100644 --- a/trunk/src/app/srs_app_hds.cpp +++ b/trunk/src/app/srs_app_hds.cpp @@ -40,7 +40,7 @@ char flv_header[] = {'F', 'L', 'V', string serialFlv(SrsSharedPtrMessage *msg) { - int size = 15 + msg->size; + int size = 15 + msg->size(); char *byte = new char[size]; SrsUniquePtr stream(new SrsBuffer(byte, size)); @@ -50,14 +50,14 @@ string serialFlv(SrsSharedPtrMessage *msg) char type = msg->is_video() ? 0x09 : 0x08; stream->write_1bytes(type); - stream->write_3bytes(msg->size); + stream->write_3bytes(msg->size()); stream->write_3bytes((int32_t)dts); stream->write_1bytes(dts >> 24 & 0xFF); stream->write_3bytes(0); - stream->write_bytes(msg->payload, msg->size); + stream->write_bytes(msg->payload(), msg->size()); // pre tag size - int preTagSize = msg->size + 11; + int preTagSize = msg->size() + 11; stream->write_4bytes(preTagSize); string ret(stream->data(), stream->size()); @@ -298,7 +298,7 @@ srs_error_t SrsHds::on_video(SrsSharedPtrMessage *msg) return err; } - if (SrsFlvVideo::sh(msg->payload, msg->size)) { + if (SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(video_sh); video_sh = msg->copy(); } @@ -348,7 +348,7 @@ srs_error_t SrsHds::on_audio(SrsSharedPtrMessage *msg) return err; } - if (SrsFlvAudio::sh(msg->payload, msg->size)) { + if (SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(audio_sh); audio_sh = msg->copy(); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 0b5d3a8ec..64019beb1 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -392,11 +392,11 @@ srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage **msgs, int coun for (int i = 0; i < count; i++) { SrsSharedPtrMessage *msg = msgs[i]; if (msg->is_video()) { - if (!SrsFlvVideo::sh(msg->payload, msg->size)) + if (!SrsFlvVideo::sh(msg->payload(), msg->size())) nn_video_frames++; has_video = true; } else if (msg->is_audio()) { - if (!SrsFlvAudio::sh(msg->payload, msg->size)) + if (!SrsFlvAudio::sh(msg->payload(), msg->size())) nn_audio_frames++; has_audio = true; } @@ -958,11 +958,11 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder *enc, SrsSh SrsSharedPtrMessage *msg = msgs[i]; if (msg->is_audio()) { - err = enc->write_audio(msg->timestamp, msg->payload, msg->size); + err = enc->write_audio(msg->timestamp, msg->payload(), msg->size()); } else if (msg->is_video()) { - err = enc->write_video(msg->timestamp, msg->payload, msg->size); + err = enc->write_video(msg->timestamp, msg->payload(), msg->size()); } else { - err = enc->write_metadata(msg->timestamp, msg->payload, msg->size); + err = enc->write_metadata(msg->timestamp, msg->payload(), msg->size()); } if (err != srs_success) { diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index ff526b505..6b74ee061 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -626,7 +626,7 @@ srs_error_t SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, c srs_trace("mpegts: send msg %s age=%d, dts=%" PRId64 ", size=%d", msg->is_audio() ? "A" : msg->is_video() ? "V" : "N", - pprint->age(), msg->timestamp, msg->size); + pprint->age(), msg->timestamp, msg->size()); } // send out encoded msg. diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 2ce2f8a4d..bd73043b6 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -78,7 +78,7 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage *shared_audio, SrsFor // If no audio RAW frame, or not parsed for no sequence header, drop the packet. if (format->audio->nb_samples == 0) { - srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size); + srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size()); return err; } @@ -1186,7 +1186,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsSharedPtrMessage *msg) srs_error_t err = srs_success; // cache the sequence header if h264 - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { return srs_error_wrap(err, "meta update video"); } @@ -1919,7 +1919,7 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len int rtmp_len = len + 2; audio->header.initialize_audio(rtmp_len, pts, 1); audio->create_payload(rtmp_len); - SrsBuffer stream(audio->payload, rtmp_len); + SrsBuffer stream(audio->payload(), rtmp_len); uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo; stream.write_1bytes(aac_flag); if (is_header) { @@ -1928,7 +1928,6 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len stream.write_1bytes(1); } stream.write_bytes(data, len); - audio->size = rtmp_len; } srs_error_t SrsRtcFrameBuilder::packet_video(SrsRtpPacket *pkt) @@ -2406,8 +2405,7 @@ srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const ui SrsCommonMessage rtmp; rtmp.header.initialize_video(nb_payload, pkt->get_avsync_time(), 1); rtmp.create_payload(nb_payload); - rtmp.size = nb_payload; - SrsBuffer payload(rtmp.payload, rtmp.size); + SrsBuffer payload(rtmp.payload(), rtmp.size()); if (video_codec_ == SrsVideoCodecIdHEVC) { // @see: https://veovera.org/docs/enhanced/enhanced-rtmp-v1.pdf, page 8 payload.write_1bytes(SRS_FLV_IS_EX_HEADER | (frame_type << 4) | SrsVideoHEVCFrameTraitPacketTypeCodedFramesX); diff --git a/trunk/src/app/srs_app_rtsp_source.cpp b/trunk/src/app/srs_app_rtsp_source.cpp index b3c3aa832..7912c8cc2 100644 --- a/trunk/src/app/srs_app_rtsp_source.cpp +++ b/trunk/src/app/srs_app_rtsp_source.cpp @@ -832,7 +832,7 @@ srs_error_t SrsRtspRtpBuilder::on_video(SrsSharedPtrMessage *msg) srs_error_t err = srs_success; // cache the sequence header if h264 - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { return srs_error_wrap(err, "meta update video"); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 94afc9c26..4b99120a1 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -345,11 +345,11 @@ void SrsMessageQueue::shrink() for (int i = 0; i < (int)msgs.size(); i++) { SrsSharedPtrMessage *msg = msgs.at(i); - if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) { + if (msg->is_video() && SrsFlvVideo::sh(msg->payload(), msg->size())) { srs_freep(video_sh); video_sh = msg; continue; - } else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) { + } else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload(), msg->size())) { srs_freep(audio_sh); audio_sh = msg; continue; @@ -620,8 +620,8 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg) // got video, update the video count if acceptable if (msg->is_video()) { // Drop video when not h.264 or h.265. - bool codec_ok = SrsFlvVideo::h264(msg->payload, msg->size); - codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload, msg->size); + bool codec_ok = SrsFlvVideo::h264(msg->payload(), msg->size()); + codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload(), msg->size()); if (!codec_ok) return err; @@ -647,7 +647,7 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg) } // clear gop cache when got key frame - if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) { + if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload(), msg->size())) { clear(); // curent msg is video frame, so we set to 1. @@ -737,7 +737,7 @@ bool srs_hls_can_continue(int ret, SrsSharedPtrMessage *sh, SrsSharedPtrMessage // when video size equals to sequence header, // the video actually maybe a sequence header, // continue to make ffmpeg happy. - if (sh && sh->size == msg->size) { + if (sh && sh->size() == msg->size()) { srs_warn("the msg is actually a sequence header, ignore this packet."); return true; } @@ -965,11 +965,11 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage *shared_audio) if (format->acodec->id == SrsAudioCodecIdMP3) { srs_trace("%dB audio sh, codec(%d, %dbits, %dchannels, %dHZ)", - msg->size, c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], + msg->size(), c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); } else { srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)", - msg->size, c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels, + msg->size(), c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels, c->audio_data_rate / 1000, srs_aac_srates[c->aac_sample_rate], flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); @@ -1047,12 +1047,12 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage *shared_video, bool is_se if (c->id == SrsVideoCodecIdAVC) { err = stat->on_video_info(req_, c->id, c->avc_profile, c->avc_level, c->width, c->height); srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)", - msg->size, c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(), + msg->size(), c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(), c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration); } else if (c->id == SrsVideoCodecIdHEVC) { err = stat->on_video_info(req_, c->id, c->hevc_profile, c->hevc_level, c->width, c->height); srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)", - msg->size, c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(), + msg->size(), c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(), c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration); } if (err != srs_success) { @@ -1942,7 +1942,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage *msg, SrsOnMetaDataPack bool drop_for_reduce = false; if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) { drop_for_reduce = true; - srs_warn("drop for reduce sh metadata, size=%d", msg->size); + srs_warn("drop for reduce sh metadata, size=%d", msg->size()); } // copy to all consumer @@ -2038,9 +2038,9 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage *msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (meta->previous_ash()->size == msg->size) { - drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload, msg->payload, msg->size); - srs_warn("drop for reduce sh audio, size=%d", msg->size); + if (meta->previous_ash()->size() == msg->size()) { + drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload(), msg->payload(), msg->size()); + srs_warn("drop for reduce sh audio, size=%d", msg->size()); } } @@ -2110,13 +2110,13 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage *shared_video) // drop any unknown header video. // @see https://github.com/ossrs/srs/issues/421 - if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { + if (!SrsFlvVideo::acceptable(shared_video->payload(), shared_video->size())) { char b0 = 0x00; - if (shared_video->size > 0) { - b0 = shared_video->payload[0]; + if (shared_video->size() > 0) { + b0 = shared_video->payload()[0]; } - srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size(), b0); return err; } @@ -2134,7 +2134,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg) { srs_error_t err = srs_success; - bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); + bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size()); // user can disable the sps parse to workaround when parse sps failed. // @see https://github.com/ossrs/srs/issues/474 @@ -2155,9 +2155,9 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (meta->previous_vsh()->size == msg->size) { - drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload, msg->payload, msg->size); - srs_warn("drop for reduce sh video, size=%d", msg->size); + if (meta->previous_vsh()->size() == msg->size()) { + drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload(), msg->payload(), msg->size()); + srs_warn("drop for reduce sh video, size=%d", msg->size()); } } @@ -2214,7 +2214,7 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg) { srs_error_t err = srs_success; - SrsUniquePtr stream(new SrsBuffer(msg->payload, msg->size)); + SrsUniquePtr stream(new SrsBuffer(msg->payload(), msg->size())); // the aggregate message always use abs time. int delta = -1; @@ -2274,9 +2274,8 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg) o.header.prefer_cid = msg->header.prefer_cid; if (data_size > 0) { - o.size = data_size; - o.payload = new char[o.size]; - stream->read_bytes(o.payload, o.size); + o.create_payload(data_size); + stream->read_bytes(o.payload(), data_size); } if (!stream->require(4)) { diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index b2e715063..e9e1c6c5c 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -40,8 +40,8 @@ char *SrsSrtPacket::wrap(int size) actual_buffer_size_ = size; // If the buffer is large enough, reuse it. - if (shared_buffer_ && shared_buffer_->size >= size) { - return shared_buffer_->payload; + if (shared_buffer_ && shared_buffer_->size() >= size) { + return shared_buffer_->payload(); } // Create a large enough message, with under-layer buffer. @@ -51,7 +51,7 @@ char *SrsSrtPacket::wrap(int size) char *buf = new char[size]; shared_buffer_->wrap(buf, size); - return shared_buffer_->payload; + return shared_buffer_->payload(); } char *SrsSrtPacket::wrap(char *data, int size) @@ -70,9 +70,9 @@ char *SrsSrtPacket::wrap(SrsSharedPtrMessage *msg) // Copy from the new message. shared_buffer_ = msg->copy(); // If we wrap a message, the size of packet equals to the message size. - actual_buffer_size_ = shared_buffer_->size; + actual_buffer_size_ = shared_buffer_->size(); - return msg->payload; + return msg->payload(); } SrsSrtPacket *SrsSrtPacket::copy() @@ -87,12 +87,12 @@ SrsSrtPacket *SrsSrtPacket::copy() char *SrsSrtPacket::data() { - return shared_buffer_->payload; + return shared_buffer_->payload(); } int SrsSrtPacket::size() { - return shared_buffer_->size; + return shared_buffer_->size(); } SrsSrtSourceManager::SrsSrtSourceManager() @@ -546,8 +546,7 @@ srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage *msg, vector= 0); + + // Free existing payload + srs_freepa(payload_); + + // Allocate new buffer + if (size > 0) { + payload_ = new char[size]; + memset(payload_, 0, size); + size_ = size; + } else { + payload_ = NULL; + size_ = 0; + } +} + +void SrsMemoryBlock::create(char *data, int size) +{ + srs_assert(size >= 0); + + create(size); + + // Copy data if provided + if (data && size > 0) { + memcpy(payload_, data, size); + size_ = size; + } +} + +void SrsMemoryBlock::attach(char *data, int size) +{ + srs_assert(size >= 0); + + // Free existing payload + srs_freepa(payload_); + + // Attach new buffer + payload_ = data; + size_ = size; +} diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 59a98180f..05e6059f5 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -195,4 +195,43 @@ public: srs_error_t read_bits_se(int32_t &v); }; +// Memory block for shared payload data. +// This class encapsulates a memory buffer with size information, +// designed to be used with SrsSharedPtr for efficient memory sharing. +class SrsMemoryBlock +{ +private: + // The current message parsed size, + // size <= allocated buffer size + // For the payload maybe sent in multiple chunks. + int size_; + // The payload of message, the SrsMemoryBlock manages the memory lifecycle. + // @remark, not all message payload can be decoded to packet. for example, + // video/audio packet use raw bytes, no video/audio packet. + char *payload_; + +public: + SrsMemoryBlock(); + virtual ~SrsMemoryBlock(); + +public: + char *payload() { return payload_; } + int size() { return size_; } + +public: + // Create memory block with specified size. + // @param size, the size of memory to allocate. Must be non-negative. + virtual void create(int size); + // Create memory block from existing buffer. + // @param data, the buffer to copy from. + // @param size, the size of buffer. Must be non-negative. + // @remark, this method will copy the data. + virtual void create(char *data, int size); + // Attach existing buffer to memory block. + // @param data, the buffer to attach, memory block takes ownership. + // @param size, the size of buffer. Must be non-negative. + // @remark, this method takes ownership of data, caller should not free it. + virtual void attach(char *data, int size); +}; + #endif diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index dc622af63..390a211ef 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -265,89 +265,62 @@ void SrsMessageHeader::initialize_video(int size, uint32_t time, int stream) SrsCommonMessage::SrsCommonMessage() { - payload = NULL; - size = 0; + payload_ = SrsSharedPtr(NULL); } SrsCommonMessage::~SrsCommonMessage() { - srs_freepa(payload); + // payload_ automatically cleaned up by SrsSharedPtr } void SrsCommonMessage::create_payload(int size) { - srs_freepa(payload); - - payload = new char[size]; + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->create(size); srs_verbose("create payload for RTMP message. size=%d", size); } srs_error_t SrsCommonMessage::create(SrsMessageHeader *pheader, char *body, int size) { - // drop previous payload. - srs_freepa(payload); + srs_error_t err = srs_success; - this->header = *pheader; - this->payload = body; - this->size = size; + if (size < 0) { + return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size); + } - return srs_success; + // Create new memory block and attach the body + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(body, size); + + if (pheader) { + this->header = *pheader; + } + + return err; } -SrsSharedMessageHeader::SrsSharedMessageHeader() +SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), message_type(0), prefer_cid(RTMP_CID_OverConnection) { - payload_length = 0; - message_type = 0; - prefer_cid = 0; -} - -SrsSharedMessageHeader::~SrsSharedMessageHeader() -{ -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() -{ - payload = NULL; - size = 0; - shared_count = 0; -} - -SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() -{ - srs_freepa(payload); -} - -SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL) -{ - ptr = NULL; + payload_ = SrsSharedPtr(NULL); ++_srs_pps_objs_msgs->sugar; } SrsSharedPtrMessage::~SrsSharedPtrMessage() { - if (ptr) { - if (ptr->shared_count == 0) { - srs_freep(ptr); - } else { - ptr->shared_count--; - } - } + // payload_ automatically cleaned up by SrsSharedPtr } srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage *msg) { srs_error_t err = srs_success; - if ((err = create(&msg->header, msg->payload, msg->size)) != srs_success) { - return srs_error_wrap(err, "create message"); - } - - // to prevent double free of payload: - // initialize already attach the payload of msg, - // detach the payload to transfer the owner to shared ptr. - msg->payload = NULL; - msg->size = 0; + // Share the memory block from the common message + payload_ = msg->payload_; + this->timestamp = msg->header.timestamp; + this->stream_id = msg->header.stream_id; + this->message_type = msg->header.message_type; + this->prefer_cid = msg->header.prefer_cid; return err; } @@ -360,58 +333,35 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader *pheader, char *payload return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size); } - srs_assert(!ptr); - ptr = new SrsSharedPtrPayload(); + // Create new memory block and attach the payload + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(payload, size); - // direct attach the data. + // Set header information if (pheader) { - ptr->header.message_type = pheader->message_type; - ptr->header.payload_length = size; - ptr->header.prefer_cid = pheader->prefer_cid; this->timestamp = pheader->timestamp; this->stream_id = pheader->stream_id; + this->message_type = pheader->message_type; + this->prefer_cid = pheader->prefer_cid; } - ptr->payload = payload; - ptr->size = size; - - // message can access it. - this->payload = ptr->payload; - this->size = ptr->size; return err; } void SrsSharedPtrMessage::wrap(char *payload, int size) { - srs_assert(!ptr); - ptr = new SrsSharedPtrPayload(); - - ptr->payload = payload; - ptr->size = size; - - this->payload = ptr->payload; - this->size = ptr->size; -} - -int SrsSharedPtrMessage::count() -{ - return ptr ? ptr->shared_count : 0; + // Create new memory block and wrap the payload + payload_ = SrsSharedPtr(new SrsMemoryBlock()); + payload_->attach(payload, size); } bool SrsSharedPtrMessage::check(int stream_id) { // Ignore error when message has no payload. - if (!ptr) { + if (!payload_.get()) { return true; } - // we donot use the complex basic header, - // ensure the basic header is 1bytes. - if (ptr->header.prefer_cid < 2 || ptr->header.prefer_cid > 63) { - srs_info("change the chunk_id=%d to default=%d", ptr->header.prefer_cid, RTMP_CID_ProtocolControl); - ptr->header.prefer_cid = RTMP_CID_ProtocolControl; - } - // we assume that the stream_id in a group must be the same. if (this->stream_id == stream_id) { return true; @@ -423,38 +373,40 @@ bool SrsSharedPtrMessage::check(int stream_id) bool SrsSharedPtrMessage::is_av() { - return ptr->header.message_type == RTMP_MSG_AudioMessage || ptr->header.message_type == RTMP_MSG_VideoMessage; + return message_type == RTMP_MSG_AudioMessage || message_type == RTMP_MSG_VideoMessage; } bool SrsSharedPtrMessage::is_audio() { - return ptr->header.message_type == RTMP_MSG_AudioMessage; + return message_type == RTMP_MSG_AudioMessage; } bool SrsSharedPtrMessage::is_video() { - return ptr->header.message_type == RTMP_MSG_VideoMessage; + return message_type == RTMP_MSG_VideoMessage; } int SrsSharedPtrMessage::chunk_header(char *cache, int nb_cache, bool c0) { + int payload_length = payload_.get() ? payload_->size() : 0; + if (c0) { - return srs_chunk_header_c0(ptr->header.prefer_cid, (uint32_t)timestamp, - ptr->header.payload_length, ptr->header.message_type, stream_id, cache, nb_cache); + return srs_chunk_header_c0(prefer_cid, (uint32_t)timestamp, + payload_length, message_type, stream_id, cache, nb_cache); } else { - return srs_chunk_header_c3(ptr->header.prefer_cid, (uint32_t)timestamp, + return srs_chunk_header_c3(prefer_cid, (uint32_t)timestamp, cache, nb_cache); } } SrsSharedPtrMessage *SrsSharedPtrMessage::copy() { - srs_assert(ptr); - SrsSharedPtrMessage *copy = copy2(); copy->timestamp = timestamp; copy->stream_id = stream_id; + copy->message_type = message_type; + copy->prefer_cid = prefer_cid; return copy; } @@ -463,15 +415,8 @@ SrsSharedPtrMessage *SrsSharedPtrMessage::copy2() { SrsSharedPtrMessage *copy = new SrsSharedPtrMessage(); - // We got an object from cache, the ptr might exists, so unwrap it. - // srs_assert(!copy->ptr); - - // Reference to this message instead. - copy->ptr = ptr; - ptr->shared_count++; - - copy->payload = ptr->payload; - copy->size = ptr->size; + // Share the memory block + copy->payload_ = payload_; return copy; } @@ -665,23 +610,23 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage **msgs, int count) if (msg->is_audio()) { if (drop_if_not_match_ && !has_audio_) continue; // Ignore audio packets if no audio stream. - cache_audio(msg->timestamp, msg->payload, msg->size, cache); + cache_audio(msg->timestamp, msg->payload(), msg->size(), cache); } else if (msg->is_video()) { if (drop_if_not_match_ && !has_video_) continue; // Ignore video packets if no video stream. - cache_video(msg->timestamp, msg->payload, msg->size, cache); + cache_video(msg->timestamp, msg->payload(), msg->size(), cache); } else { - cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache); + cache_metadata(SrsFrameTypeScript, msg->payload(), msg->size(), cache); } // Cache FLV pts. - cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts); + cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size(), pts); // Set cache to iovec. iovs[0].iov_base = cache; iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE; - iovs[1].iov_base = msg->payload; - iovs[1].iov_len = msg->size; + iovs[1].iov_base = msg->payload(); + iovs[1].iov_len = msg->size(); iovs[2].iov_base = pts; iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE; diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index dac4a572f..9a05bb1fc 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -12,6 +12,9 @@ #include #include +#include +#include + // For srs-librtmp, @see https://github.com/ossrs/srs/issues/213 #ifndef _WIN32 #include @@ -192,25 +195,23 @@ public: // while the shared ptr message used to copy and send. class SrsCommonMessage { +public: // 4.1. Message Header -public: SrsMessageHeader header; - // 4.2. Message Payload + public: - // The current message parsed size, - // size <= header.payload_length - // For the payload maybe sent in multiple chunks. - int size; - // The payload of message, the SrsCommonMessage never know about the detail of payload, - // user must use SrsProtocol.decode_message to get concrete packet. - // @remark, not all message payload can be decoded to packet. for example, - // video/audio packet use raw bytes, no video/audio packet. - char *payload; + // 4.2. Message Payload + SrsSharedPtr payload_; public: SrsCommonMessage(); virtual ~SrsCommonMessage(); +public: + // Backward compatibility accessors + char *payload() { return payload_.get() ? payload_->payload() : NULL; } + int size() { return payload_.get() ? payload_->size() : 0; } + public: // Alloc the payload to specified size of bytes. virtual void create_payload(int size); @@ -223,30 +224,6 @@ public: virtual srs_error_t create(SrsMessageHeader *pheader, char *body, int size); }; -// The message header for shared ptr message. -// only the message for all msgs are same. -class SrsSharedMessageHeader -{ -public: - // 3bytes. - // Three-byte field that represents the size of the payload in bytes. - // It is set in big-endian format. - int32_t payload_length; - // 1byte. - // One byte field to represent the message type. A range of type IDs - // (1-7) are reserved for protocol control messages. - // For example, RTMP_MSG_AudioMessage or RTMP_MSG_VideoMessage. - int8_t message_type; - // Get the prefered cid(chunk stream id) which sendout over. - // set at decoding, and canbe used for directly send message, - // For example, dispatch to all connections. - int prefer_cid; - -public: - SrsSharedMessageHeader(); - virtual ~SrsSharedMessageHeader(); -}; - // The shared ptr message. // For audio/video/data message that need less memory copy. // and only for output. @@ -257,8 +234,6 @@ class SrsSharedPtrMessage { // 4.1. Message Header public: - // The header can shared, only set the timestamp and stream id. - // SrsSharedMessageHeader header; // Four-byte field that contains a timestamp of the message. // The 4 bytes are packed in the big-endian order. // @remark, used as calc timestamp when decode and encode time. @@ -268,41 +243,24 @@ public: // Four-byte field that identifies the stream of the message. These // bytes are set in big-endian format. int32_t stream_id; - // 4.2. Message Payload + // Message type for determining audio/video/data + int8_t message_type; + // Preferred chunk ID for RTMP chunking + int prefer_cid; + public: - // The current message parsed size, - // size <= header.payload_length - // For the payload maybe sent in multiple chunks. - int size; - // The payload of message, the SrsCommonMessage never know about the detail of payload, - // user must use SrsProtocol.decode_message to get concrete packet. - // @remark, not all message payload can be decoded to packet. for example, - // video/audio packet use raw bytes, no video/audio packet. - char *payload; - -private: - class SrsSharedPtrPayload - { - public: - // The shared message header. - SrsSharedMessageHeader header; - // The actual shared payload. - char *payload; - // The size of payload. - int size; - // The reference count - int shared_count; - - public: - SrsSharedPtrPayload(); - virtual ~SrsSharedPtrPayload(); - }; - SrsSharedPtrPayload *ptr; + // 4.2. Message Payload + SrsSharedPtr payload_; public: SrsSharedPtrMessage(); virtual ~SrsSharedPtrMessage(); +public: + // Backward compatibility accessors + char *payload() { return payload_.get() ? payload_->payload() : NULL; } + int size() { return payload_.get() ? payload_->size() : 0; } + public: // Create shared ptr message, // copy header, manage the payload of msg, @@ -318,12 +276,6 @@ public: // Create shared ptr message from RAW payload. // @remark Note that the header is set to zero. virtual void wrap(char *payload, int size); - // Get current reference count. - // when this object created, count set to 0. - // if copy() this object, count increase 1. - // if this or copy deleted, free payload when count is 0, or count--. - // @remark, assert object is created. - virtual int count(); // check prefer cid and stream id. // @return whether stream id already set. virtual bool check(int stream_id); diff --git a/trunk/src/kernel/srs_kernel_kbps.cpp b/trunk/src/kernel/srs_kernel_kbps.cpp index ec906ff5e..85bc91b5c 100644 --- a/trunk/src/kernel/srs_kernel_kbps.cpp +++ b/trunk/src/kernel/srs_kernel_kbps.cpp @@ -366,6 +366,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) } string &recvfrom_desc = stats->recvfrom_desc; + (void)recvfrom_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_recvfrom->update(_st_stat_recvfrom); _srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain); @@ -378,6 +379,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &io_desc = stats->io_desc; + (void)io_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_read->update(_st_stat_read); _srs_pps_read_eagain->update(_st_stat_read_eagain); @@ -392,6 +394,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &msg_desc = stats->msg_desc; + (void)msg_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_recvmsg->update(_st_stat_recvmsg); _srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain); @@ -404,6 +407,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &epoll_desc = stats->epoll_desc; + (void)epoll_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_epoll->update(_st_stat_epoll); _srs_pps_epoll_zero->update(_st_stat_epoll_zero); @@ -416,6 +420,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) #endif string &sched_desc = stats->sched_desc; + (void)sched_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_sched_160ms->update(_st_stat_sched_160ms); _srs_pps_sched_s->update(_st_stat_sched_s); @@ -448,6 +453,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats) } string &thread_desc = stats->thread_desc; + (void)thread_desc; #if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS) _srs_pps_thread_run->update(_st_stat_thread_run); _srs_pps_thread_idle->update(_st_stat_thread_idle); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index 69d5a1185..9150acf3f 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -761,7 +761,7 @@ SrsRtpPacket::SrsRtpPacket() { payload_ = NULL; payload_type_ = SrsRtpPacketPayloadTypeUnknown; - shared_buffer_ = NULL; + shared_buffer_ = SrsSharedPtr(NULL); actual_buffer_size_ = 0; nalu_type = 0; @@ -776,7 +776,7 @@ SrsRtpPacket::SrsRtpPacket() SrsRtpPacket::~SrsRtpPacket() { srs_freep(payload_); - srs_freep(shared_buffer_); + // shared_buffer_ automatically cleaned up by SrsSharedPtr } char *SrsRtpPacket::wrap(int size) @@ -785,23 +785,22 @@ char *SrsRtpPacket::wrap(int size) actual_buffer_size_ = size; // If the buffer is large enough, reuse it. - if (shared_buffer_ && shared_buffer_->size >= size) { - return shared_buffer_->payload; + if (shared_buffer_.get() && shared_buffer_->size() >= size) { + return shared_buffer_->payload(); } - // Create a large enough message, with under-layer buffer. - srs_freep(shared_buffer_); - shared_buffer_ = new SrsSharedPtrMessage(); + // Create a large enough memory block, with under-layer buffer. + shared_buffer_ = SrsSharedPtr(new SrsMemoryBlock()); - // Create under-layer buffer for new message + // Create under-layer buffer for new memory block // For RTC, we use larger under-layer buffer for each packet. int nb_buffer = srs_max(size, kRtpPacketSize); char *buf = new char[nb_buffer]; - shared_buffer_->wrap(buf, nb_buffer); + shared_buffer_->attach(buf, nb_buffer); ++_srs_pps_objs_rbuf->sugar; - return shared_buffer_->payload; + return shared_buffer_->payload(); } char *SrsRtpPacket::wrap(char *data, int size) @@ -811,18 +810,14 @@ char *SrsRtpPacket::wrap(char *data, int size) return buf; } -char *SrsRtpPacket::wrap(SrsSharedPtrMessage *msg) +char *SrsRtpPacket::wrap(SrsSharedPtr block) { - // Generally, the wrap(msg) is used for RTMP to RTC, where the msg - // is not generated by RTC. - srs_freep(shared_buffer_); + // Copy the shared memory block. + shared_buffer_ = block; + // If we wrap a memory block, the size of packet equals to the block size. + actual_buffer_size_ = shared_buffer_.get() ? shared_buffer_->size() : 0; - // Copy from the new message. - shared_buffer_ = msg->copy(); - // If we wrap a message, the size of packet equals to the message size. - actual_buffer_size_ = shared_buffer_->size; - - return msg->payload; + return shared_buffer_.get() ? shared_buffer_->payload() : NULL; } SrsRtpPacket *SrsRtpPacket::copy() @@ -834,7 +829,7 @@ SrsRtpPacket *SrsRtpPacket::copy() cp->payload_type_ = payload_type_; cp->nalu_type = nalu_type; - cp->shared_buffer_ = shared_buffer_ ? shared_buffer_->copy2() : NULL; + cp->shared_buffer_ = shared_buffer_; // Copy shared pointer cp->actual_buffer_size_ = actual_buffer_size_; cp->frame_type = frame_type; diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index db14678e4..3cb3175d7 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -9,6 +9,7 @@ #include +#include #include #include @@ -27,6 +28,7 @@ #define SRS_NACK_DEBUG_DROP_PACKET_N 3 class SrsRtpPacket; +class SrsMemoryBlock; // The RTP packet max size, should never exceed this size. const int kRtpPacketSize = 1500; @@ -64,7 +66,6 @@ const uint8_t kEnd = 0x40; // Fu-header end bit class SrsBuffer; class SrsRtpRawPayload; class SrsRtpFUAPayload2; -class SrsSharedPtrMessage; class SrsRtpExtensionTypes; // Fast parse the SSRC from RTP packet. Return 0 if invalid. @@ -329,11 +330,11 @@ private: SrsRtpPacketPayloadType payload_type_; private: - // The original shared message, all RTP packets can refer to its data. - // Note that the size of shared msg, is not the packet size, it's a larger aligned buffer. + // The original shared memory block, all RTP packets can refer to its data. + // Note that the size of shared memory block, is not the packet size, it's a larger aligned buffer. // @remark Note that it may point to the whole RTP packet(for RTP parser, which decode RTP packet from buffer), // and it may point to the RTP payload(for RTMP to RTP, which build RTP header and payload). - SrsSharedPtrMessage *shared_buffer_; + SrsSharedPtr shared_buffer_; // The size of RTP packet or RTP payload. int actual_buffer_size_; // Helper fields. @@ -360,8 +361,8 @@ public: // Wrap buffer to shared_message, which is managed by us. char *wrap(int size); char *wrap(char *data, int size); - // Wrap the shared message, we copy it. - char *wrap(SrsSharedPtrMessage *msg); + // Wrap the shared memory block, we copy it. + char *wrap(SrsSharedPtr block); // Copy the RTP packet. virtual SrsRtpPacket *copy(); diff --git a/trunk/src/protocol/srs_protocol_format.cpp b/trunk/src/protocol/srs_protocol_format.cpp index 72ab42d9f..8b37f6b93 100644 --- a/trunk/src/protocol/srs_protocol_format.cpp +++ b/trunk/src/protocol/srs_protocol_format.cpp @@ -30,8 +30,8 @@ srs_error_t SrsRtmpFormat::on_metadata(SrsOnMetaDataPacket *meta) srs_error_t SrsRtmpFormat::on_audio(SrsSharedPtrMessage *shared_audio) { SrsSharedPtrMessage *msg = shared_audio; - char *data = msg->payload; - int size = msg->size; + char *data = msg->payload(); + int size = msg->size(); return SrsFormat::on_audio(msg->timestamp, data, size); } @@ -44,8 +44,8 @@ srs_error_t SrsRtmpFormat::on_audio(int64_t timestamp, char *data, int size) srs_error_t SrsRtmpFormat::on_video(SrsSharedPtrMessage *shared_video) { SrsSharedPtrMessage *msg = shared_video; - char *data = msg->payload; - int size = msg->size; + char *data = msg->payload(); + int size = msg->size(); return SrsFormat::on_video(msg->timestamp, data, size); } diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp index 1cdafe166..8626a73c7 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp @@ -341,7 +341,7 @@ srs_error_t SrsProtocol::recv_message(SrsCommonMessage **pmsg) continue; } - if (msg->size <= 0 || msg->header.payload_length <= 0) { + if (msg->size() <= 0 || msg->header.payload_length <= 0) { srs_trace("ignore empty message(type=%d, size=%d, time=%" PRId64 ", sid=%d).", msg->header.message_type, msg->header.payload_length, msg->header.timestamp, msg->header.stream_id); @@ -368,10 +368,10 @@ srs_error_t SrsProtocol::decode_message(SrsCommonMessage *msg, SrsPacket **ppack srs_error_t err = srs_success; srs_assert(msg != NULL); - srs_assert(msg->payload != NULL); - srs_assert(msg->size > 0); + srs_assert(msg->payload() != NULL); + srs_assert(msg->size() > 0); - SrsBuffer stream(msg->payload, msg->size); + SrsBuffer stream(msg->payload(), msg->size()); // decode the packet. SrsPacket *packet = NULL; @@ -407,20 +407,20 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage **msgs, int nb_msg } // ignore empty message. - if (!msg->payload || msg->size <= 0) { + if (!msg->payload() || msg->size() <= 0) { continue; } // p set to current write position, // it's ok when payload is NULL and size is 0. - char *p = msg->payload; - char *pend = msg->payload + msg->size; + char *p = msg->payload(); + char *pend = msg->payload() + msg->size(); // always write the header event payload is empty. while (p < pend) { // always has header int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index; - int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload); + int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload()); srs_assert(nbh > 0); // header iov @@ -985,6 +985,7 @@ srs_error_t SrsProtocol::read_message_header(SrsChunkStream *chunk, char fmt) // create msg when new chunk stream start if (!chunk->msg) { chunk->msg = new SrsCommonMessage(); + chunk->writing_pos_ = chunk->msg->payload(); } // read message header from socket to buffer. @@ -1199,32 +1200,39 @@ srs_error_t SrsProtocol::read_message_payload(SrsChunkStream *chunk, SrsCommonMe chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); *pmsg = chunk->msg; + chunk->msg = NULL; + chunk->writing_pos_ = NULL; return err; } srs_assert(chunk->header.payload_length > 0); // the chunk payload size. - int payload_size = chunk->header.payload_length - chunk->msg->size; - payload_size = srs_min(payload_size, in_chunk_size); + int nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload()); + int payload_size = chunk->header.payload_length - nn_written; // Left bytes to read. + payload_size = srs_min(payload_size, in_chunk_size); // Restrict to chunk size. // create msg payload if not initialized - if (!chunk->msg->payload) { + if (!chunk->msg->payload()) { chunk->msg->create_payload(chunk->header.payload_length); + chunk->writing_pos_ = chunk->msg->payload(); } // read payload to buffer if ((err = in_buffer->grow(skt, payload_size)) != srs_success) { return srs_error_wrap(err, "read %d bytes payload", payload_size); } - memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size); - chunk->msg->size += payload_size; + memcpy(chunk->writing_pos_, in_buffer->read_slice(payload_size), payload_size); + chunk->writing_pos_ += payload_size; // got entire RTMP message? - if (chunk->header.payload_length == chunk->msg->size) { + nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload()); + if (nn_written == chunk->header.payload_length) { *pmsg = chunk->msg; + chunk->msg = NULL; + chunk->writing_pos_ = NULL; return err; } @@ -1450,6 +1458,7 @@ SrsChunkStream::SrsChunkStream(int _cid) cid = _cid; has_extended_timestamp = false; msg = NULL; + writing_pos_ = NULL; msg_count = 0; extended_timestamp = 0; } diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp index bff00f86f..ea0306d8e 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp @@ -395,6 +395,8 @@ public: bool has_extended_timestamp; // The partially read message. SrsCommonMessage *msg; + // Current writing position of message. + char *writing_pos_; // Decoded msg count, to identify whether the chunk stream is fresh. int64_t msg_count; // Because the extended timestamp may be a delta timestamp, it can differ diff --git a/trunk/src/protocol/srs_protocol_rtp.cpp b/trunk/src/protocol/srs_protocol_rtp.cpp index d4f05dfc0..a6390d12b 100644 --- a/trunk/src/protocol/srs_protocol_rtp.cpp +++ b/trunk/src/protocol/srs_protocol_rtp.cpp @@ -148,7 +148,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve pkt->header.set_sequence(video_sequence_++); pkt->header.set_timestamp(msg->timestamp * 90); pkt->set_payload(raw_raw, SrsRtpPacketPayloadTypeNALU); - pkt->wrap(msg); + pkt->wrap(msg->payload_); } else { // We must free it, should never use RTP packets to free it, // because more than one RTP packet will refer to it. @@ -202,7 +202,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve pkt->set_payload(fua, SrsRtpPacketPayloadTypeFUA); } - pkt->wrap(msg); + pkt->wrap(msg->payload_); nb_left -= packet_size; } @@ -231,7 +231,7 @@ srs_error_t SrsRtpVideoBuilder::package_single_nalu(SrsSharedPtrMessage *msg, Sr raw->payload = sample->bytes; raw->nn_payload = sample->size; - pkt->wrap(msg); + pkt->wrap(msg->payload_); return err; } @@ -292,7 +292,7 @@ srs_error_t SrsRtpVideoBuilder::package_fu_a(SrsSharedPtrMessage *msg, SrsSample fua->size = packet_size; } - pkt->wrap(msg); + pkt->wrap(msg->payload_); p += packet_size; nb_left -= packet_size; diff --git a/trunk/src/utest/srs_utest_fmp4.cpp b/trunk/src/utest/srs_utest_fmp4.cpp index 69c15bda9..d102cc205 100644 --- a/trunk/src/utest/srs_utest_fmp4.cpp +++ b/trunk/src/utest/srs_utest_fmp4.cpp @@ -67,9 +67,9 @@ public: SrsSharedPtrMessage::wrap(payload, 1024); if (is_video_msg) { - ptr->header.message_type = RTMP_MSG_VideoMessage; + message_type = RTMP_MSG_VideoMessage; } else { - ptr->header.message_type = RTMP_MSG_AudioMessage; + message_type = RTMP_MSG_AudioMessage; } } virtual ~MockSrsSharedPtrMessage() {} diff --git a/trunk/src/utest/srs_utest_kernel.cpp b/trunk/src/utest/srs_utest_kernel.cpp index f735c83fd..3408331e4 100644 --- a/trunk/src/utest/srs_utest_kernel.cpp +++ b/trunk/src/utest/srs_utest_kernel.cpp @@ -5336,14 +5336,6 @@ VOID TEST(KernelFLVTest, CoverSharedPtrMessage) HELPER_EXPECT_FAILED(m.create(&h, NULL, -1)); } - if (true) { - SrsCommonMessage cm; - cm.size = -1; - - SrsSharedPtrMessage m; - HELPER_EXPECT_FAILED(m.create(&cm)); - } - if (true) { SrsMessageHeader h; h.prefer_cid = 1; @@ -6926,3 +6918,93 @@ VOID TEST(KernelUtilityTest, Base64Decode) EXPECT_STRNE("admin:admin", plaintext.c_str()); } } + +VOID TEST(KernelMemoryBlockTest, MemoryBlockBasic) +{ + + // Test basic construction and destruction + if (true) { + SrsMemoryBlock block; + EXPECT_EQ(0, block.size()); + EXPECT_EQ(NULL, block.payload()); + } + + // Test create with size + if (true) { + SrsMemoryBlock block; + block.create(1024); + EXPECT_EQ(1024, block.size()); + EXPECT_NE((char *)NULL, block.payload()); + } + + // Test create with data + if (true) { + SrsMemoryBlock block; + char test_data[] = "Hello, World!"; + int test_size = strlen(test_data); + + block.create(test_data, test_size); + EXPECT_EQ(test_size, block.size()); + EXPECT_NE((char *)NULL, block.payload()); + EXPECT_EQ(0, memcmp(block.payload(), test_data, test_size)); + } + + // Test attach + if (true) { + SrsMemoryBlock block; + char *test_data = new char[100]; + memset(test_data, 0x42, 100); + + block.attach(test_data, 100); + EXPECT_EQ(100, block.size()); + EXPECT_EQ(test_data, block.payload()); + + // Memory will be freed by block destructor + } +} + +VOID TEST(KernelMemoryBlockTest, SharedMemoryBlock) +{ + + // Test basic shared memory block usage + if (true) { + SrsSharedPtr shared_block(new SrsMemoryBlock()); + shared_block->create(1024); + + EXPECT_EQ(1024, shared_block->size()); + EXPECT_NE((char *)NULL, shared_block->payload()); + + // Test sharing + SrsSharedPtr shared_copy = shared_block; + EXPECT_EQ(shared_block->payload(), shared_copy->payload()); + EXPECT_EQ(shared_block->size(), shared_copy->size()); + } + + // Test multiple references + if (true) { + SrsSharedPtr original(new SrsMemoryBlock()); + char test_data[] = "Shared memory test data"; + original->create(test_data, strlen(test_data)); + + // Create multiple references + SrsSharedPtr copy1 = original; + SrsSharedPtr copy2 = original; + SrsSharedPtr copy3 = copy1; + + // All should point to the same memory + EXPECT_EQ(original->payload(), copy1->payload()); + EXPECT_EQ(original->payload(), copy2->payload()); + EXPECT_EQ(original->payload(), copy3->payload()); + + // All should have the same size + EXPECT_EQ(original->size(), copy1->size()); + EXPECT_EQ(original->size(), copy2->size()); + EXPECT_EQ(original->size(), copy3->size()); + + // Verify data integrity + EXPECT_EQ(0, memcmp(original->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy1->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy2->payload(), test_data, strlen(test_data))); + EXPECT_EQ(0, memcmp(copy3->payload(), test_data, strlen(test_data))); + } +} diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index 9cc8cb5d3..d28a7b39a 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -899,7 +899,6 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsSharedPtrMessage msg; char *payload = new char[1024]; HELPER_EXPECT_SUCCESS(msg.create(&header, payload, 1024)); - EXPECT_EQ(0, msg.count()); if (true) { SrsMessageArray arr(3); @@ -908,15 +907,11 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsUniquePtr parr2(parr, srs_utest_free_message_array); arr.msgs[0] = msg.copy(); - EXPECT_EQ(1, msg.count()); arr.msgs[1] = msg.copy(); - EXPECT_EQ(2, msg.count()); arr.msgs[2] = msg.copy(); - EXPECT_EQ(3, msg.count()); } - EXPECT_EQ(0, msg.count()); if (true) { SrsMessageArray arr(3); @@ -925,12 +920,9 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray) SrsUniquePtr parr2(parr, srs_utest_free_message_array); arr.msgs[0] = msg.copy(); - EXPECT_EQ(1, msg.count()); arr.msgs[2] = msg.copy(); - EXPECT_EQ(2, msg.count()); } - EXPECT_EQ(0, msg.count()); } /** @@ -4402,9 +4394,8 @@ VOID TEST(ProtocolStackTest, ProtocolSendVMessage) SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->size = sizeof(data); - msg->payload = new char[msg->size]; - memcpy(msg->payload, data, msg->size); + msg->create_payload(sizeof(data)); + memcpy(msg->payload(), data, sizeof(data)); SrsSharedPtrMessage m; HELPER_ASSERT_SUCCESS(m.create(msg)); @@ -4964,8 +4955,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow) if (true) { SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->header.payload_length = msg->size = 4096; - msg->payload = new char[msg->size]; + msg->create_payload(4096); + msg->header.payload_length = 4096; msg->header.message_type = 9; EXPECT_TRUE(msg->header.is_video()); @@ -5014,8 +5005,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow) if (true) { SrsCommonMessage *msg = new SrsCommonMessage(); SrsUniquePtr msg_uptr(msg); - msg->header.payload_length = msg->size = 4096; - msg->payload = new char[msg->size]; + msg->header.payload_length = 4096; + msg->create_payload(4096); msg->header.message_type = 9; EXPECT_TRUE(msg->header.is_video()); diff --git a/trunk/src/utest/srs_utest_rtmp.cpp b/trunk/src/utest/srs_utest_rtmp.cpp index ff346bd54..0f9109337 100644 --- a/trunk/src/utest/srs_utest_rtmp.cpp +++ b/trunk/src/utest/srs_utest_rtmp.cpp @@ -250,7 +250,6 @@ VOID TEST(ProtocolRTMPTest, SendPacketsError) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -346,7 +345,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -362,7 +360,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -384,7 +381,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages) SrsCommonMessage pkt; pkt.header.initialize_audio(200, 1000, 1); pkt.create_payload(256); - pkt.size = 256; SrsSharedPtrMessage *msg = new SrsSharedPtrMessage(); msg->create(&pkt); @@ -412,7 +408,6 @@ VOID TEST(ProtocolRTMPTest, DecodeMessages) SrsCommonMessage msg; msg.header.initialize_amf0_script(1, 1); msg.create_payload(1); - msg.size = 1; SrsPacket *pkt; HELPER_EXPECT_FAILED(p.decode_message(&msg, &pkt)); @@ -455,8 +450,7 @@ SrsCommonMessage *_create_amf0(char *bytes, int size, int stream_id) SrsCommonMessage *msg = new SrsCommonMessage(); msg->header.initialize_amf0_script(size, stream_id); msg->create_payload(size); - memcpy(msg->payload, bytes, size); - msg->size = size; + memcpy(msg->payload(), bytes, size); return msg; } @@ -2949,7 +2943,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeScript, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } @@ -2957,7 +2951,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeVideo, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } @@ -2965,13 +2959,13 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage) if (true) { SrsSharedPtrMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } if (true) { SrsCommonMessage *msg = NULL; HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg)); - EXPECT_STREQ("Hello", msg->payload); + EXPECT_STREQ("Hello", msg->payload()); srs_freep(msg); } }