[feat]精简ps视频流解析代码,并处理外部rtp的时间戳

pull/166/head
joestarzxh 3 years ago
parent 01abed7b34
commit cbd32f09d7

@ -69,7 +69,7 @@ var SliceTypeMapping = map[uint8]string{
const ( const (
NaluTypeSlice uint8 = 1 NaluTypeSlice uint8 = 1
NaluTypePartition_A uint8 =2 NaluTypePartitionA uint8 =2
NaluTypeIdrSlice uint8 = 5 NaluTypeIdrSlice uint8 = 5
NaluTypeSei uint8 = 6 NaluTypeSei uint8 = 6
NaluTypeSps uint8 = 7 NaluTypeSps uint8 = 7

@ -76,6 +76,9 @@ type PsUnpacker struct {
preVideoDts uint64 preVideoDts uint64
preAudioDts uint64 preAudioDts uint64
preVideoTimestamp uint32
preAudioTimestamp uint32
videoBuf []byte videoBuf []byte
audioBuf []byte audioBuf []byte
@ -138,7 +141,6 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
// Table 2-35 - Program Stream map // Table 2-35 - Program Stream map
// //
// TODO(chef): [fix] 有些没做有效长度判断 // TODO(chef): [fix] 有些没做有效长度判断
firstVideoPack := false
for p.buf.Len() != 0 { for p.buf.Len() != 0 {
rb := p.buf.Bytes() rb := p.buf.Bytes()
i := 0 i := 0
@ -157,7 +159,6 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
l := int(rb[i] & 0x7) l := int(rb[i] & 0x7)
i += 1 + l i += 1 + l
p.buf.Skip(i) p.buf.Skip(i)
firstVideoPack = true
case psPackStartCodeSystemHeader: case psPackStartCodeSystemHeader:
nazalog.Debugf("-----system header-----") nazalog.Debugf("-----system header-----")
// skip // skip
@ -248,7 +249,7 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
i += phdl i += phdl
if code == psPackStartCodeAudioStream { if code == psPackStartCodeAudioStream {
if pts == 0 { if pts == 0 {
pts = uint64(rtpTimestamp) pts = p.preAudioPts
} }
if dts == 0 { if dts == 0 {
dts = pts dts = pts
@ -262,6 +263,17 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
} }
p.audioBuf = nil p.audioBuf = nil
} }
} else {
if p.preAudioTimestamp != 0 {
if p.preAudioTimestamp != rtpTimestamp {
if p.onAudio != nil {
p.onAudio(p.audioBuf, int64(p.preAudioDts), int64(p.preAudioPts))
}
p.audioBuf = nil
}
} else {
p.preAudioTimestamp = rtpTimestamp
}
} }
p.audioBuf = append(p.audioBuf, rb[i:i+length-3-phdl]...) p.audioBuf = append(p.audioBuf, rb[i:i+length-3-phdl]...)
} }
@ -270,11 +282,7 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
} else { } else {
if pts == 0 { if pts == 0 {
if firstVideoPack { pts = p.preVideoPts
pts = uint64(rtpTimestamp)
} else {
pts = p.preVideoPts
}
} }
if dts == 0 { if dts == 0 {
dts = pts dts = pts
@ -283,46 +291,28 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
if p.preVideoPts != 0 { if p.preVideoPts != 0 {
if p.preVideoPts != pts { if p.preVideoPts != pts {
if p.onVideo != nil { if p.onVideo != nil {
leading, preLeading := 0, 0 p.iterateNaluByStartCode(code, p.preVideoDts, p.preVideoPts)
startPos, preLeading := p.findNextNaluStartPos(p.videoBuf, 0)
if startPos >= 0 {
nextPos := startPos
buf := p.videoBuf[:0]
for startPos >= 0 {
nextPos, leading = p.findNextNaluStartPos(p.videoBuf, startPos+3)
if nextPos >= 0 {
buf = p.videoBuf[startPos:nextPos]
} else {
buf = p.videoBuf[startPos:]
}
startPos = nextPos
if p.videoStreamType == StreamTypeH265 {
nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(buf), p.preVideoPts, p.preVideoDts, hevc.ParseNaluTypeReadable(buf[preLeading+1]))
} else {
nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(buf), p.preVideoPts, p.preVideoDts, avc.ParseNaluTypeReadable(buf[preLeading+1]))
}
p.onVideo(buf, int64(p.preVideoDts), int64(p.preVideoPts))
if nextPos >= 0 {
preLeading = leading
}
}
}
p.videoBuf = nil p.videoBuf = nil
} }
} }
} else {
if p.preVideoTimestamp != 0 {
if p.preVideoTimestamp != rtpTimestamp {
if p.onVideo != nil {
p.iterateNaluByStartCode(code, p.preVideoDts, p.preVideoPts)
p.videoBuf = nil
}
}
} else {
p.preVideoTimestamp = rtpTimestamp
}
} }
p.preVideoPts = pts p.preVideoPts = pts
p.preVideoDts = dts p.preVideoDts = dts
//暂存当前帧 //暂存当前帧
p.videoBuf = append(p.videoBuf, rb[i:i+length-3-phdl]...) p.videoBuf = append(p.videoBuf, rb[i:i+length-3-phdl]...)
} }
p.buf.Skip(6 + length) p.buf.Skip(6 + length)
case psPackStartCodeHikStream:
p.SkipPackStreamBody(rb, i)
case psPackStartCodePesPrivate2: case psPackStartCodePesPrivate2:
fallthrough fallthrough
case psPackStartCodePesEcm: case psPackStartCodePesEcm:
@ -333,8 +323,10 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpTimestamp uint32) {
fallthrough fallthrough
case psPackStartCodePackEnd: case psPackStartCodePackEnd:
p.buf.Skip(i) p.buf.Skip(i)
case psPackStartCodeHikStream:
fallthrough
case psPackStartCodePesPsd: case psPackStartCodePesPsd:
p.SkipPackStreamBody(rb, i) fallthrough
default: default:
nazalog.Errorf("default %s", hex.Dump(nazabytes.Prefix(rb[i-4:], 32))) nazalog.Errorf("default %s", hex.Dump(nazabytes.Prefix(rb[i-4:], 32)))
p.SkipPackStreamBody(rb, i) p.SkipPackStreamBody(rb, i)
@ -354,108 +346,32 @@ func (p *PsUnpacker) SkipPackStreamBody(rb []byte, indexId int) {
p.buf.Skip(indexId + 2 + l) p.buf.Skip(indexId + 2 + l)
} }
//参考media-server 中的ps解析 func (p *PsUnpacker) iterateNaluByStartCode(code uint32, pts, dts uint64) {
func (p *PsUnpacker) findNextNaluStartPos(buf []byte, index int) (startPos int, leading int) { leading, preLeading, startPos := 0, 0, 0
bufLen := len(buf) startPos, preLeading = avc.IterateNaluStartCode(p.videoBuf, 0)
startPos = -1 if startPos >= 0 {
leading = 0 nextPos := startPos
var pos int nalu := p.videoBuf[:0]
for i := index; i+2 < bufLen; i += pos { for startPos >= 0 {
if pos, leading = p.findNaluStartPos(buf[i:]); pos < 0 { nextPos, leading = avc.IterateNaluStartCode(p.videoBuf, startPos+preLeading)
return if nextPos >= 0 {
} nalu = p.videoBuf[startPos:nextPos]
if p.videoStreamType == StreamTypeH265 { } else {
nalType := (buf[i+pos] >> 1) & 0x3f nalu = p.videoBuf[startPos:]
if bufLen > i+pos+2 {
if isHevcNalu(nalType, buf[i+pos:]) {
startPos = i + pos - leading - 1
return
}
}
} else {
nalType := buf[i+pos] & 0x1f
if bufLen > i+pos+1 {
if isAvcNalu(nalType, buf[i+pos:]) {
startPos = i + pos - leading - 1
return
}
} }
} startPos = nextPos
if p.videoStreamType == StreamTypeH265 {
nazalog.Errorf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, hevc.ParseNaluTypeReadable(nalu[preLeading]))
} } else {
return nazalog.Errorf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, avc.ParseNaluTypeReadable(nalu[preLeading]))
} }
func (p *PsUnpacker) findNaluStartPos(buf []byte) (pos int, leading int) { p.onVideo(nalu, int64(dts), int64(pts))
bufLen := len(buf) if nextPos >= 0 {
zeros := 0 preLeading = leading
pos = -1
for i := 0; i+1 < bufLen; i++ {
if buf[i] == 1 {
if zeros > 2 {
leading = 3
pos = i + 1
break
} else if zeros == 2 {
leading = 2
pos = i + 1
break
} }
}
if buf[i] == 0 {
zeros += 1
} else {
zeros = 0
}
}
return
}
func isAvcNalu(nalType byte, nalu []byte) bool {
switch nalType {
case avc.NaluTypeSlice:
fallthrough
case avc.NaluTypePartition_A:
fallthrough
case avc.NaluTypeIdrSlice:
if nalu[1]&0x80 == 0 {
return false
} else {
return true
}
case avc.NaluTypeSei:
fallthrough
case avc.NaluTypeSps:
fallthrough
case avc.NaluTypePps:
fallthrough
case avc.NaluTypeAud:
return true
default:
if nalType > 14 && nalType < 18 {
return true
}
}
return false
}
func isHevcNalu(nalType byte, nalu []byte) bool {
nuhLayerId := (nalType & 0x01 << 5) | (nalu[1] >> 3 & 0x1F)
if nalType == hevc.NaluTypeVps ||
nalType == hevc.NaluTypeSps ||
nalType == hevc.NaluTypePps ||
(nuhLayerId == 0 && (nalType == hevc.NaluTypeAud ||
nalType == hevc.NaluTypeSei ||
nalType >= 41 && nalType <= 44 ||
nalType >= 48 && nalType <= 55)) {
return true
} else if nalType <= 31 {
if nalu[2]&0x80 == 0 {
return false
} else {
return true
} }
} }
return false
} }
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------

@ -111,20 +111,6 @@ var hevcNalu = []byte{
0x00, 0x00, 0x01, 0x26, 0x01, 0x83, 0x00, 0x00, 0x01, 0x26, 0x01, 0x83,
} }
func TestFindNaluStartPos(t *testing.T) {
unpacker := NewPsUnpacker().WithCallbackFunc(nil, func(payload []byte, dts int64, pts int64) {
})
findPos := 0
startPos, leading := unpacker.findNaluStartPos(avcNalu)
nazalog.Debugf("find pos=%d,start pos=%d,leading=%d", findPos+startPos, startPos, leading)
for startPos > 0 {
findPos += startPos
startPos, leading = unpacker.findNaluStartPos(avcNalu[findPos:])
nazalog.Debugf("find pos=%d,start pos=%d,leading=%d", findPos+startPos, startPos, leading)
}
}
func TestPsUnpacker2(t *testing.T) { func TestPsUnpacker2(t *testing.T) {
// 解析别人提供的一些测试数据,开发阶段用 // 解析别人提供的一些测试数据,开发阶段用

Loading…
Cancel
Save