From c5f756a51d0d2744a32cfb6e29a02e3cb49726e9 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 13 Mar 2021 20:35:34 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20=E4=BF=AE=E5=A4=8D32=E4=BD=8Darm?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E4=BD=BF=E7=94=A8rtsp=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 4 ++-- pkg/rtsp/base_in_session.go | 23 +++++++++++------------ pkg/rtsp/base_out_session.go | 17 ++++++++--------- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 72d9cbb..99ee826 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.13 -require github.com/q191201771/naza v0.17.0 +require github.com/q191201771/naza v0.17.1 diff --git a/go.sum b/go.sum index d85cb6e..5df3d7b 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.17.0 h1:ZuETbHDX8srxxxL4AiDW+HTwEHnbfJvS40R8PUNStpQ= -github.com/q191201771/naza v0.17.0/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= +github.com/q191201771/naza v0.17.1 h1:gWk9jsybJoZRc4Bz39QMvtKoqLNQl+TY0mvWnuEcl34= +github.com/q191201771/naza v0.17.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 9958a0d..e075759 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -12,7 +12,6 @@ import ( "encoding/hex" "net" "sync" - "sync/atomic" "time" "github.com/q191201771/naza/pkg/nazaerrors" @@ -59,7 +58,7 @@ type BaseInSession struct { videoRTPChannel int videoRTCPChannel int - currConnStat connection.Stat + currConnStat connection.StatAtomic prevConnStat connection.Stat staleStat *connection.Stat stat base.StatSession @@ -227,14 +226,14 @@ func (session *BaseInSession) WriteRTPRTCPDummy() { } func (session *BaseInSession) GetStat() base.StatSession { - session.stat.ReadBytesSum = atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - session.stat.WroteBytesSum = atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load() + session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load() return session.stat } func (session *BaseInSession) UpdateStat(interval uint32) { - readBytesSum := atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - wroteBytesSum := atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + readBytesSum := session.currConnStat.ReadBytesSum.Load() + wroteBytesSum := session.currConnStat.WroteBytesSum.Load() rDiff := readBytesSum - session.prevConnStat.ReadBytesSum session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(interval)) wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum @@ -245,8 +244,8 @@ func (session *BaseInSession) UpdateStat(interval uint32) { } func (session *BaseInSession) IsAlive() (readAlive, writeAlive bool) { - readBytesSum := atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - wroteBytesSum := atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + readBytesSum := session.currConnStat.ReadBytesSum.Load() + wroteBytesSum := session.currConnStat.WroteBytesSum.Load() if session.staleStat == nil { session.staleStat = new(connection.Stat) session.staleStat.ReadBytesSum = readBytesSum @@ -299,7 +298,7 @@ func (session *BaseInSession) onReadRTCPPacket(b []byte, rAddr *net.UDPAddr, err // @param rAddr 对端地址,往对端发送数据时使用,注意,如果nil,则表示是interleaved模式,我们直接往TCP连接发数据 func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) error { - atomic.AddUint64(&session.currConnStat.ReadBytesSum, uint64(len(b))) + session.currConnStat.ReadBytesSum.Add(uint64(len(b))) if len(b) <= 0 { nazalog.Errorf("[%s] handleRTCPPacket but length invalid. len=%d", session.UniqueKey, len(b)) @@ -330,7 +329,7 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.audioRTCPChannel) } - atomic.AddUint64(&session.currConnStat.WroteBytesSum, uint64(len(b))) + session.currConnStat.WroteBytesSum.Add(uint64(len(b))) } case session.videoSSRC: rrBuf = session.videoRRProducer.Produce(sr.GetMiddleNTP()) @@ -340,7 +339,7 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.videoRTCPChannel) } - atomic.AddUint64(&session.currConnStat.WroteBytesSum, uint64(len(b))) + session.currConnStat.WroteBytesSum.Add(uint64(len(b))) } default: // ffmpeg推流时,会在发送第一个RTP包之前就发送一个SR,所以关闭这个警告日志 @@ -357,7 +356,7 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err } func (session *BaseInSession) handleRTPPacket(b []byte) error { - atomic.AddUint64(&session.currConnStat.ReadBytesSum, uint64(len(b))) + session.currConnStat.ReadBytesSum.Add(uint64(len(b))) if len(b) < rtprtcp.RTPFixedHeaderLength { nazalog.Errorf("[%s] handleRTPPacket but length invalid. len=%d", session.UniqueKey, len(b)) diff --git a/pkg/rtsp/base_out_session.go b/pkg/rtsp/base_out_session.go index d2328ef..6c81e98 100644 --- a/pkg/rtsp/base_out_session.go +++ b/pkg/rtsp/base_out_session.go @@ -11,7 +11,6 @@ package rtsp import ( "encoding/hex" "net" - "sync/atomic" "time" "github.com/q191201771/lal/pkg/rtprtcp" @@ -42,7 +41,7 @@ type BaseOutSession struct { videoRTCPChannel int stat base.StatSession - currConnStat connection.Stat + currConnStat connection.StatAtomic prevConnStat connection.Stat staleStat *connection.Stat @@ -140,7 +139,7 @@ func (session *BaseOutSession) HandleInterleavedPacket(b []byte, channel int) { } func (session *BaseOutSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { - atomic.AddUint64(&session.currConnStat.WroteBytesSum, uint64(len(packet.Raw))) + session.currConnStat.WroteBytesSum.Add(uint64(len(packet.Raw))) // 发送数据时,保证和sdp的原始类型对应 t := int(packet.Header.PacketType) @@ -174,14 +173,14 @@ func (session *BaseOutSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { } func (session *BaseOutSession) GetStat() base.StatSession { - session.stat.ReadBytesSum = atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - session.stat.WroteBytesSum = atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + session.stat.ReadBytesSum = session.currConnStat.ReadBytesSum.Load() + session.stat.WroteBytesSum = session.currConnStat.WroteBytesSum.Load() return session.stat } func (session *BaseOutSession) UpdateStat(interval uint32) { - readBytesSum := atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - wroteBytesSum := atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + readBytesSum := session.currConnStat.ReadBytesSum.Load() + wroteBytesSum := session.currConnStat.WroteBytesSum.Load() rDiff := readBytesSum - session.prevConnStat.ReadBytesSum session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(interval)) wDiff := wroteBytesSum - session.prevConnStat.WroteBytesSum @@ -192,8 +191,8 @@ func (session *BaseOutSession) UpdateStat(interval uint32) { } func (session *BaseOutSession) IsAlive() (readAlive, writeAlive bool) { - readBytesSum := atomic.LoadUint64(&session.currConnStat.ReadBytesSum) - wroteBytesSum := atomic.LoadUint64(&session.currConnStat.WroteBytesSum) + readBytesSum := session.currConnStat.ReadBytesSum.Load() + wroteBytesSum := session.currConnStat.WroteBytesSum.Load() if session.staleStat == nil { session.staleStat = new(connection.Stat) session.staleStat.ReadBytesSum = readBytesSum