1. [fix] package logic: 转发 rtmp metadata 时,message header 中的 len 字段可能和 body 实际长度不一致 2. [log] package rtmp: 一些错误情况下,对接收到包 dump hex 3. [dep] update naza -> v0.7.1

pull/1/head
q191201771 5 years ago
parent 054fcc935b
commit 570228f18d

@ -26,16 +26,23 @@ func main() {
session := httpflv.NewPullSession()
abr := bitrate.New()
vbr := bitrate.New()
prevTs := int64(-1)
var runFlag nazaatomic.Bool
runFlag.Store(true)
go func() {
for runFlag.Load() {
time.Sleep(1 * time.Second)
log.Infof("bitrate. audio=%fkb/s, video=%fkb/s", abr.Rate(), vbr.Rate())
//log.Infof("bitrate. audio=%fkb/s, video=%fkb/s", abr.Rate(), vbr.Rate())
}
}()
err := session.Pull(url, func(tag httpflv.Tag) {
//log.Infof("onReadFLVTag. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
now := time.Now().UnixNano() / 1e6
if prevTs != -1 {
//log.Infof("%v", now - prevTs)
}
prevTs = now
log.Infof("onReadFLVTag. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
switch tag.Header.Type {
case httpflv.TagTypeAudio:
abr.Add(len(tag.Raw))

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.7.0
require github.com/q191201771/naza v0.7.1

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.7.0 h1:nWpxx17hulHvHNYWIYIiMq/xca5lwW8+JocFYFbsveg=
github.com/q191201771/naza v0.7.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.7.1 h1:+kkUyEEMAGrXBRN7PJMQYCJTMc1/llXBuK4N09x1dQI=
github.com/q191201771/naza v0.7.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -8,7 +8,9 @@
package logic
import "github.com/q191201771/lal/pkg/rtmp"
import (
"github.com/q191201771/lal/pkg/rtmp"
)
type LazyChunkDivider struct {
message []byte

@ -168,6 +168,8 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
// # 1. 设置好用于发送的 rtmp 头部信息
currHeader := Trans.MakeDefaultRTMPHeader(msg.Header)
// TODO 这行代码是否放到 MakeDefaultRTMPHeader 中
currHeader.MsgLen = uint32(len(msg.Payload))
lcd.Init(msg.Payload, &currHeader)
// # 2. 广播。遍历所有 rtmp sub session决定是否转发
@ -263,7 +265,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
}
group.metadata = lcd.Get()
group.metadataTag = currTag
log.Debugf("cache metadata. [%s]", group.UniqueKey)
log.Debugf("cache metadata. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.metadata), group.metadataTag.Header.DataSize)
case rtmp.TypeidVideo:
// TODO chef: magic number
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 {
@ -272,7 +274,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
}
group.avcKeySeqHeader = lcd.Get()
group.avcKeySeqHeaderTag = currTag
log.Debugf("cache avc key seq header. [%s]", group.UniqueKey)
log.Debugf("cache avc key seq header. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.avcKeySeqHeader), group.avcKeySeqHeaderTag.Header.DataSize)
}
case rtmp.TypeidAudio:
if (msg.Payload[0]>>4) == 0x0a && msg.Payload[1] == 0x0 {
@ -281,7 +283,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
}
group.aacSeqHeader = lcd.Get()
group.aacSeqHeaderTag = currTag
log.Debugf("cache aac seq header. [%s]", group.UniqueKey)
log.Debugf("cache aac seq header. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.aacSeqHeader), group.aacSeqHeaderTag.Header.DataSize)
}
}
}

@ -200,7 +200,7 @@ func (s *ClientSession) doMsg(stream *Stream) error {
case TypeidVideo:
s.onReadRTMPAVMsg(stream.toAVMsg())
default:
log.Errorf("read unknown msg type id. [%s] typeid=%+v", s.UniqueKey, stream.header)
log.Errorf("read unknown message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
panic(0)
}
return nil
@ -248,7 +248,7 @@ func (s *ClientSession) doCommandMessage(stream *Stream) error {
case "onStatus":
return s.doOnStatusMessage(stream, tid)
default:
log.Errorf("read unknown cmd. [%s] cmd=%s", s.UniqueKey, cmd)
log.Errorf("read unknown command message. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString())
}
return nil
@ -357,7 +357,7 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
// composer内部会自动更新peer chunk size.
log.Infof("-----> Set Chunk Size %d. [%s]", val, s.UniqueKey)
default:
log.Errorf("unknown msg type id. [%s] id=%d", s.UniqueKey, stream.header.MsgTypeID)
log.Errorf("read unknown protocol control message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
}
return nil
}

@ -68,7 +68,8 @@ func (server *Server) Dispose() {
func (server *Server) handleTCPConnect(conn net.Conn) {
log.Infof("accept a rtmp connection. remoteAddr=%v", conn.RemoteAddr())
session := NewServerSession(server, conn)
_ = session.RunLoop()
err := session.RunLoop()
log.Infof("rtmp loop done. [%s] err=%v", session.UniqueKey, err)
switch session.t {
case ServerSessionTypeUnknown:
// noop

@ -9,7 +9,6 @@
package rtmp
import (
"encoding/hex"
"net"
"strings"
@ -146,10 +145,9 @@ func (s *ServerSession) doMsg(stream *Stream) error {
log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey)
return ErrRTMP
}
//log.Infof("t:%d ts:%d len:%d", stream.header.MsgTypeID, stream.header.timestampAbs, stream.msg.e - stream.msg.b)
s.avObs.OnReadRTMPAVMsg(stream.toAVMsg())
default:
log.Warnf("unknown message. [%s] typeid=%d", s.UniqueKey, stream.header.MsgTypeID)
log.Warnf("read unknown message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
}
return nil
@ -174,7 +172,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
switch val {
case "|RtmpSampleAccess":
log.Warn("recv |RtmpSampleAccess. ignore it.")
log.Warnf("read data message, ignore it. [%s] val=%s", s.UniqueKey, val)
return nil
case "@setDataFrame":
// macos obs
@ -185,12 +183,13 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
return err
}
if val != "onMetaData" {
log.Errorf("read unknown data message. [%s] val=%s, %s", s.UniqueKey, val, stream.toDebugString())
return ErrRTMP
}
case "onMetaData":
// noop
default:
log.Errorf("recv unknown message. val=%s, hex=%s", val, hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
log.Errorf("read unknown data message. [%s] val=%s, %s", s.UniqueKey, val, stream.toDebugString())
return nil
}
@ -224,9 +223,9 @@ func (s *ServerSession) doCommandMessage(stream *Stream) error {
case "FCUnpublish":
fallthrough
case "getStreamLength":
log.Warnf("read command message,ignore it. [%s] %s", s.UniqueKey, cmd)
log.Warnf("read command message, ignore it. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString())
default:
log.Errorf("unknown cmd. [%s] cmd=%s", s.UniqueKey, cmd)
log.Errorf("read unknown command message. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString())
}
return nil
}

@ -8,7 +8,12 @@
package rtmp
import log "github.com/q191201771/naza/pkg/nazalog"
import (
"encoding/hex"
"fmt"
log "github.com/q191201771/naza/pkg/nazalog"
)
const initMsgLen = 4096
@ -41,6 +46,13 @@ func NewStream() *Stream {
}
}
// 序列化成可读字符串,一般用于发生错误时打印日志
func (stream *Stream) toDebugString() string {
// 注意,这里打印的二进制数据的其实位置是从 0 开始,而不是 msg.b 位置
return fmt.Sprintf("header=%+v, b=%d, hex=%s",
stream.header, stream.msg.b, hex.Dump(stream.msg.buf[:stream.msg.e]))
}
func (stream *Stream) toAVMsg() AVMsg {
return AVMsg{
Header: stream.header,
@ -80,6 +92,10 @@ func (msg *StreamMsg) clear() {
msg.e = 0
}
//func (msg *StreamMsg) bytes() []byte {
// return msg.buf[msg.b: msg.e]
//}
func (msg *StreamMsg) peekStringWithType() (string, error) {
str, _, err := AMF0.ReadString(msg.buf[msg.b:msg.e])
return str, err

Loading…
Cancel
Save