修改内容如下:

- [结构调整] 将 app/lal 的部分代码抽离到 pkg/logic 中,使得其他 app 可以使用
- [结构调整] 将协议层 rtmp.Group 和 应用层 app/lal 中的 GroupManager 合并为 逻辑层 pkg/logic 的 Group,以后只在逻辑层维护一个 Group,用于处理各种具体协议的输入输出流的挂载
- [功能] pkg/logic 中新增 trans.go: RTMPMsg2FlvTag
- [功能] PubSession 退出时,清空缓存的 meta、avc header、aac header
- [功能] PubSession 已经存在时,后续再连接的 Pub 直接关闭掉
- [功能] app/rtmppull 存储为flv文件
- [优化] chunk divider: calcHeader 在原地计算
- [其他] rtmp 中所有 typeid 相关的类型 int -> uint8,msgLen 相关的类型 int -> uint32
- [其他] 更新 nezha,新版本的日志库
- [其他] 整理日志
- [其他] pprof web 地址放入配置文件中
- [测试] 使用一些开源工具对 app/lal 做推流、拉流测试
pull/200/head
q191201771 5 years ago
parent ffd623bd48
commit 7f1a48ce9b

@ -1,4 +1,4 @@
**v0.1.0**
#### v0.1.0
- /app/flvfile2rtmppush 优化推流平稳性
- bugfix rtmp 推拉流信令时可以携带 url 参数,并且在做上下行匹配时去掉 url 参数
@ -14,6 +14,6 @@
- 更新 nezha 0.1.0
- errors.PanicIfErrorOccur -> log.FatalIfErrorNotNil
**v0.0.1**
#### v0.0.1
1. 提供 `/app/flvfile2rtmppush` 给业务方使用

@ -32,11 +32,11 @@ Go语言编写的流媒体 库 / 客户端 / 服务端
```
app/ ......各种main包的源码文件一个子目录对应一个main包即对应可生成一个可执行文件
|-- lal/ ......[最重要的] 流媒体服务器
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
|-- flvfile2rtmppush ......rtmp推流客户端输入是本地flv文件文件推送完毕后可循环推送rtmp push流并不断开
|-- rtmppull ......rtmp拉流客户端
|-- httpflvpull ......http-flv拉流客户端
|-- modflvfile ......修改本地flv文件
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
|-- rtmppull ......rtmp拉流客户端存储为本地flv文件
pkg/ ......源码包
|-- aac/ ......音频aac编解码格式相关
|-- avc/ ......视频avc h264编解码格式相关
@ -67,14 +67,31 @@ $git clone https://github.com/q191201771/lal.git && cd lal && ./build.sh
"addr": ":19350" // rtmp服务监听的端口
},
"log": {
"level": 0, // 日志级别0 debug 1 info 2 warn 3 error
"level": 1, // 日志级别1 debug, 2 info, 3 warn, 4 error, 5 fatal
"filename": "./logs/lal.log", // 日志输出文件
"is_to_stdout": true, // 是否打印至标志控制台输出
"rotate_mbyte": 1024 // 日志按大小翻滚
"is_rotate_daily": true, // 日志按天翻滚
"short_file_flag": true // 日志末尾是否携带源码文件名以及行号的信息
}
}
```
### 测试过的客户端
```
推流端:
- OBS 21.0.3(mac)
- ffmpeg 3.4.2(mac)
- srs-bench (srs项目配套的一个压测工具)
- flvfile2rtmppush (lal app中的rtmp推流客户端)
拉流端:
- VLC 2.2.6(mac)
- MPV 0.29.1(mac)
- ffmpeg 3.4.2(mac)
- srs-bench (srs项目配套的一个压测工具)
```
#### roadmap
有建议、意见、bug、功能等等欢迎提 issue 啊100% 会回复的。

