#74 #85 [feat] 支持海康威视NVR,大华海康IDC的RTSP流(SDP不包含SPS、PPS等数据,而是通过RTP包发送)

pull/200/head^2
q191201771 4 years ago
parent 6bbc95dea0
commit 24e8887ef8

@ -17,45 +17,10 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/nazalog"
)
var fileWriter httpflv.FLVFileWriter
type Observer struct {
}
func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
// noop
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
metadata, ash, vsh, err := remux.AVConfig2FLVTag(asc, vps, sps, pps)
nazalog.Assert(nil, err)
err = fileWriter.WriteTag(*metadata)
nazalog.Assert(nil, err)
if ash != nil {
err = fileWriter.WriteTag(*ash)
nazalog.Assert(nil, err)
}
if vsh != nil {
err = fileWriter.WriteTag(*vsh)
nazalog.Assert(nil, err)
}
}
func (o *Observer) OnAVPacket(pkt base.AVPacket) {
tag, err := remux.AVPacket2FLVTag(pkt)
nazalog.Assert(nil, err)
err = fileWriter.WriteTag(tag)
nazalog.Assert(nil, err)
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
@ -64,14 +29,18 @@ func main() {
inURL, outFilename, overTCP := parseFlag()
var fileWriter httpflv.FLVFileWriter
err := fileWriter.Open(outFilename)
nazalog.Assert(nil, err)
defer fileWriter.Dispose()
err = fileWriter.WriteRaw(httpflv.FLVHeader)
nazalog.Assert(nil, err)
o := &Observer{}
pullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) {
err = fileWriter.WriteTag(*remux.RTMPMsg2FLVTag(msg))
nazalog.Assert(nil, err)
})
pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMS = 5000
option.OverTCP = overTCP != 0
})

@ -18,45 +18,10 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/nazalog"
)
var pushSession *rtmp.PushSession
type Observer struct {
}
func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
// noop
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
metadata, ash, vsh, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps)
nazalog.Assert(nil, err)
err = pushSession.Write(rtmp.Message2Chunks(metadata.Payload, &metadata.Header))
nazalog.Assert(nil, err)
if ash != nil {
err = pushSession.Write(rtmp.Message2Chunks(ash.Payload, &ash.Header))
nazalog.Assert(nil, err)
}
if vsh != nil {
err = pushSession.Write(rtmp.Message2Chunks(vsh.Payload, &vsh.Header))
nazalog.Assert(nil, err)
}
}
func (o *Observer) OnAVPacket(pkt base.AVPacket) {
msg, err := remux.AVPacket2RTMPMsg(pkt)
nazalog.Assert(nil, err)
err = pushSession.Write(rtmp.Message2Chunks(msg.Payload, &msg.Header))
nazalog.Assert(nil, err)
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
@ -65,7 +30,7 @@ func main() {
inURL, outURL, overTCP := parseFlag()
pushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 5000
})
@ -74,8 +39,11 @@ func main() {
nazalog.Assert(nil, err)
defer pushSession.Dispose()
o := &Observer{}
pullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) {
err = pushSession.Write(rtmp.Message2Chunks(msg.Payload, &msg.Header))
nazalog.Assert(nil, err)
})
pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMS = 5000
option.OverTCP = overTCP != 0
})

