AI: Update RTMP message memory management with shared pointers. v7.0.73 (#4464)

This PR modernizes the memory management architecture in SRS by
refactoring RTMP message handling to use shared pointers
(SrsSharedPtr<SrsMemoryBlock>) instead of manual memory management. This
change improves memory safety, reduces the risk of memory leaks, and
provides a cleaner abstraction for message payload handling.

* Introduced `SrsMemoryBlock`: A dedicated class for managing memory
buffers with size information
* Replaced manual memory management: `SrsCommonMessage` and
`SrsSharedPtrMessage` now use `SrsSharedPtr<SrsMemoryBlock>` instead of
raw pointers
* Updated `SrsRtpPacket`: Now uses `SrsSharedPtr<SrsMemoryBlock>` for
shared buffer management

---------

Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-09-01 14:00:31 -04:00 committed by GitHub
parent b834be67a9
commit c534a265e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 404 additions and 340 deletions

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-09-01, Merge [#4464](https://github.com/ossrs/srs/pull/4464): AI: Use shared ptr in RTMP message. v7.0.73 (#4464)
* v7.0, 2025-09-01, Merge [#4463](https://github.com/ossrs/srs/pull/4463): AI: Use SrsHttpUri for URL parsing and add legacy RTMP URL conversion. v7.0.72 (#4463)
* v7.0, 2025-09-01, Merge [#4462](https://github.com/ossrs/srs/pull/4462): HTTP: Rename HTTP hijack to dynamic match for better clarity. v7.0.71 (#4462)
* v7.0, 2025-08-31, Merge [#4461](https://github.com/ossrs/srs/pull/4461): AI: Extract shared components and improve SRS server architecture. v7.0.70 (#4461)

View File

@ -645,7 +645,7 @@ srs_error_t SrsDashController::refresh_init_mp4(SrsSharedPtrMessage *msg, SrsFor
{
srs_error_t err = srs_success;
if (msg->size <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) {
if (msg->size() <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) {
srs_warn("DASH: Ignore empty sequence header.");
return err;
}

View File

@ -319,7 +319,7 @@ srs_error_t SrsDvrFlvSegmenter::encode_metadata(SrsSharedPtrMessage *metadata)
return err;
}
SrsBuffer stream(metadata->payload, metadata->size);
SrsBuffer stream(metadata->payload(), metadata->size());
SrsUniquePtr<SrsAmf0Any> name(SrsAmf0Any::str());
if ((err = name->read(&stream)) != srs_success) {
@ -370,8 +370,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_audio(SrsSharedPtrMessage *audio, SrsForm
{
srs_error_t err = srs_success;
char *payload = audio->payload;
int size = audio->size;
char *payload = audio->payload();
int size = audio->size();
if ((err = enc->write_audio(audio->timestamp, payload, size)) != srs_success) {
return srs_error_wrap(err, "write audio");
}
@ -383,8 +383,8 @@ srs_error_t SrsDvrFlvSegmenter::encode_video(SrsSharedPtrMessage *video, SrsForm
{
srs_error_t err = srs_success;
char *payload = video->payload;
int size = video->size;
char *payload = video->payload();
int size = video->size();
bool sh = (format->video->avc_packet_type == SrsVideoAvcFrameTraitSequenceHeader);
bool keyframe = (!sh && format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame);
@ -870,8 +870,8 @@ srs_error_t SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage *msg)
return err;
}
char *payload = msg->payload;
int size = msg->size;
char *payload = msg->payload();
int size = msg->size();
bool codec_ok = SrsFlvVideo::h264(payload, size);
codec_ok = codec_ok ? true : SrsFlvVideo::hevc(payload, size);

View File

@ -338,7 +338,7 @@ srs_error_t SrsEdgeFlvUpstream::decode_message(SrsCommonMessage *msg, SrsPacket
srs_error_t err = srs_success;
SrsPacket *packet = NULL;
SrsBuffer stream(msg->payload, msg->size);
SrsBuffer stream(msg->payload(), msg->size());
SrsMessageHeader &header = msg->header;
if (header.is_amf0_data() || header.is_amf3_data()) {
@ -924,7 +924,7 @@ srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage *msg)
// the msg is auto free by source,
// so we just ignore, or copy then send it.
if (msg->size <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) {
if (msg->size() <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement()) {
return err;
}

View File

@ -127,7 +127,7 @@ srs_error_t SrsForwarder::on_audio(SrsSharedPtrMessage *shared_audio)
return srs_error_wrap(err, "jitter");
}
if (SrsFlvAudio::sh(msg->payload, msg->size)) {
if (SrsFlvAudio::sh(msg->payload(), msg->size())) {
srs_freep(sh_audio);
sh_audio = msg->copy();
}
@ -150,7 +150,7 @@ srs_error_t SrsForwarder::on_video(SrsSharedPtrMessage *shared_video)
return srs_error_wrap(err, "jitter");
}
if (SrsFlvVideo::sh(msg->payload, msg->size)) {
if (SrsFlvVideo::sh(msg->payload(), msg->size())) {
srs_freep(sh_video);
sh_video = msg->copy();
}

View File

@ -2129,7 +2129,7 @@ srs_error_t SrsGbMuxer::rtmp_write_packet(char type, uint32_t timestamp, char *d
srs_trace("Muxer: send msg %s age=%d, dts=%" PRId64 ", size=%d",
msg->is_audio() ? "A" : msg->is_video() ? "V"
: "N",
pprint_->age(), msg->timestamp, msg->size);
pprint_->age(), msg->timestamp, msg->size());
}
// send out encoded msg.

View File

@ -40,7 +40,7 @@ char flv_header[] = {'F', 'L', 'V',
string serialFlv(SrsSharedPtrMessage *msg)
{
int size = 15 + msg->size;
int size = 15 + msg->size();
char *byte = new char[size];
SrsUniquePtr<SrsBuffer> stream(new SrsBuffer(byte, size));
@ -50,14 +50,14 @@ string serialFlv(SrsSharedPtrMessage *msg)
char type = msg->is_video() ? 0x09 : 0x08;
stream->write_1bytes(type);
stream->write_3bytes(msg->size);
stream->write_3bytes(msg->size());
stream->write_3bytes((int32_t)dts);
stream->write_1bytes(dts >> 24 & 0xFF);
stream->write_3bytes(0);
stream->write_bytes(msg->payload, msg->size);
stream->write_bytes(msg->payload(), msg->size());
// pre tag size
int preTagSize = msg->size + 11;
int preTagSize = msg->size() + 11;
stream->write_4bytes(preTagSize);
string ret(stream->data(), stream->size());
@ -298,7 +298,7 @@ srs_error_t SrsHds::on_video(SrsSharedPtrMessage *msg)
return err;
}
if (SrsFlvVideo::sh(msg->payload, msg->size)) {
if (SrsFlvVideo::sh(msg->payload(), msg->size())) {
srs_freep(video_sh);
video_sh = msg->copy();
}
@ -348,7 +348,7 @@ srs_error_t SrsHds::on_audio(SrsSharedPtrMessage *msg)
return err;
}
if (SrsFlvAudio::sh(msg->payload, msg->size)) {
if (SrsFlvAudio::sh(msg->payload(), msg->size())) {
srs_freep(audio_sh);
audio_sh = msg->copy();
}

View File

@ -392,11 +392,11 @@ srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage **msgs, int coun
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage *msg = msgs[i];
if (msg->is_video()) {
if (!SrsFlvVideo::sh(msg->payload, msg->size))
if (!SrsFlvVideo::sh(msg->payload(), msg->size()))
nn_video_frames++;
has_video = true;
} else if (msg->is_audio()) {
if (!SrsFlvAudio::sh(msg->payload, msg->size))
if (!SrsFlvAudio::sh(msg->payload(), msg->size()))
nn_audio_frames++;
has_audio = true;
}
@ -958,11 +958,11 @@ srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder *enc, SrsSh
SrsSharedPtrMessage *msg = msgs[i];
if (msg->is_audio()) {
err = enc->write_audio(msg->timestamp, msg->payload, msg->size);
err = enc->write_audio(msg->timestamp, msg->payload(), msg->size());
} else if (msg->is_video()) {
err = enc->write_video(msg->timestamp, msg->payload, msg->size);
err = enc->write_video(msg->timestamp, msg->payload(), msg->size());
} else {
err = enc->write_metadata(msg->timestamp, msg->payload, msg->size);
err = enc->write_metadata(msg->timestamp, msg->payload(), msg->size());
}
if (err != srs_success) {

View File

@ -626,7 +626,7 @@ srs_error_t SrsMpegtsOverUdp::rtmp_write_packet(char type, uint32_t timestamp, c
srs_trace("mpegts: send msg %s age=%d, dts=%" PRId64 ", size=%d",
msg->is_audio() ? "A" : msg->is_video() ? "V"
: "N",
pprint->age(), msg->timestamp, msg->size);
pprint->age(), msg->timestamp, msg->size());
}
// send out encoded msg.

View File

@ -78,7 +78,7 @@ srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage *shared_audio, SrsFor
// If no audio RAW frame, or not parsed for no sequence header, drop the packet.
if (format->audio->nb_samples == 0) {
srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size);
srs_warn("RTC: Drop AAC %d bytes for no sample", shared_audio->size());
return err;
}
@ -1186,7 +1186,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsSharedPtrMessage *msg)
srs_error_t err = srs_success;
// cache the sequence header if h264
bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size());
if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
return srs_error_wrap(err, "meta update video");
}
@ -1919,7 +1919,7 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len
int rtmp_len = len + 2;
audio->header.initialize_audio(rtmp_len, pts, 1);
audio->create_payload(rtmp_len);
SrsBuffer stream(audio->payload, rtmp_len);
SrsBuffer stream(audio->payload(), rtmp_len);
uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo;
stream.write_1bytes(aac_flag);
if (is_header) {
@ -1928,7 +1928,6 @@ void SrsRtcFrameBuilder::packet_aac(SrsCommonMessage *audio, char *data, int len
stream.write_1bytes(1);
}
stream.write_bytes(data, len);
audio->size = rtmp_len;
}
srs_error_t SrsRtcFrameBuilder::packet_video(SrsRtpPacket *pkt)
@ -2406,8 +2405,7 @@ srs_error_t SrsRtcFrameBuilder::packet_video_rtmp(const uint16_t start, const ui
SrsCommonMessage rtmp;
rtmp.header.initialize_video(nb_payload, pkt->get_avsync_time(), 1);
rtmp.create_payload(nb_payload);
rtmp.size = nb_payload;
SrsBuffer payload(rtmp.payload, rtmp.size);
SrsBuffer payload(rtmp.payload(), rtmp.size());
if (video_codec_ == SrsVideoCodecIdHEVC) {
// @see: https://veovera.org/docs/enhanced/enhanced-rtmp-v1.pdf, page 8
payload.write_1bytes(SRS_FLV_IS_EX_HEADER | (frame_type << 4) | SrsVideoHEVCFrameTraitPacketTypeCodedFramesX);

View File

@ -832,7 +832,7 @@ srs_error_t SrsRtspRtpBuilder::on_video(SrsSharedPtrMessage *msg)
srs_error_t err = srs_success;
// cache the sequence header if h264
bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size());
if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
return srs_error_wrap(err, "meta update video");
}

View File

@ -345,11 +345,11 @@ void SrsMessageQueue::shrink()
for (int i = 0; i < (int)msgs.size(); i++) {
SrsSharedPtrMessage *msg = msgs.at(i);
if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) {
if (msg->is_video() && SrsFlvVideo::sh(msg->payload(), msg->size())) {
srs_freep(video_sh);
video_sh = msg;
continue;
} else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) {
} else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload(), msg->size())) {
srs_freep(audio_sh);
audio_sh = msg;
continue;
@ -620,8 +620,8 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg)
// got video, update the video count if acceptable
if (msg->is_video()) {
// Drop video when not h.264 or h.265.
bool codec_ok = SrsFlvVideo::h264(msg->payload, msg->size);
codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload, msg->size);
bool codec_ok = SrsFlvVideo::h264(msg->payload(), msg->size());
codec_ok = codec_ok ? true : SrsFlvVideo::hevc(msg->payload(), msg->size());
if (!codec_ok)
return err;
@ -647,7 +647,7 @@ srs_error_t SrsGopCache::cache(SrsSharedPtrMessage *shared_msg)
}
// clear gop cache when got key frame
if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) {
if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload(), msg->size())) {
clear();
// curent msg is video frame, so we set to 1.
@ -737,7 +737,7 @@ bool srs_hls_can_continue(int ret, SrsSharedPtrMessage *sh, SrsSharedPtrMessage
// when video size equals to sequence header,
// the video actually maybe a sequence header,
// continue to make ffmpeg happy.
if (sh && sh->size == msg->size) {
if (sh && sh->size() == msg->size()) {
srs_warn("the msg is actually a sequence header, ignore this packet.");
return true;
}
@ -965,11 +965,11 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage *shared_audio)
if (format->acodec->id == SrsAudioCodecIdMP3) {
srs_trace("%dB audio sh, codec(%d, %dbits, %dchannels, %dHZ)",
msg->size, c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
msg->size(), c->id, flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
srs_flv_srates[c->sound_rate]);
} else {
srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)",
msg->size, c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels,
msg->size(), c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels,
c->audio_data_rate / 1000, srs_aac_srates[c->aac_sample_rate],
flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type],
srs_flv_srates[c->sound_rate]);
@ -1047,12 +1047,12 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage *shared_video, bool is_se
if (c->id == SrsVideoCodecIdAVC) {
err = stat->on_video_info(req_, c->id, c->avc_profile, c->avc_level, c->width, c->height);
srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)",
msg->size, c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(),
msg->size(), c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(),
c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration);
} else if (c->id == SrsVideoCodecIdHEVC) {
err = stat->on_video_info(req_, c->id, c->hevc_profile, c->hevc_level, c->width, c->height);
srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)",
msg->size, c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(),
msg->size(), c->id, srs_hevc_profile2str(c->hevc_profile).c_str(), srs_hevc_level2str(c->hevc_level).c_str(),
c->width, c->height, c->video_data_rate / 1000, c->frame_rate, c->duration);
}
if (err != srs_success) {
@ -1942,7 +1942,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage *msg, SrsOnMetaDataPack
bool drop_for_reduce = false;
if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {
drop_for_reduce = true;
srs_warn("drop for reduce sh metadata, size=%d", msg->size);
srs_warn("drop for reduce sh metadata, size=%d", msg->size());
}
// copy to all consumer
@ -2038,9 +2038,9 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage *msg)
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_ash()->size == msg->size) {
drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
if (meta->previous_ash()->size() == msg->size()) {
drop_for_reduce = srs_bytes_equal(meta->previous_ash()->payload(), msg->payload(), msg->size());
srs_warn("drop for reduce sh audio, size=%d", msg->size());
}
}
@ -2110,13 +2110,13 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage *shared_video)
// drop any unknown header video.
// @see https://github.com/ossrs/srs/issues/421
if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
if (!SrsFlvVideo::acceptable(shared_video->payload(), shared_video->size())) {
char b0 = 0x00;
if (shared_video->size > 0) {
b0 = shared_video->payload[0];
if (shared_video->size() > 0) {
b0 = shared_video->payload()[0];
}
srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size(), b0);
return err;
}
@ -2134,7 +2134,7 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg)
{
srs_error_t err = srs_success;
bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
bool is_sequence_header = SrsFlvVideo::sh(msg->payload(), msg->size());
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
@ -2155,9 +2155,9 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage *msg)
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_vsh()->size == msg->size) {
drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
if (meta->previous_vsh()->size() == msg->size()) {
drop_for_reduce = srs_bytes_equal(meta->previous_vsh()->payload(), msg->payload(), msg->size());
srs_warn("drop for reduce sh video, size=%d", msg->size());
}
}
@ -2214,7 +2214,7 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg)
{
srs_error_t err = srs_success;
SrsUniquePtr<SrsBuffer> stream(new SrsBuffer(msg->payload, msg->size));
SrsUniquePtr<SrsBuffer> stream(new SrsBuffer(msg->payload(), msg->size()));
// the aggregate message always use abs time.
int delta = -1;
@ -2274,9 +2274,8 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage *msg)
o.header.prefer_cid = msg->header.prefer_cid;
if (data_size > 0) {
o.size = data_size;
o.payload = new char[o.size];
stream->read_bytes(o.payload, o.size);
o.create_payload(data_size);
stream->read_bytes(o.payload(), data_size);
}
if (!stream->require(4)) {

View File

@ -40,8 +40,8 @@ char *SrsSrtPacket::wrap(int size)
actual_buffer_size_ = size;
// If the buffer is large enough, reuse it.
if (shared_buffer_ && shared_buffer_->size >= size) {
return shared_buffer_->payload;
if (shared_buffer_ && shared_buffer_->size() >= size) {
return shared_buffer_->payload();
}
// Create a large enough message, with under-layer buffer.
@ -51,7 +51,7 @@ char *SrsSrtPacket::wrap(int size)
char *buf = new char[size];
shared_buffer_->wrap(buf, size);
return shared_buffer_->payload;
return shared_buffer_->payload();
}
char *SrsSrtPacket::wrap(char *data, int size)
@ -70,9 +70,9 @@ char *SrsSrtPacket::wrap(SrsSharedPtrMessage *msg)
// Copy from the new message.
shared_buffer_ = msg->copy();
// If we wrap a message, the size of packet equals to the message size.
actual_buffer_size_ = shared_buffer_->size;
actual_buffer_size_ = shared_buffer_->size();
return msg->payload;
return msg->payload();
}
SrsSrtPacket *SrsSrtPacket::copy()
@ -87,12 +87,12 @@ SrsSrtPacket *SrsSrtPacket::copy()
char *SrsSrtPacket::data()
{
return shared_buffer_->payload;
return shared_buffer_->payload();
}
int SrsSrtPacket::size()
{
return shared_buffer_->size;
return shared_buffer_->size();
}
SrsSrtSourceManager::SrsSrtSourceManager()
@ -546,8 +546,7 @@ srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage *msg, vector<pair<cha
SrsCommonMessage rtmp;
rtmp.header.initialize_video(frame_size, dts, video_streamid_);
rtmp.create_payload(frame_size);
rtmp.size = frame_size;
SrsBuffer payload(rtmp.payload, rtmp.size);
SrsBuffer payload(rtmp.payload(), rtmp.size());
// Write 5bytes video tag header.
if (is_keyframe) {
payload.write_1bytes(0x17); // type(4 bits): key frame; code(4bits): avc
@ -740,8 +739,7 @@ srs_error_t SrsSrtFrameBuilder::on_hevc_frame(SrsTsMessage *msg, vector<pair<cha
SrsCommonMessage rtmp;
rtmp.header.initialize_video(frame_size, dts, video_streamid_);
rtmp.create_payload(frame_size);
rtmp.size = frame_size;
SrsBuffer payload(rtmp.payload, rtmp.size);
SrsBuffer payload(rtmp.payload(), rtmp.size());
// Write 5bytes video tag header.
@ -875,9 +873,8 @@ srs_error_t SrsSrtFrameBuilder::check_audio_sh_change(SrsTsMessage *msg, uint32_
SrsCommonMessage rtmp;
rtmp.header.initialize_audio(rtmp_len, pts, audio_streamid_);
rtmp.create_payload(rtmp_len);
rtmp.size = rtmp_len;
SrsBuffer stream(rtmp.payload, rtmp_len);
SrsBuffer stream(rtmp.payload(), rtmp_len);
uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo;
stream.write_1bytes(aac_flag);
stream.write_1bytes(0);
@ -904,9 +901,8 @@ srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage *msg, uint32_t pts, ch
SrsCommonMessage rtmp;
rtmp.header.initialize_audio(rtmp_len, pts, audio_streamid_);
rtmp.create_payload(rtmp_len);
rtmp.size = rtmp_len;
SrsBuffer stream(rtmp.payload, rtmp_len);
SrsBuffer stream(rtmp.payload(), rtmp_len);
uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo;
// Write 2bytes audio tag header.
stream.write_1bytes(aac_flag);

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 72
#define VERSION_REVISION 73
#endif

View File

@ -523,3 +523,57 @@ srs_error_t SrsBitBuffer::read_bits_se(int32_t &v)
return err;
}
SrsMemoryBlock::SrsMemoryBlock()
{
payload_ = NULL;
size_ = 0;
}
SrsMemoryBlock::~SrsMemoryBlock()
{
srs_freepa(payload_);
}
void SrsMemoryBlock::create(int size)
{
srs_assert(size >= 0);
// Free existing payload
srs_freepa(payload_);
// Allocate new buffer
if (size > 0) {
payload_ = new char[size];
memset(payload_, 0, size);
size_ = size;
} else {
payload_ = NULL;
size_ = 0;
}
}
void SrsMemoryBlock::create(char *data, int size)
{
srs_assert(size >= 0);
create(size);
// Copy data if provided
if (data && size > 0) {
memcpy(payload_, data, size);
size_ = size;
}
}
void SrsMemoryBlock::attach(char *data, int size)
{
srs_assert(size >= 0);
// Free existing payload
srs_freepa(payload_);
// Attach new buffer
payload_ = data;
size_ = size;
}

View File

@ -195,4 +195,43 @@ public:
srs_error_t read_bits_se(int32_t &v);
};
// Memory block for shared payload data.
// This class encapsulates a memory buffer with size information,
// designed to be used with SrsSharedPtr for efficient memory sharing.
class SrsMemoryBlock
{
private:
// The current message parsed size,
// size <= allocated buffer size
// For the payload maybe sent in multiple chunks.
int size_;
// The payload of message, the SrsMemoryBlock manages the memory lifecycle.
// @remark, not all message payload can be decoded to packet. for example,
// video/audio packet use raw bytes, no video/audio packet.
char *payload_;
public:
SrsMemoryBlock();
virtual ~SrsMemoryBlock();
public:
char *payload() { return payload_; }
int size() { return size_; }
public:
// Create memory block with specified size.
// @param size, the size of memory to allocate. Must be non-negative.
virtual void create(int size);
// Create memory block from existing buffer.
// @param data, the buffer to copy from.
// @param size, the size of buffer. Must be non-negative.
// @remark, this method will copy the data.
virtual void create(char *data, int size);
// Attach existing buffer to memory block.
// @param data, the buffer to attach, memory block takes ownership.
// @param size, the size of buffer. Must be non-negative.
// @remark, this method takes ownership of data, caller should not free it.
virtual void attach(char *data, int size);
};
#endif

View File

@ -265,89 +265,62 @@ void SrsMessageHeader::initialize_video(int size, uint32_t time, int stream)
SrsCommonMessage::SrsCommonMessage()
{
payload = NULL;
size = 0;
payload_ = SrsSharedPtr<SrsMemoryBlock>(NULL);
}
SrsCommonMessage::~SrsCommonMessage()
{
srs_freepa(payload);
// payload_ automatically cleaned up by SrsSharedPtr
}
void SrsCommonMessage::create_payload(int size)
{
srs_freepa(payload);
payload = new char[size];
payload_ = SrsSharedPtr<SrsMemoryBlock>(new SrsMemoryBlock());
payload_->create(size);
srs_verbose("create payload for RTMP message. size=%d", size);
}
srs_error_t SrsCommonMessage::create(SrsMessageHeader *pheader, char *body, int size)
{
// drop previous payload.
srs_freepa(payload);
srs_error_t err = srs_success;
this->header = *pheader;
this->payload = body;
this->size = size;
if (size < 0) {
return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size);
}
return srs_success;
// Create new memory block and attach the body
payload_ = SrsSharedPtr<SrsMemoryBlock>(new SrsMemoryBlock());
payload_->attach(body, size);
if (pheader) {
this->header = *pheader;
}
return err;
}
SrsSharedMessageHeader::SrsSharedMessageHeader()
SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), message_type(0), prefer_cid(RTMP_CID_OverConnection)
{
payload_length = 0;
message_type = 0;
prefer_cid = 0;
}
SrsSharedMessageHeader::~SrsSharedMessageHeader()
{
}
SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload()
{
payload = NULL;
size = 0;
shared_count = 0;
}
SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload()
{
srs_freepa(payload);
}
SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL)
{
ptr = NULL;
payload_ = SrsSharedPtr<SrsMemoryBlock>(NULL);
++_srs_pps_objs_msgs->sugar;
}
SrsSharedPtrMessage::~SrsSharedPtrMessage()
{
if (ptr) {
if (ptr->shared_count == 0) {
srs_freep(ptr);
} else {
ptr->shared_count--;
}
}
// payload_ automatically cleaned up by SrsSharedPtr
}
srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage *msg)
{
srs_error_t err = srs_success;
if ((err = create(&msg->header, msg->payload, msg->size)) != srs_success) {
return srs_error_wrap(err, "create message");
}
// to prevent double free of payload:
// initialize already attach the payload of msg,
// detach the payload to transfer the owner to shared ptr.
msg->payload = NULL;
msg->size = 0;
// Share the memory block from the common message
payload_ = msg->payload_;
this->timestamp = msg->header.timestamp;
this->stream_id = msg->header.stream_id;
this->message_type = msg->header.message_type;
this->prefer_cid = msg->header.prefer_cid;
return err;
}
@ -360,58 +333,35 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader *pheader, char *payload
return srs_error_new(ERROR_RTMP_MESSAGE_CREATE, "create message size=%d", size);
}
srs_assert(!ptr);
ptr = new SrsSharedPtrPayload();
// Create new memory block and attach the payload
payload_ = SrsSharedPtr<SrsMemoryBlock>(new SrsMemoryBlock());
payload_->attach(payload, size);
// direct attach the data.
// Set header information
if (pheader) {
ptr->header.message_type = pheader->message_type;
ptr->header.payload_length = size;
ptr->header.prefer_cid = pheader->prefer_cid;
this->timestamp = pheader->timestamp;
this->stream_id = pheader->stream_id;
this->message_type = pheader->message_type;
this->prefer_cid = pheader->prefer_cid;
}
ptr->payload = payload;
ptr->size = size;
// message can access it.
this->payload = ptr->payload;
this->size = ptr->size;
return err;
}
void SrsSharedPtrMessage::wrap(char *payload, int size)
{
srs_assert(!ptr);
ptr = new SrsSharedPtrPayload();
ptr->payload = payload;
ptr->size = size;
this->payload = ptr->payload;
this->size = ptr->size;
}
int SrsSharedPtrMessage::count()
{
return ptr ? ptr->shared_count : 0;
// Create new memory block and wrap the payload
payload_ = SrsSharedPtr<SrsMemoryBlock>(new SrsMemoryBlock());
payload_->attach(payload, size);
}
bool SrsSharedPtrMessage::check(int stream_id)
{
// Ignore error when message has no payload.
if (!ptr) {
if (!payload_.get()) {
return true;
}
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (ptr->header.prefer_cid < 2 || ptr->header.prefer_cid > 63) {
srs_info("change the chunk_id=%d to default=%d", ptr->header.prefer_cid, RTMP_CID_ProtocolControl);
ptr->header.prefer_cid = RTMP_CID_ProtocolControl;
}
// we assume that the stream_id in a group must be the same.
if (this->stream_id == stream_id) {
return true;
@ -423,38 +373,40 @@ bool SrsSharedPtrMessage::check(int stream_id)
bool SrsSharedPtrMessage::is_av()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage || ptr->header.message_type == RTMP_MSG_VideoMessage;
return message_type == RTMP_MSG_AudioMessage || message_type == RTMP_MSG_VideoMessage;
}
bool SrsSharedPtrMessage::is_audio()
{
return ptr->header.message_type == RTMP_MSG_AudioMessage;
return message_type == RTMP_MSG_AudioMessage;
}
bool SrsSharedPtrMessage::is_video()
{
return ptr->header.message_type == RTMP_MSG_VideoMessage;
return message_type == RTMP_MSG_VideoMessage;
}
int SrsSharedPtrMessage::chunk_header(char *cache, int nb_cache, bool c0)
{
int payload_length = payload_.get() ? payload_->size() : 0;
if (c0) {
return srs_chunk_header_c0(ptr->header.prefer_cid, (uint32_t)timestamp,
ptr->header.payload_length, ptr->header.message_type, stream_id, cache, nb_cache);
return srs_chunk_header_c0(prefer_cid, (uint32_t)timestamp,
payload_length, message_type, stream_id, cache, nb_cache);
} else {
return srs_chunk_header_c3(ptr->header.prefer_cid, (uint32_t)timestamp,
return srs_chunk_header_c3(prefer_cid, (uint32_t)timestamp,
cache, nb_cache);
}
}
SrsSharedPtrMessage *SrsSharedPtrMessage::copy()
{
srs_assert(ptr);
SrsSharedPtrMessage *copy = copy2();
copy->timestamp = timestamp;
copy->stream_id = stream_id;
copy->message_type = message_type;
copy->prefer_cid = prefer_cid;
return copy;
}
@ -463,15 +415,8 @@ SrsSharedPtrMessage *SrsSharedPtrMessage::copy2()
{
SrsSharedPtrMessage *copy = new SrsSharedPtrMessage();
// We got an object from cache, the ptr might exists, so unwrap it.
// srs_assert(!copy->ptr);
// Reference to this message instead.
copy->ptr = ptr;
ptr->shared_count++;
copy->payload = ptr->payload;
copy->size = ptr->size;
// Share the memory block
copy->payload_ = payload_;
return copy;
}
@ -665,23 +610,23 @@ srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage **msgs, int count)
if (msg->is_audio()) {
if (drop_if_not_match_ && !has_audio_)
continue; // Ignore audio packets if no audio stream.
cache_audio(msg->timestamp, msg->payload, msg->size, cache);
cache_audio(msg->timestamp, msg->payload(), msg->size(), cache);
} else if (msg->is_video()) {
if (drop_if_not_match_ && !has_video_)
continue; // Ignore video packets if no video stream.
cache_video(msg->timestamp, msg->payload, msg->size, cache);
cache_video(msg->timestamp, msg->payload(), msg->size(), cache);
} else {
cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache);
cache_metadata(SrsFrameTypeScript, msg->payload(), msg->size(), cache);
}
// Cache FLV pts.
cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts);
cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size(), pts);
// Set cache to iovec.
iovs[0].iov_base = cache;
iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE;
iovs[1].iov_base = msg->payload;
iovs[1].iov_len = msg->size;
iovs[1].iov_base = msg->payload();
iovs[1].iov_len = msg->size();
iovs[2].iov_base = pts;
iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;

