[opt] rtmp和rtsp收包时添加trace级别日志 #63

pull/130/head
q191201771 3 years ago
parent 9bc0beba1d
commit fdaf48528e

@ -0,0 +1,56 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"fmt"
"github.com/q191201771/naza/pkg/nazalog"
)
type LogDump struct {
debugMaxNum int
debugCount int
}
// NewLogDump
//
// @param debugMaxNum: 日志最小级别为debug时使用debug打印日志次数的阈值
//
func NewLogDump(debugMaxNum int) LogDump {
return LogDump{
debugMaxNum: debugMaxNum,
}
}
func (ld *LogDump) ShouldDump() bool {
switch nazalog.GetOption().Level {
case nazalog.LevelTrace:
return true
case nazalog.LevelDebug:
if ld.debugCount >= ld.debugMaxNum {
return false
}
ld.debugCount++
return true
}
return false
}
// Outf
//
// 调用之前需调用 ShouldDump
// 将 ShouldDump 独立出来的目的是避免不需要打印日志时, Outf 调用前构造实参的开销,比如
// ld.Outf("hex=%s", hex.Dump(buf))
// 这个hex.Dump调用
//
func (ld *LogDump) Outf(format string, v ...interface{}) {
nazalog.Out(nazalog.GetOption().Level, 3, fmt.Sprintf(format, v...))
}

