diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d10a1e..4dcb748 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -**v0.1.0** +#### v0.1.0 - /app/flvfile2rtmppush 优化推流平稳性 - bugfix rtmp 推拉流信令时可以携带 url 参数,并且在做上下行匹配时去掉 url 参数 @@ -14,6 +14,6 @@ - 更新 nezha 0.1.0 - errors.PanicIfErrorOccur -> log.FatalIfErrorNotNil -**v0.0.1** +#### v0.0.1 1. 提供 `/app/flvfile2rtmppush` 给业务方使用 diff --git a/README.md b/README.md index c2d5aa0..422ff3d 100644 --- a/README.md +++ b/README.md @@ -32,11 +32,11 @@ Go语言编写的流媒体 库 / 客户端 / 服务端 ``` app/ ......各种main包的源码文件,一个子目录对应一个main包,即对应可生成一个可执行文件 |-- lal/ ......[最重要的] 流媒体服务器 +|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 |-- flvfile2rtmppush ......rtmp推流客户端,输入是本地flv文件,文件推送完毕后,可循环推送(rtmp push流并不断开) -|-- rtmppull ......rtmp拉流客户端 |-- httpflvpull ......http-flv拉流客户端 |-- modflvfile ......修改本地flv文件 -|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 +|-- rtmppull ......rtmp拉流客户端,存储为本地flv文件 pkg/ ......源码包 |-- aac/ ......音频aac编解码格式相关 |-- avc/ ......视频avc h264编解码格式相关 @@ -67,14 +67,31 @@ $git clone https://github.com/q191201771/lal.git && cd lal && ./build.sh "addr": ":19350" // rtmp服务监听的端口 }, "log": { - "level": 0, // 日志级别,0 debug 1 info 2 warn 3 error + "level": 1, // 日志级别,1 debug, 2 info, 3 warn, 4 error, 5 fatal "filename": "./logs/lal.log", // 日志输出文件 "is_to_stdout": true, // 是否打印至标志控制台输出 - "rotate_mbyte": 1024 // 日志按大小翻滚 + "is_rotate_daily": true, // 日志按天翻滚 + "short_file_flag": true // 日志末尾是否携带源码文件名以及行号的信息 } } ``` +### 测试过的客户端 + +``` +推流端: +- OBS 21.0.3(mac) +- ffmpeg 3.4.2(mac) +- srs-bench (srs项目配套的一个压测工具) +- flvfile2rtmppush (lal app中的rtmp推流客户端) + +拉流端: +- VLC 2.2.6(mac) +- MPV 0.29.1(mac) +- ffmpeg 3.4.2(mac) +- srs-bench (srs项目配套的一个压测工具) +``` + #### roadmap 有建议、意见、bug、功能等等欢迎提 issue 啊,100% 会回复的。 diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/app/flvfile2rtmppush/flvfile2rtmppush.go index 69c672b..068dc9c 100644 --- a/app/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/flvfile2rtmppush/flvfile2rtmppush.go @@ -68,9 +68,9 @@ func main() { // TODO chef: 转换代码放入lal某个包中 var h rtmp.Header - h.MsgLen = int(tag.Header.DataSize) //len(tag.Raw)-httpflv.TagHeaderSize + h.MsgLen = tag.Header.DataSize //len(tag.Raw)-httpflv.TagHeaderSize - h.MsgTypeID = int(tag.Header.T) + h.MsgTypeID = tag.Header.T h.MsgStreamID = rtmp.MSID1 switch tag.Header.T { case httpflv.TagTypeMetadata: diff --git a/app/lal/group_manager.go b/app/lal/group_manager.go deleted file mode 100644 index 53e9841..0000000 --- a/app/lal/group_manager.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "github.com/q191201771/lal/pkg/httpflv" - "github.com/q191201771/lal/pkg/rtmp" - "github.com/q191201771/nezha/pkg/log" - "github.com/q191201771/nezha/pkg/unique" - "sync" -) - -// TODO chef: 没有sub了一定时间后,停止pull - -type GroupManager struct { - config *Config - appName string - streamName string - - exitChan chan struct{} - rtmpGroup *rtmp.Group - httpFlvGroup *httpflv.Group - mutex sync.Mutex - - UniqueKey string -} - -func NewGroupManager(appName string, streamName string, config *Config) *GroupManager { - uk := unique.GenUniqueKey("GROUPMANAGER") - log.Infof("lifecycle new lal.GroupManager. [%s] appName=%s streamName=%s", uk, appName, streamName) - - return &GroupManager{ - config: config, - appName: appName, - streamName: streamName, - exitChan: make(chan struct{}), - UniqueKey: uk, - } -} - -func (gm *GroupManager) RunLoop() { - <- gm.exitChan -} - -func (gm *GroupManager) Dispose(err error) { - log.Infof("lifecycle dispose lal.GroupManager. [%s] reason=%v", gm.UniqueKey, err) - gm.exitChan <- struct{}{} -} - -// 返回true则允许推流,返回false则关闭连接 -func (gm *GroupManager) AddRTMPPubSession(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool { - gm.attachRTMPGroup(rtmpGroup) - - return !gm.isInExist() -} - -func (gm *GroupManager) AddRTMPSubSession(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) { - gm.attachRTMPGroup(rtmpGroup) - - gm.pullIfNeeded() -} - -func (gm *GroupManager) AddHTTPFlvSubSession(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) { - gm.attachHTTPFlvGroup(httpFlvGroup) - - gm.pullIfNeeded() -} - -func (gm *GroupManager) IsTotalEmpty() bool { - gm.mutex.Lock() - defer gm.mutex.Unlock() - return (gm.rtmpGroup == nil || gm.rtmpGroup.IsTotalEmpty()) && - (gm.httpFlvGroup == nil || gm.httpFlvGroup.IsTotalEmpty()) -} - -// GroupObserver of rtmp.Group -func (gm *GroupManager) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { - - // TODO chef: broadcast to httpflv.Group -} - -// GroupObserver of httpflv.Group -func (gm *GroupManager) ReadHTTPRespHeaderCB() { - // noop -} - -// GroupObserver of httpflv.Group -func (gm *GroupManager) ReadFlvHeaderCB(flvHeader []byte) { - // noop -} - -// GroupObserver of httpflv.Group -func (gm *GroupManager) ReadFlvTagCB(tag *httpflv.Tag) { - log.Info("ReadFlvTagCB") - - // TODO chef: broadcast to rtmp.Group -} - -func (gm *GroupManager) attachRTMPGroup(rtmpGroup *rtmp.Group) { - gm.mutex.Lock() - defer gm.mutex.Unlock() - if gm.rtmpGroup != nil && gm.rtmpGroup != rtmpGroup { - log.Warnf("duplicate rtmp group in group manager. %+v %+v", gm.rtmpGroup, rtmpGroup) - } - gm.rtmpGroup = rtmpGroup - rtmpGroup.SetObserver(gm) -} - -func (gm *GroupManager) attachHTTPFlvGroup(httpFlvGroup *httpflv.Group) { - gm.mutex.Lock() - defer gm.mutex.Unlock() - if gm.httpFlvGroup != nil && gm.httpFlvGroup != httpFlvGroup { - log.Warnf("duplicate http flv group in group manager. %+v %+v", gm.httpFlvGroup, httpFlvGroup) - } - gm.httpFlvGroup = httpFlvGroup - httpFlvGroup.SetObserver(gm) -} - -func (gm *GroupManager) pullIfNeeded() { - if !gm.isInExist() { - switch gm.config.Pull.Type { - case "httpflv": - go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout) - case "rtmp": - go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout) - } - } -} - -func (gm *GroupManager) isInExist() bool { - return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) || - (gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist()) -} diff --git a/app/lal/main.go b/app/lal/main.go index 1e896e4..6d8fa9c 100644 --- a/app/lal/main.go +++ b/app/lal/main.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/q191201771/nezha/pkg/bininfo" "github.com/q191201771/nezha/pkg/log" + "github.com/q191201771/lal/pkg/logic" "net/http" _ "net/http/pprof" "os" @@ -12,7 +13,7 @@ import ( "syscall" ) -var sm *ServerManager +var sm *logic.ServerManager func main() { confFile := parseFlag() @@ -20,9 +21,11 @@ func main() { initLog(config.Log) log.Infof("bininfo: %s", bininfo.StringifySingleLine()) - sm = NewServerManager(config) + sm = logic.NewServerManager(config) - go runWebPProf() + if config.PProf.Addr != "" { + go runWebPProf(config.PProf.Addr) + } go runSignalHandler() sm.RunLoop() @@ -43,16 +46,8 @@ func parseFlag() string { return *cf } -func initLog(config log.Config) { - if err := log.Init(config); err != nil { - _, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v", err) - os.Exit(1) - } - log.Info("initial log succ.") -} - -func loadConf(confFile string) *Config { - config, err := LoadConf(confFile) +func loadConf(confFile string) *logic.Config { + config, err := logic.LoadConf(confFile) if err != nil { log.Errorf("load conf failed. file=%s err=%+v", confFile, err) os.Exit(1) @@ -61,9 +56,15 @@ func loadConf(confFile string) *Config { return config } -func runWebPProf() { - // TODO chef: config me - addr := ":10001" +func initLog(config log.Config) { + if err := log.Init(config); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v", err) + os.Exit(1) + } + log.Info("initial log succ.") +} + +func runWebPProf(addr string) { log.Infof("start web pprof listen. addr=%s", addr) if err := http.ListenAndServe(addr, nil); err != nil { log.Error(err) diff --git a/app/rtmppull/rtmppull.go b/app/rtmppull/rtmppull.go index bdd2ba6..d168ef4 100644 --- a/app/rtmppull/rtmppull.go +++ b/app/rtmppull/rtmppull.go @@ -2,38 +2,52 @@ package main import ( "flag" + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/logic" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/nezha/pkg/log" "os" ) type Obs struct { + w httpflv.FlvFileWriter } -func (obs Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { +func (obs *Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { log.Infof("%+v, abs ts=%d", header, timestampAbs) + tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message) + err := obs.w.WriteTag(tag) + log.FatalIfErrorNotNil(err) } func main() { - url := parseFlag() + url, outFileName := parseFlag() var obs Obs - session := rtmp.NewPullSession(obs, rtmp.PullSessionTimeout{ + session := rtmp.NewPullSession(&obs, rtmp.PullSessionTimeout{ ConnectTimeoutMS: 3000, PullTimeoutMS: 5000, ReadAVTimeoutMS: 10000, }) err := session.Pull(url) log.FatalIfErrorNotNil(err) + + err = obs.w.Open(outFileName) + log.FatalIfErrorNotNil(err) + //defer obs.w.Dispose() + err = obs.w.WriteRaw(httpflv.FlvHeader) + log.FatalIfErrorNotNil(err) + err = session.WaitLoop() log.FatalIfErrorNotNil(err) } -func parseFlag() string { - url := flag.String("i", "", "specify rtmp url") +func parseFlag() (string, string) { + i := flag.String("i", "", "specify pull rtmp url") + o := flag.String("o", "", "specify ouput flv file") flag.Parse() - if *url == "" { + if *i == "" || *o == "" { flag.Usage() os.Exit(1) } - return *url + return *i, *o } diff --git a/conf/lal.conf.json b/conf/lal.conf.json index 78bd0cd..0f4e607 100644 --- a/conf/lal.conf.json +++ b/conf/lal.conf.json @@ -3,9 +3,13 @@ "addr": ":19350" }, "log": { - "level": 0, + "level": 1, "filename": "./logs/lal.log", "is_to_stdout": true, - "rotate_mbyte": 1024 + "is_rotate_daily": true, + "short_file_flag": true + }, + "pprof": { + "addr": ":10001" } } diff --git a/go.mod b/go.mod index d85839c..144a8a1 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/nezha v0.1.0 +require github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337 diff --git a/go.sum b/go.sum index 22a6e90..fddabb3 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/nezha v0.1.0 h1:ZCFC5g9Vc5jNGG/hSMMBxF2EF8BsWiBMMTMqdnM1Uew= -github.com/q191201771/nezha v0.1.0/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M= +github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337 h1:rNlCVOQNOgoo5fja0UKPbpI5TrS4moe8lFoCSKZ55PM= +github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M= diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index dd647ef..02923ff 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -22,12 +22,13 @@ type ADTS struct { // 传入 AAC Sequence Header,一会生成 ADTS 头时需要使用 // @param rtmp message payload,包含前面2个字节 func (obj *ADTS) PutAACSequenceHeader(payload []byte) { + log.Debugf(hex.Dump(payload[:4])) soundFormat := payload[0] >> 4 // 10=AAC soundRate := (payload[0] >> 2) & 0x03 // 3=44kHz. For AAC: always 3 soundSize := (payload[0] >> 1) & 0x01 // 0=snd8Bit 1=snd16Bit soundType := payload[0] & 0x01 // For AAC: always 1 - - //aacPacketType := payload[1] // 0:sequence header 1:AAC raw + aacPacketType := payload[1] // 0:sequence header 1:AAC raw + log.Debugf("%d %d %d %d %d", soundFormat, soundRate, soundSize, soundType, aacPacketType) obj.audioObjectType = (payload[2] & 0xf8) >> 3 // 5bit 编码结构类型 obj.samplingFrequencyIndex = ((payload[2] & 0x07) << 1) | (payload[3] >> 7) // 4bit 音频采样率索引值 @@ -35,9 +36,6 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) { obj.frameLengthFlag = (payload[3] & 0x04) >> 2 // 1bit obj.dependOnCoreCoder = (payload[3] & 0x02) >> 1 // 1bit obj.extensionFlag = payload[3] & 0x01 // 1bit - - log.Debugf(hex.Dump(payload[:4])) - log.Debugf("%d %d %d %d", soundFormat, soundRate, soundSize, soundType) log.Debugf("%+v", obj) } diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index f10c61a..53afcef 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -64,7 +64,7 @@ func (session *PullSession) Dispose(err error) { session.closeOnce.Do(func() { log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) if err := session.Conn.Close(); err != nil { - log.Error("conn close error. [%s] err=%v", session.UniqueKey, err) + log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err) } }) } diff --git a/pkg/httpflv/flv_file_writer.go b/pkg/httpflv/flv_file_writer.go index dbb8f66..e406e7e 100644 --- a/pkg/httpflv/flv_file_writer.go +++ b/pkg/httpflv/flv_file_writer.go @@ -16,7 +16,7 @@ func (ffw *FlvFileWriter) WriteRaw(b []byte) (err error) { return } -func (ffw *FlvFileWriter) WriteTag(tag *Tag) (err error) { +func (ffw *FlvFileWriter) WriteTag(tag Tag) (err error) { _, err = ffw.fp.Write(tag.Raw) return } diff --git a/pkg/httpflv/httpflv.go b/pkg/httpflv/httpflv.go index bccf9e4..b44fe01 100644 --- a/pkg/httpflv/httpflv.go +++ b/pkg/httpflv/httpflv.go @@ -17,6 +17,8 @@ const ( prevTagFieldSize = 4 ) +var FlvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} + var readBufSize = 16384 type LineReader interface { diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 9ca3064..6a8ab3c 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -158,7 +158,7 @@ func (session *SubSession) Dispose(err error) { atomic.StoreUint32(&session.hasClosedFlag, 1) close(session.exitChan) if err := session.conn.Close(); err != nil { - log.Error("conn close error. [%s] err=%v", session.UniqueKey, err) + log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err) } }) } diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index 3c912d2..6e63107 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -91,11 +91,11 @@ func IsAACSeqHeader(tag []byte) bool { return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader } -func PackHTTPFlvTag(t uint8, timestamp int, in []byte) []byte { +func PackHTTPFlvTag(t uint8, timestamp uint32, in []byte) []byte { out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize) out[0] = t bele.BEPutUint24(out[1:], uint32(len(in))) - bele.BEPutUint24(out[4:], uint32(timestamp&0xFFFFFF)) + bele.BEPutUint24(out[4:], timestamp&0xFFFFFF) out[7] = uint8(timestamp >> 24) out[8] = 0 out[9] = 0 diff --git a/app/lal/config.go b/pkg/logic/config.go similarity index 84% rename from app/lal/config.go rename to pkg/logic/config.go index 1400c36..4bc6e81 100644 --- a/app/lal/config.go +++ b/pkg/logic/config.go @@ -1,4 +1,4 @@ -package main +package logic import ( "encoding/json" @@ -7,23 +7,29 @@ import ( ) type Config struct { - RTMP RTMP `json:"rtmp"` - Log log.Config + RTMP RTMP `json:"rtmp"` + Log log.Config `json:"log"` + PProf PProf `json:"pprof"` + // v1.0.0之前不提供 SubIdleTimeout int64 `json:"sub_idle_timeout"` GOPCacheNum int `json:"gop_cache_number"` HTTPFlv HTTPFlv `json:"httpflv"` Pull Pull `json:"pull"` } -type HTTPFlv struct { - SubListenAddr string `json:"sub_listen_addr"` +type RTMP struct { + Addr string `json:"addr"` } -type RTMP struct { +type PProf struct { Addr string `json:"addr"` } +type HTTPFlv struct { + SubListenAddr string `json:"sub_listen_addr"` +} + type Pull struct { Type string `json:"type"` Addr string `json:"addr"` diff --git a/pkg/rtmp/group.go b/pkg/logic/group.go similarity index 61% rename from pkg/rtmp/group.go rename to pkg/logic/group.go index 60bc717..cc7be02 100644 --- a/pkg/rtmp/group.go +++ b/pkg/logic/group.go @@ -1,16 +1,14 @@ -package rtmp +package logic import ( + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/nezha/pkg/log" "github.com/q191201771/nezha/pkg/unique" "sync" "time" ) -type GroupObserver interface { - AVMsgObserver -} - type Group struct { UniqueKey string @@ -19,29 +17,27 @@ type Group struct { exitChan chan struct{} - mutex sync.Mutex - pubSession *ServerSession - pullSession *PullSession - subSessionSet map[*ServerSession]struct{} - obs GroupObserver - //prevAudioHeader *Header - //prevVideoHeader *Header + mutex sync.Mutex + pubSession *rtmp.ServerSession + pullSession *rtmp.PullSession + subSessionSet map[*rtmp.ServerSession]struct{} // TODO chef: metadata []byte avcKeySeqHeader []byte aacSeqHeader []byte - } +var _ rtmp.PubSessionObserver = &Group{} + func NewGroup(appName string, streamName string) *Group { uk := unique.GenUniqueKey("RTMPGROUP") - log.Infof("lifecycle new rtmp.Group. [%s] appName=%s, streamName=%s", uk, appName, streamName) + log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) return &Group{ UniqueKey: uk, appName: appName, streamName: streamName, - exitChan: make(chan struct{}, 1), - subSessionSet: make(map[*ServerSession]struct{}), + exitChan: make(chan struct{}, 1), + subSessionSet: make(map[*rtmp.ServerSession]struct{}), } } @@ -59,8 +55,8 @@ func (group *Group) RunLoop() { } } -func (group *Group) Dispose() { - log.Infof("lifecycle dispose rtmp.Group. [%s]", group.UniqueKey) +func (group *Group) Dispose(err error) { + log.Infof("lifecycle dispose group. [%s]", group.UniqueKey) group.exitChan <- struct{}{} group.mutex.Lock() @@ -68,47 +64,55 @@ func (group *Group) Dispose() { if group.pubSession != nil { group.pubSession.Dispose() } - // TODO chef: dispose pull session for session := range group.subSessionSet { session.Dispose() } } -func (group *Group) AddPubSession(session *ServerSession) { +func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool { log.Debugf("add PubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() if group.pubSession != nil { log.Errorf("PubSession already exist in group. [%s] old=%s, new=%s", group.UniqueKey, group.pubSession.UniqueKey, session.UniqueKey) + return false } group.pubSession = session group.mutex.Unlock() session.SetPubSessionObserver(group) + return true } -func (group *Group) AddSubSession(session *ServerSession) { +func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() + defer group.mutex.Unlock() group.subSessionSet[session] = struct{}{} - group.mutex.Unlock() // TODO chef: 多长没有拉流session存在的功能 //group.turnToEmptyTick = 0 } -func (group *Group) DelPubSession(session *ServerSession) { +func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() + defer group.mutex.Unlock() group.pubSession = nil - group.mutex.Unlock() + group.metadata = nil + group.avcKeySeqHeader = nil + group.aacSeqHeader = nil } -func (group *Group) DelSubSession(session *ServerSession) { +func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) { log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() + defer group.mutex.Unlock() delete(group.subSessionSet, session) - group.mutex.Unlock() +} + +func (group *Group) AddHTTPFlvSubSession(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) { + panic("not impl") } func (group *Group) Pull(addr string, connectTimeout int64) { @@ -148,39 +152,30 @@ func (group *Group) IsInExist() bool { return group.pubSession != nil } -func (group *Group) SetObserver(obs GroupObserver) { - group.obs = obs -} - // PubSession or PullSession -func (group *Group) ReadRTMPAVMsgCB(header Header, timestampAbs uint32, message []byte) { +func (group *Group) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { group.mutex.Lock() defer group.mutex.Unlock() - group.broadcastRTMP2RTMP(header, timestampAbs, message) - - if group.obs != nil { - group.obs.ReadRTMPAVMsgCB(header, timestampAbs, message) - } } -func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, message []byte) { - //log.Infof("%+v", header) +func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, message []byte) { + log.Infof("%+v", header) // # 1. 设置好头部信息 - var currHeader Header - currHeader.MsgLen = len(message) + var currHeader rtmp.Header + currHeader.MsgLen = uint32(len(message)) currHeader.Timestamp = timestampAbs currHeader.MsgTypeID = header.MsgTypeID - currHeader.MsgStreamID = MSID1 + currHeader.MsgStreamID = rtmp.MSID1 switch header.MsgTypeID { - case TypeidDataMessageAMF0: - currHeader.CSID = CSIDAMF + case rtmp.TypeidDataMessageAMF0: + currHeader.CSID = rtmp.CSIDAMF //prevHeader = nil - case TypeidAudio: - currHeader.CSID = CSIDAudio + case rtmp.TypeidAudio: + currHeader.CSID = rtmp.CSIDAudio //prevHeader = group.prevAudioHeader - case TypeidVideo: - currHeader.CSID = CSIDVideo + case rtmp.TypeidVideo: + currHeader.CSID = rtmp.CSIDVideo //prevHeader = group.prevVideoHeader } @@ -190,11 +185,11 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa for session := range group.subSessionSet { // ## 2.1. 一个message广播给多个sub session时,只做一次chunk切割 if absChunks == nil { - absChunks = Message2Chunks(message, &currHeader, LocalChunkSize) + absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize) } // ## 2.2. 如果是新的sub session,发送已缓存的信息 - if session.isFresh { + if session.IsFresh { // 发送缓存的头部信息 if group.metadata != nil { session.AsyncWrite(group.metadata) @@ -205,23 +200,23 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa if group.aacSeqHeader != nil { session.AsyncWrite(group.aacSeqHeader) } - session.isFresh = false + session.IsFresh = false } // ## 2.3. 判断当前包的类型,以及sub session的状态,决定是否发送并更新sub session的状态 switch header.MsgTypeID { - case TypeidDataMessageAMF0: + case rtmp.TypeidDataMessageAMF0: session.AsyncWrite(absChunks) - case TypeidAudio: + case rtmp.TypeidAudio: session.AsyncWrite(absChunks) - case TypeidVideo: - if session.waitKeyNalu { + case rtmp.TypeidVideo: + if session.WaitKeyNalu { if message[0] == 0x17 && message[1] == 0x0 { session.AsyncWrite(absChunks) } if message[0] == 0x17 && message[1] == 0x1 { session.AsyncWrite(absChunks) - session.waitKeyNalu = false + session.WaitKeyNalu = false } } else { session.AsyncWrite(absChunks) @@ -234,28 +229,63 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa // # 3. 缓存 metadata 和 avc key seq header 和 aac seq header // 由于可能没有订阅者,所以message可能还没做chunk切割,所以这里要做判断是否做chunk切割 switch header.MsgTypeID { - case TypeidDataMessageAMF0: + case rtmp.TypeidDataMessageAMF0: if absChunks == nil { - absChunks = Message2Chunks(message, &currHeader, LocalChunkSize) + absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize) } log.Debugf("cache metadata. [%s]", group.UniqueKey) group.metadata = absChunks - case TypeidVideo: + case rtmp.TypeidVideo: // TODO chef: magic number if message[0] == 0x17 && message[1] == 0x0 { if absChunks == nil { - absChunks = Message2Chunks(message, &currHeader, LocalChunkSize) + absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize) } log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) group.avcKeySeqHeader = absChunks } - case TypeidAudio: + case rtmp.TypeidAudio: if (message[0]>>4) == 0x0a && message[1] == 0x0 { if absChunks == nil { - absChunks = Message2Chunks(message, &currHeader, LocalChunkSize) + absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize) } log.Debugf("cache aac seq header. [%s]", group.UniqueKey) group.aacSeqHeader = absChunks } } } + +func (group *Group) pullIfNeeded() { + panic("not impl") + //if !gm.isInExist() { + // switch gm.config.Pull.Type { + // case "httpflv": + // go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout) + // case "rtmp": + // go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout) + // } + //} +} + +func (group *Group) isInExist() bool { + panic("not impl") + //return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) || + // (gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist()) +} + +// GroupObserver of httpflv.Group +func (group *Group) ReadHTTPRespHeaderCB() { + // noop +} + +// GroupObserver of httpflv.Group +func (group *Group) ReadFlvHeaderCB(flvHeader []byte) { + // noop +} + +// GroupObserver of httpflv.Group +func (group *Group) ReadFlvTagCB(tag *httpflv.Tag) { + log.Info("ReadFlvTagCB") + + // TODO chef: broadcast to rtmp.Group +} diff --git a/app/lal/lal.go b/pkg/logic/lal.go similarity index 79% rename from app/lal/lal.go rename to pkg/logic/lal.go index 4985353..8e73d75 100644 --- a/app/lal/lal.go +++ b/pkg/logic/lal.go @@ -1,4 +1,4 @@ -package main +package logic import "errors" diff --git a/app/lal/server_manager.go b/pkg/logic/server_manager.go similarity index 51% rename from app/lal/server_manager.go rename to pkg/logic/server_manager.go index 7427bf6..5728c1a 100644 --- a/app/lal/server_manager.go +++ b/pkg/logic/server_manager.go @@ -1,4 +1,4 @@ -package main +package logic import ( "github.com/q191201771/lal/pkg/httpflv" @@ -11,18 +11,20 @@ import ( type ServerManager struct { config *Config - httpFlvServer *httpflv.Server - rtmpServer *rtmp.Server - groupManagerMap map[string]*GroupManager // TODO chef: with appName - mutex sync.Mutex - exitChan chan bool + httpFlvServer *httpflv.Server + rtmpServer *rtmp.Server + groupMap map[string]*Group // TODO chef: with appName + mutex sync.Mutex + exitChan chan struct{} } +var _ rtmp.ServerObserver = &ServerManager{} + func NewServerManager(config *Config) *ServerManager { m := &ServerManager{ - config: config, - groupManagerMap: make(map[string]*GroupManager), - exitChan: make(chan bool), + config: config, + groupMap: make(map[string]*Group), + exitChan: make(chan struct{}), } if len(config.HTTPFlv.SubListenAddr) != 0 { m.httpFlvServer = httpflv.NewServer(m, config.HTTPFlv.SubListenAddr, config.SubIdleTimeout) @@ -62,7 +64,7 @@ func (sm *ServerManager) RunLoop() { count++ if (count % 10) == 0 { sm.mutex.Lock() - log.Infof("group size:%d", len(sm.groupManagerMap)) + log.Infof("group size:%d", len(sm.groupMap)) sm.mutex.Unlock() } } @@ -79,55 +81,67 @@ func (sm *ServerManager) Dispose() { } sm.mutex.Lock() - for _, gm := range sm.groupManagerMap { - gm.Dispose(lalErr) + for _, group := range sm.groupMap { + group.Dispose(lalErr) } - sm.groupManagerMap = nil + sm.groupMap = nil sm.mutex.Unlock() - sm.exitChan <- true + sm.exitChan <- struct{}{} } // ServerObserver of rtmp.Server -func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool { - gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName) - return gm.AddRTMPPubSession(session, rtmpGroup) +func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool { + group := sm.getOrCreateGroup(session.AppName, session.StreamName) + return group.AddRTMPPubSession(session) } // ServerObserver of rtmp.Server -func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool { - gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName) - gm.AddRTMPSubSession(session, rtmpGroup) +func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool { + group := sm.getOrCreateGroup(session.AppName, session.StreamName) + group.AddRTMPSubSession(session) return true } +// ServerObserver of rtmp.Server +func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) { + group := sm.getOrCreateGroup(session.AppName, session.StreamName) + group.DelRTMPPubSession(session) +} + +// ServerObserver of rtmp.Server +func (sm *ServerManager) DelRTMPSubSessionCB(session *rtmp.ServerSession) { + group := sm.getOrCreateGroup(session.AppName, session.StreamName) + group.DelRTMPSubSession(session) +} + // ServerObserver of httpflv.Server func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) bool { - gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName) - gm.AddHTTPFlvSubSession(session, httpFlvGroup) + group := sm.getOrCreateGroup(session.AppName, session.StreamName) + group.AddHTTPFlvSubSession(session, httpFlvGroup) return true } func (sm *ServerManager) check() { sm.mutex.Lock() defer sm.mutex.Unlock() - for k, gm := range sm.groupManagerMap { - if gm.IsTotalEmpty() { - log.Infof("erase empty group manager. [%s]", gm.UniqueKey) - gm.Dispose(lalErr) - delete(sm.groupManagerMap, k) + for k, group := range sm.groupMap { + if group.IsTotalEmpty() { + log.Infof("erase empty group manager. [%s]", group.UniqueKey) + group.Dispose(lalErr) + delete(sm.groupMap, k) } } } -func (sm *ServerManager) getOrCreateGroupManager(appName string, streamName string) *GroupManager { +func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { sm.mutex.Lock() defer sm.mutex.Unlock() - gm, exist := sm.groupManagerMap[streamName] + group, exist := sm.groupMap[streamName] if !exist { - gm = NewGroupManager(appName, streamName, sm.config) - sm.groupManagerMap[streamName] = gm + group = NewGroup(appName, streamName) + sm.groupMap[streamName] = group } - go gm.RunLoop() - return gm + go group.RunLoop() + return group } diff --git a/pkg/logic/trans.go b/pkg/logic/trans.go new file mode 100644 index 0000000..bad0887 --- /dev/null +++ b/pkg/logic/trans.go @@ -0,0 +1,36 @@ +package logic + +import ( + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/rtmp" +) + +var Trans trans + +type trans struct { +} + +//// TODO chef: rtmp msg 也弄成结构体 +//func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) { +// header.MsgLen = tag.Header.DataSize +// header.MsgTypeID = tag.Header.T +// header.MsgStreamID = rtmp.MSID1 // TODO +// switch tag.Header.T { +// case httpflv.TagTypeMetadata: +// header.CSID = rtmp.CSIDAMF +// case httpflv.TagTypeAudio: +// header.CSID = rtmp.CSIDAudio +// case httpflv.TagTypeVideo: +// header.CSID = rtmp.CSIDVideo +// } +// return +//} + +func (t trans) RTMPMsg2FlvTag(header rtmp.Header, timestampAbs uint32, message []byte) httpflv.Tag { + var tag httpflv.Tag + tag.Header.T = header.MsgTypeID + tag.Header.DataSize = header.MsgLen + tag.Header.Timestamp = timestampAbs + tag.Raw = httpflv.PackHTTPFlvTag(header.MsgTypeID, timestampAbs, message) + return tag +} \ No newline at end of file diff --git a/pkg/rtmp/amf0.go b/pkg/rtmp/amf0.go index 3c04325..6904358 100644 --- a/pkg/rtmp/amf0.go +++ b/pkg/rtmp/amf0.go @@ -12,8 +12,8 @@ import ( ) var ( - ErrAMFInvalidType = errors.New("lal.AMF0: invalid type.") - ErrAMFTooShort = errors.New("lal.AMF0: too short.") + ErrAMFInvalidType = errors.New("lal.AMF0: invalid type") + ErrAMFTooShort = errors.New("lal.AMF0: too short") ) const ( @@ -117,7 +117,7 @@ func (amf0) WriteObject(writer io.Writer, objs []ObjectPair) error { return err } default: - // TODO chef: if other types. + // TODO chef: 换成 nezha log panic panic(objs[i].value) } } @@ -251,7 +251,7 @@ func (amf0) ReadObject(b []byte) (map[string]interface{}, int, error) { obj[k] = v index += l default: - // TODO chef: if other types. + // TODO chef: 换成 nezha log panic panic(vt) } } diff --git a/pkg/rtmp/amf0_test.go b/pkg/rtmp/amf0_test.go index 375424b..b55483f 100644 --- a/pkg/rtmp/amf0_test.go +++ b/pkg/rtmp/amf0_test.go @@ -262,4 +262,4 @@ func BenchmarkAmf0_WriteObject(b *testing.B) { for i := 0; i < b.N; i++ { _ = AMF0.WriteObject(out, objs) } -} \ No newline at end of file +} diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 4df74e4..7c4d8e5 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -10,7 +10,7 @@ import ( ) type ChunkComposer struct { - peerChunkSize int + peerChunkSize uint32 csid2stream map[int]*Stream } @@ -21,11 +21,11 @@ func NewChunkComposer() *ChunkComposer { } } -func (c *ChunkComposer) SetPeerChunkSize(val int) { +func (c *ChunkComposer) SetPeerChunkSize(val uint32) { c.peerChunkSize = val } -func (c *ChunkComposer) GetPeerChunkSize() int { +func (c *ChunkComposer) GetPeerChunkSize() uint32 { return c.peerChunkSize } @@ -69,8 +69,8 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 包头中为绝对时间戳 stream.header.Timestamp = bele.BEUint24(bootstrap) stream.timestampAbs = stream.header.Timestamp - stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:])) - stream.header.MsgTypeID = int(bootstrap[6]) + stream.header.MsgLen = bele.BEUint24(bootstrap[3:]) + stream.header.MsgTypeID = bootstrap[6] stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:])) stream.msg.reserve(stream.header.MsgLen) @@ -81,8 +81,8 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 包头中为相对时间戳 stream.header.Timestamp = bele.BEUint24(bootstrap) stream.timestampAbs += stream.header.Timestamp - stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:])) - stream.header.MsgTypeID = int(bootstrap[6]) + stream.header.MsgLen = bele.BEUint24(bootstrap[3:]) + stream.header.MsgTypeID = bootstrap[6] stream.msg.reserve(stream.header.MsgLen) case 2: @@ -100,7 +100,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 5.3.1.3 Extended Timestamp // 使用ffmpeg推流时,发现时间戳超过3字节最大值后,即使是fmt3(即包头大小为0),依然存在ext ts字段 // 所以这里我将 `==` 的判断改成了 `>=` - // TODO 测试其他客户端和ext ts相关的表现 + // TODO chef: 测试其他客户端和ext ts相关的表现 //if stream.header.Timestamp == maxTimestampInMessageHeader { if stream.header.Timestamp >= maxTimestampInMessageHeader { if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil { @@ -119,9 +119,9 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } } //stream.header.CSID = csid - //log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.msgLen, stream.timestampAbs) + //log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.timestampAbs) - var neededSize int + var neededSize uint32 if stream.header.MsgLen <= c.peerChunkSize { neededSize = stream.header.MsgLen } else { @@ -132,7 +132,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } //stream.msg.reserve(neededSize) - if _, err := io.ReadAtLeast(reader, stream.msg.buf[stream.msg.e:stream.msg.e+neededSize], neededSize); err != nil { + if _, err := io.ReadAtLeast(reader, stream.msg.buf[stream.msg.e:stream.msg.e+neededSize], int(neededSize)); err != nil { return err } stream.msg.produced(neededSize) @@ -140,7 +140,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { if stream.msg.len() == stream.header.MsgLen { // 对端设置了chunk size if stream.header.MsgTypeID == typeidSetChunkSize { - val := int(bele.BEUint32(stream.msg.buf)) + val := bele.BEUint32(stream.msg.buf) c.SetPeerChunkSize(val) } diff --git a/pkg/rtmp/chunk_divider.go b/pkg/rtmp/chunk_divider.go index 5f673f5..7e2f5df 100644 --- a/pkg/rtmp/chunk_divider.go +++ b/pkg/rtmp/chunk_divider.go @@ -8,15 +8,14 @@ import ( "github.com/q191201771/nezha/pkg/bele" ) -// TODO chef: 新的message的第一个chunk始终使用fmt0格式,不参考前一个message +// TODO chef: 新的message的第一个chunk始终使用fmt0格式,没有参考前一个message func Message2Chunks(message []byte, header *Header, chunkSize int) []byte { return message2Chunks(message, header, nil, chunkSize) } -// TODO chef: 返回值直接传入 -func calcHeader(header *Header, prevHeader *Header) []byte { +// @param 返回头的大小 +func calcHeader(header *Header, prevHeader *Header, out []byte) int { var index int - out := make([]byte, 16) // 计算fmt和timestamp fmt := uint8(0) @@ -69,9 +68,9 @@ func calcHeader(header *Header, prevHeader *Header) []byte { index += 3 if fmt <= 1 { - bele.BEPutUint24(out[index:], uint32(header.MsgLen)) + bele.BEPutUint24(out[index:], header.MsgLen) index += 3 - out[index] = uint8(header.MsgTypeID) + out[index] = header.MsgTypeID index++ if fmt == 0 { @@ -88,7 +87,7 @@ func calcHeader(header *Header, prevHeader *Header) []byte { index += 4 } - return out[0:index] + return index } func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSize int) []byte { @@ -112,9 +111,8 @@ func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSiz // NOTICE 和srs交互时,发现srs要求message中的非第一个chunk不能使用fmt0 // 将message切割成chunk放入chunk body中 for i := 0; i < numOfChunk; i++ { - head := calcHeader(header, prevHeader) - copy(out[index:], head) - index += len(head) + headLen := calcHeader(header, prevHeader, out[index:]) + index += headLen if i != numOfChunk-1 { copy(out[index:], message[i*chunkSize:i*chunkSize+chunkSize]) diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index c07e88f..42c0018 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -23,5 +23,3 @@ func NewPushSession(timeout PushSessionTimeout) *PushSession { func (s *PushSession) Push(rawURL string) error { return s.Do(rawURL) } - -// TODO chef: add function to write av data diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 9802178..a65a4ac 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -189,7 +189,6 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error { case "|RtmpSampleAccess": // TODO chef: handle this? return nil default: - // TODO chef: log.Error(val) log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) } @@ -298,7 +297,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { return err } case CSTPushSession: - log.Infof("<----- publish('%s')", s.streamNameWithRawQuery, s.UniqueKey) + log.Infof("<----- publish('%s'). [%s]", s.streamNameWithRawQuery, s.UniqueKey) if err := s.packer.writePublish(s.Conn, s.appName, s.streamNameWithRawQuery, sid); err != nil { return err } @@ -356,7 +355,7 @@ func (s *ClientSession) parseURL(rawURL string) error { } else { s.streamNameWithRawQuery = s.streamName + "?" + s.url.RawQuery } - log.Debugf("%s %s %s %+v", s.tcURL, s.appName, s.streamNameWithRawQuery, *s.url) + log.Debugf("parseURL. [%s] %s %s %s %+v", s.UniqueKey, s.tcURL, s.appName, s.streamNameWithRawQuery, *s.url) return nil } @@ -393,7 +392,6 @@ func (s *ClientSession) tcpConnect() error { return err } - // TODO chef: 超时由接口设置 s.Conn = connection.New(conn, connection.Config{ ReadBufSize: readBufSize, }) diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index 6cb4ca2..6191282 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -10,10 +10,6 @@ import ( "time" ) -// TODO chef: doc - -// TODO chef: HandshakeClient with complex mode - const version = uint8(3) const ( @@ -65,9 +61,12 @@ var random1528Buf []byte type HandshakeClient interface { WriteC0C1(writer io.Writer) error ReadS0S1S2(reader io.Reader) error - WriterC2(writer io.Writer) error + WriteC2(writer io.Writer) error } +var _ HandshakeClient = &HandshakeClientSimple{} +var _ HandshakeClient = &HandshakeClientComplex{} + type HandshakeClientSimple struct { c0c1 []byte c2 []byte @@ -131,7 +130,6 @@ func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error { return err } - func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error { s0s1s2 := make([]byte, s0s1s2Len) if _, err := io.ReadAtLeast(reader, s0s1s2, s0s1s2Len); err != nil { @@ -151,7 +149,6 @@ func (c *HandshakeClientSimple) WriteC2(write io.Writer) error { return err } - func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) { c0c1 := make([]byte, c0c1Len) if _, err = io.ReadAtLeast(reader, c0c1, c0c1Len); err != nil { @@ -280,7 +277,6 @@ func random1528(out []byte) { } func init() { - // TODO chef: hack lal in bs := []byte{'l', 'a', 'l'} bsl := len(bs) random1528Buf = make([]byte, 1528) diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index 12f51ca..cc35843 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -28,7 +28,7 @@ func NewMessagePacker() *MessagePacker { } } -func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID int, streamID int) { +func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID uint8, streamID int) { // 目前这个函数只供发送信令时调用,信令的 csid 都是小于等于 63 的,如果传入的 csid 大于 63,直接 panic if csid > 63 { panic(csid) @@ -38,11 +38,11 @@ func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID in // 0 0 0 是时间戳 _, _ = packer.b.Write([]byte{uint8(fmt<<6 | csid), 0, 0, 0}) _ = bele.WriteBEUint24(packer.b, uint32(bodyLen)) - _, _ = packer.b.Write([]byte{uint8(typeID)}) + _ = packer.b.WriteByte(typeID) _ = bele.WriteLE(packer.b, uint32(streamID)) } -func (packer *MessagePacker) writeProtocolControlMessage(writer io.Writer, typeID int, val int) error { +func (packer *MessagePacker) writeProtocolControlMessage(writer io.Writer, typeID uint8, val int) error { packer.writeMessageHeader(csidProtocolControl, 4, typeID, 0) _ = bele.WriteBE(packer.b, uint32(val)) _, err := packer.b.WriteTo(writer) @@ -70,7 +70,6 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin _ = AMF0.WriteString(packer.b, "connect") _ = AMF0.WriteNumber(packer.b, float64(tidClientConnect)) - // TODO chef: hack lal in objs := []ObjectPair{ {key: "app", value: appName}, {key: "type", value: "nonprivate"}, diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index 4181fa4..85925b7 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -1,7 +1,15 @@ package rtmp -import ( - "errors" +import "errors" + +// 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口 +var ( + readBufSize = 4096 + writeBufSize = 4096 + wChanSize = 1024 + windowAcknowledgementSize = 5000000 + peerBandwidth = 5000000 + LocalChunkSize = 4096 // 本端设置的chunk size ) var rtmpErr = errors.New("rtmp: fxxk") @@ -19,15 +27,15 @@ const ( ) const ( - TypeidAudio = 8 - TypeidVideo = 9 - TypeidDataMessageAMF0 = 18 // meta - typeidSetChunkSize = 1 - typeidAck = 3 - typeidUserControl = 4 - typeidWinAckSize = 5 - typeidBandwidth = 6 - typeidCommandMessageAMF0 = 20 + TypeidAudio = uint8(8) + TypeidVideo = uint8(9) + TypeidDataMessageAMF0 = uint8(18) // meta + typeidSetChunkSize = uint8(1) + typeidAck = uint8(3) + typeidUserControl = uint8(4) + typeidWinAckSize = uint8(5) + typeidBandwidth = uint8(6) + typeidCommandMessageAMF0 = uint8(20) ) const ( @@ -49,21 +57,10 @@ const ( MSID1 = 1 // publish、play、onStatus 以及 音视频数据 ) -// TODO chef -var ( - readBufSize = 4096 - writeBufSize = 4096 - wChanSize = 1024 -) - -var windowAcknowledgementSize = 5000000 -var peerBandwidth = 5000000 -var LocalChunkSize = 4096 // 本端设置的chunk size - // 接收到音视频类型数据时的回调函数。目前被PullSession以及PubSession使用。 type AVMsgObserver interface { // @param header: // @param timestampAbs: 绝对时间戳 - // @param message: 不包含头内容。回调结束后,PullSession会继续使用这块内存。 + // @param message: 不包含头内容。回调结束后,PubSession 会继续使用这块内存。 ReadRTMPAVMsgCB(header Header, timestampAbs uint32, message []byte) } diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index a2380d8..e677359 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -3,29 +3,25 @@ package rtmp import ( "github.com/q191201771/nezha/pkg/log" "net" - "sync" ) type ServerObserver interface { - NewRTMPPubSessionCB(session *ServerSession, group *Group) bool // 返回true则允许推流,返回false则强制关闭这个连接 - NewRTMPSubSessionCB(session *ServerSession, group *Group) bool // 返回true则允许拉流,返回false则强制关闭这个连接 + NewRTMPPubSessionCB(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接 + NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接 + DelRTMPPubSessionCB(session *ServerSession) + DelRTMPSubSessionCB(session *ServerSession) } type Server struct { obs ServerObserver addr string ln net.Listener - - mutex sync.Mutex - groupMap map[string]*Group - // TODO chef: 清除空的group } func NewServer(obs ServerObserver, addr string) *Server { return &Server{ - obs: obs, - addr: addr, - groupMap: make(map[string]*Group), + obs: obs, + addr: addr, } } @@ -49,11 +45,6 @@ func (server *Server) Dispose() { if err := server.ln.Close(); err != nil { log.Error(err) } - server.mutex.Lock() - defer server.mutex.Unlock() - for _, g := range server.groupMap { - g.Dispose() - } } func (server *Server) handleTCPConnect(conn net.Conn) { @@ -64,60 +55,25 @@ func (server *Server) handleTCPConnect(conn net.Conn) { case ServerSessionTypeUnknown: // noop case ServerSessionTypePub: - server.DelRTMPPubSession(session) + server.obs.DelRTMPPubSessionCB(session) case ServerSessionTypeSub: - server.DelRTMPSubSession(session) + server.obs.DelRTMPSubSessionCB(session) } } // ServerSessionObserver func (server *Server) NewRTMPPubSessionCB(session *ServerSession) { - group := server.getOrCreateGroup(session.AppName, session.StreamName) - - if !server.obs.NewRTMPPubSessionCB(session, group) { + if !server.obs.NewRTMPPubSessionCB(session) { log.Warnf("dispose PubSession since pub exist.") session.Dispose() return } - group.AddPubSession(session) } // ServerSessionObserver func (server *Server) NewRTMPSubSessionCB(session *ServerSession) { - group := server.getOrCreateGroup(session.AppName, session.StreamName) - - if !server.obs.NewRTMPSubSessionCB(session, group) { + if !server.obs.NewRTMPSubSessionCB(session) { // TODO chef: 关闭这个连接 return } - group.AddSubSession(session) -} - -func (server *Server) DelRTMPPubSession(session *ServerSession) { - group := server.getOrCreateGroup(session.AppName, session.StreamName) - - // TODO chef: obs - - group.DelPubSession(session) -} - -func (server *Server) DelRTMPSubSession(session *ServerSession) { - group := server.getOrCreateGroup(session.AppName, session.StreamName) - - // TODO chef: obs - - group.DelSubSession(session) -} - -func (server *Server) getOrCreateGroup(appName string, streamName string) *Group { - server.mutex.Lock() - defer server.mutex.Unlock() - - group, exist := server.groupMap[streamName] - if !exist { - group = NewGroup(appName, streamName) - server.groupMap[streamName] = group - } - go group.RunLoop() - return group } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index 2f6dc2e..0f8b4ee 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -19,10 +19,10 @@ import ( type ServerSessionObserver interface { NewRTMPPubSessionCB(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听 NewRTMPSubSessionCB(session *ServerSession) - //DelRTMPPubSessionCB(session *PubSession) - //DelRTMPSubSessionCB(session *SubSession) } +var _ ServerSessionObserver = &Server{} + type PubSessionObserver interface { AVMsgObserver } @@ -63,13 +63,13 @@ type ServerSession struct { avObs PubSessionObserver // only for SubSession - isFresh bool - waitKeyNalu bool + IsFresh bool + WaitKeyNalu bool } func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { uk := unique.GenUniqueKey("RTMPPUBSUB") - log.Infof("lifecycle new rtmp.ServerSession. [%s]", uk) + log.Infof("lifecycle new rtmp server session. [%s]", uk) return &ServerSession{ UniqueKey: uk, obs: obs, @@ -81,8 +81,8 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { wb: bufio.NewWriterSize(conn, writeBufSize), wChan: make(chan []byte, wChanSize), exitChan: make(chan struct{}), - isFresh: true, - waitKeyNalu: true, + IsFresh: true, + WaitKeyNalu: true, } } @@ -101,7 +101,7 @@ func (s *ServerSession) RunLoop() (err error) { } func (s *ServerSession) Dispose() { - log.Infof("lifecycle dispose rtmp.ServerSession. [%s]", s.UniqueKey) + log.Infof("lifecycle dispose rtmp server session. [%s]", s.UniqueKey) if atomic.LoadUint32(&s.hasClosedFlag) == 1 { return } @@ -177,7 +177,8 @@ func (s *ServerSession) doMsg(stream *Stream) error { //log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header) switch stream.header.MsgTypeID { case typeidSetChunkSize: - // TODO chef: + // noop + // 因为底层的 chunk composer 已经处理过了,这里就不用处理 case typeidCommandMessageAMF0: return s.doCommandMessage(stream) case TypeidDataMessageAMF0: @@ -218,7 +219,8 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { } switch val { - case "|RtmpSampleAccess": // TODO chef: handle this? + case "|RtmpSampleAccess": + log.Warn("recv |RtmpSampleAccess. ignore it.") return nil case "@setDataFrame": // macos obs @@ -234,9 +236,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { case "onMetaData": // noop default: - // TODO chef: - log.Error(val) - log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) + log.Errorf("recv unknown message. val=%s, hex=%s", val, hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) return nil } @@ -338,9 +338,8 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) { log.Debugf("[%s] pubType=%s", s.UniqueKey, pubType) log.Infof("-----> publish('%s') [%s]", s.StreamName, s.UniqueKey) - // TODO chef: hardcode streamID log.Infof("<---- onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey) - if err := s.packer.writeOnStatusPublish(s.conn, 1); err != nil { + if err := s.packer.writeOnStatusPublish(s.conn, MSID1); err != nil { return err } s.t = ServerSessionTypePub @@ -363,7 +362,7 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) { // TODO chef: start duration reset log.Infof("<----onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey) - if err := s.packer.writeOnStatusPlay(s.conn, 1); err != nil { + if err := s.packer.writeOnStatusPlay(s.conn, MSID1); err != nil { return err } s.t = ServerSessionTypeSub diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index 448f34d..3e514bd 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -6,20 +6,20 @@ const initMsgLen = 4096 type Header struct { CSID int - MsgLen int + MsgLen uint32 // NOTICE 是header中的时间戳,可能是绝对的,也可能是相对的。 // 如果需要绝对时间戳,应该使用Stream中的timestampAbs Timestamp uint32 - MsgTypeID int // 8 audio 9 video 18 metadata + MsgTypeID uint8 // 8 audio 9 video 18 metadata MsgStreamID int } type StreamMsg struct { buf []byte - b int - e int + b uint32 + e uint32 } type Stream struct { @@ -37,29 +37,30 @@ func NewStream() *Stream { } } -func (msg *StreamMsg) reserve(n int) { - nn := cap(msg.buf) - msg.e +func (msg *StreamMsg) reserve(n uint32) { + bufCap := uint32(cap(msg.buf)) + nn := bufCap - msg.e if nn > n { return } for nn < n { nn <<= 1 } - nb := make([]byte, cap(msg.buf)+nn) + nb := make([]byte, bufCap+nn) copy(nb, msg.buf[msg.b:msg.e]) msg.buf = nb log.Debugf("reserve. need:%d left:%d %d %d", n, nn, len(msg.buf), cap(msg.buf)) } -func (msg *StreamMsg) len() int { +func (msg *StreamMsg) len() uint32 { return msg.e - msg.b } -func (msg *StreamMsg) produced(n int) { +func (msg *StreamMsg) produced(n uint32) { msg.e += n } -func (msg *StreamMsg) consumed(n int) { +func (msg *StreamMsg) consumed(n uint32) { msg.b += n } @@ -76,7 +77,7 @@ func (msg *StreamMsg) peekStringWithType() (string, error) { func (msg *StreamMsg) readStringWithType() (string, error) { str, l, err := AMF0.ReadString(msg.buf[msg.b:msg.e]) if err == nil { - msg.consumed(l) + msg.consumed(uint32(l)) } return str, err } @@ -84,7 +85,7 @@ func (msg *StreamMsg) readStringWithType() (string, error) { func (msg *StreamMsg) readNumberWithType() (int, error) { val, l, err := AMF0.ReadNumber(msg.buf[msg.b:msg.e]) if err == nil { - msg.consumed(l) + msg.consumed(uint32(l)) } return int(val), err } @@ -92,7 +93,7 @@ func (msg *StreamMsg) readNumberWithType() (int, error) { func (msg *StreamMsg) readObjectWithType() (map[string]interface{}, error) { obj, l, err := AMF0.ReadObject(msg.buf[msg.b:msg.e]) if err == nil { - msg.consumed(l) + msg.consumed(uint32(l)) } return obj, err } @@ -100,7 +101,7 @@ func (msg *StreamMsg) readObjectWithType() (map[string]interface{}, error) { func (msg *StreamMsg) readNull() error { l, err := AMF0.ReadNull(msg.buf[msg.b:msg.e]) if err == nil { - msg.consumed(l) + msg.consumed(uint32(l)) } return err }