View File

@ -12,6 +12,9 @@
#include <string>
#include <vector>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
// For srs-librtmp, @see https://github.com/ossrs/srs/issues/213
#ifndef _WIN32
#include <sys/uio.h>
@ -192,25 +195,23 @@ public:
// while the shared ptr message used to copy and send.
class SrsCommonMessage
{
public:
// 4.1. Message Header
public:
SrsMessageHeader header;
// 4.2. Message Payload
public:
// The current message parsed size,
// size <= header.payload_length
// For the payload maybe sent in multiple chunks.
int size;
// The payload of message, the SrsCommonMessage never know about the detail of payload,
// user must use SrsProtocol.decode_message to get concrete packet.
// @remark, not all message payload can be decoded to packet. for example,
// video/audio packet use raw bytes, no video/audio packet.
char *payload;
// 4.2. Message Payload
SrsSharedPtr<SrsMemoryBlock> payload_;
public:
SrsCommonMessage();
virtual ~SrsCommonMessage();
public:
// Backward compatibility accessors
char *payload() { return payload_.get() ? payload_->payload() : NULL; }
int size() { return payload_.get() ? payload_->size() : 0; }
public:
// Alloc the payload to specified size of bytes.
virtual void create_payload(int size);
@ -223,30 +224,6 @@ public:
virtual srs_error_t create(SrsMessageHeader *pheader, char *body, int size);
};
// The message header for shared ptr message.
// only the message for all msgs are same.
class SrsSharedMessageHeader
{
public:
// 3bytes.
// Three-byte field that represents the size of the payload in bytes.
// It is set in big-endian format.
int32_t payload_length;
// 1byte.
// One byte field to represent the message type. A range of type IDs
// (1-7) are reserved for protocol control messages.
// For example, RTMP_MSG_AudioMessage or RTMP_MSG_VideoMessage.
int8_t message_type;
// Get the prefered cid(chunk stream id) which sendout over.
// set at decoding, and canbe used for directly send message,
// For example, dispatch to all connections.
int prefer_cid;
public:
SrsSharedMessageHeader();
virtual ~SrsSharedMessageHeader();
};
// The shared ptr message.
// For audio/video/data message that need less memory copy.
// and only for output.
@ -257,8 +234,6 @@ class SrsSharedPtrMessage
{
// 4.1. Message Header
public:
// The header can shared, only set the timestamp and stream id.
// SrsSharedMessageHeader header;
// Four-byte field that contains a timestamp of the message.
// The 4 bytes are packed in the big-endian order.
// @remark, used as calc timestamp when decode and encode time.
@ -268,41 +243,24 @@ public:
// Four-byte field that identifies the stream of the message. These
// bytes are set in big-endian format.
int32_t stream_id;
// 4.2. Message Payload
// Message type for determining audio/video/data
int8_t message_type;
// Preferred chunk ID for RTMP chunking
int prefer_cid;
public:
// The current message parsed size,
// size <= header.payload_length
// For the payload maybe sent in multiple chunks.
int size;
// The payload of message, the SrsCommonMessage never know about the detail of payload,
// user must use SrsProtocol.decode_message to get concrete packet.
// @remark, not all message payload can be decoded to packet. for example,
// video/audio packet use raw bytes, no video/audio packet.
char *payload;
private:
class SrsSharedPtrPayload
{
public:
// The shared message header.
SrsSharedMessageHeader header;
// The actual shared payload.
char *payload;
// The size of payload.
int size;
// The reference count
int shared_count;
public:
SrsSharedPtrPayload();
virtual ~SrsSharedPtrPayload();
};
SrsSharedPtrPayload *ptr;
// 4.2. Message Payload
SrsSharedPtr<SrsMemoryBlock> payload_;
public:
SrsSharedPtrMessage();
virtual ~SrsSharedPtrMessage();
public:
// Backward compatibility accessors
char *payload() { return payload_.get() ? payload_->payload() : NULL; }
int size() { return payload_.get() ? payload_->size() : 0; }
public:
// Create shared ptr message,
// copy header, manage the payload of msg,
@ -318,12 +276,6 @@ public:
// Create shared ptr message from RAW payload.
// @remark Note that the header is set to zero.
virtual void wrap(char *payload, int size);
// Get current reference count.
// when this object created, count set to 0.
// if copy() this object, count increase 1.
// if this or copy deleted, free payload when count is 0, or count--.
// @remark, assert object is created.
virtual int count();
// check prefer cid and stream id.
// @return whether stream id already set.
virtual bool check(int stream_id);

View File

@ -366,6 +366,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
}
string &recvfrom_desc = stats->recvfrom_desc;
(void)recvfrom_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvfrom->update(_st_stat_recvfrom);
_srs_pps_recvfrom_eagain->update(_st_stat_recvfrom_eagain);
@ -378,6 +379,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
#endif
string &io_desc = stats->io_desc;
(void)io_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_read->update(_st_stat_read);
_srs_pps_read_eagain->update(_st_stat_read_eagain);
@ -392,6 +394,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
#endif
string &msg_desc = stats->msg_desc;
(void)msg_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_recvmsg->update(_st_stat_recvmsg);
_srs_pps_recvmsg_eagain->update(_st_stat_recvmsg_eagain);
@ -404,6 +407,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
#endif
string &epoll_desc = stats->epoll_desc;
(void)epoll_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_epoll->update(_st_stat_epoll);
_srs_pps_epoll_zero->update(_st_stat_epoll_zero);
@ -416,6 +420,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
#endif
string &sched_desc = stats->sched_desc;
(void)sched_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_sched_160ms->update(_st_stat_sched_160ms);
_srs_pps_sched_s->update(_st_stat_sched_s);
@ -448,6 +453,7 @@ void srs_global_kbps_update(SrsKbpsStats *stats)
}
string &thread_desc = stats->thread_desc;
(void)thread_desc;
#if defined(SRS_DEBUG) && defined(SRS_DEBUG_STATS)
_srs_pps_thread_run->update(_st_stat_thread_run);
_srs_pps_thread_idle->update(_st_stat_thread_idle);

