Merge pull request #18 from q191201771/master

更新到最新
pull/134/head
joestarzxh 3 years ago committed by GitHub
commit 771182edf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,8 @@ image:
.PHONY: clean .PHONY: clean
clean: clean:
rm -rf ./bin ./lal_record ./logs coverage.txt rm -rf ./bin ./lal_record ./logs coverage.txt
find ./pkg -name 'lal_record' | xargs rm -rf
find ./pkg -name 'logs' | xargs rm -rf
.PHONY: all .PHONY: all
all: build test all: build test

@ -14,6 +14,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtmp"
@ -46,13 +48,13 @@ func main() {
hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil) hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil)
hlsMuexer.Start() hlsMuexer.Start()
rtmp2Mpegts := remux.NewRtmp2MpegtsRemuxer(hlsMuexer)
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = 10000 option.PullTimeoutMs = 10000
option.ReadAvTimeoutMs = 10000 option.ReadAvTimeoutMs = 10000
}) })
err = pullSession.Pull(url, func(msg base.RtmpMsg) { err = pullSession.Pull(url, rtmp2Mpegts.FeedRtmpMessage)
hlsMuexer.FeedRtmpMessage(msg)
})
if err != nil { if err != nil {
nazalog.Fatalf("pull rtmp failed. err=%+v", err) nazalog.Fatalf("pull rtmp failed. err=%+v", err)

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.14 go 1.14
require github.com/q191201771/naza v0.30.0 require github.com/q191201771/naza v0.30.1

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.30.0 h1:tfy1O0QRl3O80mH8PSAd2FhpZ5eL7coQtCF0HzjEO4Y= github.com/q191201771/naza v0.30.1 h1:8EAydcrHs+4lUjP4XBJvTlevuOzVcZIoGj5ZK6Y6Njc=
github.com/q191201771/naza v0.30.0/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk= github.com/q191201771/naza v0.30.1/go.mod h1:n+dpJjQSh90PxBwxBNuifOwQttywvSIN5TkWSSYCeBk=

@ -174,7 +174,7 @@ func (ascCtx *AscContext) GetSamplingFrequency() (int, error) {
case AscSamplingFrequencyIndex44100: case AscSamplingFrequencyIndex44100:
return 44100, nil return 44100, nil
} }
return -1, fmt.Errorf("%w. index=%d", base.ErrSamplingFrequencyIndex, ascCtx.SamplingFrequencyIndex) return -1, fmt.Errorf("%w. asCtx=%+v", base.ErrSamplingFrequencyIndex, ascCtx)
} }
type AdtsHeaderContext struct { type AdtsHeaderContext struct {

@ -441,7 +441,6 @@ func SplitNaluAvcc(nals []byte) (nalList [][]byte, err error) {
nalList = append(nalList, nal) nalList = append(nalList, nal)
}) })
return return
} }
func IterateNaluAnnexb(nals []byte, handler func(nal []byte)) error { func IterateNaluAnnexb(nals []byte, handler func(nal []byte)) error {
@ -517,4 +516,14 @@ func IterateNaluAvcc(nals []byte, handler func(nal []byte)) error {
} }
} }
func Avcc2Annexb(nals []byte) ([]byte, error) {
ret := make([]byte, len(nals))
ret = ret[0:0]
err := IterateNaluAvcc(nals, func(nal []byte) {
ret = append(ret, NaluStartCode4...)
ret = append(ret, nal...)
})
return ret, err
}
// TODO(chef): 是否需要 func NaluAvcc2Annexb, func NaluAnnexb2Avcc // TODO(chef): 是否需要 func NaluAvcc2Annexb, func NaluAnnexb2Avcc

@ -13,6 +13,21 @@ import (
"strings" "strings"
) )
// group中session Dispose表现记录
//
// Dispose结束后回调OnDel:
// rtmp.ServerSession(包含pub和sub) 1
// rtsp.PubSession和rtsp.SubSession 1
// rtmp.PullSession 2
// httpflv.SubSession 3
// httpts.SubSession 3
//
//
// 情况1: 协议正常走完回调OnAdd在自身server的RunLoop结束后回调OnDel
// 情况2: 在group中pull阻塞结束后手动回调OnDel
// 情况3: 在logic中sub RunLoop结束后手动回调OnDel
//
// IsUseClosedConnectionError 当connection处于这些情况时就不需要再Close了 // IsUseClosedConnectionError 当connection处于这些情况时就不需要再Close了
// TODO(chef): 临时放这 // TODO(chef): 临时放这
// TODO(chef): 目前暂时没有使用因为connection支持多次调用Close // TODO(chef): 目前暂时没有使用因为connection支持多次调用Close
@ -81,7 +96,9 @@ type IClientSessionLifecycle interface {
type IServerSessionLifecycle interface { type IServerSessionLifecycle interface {
// RunLoop 开启session的事件循环阻塞直到session结束 // RunLoop 开启session的事件循环阻塞直到session结束
// //
RunLoop() error // 注意rtsp的 pub和sub没有RunLoopRunLoop是在cmd session上所以暂时把这个函数从接口去除
//
//RunLoop() error
// Dispose 主动关闭session时调用 // Dispose 主动关闭session时调用
// //

@ -25,7 +25,7 @@ const (
UkPreGroup = "GROUP" UkPreGroup = "GROUP"
UkPreHlsMuxer = "HLSMUXER" UkPreHlsMuxer = "HLSMUXER"
UkPreStreamer = "STREAMER" UkPreRtmp2MpegtsRemuxer = "RTMP2MPEGTS"
) )
//func GenUk(prefix string) string { //func GenUk(prefix string) string {
@ -84,8 +84,8 @@ func GenUkHlsMuxer() string {
return siUkHlsMuxer.GenUniqueKey() return siUkHlsMuxer.GenUniqueKey()
} }
func GenUkStreamer() string { func GenUkRtmp2MpegtsRemuxer() string {
return siUkStreamer.GenUniqueKey() return siUkRtmp2MpegtsRemuxer.GenUniqueKey()
} }
var ( var (
@ -103,7 +103,7 @@ var (
siUkGroup *unique.SingleGenerator siUkGroup *unique.SingleGenerator
siUkHlsMuxer *unique.SingleGenerator siUkHlsMuxer *unique.SingleGenerator
siUkStreamer *unique.SingleGenerator siUkRtmp2MpegtsRemuxer *unique.SingleGenerator
) )
func init() { func init() {
@ -121,5 +121,5 @@ func init() {
siUkGroup = unique.NewSingleGenerator(UkPreGroup) siUkGroup = unique.NewSingleGenerator(UkPreGroup)
siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer) siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer)
siUkStreamer = unique.NewSingleGenerator(UkPreStreamer) siUkRtmp2MpegtsRemuxer = unique.NewSingleGenerator(UkPreRtmp2MpegtsRemuxer)
} }

@ -19,19 +19,8 @@ import (
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
) )
// TODO chef: 转换TS流的功能通过回调供httpts使用也放在了Muxer中好处是hls和httpts可以共用一份TS流。
// 后续从架构上考虑packet hls,mpegts,logic的分工
type MuxerObserver interface { type MuxerObserver interface {
OnPatPmt(b []byte) OnFragmentOpen()
// OnTsPackets
//
// @param rawFrame: TS流回调结束后内部不再使用该内存块
//
// @param boundary: 新的TS流接收者应该从该标志为true时开始发送数据
//
OnTsPackets(rawFrame []byte, boundary bool)
} }
// MuxerConfig // MuxerConfig
@ -54,7 +43,7 @@ const (
// Muxer // Muxer
// //
// 输入rtmp流转出hls(m3u8+ts)至文件中并回调给上层转出ts流 // 输入mpegts转出hls(m3u8+ts)至文件中
// //
type Muxer struct { type Muxer struct {
UniqueKey string UniqueKey string
@ -86,7 +75,6 @@ type Muxer struct {
frag int // frag 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 frag int // frag 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // frags TS文件的固定大小环形队列记录TS的信息 frags []fragmentInfo // frags TS文件的固定大小环形队列记录TS的信息
streamer *Streamer
patpmt []byte patpmt []byte
} }
@ -123,8 +111,6 @@ func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer Muxe
observer: observer, observer: observer,
} }
m.makeFrags() m.makeFrags()
streamer := NewStreamer(m)
m.streamer = streamer
Log.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName) Log.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName)
return m return m
} }
@ -136,30 +122,32 @@ func (m *Muxer) Start() {
func (m *Muxer) Dispose() { func (m *Muxer) Dispose() {
Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey) Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey)
m.streamer.FlushAudio()
if err := m.closeFragment(true); err != nil { if err := m.closeFragment(true); err != nil {
Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err) Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err)
} }
} }
// FeedRtmpMessage @param msg 函数调用结束后内部不持有msg中的内存块 // ---------------------------------------------------------------------------------------------------------------------
// OnPatPmt OnTsPackets
//
// 实现 remux.Rtmp2MpegtsRemuxerObserver方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer
// //
func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg) { func (m *Muxer) OnPatPmt(b []byte) {
m.streamer.FeedRtmpMessage(msg) m.FeedPatPmt(b)
}
func (m *Muxer) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
m.FeedMpegts(tsPackets, frame, boundary)
} }
// ----- implement StreamerObserver of Streamer ------------------------------------------------------------------------ // ---------------------------------------------------------------------------------------------------------------------
func (m *Muxer) OnPatPmt(b []byte) { func (m *Muxer) FeedPatPmt(b []byte) {
m.patpmt = b m.patpmt = b
if m.observer != nil {
m.observer.OnPatPmt(b)
}
} }
func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) { func (m *Muxer) FeedMpegts(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
var packets []byte
if frame.Sid == mpegts.StreamIdAudio { if frame.Sid == mpegts.StreamIdAudio {
// TODO(chef): 为什么音频用pts视频用dts // TODO(chef): 为什么音频用pts视频用dts
if err := m.updateFragment(frame.Pts, boundary); err != nil { if err := m.updateFragment(frame.Pts, boundary); err != nil {
@ -168,7 +156,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool)
} }
// TODO(chef): 有updateFragment的返回值判断这里的判断可以考虑删除 // TODO(chef): 有updateFragment的返回值判断这里的判断可以考虑删除
if !m.opened { if !m.opened {
Log.Warnf("[%s] OnFrame A not opened. boundary=%t", m.UniqueKey, boundary) Log.Warnf("[%s] FeedMpegts A not opened. boundary=%t", m.UniqueKey, boundary)
return return
} }
//Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) //Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
@ -179,26 +167,16 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool)
} }
// TODO(chef): 有updateFragment的返回值判断这里的判断可以考虑删除 // TODO(chef): 有updateFragment的返回值判断这里的判断可以考虑删除
if !m.opened { if !m.opened {
Log.Warnf("[%s] OnFrame V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key) Log.Warnf("[%s] FeedMpegts V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)
return return
} }
//Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw)) //Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))
} }
mpegts.PackTsPacket(frame, func(packet []byte) { if err := m.fragment.WriteFile(tsPackets); err != nil {
if m.enable {
if err := m.fragment.WriteFile(packet); err != nil {
Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err) Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return return
} }
}
if m.observer != nil {
packets = append(packets, packet...)
}
})
if m.observer != nil {
m.observer.OnTsPackets(packets, boundary)
}
} }
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
@ -314,7 +292,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
m.fragTs = ts m.fragTs = ts
// nrm said: start fragment with audio to make iPhone happy // nrm said: start fragment with audio to make iPhone happy
m.streamer.FlushAudio() m.observer.OnFragmentOpen()
return nil return nil
} }

