From 030b94e7179981f7d9fd10767a012d3ef0d2419c Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 10 Mar 2021 07:03:57 +0800 Subject: [PATCH] Test: Add missing files for srs-bench --- trunk/.gitignore | 2 +- trunk/3rdparty/srs-bench/srs/ingester.go | 285 ++++++++ trunk/3rdparty/srs-bench/srs/interceptor.go | 175 +++++ trunk/3rdparty/srs-bench/srs/util_test.go | 721 ++++++++++++++++++++ 4 files changed, 1182 insertions(+), 1 deletion(-) create mode 100644 trunk/3rdparty/srs-bench/srs/ingester.go create mode 100644 trunk/3rdparty/srs-bench/srs/interceptor.go create mode 100644 trunk/3rdparty/srs-bench/srs/util_test.go diff --git a/trunk/.gitignore b/trunk/.gitignore index 1cbd7b7e6..44e1d851d 100644 --- a/trunk/.gitignore +++ b/trunk/.gitignore @@ -34,7 +34,7 @@ /research/speex/ /test/ .DS_Store -srs +./srs *.dSYM/ *.gcov *.ts diff --git a/trunk/3rdparty/srs-bench/srs/ingester.go b/trunk/3rdparty/srs-bench/srs/ingester.go new file mode 100644 index 000000000..1e3161a89 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/ingester.go @@ -0,0 +1,285 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 srs-bench(ossrs) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package srs + +import ( + "context" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/pion/interceptor" + "github.com/pion/rtp" + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "github.com/pion/webrtc/v3/pkg/media/oggreader" + "io" + "os" + "strings" + "time" +) + +type videoIngester struct { + sourceVideo string + fps int + markerInterceptor *RTPInterceptor + sVideoTrack *webrtc.TrackLocalStaticSample + sVideoSender *webrtc.RTPSender +} + +func NewVideoIngester(sourceVideo string) *videoIngester { + return &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} +} + +func (v *videoIngester) Close() error { + if v.sVideoSender != nil { + v.sVideoSender.Stop() + v.sVideoSender = nil + } + return nil +} + +func (v *videoIngester) AddTrack(pc *webrtc.PeerConnection, fps int) error { + v.fps = fps + + mimeType, trackID := "video/H264", "video" + if strings.HasSuffix(v.sourceVideo, ".ivf") { + mimeType = "video/VP8" + } + + var err error + v.sVideoTrack, err = webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create video track") + } + + v.sVideoSender, err = pc.AddTrack(v.sVideoTrack) + if err != nil { + return errors.Wrapf(err, "Add video track") + } + return err +} + +func (v *videoIngester) Ingest(ctx context.Context) error { + source, sender, track, fps := v.sourceVideo, v.sVideoSender, v.sVideoTrack, v.fps + + f, err := os.Open(source) + if err != nil { + return errors.Wrapf(err, "Open file %v", source) + } + defer f.Close() + + // TODO: FIXME: Support ivf for vp8. + h264, err := h264reader.NewReader(f) + if err != nil { + return errors.Wrapf(err, "Open h264 %v", source) + } + + enc := sender.GetParameters().Encodings[0] + codec := sender.GetParameters().Codecs[0] + headers := sender.GetParameters().HeaderExtensions + logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v", + codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers) + + clock := newWallClock() + sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps)) + for ctx.Err() == nil { + var sps, pps *h264reader.NAL + var oFrames []*h264reader.NAL + for ctx.Err() == nil { + frame, err := h264.NextNAL() + if err == io.EOF { + return io.EOF + } + if err != nil { + return errors.Wrapf(err, "Read h264") + } + + oFrames = append(oFrames, frame) + logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes", + frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data)) + + if frame.UnitType == h264reader.NalUnitTypeSPS { + sps = frame + } else if frame.UnitType == h264reader.NalUnitTypePPS { + pps = frame + } else { + break + } + } + + var frames []*h264reader.NAL + // Package SPS/PPS to STAP-A + if sps != nil && pps != nil { + stapA := packageAsSTAPA(sps, pps) + frames = append(frames, stapA) + } + // Append other original frames. + for _, frame := range oFrames { + if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS { + frames = append(frames, frame) + } + } + + // Covert frames to sample(buffers). + for i, frame := range frames { + sample := media.Sample{Data: frame.Data, Duration: sampleDuration} + // Use the sample timestamp for frames. + if i != len(frames)-1 { + sample.Duration = 0 + } + + // For STAP-A, set marker to false, to make Chrome happy. + if ri := v.markerInterceptor; ri.rtpWriter == nil { + ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + // TODO: Should we decode to check whether SPS/PPS? + if len(payload) > 0 && payload[0]&0x1f == 24 { + header.Marker = false // 24, STAP-A + } + return ri.nextRTPWriter.Write(header, payload, attributes) + } + } + + if err = track.WriteSample(sample); err != nil { + return errors.Wrapf(err, "Write sample") + } + } + + if d := clock.Tick(sampleDuration); d > 0 { + time.Sleep(d) + } + } + + return ctx.Err() +} + +type audioIngester struct { + sourceAudio string + audioLevelInterceptor *RTPInterceptor + sAudioTrack *webrtc.TrackLocalStaticSample + sAudioSender *webrtc.RTPSender +} + +func NewAudioIngester(sourceAudio string) *audioIngester { + return &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} +} + +func (v *audioIngester) Close() error { + if v.sAudioSender != nil { + v.sAudioSender.Stop() + v.sAudioSender = nil + } + return nil +} + +func (v *audioIngester) AddTrack(pc *webrtc.PeerConnection) error { + var err error + + mimeType, trackID := "audio/opus", "audio" + v.sAudioTrack, err = webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create audio track") + } + + v.sAudioSender, err = pc.AddTrack(v.sAudioTrack) + if err != nil { + return errors.Wrapf(err, "Add audio track") + } + + return nil +} + +func (v *audioIngester) Ingest(ctx context.Context) error { + source, sender, track := v.sourceAudio, v.sAudioSender, v.sAudioTrack + + f, err := os.Open(source) + if err != nil { + return errors.Wrapf(err, "Open file %v", source) + } + defer f.Close() + + ogg, _, err := oggreader.NewWith(f) + if err != nil { + return errors.Wrapf(err, "Open ogg %v", source) + } + + enc := sender.GetParameters().Encodings[0] + codec := sender.GetParameters().Codecs[0] + headers := sender.GetParameters().HeaderExtensions + logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v", + codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers) + + // Whether should encode the audio-level in RTP header. + var audioLevel *webrtc.RTPHeaderExtensionParameter + for _, h := range headers { + if h.URI == sdp.AudioLevelURI { + audioLevel = &h + } + } + + clock := newWallClock() + var lastGranule uint64 + + for ctx.Err() == nil { + pageData, pageHeader, err := ogg.ParseNextPage() + if err == io.EOF { + return io.EOF + } + if err != nil { + return errors.Wrapf(err, "Read ogg") + } + + // The amount of samples is the difference between the last and current timestamp + sampleCount := uint64(pageHeader.GranulePosition - lastGranule) + lastGranule = pageHeader.GranulePosition + sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate)) + + // For audio-level, set the extensions if negotiated. + if ri := v.audioLevelInterceptor; ri.rtpWriter == nil { + ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + if audioLevel != nil { + audioLevelPayload, err := new(rtp.AudioLevelExtension).Marshal() + if err != nil { + return 0, err + } + + header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) + } + + return ri.nextRTPWriter.Write(header, payload, attributes) + } + } + + if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil { + return errors.Wrapf(err, "Write sample") + } + + if d := clock.Tick(sampleDuration); d > 0 { + time.Sleep(d) + } + } + + return ctx.Err() +} diff --git a/trunk/3rdparty/srs-bench/srs/interceptor.go b/trunk/3rdparty/srs-bench/srs/interceptor.go new file mode 100644 index 000000000..d853aaf7d --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/interceptor.go @@ -0,0 +1,175 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 srs-bench(ossrs) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package srs + +import ( + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +type RTPInterceptorOptionFunc func(i *RTPInterceptor) + +// Common RTP packet interceptor for benchmark. +// @remark Should never merge with RTCPInterceptor, because they has the same Write interface. +type RTPInterceptor struct { + localInfo *interceptor.StreamInfo + remoteInfo *interceptor.StreamInfo + // If rtpReader is nil, use the default next one to read. + rtpReader interceptor.RTPReaderFunc + nextRTPReader interceptor.RTPReader + // If rtpWriter is nil, use the default next one to write. + rtpWriter interceptor.RTPWriterFunc + nextRTPWriter interceptor.RTPWriter + BypassInterceptor +} + +func NewRTPInterceptor(options ...RTPInterceptorOptionFunc) *RTPInterceptor { + v := &RTPInterceptor{} + for _, opt := range options { + opt(v) + } + return v +} + +func (v *RTPInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + if v.localInfo != nil { + return writer // Only handle one stream. + } + + v.localInfo = info + v.nextRTPWriter = writer + return v // Handle all RTP +} + +func (v *RTPInterceptor) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + if v.rtpWriter != nil { + return v.rtpWriter(header, payload, attributes) + } + return v.nextRTPWriter.Write(header, payload, attributes) +} + +func (v *RTPInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + if v.localInfo == nil || v.localInfo.ID != info.ID { + return + } + v.localInfo = nil // Reset the interceptor. +} + +func (v *RTPInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + if v.remoteInfo != nil { + return reader // Only handle one stream. + } + + v.nextRTPReader = reader + return v // Handle all RTP +} + +func (v *RTPInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + if v.rtpReader != nil { + return v.rtpReader(b, a) + } + return v.nextRTPReader.Read(b, a) +} + +func (v *RTPInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) { + if v.remoteInfo == nil || v.remoteInfo.ID != info.ID { + return + } + v.remoteInfo = nil +} + +type RTCPInterceptorOptionFunc func(i *RTCPInterceptor) + +// Common RTCP packet interceptor for benchmark. +// @remark Should never merge with RTPInterceptor, because they has the same Write interface. +type RTCPInterceptor struct { + // If rtcpReader is nil, use the default next one to read. + rtcpReader interceptor.RTCPReaderFunc + nextRTCPReader interceptor.RTCPReader + // If rtcpWriter is nil, use the default next one to write. + rtcpWriter interceptor.RTCPWriterFunc + nextRTCPWriter interceptor.RTCPWriter + BypassInterceptor +} + +func NewRTCPInterceptor(options ...RTCPInterceptorOptionFunc) *RTCPInterceptor { + v := &RTCPInterceptor{} + for _, opt := range options { + opt(v) + } + return v +} + +func (v *RTCPInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + v.nextRTCPReader = reader + return v // Handle all RTCP +} + +func (v *RTCPInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + if v.rtcpReader != nil { + return v.rtcpReader(b, a) + } + return v.nextRTCPReader.Read(b, a) +} + +func (v *RTCPInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + v.nextRTCPWriter = writer + return v // Handle all RTCP +} + +func (v *RTCPInterceptor) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + if v.rtcpWriter != nil { + return v.rtcpWriter(pkts, attributes) + } + return v.nextRTCPWriter.Write(pkts, attributes) +} + +// Do nothing. +type BypassInterceptor struct { + interceptor.Interceptor +} + +func (v *BypassInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return reader +} + +func (v *BypassInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + return writer +} + +func (v *BypassInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + return writer +} + +func (v *BypassInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { +} + +func (v *BypassInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return reader +} + +func (v *BypassInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) { +} + +func (v *BypassInterceptor) Close() error { + return nil +} diff --git a/trunk/3rdparty/srs-bench/srs/util_test.go b/trunk/3rdparty/srs-bench/srs/util_test.go new file mode 100644 index 000000000..0c358f8e7 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/util_test.go @@ -0,0 +1,721 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 srs-bench(ossrs) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package srs + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + vnet_proxy "github.com/ossrs/srs-bench/vnet" + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/transport/vnet" + "github.com/pion/webrtc/v3" + "io" + "io/ioutil" + "net/http" + "os" + "path" + "strings" + "sync" + "testing" + "time" +) + +var srsSchema = "http" +var srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") +var srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") +var srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play") +var srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") +var srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms") +var srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.") +var srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If got N packets, it's ok, or fail") +var srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 10, "If send N packets, it's ok, or fail") +var srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.") +var srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.") +var srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.") +var srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.") +var srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail") + +func prepareTest() error { + var err error + + // Should parse it first. + flag.Parse() + + // The stream should starts with /, for example, /rtc/regression + if !strings.HasPrefix(*srsStream, "/") { + *srsStream = "/" + *srsStream + } + + // Generate srs protocol from whether use HTTPS. + if *srsHttps { + srsSchema = "https" + } + + // Check file. + tryOpenFile := func(filename string) (string, error) { + if filename == "" { + return filename, nil + } + + f, err := os.Open(filename) + if err != nil { + nfilename := path.Join("../", filename) + if fi, r0 := os.Stat(nfilename); r0 != nil && !fi.IsDir() { + return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename) + } + + return nfilename, nil + } + defer f.Close() + + return filename, nil + } + + if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil { + return err + } + + if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil { + return err + } + + return nil +} + +func TestMain(m *testing.M) { + if err := prepareTest(); err != nil { + logger.Ef(nil, "Prepare test fail, err %+v", err) + os.Exit(-1) + } + + // Disable the logger during all tests. + if *srsLog == false { + olw := logger.Switch(ioutil.Discard) + defer func() { + logger.Switch(olw) + }() + } + + os.Exit(m.Run()) +} + +type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI) + +type TestWebRTCAPI struct { + // The options to setup the api. + options []TestWebRTCAPIOptionFunc + // The api and settings. + api *webrtc.API + mediaEngine *webrtc.MediaEngine + registry *interceptor.Registry + settingEngine *webrtc.SettingEngine + // The vnet router, can be shared by different apis, but we do not share it. + router *vnet.Router + // The network for api. + network *vnet.Net + // The vnet UDP proxy bind to the router. + proxy *vnet_proxy.UDPProxy +} + +func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) { + v := &TestWebRTCAPI{} + + v.mediaEngine = &webrtc.MediaEngine{} + if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil { + return nil, err + } + + v.registry = &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil { + return nil, err + } + + for _, setup := range options { + setup(v) + } + + v.settingEngine = &webrtc.SettingEngine{} + + return v, nil +} + +func (v *TestWebRTCAPI) Close() error { + if v.proxy != nil { + v.proxy.Close() + v.proxy = nil + } + + if v.router != nil { + v.router.Stop() + v.router = nil + } + + return nil +} + +func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + // Setting engine for https://github.com/pion/transport/tree/master/vnet + setupVnet := func(vnetClientIP string) (err error) { + // We create a private router for a api, however, it's possible to share the + // same router between apis. + if v.router, err = vnet.NewRouter(&vnet.RouterConfig{ + CIDR: "0.0.0.0/0", // Accept all ip, no sub router. + LoggerFactory: logging.NewDefaultLoggerFactory(), + }); err != nil { + return errors.Wrapf(err, "create router for api") + } + + // Each api should bind to a network, however, it's possible to share it + // for different apis. + v.network = vnet.NewNet(&vnet.NetConfig{ + StaticIP: vnetClientIP, + }) + + if err = v.router.AddNet(v.network); err != nil { + return errors.Wrapf(err, "create network for api") + } + + v.settingEngine.SetVNet(v.network) + + // Create a proxy bind to the router. + if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil { + return errors.Wrapf(err, "create proxy for router") + } + + return v.router.Start() + } + if err := setupVnet(vnetClientIP); err != nil { + return err + } + + for _, setup := range options { + setup(v) + } + + for _, setup := range v.options { + setup(v) + } + + v.api = webrtc.NewAPI( + webrtc.WithMediaEngine(v.mediaEngine), + webrtc.WithInterceptorRegistry(v.registry), + webrtc.WithSettingEngine(*v.settingEngine), + ) + + return nil +} + +func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { + return v.api.NewPeerConnection(configuration) +} + +type TestPlayerOptionFunc func(p *TestPlayer) + +type TestPlayer struct { + pc *webrtc.PeerConnection + receivers []*webrtc.RTPReceiver + // root api object + api *TestWebRTCAPI + // Optional suffix for stream url. + streamSuffix string +} + +func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) *TestPlayer { + v := &TestPlayer{api: api} + + for _, opt := range options { + opt(v) + } + + return v +} + +func (v *TestPlayer) Close() error { + if v.pc != nil { + v.pc.Close() + v.pc = nil + } + + for _, receiver := range v.receivers { + receiver.Stop() + } + v.receivers = nil + + return nil +} + +func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + if v.streamSuffix != "" { + r = fmt.Sprintf("%v-%v", r, v.streamSuffix) + } + pli := time.Duration(*srsPlayPLI) * time.Millisecond + logger.Tf(ctx, "Start play url=%v", r) + + pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + v.pc = pc + + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + // Start a proxy for real server and vnet. + if address, err := parseAddressOfCandidate(answer); err != nil { + return errors.Wrapf(err, "parse address of %v", answer) + } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { + return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { + // Send a PLI on an interval so that the publisher is pushing a keyframe + go func() { + if track.Kind() == webrtc.RTPCodecTypeAudio { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(pli): + _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + MediaSSRC: uint32(track.SSRC()), + }}) + } + } + }() + + v.receivers = append(v.receivers, receiver) + + for ctx.Err() == nil { + _, _, err := track.ReadRTP() + if err != nil { + return errors.Wrapf(err, "Read RTP") + } + } + + return nil + } + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + err = handleTrack(ctx, track, receiver) + if err != nil { + codec := track.Codec() + err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType) + cancel() + } + }) + + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { + err = errors.Errorf("Close for ICE state %v", state) + cancel() + } + }) + + <-ctx.Done() + return err +} + +type TestPublisherOptionFunc func(p *TestPublisher) + +type TestPublisher struct { + onOffer func(s *webrtc.SessionDescription) error + onAnswer func(s *webrtc.SessionDescription) error + iceReadyCancel context.CancelFunc + // internal objects + aIngester *audioIngester + vIngester *videoIngester + pc *webrtc.PeerConnection + // root api object + api *TestWebRTCAPI + // Optional suffix for stream url. + streamSuffix string +} + +func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) *TestPublisher { + sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio + + v := &TestPublisher{api: api} + + for _, opt := range options { + opt(v) + } + + // Create ingesters. + if sourceAudio != "" { + v.aIngester = NewAudioIngester(sourceAudio) + } + if sourceVideo != "" { + v.vIngester = NewVideoIngester(sourceVideo) + } + + // Setup the interceptors for packets. + api.options = append(api.options, func(api *TestWebRTCAPI) { + // Filter for RTCP packets. + rtcpInterceptor := &RTCPInterceptor{} + rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + return rtcpInterceptor.nextRTCPReader.Read(buf, attributes) + } + rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes) + } + api.registry.Add(rtcpInterceptor) + + // Filter for ingesters. + if sourceAudio != "" { + api.registry.Add(v.aIngester.audioLevelInterceptor) + } + if sourceVideo != "" { + api.registry.Add(v.vIngester.markerInterceptor) + } + }) + + return v +} + +func (v *TestPublisher) Close() error { + if v.vIngester != nil { + v.vIngester.Close() + } + + if v.aIngester != nil { + v.aIngester.Close() + } + + if v.pc != nil { + v.pc.Close() + } + + return nil +} + +func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher { + v.streamSuffix = suffix + return v +} + +func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + if v.streamSuffix != "" { + r = fmt.Sprintf("%v-%v", r, v.streamSuffix) + } + sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps + + logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v", + r, sourceAudio, sourceVideo, fps) + + pc, err := v.api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + v.pc = pc + + if v.vIngester != nil { + if err := v.vIngester.AddTrack(pc, fps); err != nil { + return errors.Wrapf(err, "Add track") + } + defer v.vIngester.Close() + } + + if v.aIngester != nil { + if err := v.aIngester.AddTrack(pc); err != nil { + return errors.Wrapf(err, "Add track") + } + defer v.aIngester.Close() + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + if v.onOffer != nil { + if err := v.onOffer(&offer); err != nil { + return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP) + } + } + + answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + // Start a proxy for real server and vnet. + if address, err := parseAddressOfCandidate(answerSDP); err != nil { + return errors.Wrapf(err, "parse address of %v", answerSDP) + } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { + return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) + } + + answer := &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answerSDP, + } + if v.onAnswer != nil { + if err := v.onAnswer(answer); err != nil { + return errors.Wrapf(err, "on answerSDP") + } + } + + if err := pc.SetRemoteDescription(*answer); err != nil { + return errors.Wrapf(err, "Set answerSDP %v", answerSDP) + } + + logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState()) + + // ICE state management. + pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { + logger.Tf(ctx, "ICE gather state %v", state) + }) + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port) + + }) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.Tf(ctx, "ICE state %v", state) + }) + + pc.OnSignalingStateChange(func(state webrtc.SignalingState) { + logger.Tf(ctx, "Signaling state %v", state) + }) + + if v.aIngester != nil { + v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) { + logger.Tf(ctx, "DTLS state %v", state) + }) + } + + pcDone, pcDoneCancel := context.WithCancel(context.Background()) + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Tf(ctx, "PC state %v", state) + + if state == webrtc.PeerConnectionStateConnected { + pcDoneCancel() + if v.iceReadyCancel != nil { + v.iceReadyCancel() + } + } + + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { + err = errors.Errorf("Close for PC state %v", state) + cancel() + } + }) + + // Wait for event from context or tracks. + var wg sync.WaitGroup + var finalErr error + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "ingest notify done") + + <-ctx.Done() + + if v.aIngester != nil && v.aIngester.sAudioSender != nil { + v.aIngester.sAudioSender.Stop() + } + + if v.vIngester != nil && v.vIngester.sVideoSender != nil { + v.vIngester.sVideoSender.Stop() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + if v.aIngester == nil { + return + } + + select { + case <-ctx.Done(): + return + case <-pcDone.Done(): + } + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "aingester sender read done") + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil { + return + } + } + }() + + for { + if err := v.aIngester.Ingest(ctx); err != nil { + if err == io.EOF { + logger.Tf(ctx, "aingester retry for %v", err) + continue + } + if err != context.Canceled { + finalErr = errors.Wrapf(err, "audio") + } + + logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr) + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + if v.vIngester == nil { + return + } + + select { + case <-ctx.Done(): + return + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo) + } + + wg.Add(1) + go func() { + defer wg.Done() + defer logger.Tf(ctx, "vingester sender read done") + + buf := make([]byte, 1500) + for ctx.Err() == nil { + // The Read() might block in r.rtcpInterceptor.Read(b, a), + // so that the Stop() can not stop it. + if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil { + return + } + } + }() + + for { + if err := v.vIngester.Ingest(ctx); err != nil { + if err == io.EOF { + logger.Tf(ctx, "vingester retry for %v", err) + continue + } + if err != context.Canceled { + finalErr = errors.Wrapf(err, "video") + } + + logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr) + return + } + } + }() + + wg.Wait() + + logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr) + if finalErr != nil { + return finalErr + } + return ctx.Err() +} + +func TestRTCServerVersion(t *testing.T) { + api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) + req, err := http.NewRequest("POST", api, nil) + if err != nil { + t.Errorf("Request %v", api) + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("Do request %v", api) + return + } + + b, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Read body of %v", api) + return + } + + obj := struct { + Code int `json:"code"` + Server string `json:"server"` + Data struct { + Major int `json:"major"` + Minor int `json:"minor"` + Revision int `json:"revision"` + Version string `json:"version"` + } `json:"data"` + }{} + if err := json.Unmarshal(b, &obj); err != nil { + t.Errorf("Parse %v", string(b)) + return + } + if obj.Code != 0 { + t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) + return + } + if obj.Data.Major == 0 && obj.Data.Minor == 0 { + t.Errorf("Invalid version %v", obj.Data) + return + } +}