messages:

1. #86 [fix] gop缓存设置为0时,可能花屏
2. #84 [fix] rtmp merge write开启时,可能给新加入的sub session发送错误的数据
pull/200/head^2
q191201771 4 years ago
parent 0cf8660d17
commit a41dc6d8ee

@ -4,9 +4,10 @@
"conf_version": "0.2.2", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本
// 号是否与代码中声明的一致
"rtmp": {
"enable": true, //. 是否开启rtmp服务的监听
"addr": ":19350", //. RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2 //. RTMP拉流的GOP缓存数量加速秒开
"enable": true, //. 是否开启rtmp服务的监听
"addr": ":19350", //. RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2, //. RTMP拉流的GOP缓存数量加速流打开时间但是可能增加延时
"merge_write_size": 8192 //. 将小包数据合并进行发送,单位字节,提高服务器性能,但是可能造成卡顿
},
"default_http": { //. http监听相关的默认配置如果hls, httpflv, httpts中没有单独配置以下配置项
// 则使用default_http中的配置

@ -85,7 +85,10 @@ func (b *bufWriter) Write(p []byte) {
}
func (b *bufWriter) Flush() {
b.flush()
if b.n == 0 {
return
}
b.wr(b.buf[:b.n])
b.mallocOnePiece(b.defaultSize)
}
@ -104,13 +107,6 @@ func (b *bufWriter) append(p []byte) {
b.n += len(p)
}
func (b *bufWriter) flush() {
if b.n == 0 {
return
}
b.wr(b.buf[:b.n])
}
func (dw *directWriter) Write(p []byte) {
dw.wr(p)
}

@ -21,14 +21,15 @@ import (
var flvHTTPResponseHeader []byte
type SubSession struct {
*base.HTTPSubSession // 直接使用它提供的函数
IsFresh bool
*base.HTTPSubSession // 直接使用它提供的函数
IsFresh bool
ShouldWaitVideoKeyFrame bool
}
func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession {
uk := base.GenUKFLVSubSession()
s := &SubSession{
base.NewHTTPSubSession(base.HTTPSubSessionOption{
HTTPSubSession: base.NewHTTPSubSession(base.HTTPSubSessionOption{
Conn: conn,
ConnModOption: func(option *connection.Option) {
option.WriteChanSize = SubSessionWriteChanSize
@ -40,7 +41,8 @@ func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, webs
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,
}),
true,
IsFresh: true,
ShouldWaitVideoKeyFrame: true,
}
nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s

@ -54,18 +54,63 @@ func (l *LazyRTMPMsg2FLVTag) Get() []byte {
return l.tag
}
// ---------------------------------------------------------------------------------------------------------------------
// 提供两个功能:
// 1. 缓存Metadata, VideoSeqHeader, AACSeqHeader
// 2. 缓存音视频GOP数据
//
// 以下只讨论GOPCache的第2点功能
//
// 音频和视频都会缓存
//
// GOPCache也可能不缓存GOP数据见NewGOPCache函数的gopNum参数说明
//
// 以下我们只讨论gopNum > 0(也即gopSize > 1)的情况
//
// GOPCache为空时只有输入了关键帧才能开启GOP缓存非关键帧以及音频数据不会被缓存
// 因此单音频的流是ok的相当于不缓存任何数据
//
// GOPCache不为空时输入关键帧触发生成新的GOP元素其他情况则往最后一个GOP元素一直追加
//
// first用于读取第一个GOP可能不完整last的前一个用于写入当前GOP
//
// 最近不完整的GOP也会被缓存见NewGOPCache函数的gopNum参数说明
//
// -----
// gopNum = 1
// gopSize = 2
//
// first | first | first | 在后面两个状态间转换,就不画了
// | | | | | |
// 0 1 | 0 1 | 0 1 |
// * * | * * | * * |
// | | | | | |
// last | last | last |
// | | |
// (empty) | (full) | (full) |
// GetGOPCount: 0 | 1 | 1 |
// -----
//
//
type GOPCache struct {
t string
uniqueKey string
t string
uniqueKey string
Metadata []byte
VideoSeqHeader []byte
AACSeqHeader []byte
gopRing []GOP
gopRingFirst int
gopRingLast int
gopSize int
gopRing []GOP
gopRingFirst int
gopRingLast int
gopSize int
}
// @param gopNum: gop缓存大小
// 如果为0则不缓存音频数据也即GOP缓存功能不生效
// 如果>0则缓存<gopNum>个完整GOP另外还可能有半个最近不完整的GOP
//
func NewGOPCache(t string, uniqueKey string, gopNum int) *GOPCache {
return &GOPCache{
t: t,
@ -99,7 +144,6 @@ func (gc *GOPCache) Feed(msg base.RTMPMsg, lg LazyGet) {
}
}
// 这个size的判断去掉也行
if gc.gopSize > 1 {
if msg.IsVideoKeyNALU() {
gc.feedNewGOP(msg, lg())
@ -109,6 +153,7 @@ func (gc *GOPCache) Feed(msg base.RTMPMsg, lg LazyGet) {
}
}
// 获取GOP数量注意最后一个可能是不完整的
func (gc *GOPCache) GetGOPCount() int {
return (gc.gopRingLast + gc.gopSize - gc.gopRingFirst) % gc.gopSize
}
@ -128,12 +173,15 @@ func (gc *GOPCache) Clear() {
gc.gopRingFirst = 0
}
// 往最后一个GOP元素追加一个msg
// 注意如果GOPCache为空则不缓存msg
func (gc *GOPCache) feedLastGOP(msg base.RTMPMsg, b []byte) {
if !gc.isGOPRingEmpty() {
gc.gopRing[(gc.gopRingLast-1+gc.gopSize)%gc.gopSize].Feed(msg, b)
}
}
// 生成一个最新的GOP元素并往里追加一个msg
func (gc *GOPCache) feedNewGOP(msg base.RTMPMsg, b []byte) {
if gc.isGOPRingFull() {
gc.gopRingFirst = (gc.gopRingFirst + 1) % gc.gopSize

@ -364,6 +364,14 @@ func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推过视频(比如还没推流,或者是单音频流),就不需要等待关键帧了
// 也即我们假定上行肯定是以关键帧为开始进行视频发送假设不是那么我们按上行的流正常发而不过滤掉关键帧前面的不包含关键帧的非完整GOP
// TODO(chef):
// 1. 需要仔细考虑单音频无视频的流的情况
// 2. 这里不修改标志让这个session继续等关键帧也可以
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
@ -382,6 +390,10 @@ func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
// 加入时,如果上行还没有推流过,就不需要等待关键帧了
if group.stat.VideoCodec == "" {
session.ShouldWaitVideoKeyFrame = false
}
group.pullIfNeeded()
}
@ -753,25 +765,46 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) {
if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.rtmpGopCache.Metadata != nil {
//nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey)
nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.rtmpGopCache.VideoSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey)
nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.rtmpGopCache.AACSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey)
nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey())
_ = session.Write(group.rtmpGopCache.AACSeqHeader)
}
for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ {
gopCount := group.rtmpGopCache.GetGOPCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
nazalog.Debugf("[%s] [%s] write gop cahe. gop num=%d", group.UniqueKey, session.UniqueKey(), gopCount)
}
for i := 0; i < gopCount; i++ {
for _, item := range group.rtmpGopCache.GetGOPDataAt(i) {
_ = session.Write(item)
}
}
// 有新加入的sub session本次循环的第一个新加入的sub session把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 从而确保新加入的sub session不会发送这部分脏的数据
// 注意此处可能被调用多次但是只有第一次会实际flush缓存数据
group.rtmpBufWriter.Flush()
session.IsFresh = false
}
if session.ShouldWaitVideoKeyFrame && msg.IsVideoKeyNALU() {
// 有sub session在等待关键帧并且当前是关键帧
// 把rtmp buf writer中的缓存数据全部广播发送给老的sub session
// 并且修改这个sub session的标志
// 让rtmp buf writer来发送这个关键帧
group.rtmpBufWriter.Flush()
session.ShouldWaitVideoKeyFrame = false
}
}
// ## 3.2. 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
@ -820,7 +853,12 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) {
if group.httpflvGopCache.AACSeqHeader != nil {
session.Write(group.httpflvGopCache.AACSeqHeader)
}
for i := 0; i < group.httpflvGopCache.GetGOPCount(); i++ {
gopCount := group.httpflvGopCache.GetGOPCount()
if gopCount > 0 {
// GOP缓存中肯定包含了关键帧
session.ShouldWaitVideoKeyFrame = false
}
for i := 0; i < gopCount; i++ {
for _, item := range group.httpflvGopCache.GetGOPDataAt(i) {
session.Write(item)
}
@ -829,7 +867,15 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) {
session.IsFresh = false
}
session.Write(lrm2ft.Get())
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNALU() {
session.Write(lrm2ft.Get())
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(lrm2ft.Get())
}
}
// # 5. 录制flv文件
@ -853,7 +899,7 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) {
group.stat.AudioCodec = base.AudioCodecAAC
}
}
if group.stat.AudioCodec == "" {
if group.stat.VideoCodec == "" {
if msg.IsAVCKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecAVC
}
@ -889,6 +935,9 @@ func (group *Group) broadcastByRTMPMsg(msg base.RTMPMsg) {
func (group *Group) write2RTMPSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}

@ -70,7 +70,8 @@ type ServerSession struct {
avObserver PubSessionObserver
// only for SubSession
IsFresh bool
IsFresh bool
ShouldWaitVideoKeyFrame bool
}
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession {
@ -85,12 +86,13 @@ func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSess
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
uniqueKey: uk,
observer: observer,
t: ServerSessionTypeUnknown,
chunkComposer: NewChunkComposer(),
packer: NewMessagePacker(),
IsFresh: true,
uniqueKey: uk,
observer: observer,
t: ServerSessionTypeUnknown,
chunkComposer: NewChunkComposer(),
packer: NewMessagePacker(),
IsFresh: true,
ShouldWaitVideoKeyFrame: true,
}
nazalog.Infof("[%s] lifecycle new rtmp ServerSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s

Loading…
Cancel
Save