diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 78af494..162d8cb 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -11,12 +11,11 @@ package logic import ( "encoding/json" "fmt" + "github.com/q191201771/lal/pkg/remux" "os" "strings" "sync" - "github.com/q191201771/lal/pkg/remux" - "github.com/q191201771/naza/pkg/defertaskthread" "github.com/q191201771/lal/pkg/rtprtcp" @@ -461,6 +460,12 @@ func (group *Group) HasOutSession() bool { return group.hasOutSession() } +func (group *Group) BroadcastRTMP(msg base.RTMPMsg) { + group.mutex.Lock() + defer group.mutex.Unlock() + group.broadcastRTMP(msg) +} + // hls.Muxer func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 @@ -479,10 +484,7 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { // rtmp.PubSession or rtmp.PullSession func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) { - group.mutex.Lock() - defer group.mutex.Unlock() - - group.broadcastRTMP(msg) + group.BroadcastRTMP(msg) } // rtsp.PubSession @@ -522,15 +524,13 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) { // rtsp.PubSession func (group *Group) OnAVPacket(pkt base.AVPacket) { - // TODO chef: 这里没有加锁,最起码下面广播前需要加锁 - msg, err := remux.AVPacket2RTMPMsg(pkt) if err != nil { nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err) return } - group.broadcastRTMP(msg) + group.BroadcastRTMP(msg) } func (group *Group) StringifyDebugStats() string { @@ -1012,7 +1012,7 @@ func (group *Group) disposeHLSMuxer() { streamName := param[1].(string) outPath := param[2].(string) - if g := sm.getGroup(appName, streamName); g != nil { + if g := sm.GetGroup(appName, streamName); g != nil { if g.IsHLSMuxerAlive() { nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName) return diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index bc05649..cdf167a 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -10,6 +10,7 @@ package rtsp import ( "encoding/hex" + "github.com/q191201771/naza/pkg/nazaatomic" "net" "sync" "time" @@ -63,24 +64,25 @@ type BaseInSession struct { staleStat *connection.Stat stat base.StatSession - m sync.Mutex + mu sync.Mutex rawSDP []byte // const after set sdpLogicCtx sdp.LogicContext // const after set - avPacketQueue *AVPacketQueue - audioUnpacker *rtprtcp.RTPUnpacker - videoUnpacker *rtprtcp.RTPUnpacker audioRRProducer *rtprtcp.RRProducer videoRRProducer *rtprtcp.RRProducer - audioSSRC uint32 - videoSSRC uint32 + + audioUnpacker *rtprtcp.RTPUnpacker + videoUnpacker *rtprtcp.RTPUnpacker + + audioSSRC nazaatomic.Uint32 + videoSSRC nazaatomic.Uint32 // only for debug log - debugLogMaxCount int - loggedReadAudioRTPCount int - loggedReadVideoRTPCount int - loggedReadRTCPCount int - loggedReadSRCount int + debugLogMaxCount uint32 + loggedReadAudioRTPCount nazaatomic.Uint32 + loggedReadVideoRTPCount nazaatomic.Uint32 + loggedReadRTCPCount nazaatomic.Uint32 + loggedReadSRCount nazaatomic.Uint32 } func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession { @@ -105,10 +107,10 @@ func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacke } func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) { - session.m.Lock() + session.mu.Lock() session.rawSDP = rawSDP session.sdpLogicCtx = sdpLogicCtx - session.m.Unlock() + session.mu.Unlock() if session.sdpLogicCtx.IsAudioUnpackable() { session.audioUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) @@ -125,7 +127,9 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo session.videoRRProducer = rtprtcp.NewRRProducer(session.sdpLogicCtx.VideoClockRate) if session.sdpLogicCtx.IsAudioUnpackable() && session.sdpLogicCtx.IsVideoUnpackable() { + session.mu.Lock() session.avPacketQueue = NewAVPacketQueue(session.onAVPacket) + session.mu.Unlock() } if session.observer != nil { @@ -189,8 +193,8 @@ func (session *BaseInSession) Dispose() error { } func (session *BaseInSession) GetSDP() ([]byte, sdp.LogicContext) { - session.m.Lock() - defer session.m.Unlock() + session.mu.Lock() + defer session.mu.Unlock() return session.rawSDP, session.sdpLogicCtx } @@ -266,6 +270,9 @@ func (session *BaseInSession) UniqueKey() string { // callback by RTPUnpacker func (session *BaseInSession) onAVPacketUnpacked(pkt base.AVPacket) { + session.mu.Lock() + defer session.mu.Unlock() + if session.avPacketQueue != nil { session.avPacketQueue.Feed(pkt) } else { @@ -309,9 +316,9 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err return ErrRTSP } - if session.loggedReadRTCPCount < session.debugLogMaxCount { + if session.loggedReadRTCPCount.Load() < session.debugLogMaxCount { nazalog.Debugf("[%s] LOGPACKET. read rtcp=%s", session.uniqueKey, hex.Dump(nazastring.SubSliceSafety(b, 32))) - session.loggedReadRTCPCount++ + session.loggedReadRTCPCount.Increment() } packetType := b[1] @@ -319,14 +326,16 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err switch packetType { case rtprtcp.RTCPPacketTypeSR: sr := rtprtcp.ParseSR(b) - if session.loggedReadSRCount < session.debugLogMaxCount { + if session.loggedReadSRCount.Load() < session.debugLogMaxCount { nazalog.Debugf("[%s] LOGPACKET. %+v", session.uniqueKey, sr) - session.loggedReadSRCount++ + session.loggedReadSRCount.Increment() } var rrBuf []byte switch sr.SenderSSRC { - case session.audioSSRC: + case session.audioSSRC.Load(): + session.mu.Lock() rrBuf = session.audioRRProducer.Produce(sr.GetMiddleNTP()) + session.mu.Unlock() if rrBuf != nil { if rAddr != nil { _ = session.audioRTCPConn.Write2Addr(rrBuf, rAddr) @@ -335,8 +344,10 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err } session.currConnStat.WroteBytesSum.Add(uint64(len(b))) } - case session.videoSSRC: + case session.videoSSRC.Load(): + session.mu.Lock() rrBuf = session.videoRRProducer.Produce(sr.GetMiddleNTP()) + session.mu.Unlock() if rrBuf != nil { if rAddr != nil { _ = session.videoRTCPConn.Write2Addr(rrBuf, rAddr) @@ -373,7 +384,7 @@ func (session *BaseInSession) handleRTPPacket(b []byte) error { return ErrRTSP } - h, err := rtprtcp.ParseRTPPacket(b) + h, err := rtprtcp.ParseRTPHeader(b) if err != nil { nazalog.Errorf("[%s] handleRTPPacket invalid rtp packet. err=%+v", session.uniqueKey, err) return err @@ -385,27 +396,31 @@ func (session *BaseInSession) handleRTPPacket(b []byte) error { // 接收数据时,保证了sdp的原始类型对应 if session.sdpLogicCtx.IsAudioPayloadTypeOrigin(packetType) { - if session.loggedReadAudioRTPCount < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v", session.uniqueKey, h) - session.loggedReadAudioRTPCount++ + if session.loggedReadAudioRTPCount.Load() < session.debugLogMaxCount { + nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v, len=%d", session.uniqueKey, h, len(b)) + session.loggedReadAudioRTPCount.Increment() } - session.audioSSRC = h.SSRC + session.audioSSRC.Store(h.SSRC) session.observer.OnRTPPacket(pkt) + session.mu.Lock() session.audioRRProducer.FeedRTPPacket(h.Seq) + session.mu.Unlock() if session.audioUnpacker != nil { session.audioUnpacker.Feed(pkt) } } else if session.sdpLogicCtx.IsVideoPayloadTypeOrigin(packetType) { - if session.loggedReadVideoRTPCount < session.debugLogMaxCount { - nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v", session.uniqueKey, h) - session.loggedReadVideoRTPCount++ + if session.loggedReadVideoRTPCount.Load() < session.debugLogMaxCount { + nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v, len=%d", session.uniqueKey, h, len(b)) + session.loggedReadVideoRTPCount.Increment() } - session.videoSSRC = h.SSRC + session.videoSSRC.Store(h.SSRC) session.observer.OnRTPPacket(pkt) + session.mu.Lock() session.videoRRProducer.FeedRTPPacket(h.Seq) + session.mu.Unlock() if session.videoUnpacker != nil { session.videoUnpacker.Feed(pkt)