@ -14,6 +14,8 @@ import (
"os"
"time"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/rtsp"
@ -29,7 +31,7 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
rtpPacketChan <- pkt
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
func (o *Observer) OnSDP(sdpCtx sdp.LogicContext) {
// noop
}

@ -157,7 +157,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
}
if !m.opened {
nazalog.Warnf("[%s] OnFrame A not opened.", m.UniqueKey)
nazalog.Warnf("[%s] OnFrame A not opened. boundary=%t", m.UniqueKey, boundary)
return
}
@ -176,7 +176,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
}
if !m.opened {
nazalog.Warnf("[%s] OnFrame V not opened.", m.UniqueKey)
nazalog.Warnf("[%s] OnFrame V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)
return
}

@ -16,6 +16,8 @@ import (
"sync"
"time"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/remux"
@ -51,8 +53,9 @@ type Group struct {
//
stat base.StatGroup
// pub
rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession
rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession
rtsp2RTMPRemuxer *remux.AVPacket2RTMPRemuxer
// pull
pullEnable bool
pullURL string
@ -77,12 +80,6 @@ type Group struct {
//
rtmpBufWriter base.IBufWriter // TODO(chef): 后面可以在业务层加一个定时Flush
// rtsp pub使用
asc []byte
vps []byte
sps []byte
pps []byte
// mpegts使用
patpmt []byte
@ -172,6 +169,7 @@ func (group *Group) Tick() {
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey())
group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RTMPRemuxer = nil
}
}
if group.pullProxy.pullSession != nil {
@ -259,6 +257,7 @@ func (group *Group) Dispose() {
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RTMPRemuxer = nil
}
for session := range group.rtmpSubSessionSet {
@ -326,6 +325,9 @@ func (group *Group) AddRTSPPubSession(session *rtsp.PubSession) bool {
group.rtspPubSession = session
group.addIn()
group.rtsp2RTMPRemuxer = remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) {
group.broadcastByRTMPMsg(msg)
})
session.SetObserver(group)
return true
@ -546,40 +548,20 @@ func (group *Group) OnRTPPacket(pkt rtprtcp.RTPPacket) {
}
// rtsp.PubSession
func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) {
// 注意,前面已经进锁了,这里依然在锁保护内
group.asc = asc
group.vps = vps
group.sps = sps
group.pps = pps
func (group *Group) OnSDP(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
metadata, vsh, ash, err := remux.AVConfig2RTMPMsg(group.asc, group.vps, group.sps, group.pps)
if err != nil {
nazalog.Errorf("[%s] remux avconfig to metadata and seqheader failed. err=%+v", group.UniqueKey, err)
return
}
if metadata != nil {
group.broadcastByRTMPMsg(*metadata)
}
if vsh != nil {
group.broadcastByRTMPMsg(*vsh)
}
if ash != nil {
group.broadcastByRTMPMsg(*ash)
}
group.rtsp2RTMPRemuxer.OnSDP(sdpCtx)
}
// rtsp.PubSession
func (group *Group) OnAVPacket(pkt base.AVPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
//nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp)
msg, err := remux.AVPacket2RTMPMsg(pkt)
if err != nil {
nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err)
return
}
group.BroadcastByRTMPMsg(msg)
group.rtsp2RTMPRemuxer.OnAVPacket(pkt)
}
func (group *Group) StringifyDebugStats() string {
@ -655,6 +637,7 @@ func (group *Group) KickOutSession(sessionID string) bool {
} else if strings.HasPrefix(sessionID, base.UKPreRTSPPubSession) {
if group.rtspPubSession != nil {
group.rtspPubSession.Dispose()
group.rtsp2RTMPRemuxer = nil
return true
}
} else if strings.HasPrefix(sessionID, base.UKPreFLVSubSession) {
@ -703,6 +686,7 @@ func (group *Group) delRTSPPubSession(session *rtsp.PubSession) {
_ = group.rtspPubSession.Dispose()
group.rtspPubSession = nil
group.rtsp2RTMPRemuxer = nil
group.delIn()
}
@ -1204,8 +1188,3 @@ func (group *Group) disposeHLSMuxer() {
group.hlsMuxer = nil
}
}
// TODO chef: 后续看是否有更合适的方法判断
func (group *Group) isHEVC() bool {
return group.vps != nil
}

@ -13,6 +13,7 @@ import (
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
)
@ -140,9 +141,12 @@ var _ HTTPAPIServerObserver = &ServerManager{}
var _ rtmp.PubSessionObserver = &Group{} //
var _ rtsp.PullSessionObserver = &Group{}
var _ rtsp.PullSessionObserver = &remux.AVPacket2RTMPRemuxer{}
var _ rtsp.PubSessionObserver = &Group{}
var _ rtsp.PubSessionObserver = &remux.AVPacket2RTMPRemuxer{}
var _ hls.MuxerObserver = &Group{}
var _ rtsp.BaseInSessionObserver = &Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AVPacket2RTMPRemuxer{}
var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}

@ -1,203 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package remux
import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// @param asc 如果为nil则没有音频
// @param vps 如果为nil则是H264如果不为nil则是H265
// @return 返回的内存块为新申请的独立内存块
func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag, err error) {
var bMetadata []byte
var bVsh []byte
var bAsh []byte
hasAudio := asc != nil
hasVideo := sps != nil && pps != nil
isHEVC := vps != nil
if !hasAudio && !hasVideo {
err = ErrRemux
return
}
audiocodecid := -1
if hasAudio {
audiocodecid = int(base.RTMPSoundFormatAAC)
}
videocodecid := -1
width := -1
height := -1
if hasVideo {
if isHEVC {
videocodecid = int(base.RTMPCodecIDHEVC)
var ctx hevc.Context
err = hevc.ParseSPS(sps, &ctx)
if err == nil {
width = int(ctx.PicWidthInLumaSamples)
height = int(ctx.PicHeightInLumaSamples)
} else {
nazalog.Warnf("parse hevc sps failed. err=%+v", err)
}
bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps)
if err != nil {
return
}
} else {
videocodecid = int(base.RTMPCodecIDAVC)
var ctx avc.Context
err = avc.ParseSPS(sps, &ctx)
if err != nil {
return
}
if ctx.Width != 0 {
width = int(ctx.Width)
}
if ctx.Height != 0 {
height = int(ctx.Height)
}
bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps)
if err != nil {
return
}
}
}
if hasAudio {
bAsh, err = aac.BuildAACSeqHeader(asc)
if err != nil {
return
}
}
var h httpflv.TagHeader
var tagRaw []byte
bMetadata, err = rtmp.BuildMetadata(width, height, audiocodecid, videocodecid)
if err != nil {
return
}
h.Type = base.RTMPTypeIDMetadata
h.DataSize = uint32(len(bMetadata))
h.Timestamp = 0
tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bMetadata)
metadata = &httpflv.Tag{
Header: h,
Raw: tagRaw,
}
if hasVideo {
h.Type = base.RTMPTypeIDVideo
h.DataSize = uint32(len(bVsh))
h.Timestamp = 0
tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bVsh)
vsh = &httpflv.Tag{
Header: h,
Raw: tagRaw,
}
}
if hasAudio {
h.Type = base.RTMPTypeIDAudio
h.DataSize = uint32(len(bAsh))
h.Timestamp = 0
tagRaw = httpflv.PackHTTPFLVTag(h.Type, h.Timestamp, bAsh)
ash = &httpflv.Tag{
Header: h,
Raw: tagRaw,
}
}
return
}
// @return 返回的内存块为新申请的独立内存块
func AVPacket2FLVTag(pkt base.AVPacket) (tag httpflv.Tag, err error) {
switch pkt.PayloadType {
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
tag.Header.Type = base.RTMPTypeIDVideo
tag.Header.DataSize = uint32(len(pkt.Payload)) + 5
tag.Header.Timestamp = pkt.Timestamp
tag.Raw = make([]byte, httpflv.TagHeaderSize+int(tag.Header.DataSize)+httpflv.PrevTagSizeFieldSize)
tag.Raw[0] = tag.Header.Type
bele.BEPutUint24(tag.Raw[1:], tag.Header.DataSize)
bele.BEPutUint24(tag.Raw[4:], tag.Header.Timestamp&0xFFFFFF)
tag.Raw[7] = uint8(tag.Header.Timestamp >> 24)
tag.Raw[8] = 0
tag.Raw[9] = 0
tag.Raw[10] = 0
var nals [][]byte
nals, err = avc.SplitNALUAVCC(pkt.Payload)
if err != nil {
return
}
for _, nal := range nals {
switch pkt.PayloadType {
case base.AVPacketPTAVC:
t := avc.ParseNALUType(nal[0])
if t == avc.NALUTypeIDRSlice {
tag.Raw[httpflv.TagHeaderSize] = base.RTMPAVCKeyFrame
} else {
tag.Raw[httpflv.TagHeaderSize] = base.RTMPAVCInterFrame
}
tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPAVCPacketTypeNALU
case base.AVPacketPTHEVC:
t := hevc.ParseNALUType(nal[0])
if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP {
tag.Raw[httpflv.TagHeaderSize] = base.RTMPHEVCKeyFrame
} else {
tag.Raw[httpflv.TagHeaderSize] = base.RTMPHEVCInterFrame
}
tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPHEVCPacketTypeNALU
}
}
tag.Raw[httpflv.TagHeaderSize+2] = 0x0 // cts
tag.Raw[httpflv.TagHeaderSize+3] = 0x0
tag.Raw[httpflv.TagHeaderSize+4] = 0x0
copy(tag.Raw[httpflv.TagHeaderSize+5:], pkt.Payload)
bele.BEPutUint32(tag.Raw[httpflv.TagHeaderSize+int(tag.Header.DataSize):], uint32(httpflv.TagHeaderSize)+tag.Header.DataSize)
//nazalog.Debugf("%d %s", len(msg.Payload), hex.Dump(msg.Payload[:32]))
case base.AVPacketPTAAC:
tag.Header.Type = base.RTMPTypeIDAudio
tag.Header.DataSize = uint32(len(pkt.Payload)) + 2
tag.Header.Timestamp = pkt.Timestamp
tag.Raw = make([]byte, httpflv.TagHeaderSize+int(tag.Header.DataSize)+httpflv.PrevTagSizeFieldSize)
tag.Raw[0] = tag.Header.Type
bele.BEPutUint24(tag.Raw[1:], tag.Header.DataSize)
bele.BEPutUint24(tag.Raw[4:], tag.Header.Timestamp&0xFFFFFF)
tag.Raw[7] = uint8(tag.Header.Timestamp >> 24)
tag.Raw[8] = 0
tag.Raw[9] = 0
tag.Raw[10] = 0
tag.Raw[httpflv.TagHeaderSize] = 0xAF
tag.Raw[httpflv.TagHeaderSize+1] = base.RTMPAACPacketTypeRaw
copy(tag.Raw[httpflv.TagHeaderSize+2:], pkt.Payload)
bele.BEPutUint32(tag.Raw[httpflv.TagHeaderSize+int(tag.Header.DataSize):], uint32(httpflv.TagHeaderSize)+tag.Header.DataSize)
default:
err = ErrRemux
return
}
return
}

