diff --git a/trunk/3rdparty/srs-bench/LICENSE b/trunk/3rdparty/srs-bench/LICENSE index 77ba5769d..1cdf14566 100644 --- a/trunk/3rdparty/srs-bench/LICENSE +++ b/trunk/3rdparty/srs-bench/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2021 srs-bench(ossrs) +Copyright (c) 2021 Winlin Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/README.md b/trunk/3rdparty/srs-bench/README.md index 415cb195f..6b903692c 100644 --- a/trunk/3rdparty/srs-bench/README.md +++ b/trunk/3rdparty/srs-bench/README.md @@ -142,6 +142,12 @@ go test ./srs -mod=vendor -v -srs-server=127.0.0.1 make && ./objs/srs_test -test.v -srs-server=127.0.0.1 ``` +可以只运行某个用例,并打印详细日志,比如: + +```bash +make && ./objs/srs_test -test.v -srs-log -test.run TestRtcBasic_PublishPlay +``` + 支持的参数如下: * `-srs-server`,RTC服务器地址。默认值:`127.0.0.1` diff --git a/trunk/3rdparty/srs-bench/auto/sync_vnet.sh b/trunk/3rdparty/srs-bench/auto/sync_vnet.sh new file mode 100755 index 000000000..55ef15f1a --- /dev/null +++ b/trunk/3rdparty/srs-bench/auto/sync_vnet.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +FILES=(udpproxy.go udpproxy_test.go) +for file in ${FILES[@]}; do + echo "cp vnet/udpproxy.go ~/git/transport/vnet/" && + cp vnet/udpproxy.go ~/git/transport/vnet/ +done + +# https://github.com/pion/webrtc/wiki/Contributing#run-all-automated-tests-and-checks-before-submitting +cd ~/git/transport/ + +echo ".github/lint-commit-message.sh" && +.github/lint-commit-message.sh && +echo ".github/assert-contributors.sh" && +.github/assert-contributors.sh && +echo ".github/lint-disallowed-functions-in-library.sh" && +.github/lint-disallowed-functions-in-library.sh && +echo ".github/lint-filename.sh" && +.github/lint-filename.sh +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +# https://github.com/pion/webrtc/wiki/Contributing#run-all-automated-tests-and-checks-before-submitting +cd ~/git/transport/vnet/ + +echo "go test -race ./..." && +go test -race ./... +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +echo "golangci-lint run --skip-files conn_map_test.go" && +golangci-lint run --skip-files conn_map_test.go +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +echo "OK" diff --git a/trunk/3rdparty/srs-bench/main.go b/trunk/3rdparty/srs-bench/main.go index d56fa4995..8b83c4dcb 100644 --- a/trunk/3rdparty/srs-bench/main.go +++ b/trunk/3rdparty/srs-bench/main.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/srs/ingester.go b/trunk/3rdparty/srs-bench/srs/ingester.go index 1e3161a89..f38409e59 100644 --- a/trunk/3rdparty/srs-bench/srs/ingester.go +++ b/trunk/3rdparty/srs-bench/srs/ingester.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -22,6 +22,11 @@ package srs import ( "context" + "io" + "os" + "strings" + "time" + "github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/logger" "github.com/pion/interceptor" @@ -31,10 +36,6 @@ import ( "github.com/pion/webrtc/v3/pkg/media" "github.com/pion/webrtc/v3/pkg/media/h264reader" "github.com/pion/webrtc/v3/pkg/media/oggreader" - "io" - "os" - "strings" - "time" ) type videoIngester struct { diff --git a/trunk/3rdparty/srs-bench/srs/interceptor.go b/trunk/3rdparty/srs-bench/srs/interceptor.go index d853aaf7d..9757b705c 100644 --- a/trunk/3rdparty/srs-bench/srs/interceptor.go +++ b/trunk/3rdparty/srs-bench/srs/interceptor.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -31,14 +31,13 @@ type RTPInterceptorOptionFunc func(i *RTPInterceptor) // Common RTP packet interceptor for benchmark. // @remark Should never merge with RTCPInterceptor, because they has the same Write interface. type RTPInterceptor struct { - localInfo *interceptor.StreamInfo - remoteInfo *interceptor.StreamInfo // If rtpReader is nil, use the default next one to read. rtpReader interceptor.RTPReaderFunc nextRTPReader interceptor.RTPReader // If rtpWriter is nil, use the default next one to write. rtpWriter interceptor.RTPWriterFunc nextRTPWriter interceptor.RTPWriter + // Other common fields. BypassInterceptor } @@ -51,11 +50,6 @@ func NewRTPInterceptor(options ...RTPInterceptorOptionFunc) *RTPInterceptor { } func (v *RTPInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - if v.localInfo != nil { - return writer // Only handle one stream. - } - - v.localInfo = info v.nextRTPWriter = writer return v // Handle all RTP } @@ -68,17 +62,9 @@ func (v *RTPInterceptor) Write(header *rtp.Header, payload []byte, attributes in } func (v *RTPInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { - if v.localInfo == nil || v.localInfo.ID != info.ID { - return - } - v.localInfo = nil // Reset the interceptor. } func (v *RTPInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - if v.remoteInfo != nil { - return reader // Only handle one stream. - } - v.nextRTPReader = reader return v // Handle all RTP } @@ -91,10 +77,6 @@ func (v *RTPInterceptor) Read(b []byte, a interceptor.Attributes) (int, intercep } func (v *RTPInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) { - if v.remoteInfo == nil || v.remoteInfo.ID != info.ID { - return - } - v.remoteInfo = nil } type RTCPInterceptorOptionFunc func(i *RTCPInterceptor) @@ -108,6 +90,7 @@ type RTCPInterceptor struct { // If rtcpWriter is nil, use the default next one to write. rtcpWriter interceptor.RTCPWriterFunc nextRTCPWriter interceptor.RTCPWriter + // Other common fields. BypassInterceptor } diff --git a/trunk/3rdparty/srs-bench/srs/player.go b/trunk/3rdparty/srs-bench/srs/player.go index 0947ad41c..8dc91d030 100644 --- a/trunk/3rdparty/srs-bench/srs/player.go +++ b/trunk/3rdparty/srs-bench/srs/player.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -23,6 +23,10 @@ package srs import ( "context" "fmt" + "strings" + "sync" + "time" + "github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/logger" "github.com/pion/interceptor" @@ -33,9 +37,6 @@ import ( "github.com/pion/webrtc/v3/pkg/media/h264writer" "github.com/pion/webrtc/v3/pkg/media/ivfwriter" "github.com/pion/webrtc/v3/pkg/media/oggwriter" - "strings" - "sync" - "time" ) // @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go diff --git a/trunk/3rdparty/srs-bench/srs/publisher.go b/trunk/3rdparty/srs-bench/srs/publisher.go index 8d38fb055..49abab72a 100644 --- a/trunk/3rdparty/srs-bench/srs/publisher.go +++ b/trunk/3rdparty/srs-bench/srs/publisher.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -22,14 +22,15 @@ package srs import ( "context" + "io" + "sync" + "time" + "github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/logger" "github.com/pion/interceptor" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" - "io" - "sync" - "time" ) // @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 4ac869c42..62ad26663 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -23,19 +23,39 @@ package srs import ( "context" "fmt" - "github.com/ossrs/go-oryx-lib/errors" - "github.com/ossrs/go-oryx-lib/logger" - "github.com/pion/interceptor" - "github.com/pion/rtcp" - "github.com/pion/rtp" "github.com/pion/transport/vnet" + "io" + "io/ioutil" "math/rand" "os" "sync" "testing" "time" + + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/rtp" ) +func TestMain(m *testing.M) { + if err := prepareTest(); err != nil { + logger.Ef(nil, "Prepare test fail, err %+v", err) + os.Exit(-1) + } + + // Disable the logger during all tests. + if *srsLog == false { + olw := logger.Switch(ioutil.Discard) + defer func() { + logger.Switch(olw) + }() + } + + os.Exit(m.Run()) +} + // Basic use scenario, publish a stream, then play it. func TestRtcBasic_PublishPlay(t *testing.T) { ctx := logger.WithContext(context.Background()) @@ -50,12 +70,20 @@ func TestRtcBasic_PublishPlay(t *testing.T) { } }(ctx) + var resources []io.Closer + defer func() { + for _, resource := range resources { + resource.Close() + } + }() + var wg sync.WaitGroup defer wg.Wait() // The event notify. var thePublisher *TestPublisher var thePlayer *TestPlayer + mainReady, mainReadyCancel := context.WithCancel(context.Background()) publishReady, publishReadyCancel := context.WithCancel(context.Background()) @@ -66,76 +94,110 @@ func TestRtcBasic_PublishPlay(t *testing.T) { defer cancel() doInit := func() error { - playOK := *srsPlayOKPackets - vnetClientIP := *srsVnetClientIP + playOK, vnetClientIP := *srsPlayOKPackets, *srsVnetClientIP + streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { + // Initialize player with private api. + if play, err := NewTestPlayer(nil, func(play *TestPlayer) error { + play.streamSuffix = streamSuffix + resources = append(resources, play) + + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + resources = append(resources, api) + play.api = api + + var nnPlayWriteRTCP, nnPlayReadRTCP, nnPlayWriteRTP, nnPlayReadRTP uint64 + if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { + i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(playOK) { + cancel() // Completed. + } + logger.Tf(ctx, "Play rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets", + nnPlayReadRTP, nnPlayWriteRTP, nnPlayReadRTCP, nnPlayWriteRTCP) + return i.nextRTPReader.Read(payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTCPReader.Read(buf, attributes) + nnPlayReadRTCP++ + return nn, attr, err + } + i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + nn, err := i.nextRTCPWriter.Write(pkts, attributes) + nnPlayWriteRTCP++ + return nn, err + } + })) + }); err != nil { + return err + } + + return nil + }); err != nil { return err + } else { + thePlayer = play } - defer api.Close() - - streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) - play := NewTestPlayer(api, func(play *TestPlayer) { - play.streamSuffix = streamSuffix - }) - defer play.Close() - pub := NewTestPublisher(api, func(pub *TestPublisher) { + // Initialize publisher with private api. + if pub, err := NewTestPublisher(nil, func(pub *TestPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = publishReadyCancel - }) - defer pub.Close() - - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nnWriteRTP, nnReadRTP, nnWriteRTCP, nnReadRTCP int64 - api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { - i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - nn, attr, err := i.nextRTPReader.Read(buf, attributes) - nnReadRTP++ - return nn, attr, err - } - i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - nn, err := i.nextRTPWriter.Write(header, payload, attributes) + resources = append(resources, pub) - nnWriteRTP++ - logger.Tf(ctx, "publish rtp=(read:%v write:%v), rtcp=(read:%v write:%v) packets", - nnReadRTP, nnWriteRTP, nnReadRTCP, nnWriteRTCP) - return nn, err - } - })) - api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { - i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - nn, attr, err := i.nextRTCPReader.Read(buf, attributes) - nnReadRTCP++ - return nn, attr, err - } - i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { - nn, err := i.nextRTCPWriter.Write(pkts, attributes) - nnWriteRTCP++ - return nn, err - } - })) - }, func(api *TestWebRTCAPI) { - var nn uint64 - api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { - i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nn++; nn >= uint64(playOK) { - cancel() // Completed. + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + resources = append(resources, api) + pub.api = api + + var nnPubWriteRTCP, nnPubReadRTCP, nnPubWriteRTP, nnPubReadRTP uint64 + if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { + i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTPReader.Read(buf, attributes) + nnPubReadRTP++ + return nn, attr, err } - logger.Tf(ctx, "play got %v packets", nn) - return i.nextRTPReader.Read(payload, attributes) - } - })) + i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + nn, err := i.nextRTPWriter.Write(header, payload, attributes) + nnPubWriteRTP++ + logger.Tf(ctx, "Publish rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets", + nnPubReadRTP, nnPubWriteRTP, nnPubReadRTCP, nnPubWriteRTCP) + return nn, err + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTCPReader.Read(buf, attributes) + nnPubReadRTCP++ + return nn, attr, err + } + i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + nn, err := i.nextRTCPWriter.Write(pkts, attributes) + nnPubWriteRTCP++ + return nn, err + } + })) + }); err != nil { + return err + } + + return nil }); err != nil { return err + } else { + thePublisher = pub } - // Set the available objects. + // Init done. mainReadyCancel() - thePublisher = pub - thePlayer = play <-ctx.Done() return nil @@ -158,17 +220,10 @@ func TestRtcBasic_PublishPlay(t *testing.T) { case <-mainReady.Done(): } - doPublish := func() error { - if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil { - return err - } - - logger.Tf(ctx, "pub done") - return nil - } - if err := doPublish(); err != nil { + if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil { r2 = err } + logger.Tf(ctx, "pub done") }() // Run player. @@ -177,30 +232,16 @@ func TestRtcBasic_PublishPlay(t *testing.T) { defer wg.Done() defer cancel() - select { - case <-ctx.Done(): - return - case <-mainReady.Done(): - } - select { case <-ctx.Done(): return case <-publishReady.Done(): } - doPlay := func() error { - if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil { - return err - } - - logger.Tf(ctx, "play done") - return nil - } - if err := doPlay(); err != nil { + if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil { r3 = err } - + logger.Tf(ctx, "play done") }() } @@ -222,21 +263,31 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -276,21 +327,31 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -327,21 +388,31 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -385,21 +456,31 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -450,21 +531,31 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -527,21 +618,31 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -603,21 +704,31 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -690,21 +801,31 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -774,21 +895,31 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -850,21 +981,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -926,21 +1067,31 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test defer api.Close() streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -1011,21 +1162,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -1087,10 +1248,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { @@ -1145,10 +1310,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { @@ -1203,10 +1372,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { @@ -1261,10 +1434,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { @@ -1320,21 +1497,31 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { @@ -1397,21 +1584,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { defer api.Close() streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p := NewTestPublisher(api, func(p *TestPublisher) { + p, err := NewTestPublisher(api, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive + return nil }) + if err != nil { + return err + } defer p.Close() if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { - var nn int64 + var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - if nn++; nn >= int64(publishOK) { + nnRTP++ + return i.nextRTPWriter.Write(header, payload, attributes) + } + })) + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { cancel() // Send enough packets, done. } - logger.Tf(ctx, "publish write %v packets", nn) - return i.nextRTPWriter.Write(header, payload, attributes) + logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) + return i.nextRTCPReader.Read(buf, attributes) } })) }, func(api *TestWebRTCAPI) { diff --git a/trunk/3rdparty/srs-bench/srs/stat.go b/trunk/3rdparty/srs-bench/srs/stat.go index 3ca7ed79a..ef83fe78d 100644 --- a/trunk/3rdparty/srs-bench/srs/stat.go +++ b/trunk/3rdparty/srs-bench/srs/stat.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -23,9 +23,10 @@ package srs import ( "context" "encoding/json" - "github.com/ossrs/go-oryx-lib/logger" "net/http" "strings" + + "github.com/ossrs/go-oryx-lib/logger" ) type statRTC struct { diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 8c5a5434d..941b0bb76 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in @@ -24,21 +24,113 @@ import ( "bytes" "context" "encoding/json" + "flag" "fmt" - "github.com/ossrs/go-oryx-lib/errors" - "github.com/ossrs/go-oryx-lib/logger" - "github.com/pion/transport/vnet" - "github.com/pion/webrtc/v3" - "github.com/pion/webrtc/v3/pkg/media/h264reader" + "io" "io/ioutil" "net" "net/http" "net/url" + "os" + "path" "strconv" "strings" + "sync" + "testing" "time" + + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + vnet_proxy "github.com/ossrs/srs-bench/vnet" + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/transport/vnet" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media/h264reader" ) +var srsHttps *bool +var srsLog *bool + +var srsTimeout *int +var srsPlayPLI *int +var srsPlayOKPackets *int +var srsPublishOKPackets *int +var srsPublishVideoFps *int +var srsDTLSDropPackets *int + +var srsSchema string +var srsServer *string +var srsStream *string +var srsPublishAudio *string +var srsPublishVideo *string +var srsVnetClientIP *string + +func prepareTest() error { + var err error + + srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") + srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") + srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play") + srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") + srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms") + srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.") + srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If recv N RTP packets, it's ok, or fail") + srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail") + srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.") + srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.") + srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.") + srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.") + srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail") + + // Should parse it first. + flag.Parse() + + // The stream should starts with /, for example, /rtc/regression + if !strings.HasPrefix(*srsStream, "/") { + *srsStream = "/" + *srsStream + } + + // Generate srs protocol from whether use HTTPS. + srsSchema = "http" + if *srsHttps { + srsSchema = "https" + } + + // Check file. + tryOpenFile := func(filename string) (string, error) { + if filename == "" { + return filename, nil + } + + f, err := os.Open(filename) + if err != nil { + nfilename := path.Join("../", filename) + f2, err := os.Open(nfilename) + if err != nil { + return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename) + } + defer f2.Close() + + return nfilename, nil + } + defer f.Close() + + return filename, nil + } + + if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil { + return err + } + + if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil { + return err + } + + return nil +} + func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) { u, err := url.Parse(r) if err != nil { @@ -367,7 +459,11 @@ type ChunkMessageType struct { func (v *ChunkMessageType) String() string { if v.chunk == ChunkTypeDTLS { - return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake) + if v.content == DTLSContentTypeHandshake { + return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake) + } else { + return fmt.Sprintf("%v-%v", v.chunk, v.content) + } } return fmt.Sprintf("%v", v.chunk) } @@ -466,3 +562,604 @@ func (v *DTLSRecord) Unmarshal(b []byte) error { v.Data = b[13:] return nil } + +type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI) + +type TestWebRTCAPI struct { + // The options to setup the api. + options []TestWebRTCAPIOptionFunc + // The api and settings. + api *webrtc.API + mediaEngine *webrtc.MediaEngine + registry *interceptor.Registry + settingEngine *webrtc.SettingEngine + // The vnet router, can be shared by different apis, but we do not share it. + router *vnet.Router + // The network for api. + network *vnet.Net + // The vnet UDP proxy bind to the router. + proxy *vnet_proxy.UDPProxy +} + +func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) { + v := &TestWebRTCAPI{} + + v.mediaEngine = &webrtc.MediaEngine{} + if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil { + return nil, err + } + + v.registry = &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil { + return nil, err + } + + for _, setup := range options { + setup(v) + } + + v.settingEngine = &webrtc.SettingEngine{} + + return v, nil +} + +func (v *TestWebRTCAPI) Close() error { + if v.proxy != nil { + v.proxy.Close() + } + + if v.router != nil { + v.router.Stop() + } + + return nil +} + +func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + // Setting engine for https://github.com/pion/transport/tree/master/vnet + setupVnet := func(vnetClientIP string) (err error) { + // We create a private router for a api, however, it's possible to share the + // same router between apis. + if v.router, err = vnet.NewRouter(&vnet.RouterConfig{ + CIDR: "0.0.0.0/0", // Accept all ip, no sub router. + LoggerFactory: logging.NewDefaultLoggerFactory(), + }); err != nil { + return errors.Wrapf(err, "create router for api") + } + + // Each api should bind to a network, however, it's possible to share it + // for different apis. + v.network = vnet.NewNet(&vnet.NetConfig{ + StaticIP: vnetClientIP, + }) + + if err = v.router.AddNet(v.network); err != nil { + return errors.Wrapf(err, "create network for api") + } + + v.settingEngine.SetVNet(v.network) + + // Create a proxy bind to the router. + if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil { + return errors.Wrapf(err, "create proxy for router") + } + + return v.router.Start() + } + if err := setupVnet(vnetClientIP); err != nil { + return err + } + + for _, setup := range options { + setup(v) + } + + for _, setup := range v.options { + setup(v) + } + + v.api = webrtc.NewAPI( + webrtc.WithMediaEngine(v.mediaEngine), + webrtc.WithInterceptorRegistry(v.registry), + webrtc.WithSettingEngine(*v.settingEngine), + ) + + return nil +} + +func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { + return v.api.NewPeerConnection(configuration) +} + +type TestPlayerOptionFunc func(p *TestPlayer) error + +type TestPlayer struct { + pc *webrtc.PeerConnection + receivers []*webrtc.RTPReceiver + // root api object + api *TestWebRTCAPI + // Optional suffix for stream url. + streamSuffix string +} + +func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPlayer, error) { + v := &TestPlayer{api: api} + + for _, opt := range options { + if err := opt(v); err != nil { + return nil, err + } + } + + // The api might be override by options. + api = v.api + + return v, nil +} + +func (v *TestPlayer) Close() error { + if v.pc != nil { + v.pc.Close() + } + + for _, receiver := range v.receivers { + receiver.Stop() + } + + return nil +} + +func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + if v.streamSuffix != "" { + r = fmt.Sprintf("%v-%v", r, v.streamSuffix) + } + pli := time.Duration(*srsPlayPLI) * time.Millisecond + logger.Tf(ctx, "Start play url=%v", r) + + pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + v.pc = pc + + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + // Start a proxy for real server and vnet. + if address, err := parseAddressOfCandidate(answer); err != nil { + return errors.Wrapf(err, "parse address of %v", answer) + } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { + return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { + // Send a PLI on an interval so that the publisher is pushing a keyframe + go func() { + if track.Kind() == webrtc.RTPCodecTypeAudio { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(pli): + _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + MediaSSRC: uint32(track.SSRC()), + }}) + } + } + }() + + v.receivers = append(v.receivers, receiver) + + for ctx.Err() == nil { + _, _, err := track.ReadRTP() + if err != nil { + return errors.Wrapf(err, "Read RTP") + } + } + + return nil + } + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + err = handleTrack(ctx, track, receiver) + if err != nil { + codec := track.Codec() + err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType) + cancel() + } + }) + + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { + err = errors.Errorf("Close for ICE state %v", state) + cancel() + } + }) + + <-ctx.Done() + return err +} + +type TestPublisherOptionFunc func(p *TestPublisher) error + +type TestPublisher struct { + onOffer func(s *webrtc.SessionDescription) error + onAnswer func(s *webrtc.SessionDescription) error + iceReadyCancel context.CancelFunc + // internal objects + aIngester *audioIngester + vIngester *videoIngester + pc *webrtc.PeerConnection + // root api object + api *TestWebRTCAPI + // Optional suffix for stream url. + streamSuffix string +} + +func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*TestPublisher, error) { + sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio + + v := &TestPublisher{api: api} + + for _, opt := range options { + if err := opt(v); err != nil { + return nil, err + } + } + + // The api might be override by options. + api = v.api + + // Create ingesters. + if sourceAudio != "" { + v.aIngester = NewAudioIngester(sourceAudio) + } + if sourceVideo != "" { + v.vIngester = NewVideoIngester(sourceVideo) + } + + // Setup the interceptors for packets. + api.options = append(api.options, func(api *TestWebRTCAPI) { + // Filter for RTCP packets. + rtcpInterceptor := &RTCPInterceptor{} + rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + return rtcpInterceptor.nextRTCPReader.Read(buf, attributes) + } + rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes) + } + api.registry.Add(rtcpInterceptor) + + // Filter for ingesters. + if sourceAudio != "" { + api.registry.Add(v.aIngester.audioLevelInterceptor) + } + if sourceVideo != "" { + api.registry.Add(v.vIngester.markerInterceptor) + } + }) + + return v, nil +} + +func (v *TestPublisher) Close() error { + if v.vIngester != nil { + v.vIngester.Close() + } + + if v.aIngester != nil { + v.aIngester.Close() + } + + if v.pc != nil { + v.pc.Close() + } + + return nil +} + +func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher { + v.streamSuffix = suffix + return v +} + +func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + if v.streamSuffix != "" { + r = fmt.Sprintf("%v-%v", r, v.streamSuffix) + } + sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps + + logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v", + r, sourceAudio, sourceVideo, fps) + + pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + v.pc = pc + + if v.vIngester != nil { + if err := v.vIngester.AddTrack(pc, fps); err != nil { + return errors.Wrapf(err, "Add track") + } + } + + if v.aIngester != nil { + if err := v.aIngester.AddTrack(pc); err != nil { + return errors.Wrapf(err, "Add track") + } + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + if v.onOffer != nil { + if err := v.onOffer(&offer); err != nil { + return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP) + } + } + + answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + // Start a proxy for real server and vnet. + if address, err := parseAddressOfCandidate(answerSDP); err != nil { + return errors.Wrapf(err, "parse address of %v", answerSDP) + } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { + return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) + } + + answer := &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answerSDP, + } + if v.onAnswer != nil { + if err := v.onAnswer(answer); err != nil { + return errors.Wrapf(err, "on answerSDP") + } + } + + if err := pc.SetRemoteDescription(*answer); err != nil { + return errors.Wrapf(err, "Set answerSDP %v", answerSDP) + } + + logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState()) + + // ICE state management. + pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { + logger.Tf(ctx, "ICE gather state %v", state) + }) + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port) + + }) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.Tf(ctx, "ICE state %v", state) + }) + + pc.OnSignalingStateChange(func(state webrtc.SignalingState) { + logger.Tf(ctx, "Signaling state %v", state) + }) + + if v.aIngester != nil { + v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) { + logger.Tf(ctx, "DTLS state %v", state) + }) + } + + pcDone, pcDoneCancel := context.WithCancel(context.Background()) + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Tf(ctx, "PC state %v", state) + + if state == webrtc.PeerConnectionStateConnected { + pcDoneCancel() + if v.iceReadyCancel != nil { + v.iceReadyCancel() + } + } + + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { + err = errors.Errorf("Close for PC state %v", state) + cancel() + } + }) + + // Wait for event from context or tracks. + var wg sync.WaitGroup + var finalErr error + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "ingest notify done") + + <-ctx.Done() + + if v.aIngester != nil && v.aIngester.sAudioSender != nil { + v.aIngester.sAudioSender.Stop() + } + + if v.vIngester != nil && v.vIngester.sVideoSender != nil { + v.vIngester.sVideoSender.Stop() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + if v.aIngester == nil { + return + } + + select { + case <-ctx.Done(): + return + case <-pcDone.Done(): + } + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "aingester sender read done") + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil { + return + } + } + }() + + for { + if err := v.aIngester.Ingest(ctx); err != nil { + if err == io.EOF { + logger.Tf(ctx, "aingester retry for %v", err) + continue + } + if err != context.Canceled { + finalErr = errors.Wrapf(err, "audio") + } + + logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr) + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + if v.vIngester == nil { + return + } + + select { + case <-ctx.Done(): + return + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo) + } + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "vingester sender read done") + + buf := make([]byte, 1500) + for ctx.Err() == nil { + // The Read() might block in r.rtcpInterceptor.Read(b, a), + // so that the Stop() can not stop it. + if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil { + return + } + } + }() + + for { + if err := v.vIngester.Ingest(ctx); err != nil { + if err == io.EOF { + logger.Tf(ctx, "vingester retry for %v", err) + continue + } + if err != context.Canceled { + finalErr = errors.Wrapf(err, "video") + } + + logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr) + return + } + } + }() + + wg.Wait() + + logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr) + if finalErr != nil { + return finalErr + } + return ctx.Err() +} + +func TestRTCServerVersion(t *testing.T) { + api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) + req, err := http.NewRequest("POST", api, nil) + if err != nil { + t.Errorf("Request %v", api) + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("Do request %v", api) + return + } + + b, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Read body of %v", api) + return + } + + obj := struct { + Code int `json:"code"` + Server string `json:"server"` + Data struct { + Major int `json:"major"` + Minor int `json:"minor"` + Revision int `json:"revision"` + Version string `json:"version"` + } `json:"data"` + }{} + if err := json.Unmarshal(b, &obj); err != nil { + t.Errorf("Parse %v", string(b)) + return + } + if obj.Code != 0 { + t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) + return + } + if obj.Data.Major == 0 && obj.Data.Minor == 0 { + t.Errorf("Invalid version %v", obj.Data) + return + } +} diff --git a/trunk/3rdparty/srs-bench/srs/util_test.go b/trunk/3rdparty/srs-bench/srs/util_test.go deleted file mode 100644 index 68187c9ef..000000000 --- a/trunk/3rdparty/srs-bench/srs/util_test.go +++ /dev/null @@ -1,723 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2021 srs-bench(ossrs) -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -package srs - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "github.com/ossrs/go-oryx-lib/errors" - "github.com/ossrs/go-oryx-lib/logger" - vnet_proxy "github.com/ossrs/srs-bench/vnet" - "github.com/pion/interceptor" - "github.com/pion/logging" - "github.com/pion/rtcp" - "github.com/pion/transport/vnet" - "github.com/pion/webrtc/v3" - "io" - "io/ioutil" - "net/http" - "os" - "path" - "strings" - "sync" - "testing" - "time" -) - -var srsSchema = "http" -var srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") -var srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") -var srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play") -var srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") -var srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms") -var srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.") -var srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If got N packets, it's ok, or fail") -var srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 10, "If send N packets, it's ok, or fail") -var srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.") -var srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.") -var srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.") -var srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.") -var srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail") - -func prepareTest() error { - var err error - - // Should parse it first. - flag.Parse() - - // The stream should starts with /, for example, /rtc/regression - if !strings.HasPrefix(*srsStream, "/") { - *srsStream = "/" + *srsStream - } - - // Generate srs protocol from whether use HTTPS. - if *srsHttps { - srsSchema = "https" - } - - // Check file. - tryOpenFile := func(filename string) (string, error) { - if filename == "" { - return filename, nil - } - - f, err := os.Open(filename) - if err != nil { - nfilename := path.Join("../", filename) - f2, err := os.Open(nfilename) - if err != nil { - return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename) - } - defer f2.Close() - - return nfilename, nil - } - defer f.Close() - - return filename, nil - } - - if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil { - return err - } - - if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil { - return err - } - - return nil -} - -func TestMain(m *testing.M) { - if err := prepareTest(); err != nil { - logger.Ef(nil, "Prepare test fail, err %+v", err) - os.Exit(-1) - } - - // Disable the logger during all tests. - if *srsLog == false { - olw := logger.Switch(ioutil.Discard) - defer func() { - logger.Switch(olw) - }() - } - - os.Exit(m.Run()) -} - -type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI) - -type TestWebRTCAPI struct { - // The options to setup the api. - options []TestWebRTCAPIOptionFunc - // The api and settings. - api *webrtc.API - mediaEngine *webrtc.MediaEngine - registry *interceptor.Registry - settingEngine *webrtc.SettingEngine - // The vnet router, can be shared by different apis, but we do not share it. - router *vnet.Router - // The network for api. - network *vnet.Net - // The vnet UDP proxy bind to the router. - proxy *vnet_proxy.UDPProxy -} - -func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) { - v := &TestWebRTCAPI{} - - v.mediaEngine = &webrtc.MediaEngine{} - if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil { - return nil, err - } - - v.registry = &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil { - return nil, err - } - - for _, setup := range options { - setup(v) - } - - v.settingEngine = &webrtc.SettingEngine{} - - return v, nil -} - -func (v *TestWebRTCAPI) Close() error { - if v.proxy != nil { - v.proxy.Close() - v.proxy = nil - } - - if v.router != nil { - v.router.Stop() - v.router = nil - } - - return nil -} - -func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { - // Setting engine for https://github.com/pion/transport/tree/master/vnet - setupVnet := func(vnetClientIP string) (err error) { - // We create a private router for a api, however, it's possible to share the - // same router between apis. - if v.router, err = vnet.NewRouter(&vnet.RouterConfig{ - CIDR: "0.0.0.0/0", // Accept all ip, no sub router. - LoggerFactory: logging.NewDefaultLoggerFactory(), - }); err != nil { - return errors.Wrapf(err, "create router for api") - } - - // Each api should bind to a network, however, it's possible to share it - // for different apis. - v.network = vnet.NewNet(&vnet.NetConfig{ - StaticIP: vnetClientIP, - }) - - if err = v.router.AddNet(v.network); err != nil { - return errors.Wrapf(err, "create network for api") - } - - v.settingEngine.SetVNet(v.network) - - // Create a proxy bind to the router. - if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil { - return errors.Wrapf(err, "create proxy for router") - } - - return v.router.Start() - } - if err := setupVnet(vnetClientIP); err != nil { - return err - } - - for _, setup := range options { - setup(v) - } - - for _, setup := range v.options { - setup(v) - } - - v.api = webrtc.NewAPI( - webrtc.WithMediaEngine(v.mediaEngine), - webrtc.WithInterceptorRegistry(v.registry), - webrtc.WithSettingEngine(*v.settingEngine), - ) - - return nil -} - -func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { - return v.api.NewPeerConnection(configuration) -} - -type TestPlayerOptionFunc func(p *TestPlayer) - -type TestPlayer struct { - pc *webrtc.PeerConnection - receivers []*webrtc.RTPReceiver - // root api object - api *TestWebRTCAPI - // Optional suffix for stream url. - streamSuffix string -} - -func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) *TestPlayer { - v := &TestPlayer{api: api} - - for _, opt := range options { - opt(v) - } - - return v -} - -func (v *TestPlayer) Close() error { - if v.pc != nil { - v.pc.Close() - v.pc = nil - } - - for _, receiver := range v.receivers { - receiver.Stop() - } - v.receivers = nil - - return nil -} - -func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { - r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) - if v.streamSuffix != "" { - r = fmt.Sprintf("%v-%v", r, v.streamSuffix) - } - pli := time.Duration(*srsPlayPLI) * time.Millisecond - logger.Tf(ctx, "Start play url=%v", r) - - pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) - if err != nil { - return errors.Wrapf(err, "Create PC") - } - v.pc = pc - - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) - - offer, err := pc.CreateOffer(nil) - if err != nil { - return errors.Wrapf(err, "Create Offer") - } - - if err := pc.SetLocalDescription(offer); err != nil { - return errors.Wrapf(err, "Set offer %v", offer) - } - - answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) - if err != nil { - return errors.Wrapf(err, "Api request offer=%v", offer.SDP) - } - - // Start a proxy for real server and vnet. - if address, err := parseAddressOfCandidate(answer); err != nil { - return errors.Wrapf(err, "parse address of %v", answer) - } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { - return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) - } - - if err := pc.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, SDP: answer, - }); err != nil { - return errors.Wrapf(err, "Set answer %v", answer) - } - - handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { - // Send a PLI on an interval so that the publisher is pushing a keyframe - go func() { - if track.Kind() == webrtc.RTPCodecTypeAudio { - return - } - - for { - select { - case <-ctx.Done(): - return - case <-time.After(pli): - _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ - MediaSSRC: uint32(track.SSRC()), - }}) - } - } - }() - - v.receivers = append(v.receivers, receiver) - - for ctx.Err() == nil { - _, _, err := track.ReadRTP() - if err != nil { - return errors.Wrapf(err, "Read RTP") - } - } - - return nil - } - - pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - err = handleTrack(ctx, track, receiver) - if err != nil { - codec := track.Codec() - err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType) - cancel() - } - }) - - pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { - if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { - err = errors.Errorf("Close for ICE state %v", state) - cancel() - } - }) - - <-ctx.Done() - return err -} - -type TestPublisherOptionFunc func(p *TestPublisher) - -type TestPublisher struct { - onOffer func(s *webrtc.SessionDescription) error - onAnswer func(s *webrtc.SessionDescription) error - iceReadyCancel context.CancelFunc - // internal objects - aIngester *audioIngester - vIngester *videoIngester - pc *webrtc.PeerConnection - // root api object - api *TestWebRTCAPI - // Optional suffix for stream url. - streamSuffix string -} - -func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) *TestPublisher { - sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio - - v := &TestPublisher{api: api} - - for _, opt := range options { - opt(v) - } - - // Create ingesters. - if sourceAudio != "" { - v.aIngester = NewAudioIngester(sourceAudio) - } - if sourceVideo != "" { - v.vIngester = NewVideoIngester(sourceVideo) - } - - // Setup the interceptors for packets. - api.options = append(api.options, func(api *TestWebRTCAPI) { - // Filter for RTCP packets. - rtcpInterceptor := &RTCPInterceptor{} - rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - return rtcpInterceptor.nextRTCPReader.Read(buf, attributes) - } - rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { - return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes) - } - api.registry.Add(rtcpInterceptor) - - // Filter for ingesters. - if sourceAudio != "" { - api.registry.Add(v.aIngester.audioLevelInterceptor) - } - if sourceVideo != "" { - api.registry.Add(v.vIngester.markerInterceptor) - } - }) - - return v -} - -func (v *TestPublisher) Close() error { - if v.vIngester != nil { - v.vIngester.Close() - } - - if v.aIngester != nil { - v.aIngester.Close() - } - - if v.pc != nil { - v.pc.Close() - } - - return nil -} - -func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher { - v.streamSuffix = suffix - return v -} - -func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { - r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) - if v.streamSuffix != "" { - r = fmt.Sprintf("%v-%v", r, v.streamSuffix) - } - sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps - - logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v", - r, sourceAudio, sourceVideo, fps) - - pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) - if err != nil { - return errors.Wrapf(err, "Create PC") - } - v.pc = pc - - if v.vIngester != nil { - if err := v.vIngester.AddTrack(pc, fps); err != nil { - return errors.Wrapf(err, "Add track") - } - defer v.vIngester.Close() - } - - if v.aIngester != nil { - if err := v.aIngester.AddTrack(pc); err != nil { - return errors.Wrapf(err, "Add track") - } - defer v.aIngester.Close() - } - - offer, err := pc.CreateOffer(nil) - if err != nil { - return errors.Wrapf(err, "Create Offer") - } - - if err := pc.SetLocalDescription(offer); err != nil { - return errors.Wrapf(err, "Set offer %v", offer) - } - - if v.onOffer != nil { - if err := v.onOffer(&offer); err != nil { - return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP) - } - } - - answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP) - if err != nil { - return errors.Wrapf(err, "Api request offer=%v", offer.SDP) - } - - // Start a proxy for real server and vnet. - if address, err := parseAddressOfCandidate(answerSDP); err != nil { - return errors.Wrapf(err, "parse address of %v", answerSDP) - } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { - return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) - } - - answer := &webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, SDP: answerSDP, - } - if v.onAnswer != nil { - if err := v.onAnswer(answer); err != nil { - return errors.Wrapf(err, "on answerSDP") - } - } - - if err := pc.SetRemoteDescription(*answer); err != nil { - return errors.Wrapf(err, "Set answerSDP %v", answerSDP) - } - - logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState()) - - // ICE state management. - pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { - logger.Tf(ctx, "ICE gather state %v", state) - }) - pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { - logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port) - - }) - pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { - logger.Tf(ctx, "ICE state %v", state) - }) - - pc.OnSignalingStateChange(func(state webrtc.SignalingState) { - logger.Tf(ctx, "Signaling state %v", state) - }) - - if v.aIngester != nil { - v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) { - logger.Tf(ctx, "DTLS state %v", state) - }) - } - - pcDone, pcDoneCancel := context.WithCancel(context.Background()) - pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - logger.Tf(ctx, "PC state %v", state) - - if state == webrtc.PeerConnectionStateConnected { - pcDoneCancel() - if v.iceReadyCancel != nil { - v.iceReadyCancel() - } - } - - if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { - err = errors.Errorf("Close for PC state %v", state) - cancel() - } - }) - - // Wait for event from context or tracks. - var wg sync.WaitGroup - var finalErr error - - wg.Add(1) - go func() { - defer wg.Done() - defer logger.Tf(ctx, "ingest notify done") - - <-ctx.Done() - - if v.aIngester != nil && v.aIngester.sAudioSender != nil { - v.aIngester.sAudioSender.Stop() - } - - if v.vIngester != nil && v.vIngester.sVideoSender != nil { - v.vIngester.sVideoSender.Stop() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - if v.aIngester == nil { - return - } - - select { - case <-ctx.Done(): - return - case <-pcDone.Done(): - } - - wg.Add(1) - go func() { - defer wg.Done() - defer logger.Tf(ctx, "aingester sender read done") - - buf := make([]byte, 1500) - for ctx.Err() == nil { - if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil { - return - } - } - }() - - for { - if err := v.aIngester.Ingest(ctx); err != nil { - if err == io.EOF { - logger.Tf(ctx, "aingester retry for %v", err) - continue - } - if err != context.Canceled { - finalErr = errors.Wrapf(err, "audio") - } - - logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr) - return - } - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - if v.vIngester == nil { - return - } - - select { - case <-ctx.Done(): - return - case <-pcDone.Done(): - logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo) - } - - wg.Add(1) - go func() { - defer wg.Done() - defer logger.Tf(ctx, "vingester sender read done") - - buf := make([]byte, 1500) - for ctx.Err() == nil { - // The Read() might block in r.rtcpInterceptor.Read(b, a), - // so that the Stop() can not stop it. - if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil { - return - } - } - }() - - for { - if err := v.vIngester.Ingest(ctx); err != nil { - if err == io.EOF { - logger.Tf(ctx, "vingester retry for %v", err) - continue - } - if err != context.Canceled { - finalErr = errors.Wrapf(err, "video") - } - - logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr) - return - } - } - }() - - wg.Wait() - - logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr) - if finalErr != nil { - return finalErr - } - return ctx.Err() -} - -func TestRTCServerVersion(t *testing.T) { - api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) - req, err := http.NewRequest("POST", api, nil) - if err != nil { - t.Errorf("Request %v", api) - return - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - t.Errorf("Do request %v", api) - return - } - - b, err := ioutil.ReadAll(res.Body) - if err != nil { - t.Errorf("Read body of %v", api) - return - } - - obj := struct { - Code int `json:"code"` - Server string `json:"server"` - Data struct { - Major int `json:"major"` - Minor int `json:"minor"` - Revision int `json:"revision"` - Version string `json:"version"` - } `json:"data"` - }{} - if err := json.Unmarshal(b, &obj); err != nil { - t.Errorf("Parse %v", string(b)) - return - } - if obj.Code != 0 { - t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) - return - } - if obj.Data.Major == 0 && obj.Data.Minor == 0 { - t.Errorf("Invalid version %v", obj.Data) - return - } -} diff --git a/trunk/3rdparty/srs-bench/vnet/example_udpproxy_test.go b/trunk/3rdparty/srs-bench/vnet/example_udpproxy_test.go index d54e71653..0a8ebb55a 100644 --- a/trunk/3rdparty/srs-bench/vnet/example_udpproxy_test.go +++ b/trunk/3rdparty/srs-bench/vnet/example_udpproxy_test.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy.go b/trunk/3rdparty/srs-bench/vnet/udpproxy.go index c21fe4603..caba86e0e 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy.go @@ -1,34 +1,13 @@ -// The MIT License (MIT) -// -// Copyright (c) 2021 srs-bench(ossrs) -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. package vnet import ( + "context" "net" "sync" "time" - - "github.com/pion/transport/vnet" ) -// A UDP proxy between real server(net.UDPConn) and vnet.UDPConn. +// UDPProxy is a proxy between real server(net.UDPConn) and vnet.UDPConn. // // High level design: // .............................................. @@ -44,32 +23,9 @@ import ( // : | | ............................: // : +--------+ : // ............................................... -// -// The whole big picture: -// ...................................... -// : Virtual Network (vnet) : -// : : -// +-------+ * 1 +----+ +--------+ : -// | :App |------------>|:Net|--o<-----|:Router | ............................. -// +-------+ +----+ | | : UDPProxy : -// +-----------+ * 1 +----+ | | +----+ +---------+ +---------+ +--------+ -// |:STUNServer|-------->|:Net|--o<-----| |--->o--|:Net|-->o-| vnet. |-->o-| net. |--->-| :Real | -// +-----------+ +----+ | | +----+ | UDPConn | | UDPConn | | Server | -// +-----------+ * 1 +----+ | | : +---------+ +---------+ +--------+ -// |:TURNServer|-------->|:Net|--o<-----| | ............................: -// +-----------+ +----+ [1] | | : -// : 1 | | 1 <> : -// : +---<>| |<>----+ [2] : -// : | +--------+ | : -// To form | *| v 0..1 : -// a subnet tree | o [3] +-----+ : -// : | ^ |:NAT | : -// : | | +-----+ : -// : +-------+ : -// ...................................... type UDPProxy struct { // The router bind to. - router *vnet.Router + router *Router // Each vnet source, bind to a real socket to server. // key is real server addr, which is net.Addr @@ -88,19 +44,22 @@ type UDPProxy struct { // NewProxy create a proxy, the router for this proxy belongs/bind to. If need to proxy for // please create a new proxy for each router. For all addresses we proxy, we will create a // vnet.Net in this router and proxy all packets. -func NewProxy(router *vnet.Router) (*UDPProxy, error) { +func NewProxy(router *Router) (*UDPProxy, error) { v := &UDPProxy{router: router, timeout: 2 * time.Minute} return v, nil } // Close the proxy, stop all workers. func (v *UDPProxy) Close() error { - // nolint:godox // TODO: FIXME: Do cleanup. + v.workers.Range(func(key, value interface{}) bool { + _ = value.(*aUDPProxyWorker).Close() + return true + }) return nil } // Proxy starts a worker for server, ignore if already started. -func (v *UDPProxy) Proxy(client *vnet.Net, server *net.UDPAddr) error { +func (v *UDPProxy) Proxy(client *Net, server *net.UDPAddr) error { // Note that even if the worker exists, it's also ok to create a same worker, // because the router will use the last one, and the real server will see a address // change event after we switch to the next worker. @@ -113,25 +72,44 @@ func (v *UDPProxy) Proxy(client *vnet.Net, server *net.UDPAddr) error { worker := &aUDPProxyWorker{ router: v.router, mockRealServerAddr: v.mockRealServerAddr, } + + // Create context for cleanup. + var ctx context.Context + ctx, worker.ctxDisposeCancel = context.WithCancel(context.Background()) + v.workers.Store(server.String(), worker) - return worker.Proxy(client, server) + return worker.Proxy(ctx, client, server) } // A proxy worker for a specified proxy server. type aUDPProxyWorker struct { - router *vnet.Router + router *Router mockRealServerAddr *net.UDPAddr // Each vnet source, bind to a real socket to server. // key is vnet client addr, which is net.Addr // value is *net.UDPConn endpoints sync.Map + + // For cleanup. + ctxDisposeCancel context.CancelFunc + wg sync.WaitGroup +} + +func (v *aUDPProxyWorker) Close() error { + // Notify all goroutines to dispose. + v.ctxDisposeCancel() + + // Wait for all goroutines quit. + v.wg.Wait() + + return nil } -func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error { // nolint:gocognit +func (v *aUDPProxyWorker) Proxy(ctx context.Context, client *Net, serverAddr *net.UDPAddr) error { // nolint:gocognit // Create vnet for real server by serverAddr. - nw := vnet.NewNet(&vnet.NetConfig{ + nw := NewNet(&NetConfig{ StaticIP: serverAddr.IP.String(), }) if err := v.router.AddNet(nw); err != nil { @@ -145,35 +123,14 @@ func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error return err } - // Start a proxy goroutine. - var findEndpointBy func(addr net.Addr) (*net.UDPConn, error) - // nolint:godox // TODO: FIXME: Do cleanup. + // User stop proxy, we should close the socket. go func() { - buf := make([]byte, 1500) - - for { - n, addr, err := vnetSocket.ReadFrom(buf) - if err != nil { - return - } - - if n <= 0 || addr == nil { - continue // Drop packet - } - - realSocket, err := findEndpointBy(addr) - if err != nil { - continue // Drop packet. - } - - if _, err := realSocket.Write(buf[:n]); err != nil { - return - } - } + <-ctx.Done() + _ = vnetSocket.Close() }() // Got new vnet client, start a new endpoint. - findEndpointBy = func(addr net.Addr) (*net.UDPConn, error) { + findEndpointBy := func(addr net.Addr) (*net.UDPConn, error) { // Exists binding. if value, ok := v.endpoints.Load(addr.String()); ok { // Exists endpoint, reuse it. @@ -192,12 +149,20 @@ func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error return nil, err } + // User stop proxy, we should close the socket. + go func() { + <-ctx.Done() + _ = realSocket.Close() + }() + // Bind address. v.endpoints.Store(addr.String(), realSocket) // Got packet from real serverAddr, we should proxy it to vnet. - // nolint:godox // TODO: FIXME: Do cleanup. + v.wg.Add(1) go func(vnetClientAddr net.Addr) { + defer v.wg.Done() + buf := make([]byte, 1500) for { n, _, err := realSocket.ReadFrom(buf) @@ -218,5 +183,33 @@ func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error return realSocket, nil } + // Start a proxy goroutine. + v.wg.Add(1) + go func() { + defer v.wg.Done() + + buf := make([]byte, 1500) + + for { + n, addr, err := vnetSocket.ReadFrom(buf) + if err != nil { + return + } + + if n <= 0 || addr == nil { + continue // Drop packet + } + + realSocket, err := findEndpointBy(addr) + if err != nil { + continue // Drop packet. + } + + if _, err := realSocket.Write(buf[:n]); err != nil { + return + } + } + }() + return nil } diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy_direct.go b/trunk/3rdparty/srs-bench/vnet/udpproxy_direct.go index 6d49494ed..35e4618d7 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy_direct.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy_direct.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy_direct_test.go b/trunk/3rdparty/srs-bench/vnet/udpproxy_direct_test.go index b347c682c..48b776957 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy_direct_test.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy_direct_test.go @@ -1,6 +1,6 @@ // The MIT License (MIT) // -// Copyright (c) 2021 srs-bench(ossrs) +// Copyright (c) 2021 Winlin // // Permission is hereby granted, free of charge, to any person obtaining a copy of // this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go b/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go index c0c1c4a2b..e5689bc18 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go @@ -1,23 +1,5 @@ -// The MIT License (MIT) -// -// Copyright (c) 2021 srs-bench(ossrs) -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// +build !wasm + package vnet import ( @@ -32,7 +14,6 @@ import ( "time" "github.com/pion/logging" - "github.com/pion/transport/vnet" ) type MockUDPEchoServer struct { @@ -163,7 +144,7 @@ func TestUDPProxyOne2One(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -171,7 +152,7 @@ func TestUDPProxyOne2One(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { @@ -309,7 +290,7 @@ func TestUDPProxyTwo2One(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -317,7 +298,7 @@ func TestUDPProxyTwo2One(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { @@ -487,7 +468,7 @@ func TestUDPProxyProxyTwice(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -495,7 +476,7 @@ func TestUDPProxyProxyTwice(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { diff --git a/trunk/3rdparty/srs-bench/vnet/vnet.go b/trunk/3rdparty/srs-bench/vnet/vnet.go new file mode 100644 index 000000000..ceab0847f --- /dev/null +++ b/trunk/3rdparty/srs-bench/vnet/vnet.go @@ -0,0 +1,38 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 Winlin +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package vnet + +import ( + "github.com/pion/transport/vnet" +) + +type Router = vnet.Router +type Net = vnet.Net + +type NetConfig = vnet.NetConfig +type RouterConfig = vnet.RouterConfig + +func NewNet(config *NetConfig) *Net { + return vnet.NewNet(config) +} +func NewRouter(config *RouterConfig) (*Router, error) { + return vnet.NewRouter(config) +} diff --git a/trunk/3rdparty/st-srs/.gitignore b/trunk/3rdparty/st-srs/.gitignore index 97cd5081d..18d2ab26f 100644 --- a/trunk/3rdparty/st-srs/.gitignore +++ b/trunk/3rdparty/st-srs/.gitignore @@ -2,3 +2,6 @@ DARWIN_*_DBG LINUX_*_DBG obj st.pc + +coverage +utest