From fdaf48528eb898fef8e43bad5b7ac79e4698d869 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 30 Jan 2022 12:02:22 +0800 Subject: [PATCH] =?UTF-8?q?[opt]=20rtmp=E5=92=8Crtsp=E6=94=B6=E5=8C=85?= =?UTF-8?q?=E6=97=B6=E6=B7=BB=E5=8A=A0trace=E7=BA=A7=E5=88=AB=E6=97=A5?= =?UTF-8?q?=E5=BF=97=20#63?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/log.go | 56 +++++++++++++++++++++++++++++++++++++ pkg/rtmp/chunk_composer.go | 30 ++++++++++++++------ pkg/rtsp/base_in_session.go | 44 +++++++++++++---------------- 3 files changed, 97 insertions(+), 33 deletions(-) create mode 100644 pkg/base/log.go diff --git a/pkg/base/log.go b/pkg/base/log.go new file mode 100644 index 0000000..a88161c --- /dev/null +++ b/pkg/base/log.go @@ -0,0 +1,56 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "fmt" + + "github.com/q191201771/naza/pkg/nazalog" +) + +type LogDump struct { + debugMaxNum int + + debugCount int +} + +// NewLogDump +// +// @param debugMaxNum: 日志最小级别为debug时,使用debug打印日志次数的阈值 +// +func NewLogDump(debugMaxNum int) LogDump { + return LogDump{ + debugMaxNum: debugMaxNum, + } +} + +func (ld *LogDump) ShouldDump() bool { + switch nazalog.GetOption().Level { + case nazalog.LevelTrace: + return true + case nazalog.LevelDebug: + if ld.debugCount >= ld.debugMaxNum { + return false + } + ld.debugCount++ + return true + } + return false +} + +// Outf +// +// 调用之前需调用 ShouldDump +// 将 ShouldDump 独立出来的目的是避免不需要打印日志时, Outf 调用前构造实参的开销,比如 +// ld.Outf("hex=%s", hex.Dump(buf)) +// 这个hex.Dump调用 +// +func (ld *LogDump) Outf(format string, v ...interface{}) { + nazalog.Out(nazalog.GetOption().Level, 3, fmt.Sprintf(format, v...)) +} diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 41c92e7..ae17623 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -8,19 +8,22 @@ package rtmp -// chunk_composer.go -// @pure -// 读取chunk,并组织chunk,生成message返回给上层 - import ( + "encoding/hex" "io" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazabytes" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/bele" ) +// ChunkComposer +// +// 读取chunk,并合并chunk,生成message返回给上层 +// type ChunkComposer struct { peerChunkSize uint32 csid2stream map[int]*Stream @@ -123,7 +126,10 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { case 3: // noop } - //nazalog.Debugf("RTMP_CHUNK_COMPOSER chunk.fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header) + if nazalog.GetOption().Level == nazalog.LevelTrace { + nazalog.Tracef("[%p] RTMP_READ chunk.fmt=%d, csid=%d, header=%+v, timestamp=%d", + c, fmt, csid, stream.header, stream.timestamp) + } // 5.3.1.3 Extended Timestamp // 使用ffmpeg推流时,发现时间戳超过3字节最大值后,即使是fmt3(即包头大小为0),依然存在ext ts字段 @@ -136,8 +142,12 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil { return err } - stream.timestamp = bele.BeUint32(bootstrap) - //nazalog.Debugf("RTMP_CHUNK_COMPOSER ext. extTs=%d", stream.header.Timestamp) + newTs := bele.BeUint32(bootstrap) + if nazalog.GetOption().Level == nazalog.LevelTrace { + nazalog.Tracef("[%p] RTMP_READ ext. ts=(%d,%d,%d)", + c, stream.timestamp, newTs, stream.header.TimestampAbs) + } + stream.timestamp = newTs switch fmt { case 0: stream.header.TimestampAbs = stream.timestamp @@ -178,8 +188,10 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { stream.header.TimestampAbs += stream.timestamp } absTsFlag = false - //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, ctimestamp=%d, c=%p", - // fmt, csid, stream.header, stream.timestamp, c) + if nazalog.GetOption().Level == nazalog.LevelTrace { + nazalog.Tracef("[%p] RTMP_READ cb. fmt=%d, csid=%d, header=%+v, timestamp=%d, hex=%s", + c, fmt, csid, stream.header, stream.timestamp, hex.Dump(nazabytes.Prefix(stream.msg.buff.Bytes(), 32))) + } if stream.header.MsgTypeId == base.RtmpTypeIdAggregateMessage { firstSubMessage := true diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 2691d2d..ccb655f 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -81,15 +81,13 @@ type BaseInSession struct { audioSsrc nazaatomic.Uint32 videoSsrc nazaatomic.Uint32 - // only for debug log - debugLogMaxCount uint32 - loggedReadAudioRtpCount nazaatomic.Uint32 - loggedReadVideoRtpCount nazaatomic.Uint32 - loggedReadRtcpCount nazaatomic.Uint32 - loggedReadSrCount nazaatomic.Uint32 - disposeOnce sync.Once waitChan chan error + + dumpReadAudioRtp base.LogDump + dumpReadVideoRtp base.LogDump + dumpReadRtcp base.LogDump + dumpReadSr base.LogDump } func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession { @@ -101,8 +99,10 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba StartTime: time.Now().Format("2006-01-02 15:04:05.999"), }, cmdSession: cmdSession, - debugLogMaxCount: 3, waitChan: make(chan error, 1), + dumpReadAudioRtp: base.NewLogDump(1), + dumpReadVideoRtp: base.NewLogDump(1), + dumpReadSr: base.NewLogDump(2), } nazalog.Infof("[%s] lifecycle new rtsp BaseInSession. session=%p", uniqueKey, s) return s @@ -331,19 +331,13 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err return nazaerrors.Wrap(base.ErrRtsp) } - if session.loggedReadRtcpCount.Load() < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. read rtcp=%s", session.uniqueKey, hex.Dump(nazabytes.Prefix(b, 32))) - session.loggedReadRtcpCount.Increment() - } - packetType := b[1] switch packetType { case rtprtcp.RtcpPacketTypeSr: sr := rtprtcp.ParseSr(b) - if session.loggedReadSrCount.Load() < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. %+v", session.uniqueKey, sr) - session.loggedReadSrCount.Increment() + if session.dumpReadSr.ShouldDump() { + session.dumpReadSr.Outf("[%s] READ_RTCP. sr=%+v", session.uniqueKey, sr) } var rrBuf []byte switch sr.SenderSsrc { @@ -372,13 +366,15 @@ func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) err session.currConnStat.WroteBytesSum.Add(uint64(len(b))) } default: + // noop + // // ffmpeg推流时,会在发送第一个RTP包之前就发送一个SR,所以关闭这个警告日志 //nazalog.Warnf("[%s] read rtcp sr but senderSsrc invalid. senderSsrc=%d, audio=%d, video=%d", // p.uniqueKey, sr.SenderSsrc, p.audioSsrc, p.videoSsrc) - return nazaerrors.Wrap(base.ErrRtsp) } default: - nazalog.Warnf("[%s] handleRtcpPacket but type unknown. type=%d", session.uniqueKey, b[1]) + nazalog.Warnf("[%s] handleRtcpPacket but type unknown. type=%d, len=%d, hex=%s", + session.uniqueKey, b[1], len(b), hex.Dump(nazabytes.Prefix(b, 32))) return nazaerrors.Wrap(base.ErrRtsp) } @@ -411,9 +407,9 @@ func (session *BaseInSession) handleRtpPacket(b []byte) error { // 接收数据时,保证了sdp的原始类型对应 if session.sdpCtx.IsAudioPayloadTypeOrigin(packetType) { - if session.loggedReadAudioRtpCount.Load() < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v, len=%d", session.uniqueKey, h, len(b)) - session.loggedReadAudioRtpCount.Increment() + if session.dumpReadAudioRtp.ShouldDump() { + session.dumpReadAudioRtp.Outf("[%s] READ_RTP. audio, h=%+v, len=%d, hex=%s", + session.uniqueKey, h, len(b), hex.Dump(nazabytes.Prefix(b, 32))) } session.audioSsrc.Store(h.Ssrc) @@ -426,9 +422,9 @@ func (session *BaseInSession) handleRtpPacket(b []byte) error { session.audioUnpacker.Feed(pkt) } } else if session.sdpCtx.IsVideoPayloadTypeOrigin(packetType) { - if session.loggedReadVideoRtpCount.Load() < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v, len=%d", session.uniqueKey, h, len(b)) - session.loggedReadVideoRtpCount.Increment() + if session.dumpReadVideoRtp.ShouldDump() { + session.dumpReadVideoRtp.Outf("[%s] READ_RTP. video, h=%+v, len=%d, hex=%s", + session.uniqueKey, h, len(b), hex.Dump(nazabytes.Prefix(b, 32))) } session.videoSsrc.Store(h.Ssrc)