@ -1,4 +1,4 @@
// Copyright 2020, Chef. All rights reserved.
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
@ -14,177 +14,277 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// @return 返回的内存块为新申请的独立内存块
// AVPacket转换为RTMP
// 目前AVPacket来自RTSP的sdp以及rtp包。理论上也支持webrtc后续接入webrtc时再验证
type AVPacket2RTMPRemuxer struct {
onRTMPAVMsg rtmp.OnReadRTMPAVMsg
hasEmittedMetadata bool
audioType base.AVPacketPT
videoType base.AVPacketPT
vps []byte // 从AVPacket数据中获取
sps []byte
pps []byte
}
func NewAVPacket2RTMPRemuxer(onRTMPAVMsg rtmp.OnReadRTMPAVMsg) *AVPacket2RTMPRemuxer {
return &AVPacket2RTMPRemuxer{
onRTMPAVMsg: onRTMPAVMsg,
audioType: base.AVPacketPTUnknown,
videoType: base.AVPacketPTUnknown,
}
}
// 实现RTSP回调数据的三个接口使得接入时方便些
func (r *AVPacket2RTMPRemuxer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
// noop
}
func (r *AVPacket2RTMPRemuxer) OnSDP(sdpCtx sdp.LogicContext) {
r.InitWithAVConfig(sdpCtx.ASC, sdpCtx.VPS, sdpCtx.SPS, sdpCtx.PPS)
}
func (r *AVPacket2RTMPRemuxer) OnAVPacket(pkt base.AVPacket) {
r.FeedAVPacket(pkt)
}
// rtsp场景下有时sps、pps等信息只包含在sdp中有时包含在rtp包中
// 这里提供输入sdp的sps、pps等信息的机会如果没有可以不调用
//
func AVConfig2RTMPMsg(asc, vps, sps, pps []byte) (metadata, ash, vsh *base.RTMPMsg, err error) {
var bMetadata []byte
// 内部不持有输入参数的内存块
//
func (r *AVPacket2RTMPRemuxer) InitWithAVConfig(asc, vps, sps, pps []byte) {
var err error
var bVsh []byte
var bAsh []byte
hasAudio := asc != nil
hasVideo := sps != nil && pps != nil
isHEVC := vps != nil
if asc != nil {
r.audioType = base.AVPacketPTAAC
}
if sps != nil && pps != nil {
if vps != nil {
r.videoType = base.AVPacketPTHEVC
} else {
r.videoType = base.AVPacketPTAVC
}
}
if !hasAudio && !hasVideo {
err = ErrRemux
if r.audioType == base.AVPacketPTUnknown && r.videoType == base.AVPacketPTUnknown {
nazalog.Warn("has no audio or video")
return
}
audiocodecid := -1
if hasAudio {
audiocodecid = int(base.RTMPSoundFormatAAC)
if r.audioType != base.AVPacketPTUnknown {
bAsh, err = aac.BuildAACSeqHeader(asc)
if err != nil {
nazalog.Errorf("build aac seq header failed. err=%+v", err)
return
}
}
videocodecid := -1
width := -1
height := -1
if hasVideo {
if isHEVC {
videocodecid = int(base.RTMPCodecIDHEVC)
var ctx hevc.Context
if err = hevc.ParseSPS(sps, &ctx); err != nil {
return
}
width = int(ctx.PicWidthInLumaSamples)
height = int(ctx.PicHeightInLumaSamples)
if r.videoType != base.AVPacketPTUnknown {
if r.videoType == base.AVPacketPTHEVC {
bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps)
if err != nil {
nazalog.Errorf("build hevc seq header failed. err=%+v", err)
return
}
} else {
videocodecid = int(base.RTMPCodecIDAVC)
var ctx avc.Context
err = avc.ParseSPS(sps, &ctx)
if err != nil {
return
}
if ctx.Width != 0 {
width = int(ctx.Width)
}
if ctx.Height != 0 {
height = int(ctx.Height)
}
bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps)
if err != nil {
nazalog.Errorf("build avc seq header failed. err=%+v", err)
return
}
}
}
if hasAudio {
bAsh, err = aac.BuildAACSeqHeader(asc)
if err != nil {
return
}
}
var h base.RTMPHeader
bMetadata, err = rtmp.BuildMetadata(width, height, audiocodecid, videocodecid)
if err != nil {
return
if r.audioType != base.AVPacketPTUnknown {
r.emitRTMPAVMsg(true, bAsh, 0)
}
h.MsgLen = uint32(len(bMetadata))
h.TimestampAbs = 0
h.MsgTypeID = base.RTMPTypeIDMetadata
h.MsgStreamID = rtmp.MSID1
h.CSID = rtmp.CSIDAMF
metadata = &base.RTMPMsg{
Header: h,
Payload: bMetadata,
}
if hasVideo {
h.MsgLen = uint32(len(bVsh))
h.TimestampAbs = 0
h.MsgTypeID = base.RTMPTypeIDVideo
h.MsgStreamID = rtmp.MSID1
h.CSID = rtmp.CSIDVideo
vsh = &base.RTMPMsg{
Header: h,
Payload: bVsh,
}
}
if hasAudio {
h.MsgLen = uint32(len(bAsh))
h.TimestampAbs = 0
h.MsgTypeID = base.RTMPTypeIDAudio
h.MsgStreamID = rtmp.MSID1
h.CSID = rtmp.CSIDAudio
ash = &base.RTMPMsg{
Header: h,
Payload: bAsh,
}
if r.videoType != base.AVPacketPTUnknown {
r.emitRTMPAVMsg(false, bVsh, 0)
}
return
}
// @return 返回的内存块为新申请的独立内存块
func AVPacket2RTMPMsg(pkt base.AVPacket) (msg base.RTMPMsg, err error) {
// @param pkt: 内部不持有该内存块
//
func (r *AVPacket2RTMPRemuxer) FeedAVPacket(pkt base.AVPacket) {
switch pkt.PayloadType {
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
msg.Header.TimestampAbs = pkt.Timestamp
msg.Header.MsgStreamID = rtmp.MSID1
msg.Header.MsgTypeID = base.RTMPTypeIDVideo
msg.Header.CSID = rtmp.CSIDVideo
msg.Header.MsgLen = uint32(len(pkt.Payload)) + 5
msg.Payload = make([]byte, msg.Header.MsgLen)
var nals [][]byte
nals, err = avc.SplitNALUAVCC(pkt.Payload)
nals, err := avc.SplitNALUAVCC(pkt.Payload)
if err != nil {
nazalog.Errorf("iterate nalu failed. err=%+v", err)
return
}
pos := 5
maxLength := len(pkt.Payload) + pos
payload := make([]byte, maxLength)
for _, nal := range nals {
switch pkt.PayloadType {
case base.AVPacketPTAVC:
if pkt.PayloadType == base.AVPacketPTAVC {
t := avc.ParseNALUType(nal[0])
if t == avc.NALUTypeIDRSlice {
msg.Payload[0] = base.RTMPAVCKeyFrame
if t == avc.NALUTypeSPS || t == avc.NALUTypePPS {
// 如果有spspps先把它们抽离出来进行缓存
if t == avc.NALUTypeSPS {
r.setSPS(nal)
} else {
r.setPPS(nal)
}
if r.sps != nil && r.pps != nil {
// TODO(chef): 是否应该判断sps、pps是连续的比如rtp seq的关系或者timestamp是相等的
// 凑齐了发送video seq header
bVsh, err := avc.BuildSeqHeaderFromSPSPPS(r.sps, r.pps)
if err != nil {
nazalog.Errorf("build avc seq header failed. err=%+v", err)
continue
}
r.emitRTMPAVMsg(false, bVsh, pkt.Timestamp)
r.clearVideoSeqHeader()
}
} else {
msg.Payload[0] = base.RTMPAVCInterFrame
// 重组实际数据
if t == avc.NALUTypeIDRSlice {
payload[0] = base.RTMPAVCKeyFrame
} else {
payload[0] = base.RTMPAVCInterFrame
}
payload[1] = base.RTMPAVCPacketTypeNALU
bele.BEPutUint32(payload[pos:], uint32(len(nal)))
pos += 4
copy(payload[pos:], nal)
pos += len(nal)
}
msg.Payload[1] = base.RTMPAVCPacketTypeNALU
case base.AVPacketPTHEVC:
} else if pkt.PayloadType == base.AVPacketPTHEVC {
t := hevc.ParseNALUType(nal[0])
if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP {
msg.Payload[0] = base.RTMPHEVCKeyFrame
if t == hevc.NALUTypeVPS || t == hevc.NALUTypeSPS || t == hevc.NALUTypePPS {
if t == hevc.NALUTypeVPS {
r.setVPS(nal)
} else if t == hevc.NALUTypeSPS {
r.setSPS(nal)
} else {
r.setPPS(nal)
}
if r.vps != nil && r.sps != nil && r.pps != nil {
bVsh, err := hevc.BuildSeqHeaderFromVPSSPSPPS(r.vps, r.sps, r.pps)
if err != nil {
nazalog.Errorf("build hevc seq header failed. err=%+v", err)
continue
}
r.emitRTMPAVMsg(false, bVsh, pkt.Timestamp)
r.clearVideoSeqHeader()
}
} else {
msg.Payload[0] = base.RTMPHEVCInterFrame
if t == hevc.NALUTypeSliceIDR || t == hevc.NALUTypeSliceIDRNLP {
payload[0] = base.RTMPHEVCKeyFrame
} else {
payload[0] = base.RTMPHEVCInterFrame
}
payload[1] = base.RTMPHEVCPacketTypeNALU
bele.BEPutUint32(payload[pos:], uint32(len(nal)))
pos += 4
copy(payload[pos:], nal)
pos += len(nal)
}
msg.Payload[1] = base.RTMPHEVCPacketTypeNALU
}
}
msg.Payload[2] = 0x0 // cts
msg.Payload[3] = 0x0
msg.Payload[4] = 0x0
copy(msg.Payload[5:], pkt.Payload)
//nazalog.Debugf("%d %s", len(msg.Payload), hex.Dump(msg.Payload[:32]))
// 有实际数据
if pos > 5 {
r.emitRTMPAVMsg(false, payload[:pos], pkt.Timestamp)
}
case base.AVPacketPTAAC:
msg.Header.TimestampAbs = pkt.Timestamp
msg.Header.MsgStreamID = rtmp.MSID1
length := len(pkt.Payload) + 2
payload := make([]byte, length)
// TODO(chef) 处理此处的魔数0xAF
payload[0] = 0xAF
payload[1] = base.RTMPAACPacketTypeRaw
copy(payload[2:], pkt.Payload)
r.emitRTMPAVMsg(true, payload, pkt.Timestamp)
default:
nazalog.Warnf("unsupported packet. type=%d", pkt.PayloadType)
}
}
msg.Header.MsgTypeID = base.RTMPTypeIDAudio
msg.Header.CSID = rtmp.CSIDAudio
msg.Header.MsgLen = uint32(len(pkt.Payload)) + 2
func (r *AVPacket2RTMPRemuxer) emitRTMPAVMsg(isAudio bool, payload []byte, timestamp uint32) {
if !r.hasEmittedMetadata {
// TODO(chef): 此处简化了从sps中获取宽高写入metadata的逻辑
audiocodecid := -1
videocodecid := -1
if r.audioType == base.AVPacketPTAAC {
audiocodecid = int(base.RTMPSoundFormatAAC)
}
switch r.videoType {
case base.AVPacketPTAVC:
videocodecid = int(base.RTMPCodecIDAVC)
case base.AVPacketPTHEVC:
videocodecid = int(base.RTMPCodecIDHEVC)
}
bMetadata, err := rtmp.BuildMetadata(-1, -1, audiocodecid, videocodecid)
if err != nil {
nazalog.Errorf("build metadata failed. err=%+v", err)
return
}
r.onRTMPAVMsg(base.RTMPMsg{
Header: base.RTMPHeader{
CSID: rtmp.CSIDAMF,
MsgLen: uint32(len(bMetadata)),
MsgTypeID: base.RTMPTypeIDMetadata,
MsgStreamID: rtmp.MSID1,
TimestampAbs: 0,
},
Payload: bMetadata,
})
r.hasEmittedMetadata = true
}
msg.Payload = make([]byte, msg.Header.MsgLen)
msg.Payload[0] = 0xAF
msg.Payload[1] = base.RTMPAACPacketTypeRaw
copy(msg.Payload[2:], pkt.Payload)
default:
err = ErrRemux
return
var msg base.RTMPMsg
msg.Header.MsgStreamID = rtmp.MSID1
if isAudio {
msg.Header.CSID = rtmp.CSIDAudio
msg.Header.MsgTypeID = base.RTMPTypeIDAudio
} else {
msg.Header.CSID = rtmp.CSIDVideo
msg.Header.MsgTypeID = base.RTMPTypeIDVideo
}
return
msg.Header.MsgLen = uint32(len(payload))
msg.Header.TimestampAbs = timestamp
msg.Payload = payload
r.onRTMPAVMsg(msg)
}
func (r *AVPacket2RTMPRemuxer) setVPS(b []byte) {
r.vps = r.vps[0:0]
r.vps = append(r.vps, b...)
}
func (r *AVPacket2RTMPRemuxer) setSPS(b []byte) {
r.sps = r.sps[0:0]
r.sps = append(r.sps, b...)
}
func (r *AVPacket2RTMPRemuxer) setPPS(b []byte) {
r.pps = r.pps[0:0]
r.pps = append(r.pps, b...)
}
func (r *AVPacket2RTMPRemuxer) clearVideoSeqHeader() {
r.vps = r.vps[0:0]
r.sps = r.sps[0:0]
r.pps = r.pps[0:0]
}

@ -0,0 +1,86 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package remux_test
import (
"encoding/hex"
"testing"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/nazalog"
)
// #85
func TestCase1(t *testing.T) {
ps := []string{
"0000002c67640032ad84010c20086100430802184010c200843b5014005ad370101014000003000400000300ca100002",
"0000000468ee3cb0",
}
golden := []base.AVPacket{
{
Timestamp: 10340642,
PayloadType: base.AVPacketPTAVC,
},
{
Timestamp: 10340642,
PayloadType: base.AVPacketPTAVC,
},
}
for i := range ps {
p, _ := hex.DecodeString(ps[i])
golden[i].Payload = p
}
remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) {
nazalog.Debugf("%+v", msg)
})
for _, p := range golden {
remuxer.FeedAVPacket(p)
}
}
func TestCase2(t *testing.T) {
ps := []string{
"0000001840010c01ffff016000000300b0000003000003007bac0901",
"00000024420101016000000300b0000003000003007ba003c08010e58dae4914bf37010101008001",
"0000000c4401c0f2c68d03b240000003",
"0000000c4e01e504ebc3000080000003",
}
golden := []base.AVPacket{
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
},
}
for i := range ps {
p, _ := hex.DecodeString(ps[i])
golden[i].Payload = p
}
remuxer := remux.NewAVPacket2RTMPRemuxer(func(msg base.RTMPMsg) {
nazalog.Debugf("%+v", msg)
})
for _, p := range golden {
remuxer.FeedAVPacket(p)
}
}

@ -50,10 +50,14 @@ type IRTPUnpackerProtocol interface {
// @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳单位毫秒
// 注意不支持带B帧的视频流pts和dts永远相同
// pkt.PayloadType base.AVPacketPTXXX
// pkt.Payload 如果是AAC返回的是raw frame一个AVPacket只包含一帧
// 如果是AVC或HEVC是AVCC格式每个NAL前包含4字节NAL的长度
// AAC引用的是接收到的RTP包中的内存块
// AVC或者HEVC是新申请的内存块回调结束后内部不再使用该内存块
// pkt.Payload AAC:
// 返回的是raw frame一个AVPacket只包含一帧
// 引用的是接收到的RTP包中的内存块
// AVC或HEVC:
// AVCC格式每个NAL前包含4字节NAL的长度
// 新申请的内存块,回调结束后,内部不再使用该内存块
// 注意这一层只做RTP包的合并假如sps和pps是两个RTP single包则合并结果为两个AVPacket
// 假如sps和pps是一个stapA包则合并结果为一个AVPacket
type OnAVPacket func(pkt base.AVPacket)
// 目前支持AVCHEVC和AAC MPEG4-GENERIC/44100/2业务方也可以自己实现IRTPUnpackerProtocol甚至是IRTPUnpackContainer

@ -12,10 +12,79 @@ import (
"encoding/hex"
"testing"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/assert"
)
// #85
func TestAVCCase1(t *testing.T) {
// single sps
// single pps
// FUA IDR
ss := []string{
"a06000013778b64c1921d51867640032ad84010c20086100430802184010c200843b5014005ad370101014000003000400000300ca100002",
"806000023778b64c1921d51868ee3cb0",
"806000033778b64c1921d5187c85b8000006bff0ffee6021caad8ffdd001c15e27306e7fc6bf36ca8b6bc1411afef158dc64d75094f5b2bbddec364e6904cc9a14b4069bb6b6cb6d6ab8f132e77e3324291351b52e4a58fce30dd7b64313d208df50ab636423abce5a0dcc6c3ff8397c84250042244aafef705063debcdd7fe6c6c2fd41b4c6251fe4ca318c40e4bf2eb373246f14ad1623f9f5356154b02c0c8f53fcd8b6ad452da457b48ba704ec227f16b59d9e7c0d50423bb7f78059a68d26aef86bc94ebb27723c6eee018b45977028b400e474d40f7464fcb5f0f292b23a2324486bdd500fe72b115bd93bdf5f0f9207daedfc02e4b8bed2c2b8a4b081b00c3a9172e1ce34222a870d06b925a9263c59276d4ad6bd82bdab07f8e01b3ae3263899948d345f83e4ff127b3c4d2721d3543880a0ea72d9b13ea0a4dd64ca4617f3d63b0182f9cc37c917dc620853f12487d608bc76fe3d5bc039fc9df161d4d6ca2cae273f7893ffbe78ba9e0e85217c9c2b6214e48e831cf4eed74f1e70f3050fcd72dd466d1a3d00f500b884c141625f5daba37c20e4623387406a6930819f08c0e3321ea7695d92e8da38e5d926767926ee5ebce20e4eecbd7aea2b8756489271ba7c2977c4e568a4a25c82b159cc3f5f6575f1b96ad1c589267947258bc1e62d5eed1a17a99219c019fd9167754310ad3c2ef1acbf304fe0b5342d09ae20b4b4fc49f5231ca744977d3a73ce2821f8cc76f1c0149ae8d5dadf44bf87ad989a311ce57ad19e4579bc8cdc309eb28954e875441f9c135182eca8f3016aa5792ec3d1926da40694465dfd625b8c8c836792f4cee619bcce39a71c8055827cbd5c22fedfcb7626534d07385df9247b6338e9bc68c1ee8903a4fc9309decc5ce2ca60a7331ccd15f285ddb9be2134173fada2150ce68dddc4d2ef397624d74cb0ff9dab86664e17a0bc76ec86875736e68133e3bcdf5419cd6ce19ff1108bbe7e39ea36578ea8e8a4cbe6c850095467f22c1c220aa8203f8ee16d39de0f414db0f46dbc2f7f34b4dc7062aeed316c991522efb7fd8ac10d510596638f0bc330005317ab085787db0b26ed4c6086513d059ceef9274d6788a9bf30abb1cf6b353d9e57103e10a299df9d77c688cc0d69af84f319e97f55db30d419fdf9a028b4185fa1cae5f2fdfadcb0abc68ca03968a8b4801d31d8096451b1dd54d159d81b3cf8554a43a65a80fc1581eb00719de7c606201c31af22d0fef3208889c4f0a5c0ea06ecdb13e6d99cff25f449d1fe90c3d7beadc52595b12d9e58fc647ebbc30ac4131fad7dfad3a6ea9d51948b095d9aafd48a0545378c2083406a2248f4876e104d76d22ce0a552848d89d603392dad6486554a58dd8fb154b70e52d4c4e1f093d73d731563bebf9fe61493779ba791df3dc65430cf27d00292609f6c09a682e9f4b03a4e3507459ae06ff07e66fa992b54f8d0837cf806d9921ac916b1fe064adb76e7cedfa4e2c5a9126a0f7a7927e5686b990cc87f89612b983cb81783e0763a548648ef73d855a0afeaf78192047b060fcf8b1016a46016de75983d7e57c70d5fa5259012e465531f953b27ca67b554620a5e194386b8fd6d784e989d151b7326d028ff28b2707d376147ca3657a60daa7158e630b9def41c579a02b24c6b5cbe5161c985bf5a6ba41b033be092c4a590098aca5a4a6b8aef024863ef403fe4cb4dc2010dedd3cc1c5f7c6a123e2d7f8fd8feef793afb2402a763df927316b337d83808f13354f4706af395556caa6ec0efb724a7abe587b8eff333d64623ad7556a735dcd04bfe88c8f57327625aef8f25a0b722499de91a509ad383650d3ba250e16c5dd2671d67039b9404c174863a9af5c2738dd293770488c034c1f4f9c0d1cfb8f02c9bcecfb24a17ad06c7163788f22c8bbeb30b26423ad22515aca1916a28f716aad8b970623f054537448d74ee4822",
}
testHelperTemplete(t, base.AVPacketPTAVC, 90000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket {
return []base.AVPacket{
{
Timestamp: 10340642,
PayloadType: base.AVPacketPTAVC,
Payload: testHelperAddPrefixLength(rtpPackets[0].Raw[12:]),
},
{
Timestamp: 10340642,
PayloadType: base.AVPacketPTAVC,
Payload: testHelperAddPrefixLength(rtpPackets[1].Raw[12:]),
},
}
})
}
func TestHEVCCase1(t *testing.T) {
// single vps
// single sps
// single pps
// single sei
ss := []string{
"a060d3a38a27999a01c0125940010c01ffff016000000300b0000003000003007bac0901",
"a060d3a48a27999a01c01259420101016000000300b0000003000003007ba003c08010e58dae4914bf37010101008001",
"a060d3a58a27999a01c012594401c0f2c68d03b240000003",
"a060d3a68a27999a01c012594e01e504ebc3000080000003",
}
testHelperTemplete(t, base.AVPacketPTHEVC, 90000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket {
return []base.AVPacket{
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
Payload: testHelperAddPrefixLength(rtpPackets[0].Raw[12:]),
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
Payload: testHelperAddPrefixLength(rtpPackets[1].Raw[12:]),
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
Payload: testHelperAddPrefixLength(rtpPackets[2].Raw[12:]),
},
{
Timestamp: 25753900,
PayloadType: base.AVPacketPTHEVC,
Payload: testHelperAddPrefixLength(rtpPackets[3].Raw[12:]),
},
}
})
}
func TestAACCase1(t *testing.T) {
ss := []string{
"80e10e9a56843e4cf0bdf2fe00102d10214e6c425f74815f92415f94415f924100000114008a004027f313d564a770026fd01203c9cbac420e1aecaa41efb4619391309daa7938b4b905989c5c293c024819e4234eb324934327e3f083c98c2220137bc9b5b2bae5809351710e4bfec0dec11fffe0ab6e568f8357fb74c3461e892fc3a4e22ac512fa73bdc13004b9d73e21c5221387922692939994e38ab1516ea2cfb64df8cffb5468a81e4704f255a5139f7f21d622b944e0b08221916d4ba6e10a31b2a148c38642d1c8b2b12a15681c0f0b589160feb32b43c7ebade413590916a934148ca7915d6250904e5172a3ecc612ab23fd4431b0eef591be52155e4a0ac8564108012689c4df89221892e50d8fca64a747bb2313908211004222ea6a848132e87935978e03845905ccc057c743c7424e64e2338bfe52090d4a5a961f35ec5e14befbc2b3d41f89bdfe949fc2dd40e141b13e397f84f7e1ef43f303df77407bdacc01c2832c3d3df65fe3bfde0001f37f85f4a021f27baf11e4f388fd5ce464337648fc89e6043a0c8268a44f0e024e112910c8b9bc282ee2e5587762b2752c08a422ac99558f904273f858d9de9938ef2502493ae3217c247410eb39395f084a117f6486161103c5cab0881d7c24ffd7123cdcac226bc292c2dc213d04f0b26dd864e6e149c3a4468a48d86124209e172166c5b331e434c8cae4276e191acc2682d692a51210c1ce2254102e048435919442449f2c57bb271364fbb67f1d79364ee004a28c88e75dd53210eef613186531f22908a5253a87e66423fe1bffef7dd95fed588acf815908808fefff2ffef581fe23b2b8e9fdfbd7e3f7071c05d40ec89401efbdf87beee9a9c1eff2d054c0f7ff5febc514514514514514514517d7fe0faf145145111fc0dd8218d9f9d064ee8895cb84ee848064e7593530495f87ffa276e76549a4e6c0e381270c24ea288e0e1d8ad270f0dc243210e413a84b160e085b10a416be12011022b31354b202020771fe376bf6afe387e67fe0fee8087bee102bf415fa0afd0400000e3e3e3fe001f8f8ff53eff60fafab04fe0877121cebad11c16048271174dc251cbdd3da84e0c9ba23dd312b26500020154a269d9c4520270444131665313103f0f7412d40de5c59cfff0f6383440fc588e986f10b35ddbfdb6be780f30eb25f49fd7e479062097adbc044ba3e0c956ef043c31d188a23d453e83ca71ca44df64805f4514916e71c7e760674293094980642012d104ba2cacd260456e2e93a8c1f492739f9dd445aba9241088de157cb3d976e20888a41b34808f3a4ca181528c989641e0a187dc5443494c896e4421711580a84276a12a229662108289464d105b1513f80841836f009c987f86969641eea943c86c475685fb09043ae947ac5d22ff6ff7c1d380a48885c2a7d03905668ec9fd1fcdf7fc700450094d16f034c90003df9240b851764658e101efb8ea83855071d6871c5126a8385556aa0db094e571d9dc7271352b846810c4e24000000000000038e60384c4f09800000021e17eb447ca381218ff036078221ec1e7843a466acc013c0130585c7518330993235855b71158610843210aa9ba05c22ea5984de5e1227e3e4d2dd942a1659f5320232b45bb6c6418df87b313c727fc3f1d170a6ecb0b74e4449209804a3bf270c98518182b79dd74412c24f7e4f45131a889442502df3928282006d0f165c6e4d39059e7404eb1092044c67267817686dc7dbe82281f1d2f65ddaf70e67250c77adbe39d09be32b8f0007d2f905140d3c34fb6784075ce77075dfb069da091017344c80b6b9a36ff1d93c22889d5a7ccaa2073ccf278e1f8e9c38e1c9a8f502b8e9c84938542ae3a7e1313c26240000000000000384c4f1cc4f09890000021f49fc3847d1b76e8613938122e759d2494b9e4634fc812ab747e3132beb075bf23fed67a0943a245082648a4d06e11166a25f110a24c0c993964042ff0a8d24e4108c489514499641221f84c013c06008e0301c2e0038e6009e0327c7326000001c",
@ -24,38 +93,25 @@ func TestAACCase1(t *testing.T) {
"80e10e9d5684464cf0bdf2fe00102db871cdd0709ba0e0",
}
var pkts []RTPPacket
for _, s := range ss {
pkt, err := hexstream2rtppacket(s)
assert.Equal(t, nil, err)
pkts = append(pkts, pkt)
}
expected := []base.AVPacket{
{
Timestamp: 30239734,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+4:],
},
{
Timestamp: 30239756,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[1].Raw[12+4:],
},
{
Timestamp: 30239777,
PayloadType: base.AVPacketPTAAC,
Payload: append(pkts[2].Raw[12+4:], pkts[3].Raw[12+4:]...),
},
}
var outPkts []base.AVPacket
unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 48000, 128, func(pkt base.AVPacket) {
outPkts = append(outPkts, pkt)
testHelperTemplete(t, base.AVPacketPTAAC, 48000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket {
return []base.AVPacket{
{
Timestamp: 30239734,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[0].Raw[12+4:],
},
{
Timestamp: 30239756,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[1].Raw[12+4:],
},
{
Timestamp: 30239777,
PayloadType: base.AVPacketPTAAC,
Payload: append(rtpPackets[2].Raw[12+4:], rtpPackets[3].Raw[12+4:]...),
},
}
})
for _, pkt := range pkts {
unpacker.Feed(pkt)
}
assert.Equal(t, expected, outPkts)
}
func TestAACCase2(t *testing.T) {
@ -66,60 +122,84 @@ func TestAACCase2(t *testing.T) {
"80e104588424be31b06db689001016d8211c8d2ff7fffffffd1975a43251621000000006af6ba26aa2e3f904d00271b1e487d009e4320471d8f204892ad2252d72b8700260e8b4113e20831048a5a0438f1b93832a048415ff73b5bf69631c8a4e4ca6eb89fc8454ab3558b7ed96e5557083eb2ac47779eb23fa2d680f806dfa4ffd6590df3f6e990132030626063e01e73386af34ea3ff2746f376e6c8fcc3f0574828a15f1375e386521f5b50f0f4aea7ffdb9e0d27caff771bbfd4788dcbd118e539df41f01be771cf33af56f9a35090fd7b57dfd8f557cb9e398b7df5f1ac4d4bb56f69cdd2211b6f49679293c43dc7b283840528ea9f573ee24cc7c055d4b9844491583e315d70af6dd5e73fdab59fd17d532f48dda3cb452cba838dccbbdc2b7d5b4699a8e565a4b71035f2a9bd97bb88b90057c240692aa50aca5486142176a21104b5b19c1a691b086d4118810597934c2302e49091073911ba492badc8dba7ec264c5d29602f779ff3b58a48c510a681f42e7cdfd3ecf37fa7e8f6f8eda069baed02641111cea27687eba9db1f5471d5bb4734a6ca7f63caa22f5c04b5df2a7a5f75000ca648557f5c4ee3bca7e079e382643a1a808cb851d8e67222204200000000d5ed744d545c7f916413a981219ae144e8cafae1383088cb0139c294c240e3f9c9dc1d324047aaf90dd06794b20bebd8f4d73ffefb80f3d6dfde7dddd613573e886de98f79a921d12eb49909e8b4415f66cdc863a11bf1aaada975bd2d91e20d7f58ca2a3219d9bb55d873c2c399a17028a2bdc09a513165845609d47ee11f4f5e8822407a13c1f876c728291c28931eb6004cc5e86a441a29503a82c2b848200ee55e3df0880cc00dbd92d204170376d1d2c19355727b70dd508350425a031cb73d7fee9bbd887878b6b4a3ebb056e59b44b1e7a4884f1e3cfdab2b3607e594e2cf47eb553f54696c0acb08ba8b411ed053572db39a5c885723bc0bd2d97d92b27b5381d5bd555ae9fba09cde8ea4e66a7bc1b3488deebff89ad1d8aac49546b79e301c0",
}
var pkts []RTPPacket
for _, s := range ss {
pkt, err := hexstream2rtppacket(s)
assert.Equal(t, nil, err)
pkts = append(pkts, pkt)
}
testHelperTemplete(t, base.AVPacketPTAAC, 32000, 128, ss, func(rtpPackets []RTPPacket) []base.AVPacket {
return []base.AVPacket{
{
Timestamp: 69281105,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[0].Raw[12+2+6 : 12+2+6+24],
},
{
Timestamp: 69281137,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[0].Raw[12+2+6+24 : 12+2+6+24+6],
},
{
Timestamp: 69281169,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[0].Raw[12+2+6+24+6:],
},
{
Timestamp: 69281201,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[1].Raw[12+4:],
},
{
Timestamp: 69281233,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[2].Raw[12+4:],
},
{
Timestamp: 69281265,
PayloadType: base.AVPacketPTAAC,
Payload: rtpPackets[3].Raw[12+4:],
},
}
})
}
expected := []base.AVPacket{
{
Timestamp: 69281105,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6 : 12+2+6+24],
},
{
Timestamp: 69281137,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6+24 : 12+2+6+24+6],
},
{
Timestamp: 69281169,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6+24+6:],
},
{
Timestamp: 69281201,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[1].Raw[12+4:],
},
{
Timestamp: 69281233,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[2].Raw[12+4:],
},
{
Timestamp: 69281265,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[3].Raw[12+4:],
},
}
func testHelperTemplete(t *testing.T, payloadType base.AVPacketPT, clockRate int, maxSize int, hexRTPPackets []string, expectedFn func([]RTPPacket) []base.AVPacket) {
rtpPackets, err := testHelperHexstream2rtppackets(hexRTPPackets)
assert.Equal(t, nil, err)
expected := expectedFn(rtpPackets)
assert.Equal(t, expected, testHelperUnpack(payloadType, clockRate, maxSize, rtpPackets))
}
func testHelperAddPrefixLength(in []byte) (out []byte) {
out = make([]byte, len(in)+4)
bele.BEPutUint32(out, uint32(len(in)))
copy(out[4:], in)
return
}
func testHelperUnpack(payloadType base.AVPacketPT, clockRate int, maxSize int, rtpPackets []RTPPacket) []base.AVPacket {
var outPkts []base.AVPacket
unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 32000, 128, func(pkt base.AVPacket) {
//nazalog.Infof("out: %d, %d", pkt.Timestamp, len(pkt.Payload))
unpacker := DefaultRTPUnpackerFactory(payloadType, clockRate, maxSize, func(pkt base.AVPacket) {
nazalog.Debugf("%s", hex.EncodeToString(pkt.Payload))
outPkts = append(outPkts, pkt)
})
for _, pkt := range pkts {
//nazalog.Infof("in: %+v %d", pkt.Header, len(pkt.Raw))
for _, pkt := range rtpPackets {
unpacker.Feed(pkt)
}
assert.Equal(t, expected, outPkts)
return outPkts
}
func testHelperHexstream2rtppackets(hexPackets []string) (pkts []RTPPacket, err error) {
var pkt RTPPacket
for _, p := range hexPackets {
pkt, err = testHelperHexstream2rtppacket(p)
if err != nil {
return
}
pkts = append(pkts, pkt)
}
return
}
func hexstream2rtppacket(in string) (pkt RTPPacket, err error) {
func testHelperHexstream2rtppacket(hexPacket string) (pkt RTPPacket, err error) {
var raw []byte
raw, err = hex.DecodeString(in)
raw, err = hex.DecodeString(hexPacket)
if err != nil {
return
}

@ -29,17 +29,15 @@ import (
// 聚合PubSession和PullSession也即流数据是输入类型的session
// BaseInSession会向上层回调两种格式的数据
// BaseInSession会向上层回调两种格式的数据(本质上是一份数据,业务方可自由选择使用)
// 1. 原始的rtp packet
// 2. rtp合并后的av packet
//
type BaseInSessionObserver interface {
OnRTPPacket(pkt rtprtcp.RTPPacket)
OnSDP(sdpCtx sdp.LogicContext)
// @param asc: AAC AudioSpecificConfig注意如果不存在音频或音频不为AAC则为nil
// @param vps, sps, pps 如果都为nil则没有视频如果sps, pps不为nil则vps不为nil是H265vps为nil是H264
//
// 注意4个参数可能同时为nil
OnAVConfig(asc, vps, sps, pps []byte)
// 回调收到的RTP包
OnRTPPacket(pkt rtprtcp.RTPPacket)
// @param pkt: pkt结构体中字段含义见rtprtcp.OnAVPacket
OnAVPacket(pkt base.AVPacket)
@ -134,7 +132,7 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo
}
if session.observer != nil {
session.observer.OnAVConfig(session.sdpLogicCtx.ASC, session.sdpLogicCtx.VPS, session.sdpLogicCtx.SPS, session.sdpLogicCtx.PPS)
session.observer.OnSDP(session.sdpLogicCtx)
}
}
@ -142,7 +140,10 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo
func (session *BaseInSession) SetObserver(observer BaseInSessionObserver) {
session.observer = observer
session.observer.OnAVConfig(session.sdpLogicCtx.ASC, session.sdpLogicCtx.VPS, session.sdpLogicCtx.SPS, session.sdpLogicCtx.PPS)
// 避免在当前协程回调,降低业务方使用负担,不必担心设置监听对象和回调函数中锁重入
go func() {
session.observer.OnSDP(session.sdpLogicCtx)
}()
}
func (session *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {

@ -21,6 +21,11 @@ type LogicContext struct {
AudioClockRate int
VideoClockRate int
ASC []byte
VPS []byte
SPS []byte
PPS []byte
audioPayloadTypeBase base.AVPacketPT // lal内部定义的类型
videoPayloadTypeBase base.AVPacketPT
@ -29,11 +34,6 @@ type LogicContext struct {
audioAControl string
videoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
// 没有用上的
hasAudio bool
hasVideo bool
@ -140,7 +140,7 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
if md.AFmtPBase != nil {
ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtPBase)
if err != nil {
return ret, err
nazalog.Warnf("parse sps pps from afmtp failed. err=%+v", err)
}
} else {
nazalog.Warnf("avc afmtp not exist.")
@ -150,7 +150,7 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
if md.AFmtPBase != nil {
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtPBase)
if err != nil {
return ret, err
nazalog.Warnf("parse vps sps pps from afmtp failed. err=%+v", err)
}
} else {
nazalog.Warnf("hevc afmtp not exist.")

@ -360,3 +360,67 @@ a=appversion:1.0`
assert.Equal(t, true, ctx.hasVideo)
nazalog.Debugf("%+v", ctx)
}
// #85
func TestCase7(t *testing.T) {
golden := `v=0
o=- 1109162014219182 0 IN IP4 0.0.0.0
s=HIK Media Server V3.4.106
i=HIK Media Server Session Description : standard
e=NONE
c=IN IP4 0.0.0.0
t=0 0
a=control:*
b=AS:4106
a=range:clock=20210520T063812Z-20210520T064816Z
m=video 0 RTP/AVP 96
i=Video Media
a=rtpmap:96 H264/90000
a=fmtp:96 profile-level-id=4D0014;packetization-mode=0
a=control:trackID=video
b=AS:4096
m=audio 0 RTP/AVP 98
i=Audio Media
a=rtpmap:98 G7221/16000
a=control:trackID=audio
b=AS:10
a=Media_header:MEDIAINFO=494D4B48020100000400000121720110803E0000803E000000000000000000000000000000000000;
a=appversion:1.0
`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
_ = ctx
}
func TestCase8(t *testing.T) {
golden := `v=0
o=- 1622201479405259 1622201479405259 IN IP4 192.168.3.58
s=Media Presentation
e=NONE
b=AS:5100
t=0 0
a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/?transportmode=unicast
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:5000
a=recvonly
a=x-dimensions:1920,1080
a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/trackID=1?transportmode=unicast
a=rtpmap:96 H265/90000
m=audio 0 RTP/AVP 8
c=IN IP4 0.0.0.0
b=AS:50
a=recvonly
a=control:rtsp://192.168.3.58:554/Streaming/Channels/101/trackID=2?transportmode=unicast
a=rtpmap:8 PCMA/8000
a=Media_header:MEDIAINFO=494D4B48010200000400050011710110401F000000FA000000000000000000000000000000000000;
a=appversion:1.0
`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
_ = ctx
}

Loading…
Cancel
Save