diff --git a/Makefile b/Makefile index 4f40838..0d66f37 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,8 @@ image: .PHONY: clean clean: rm -rf ./bin ./lal_record ./logs coverage.txt + find ./pkg -name 'lal_record' | xargs rm -rf + find ./pkg -name 'logs' | xargs rm -rf .PHONY: all all: build test diff --git a/app/demo/pullrtmp2hls/pullrtmp2hls.go b/app/demo/pullrtmp2hls/pullrtmp2hls.go index edf8282..a9de549 100644 --- a/app/demo/pullrtmp2hls/pullrtmp2hls.go +++ b/app/demo/pullrtmp2hls/pullrtmp2hls.go @@ -14,6 +14,8 @@ import ( "os" "path/filepath" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/rtmp" @@ -46,13 +48,13 @@ func main() { hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil) hlsMuexer.Start() + rtmp2Mpegts := remux.NewRtmp2MpegtsRemuxer(hlsMuexer) + pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { option.PullTimeoutMs = 10000 option.ReadAvTimeoutMs = 10000 }) - err = pullSession.Pull(url, func(msg base.RtmpMsg) { - hlsMuexer.FeedRtmpMessage(msg) - }) + err = pullSession.Pull(url, rtmp2Mpegts.FeedRtmpMessage) if err != nil { nazalog.Fatalf("pull rtmp failed. err=%+v", err) diff --git a/go.mod b/go.mod index 427de17..ab1f6f9 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.14 -require github.com/q191201771/naza v0.30.0 +require github.com/q191201771/naza v0.30.1 diff --git a/go.sum b/go.sum index 96cd5c5..d89f26d 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.30.0 h1:tfy1O0QRl3O80mH8PSAd2FhpZ5eL7coQtCF0HzjEO4Y= -github.com/q191201771/naza v0.30.0/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk= +github.com/q191201771/naza v0.30.1 h1:8EAydcrHs+4lUjP4XBJvTlevuOzVcZIoGj5ZK6Y6Njc= +github.com/q191201771/naza v0.30.1/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk= diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index fcfd128..df47c64 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -174,7 +174,7 @@ func (ascCtx *AscContext) GetSamplingFrequency() (int, error) { case AscSamplingFrequencyIndex44100: return 44100, nil } - return -1, fmt.Errorf("%w. index=%d", base.ErrSamplingFrequencyIndex, ascCtx.SamplingFrequencyIndex) + return -1, fmt.Errorf("%w. asCtx=%+v", base.ErrSamplingFrequencyIndex, ascCtx) } type AdtsHeaderContext struct { diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index eadab0b..7846d14 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -441,7 +441,6 @@ func SplitNaluAvcc(nals []byte) (nalList [][]byte, err error) { nalList = append(nalList, nal) }) return - } func IterateNaluAnnexb(nals []byte, handler func(nal []byte)) error { @@ -517,4 +516,14 @@ func IterateNaluAvcc(nals []byte, handler func(nal []byte)) error { } } +func Avcc2Annexb(nals []byte) ([]byte, error) { + ret := make([]byte, len(nals)) + ret = ret[0:0] + err := IterateNaluAvcc(nals, func(nal []byte) { + ret = append(ret, NaluStartCode4...) + ret = append(ret, nal...) + }) + return ret, err +} + // TODO(chef): 是否需要 func NaluAvcc2Annexb, func NaluAnnexb2Avcc diff --git a/pkg/base/session.go b/pkg/base/session.go index d3d50d1..b40ab45 100644 --- a/pkg/base/session.go +++ b/pkg/base/session.go @@ -13,6 +13,21 @@ import ( "strings" ) +// group中,session Dispose表现记录 +// +// Dispose结束后回调OnDel: +// rtmp.ServerSession(包含pub和sub) 1 +// rtsp.PubSession和rtsp.SubSession 1 +// rtmp.PullSession 2 +// httpflv.SubSession 3 +// httpts.SubSession 3 +// +// +// 情况1: 协议正常走完回调OnAdd,在自身server的RunLoop结束后,回调OnDel +// 情况2: 在group中pull阻塞结束后,手动回调OnDel +// 情况3: 在logic中sub RunLoop结束后,手动回调OnDel +// + // IsUseClosedConnectionError 当connection处于这些情况时,就不需要再Close了 // TODO(chef): 临时放这 // TODO(chef): 目前暂时没有使用,因为connection支持多次调用Close @@ -81,7 +96,9 @@ type IClientSessionLifecycle interface { type IServerSessionLifecycle interface { // RunLoop 开启session的事件循环,阻塞直到session结束 // - RunLoop() error + // 注意,rtsp的 pub和sub没有RunLoop,RunLoop是在cmd session上,所以暂时把这个函数从接口去除 + // + //RunLoop() error // Dispose 主动关闭session时调用 // diff --git a/pkg/base/unique.go b/pkg/base/unique.go index 76e4c55..35422da 100644 --- a/pkg/base/unique.go +++ b/pkg/base/unique.go @@ -23,9 +23,9 @@ const ( UkPreTsSubSession = "TSSUB" UkPreFlvPullSession = "FLVPULL" - UkPreGroup = "GROUP" - UkPreHlsMuxer = "HLSMUXER" - UkPreStreamer = "STREAMER" + UkPreGroup = "GROUP" + UkPreHlsMuxer = "HLSMUXER" + UkPreRtmp2MpegtsRemuxer = "RTMP2MPEGTS" ) //func GenUk(prefix string) string { @@ -84,8 +84,8 @@ func GenUkHlsMuxer() string { return siUkHlsMuxer.GenUniqueKey() } -func GenUkStreamer() string { - return siUkStreamer.GenUniqueKey() +func GenUkRtmp2MpegtsRemuxer() string { + return siUkRtmp2MpegtsRemuxer.GenUniqueKey() } var ( @@ -101,9 +101,9 @@ var ( siUkTsSubSession *unique.SingleGenerator siUkFlvPullSession *unique.SingleGenerator - siUkGroup *unique.SingleGenerator - siUkHlsMuxer *unique.SingleGenerator - siUkStreamer *unique.SingleGenerator + siUkGroup *unique.SingleGenerator + siUkHlsMuxer *unique.SingleGenerator + siUkRtmp2MpegtsRemuxer *unique.SingleGenerator ) func init() { @@ -121,5 +121,5 @@ func init() { siUkGroup = unique.NewSingleGenerator(UkPreGroup) siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer) - siUkStreamer = unique.NewSingleGenerator(UkPreStreamer) + siUkRtmp2MpegtsRemuxer = unique.NewSingleGenerator(UkPreRtmp2MpegtsRemuxer) } diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index d7bc1e8..2026cae 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -19,19 +19,8 @@ import ( "github.com/q191201771/lal/pkg/base" ) -// TODO chef: 转换TS流的功能(通过回调供httpts使用)也放在了Muxer中,好处是hls和httpts可以共用一份TS流。 -// 后续从架构上考虑,packet hls,mpegts,logic的分工 - type MuxerObserver interface { - OnPatPmt(b []byte) - - // OnTsPackets - // - // @param rawFrame: TS流,回调结束后,内部不再使用该内存块 - // - // @param boundary: 新的TS流接收者,应该从该标志为true时开始发送数据 - // - OnTsPackets(rawFrame []byte, boundary bool) + OnFragmentOpen() } // MuxerConfig @@ -54,7 +43,7 @@ const ( // Muxer // -// 输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流 +// 输入mpegts流,转出hls(m3u8+ts)至文件中 // type Muxer struct { UniqueKey string @@ -86,8 +75,7 @@ type Muxer struct { frag int // frag 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 frags []fragmentInfo // frags TS文件的固定大小环形队列,记录TS的信息 - streamer *Streamer - patpmt []byte + patpmt []byte } // 记录fragment的一些信息,注意,写m3u8文件时可能还需要用到历史fragment的信息 @@ -123,8 +111,6 @@ func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer Muxe observer: observer, } m.makeFrags() - streamer := NewStreamer(m) - m.streamer = streamer Log.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName) return m } @@ -136,30 +122,32 @@ func (m *Muxer) Start() { func (m *Muxer) Dispose() { Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey) - m.streamer.FlushAudio() if err := m.closeFragment(true); err != nil { Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err) } } -// FeedRtmpMessage @param msg 函数调用结束后,内部不持有msg中的内存块 +// --------------------------------------------------------------------------------------------------------------------- + +// OnPatPmt OnTsPackets +// +// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer // -func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg) { - m.streamer.FeedRtmpMessage(msg) +func (m *Muxer) OnPatPmt(b []byte) { + m.FeedPatPmt(b) +} + +func (m *Muxer) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) { + m.FeedMpegts(tsPackets, frame, boundary) } -// ----- implement StreamerObserver of Streamer ------------------------------------------------------------------------ +// --------------------------------------------------------------------------------------------------------------------- -func (m *Muxer) OnPatPmt(b []byte) { +func (m *Muxer) FeedPatPmt(b []byte) { m.patpmt = b - if m.observer != nil { - m.observer.OnPatPmt(b) - } } -func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) { - var packets []byte - +func (m *Muxer) FeedMpegts(tsPackets []byte, frame *mpegts.Frame, boundary bool) { if frame.Sid == mpegts.StreamIdAudio { // TODO(chef): 为什么音频用pts,视频用dts if err := m.updateFragment(frame.Pts, boundary); err != nil { @@ -168,7 +156,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) } // TODO(chef): 有updateFragment的返回值判断,这里的判断可以考虑删除 if !m.opened { - Log.Warnf("[%s] OnFrame A not opened. boundary=%t", m.UniqueKey, boundary) + Log.Warnf("[%s] FeedMpegts A not opened. boundary=%t", m.UniqueKey, boundary) return } //Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) @@ -179,25 +167,15 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) } // TODO(chef): 有updateFragment的返回值判断,这里的判断可以考虑删除 if !m.opened { - Log.Warnf("[%s] OnFrame V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key) + Log.Warnf("[%s] FeedMpegts V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key) return } //Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw)) } - mpegts.PackTsPacket(frame, func(packet []byte) { - if m.enable { - if err := m.fragment.WriteFile(packet); err != nil { - Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err) - return - } - } - if m.observer != nil { - packets = append(packets, packet...) - } - }) - if m.observer != nil { - m.observer.OnTsPackets(packets, boundary) + if err := m.fragment.WriteFile(tsPackets); err != nil { + Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err) + return } } @@ -314,7 +292,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error { m.fragTs = ts // nrm said: start fragment with audio to make iPhone happy - m.streamer.FlushAudio() + m.observer.OnFragmentOpen() return nil } diff --git a/pkg/hls/var.go b/pkg/hls/var.go index 55bd457..322a3d0 100644 --- a/pkg/hls/var.go +++ b/pkg/hls/var.go @@ -20,7 +20,3 @@ var ( Log = nazalog.GetGlobalLogger() ) - -var ( - calcFragmentHeaderQueueSize = 16 -) diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index 288e4cc..da2a040 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -65,12 +65,10 @@ var ( // IServerSession var ( _ base.IServerSession = &rtmp.ServerSession{} + _ base.IServerSession = &rtsp.PubSession{} + _ base.IServerSession = &rtsp.SubSession{} _ base.IServerSession = &httpflv.SubSession{} _ base.IServerSession = &httpts.SubSession{} - - // 这两个比较特殊,它们没有RunLoop函数,RunLoop在rtsp.ServerCommandSession上 - //_ base.IServerSession = &rtsp.PubSession{} - //_ base.IServerSession = &rtsp.SubSession{} ) // IClientSessionLifecycle: 所有Client Session都满足 @@ -91,12 +89,11 @@ var ( var ( // server session _ base.IServerSessionLifecycle = &rtmp.ServerSession{} + _ base.IServerSessionLifecycle = &rtsp.PubSession{} + _ base.IServerSessionLifecycle = &rtsp.SubSession{} _ base.IServerSessionLifecycle = &httpflv.SubSession{} _ base.IServerSessionLifecycle = &httpts.SubSession{} - // 这两个比较特殊,它们没有RunLoop函数,RunLoop在rtsp.ServerCommandSession上 - //_ base.IServerSessionLifecycle = &rtsp.PubSession{} - //_ base.IServerSessionLifecycle = &rtsp.SubSession{} // other _ base.IServerSessionLifecycle = &base.HttpSubSession{} _ base.IServerSessionLifecycle = &rtsp.ServerCommandSession{} @@ -188,6 +185,7 @@ var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ hls.MuxerObserver = &logic.Group{} var _ rtsp.BaseInSessionObserver = &logic.Group{} // var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{} +var _ remux.Rtmp2MpegtsRemuxerObserver = &hls.Muxer{} var _ rtmp.ServerSessionObserver = &rtmp.Server{} var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{} @@ -202,5 +200,3 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{} - -var _ hls.StreamerObserver = &hls.Muxer{} diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index a626dc9..7ce5df6 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -9,7 +9,6 @@ package innertest import ( - "bytes" "fmt" "io/ioutil" "net/http" @@ -17,6 +16,9 @@ import ( "testing" "time" + "github.com/q191201771/naza/pkg/nazabytes" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/naza/pkg/filebatch" @@ -53,16 +55,11 @@ import ( // - 加上relay pull var ( - tt *testing.T + t *testing.T - confFile = "../../testdata/lalserver.conf.json" - rFlvFileName = "../../testdata/test.flv" - wRtmpPullFileName = "../../testdata/rtmppull.flv" - wFlvPullFileName = "../../testdata/flvpull.flv" - wPlaylistM3u8FileName string - wRecordM3u8FileName string - wHlsTsFilePath string - //wRtspPullFileName = "../../testdata/rtsppull.flv" + mode int // 0 正常 1 输入只有音频 2 输入只有视频 + confFile = "../../testdata/lalserver.conf.json" + rFlvFileName = "../../testdata/test.flv" pushUrl string httpflvPullUrl string @@ -70,9 +67,17 @@ var ( rtmpPullUrl string rtspPullUrl string + wRtmpPullFileName string + wFlvPullFileName string + wPlaylistM3u8FileName string + wRecordM3u8FileName string + wHlsTsFilePath string + wTsPullFileName string + fileTagCount int httpflvPullTagCount nazaatomic.Uint32 rtmpPullTagCount nazaatomic.Uint32 + httptsSize nazaatomic.Uint32 rtspSdpCtx sdp.LogicContext rtspPullAvPacketCount nazaatomic.Uint32 @@ -99,7 +104,20 @@ func (r RtspPullObserver) OnAvPacket(pkt base.AvPacket) { rtspPullAvPacketCount.Increment() } -func Entry(t *testing.T) { +func Entry(tt *testing.T) { + t = tt + + mode = 0 + entry() + + mode = 1 + entry() + + mode = 2 + entry() +} + +func entry() { if _, err := os.Lstat(confFile); err != nil { Log.Warnf("lstat %s error. err=%+v", confFile, err) return @@ -109,22 +127,26 @@ func Entry(t *testing.T) { return } + httpflvPullTagCount.Store(0) + rtmpPullTagCount.Store(0) + httptsSize.Store(0) hls.Clock = mock.NewFakeClock() - hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.Local)) + hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.UTC)) httpts.SubSessionWriteChanSize = 0 - tt = t - var err error sm := logic.NewServerManager(confFile) - go sm.RunLoop() - time.Sleep(200 * time.Millisecond) - config := sm.Config() + //Log.Init(func(option *nazalog.Option) { + // option.Level = nazalog.LevelLogNothing + //}) _ = os.RemoveAll(config.HlsConfig.OutPath) + go sm.RunLoop() + time.Sleep(100 * time.Millisecond) + getAllHttpApi(config.HttpApiConfig.Addr) pushUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) @@ -132,12 +154,32 @@ func Entry(t *testing.T) { httptsPullUrl = fmt.Sprintf("http://127.0.0.1%s/live/innertest.ts", config.HttpflvConfig.HttpListenAddr) rtmpPullUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) rtspPullUrl = fmt.Sprintf("rtsp://127.0.0.1%s/live/innertest", config.RtspConfig.Addr) + + wRtmpPullFileName = "../../testdata/rtmppull.flv" + wFlvPullFileName = "../../testdata/flvpull.flv" + wTsPullFileName = fmt.Sprintf("../../testdata/tspull_%d.ts", mode) wPlaylistM3u8FileName = fmt.Sprintf("%sinnertest/playlist.m3u8", config.HlsConfig.OutPath) wRecordM3u8FileName = fmt.Sprintf("%sinnertest/record.m3u8", config.HlsConfig.OutPath) wHlsTsFilePath = fmt.Sprintf("%sinnertest/", config.HlsConfig.OutPath) - tags, err := httpflv.ReadAllTagsFromFlvFile(rFlvFileName) + var tags []httpflv.Tag + originTags, err := httpflv.ReadAllTagsFromFlvFile(rFlvFileName) assert.Equal(t, nil, err) + if mode == 0 { + tags = originTags + } else if mode == 1 { + for _, tag := range originTags { + if tag.Header.Type == base.RtmpTypeIdMetadata || tag.Header.Type == base.RtmpTypeIdAudio { + tags = append(tags, tag) + } + } + } else if mode == 2 { + for _, tag := range originTags { + if tag.Header.Type == base.RtmpTypeIdMetadata || tag.Header.Type == base.RtmpTypeIdVideo { + tags = append(tags, tag) + } + } + } fileTagCount = len(tags) err = httpFlvWriter.Open(wFlvPullFileName) @@ -160,7 +202,7 @@ func Entry(t *testing.T) { func(msg base.RtmpMsg) { tag := remux.RtmpMsg2FlvTag(msg) err := rtmpWriter.WriteTag(*tag) - assert.Equal(tt, nil, err) + assert.Equal(t, nil, err) rtmpPullTagCount.Increment() }) Log.Assert(nil, err) @@ -169,6 +211,7 @@ func Entry(t *testing.T) { }() go func() { + var flvErr error httpflvPullSession = httpflv.NewPullSession(func(option *httpflv.PullSessionOption) { option.ReadTimeoutMs = 10000 }) @@ -178,19 +221,17 @@ func Entry(t *testing.T) { httpflvPullTagCount.Increment() }) Log.Assert(nil, err) - err = <-httpflvPullSession.WaitChan() - Log.Debug(err) + flvErr = <-httpflvPullSession.WaitChan() + Log.Debug(flvErr) }() go func() { - //nazalog.Info("CHEFGREPME >") - b, err := httpGet(httptsPullUrl) - assert.Equal(t, 2216332, len(b)) - assert.Equal(t, "03f8eac7d2c3d5d85056c410f5fcc756", nazamd5.Md5(b)) - Log.Infof("CHEFGREPME %+v", err) + b, _ := getHttpts() + _ = ioutil.WriteFile(wTsPullFileName, b, 0666) + assert.Equal(t, goldenHttptsLenList[mode], len(b)) + assert.Equal(t, goldenHttptsMd5List[mode], nazamd5.Md5(b)) }() - - time.Sleep(200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // TODO(chef): [test] [2021.12.25] rtsp sub测试 由于rtsp sub不支持没有pub时sub,只能sub失败后重试,所以没有验证收到的数据 // TODO(chef): [perf] [2021.12.25] rtmp推rtsp拉的性能。开启rtsp pull后,rtmp pull的总时长增加了 @@ -210,9 +251,11 @@ func Entry(t *testing.T) { } }() + time.Sleep(100 * time.Millisecond) + pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.WriteBufSize = 4096 - option.WriteChanSize = 1024 + //option.WriteChanSize = 1024 }) err = pushSession.Push(pushUrl) assert.Equal(t, nil, err) @@ -221,7 +264,7 @@ func Entry(t *testing.T) { assert.Equal(t, nil, err) chunks := remux.FlvTag2RtmpChunks(tag) //Log.Debugf("rtmp push: %d", fileTagCount.Load()) - err = pushSession.Write(chunks) + err := pushSession.Write(chunks) assert.Equal(t, nil, err) } err = pushSession.Flush() @@ -229,21 +272,25 @@ func Entry(t *testing.T) { getAllHttpApi(config.HttpApiConfig.Addr) + // 注意,先释放push,触发pub释放,从而刷新hls的结束时切片逻辑 + pushSession.Dispose() + for { if httpflvPullTagCount.Load() == uint32(fileTagCount) && - rtmpPullTagCount.Load() == uint32(fileTagCount) { - time.Sleep(100 * time.Millisecond) + rtmpPullTagCount.Load() == uint32(fileTagCount) && + httptsSize.Load() == uint32(goldenHttptsLenList[mode]) { break } - time.Sleep(10 * time.Millisecond) + nazalog.Debugf("%d(%d, %d) %d(%d)", + fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), goldenHttptsLenList[mode], httptsSize.Load()) + time.Sleep(100 * time.Millisecond) } Log.Debug("[innertest] start dispose.") - pushSession.Dispose() httpflvPullSession.Dispose() rtmpPullSession.Dispose() - //rtspPullSession.Dispose() + rtspPullSession.Dispose() httpFlvWriter.Dispose() rtmpWriter.Dispose() @@ -260,34 +307,28 @@ func Entry(t *testing.T) { func compareFile() { r, err := ioutil.ReadFile(rFlvFileName) - assert.Equal(tt, nil, err) + assert.Equal(t, nil, err) Log.Debugf("%s filesize:%d", rFlvFileName, len(r)) // 检查httpflv w, err := ioutil.ReadFile(wFlvPullFileName) - assert.Equal(tt, nil, err) - Log.Debugf("%s filesize:%d", wFlvPullFileName, len(w)) - res := bytes.Compare(r, w) - assert.Equal(tt, 0, res) - //err = os.Remove(wFlvPullFileName) - assert.Equal(tt, nil, err) + assert.Equal(t, nil, err) + assert.Equal(t, goldenHttpflvLenList[mode], len(w)) + assert.Equal(t, goldenHttpflvMd5List[mode], nazamd5.Md5(w)) // 检查rtmp - w2, err := ioutil.ReadFile(wRtmpPullFileName) - assert.Equal(tt, nil, err) - Log.Debugf("%s filesize:%d", wRtmpPullFileName, len(w2)) - res = bytes.Compare(r, w2) - assert.Equal(tt, 0, res) - //err = os.Remove(wRtmpPullFileName) - assert.Equal(tt, nil, err) + w, err = ioutil.ReadFile(wRtmpPullFileName) + assert.Equal(t, nil, err) + assert.Equal(t, goldenRtmpLenList[mode], len(w)) + assert.Equal(t, goldenRtmpMd5List[mode], nazamd5.Md5(w)) // 检查hls的m3u8文件 playListM3u8, err := ioutil.ReadFile(wPlaylistM3u8FileName) - assert.Equal(tt, nil, err) - assert.Equal(tt, goldenPlaylistM3u8, string(playListM3u8)) + assert.Equal(t, nil, err) + assert.Equal(t, goldenPlaylistM3u8List[mode], string(playListM3u8)) recordM3u8, err := ioutil.ReadFile(wRecordM3u8FileName) - assert.Equal(tt, nil, err) - assert.Equal(tt, []byte(goldenRecordM3u8), recordM3u8) + assert.Equal(t, nil, err) + assert.Equal(t, goldenRecordM3u8List[mode], string(recordM3u8)) // 检查hls的ts文件 var allContent []byte @@ -301,11 +342,11 @@ func compareFile() { fileNum++ return nil }) - assert.Equal(tt, nil, err) + assert.Equal(t, nil, err) allContentMd5 := nazamd5.Md5(allContent) - assert.Equal(tt, 8, fileNum) - assert.Equal(tt, 2219152, len(allContent)) - assert.Equal(tt, "48db6251d40c271fd11b05650f074e0f", allContentMd5) + assert.Equal(t, goldenHlsTsNumList[mode], fileNum) + assert.Equal(t, goldenHlsTsLenList[mode], len(allContent)) + assert.Equal(t, goldenHlsTsMd5List[mode], allContentMd5) } func getAllHttpApi(addr string) { @@ -339,6 +380,30 @@ func getAllHttpApi(addr string) { Log.Debugf("%s", string(b)) } +func getHttpts() ([]byte, error) { + resp, err := http.DefaultClient.Get(httptsPullUrl) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var buf nazabytes.Buffer + buf.ReserveBytes(goldenHttptsLenList[mode]) + for { + n, err := resp.Body.Read(buf.WritableBytes()) + if n > 0 { + buf.Flush(n) + httptsSize.Add(uint32(n)) + } + if err != nil { + return buf.Bytes(), err + } + if buf.Len() == goldenHttptsLenList[mode] { + return buf.Bytes(), nil + } + } +} + // --------------------------------------------------------------------------------------------------------------------- // TODO(chef): refactor 移入naza中 @@ -363,48 +428,175 @@ func httpPost(url string, info interface{}) ([]byte, error) { // --------------------------------------------------------------------------------------------------------------------- -var goldenPlaylistM3u8 = `#EXTM3U +var ( + goldenRtmpLenList = []int{2120047, 504722, 1615715} + goldenRtmpMd5List = []string{ + "7d68f0e2ab85c1992f70740479c8d3db", + "b889f690e07399c8c8353a3b1dba7efb", + "b5a9759455039761b6d4dd3ed8e97634", + } + + goldenHttpflvLenList = []int{2120047, 504722, 1615715} + goldenHttpflvMd5List = []string{ + "7d68f0e2ab85c1992f70740479c8d3db", + "b889f690e07399c8c8353a3b1dba7efb", + "b5a9759455039761b6d4dd3ed8e97634", + } + + goldenHlsTsNumList = []int{8, 10, 8} + goldenHlsTsLenList = []int{2219152, 525648, 1696512} + goldenHlsTsMd5List = []string{ + "48db6251d40c271fd11b05650f074e0f", + "2eb19ad498688dadf950b3e749985922", + "2d1e5c1a3ca385e0b55b2cff2f52b710", + } + + goldenHttptsLenList = []int{2216332, 522264, 1693880} + goldenHttptsMd5List = []string{ + "03f8eac7d2c3d5d85056c410f5fcc756", + "0d102b6fb7fc3134e56a07f00292e888", + "651a2b5c93370738d81995f8fd49af4d", + } +) + +var goldenPlaylistM3u8List = []string{ + `#EXTM3U #EXT-X-VERSION:3 #EXT-X-ALLOW-CACHE:NO #EXT-X-TARGETDURATION:5 #EXT-X-MEDIA-SEQUENCE:2 #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.621, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts +#EXT-X-ENDLIST +`, + `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-ALLOW-CACHE:NO +#EXT-X-TARGETDURATION:3 +#EXT-X-MEDIA-SEQUENCE:4 + +#EXTINF:3.088, +innertest-1642375465000-4.ts +#EXTINF:3.088, +innertest-1642375465000-5.ts +#EXTINF:3.089, +innertest-1642375465000-6.ts +#EXTINF:3.088, +innertest-1642375465000-7.ts +#EXTINF:3.088, +innertest-1642375465000-8.ts +#EXTINF:2.113, +innertest-1642375465000-9.ts +#EXT-X-ENDLIST +`, + `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-ALLOW-CACHE:NO +#EXT-X-TARGETDURATION:5 +#EXT-X-MEDIA-SEQUENCE:2 + +#EXTINF:3.333, +innertest-1642375465000-2.ts +#EXTINF:4.000, +innertest-1642375465000-3.ts +#EXTINF:4.867, +innertest-1642375465000-4.ts +#EXTINF:3.133, +innertest-1642375465000-5.ts +#EXTINF:4.000, +innertest-1642375465000-6.ts +#EXTINF:2.600, +innertest-1642375465000-7.ts #EXT-X-ENDLIST -` +`, +} -var goldenRecordM3u8 = `#EXTM3U +var goldenRecordM3u8List = []string{ + `#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:5 #EXT-X-MEDIA-SEQUENCE:0 #EXT-X-DISCONTINUITY #EXTINF:4.000, -innertest-1642346665000-0.ts +innertest-1642375465000-0.ts #EXTINF:4.000, -innertest-1642346665000-1.ts +innertest-1642375465000-1.ts #EXTINF:3.333, -innertest-1642346665000-2.ts +innertest-1642375465000-2.ts #EXTINF:4.000, -innertest-1642346665000-3.ts +innertest-1642375465000-3.ts #EXTINF:4.867, -innertest-1642346665000-4.ts +innertest-1642375465000-4.ts #EXTINF:3.133, -innertest-1642346665000-5.ts +innertest-1642375465000-5.ts #EXTINF:4.000, -innertest-1642346665000-6.ts +innertest-1642375465000-6.ts #EXTINF:2.621, -innertest-1642346665000-7.ts +innertest-1642375465000-7.ts +#EXT-X-ENDLIST +`, + `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:3 +#EXT-X-MEDIA-SEQUENCE:0 + +#EXT-X-DISCONTINUITY +#EXTINF:3.088, +innertest-1642375465000-0.ts +#EXTINF:3.088, +innertest-1642375465000-1.ts +#EXTINF:3.089, +innertest-1642375465000-2.ts +#EXTINF:3.088, +innertest-1642375465000-3.ts +#EXTINF:3.088, +innertest-1642375465000-4.ts +#EXTINF:3.088, +innertest-1642375465000-5.ts +#EXTINF:3.089, +innertest-1642375465000-6.ts +#EXTINF:3.088, +innertest-1642375465000-7.ts +#EXTINF:3.088, +innertest-1642375465000-8.ts +#EXTINF:2.113, +innertest-1642375465000-9.ts #EXT-X-ENDLIST -` +`, + `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:5 +#EXT-X-MEDIA-SEQUENCE:0 + +#EXT-X-DISCONTINUITY +#EXTINF:4.000, +innertest-1642375465000-0.ts +#EXTINF:4.000, +innertest-1642375465000-1.ts +#EXTINF:3.333, +innertest-1642375465000-2.ts +#EXTINF:4.000, +innertest-1642375465000-3.ts +#EXTINF:4.867, +innertest-1642375465000-4.ts +#EXTINF:3.133, +innertest-1642375465000-5.ts +#EXTINF:4.000, +innertest-1642375465000-6.ts +#EXTINF:2.600, +innertest-1642375465000-7.ts +#EXT-X-ENDLIST +`, +} diff --git a/pkg/innertest/innertest_test.go b/pkg/innertest/innertest_test.go new file mode 100644 index 0000000..605f71c --- /dev/null +++ b/pkg/innertest/innertest_test.go @@ -0,0 +1,15 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package innertest + +import "testing" + +func TestEntry(t *testing.T) { + Entry(t) +} diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 5335a1c..d6fe0cc 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -322,6 +322,9 @@ func LoadConfAndInitLog(confFile string) *Config { return config } + +// --------------------------------------------------------------------------------------------------------------------- + func mergeCommonHttpAddrConfig(dst, src *CommonHttpAddrConfig) { if dst.HttpListenAddr == "" && src.HttpListenAddr != "" { dst.HttpListenAddr = src.HttpListenAddr diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 5721b96..0e63444 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -10,34 +10,18 @@ package logic import ( "encoding/json" - "fmt" - "net" - "path/filepath" "strings" "sync" - "time" - - "github.com/q191201771/lal/pkg/sdp" - - "github.com/q191201771/lal/pkg/mpegts" - - "github.com/q191201771/lal/pkg/remux" - - "github.com/q191201771/lal/pkg/rtprtcp" - - "github.com/q191201771/lal/pkg/hevc" - - "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/lal/pkg/base" - - "github.com/q191201771/lal/pkg/avc" - "github.com/q191201771/lal/pkg/rtsp" - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtsp" + "github.com/q191201771/lal/pkg/sdp" ) type GroupObserver interface { @@ -54,17 +38,25 @@ type Group struct { exitChan chan struct{} mutex sync.Mutex - // - stat base.StatGroup // pub - rtmpPubSession *rtmp.ServerSession - rtspPubSession *rtsp.PubSession - rtsp2RtmpRemuxer *remux.AvPacket2RtmpRemuxer - rtmp2RtspRemuxer *remux.Rtmp2RtspRemuxer + rtmpPubSession *rtmp.ServerSession + rtspPubSession *rtsp.PubSession + rtsp2RtmpRemuxer *remux.AvPacket2RtmpRemuxer + rtmp2RtspRemuxer *remux.Rtmp2RtspRemuxer + rtmp2MpegtsRemuxer *remux.Rtmp2MpegtsRemuxer // pull pullEnable bool pullUrl string pullProxy *pullProxy + // rtmp pub使用 + dummyAudioFilter *remux.DummyAudioFilter + // rtmp pub/pull使用 + rtmpGopCache *remux.GopCache + httpflvGopCache *remux.GopCache + // rtsp使用 + sdpCtx *sdp.LogicContext + // mpegts使用 + patpmt []byte // sub rtmpSubSessionSet map[*rtmp.ServerSession]struct{} httpflvSubSessionSet map[*httpflv.SubSession]struct{} @@ -78,45 +70,15 @@ type Group struct { // record recordFlv *httpflv.FlvFileWriter recordMpegts *mpegts.FileWriter - // rtmp pub/pull使用 - rtmpGopCache *remux.GopCache - httpflvGopCache *remux.GopCache - // rtmp pub使用 - dummyAudioFilter *remux.DummyAudioFilter // rtmp sub使用 rtmpMergeWriter *base.MergeWriter // TODO(chef): 后面可以在业务层加一个定时Flush - // mpegts使用 - patpmt []byte - // rtsp使用 - sdpCtx *sdp.LogicContext // - tickCount uint32 -} - -type pullProxy struct { - isPulling bool - pullSession *rtmp.PullSession -} - -type pushProxy struct { - isPushing bool - pushSession *rtmp.PushSession + stat base.StatGroup } func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group { uk := base.GenUkGroup() - url2PushProxy := make(map[string]*pushProxy) // TODO(chef): 移入Enable里面并进行review+测试 - if config.RelayPushConfig.Enable { - for _, addr := range config.RelayPushConfig.AddrList { - pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) - url2PushProxy[pushUrl] = &pushProxy{ - isPushing: false, - pushSession: nil, - } - } - } - g := &Group{ UniqueKey: uk, appName: appName, @@ -133,23 +95,17 @@ func NewGroup(appName string, streamName string, config *Config, observer GroupO rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}), rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum), httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum), - pushEnable: config.RelayPushConfig.Enable, - url2PushProxy: url2PushProxy, pullProxy: &pullProxy{}, } - // 根据配置文件中的静态回源配置来初始化回源设置 - var pullUrl string - if config.RelayPullConfig.Enable { - pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", config.RelayPullConfig.Addr, appName, streamName) - } - g.setPullUrl(config.RelayPullConfig.Enable, pullUrl) + g.initRelayPush() + g.initRelayPull() if config.RtmpConfig.MergeWriteSize > 0 { g.rtmpMergeWriter = base.NewMergeWriter(g.writev2RtmpSubSessions, config.RtmpConfig.MergeWriteSize) } - Log.Infof("[%s] lifecycle new group. group=%p, appName=%s, streamName=%s", uk, g, appName, streamName) + Log.Infof("[%s] lifecycle new group. group=%p, appName=%s, streamName=%s", uk, g, appName, streamName) return g } @@ -157,106 +113,30 @@ func (group *Group) RunLoop() { <-group.exitChan } -// Tick TODO chef: 传入时间 -// 目前每秒触发一次 -func (group *Group) Tick() { +// Tick 定时器 +// +// @param tickCount 当前时间,单位秒。注意,不一定是Unix时间戳,可以是从0开始+1秒递增的时间 +// +func (group *Group) Tick(tickCount uint32) { group.mutex.Lock() defer group.mutex.Unlock() group.stopPullIfNeeded() group.pullIfNeeded() - // 还有pub推流,没在push就触发push - group.pushIfNeeded() - - // TODO chef: - // 梳理和naza.Connection超时重复部分 + group.startPushIfNeeded() // 定时关闭没有数据的session - if group.tickCount%checkSessionAliveIntervalSec == 0 { - if group.rtmpPubSession != nil { - if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey()) - group.rtmpPubSession.Dispose() - group.rtmp2RtspRemuxer = nil - } - } - if group.rtspPubSession != nil { - if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey()) - group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil - } - } - if group.pullProxy.pullSession != nil { - if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey()) - group.pullProxy.pullSession.Dispose() - group.delRtmpPullSession(group.pullProxy.pullSession) - } - } - for session := range group.rtmpSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delRtmpSubSession(session) - } - } - for session := range group.httpflvSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delHttpflvSubSession(session) - } - } - for session := range group.httptsSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delHttptsSubSession(session) - } - } - for session := range group.rtspSubSessionSet { - if _, writeAlive := session.IsAlive(); !writeAlive { - Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) - session.Dispose() - group.delRtspSubSession(session) - } - } + if tickCount%checkSessionAliveIntervalSec == 0 { + group.disposeInactiveSessions() } // 定时计算session bitrate - if group.tickCount%calcSessionStatIntervalSec == 0 { - if group.rtmpPubSession != nil { - group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec) - } - if group.rtspPubSession != nil { - group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec) - } - if group.pullProxy.pullSession != nil { - group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.rtmpSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.httpflvSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.httptsSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } - for session := range group.rtspSubSessionSet { - session.UpdateStat(calcSessionStatIntervalSec) - } + if tickCount%calcSessionStatIntervalSec == 0 { + group.updateAllSessionStat() } - - group.tickCount++ } -// Dispose 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率,降低心智负担。 -// 注意,Dispose后,不应再使用这个对象。 -// 值得一提,如果是从其他协程回调回来的消息,在使用Group中的资源前,要判断资源是否存在以及可用。 -// +// Dispose ... func (group *Group) Dispose() { Log.Infof("[%s] lifecycle dispose group.", group.UniqueKey) group.exitChan <- struct{}{} @@ -266,13 +146,9 @@ func (group *Group) Dispose() { if group.rtmpPubSession != nil { group.rtmpPubSession.Dispose() - group.rtmpPubSession = nil - group.rtmp2RtspRemuxer = nil } if group.rtspPubSession != nil { group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil } for session := range group.rtmpSubSessionSet { @@ -290,261 +166,11 @@ func (group *Group) Dispose() { } group.httptsSubSessionSet = nil - group.disposeHlsMuxer() - - if group.pushEnable { - for _, v := range group.url2PushProxy { - if v.pushSession != nil { - v.pushSession.Dispose() - } - } - group.url2PushProxy = nil - } -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error { - Log.Debugf("[%s] [%s] add PubSession into group.", group.UniqueKey, session.UniqueKey()) - - group.mutex.Lock() - defer group.mutex.Unlock() - - if group.hasInSession() { - // TODO(chef): [refactor] 打印in session - Log.Errorf("[%s] in stream already exist at group. add=%s", group.UniqueKey, session.UniqueKey()) - return base.ErrDupInStream - } - - group.rtmpPubSession = session - group.addIn() - - if group.config.RtspConfig.Enable { - group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer( - func(sdpCtx sdp.LogicContext) { - group.sdpCtx = &sdpCtx - }, - group.onRtpPacket, - ) - } - - // TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能 - if group.config.RtmpConfig.AddDummyAudioEnable { - // TODO(chef): 从整体控制和锁关系来说,应该让pub的数据回调到group中进锁后再让数据流入filter - group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg) - session.SetPubSessionObserver(group.dummyAudioFilter) - } else { - session.SetPubSessionObserver(group) - } - - return nil -} - -// AddRtspPubSession TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉 -func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error { - Log.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey()) - - group.mutex.Lock() - defer group.mutex.Unlock() - - if group.hasInSession() { - Log.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey()) - return base.ErrDupInStream - } - - group.rtspPubSession = session - group.addIn() - - group.rtsp2RtmpRemuxer = remux.NewAvPacket2RtmpRemuxer(func(msg base.RtmpMsg) { - group.broadcastByRtmpMsg(msg) - }) - session.SetObserver(group) - - return nil -} - -func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool { - Log.Debugf("[%s] [%s] add PullSession into group.", group.UniqueKey, session.UniqueKey()) - - group.mutex.Lock() - defer group.mutex.Unlock() - - if group.hasInSession() { - Log.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey()) - return false - } - - group.pullProxy.pullSession = session - group.addIn() - - // TODO(chef): 这里也应该启动rtmp2RtspRemuxer - - return true -} - -func (group *Group) DelRtmpPubSession(session *rtmp.ServerSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delRtmpPubSession(session) -} - -func (group *Group) DelRtspPubSession(session *rtsp.PubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delRtspPubSession(session) -} - -func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delRtmpPullSession(session) -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) { - Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - group.rtmpSubSessionSet[session] = struct{}{} - // 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了 - // 也即我们假定上行肯定是以关键帧为开始进行视频发送,假设不是,那么我们按上行的流正常发,而不过滤掉关键帧前面的不包含关键帧的非完整GOP - // TODO(chef): - // 1. 需要仔细考虑单音频无视频的流的情况 - // 2. 这里不修改标志,让这个session继续等关键帧也可以 - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - group.pullIfNeeded() -} - -func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) { - Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) - session.WriteHttpResponseHeader() - session.WriteFlvHeader() - - group.mutex.Lock() - defer group.mutex.Unlock() - group.httpflvSubSessionSet[session] = struct{}{} - // 加入时,如果上行还没有推流过,就不需要等待关键帧了 - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - group.pullIfNeeded() -} - -// AddHttptsSubSession TODO chef: -// 这里应该也要考虑触发hls muxer开启 -// 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub -func (group *Group) AddHttptsSubSession(session *httpts.SubSession) { - Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey()) - session.WriteHttpResponseHeader() - - group.mutex.Lock() - defer group.mutex.Unlock() - group.httptsSubSessionSet[session] = struct{}{} - - group.pullIfNeeded() -} - -func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { - group.mutex.Lock() - defer group.mutex.Unlock() - // TODO(chef): 应该有等待机制,而不是直接关闭 - if group.sdpCtx == nil { - Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]", - group.UniqueKey, session.UniqueKey()) - return false, nil - } - - return true, group.sdpCtx.RawSdp -} - -func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey()) - - group.mutex.Lock() - defer group.mutex.Unlock() - group.rtspSubSessionSet[session] = struct{}{} - if group.stat.VideoCodec == "" { - session.ShouldWaitVideoKeyFrame = false - } - - // TODO(chef): rtsp sub也应该判断是否需要静态pull回源 -} - -func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) { - Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - if group.url2PushProxy != nil { - group.url2PushProxy[url].pushSession = session - } -} - -func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delRtmpSubSession(session) -} - -func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delHttpflvSubSession(session) -} - -func (group *Group) DelHttptsSubSession(session *httpts.SubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.delHttptsSubSession(session) -} - -func (group *Group) DelRtspSubSession(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - delete(group.rtspSubSessionSet, session) -} - -func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) { - Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) - group.mutex.Lock() - defer group.mutex.Unlock() - if group.url2PushProxy != nil { - group.url2PushProxy[url].pushSession = nil - group.url2PushProxy[url].isPushing = false - } + group.delIn() } // --------------------------------------------------------------------------------------------------------------------- -func (group *Group) IsTotalEmpty() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.isTotalEmpty() -} - -func (group *Group) HasInSession() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hasInSession() -} - -func (group *Group) HasOutSession() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hasOutSession() -} - -func (group *Group) IsHlsMuxerAlive() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.hlsMuxer != nil -} - func (group *Group) StringifyDebugStats(maxsub int) string { b, _ := json.Marshal(group.GetStat(maxsub)) return string(b) @@ -611,13 +237,11 @@ func (group *Group) KickOutSession(sessionId string) bool { if strings.HasPrefix(sessionId, base.UkPreRtmpServerSession) { if group.rtmpPubSession != nil { group.rtmpPubSession.Dispose() - group.rtmp2RtspRemuxer = nil return true } } else if strings.HasPrefix(sessionId, base.UkPreRtspPubSession) { if group.rtspPubSession != nil { group.rtspPubSession.Dispose() - group.rtsp2RtmpRemuxer = nil return true } } else if strings.HasPrefix(sessionId, base.UkPreFlvSubSession) { @@ -636,7 +260,12 @@ func (group *Group) KickOutSession(sessionId string) bool { } } } else if strings.HasPrefix(sessionId, base.UkPreRtspSubSession) { - // TODO chef: impl me + for s := range group.rtspSubSessionSet { + if s.UniqueKey() == sessionId { + s.Dispose() + return true + } + } } else { Log.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionId) } @@ -644,123 +273,101 @@ func (group *Group) KickOutSession(sessionId string) bool { return false } -// StartPull 外部命令主动触发pull拉流 -// -// 当前调用时机: -// 1. 比如http api -// -func (group *Group) StartPull(url string) { +func (group *Group) IsTotalEmpty() bool { group.mutex.Lock() defer group.mutex.Unlock() - - group.setPullUrl(true, url) - group.pullIfNeeded() -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) { - Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey()) - - if session != group.rtmpPubSession { - Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey(), group.rtmpPubSession) - return - } - - group.rtmpPubSession = nil - group.rtmp2RtspRemuxer = nil - group.dummyAudioFilter = nil - group.delIn() -} - -func (group *Group) delRtspPubSession(session *rtsp.PubSession) { - Log.Debugf("[%s] [%s] del rtsp PubSession from group.", group.UniqueKey, session.UniqueKey()) - - if session != group.rtspPubSession { - Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey(), group.rtmpPubSession) - return - } - - _ = group.rtspPubSession.Dispose() - group.rtspPubSession = nil - group.rtsp2RtmpRemuxer = nil - group.delIn() -} - -func (group *Group) delRtmpPullSession(session *rtmp.PullSession) { - Log.Debugf("[%s] [%s] del rtmp PullSession from group.", group.UniqueKey, session.UniqueKey()) - - group.pullProxy.pullSession = nil - group.setPullingFlag(false) - group.delIn() -} - -func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) { - Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.rtmpSubSessionSet, session) + return group.isTotalEmpty() } -func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) { - Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.httpflvSubSessionSet, session) +func (group *Group) HasInSession() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hasInSession() } -func (group *Group) delHttptsSubSession(session *httpts.SubSession) { - Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.httptsSubSessionSet, session) +func (group *Group) HasOutSession() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hasOutSession() } -func (group *Group) delRtspSubSession(session *rtsp.SubSession) { - Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.rtspSubSessionSet, session) -} +// --------------------------------------------------------------------------------------------------------------------- -func (group *Group) pushIfNeeded() { - // push转推功能没开 - if !group.pushEnable { - return +// disposeInactiveSessions 关闭不活跃的session +// +// TODO(chef): [fix] Push是否需要检查 +// TODO chef: [refactor] 梳理和naza.Connection超时重复部分 +// +func (group *Group) disposeInactiveSessions() { + if group.rtmpPubSession != nil { + if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey()) + group.rtmpPubSession.Dispose() + } + } + if group.rtspPubSession != nil { + if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey()) + group.rtspPubSession.Dispose() + } } - // 没有pub发布者 - if group.rtmpPubSession == nil && group.rtspPubSession == nil { - return + if group.pullProxy.pullSession != nil { + if readAlive, _ := group.pullProxy.pullSession.IsAlive(); !readAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.pullProxy.pullSession.UniqueKey()) + group.pullProxy.pullSession.Dispose() + } } - - // relay push时携带rtmp pub的参数 - // TODO chef: 这个逻辑放这里不太好看 - var urlParam string - if group.rtmpPubSession != nil { - urlParam = group.rtmpPubSession.RawQuery() + for session := range group.rtmpSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() + } } - - for url, v := range group.url2PushProxy { - // 正在转推中 - if v.isPushing { - continue + for session := range group.rtspSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() } - v.isPushing = true - - urlWithParam := url - if urlParam != "" { - urlWithParam += "?" + urlParam + } + for session := range group.httpflvSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() } - Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) + } + for session := range group.httptsSubSessionSet { + if _, writeAlive := session.IsAlive(); !writeAlive { + Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey()) + session.Dispose() + } + } +} - go func(u, u2 string) { - pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.PushTimeoutMs = relayPushTimeoutMs - option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs - }) - err := pushSession.Push(u2) - if err != nil { - Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - return - } - group.AddRtmpPushSession(u, pushSession) - err = <-pushSession.WaitChan() - Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) - group.DelRtmpPushSession(u, pushSession) - }(url, urlWithParam) +// updateAllSessionStat 更新所有session的状态 +// +// TODO(chef): [fix] Push是否需要更新 +// +func (group *Group) updateAllSessionStat() { + if group.rtmpPubSession != nil { + group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec) + } + if group.rtspPubSession != nil { + group.rtspPubSession.UpdateStat(calcSessionStatIntervalSec) + } + if group.pullProxy.pullSession != nil { + group.pullProxy.pullSession.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.rtmpSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.httpflvSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.httptsSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) + } + for session := range group.rtspSubSessionSet { + session.UpdateStat(calcSessionStatIntervalSec) } } @@ -798,562 +405,12 @@ func (group *Group) isTotalEmpty() bool { !group.hasPushSession() } -// 有pub或pull的in session加入时,需要调用该函数 -// -func (group *Group) addIn() { - // 是否启动hls - if group.config.HlsConfig.Enable { - if group.hlsMuxer != nil { - Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) - } - enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps - group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group) - group.hlsMuxer.Start() - } - - // 是否push转推 - if group.pushEnable { - group.pushIfNeeded() - } - - now := time.Now().Unix() - - // 是否录制成flv文件 - if group.config.RecordConfig.EnableFlv { - filename := fmt.Sprintf("%s-%d.flv", group.streamName, now) - filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename) - if group.recordFlv != nil { - Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", - group.UniqueKey, filenameWithPath, group.recordFlv.Name()) - if err := group.recordFlv.Dispose(); err != nil { - Log.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) - } - } - group.recordFlv = &httpflv.FlvFileWriter{} - if err := group.recordFlv.Open(filenameWithPath); err != nil { - Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordFlv = nil - } - if err := group.recordFlv.WriteFlvHeader(); err != nil { - Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordFlv = nil - } - } - - // 是否录制成ts文件 - if group.config.RecordConfig.EnableMpegts { - filename := fmt.Sprintf("%s-%d.ts", group.streamName, now) - filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename) - if group.recordMpegts != nil { - Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", - group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) - if err := group.recordMpegts.Dispose(); err != nil { - Log.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) - } - } - group.recordMpegts = &mpegts.FileWriter{} - if err := group.recordMpegts.Create(filenameWithPath); err != nil { - Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v", - group.UniqueKey, filenameWithPath, err) - group.recordFlv = nil - } - } -} - -// 有pub或pull的in session离开时,需要调用该函数 -// -func (group *Group) delIn() { - // 停止hls - if group.config.HlsConfig.Enable && group.hlsMuxer != nil { - group.disposeHlsMuxer() - } - - // 停止转推 - if group.pushEnable { - for _, v := range group.url2PushProxy { - if v.pushSession != nil { - v.pushSession.Dispose() - } - v.pushSession = nil - } - } - - // 停止flv录制 - if group.config.RecordConfig.EnableFlv { - if group.recordFlv != nil { - if err := group.recordFlv.Dispose(); err != nil { - Log.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) - } - group.recordFlv = nil - } - } - - // 停止ts录制 - if group.config.RecordConfig.EnableMpegts { - if group.recordMpegts != nil { - if err := group.recordMpegts.Dispose(); err != nil { - Log.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) - } - group.recordMpegts = nil - } - } - - // 清理各种和in session相关的资源 - // TODO(chef) 清空rtsp pub缓存的asc sps pps等数据 - group.rtmpGopCache.Clear() - group.httpflvGopCache.Clear() - group.patpmt = nil - group.sdpCtx = nil -} - -func (group *Group) disposeHlsMuxer() { - if group.hlsMuxer != nil { - group.hlsMuxer.Dispose() - - group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath()) - - group.hlsMuxer = nil - } -} - -// --------------------------------------------------------------------------------------------------------------------- -// 音视频数据转发、转封装的逻辑 -// --------------------------------------------------------------------------------------------------------------------- - -// OnReadRtmpAvMsg rtmp.PubSession or rtmp.PullSession -func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { - group.mutex.Lock() - defer group.mutex.Unlock() - group.broadcastByRtmpMsg(msg) -} - -// OnRtpPacket rtsp.PubSession -func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { - group.mutex.Lock() - defer group.mutex.Unlock() - - group.onRtpPacket(pkt) -} - -// OnSdp rtsp.PubSession -func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { - group.mutex.Lock() - defer group.mutex.Unlock() - - group.sdpCtx = &sdpCtx - group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) -} - -// OnAvPacket rtsp.PubSession -func (group *Group) OnAvPacket(pkt base.AvPacket) { - group.mutex.Lock() - defer group.mutex.Unlock() - //Log.Debugf("[%s] > Group::OnAvPacket. type=%s, ts=%d, len=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp, len(pkt.Payload)) - - group.rtsp2RtmpRemuxer.OnAvPacket(pkt) -} - -// ----- implement hls.MuxerObserver of hls.Muxer ---------------------------------------------------------------------- - -// OnPatPmt hls.Muxer -func (group *Group) OnPatPmt(b []byte) { - group.patpmt = b - - if group.recordMpegts != nil { - if err := group.recordMpegts.Write(b); err != nil { - Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) - } - } -} - -// OnTsPackets hls.Muxer -func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) { - // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 - - for session := range group.httptsSubSessionSet { - if session.IsFresh { - if boundary { - session.Write(group.patpmt) - session.Write(rawFrame) - session.IsFresh = false - } - } else { - session.Write(rawFrame) - } - } - - if group.recordMpegts != nil { - if err := group.recordMpegts.Write(rawFrame); err != nil { - Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err) - } - } -} - -// --------------------------------------------------------------------------------------------------------------------- - -// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了,考虑统一往一个通用类型转 -// -// rtmp.PubSession, rtmp.PullSession, rtsp2rtmpRemuxer -// -// @param msg 调用结束后,内部不持有msg.Payload内存块 -// -func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { - var ( - lcd remux.LazyRtmpChunkDivider - lrm2ft remux.LazyRtmpMsg2FlvTag - ) - - if len(msg.Payload) == 0 { - Log.Warnf("[%s] msg payload length is 0. %+v", group.UniqueKey, msg.Header) - return - } - - // # hls - if group.config.HlsConfig.Enable && group.hlsMuxer != nil { - group.hlsMuxer.FeedRtmpMessage(msg) - } - - // # rtsp - if group.config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { - group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) - } - - // # 设置好用于发送的 rtmp 头部信息 - currHeader := remux.MakeDefaultRtmpHeader(msg.Header) - if currHeader.MsgLen != uint32(len(msg.Payload)) { - Log.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header) - } - - // # 懒初始化rtmp chunk切片,以及httpflv转换 - lcd.Init(msg.Payload, &currHeader) - lrm2ft.Init(msg) - - // # 广播。遍历所有 rtmp sub session,转发数据 - // ## 如果是新的 sub session,发送已缓存的信息 - for session := range group.rtmpSubSessionSet { - if session.IsFresh { - // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 - if group.rtmpGopCache.Metadata != nil { - Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.Metadata) - } - if group.rtmpGopCache.VideoSeqHeader != nil { - Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.VideoSeqHeader) - } - if group.rtmpGopCache.AacSeqHeader != nil { - Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey()) - _ = session.Write(group.rtmpGopCache.AacSeqHeader) - } - gopCount := group.rtmpGopCache.GetGopCount() - if gopCount > 0 { - // GOP缓存中肯定包含了关键帧 - session.ShouldWaitVideoKeyFrame = false - - Log.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount) - } - for i := 0; i < gopCount; i++ { - for _, item := range group.rtmpGopCache.GetGopDataAt(i) { - _ = session.Write(item) - } - } - - // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session - // 从而确保新加入的sub session不会发送这部分脏的数据 - // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 - if group.rtmpMergeWriter != nil { - group.rtmpMergeWriter.Flush() - } - - session.IsFresh = false - } - - if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() { - // 有sub session在等待关键帧,并且当前是关键帧 - // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session - // 并且修改这个sub session的标志 - // 让rtmp buf writer来发送这个关键帧 - if group.rtmpMergeWriter != nil { - group.rtmpMergeWriter.Flush() - } - session.ShouldWaitVideoKeyFrame = false - } - } - // ## 转发本次数据 - if len(group.rtmpSubSessionSet) > 0 { - if group.rtmpMergeWriter == nil { - group.write2RtmpSubSessions(lcd.Get()) - } else { - group.rtmpMergeWriter.Write(lcd.Get()) - } - } - - // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 - if group.pushEnable { - for _, v := range group.url2PushProxy { - if v.pushSession == nil { - continue - } - - if v.pushSession.IsFresh { - if group.rtmpGopCache.Metadata != nil { - _ = v.pushSession.Write(group.rtmpGopCache.Metadata) - } - if group.rtmpGopCache.VideoSeqHeader != nil { - _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) - } - if group.rtmpGopCache.AacSeqHeader != nil { - _ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader) - } - for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ { - for _, item := range group.rtmpGopCache.GetGopDataAt(i) { - _ = v.pushSession.Write(item) - } - } - - v.pushSession.IsFresh = false - } - - _ = v.pushSession.Write(lcd.Get()) - } - } - - // # 广播。遍历所有 httpflv sub session,转发数据 - for session := range group.httpflvSubSessionSet { - if session.IsFresh { - if group.httpflvGopCache.Metadata != nil { - session.Write(group.httpflvGopCache.Metadata) - } - if group.httpflvGopCache.VideoSeqHeader != nil { - session.Write(group.httpflvGopCache.VideoSeqHeader) - } - if group.httpflvGopCache.AacSeqHeader != nil { - session.Write(group.httpflvGopCache.AacSeqHeader) - } - gopCount := group.httpflvGopCache.GetGopCount() - if gopCount > 0 { - // GOP缓存中肯定包含了关键帧 - session.ShouldWaitVideoKeyFrame = false - } - for i := 0; i < gopCount; i++ { - for _, item := range group.httpflvGopCache.GetGopDataAt(i) { - session.Write(item) - } - } - - session.IsFresh = false - } - - // 是否在等待关键帧 - if session.ShouldWaitVideoKeyFrame { - if msg.IsVideoKeyNalu() { - session.Write(lrm2ft.Get()) - session.ShouldWaitVideoKeyFrame = false - } - } else { - session.Write(lrm2ft.Get()) - } - } - - // # 录制flv文件 - if group.recordFlv != nil { - if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil { - Log.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err) - } - } - - // # 缓存关键信息,以及gop - if group.config.RtmpConfig.Enable { - group.rtmpGopCache.Feed(msg, lcd.Get) - } - if group.config.HttpflvConfig.Enable { - group.httpflvGopCache.Feed(msg, lrm2ft.Get) - } - - // # 记录stat - if group.stat.AudioCodec == "" { - if msg.IsAacSeqHeader() { - group.stat.AudioCodec = base.AudioCodecAac - } - } - if group.stat.VideoCodec == "" { - if msg.IsAvcKeySeqHeader() { - group.stat.VideoCodec = base.VideoCodecAvc - } - if msg.IsHevcKeySeqHeader() { - group.stat.VideoCodec = base.VideoCodecHevc - } - } - if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 { - if msg.IsAvcKeySeqHeader() { - sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload) - if err == nil { - var ctx avc.Context - err = avc.ParseSps(sps, &ctx) - if err == nil { - group.stat.VideoHeight = int(ctx.Height) - group.stat.VideoWidth = int(ctx.Width) - } - } - } - if msg.IsHevcKeySeqHeader() { - _, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload) - if err == nil { - var ctx hevc.Context - err = hevc.ParseSps(sps, &ctx) - if err == nil { - group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples) - group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples) - } - } - } - } -} - -// rtsp.PubSession, rtmp2RtspRemuxer -// -func (group *Group) onRtpPacket(pkt rtprtcp.RtpPacket) { - // 音频直接发送 - if group.sdpCtx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) { - for s := range group.rtspSubSessionSet { - s.WriteRtpPacket(pkt) - } - return - } - - var ( - boundary bool - boundaryChecked bool // 保证只检查0次或1次,减少性能开销 - ) - - for s := range group.rtspSubSessionSet { - if !s.ShouldWaitVideoKeyFrame { - s.WriteRtpPacket(pkt) - continue - } - - if !boundaryChecked { - switch group.sdpCtx.GetVideoPayloadTypeBase() { - case base.AvPacketPtAvc: - boundary = rtprtcp.IsAvcBoundary(pkt) - case base.AvPacketPtHevc: - boundary = rtprtcp.IsHevcBoundary(pkt) - default: - // 注意,不是avc和hevc时,直接发送 - boundary = true - } - boundaryChecked = true - } - - if boundary { - s.WriteRtpPacket(pkt) - s.ShouldWaitVideoKeyFrame = false - } - } -} - -func (group *Group) write2RtmpSubSessions(b []byte) { - for session := range group.rtmpSubSessionSet { - if session.IsFresh || session.ShouldWaitVideoKeyFrame { - continue - } - _ = session.Write(b) - } -} - -func (group *Group) writev2RtmpSubSessions(bs net.Buffers) { - for session := range group.rtmpSubSessionSet { - if session.IsFresh || session.ShouldWaitVideoKeyFrame { - continue - } - _ = session.Writev(bs) - } -} - -// --------------------------------------------------------------------------------------------------------------------- - -func (group *Group) isPullEnable() bool { - return group.pullEnable -} - -func (group *Group) setPullUrl(enable bool, url string) { - group.pullEnable = enable - group.pullUrl = url -} - -func (group *Group) getPullUrl() string { - return group.pullUrl -} - -func (group *Group) setPullingFlag(flag bool) { - group.pullProxy.isPulling = flag -} - -func (group *Group) getPullingFlag() bool { - return group.pullProxy.isPulling -} - -// 判断是否需要pull从远端拉流至本地,如果需要,则触发pull -// -// 当前调用时机: -// 1. 添加新sub session -// 2. 外部命令,比如http api -// 3. 定时器,比如pull的连接断了,通过定时器可以重启触发pull -// -func (group *Group) pullIfNeeded() { - if !group.isPullEnable() { - return - } - // 如果没有从本地拉流的,就不需要pull了 - if !group.hasOutSession() { - return - } - // 如果本地已经有输入型的流,就不需要pull了 - if group.hasInSession() { - return - } - // 已经在pull中,就不需要pull了 - if group.getPullingFlag() { - return +func (group *Group) inSessionUniqueKey() string { + if group.rtmpPubSession != nil { + return group.rtmpPubSession.UniqueKey() } - group.setPullingFlag(true) - - Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl()) - - go func() { - pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { - option.PullTimeoutMs = relayPullTimeoutMs - option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs - }) - // TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session - err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg) - if err != nil { - Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - return - } - res := group.AddRtmpPullSession(pullSession) - if res { - err = <-pullSession.WaitChan() - Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) - group.DelRtmpPullSession(pullSession) - } else { - pullSession.Dispose() - } - }() -} - -// 判断是否需要停止pull -// -// 当前调用时机: -// 1. 定时器定时检查 -// -func (group *Group) stopPullIfNeeded() { - // 没有输出型的流了 - if group.pullProxy.pullSession != nil && !group.hasOutSession() { - Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey) - group.pullProxy.pullSession.Dispose() + if group.rtspPubSession != nil { + return group.rtspPubSession.UniqueKey() } + return "" } diff --git a/pkg/logic/group__core_streaming.go b/pkg/logic/group__core_streaming.go new file mode 100644 index 0000000..b17e458 --- /dev/null +++ b/pkg/logic/group__core_streaming.go @@ -0,0 +1,440 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "net" + + "github.com/q191201771/lal/pkg/mpegts" + + "github.com/q191201771/lal/pkg/avc" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/sdp" +) + +// group__streaming.go +// +// 包含group中音视频数据转发、转封装协议的逻辑 +// + +// --------------------------------------------------------------------------------------------------------------------- + +// OnReadRtmpAvMsg +// +// 输入rtmp数据. +// 来自 rtmp.ServerSession(Pub), rtmp.PullSession, (remux.DummyAudioFilter) 的回调. +// +func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.broadcastByRtmpMsg(msg) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// OnSdp OnRtpPacket OnAvPacket +// +// 输入rtsp(rtp)和rtp合帧之后的数据. +// 来自 rtsp.PubSession 的回调. +// +func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { + group.mutex.Lock() + defer group.mutex.Unlock() + + group.sdpCtx = &sdpCtx + group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) +} + +// OnRtpPacket ... +func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.feedRtpPacket(pkt) +} + +// OnAvPacket ... +func (group *Group) OnAvPacket(pkt base.AvPacket) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtsp2RtmpRemuxer.OnAvPacket(pkt) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// OnPatPmt OnTsPackets +// +// 输入mpegts数据. +// 来自 remux.Rtmp2MpegtsRemuxer 的回调. +// +func (group *Group) OnPatPmt(b []byte) { + group.patpmt = b + + group.hlsMuxer.FeedPatPmt(b) + + if group.recordMpegts != nil { + if err := group.recordMpegts.Write(b); err != nil { + Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) + } + } +} + +// OnTsPackets ... +func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) { + group.feedTsPackets(tsPackets, frame, boundary) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// onRtmpMsgFromRemux +// +// 输入rtmp数据. +// 来自 remux.AvPacket2RtmpRemuxer 的回调. +// +func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) { + group.broadcastByRtmpMsg(msg) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// onSdpFromRemux onRtpPacketFromRemux +// +// 输入rtsp(rtp)数据. +// 来自 remux.Rtmp2RtspRemuxer 的回调. +// +func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) { + group.sdpCtx = &sdpCtx +} + +// onRtpPacketFromRemux ... +func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) { + group.feedRtpPacket(pkt) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// OnFragmentOpen +// +// 来自 hls.Muxer 的回调 +// +func (group *Group) OnFragmentOpen() { + group.rtmp2MpegtsRemuxer.FlushAudio() +} + +// --------------------------------------------------------------------------------------------------------------------- + +// broadcastByRtmpMsg +// +// 使用rtmp类型的数据做为输入,广播给各协议的输出 +// +// @param msg 调用结束后,内部不持有msg.Payload内存块 +// +func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) { + var ( + lcd remux.LazyRtmpChunkDivider + lrm2ft remux.LazyRtmpMsg2FlvTag + ) + + if len(msg.Payload) == 0 { + Log.Warnf("[%s] msg payload length is 0. %+v", group.UniqueKey, msg.Header) + return + } + + // # mpegts remuxer + if group.config.HlsConfig.Enable || group.config.HttptsConfig.Enable { + group.rtmp2MpegtsRemuxer.FeedRtmpMessage(msg) + } + + // # rtsp + if group.config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil { + group.rtmp2RtspRemuxer.FeedRtmpMsg(msg) + } + + // # 设置好用于发送的 rtmp 头部信息 + currHeader := remux.MakeDefaultRtmpHeader(msg.Header) + if currHeader.MsgLen != uint32(len(msg.Payload)) { + Log.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header) + } + + // # 懒初始化rtmp chunk切片,以及httpflv转换 + lcd.Init(msg.Payload, &currHeader) + lrm2ft.Init(msg) + + // # 广播。遍历所有 rtmp sub session,转发数据 + // ## 如果是新的 sub session,发送已缓存的信息 + for session := range group.rtmpSubSessionSet { + if session.IsFresh { + // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 + if group.rtmpGopCache.Metadata != nil { + Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.Metadata) + } + if group.rtmpGopCache.VideoSeqHeader != nil { + Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.VideoSeqHeader) + } + if group.rtmpGopCache.AacSeqHeader != nil { + Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey()) + _ = session.Write(group.rtmpGopCache.AacSeqHeader) + } + gopCount := group.rtmpGopCache.GetGopCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + + Log.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount) + } + for i := 0; i < gopCount; i++ { + for _, item := range group.rtmpGopCache.GetGopDataAt(i) { + _ = session.Write(item) + } + } + + // 有新加入的sub session(本次循环的第一个新加入的sub session),把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 从而确保新加入的sub session不会发送这部分脏的数据 + // 注意,此处可能被调用多次,但是只有第一次会实际flush缓存数据 + if group.rtmpMergeWriter != nil { + group.rtmpMergeWriter.Flush() + } + + session.IsFresh = false + } + + if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() { + // 有sub session在等待关键帧,并且当前是关键帧 + // 把rtmp buf writer中的缓存数据全部广播发送给老的sub session + // 并且修改这个sub session的标志 + // 让rtmp buf writer来发送这个关键帧 + if group.rtmpMergeWriter != nil { + group.rtmpMergeWriter.Flush() + } + session.ShouldWaitVideoKeyFrame = false + } + } + // ## 转发本次数据 + if len(group.rtmpSubSessionSet) > 0 { + if group.rtmpMergeWriter == nil { + group.write2RtmpSubSessions(lcd.Get()) + } else { + group.rtmpMergeWriter.Write(lcd.Get()) + } + } + + // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 + if group.pushEnable { + for _, v := range group.url2PushProxy { + if v.pushSession == nil { + continue + } + + if v.pushSession.IsFresh { + if group.rtmpGopCache.Metadata != nil { + _ = v.pushSession.Write(group.rtmpGopCache.Metadata) + } + if group.rtmpGopCache.VideoSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) + } + if group.rtmpGopCache.AacSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader) + } + for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ { + for _, item := range group.rtmpGopCache.GetGopDataAt(i) { + _ = v.pushSession.Write(item) + } + } + + v.pushSession.IsFresh = false + } + + _ = v.pushSession.Write(lcd.Get()) + } + } + + // # 广播。遍历所有 httpflv sub session,转发数据 + for session := range group.httpflvSubSessionSet { + if session.IsFresh { + if group.httpflvGopCache.Metadata != nil { + session.Write(group.httpflvGopCache.Metadata) + } + if group.httpflvGopCache.VideoSeqHeader != nil { + session.Write(group.httpflvGopCache.VideoSeqHeader) + } + if group.httpflvGopCache.AacSeqHeader != nil { + session.Write(group.httpflvGopCache.AacSeqHeader) + } + gopCount := group.httpflvGopCache.GetGopCount() + if gopCount > 0 { + // GOP缓存中肯定包含了关键帧 + session.ShouldWaitVideoKeyFrame = false + } + for i := 0; i < gopCount; i++ { + for _, item := range group.httpflvGopCache.GetGopDataAt(i) { + session.Write(item) + } + } + + session.IsFresh = false + } + + // 是否在等待关键帧 + if session.ShouldWaitVideoKeyFrame { + if msg.IsVideoKeyNalu() { + session.Write(lrm2ft.Get()) + session.ShouldWaitVideoKeyFrame = false + } + } else { + session.Write(lrm2ft.Get()) + } + } + + // # 录制flv文件 + if group.recordFlv != nil { + if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil { + Log.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err) + } + } + + // # 缓存关键信息,以及gop + if group.config.RtmpConfig.Enable { + group.rtmpGopCache.Feed(msg, lcd.Get) + } + if group.config.HttpflvConfig.Enable { + group.httpflvGopCache.Feed(msg, lrm2ft.Get) + } + + // # 记录stat + if group.stat.AudioCodec == "" { + if msg.IsAacSeqHeader() { + group.stat.AudioCodec = base.AudioCodecAac + } + } + if group.stat.VideoCodec == "" { + if msg.IsAvcKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecAvc + } + if msg.IsHevcKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecHevc + } + } + if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 { + if msg.IsAvcKeySeqHeader() { + sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload) + if err == nil { + var ctx avc.Context + err = avc.ParseSps(sps, &ctx) + if err == nil { + group.stat.VideoHeight = int(ctx.Height) + group.stat.VideoWidth = int(ctx.Width) + } + } + } + if msg.IsHevcKeySeqHeader() { + _, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload) + if err == nil { + var ctx hevc.Context + err = hevc.ParseSps(sps, &ctx) + if err == nil { + group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples) + group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples) + } + } + } + } +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) feedRtpPacket(pkt rtprtcp.RtpPacket) { + // 音频直接发送 + if group.sdpCtx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) { + for s := range group.rtspSubSessionSet { + s.WriteRtpPacket(pkt) + } + return + } + + var ( + boundary bool + boundaryChecked bool // 保证只检查0次或1次,减少性能开销 + ) + + for s := range group.rtspSubSessionSet { + if !s.ShouldWaitVideoKeyFrame { + s.WriteRtpPacket(pkt) + continue + } + + if !boundaryChecked { + switch group.sdpCtx.GetVideoPayloadTypeBase() { + case base.AvPacketPtAvc: + boundary = rtprtcp.IsAvcBoundary(pkt) + case base.AvPacketPtHevc: + boundary = rtprtcp.IsHevcBoundary(pkt) + default: + // 注意,不是avc和hevc时,直接发送 + boundary = true + } + boundaryChecked = true + } + + if boundary { + s.WriteRtpPacket(pkt) + s.ShouldWaitVideoKeyFrame = false + } + } +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) { + // 注意,hls的处理放在前面,让hls先判断是否打开新的fragment并flush audio + if group.hlsMuxer != nil { + group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary) + } + + for session := range group.httptsSubSessionSet { + if session.IsFresh { + if boundary { + session.Write(group.patpmt) + session.Write(tsPackets) + session.IsFresh = false + } + } else { + session.Write(tsPackets) + } + } + + if group.recordMpegts != nil { + if err := group.recordMpegts.Write(tsPackets); err != nil { + Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err) + } + } +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) write2RtmpSubSessions(b []byte) { + for session := range group.rtmpSubSessionSet { + if session.IsFresh || session.ShouldWaitVideoKeyFrame { + continue + } + _ = session.Write(b) + } +} + +func (group *Group) writev2RtmpSubSessions(bs net.Buffers) { + for session := range group.rtmpSubSessionSet { + if session.IsFresh || session.ShouldWaitVideoKeyFrame { + continue + } + _ = session.Writev(bs) + } +} diff --git a/pkg/logic/group__in.go b/pkg/logic/group__in.go new file mode 100644 index 0000000..2c3fb02 --- /dev/null +++ b/pkg/logic/group__in.go @@ -0,0 +1,188 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "time" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtsp" +) + +func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error { + group.mutex.Lock() + defer group.mutex.Unlock() + + Log.Debugf("[%s] [%s] add rtmp pub session into group.", group.UniqueKey, session.UniqueKey()) + + if group.hasInSession() { + Log.Errorf("[%s] in stream already exist at group. add=%s, exist=%s", + group.UniqueKey, session.UniqueKey(), group.inSessionUniqueKey()) + return base.ErrDupInStream + } + + group.rtmpPubSession = session + group.addIn() + + if group.config.RtspConfig.Enable { + group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer( + group.onSdpFromRemux, + group.onRtpPacketFromRemux, + ) + } + + // TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能 + if group.config.RtmpConfig.AddDummyAudioEnable { + // TODO(chef): 从整体控制和锁关系来说,应该让pub的数据回调到group中进锁后再让数据流入filter + group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg) + session.SetPubSessionObserver(group.dummyAudioFilter) + } else { + session.SetPubSessionObserver(group) + } + + return nil +} + +// AddRtspPubSession TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉 +func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error { + Log.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey()) + + group.mutex.Lock() + defer group.mutex.Unlock() + + if group.hasInSession() { + Log.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey()) + return base.ErrDupInStream + } + + group.rtspPubSession = session + group.addIn() + + group.rtsp2RtmpRemuxer = remux.NewAvPacket2RtmpRemuxer(group.onRtmpMsgFromRemux) + session.SetObserver(group) + + return nil +} + +func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool { + Log.Debugf("[%s] [%s] add PullSession into group.", group.UniqueKey, session.UniqueKey()) + + group.mutex.Lock() + defer group.mutex.Unlock() + + if group.hasInSession() { + Log.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey()) + return false + } + + group.pullProxy.pullSession = session + group.addIn() + + // TODO(chef): 这里也应该启动rtmp2RtspRemuxer + + return true +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) DelRtmpPubSession(session *rtmp.ServerSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtmpPubSession(session) +} + +func (group *Group) DelRtspPubSession(session *rtsp.PubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtspPubSession(session) +} + +func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtmpPullSession(session) +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) { + Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey()) + + if session != group.rtmpPubSession { + Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", + group.UniqueKey, session.UniqueKey(), group.rtmpPubSession) + return + } + + group.delIn() +} + +func (group *Group) delRtspPubSession(session *rtsp.PubSession) { + Log.Debugf("[%s] [%s] del rtsp PubSession from group.", group.UniqueKey, session.UniqueKey()) + + if session != group.rtspPubSession { + Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", + group.UniqueKey, session.UniqueKey(), group.rtspPubSession) + return + } + + group.delIn() +} + +func (group *Group) delRtmpPullSession(session *rtmp.PullSession) { + Log.Debugf("[%s] [%s] del rtmp PullSession from group.", group.UniqueKey, session.UniqueKey()) + + group.pullProxy.pullSession = nil + group.setPullingFlag(false) + group.delIn() +} + +// --------------------------------------------------------------------------------------------------------------------- + +// addIn 有pub或pull的输入型session加入时,需要调用该函数 +// +func (group *Group) addIn() { + now := time.Now().Unix() + + if group.config.HlsConfig.Enable || group.config.HttptsConfig.Enable { + group.rtmp2MpegtsRemuxer = remux.NewRtmp2MpegtsRemuxer(group) + } + + group.startPushIfNeeded() + group.startHlsIfNeeded() + group.startRecordFlvIfNeeded(now) + group.startRecordMpegtsIfNeeded(now) +} + +// delIn 有pub或pull的输入型session离开时,需要调用该函数 +// +func (group *Group) delIn() { + // 注意,remuxer放前面,使得有机会将内部缓存的数据吐出来 + if group.rtmp2MpegtsRemuxer != nil { + group.rtmp2MpegtsRemuxer.Dispose() + group.rtmp2MpegtsRemuxer = nil + } + + group.stopPushIfNeeded() + group.stopHlsIfNeeded() + group.stopRecordFlvIfNeeded() + group.stopRecordMpegtsIfNeeded() + + group.rtmpPubSession = nil + group.rtspPubSession = nil + group.rtsp2RtmpRemuxer = nil + group.rtmp2RtspRemuxer = nil + group.dummyAudioFilter = nil + group.rtmpGopCache.Clear() + group.httpflvGopCache.Clear() + group.patpmt = nil + group.sdpCtx = nil +} diff --git a/pkg/logic/group__out_sub.go b/pkg/logic/group__out_sub.go new file mode 100644 index 0000000..a8875aa --- /dev/null +++ b/pkg/logic/group__out_sub.go @@ -0,0 +1,135 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtsp" +) + +func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) { + Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtmpSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了 + // 也即我们假定上行肯定是以关键帧为开始进行视频发送,假设不是,那么我们按上行的流正常发,而不过滤掉关键帧前面的不包含关键帧的非完整GOP + // TODO(chef): + // 1. 需要仔细考虑单音频无视频的流的情况 + // 2. 这里不修改标志,让这个session继续等关键帧也可以 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + group.pullIfNeeded() +} + +func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) { + Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) + session.WriteHttpResponseHeader() + session.WriteFlvHeader() + + group.mutex.Lock() + defer group.mutex.Unlock() + group.httpflvSubSessionSet[session] = struct{}{} + // 加入时,如果上行还没有推流过,就不需要等待关键帧了 + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + group.pullIfNeeded() +} + +// AddHttptsSubSession TODO chef: +// 这里应该也要考虑触发hls muxer开启 +// 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub +func (group *Group) AddHttptsSubSession(session *httpts.SubSession) { + Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey()) + session.WriteHttpResponseHeader() + + group.mutex.Lock() + defer group.mutex.Unlock() + group.httptsSubSessionSet[session] = struct{}{} + + group.pullIfNeeded() +} + +func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) { + group.mutex.Lock() + defer group.mutex.Unlock() + // TODO(chef): 应该有等待机制,而不是直接关闭 + if group.sdpCtx == nil { + Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]", + group.UniqueKey, session.UniqueKey()) + return false, nil + } + + return true, group.sdpCtx.RawSdp +} + +func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) { + Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey()) + + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtspSubSessionSet[session] = struct{}{} + if group.stat.VideoCodec == "" { + session.ShouldWaitVideoKeyFrame = false + } + + // TODO(chef): rtsp sub也应该判断是否需要静态pull回源 +} + +func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtmpSubSession(session) +} + +func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delHttpflvSubSession(session) +} + +func (group *Group) DelHttptsSubSession(session *httpts.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delHttptsSubSession(session) +} + +func (group *Group) DelRtspSubSession(session *rtsp.SubSession) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.delRtspSubSession(session) +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) { + Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.rtmpSubSessionSet, session) +} + +func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) { + Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.httpflvSubSessionSet, session) +} + +func (group *Group) delHttptsSubSession(session *httpts.SubSession) { + Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.httptsSubSessionSet, session) +} + +func (group *Group) delRtspSubSession(session *rtsp.SubSession) { + Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.rtspSubSessionSet, session) +} diff --git a/pkg/logic/group__record_flv.go b/pkg/logic/group__record_flv.go new file mode 100644 index 0000000..8d615e4 --- /dev/null +++ b/pkg/logic/group__record_flv.go @@ -0,0 +1,57 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + "path/filepath" + + "github.com/q191201771/lal/pkg/httpflv" +) + +// startRecordFlvIfNeeded 必要时开启flv录制 +// +func (group *Group) startRecordFlvIfNeeded(nowUnix int64) { + if !group.config.RecordConfig.EnableFlv { + return + } + + // 构造文件名 + filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix) + filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename) + // 如果已经在录制,则先关闭 + // TODO(chef): 正常的逻辑是否会走到这? + if group.recordFlv != nil { + Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordFlv.Name()) + _ = group.recordFlv.Dispose() + } + // 初始化录制 + group.recordFlv = &httpflv.FlvFileWriter{} + if err := group.recordFlv.Open(filenameWithPath); err != nil { + Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFlv = nil + } + if err := group.recordFlv.WriteFlvHeader(); err != nil { + Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFlv = nil + } +} + +func (group *Group) stopRecordFlvIfNeeded() { + if !group.config.RecordConfig.EnableFlv { + return + } + if group.recordFlv != nil { + _ = group.recordFlv.Dispose() + group.recordFlv = nil + } +} diff --git a/pkg/logic/group__record_hls.go b/pkg/logic/group__record_hls.go new file mode 100644 index 0000000..f8015d2 --- /dev/null +++ b/pkg/logic/group__record_hls.go @@ -0,0 +1,43 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import "github.com/q191201771/lal/pkg/hls" + +func (group *Group) IsHlsMuxerAlive() bool { + group.mutex.Lock() + defer group.mutex.Unlock() + return group.hlsMuxer != nil +} + +// startHlsIfNeeded 必要时启动hls +// +func (group *Group) startHlsIfNeeded() { + // TODO(chef): [refactor] ts依赖hls + if !group.config.HlsConfig.Enable { + return + } + if group.hlsMuxer != nil { + Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) + } + enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps + group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group) + group.hlsMuxer.Start() +} + +func (group *Group) stopHlsIfNeeded() { + if !group.config.HlsConfig.Enable { + return + } + if group.hlsMuxer != nil { + group.hlsMuxer.Dispose() + group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath()) + group.hlsMuxer = nil + } +} diff --git a/pkg/logic/group__record_mpegts.go b/pkg/logic/group__record_mpegts.go new file mode 100644 index 0000000..ccc3e76 --- /dev/null +++ b/pkg/logic/group__record_mpegts.go @@ -0,0 +1,50 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + "path/filepath" + + "github.com/q191201771/lal/pkg/mpegts" +) + +// startRecordMpegtsIfNeeded 必要时开启ts录制 +// +func (group *Group) startRecordMpegtsIfNeeded(nowUnix int64) { + if !group.config.RecordConfig.EnableMpegts { + return + } + + // 构造文件名 + filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix) + filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename) + // 如果已经在录制,则先关闭 + if group.recordMpegts != nil { + Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordMpegts.Name()) + _ = group.recordMpegts.Dispose() + } + group.recordMpegts = &mpegts.FileWriter{} + if err := group.recordMpegts.Create(filenameWithPath); err != nil { + Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordMpegts = nil + } +} + +func (group *Group) stopRecordMpegtsIfNeeded() { + if !group.config.RecordConfig.EnableMpegts { + return + } + if group.recordMpegts != nil { + _ = group.recordMpegts.Dispose() + group.recordMpegts = nil + } +} diff --git a/pkg/logic/group__relay_pull.go b/pkg/logic/group__relay_pull.go new file mode 100644 index 0000000..d785ba7 --- /dev/null +++ b/pkg/logic/group__relay_pull.go @@ -0,0 +1,133 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + + "github.com/q191201771/lal/pkg/rtmp" +) + +// StartPull 外部命令主动触发pull拉流 +// +// 当前调用时机: +// 1. 比如http api +// +func (group *Group) StartPull(url string) { + group.mutex.Lock() + defer group.mutex.Unlock() + + group.setPullUrl(true, url) + group.pullIfNeeded() +} + +// --------------------------------------------------------------------------------------------------------------------- + +type pullProxy struct { + isPulling bool + pullSession *rtmp.PullSession +} + +func (group *Group) initRelayPull() { + enable := group.config.RelayPullConfig.Enable + addr := group.config.RelayPullConfig.Addr + appName := group.appName + streamName := group.streamName + + // 根据配置文件中的静态回源配置来初始化回源设置 + var pullUrl string + if enable { + pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) + } + group.setPullUrl(enable, pullUrl) +} + +func (group *Group) isPullEnable() bool { + return group.pullEnable +} + +func (group *Group) setPullUrl(enable bool, url string) { + group.pullEnable = enable + group.pullUrl = url +} + +func (group *Group) getPullUrl() string { + return group.pullUrl +} + +func (group *Group) setPullingFlag(flag bool) { + group.pullProxy.isPulling = flag +} + +func (group *Group) getPullingFlag() bool { + return group.pullProxy.isPulling +} + +// 判断是否需要pull从远端拉流至本地,如果需要,则触发pull +// +// 当前调用时机: +// 1. 添加新sub session +// 2. 外部命令,比如http api +// 3. 定时器,比如pull的连接断了,通过定时器可以重启触发pull +// +func (group *Group) pullIfNeeded() { + if !group.isPullEnable() { + return + } + // 如果没有从本地拉流的,就不需要pull了 + if !group.hasOutSession() { + return + } + // 如果本地已经有输入型的流,就不需要pull了 + if group.hasInSession() { + return + } + // 已经在pull中,就不需要pull了 + if group.getPullingFlag() { + return + } + group.setPullingFlag(true) + + Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl()) + + go func() { + pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { + option.PullTimeoutMs = relayPullTimeoutMs + option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs + }) + // TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session + err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg) + if err != nil { + Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + return + } + res := group.AddRtmpPullSession(pullSession) + if res { + err = <-pullSession.WaitChan() + Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) + group.DelRtmpPullSession(pullSession) + } else { + pullSession.Dispose() + } + }() +} + +// 判断是否需要停止pull +// +// 当前调用时机: +// 1. 定时器定时检查 +// +func (group *Group) stopPullIfNeeded() { + // 没有输出型的流了 + if group.pullProxy.pullSession != nil && !group.hasOutSession() { + Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey) + group.pullProxy.pullSession.Dispose() + } +} diff --git a/pkg/logic/group__relay_push.go b/pkg/logic/group__relay_push.go new file mode 100644 index 0000000..40c5f37 --- /dev/null +++ b/pkg/logic/group__relay_push.go @@ -0,0 +1,123 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + + "github.com/q191201771/lal/pkg/rtmp" +) + +func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) { + Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + if group.url2PushProxy != nil { + group.url2PushProxy[url].pushSession = session + } +} + +func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) { + Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey()) + group.mutex.Lock() + defer group.mutex.Unlock() + if group.url2PushProxy != nil { + group.url2PushProxy[url].pushSession = nil + group.url2PushProxy[url].isPushing = false + } +} + +type pushProxy struct { + isPushing bool + pushSession *rtmp.PushSession +} + +func (group *Group) initRelayPush() { + enable := group.config.RelayPushConfig.Enable + addrList := group.config.RelayPushConfig.AddrList + appName := group.appName + streamName := group.streamName + + url2PushProxy := make(map[string]*pushProxy) + if enable { + for _, addr := range addrList { + pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) + url2PushProxy[pushUrl] = &pushProxy{ + isPushing: false, + pushSession: nil, + } + } + } + + group.pushEnable = group.config.RelayPushConfig.Enable + group.url2PushProxy = url2PushProxy +} + +// startPushIfNeeded 必要时进行replay push转推 +// +func (group *Group) startPushIfNeeded() { + // push转推功能没开 + if !group.pushEnable { + return + } + // 没有pub发布者 + if group.rtmpPubSession == nil && group.rtspPubSession == nil { + return + } + + // relay push时携带rtmp pub的参数 + // TODO chef: 这个逻辑放这里不太好看 + var urlParam string + if group.rtmpPubSession != nil { + urlParam = group.rtmpPubSession.RawQuery() + } + + for url, v := range group.url2PushProxy { + // 正在转推中 + if v.isPushing { + continue + } + v.isPushing = true + + urlWithParam := url + if urlParam != "" { + urlWithParam += "?" + urlParam + } + Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam) + + go func(u, u2 string) { + pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.PushTimeoutMs = relayPushTimeoutMs + option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs + }) + err := pushSession.Push(u2) + if err != nil { + Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + return + } + group.AddRtmpPushSession(u, pushSession) + err = <-pushSession.WaitChan() + Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) + group.DelRtmpPushSession(u, pushSession) + }(url, urlWithParam) + } +} + +func (group *Group) stopPushIfNeeded() { + if !group.pushEnable { + return + } + for _, v := range group.url2PushProxy { + if v.pushSession != nil { + v.pushSession.Dispose() + } + v.pushSession = nil + } +} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index f45a3e3..ab45976 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -46,6 +46,7 @@ type ServerManager struct { rtmpServer *rtmp.Server rtspServer *rtsp.Server httpApiServer *HttpApiServer + pprofServer *http.Server exitChan chan struct{} mutex sync.Mutex @@ -95,8 +96,7 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { } if sm.config.RtmpConfig.Enable { - // TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer - sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr) + sm.rtmpServer = rtmp.NewServer(sm.config.RtmpConfig.Addr, sm) } if sm.config.RtspConfig.Enable { sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm) @@ -105,6 +105,10 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm) } + if sm.config.PprofConfig.Enable { + sm.pprofServer = &http.Server{Addr: sm.config.PprofConfig.Addr, Handler: nil} + } + sm.simpleAuthCtx = NewSimpleAuthCtx(sm.config.SimpleAuthConfig) return sm @@ -115,8 +119,15 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { func (sm *ServerManager) RunLoop() error { sm.option.NotifyHandler.OnServerStart(sm.StatLalInfo()) - if sm.config.PprofConfig.Enable { - go runWebPprof(sm.config.PprofConfig.Addr) + if sm.pprofServer != nil { + go func() { + //Log.Warn("start fgprof.") + //http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) + Log.Infof("start web pprof listen. addr=%s", sm.config.PprofConfig.Addr) + if err := sm.pprofServer.ListenAndServe(); err != nil { + Log.Error(err) + } + }() } go base.RunSignalHandler(func() { @@ -210,13 +221,13 @@ func (sm *ServerManager) RunLoop() error { t := time.NewTicker(1 * time.Second) defer t.Stop() - var count uint32 + var tickCount uint32 for { select { case <-sm.exitChan: return nil case <-t.C: - count++ + tickCount++ sm.mutex.Lock() @@ -228,13 +239,13 @@ func (sm *ServerManager) RunLoop() error { return false } - group.Tick() + group.Tick(tickCount) return true }) // 定时打印一些group相关的debug日志 if sm.config.DebugConfig.LogGroupIntervalSec > 0 && - count%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 { + tickCount%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 { groupNum := sm.groupManager.Len() Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum) if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 { @@ -252,7 +263,7 @@ func (sm *ServerManager) RunLoop() error { sm.mutex.Unlock() // 定时通过http notify发送group相关的信息 - if uis != 0 && (count%uis) == 0 { + if uis != 0 && (tickCount%uis) == 0 { updateInfo.ServerId = sm.config.ServerId updateInfo.Groups = sm.StatAllGroup() sm.option.NotifyHandler.OnUpdate(updateInfo) @@ -266,11 +277,18 @@ func (sm *ServerManager) RunLoop() error { func (sm *ServerManager) Dispose() { Log.Debug("dispose server manager.") - // TODO(chef) add httpServer - if sm.rtmpServer != nil { sm.rtmpServer.Dispose() } + + if sm.httpServerManager != nil { + sm.httpServerManager.Dispose() + } + + if sm.pprofServer != nil { + sm.pprofServer.Close() + } + //if sm.hlsServer != nil { // sm.hlsServer.Dispose() //} @@ -634,7 +652,6 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error { } func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { - // TODO chef: impl me sm.mutex.Lock() defer sm.mutex.Unlock() group := sm.getGroup(session.AppName(), session.StreamName()) @@ -796,15 +813,3 @@ func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) sm.hlsServerHandler.ServeHTTP(writer, req) } - -func runWebPprof(addr string) { - Log.Infof("start web pprof listen. addr=%s", addr) - - //Log.Warn("start fgprof.") - //http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) - - if err := http.ListenAndServe(addr, nil); err != nil { - Log.Error(err) - return - } -} diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 811dfae..a5ad96b 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -13,10 +13,13 @@ import "github.com/q191201771/naza/pkg/nazalog" var Log = nazalog.GetGlobalLogger() var ( - relayPushTimeoutMs = 5000 - relayPushWriteAvTimeoutMs = 5000 - relayPullTimeoutMs = 5000 - relayPullReadAvTimeoutMs = 5000 + relayPushTimeoutMs = 5000 + relayPushWriteAvTimeoutMs = 5000 + relayPullTimeoutMs = 5000 + relayPullReadAvTimeoutMs = 5000 + + // calcSessionStatIntervalSec 计算所有session收发码率的时间间隔 + // calcSessionStatIntervalSec uint32 = 5 // checkSessionAliveIntervalSec diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go index fe8de3c..f7a99fc 100644 --- a/pkg/mpegts/pack.go +++ b/pkg/mpegts/pack.go @@ -8,7 +8,7 @@ package mpegts -// Frame 帧数据 +// Frame 帧数据,用于打包成mpegts格式的数据 // type Frame struct { Pts uint64 // =(毫秒 * 90) @@ -34,29 +34,38 @@ type Frame struct { Raw []byte } -// OnTsPacket @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存 +// Pack annexb格式的流转换为mpegts流 // -type OnTsPacket func(packet []byte) - -// PackTsPacket Annexb格式的流转换为mpegts packet -// -// @param frame: 各字段含义见mpegts.Frame结构体定义 -// frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用 -// frame.Raw 函数调用结束后,内部不会持有该内存块 +// 注意,内部会增加 Frame.Cc 的值. // -// @param onTsPacket: 注意,一次函数调用,可能对应多次回调 +// @return: 内存块为独立申请,调度结束后,内部不再持有 // -func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) { - wpos := 0 // 当前packet的写入位置 - lpos := 0 // 当前帧的处理位置 - rpos := len(frame.Raw) // 当前帧大小 - first := true // 是否为帧的首个packet的标准 +func (frame *Frame) Pack() []byte { + bufLen := len(frame.Raw) * 2 // 预分配一块足够大的内存 + if bufLen < 188 { + bufLen = 188 + } + buf := make([]byte, bufLen) - // TODO(chef): [perf] 由于上层并不需要区分单个packet,所以可以考虑预分配内存,存储整个packet流 - packet := make([]byte, 188) + lpos := 0 // 当前输入帧的处理位置 + rpos := len(frame.Raw) // 当前输入帧大小 + first := true // 是否为帧的首个packet的标准 + packetPosAtBuf := 0 // 当前输出packet相对于整个输出内存块的位置 for lpos != rpos { - wpos = 0 + + // TODO(chef): CHEFNOTICEME 正常来说,预分配的内存应该是足够用了,我们加个扩容逻辑保证绝对正确性,并且加个日志观察一段时间 + if packetPosAtBuf+188 > len(buf) { + Log.Warnf("buffer too short. frame size=%d, buf=%d, packetPosAtBuf=%d", len(frame.Raw), len(buf), packetPosAtBuf) + newBuf := make([]byte, packetPosAtBuf+188) + copy(newBuf, buf) + buf = newBuf + } + + packet := buf[packetPosAtBuf : packetPosAtBuf+188] // 当前输出packet + wpos := 0 // 当前输出packet的写入位置 + packetPosAtBuf += 188 + frame.Cc++ // 每个packet都需要添加TS Header @@ -224,11 +233,13 @@ func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) { copy(packet[wpos:], frame.Raw[lpos:lpos+inSize]) lpos = rpos } - - onTsPacket(packet) } + + return buf[:packetPosAtBuf] } +// ----- private ------------------------------------------------------------------------------------------------------- + func packPcr(out []byte, pcr uint64) { out[0] = uint8(pcr >> 25) out[1] = uint8(pcr >> 17) diff --git a/pkg/remux/remux.go b/pkg/remux/remux.go index c669943..de9df6a 100644 --- a/pkg/remux/remux.go +++ b/pkg/remux/remux.go @@ -9,3 +9,5 @@ package remux // TODO(chef): refactor 此package更名为avop,内部包含remux_xxx2xxx.go, filter_xxx.go, 协议相关(比如rtmp.go)等 + +var _ rtmp2MpegtsFilterObserver = &Rtmp2MpegtsRemuxer{} diff --git a/pkg/hls/streamer.go b/pkg/remux/rtmp2mpegts.go similarity index 66% rename from pkg/hls/streamer.go rename to pkg/remux/rtmp2mpegts.go index 57a6ff6..e43b923 100644 --- a/pkg/hls/streamer.go +++ b/pkg/remux/rtmp2mpegts.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package remux import ( "encoding/hex" @@ -20,31 +20,42 @@ import ( "github.com/q191201771/naza/pkg/nazabytes" ) -type StreamerObserver interface { - // OnPatPmt @param b const只读内存块,上层可以持有,但是不允许修改 +const ( + initialVideoOutBufferSize = 1024 * 1024 + calcFragmentHeaderQueueSize = 16 + maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90) + maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90) +) + +type Rtmp2MpegtsRemuxerObserver interface { + // OnPatPmt + // + // @param b: const只读内存块,上层可以持有,但是不允许修改 + // OnPatPmt(b []byte) - // OnFrame + // OnTsPackets // - // @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 + // @param tsPackets: + // - mpegts数据,有一个或多个188字节的ts数据组成. + // - 回调结束后,remux.Rtmp2MpegtsRemuxer 不再使用这块内存块. // - // @param frame: 各字段含义见 mpegts.Frame 结构体定义 - // frame.CC 注意,回调结束后,Streamer会保存frame.CC,上层在TS打包完成后,可通过frame.CC将cc值传递给Streamer - // frame.Raw 回调结束后,这块内存可能会被内部重复使用 + // @param frame: 各字段含义见 mpegts.Frame 结构体定义 // - OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) + OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) } -// Streamer 输入rtmp流,回调转封装成Annexb格式的流 -type Streamer struct { +// Rtmp2MpegtsRemuxer 输入rtmp流,输出mpegts流 +// +type Rtmp2MpegtsRemuxer struct { UniqueKey string - observer StreamerObserver - calcFragmentHeaderQueue *Queue - videoOut []byte // Annexb TODO chef: 优化这块buff + observer Rtmp2MpegtsRemuxerObserver + filter *rtmp2MpegtsFilter + videoOut []byte // Annexb spspps []byte // Annexb 也可能是vps+sps+pps ascCtx *aac.AscContext - audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff + audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 audioCacheFirstFramePts uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS audioCc uint8 videoCc uint8 @@ -52,39 +63,29 @@ type Streamer struct { opened bool } -func NewStreamer(observer StreamerObserver) *Streamer { - uk := base.GenUkStreamer() - videoOut := make([]byte, 1024*1024) - videoOut = videoOut[0:0] - streamer := &Streamer{ +func NewRtmp2MpegtsRemuxer(observer Rtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer { + uk := base.GenUkRtmp2MpegtsRemuxer() + r := &Rtmp2MpegtsRemuxer{ UniqueKey: uk, observer: observer, - videoOut: videoOut, } - streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer) - return streamer + r.audioCacheFrames = nil + r.videoOut = make([]byte, initialVideoOutBufferSize) + r.videoOut = r.videoOut[0:0] + r.filter = newRtmp2MpegtsFilter(calcFragmentHeaderQueueSize, r) + return r } -// FeedRtmpMessage @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 +// FeedRtmpMessage +// +// @param msg: msg.Payload 调用结束后,函数内部不会持有这块内存 // -// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 -func (s *Streamer) FeedRtmpMessage(msg base.RtmpMsg) { - s.calcFragmentHeaderQueue.Push(msg) +func (s *Rtmp2MpegtsRemuxer) FeedRtmpMessage(msg base.RtmpMsg) { + s.filter.Push(msg) } -// ----- implement IQueueObserver of Queue ----------------------------------------------------------------------------- - -func (s *Streamer) OnPatPmt(b []byte) { - s.observer.OnPatPmt(b) -} - -func (s *Streamer) OnPop(msg base.RtmpMsg) { - switch msg.Header.MsgTypeId { - case base.RtmpTypeIdAudio: - s.feedAudio(msg) - case base.RtmpTypeIdVideo: - s.feedVideo(msg) - } +func (s *Rtmp2MpegtsRemuxer) Dispose() { + s.FlushAudio() } // --------------------------------------------------------------------------------------------------------------------- @@ -92,12 +93,12 @@ func (s *Streamer) OnPop(msg base.RtmpMsg) { // FlushAudio // // 吐出音频数据的三种情况: -// 1. 收到音频或视频时,音频缓存队列已达到一定长度 +// 1. 收到音频或视频时,音频缓存队列已达到一定长度(内部判断) // 2. 打开一个新的TS文件切片时 // 3. 输入流关闭时 // -func (s *Streamer) FlushAudio() { - if s.audioCacheFrames == nil { +func (s *Rtmp2MpegtsRemuxer) FlushAudio() { + if s.audioCacheEmpty() { return } @@ -109,27 +110,37 @@ func (s *Streamer) FlushAudio() { frame.Raw = s.audioCacheFrames frame.Pid = mpegts.PidAudio frame.Sid = mpegts.StreamIdAudio + + // 注意,在回调前设置为空,因为回调中有可能再次调用FlushAudio + s.resetAudioCache() + s.onFrame(&frame) + // 回调结束后更新cc s.audioCc = frame.Cc - - s.audioCacheFrames = nil } -func (s *Streamer) AudioSeqHeaderCached() bool { - return s.ascCtx != nil -} +// --------------------------------------------------------------------------------------------------------------------- -func (s *Streamer) VideoSeqHeaderCached() bool { - return s.spspps != nil +// onPatPmt onPop +// +// 实现 rtmp2MpegtsFilterObserver +// +func (s *Rtmp2MpegtsRemuxer) onPatPmt(b []byte) { + s.observer.OnPatPmt(b) } -func (s *Streamer) AudioCacheEmpty() bool { - return s.audioCacheFrames == nil +func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) { + switch msg.Header.MsgTypeId { + case base.RtmpTypeIdAudio: + s.feedAudio(msg) + case base.RtmpTypeIdVideo: + s.feedVideo(msg) + } } -// ----- private ------------------------------------------------------------------------------------------------------- +// --------------------------------------------------------------------------------------------------------------------- -func (s *Streamer) feedVideo(msg base.RtmpMsg) { +func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { // 注意,有一种情况是msg.Payload为 27 02 00 00 00 // 此时打印错误并返回也不影响 // @@ -163,8 +174,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { audSent := false spsppsSent := false - // 优化这块buffer - out := s.videoOut[0:0] + s.resetVideoOutBuffer() // msg中可能有多个NALU,逐个获取 nals, err := avc.SplitNaluAvcc(msg.Payload[5:]) @@ -197,9 +207,9 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { //if codecId == base.RtmpCodecIdAvc && (nalType == avc.NaluTypeSei || nalType == avc.NaluTypeIdrSlice || nalType == avc.NaluTypeSlice) { switch codecId { case base.RtmpCodecIdAvc: - out = append(out, avc.AudNalu...) + s.videoOut = append(s.videoOut, avc.AudNalu...) case base.RtmpCodecIdHevc: - out = append(out, hevc.AudNalu...) + s.videoOut = append(s.videoOut, hevc.AudNalu...) } audSent = true } @@ -210,7 +220,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { switch nalType { case avc.NaluTypeIdrSlice: if !spsppsSent { - if out, err = s.appendSpsPps(out); err != nil { + if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil { Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) return } @@ -224,7 +234,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { switch nalType { case hevc.NaluTypeSliceIdr, hevc.NaluTypeSliceIdrNlp, hevc.NaluTypeSliceCranut: if !spsppsSent { - if out, err = s.appendSpsPps(out); err != nil { + if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil { Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) return } @@ -238,18 +248,18 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { // 如果写入了aud或spspps,则用start code3,否则start code4。为什么要这样处理 // 这里不知为什么要区分写入两种类型的start code - if len(out) == 0 { - out = append(out, avc.NaluStartCode4...) + if len(s.videoOut) == 0 { + s.videoOut = append(s.videoOut, avc.NaluStartCode4...) } else { - out = append(out, avc.NaluStartCode3...) + s.videoOut = append(s.videoOut, avc.NaluStartCode3...) } - out = append(out, nal...) + s.videoOut = append(s.videoOut, nal...) } dts := uint64(msg.Header.TimestampAbs) * 90 - if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts { + if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts { s.FlushAudio() } @@ -258,7 +268,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { frame.Dts = dts frame.Pts = frame.Dts + uint64(cts)*90 frame.Key = msg.IsVideoKeyNalu() - frame.Raw = out + frame.Raw = s.videoOut frame.Pid = mpegts.PidVideo frame.Sid = mpegts.StreamIdVideo @@ -266,7 +276,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) { s.videoCc = frame.Cc } -func (s *Streamer) feedAudio(msg base.RtmpMsg) { +func (s *Rtmp2MpegtsRemuxer) feedAudio(msg base.RtmpMsg) { if len(msg.Payload) < 3 { Log.Errorf("[%s] invalid audio message length. len=%d", s.UniqueKey, len(msg.Payload)) return @@ -284,18 +294,18 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) { return } - if !s.AudioSeqHeaderCached() { + if !s.audioSeqHeaderCached() { Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey) return } pts := uint64(msg.Header.TimestampAbs) * 90 - if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts { + if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts { s.FlushAudio() } - if s.audioCacheFrames == nil { + if s.audioCacheEmpty() { s.audioCacheFirstFramePts = pts } @@ -304,13 +314,17 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) { s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...) } -func (s *Streamer) cacheAacSeqHeader(msg base.RtmpMsg) error { +func (s *Rtmp2MpegtsRemuxer) cacheAacSeqHeader(msg base.RtmpMsg) error { var err error s.ascCtx, err = aac.NewAscContext(msg.Payload[2:]) return err } -func (s *Streamer) appendSpsPps(out []byte) ([]byte, error) { +func (s *Rtmp2MpegtsRemuxer) audioSeqHeaderCached() bool { + return s.ascCtx != nil +} + +func (s *Rtmp2MpegtsRemuxer) appendSpsPps(out []byte) ([]byte, error) { if s.spspps == nil { return out, base.ErrHls } @@ -319,12 +333,28 @@ func (s *Streamer) appendSpsPps(out []byte) ([]byte, error) { return out, nil } -func (s *Streamer) onFrame(frame *mpegts.Frame) { +func (s *Rtmp2MpegtsRemuxer) videoSeqHeaderCached() bool { + return s.spspps != nil +} + +func (s *Rtmp2MpegtsRemuxer) audioCacheEmpty() bool { + return len(s.audioCacheFrames) == 0 +} + +func (s *Rtmp2MpegtsRemuxer) resetAudioCache() { + s.audioCacheFrames = s.audioCacheFrames[0:0] +} + +func (s *Rtmp2MpegtsRemuxer) resetVideoOutBuffer() { + s.videoOut = s.videoOut[0:0] +} + +func (s *Rtmp2MpegtsRemuxer) onFrame(frame *mpegts.Frame) { var boundary bool if frame.Sid == mpegts.StreamIdAudio { // 为了考虑没有视频的情况也能切片,所以这里判断spspps为空时,也建议生成fragment - boundary = !s.VideoSeqHeaderCached() + boundary = !s.videoSeqHeaderCached() } else { // 收到视频,可能触发建立fragment的条件是: // 关键帧数据 && @@ -333,12 +363,14 @@ func (s *Streamer) onFrame(frame *mpegts.Frame) { // (收到过音频seq header && fragment没有打开) || 说明 音视频都有,且都已ready // (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 说明 为什么音频缓存需不为空? // ) - boundary = frame.Key && (!s.AudioSeqHeaderCached() || !s.opened || !s.AudioCacheEmpty()) + boundary = frame.Key && (!s.audioSeqHeaderCached() || !s.opened || !s.audioCacheEmpty()) } if boundary { s.opened = true } - s.observer.OnFrame(s, frame, boundary) + packets := frame.Pack() + + s.observer.OnTsPackets(packets, frame, boundary) } diff --git a/pkg/hls/queue.go b/pkg/remux/rtmp2mpegts_filter_.go similarity index 55% rename from pkg/hls/queue.go rename to pkg/remux/rtmp2mpegts_filter_.go index 08b3fa9..670b0c3 100644 --- a/pkg/hls/queue.go +++ b/pkg/remux/rtmp2mpegts_filter_.go @@ -6,39 +6,49 @@ // // Author: Chef (191201771@qq.com) -package hls +package remux import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/mpegts" ) -// Queue +// rtmp2MpegtsFilter +// +// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式,生成正确的mpegts PatPmt头信息 // -// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 // 一旦判断结束,该队列变成直进直出,不再有实际缓存 // -type Queue struct { +type rtmp2MpegtsFilter struct { maxMsgSize int data []base.RtmpMsg - observer IQueueObserver + observer rtmp2MpegtsFilterObserver audioCodecId int videoCodecId int done bool } -type IQueueObserver interface { - // OnPatPmt 该回调一定发生在数据回调之前 +type rtmp2MpegtsFilterObserver interface { + // OnPatPmt + // + // 该回调一定发生在数据回调之前 + // 只会返回两种格式,h264和h265 + // + // TODO(chef): [opt] 当没有视频时,不应该返回h264的格式 // TODO(chef) 这里可以考虑换成只通知drain,由上层完成FragmentHeader的组装逻辑 - OnPatPmt(b []byte) + // + onPatPmt(b []byte) - OnPop(msg base.RtmpMsg) + onPop(msg base.RtmpMsg) } -// NewQueue @param maxMsgSize 最大缓存多少个包 -func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { - return &Queue{ +// NewRtmp2MpegtsFilter +// +// @param maxMsgSize: 最大缓存多少个包 +// +func newRtmp2MpegtsFilter(maxMsgSize int, observer rtmp2MpegtsFilterObserver) *rtmp2MpegtsFilter { + return &rtmp2MpegtsFilter{ maxMsgSize: maxMsgSize, data: make([]base.RtmpMsg, maxMsgSize)[0:0], observer: observer, @@ -48,10 +58,13 @@ func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { } } -// Push @param msg 函数调用结束后,内部不持有该内存块 -func (q *Queue) Push(msg base.RtmpMsg) { +// Push +// +// @param msg: 函数调用结束后,内部不持有该内存块 +// +func (q *rtmp2MpegtsFilter) Push(msg base.RtmpMsg) { if q.done { - q.observer.OnPop(msg) + q.observer.onPop(msg) return } @@ -75,18 +88,20 @@ func (q *Queue) Push(msg base.RtmpMsg) { } } -func (q *Queue) drain() { +// --------------------------------------------------------------------------------------------------------------------- + +func (q *rtmp2MpegtsFilter) drain() { switch q.videoCodecId { case int(base.RtmpCodecIdAvc): - q.observer.OnPatPmt(mpegts.FixedFragmentHeader) + q.observer.onPatPmt(mpegts.FixedFragmentHeader) case int(base.RtmpCodecIdHevc): - q.observer.OnPatPmt(mpegts.FixedFragmentHeaderHevc) + q.observer.onPatPmt(mpegts.FixedFragmentHeaderHevc) default: // TODO(chef) 正确处理只有音频或只有视频的情况 #56 - q.observer.OnPatPmt(mpegts.FixedFragmentHeader) + q.observer.onPatPmt(mpegts.FixedFragmentHeader) } for i := range q.data { - q.observer.OnPop(q.data[i]) + q.observer.onPop(q.data[i]) } q.data = nil diff --git a/pkg/hls/queue_test.go b/pkg/remux/rtmp2mpegts_filter_test.go similarity index 82% rename from pkg/hls/queue_test.go rename to pkg/remux/rtmp2mpegts_filter_test.go index 8aeec42..5d7e167 100644 --- a/pkg/hls/queue_test.go +++ b/pkg/remux/rtmp2mpegts_filter_test.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package remux import ( "testing" @@ -24,15 +24,15 @@ var ( type qo struct { } -func (q *qo) OnPatPmt(b []byte) { +func (q *qo) onPatPmt(b []byte) { fh = b } -func (q *qo) OnPop(msg base.RtmpMsg) { +func (q *qo) onPop(msg base.RtmpMsg) { poped = append(poped, msg) } -func TestQueue(t *testing.T) { +func TestRtmp2MpegtsFilter(t *testing.T) { goldenRtmpMsg := []base.RtmpMsg{ { Header: base.RtmpHeader{ @@ -49,9 +49,9 @@ func TestQueue(t *testing.T) { } q := &qo{} - queue := NewQueue(8, q) + f := newRtmp2MpegtsFilter(8, q) for i := range goldenRtmpMsg { - queue.Push(goldenRtmpMsg[i]) + f.Push(goldenRtmpMsg[i]) } assert.Equal(t, mpegts.FixedFragmentHeader, fh) assert.Equal(t, goldenRtmpMsg, poped) diff --git a/pkg/remux/rtmp2rtsp.go b/pkg/remux/rtmp2rtsp.go index 77ab9e7..3607eca 100644 --- a/pkg/remux/rtmp2rtsp.go +++ b/pkg/remux/rtmp2rtsp.go @@ -192,7 +192,7 @@ func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker { } clockRate, err := ascCtx.GetSamplingFrequency() if err != nil { - Log.Errorf("get sampling frequency failed. err=%+v", err) + Log.Errorf("get sampling frequency failed. err=%+v, asc=%s", err, hex.Dump(r.asc)) } pp := rtprtcp.NewRtpPackerPayloadAac() diff --git a/pkg/rtmp/chunk_divider.go b/pkg/rtmp/chunk_divider.go index 84e1ad1..8aea675 100644 --- a/pkg/rtmp/chunk_divider.go +++ b/pkg/rtmp/chunk_divider.go @@ -108,7 +108,6 @@ func calcHeader(header *base.RtmpHeader, prevHeader *base.RtmpHeader, out []byte // 设置扩展时间戳 if timestamp > maxTimestampInMessageHeader { - //log.Debugf("CHEFERASEME %+v %+v %d %d", header, prevHeader, timestamp, index) bele.BePutUint32(out[index:], timestamp) index += 4 } diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 3d2c366..9785621 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -34,15 +34,15 @@ type ServerObserver interface { } type Server struct { - observer ServerObserver addr string + observer ServerObserver ln net.Listener } -func NewServer(observer ServerObserver, addr string) *Server { +func NewServer(addr string, observer ServerObserver) *Server { return &Server{ - observer: observer, addr: addr, + observer: observer, } }