diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 524d1fb..4854a5d 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -10,6 +10,8 @@ package innertest import ( "fmt" + "github.com/q191201771/naza/pkg/nazabytes" + "github.com/q191201771/naza/pkg/nazalog" "io/ioutil" "net/http" "os" @@ -58,22 +60,23 @@ var ( confFile = "../../testdata/lalserver.conf.json" rFlvFileName = "../../testdata/test.flv" - wRtmpPullFileName = "../../testdata/rtmppull.flv" - wFlvPullFileName = "../../testdata/flvpull.flv" - wTsPullFileName = "../../testdata/tspull.flv" - wPlaylistM3u8FileName string - wRecordM3u8FileName string - wHlsTsFilePath string - pushUrl string httpflvPullUrl string httptsPullUrl string rtmpPullUrl string rtspPullUrl string + wRtmpPullFileName string + wFlvPullFileName string + wPlaylistM3u8FileName string + wRecordM3u8FileName string + wHlsTsFilePath string + wTsPullFileName string + fileTagCount int httpflvPullTagCount nazaatomic.Uint32 rtmpPullTagCount nazaatomic.Uint32 + httptsSize nazaatomic.Uint32 rtspSdpCtx sdp.LogicContext rtspPullAvPacketCount nazaatomic.Uint32 @@ -105,6 +108,12 @@ func Entry(tt *testing.T) { mode = 0 entry() + + //mode = 1 + //entry() + // + //mode = 2 + //entry() } func entry() { @@ -117,6 +126,9 @@ func entry() { return } + httpflvPullTagCount.Store(0) + rtmpPullTagCount.Store(0) + httptsSize.Store(0) hls.Clock = mock.NewFakeClock() hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.Local)) httpts.SubSessionWriteChanSize = 0 @@ -124,13 +136,16 @@ func entry() { var err error sm := logic.NewServerManager(confFile) - go sm.RunLoop() - time.Sleep(200 * time.Millisecond) - config := sm.Config() + //Log.Init(func(option *nazalog.Option) { + // option.Level = nazalog.LevelLogNothing + //}) _ = os.RemoveAll(config.HlsConfig.OutPath) + go sm.RunLoop() + time.Sleep(100 * time.Millisecond) + getAllHttpApi(config.HttpApiConfig.Addr) pushUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) @@ -138,6 +153,10 @@ func entry() { httptsPullUrl = fmt.Sprintf("http://127.0.0.1%s/live/innertest.ts", config.HttpflvConfig.HttpListenAddr) rtmpPullUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) rtspPullUrl = fmt.Sprintf("rtsp://127.0.0.1%s/live/innertest", config.RtspConfig.Addr) + + wRtmpPullFileName = "../../testdata/rtmppull.flv" + wFlvPullFileName = "../../testdata/flvpull.flv" + wTsPullFileName = fmt.Sprintf("../../testdata/tspull_%d.ts", mode) wPlaylistM3u8FileName = fmt.Sprintf("%sinnertest/playlist.m3u8", config.HlsConfig.OutPath) wRecordM3u8FileName = fmt.Sprintf("%sinnertest/record.m3u8", config.HlsConfig.OutPath) wHlsTsFilePath = fmt.Sprintf("%sinnertest/", config.HlsConfig.OutPath) @@ -191,6 +210,7 @@ func entry() { }() go func() { + var flvErr error httpflvPullSession = httpflv.NewPullSession(func(option *httpflv.PullSessionOption) { option.ReadTimeoutMs = 10000 }) @@ -200,20 +220,18 @@ func entry() { httpflvPullTagCount.Increment() }) Log.Assert(nil, err) - err = <-httpflvPullSession.WaitChan() - Log.Debug(err) + flvErr = <-httpflvPullSession.WaitChan() + Log.Debug(flvErr) }() go func() { - //nazalog.Info("CHEFGREPME >") - b, err := httpGet(httptsPullUrl) - //_ = ioutil.WriteFile(wTsPullFileName, b, 0666) + b, _ := getHttpts() + _ = ioutil.WriteFile(wTsPullFileName, b, 0666) assert.Equal(t, goldenHttptsLenList[mode], len(b)) assert.Equal(t, goldenHttptsMd5List[mode], nazamd5.Md5(b)) - Log.Infof("CHEFGREPME %+v", err) + Log.Debugf("CHEFGREPME < get. %d %d", len(b), httptsSize.Load()) }() - - time.Sleep(200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // TODO(chef): [test] [2021.12.25] rtsp sub测试 由于rtsp sub不支持没有pub时sub,只能sub失败后重试,所以没有验证收到的数据 // TODO(chef): [perf] [2021.12.25] rtmp推rtsp拉的性能。开启rtsp pull后,rtmp pull的总时长增加了 @@ -233,9 +251,11 @@ func entry() { } }() + time.Sleep(100 * time.Millisecond) + pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.WriteBufSize = 4096 - option.WriteChanSize = 1024 + //option.WriteChanSize = 1024 }) err = pushSession.Push(pushUrl) assert.Equal(t, nil, err) @@ -244,7 +264,7 @@ func entry() { assert.Equal(t, nil, err) chunks := remux.FlvTag2RtmpChunks(tag) //Log.Debugf("rtmp push: %d", fileTagCount.Load()) - err = pushSession.Write(chunks) + err := pushSession.Write(chunks) assert.Equal(t, nil, err) } err = pushSession.Flush() @@ -252,21 +272,25 @@ func entry() { getAllHttpApi(config.HttpApiConfig.Addr) + // 注意,先释放push,触发pub释放,从而刷新hls的结束时切片逻辑 + pushSession.Dispose() + for { if httpflvPullTagCount.Load() == uint32(fileTagCount) && - rtmpPullTagCount.Load() == uint32(fileTagCount) { - time.Sleep(100 * time.Millisecond) + rtmpPullTagCount.Load() == uint32(fileTagCount) && + rtmpPullTagCount.Load() == uint32(fileTagCount) && + httptsSize.Load() == uint32(goldenHttptsLenList[mode]) { break } - time.Sleep(10 * time.Millisecond) + nazalog.Debugf("%d %d %d", httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), httptsSize.Load()) + time.Sleep(100 * time.Millisecond) } Log.Debug("[innertest] start dispose.") - pushSession.Dispose() httpflvPullSession.Dispose() rtmpPullSession.Dispose() - //rtspPullSession.Dispose() + rtspPullSession.Dispose() httpFlvWriter.Dispose() rtmpWriter.Dispose() @@ -356,6 +380,27 @@ func getAllHttpApi(addr string) { Log.Debugf("%s", string(b)) } +func getHttpts() ([]byte, error) { + resp, err := http.DefaultClient.Get(httptsPullUrl) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var buf nazabytes.Buffer + for { + buf.ReserveBytes(10000) + n, err := resp.Body.Read(buf.WritableBytes()) + if n > 0 { + buf.Flush(n) + httptsSize.Add(uint32(n)) + } + if err != nil { + return buf.Bytes(), err + } + } +} + // --------------------------------------------------------------------------------------------------------------------- // TODO(chef): refactor 移入naza中 diff --git a/pkg/innertest/innertest_test.go b/pkg/innertest/innertest_test.go new file mode 100644 index 0000000..605f71c --- /dev/null +++ b/pkg/innertest/innertest_test.go @@ -0,0 +1,15 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package innertest + +import "testing" + +func TestEntry(t *testing.T) { + Entry(t) +}