// Copyright 2020, Chef. All rights reserved. // https://github.com/q191201771/lal // // Use of this source code is governed by a MIT-style license // that can be found in the License file. // // Author: Chef (191201771@qq.com) package rtsp import ( "encoding/hex" "net" "sync" "github.com/q191201771/naza/pkg/nazaatomic" "github.com/q191201771/naza/pkg/nazabytes" "github.com/q191201771/naza/pkg/nazaerrors" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/sdp" "github.com/q191201771/naza/pkg/nazanet" ) // 聚合PubSession和PullSession,也即流数据是输入类型的session // IBaseInSessionObserver // // BaseInSession会向上层回调两种格式的数据(本质上是一份数据,业务方可自由选择使用): // 1. 原始的rtp packet // 2. rtp合并后的av packet type IBaseInSessionObserver interface { OnSdp(sdpCtx sdp.LogicContext) // OnRtpPacket 回调收到的RTP包 // OnRtpPacket(pkt rtprtcp.RtpPacket) // OnAvPacket // // @param pkt: pkt结构体中字段含义见 rtprtcp.OnAvPacket // OnAvPacket(pkt base.AvPacket) } type BaseInSession struct { cmdSession IInterleavedPacketWriter observer IBaseInSessionObserver audioRtpConn *nazanet.UdpConnection videoRtpConn *nazanet.UdpConnection audioRtcpConn *nazanet.UdpConnection videoRtcpConn *nazanet.UdpConnection audioRtpChannel int audioRtcpChannel int videoRtpChannel int videoRtcpChannel int sessionStat base.BasicSessionStat mu sync.Mutex sdpCtx sdp.LogicContext // const after set avPacketQueue *AvPacketQueue audioRrProducer *rtprtcp.RrProducer videoRrProducer *rtprtcp.RrProducer audioUnpacker rtprtcp.IRtpUnpacker videoUnpacker rtprtcp.IRtpUnpacker audioSsrc nazaatomic.Uint32 videoSsrc nazaatomic.Uint32 disposeOnce sync.Once waitChan chan error dumpReadAudioRtp base.LogDump dumpReadVideoRtp base.LogDump dumpReadRtcp base.LogDump dumpReadSr base.LogDump } func NewBaseInSession(sessionType base.SessionType, cmdSession IInterleavedPacketWriter) *BaseInSession { s := &BaseInSession{ sessionStat: base.NewBasicSessionStat(sessionType, ""), cmdSession: cmdSession, waitChan: make(chan error, 1), dumpReadAudioRtp: base.NewLogDump(Log, 1), dumpReadVideoRtp: base.NewLogDump(Log, 1), dumpReadSr: base.NewLogDump(Log, 2), } Log.Infof("[%s] lifecycle new rtsp BaseInSession. session=%p", s.UniqueKey(), s) return s } func NewBaseInSessionWithObserver(sessionType base.SessionType, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession { s := NewBaseInSession(sessionType, cmdSession) s.observer = observer return s } func (session *BaseInSession) InitWithSdp(sdpCtx sdp.LogicContext) { session.mu.Lock() session.sdpCtx = sdpCtx session.mu.Unlock() if session.sdpCtx.IsAudioUnpackable() { session.audioUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpCtx.GetAudioPayloadTypeBase(), session.sdpCtx.AudioClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked) } else { Log.Warnf("[%s] audio unpacker not support for this type yet. logicCtx=%+v", session.UniqueKey(), session.sdpCtx) } if session.sdpCtx.IsVideoUnpackable() { session.videoUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpCtx.GetVideoPayloadTypeBase(), session.sdpCtx.VideoClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked) } else { Log.Warnf("[%s] video unpacker not support this type yet. logicCtx=%+v", session.UniqueKey(), session.sdpCtx) } session.audioRrProducer = rtprtcp.NewRrProducer(session.sdpCtx.AudioClockRate) session.videoRrProducer = rtprtcp.NewRrProducer(session.sdpCtx.VideoClockRate) if session.sdpCtx.IsAudioUnpackable() && session.sdpCtx.IsVideoUnpackable() { session.mu.Lock() session.avPacketQueue = NewAvPacketQueue(session.onAvPacket) session.mu.Unlock() } if session.observer != nil { session.observer.OnSdp(session.sdpCtx) } } // SetObserver 如果没有设置回调监听对象,可以通过该函数设置,调用方保证调用该函数发生在调用InitWithSdp之后 func (session *BaseInSession) SetObserver(observer IBaseInSessionObserver) { session.observer = observer // 避免在当前协程回调,降低业务方使用负担,不必担心设置监听对象和回调函数中锁重入 TODO(chef): 更好的方式 go func() { session.observer.OnSdp(session.sdpCtx) }() } func (session *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) error { if session.sdpCtx.IsAudioUri(uri) { session.audioRtpConn = rtpConn session.audioRtcpConn = rtcpConn } else if session.sdpCtx.IsVideoUri(uri) { session.videoRtpConn = rtpConn session.videoRtcpConn = rtcpConn } else { return nazaerrors.Wrap(base.ErrRtsp) } go rtpConn.RunLoop(session.onReadRtpPacket) go rtcpConn.RunLoop(session.onReadRtcpPacket) return nil } func (session *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int) error { if session.sdpCtx.IsAudioUri(uri) { session.audioRtpChannel = rtpChannel session.audioRtcpChannel = rtcpChannel return nil } else if session.sdpCtx.IsVideoUri(uri) { session.videoRtpChannel = rtpChannel session.videoRtcpChannel = rtcpChannel return nil } return nazaerrors.Wrap(base.ErrRtsp) } // --------------------------------------------------------------------------------------------------------------------- // IClientSessionLifecycle interface // --------------------------------------------------------------------------------------------------------------------- // Dispose 文档请参考: IClientSessionLifecycle interface func (session *BaseInSession) Dispose() error { return session.dispose(nil) } // WaitChan 文档请参考: IClientSessionLifecycle interface // // 注意,目前只有一种情况,即上层主动调用Dispose函数,此时error为nil func (session *BaseInSession) WaitChan() <-chan error { return session.waitChan } // --------------------------------------------------------------------------------------------------------------------- func (session *BaseInSession) GetSdp() sdp.LogicContext { session.mu.Lock() defer session.mu.Unlock() return session.sdpCtx } func (session *BaseInSession) HandleInterleavedPacket(b []byte, channel int) { switch channel { case session.audioRtpChannel: fallthrough case session.videoRtpChannel: _ = session.handleRtpPacket(b) case session.audioRtcpChannel: fallthrough case session.videoRtcpChannel: _ = session.handleRtcpPacket(b, nil) default: Log.Errorf("[%s] read interleaved packet but channel invalid. channel=%d", session.UniqueKey(), channel) } } // WriteRtpRtcpDummy 发现pull时,需要先给对端发送数据,才能收到数据 func (session *BaseInSession) WriteRtpRtcpDummy() { if session.videoRtpConn != nil { _ = session.videoRtpConn.Write(dummyRtpPacket) } if session.videoRtcpConn != nil { _ = session.videoRtcpConn.Write(dummyRtcpPacket) } if session.audioRtpConn != nil { _ = session.audioRtpConn.Write(dummyRtpPacket) } if session.audioRtcpConn != nil { _ = session.audioRtcpConn.Write(dummyRtcpPacket) } } // ----- ISessionStat -------------------------------------------------------------------------------------------------- func (session *BaseInSession) GetStat() base.StatSession { return session.sessionStat.GetStat() } func (session *BaseInSession) UpdateStat(intervalSec uint32) { session.sessionStat.UpdateStat(intervalSec) } func (session *BaseInSession) IsAlive() (readAlive, writeAlive bool) { return session.sessionStat.IsAlive() } // --------------------------------------------------------------------------------------------------------------------- func (session *BaseInSession) UniqueKey() string { return session.sessionStat.UniqueKey() } // callback by RTPUnpacker func (session *BaseInSession) onAvPacketUnpacked(pkt base.AvPacket) { session.mu.Lock() defer session.mu.Unlock() if session.avPacketQueue != nil { session.avPacketQueue.Feed(pkt) } else { session.observer.OnAvPacket(pkt) } } // callback by avpacket queue func (session *BaseInSession) onAvPacket(pkt base.AvPacket) { session.observer.OnAvPacket(pkt) } // callback by UDPConnection func (session *BaseInSession) onReadRtpPacket(b []byte, rAddr *net.UDPAddr, err error) bool { if err != nil { // TODO(chef): // read udp [::]:30008: use of closed network connection // 可以退出loop,看是在上层退还是下层退,但是要注意每次read都判断的开销 Log.Warnf("[%s] read udp packet failed. err=%+v", session.UniqueKey(), err) return true } _ = session.handleRtpPacket(b) return true } // callback by UDPConnection func (session *BaseInSession) onReadRtcpPacket(b []byte, rAddr *net.UDPAddr, err error) bool { if err != nil { Log.Warnf("[%s] read udp packet failed. err=%+v", session.UniqueKey(), err) return true } _ = session.handleRtcpPacket(b, rAddr) return true } // @param rAddr 对端地址,往对端发送数据时使用,注意,如果nil,则表示是interleaved模式,我们直接往TCP连接发数据 func (session *BaseInSession) handleRtcpPacket(b []byte, rAddr *net.UDPAddr) error { session.sessionStat.AddReadBytes(len(b)) if len(b) <= 0 { Log.Errorf("[%s] handleRtcpPacket but length invalid. len=%d", session.UniqueKey(), len(b)) return nazaerrors.Wrap(base.ErrRtsp) } packetType := b[1] switch packetType { case rtprtcp.RtcpPacketTypeSr: sr := rtprtcp.ParseSr(b) if session.dumpReadSr.ShouldDump() { session.dumpReadSr.Outf("[%s] READ_RTCP. sr=%+v", session.UniqueKey(), sr) } var rrBuf []byte switch sr.SenderSsrc { case session.audioSsrc.Load(): session.mu.Lock() rrBuf = session.audioRrProducer.Produce(sr.GetMiddleNtp()) session.mu.Unlock() if rrBuf != nil { if rAddr != nil { _ = session.audioRtcpConn.Write2Addr(rrBuf, rAddr) } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.audioRtcpChannel) } session.sessionStat.AddWriteBytes(len(b)) } case session.videoSsrc.Load(): session.mu.Lock() rrBuf = session.videoRrProducer.Produce(sr.GetMiddleNtp()) session.mu.Unlock() if rrBuf != nil { if rAddr != nil { _ = session.videoRtcpConn.Write2Addr(rrBuf, rAddr) } else { _ = session.cmdSession.WriteInterleavedPacket(rrBuf, session.videoRtcpChannel) } session.sessionStat.AddWriteBytes(len(b)) } default: // noop // // ffmpeg推流时,会在发送第一个RTP包之前就发送一个SR,所以关闭这个警告日志 //Log.Warnf("[%s] read rtcp sr but senderSsrc invalid. senderSsrc=%d, audio=%d, video=%d", // p.uniqueKey, sr.SenderSsrc, p.audioSsrc, p.videoSsrc) } default: Log.Warnf("[%s] handleRtcpPacket but type unknown. type=%d, len=%d, hex=%s", session.UniqueKey(), b[1], len(b), hex.Dump(nazabytes.Prefix(b, 32))) return nazaerrors.Wrap(base.ErrRtsp) } return nil } func (session *BaseInSession) handleRtpPacket(b []byte) error { session.sessionStat.AddReadBytes(len(b)) if len(b) < rtprtcp.RtpFixedHeaderLength { Log.Errorf("[%s] handleRtpPacket but length invalid. len=%d", session.UniqueKey(), len(b)) return nazaerrors.Wrap(base.ErrRtsp) } packetType := int(b[1] & 0x7F) if !session.sdpCtx.IsPayloadTypeOrigin(packetType) { Log.Errorf("[%s] handleRtpPacket but type invalid. type=%d", session.UniqueKey(), packetType) return nazaerrors.Wrap(base.ErrRtsp) } h, err := rtprtcp.ParseRtpHeader(b) if err != nil { Log.Errorf("[%s] handleRtpPacket invalid rtp packet. err=%+v", session.UniqueKey(), err) return err } var pkt rtprtcp.RtpPacket pkt.Header = h pkt.Raw = b // 接收数据时,保证了sdp的原始类型对应 if session.sdpCtx.IsAudioPayloadTypeOrigin(packetType) { if session.dumpReadAudioRtp.ShouldDump() { session.dumpReadAudioRtp.Outf("[%s] READ_RTP. audio, h=%+v, len=%d, hex=%s", session.UniqueKey(), h, len(b), hex.Dump(nazabytes.Prefix(b, 32))) } session.audioSsrc.Store(h.Ssrc) session.observer.OnRtpPacket(pkt) session.mu.Lock() session.audioRrProducer.FeedRtpPacket(h.Seq) session.mu.Unlock() if session.audioUnpacker != nil { session.audioUnpacker.Feed(pkt) } } else if session.sdpCtx.IsVideoPayloadTypeOrigin(packetType) { if session.dumpReadVideoRtp.ShouldDump() { session.dumpReadVideoRtp.Outf("[%s] READ_RTP. video, h=%+v, len=%d, hex=%s", session.UniqueKey(), h, len(b), hex.Dump(nazabytes.Prefix(b, 32))) } session.videoSsrc.Store(h.Ssrc) session.observer.OnRtpPacket(pkt) session.mu.Lock() session.videoRrProducer.FeedRtpPacket(h.Seq) session.mu.Unlock() if session.videoUnpacker != nil { session.videoUnpacker.Feed(pkt) } } else { // noop 因为前面已经判断过type了,所以永远不会走到这 } return nil } func (session *BaseInSession) dispose(err error) error { var retErr error session.disposeOnce.Do(func() { Log.Infof("[%s] lifecycle dispose rtsp BaseInSession. session=%p", session.UniqueKey(), session) var e1, e2, e3, e4 error if session.audioRtpConn != nil { e1 = session.audioRtpConn.Dispose() } if session.audioRtcpConn != nil { e2 = session.audioRtcpConn.Dispose() } if session.videoRtpConn != nil { e3 = session.videoRtpConn.Dispose() } if session.videoRtcpConn != nil { e4 = session.videoRtcpConn.Dispose() } session.waitChan <- nil retErr = nazaerrors.CombineErrors(e1, e2, e3, e4) }) return retErr }