diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 506aca650..ea1c39bed 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -859,6 +859,9 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { // // @remark The pion is active, so it can be consider a benchmark for DTLS server. func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) @@ -872,7 +875,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -921,7 +923,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0); err != nil { + if err := filterTestError(ctx.Err(), err, r0); err != nil { t.Errorf("err %+v", err) } } @@ -937,6 +939,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T // openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello // packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ. func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) @@ -950,7 +955,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -999,7 +1003,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0); err != nil { + if err := filterTestError(ctx.Err(), err, r0); err != nil { t.Errorf("err %+v", err) } } @@ -1014,6 +1018,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. // // @remark The pion is active, so it can be consider a benchmark for DTLS server. func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) @@ -1027,7 +1034,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1085,7 +1091,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0, r1); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { t.Errorf("err %+v", err) } } @@ -1102,6 +1108,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T // openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello // packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ. func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) @@ -1115,7 +1124,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1173,7 +1181,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0, r1); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { t.Errorf("err %+v", err) } } @@ -1187,6 +1195,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. // // @remark The pion is active, so it can be consider a benchmark for DTLS server. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) @@ -1200,7 +1211,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1249,7 +1259,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0); err != nil { + if err := filterTestError(ctx.Err(), err, r0); err != nil { t.Errorf("err %+v", err) } } @@ -1264,6 +1274,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T // @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message. It's ok right now, but // wast some packets, so we check the epoch+sequence which should never dup, even for ARQ. func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) @@ -1277,7 +1290,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1326,7 +1338,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0); err != nil { + if err := filterTestError(ctx.Err(), err, r0); err != nil { t.Errorf("err %+v", err) } } @@ -1341,6 +1353,9 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. // // @remark The pion is active, so it can be consider a benchmark for DTLS server. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) @@ -1354,7 +1369,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1411,7 +1425,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0, r1); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { t.Errorf("err %+v", err) } } @@ -1427,6 +1441,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test // @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message, and never generate the // ChangeCipherSpec, which will cause DTLS fail. func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) @@ -1440,7 +1457,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1497,7 +1513,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0, r1); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { t.Errorf("err %+v", err) } } @@ -1795,6 +1811,9 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { // If we retransmit 2 ClientHello packets, consumed 150ms, server might wait at 200ms. // Then we retransmit the Certificate, server reset the timer and retransmit it in 50ms, not 200ms. func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) @@ -1808,7 +1827,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { } defer p.Close() - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { @@ -1873,7 +1891,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { return p.Run(ctx, cancel) }() - if err := filterTestError(err, r0); err != nil { + if err := filterTestError(ctx.Err(), err, r0); err != nil { t.Errorf("err %+v", err) } } diff --git a/trunk/3rdparty/srs-bench/srs/rtmp_test.go b/trunk/3rdparty/srs-bench/srs/rtmp_test.go index a9bc8c015..213172421 100644 --- a/trunk/3rdparty/srs-bench/srs/rtmp_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtmp_test.go @@ -25,6 +25,7 @@ import ( "fmt" "github.com/ossrs/go-oryx-lib/avc" "github.com/ossrs/go-oryx-lib/flv" + "github.com/pion/interceptor" "math/rand" "os" "sync" @@ -36,6 +37,9 @@ import ( ) func TestRtmpPublishPlay(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1 error err := func() error { publisher := NewRTMPPublisher() @@ -45,7 +49,6 @@ func TestRtmpPublishPlay(t *testing.T) { defer player.Close() // Connect to RTMP URL. - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix) @@ -92,12 +95,100 @@ func TestRtmpPublishPlay(t *testing.T) { return nil }() - if err := filterTestError(err, r0, r1); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { + t.Errorf("err %+v", err) + } +} + +func TestRtmpPublish_RtcPlay(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1 error + err := func() (err error) { + streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) + rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix) + + // Publisher connect to a RTMP stream. + publisher := NewRTMPPublisher() + defer publisher.Close() + + if err := publisher.Publish(ctx, rtmpUrl); err != nil { + return err + } + + // Setup the RTC player. + var thePlayer *testPlayer + if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error { + play.streamSuffix = streamSuffix + var nnPlayReadRTP uint64 + return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { + api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) { + i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTPReader.Read(payload, attributes) + if err == nil { + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { + cancel() // Completed. + } + logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn) + } + return nn, attr, err + } + })) + }) + }); err != nil { + return err + } + defer thePlayer.Close() + + // Run publisher and players. + var wg sync.WaitGroup + defer wg.Wait() + + var playerIceReady context.Context + playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx) + + wg.Add(1) + go func() { + defer wg.Done() + if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil { + cancel() + } + logger.Tf(ctx, "player done") + }() + + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for player ready. + select { + case <-ctx.Done(): + return + case <-playerIceReady.Done(): + } + + publisher.onSendPacket = func(m *rtmp.Message) error { + time.Sleep(100 * time.Microsecond) + return nil + } + if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil { + cancel() + } + logger.Tf(ctx, "publisher done") + }() + + return nil + }() + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { t.Errorf("err %+v", err) } } func TestRtmpPublish_MultipleSequences(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + var r0, r1, r2 error err := func() error { publisher := NewRTMPPublisher() @@ -107,7 +198,6 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) { defer player.Close() // Connect to RTMP URL. - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) streamSuffix := fmt.Sprintf("rtmp-multi-spspps-%v-%v", os.Getpid(), rand.Int()) rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix) @@ -148,7 +238,7 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) { logger.Tf(ctx, "got %v sps/pps, %v %vms %vB, sps=%v, pps=%v, %v, %v", nnPackets, m.MessageType, m.Timestamp, len(m.Payload), len(avccr.SequenceParameterSetNALUnits), len(avccr.PictureParameterSetNALUnits), avccr.AVCProfileIndication, avccr.AVCLevelIndication) - if nnPackets++; nnPackets >=2 { + if nnPackets++; nnPackets >= 2 { cancel() } return nil @@ -186,7 +276,7 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) { return nil }() - if err := filterTestError(err, r0, r1, r2); err != nil { + if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil { t.Errorf("err %+v", err) } } diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index d6877b55e..4e21c302c 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -749,10 +749,11 @@ func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (* type testPlayerOptionFunc func(p *testPlayer) error type testPlayer struct { - onOffer func(s *webrtc.SessionDescription) error - onAnswer func(s *webrtc.SessionDescription) error - pc *webrtc.PeerConnection - receivers []*webrtc.RTPReceiver + onOffer func(s *webrtc.SessionDescription) error + onAnswer func(s *webrtc.SessionDescription) error + iceReadyCancel context.CancelFunc + pc *webrtc.PeerConnection + receivers []*webrtc.RTPReceiver // We should dispose it. api *testWebRTCAPI // Optional suffix for stream url. @@ -910,8 +911,20 @@ func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { }) pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { - if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { - err = errors.Errorf("Close for ICE state %v", state) + logger.Tf(ctx, "ICE state %v", state) + }) + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Tf(ctx, "PC state %v", state) + + if state == webrtc.PeerConnectionStateConnected { + if v.iceReadyCancel != nil { + v.iceReadyCancel() + } + } + + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { + err = errors.Errorf("Close for PC state %v", state) cancel() } })