@ -68,9 +68,9 @@ func main() {
// TODO chef: 转换代码放入lal某个包中
var h rtmp.Header
h.MsgLen = int(tag.Header.DataSize) //len(tag.Raw)-httpflv.TagHeaderSize
h.MsgLen = tag.Header.DataSize //len(tag.Raw)-httpflv.TagHeaderSize
h.MsgTypeID = int(tag.Header.T)
h.MsgTypeID = tag.Header.T
h.MsgStreamID = rtmp.MSID1
switch tag.Header.T {
case httpflv.TagTypeMetadata:

@ -1,131 +0,0 @@
package main
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/nezha/pkg/unique"
"sync"
)
// TODO chef: 没有sub了一定时间后停止pull
type GroupManager struct {
config *Config
appName string
streamName string
exitChan chan struct{}
rtmpGroup *rtmp.Group
httpFlvGroup *httpflv.Group
mutex sync.Mutex
UniqueKey string
}
func NewGroupManager(appName string, streamName string, config *Config) *GroupManager {
uk := unique.GenUniqueKey("GROUPMANAGER")
log.Infof("lifecycle new lal.GroupManager. [%s] appName=%s streamName=%s", uk, appName, streamName)
return &GroupManager{
config: config,
appName: appName,
streamName: streamName,
exitChan: make(chan struct{}),
UniqueKey: uk,
}
}
func (gm *GroupManager) RunLoop() {
<- gm.exitChan
}
func (gm *GroupManager) Dispose(err error) {
log.Infof("lifecycle dispose lal.GroupManager. [%s] reason=%v", gm.UniqueKey, err)
gm.exitChan <- struct{}{}
}
// 返回true则允许推流返回false则关闭连接
func (gm *GroupManager) AddRTMPPubSession(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool {
gm.attachRTMPGroup(rtmpGroup)
return !gm.isInExist()
}
func (gm *GroupManager) AddRTMPSubSession(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) {
gm.attachRTMPGroup(rtmpGroup)
gm.pullIfNeeded()
}
func (gm *GroupManager) AddHTTPFlvSubSession(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) {
gm.attachHTTPFlvGroup(httpFlvGroup)
gm.pullIfNeeded()
}
func (gm *GroupManager) IsTotalEmpty() bool {
gm.mutex.Lock()
defer gm.mutex.Unlock()
return (gm.rtmpGroup == nil || gm.rtmpGroup.IsTotalEmpty()) &&
(gm.httpFlvGroup == nil || gm.httpFlvGroup.IsTotalEmpty())
}
// GroupObserver of rtmp.Group
func (gm *GroupManager) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
// TODO chef: broadcast to httpflv.Group
}
// GroupObserver of httpflv.Group
func (gm *GroupManager) ReadHTTPRespHeaderCB() {
// noop
}
// GroupObserver of httpflv.Group
func (gm *GroupManager) ReadFlvHeaderCB(flvHeader []byte) {
// noop
}
// GroupObserver of httpflv.Group
func (gm *GroupManager) ReadFlvTagCB(tag *httpflv.Tag) {
log.Info("ReadFlvTagCB")
// TODO chef: broadcast to rtmp.Group
}
func (gm *GroupManager) attachRTMPGroup(rtmpGroup *rtmp.Group) {
gm.mutex.Lock()
defer gm.mutex.Unlock()
if gm.rtmpGroup != nil && gm.rtmpGroup != rtmpGroup {
log.Warnf("duplicate rtmp group in group manager. %+v %+v", gm.rtmpGroup, rtmpGroup)
}
gm.rtmpGroup = rtmpGroup
rtmpGroup.SetObserver(gm)
}
func (gm *GroupManager) attachHTTPFlvGroup(httpFlvGroup *httpflv.Group) {
gm.mutex.Lock()
defer gm.mutex.Unlock()
if gm.httpFlvGroup != nil && gm.httpFlvGroup != httpFlvGroup {
log.Warnf("duplicate http flv group in group manager. %+v %+v", gm.httpFlvGroup, httpFlvGroup)
}
gm.httpFlvGroup = httpFlvGroup
httpFlvGroup.SetObserver(gm)
}
func (gm *GroupManager) pullIfNeeded() {
if !gm.isInExist() {
switch gm.config.Pull.Type {
case "httpflv":
go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout)
case "rtmp":
go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout)
}
}
}
func (gm *GroupManager) isInExist() bool {
return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) ||
(gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist())
}

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/q191201771/nezha/pkg/bininfo"
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/lal/pkg/logic"
"net/http"
_ "net/http/pprof"
"os"
@ -12,7 +13,7 @@ import (
"syscall"
)
var sm *ServerManager
var sm *logic.ServerManager
func main() {
confFile := parseFlag()
@ -20,9 +21,11 @@ func main() {
initLog(config.Log)
log.Infof("bininfo: %s", bininfo.StringifySingleLine())
sm = NewServerManager(config)
sm = logic.NewServerManager(config)
go runWebPProf()
if config.PProf.Addr != "" {
go runWebPProf(config.PProf.Addr)
}
go runSignalHandler()
sm.RunLoop()
@ -43,16 +46,8 @@ func parseFlag() string {
return *cf
}
func initLog(config log.Config) {
if err := log.Init(config); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v", err)
os.Exit(1)
}
log.Info("initial log succ.")
}
func loadConf(confFile string) *Config {
config, err := LoadConf(confFile)
func loadConf(confFile string) *logic.Config {
config, err := logic.LoadConf(confFile)
if err != nil {
log.Errorf("load conf failed. file=%s err=%+v", confFile, err)
os.Exit(1)
@ -61,9 +56,15 @@ func loadConf(confFile string) *Config {
return config
}
func runWebPProf() {
// TODO chef: config me
addr := ":10001"
func initLog(config log.Config) {
if err := log.Init(config); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v", err)
os.Exit(1)
}
log.Info("initial log succ.")
}
func runWebPProf(addr string) {
log.Infof("start web pprof listen. addr=%s", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error(err)

@ -2,38 +2,52 @@ package main
import (
"flag"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/nezha/pkg/log"
"os"
)
type Obs struct {
w httpflv.FlvFileWriter
}
func (obs Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
func (obs *Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
log.Infof("%+v, abs ts=%d", header, timestampAbs)
tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message)
err := obs.w.WriteTag(tag)
log.FatalIfErrorNotNil(err)
}
func main() {
url := parseFlag()
url, outFileName := parseFlag()
var obs Obs
session := rtmp.NewPullSession(obs, rtmp.PullSessionTimeout{
session := rtmp.NewPullSession(&obs, rtmp.PullSessionTimeout{
ConnectTimeoutMS: 3000,
PullTimeoutMS: 5000,
ReadAVTimeoutMS: 10000,
})
err := session.Pull(url)
log.FatalIfErrorNotNil(err)
err = obs.w.Open(outFileName)
log.FatalIfErrorNotNil(err)
//defer obs.w.Dispose()
err = obs.w.WriteRaw(httpflv.FlvHeader)
log.FatalIfErrorNotNil(err)
err = session.WaitLoop()
log.FatalIfErrorNotNil(err)
}
func parseFlag() string {
url := flag.String("i", "", "specify rtmp url")
func parseFlag() (string, string) {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify ouput flv file")
flag.Parse()
if *url == "" {
if *i == "" || *o == "" {
flag.Usage()
os.Exit(1)
}
return *url
return *i, *o
}

@ -3,9 +3,13 @@
"addr": ":19350"
},
"log": {
"level": 0,
"level": 1,
"filename": "./logs/lal.log",
"is_to_stdout": true,
"rotate_mbyte": 1024
"is_rotate_daily": true,
"short_file_flag": true
},
"pprof": {
"addr": ":10001"
}
}

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/nezha v0.1.0
require github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337

@ -1,2 +1,2 @@
github.com/q191201771/nezha v0.1.0 h1:ZCFC5g9Vc5jNGG/hSMMBxF2EF8BsWiBMMTMqdnM1Uew=
github.com/q191201771/nezha v0.1.0/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M=
github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337 h1:rNlCVOQNOgoo5fja0UKPbpI5TrS4moe8lFoCSKZ55PM=
github.com/q191201771/nezha v0.1.1-0.20190916054132-d7c7d6a55337/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M=

@ -22,12 +22,13 @@ type ADTS struct {
// 传入 AAC Sequence Header一会生成 ADTS 头时需要使用
// @param <payload> rtmp message payload包含前面2个字节
func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
log.Debugf(hex.Dump(payload[:4]))
soundFormat := payload[0] >> 4 // 10=AAC
soundRate := (payload[0] >> 2) & 0x03 // 3=44kHz. For AAC: always 3
soundSize := (payload[0] >> 1) & 0x01 // 0=snd8Bit 1=snd16Bit
soundType := payload[0] & 0x01 // For AAC: always 1
//aacPacketType := payload[1] // 0:sequence header 1:AAC raw
aacPacketType := payload[1] // 0:sequence header 1:AAC raw
log.Debugf("%d %d %d %d %d", soundFormat, soundRate, soundSize, soundType, aacPacketType)
obj.audioObjectType = (payload[2] & 0xf8) >> 3 // 5bit 编码结构类型
obj.samplingFrequencyIndex = ((payload[2] & 0x07) << 1) | (payload[3] >> 7) // 4bit 音频采样率索引值
@ -35,9 +36,6 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
obj.frameLengthFlag = (payload[3] & 0x04) >> 2 // 1bit
obj.dependOnCoreCoder = (payload[3] & 0x02) >> 1 // 1bit
obj.extensionFlag = payload[3] & 0x01 // 1bit
log.Debugf(hex.Dump(payload[:4]))
log.Debugf("%d %d %d %d", soundFormat, soundRate, soundSize, soundType)
log.Debugf("%+v", obj)
}

@ -64,7 +64,7 @@ func (session *PullSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
if err := session.Conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}

@ -16,7 +16,7 @@ func (ffw *FlvFileWriter) WriteRaw(b []byte) (err error) {
return
}
func (ffw *FlvFileWriter) WriteTag(tag *Tag) (err error) {
func (ffw *FlvFileWriter) WriteTag(tag Tag) (err error) {
_, err = ffw.fp.Write(tag.Raw)
return
}

@ -17,6 +17,8 @@ const (
prevTagFieldSize = 4
)
var FlvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00}
var readBufSize = 16384
type LineReader interface {

@ -158,7 +158,7 @@ func (session *SubSession) Dispose(err error) {
atomic.StoreUint32(&session.hasClosedFlag, 1)
close(session.exitChan)
if err := session.conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}

@ -91,11 +91,11 @@ func IsAACSeqHeader(tag []byte) bool {
return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader
}
func PackHTTPFlvTag(t uint8, timestamp int, in []byte) []byte {
func PackHTTPFlvTag(t uint8, timestamp uint32, in []byte) []byte {
out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize)
out[0] = t
bele.BEPutUint24(out[1:], uint32(len(in)))
bele.BEPutUint24(out[4:], uint32(timestamp&0xFFFFFF))
bele.BEPutUint24(out[4:], timestamp&0xFFFFFF)
out[7] = uint8(timestamp >> 24)
out[8] = 0
out[9] = 0

@ -1,4 +1,4 @@
package main
package logic
import (
"encoding/json"
@ -7,23 +7,29 @@ import (
)
type Config struct {
RTMP RTMP `json:"rtmp"`
Log log.Config
RTMP RTMP `json:"rtmp"`
Log log.Config `json:"log"`
PProf PProf `json:"pprof"`
// v1.0.0之前不提供
SubIdleTimeout int64 `json:"sub_idle_timeout"`
GOPCacheNum int `json:"gop_cache_number"`
HTTPFlv HTTPFlv `json:"httpflv"`
Pull Pull `json:"pull"`
}
type HTTPFlv struct {
SubListenAddr string `json:"sub_listen_addr"`
type RTMP struct {
Addr string `json:"addr"`
}
type RTMP struct {
type PProf struct {
Addr string `json:"addr"`
}
type HTTPFlv struct {
SubListenAddr string `json:"sub_listen_addr"`
}
type Pull struct {
Type string `json:"type"`
Addr string `json:"addr"`

@ -1,16 +1,14 @@
package rtmp
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/nezha/pkg/unique"
"sync"
"time"
)
type GroupObserver interface {
AVMsgObserver
}
type Group struct {
UniqueKey string
@ -19,29 +17,27 @@ type Group struct {
exitChan chan struct{}
mutex sync.Mutex
pubSession *ServerSession
pullSession *PullSession
subSessionSet map[*ServerSession]struct{}
obs GroupObserver
//prevAudioHeader *Header
//prevVideoHeader *Header
mutex sync.Mutex
pubSession *rtmp.ServerSession
pullSession *rtmp.PullSession
subSessionSet map[*rtmp.ServerSession]struct{}
// TODO chef:
metadata []byte
avcKeySeqHeader []byte
aacSeqHeader []byte
}
var _ rtmp.PubSessionObserver = &Group{}
func NewGroup(appName string, streamName string) *Group {
uk := unique.GenUniqueKey("RTMPGROUP")
log.Infof("lifecycle new rtmp.Group. [%s] appName=%s, streamName=%s", uk, appName, streamName)
log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName)
return &Group{
UniqueKey: uk,
appName: appName,
streamName: streamName,
exitChan: make(chan struct{}, 1),
subSessionSet: make(map[*ServerSession]struct{}),
exitChan: make(chan struct{}, 1),
subSessionSet: make(map[*rtmp.ServerSession]struct{}),
}
}
@ -59,8 +55,8 @@ func (group *Group) RunLoop() {
}
}
func (group *Group) Dispose() {
log.Infof("lifecycle dispose rtmp.Group. [%s]", group.UniqueKey)
func (group *Group) Dispose(err error) {
log.Infof("lifecycle dispose group. [%s]", group.UniqueKey)
group.exitChan <- struct{}{}
group.mutex.Lock()
@ -68,47 +64,55 @@ func (group *Group) Dispose() {
if group.pubSession != nil {
group.pubSession.Dispose()
}
// TODO chef: dispose pull session
for session := range group.subSessionSet {
session.Dispose()
}
}
func (group *Group) AddPubSession(session *ServerSession) {
func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
log.Debugf("add PubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
if group.pubSession != nil {
log.Errorf("PubSession already exist in group. [%s] old=%s, new=%s", group.UniqueKey, group.pubSession.UniqueKey, session.UniqueKey)
return false
}
group.pubSession = session
group.mutex.Unlock()
session.SetPubSessionObserver(group)
return true
}
func (group *Group) AddSubSession(session *ServerSession) {
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.subSessionSet[session] = struct{}{}
group.mutex.Unlock()
// TODO chef: 多长没有拉流session存在的功能
//group.turnToEmptyTick = 0
}
func (group *Group) DelPubSession(session *ServerSession) {
func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.pubSession = nil
group.mutex.Unlock()
group.metadata = nil
group.avcKeySeqHeader = nil
group.aacSeqHeader = nil
}
func (group *Group) DelSubSession(session *ServerSession) {
func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) {
log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.subSessionSet, session)
group.mutex.Unlock()
}
func (group *Group) AddHTTPFlvSubSession(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) {
panic("not impl")
}
func (group *Group) Pull(addr string, connectTimeout int64) {
@ -148,39 +152,30 @@ func (group *Group) IsInExist() bool {
return group.pubSession != nil
}
func (group *Group) SetObserver(obs GroupObserver) {
group.obs = obs
}
// PubSession or PullSession
func (group *Group) ReadRTMPAVMsgCB(header Header, timestampAbs uint32, message []byte) {
func (group *Group) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastRTMP2RTMP(header, timestampAbs, message)
if group.obs != nil {
group.obs.ReadRTMPAVMsgCB(header, timestampAbs, message)
}
}
func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, message []byte) {
//log.Infof("%+v", header)
func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, message []byte) {
log.Infof("%+v", header)
// # 1. 设置好头部信息
var currHeader Header
currHeader.MsgLen = len(message)
var currHeader rtmp.Header
currHeader.MsgLen = uint32(len(message))
currHeader.Timestamp = timestampAbs
currHeader.MsgTypeID = header.MsgTypeID
currHeader.MsgStreamID = MSID1
currHeader.MsgStreamID = rtmp.MSID1
switch header.MsgTypeID {
case TypeidDataMessageAMF0:
currHeader.CSID = CSIDAMF
case rtmp.TypeidDataMessageAMF0:
currHeader.CSID = rtmp.CSIDAMF
//prevHeader = nil
case TypeidAudio:
currHeader.CSID = CSIDAudio
case rtmp.TypeidAudio:
currHeader.CSID = rtmp.CSIDAudio
//prevHeader = group.prevAudioHeader
case TypeidVideo:
currHeader.CSID = CSIDVideo
case rtmp.TypeidVideo:
currHeader.CSID = rtmp.CSIDVideo
//prevHeader = group.prevVideoHeader
}
@ -190,11 +185,11 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa
for session := range group.subSessionSet {
// ## 2.1. 一个message广播给多个sub session时只做一次chunk切割
if absChunks == nil {
absChunks = Message2Chunks(message, &currHeader, LocalChunkSize)
absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize)
}
// ## 2.2. 如果是新的sub session发送已缓存的信息
if session.isFresh {
if session.IsFresh {
// 发送缓存的头部信息
if group.metadata != nil {
session.AsyncWrite(group.metadata)
@ -205,23 +200,23 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa
if group.aacSeqHeader != nil {
session.AsyncWrite(group.aacSeqHeader)
}
session.isFresh = false
session.IsFresh = false
}
// ## 2.3. 判断当前包的类型以及sub session的状态决定是否发送并更新sub session的状态
switch header.MsgTypeID {
case TypeidDataMessageAMF0:
case rtmp.TypeidDataMessageAMF0:
session.AsyncWrite(absChunks)
case TypeidAudio:
case rtmp.TypeidAudio:
session.AsyncWrite(absChunks)
case TypeidVideo:
if session.waitKeyNalu {
case rtmp.TypeidVideo:
if session.WaitKeyNalu {
if message[0] == 0x17 && message[1] == 0x0 {
session.AsyncWrite(absChunks)
}
if message[0] == 0x17 && message[1] == 0x1 {
session.AsyncWrite(absChunks)
session.waitKeyNalu = false
session.WaitKeyNalu = false
}
} else {
session.AsyncWrite(absChunks)
@ -234,28 +229,63 @@ func (group *Group) broadcastRTMP2RTMP(header Header, timestampAbs uint32, messa
// # 3. 缓存 metadata 和 avc key seq header 和 aac seq header
// 由于可能没有订阅者所以message可能还没做chunk切割所以这里要做判断是否做chunk切割
switch header.MsgTypeID {
case TypeidDataMessageAMF0:
case rtmp.TypeidDataMessageAMF0:
if absChunks == nil {
absChunks = Message2Chunks(message, &currHeader, LocalChunkSize)
absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize)
}
log.Debugf("cache metadata. [%s]", group.UniqueKey)
group.metadata = absChunks
case TypeidVideo:
case rtmp.TypeidVideo:
// TODO chef: magic number
if message[0] == 0x17 && message[1] == 0x0 {
if absChunks == nil {
absChunks = Message2Chunks(message, &currHeader, LocalChunkSize)
absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize)
}
log.Debugf("cache avc key seq header. [%s]", group.UniqueKey)
group.avcKeySeqHeader = absChunks
}
case TypeidAudio:
case rtmp.TypeidAudio:
if (message[0]>>4) == 0x0a && message[1] == 0x0 {
if absChunks == nil {
absChunks = Message2Chunks(message, &currHeader, LocalChunkSize)
absChunks = rtmp.Message2Chunks(message, &currHeader, rtmp.LocalChunkSize)
}
log.Debugf("cache aac seq header. [%s]", group.UniqueKey)
group.aacSeqHeader = absChunks
}
}
}
func (group *Group) pullIfNeeded() {
panic("not impl")
//if !gm.isInExist() {
// switch gm.config.Pull.Type {
// case "httpflv":
// go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout)
// case "rtmp":
// go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout)
// }
//}
}
func (group *Group) isInExist() bool {
panic("not impl")
//return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) ||
// (gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist())
}
// GroupObserver of httpflv.Group
func (group *Group) ReadHTTPRespHeaderCB() {
// noop
}
// GroupObserver of httpflv.Group
func (group *Group) ReadFlvHeaderCB(flvHeader []byte) {
// noop
}
// GroupObserver of httpflv.Group
func (group *Group) ReadFlvTagCB(tag *httpflv.Tag) {
log.Info("ReadFlvTagCB")
// TODO chef: broadcast to rtmp.Group
}

@ -1,4 +1,4 @@
package main
package logic
import "errors"

@ -1,4 +1,4 @@
package main
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
@ -11,18 +11,20 @@ import (
type ServerManager struct {
config *Config
httpFlvServer *httpflv.Server
rtmpServer *rtmp.Server
groupManagerMap map[string]*GroupManager // TODO chef: with appName
mutex sync.Mutex
exitChan chan bool
httpFlvServer *httpflv.Server
rtmpServer *rtmp.Server
groupMap map[string]*Group // TODO chef: with appName
mutex sync.Mutex
exitChan chan struct{}
}
var _ rtmp.ServerObserver = &ServerManager{}
func NewServerManager(config *Config) *ServerManager {
m := &ServerManager{
config: config,
groupManagerMap: make(map[string]*GroupManager),
exitChan: make(chan bool),
config: config,
groupMap: make(map[string]*Group),
exitChan: make(chan struct{}),
}
if len(config.HTTPFlv.SubListenAddr) != 0 {
m.httpFlvServer = httpflv.NewServer(m, config.HTTPFlv.SubListenAddr, config.SubIdleTimeout)
@ -62,7 +64,7 @@ func (sm *ServerManager) RunLoop() {
count++
if (count % 10) == 0 {
sm.mutex.Lock()
log.Infof("group size:%d", len(sm.groupManagerMap))
log.Infof("group size:%d", len(sm.groupMap))
sm.mutex.Unlock()
}
}
@ -79,55 +81,67 @@ func (sm *ServerManager) Dispose() {
}
sm.mutex.Lock()
for _, gm := range sm.groupManagerMap {
gm.Dispose(lalErr)
for _, group := range sm.groupMap {
group.Dispose(lalErr)
}
sm.groupManagerMap = nil
sm.groupMap = nil
sm.mutex.Unlock()
sm.exitChan <- true
sm.exitChan <- struct{}{}
}
// ServerObserver of rtmp.Server
func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool {
gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName)
return gm.AddRTMPPubSession(session, rtmpGroup)
func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool {
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
return group.AddRTMPPubSession(session)
}
// ServerObserver of rtmp.Server
func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession, rtmpGroup *rtmp.Group) bool {
gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName)
gm.AddRTMPSubSession(session, rtmpGroup)
func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool {
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.AddRTMPSubSession(session)
return true
}
// ServerObserver of rtmp.Server
func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) {
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.DelRTMPPubSession(session)
}
// ServerObserver of rtmp.Server
func (sm *ServerManager) DelRTMPSubSessionCB(session *rtmp.ServerSession) {
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.DelRTMPSubSession(session)
}
// ServerObserver of httpflv.Server
func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) bool {
gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName)
gm.AddHTTPFlvSubSession(session, httpFlvGroup)
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.AddHTTPFlvSubSession(session, httpFlvGroup)
return true
}
func (sm *ServerManager) check() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
for k, gm := range sm.groupManagerMap {
if gm.IsTotalEmpty() {
log.Infof("erase empty group manager. [%s]", gm.UniqueKey)
gm.Dispose(lalErr)
delete(sm.groupManagerMap, k)
for k, group := range sm.groupMap {
if group.IsTotalEmpty() {
log.Infof("erase empty group manager. [%s]", group.UniqueKey)
group.Dispose(lalErr)
delete(sm.groupMap, k)
}
}
}
func (sm *ServerManager) getOrCreateGroupManager(appName string, streamName string) *GroupManager {
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
sm.mutex.Lock()
defer sm.mutex.Unlock()
gm, exist := sm.groupManagerMap[streamName]
group, exist := sm.groupMap[streamName]
if !exist {
gm = NewGroupManager(appName, streamName, sm.config)
sm.groupManagerMap[streamName] = gm
group = NewGroup(appName, streamName)
sm.groupMap[streamName] = group
}
go gm.RunLoop()
return gm
go group.RunLoop()
return group
}

@ -0,0 +1,36 @@
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
)
var Trans trans
type trans struct {
}
//// TODO chef: rtmp msg 也弄成结构体
//func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) {
// header.MsgLen = tag.Header.DataSize
// header.MsgTypeID = tag.Header.T
// header.MsgStreamID = rtmp.MSID1 // TODO
// switch tag.Header.T {
// case httpflv.TagTypeMetadata:
// header.CSID = rtmp.CSIDAMF
// case httpflv.TagTypeAudio:
// header.CSID = rtmp.CSIDAudio
// case httpflv.TagTypeVideo:
// header.CSID = rtmp.CSIDVideo
// }
// return
//}
func (t trans) RTMPMsg2FlvTag(header rtmp.Header, timestampAbs uint32, message []byte) httpflv.Tag {
var tag httpflv.Tag
tag.Header.T = header.MsgTypeID
tag.Header.DataSize = header.MsgLen
tag.Header.Timestamp = timestampAbs
tag.Raw = httpflv.PackHTTPFlvTag(header.MsgTypeID, timestampAbs, message)
return tag
}

@ -12,8 +12,8 @@ import (
)
var (
ErrAMFInvalidType = errors.New("lal.AMF0: invalid type.")
ErrAMFTooShort = errors.New("lal.AMF0: too short.")
ErrAMFInvalidType = errors.New("lal.AMF0: invalid type")
ErrAMFTooShort = errors.New("lal.AMF0: too short")
)
const (
@ -117,7 +117,7 @@ func (amf0) WriteObject(writer io.Writer, objs []ObjectPair) error {
return err
}
default:
// TODO chef: if other types.
// TODO chef: 换成 nezha log panic
panic(objs[i].value)
}
}
@ -251,7 +251,7 @@ func (amf0) ReadObject(b []byte) (map[string]interface{}, int, error) {
obj[k] = v
index += l
default:
// TODO chef: if other types.
// TODO chef: 换成 nezha log panic
panic(vt)
}
}

@ -262,4 +262,4 @@ func BenchmarkAmf0_WriteObject(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = AMF0.WriteObject(out, objs)
}
}
}

@ -10,7 +10,7 @@ import (
)
type ChunkComposer struct {
peerChunkSize int
peerChunkSize uint32
csid2stream map[int]*Stream
}
@ -21,11 +21,11 @@ func NewChunkComposer() *ChunkComposer {
}
}
func (c *ChunkComposer) SetPeerChunkSize(val int) {
func (c *ChunkComposer) SetPeerChunkSize(val uint32) {
c.peerChunkSize = val
}
func (c *ChunkComposer) GetPeerChunkSize() int {
func (c *ChunkComposer) GetPeerChunkSize() uint32 {
return c.peerChunkSize
}
@ -69,8 +69,8 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
// 包头中为绝对时间戳
stream.header.Timestamp = bele.BEUint24(bootstrap)
stream.timestampAbs = stream.header.Timestamp
stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:]))
stream.header.MsgTypeID = int(bootstrap[6])
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
stream.header.MsgTypeID = bootstrap[6]
stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:]))
stream.msg.reserve(stream.header.MsgLen)
@ -81,8 +81,8 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
// 包头中为相对时间戳
stream.header.Timestamp = bele.BEUint24(bootstrap)
stream.timestampAbs += stream.header.Timestamp
stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:]))
stream.header.MsgTypeID = int(bootstrap[6])
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
stream.header.MsgTypeID = bootstrap[6]
stream.msg.reserve(stream.header.MsgLen)
case 2:
@ -100,7 +100,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
// 5.3.1.3 Extended Timestamp
// 使用ffmpeg推流时发现时间戳超过3字节最大值后即使是fmt3(即包头大小为0)依然存在ext ts字段
// 所以这里我将 `==` 的判断改成了 `>=`
// TODO 测试其他客户端和ext ts相关的表现
// TODO chef: 测试其他客户端和ext ts相关的表现
//if stream.header.Timestamp == maxTimestampInMessageHeader {
if stream.header.Timestamp >= maxTimestampInMessageHeader {
if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil {
@ -119,9 +119,9 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
}
//stream.header.CSID = csid
//log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.msgLen, stream.timestampAbs)
//log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.timestampAbs)
var neededSize int
var neededSize uint32
if stream.header.MsgLen <= c.peerChunkSize {
neededSize = stream.header.MsgLen
} else {
@ -132,7 +132,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
}
//stream.msg.reserve(neededSize)
if _, err := io.ReadAtLeast(reader, stream.msg.buf[stream.msg.e:stream.msg.e+neededSize], neededSize); err != nil {
if _, err := io.ReadAtLeast(reader, stream.msg.buf[stream.msg.e:stream.msg.e+neededSize], int(neededSize)); err != nil {
return err
}
stream.msg.produced(neededSize)
@ -140,7 +140,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
if stream.msg.len() == stream.header.MsgLen {
// 对端设置了chunk size
if stream.header.MsgTypeID == typeidSetChunkSize {
val := int(bele.BEUint32(stream.msg.buf))
val := bele.BEUint32(stream.msg.buf)
c.SetPeerChunkSize(val)
}

@ -8,15 +8,14 @@ import (
"github.com/q191201771/nezha/pkg/bele"
)
// TODO chef: 新的message的第一个chunk始终使用fmt0格式参考前一个message
// TODO chef: 新的message的第一个chunk始终使用fmt0格式没有参考前一个message
func Message2Chunks(message []byte, header *Header, chunkSize int) []byte {
return message2Chunks(message, header, nil, chunkSize)
}
// TODO chef: 返回值直接传入
func calcHeader(header *Header, prevHeader *Header) []byte {
// @param 返回头的大小
func calcHeader(header *Header, prevHeader *Header, out []byte) int {
var index int
out := make([]byte, 16)
// 计算fmt和timestamp
fmt := uint8(0)
@ -69,9 +68,9 @@ func calcHeader(header *Header, prevHeader *Header) []byte {
index += 3
if fmt <= 1 {
bele.BEPutUint24(out[index:], uint32(header.MsgLen))
bele.BEPutUint24(out[index:], header.MsgLen)
index += 3
out[index] = uint8(header.MsgTypeID)
out[index] = header.MsgTypeID
index++
if fmt == 0 {
@ -88,7 +87,7 @@ func calcHeader(header *Header, prevHeader *Header) []byte {
index += 4
}
return out[0:index]
return index
}
func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSize int) []byte {
@ -112,9 +111,8 @@ func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSiz
// NOTICE 和srs交互时发现srs要求message中的非第一个chunk不能使用fmt0
// 将message切割成chunk放入chunk body中
for i := 0; i < numOfChunk; i++ {
head := calcHeader(header, prevHeader)
copy(out[index:], head)
index += len(head)
headLen := calcHeader(header, prevHeader, out[index:])
index += headLen
if i != numOfChunk-1 {
copy(out[index:], message[i*chunkSize:i*chunkSize+chunkSize])

@ -23,5 +23,3 @@ func NewPushSession(timeout PushSessionTimeout) *PushSession {
func (s *PushSession) Push(rawURL string) error {
return s.Do(rawURL)
}
// TODO chef: add function to write av data

@ -189,7 +189,6 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
case "|RtmpSampleAccess": // TODO chef: handle this?
return nil
default:
// TODO chef:
log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
}
@ -298,7 +297,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
return err
}
case CSTPushSession:
log.Infof("<----- publish('%s')", s.streamNameWithRawQuery, s.UniqueKey)
log.Infof("<----- publish('%s'). [%s]", s.streamNameWithRawQuery, s.UniqueKey)
if err := s.packer.writePublish(s.Conn, s.appName, s.streamNameWithRawQuery, sid); err != nil {
return err
}
@ -356,7 +355,7 @@ func (s *ClientSession) parseURL(rawURL string) error {
} else {
s.streamNameWithRawQuery = s.streamName + "?" + s.url.RawQuery
}
log.Debugf("%s %s %s %+v", s.tcURL, s.appName, s.streamNameWithRawQuery, *s.url)
log.Debugf("parseURL. [%s] %s %s %s %+v", s.UniqueKey, s.tcURL, s.appName, s.streamNameWithRawQuery, *s.url)
return nil
}
@ -393,7 +392,6 @@ func (s *ClientSession) tcpConnect() error {
return err
}
// TODO chef: 超时由接口设置
s.Conn = connection.New(conn, connection.Config{
ReadBufSize: readBufSize,
})

@ -10,10 +10,6 @@ import (
"time"
)
// TODO chef: doc
// TODO chef: HandshakeClient with complex mode
const version = uint8(3)
const (
@ -65,9 +61,12 @@ var random1528Buf []byte
type HandshakeClient interface {
WriteC0C1(writer io.Writer) error
ReadS0S1S2(reader io.Reader) error
WriterC2(writer io.Writer) error
WriteC2(writer io.Writer) error
}
var _ HandshakeClient = &HandshakeClientSimple{}
var _ HandshakeClient = &HandshakeClientComplex{}
type HandshakeClientSimple struct {
c0c1 []byte
c2 []byte
@ -131,7 +130,6 @@ func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error {
return err
}
func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error {
s0s1s2 := make([]byte, s0s1s2Len)
if _, err := io.ReadAtLeast(reader, s0s1s2, s0s1s2Len); err != nil {
@ -151,7 +149,6 @@ func (c *HandshakeClientSimple) WriteC2(write io.Writer) error {
return err
}
func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) {
c0c1 := make([]byte, c0c1Len)
if _, err = io.ReadAtLeast(reader, c0c1, c0c1Len); err != nil {
@ -280,7 +277,6 @@ func random1528(out []byte) {
}
func init() {
// TODO chef: hack lal in
bs := []byte{'l', 'a', 'l'}
bsl := len(bs)
random1528Buf = make([]byte, 1528)

@ -28,7 +28,7 @@ func NewMessagePacker() *MessagePacker {
}
}
func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID int, streamID int) {
func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID uint8, streamID int) {
// 目前这个函数只供发送信令时调用,信令的 csid 都是小于等于 63 的,如果传入的 csid 大于 63直接 panic
if csid > 63 {
panic(csid)
@ -38,11 +38,11 @@ func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeID in
// 0 0 0 是时间戳
_, _ = packer.b.Write([]byte{uint8(fmt<<6 | csid), 0, 0, 0})
_ = bele.WriteBEUint24(packer.b, uint32(bodyLen))
_, _ = packer.b.Write([]byte{uint8(typeID)})
_ = packer.b.WriteByte(typeID)
_ = bele.WriteLE(packer.b, uint32(streamID))
}
func (packer *MessagePacker) writeProtocolControlMessage(writer io.Writer, typeID int, val int) error {
func (packer *MessagePacker) writeProtocolControlMessage(writer io.Writer, typeID uint8, val int) error {
packer.writeMessageHeader(csidProtocolControl, 4, typeID, 0)
_ = bele.WriteBE(packer.b, uint32(val))
_, err := packer.b.WriteTo(writer)
@ -70,7 +70,6 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
_ = AMF0.WriteString(packer.b, "connect")
_ = AMF0.WriteNumber(packer.b, float64(tidClientConnect))
// TODO chef: hack lal in
objs := []ObjectPair{
{key: "app", value: appName},
{key: "type", value: "nonprivate"},

@ -1,7 +1,15 @@
package rtmp
import (
"errors"
import "errors"
// 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口
var (
readBufSize = 4096
writeBufSize = 4096
wChanSize = 1024
windowAcknowledgementSize = 5000000
peerBandwidth = 5000000
LocalChunkSize = 4096 // 本端设置的chunk size
)
var rtmpErr = errors.New("rtmp: fxxk")
@ -19,15 +27,15 @@ const (
)
const (
TypeidAudio = 8
TypeidVideo = 9
TypeidDataMessageAMF0 = 18 // meta
typeidSetChunkSize = 1
typeidAck = 3
typeidUserControl = 4
typeidWinAckSize = 5
typeidBandwidth = 6
typeidCommandMessageAMF0 = 20
TypeidAudio = uint8(8)
TypeidVideo = uint8(9)
TypeidDataMessageAMF0 = uint8(18) // meta
typeidSetChunkSize = uint8(1)
typeidAck = uint8(3)
typeidUserControl = uint8(4)
typeidWinAckSize = uint8(5)
typeidBandwidth = uint8(6)
typeidCommandMessageAMF0 = uint8(20)
)
const (
@ -49,21 +57,10 @@ const (
MSID1 = 1 // publish、play、onStatus 以及 音视频数据
)
// TODO chef
var (
readBufSize = 4096
writeBufSize = 4096
wChanSize = 1024
)
var windowAcknowledgementSize = 5000000
var peerBandwidth = 5000000
var LocalChunkSize = 4096 // 本端设置的chunk size
// 接收到音视频类型数据时的回调函数。目前被PullSession以及PubSession使用。
type AVMsgObserver interface {
// @param header:
// @param timestampAbs: 绝对时间戳
// @param message: 不包含头内容。回调结束后PullSession会继续使用这块内存。
// @param message: 不包含头内容。回调结束后PubSession 会继续使用这块内存。
ReadRTMPAVMsgCB(header Header, timestampAbs uint32, message []byte)
}

@ -3,29 +3,25 @@ package rtmp
import (
"github.com/q191201771/nezha/pkg/log"
"net"
"sync"
)
type ServerObserver interface {
NewRTMPPubSessionCB(session *ServerSession, group *Group) bool // 返回true则允许推流返回false则强制关闭这个连接
NewRTMPSubSessionCB(session *ServerSession, group *Group) bool // 返回true则允许拉流返回false则强制关闭这个连接
NewRTMPPubSessionCB(session *ServerSession) bool // 返回true则允许推流返回false则强制关闭这个连接
NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流返回false则强制关闭这个连接
DelRTMPPubSessionCB(session *ServerSession)
DelRTMPSubSessionCB(session *ServerSession)
}
type Server struct {
obs ServerObserver
addr string
ln net.Listener
mutex sync.Mutex
groupMap map[string]*Group
// TODO chef: 清除空的group
}
func NewServer(obs ServerObserver, addr string) *Server {
return &Server{
obs: obs,
addr: addr,
groupMap: make(map[string]*Group),
obs: obs,
addr: addr,
}
}
@ -49,11 +45,6 @@ func (server *Server) Dispose() {
if err := server.ln.Close(); err != nil {
log.Error(err)
}
server.mutex.Lock()
defer server.mutex.Unlock()
for _, g := range server.groupMap {
g.Dispose()
}
}
func (server *Server) handleTCPConnect(conn net.Conn) {
@ -64,60 +55,25 @@ func (server *Server) handleTCPConnect(conn net.Conn) {
case ServerSessionTypeUnknown:
// noop
case ServerSessionTypePub:
server.DelRTMPPubSession(session)
server.obs.DelRTMPPubSessionCB(session)
case ServerSessionTypeSub:
server.DelRTMPSubSession(session)
server.obs.DelRTMPSubSessionCB(session)
}
}
// ServerSessionObserver
func (server *Server) NewRTMPPubSessionCB(session *ServerSession) {
group := server.getOrCreateGroup(session.AppName, session.StreamName)
if !server.obs.NewRTMPPubSessionCB(session, group) {
if !server.obs.NewRTMPPubSessionCB(session) {
log.Warnf("dispose PubSession since pub exist.")
session.Dispose()
return
}
group.AddPubSession(session)
}
// ServerSessionObserver
func (server *Server) NewRTMPSubSessionCB(session *ServerSession) {
group := server.getOrCreateGroup(session.AppName, session.StreamName)
if !server.obs.NewRTMPSubSessionCB(session, group) {
if !server.obs.NewRTMPSubSessionCB(session) {
// TODO chef: 关闭这个连接
return
}
group.AddSubSession(session)
}
func (server *Server) DelRTMPPubSession(session *ServerSession) {
group := server.getOrCreateGroup(session.AppName, session.StreamName)
// TODO chef: obs
group.DelPubSession(session)
}
func (server *Server) DelRTMPSubSession(session *ServerSession) {
group := server.getOrCreateGroup(session.AppName, session.StreamName)
// TODO chef: obs
group.DelSubSession(session)
}
func (server *Server) getOrCreateGroup(appName string, streamName string) *Group {
server.mutex.Lock()
defer server.mutex.Unlock()
group, exist := server.groupMap[streamName]
if !exist {
group = NewGroup(appName, streamName)
server.groupMap[streamName] = group
}
go group.RunLoop()
return group
}

@ -19,10 +19,10 @@ import (
type ServerSessionObserver interface {
NewRTMPPubSessionCB(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
NewRTMPSubSessionCB(session *ServerSession)
//DelRTMPPubSessionCB(session *PubSession)
//DelRTMPSubSessionCB(session *SubSession)
}
var _ ServerSessionObserver = &Server{}
type PubSessionObserver interface {
AVMsgObserver
}
@ -63,13 +63,13 @@ type ServerSession struct {
avObs PubSessionObserver
// only for SubSession
isFresh bool
waitKeyNalu bool
IsFresh bool
WaitKeyNalu bool
}
func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
uk := unique.GenUniqueKey("RTMPPUBSUB")
log.Infof("lifecycle new rtmp.ServerSession. [%s]", uk)
log.Infof("lifecycle new rtmp server session. [%s]", uk)
return &ServerSession{
UniqueKey: uk,
obs: obs,
@ -81,8 +81,8 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
wb: bufio.NewWriterSize(conn, writeBufSize),
wChan: make(chan []byte, wChanSize),
exitChan: make(chan struct{}),
isFresh: true,
waitKeyNalu: true,
IsFresh: true,
WaitKeyNalu: true,
}
}
@ -101,7 +101,7 @@ func (s *ServerSession) RunLoop() (err error) {
}
func (s *ServerSession) Dispose() {
log.Infof("lifecycle dispose rtmp.ServerSession. [%s]", s.UniqueKey)
log.Infof("lifecycle dispose rtmp server session. [%s]", s.UniqueKey)
if atomic.LoadUint32(&s.hasClosedFlag) == 1 {
return
}
@ -177,7 +177,8 @@ func (s *ServerSession) doMsg(stream *Stream) error {
//log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header)
switch stream.header.MsgTypeID {
case typeidSetChunkSize:
// TODO chef:
// noop
// 因为底层的 chunk composer 已经处理过了,这里就不用处理
case typeidCommandMessageAMF0:
return s.doCommandMessage(stream)
case TypeidDataMessageAMF0:
@ -218,7 +219,8 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
}
switch val {
case "|RtmpSampleAccess": // TODO chef: handle this?
case "|RtmpSampleAccess":
log.Warn("recv |RtmpSampleAccess. ignore it.")
return nil
case "@setDataFrame":
// macos obs
@ -234,9 +236,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
case "onMetaData":
// noop
default:
// TODO chef:
log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
log.Errorf("recv unknown message. val=%s, hex=%s", val, hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
return nil
}
@ -338,9 +338,8 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
log.Debugf("[%s] pubType=%s", s.UniqueKey, pubType)
log.Infof("-----> publish('%s') [%s]", s.StreamName, s.UniqueKey)
// TODO chef: hardcode streamID
log.Infof("<---- onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey)
if err := s.packer.writeOnStatusPublish(s.conn, 1); err != nil {
if err := s.packer.writeOnStatusPublish(s.conn, MSID1); err != nil {
return err
}
s.t = ServerSessionTypePub
@ -363,7 +362,7 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
// TODO chef: start duration reset
log.Infof("<----onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey)
if err := s.packer.writeOnStatusPlay(s.conn, 1); err != nil {
if err := s.packer.writeOnStatusPlay(s.conn, MSID1); err != nil {
return err
}
s.t = ServerSessionTypeSub

@ -6,20 +6,20 @@ const initMsgLen = 4096
type Header struct {
CSID int
MsgLen int
MsgLen uint32
// NOTICE 是header中的时间戳可能是绝对的也可能是相对的。
// 如果需要绝对时间戳应该使用Stream中的timestampAbs
Timestamp uint32
MsgTypeID int // 8 audio 9 video 18 metadata
MsgTypeID uint8 // 8 audio 9 video 18 metadata
MsgStreamID int
}
type StreamMsg struct {
buf []byte
b int
e int
b uint32
e uint32
}
type Stream struct {
@ -37,29 +37,30 @@ func NewStream() *Stream {
}
}
func (msg *StreamMsg) reserve(n int) {
nn := cap(msg.buf) - msg.e
func (msg *StreamMsg) reserve(n uint32) {
bufCap := uint32(cap(msg.buf))
nn := bufCap - msg.e
if nn > n {
return
}
for nn < n {
nn <<= 1
}
nb := make([]byte, cap(msg.buf)+nn)
nb := make([]byte, bufCap+nn)
copy(nb, msg.buf[msg.b:msg.e])
msg.buf = nb
log.Debugf("reserve. need:%d left:%d %d %d", n, nn, len(msg.buf), cap(msg.buf))
}
func (msg *StreamMsg) len() int {
func (msg *StreamMsg) len() uint32 {
return msg.e - msg.b
}
func (msg *StreamMsg) produced(n int) {
func (msg *StreamMsg) produced(n uint32) {
msg.e += n
}
func (msg *StreamMsg) consumed(n int) {
func (msg *StreamMsg) consumed(n uint32) {
msg.b += n
}
@ -76,7 +77,7 @@ func (msg *StreamMsg) peekStringWithType() (string, error) {
func (msg *StreamMsg) readStringWithType() (string, error) {
str, l, err := AMF0.ReadString(msg.buf[msg.b:msg.e])
if err == nil {
msg.consumed(l)
msg.consumed(uint32(l))
}
return str, err
}
@ -84,7 +85,7 @@ func (msg *StreamMsg) readStringWithType() (string, error) {
func (msg *StreamMsg) readNumberWithType() (int, error) {
val, l, err := AMF0.ReadNumber(msg.buf[msg.b:msg.e])
if err == nil {
msg.consumed(l)
msg.consumed(uint32(l))
}
return int(val), err
}
@ -92,7 +93,7 @@ func (msg *StreamMsg) readNumberWithType() (int, error) {
func (msg *StreamMsg) readObjectWithType() (map[string]interface{}, error) {
obj, l, err := AMF0.ReadObject(msg.buf[msg.b:msg.e])
if err == nil {
msg.consumed(l)
msg.consumed(uint32(l))
}
return obj, err
}
@ -100,7 +101,7 @@ func (msg *StreamMsg) readObjectWithType() (map[string]interface{}, error) {
func (msg *StreamMsg) readNull() error {
l, err := AMF0.ReadNull(msg.buf[msg.b:msg.e])
if err == nil {
msg.consumed(l)
msg.consumed(uint32(l))
}
return err
}

Loading…
Cancel
Save