View File

@ -761,7 +761,7 @@ SrsRtpPacket::SrsRtpPacket()
{
payload_ = NULL;
payload_type_ = SrsRtpPacketPayloadTypeUnknown;
shared_buffer_ = NULL;
shared_buffer_ = SrsSharedPtr<SrsMemoryBlock>(NULL);
actual_buffer_size_ = 0;
nalu_type = 0;
@ -776,7 +776,7 @@ SrsRtpPacket::SrsRtpPacket()
SrsRtpPacket::~SrsRtpPacket()
{
srs_freep(payload_);
srs_freep(shared_buffer_);
// shared_buffer_ automatically cleaned up by SrsSharedPtr
}
char *SrsRtpPacket::wrap(int size)
@ -785,23 +785,22 @@ char *SrsRtpPacket::wrap(int size)
actual_buffer_size_ = size;
// If the buffer is large enough, reuse it.
if (shared_buffer_ && shared_buffer_->size >= size) {
return shared_buffer_->payload;
if (shared_buffer_.get() && shared_buffer_->size() >= size) {
return shared_buffer_->payload();
}
// Create a large enough message, with under-layer buffer.
srs_freep(shared_buffer_);
shared_buffer_ = new SrsSharedPtrMessage();
// Create a large enough memory block, with under-layer buffer.
shared_buffer_ = SrsSharedPtr<SrsMemoryBlock>(new SrsMemoryBlock());
// Create under-layer buffer for new message
// Create under-layer buffer for new memory block
// For RTC, we use larger under-layer buffer for each packet.
int nb_buffer = srs_max(size, kRtpPacketSize);
char *buf = new char[nb_buffer];
shared_buffer_->wrap(buf, nb_buffer);
shared_buffer_->attach(buf, nb_buffer);
++_srs_pps_objs_rbuf->sugar;
return shared_buffer_->payload;
return shared_buffer_->payload();
}
char *SrsRtpPacket::wrap(char *data, int size)
@ -811,18 +810,14 @@ char *SrsRtpPacket::wrap(char *data, int size)
return buf;
}
char *SrsRtpPacket::wrap(SrsSharedPtrMessage *msg)
char *SrsRtpPacket::wrap(SrsSharedPtr<SrsMemoryBlock> block)
{
// Generally, the wrap(msg) is used for RTMP to RTC, where the msg
// is not generated by RTC.
srs_freep(shared_buffer_);
// Copy the shared memory block.
shared_buffer_ = block;
// If we wrap a memory block, the size of packet equals to the block size.
actual_buffer_size_ = shared_buffer_.get() ? shared_buffer_->size() : 0;
// Copy from the new message.
shared_buffer_ = msg->copy();
// If we wrap a message, the size of packet equals to the message size.
actual_buffer_size_ = shared_buffer_->size;
return msg->payload;
return shared_buffer_.get() ? shared_buffer_->payload() : NULL;
}
SrsRtpPacket *SrsRtpPacket::copy()
@ -834,7 +829,7 @@ SrsRtpPacket *SrsRtpPacket::copy()
cp->payload_type_ = payload_type_;
cp->nalu_type = nalu_type;
cp->shared_buffer_ = shared_buffer_ ? shared_buffer_->copy2() : NULL;
cp->shared_buffer_ = shared_buffer_; // Copy shared pointer
cp->actual_buffer_size_ = actual_buffer_size_;
cp->frame_type = frame_type;