@ -20,7 +20,3 @@ var (
Log = nazalog.GetGlobalLogger() Log = nazalog.GetGlobalLogger()
) )
var (
calcFragmentHeaderQueueSize = 16
)

@ -65,12 +65,10 @@ var (
// IServerSession // IServerSession
var ( var (
_ base.IServerSession = &rtmp.ServerSession{} _ base.IServerSession = &rtmp.ServerSession{}
_ base.IServerSession = &rtsp.PubSession{}
_ base.IServerSession = &rtsp.SubSession{}
_ base.IServerSession = &httpflv.SubSession{} _ base.IServerSession = &httpflv.SubSession{}
_ base.IServerSession = &httpts.SubSession{} _ base.IServerSession = &httpts.SubSession{}
// 这两个比较特殊它们没有RunLoop函数RunLoop在rtsp.ServerCommandSession上
//_ base.IServerSession = &rtsp.PubSession{}
//_ base.IServerSession = &rtsp.SubSession{}
) )
// IClientSessionLifecycle: 所有Client Session都满足 // IClientSessionLifecycle: 所有Client Session都满足
@ -91,12 +89,11 @@ var (
var ( var (
// server session // server session
_ base.IServerSessionLifecycle = &rtmp.ServerSession{} _ base.IServerSessionLifecycle = &rtmp.ServerSession{}
_ base.IServerSessionLifecycle = &rtsp.PubSession{}
_ base.IServerSessionLifecycle = &rtsp.SubSession{}
_ base.IServerSessionLifecycle = &httpflv.SubSession{} _ base.IServerSessionLifecycle = &httpflv.SubSession{}
_ base.IServerSessionLifecycle = &httpts.SubSession{} _ base.IServerSessionLifecycle = &httpts.SubSession{}
// 这两个比较特殊它们没有RunLoop函数RunLoop在rtsp.ServerCommandSession上
//_ base.IServerSessionLifecycle = &rtsp.PubSession{}
//_ base.IServerSessionLifecycle = &rtsp.SubSession{}
// other // other
_ base.IServerSessionLifecycle = &base.HttpSubSession{} _ base.IServerSessionLifecycle = &base.HttpSubSession{}
_ base.IServerSessionLifecycle = &rtsp.ServerCommandSession{} _ base.IServerSessionLifecycle = &rtsp.ServerCommandSession{}
@ -188,6 +185,7 @@ var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.MuxerObserver = &logic.Group{} var _ hls.MuxerObserver = &logic.Group{}
var _ rtsp.BaseInSessionObserver = &logic.Group{} // var _ rtsp.BaseInSessionObserver = &logic.Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{} var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.Rtmp2MpegtsRemuxerObserver = &hls.Muxer{}
var _ rtmp.ServerSessionObserver = &rtmp.Server{} var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{} var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}
@ -202,5 +200,3 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{}
var _ hls.StreamerObserver = &hls.Muxer{}

@ -9,7 +9,6 @@
package innertest package innertest
import ( import (
"bytes"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -17,6 +16,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/naza/pkg/filebatch" "github.com/q191201771/naza/pkg/filebatch"
@ -53,16 +55,11 @@ import (
// - 加上relay pull // - 加上relay pull
var ( var (
tt *testing.T t *testing.T
mode int // 0 正常 1 输入只有音频 2 输入只有视频
confFile = "../../testdata/lalserver.conf.json" confFile = "../../testdata/lalserver.conf.json"
rFlvFileName = "../../testdata/test.flv" rFlvFileName = "../../testdata/test.flv"
wRtmpPullFileName = "../../testdata/rtmppull.flv"
wFlvPullFileName = "../../testdata/flvpull.flv"
wPlaylistM3u8FileName string
wRecordM3u8FileName string
wHlsTsFilePath string
//wRtspPullFileName = "../../testdata/rtsppull.flv"
pushUrl string pushUrl string
httpflvPullUrl string httpflvPullUrl string
@ -70,9 +67,17 @@ var (
rtmpPullUrl string rtmpPullUrl string
rtspPullUrl string rtspPullUrl string
wRtmpPullFileName string
wFlvPullFileName string
wPlaylistM3u8FileName string
wRecordM3u8FileName string
wHlsTsFilePath string
wTsPullFileName string
fileTagCount int fileTagCount int
httpflvPullTagCount nazaatomic.Uint32 httpflvPullTagCount nazaatomic.Uint32
rtmpPullTagCount nazaatomic.Uint32 rtmpPullTagCount nazaatomic.Uint32
httptsSize nazaatomic.Uint32
rtspSdpCtx sdp.LogicContext rtspSdpCtx sdp.LogicContext
rtspPullAvPacketCount nazaatomic.Uint32 rtspPullAvPacketCount nazaatomic.Uint32
@ -99,7 +104,20 @@ func (r RtspPullObserver) OnAvPacket(pkt base.AvPacket) {
rtspPullAvPacketCount.Increment() rtspPullAvPacketCount.Increment()
} }
func Entry(t *testing.T) { func Entry(tt *testing.T) {
t = tt
mode = 0
entry()
mode = 1
entry()
mode = 2
entry()
}
func entry() {
if _, err := os.Lstat(confFile); err != nil { if _, err := os.Lstat(confFile); err != nil {
Log.Warnf("lstat %s error. err=%+v", confFile, err) Log.Warnf("lstat %s error. err=%+v", confFile, err)
return return
@ -109,22 +127,26 @@ func Entry(t *testing.T) {
return return
} }
httpflvPullTagCount.Store(0)
rtmpPullTagCount.Store(0)
httptsSize.Store(0)
hls.Clock = mock.NewFakeClock() hls.Clock = mock.NewFakeClock()
hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.Local)) hls.Clock.Set(time.Date(2022, 1, 16, 23, 24, 25, 0, time.UTC))
httpts.SubSessionWriteChanSize = 0 httpts.SubSessionWriteChanSize = 0
tt = t
var err error var err error
sm := logic.NewServerManager(confFile) sm := logic.NewServerManager(confFile)
go sm.RunLoop()
time.Sleep(200 * time.Millisecond)
config := sm.Config() config := sm.Config()
//Log.Init(func(option *nazalog.Option) {
// option.Level = nazalog.LevelLogNothing
//})
_ = os.RemoveAll(config.HlsConfig.OutPath) _ = os.RemoveAll(config.HlsConfig.OutPath)
go sm.RunLoop()
time.Sleep(100 * time.Millisecond)
getAllHttpApi(config.HttpApiConfig.Addr) getAllHttpApi(config.HttpApiConfig.Addr)
pushUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) pushUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr)
@ -132,12 +154,32 @@ func Entry(t *testing.T) {
httptsPullUrl = fmt.Sprintf("http://127.0.0.1%s/live/innertest.ts", config.HttpflvConfig.HttpListenAddr) httptsPullUrl = fmt.Sprintf("http://127.0.0.1%s/live/innertest.ts", config.HttpflvConfig.HttpListenAddr)
rtmpPullUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr) rtmpPullUrl = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RtmpConfig.Addr)
rtspPullUrl = fmt.Sprintf("rtsp://127.0.0.1%s/live/innertest", config.RtspConfig.Addr) rtspPullUrl = fmt.Sprintf("rtsp://127.0.0.1%s/live/innertest", config.RtspConfig.Addr)
wRtmpPullFileName = "../../testdata/rtmppull.flv"
wFlvPullFileName = "../../testdata/flvpull.flv"
wTsPullFileName = fmt.Sprintf("../../testdata/tspull_%d.ts", mode)
wPlaylistM3u8FileName = fmt.Sprintf("%sinnertest/playlist.m3u8", config.HlsConfig.OutPath) wPlaylistM3u8FileName = fmt.Sprintf("%sinnertest/playlist.m3u8", config.HlsConfig.OutPath)
wRecordM3u8FileName = fmt.Sprintf("%sinnertest/record.m3u8", config.HlsConfig.OutPath) wRecordM3u8FileName = fmt.Sprintf("%sinnertest/record.m3u8", config.HlsConfig.OutPath)
wHlsTsFilePath = fmt.Sprintf("%sinnertest/", config.HlsConfig.OutPath) wHlsTsFilePath = fmt.Sprintf("%sinnertest/", config.HlsConfig.OutPath)
tags, err := httpflv.ReadAllTagsFromFlvFile(rFlvFileName) var tags []httpflv.Tag
originTags, err := httpflv.ReadAllTagsFromFlvFile(rFlvFileName)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
if mode == 0 {
tags = originTags
} else if mode == 1 {
for _, tag := range originTags {
if tag.Header.Type == base.RtmpTypeIdMetadata || tag.Header.Type == base.RtmpTypeIdAudio {
tags = append(tags, tag)
}
}
} else if mode == 2 {
for _, tag := range originTags {
if tag.Header.Type == base.RtmpTypeIdMetadata || tag.Header.Type == base.RtmpTypeIdVideo {
tags = append(tags, tag)
}
}
}
fileTagCount = len(tags) fileTagCount = len(tags)
err = httpFlvWriter.Open(wFlvPullFileName) err = httpFlvWriter.Open(wFlvPullFileName)
@ -160,7 +202,7 @@ func Entry(t *testing.T) {
func(msg base.RtmpMsg) { func(msg base.RtmpMsg) {
tag := remux.RtmpMsg2FlvTag(msg) tag := remux.RtmpMsg2FlvTag(msg)
err := rtmpWriter.WriteTag(*tag) err := rtmpWriter.WriteTag(*tag)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
rtmpPullTagCount.Increment() rtmpPullTagCount.Increment()
}) })
Log.Assert(nil, err) Log.Assert(nil, err)
@ -169,6 +211,7 @@ func Entry(t *testing.T) {
}() }()
go func() { go func() {
var flvErr error
httpflvPullSession = httpflv.NewPullSession(func(option *httpflv.PullSessionOption) { httpflvPullSession = httpflv.NewPullSession(func(option *httpflv.PullSessionOption) {
option.ReadTimeoutMs = 10000 option.ReadTimeoutMs = 10000
}) })
@ -178,19 +221,17 @@ func Entry(t *testing.T) {
httpflvPullTagCount.Increment() httpflvPullTagCount.Increment()
}) })
Log.Assert(nil, err) Log.Assert(nil, err)
err = <-httpflvPullSession.WaitChan() flvErr = <-httpflvPullSession.WaitChan()
Log.Debug(err) Log.Debug(flvErr)
}() }()
go func() { go func() {
//nazalog.Info("CHEFGREPME >") b, _ := getHttpts()
b, err := httpGet(httptsPullUrl) _ = ioutil.WriteFile(wTsPullFileName, b, 0666)
assert.Equal(t, 2216332, len(b)) assert.Equal(t, goldenHttptsLenList[mode], len(b))
assert.Equal(t, "03f8eac7d2c3d5d85056c410f5fcc756", nazamd5.Md5(b)) assert.Equal(t, goldenHttptsMd5List[mode], nazamd5.Md5(b))
Log.Infof("CHEFGREPME %+v", err)
}() }()
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
// TODO(chef): [test] [2021.12.25] rtsp sub测试 由于rtsp sub不支持没有pub时sub只能sub失败后重试所以没有验证收到的数据 // TODO(chef): [test] [2021.12.25] rtsp sub测试 由于rtsp sub不支持没有pub时sub只能sub失败后重试所以没有验证收到的数据
// TODO(chef): [perf] [2021.12.25] rtmp推rtsp拉的性能。开启rtsp pull后rtmp pull的总时长增加了 // TODO(chef): [perf] [2021.12.25] rtmp推rtsp拉的性能。开启rtsp pull后rtmp pull的总时长增加了
@ -210,9 +251,11 @@ func Entry(t *testing.T) {
} }
}() }()
time.Sleep(100 * time.Millisecond)
pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.WriteBufSize = 4096 option.WriteBufSize = 4096
option.WriteChanSize = 1024 //option.WriteChanSize = 1024
}) })
err = pushSession.Push(pushUrl) err = pushSession.Push(pushUrl)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
@ -221,7 +264,7 @@ func Entry(t *testing.T) {
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
chunks := remux.FlvTag2RtmpChunks(tag) chunks := remux.FlvTag2RtmpChunks(tag)
//Log.Debugf("rtmp push: %d", fileTagCount.Load()) //Log.Debugf("rtmp push: %d", fileTagCount.Load())
err = pushSession.Write(chunks) err := pushSession.Write(chunks)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
} }
err = pushSession.Flush() err = pushSession.Flush()
@ -229,21 +272,25 @@ func Entry(t *testing.T) {
getAllHttpApi(config.HttpApiConfig.Addr) getAllHttpApi(config.HttpApiConfig.Addr)
// 注意先释放push触发pub释放从而刷新hls的结束时切片逻辑
pushSession.Dispose()
for { for {
if httpflvPullTagCount.Load() == uint32(fileTagCount) && if httpflvPullTagCount.Load() == uint32(fileTagCount) &&
rtmpPullTagCount.Load() == uint32(fileTagCount) { rtmpPullTagCount.Load() == uint32(fileTagCount) &&
time.Sleep(100 * time.Millisecond) httptsSize.Load() == uint32(goldenHttptsLenList[mode]) {
break break
} }
time.Sleep(10 * time.Millisecond) nazalog.Debugf("%d(%d, %d) %d(%d)",
fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), goldenHttptsLenList[mode], httptsSize.Load())
time.Sleep(100 * time.Millisecond)
} }
Log.Debug("[innertest] start dispose.") Log.Debug("[innertest] start dispose.")
pushSession.Dispose()
httpflvPullSession.Dispose() httpflvPullSession.Dispose()
rtmpPullSession.Dispose() rtmpPullSession.Dispose()
//rtspPullSession.Dispose() rtspPullSession.Dispose()
httpFlvWriter.Dispose() httpFlvWriter.Dispose()
rtmpWriter.Dispose() rtmpWriter.Dispose()
@ -260,34 +307,28 @@ func Entry(t *testing.T) {
func compareFile() { func compareFile() {
r, err := ioutil.ReadFile(rFlvFileName) r, err := ioutil.ReadFile(rFlvFileName)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
Log.Debugf("%s filesize:%d", rFlvFileName, len(r)) Log.Debugf("%s filesize:%d", rFlvFileName, len(r))
// 检查httpflv // 检查httpflv
w, err := ioutil.ReadFile(wFlvPullFileName) w, err := ioutil.ReadFile(wFlvPullFileName)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
Log.Debugf("%s filesize:%d", wFlvPullFileName, len(w)) assert.Equal(t, goldenHttpflvLenList[mode], len(w))
res := bytes.Compare(r, w) assert.Equal(t, goldenHttpflvMd5List[mode], nazamd5.Md5(w))
assert.Equal(tt, 0, res)
//err = os.Remove(wFlvPullFileName)
assert.Equal(tt, nil, err)
// 检查rtmp // 检查rtmp
w2, err := ioutil.ReadFile(wRtmpPullFileName) w, err = ioutil.ReadFile(wRtmpPullFileName)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
Log.Debugf("%s filesize:%d", wRtmpPullFileName, len(w2)) assert.Equal(t, goldenRtmpLenList[mode], len(w))
res = bytes.Compare(r, w2) assert.Equal(t, goldenRtmpMd5List[mode], nazamd5.Md5(w))
assert.Equal(tt, 0, res)
//err = os.Remove(wRtmpPullFileName)
assert.Equal(tt, nil, err)
// 检查hls的m3u8文件 // 检查hls的m3u8文件
playListM3u8, err := ioutil.ReadFile(wPlaylistM3u8FileName) playListM3u8, err := ioutil.ReadFile(wPlaylistM3u8FileName)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
assert.Equal(tt, goldenPlaylistM3u8, string(playListM3u8)) assert.Equal(t, goldenPlaylistM3u8List[mode], string(playListM3u8))
recordM3u8, err := ioutil.ReadFile(wRecordM3u8FileName) recordM3u8, err := ioutil.ReadFile(wRecordM3u8FileName)
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
assert.Equal(tt, []byte(goldenRecordM3u8), recordM3u8) assert.Equal(t, goldenRecordM3u8List[mode], string(recordM3u8))
// 检查hls的ts文件 // 检查hls的ts文件
var allContent []byte var allContent []byte
@ -301,11 +342,11 @@ func compareFile() {
fileNum++ fileNum++
return nil return nil
}) })
assert.Equal(tt, nil, err) assert.Equal(t, nil, err)
allContentMd5 := nazamd5.Md5(allContent) allContentMd5 := nazamd5.Md5(allContent)
assert.Equal(tt, 8, fileNum) assert.Equal(t, goldenHlsTsNumList[mode], fileNum)
assert.Equal(tt, 2219152, len(allContent)) assert.Equal(t, goldenHlsTsLenList[mode], len(allContent))
assert.Equal(tt, "48db6251d40c271fd11b05650f074e0f", allContentMd5) assert.Equal(t, goldenHlsTsMd5List[mode], allContentMd5)
} }
func getAllHttpApi(addr string) { func getAllHttpApi(addr string) {
@ -339,6 +380,30 @@ func getAllHttpApi(addr string) {
Log.Debugf("%s", string(b)) Log.Debugf("%s", string(b))
} }
func getHttpts() ([]byte, error) {
resp, err := http.DefaultClient.Get(httptsPullUrl)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var buf nazabytes.Buffer
buf.ReserveBytes(goldenHttptsLenList[mode])
for {
n, err := resp.Body.Read(buf.WritableBytes())
if n > 0 {
buf.Flush(n)
httptsSize.Add(uint32(n))
}
if err != nil {
return buf.Bytes(), err
}
if buf.Len() == goldenHttptsLenList[mode] {
return buf.Bytes(), nil
}
}
}
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// TODO(chef): refactor 移入naza中 // TODO(chef): refactor 移入naza中
@ -363,48 +428,175 @@ func httpPost(url string, info interface{}) ([]byte, error) {
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
var goldenPlaylistM3u8 = `#EXTM3U var (
goldenRtmpLenList = []int{2120047, 504722, 1615715}
goldenRtmpMd5List = []string{
"7d68f0e2ab85c1992f70740479c8d3db",
"b889f690e07399c8c8353a3b1dba7efb",
"b5a9759455039761b6d4dd3ed8e97634",
}
goldenHttpflvLenList = []int{2120047, 504722, 1615715}
goldenHttpflvMd5List = []string{
"7d68f0e2ab85c1992f70740479c8d3db",
"b889f690e07399c8c8353a3b1dba7efb",
"b5a9759455039761b6d4dd3ed8e97634",
}
goldenHlsTsNumList = []int{8, 10, 8}
goldenHlsTsLenList = []int{2219152, 525648, 1696512}
goldenHlsTsMd5List = []string{
"48db6251d40c271fd11b05650f074e0f",
"2eb19ad498688dadf950b3e749985922",
"2d1e5c1a3ca385e0b55b2cff2f52b710",
}
goldenHttptsLenList = []int{2216332, 522264, 1693880}
goldenHttptsMd5List = []string{
"03f8eac7d2c3d5d85056c410f5fcc756",
"0d102b6fb7fc3134e56a07f00292e888",
"651a2b5c93370738d81995f8fd49af4d",
}
)
var goldenPlaylistM3u8List = []string{
`#EXTM3U
#EXT-X-VERSION:3 #EXT-X-VERSION:3
#EXT-X-ALLOW-CACHE:NO #EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:5 #EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:2 #EXT-X-MEDIA-SEQUENCE:2
#EXTINF:3.333, #EXTINF:3.333,
innertest-1642346665000-2.ts innertest-1642375465000-2.ts
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-3.ts innertest-1642375465000-3.ts
#EXTINF:4.867, #EXTINF:4.867,
innertest-1642346665000-4.ts innertest-1642375465000-4.ts
#EXTINF:3.133, #EXTINF:3.133,
innertest-1642346665000-5.ts innertest-1642375465000-5.ts
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-6.ts innertest-1642375465000-6.ts
#EXTINF:2.621, #EXTINF:2.621,
innertest-1642346665000-7.ts innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
#EXT-X-VERSION:3
#EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:3
#EXT-X-MEDIA-SEQUENCE:4
#EXTINF:3.088,
innertest-1642375465000-4.ts
#EXTINF:3.088,
innertest-1642375465000-5.ts
#EXTINF:3.089,
innertest-1642375465000-6.ts
#EXTINF:3.088,
innertest-1642375465000-7.ts
#EXTINF:3.088,
innertest-1642375465000-8.ts
#EXTINF:2.113,
innertest-1642375465000-9.ts
#EXT-X-ENDLIST #EXT-X-ENDLIST
` `,
`#EXTM3U
#EXT-X-VERSION:3
#EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:2
var goldenRecordM3u8 = `#EXTM3U #EXTINF:3.333,
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642375465000-6.ts
#EXTINF:2.600,
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
}
var goldenRecordM3u8List = []string{
`#EXTM3U
#EXT-X-VERSION:3 #EXT-X-VERSION:3
#EXT-X-TARGETDURATION:5 #EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:0 #EXT-X-MEDIA-SEQUENCE:0
#EXT-X-DISCONTINUITY #EXT-X-DISCONTINUITY
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-0.ts innertest-1642375465000-0.ts
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-1.ts innertest-1642375465000-1.ts
#EXTINF:3.333, #EXTINF:3.333,
innertest-1642346665000-2.ts innertest-1642375465000-2.ts
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-3.ts innertest-1642375465000-3.ts
#EXTINF:4.867, #EXTINF:4.867,
innertest-1642346665000-4.ts innertest-1642375465000-4.ts
#EXTINF:3.133, #EXTINF:3.133,
innertest-1642346665000-5.ts innertest-1642375465000-5.ts
#EXTINF:4.000, #EXTINF:4.000,
innertest-1642346665000-6.ts innertest-1642375465000-6.ts
#EXTINF:2.621, #EXTINF:2.621,
innertest-1642346665000-7.ts innertest-1642375465000-7.ts
#EXT-X-ENDLIST #EXT-X-ENDLIST
` `,
`#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-DISCONTINUITY
#EXTINF:3.088,
innertest-1642375465000-0.ts
#EXTINF:3.088,
innertest-1642375465000-1.ts
#EXTINF:3.089,
innertest-1642375465000-2.ts
#EXTINF:3.088,
innertest-1642375465000-3.ts
#EXTINF:3.088,
innertest-1642375465000-4.ts
#EXTINF:3.088,
innertest-1642375465000-5.ts
#EXTINF:3.089,
innertest-1642375465000-6.ts
#EXTINF:3.088,
innertest-1642375465000-7.ts
#EXTINF:3.088,
innertest-1642375465000-8.ts
#EXTINF:2.113,
innertest-1642375465000-9.ts
#EXT-X-ENDLIST
`,
`#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:5
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-DISCONTINUITY
#EXTINF:4.000,
innertest-1642375465000-0.ts
#EXTINF:4.000,
innertest-1642375465000-1.ts
#EXTINF:3.333,
innertest-1642375465000-2.ts
#EXTINF:4.000,
innertest-1642375465000-3.ts
#EXTINF:4.867,
innertest-1642375465000-4.ts
#EXTINF:3.133,
innertest-1642375465000-5.ts
#EXTINF:4.000,
innertest-1642375465000-6.ts
#EXTINF:2.600,
innertest-1642375465000-7.ts
#EXT-X-ENDLIST
`,
}

@ -0,0 +1,15 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package innertest
import "testing"
func TestEntry(t *testing.T) {
Entry(t)
}

@ -322,6 +322,9 @@ func LoadConfAndInitLog(confFile string) *Config {
return config return config
} }
// ---------------------------------------------------------------------------------------------------------------------
func mergeCommonHttpAddrConfig(dst, src *CommonHttpAddrConfig) { func mergeCommonHttpAddrConfig(dst, src *CommonHttpAddrConfig) {
if dst.HttpListenAddr == "" && src.HttpListenAddr != "" { if dst.HttpListenAddr == "" && src.HttpListenAddr != "" {
dst.HttpListenAddr = src.HttpListenAddr dst.HttpListenAddr = src.HttpListenAddr

File diff suppressed because it is too large Load Diff

@ -0,0 +1,440 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"net"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
)
// group__streaming.go
//
// 包含group中音视频数据转发、转封装协议的逻辑
//
// ---------------------------------------------------------------------------------------------------------------------
// OnReadRtmpAvMsg
//
// 输入rtmp数据.
// 来自 rtmp.ServerSession(Pub), rtmp.PullSession, (remux.DummyAudioFilter) 的回调.
//
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
}
// ---------------------------------------------------------------------------------------------------------------------
// OnSdp OnRtpPacket OnAvPacket
//
// 输入rtsp(rtp)和rtp合帧之后的数据.
// 来自 rtsp.PubSession 的回调.
//
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.sdpCtx = &sdpCtx
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}
// OnRtpPacket ...
func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.feedRtpPacket(pkt)
}
// OnAvPacket ...
func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtsp2RtmpRemuxer.OnAvPacket(pkt)
}
// ---------------------------------------------------------------------------------------------------------------------
// OnPatPmt OnTsPackets
//
// 输入mpegts数据.
// 来自 remux.Rtmp2MpegtsRemuxer 的回调.
//
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
group.hlsMuxer.FeedPatPmt(b)
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(b); err != nil {
Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
// OnTsPackets ...
func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
group.feedTsPackets(tsPackets, frame, boundary)
}
// ---------------------------------------------------------------------------------------------------------------------
// onRtmpMsgFromRemux
//
// 输入rtmp数据.
// 来自 remux.AvPacket2RtmpRemuxer 的回调.
//
func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
group.broadcastByRtmpMsg(msg)
}
// ---------------------------------------------------------------------------------------------------------------------
// onSdpFromRemux onRtpPacketFromRemux
//
// 输入rtsp(rtp)数据.
// 来自 remux.Rtmp2RtspRemuxer 的回调.
//
func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx
}
// onRtpPacketFromRemux ...
func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) {
group.feedRtpPacket(pkt)
}
// ---------------------------------------------------------------------------------------------------------------------
// OnFragmentOpen
//
// 来自 hls.Muxer 的回调
//
func (group *Group) OnFragmentOpen() {
group.rtmp2MpegtsRemuxer.FlushAudio()
}
// ---------------------------------------------------------------------------------------------------------------------
// broadcastByRtmpMsg
//
// 使用rtmp类型的数据做为输入广播给各协议的输出
//
// @param msg 调用结束后内部不持有msg.Payload内存块
//
func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
var (
lcd remux.LazyRtmpChunkDivider
lrm2ft remux.LazyRtmpMsg2FlvTag
)
if len(msg.Payload) == 0 {
Log.Warnf("[%s] msg payload length is 0. %+v", group.UniqueKey, msg.Header)
return
}
// # mpegts remuxer
if group.config.HlsConfig.Enable || group.config.HttptsConfig.Enable {
group.rtmp2MpegtsRemuxer.FeedRtmpMessage(msg)
}
// # rtsp
if group.config.RtspConfig.Enable && group.rtmp2RtspRemuxer != nil {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
}
// # 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)
if currHeader.MsgLen != uint32(len(msg.Payload)) {
Log.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header)
}
// # 懒初始化rtmp chunk切片以及httpflv转换
lcd.Init(msg.Payload, &currHeader)
lrm2ft.Init(msg)
// # 广播。遍历所有 rtmp sub session转发数据
// ## 如果是新的 sub session发送已缓存的信息
for session := range group.rtmpSubSessionSet {
if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.rtmpGopCache.Metadata != nil {
Log.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
Log.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
Log.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.AacSeqHeader)
}
gopCount := group.rtmpGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
Log.Debugf("[%s] [%s] write gop cache. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = session.Write(item)
}
}
// 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 从而确保新加入的sub session不会发送这部分脏的数据
// 注意此处可能被调用多次但是只有第一次会实际flush缓存数据
if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.IsFresh = false
}
if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNalu() {
// 有sub session在等待关键帧并且当前是关键帧
// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 并且修改这个sub session的标志
// 让rtmp buf writer来发送这个关键帧
if group.rtmpMergeWriter != nil {
group.rtmpMergeWriter.Flush()
}
session.ShouldWaitVideoKeyFrame = false
}
}
// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
if group.rtmpMergeWriter == nil {
group.write2RtmpSubSessions(lcd.Get())
} else {
group.rtmpMergeWriter.Write(lcd.Get())
}
}
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
if group.pushEnable {
for _, v := range group.url2PushProxy {
if v.pushSession == nil {
continue
}
if v.pushSession.IsFresh {
if group.rtmpGopCache.Metadata != nil {
_ = v.pushSession.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AacSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.AacSeqHeader)
}
for i := 0; i < group.rtmpGopCache.GetGopCount(); i++ {
for _, item := range group.rtmpGopCache.GetGopDataAt(i) {
_ = v.pushSession.Write(item)
}
}
v.pushSession.IsFresh = false
}
_ = v.pushSession.Write(lcd.Get())
}
}
// # 广播。遍历所有 httpflv sub session转发数据
for session := range group.httpflvSubSessionSet {
if session.IsFresh {
if group.httpflvGopCache.Metadata != nil {
session.Write(group.httpflvGopCache.Metadata)
}
if group.httpflvGopCache.VideoSeqHeader != nil {
session.Write(group.httpflvGopCache.VideoSeqHeader)
}
if group.httpflvGopCache.AacSeqHeader != nil {
session.Write(group.httpflvGopCache.AacSeqHeader)
}
gopCount := group.httpflvGopCache.GetGopCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
}
for i := 0; i < gopCount; i++ {
for _, item := range group.httpflvGopCache.GetGopDataAt(i) {
session.Write(item)
}
}
session.IsFresh = false
}
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNalu() {
session.Write(lrm2ft.Get())
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(lrm2ft.Get())
}
}
// # 录制flv文件
if group.recordFlv != nil {
if err := group.recordFlv.WriteRaw(lrm2ft.Get()); err != nil {
Log.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}
// # 缓存关键信息以及gop
if group.config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, lcd.Get)
}
if group.config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
// # 记录stat
if group.stat.AudioCodec == "" {
if msg.IsAacSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAac
}
}
if group.stat.VideoCodec == "" {
if msg.IsAvcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecAvc
}
if msg.IsHevcKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecHevc
}
}
if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 {
if msg.IsAvcKeySeqHeader() {
sps, _, err := avc.ParseSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx avc.Context
err = avc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.Height)
group.stat.VideoWidth = int(ctx.Width)
}
}
}
if msg.IsHevcKeySeqHeader() {
_, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
if err == nil {
var ctx hevc.Context
err = hevc.ParseSps(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples)
group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples)
}
}
}
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) feedRtpPacket(pkt rtprtcp.RtpPacket) {
// 音频直接发送
if group.sdpCtx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) {
for s := range group.rtspSubSessionSet {
s.WriteRtpPacket(pkt)
}
return
}
var (
boundary bool
boundaryChecked bool // 保证只检查0次或1次减少性能开销
)
for s := range group.rtspSubSessionSet {
if !s.ShouldWaitVideoKeyFrame {
s.WriteRtpPacket(pkt)
continue
}
if !boundaryChecked {
switch group.sdpCtx.GetVideoPayloadTypeBase() {
case base.AvPacketPtAvc:
boundary = rtprtcp.IsAvcBoundary(pkt)
case base.AvPacketPtHevc:
boundary = rtprtcp.IsHevcBoundary(pkt)
default:
// 注意不是avc和hevc时直接发送
boundary = true
}
boundaryChecked = true
}
if boundary {
s.WriteRtpPacket(pkt)
s.ShouldWaitVideoKeyFrame = false
}
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
// 注意hls的处理放在前面让hls先判断是否打开新的fragment并flush audio
if group.hlsMuxer != nil {
group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary)
}
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(tsPackets)
session.IsFresh = false
}
} else {
session.Write(tsPackets)
}
}
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(tsPackets); err != nil {
Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}
func (group *Group) writev2RtmpSubSessions(bs net.Buffers) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Writev(bs)
}
}

@ -0,0 +1,188 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
)
func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
group.mutex.Lock()
defer group.mutex.Unlock()
Log.Debugf("[%s] [%s] add rtmp pub session into group.", group.UniqueKey, session.UniqueKey())
if group.hasInSession() {
Log.Errorf("[%s] in stream already exist at group. add=%s, exist=%s",
group.UniqueKey, session.UniqueKey(), group.inSessionUniqueKey())
return base.ErrDupInStream
}
group.rtmpPubSession = session
group.addIn()
if group.config.RtspConfig.Enable {
group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer(
group.onSdpFromRemux,
group.onRtpPacketFromRemux,
)
}
// TODO(chef): 为rtmp pull以及rtsp也添加叠加静音音频的功能
if group.config.RtmpConfig.AddDummyAudioEnable {
// TODO(chef): 从整体控制和锁关系来说应该让pub的数据回调到group中进锁后再让数据流入filter
group.dummyAudioFilter = remux.NewDummyAudioFilter(group.UniqueKey, group.config.RtmpConfig.AddDummyAudioWaitAudioMs, group.OnReadRtmpAvMsg)
session.SetPubSessionObserver(group.dummyAudioFilter)
} else {
session.SetPubSessionObserver(group)
}
return nil
}
// AddRtspPubSession TODO chef: rtsp package中增加回调返回值判断如果是false将连接关掉
func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error {
Log.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.hasInSession() {
Log.Errorf("[%s] in stream already exist at group. wanna add=%s", group.UniqueKey, session.UniqueKey())
return base.ErrDupInStream
}
group.rtspPubSession = session
group.addIn()
group.rtsp2RtmpRemuxer = remux.NewAvPacket2RtmpRemuxer(group.onRtmpMsgFromRemux)
session.SetObserver(group)
return nil
}
func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) bool {
Log.Debugf("[%s] [%s] add PullSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.hasInSession() {
Log.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey())
return false
}
group.pullProxy.pullSession = session
group.addIn()
// TODO(chef): 这里也应该启动rtmp2RtspRemuxer
return true
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) DelRtmpPubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtmpPubSession(session)
}
func (group *Group) DelRtspPubSession(session *rtsp.PubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtspPubSession(session)
}
func (group *Group) DelRtmpPullSession(session *rtmp.PullSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtmpPullSession(session)
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delRtmpPubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey())
if session != group.rtmpPubSession {
Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p",
group.UniqueKey, session.UniqueKey(), group.rtmpPubSession)
return
}
group.delIn()
}
func (group *Group) delRtspPubSession(session *rtsp.PubSession) {
Log.Debugf("[%s] [%s] del rtsp PubSession from group.", group.UniqueKey, session.UniqueKey())
if session != group.rtspPubSession {
Log.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p",
group.UniqueKey, session.UniqueKey(), group.rtspPubSession)
return
}
group.delIn()
}
func (group *Group) delRtmpPullSession(session *rtmp.PullSession) {
Log.Debugf("[%s] [%s] del rtmp PullSession from group.", group.UniqueKey, session.UniqueKey())
group.pullProxy.pullSession = nil
group.setPullingFlag(false)
group.delIn()
}
// ---------------------------------------------------------------------------------------------------------------------
// addIn 有pub或pull的输入型session加入时需要调用该函数
//
func (group *Group) addIn() {
now := time.Now().Unix()
if group.config.HlsConfig.Enable || group.config.HttptsConfig.Enable {
group.rtmp2MpegtsRemuxer = remux.NewRtmp2MpegtsRemuxer(group)
}
group.startPushIfNeeded()
group.startHlsIfNeeded()
group.startRecordFlvIfNeeded(now)
group.startRecordMpegtsIfNeeded(now)
}
// delIn 有pub或pull的输入型session离开时需要调用该函数
//
func (group *Group) delIn() {
// 注意remuxer放前面使得有机会将内部缓存的数据吐出来
if group.rtmp2MpegtsRemuxer != nil {
group.rtmp2MpegtsRemuxer.Dispose()
group.rtmp2MpegtsRemuxer = nil
}
group.stopPushIfNeeded()
group.stopHlsIfNeeded()
group.stopRecordFlvIfNeeded()
group.stopRecordMpegtsIfNeeded()
group.rtmpPubSession = nil
group.rtspPubSession = nil
group.rtsp2RtmpRemuxer = nil
group.rtmp2RtspRemuxer = nil
group.dummyAudioFilter = nil
group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
group.patpmt = nil
group.sdpCtx = nil
}

@ -0,0 +1,135 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
)
func (group *Group) AddRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了
// 也即我们假定上行肯定是以关键帧为开始进行视频发送假设不是那么我们按上行的流正常发而不过滤掉关键帧前面的不包含关键帧的非完整GOP
// TODO(chef):
// 1. 需要仔细考虑单音频无视频的流的情况
// 2. 这里不修改标志让这个session继续等关键帧也可以
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
session.WriteFlvHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推流过,就不需要等待关键帧了
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
// AddHttptsSubSession TODO chef:
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHttpResponseHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httptsSubSessionSet[session] = struct{}{}
group.pullIfNeeded()
}
func (group *Group) HandleNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO(chef): 应该有等待机制,而不是直接关闭
if group.sdpCtx == nil {
Log.Warnf("[%s] close rtsp subSession while describe but sdp not exist. [%s]",
group.UniqueKey, session.UniqueKey())
return false, nil
}
return true, group.sdpCtx.RawSdp
}
func (group *Group) HandleNewRtspSubSessionPlay(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtspSubSessionSet[session] = struct{}{}
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
// TODO(chef): rtsp sub也应该判断是否需要静态pull回源
}
func (group *Group) DelRtmpSubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtmpSubSession(session)
}
func (group *Group) DelHttpflvSubSession(session *httpflv.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttpflvSubSession(session)
}
func (group *Group) DelHttptsSubSession(session *httpts.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delHttptsSubSession(session)
}
func (group *Group) DelRtspSubSession(session *rtsp.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delRtspSubSession(session)
}
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delRtmpSubSession(session *rtmp.ServerSession) {
Log.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtmpSubSessionSet, session)
}
func (group *Group) delHttpflvSubSession(session *httpflv.SubSession) {
Log.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httpflvSubSessionSet, session)
}
func (group *Group) delHttptsSubSession(session *httpts.SubSession) {
Log.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.httptsSubSessionSet, session)
}
func (group *Group) delRtspSubSession(session *rtsp.SubSession) {
Log.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtspSubSessionSet, session)
}

@ -0,0 +1,57 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"path/filepath"
"github.com/q191201771/lal/pkg/httpflv"
)
// startRecordFlvIfNeeded 必要时开启flv录制
//
func (group *Group) startRecordFlvIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableFlv {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.flv", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.FlvOutPath, filename)
// 如果已经在录制,则先关闭
// TODO(chef): 正常的逻辑是否会走到这?
if group.recordFlv != nil {
Log.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFlv.Name())
_ = group.recordFlv.Dispose()
}
// 初始化录制
group.recordFlv = &httpflv.FlvFileWriter{}
if err := group.recordFlv.Open(filenameWithPath); err != nil {
Log.Errorf("[%s] record flv open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
if err := group.recordFlv.WriteFlvHeader(); err != nil {
Log.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFlv = nil
}
}
func (group *Group) stopRecordFlvIfNeeded() {
if !group.config.RecordConfig.EnableFlv {
return
}
if group.recordFlv != nil {
_ = group.recordFlv.Dispose()
group.recordFlv = nil
}
}

@ -0,0 +1,43 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import "github.com/q191201771/lal/pkg/hls"
func (group *Group) IsHlsMuxerAlive() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.hlsMuxer != nil
}
// startHlsIfNeeded 必要时启动hls
//
func (group *Group) startHlsIfNeeded() {
// TODO(chef): [refactor] ts依赖hls
if !group.config.HlsConfig.Enable {
return
}
if group.hlsMuxer != nil {
Log.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
}
enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
func (group *Group) stopHlsIfNeeded() {
if !group.config.HlsConfig.Enable {
return
}
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())
group.hlsMuxer = nil
}
}

@ -0,0 +1,50 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"path/filepath"
"github.com/q191201771/lal/pkg/mpegts"
)
// startRecordMpegtsIfNeeded 必要时开启ts录制
//
func (group *Group) startRecordMpegtsIfNeeded(nowUnix int64) {
if !group.config.RecordConfig.EnableMpegts {
return
}
// 构造文件名
filename := fmt.Sprintf("%s-%d.ts", group.streamName, nowUnix)
filenameWithPath := filepath.Join(group.config.RecordConfig.MpegtsOutPath, filename)
// 如果已经在录制,则先关闭
if group.recordMpegts != nil {
Log.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMpegts.Name())
_ = group.recordMpegts.Dispose()
}
group.recordMpegts = &mpegts.FileWriter{}
if err := group.recordMpegts.Create(filenameWithPath); err != nil {
Log.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordMpegts = nil
}
}
func (group *Group) stopRecordMpegtsIfNeeded() {
if !group.config.RecordConfig.EnableMpegts {
return
}
if group.recordMpegts != nil {
_ = group.recordMpegts.Dispose()
group.recordMpegts = nil
}
}

@ -0,0 +1,133 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
// StartPull 外部命令主动触发pull拉流
//
// 当前调用时机:
// 1. 比如http api
//
func (group *Group) StartPull(url string) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.setPullUrl(true, url)
group.pullIfNeeded()
}
// ---------------------------------------------------------------------------------------------------------------------
type pullProxy struct {
isPulling bool
pullSession *rtmp.PullSession
}
func (group *Group) initRelayPull() {
enable := group.config.RelayPullConfig.Enable
addr := group.config.RelayPullConfig.Addr
appName := group.appName
streamName := group.streamName
// 根据配置文件中的静态回源配置来初始化回源设置
var pullUrl string
if enable {
pullUrl = fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
}
group.setPullUrl(enable, pullUrl)
}
func (group *Group) isPullEnable() bool {
return group.pullEnable
}
func (group *Group) setPullUrl(enable bool, url string) {
group.pullEnable = enable
group.pullUrl = url
}
func (group *Group) getPullUrl() string {
return group.pullUrl
}
func (group *Group) setPullingFlag(flag bool) {
group.pullProxy.isPulling = flag
}
func (group *Group) getPullingFlag() bool {
return group.pullProxy.isPulling
}
// 判断是否需要pull从远端拉流至本地如果需要则触发pull
//
// 当前调用时机:
// 1. 添加新sub session
// 2. 外部命令比如http api
// 3. 定时器比如pull的连接断了通过定时器可以重启触发pull
//
func (group *Group) pullIfNeeded() {
if !group.isPullEnable() {
return
}
// 如果没有从本地拉流的就不需要pull了
if !group.hasOutSession() {
return
}
// 如果本地已经有输入型的流就不需要pull了
if group.hasInSession() {
return
}
// 已经在pull中就不需要pull了
if group.getPullingFlag() {
return
}
group.setPullingFlag(true)
Log.Infof("[%s] start relay pull. url=%s", group.UniqueKey, group.getPullUrl())
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = relayPullTimeoutMs
option.ReadAvTimeoutMs = relayPullReadAvTimeoutMs
})
// TODO(chef): 处理数据回调是否应该等待Add成功之后。避免竞态条件中途加入了其他in session
err := pullSession.Pull(group.getPullUrl(), group.OnReadRtmpAvMsg)
if err != nil {
Log.Errorf("[%s] relay pull fail. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
return
}
res := group.AddRtmpPullSession(pullSession)
if res {
err = <-pullSession.WaitChan()
Log.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRtmpPullSession(pullSession)
} else {
pullSession.Dispose()
}
}()
}
// 判断是否需要停止pull
//
// 当前调用时机:
// 1. 定时器定时检查
//
func (group *Group) stopPullIfNeeded() {
// 没有输出型的流了
if group.pullProxy.pullSession != nil && !group.hasOutSession() {
Log.Infof("[%s] stop pull since no sub session.", group.UniqueKey)
group.pullProxy.pullSession.Dispose()
}
}

@ -0,0 +1,123 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/rtmp"
)
func (group *Group) AddRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = session
}
}
func (group *Group) DelRtmpPushSession(url string, session *rtmp.PushSession) {
Log.Debugf("[%s] [%s] del rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
defer group.mutex.Unlock()
if group.url2PushProxy != nil {
group.url2PushProxy[url].pushSession = nil
group.url2PushProxy[url].isPushing = false
}
}
type pushProxy struct {
isPushing bool
pushSession *rtmp.PushSession
}
func (group *Group) initRelayPush() {
enable := group.config.RelayPushConfig.Enable
addrList := group.config.RelayPushConfig.AddrList
appName := group.appName
streamName := group.streamName
url2PushProxy := make(map[string]*pushProxy)
if enable {
for _, addr := range addrList {
pushUrl := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName)
url2PushProxy[pushUrl] = &pushProxy{
isPushing: false,
pushSession: nil,
}
}
}
group.pushEnable = group.config.RelayPushConfig.Enable
group.url2PushProxy = url2PushProxy
}
// startPushIfNeeded 必要时进行replay push转推
//
func (group *Group) startPushIfNeeded() {
// push转推功能没开
if !group.pushEnable {
return
}
// 没有pub发布者
if group.rtmpPubSession == nil && group.rtspPubSession == nil {
return
}
// relay push时携带rtmp pub的参数
// TODO chef: 这个逻辑放这里不太好看
var urlParam string
if group.rtmpPubSession != nil {
urlParam = group.rtmpPubSession.RawQuery()
}
for url, v := range group.url2PushProxy {
// 正在转推中
if v.isPushing {
continue
}
v.isPushing = true
urlWithParam := url
if urlParam != "" {
urlWithParam += "?" + urlParam
}
Log.Infof("[%s] start relay push. url=%s", group.UniqueKey, urlWithParam)
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = relayPushTimeoutMs
option.WriteAvTimeoutMs = relayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
if err != nil {
Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
return
}
group.AddRtmpPushSession(u, pushSession)
err = <-pushSession.WaitChan()
Log.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
}(url, urlWithParam)
}
}
func (group *Group) stopPushIfNeeded() {
if !group.pushEnable {
return
}
for _, v := range group.url2PushProxy {
if v.pushSession != nil {
v.pushSession.Dispose()
}
v.pushSession = nil
}
}

@ -46,6 +46,7 @@ type ServerManager struct {
rtmpServer *rtmp.Server rtmpServer *rtmp.Server
rtspServer *rtsp.Server rtspServer *rtsp.Server
httpApiServer *HttpApiServer httpApiServer *HttpApiServer
pprofServer *http.Server
exitChan chan struct{} exitChan chan struct{}
mutex sync.Mutex mutex sync.Mutex
@ -95,8 +96,7 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
} }
if sm.config.RtmpConfig.Enable { if sm.config.RtmpConfig.Enable {
// TODO(chef): refactor 参数顺序统一。Observer都放最后好一些。比如rtmp和rtsp的NewServer sm.rtmpServer = rtmp.NewServer(sm.config.RtmpConfig.Addr, sm)
sm.rtmpServer = rtmp.NewServer(sm, sm.config.RtmpConfig.Addr)
} }
if sm.config.RtspConfig.Enable { if sm.config.RtspConfig.Enable {
sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm) sm.rtspServer = rtsp.NewServer(sm.config.RtspConfig.Addr, sm)
@ -105,6 +105,10 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm) sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
} }
if sm.config.PprofConfig.Enable {
sm.pprofServer = &http.Server{Addr: sm.config.PprofConfig.Addr, Handler: nil}
}
sm.simpleAuthCtx = NewSimpleAuthCtx(sm.config.SimpleAuthConfig) sm.simpleAuthCtx = NewSimpleAuthCtx(sm.config.SimpleAuthConfig)
return sm return sm
@ -115,8 +119,15 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
func (sm *ServerManager) RunLoop() error { func (sm *ServerManager) RunLoop() error {
sm.option.NotifyHandler.OnServerStart(sm.StatLalInfo()) sm.option.NotifyHandler.OnServerStart(sm.StatLalInfo())
if sm.config.PprofConfig.Enable { if sm.pprofServer != nil {
go runWebPprof(sm.config.PprofConfig.Addr) go func() {
//Log.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
Log.Infof("start web pprof listen. addr=%s", sm.config.PprofConfig.Addr)
if err := sm.pprofServer.ListenAndServe(); err != nil {
Log.Error(err)
}
}()
} }
go base.RunSignalHandler(func() { go base.RunSignalHandler(func() {
@ -210,13 +221,13 @@ func (sm *ServerManager) RunLoop() error {
t := time.NewTicker(1 * time.Second) t := time.NewTicker(1 * time.Second)
defer t.Stop() defer t.Stop()
var count uint32 var tickCount uint32
for { for {
select { select {
case <-sm.exitChan: case <-sm.exitChan:
return nil return nil
case <-t.C: case <-t.C:
count++ tickCount++
sm.mutex.Lock() sm.mutex.Lock()
@ -228,13 +239,13 @@ func (sm *ServerManager) RunLoop() error {
return false return false
} }
group.Tick() group.Tick(tickCount)
return true return true
}) })
// 定时打印一些group相关的debug日志 // 定时打印一些group相关的debug日志
if sm.config.DebugConfig.LogGroupIntervalSec > 0 && if sm.config.DebugConfig.LogGroupIntervalSec > 0 &&
count%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 { tickCount%uint32(sm.config.DebugConfig.LogGroupIntervalSec) == 0 {
groupNum := sm.groupManager.Len() groupNum := sm.groupManager.Len()
Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum) Log.Debugf("DEBUG_GROUP_LOG: group size=%d", groupNum)
if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 { if sm.config.DebugConfig.LogGroupMaxGroupNum > 0 {
@ -252,7 +263,7 @@ func (sm *ServerManager) RunLoop() error {
sm.mutex.Unlock() sm.mutex.Unlock()
// 定时通过http notify发送group相关的信息 // 定时通过http notify发送group相关的信息
if uis != 0 && (count%uis) == 0 { if uis != 0 && (tickCount%uis) == 0 {
updateInfo.ServerId = sm.config.ServerId updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.StatAllGroup() updateInfo.Groups = sm.StatAllGroup()
sm.option.NotifyHandler.OnUpdate(updateInfo) sm.option.NotifyHandler.OnUpdate(updateInfo)
@ -266,11 +277,18 @@ func (sm *ServerManager) RunLoop() error {
func (sm *ServerManager) Dispose() { func (sm *ServerManager) Dispose() {
Log.Debug("dispose server manager.") Log.Debug("dispose server manager.")
// TODO(chef) add httpServer
if sm.rtmpServer != nil { if sm.rtmpServer != nil {
sm.rtmpServer.Dispose() sm.rtmpServer.Dispose()
} }
if sm.httpServerManager != nil {
sm.httpServerManager.Dispose()
}
if sm.pprofServer != nil {
sm.pprofServer.Close()
}
//if sm.hlsServer != nil { //if sm.hlsServer != nil {
// sm.hlsServer.Dispose() // sm.hlsServer.Dispose()
//} //}
@ -634,7 +652,6 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) error {
} }
func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) { func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
// TODO chef: impl me
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName()) group := sm.getGroup(session.AppName(), session.StreamName())
@ -796,15 +813,3 @@ func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request)
sm.hlsServerHandler.ServeHTTP(writer, req) sm.hlsServerHandler.ServeHTTP(writer, req)
} }
func runWebPprof(addr string) {
Log.Infof("start web pprof listen. addr=%s", addr)
//Log.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
if err := http.ListenAndServe(addr, nil); err != nil {
Log.Error(err)
return
}
}

@ -17,6 +17,9 @@ var (
relayPushWriteAvTimeoutMs = 5000 relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000 relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000 relayPullReadAvTimeoutMs = 5000
// calcSessionStatIntervalSec 计算所有session收发码率的时间间隔
//
calcSessionStatIntervalSec uint32 = 5 calcSessionStatIntervalSec uint32 = 5
// checkSessionAliveIntervalSec // checkSessionAliveIntervalSec

@ -8,7 +8,7 @@
package mpegts package mpegts
// Frame 帧数据 // Frame 帧数据用于打包成mpegts格式的数据
// //
type Frame struct { type Frame struct {
Pts uint64 // =(毫秒 * 90) Pts uint64 // =(毫秒 * 90)
@ -34,29 +34,38 @@ type Frame struct {
Raw []byte Raw []byte
} }
// OnTsPacket @param packet: 188字节大小的TS包注意一次Pack对应的多个TSPacket复用的是一块内存 // Pack annexb格式的流转换为mpegts流
// //
type OnTsPacket func(packet []byte) // 注意,内部会增加 Frame.Cc 的值.
// PackTsPacket Annexb格式的流转换为mpegts packet
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保存CC的值供下次调用时使用
// frame.Raw 函数调用结束后,内部不会持有该内存块
// //
// @param onTsPacket: 注意,一次函数调用,可能对应多次回调 // @return: 内存块为独立申请,调度结束后,内部不再持有
// //
func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) { func (frame *Frame) Pack() []byte {
wpos := 0 // 当前packet的写入位置 bufLen := len(frame.Raw) * 2 // 预分配一块足够大的内存
lpos := 0 // 当前帧的处理位置 if bufLen < 188 {
rpos := len(frame.Raw) // 当前帧大小 bufLen = 188
first := true // 是否为帧的首个packet的标准 }
buf := make([]byte, bufLen)
// TODO(chef): [perf] 由于上层并不需要区分单个packet所以可以考虑预分配内存存储整个packet流 lpos := 0 // 当前输入帧的处理位置
packet := make([]byte, 188) rpos := len(frame.Raw) // 当前输入帧大小
first := true // 是否为帧的首个packet的标准
packetPosAtBuf := 0 // 当前输出packet相对于整个输出内存块的位置
for lpos != rpos { for lpos != rpos {
wpos = 0
// TODO(chef): CHEFNOTICEME 正常来说,预分配的内存应该是足够用了,我们加个扩容逻辑保证绝对正确性,并且加个日志观察一段时间
if packetPosAtBuf+188 > len(buf) {
Log.Warnf("buffer too short. frame size=%d, buf=%d, packetPosAtBuf=%d", len(frame.Raw), len(buf), packetPosAtBuf)
newBuf := make([]byte, packetPosAtBuf+188)
copy(newBuf, buf)
buf = newBuf
}
packet := buf[packetPosAtBuf : packetPosAtBuf+188] // 当前输出packet
wpos := 0 // 当前输出packet的写入位置
packetPosAtBuf += 188
frame.Cc++ frame.Cc++
// 每个packet都需要添加TS Header // 每个packet都需要添加TS Header
@ -224,11 +233,13 @@ func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) {
copy(packet[wpos:], frame.Raw[lpos:lpos+inSize]) copy(packet[wpos:], frame.Raw[lpos:lpos+inSize])
lpos = rpos lpos = rpos
} }
onTsPacket(packet)
} }
return buf[:packetPosAtBuf]
} }
// ----- private -------------------------------------------------------------------------------------------------------
func packPcr(out []byte, pcr uint64) { func packPcr(out []byte, pcr uint64) {
out[0] = uint8(pcr >> 25) out[0] = uint8(pcr >> 25)
out[1] = uint8(pcr >> 17) out[1] = uint8(pcr >> 17)

@ -9,3 +9,5 @@
package remux package remux
// TODO(chef): refactor 此package更名为avop内部包含remux_xxx2xxx.go, filter_xxx.go, 协议相关(比如rtmp.go)等 // TODO(chef): refactor 此package更名为avop内部包含remux_xxx2xxx.go, filter_xxx.go, 协议相关(比如rtmp.go)等
var _ rtmp2MpegtsFilterObserver = &Rtmp2MpegtsRemuxer{}

@ -6,7 +6,7 @@
// //
// Author: Chef (191201771@qq.com) // Author: Chef (191201771@qq.com)
package hls package remux
import ( import (
"encoding/hex" "encoding/hex"
@ -20,31 +20,42 @@ import (
"github.com/q191201771/naza/pkg/nazabytes" "github.com/q191201771/naza/pkg/nazabytes"
) )
type StreamerObserver interface { const (
// OnPatPmt @param b const只读内存块上层可以持有但是不允许修改 initialVideoOutBufferSize = 1024 * 1024
calcFragmentHeaderQueueSize = 16
maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90
maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90
)
type Rtmp2MpegtsRemuxerObserver interface {
// OnPatPmt
//
// @param b: const只读内存块上层可以持有但是不允许修改
//
OnPatPmt(b []byte) OnPatPmt(b []byte)
// OnFrame // OnTsPackets
// //
// @param streamer: 供上层获取streamer内部的一些状态比如spspps是否已缓存音频缓存队列是否有数据等 // @param tsPackets:
// - mpegts数据有一个或多个188字节的ts数据组成.
// - 回调结束后remux.Rtmp2MpegtsRemuxer 不再使用这块内存块.
// //
// @param frame: 各字段含义见 mpegts.Frame 结构体定义 // @param frame: 各字段含义见 mpegts.Frame 结构体定义
// frame.CC 注意回调结束后Streamer会保存frame.CC上层在TS打包完成后可通过frame.CC将cc值传递给Streamer
// frame.Raw 回调结束后,这块内存可能会被内部重复使用
// //
OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool)
} }
// Streamer 输入rtmp流回调转封装成Annexb格式的流 // Rtmp2MpegtsRemuxer 输入rtmp流输出mpegts流
type Streamer struct { //
type Rtmp2MpegtsRemuxer struct {
UniqueKey string UniqueKey string
observer StreamerObserver observer Rtmp2MpegtsRemuxerObserver
calcFragmentHeaderQueue *Queue filter *rtmp2MpegtsFilter
videoOut []byte // Annexb TODO chef: 优化这块buff videoOut []byte // Annexb
spspps []byte // Annexb 也可能是vps+sps+pps spspps []byte // Annexb 也可能是vps+sps+pps
ascCtx *aac.AscContext ascCtx *aac.AscContext
audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧
audioCacheFirstFramePts uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS audioCacheFirstFramePts uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS
audioCc uint8 audioCc uint8
videoCc uint8 videoCc uint8
@ -52,39 +63,29 @@ type Streamer struct {
opened bool opened bool
} }
func NewStreamer(observer StreamerObserver) *Streamer { func NewRtmp2MpegtsRemuxer(observer Rtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer {
uk := base.GenUkStreamer() uk := base.GenUkRtmp2MpegtsRemuxer()
videoOut := make([]byte, 1024*1024) r := &Rtmp2MpegtsRemuxer{
videoOut = videoOut[0:0]
streamer := &Streamer{
UniqueKey: uk, UniqueKey: uk,
observer: observer, observer: observer,
videoOut: videoOut,
} }
streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer) r.audioCacheFrames = nil
return streamer r.videoOut = make([]byte, initialVideoOutBufferSize)
r.videoOut = r.videoOut[0:0]
r.filter = newRtmp2MpegtsFilter(calcFragmentHeaderQueueSize, r)
return r
} }
// FeedRtmpMessage @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 // FeedRtmpMessage
// //
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 // @param msg: msg.Payload 调用结束后,函数内部不会持有这块内存
func (s *Streamer) FeedRtmpMessage(msg base.RtmpMsg) { //
s.calcFragmentHeaderQueue.Push(msg) func (s *Rtmp2MpegtsRemuxer) FeedRtmpMessage(msg base.RtmpMsg) {
} s.filter.Push(msg)
// ----- implement IQueueObserver of Queue -----------------------------------------------------------------------------
func (s *Streamer) OnPatPmt(b []byte) {
s.observer.OnPatPmt(b)
} }
func (s *Streamer) OnPop(msg base.RtmpMsg) { func (s *Rtmp2MpegtsRemuxer) Dispose() {
switch msg.Header.MsgTypeId { s.FlushAudio()
case base.RtmpTypeIdAudio:
s.feedAudio(msg)
case base.RtmpTypeIdVideo:
s.feedVideo(msg)
}
} }
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
@ -92,12 +93,12 @@ func (s *Streamer) OnPop(msg base.RtmpMsg) {
// FlushAudio // FlushAudio
// //
// 吐出音频数据的三种情况: // 吐出音频数据的三种情况:
// 1. 收到音频或视频时,音频缓存队列已达到一定长度 // 1. 收到音频或视频时,音频缓存队列已达到一定长度(内部判断)
// 2. 打开一个新的TS文件切片时 // 2. 打开一个新的TS文件切片时
// 3. 输入流关闭时 // 3. 输入流关闭时
// //
func (s *Streamer) FlushAudio() { func (s *Rtmp2MpegtsRemuxer) FlushAudio() {
if s.audioCacheFrames == nil { if s.audioCacheEmpty() {
return return
} }
@ -109,27 +110,37 @@ func (s *Streamer) FlushAudio() {
frame.Raw = s.audioCacheFrames frame.Raw = s.audioCacheFrames
frame.Pid = mpegts.PidAudio frame.Pid = mpegts.PidAudio
frame.Sid = mpegts.StreamIdAudio frame.Sid = mpegts.StreamIdAudio
// 注意在回调前设置为空因为回调中有可能再次调用FlushAudio
s.resetAudioCache()
s.onFrame(&frame) s.onFrame(&frame)
// 回调结束后更新cc
s.audioCc = frame.Cc s.audioCc = frame.Cc
s.audioCacheFrames = nil
} }
func (s *Streamer) AudioSeqHeaderCached() bool { // ---------------------------------------------------------------------------------------------------------------------
return s.ascCtx != nil
}
func (s *Streamer) VideoSeqHeaderCached() bool { // onPatPmt onPop
return s.spspps != nil //
// 实现 rtmp2MpegtsFilterObserver
//
func (s *Rtmp2MpegtsRemuxer) onPatPmt(b []byte) {
s.observer.OnPatPmt(b)
} }
func (s *Streamer) AudioCacheEmpty() bool { func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) {
return s.audioCacheFrames == nil switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
s.feedAudio(msg)
case base.RtmpTypeIdVideo:
s.feedVideo(msg)
}
} }
// ----- private ------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
func (s *Streamer) feedVideo(msg base.RtmpMsg) { func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
// 注意有一种情况是msg.Payload为 27 02 00 00 00 // 注意有一种情况是msg.Payload为 27 02 00 00 00
// 此时打印错误并返回也不影响 // 此时打印错误并返回也不影响
// //
@ -163,8 +174,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
audSent := false audSent := false
spsppsSent := false spsppsSent := false
// 优化这块buffer s.resetVideoOutBuffer()
out := s.videoOut[0:0]
// msg中可能有多个NALU逐个获取 // msg中可能有多个NALU逐个获取
nals, err := avc.SplitNaluAvcc(msg.Payload[5:]) nals, err := avc.SplitNaluAvcc(msg.Payload[5:])
@ -197,9 +207,9 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
//if codecId == base.RtmpCodecIdAvc && (nalType == avc.NaluTypeSei || nalType == avc.NaluTypeIdrSlice || nalType == avc.NaluTypeSlice) { //if codecId == base.RtmpCodecIdAvc && (nalType == avc.NaluTypeSei || nalType == avc.NaluTypeIdrSlice || nalType == avc.NaluTypeSlice) {
switch codecId { switch codecId {
case base.RtmpCodecIdAvc: case base.RtmpCodecIdAvc:
out = append(out, avc.AudNalu...) s.videoOut = append(s.videoOut, avc.AudNalu...)
case base.RtmpCodecIdHevc: case base.RtmpCodecIdHevc:
out = append(out, hevc.AudNalu...) s.videoOut = append(s.videoOut, hevc.AudNalu...)
} }
audSent = true audSent = true
} }
@ -210,7 +220,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
switch nalType { switch nalType {
case avc.NaluTypeIdrSlice: case avc.NaluTypeIdrSlice:
if !spsppsSent { if !spsppsSent {
if out, err = s.appendSpsPps(out); err != nil { if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil {
Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return return
} }
@ -224,7 +234,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
switch nalType { switch nalType {
case hevc.NaluTypeSliceIdr, hevc.NaluTypeSliceIdrNlp, hevc.NaluTypeSliceCranut: case hevc.NaluTypeSliceIdr, hevc.NaluTypeSliceIdrNlp, hevc.NaluTypeSliceCranut:
if !spsppsSent { if !spsppsSent {
if out, err = s.appendSpsPps(out); err != nil { if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil {
Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return return
} }
@ -238,18 +248,18 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
// 如果写入了aud或spspps则用start code3否则start code4。为什么要这样处理 // 如果写入了aud或spspps则用start code3否则start code4。为什么要这样处理
// 这里不知为什么要区分写入两种类型的start code // 这里不知为什么要区分写入两种类型的start code
if len(out) == 0 { if len(s.videoOut) == 0 {
out = append(out, avc.NaluStartCode4...) s.videoOut = append(s.videoOut, avc.NaluStartCode4...)
} else { } else {
out = append(out, avc.NaluStartCode3...) s.videoOut = append(s.videoOut, avc.NaluStartCode3...)
} }
out = append(out, nal...) s.videoOut = append(s.videoOut, nal...)
} }
dts := uint64(msg.Header.TimestampAbs) * 90 dts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts { if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts {
s.FlushAudio() s.FlushAudio()
} }
@ -258,7 +268,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
frame.Dts = dts frame.Dts = dts
frame.Pts = frame.Dts + uint64(cts)*90 frame.Pts = frame.Dts + uint64(cts)*90
frame.Key = msg.IsVideoKeyNalu() frame.Key = msg.IsVideoKeyNalu()
frame.Raw = out frame.Raw = s.videoOut
frame.Pid = mpegts.PidVideo frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIdVideo frame.Sid = mpegts.StreamIdVideo
@ -266,7 +276,7 @@ func (s *Streamer) feedVideo(msg base.RtmpMsg) {
s.videoCc = frame.Cc s.videoCc = frame.Cc
} }
func (s *Streamer) feedAudio(msg base.RtmpMsg) { func (s *Rtmp2MpegtsRemuxer) feedAudio(msg base.RtmpMsg) {
if len(msg.Payload) < 3 { if len(msg.Payload) < 3 {
Log.Errorf("[%s] invalid audio message length. len=%d", s.UniqueKey, len(msg.Payload)) Log.Errorf("[%s] invalid audio message length. len=%d", s.UniqueKey, len(msg.Payload))
return return
@ -284,18 +294,18 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) {
return return
} }
if !s.AudioSeqHeaderCached() { if !s.audioSeqHeaderCached() {
Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey) Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey)
return return
} }
pts := uint64(msg.Header.TimestampAbs) * 90 pts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts { if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts {
s.FlushAudio() s.FlushAudio()
} }
if s.audioCacheFrames == nil { if s.audioCacheEmpty() {
s.audioCacheFirstFramePts = pts s.audioCacheFirstFramePts = pts
} }
@ -304,13 +314,17 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) {
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...) s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...)
} }
func (s *Streamer) cacheAacSeqHeader(msg base.RtmpMsg) error { func (s *Rtmp2MpegtsRemuxer) cacheAacSeqHeader(msg base.RtmpMsg) error {
var err error var err error
s.ascCtx, err = aac.NewAscContext(msg.Payload[2:]) s.ascCtx, err = aac.NewAscContext(msg.Payload[2:])
return err return err
} }
func (s *Streamer) appendSpsPps(out []byte) ([]byte, error) { func (s *Rtmp2MpegtsRemuxer) audioSeqHeaderCached() bool {
return s.ascCtx != nil
}
func (s *Rtmp2MpegtsRemuxer) appendSpsPps(out []byte) ([]byte, error) {
if s.spspps == nil { if s.spspps == nil {
return out, base.ErrHls return out, base.ErrHls
} }
@ -319,12 +333,28 @@ func (s *Streamer) appendSpsPps(out []byte) ([]byte, error) {
return out, nil return out, nil
} }
func (s *Streamer) onFrame(frame *mpegts.Frame) { func (s *Rtmp2MpegtsRemuxer) videoSeqHeaderCached() bool {
return s.spspps != nil
}
func (s *Rtmp2MpegtsRemuxer) audioCacheEmpty() bool {
return len(s.audioCacheFrames) == 0
}
func (s *Rtmp2MpegtsRemuxer) resetAudioCache() {
s.audioCacheFrames = s.audioCacheFrames[0:0]
}
func (s *Rtmp2MpegtsRemuxer) resetVideoOutBuffer() {
s.videoOut = s.videoOut[0:0]
}
func (s *Rtmp2MpegtsRemuxer) onFrame(frame *mpegts.Frame) {
var boundary bool var boundary bool
if frame.Sid == mpegts.StreamIdAudio { if frame.Sid == mpegts.StreamIdAudio {
// 为了考虑没有视频的情况也能切片所以这里判断spspps为空时也建议生成fragment // 为了考虑没有视频的情况也能切片所以这里判断spspps为空时也建议生成fragment
boundary = !s.VideoSeqHeaderCached() boundary = !s.videoSeqHeaderCached()
} else { } else {
// 收到视频可能触发建立fragment的条件是 // 收到视频可能触发建立fragment的条件是
// 关键帧数据 && // 关键帧数据 &&
@ -333,12 +363,14 @@ func (s *Streamer) onFrame(frame *mpegts.Frame) {
// (收到过音频seq header && fragment没有打开) || 说明 音视频都有且都已ready // (收到过音频seq header && fragment没有打开) || 说明 音视频都有且都已ready
// (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 说明 为什么音频缓存需不为空? // (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 说明 为什么音频缓存需不为空?
// ) // )
boundary = frame.Key && (!s.AudioSeqHeaderCached() || !s.opened || !s.AudioCacheEmpty()) boundary = frame.Key && (!s.audioSeqHeaderCached() || !s.opened || !s.audioCacheEmpty())
} }
if boundary { if boundary {
s.opened = true s.opened = true
} }
s.observer.OnFrame(s, frame, boundary) packets := frame.Pack()
s.observer.OnTsPackets(packets, frame, boundary)
} }

@ -6,39 +6,49 @@
// //
// Author: Chef (191201771@qq.com) // Author: Chef (191201771@qq.com)
package hls package remux
import ( import (
"github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/lal/pkg/mpegts"
) )
// Queue // rtmp2MpegtsFilter
//
// 缓存流起始的一些数据判断流中是否存在音频、视频以及编码格式生成正确的mpegts PatPmt头信息
// //
// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式
// 一旦判断结束,该队列变成直进直出,不再有实际缓存 // 一旦判断结束,该队列变成直进直出,不再有实际缓存
// //
type Queue struct { type rtmp2MpegtsFilter struct {
maxMsgSize int maxMsgSize int
data []base.RtmpMsg data []base.RtmpMsg
observer IQueueObserver observer rtmp2MpegtsFilterObserver
audioCodecId int audioCodecId int
videoCodecId int videoCodecId int
done bool done bool
} }
type IQueueObserver interface { type rtmp2MpegtsFilterObserver interface {
// OnPatPmt 该回调一定发生在数据回调之前 // OnPatPmt
//
// 该回调一定发生在数据回调之前
// 只会返回两种格式h264和h265
//
// TODO(chef): [opt] 当没有视频时不应该返回h264的格式
// TODO(chef) 这里可以考虑换成只通知drain由上层完成FragmentHeader的组装逻辑 // TODO(chef) 这里可以考虑换成只通知drain由上层完成FragmentHeader的组装逻辑
OnPatPmt(b []byte) //
onPatPmt(b []byte)
OnPop(msg base.RtmpMsg) onPop(msg base.RtmpMsg)
} }
// NewQueue @param maxMsgSize 最大缓存多少个包 // NewRtmp2MpegtsFilter
func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { //
return &Queue{ // @param maxMsgSize: 最大缓存多少个包
//
func newRtmp2MpegtsFilter(maxMsgSize int, observer rtmp2MpegtsFilterObserver) *rtmp2MpegtsFilter {
return &rtmp2MpegtsFilter{
maxMsgSize: maxMsgSize, maxMsgSize: maxMsgSize,
data: make([]base.RtmpMsg, maxMsgSize)[0:0], data: make([]base.RtmpMsg, maxMsgSize)[0:0],
observer: observer, observer: observer,
@ -48,10 +58,13 @@ func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue {
} }
} }
// Push @param msg 函数调用结束后,内部不持有该内存块 // Push
func (q *Queue) Push(msg base.RtmpMsg) { //
// @param msg: 函数调用结束后,内部不持有该内存块
//
func (q *rtmp2MpegtsFilter) Push(msg base.RtmpMsg) {
if q.done { if q.done {
q.observer.OnPop(msg) q.observer.onPop(msg)
return return
} }
@ -75,18 +88,20 @@ func (q *Queue) Push(msg base.RtmpMsg) {
} }
} }
func (q *Queue) drain() { // ---------------------------------------------------------------------------------------------------------------------
func (q *rtmp2MpegtsFilter) drain() {
switch q.videoCodecId { switch q.videoCodecId {
case int(base.RtmpCodecIdAvc): case int(base.RtmpCodecIdAvc):
q.observer.OnPatPmt(mpegts.FixedFragmentHeader) q.observer.onPatPmt(mpegts.FixedFragmentHeader)
case int(base.RtmpCodecIdHevc): case int(base.RtmpCodecIdHevc):
q.observer.OnPatPmt(mpegts.FixedFragmentHeaderHevc) q.observer.onPatPmt(mpegts.FixedFragmentHeaderHevc)
default: default:
// TODO(chef) 正确处理只有音频或只有视频的情况 #56 // TODO(chef) 正确处理只有音频或只有视频的情况 #56
q.observer.OnPatPmt(mpegts.FixedFragmentHeader) q.observer.onPatPmt(mpegts.FixedFragmentHeader)
} }
for i := range q.data { for i := range q.data {
q.observer.OnPop(q.data[i]) q.observer.onPop(q.data[i])
} }
q.data = nil q.data = nil

@ -6,7 +6,7 @@
// //
// Author: Chef (191201771@qq.com) // Author: Chef (191201771@qq.com)
package hls package remux
import ( import (
"testing" "testing"
@ -24,15 +24,15 @@ var (
type qo struct { type qo struct {
} }
func (q *qo) OnPatPmt(b []byte) { func (q *qo) onPatPmt(b []byte) {
fh = b fh = b
} }
func (q *qo) OnPop(msg base.RtmpMsg) { func (q *qo) onPop(msg base.RtmpMsg) {
poped = append(poped, msg) poped = append(poped, msg)
} }
func TestQueue(t *testing.T) { func TestRtmp2MpegtsFilter(t *testing.T) {
goldenRtmpMsg := []base.RtmpMsg{ goldenRtmpMsg := []base.RtmpMsg{
{ {
Header: base.RtmpHeader{ Header: base.RtmpHeader{
@ -49,9 +49,9 @@ func TestQueue(t *testing.T) {
} }
q := &qo{} q := &qo{}
queue := NewQueue(8, q) f := newRtmp2MpegtsFilter(8, q)
for i := range goldenRtmpMsg { for i := range goldenRtmpMsg {
queue.Push(goldenRtmpMsg[i]) f.Push(goldenRtmpMsg[i])
} }
assert.Equal(t, mpegts.FixedFragmentHeader, fh) assert.Equal(t, mpegts.FixedFragmentHeader, fh)
assert.Equal(t, goldenRtmpMsg, poped) assert.Equal(t, goldenRtmpMsg, poped)

@ -192,7 +192,7 @@ func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker {
} }
clockRate, err := ascCtx.GetSamplingFrequency() clockRate, err := ascCtx.GetSamplingFrequency()
if err != nil { if err != nil {
Log.Errorf("get sampling frequency failed. err=%+v", err) Log.Errorf("get sampling frequency failed. err=%+v, asc=%s", err, hex.Dump(r.asc))
} }
pp := rtprtcp.NewRtpPackerPayloadAac() pp := rtprtcp.NewRtpPackerPayloadAac()

@ -108,7 +108,6 @@ func calcHeader(header *base.RtmpHeader, prevHeader *base.RtmpHeader, out []byte
// 设置扩展时间戳 // 设置扩展时间戳
if timestamp > maxTimestampInMessageHeader { if timestamp > maxTimestampInMessageHeader {
//log.Debugf("CHEFERASEME %+v %+v %d %d", header, prevHeader, timestamp, index)
bele.BePutUint32(out[index:], timestamp) bele.BePutUint32(out[index:], timestamp)
index += 4 index += 4
} }

@ -34,15 +34,15 @@ type ServerObserver interface {
} }
type Server struct { type Server struct {
observer ServerObserver
addr string addr string
observer ServerObserver
ln net.Listener ln net.Listener
} }
func NewServer(observer ServerObserver, addr string) *Server { func NewServer(addr string, observer ServerObserver) *Server {
return &Server{ return &Server{
observer: observer,
addr: addr, addr: addr,
observer: observer,
} }
} }

Loading…
Cancel
Save