@ -22,7 +22,6 @@ import (
)
// PsUnpacker 解析ps(Program Stream)流
//
type PsUnpacker struct {
list rtprtcp . RtpPacketList
buf * nazabytes . Buffer
@ -43,6 +42,13 @@ type PsUnpacker struct {
preVideoRtpts int64
onAvPacket base . OnAvPacketFunc
waitSpsFlag bool
feedPacketCount int
feedBodyCount int
onAvPacketWrapCount int
onAvPacketCount int
}
func NewPsUnpacker ( ) * PsUnpacker {
@ -52,7 +58,7 @@ func NewPsUnpacker() *PsUnpacker {
preAudioPts : - 1 ,
preVideoRtpts : - 1 ,
preAudioRtpts : - 1 ,
onAvPacket: defaultOnAvPacket ,
waitSpsFlag: true ,
}
p . list . InitMaxSize ( maxUnpackRtpListSize )
@ -62,12 +68,12 @@ func NewPsUnpacker() *PsUnpacker {
// WithOnAvPacket
//
// @param onAvPacket: 回调函数中 base.AvPacket 字段说明:
// PayloadType AvPacketPt 见 base.AvPacketPt
// Timestamp int64 dts, 单位毫秒
// Pts int64 pts, 单位毫秒
// Payload []byte 对于视频, h264和h265是AnnexB格式
// 对于音频, AAC是前面携带adts的格式
//
// PayloadType AvPacketPt 见 base.AvPacketPt。
// Timestamp int64 dts, 单位毫秒。
// Pts int64 pts, 单位毫秒。
// Payload []byte
// 对于视频, h264和h265是AnnexB格式。
// 对于音频, AAC是前面携带adts的格式。
func ( p * PsUnpacker ) WithOnAvPacket ( onAvPacket base . OnAvPacketFunc ) * PsUnpacker {
p . onAvPacket = onAvPacket
return p
@ -77,17 +83,25 @@ func (p *PsUnpacker) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PsUnpacker
//
// 注意,内部会处理丢包、乱序等问题
//
// @param b: rtp包, 注意, 包含rtp包头部分
//
// @param b: rtp包, 注意, 包含rtp包头部分, 内部不持有该内存块
func ( p * PsUnpacker ) FeedRtpPacket ( b [ ] byte ) error {
//nazalog.Debugf("> FeedRtpPacket. len=%d", len(b))
// TODO(chef): [opt] 当前遇到的场景都是, 音频和视频共用一个ssrc, 并且音频和视频的seq是打在一起的, 是否存在两个ssrc的情况? 202209
p . feedPacketCount ++
//defer func() {
// nazalog.Debugf("<<<<<<<<<< PsUnpacker. list=%s", p.list.DebugString())
//}()
ipkt , err := rtprtcp . ParseRtpPacket ( b )
if err != nil {
nazalog . Errorf ( "PsUnpacker ParseRtpPacket failed. b=%s, err=%+v" ,
hex . Dump ( nazabytes . Prefix ( b , 64 ) ) , err )
return err
}
//nazalog.Debugf("FeedRtpPacket. h=%+v", ipkt.Header)
//nazalog.Debugf(">>>>>>>>>> PsUnpacker FeedRtpPacket. h=%+v, len=%d, body=%s",
// ipkt.Header, len(ipkt.Raw), hex.Dump(nazabytes.Prefix(ipkt.Raw[12:], 8)))
var isStartPositionFn = func ( pkt rtprtcp . RtpPacket ) bool {
body := pkt . Body ( )
@ -98,11 +112,11 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
// 过期了直接丢掉
if p . list . IsStale ( ipkt . Header . Seq ) {
//nazalog.Debugf(" FeedRtpPacket stale, drop. %d", ipkt.Header.Seq)
//nazalog.Debugf(" PsUnpacker NOTICE stale, drop. %d", ipkt.Header.Seq)
return ErrGb28181
}
// 插入队列
//nazalog.Debugf(" FeedRtpPacket insert. %d", ipkt.Header.Seq)
//nazalog.Debugf(" PsUnpacker FeedRtpPacket insert. %d", ipkt.Header.Seq)
p . list . Insert ( ipkt )
for {
// 循环判断头部是否是顺序的
@ -112,14 +126,17 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
opkt := p . list . PopFirst ( )
p . list . SetDoneSeq ( opkt . Header . Seq )
p . FeedRtpBody ( opkt . Body ( ) , opkt . Header . Timestamp )
//nazalog.Debugf("FeedRtpPacket feed. %d", opkt.Header.Seq)
//nazalog.Debugf("PsUnpacker FeedRtpBody. %d", opkt.Header.Seq)
errFeedRtpBody := p . FeedRtpBody ( opkt . Body ( ) , opkt . Header . Timestamp )
if errFeedRtpBody != nil {
p . list . Reset ( )
}
} else {
// 不是顺序的,如果还没达到容器阈值,就先缓存在容器中,直接退出了
// 注意,如果队列为空,也会走到这,然后通过!full退出
if ! p . list . Full ( ) {
//nazalog.Debugf(" FeedRtpPacket exit check !full.")
//nazalog.Debugf(" PsUnpacker exit check !full.")
break
}
@ -130,17 +147,17 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
// 再丢弃连续的,直到下一个可解析帧位置
// 因为不连续的话,没法判断和正在丢弃的是否同一帧的,可以给个机会看后续是否能收到
prev := p . list . PopFirst ( )
//nazalog.Debugf(" FeedRtpPacket, drop. %d", prev.Header.Seq)
//nazalog.Debugf(" PsUnpacker NOTICE drop. %d", prev.Header.Seq)
for p . list . Size > 0 {
curr := p . list . PeekFirst ( )
if rtprtcp . SubSeq ( curr . Header . Seq , prev . Header . Seq ) != 1 {
//nazalog.Debugf(" FeedRtpPacket exit drop !sequential.")
//nazalog.Debugf(" PsUnpacker exit drop !sequential.")
break
}
if isStartPositionFn ( curr ) {
//nazalog.Debugf(" FeedRtpPacket exit drop start.")
//nazalog.Debugf(" PsUnpacker exit drop start.")
// 注意, 这里需要设置done seq, 确保这个seq在以后的判断中可被使用
p . list . SetDoneSeq ( curr . Header . Seq - 1 )
@ -148,7 +165,7 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
}
prev = p . list . PopFirst ( )
//nazalog.Debugf(" FeedRtpPacket, drop. %d", prev.Header.Seq)
//nazalog.Debugf(" PsUnpacker NOTICE drop. %d", prev.Header.Seq)
}
// 注意,缓存的数据也需要清除
@ -162,8 +179,9 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
}
// FeedRtpBody 注意,传入的数据应该是连续的,属于完整帧的
//
func ( p * PsUnpacker ) FeedRtpBody ( rtpBody [ ] byte , rtpts uint32 ) {
func ( p * PsUnpacker ) FeedRtpBody ( rtpBody [ ] byte , rtpts uint32 ) error {
p . feedBodyCount ++
//nazalog.Debugf("> FeedRtpBody. len=%d, prev buf=%d", len(rtpBody), p.buf.Len())
p . buf . Write ( rtpBody )
// ISO/IEC iso13818-1
@ -198,7 +216,7 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
//nazalog.Debugf("----------video stream----------")
consumed = p . parseAvStream ( int ( code ) , rtpts , rb , i )
case psPackStartCodePackEnd :
nazalog . Errorf ( "----------skip----------. %s" , hex . Dump ( nazabytes . Prefix ( rb [ i - 4 : ] , 32 ) ) )
//nazalog.Errorf("----------skip----------. %s", hex.Dump(nazabytes.Prefix(rb[i-4:], 32)))
consumed = 0
case psPackStartCodeHikStream :
consumed = parsePackStreamBody ( rb , i )
@ -220,7 +238,7 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
p . buf . Reset ( )
p . audioBuf = nil
p . videoBuf = nil
return
return base . ErrGb28181
}
if consumed < 0 {
@ -229,11 +247,17 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
nazalog . Warnf ( "consumed failed. code=%d, len=(%d,%d), i=%d, buf=%s" ,
code , len ( rtpBody ) , len ( rb ) , i , hex . Dump ( nazabytes . Prefix ( rb [ i - 4 : ] , 128 ) ) )
}
return
return nil
}
p . buf . Skip ( i + consumed )
//nazalog.Debugf("skip. %d", i+consumed)
}
return nil
}
func ( p * PsUnpacker ) Dispose ( ) {
nazalog . Debugf ( "PsUnpacker Dispose. (%d, %d, %d, %d, %d)" ,
p . feedPacketCount , p . feedBodyCount , p . list . Size , p . onAvPacketWrapCount , p . onAvPacketCount )
}
func ( p * PsUnpacker ) parsePsm ( rb [ ] byte , index int ) int {
@ -359,7 +383,7 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
// noop
} else {
if p . preAudioRtpts != int64 ( rtpts ) {
p . onAvPacket ( & base . AvPacket {
p . onAvPacket Wrap ( & base . AvPacket {
PayloadType : p . audioPayloadType ,
Timestamp : p . preAudioDts / 90 ,
Pts : p . preAudioPts / 90 ,
@ -376,7 +400,7 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
}
} else {
if pts != p . preAudioPts && p . preAudioPts >= 0 {
p . onAvPacket ( & base . AvPacket {
p . onAvPacket Wrap ( & base . AvPacket {
PayloadType : p . audioPayloadType ,
Timestamp : p . preAudioDts / 90 ,
Pts : p . preAudioPts / 90 ,
@ -392,7 +416,7 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
p . preAudioPts = pts
p . preAudioDts = dts
} else {
nazalog . Error f( "unknown audio stream type. ast=%d" , p . audioStreamType )
nazalog . Warn f( "unknown audio stream type. ast=%d" , p . audioStreamType )
}
} else if code == psPackStartCodeVideoStream {
// 判断出当前pes是否是新的帧, 然后将缓存中的帧回调给上层
@ -446,7 +470,6 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
}
// parsePackHeader 注意,`rb[index:]`为待解析的内存块
//
func parsePackHeader ( rb [ ] byte , index int ) int {
// 2.5.3.3 Pack layer of Program Stream
// Table 2-33 - Program Stream pack header
@ -473,13 +496,13 @@ func parsePackStreamBody(rb []byte, index int) int {
i := index
if len ( rb ) < i + 2 {
nazalog . Warn f( "needed=%d, actual=%d" , i + 2 , len ( rb ) )
nazalog . Debug f( "needed=%d, actual=%d" , i + 2 , len ( rb ) )
return - 1
}
l := int ( bele . BeUint16 ( rb [ i : ] ) )
i += 2 + l
if len ( rb ) < i {
nazalog . Warn f( "needed=%d, actual=%d" , i , len ( rb ) )
nazalog . Debug f( "needed=%d, actual=%d" , i , len ( rb ) )
return - 1
}
@ -487,7 +510,6 @@ func parsePackStreamBody(rb []byte, index int) int {
}
// iterateNaluByStartCode 通过nal start code分隔缓存数据, 将nal回调给上层
//
func ( p * PsUnpacker ) iterateNaluByStartCode ( code int , pts , dts int64 ) {
leading , preLeading , startPos := 0 , 0 , 0
startPos , preLeading = h2645 . IterateNaluStartCode ( p . videoBuf , 0 )
@ -510,7 +532,7 @@ func (p *PsUnpacker) iterateNaluByStartCode(code int, pts, dts int64) {
}
startPos = nextPos
p . onAvPacket ( & base . AvPacket {
p . onAvPacket Wrap ( & base . AvPacket {
PayloadType : p . videoPayloadType ,
Timestamp : dts / 90 ,
Pts : pts / 90 ,
@ -523,6 +545,38 @@ func (p *PsUnpacker) iterateNaluByStartCode(code int, pts, dts int64) {
}
}
func ( p * PsUnpacker ) onAvPacketWrap ( packet * base . AvPacket ) {
p . onAvPacketWrapCount ++
//nazalog.Debugf("PsUnpacker > onAvPacketWrap. packet=%s", packet.DebugString())
if packet . IsVideo ( ) {
typ := h2645 . ParseNaluType ( packet . PayloadType == base . AvPacketPtAvc , packet . Payload [ 4 ] )
//nazalog.Debugf("PsUnpacker onAvPacketWrap. type=%d", typ)
// TODO(chef): [opt] 等待sps等信息再开始回调, 这个逻辑不完整简化了 202209
if p . waitSpsFlag {
if packet . PayloadType == base . AvPacketPtAvc {
if typ == h2645 . H264NaluTypeSps || typ == h2645 . H264NaluTypePps {
p . waitSpsFlag = false
} else {
//nazalog.Debugf("PsUnpacker onAvPacketWrap. drop. %d", typ)
return
}
} else if packet . PayloadType == base . AvPacketPtHevc {
if typ == h2645 . H265NaluTypeVps || typ == h2645 . H265NaluTypeSps || typ == h2645 . H265NaluTypePps {
p . waitSpsFlag = false
} else {
//nazalog.Debugf("PsUnpacker onAvPacketWrap. drop. %d", typ)
return
}
}
}
}
if p . onAvPacket != nil {
p . onAvPacketCount ++
//nazalog.Debugf("PsUnpacker > onAvPacket. packet=%s", packet.DebugString())
p . onAvPacket ( packet )
}
}
// ---------------------------------------------------------------------------------------------------------------------
// TODO(chef): [refactor] 以下代码拷贝来自package mpegts, 重复了
@ -599,7 +653,3 @@ func readPts(b []byte) (fb uint8, pts int64) {
pts |= ( int64 ( b [ 3 ] ) << 8 | int64 ( b [ 4 ] ) ) >> 1
return
}
func defaultOnAvPacket ( packet * base . AvPacket ) {
// noop
}