View File

@ -9,6 +9,7 @@
#include <srs_core.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_codec.hpp>
@ -27,6 +28,7 @@
#define SRS_NACK_DEBUG_DROP_PACKET_N 3
class SrsRtpPacket;
class SrsMemoryBlock;
// The RTP packet max size, should never exceed this size.
const int kRtpPacketSize = 1500;
@ -64,7 +66,6 @@ const uint8_t kEnd = 0x40; // Fu-header end bit
class SrsBuffer;
class SrsRtpRawPayload;
class SrsRtpFUAPayload2;
class SrsSharedPtrMessage;
class SrsRtpExtensionTypes;
// Fast parse the SSRC from RTP packet. Return 0 if invalid.
@ -329,11 +330,11 @@ private:
SrsRtpPacketPayloadType payload_type_;
private:
// The original shared message, all RTP packets can refer to its data.
// Note that the size of shared msg, is not the packet size, it's a larger aligned buffer.
// The original shared memory block, all RTP packets can refer to its data.
// Note that the size of shared memory block, is not the packet size, it's a larger aligned buffer.
// @remark Note that it may point to the whole RTP packet(for RTP parser, which decode RTP packet from buffer),
// and it may point to the RTP payload(for RTMP to RTP, which build RTP header and payload).
SrsSharedPtrMessage *shared_buffer_;
SrsSharedPtr<SrsMemoryBlock> shared_buffer_;
// The size of RTP packet or RTP payload.
int actual_buffer_size_;
// Helper fields.
@ -360,8 +361,8 @@ public:
// Wrap buffer to shared_message, which is managed by us.
char *wrap(int size);
char *wrap(char *data, int size);
// Wrap the shared message, we copy it.
char *wrap(SrsSharedPtrMessage *msg);
// Wrap the shared memory block, we copy it.
char *wrap(SrsSharedPtr<SrsMemoryBlock> block);
// Copy the RTP packet.
virtual SrsRtpPacket *copy();