@ -8,19 +8,22 @@
package rtmp
// chunk_composer.go
// @pure
// 读取chunk并组织chunk生成message返回给上层
import (
"encoding/hex"
"io"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bele"
)
// ChunkComposer
//
// 读取chunk并合并chunk生成message返回给上层
//
type ChunkComposer struct {
peerChunkSize uint32
csid2stream map[int]*Stream
@ -123,7 +126,10 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
case 3:
// noop
}
//nazalog.Debugf("RTMP_CHUNK_COMPOSER chunk.fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header)
if nazalog.GetOption().Level == nazalog.LevelTrace {
nazalog.Tracef("[%p] RTMP_READ chunk.fmt=%d, csid=%d, header=%+v, timestamp=%d",
c, fmt, csid, stream.header, stream.timestamp)
}
// 5.3.1.3 Extended Timestamp
// 使用ffmpeg推流时发现时间戳超过3字节最大值后即使是fmt3(即包头大小为0)依然存在ext ts字段
@ -136,8 +142,12 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil {
return err
}
stream.timestamp = bele.BeUint32(bootstrap)
//nazalog.Debugf("RTMP_CHUNK_COMPOSER ext. extTs=%d", stream.header.Timestamp)
newTs := bele.BeUint32(bootstrap)
if nazalog.GetOption().Level == nazalog.LevelTrace {
nazalog.Tracef("[%p] RTMP_READ ext. ts=(%d,%d,%d)",
c, stream.timestamp, newTs, stream.header.TimestampAbs)
}
stream.timestamp = newTs
switch fmt {
case 0:
stream.header.TimestampAbs = stream.timestamp
@ -178,8 +188,10 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
stream.header.TimestampAbs += stream.timestamp
}
absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, ctimestamp=%d, c=%p",
// fmt, csid, stream.header, stream.timestamp, c)
if nazalog.GetOption().Level == nazalog.LevelTrace {
nazalog.Tracef("[%p] RTMP_READ cb. fmt=%d, csid=%d, header=%+v, timestamp=%d, hex=%s",
c, fmt, csid, stream.header, stream.timestamp, hex.Dump(nazabytes.Prefix(stream.msg.buff.Bytes(), 32)))
}
if stream.header.MsgTypeId == base.RtmpTypeIdAggregateMessage {
firstSubMessage := true

@ -81,15 +81,13 @@ type BaseInSession struct {
audioSsrc nazaatomic.Uint32
videoSsrc nazaatomic.Uint32
// only for debug log
debugLogMaxCount uint32
loggedReadAudioRtpCount nazaatomic.Uint32
loggedReadVideoRtpCount nazaatomic.Uint32
loggedReadRtcpCount nazaatomic.Uint32
loggedReadSrCount nazaatomic.Uint32
disposeOnce sync.Once
waitChan chan error
dumpReadAudioRtp base.LogDump
dumpReadVideoRtp base.LogDump
dumpReadRtcp base.LogDump
dumpReadSr base.LogDump
}
func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession {
@ -101,8 +99,10 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
cmdSession: cmdSession,
debugLogMaxCount: 3,
waitChan: make(chan error, 1),
dumpReadAudioRtp: base.NewLogDump(1),
dumpReadVideoRtp: base.NewLogDump(1),
dumpReadSr: base.NewLogDump(2),
}
nazalog.Infof("[%s] lifecycle new rtsp BaseInSession. session=%p", uniqueKey, s)
return s
@ -331,19 +331,13 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err
return nazaerrors.Wrap(base.ErrRtsp)
}
if session.loggedReadRtcpCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read rtcp=%s", session.uniqueKey, hex.Dump(nazabytes.Prefix(b, 32)))
session.loggedReadRtcpCount.Increment()
}
packetType := b[1]
switch packetType {
case rtprtcp.RtcpPacketTypeSr:
sr := rtprtcp.ParseSr(b)
if session.loggedReadSrCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. %+v", session.uniqueKey, sr)
session.loggedReadSrCount.Increment()
if session.dumpReadSr.ShouldDump() {
session.dumpReadSr.Outf("[%s] READ_RTCP. sr=%+v", session.uniqueKey, sr)
}
var rrBuf []byte
switch sr.SenderSsrc {
@ -372,13 +366,15 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err
session.currConnStat.WroteBytesSum.Add(uint64(len(b)))
}
default:
// noop
//
// ffmpeg推流时会在发送第一个RTP包之前就发送一个SR所以关闭这个警告日志
//nazalog.Warnf("[%s] read rtcp sr but senderSsrc invalid. senderSsrc=%d, audio=%d, video=%d",
// p.uniqueKey, sr.SenderSsrc, p.audioSsrc, p.videoSsrc)
return nazaerrors.Wrap(base.ErrRtsp)
}
default:
nazalog.Warnf("[%s] handleRtcpPacket but type unknown. type=%d", session.uniqueKey, b[1])
nazalog.Warnf("[%s] handleRtcpPacket but type unknown. type=%d, len=%d, hex=%s",
session.uniqueKey, b[1], len(b), hex.Dump(nazabytes.Prefix(b, 32)))
return nazaerrors.Wrap(base.ErrRtsp)
}
@ -411,9 +407,9 @@ func (session *BaseInSession) handleRtpPacket(b []byte) error {
// 接收数据时保证了sdp的原始类型对应
if session.sdpCtx.IsAudioPayloadTypeOrigin(packetType) {
if session.loggedReadAudioRtpCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v, len=%d", session.uniqueKey, h, len(b))
session.loggedReadAudioRtpCount.Increment()
if session.dumpReadAudioRtp.ShouldDump() {
session.dumpReadAudioRtp.Outf("[%s] READ_RTP. audio, h=%+v, len=%d, hex=%s",
session.uniqueKey, h, len(b), hex.Dump(nazabytes.Prefix(b, 32)))
}
session.audioSsrc.Store(h.Ssrc)
@ -426,9 +422,9 @@ func (session *BaseInSession) handleRtpPacket(b []byte) error {
session.audioUnpacker.Feed(pkt)
}
} else if session.sdpCtx.IsVideoPayloadTypeOrigin(packetType) {
if session.loggedReadVideoRtpCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v, len=%d", session.uniqueKey, h, len(b))
session.loggedReadVideoRtpCount.Increment()
if session.dumpReadVideoRtp.ShouldDump() {
session.dumpReadVideoRtp.Outf("[%s] READ_RTP. video, h=%+v, len=%d, hex=%s",
session.uniqueKey, h, len(b), hex.Dump(nazabytes.Prefix(b, 32)))
}
session.videoSsrc.Store(h.Ssrc)

Loading…
Cancel
Save