From 24e8887ef851fa9ccf149cd756c126e7ad269d96 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 29 May 2021 21:29:46 +0800 Subject: [PATCH] =?UTF-8?q?#74=20#85=20[feat]=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=B5=B7=E5=BA=B7=E5=A8=81=E8=A7=86NVR=EF=BC=8C=E5=A4=A7?= =?UTF-8?q?=E5=8D=8E=E6=B5=B7=E5=BA=B7IDC=E7=9A=84RTSP=E6=B5=81=EF=BC=88SD?= =?UTF-8?q?P=E4=B8=8D=E5=8C=85=E5=90=ABSPS=E3=80=81PPS=E7=AD=89=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=EF=BC=8C=E8=80=8C=E6=98=AF=E9=80=9A=E8=BF=87RTP?= =?UTF-8?q?=E5=8C=85=E5=8F=91=E9=80=81=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/demo/pullrtsp/pullrtsp.go | 43 +-- .../pullrtsp2pushrtmp/pullrtsp2pushrtmp.go | 44 +-- .../pullrtsp2pushrtsp/pullrtsp2pushrtsp.go | 4 +- pkg/hls/muxer.go | 4 +- pkg/logic/group.go | 59 +-- pkg/logic/iface_impl.go | 4 + pkg/remux/avpacket2flv.go | 203 ---------- pkg/remux/avpacket2rtmp.go | 348 +++++++++++------- pkg/remux/avpacket2rtmp_test.go | 86 +++++ pkg/rtprtcp/rtp_unpacker.go | 12 +- pkg/rtprtcp/rtp_unpacker_test.go | 232 ++++++++---- pkg/rtsp/base_in_session.go | 19 +- pkg/sdp/logic.go | 14 +- pkg/sdp/sdp_test.go | 64 ++++ 14 files changed, 595 insertions(+), 541 deletions(-) delete mode 100644 pkg/remux/avpacket2flv.go create mode 100644 pkg/remux/avpacket2rtmp_test.go diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index f66dac4..d70c7f9 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -17,45 +17,10 @@ import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/remux" - "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/rtsp" "github.com/q191201771/naza/pkg/nazalog" ) -var fileWriter httpflv.FLVFileWriter - -type Observer struct { -} - -func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { - // noop -} - -func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { - metadata, ash, vsh, err := remux.AVConfig2FLVTag(asc, vps, sps, pps) - nazalog.Assert(nil, err) - - err = fileWriter.WriteTag(*metadata) - nazalog.Assert(nil, err) - - if ash != nil { - err = fileWriter.WriteTag(*ash) - nazalog.Assert(nil, err) - } - - if vsh != nil { - err = fileWriter.WriteTag(*vsh) - nazalog.Assert(nil, err) - } -} - -func (o *Observer) OnAVPacket(pkt base.AVPacket) { - tag, err := remux.AVPacket2FLVTag(pkt) - nazalog.Assert(nil, err) - err = fileWriter.WriteTag(tag) - nazalog.Assert(nil, err) -} - func main() { _ = nazalog.Init(func(option *nazalog.Option) { option.AssertBehavior = nazalog.AssertFatal @@ -64,14 +29,18 @@ func main() { inURL, outFilename, overTCP := parseFlag() + var fileWriter httpflv.FLVFileWriter err := fileWriter.Open(outFilename) nazalog.Assert(nil, err) defer fileWriter.Dispose() err = fileWriter.WriteRaw(httpflv.FLVHeader) nazalog.Assert(nil, err) - o := &Observer{} - pullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { + remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) { + err = fileWriter.WriteTag(*remux.RTMPMsg2FLVTag(msg)) + nazalog.Assert(nil, err) + }) + pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) { option.PullTimeoutMS = 5000 option.OverTCP = overTCP != 0 }) diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index f7e60e4..dbe6241 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -18,45 +18,10 @@ import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/remux" - "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/rtsp" "github.com/q191201771/naza/pkg/nazalog" ) -var pushSession *rtmp.PushSession - -type Observer struct { -} - -func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { - // noop -} - -func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { - metadata, ash, vsh, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps) - nazalog.Assert(nil, err) - - err = pushSession.Write(rtmp.Message2Chunks(metadata.Payload, &metadata.Header)) - nazalog.Assert(nil, err) - - if ash != nil { - err = pushSession.Write(rtmp.Message2Chunks(ash.Payload, &ash.Header)) - nazalog.Assert(nil, err) - } - - if vsh != nil { - err = pushSession.Write(rtmp.Message2Chunks(vsh.Payload, &vsh.Header)) - nazalog.Assert(nil, err) - } -} - -func (o *Observer) OnAVPacket(pkt base.AVPacket) { - msg, err := remux.AVPacket2RTMPMsg(pkt) - nazalog.Assert(nil, err) - err = pushSession.Write(rtmp.Message2Chunks(msg.Payload, &msg.Header)) - nazalog.Assert(nil, err) -} - func main() { _ = nazalog.Init(func(option *nazalog.Option) { option.AssertBehavior = nazalog.AssertFatal @@ -65,7 +30,7 @@ func main() { inURL, outURL, overTCP := parseFlag() - pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { option.PushTimeoutMS = 5000 option.WriteAVTimeoutMS = 5000 }) @@ -74,8 +39,11 @@ func main() { nazalog.Assert(nil, err) defer pushSession.Dispose() - o := &Observer{} - pullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { + remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) { + err = pushSession.Write(rtmp.Message2Chunks(msg.Payload, &msg.Header)) + nazalog.Assert(nil, err) + }) + pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) { option.PullTimeoutMS = 5000 option.OverTCP = overTCP != 0 }) diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go index 9755582..f2b0d05 100644 --- a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -14,6 +14,8 @@ import ( "os" "time" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/lal/pkg/rtsp" @@ -29,7 +31,7 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { rtpPacketChan <- pkt } -func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { +func (o *Observer) OnSDP(sdpCtx sdp.LogicContext) { // noop } diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index aa95ad7..e8a8c1d 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -157,7 +157,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { } if !m.opened { - nazalog.Warnf("[%s] OnFrame A not opened.", m.UniqueKey) + nazalog.Warnf("[%s] OnFrame A not opened. boundary=%t", m.UniqueKey, boundary) return } @@ -176,7 +176,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { } if !m.opened { - nazalog.Warnf("[%s] OnFrame V not opened.", m.UniqueKey) + nazalog.Warnf("[%s] OnFrame V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key) return } diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 3c370ce..dff0a61 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/lal/pkg/remux" @@ -51,8 +53,9 @@ type Group struct { // stat base.StatGroup // pub - rtmpPubSession *rtmp.ServerSession - rtspPubSession *rtsp.PubSession + rtmpPubSession *rtmp.ServerSession + rtspPubSession *rtsp.PubSession + rtsp2RTMPRemuxer *remux.AVPacket2RTMPRemuxer // pull pullEnable bool pullURL string @@ -77,12 +80,6 @@ type Group struct { // rtmpBufWriter base.IBufWriter // TODO(chef): 后面可以在业务层加一个定时Flush - // rtsp pub使用 - asc []byte - vps []byte - sps []byte - pps []byte - // mpegts使用 patpmt []byte @@ -172,6 +169,7 @@ func (group *Group) Tick() { nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey()) group.rtspPubSession.Dispose() group.rtspPubSession = nil + group.rtsp2RTMPRemuxer = nil } } if group.pullProxy.pullSession != nil { @@ -259,6 +257,7 @@ func (group *Group) Dispose() { if group.rtspPubSession != nil { group.rtspPubSession.Dispose() group.rtspPubSession = nil + group.rtsp2RTMPRemuxer = nil } for session := range group.rtmpSubSessionSet { @@ -326,6 +325,9 @@ func (group *Group) AddRTSPPubSession(session *rtsp.PubSession) bool { group.rtspPubSession = session group.addIn() + group.rtsp2RTMPRemuxer = remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) { + group.broadcastByRTMPMsg(msg) + }) session.SetObserver(group) return true @@ -546,40 +548,20 @@ func (group *Group) OnRTPPacket(pkt rtprtcp.RTPPacket) { } // rtsp.PubSession -func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) { - // 注意,前面已经进锁了,这里依然在锁保护内 - - group.asc = asc - group.vps = vps - group.sps = sps - group.pps = pps +func (group *Group) OnSDP(sdpCtx sdp.LogicContext) { + group.mutex.Lock() + defer group.mutex.Unlock() - metadata, vsh, ash, err := remux.AVConfig2RTMPMsg(group.asc, group.vps, group.sps, group.pps) - if err != nil { - nazalog.Errorf("[%s] remux avconfig to metadata and seqheader failed. err=%+v", group.UniqueKey, err) - return - } - if metadata != nil { - group.broadcastByRTMPMsg(*metadata) - } - if vsh != nil { - group.broadcastByRTMPMsg(*vsh) - } - if ash != nil { - group.broadcastByRTMPMsg(*ash) - } + group.rtsp2RTMPRemuxer.OnSDP(sdpCtx) } // rtsp.PubSession func (group *Group) OnAVPacket(pkt base.AVPacket) { + group.mutex.Lock() + defer group.mutex.Unlock() //nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp) - msg, err := remux.AVPacket2RTMPMsg(pkt) - if err != nil { - nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err) - return - } - group.BroadcastByRTMPMsg(msg) + group.rtsp2RTMPRemuxer.OnAVPacket(pkt) } func (group *Group) StringifyDebugStats() string { @@ -655,6 +637,7 @@ func (group *Group) KickOutSession(sessionID string) bool { } else if strings.HasPrefix(sessionID, base.UKPreRTSPPubSession) { if group.rtspPubSession != nil { group.rtspPubSession.Dispose() + group.rtsp2RTMPRemuxer = nil return true } } else if strings.HasPrefix(sessionID, base.UKPreFLVSubSession) { @@ -703,6 +686,7 @@ func (group *Group) delRTSPPubSession(session *rtsp.PubSession) { _ = group.rtspPubSession.Dispose() group.rtspPubSession = nil + group.rtsp2RTMPRemuxer = nil group.delIn() } @@ -1204,8 +1188,3 @@ func (group *Group) disposeHLSMuxer() { group.hlsMuxer = nil } } - -// TODO chef: 后续看是否有更合适的方法判断 -func (group *Group) isHEVC() bool { - return group.vps != nil -} diff --git a/pkg/logic/iface_impl.go b/pkg/logic/iface_impl.go index 8a99459..f34d7a4 100644 --- a/pkg/logic/iface_impl.go +++ b/pkg/logic/iface_impl.go @@ -13,6 +13,7 @@ import ( "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtsp" ) @@ -140,9 +141,12 @@ var _ HTTPAPIServerObserver = &ServerManager{} var _ rtmp.PubSessionObserver = &Group{} // var _ rtsp.PullSessionObserver = &Group{} +var _ rtsp.PullSessionObserver = &remux.AVPacket2RTMPRemuxer{} var _ rtsp.PubSessionObserver = &Group{} +var _ rtsp.PubSessionObserver = &remux.AVPacket2RTMPRemuxer{} var _ hls.MuxerObserver = &Group{} var _ rtsp.BaseInSessionObserver = &Group{} // +var _ rtsp.BaseInSessionObserver = &remux.AVPacket2RTMPRemuxer{} var _ rtmp.ServerSessionObserver = &rtmp.Server{} var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{} diff --git a/pkg/remux/avpacket2flv.go b/pkg/remux/avpacket2flv.go deleted file mode 100644 index 5872924..0000000 --- a/pkg/remux/avpacket2flv.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package remux - -import ( - "github.com/q191201771/lal/pkg/aac" - "github.com/q191201771/lal/pkg/avc" - "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/lal/pkg/hevc" - "github.com/q191201771/lal/pkg/httpflv" - "github.com/q191201771/lal/pkg/rtmp" - "github.com/q191201771/naza/pkg/bele" - "github.com/q191201771/naza/pkg/nazalog" -) - -// @param asc 如果为nil,则没有音频 -// @param vps 如果为nil,则是H264,如果不为nil,则是H265 -// @return 返回的内存块为新申请的独立内存块 -func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag, err error) { - var bMetadata []byte - var bVsh []byte - var bAsh []byte - - hasAudio := asc != nil - hasVideo := sps != nil && pps != nil - isHEVC := vps != nil - - if !hasAudio && !hasVideo { - err = ErrRemux - return - } - - audiocodecid := -1 - if hasAudio { - audiocodecid = int(base.RTMPSoundFormatAAC) - } - videocodecid := -1 - width := -1 - height := -1 - if hasVideo { - if isHEVC { - videocodecid = int(base.RTMPCodecIDHEVC) - var ctx hevc.Context - err = hevc.ParseSPS(sps, &ctx) - if err == nil { - width = int(ctx.PicWidthInLumaSamples) - height = int(ctx.PicHeightInLumaSamples) - } else { - nazalog.Warnf("parse hevc sps failed. err=%+v", err) - } - bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps) - if err != nil { - return - } - } else { - videocodecid = int(base.RTMPCodecIDAVC) - var ctx avc.Context - err = avc.ParseSPS(sps, &ctx) - if err != nil { - return - } - if ctx.Width != 0 { - width = int(ctx.Width) - } - if ctx.Height != 0 { - height = int(ctx.Height) - } - - bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps) - if err != nil { - return - } - } - } - - if hasAudio { - bAsh, err = aac.BuildAACSeqHeader(asc) - if err != nil { - return - } - } - - var h httpflv.TagHeader - var tagRaw []byte - - bMetadata, err = rtmp.BuildMetadata(width, height, audiocodecid, videocodecid) - if err != nil { - return - } - - h.Type = base.RTMPTypeIDMetadata - h.DataSize = uint32(len(bMetadata)) - h.Timestamp = 0 - tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bMetadata) - metadata = &httpflv.Tag{ - Header: h, - Raw: tagRaw, - } - - if hasVideo { - h.Type = base.RTMPTypeIDVideo - h.DataSize = uint32(len(bVsh)) - h.Timestamp = 0 - tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bVsh) - vsh = &httpflv.Tag{ - Header: h, - Raw: tagRaw, - } - } - - if hasAudio { - h.Type = base.RTMPTypeIDAudio - h.DataSize = uint32(len(bAsh)) - h.Timestamp = 0 - tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bAsh) - ash = &httpflv.Tag{ - Header: h, - Raw: tagRaw, - } - } - - return -} - -// @return 返回的内存块为新申请的独立内存块 -func AVPacket2FLVTag(pkt base.AVPacket) (tag httpflv.Tag, err error) { - switch pkt.PayloadType { - case base.AVPacketPTAVC: - fallthrough - case base.AVPacketPTHEVC: - tag.Header.Type = base.RTMPTypeIDVideo - tag.Header.DataSize = uint32(len(pkt.Payload)) + 5 - tag.Header.Timestamp = pkt.Timestamp - tag.Raw = make([]byte, httpflv.TagHeaderSize+int(tag.Header.DataSize)+httpflv.PrevTagSizeFieldSize) - tag.Raw[0] = tag.Header.Type - bele.BEPutUint24(tag.Raw[1:], tag.Header.DataSize) - bele.BEPutUint24(tag.Raw[4:], tag.Header.Timestamp&0xFFFFFF) - tag.Raw[7] = uint8(tag.Header.Timestamp >> 24) - tag.Raw[8] = 0 - tag.Raw[9] = 0 - tag.Raw[10] = 0 - - var nals [][]byte - nals, err = avc.SplitNALUAVCC(pkt.Payload) - if err != nil { - return - } - for _, nal := range nals { - switch pkt.PayloadType { - case base.AVPacketPTAVC: - t := avc.ParseNALUType(nal[0]) - if t == avc.NALUTypeIDRSlice { - tag.Raw[httpflv.TagHeaderSize] = base.RTMPAVCKeyFrame - } else { - tag.Raw[httpflv.TagHeaderSize] = base.RTMPAVCInterFrame - } - tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPAVCPacketTypeNALU - case base.AVPacketPTHEVC: - t := hevc.ParseNALUType(nal[0]) - if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP { - tag.Raw[httpflv.TagHeaderSize] = base.RTMPHEVCKeyFrame - } else { - tag.Raw[httpflv.TagHeaderSize] = base.RTMPHEVCInterFrame - } - tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPHEVCPacketTypeNALU - } - } - - tag.Raw[httpflv.TagHeaderSize+2] = 0x0 // cts - tag.Raw[httpflv.TagHeaderSize+3] = 0x0 - tag.Raw[httpflv.TagHeaderSize+4] = 0x0 - copy(tag.Raw[httpflv.TagHeaderSize+5:], pkt.Payload) - bele.BEPutUint32(tag.Raw[httpflv.TagHeaderSize+int(tag.Header.DataSize):], uint32(httpflv.TagHeaderSize)+tag.Header.DataSize) - //nazalog.Debugf("%d %s", len(msg.Payload), hex.Dump(msg.Payload[:32])) - case base.AVPacketPTAAC: - tag.Header.Type = base.RTMPTypeIDAudio - tag.Header.DataSize = uint32(len(pkt.Payload)) + 2 - tag.Header.Timestamp = pkt.Timestamp - tag.Raw = make([]byte, httpflv.TagHeaderSize+int(tag.Header.DataSize)+httpflv.PrevTagSizeFieldSize) - tag.Raw[0] = tag.Header.Type - bele.BEPutUint24(tag.Raw[1:], tag.Header.DataSize) - bele.BEPutUint24(tag.Raw[4:], tag.Header.Timestamp&0xFFFFFF) - tag.Raw[7] = uint8(tag.Header.Timestamp >> 24) - tag.Raw[8] = 0 - tag.Raw[9] = 0 - tag.Raw[10] = 0 - tag.Raw[httpflv.TagHeaderSize] = 0xAF - tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPAACPacketTypeRaw - copy(tag.Raw[httpflv.TagHeaderSize+2:], pkt.Payload) - bele.BEPutUint32(tag.Raw[httpflv.TagHeaderSize+int(tag.Header.DataSize):], uint32(httpflv.TagHeaderSize)+tag.Header.DataSize) - default: - err = ErrRemux - return - } - - return -} diff --git a/pkg/remux/avpacket2rtmp.go b/pkg/remux/avpacket2rtmp.go index 046f720..c30361b 100644 --- a/pkg/remux/avpacket2rtmp.go +++ b/pkg/remux/avpacket2rtmp.go @@ -1,4 +1,4 @@ -// Copyright 2020, Chef. All rights reserved. +// Copyright 2021, Chef. All rights reserved. // https://github.com/q191201771/lal // // Use of this source code is governed by a MIT-style license @@ -14,177 +14,277 @@ import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/hevc" "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/nazalog" ) -// @return 返回的内存块为新申请的独立内存块 +// AVPacket转换为RTMP +// 目前AVPacket来自RTSP的sdp以及rtp包。理论上也支持webrtc,后续接入webrtc时再验证 +type AVPacket2RTMPRemuxer struct { + onRTMPAVMsg rtmp.OnReadRTMPAVMsg + + hasEmittedMetadata bool + audioType base.AVPacketPT + videoType base.AVPacketPT + + vps []byte // 从AVPacket数据中获取 + sps []byte + pps []byte +} + +func NewAVPacket2RTMPRemuxer(onRTMPAVMsg rtmp.OnReadRTMPAVMsg) *AVPacket2RTMPRemuxer { + return &AVPacket2RTMPRemuxer{ + onRTMPAVMsg: onRTMPAVMsg, + audioType: base.AVPacketPTUnknown, + videoType: base.AVPacketPTUnknown, + } +} + +// 实现RTSP回调数据的三个接口,使得接入时方便些 +func (r *AVPacket2RTMPRemuxer) OnRTPPacket(pkt rtprtcp.RTPPacket) { + // noop +} +func (r *AVPacket2RTMPRemuxer) OnSDP(sdpCtx sdp.LogicContext) { + r.InitWithAVConfig(sdpCtx.ASC, sdpCtx.VPS, sdpCtx.SPS, sdpCtx.PPS) +} +func (r *AVPacket2RTMPRemuxer) OnAVPacket(pkt base.AVPacket) { + r.FeedAVPacket(pkt) +} + +// rtsp场景下,有时sps、pps等信息只包含在sdp中,有时包含在rtp包中, +// 这里提供输入sdp的sps、pps等信息的机会,如果没有,可以不调用 // -func AVConfig2RTMPMsg(asc, vps, sps, pps []byte) (metadata, ash, vsh *base.RTMPMsg, err error) { - var bMetadata []byte +// 内部不持有输入参数的内存块 +// +func (r *AVPacket2RTMPRemuxer) InitWithAVConfig(asc, vps, sps, pps []byte) { + var err error var bVsh []byte var bAsh []byte - hasAudio := asc != nil - hasVideo := sps != nil && pps != nil - isHEVC := vps != nil + if asc != nil { + r.audioType = base.AVPacketPTAAC + } + if sps != nil && pps != nil { + if vps != nil { + r.videoType = base.AVPacketPTHEVC + } else { + r.videoType = base.AVPacketPTAVC + } + } - if !hasAudio && !hasVideo { - err = ErrRemux + if r.audioType == base.AVPacketPTUnknown && r.videoType == base.AVPacketPTUnknown { + nazalog.Warn("has no audio or video") return } - audiocodecid := -1 - if hasAudio { - audiocodecid = int(base.RTMPSoundFormatAAC) + if r.audioType != base.AVPacketPTUnknown { + bAsh, err = aac.BuildAACSeqHeader(asc) + if err != nil { + nazalog.Errorf("build aac seq header failed. err=%+v", err) + return + } } - videocodecid := -1 - width := -1 - height := -1 - if hasVideo { - if isHEVC { - videocodecid = int(base.RTMPCodecIDHEVC) - var ctx hevc.Context - if err = hevc.ParseSPS(sps, &ctx); err != nil { - return - } - width = int(ctx.PicWidthInLumaSamples) - height = int(ctx.PicHeightInLumaSamples) + + if r.videoType != base.AVPacketPTUnknown { + if r.videoType == base.AVPacketPTHEVC { bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps) if err != nil { + nazalog.Errorf("build hevc seq header failed. err=%+v", err) return } } else { - videocodecid = int(base.RTMPCodecIDAVC) - var ctx avc.Context - err = avc.ParseSPS(sps, &ctx) - if err != nil { - return - } - if ctx.Width != 0 { - width = int(ctx.Width) - } - if ctx.Height != 0 { - height = int(ctx.Height) - } bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps) if err != nil { + nazalog.Errorf("build avc seq header failed. err=%+v", err) return } } } - if hasAudio { - bAsh, err = aac.BuildAACSeqHeader(asc) - if err != nil { - return - } - } - - var h base.RTMPHeader - - bMetadata, err = rtmp.BuildMetadata(width, height, audiocodecid, videocodecid) - if err != nil { - return + if r.audioType != base.AVPacketPTUnknown { + r.emitRTMPAVMsg(true, bAsh, 0) } - h.MsgLen = uint32(len(bMetadata)) - h.TimestampAbs = 0 - h.MsgTypeID = base.RTMPTypeIDMetadata - h.MsgStreamID = rtmp.MSID1 - h.CSID = rtmp.CSIDAMF - metadata = &base.RTMPMsg{ - Header: h, - Payload: bMetadata, - } - - if hasVideo { - h.MsgLen = uint32(len(bVsh)) - h.TimestampAbs = 0 - h.MsgTypeID = base.RTMPTypeIDVideo - h.MsgStreamID = rtmp.MSID1 - h.CSID = rtmp.CSIDVideo - vsh = &base.RTMPMsg{ - Header: h, - Payload: bVsh, - } - } - - if hasAudio { - h.MsgLen = uint32(len(bAsh)) - h.TimestampAbs = 0 - h.MsgTypeID = base.RTMPTypeIDAudio - h.MsgStreamID = rtmp.MSID1 - h.CSID = rtmp.CSIDAudio - ash = &base.RTMPMsg{ - Header: h, - Payload: bAsh, - } + if r.videoType != base.AVPacketPTUnknown { + r.emitRTMPAVMsg(false, bVsh, 0) } - - return } -// @return 返回的内存块为新申请的独立内存块 -func AVPacket2RTMPMsg(pkt base.AVPacket) (msg base.RTMPMsg, err error) { +// @param pkt: 内部不持有该内存块 +// +func (r *AVPacket2RTMPRemuxer) FeedAVPacket(pkt base.AVPacket) { switch pkt.PayloadType { case base.AVPacketPTAVC: fallthrough case base.AVPacketPTHEVC: - msg.Header.TimestampAbs = pkt.Timestamp - msg.Header.MsgStreamID = rtmp.MSID1 - - msg.Header.MsgTypeID = base.RTMPTypeIDVideo - msg.Header.CSID = rtmp.CSIDVideo - msg.Header.MsgLen = uint32(len(pkt.Payload)) + 5 - - msg.Payload = make([]byte, msg.Header.MsgLen) - - var nals [][]byte - nals, err = avc.SplitNALUAVCC(pkt.Payload) + nals, err := avc.SplitNALUAVCC(pkt.Payload) if err != nil { + nazalog.Errorf("iterate nalu failed. err=%+v", err) return } + + pos := 5 + maxLength := len(pkt.Payload) + pos + payload := make([]byte, maxLength) + for _, nal := range nals { - switch pkt.PayloadType { - case base.AVPacketPTAVC: + if pkt.PayloadType == base.AVPacketPTAVC { t := avc.ParseNALUType(nal[0]) - if t == avc.NALUTypeIDRSlice { - msg.Payload[0] = base.RTMPAVCKeyFrame + if t == avc.NALUTypeSPS || t == avc.NALUTypePPS { + // 如果有sps,pps,先把它们抽离出来进行缓存 + if t == avc.NALUTypeSPS { + r.setSPS(nal) + } else { + r.setPPS(nal) + } + + if r.sps != nil && r.pps != nil { + // TODO(chef): 是否应该判断sps、pps是连续的,比如rtp seq的关系,或者timestamp是相等的 + // 凑齐了,发送video seq header + + bVsh, err := avc.BuildSeqHeaderFromSPSPPS(r.sps, r.pps) + if err != nil { + nazalog.Errorf("build avc seq header failed. err=%+v", err) + continue + } + r.emitRTMPAVMsg(false, bVsh, pkt.Timestamp) + r.clearVideoSeqHeader() + } } else { - msg.Payload[0] = base.RTMPAVCInterFrame + // 重组实际数据 + + if t == avc.NALUTypeIDRSlice { + payload[0] = base.RTMPAVCKeyFrame + } else { + payload[0] = base.RTMPAVCInterFrame + } + payload[1] = base.RTMPAVCPacketTypeNALU + bele.BEPutUint32(payload[pos:], uint32(len(nal))) + pos += 4 + copy(payload[pos:], nal) + pos += len(nal) } - msg.Payload[1] = base.RTMPAVCPacketTypeNALU - case base.AVPacketPTHEVC: + } else if pkt.PayloadType == base.AVPacketPTHEVC { t := hevc.ParseNALUType(nal[0]) - if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP { - msg.Payload[0] = base.RTMPHEVCKeyFrame + if t == hevc.NALUTypeVPS || t == hevc.NALUTypeSPS || t == hevc.NALUTypePPS { + if t == hevc.NALUTypeVPS { + r.setVPS(nal) + } else if t == hevc.NALUTypeSPS { + r.setSPS(nal) + } else { + r.setPPS(nal) + } + if r.vps != nil && r.sps != nil && r.pps != nil { + bVsh, err := hevc.BuildSeqHeaderFromVPSSPSPPS(r.vps, r.sps, r.pps) + if err != nil { + nazalog.Errorf("build hevc seq header failed. err=%+v", err) + continue + } + r.emitRTMPAVMsg(false, bVsh, pkt.Timestamp) + r.clearVideoSeqHeader() + } } else { - msg.Payload[0] = base.RTMPHEVCInterFrame + if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP { + payload[0] = base.RTMPHEVCKeyFrame + } else { + payload[0] = base.RTMPHEVCInterFrame + } + payload[1] = base.RTMPHEVCPacketTypeNALU + bele.BEPutUint32(payload[pos:], uint32(len(nal))) + pos += 4 + copy(payload[pos:], nal) + pos += len(nal) } - msg.Payload[1] = base.RTMPHEVCPacketTypeNALU } } - msg.Payload[2] = 0x0 // cts - msg.Payload[3] = 0x0 - msg.Payload[4] = 0x0 - copy(msg.Payload[5:], pkt.Payload) - //nazalog.Debugf("%d %s", len(msg.Payload), hex.Dump(msg.Payload[:32])) + // 有实际数据 + if pos > 5 { + r.emitRTMPAVMsg(false, payload[:pos], pkt.Timestamp) + } + case base.AVPacketPTAAC: - msg.Header.TimestampAbs = pkt.Timestamp - msg.Header.MsgStreamID = rtmp.MSID1 + length := len(pkt.Payload) + 2 + payload := make([]byte, length) + // TODO(chef) 处理此处的魔数0xAF + payload[0] = 0xAF + payload[1] = base.RTMPAACPacketTypeRaw + copy(payload[2:], pkt.Payload) + r.emitRTMPAVMsg(true, payload, pkt.Timestamp) + default: + nazalog.Warnf("unsupported packet. type=%d", pkt.PayloadType) + } +} - msg.Header.MsgTypeID = base.RTMPTypeIDAudio - msg.Header.CSID = rtmp.CSIDAudio - msg.Header.MsgLen = uint32(len(pkt.Payload)) + 2 +func (r *AVPacket2RTMPRemuxer) emitRTMPAVMsg(isAudio bool, payload []byte, timestamp uint32) { + if !r.hasEmittedMetadata { + // TODO(chef): 此处简化了从sps中获取宽高写入metadata的逻辑 + audiocodecid := -1 + videocodecid := -1 + if r.audioType == base.AVPacketPTAAC { + audiocodecid = int(base.RTMPSoundFormatAAC) + } + switch r.videoType { + case base.AVPacketPTAVC: + videocodecid = int(base.RTMPCodecIDAVC) + case base.AVPacketPTHEVC: + videocodecid = int(base.RTMPCodecIDHEVC) + } + bMetadata, err := rtmp.BuildMetadata(-1, -1, audiocodecid, videocodecid) + if err != nil { + nazalog.Errorf("build metadata failed. err=%+v", err) + return + } + r.onRTMPAVMsg(base.RTMPMsg{ + Header: base.RTMPHeader{ + CSID: rtmp.CSIDAMF, + MsgLen: uint32(len(bMetadata)), + MsgTypeID: base.RTMPTypeIDMetadata, + MsgStreamID: rtmp.MSID1, + TimestampAbs: 0, + }, + Payload: bMetadata, + }) + r.hasEmittedMetadata = true + } - msg.Payload = make([]byte, msg.Header.MsgLen) - msg.Payload[0] = 0xAF - msg.Payload[1] = base.RTMPAACPacketTypeRaw - copy(msg.Payload[2:], pkt.Payload) - default: - err = ErrRemux - return + var msg base.RTMPMsg + msg.Header.MsgStreamID = rtmp.MSID1 + + if isAudio { + msg.Header.CSID = rtmp.CSIDAudio + msg.Header.MsgTypeID = base.RTMPTypeIDAudio + } else { + msg.Header.CSID = rtmp.CSIDVideo + msg.Header.MsgTypeID = base.RTMPTypeIDVideo } - return + msg.Header.MsgLen = uint32(len(payload)) + msg.Header.TimestampAbs = timestamp + msg.Payload = payload + + r.onRTMPAVMsg(msg) +} + +func (r *AVPacket2RTMPRemuxer) setVPS(b []byte) { + r.vps = r.vps[0:0] + r.vps = append(r.vps, b...) +} + +func (r *AVPacket2RTMPRemuxer) setSPS(b []byte) { + r.sps = r.sps[0:0] + r.sps = append(r.sps, b...) +} + +func (r *AVPacket2RTMPRemuxer) setPPS(b []byte) { + r.pps = r.pps[0:0] + r.pps = append(r.pps, b...) +} + +func (r *AVPacket2RTMPRemuxer) clearVideoSeqHeader() { + r.vps = r.vps[0:0] + r.sps = r.sps[0:0] + r.pps = r.pps[0:0] } diff --git a/pkg/remux/avpacket2rtmp_test.go b/pkg/remux/avpacket2rtmp_test.go new file mode 100644 index 0000000..3cffc57 --- /dev/null +++ b/pkg/remux/avpacket2rtmp_test.go @@ -0,0 +1,86 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package remux_test + +import ( + "encoding/hex" + "testing" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/naza/pkg/nazalog" +) + +// #85 +func TestCase1(t *testing.T) { + ps := []string{ + "0000002c67640032ad84010c20086100430802184010c200843b5014005ad370101014000003000400000300ca100002", + "0000000468ee3cb0", + } + golden := []base.AVPacket{ + { + Timestamp: 10340642, + PayloadType: base.AVPacketPTAVC, + }, + { + Timestamp: 10340642, + PayloadType: base.AVPacketPTAVC, + }, + } + for i := range ps { + p, _ := hex.DecodeString(ps[i]) + golden[i].Payload = p + } + + remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) { + nazalog.Debugf("%+v", msg) + }) + for _, p := range golden { + remuxer.FeedAVPacket(p) + } +} + +func TestCase2(t *testing.T) { + ps := []string{ + "0000001840010c01ffff016000000300b0000003000003007bac0901", + "00000024420101016000000300b0000003000003007ba003c08010e58dae4914bf37010101008001", + "0000000c4401c0f2c68d03b240000003", + "0000000c4e01e504ebc3000080000003", + } + golden := []base.AVPacket{ + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + }, + } + + for i := range ps { + p, _ := hex.DecodeString(ps[i]) + golden[i].Payload = p + } + + remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) { + nazalog.Debugf("%+v", msg) + }) + for _, p := range golden { + remuxer.FeedAVPacket(p) + } +} diff --git a/pkg/rtprtcp/rtp_unpacker.go b/pkg/rtprtcp/rtp_unpacker.go index d7e09bb..ebd51a6 100644 --- a/pkg/rtprtcp/rtp_unpacker.go +++ b/pkg/rtprtcp/rtp_unpacker.go @@ -50,10 +50,14 @@ type IRTPUnpackerProtocol interface { // @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳,单位毫秒 // 注意,不支持带B帧的视频流,pts和dts永远相同 // pkt.PayloadType base.AVPacketPTXXX -// pkt.Payload 如果是AAC,返回的是raw frame,一个AVPacket只包含一帧 -// 如果是AVC或HEVC,是AVCC格式,每个NAL前包含4字节NAL的长度 -// AAC引用的是接收到的RTP包中的内存块 -// AVC或者HEVC是新申请的内存块,回调结束后,内部不再使用该内存块 +// pkt.Payload AAC: +// 返回的是raw frame,一个AVPacket只包含一帧 +// 引用的是接收到的RTP包中的内存块 +// AVC或HEVC: +// AVCC格式,每个NAL前包含4字节NAL的长度 +// 新申请的内存块,回调结束后,内部不再使用该内存块 +// 注意,这一层只做RTP包的合并,假如sps和pps是两个RTP single包,则合并结果为两个AVPacket, +// 假如sps和pps是一个stapA包,则合并结果为一个AVPacket type OnAVPacket func(pkt base.AVPacket) // 目前支持AVC,HEVC和AAC MPEG4-GENERIC/44100/2,业务方也可以自己实现IRTPUnpackerProtocol,甚至是IRTPUnpackContainer diff --git a/pkg/rtprtcp/rtp_unpacker_test.go b/pkg/rtprtcp/rtp_unpacker_test.go index abb6c8f..172e48d 100644 --- a/pkg/rtprtcp/rtp_unpacker_test.go +++ b/pkg/rtprtcp/rtp_unpacker_test.go @@ -12,10 +12,79 @@ import ( "encoding/hex" "testing" + "github.com/q191201771/naza/pkg/nazalog" + + "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/assert" ) +// #85 +func TestAVCCase1(t *testing.T) { + // single sps + // single pps + // FUA IDR + ss := []string{ + "a06000013778b64c1921d51867640032ad84010c20086100430802184010c200843b5014005ad370101014000003000400000300ca100002", + "806000023778b64c1921d51868ee3cb0", + "806000033778b64c1921d5187c85b8000006bff0ffee6021caad8ffdd001c15e27306e7fc6bf36ca8b6bc1411afef158dc64d75094f5b2bbddec364e6904cc9a14b4069bb6b6cb6d6ab8f132e77e3324291351b52e4a58fce30dd7b64313d208df50ab636423abce5a0dcc6c3ff8397c84250042244aafef705063debcdd7fe6c6c2fd41b4c6251fe4ca318c40e4bf2eb373246f14ad1623f9f5356154b02c0c8f53fcd8b6ad452da457b48ba704ec227f16b59d9e7c0d50423bb7f78059a68d26aef86bc94ebb27723c6eee018b45977028b400e474d40f7464fcb5f0f292b23a2324486bdd500fe72b115bd93bdf5f0f9207daedfc02e4b8bed2c2b8a4b081b00c3a9172e1ce34222a870d06b925a9263c59276d4ad6bd82bdab07f8e01b3ae3263899948d345f83e4ff127b3c4d2721d3543880a0ea72d9b13ea0a4dd64ca4617f3d63b0182f9cc37c917dc620853f12487d608bc76fe3d5bc039fc9df161d4d6ca2cae273f7893ffbe78ba9e0e85217c9c2b6214e48e831cf4eed74f1e70f3050fcd72dd466d1a3d00f500b884c141625f5daba37c20e4623387406a6930819f08c0e3321ea7695d92e8da38e5d926767926ee5ebce20e4eecbd7aea2b8756489271ba7c2977c4e568a4a25c82b159cc3f5f6575f1b96ad1c589267947258bc1e62d5eed1a17a99219c019fd9167754310ad3c2ef1acbf304fe0b5342d09ae20b4b4fc49f5231ca744977d3a73ce2821f8cc76f1c0149ae8d5dadf44bf87ad989a311ce57ad19e4579bc8cdc309eb28954e875441f9c135182eca8f3016aa5792ec3d1926da40694465dfd625b8c8c836792f4cee619bcce39a71c8055827cbd5c22fedfcb7626534d07385df9247b6338e9bc68c1ee8903a4fc9309decc5ce2ca60a7331ccd15f285ddb9be2134173fada2150ce68dddc4d2ef397624d74cb0ff9dab86664e17a0bc76ec86875736e68133e3bcdf5419cd6ce19ff1108bbe7e39ea36578ea8e8a4cbe6c850095467f22c1c220aa8203f8ee16d39de0f414db0f46dbc2f7f34b4dc7062aeed316c991522efb7fd8ac10d510596638f0bc330005317ab085787db0b26ed4c6086513d059ceef9274d6788a9bf30abb1cf6b353d9e57103e10a299df9d77c688cc0d69af84f319e97f55db30d419fdf9a028b4185fa1cae5f2fdfadcb0abc68ca03968a8b4801d31d8096451b1dd54d159d81b3cf8554a43a65a80fc1581eb00719de7c606201c31af22d0fef3208889c4f0a5c0ea06ecdb13e6d99cff25f449d1fe90c3d7beadc52595b12d9e58fc647ebbc30ac4131fad7dfad3a6ea9d51948b095d9aafd48a0545378c2083406a2248f4876e104d76d22ce0a552848d89d603392dad6486554a58dd8fb154b70e52d4c4e1f093d73d731563bebf9fe61493779ba791df3dc65430cf27d00292609f6c09a682e9f4b03a4e3507459ae06ff07e66fa992b54f8d0837cf806d9921ac916b1fe064adb76e7cedfa4e2c5a9126a0f7a7927e5686b990cc87f89612b983cb81783e0763a548648ef73d855a0afeaf78192047b060fcf8b1016a46016de75983d7e57c70d5fa5259012e465531f953b27ca67b554620a5e194386b8fd6d784e989d151b7326d028ff28b2707d376147ca3657a60daa7158e630b9def41c579a02b24c6b5cbe5161c985bf5a6ba41b033be092c4a590098aca5a4a6b8aef024863ef403fe4cb4dc2010dedd3cc1c5f7c6a123e2d7f8fd8feef793afb2402a763df927316b337d83808f13354f4706af395556caa6ec0efb724a7abe587b8eff333d64623ad7556a735dcd04bfe88c8f57327625aef8f25a0b722499de91a509ad383650d3ba250e16c5dd2671d67039b9404c174863a9af5c2738dd293770488c034c1f4f9c0d1cfb8f02c9bcecfb24a17ad06c7163788f22c8bbeb30b26423ad22515aca1916a28f716aad8b970623f054537448d74ee4822", + } + + testHelperTemplete(t, base.AVPacketPTAVC, 90000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket { + return []base.AVPacket{ + { + Timestamp: 10340642, + PayloadType: base.AVPacketPTAVC, + Payload: testHelperAddPrefixLength(rtpPackets[0].Raw[12:]), + }, + { + Timestamp: 10340642, + PayloadType: base.AVPacketPTAVC, + Payload: testHelperAddPrefixLength(rtpPackets[1].Raw[12:]), + }, + } + }) +} + +func TestHEVCCase1(t *testing.T) { + // single vps + // single sps + // single pps + // single sei + ss := []string{ + "a060d3a38a27999a01c0125940010c01ffff016000000300b0000003000003007bac0901", + "a060d3a48a27999a01c01259420101016000000300b0000003000003007ba003c08010e58dae4914bf37010101008001", + "a060d3a58a27999a01c012594401c0f2c68d03b240000003", + "a060d3a68a27999a01c012594e01e504ebc3000080000003", + } + + testHelperTemplete(t, base.AVPacketPTHEVC, 90000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket { + return []base.AVPacket{ + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + Payload: testHelperAddPrefixLength(rtpPackets[0].Raw[12:]), + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + Payload: testHelperAddPrefixLength(rtpPackets[1].Raw[12:]), + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + Payload: testHelperAddPrefixLength(rtpPackets[2].Raw[12:]), + }, + { + Timestamp: 25753900, + PayloadType: base.AVPacketPTHEVC, + Payload: testHelperAddPrefixLength(rtpPackets[3].Raw[12:]), + }, + } + }) +} + func TestAACCase1(t *testing.T) { ss := []string{ "80e10e9a56843e4cf0bdf2fe00102d10214e6c425f74815f92415f94415f924100000114008a004027f313d564a770026fd01203c9cbac420e1aecaa41efb4619391309daa7938b4b905989c5c293c024819e4234eb324934327e3f083c98c2220137bc9b5b2bae5809351710e4bfec0dec11fffe0ab6e568f8357fb74c3461e892fc3a4e22ac512fa73bdc13004b9d73e21c5221387922692939994e38ab1516ea2cfb64df8cffb5468a81e4704f255a5139f7f21d622b944e0b08221916d4ba6e10a31b2a148c38642d1c8b2b12a15681c0f0b589160feb32b43c7ebade413590916a934148ca7915d6250904e5172a3ecc612ab23fd4431b0eef591be52155e4a0ac8564108012689c4df89221892e50d8fca64a747bb2313908211004222ea6a848132e87935978e03845905ccc057c743c7424e64e2338bfe52090d4a5a961f35ec5e14befbc2b3d41f89bdfe949fc2dd40e141b13e397f84f7e1ef43f303df77407bdacc01c2832c3d3df65fe3bfde0001f37f85f4a021f27baf11e4f388fd5ce464337648fc89e6043a0c8268a44f0e024e112910c8b9bc282ee2e5587762b2752c08a422ac99558f904273f858d9de9938ef2502493ae3217c247410eb39395f084a117f6486161103c5cab0881d7c24ffd7123cdcac226bc292c2dc213d04f0b26dd864e6e149c3a4468a48d86124209e172166c5b331e434c8cae4276e191acc2682d692a51210c1ce2254102e048435919442449f2c57bb271364fbb67f1d79364ee004a28c88e75dd53210eef613186531f22908a5253a87e66423fe1bffef7dd95fed588acf815908808fefff2ffef581fe23b2b8e9fdfbd7e3f7071c05d40ec89401efbdf87beee9a9c1eff2d054c0f7ff5febc514514514514514514517d7fe0faf145145111fc0dd8218d9f9d064ee8895cb84ee848064e7593530495f87ffa276e76549a4e6c0e381270c24ea288e0e1d8ad270f0dc243210e413a84b160e085b10a416be12011022b31354b202020771fe376bf6afe387e67fe0fee8087bee102bf415fa0afd0400000e3e3e3fe001f8f8ff53eff60fafab04fe0877121cebad11c16048271174dc251cbdd3da84e0c9ba23dd312b26500020154a269d9c4520270444131665313103f0f7412d40de5c59cfff0f6383440fc588e986f10b35ddbfdb6be780f30eb25f49fd7e479062097adbc044ba3e0c956ef043c31d188a23d453e83ca71ca44df64805f4514916e71c7e760674293094980642012d104ba2cacd260456e2e93a8c1f492739f9dd445aba9241088de157cb3d976e20888a41b34808f3a4ca181528c989641e0a187dc5443494c896e4421711580a84276a12a229662108289464d105b1513f80841836f009c987f86969641eea943c86c475685fb09043ae947ac5d22ff6ff7c1d380a48885c2a7d03905668ec9fd1fcdf7fc700450094d16f034c90003df9240b851764658e101efb8ea83855071d6871c5126a8385556aa0db094e571d9dc7271352b846810c4e24000000000000038e60384c4f09800000021e17eb447ca381218ff036078221ec1e7843a466acc013c0130585c7518330993235855b71158610843210aa9ba05c22ea5984de5e1227e3e4d2dd942a1659f5320232b45bb6c6418df87b313c727fc3f1d170a6ecb0b74e4449209804a3bf270c98518182b79dd74412c24f7e4f45131a889442502df3928282006d0f165c6e4d39059e7404eb1092044c67267817686dc7dbe82281f1d2f65ddaf70e67250c77adbe39d09be32b8f0007d2f905140d3c34fb6784075ce77075dfb069da091017344c80b6b9a36ff1d93c22889d5a7ccaa2073ccf278e1f8e9c38e1c9a8f502b8e9c84938542ae3a7e1313c26240000000000000384c4f1cc4f09890000021f49fc3847d1b76e8613938122e759d2494b9e4634fc812ab747e3132beb075bf23fed67a0943a245082648a4d06e11166a25f110a24c0c993964042ff0a8d24e4108c489514499641221f84c013c06008e0301c2e0038e6009e0327c7326000001c", @@ -24,38 +93,25 @@ func TestAACCase1(t *testing.T) { "80e10e9d5684464cf0bdf2fe00102db871cdd0709ba0e0", } - var pkts []RTPPacket - for _, s := range ss { - pkt, err := hexstream2rtppacket(s) - assert.Equal(t, nil, err) - pkts = append(pkts, pkt) - } - - expected := []base.AVPacket{ - { - Timestamp: 30239734, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[0].Raw[12+4:], - }, - { - Timestamp: 30239756, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[1].Raw[12+4:], - }, - { - Timestamp: 30239777, - PayloadType: base.AVPacketPTAAC, - Payload: append(pkts[2].Raw[12+4:], pkts[3].Raw[12+4:]...), - }, - } - var outPkts []base.AVPacket - unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 48000, 128, func(pkt base.AVPacket) { - outPkts = append(outPkts, pkt) + testHelperTemplete(t, base.AVPacketPTAAC, 48000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket { + return []base.AVPacket{ + { + Timestamp: 30239734, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[0].Raw[12+4:], + }, + { + Timestamp: 30239756, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[1].Raw[12+4:], + }, + { + Timestamp: 30239777, + PayloadType: base.AVPacketPTAAC, + Payload: append(rtpPackets[2].Raw[12+4:], rtpPackets[3].Raw[12+4:]...), + }, + } }) - for _, pkt := range pkts { - unpacker.Feed(pkt) - } - assert.Equal(t, expected, outPkts) } func TestAACCase2(t *testing.T) { @@ -66,60 +122,84 @@ func TestAACCase2(t *testing.T) { "80e104588424be31b06db689001016d8211c8d2ff7fffffffd1975a43251621000000006af6ba26aa2e3f904d00271b1e487d009e4320471d8f204892ad2252d72b8700260e8b4113e20831048a5a0438f1b93832a048415ff73b5bf69631c8a4e4ca6eb89fc8454ab3558b7ed96e5557083eb2ac47779eb23fa2d680f806dfa4ffd6590df3f6e990132030626063e01e73386af34ea3ff2746f376e6c8fcc3f0574828a15f1375e386521f5b50f0f4aea7ffdb9e0d27caff771bbfd4788dcbd118e539df41f01be771cf33af56f9a35090fd7b57dfd8f557cb9e398b7df5f1ac4d4bb56f69cdd2211b6f49679293c43dc7b283840528ea9f573ee24cc7c055d4b9844491583e315d70af6dd5e73fdab59fd17d532f48dda3cb452cba838dccbbdc2b7d5b4699a8e565a4b71035f2a9bd97bb88b90057c240692aa50aca5486142176a21104b5b19c1a691b086d4118810597934c2302e49091073911ba492badc8dba7ec264c5d29602f779ff3b58a48c510a681f42e7cdfd3ecf37fa7e8f6f8eda069baed02641111cea27687eba9db1f5471d5bb4734a6ca7f63caa22f5c04b5df2a7a5f75000ca648557f5c4ee3bca7e079e382643a1a808cb851d8e67222204200000000d5ed744d545c7f916413a981219ae144e8cafae1383088cb0139c294c240e3f9c9dc1d324047aaf90dd06794b20bebd8f4d73ffefb80f3d6dfde7dddd613573e886de98f79a921d12eb49909e8b4415f66cdc863a11bf1aaada975bd2d91e20d7f58ca2a3219d9bb55d873c2c399a17028a2bdc09a513165845609d47ee11f4f5e8822407a13c1f876c728291c28931eb6004cc5e86a441a29503a82c2b848200ee55e3df0880cc00dbd92d204170376d1d2c19355727b70dd508350425a031cb73d7fee9bbd887878b6b4a3ebb056e59b44b1e7a4884f1e3cfdab2b3607e594e2cf47eb553f54696c0acb08ba8b411ed053572db39a5c885723bc0bd2d97d92b27b5381d5bd555ae9fba09cde8ea4e66a7bc1b3488deebff89ad1d8aac49546b79e301c0", } - var pkts []RTPPacket - for _, s := range ss { - pkt, err := hexstream2rtppacket(s) - assert.Equal(t, nil, err) - pkts = append(pkts, pkt) - } + testHelperTemplete(t, base.AVPacketPTAAC, 32000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket { + return []base.AVPacket{ + { + Timestamp: 69281105, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[0].Raw[12+2+6 : 12+2+6+24], + }, + { + Timestamp: 69281137, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[0].Raw[12+2+6+24 : 12+2+6+24+6], + }, + { + Timestamp: 69281169, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[0].Raw[12+2+6+24+6:], + }, + { + Timestamp: 69281201, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[1].Raw[12+4:], + }, + { + Timestamp: 69281233, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[2].Raw[12+4:], + }, + { + Timestamp: 69281265, + PayloadType: base.AVPacketPTAAC, + Payload: rtpPackets[3].Raw[12+4:], + }, + } + }) +} - expected := []base.AVPacket{ - { - Timestamp: 69281105, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[0].Raw[12+2+6 : 12+2+6+24], - }, - { - Timestamp: 69281137, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[0].Raw[12+2+6+24 : 12+2+6+24+6], - }, - { - Timestamp: 69281169, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[0].Raw[12+2+6+24+6:], - }, - { - Timestamp: 69281201, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[1].Raw[12+4:], - }, - { - Timestamp: 69281233, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[2].Raw[12+4:], - }, - { - Timestamp: 69281265, - PayloadType: base.AVPacketPTAAC, - Payload: pkts[3].Raw[12+4:], - }, - } +func testHelperTemplete(t *testing.T, payloadType base.AVPacketPT, clockRate int, maxSize int, hexRTPPackets []string, expectedFn func([]RTPPacket) []base.AVPacket) { + rtpPackets, err := testHelperHexstream2rtppackets(hexRTPPackets) + assert.Equal(t, nil, err) + + expected := expectedFn(rtpPackets) + assert.Equal(t, expected, testHelperUnpack(payloadType, clockRate, maxSize, rtpPackets)) +} + +func testHelperAddPrefixLength(in []byte) (out []byte) { + out = make([]byte, len(in)+4) + bele.BEPutUint32(out, uint32(len(in))) + copy(out[4:], in) + return +} + +func testHelperUnpack(payloadType base.AVPacketPT, clockRate int, maxSize int, rtpPackets []RTPPacket) []base.AVPacket { var outPkts []base.AVPacket - unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 32000, 128, func(pkt base.AVPacket) { - //nazalog.Infof("out: %d, %d", pkt.Timestamp, len(pkt.Payload)) + unpacker := DefaultRTPUnpackerFactory(payloadType, clockRate, maxSize, func(pkt base.AVPacket) { + nazalog.Debugf("%s", hex.EncodeToString(pkt.Payload)) outPkts = append(outPkts, pkt) }) - for _, pkt := range pkts { - //nazalog.Infof("in: %+v %d", pkt.Header, len(pkt.Raw)) + for _, pkt := range rtpPackets { unpacker.Feed(pkt) } - assert.Equal(t, expected, outPkts) + return outPkts +} + +func testHelperHexstream2rtppackets(hexPackets []string) (pkts []RTPPacket, err error) { + var pkt RTPPacket + for _, p := range hexPackets { + pkt, err = testHelperHexstream2rtppacket(p) + if err != nil { + return + } + pkts = append(pkts, pkt) + } + return } -func hexstream2rtppacket(in string) (pkt RTPPacket, err error) { +func testHelperHexstream2rtppacket(hexPacket string) (pkt RTPPacket, err error) { var raw []byte - raw, err = hex.DecodeString(in) + raw, err = hex.DecodeString(hexPacket) if err != nil { return } diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 5ed029b..f8f924f 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -29,17 +29,15 @@ import ( // 聚合PubSession和PullSession,也即流数据是输入类型的session -// BaseInSession会向上层回调两种格式的数据: +// BaseInSession会向上层回调两种格式的数据(本质上是一份数据,业务方可自由选择使用): // 1. 原始的rtp packet // 2. rtp合并后的av packet +// type BaseInSessionObserver interface { - OnRTPPacket(pkt rtprtcp.RTPPacket) + OnSDP(sdpCtx sdp.LogicContext) - // @param asc: AAC AudioSpecificConfig,注意,如果不存在音频或音频不为AAC,则为nil - // @param vps, sps, pps 如果都为nil,则没有视频,如果sps, pps不为nil,则vps不为nil是H265,vps为nil是H264 - // - // 注意,4个参数可能同时为nil - OnAVConfig(asc, vps, sps, pps []byte) + // 回调收到的RTP包 + OnRTPPacket(pkt rtprtcp.RTPPacket) // @param pkt: pkt结构体中字段含义见rtprtcp.OnAVPacket OnAVPacket(pkt base.AVPacket) @@ -134,7 +132,7 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo } if session.observer != nil { - session.observer.OnAVConfig(session.sdpLogicCtx.ASC, session.sdpLogicCtx.VPS, session.sdpLogicCtx.SPS, session.sdpLogicCtx.PPS) + session.observer.OnSDP(session.sdpLogicCtx) } } @@ -142,7 +140,10 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo func (session *BaseInSession) SetObserver(observer BaseInSessionObserver) { session.observer = observer - session.observer.OnAVConfig(session.sdpLogicCtx.ASC, session.sdpLogicCtx.VPS, session.sdpLogicCtx.SPS, session.sdpLogicCtx.PPS) + // 避免在当前协程回调,降低业务方使用负担,不必担心设置监听对象和回调函数中锁重入 + go func() { + session.observer.OnSDP(session.sdpLogicCtx) + }() } func (session *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { diff --git a/pkg/sdp/logic.go b/pkg/sdp/logic.go index 6fd741a..426d455 100644 --- a/pkg/sdp/logic.go +++ b/pkg/sdp/logic.go @@ -21,6 +21,11 @@ type LogicContext struct { AudioClockRate int VideoClockRate int + ASC []byte + VPS []byte + SPS []byte + PPS []byte + audioPayloadTypeBase base.AVPacketPT // lal内部定义的类型 videoPayloadTypeBase base.AVPacketPT @@ -29,11 +34,6 @@ type LogicContext struct { audioAControl string videoAControl string - ASC []byte - VPS []byte - SPS []byte - PPS []byte - // 没有用上的 hasAudio bool hasVideo bool @@ -140,7 +140,7 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) { if md.AFmtPBase != nil { ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtPBase) if err != nil { - return ret, err + nazalog.Warnf("parse sps pps from afmtp failed. err=%+v", err) } } else { nazalog.Warnf("avc afmtp not exist.") @@ -150,7 +150,7 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) { if md.AFmtPBase != nil { ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtPBase) if err != nil { - return ret, err + nazalog.Warnf("parse vps sps pps from afmtp failed. err=%+v", err) } } else { nazalog.Warnf("hevc afmtp not exist.") diff --git a/pkg/sdp/sdp_test.go b/pkg/sdp/sdp_test.go index 7404227..07c0f2d 100644 --- a/pkg/sdp/sdp_test.go +++ b/pkg/sdp/sdp_test.go @@ -360,3 +360,67 @@ a=appversion:1.0` assert.Equal(t, true, ctx.hasVideo) nazalog.Debugf("%+v", ctx) } + +// #85 +func TestCase7(t *testing.T) { + golden := `v=0 +o=- 1109162014219182 0 IN IP4 0.0.0.0 +s=HIK Media Server V3.4.106 +i=HIK Media Server Session Description : standard +e=NONE +c=IN IP4 0.0.0.0 +t=0 0 +a=control:* +b=AS:4106 +a=range:clock=20210520T063812Z-20210520T064816Z +m=video 0 RTP/AVP 96 +i=Video Media +a=rtpmap:96 H264/90000 +a=fmtp:96 profile-level-id=4D0014;packetization-mode=0 +a=control:trackID=video +b=AS:4096 +m=audio 0 RTP/AVP 98 +i=Audio Media +a=rtpmap:98 G7221/16000 +a=control:trackID=audio +b=AS:10 +a=Media_header:MEDIAINFO=494D4B48020100000400000121720110803E0000803E000000000000000000000000000000000000; +a=appversion:1.0 +` + + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + _ = ctx +} + +func TestCase8(t *testing.T) { + golden := `v=0 +o=- 1622201479405259 1622201479405259 IN IP4 192.168.3.58 +s=Media Presentation +e=NONE +b=AS:5100 +t=0 0 +a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/?transportmode=unicast +m=video 0 RTP/AVP 96 +c=IN IP4 0.0.0.0 +b=AS:5000 +a=recvonly +a=x-dimensions:1920,1080 +a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/trackID=1?transportmode=unicast +a=rtpmap:96 H265/90000 +m=audio 0 RTP/AVP 8 +c=IN IP4 0.0.0.0 +b=AS:50 +a=recvonly +a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/trackID=2?transportmode=unicast +a=rtpmap:8 PCMA/8000 +a=Media_header:MEDIAINFO=494D4B48010200000400050011710110401F000000FA000000000000000000000000000000000000; +a=appversion:1.0 +` + + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + _ = ctx +}