View File

@ -30,8 +30,8 @@ srs_error_t SrsRtmpFormat::on_metadata(SrsOnMetaDataPacket *meta)
srs_error_t SrsRtmpFormat::on_audio(SrsSharedPtrMessage *shared_audio)
{
SrsSharedPtrMessage *msg = shared_audio;
char *data = msg->payload;
int size = msg->size;
char *data = msg->payload();
int size = msg->size();
return SrsFormat::on_audio(msg->timestamp, data, size);
}
@ -44,8 +44,8 @@ srs_error_t SrsRtmpFormat::on_audio(int64_t timestamp, char *data, int size)
srs_error_t SrsRtmpFormat::on_video(SrsSharedPtrMessage *shared_video)
{
SrsSharedPtrMessage *msg = shared_video;
char *data = msg->payload;
int size = msg->size;
char *data = msg->payload();
int size = msg->size();
return SrsFormat::on_video(msg->timestamp, data, size);
}

View File

@ -341,7 +341,7 @@ srs_error_t SrsProtocol::recv_message(SrsCommonMessage **pmsg)
continue;
}
if (msg->size <= 0 || msg->header.payload_length <= 0) {
if (msg->size() <= 0 || msg->header.payload_length <= 0) {
srs_trace("ignore empty message(type=%d, size=%d, time=%" PRId64 ", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
@ -368,10 +368,10 @@ srs_error_t SrsProtocol::decode_message(SrsCommonMessage *msg, SrsPacket **ppack
srs_error_t err = srs_success;
srs_assert(msg != NULL);
srs_assert(msg->payload != NULL);
srs_assert(msg->size > 0);
srs_assert(msg->payload() != NULL);
srs_assert(msg->size() > 0);
SrsBuffer stream(msg->payload, msg->size);
SrsBuffer stream(msg->payload(), msg->size());
// decode the packet.
SrsPacket *packet = NULL;
@ -407,20 +407,20 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage **msgs, int nb_msg
}
// ignore empty message.
if (!msg->payload || msg->size <= 0) {
if (!msg->payload() || msg->size() <= 0) {
continue;
}
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char *p = msg->payload;
char *pend = msg->payload + msg->size;
char *p = msg->payload();
char *pend = msg->payload() + msg->size();
// always write the header event payload is empty.
while (p < pend) {
// always has header
int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload());
srs_assert(nbh > 0);
// header iov
@ -985,6 +985,7 @@ srs_error_t SrsProtocol::read_message_header(SrsChunkStream *chunk, char fmt)
// create msg when new chunk stream start
if (!chunk->msg) {
chunk->msg = new SrsCommonMessage();
chunk->writing_pos_ = chunk->msg->payload();
}
// read message header from socket to buffer.
@ -1199,32 +1200,39 @@ srs_error_t SrsProtocol::read_message_payload(SrsChunkStream *chunk, SrsCommonMe
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
*pmsg = chunk->msg;
chunk->msg = NULL;
chunk->writing_pos_ = NULL;
return err;
}
srs_assert(chunk->header.payload_length > 0);
// the chunk payload size.
int payload_size = chunk->header.payload_length - chunk->msg->size;
payload_size = srs_min(payload_size, in_chunk_size);
int nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload());
int payload_size = chunk->header.payload_length - nn_written; // Left bytes to read.
payload_size = srs_min(payload_size, in_chunk_size); // Restrict to chunk size.
// create msg payload if not initialized
if (!chunk->msg->payload) {
if (!chunk->msg->payload()) {
chunk->msg->create_payload(chunk->header.payload_length);
chunk->writing_pos_ = chunk->msg->payload();
}
// read payload to buffer
if ((err = in_buffer->grow(skt, payload_size)) != srs_success) {
return srs_error_wrap(err, "read %d bytes payload", payload_size);
}
memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size);
chunk->msg->size += payload_size;
memcpy(chunk->writing_pos_, in_buffer->read_slice(payload_size), payload_size);
chunk->writing_pos_ += payload_size;
// got entire RTMP message?
if (chunk->header.payload_length == chunk->msg->size) {
nn_written = (int)(chunk->writing_pos_ - chunk->msg->payload());
if (nn_written == chunk->header.payload_length) {
*pmsg = chunk->msg;
chunk->msg = NULL;
chunk->writing_pos_ = NULL;
return err;
}
@ -1450,6 +1458,7 @@ SrsChunkStream::SrsChunkStream(int _cid)
cid = _cid;
has_extended_timestamp = false;
msg = NULL;
writing_pos_ = NULL;
msg_count = 0;
extended_timestamp = 0;
}

View File

@ -395,6 +395,8 @@ public:
bool has_extended_timestamp;
// The partially read message.
SrsCommonMessage *msg;
// Current writing position of message.
char *writing_pos_;
// Decoded msg count, to identify whether the chunk stream is fresh.
int64_t msg_count;
// Because the extended timestamp may be a delta timestamp, it can differ

View File

@ -148,7 +148,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve
pkt->header.set_sequence(video_sequence_++);
pkt->header.set_timestamp(msg->timestamp * 90);
pkt->set_payload(raw_raw, SrsRtpPacketPayloadTypeNALU);
pkt->wrap(msg);
pkt->wrap(msg->payload_);
} else {
// We must free it, should never use RTP packets to free it,
// because more than one RTP packet will refer to it.
@ -202,7 +202,7 @@ srs_error_t SrsRtpVideoBuilder::package_nalus(SrsSharedPtrMessage *msg, const ve
pkt->set_payload(fua, SrsRtpPacketPayloadTypeFUA);
}
pkt->wrap(msg);
pkt->wrap(msg->payload_);
nb_left -= packet_size;
}
@ -231,7 +231,7 @@ srs_error_t SrsRtpVideoBuilder::package_single_nalu(SrsSharedPtrMessage *msg, Sr
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
pkt->wrap(msg);
pkt->wrap(msg->payload_);
return err;
}
@ -292,7 +292,7 @@ srs_error_t SrsRtpVideoBuilder::package_fu_a(SrsSharedPtrMessage *msg, SrsSample
fua->size = packet_size;
}
pkt->wrap(msg);
pkt->wrap(msg->payload_);
p += packet_size;
nb_left -= packet_size;

View File

@ -67,9 +67,9 @@ public:
SrsSharedPtrMessage::wrap(payload, 1024);
if (is_video_msg) {
ptr->header.message_type = RTMP_MSG_VideoMessage;
message_type = RTMP_MSG_VideoMessage;
} else {
ptr->header.message_type = RTMP_MSG_AudioMessage;
message_type = RTMP_MSG_AudioMessage;
}
}
virtual ~MockSrsSharedPtrMessage() {}

View File

@ -5336,14 +5336,6 @@ VOID TEST(KernelFLVTest, CoverSharedPtrMessage)
HELPER_EXPECT_FAILED(m.create(&h, NULL, -1));
}
if (true) {
SrsCommonMessage cm;
cm.size = -1;
SrsSharedPtrMessage m;
HELPER_EXPECT_FAILED(m.create(&cm));
}
if (true) {
SrsMessageHeader h;
h.prefer_cid = 1;
@ -6926,3 +6918,93 @@ VOID TEST(KernelUtilityTest, Base64Decode)
EXPECT_STRNE("admin:admin", plaintext.c_str());
}
}
VOID TEST(KernelMemoryBlockTest, MemoryBlockBasic)
{
// Test basic construction and destruction
if (true) {
SrsMemoryBlock block;
EXPECT_EQ(0, block.size());
EXPECT_EQ(NULL, block.payload());
}
// Test create with size
if (true) {
SrsMemoryBlock block;
block.create(1024);
EXPECT_EQ(1024, block.size());
EXPECT_NE((char *)NULL, block.payload());
}
// Test create with data
if (true) {
SrsMemoryBlock block;
char test_data[] = "Hello, World!";
int test_size = strlen(test_data);
block.create(test_data, test_size);
EXPECT_EQ(test_size, block.size());
EXPECT_NE((char *)NULL, block.payload());
EXPECT_EQ(0, memcmp(block.payload(), test_data, test_size));
}
// Test attach
if (true) {
SrsMemoryBlock block;
char *test_data = new char[100];
memset(test_data, 0x42, 100);
block.attach(test_data, 100);
EXPECT_EQ(100, block.size());
EXPECT_EQ(test_data, block.payload());
// Memory will be freed by block destructor
}
}
VOID TEST(KernelMemoryBlockTest, SharedMemoryBlock)
{
// Test basic shared memory block usage
if (true) {
SrsSharedPtr<SrsMemoryBlock> shared_block(new SrsMemoryBlock());
shared_block->create(1024);
EXPECT_EQ(1024, shared_block->size());
EXPECT_NE((char *)NULL, shared_block->payload());
// Test sharing
SrsSharedPtr<SrsMemoryBlock> shared_copy = shared_block;
EXPECT_EQ(shared_block->payload(), shared_copy->payload());
EXPECT_EQ(shared_block->size(), shared_copy->size());
}
// Test multiple references
if (true) {
SrsSharedPtr<SrsMemoryBlock> original(new SrsMemoryBlock());
char test_data[] = "Shared memory test data";
original->create(test_data, strlen(test_data));
// Create multiple references
SrsSharedPtr<SrsMemoryBlock> copy1 = original;
SrsSharedPtr<SrsMemoryBlock> copy2 = original;
SrsSharedPtr<SrsMemoryBlock> copy3 = copy1;
// All should point to the same memory
EXPECT_EQ(original->payload(), copy1->payload());
EXPECT_EQ(original->payload(), copy2->payload());
EXPECT_EQ(original->payload(), copy3->payload());
// All should have the same size
EXPECT_EQ(original->size(), copy1->size());
EXPECT_EQ(original->size(), copy2->size());
EXPECT_EQ(original->size(), copy3->size());
// Verify data integrity
EXPECT_EQ(0, memcmp(original->payload(), test_data, strlen(test_data)));
EXPECT_EQ(0, memcmp(copy1->payload(), test_data, strlen(test_data)));
EXPECT_EQ(0, memcmp(copy2->payload(), test_data, strlen(test_data)));
EXPECT_EQ(0, memcmp(copy3->payload(), test_data, strlen(test_data)));
}
}

