diff --git a/trunk/3rdparty/srs-bench/srs/rtmp_test.go b/trunk/3rdparty/srs-bench/srs/rtmp_test.go index 213172421..eedfaaeba 100644 --- a/trunk/3rdparty/srs-bench/srs/rtmp_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtmp_test.go @@ -21,19 +21,20 @@ package srs import ( + "bytes" "context" "fmt" - "github.com/ossrs/go-oryx-lib/avc" - "github.com/ossrs/go-oryx-lib/flv" - "github.com/pion/interceptor" "math/rand" "os" "sync" "testing" "time" + "github.com/ossrs/go-oryx-lib/avc" + "github.com/ossrs/go-oryx-lib/flv" "github.com/ossrs/go-oryx-lib/logger" "github.com/ossrs/go-oryx-lib/rtmp" + "github.com/pion/interceptor" ) func TestRtmpPublishPlay(t *testing.T) { @@ -126,12 +127,14 @@ func TestRtmpPublish_RtcPlay(t *testing.T) { api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { nn, attr, err := i.nextRTPReader.Read(payload, attributes) - if err == nil { - if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { - cancel() // Completed. - } - logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn) + if err != nil { + return nn, attr, err } + + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { + cancel() // Completed. + } + logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn) return nn, attr, err } })) @@ -280,3 +283,113 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) { t.Errorf("err %+v", err) } } + +func TestRtmpPublish_MultipleSequences_RtcPlay(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1, r2 error + err := func() (err error) { + streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) + rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix) + + // Publisher connect to a RTMP stream. + publisher := NewRTMPPublisher() + defer publisher.Close() + + if err := publisher.Publish(ctx, rtmpUrl); err != nil { + return err + } + + // Setup the RTC player. + var thePlayer *testPlayer + if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error { + play.streamSuffix = streamSuffix + var nnSpsPps uint64 + var previousSpsPps []byte + return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { + api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { + i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTPReader.Read(payload, attributes) + if err != nil { + return nn, attr, err + } + + annexb, nalus, err := DemuxRtpSpsPps(payload[:nn]) + if err != nil || len(nalus) == 0 || + (nalus[0].NALUType != avc.NALUTypeSPS && nalus[0].NALUType != avc.NALUTypePPS) || + bytes.Equal(annexb, previousSpsPps) { + return nn, attr, err + } + + previousSpsPps = annexb + if nnSpsPps++; nnSpsPps >= 2 { + cancel() // Completed. + } + logger.Tf(ctx, "Play RECV SPS/PPS #%v %vB %v", nnSpsPps, nn, nalus[0].NALUType) + return nn, attr, err + } + })) + }) + }); err != nil { + return err + } + defer thePlayer.Close() + + // Run publisher and players. + var wg sync.WaitGroup + defer wg.Wait() + + var playerIceReady context.Context + playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx) + + wg.Add(1) + go func() { + defer wg.Done() + if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil { + cancel() + } + logger.Tf(ctx, "player done") + }() + + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for player ready. + select { + case <-ctx.Done(): + return + case <-playerIceReady.Done(): + } + + var nnPackets int + ctxAvatar, cancelAvatar := context.WithCancel(ctx) + publisher.onSendPacket = func(m *rtmp.Message) error { + if m.MessageType == rtmp.MessageTypeVideo { + nnPackets++ + } + if nnPackets > 10 { + cancelAvatar() + } + return nil + } + + publisher.closeTransportWhenIngestDone = false + if r0 = publisher.Ingest(ctxAvatar, *srsPublishBBB); r0 != nil { + cancel() + } + + publisher.closeTransportWhenIngestDone = true + if r2 = publisher.Ingest(ctx, *srsPublishAvatar); r2 != nil { + cancel() + } + logger.Tf(ctx, "publisher done") + }() + + return nil + }() + if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil { + t.Errorf("err %+v", err) + } +} diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 4e21c302c..b6543902e 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -29,6 +29,8 @@ import ( "github.com/ossrs/go-oryx-lib/avc" "github.com/ossrs/go-oryx-lib/flv" "github.com/ossrs/go-oryx-lib/rtmp" + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" "io" "math/rand" "net" @@ -1657,3 +1659,42 @@ func IsNALUEquals(a, b *avc.NALU) bool { return bytes.Equal(a.Data, b.Data) } + +func DemuxRtpSpsPps(payload []byte) ([]byte, []*avc.NALU, error) { + // Parse RTP packet. + pkt := rtp.Packet{} + if err := pkt.Unmarshal(payload); err != nil { + return nil, nil, err + } + + // Decode H264 packet. + h264Packet := codecs.H264Packet{} + annexb, err := h264Packet.Unmarshal(pkt.Payload) + if err != nil { + return annexb, nil, err + } + + // Ignore if not STAP-A + if !bytes.HasPrefix(annexb, []byte{0x00, 0x00, 0x00, 0x01}) { + return annexb, nil, err + } + + // Parse to NALUs + rawNalus := bytes.Split(annexb, []byte{0x00, 0x00, 0x00, 0x01}) + + nalus := []*avc.NALU{} + for _, rawNalu := range rawNalus { + if len(rawNalu) == 0 { + continue + } + + nalu := avc.NewNALU() + if err := nalu.UnmarshalBinary(rawNalu); err != nil { + return annexb, nil, err + } + + nalus = append(nalus, nalu) + } + + return annexb, nalus, nil +} diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 5f3f07b4e..814b9fa3a 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 4.0 Changelog +* v4.0, 2021-10-11, Fix [#1641](https://github.com/ossrs/srs/issues/1641), HLS/RTC picture corrupt for SPS/PPS lost. v4.0.175 * v4.0, 2021-10-11, RTC: Refine config, aac to rtmp_to_rtc, bframe to keep_bframe. v4.0.174 * v4.0, 2021-10-10, For [#1641](https://github.com/ossrs/srs/issues/1641), Support RTMP publish and play regression test. v4.0.173 * v4.0, 2021-10-10, RTC: Change rtc.aac to discard by default. v4.0.172 diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index cd876d483..f95505861 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 174 +#define VERSION_REVISION 175 #endif diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index 5dc262f35..c0289c00a 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -818,10 +818,6 @@ srs_error_t SrsFormat::avc_demux_sps_pps(SrsBuffer* stream) if (!stream->require(sequenceParameterSetLength)) { return srs_error_new(ERROR_HLS_DECODE_ERROR, "decode SPS data"); } - if (vcodec->sequenceParameterSetNALUnit.size() > 0) { - stream->skip(sequenceParameterSetLength); - continue; - } if (sequenceParameterSetLength > 0) { vcodec->sequenceParameterSetNALUnit.resize(sequenceParameterSetLength); stream->read_bytes(&vcodec->sequenceParameterSetNALUnit[0], sequenceParameterSetLength); @@ -846,10 +842,6 @@ srs_error_t SrsFormat::avc_demux_sps_pps(SrsBuffer* stream) if (!stream->require(pictureParameterSetLength)) { return srs_error_new(ERROR_HLS_DECODE_ERROR, "decode PPS data"); } - if (vcodec->pictureParameterSetNALUnit.size() > 0) { - stream->skip(pictureParameterSetLength); - continue; - } if (pictureParameterSetLength > 0) { vcodec->pictureParameterSetNALUnit.resize(pictureParameterSetLength); stream->read_bytes(&vcodec->pictureParameterSetNALUnit[0], pictureParameterSetLength);