From 3fea5c0ec3c7d8721450cae60d0960dcb3ed4334 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 23 Mar 2021 19:32:59 +0800 Subject: [PATCH] Test: Add republish regression test, should fail --- trunk/3rdparty/srs-bench/srs/ingester.go | 31 +- trunk/3rdparty/srs-bench/srs/rtc_test.go | 576 +++++++++++------------ trunk/3rdparty/srs-bench/srs/util.go | 161 ++++--- trunk/src/app/srs_app_rtc_conn.cpp | 2 +- 4 files changed, 397 insertions(+), 373 deletions(-) diff --git a/trunk/3rdparty/srs-bench/srs/ingester.go b/trunk/3rdparty/srs-bench/srs/ingester.go index f38409e59..bcedebbb6 100644 --- a/trunk/3rdparty/srs-bench/srs/ingester.go +++ b/trunk/3rdparty/srs-bench/srs/ingester.go @@ -44,16 +44,20 @@ type videoIngester struct { markerInterceptor *RTPInterceptor sVideoTrack *webrtc.TrackLocalStaticSample sVideoSender *webrtc.RTPSender + ready context.Context + readyCancel context.CancelFunc } func NewVideoIngester(sourceVideo string) *videoIngester { - return &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} + v := &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} + v.ready, v.readyCancel = context.WithCancel(context.Background()) + return v } func (v *videoIngester) Close() error { + v.readyCancel() if v.sVideoSender != nil { - v.sVideoSender.Stop() - v.sVideoSender = nil + _ = v.sVideoSender.Stop() } return nil } @@ -102,6 +106,9 @@ func (v *videoIngester) Ingest(ctx context.Context) error { logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v", codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers) + // OK, we are ready. + v.readyCancel() + clock := newWallClock() sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps)) for ctx.Err() == nil { @@ -179,16 +186,21 @@ type audioIngester struct { audioLevelInterceptor *RTPInterceptor sAudioTrack *webrtc.TrackLocalStaticSample sAudioSender *webrtc.RTPSender + ready context.Context + readyCancel context.CancelFunc } func NewAudioIngester(sourceAudio string) *audioIngester { - return &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} + v := &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} + v.ready, v.readyCancel = context.WithCancel(context.Background()) + return v } func (v *audioIngester) Close() error { + v.readyCancel() // OK we are closed, also ready. + if v.sAudioSender != nil { - v.sAudioSender.Stop() - v.sAudioSender = nil + _ = v.sAudioSender.Stop() } return nil } @@ -240,6 +252,9 @@ func (v *audioIngester) Ingest(ctx context.Context) error { } } + // OK, we are ready. + v.readyCancel() + clock := newWallClock() var lastGranule uint64 @@ -253,7 +268,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error { } // The amount of samples is the difference between the last and current timestamp - sampleCount := uint64(pageHeader.GranulePosition - lastGranule) + sampleCount := pageHeader.GranulePosition - lastGranule lastGranule = pageHeader.GranulePosition sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate)) @@ -266,7 +281,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error { return 0, err } - header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) + _ = header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) } return ri.nextRTPWriter.Write(header, payload, attributes) diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 62ad26663..de5ac0ff0 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -22,11 +22,13 @@ package srs import ( "context" + "encoding/json" "fmt" "github.com/pion/transport/vnet" "io" "io/ioutil" "math/rand" + "net/http" "os" "sync" "testing" @@ -73,7 +75,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) { var resources []io.Closer defer func() { for _, resource := range resources { - resource.Close() + _ = resource.Close() } }() @@ -93,27 +95,19 @@ func TestRtcBasic_PublishPlay(t *testing.T) { defer wg.Done() defer cancel() - doInit := func() error { - playOK, vnetClientIP := *srsPlayOKPackets, *srsVnetClientIP + doInit := func() (err error) { streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) // Initialize player with private api. - if play, err := NewTestPlayer(nil, func(play *TestPlayer) error { + if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error { play.streamSuffix = streamSuffix resources = append(resources, play) - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - resources = append(resources, api) - play.api = api - var nnPlayWriteRTCP, nnPlayReadRTCP, nnPlayWriteRTP, nnPlayReadRTP uint64 - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnPlayReadRTP++; nnPlayReadRTP >= uint64(playOK) { + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { cancel() // Completed. } logger.Tf(ctx, "Play rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets", @@ -133,32 +127,19 @@ func TestRtcBasic_PublishPlay(t *testing.T) { return nn, err } })) - }); err != nil { - return err - } - - return nil + }) }); err != nil { return err - } else { - thePlayer = play } // Initialize publisher with private api. - if pub, err := NewTestPublisher(nil, func(pub *TestPublisher) error { + if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = publishReadyCancel resources = append(resources, pub) - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - resources = append(resources, api) - pub.api = api - var nnPubWriteRTCP, nnPubReadRTCP, nnPubWriteRTP, nnPubReadRTP uint64 - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { nn, attr, err := i.nextRTPReader.Read(buf, attributes) @@ -185,15 +166,9 @@ func TestRtcBasic_PublishPlay(t *testing.T) { return nn, err } })) - }); err != nil { - return err - } - - return nil + }) }); err != nil { return err - } else { - thePublisher = pub } // Init done. @@ -216,14 +191,10 @@ func TestRtcBasic_PublishPlay(t *testing.T) { select { case <-ctx.Done(): - return case <-mainReady.Done(): + r2 = thePublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "pub done") } - - if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil { - r2 = err - } - logger.Tf(ctx, "pub done") }() // Run player. @@ -234,14 +205,159 @@ func TestRtcBasic_PublishPlay(t *testing.T) { select { case <-ctx.Done(): - return case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") + } + }() +} + +// When republish a stream, the player stream SHOULD be continuous. +func TestRtcBasic_Republish(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1, r2, r3, r4 error + defer func(ctx context.Context) { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil { + t.Errorf("Fail for err %+v", err) + } else { + logger.Tf(ctx, "test done with err %+v", err) + } + }(ctx) + + var resources []io.Closer + defer func() { + for _, resource := range resources { + _ = resource.Close() + } + }() + + var wg sync.WaitGroup + defer wg.Wait() + + // The event notify. + var thePublisher, theRepublisher *TestPublisher + var thePlayer *TestPlayer + + mainReady, mainReadyCancel := context.WithCancel(context.Background()) + publishReady, publishReadyCancel := context.WithCancel(context.Background()) + republishReady, republishReadyCancel := context.WithCancel(context.Background()) + + // Objects init. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + doInit := func() (err error) { + streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) + + // Initialize player with private api. + if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error { + play.streamSuffix = streamSuffix + resources = append(resources, play) + + var nnPlayReadRTP uint64 + return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { + i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + select { + case <-republishReady.Done(): + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { + cancel() // Completed. + } + logger.Tf(ctx, "Play recv rtp %v packets", nnPlayReadRTP) + default: + logger.Tf(ctx, "Play recv rtp packet before republish") + } + return i.nextRTPReader.Read(payload, attributes) + } + })) + }) + }); err != nil { + return err + } + + // Initialize publisher with private api. + if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = publishReadyCancel + resources = append(resources, pub) + + var nnPubReadRTCP uint64 + return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTCPReader.Read(buf, attributes) + if nnPubReadRTCP++; nnPubReadRTCP > 0 && pub.cancel != nil { + pub.cancel() // We only cancel the publisher itself. + } + logger.Tf(ctx, "Publish recv rtcp %v packets", nnPubReadRTCP) + return nn, attr, err + } + })) + }) + }); err != nil { + return err + } + + // Initialize re-publisher with private api. + if theRepublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = republishReadyCancel + resources = append(resources, pub) + + return pub.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Init done. + mainReadyCancel() + + <-ctx.Done() + return nil } - if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil { - r3 = err + if err := doInit(); err != nil { + r1 = err + } + }() + + // Run publisher. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-mainReady.Done(): + pubCtx, pubCancel := context.WithCancel(ctx) + r2 = thePublisher.Run(logger.WithContext(pubCtx), pubCancel) + logger.Tf(ctx, "pub done, re-publish again") + + // Dispose the stream. + _ = thePublisher.Close() + + r4 = theRepublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "re-pub done") + } + }() + + // Run player. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") } - logger.Tf(ctx, "play done") }() } @@ -252,18 +368,8 @@ func TestRtcBasic_PublishPlay(t *testing.T) { // No.4 srs-server: ChangeCipherSpec, Finished func TestRtcDTLS_ClientActive_Default(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -273,7 +379,8 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -283,7 +390,7 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -316,18 +423,8 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { // No.4 srs-bench: ChangeCipherSpec, Finished func TestRtcDTLS_ClientPassive_Default(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -337,7 +434,8 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -347,7 +445,7 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -377,18 +475,8 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { // When srs-bench close the PC, it will send DTLS alert and might retransmit it. func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -398,7 +486,8 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -408,7 +497,7 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -445,18 +534,8 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { // When srs-bench close the PC, it will send DTLS alert and might retransmit it. func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -466,7 +545,8 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -476,7 +556,7 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -520,18 +600,8 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -541,7 +611,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -551,7 +622,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -579,7 +650,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T lastClientHello = record nnClientHello++ - ok = (nnClientHello > nnMaxDrop) + ok = nnClientHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnClientHello, chunk, record, ok, len(c.UserData())) return }) @@ -607,18 +678,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -628,7 +689,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -638,7 +700,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -666,7 +728,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. lastClientHello = record nnClientHello++ - ok = (nnClientHello > nnMaxDrop) + ok = nnClientHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnClientHello, chunk, record, ok, len(c.UserData())) return }) @@ -693,18 +755,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -714,7 +766,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -724,7 +777,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -761,7 +814,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T lastServerHello = record nnServerHello++ - ok = (nnServerHello > nnMaxDrop) + ok = nnServerHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnServerHello, chunk, record, ok, len(c.UserData())) return }) @@ -790,18 +843,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -811,7 +854,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -821,7 +865,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -858,7 +902,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. lastServerHello = record nnServerHello++ - ok = (nnServerHello > nnMaxDrop) + ok = nnServerHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnServerHello, chunk, record, ok, len(c.UserData())) return }) @@ -884,18 +928,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -905,7 +939,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -915,7 +950,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -943,7 +978,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T lastCertificate = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -970,18 +1005,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -991,7 +1016,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1001,7 +1027,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1029,7 +1055,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. lastCertificate = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1056,18 +1082,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -1077,7 +1093,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1087,7 +1104,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1123,7 +1140,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test lastChangeCipherSepc = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1151,18 +1168,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1172,7 +1179,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1182,7 +1190,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1218,7 +1226,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes lastChangeCipherSepc = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1237,18 +1245,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes // Drop all DTLS packets when got ClientHello, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1258,7 +1256,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1275,7 +1274,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1299,18 +1298,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { // Drop all DTLS packets when got ServerHello, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1320,7 +1309,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1337,7 +1327,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1361,18 +1351,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { // Drop all DTLS packets when got Certificate, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1382,7 +1362,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1399,7 +1380,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1423,18 +1404,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { // Drop all DTLS packets when got ChangeCipherSpec, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1444,7 +1415,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1461,7 +1433,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1486,18 +1458,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { // which also consume about 750ms, but finally should be done successfully. func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP, dtlsDropPackets := *srsPublishOKPackets, *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1507,7 +1469,8 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1517,7 +1480,7 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1545,7 +1508,7 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { } if chunk.IsCertificate() { - if nnDropCertificate >= dtlsDropPackets { + if nnDropCertificate >= *srsDTLSDropPackets { return true } nnDropCertificate++ @@ -1573,18 +1536,8 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1594,7 +1547,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1604,7 +1558,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1662,3 +1616,47 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { t.Errorf("err %+v", err) } } + +func TestRTCServerVersion(t *testing.T) { + api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) + req, err := http.NewRequest("POST", api, nil) + if err != nil { + t.Errorf("Request %v", api) + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("Do request %v", api) + return + } + + b, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Read body of %v", api) + return + } + + obj := struct { + Code int `json:"code"` + Server string `json:"server"` + Data struct { + Major int `json:"major"` + Minor int `json:"minor"` + Revision int `json:"revision"` + Version string `json:"version"` + } `json:"data"` + }{} + if err := json.Unmarshal(b, &obj); err != nil { + t.Errorf("Parse %v", string(b)) + return + } + if obj.Code != 0 { + t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) + return + } + if obj.Data.Major == 0 && obj.Data.Minor == 0 { + t.Errorf("Invalid version %v", obj.Data) + return + } +} diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 941b0bb76..e8cb47dad 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -36,7 +36,6 @@ import ( "strconv" "strings" "sync" - "testing" "time" "github.com/ossrs/go-oryx-lib/errors" @@ -207,7 +206,7 @@ func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes", resBody.Code, resBody.Session, len(resBody.SDP)) - return string(resBody.SDP), nil + return resBody.SDP, nil } func escapeSDP(sdp string) string { @@ -219,7 +218,7 @@ func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL { buf := bytes.Buffer{} buf.WriteByte( - byte(first.RefIdc<<5)&0x60 | byte(24), // STAP-A + first.RefIdc<<5&0x60 | byte(24), // STAP-A ) for _, frame := range frames { @@ -325,6 +324,14 @@ func filterTestError(errs ...error) error { if err == nil || errors.Cause(err) == context.Canceled { continue } + + // If url error, server maybe error, do not print the detail log. + if r0 := errors.Cause(err); r0 != nil { + if r1, ok := r0.(*url.Error); ok { + err = r1 + } + } + filteredErrors = append(filteredErrors, err) } @@ -352,13 +359,13 @@ func srsIsStun(b []byte) bool { // @see https://tools.ietf.org/html/rfc2246#section-6.2.1 // @see srs_is_dtls of https://github.com/ossrs/srs func srsIsDTLS(b []byte) bool { - return (len(b) >= 13 && (b[0] > 19 && b[0] < 64)) + return len(b) >= 13 && (b[0] > 19 && b[0] < 64) } // For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000) // @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs func srsIsRTPOrRTCP(b []byte) bool { - return (len(b) >= 12 && (b[0]&0xC0) == 0x80) + return len(b) >= 12 && (b[0]&0xC0) == 0x80 } // For RTCP, PT is [128, 223] (or without marker [0, 95]). @@ -554,7 +561,7 @@ func (v *DTLSRecord) Unmarshal(b []byte) error { return errors.Errorf("requires 13B only %v", len(b)) } - v.ContentType = DTLSContentType(uint8(b[0])) + v.ContentType = DTLSContentType(b[0]) v.Version = uint16(b[1])<<8 | uint16(b[2]) v.Epoch = uint16(b[3])<<8 | uint16(b[4]) v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10]) @@ -605,11 +612,11 @@ func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error func (v *TestWebRTCAPI) Close() error { if v.proxy != nil { - v.proxy.Close() + _ = v.proxy.Close() } if v.router != nil { - v.router.Stop() + _ = v.router.Stop() } return nil @@ -676,14 +683,24 @@ type TestPlayerOptionFunc func(p *TestPlayer) error type TestPlayer struct { pc *webrtc.PeerConnection receivers []*webrtc.RTPReceiver - // root api object + // We should dispose it. api *TestWebRTCAPI // Optional suffix for stream url. streamSuffix string } -func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPlayer, error) { - v := &TestPlayer{api: api} +func CreateApiForPlayer(play *TestPlayer) error { + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + + play.api = api + return nil +} + +func NewTestPlayer(options ...TestPlayerOptionFunc) (*TestPlayer, error) { + v := &TestPlayer{} for _, opt := range options { if err := opt(v); err != nil { @@ -691,19 +708,24 @@ func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPl } } - // The api might be override by options. - api = v.api - return v, nil } +func (v *TestPlayer) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + return v.api.Setup(vnetClientIP, options...) +} + func (v *TestPlayer) Close() error { if v.pc != nil { - v.pc.Close() + _ = v.pc.Close() } for _, receiver := range v.receivers { - receiver.Stop() + _ = receiver.Stop() + } + + if v.api != nil { + _ = v.api.Close() } return nil @@ -723,12 +745,16 @@ func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { } v.pc = pc - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + }); err != nil { + return errors.Wrapf(err, "add track") + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) + }); err != nil { + return errors.Wrapf(err, "add track") + } offer, err := pc.CreateOffer(nil) if err != nil { @@ -818,16 +844,28 @@ type TestPublisher struct { aIngester *audioIngester vIngester *videoIngester pc *webrtc.PeerConnection - // root api object + // We should dispose it. api *TestWebRTCAPI // Optional suffix for stream url. streamSuffix string + // To cancel the publisher, pass by Run. + cancel context.CancelFunc } -func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*TestPublisher, error) { +func CreateApiForPublisher(pub *TestPublisher) error { + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + + pub.api = api + return nil +} + +func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error) { sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio - v := &TestPublisher{api: api} + v := &TestPublisher{} for _, opt := range options { if err := opt(v); err != nil { @@ -835,9 +873,6 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* } } - // The api might be override by options. - api = v.api - // Create ingesters. if sourceAudio != "" { v.aIngester = NewAudioIngester(sourceAudio) @@ -847,6 +882,7 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* } // Setup the interceptors for packets. + api := v.api api.options = append(api.options, func(api *TestWebRTCAPI) { // Filter for RTCP packets. rtcpInterceptor := &RTCPInterceptor{} @@ -870,17 +906,25 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* return v, nil } +func (v *TestPublisher) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + return v.api.Setup(vnetClientIP, options...) +} + func (v *TestPublisher) Close() error { if v.vIngester != nil { - v.vIngester.Close() + _ = v.vIngester.Close() } if v.aIngester != nil { - v.aIngester.Close() + _ = v.aIngester.Close() } if v.pc != nil { - v.pc.Close() + _ = v.pc.Close() + } + + if v.api != nil { + _ = v.api.Close() } return nil @@ -892,6 +936,9 @@ func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher { } func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { + // Save the cancel. + v.cancel = cancel + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) if v.streamSuffix != "" { r = fmt.Sprintf("%v-%v", r, v.streamSuffix) @@ -1012,11 +1059,17 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro <-ctx.Done() if v.aIngester != nil && v.aIngester.sAudioSender != nil { - v.aIngester.sAudioSender.Stop() + // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed. + <-v.aIngester.ready.Done() + + _ = v.aIngester.Close() } if v.vIngester != nil && v.vIngester.sVideoSender != nil { - v.vIngester.sVideoSender.Stop() + // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed. + <-v.vIngester.ready.Done() + + _ = v.vIngester.Close() } }() @@ -1028,6 +1081,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro if v.aIngester == nil { return } + defer v.aIngester.readyCancel() select { case <-ctx.Done(): @@ -1072,6 +1126,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro if v.vIngester == nil { return } + defer v.vIngester.readyCancel() select { case <-ctx.Done(): @@ -1119,47 +1174,3 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro } return ctx.Err() } - -func TestRTCServerVersion(t *testing.T) { - api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) - req, err := http.NewRequest("POST", api, nil) - if err != nil { - t.Errorf("Request %v", api) - return - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - t.Errorf("Do request %v", api) - return - } - - b, err := ioutil.ReadAll(res.Body) - if err != nil { - t.Errorf("Read body of %v", api) - return - } - - obj := struct { - Code int `json:"code"` - Server string `json:"server"` - Data struct { - Major int `json:"major"` - Minor int `json:"minor"` - Revision int `json:"revision"` - Version string `json:"version"` - } `json:"data"` - }{} - if err := json.Unmarshal(b, &obj); err != nil { - t.Errorf("Parse %v", string(b)) - return - } - if obj.Code != 0 { - t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) - return - } - if obj.Data.Major == 0 && obj.Data.Minor == 0 { - t.Errorf("Invalid version %v", obj.Data) - return - } -} diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e6afab909..8e5928a06 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -605,7 +605,7 @@ srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2*& pkt) // TODO: FIXME: Maybe refine for performance issue. if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { - srs_warn("ssrc %u not found", pkt->header.get_ssrc()); + srs_warn("RTC: Drop for ssrc %u not found", pkt->header.get_ssrc()); return err; }