[fix] 修复lalserver中(rtsp.BaseInSession以及logic.Group)的一些竞态读写,https://github.com/q191201771/lal/issues/47

pull/49/head
q191201771 4 years ago
parent 7c07f2ba05
commit 9bb3dac3d1

@ -11,12 +11,11 @@ package logic
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/q191201771/lal/pkg/remux"
"os" "os"
"strings" "strings"
"sync" "sync"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/defertaskthread" "github.com/q191201771/naza/pkg/defertaskthread"
"github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/rtprtcp"
@ -461,6 +460,12 @@ func (group *Group) HasOutSession() bool {
return group.hasOutSession() return group.hasOutSession()
} }
func (group *Group) BroadcastRTMP(msg base.RTMPMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastRTMP(msg)
}
// hls.Muxer // hls.Muxer
func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了 // 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了
@ -479,10 +484,7 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
// rtmp.PubSession or rtmp.PullSession // rtmp.PubSession or rtmp.PullSession
func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) { func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) {
group.mutex.Lock() group.BroadcastRTMP(msg)
defer group.mutex.Unlock()
group.broadcastRTMP(msg)
} }
// rtsp.PubSession // rtsp.PubSession
@ -522,15 +524,13 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) {
// rtsp.PubSession // rtsp.PubSession
func (group *Group) OnAVPacket(pkt base.AVPacket) { func (group *Group) OnAVPacket(pkt base.AVPacket) {
// TODO chef: 这里没有加锁,最起码下面广播前需要加锁
msg, err := remux.AVPacket2RTMPMsg(pkt) msg, err := remux.AVPacket2RTMPMsg(pkt)
if err != nil { if err != nil {
nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err) nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err)
return return
} }
group.broadcastRTMP(msg) group.BroadcastRTMP(msg)
} }
func (group *Group) StringifyDebugStats() string { func (group *Group) StringifyDebugStats() string {
@ -1012,7 +1012,7 @@ func (group *Group) disposeHLSMuxer() {
streamName := param[1].(string) streamName := param[1].(string)
outPath := param[2].(string) outPath := param[2].(string)
if g := sm.getGroup(appName, streamName); g != nil { if g := sm.GetGroup(appName, streamName); g != nil {
if g.IsHLSMuxerAlive() { if g.IsHLSMuxerAlive() {
nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName) nazalog.Warnf("cancel cleanup hls file path since hls muxer still alive. streamName=%s", streamName)
return return

@ -10,6 +10,7 @@ package rtsp
import ( import (
"encoding/hex" "encoding/hex"
"github.com/q191201771/naza/pkg/nazaatomic"
"net" "net"
"sync" "sync"
"time" "time"
@ -63,24 +64,25 @@ type BaseInSession struct {
staleStat *connection.Stat staleStat *connection.Stat
stat base.StatSession stat base.StatSession
m sync.Mutex mu sync.Mutex
rawSDP []byte // const after set rawSDP []byte // const after set
sdpLogicCtx sdp.LogicContext // const after set sdpLogicCtx sdp.LogicContext // const after set
avPacketQueue *AVPacketQueue avPacketQueue *AVPacketQueue
audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioRRProducer *rtprtcp.RRProducer audioRRProducer *rtprtcp.RRProducer
videoRRProducer *rtprtcp.RRProducer videoRRProducer *rtprtcp.RRProducer
audioSSRC uint32
videoSSRC uint32 audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioSSRC nazaatomic.Uint32
videoSSRC nazaatomic.Uint32
// only for debug log // only for debug log
debugLogMaxCount int debugLogMaxCount uint32
loggedReadAudioRTPCount int loggedReadAudioRTPCount nazaatomic.Uint32
loggedReadVideoRTPCount int loggedReadVideoRTPCount nazaatomic.Uint32
loggedReadRTCPCount int loggedReadRTCPCount nazaatomic.Uint32
loggedReadSRCount int loggedReadSRCount nazaatomic.Uint32
} }
func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession { func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *BaseInSession {
@ -105,10 +107,10 @@ func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacke
} }
func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) { func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) {
session.m.Lock() session.mu.Lock()
session.rawSDP = rawSDP session.rawSDP = rawSDP
session.sdpLogicCtx = sdpLogicCtx session.sdpLogicCtx = sdpLogicCtx
session.m.Unlock() session.mu.Unlock()
if session.sdpLogicCtx.IsAudioUnpackable() { if session.sdpLogicCtx.IsAudioUnpackable() {
session.audioUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) session.audioUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked)
@ -125,7 +127,9 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo
session.videoRRProducer = rtprtcp.NewRRProducer(session.sdpLogicCtx.VideoClockRate) session.videoRRProducer = rtprtcp.NewRRProducer(session.sdpLogicCtx.VideoClockRate)
if session.sdpLogicCtx.IsAudioUnpackable() && session.sdpLogicCtx.IsVideoUnpackable() { if session.sdpLogicCtx.IsAudioUnpackable() && session.sdpLogicCtx.IsVideoUnpackable() {
session.mu.Lock()
session.avPacketQueue = NewAVPacketQueue(session.onAVPacket) session.avPacketQueue = NewAVPacketQueue(session.onAVPacket)
session.mu.Unlock()
} }
if session.observer != nil { if session.observer != nil {
@ -189,8 +193,8 @@ func (session *BaseInSession) Dispose() error {
} }
func (session *BaseInSession) GetSDP() ([]byte, sdp.LogicContext) { func (session *BaseInSession) GetSDP() ([]byte, sdp.LogicContext) {
session.m.Lock() session.mu.Lock()
defer session.m.Unlock() defer session.mu.Unlock()
return session.rawSDP, session.sdpLogicCtx return session.rawSDP, session.sdpLogicCtx
} }
@ -266,6 +270,9 @@ func (session *BaseInSession) UniqueKey() string {
// callback by RTPUnpacker // callback by RTPUnpacker
func (session *BaseInSession) onAVPacketUnpacked(pkt base.AVPacket) { func (session *BaseInSession) onAVPacketUnpacked(pkt base.AVPacket) {
session.mu.Lock()
defer session.mu.Unlock()
if session.avPacketQueue != nil { if session.avPacketQueue != nil {
session.avPacketQueue.Feed(pkt) session.avPacketQueue.Feed(pkt)
} else { } else {
@ -309,9 +316,9 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err
return ErrRTSP return ErrRTSP
} }
if session.loggedReadRTCPCount < session.debugLogMaxCount { if session.loggedReadRTCPCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read rtcp=%s", session.uniqueKey, hex.Dump(nazastring.SubSliceSafety(b, 32))) nazalog.Debugf("[%s] LOGPACKET. read rtcp=%s", session.uniqueKey, hex.Dump(nazastring.SubSliceSafety(b, 32)))
session.loggedReadRTCPCount++ session.loggedReadRTCPCount.Increment()
} }
packetType := b[1] packetType := b[1]
@ -319,14 +326,16 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err
switch packetType { switch packetType {
case rtprtcp.RTCPPacketTypeSR: case rtprtcp.RTCPPacketTypeSR:
sr := rtprtcp.ParseSR(b) sr := rtprtcp.ParseSR(b)
if session.loggedReadSRCount < session.debugLogMaxCount { if session.loggedReadSRCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. %+v", session.uniqueKey, sr) nazalog.Debugf("[%s] LOGPACKET. %+v", session.uniqueKey, sr)
session.loggedReadSRCount++ session.loggedReadSRCount.Increment()
} }
var rrBuf []byte var rrBuf []byte
switch sr.SenderSSRC { switch sr.SenderSSRC {
case session.audioSSRC: case session.audioSSRC.Load():
session.mu.Lock()
rrBuf = session.audioRRProducer.Produce(sr.GetMiddleNTP()) rrBuf = session.audioRRProducer.Produce(sr.GetMiddleNTP())
session.mu.Unlock()
if rrBuf != nil { if rrBuf != nil {
if rAddr != nil { if rAddr != nil {
_ = session.audioRTCPConn.Write2Addr(rrBuf, rAddr) _ = session.audioRTCPConn.Write2Addr(rrBuf, rAddr)
@ -335,8 +344,10 @@ func (session *BaseInSession) handleRTCPPacket(b []byte, rAddr *net.UDPAddr) err
} }
session.currConnStat.WroteBytesSum.Add(uint64(len(b))) session.currConnStat.WroteBytesSum.Add(uint64(len(b)))
} }
case session.videoSSRC: case session.videoSSRC.Load():
session.mu.Lock()
rrBuf = session.videoRRProducer.Produce(sr.GetMiddleNTP()) rrBuf = session.videoRRProducer.Produce(sr.GetMiddleNTP())
session.mu.Unlock()
if rrBuf != nil { if rrBuf != nil {
if rAddr != nil { if rAddr != nil {
_ = session.videoRTCPConn.Write2Addr(rrBuf, rAddr) _ = session.videoRTCPConn.Write2Addr(rrBuf, rAddr)
@ -373,7 +384,7 @@ func (session *BaseInSession) handleRTPPacket(b []byte) error {
return ErrRTSP return ErrRTSP
} }
h, err := rtprtcp.ParseRTPPacket(b) h, err := rtprtcp.ParseRTPHeader(b)
if err != nil { if err != nil {
nazalog.Errorf("[%s] handleRTPPacket invalid rtp packet. err=%+v", session.uniqueKey, err) nazalog.Errorf("[%s] handleRTPPacket invalid rtp packet. err=%+v", session.uniqueKey, err)
return err return err
@ -385,27 +396,31 @@ func (session *BaseInSession) handleRTPPacket(b []byte) error {
// 接收数据时保证了sdp的原始类型对应 // 接收数据时保证了sdp的原始类型对应
if session.sdpLogicCtx.IsAudioPayloadTypeOrigin(packetType) { if session.sdpLogicCtx.IsAudioPayloadTypeOrigin(packetType) {
if session.loggedReadAudioRTPCount < session.debugLogMaxCount { if session.loggedReadAudioRTPCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v", session.uniqueKey, h) nazalog.Debugf("[%s] LOGPACKET. read audio rtp=%+v, len=%d", session.uniqueKey, h, len(b))
session.loggedReadAudioRTPCount++ session.loggedReadAudioRTPCount.Increment()
} }
session.audioSSRC = h.SSRC session.audioSSRC.Store(h.SSRC)
session.observer.OnRTPPacket(pkt) session.observer.OnRTPPacket(pkt)
session.mu.Lock()
session.audioRRProducer.FeedRTPPacket(h.Seq) session.audioRRProducer.FeedRTPPacket(h.Seq)
session.mu.Unlock()
if session.audioUnpacker != nil { if session.audioUnpacker != nil {
session.audioUnpacker.Feed(pkt) session.audioUnpacker.Feed(pkt)
} }
} else if session.sdpLogicCtx.IsVideoPayloadTypeOrigin(packetType) { } else if session.sdpLogicCtx.IsVideoPayloadTypeOrigin(packetType) {
if session.loggedReadVideoRTPCount < session.debugLogMaxCount { if session.loggedReadVideoRTPCount.Load() < session.debugLogMaxCount {
nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v", session.uniqueKey, h) nazalog.Debugf("[%s] LOGPACKET. read video rtp=%+v, len=%d", session.uniqueKey, h, len(b))
session.loggedReadVideoRTPCount++ session.loggedReadVideoRTPCount.Increment()
} }
session.videoSSRC = h.SSRC session.videoSSRC.Store(h.SSRC)
session.observer.OnRTPPacket(pkt) session.observer.OnRTPPacket(pkt)
session.mu.Lock()
session.videoRRProducer.FeedRTPPacket(h.Seq) session.videoRRProducer.FeedRTPPacket(h.Seq)
session.mu.Unlock()
if session.videoUnpacker != nil { if session.videoUnpacker != nil {
session.videoUnpacker.Feed(pkt) session.videoUnpacker.Feed(pkt)

Loading…
Cancel
Save