View File

@ -899,7 +899,6 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray)
SrsSharedPtrMessage msg;
char *payload = new char[1024];
HELPER_EXPECT_SUCCESS(msg.create(&header, payload, 1024));
EXPECT_EQ(0, msg.count());
if (true) {
SrsMessageArray arr(3);
@ -908,15 +907,11 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray)
SrsUniquePtr<SrsMessageArray> parr2(parr, srs_utest_free_message_array);
arr.msgs[0] = msg.copy();
EXPECT_EQ(1, msg.count());
arr.msgs[1] = msg.copy();
EXPECT_EQ(2, msg.count());
arr.msgs[2] = msg.copy();
EXPECT_EQ(3, msg.count());
}
EXPECT_EQ(0, msg.count());
if (true) {
SrsMessageArray arr(3);
@ -925,12 +920,9 @@ VOID TEST(ProtocolMsgArrayTest, MessageArray)
SrsUniquePtr<SrsMessageArray> parr2(parr, srs_utest_free_message_array);
arr.msgs[0] = msg.copy();
EXPECT_EQ(1, msg.count());
arr.msgs[2] = msg.copy();
EXPECT_EQ(2, msg.count());
}
EXPECT_EQ(0, msg.count());
}
/**
@ -4402,9 +4394,8 @@ VOID TEST(ProtocolStackTest, ProtocolSendVMessage)
SrsCommonMessage *msg = new SrsCommonMessage();
SrsUniquePtr<SrsCommonMessage> msg_uptr(msg);
msg->size = sizeof(data);
msg->payload = new char[msg->size];
memcpy(msg->payload, data, msg->size);
msg->create_payload(sizeof(data));
memcpy(msg->payload(), data, sizeof(data));
SrsSharedPtrMessage m;
HELPER_ASSERT_SUCCESS(m.create(msg));
@ -4964,8 +4955,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow)
if (true) {
SrsCommonMessage *msg = new SrsCommonMessage();
SrsUniquePtr<SrsCommonMessage> msg_uptr(msg);
msg->header.payload_length = msg->size = 4096;
msg->payload = new char[msg->size];
msg->create_payload(4096);
msg->header.payload_length = 4096;
msg->header.message_type = 9;
EXPECT_TRUE(msg->header.is_video());
@ -5014,8 +5005,8 @@ VOID TEST(ProtocolStackTest, ProtocolAckSizeFlow)
if (true) {
SrsCommonMessage *msg = new SrsCommonMessage();
SrsUniquePtr<SrsCommonMessage> msg_uptr(msg);
msg->header.payload_length = msg->size = 4096;
msg->payload = new char[msg->size];
msg->header.payload_length = 4096;
msg->create_payload(4096);
msg->header.message_type = 9;
EXPECT_TRUE(msg->header.is_video());

View File

@ -250,7 +250,6 @@ VOID TEST(ProtocolRTMPTest, SendPacketsError)
SrsCommonMessage pkt;
pkt.header.initialize_audio(200, 1000, 1);
pkt.create_payload(256);
pkt.size = 256;
SrsSharedPtrMessage *msg = new SrsSharedPtrMessage();
msg->create(&pkt);
@ -346,7 +345,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages)
SrsCommonMessage pkt;
pkt.header.initialize_audio(200, 1000, 1);
pkt.create_payload(256);
pkt.size = 256;
SrsSharedPtrMessage *msg = new SrsSharedPtrMessage();
msg->create(&pkt);
@ -362,7 +360,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages)
SrsCommonMessage pkt;
pkt.header.initialize_audio(200, 1000, 1);
pkt.create_payload(256);
pkt.size = 256;
SrsSharedPtrMessage *msg = new SrsSharedPtrMessage();
msg->create(&pkt);
@ -384,7 +381,6 @@ VOID TEST(ProtocolRTMPTest, HugeMessages)
SrsCommonMessage pkt;
pkt.header.initialize_audio(200, 1000, 1);
pkt.create_payload(256);
pkt.size = 256;
SrsSharedPtrMessage *msg = new SrsSharedPtrMessage();
msg->create(&pkt);
@ -412,7 +408,6 @@ VOID TEST(ProtocolRTMPTest, DecodeMessages)
SrsCommonMessage msg;
msg.header.initialize_amf0_script(1, 1);
msg.create_payload(1);
msg.size = 1;
SrsPacket *pkt;
HELPER_EXPECT_FAILED(p.decode_message(&msg, &pkt));
@ -455,8 +450,7 @@ SrsCommonMessage *_create_amf0(char *bytes, int size, int stream_id)
SrsCommonMessage *msg = new SrsCommonMessage();
msg->header.initialize_amf0_script(size, stream_id);
msg->create_payload(size);
memcpy(msg->payload, bytes, size);
msg->size = size;
memcpy(msg->payload(), bytes, size);
return msg;
}
@ -2949,7 +2943,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage)
if (true) {
SrsSharedPtrMessage *msg = NULL;
HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeScript, 0, _strcpy("Hello"), 5, 0, &msg));
EXPECT_STREQ("Hello", msg->payload);
EXPECT_STREQ("Hello", msg->payload());
srs_freep(msg);
}
@ -2957,7 +2951,7 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage)
if (true) {
SrsSharedPtrMessage *msg = NULL;
HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeVideo, 0, _strcpy("Hello"), 5, 0, &msg));
EXPECT_STREQ("Hello", msg->payload);
EXPECT_STREQ("Hello", msg->payload());
srs_freep(msg);
}
@ -2965,13 +2959,13 @@ VOID TEST(ProtocolRTMPTest, CreateRTMPMessage)
if (true) {
SrsSharedPtrMessage *msg = NULL;
HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg));
EXPECT_STREQ("Hello", msg->payload);
EXPECT_STREQ("Hello", msg->payload());
srs_freep(msg);
}
if (true) {
SrsCommonMessage *msg = NULL;
HELPER_EXPECT_SUCCESS(srs_rtmp_create_msg(SrsFrameTypeAudio, 0, _strcpy("Hello"), 5, 0, &msg));
EXPECT_STREQ("Hello", msg->payload);
EXPECT_STREQ("Hello", msg->payload());
srs_freep(msg);
}
}