diff --git a/app/httpflvpull/httpflvpull.go b/app/httpflvpull/httpflvpull.go index a4230aa..f35dbce 100644 --- a/app/httpflvpull/httpflvpull.go +++ b/app/httpflvpull/httpflvpull.go @@ -26,16 +26,23 @@ func main() { session := httpflv.NewPullSession() abr := bitrate.New() vbr := bitrate.New() + prevTs := int64(-1) var runFlag nazaatomic.Bool runFlag.Store(true) go func() { for runFlag.Load() { time.Sleep(1 * time.Second) - log.Infof("bitrate. audio=%fkb/s, video=%fkb/s", abr.Rate(), vbr.Rate()) + //log.Infof("bitrate. audio=%fkb/s, video=%fkb/s", abr.Rate(), vbr.Rate()) } }() err := session.Pull(url, func(tag httpflv.Tag) { - //log.Infof("onReadFLVTag. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) + now := time.Now().UnixNano() / 1e6 + if prevTs != -1 { + //log.Infof("%v", now - prevTs) + } + prevTs = now + + log.Infof("onReadFLVTag. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) switch tag.Header.Type { case httpflv.TagTypeAudio: abr.Add(len(tag.Raw)) diff --git a/go.mod b/go.mod index 49922c4..172ae0f 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.7.0 +require github.com/q191201771/naza v0.7.1 diff --git a/go.sum b/go.sum index 0062481..1399a5e 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.7.0 h1:nWpxx17hulHvHNYWIYIiMq/xca5lwW8+JocFYFbsveg= -github.com/q191201771/naza v0.7.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.7.1 h1:+kkUyEEMAGrXBRN7PJMQYCJTMc1/llXBuK4N09x1dQI= +github.com/q191201771/naza v0.7.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/logic/gop_cache.go b/pkg/logic/gop_cache.go index b33414c..e82d9bf 100644 --- a/pkg/logic/gop_cache.go +++ b/pkg/logic/gop_cache.go @@ -8,7 +8,9 @@ package logic -import "github.com/q191201771/lal/pkg/rtmp" +import ( + "github.com/q191201771/lal/pkg/rtmp" +) type LazyChunkDivider struct { message []byte diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 1d83f0a..ad90ccd 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -168,6 +168,8 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { // # 1. 设置好用于发送的 rtmp 头部信息 currHeader := Trans.MakeDefaultRTMPHeader(msg.Header) + // TODO 这行代码是否放到 MakeDefaultRTMPHeader 中 + currHeader.MsgLen = uint32(len(msg.Payload)) lcd.Init(msg.Payload, &currHeader) // # 2. 广播。遍历所有 rtmp sub session,决定是否转发 @@ -263,7 +265,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { } group.metadata = lcd.Get() group.metadataTag = currTag - log.Debugf("cache metadata. [%s]", group.UniqueKey) + log.Debugf("cache metadata. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.metadata), group.metadataTag.Header.DataSize) case rtmp.TypeidVideo: // TODO chef: magic number if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 { @@ -272,7 +274,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { } group.avcKeySeqHeader = lcd.Get() group.avcKeySeqHeaderTag = currTag - log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) + log.Debugf("cache avc key seq header. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.avcKeySeqHeader), group.avcKeySeqHeaderTag.Header.DataSize) } case rtmp.TypeidAudio: if (msg.Payload[0]>>4) == 0x0a && msg.Payload[1] == 0x0 { @@ -281,7 +283,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { } group.aacSeqHeader = lcd.Get() group.aacSeqHeaderTag = currTag - log.Debugf("cache aac seq header. [%s]", group.UniqueKey) + log.Debugf("cache aac seq header. [%s] rtmp size:%d, flv size:%d", group.UniqueKey, len(group.aacSeqHeader), group.aacSeqHeaderTag.Header.DataSize) } } } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 0e61bc0..ef4f6d6 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -200,7 +200,7 @@ func (s *ClientSession) doMsg(stream *Stream) error { case TypeidVideo: s.onReadRTMPAVMsg(stream.toAVMsg()) default: - log.Errorf("read unknown msg type id. [%s] typeid=%+v", s.UniqueKey, stream.header) + log.Errorf("read unknown message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) panic(0) } return nil @@ -248,7 +248,7 @@ func (s *ClientSession) doCommandMessage(stream *Stream) error { case "onStatus": return s.doOnStatusMessage(stream, tid) default: - log.Errorf("read unknown cmd. [%s] cmd=%s", s.UniqueKey, cmd) + log.Errorf("read unknown command message. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) } return nil @@ -357,7 +357,7 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { // composer内部会自动更新peer chunk size. log.Infof("-----> Set Chunk Size %d. [%s]", val, s.UniqueKey) default: - log.Errorf("unknown msg type id. [%s] id=%d", s.UniqueKey, stream.header.MsgTypeID) + log.Errorf("read unknown protocol control message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) } return nil } diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 775408d..ca4dcff 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -68,7 +68,8 @@ func (server *Server) Dispose() { func (server *Server) handleTCPConnect(conn net.Conn) { log.Infof("accept a rtmp connection. remoteAddr=%v", conn.RemoteAddr()) session := NewServerSession(server, conn) - _ = session.RunLoop() + err := session.RunLoop() + log.Infof("rtmp loop done. [%s] err=%v", session.UniqueKey, err) switch session.t { case ServerSessionTypeUnknown: // noop diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index f58de9f..12336e8 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -9,7 +9,6 @@ package rtmp import ( - "encoding/hex" "net" "strings" @@ -146,10 +145,9 @@ func (s *ServerSession) doMsg(stream *Stream) error { log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey) return ErrRTMP } - //log.Infof("t:%d ts:%d len:%d", stream.header.MsgTypeID, stream.header.timestampAbs, stream.msg.e - stream.msg.b) s.avObs.OnReadRTMPAVMsg(stream.toAVMsg()) default: - log.Warnf("unknown message. [%s] typeid=%d", s.UniqueKey, stream.header.MsgTypeID) + log.Warnf("read unknown message. [%s] typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) } return nil @@ -174,7 +172,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { switch val { case "|RtmpSampleAccess": - log.Warn("recv |RtmpSampleAccess. ignore it.") + log.Warnf("read data message, ignore it. [%s] val=%s", s.UniqueKey, val) return nil case "@setDataFrame": // macos obs @@ -185,12 +183,13 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { return err } if val != "onMetaData" { + log.Errorf("read unknown data message. [%s] val=%s, %s", s.UniqueKey, val, stream.toDebugString()) return ErrRTMP } case "onMetaData": // noop default: - log.Errorf("recv unknown message. val=%s, hex=%s", val, hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) + log.Errorf("read unknown data message. [%s] val=%s, %s", s.UniqueKey, val, stream.toDebugString()) return nil } @@ -224,9 +223,9 @@ func (s *ServerSession) doCommandMessage(stream *Stream) error { case "FCUnpublish": fallthrough case "getStreamLength": - log.Warnf("read command message,ignore it. [%s] %s", s.UniqueKey, cmd) + log.Warnf("read command message, ignore it. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) default: - log.Errorf("unknown cmd. [%s] cmd=%s", s.UniqueKey, cmd) + log.Errorf("read unknown command message. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) } return nil } diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index 604410b..a7702a6 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -8,7 +8,12 @@ package rtmp -import log "github.com/q191201771/naza/pkg/nazalog" +import ( + "encoding/hex" + "fmt" + + log "github.com/q191201771/naza/pkg/nazalog" +) const initMsgLen = 4096 @@ -41,6 +46,13 @@ func NewStream() *Stream { } } +// 序列化成可读字符串,一般用于发生错误时打印日志 +func (stream *Stream) toDebugString() string { + // 注意,这里打印的二进制数据的其实位置是从 0 开始,而不是 msg.b 位置 + return fmt.Sprintf("header=%+v, b=%d, hex=%s", + stream.header, stream.msg.b, hex.Dump(stream.msg.buf[:stream.msg.e])) +} + func (stream *Stream) toAVMsg() AVMsg { return AVMsg{ Header: stream.header, @@ -80,6 +92,10 @@ func (msg *StreamMsg) clear() { msg.e = 0 } +//func (msg *StreamMsg) bytes() []byte { +// return msg.buf[msg.b: msg.e] +//} + func (msg *StreamMsg) peekStringWithType() (string, error) { str, _, err := AMF0.ReadString(msg.buf[msg.b:msg.e]) return str, err