messages:

- [feat] package avc: 提供一些AVCC转AnnexB相关的代码。学习解析SPS、PPS内部的字段
- [refactor] package hls: 使用package avc
- [feat] package rtsp: 部分解析SDP的代码。从SDP中解析sps,pps
- [feat] package rtsp: 将AVC类型的RTP包合成帧数据。未完成
pull/12/head
q191201771 5 years ago
parent a11723bde7
commit 7e4e3816ac

@ -183,15 +183,16 @@ func analysisVideoTag(tag httpflv.Tag) {
}
switch t {
case typeAVC:
if avc.CalcNALUType(body[i+4:]) == avc.NALUTypeIDRSlice {
if avc.ParseNALUType(body[i+4]) == avc.NALUTypeIDRSlice {
if prevIDRTS != int64(-1) {
diffIDRTS = int64(tag.Header.Timestamp) - prevIDRTS
}
prevIDRTS = int64(tag.Header.Timestamp)
}
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNALUTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:])))
sliceTypeReadable, _ := avc.ParseSliceTypeReadable(body[i+4:])
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.ParseNALUTypeReadable(body[i+4]), sliceTypeReadable))
case typeHEVC:
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNALUTypeReadable(body[i+4:])))
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.ParseNALUTypeReadable(body[i+4])))
}
i = i + 4 + int(naluLen)
}

@ -69,7 +69,7 @@ func main() {
_, _ = afp.Write(d)
_, _ = afp.Write(payload[2:])
case httpflv.TagTypeVideo:
_ = avc.CaptureAVC(vfp, payload)
_ = avc.CaptureAVCC2AnnexB(vfp, payload)
}
}
}

@ -12,13 +12,28 @@ import (
"errors"
"io"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabits"
)
// Annex B:
// keywords: MPEG-2 transport stream, ElementaryStream(ES),
// nalu with start code.
// e.g. ts
//
// AVCC:
// keywords: AVC1, MPEG-4, extradata, sequence header, AVCDecoderConfigurationRecord
// nalu with length prefix.
// e.g. rtmp, flv
var ErrAVC = errors.New("lal.avc: fxxk")
var NALUStartCode = []byte{0x0, 0x0, 0x0, 0x1}
var (
NALUStartCode3 = []byte{0x0, 0x0, 0x1}
NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1}
)
var NALUTypeMapping = map[uint8]string{
1: "SLICE",
@ -55,35 +70,95 @@ const (
SliceTypeP uint8 = 0
SliceTypeB uint8 = 1
SliceTypeI uint8 = 2
SliceTypeSP uint8 = 3 // TODO chef
SliceTypeSI uint8 = 4 // TODO chef
SliceTypeSP uint8 = 3
SliceTypeSI uint8 = 4
)
func CalcNALUType(nalu []byte) uint8 {
return nalu[0] & 0x1f
// Context holds the picture dimensions that TryParseSPS derives from a parsed SPS
// (macroblock counts minus the frame cropping offsets).
type Context struct {
width uint32 // derived from PicWidthInMbsMinusOne and the left/right crop offsets
height uint32 // derived from PicHeightInMapUnitsMinusOne, FrameMbsOnlyFlag and the top/bottom crop offsets
}
// DecoderConfigurationRecord holds the fields that TryParseSeqHeader reads from the
// decoder configuration that follows the 5-byte header of an AVC sequence header payload.
// The bit widths noted below are the widths TryParseSeqHeader reads.
//
// H.264-AVC-ISO_IEC_14496-15.pdf
// 5.2.4 Decoder configuration information
type DecoderConfigurationRecord struct {
ConfigurationVersion uint8 // 8 bits
AVCProfileIndication uint8 // 8 bits
ProfileCompatibility uint8 // 8 bits
AVCLevelIndication uint8 // 8 bits
LengthSizeMinusOne uint8 // 2 bits, preceded by 6 reserved bits
NumOfSPS uint8 // 5 bits, preceded by 3 reserved bits
SPSLength uint16 // 2 bytes, big endian
NumOfPPS uint8 // 5 bits, preceded by 3 reserved bits
PPSLength uint16 // 2 bytes, big endian
}
// SPS holds the sequence parameter set fields that TryParseSPS extracts.
// Notes on individual fields describe how TryParseSPS fills them.
//
// ISO-14496-10.pdf
// 7.3.2.1 Sequence parameter set RBSP syntax
// 7.4.2.1 Sequence parameter set RBSP semantics
type SPS struct {
ProfileIdc uint8 // 8 bits; 100 selects the high-profile branch of the parser
ConstraintSet0Flag uint8 // 1 bit
ConstraintSet1Flag uint8 // 1 bit
ConstraintSet2Flag uint8 // 1 bit
LevelIdc uint8 // 8 bits, read after 5 skipped bits
SPSId uint32 // Golomb coded; values >= 32 are rejected
ChromaFormatIdc uint32 // Golomb coded when ProfileIdc is 100 (must be <= 3), otherwise set to 1
ResidualColorTransformFlag uint8 // 1 bit, only read when ChromaFormatIdc is 3
BitDepthLuma uint32 // stored as the coded value plus 8; 8 when ProfileIdc is not 100
BitDepthChroma uint32 // stored as the coded value plus 8; 8 when ProfileIdc is not 100
TransFormBypass uint8 // 1 bit, only read when ProfileIdc is 100
Log2MaxFrameNumMinus4 uint32 // Golomb coded
PicOrderCntType uint32 // Golomb coded; only type 0 is handled by the parser
Log2MaxPicOrderCntLsb uint32 // stored as the coded value plus 4
NumRefFrames uint32 // num_ref_frames
GapsInFrameNumValueAllowedFlag uint8 // gaps_in_frame_num_value_allowed_flag
PicWidthInMbsMinusOne uint32 // pic_width_in_mbs_minus1
PicHeightInMapUnitsMinusOne uint32 // pic_height_in_map_units_minus1
FrameMbsOnlyFlag uint8 // frame_mbs_only_flag
MbAdaptiveFrameFieldFlag uint8 // mb_adaptive_frame_field_flag
Direct8X8InferenceFlag uint8 // direct_8x8_inference_flag
FrameCroppingFlag uint8 // frame_cropping_flag
FrameCropLeftOffset uint32 // frame_crop_left_offset
FrameCropRightOffset uint32 // frame_crop_right_offset
FrameCropTopOffset uint32 // frame_crop_top_offset
FrameCropBottomOffset uint32 // frame_crop_bottom_offset
}
// ParseNALUType returns the NAL unit type carried in v, i.e. the low five
// bits of the byte. Callers pass the first byte of an AVC NAL unit.
func ParseNALUType(v uint8) uint8 {
	const typeMask uint8 = 1<<5 - 1 // 0x1f
	return v & typeMask
}
func ParseSliceType(nalu []byte) (uint8, error) {
if len(nalu) < 2 {
return 0, ErrAVC
}
// TODO chef: 考虑将error返回给上层
func CalcSliceType(nalu []byte) uint8 {
br := nazabits.NewBitReader(nalu[1:])
// first_mb_in_slice
_, err := br.ReadGolomb()
if err != nil {
return 0
// skip first_mb_in_slice
if _, err := br.ReadGolomb(); err != nil {
return 0, err
}
sliceType, err := br.ReadGolomb()
if err != nil {
return 0
return 0, err
}
// TODO chef: 检查非法数据slice type范围 [0, 9]
// range: [0, 9]
if sliceType > 9 {
return 0, ErrAVC
}
if sliceType > 4 {
sliceType -= 5
}
return uint8(sliceType)
return uint8(sliceType), nil
}
func CalcNALUTypeReadable(nalu []byte) string {
t := nalu[0] & 0x1f
func ParseNALUTypeReadable(v uint8) string {
t := ParseNALUType(v)
ret, ok := NALUTypeMapping[t]
if !ok {
return "unknown"
@ -91,100 +166,281 @@ func CalcNALUTypeReadable(nalu []byte) string {
return ret
}
func CalcSliceTypeReadable(nalu []byte) string {
naluType := CalcNALUType(nalu)
func ParseSliceTypeReadable(nalu []byte) (string, error) {
naluType := ParseNALUType(nalu[0])
// 这些类型不属于视频帧数据类型没有slice type
switch naluType {
case NALUTypeSEI:
fallthrough
case NALUTypeSPS:
fallthrough
case NALUTypePPS:
return ""
return "", nil
}
t := CalcSliceType(nalu)
t, err := ParseSliceType(nalu)
if err != nil {
return "unknown", err
}
ret, ok := SliceTypeMapping[t]
if !ok {
return "unknown"
return "unknown", ErrAVC
}
return ret
return ret, nil
}
// TODO chef: 参考 hls session的代码重构这个函数
// 从 rtmp avc sequence header 中解析 sps 和 pps
// @param <payload> rtmp message的payload部分 或者 flv tag的payload部分
func ParseAVCSeqHeader(payload []byte) (sps, pps []byte, err error) {
// TODO chef: check if read out of <payload> range
// SPSPPSSeqHeader2AnnexB converts an AVCC sequence header into AnnexB form:
// start code + SPS + start code + PPS, each prefixed with the 4-byte start code.
//
// The returned slice is freshly allocated and does not alias <payload>.
// Any failure to extract the SPS/PPS from <payload> is reported as ErrAVC.
func SPSPPSSeqHeader2AnnexB(payload []byte) ([]byte, error) {
	sps, pps, parseErr := ParseSPSPPSFromSeqHeader(payload)
	if parseErr != nil {
		return nil, ErrAVC
	}

	var annexB []byte
	for _, nalu := range [][]byte{sps, pps} {
		annexB = append(annexB, NALUStartCode4...)
		annexB = append(annexB, nalu...)
	}
	return annexB, nil
}
// 从AVCC格式的Seq Header中得到SPS和PPS内存块
//
// @param <payload> rtmp message的payload部分或者flv tag的payload部分
// 注意包含了头部2字节类型以及3字节的cts
//
// @return 注意返回的spspps内存块指向的是传入参数<payload>内存块的内存
//
func ParseSPSPPSFromSeqHeader(payload []byte) (sps, pps []byte, err error) {
if len(payload) < 5 {
return nil, nil, ErrAVC
}
if payload[0] != 0x17 || payload[1] != 0x00 || payload[2] != 0 || payload[3] != 0 || payload[4] != 0 {
err = ErrAVC
return
return nil, nil, ErrAVC
}
// H.264-AVC-ISO_IEC_14496-15.pdf
// 5.2.4 Decoder configuration information
//configurationVersion := payload[5]
//avcProfileIndication := payload[6]
//profileCompatibility := payload[7]
//avcLevelIndication := payload[8]
//lengthSizeMinusOne := payload[9] & 0x03
if len(payload) < 13 {
return nil, nil, ErrAVC
}
index := 10
numOfSPS := int(payload[index] & 0x1F)
index++
// TODO chef: if the situation of multi sps exist?
// only take the last one.
for i := 0; i < numOfSPS; i++ {
lenOfSPS := int(bele.BEUint16(payload[index:]))
if numOfSPS != 1 {
return nil, nil, ErrAVC
}
spsLength := int(bele.BEUint16(payload[index:]))
index += 2
sps = append(sps, payload[index:index+lenOfSPS]...)
index += lenOfSPS
if len(payload) < 13+spsLength {
return nil, nil, ErrAVC
}
sps = payload[index : index+spsLength]
index += spsLength
if len(payload) < 16+spsLength {
return nil, nil, ErrAVC
}
numOfPPS := int(payload[index] & 0x1F)
index++
for i := 0; i < numOfPPS; i++ {
lenOfPPS := int(bele.BEUint16(payload[index:]))
if numOfPPS != 1 {
return nil, nil, ErrAVC
}
ppsLength := int(bele.BEUint16(payload[index:]))
index += 2
pps = append(pps, payload[index:index+lenOfPPS]...)
index += lenOfPPS
if len(payload) < 16+spsLength+ppsLength {
return nil, nil, ErrAVC
}
pps = payload[index : index+ppsLength]
return
}
// TODO chef: 和HLS中的代码有重复合并一下
// 将rtmp avc数据转换成avc裸流
// AVCC -> AnnexB
//
// @param <payload> rtmp message的payload部分或者flv tag的payload部分
func CaptureAVC(w io.Writer, payload []byte) error {
// 注意包含了头部2字节类型以及3字节的cts
//
func CaptureAVCC2AnnexB(w io.Writer, payload []byte) error {
// sps pps
if payload[0] == 0x17 && payload[1] == 0x00 {
sps, pps, err := ParseAVCSeqHeader(payload)
spspps, err := SPSPPSSeqHeader2AnnexB(payload)
if err != nil {
return err
}
//utilErrors.PanicIfErrorOccur(err)
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(sps)
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(pps)
_, _ = w.Write(spspps)
return nil
}
// payload中可能存在多个nalu
// 先跳过前面type的2字节以及composition time的3字节
for i := 5; i != len(payload); {
naluLen := int(bele.BEUint32(payload[i:]))
i += 4
//naluType := payload[i] & 0x1f
//log.Debugf("naluLen:%d t:%d %s\n", naluLen, naluType, avc.NALUTypeMapping[naluUintType])
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(NALUStartCode4)
_, _ = w.Write(payload[i : i+naluLen])
i += naluLen
break
}
return nil
}
// TryParseSPS parses the leading fields of an H.264 sequence parameter set and
// logs the parsed structure plus the derived picture size at debug level.
// Nothing but an error is handed back to the caller.
//
// @param <payload> one SPS NAL unit, starting at its NAL header byte (must be 0x67).
// The bytes are read as-is; no emulation prevention bytes are removed.
//
// @return ErrAVC if the NAL header is not 0x67, a field is out of range, or the
// stream uses syntax this parser does not implement (scaling matrix,
// pic_order_cnt_type != 0). Any error reported by the bit reader is
// returned immediately, so later checks never run on values that
// were not actually read.
func TryParseSPS(payload []byte) error {
	var sps SPS

	br := nazabits.NewBitReader(payload)

	t, err := br.ReadBits8(8) //nalType SPS should be 0x67
	if err != nil {
		return err
	}
	if t != 0x67 {
		nazalog.Errorf("invalid SPS type. expected=%d, actual=%d", 0x67, t)
		return ErrAVC
	}

	if sps.ProfileIdc, err = br.ReadBits8(8); err != nil {
		return err
	}
	if sps.ConstraintSet0Flag, err = br.ReadBits8(1); err != nil {
		return err
	}
	if sps.ConstraintSet1Flag, err = br.ReadBits8(1); err != nil {
		return err
	}
	if sps.ConstraintSet2Flag, err = br.ReadBits8(1); err != nil {
		return err
	}
	// skip the remaining 5 bits of the constraint/reserved byte
	if _, err = br.ReadBits8(5); err != nil {
		return err
	}
	if sps.LevelIdc, err = br.ReadBits8(8); err != nil {
		return err
	}
	if sps.SPSId, err = br.ReadGolomb(); err != nil {
		return err
	}
	if sps.SPSId >= 32 {
		return ErrAVC
	}

	// 100 High profile
	if sps.ProfileIdc == 100 {
		if sps.ChromaFormatIdc, err = br.ReadGolomb(); err != nil {
			return err
		}
		if sps.ChromaFormatIdc > 3 {
			return ErrAVC
		}

		if sps.ChromaFormatIdc == 3 {
			if sps.ResidualColorTransformFlag, err = br.ReadBits8(1); err != nil {
				return err
			}
		}

		// bit depths are coded as "minus 8"
		if sps.BitDepthLuma, err = br.ReadGolomb(); err != nil {
			return err
		}
		sps.BitDepthLuma += 8

		if sps.BitDepthChroma, err = br.ReadGolomb(); err != nil {
			return err
		}
		sps.BitDepthChroma += 8

		if sps.BitDepthChroma != sps.BitDepthLuma || sps.BitDepthChroma < 8 || sps.BitDepthChroma > 14 {
			return ErrAVC
		}

		if sps.TransFormBypass, err = br.ReadBits8(1); err != nil {
			return err
		}

		// seq scaling matrix present
		flag, err := br.ReadBits8(1)
		if err != nil {
			return err
		}
		if flag == 1 {
			nazalog.Debugf("scaling matrix present, not impl yet.")
			return ErrAVC
		}
	} else {
		// defaults used when the high-profile fields are absent
		sps.ChromaFormatIdc = 1
		sps.BitDepthLuma = 8
		sps.BitDepthChroma = 8
	}

	if sps.Log2MaxFrameNumMinus4, err = br.ReadGolomb(); err != nil {
		return err
	}

	if sps.PicOrderCntType, err = br.ReadGolomb(); err != nil {
		return err
	}
	if sps.PicOrderCntType != 0 {
		nazalog.Debugf("not impl yet. sps.PicOrderCntType=%d", sps.PicOrderCntType)
		return ErrAVC
	}
	// coded as "minus 4"
	if sps.Log2MaxPicOrderCntLsb, err = br.ReadGolomb(); err != nil {
		return err
	}
	sps.Log2MaxPicOrderCntLsb += 4

	if sps.NumRefFrames, err = br.ReadGolomb(); err != nil {
		return err
	}
	if sps.GapsInFrameNumValueAllowedFlag, err = br.ReadBits8(1); err != nil {
		return err
	}
	if sps.PicWidthInMbsMinusOne, err = br.ReadGolomb(); err != nil {
		return err
	}
	if sps.PicHeightInMapUnitsMinusOne, err = br.ReadGolomb(); err != nil {
		return err
	}
	if sps.FrameMbsOnlyFlag, err = br.ReadBits8(1); err != nil {
		return err
	}

	if sps.FrameMbsOnlyFlag == 0 {
		if sps.MbAdaptiveFrameFieldFlag, err = br.ReadBits8(1); err != nil {
			return err
		}
	}

	if sps.Direct8X8InferenceFlag, err = br.ReadBits8(1); err != nil {
		return err
	}

	if sps.FrameCroppingFlag, err = br.ReadBits8(1); err != nil {
		return err
	}
	if sps.FrameCroppingFlag == 1 {
		if sps.FrameCropLeftOffset, err = br.ReadGolomb(); err != nil {
			return err
		}
		if sps.FrameCropRightOffset, err = br.ReadGolomb(); err != nil {
			return err
		}
		if sps.FrameCropTopOffset, err = br.ReadGolomb(); err != nil {
			return err
		}
		if sps.FrameCropBottomOffset, err = br.ReadGolomb(); err != nil {
			return err
		}
	}

	// TODO parse sps vui parameters

	nazalog.Debugf("%+v", sps)

	// picture size in pixels: 16x16 macroblocks minus the cropped borders
	var ctx Context
	ctx.width = (sps.PicWidthInMbsMinusOne+1)*16 - (sps.FrameCropLeftOffset+sps.FrameCropRightOffset)*2
	ctx.height = (2-uint32(sps.FrameMbsOnlyFlag))*(sps.PicHeightInMapUnitsMinusOne+1)*16 - (sps.FrameCropTopOffset+sps.FrameCropBottomOffset)*2
	nazalog.Debugf("%+v", ctx)

	return nil
}
// TryParsePPS is a placeholder for picture parameter set parsing
// (ISO-14496-10.pdf, 7.3.2.2 Picture parameter set RBSP syntax).
// It is not implemented yet: <payload> is ignored and nil is always returned.
func TryParsePPS(payload []byte) error {
	// TODO impl me
	var noErr error
	return noErr
}
// TryParseSeqHeader is a learning aid for parsing SPS/PPS; it is not called by
// production code at the moment.
//
// It reads the decoder configuration record that follows the 5-byte header,
// logs it at debug level, then hands the SPS and PPS byte ranges to
// TryParseSPS and TryParsePPS. Exactly one SPS followed by one PPS is assumed;
// NumOfSPS/NumOfPPS are read but not used to loop.
//
// @param <payload> payload of an rtmp message or of a flv tag.
// Note that it includes the leading 2 type bytes and the 3 cts bytes.
//
// @return ErrAVC if the header bytes do not match an AVC sequence header or the
// payload is too short for the lengths it declares; otherwise any
// error reported by the bit reader.
func TryParseSeqHeader(payload []byte) error {
	if len(payload) < 5 {
		return ErrAVC
	}
	if payload[0] != 0x17 || payload[1] != 0x00 || payload[2] != 0 || payload[3] != 0 || payload[4] != 0 {
		return ErrAVC
	}

	// H.264-AVC-ISO_IEC_14496-15.pdf
	// 5.2.4 Decoder configuration information
	var dcr DecoderConfigurationRecord
	var err error
	br := nazabits.NewBitReader(payload[5:])

	if dcr.ConfigurationVersion, err = br.ReadBits8(8); err != nil {
		return err
	}
	if dcr.AVCProfileIndication, err = br.ReadBits8(8); err != nil {
		return err
	}
	if dcr.ProfileCompatibility, err = br.ReadBits8(8); err != nil {
		return err
	}
	if dcr.AVCLevelIndication, err = br.ReadBits8(8); err != nil {
		return err
	}
	if _, err = br.ReadBits8(6); err != nil { // reserved = '111111'b
		return err
	}
	if dcr.LengthSizeMinusOne, err = br.ReadBits8(2); err != nil {
		return err
	}
	if _, err = br.ReadBits8(3); err != nil { // reserved = '111'b
		return err
	}
	if dcr.NumOfSPS, err = br.ReadBits8(5); err != nil {
		return err
	}
	b, err := br.ReadBytes(2)
	if err != nil {
		return err
	}
	dcr.SPSLength = bele.BEUint16(b)

	// skip over the SPS bytes; they are parsed from <payload> directly below
	if _, err = br.ReadBytes(uint(dcr.SPSLength)); err != nil {
		return err
	}

	if _, err = br.ReadBits8(3); err != nil { // reserved = '111'b
		return err
	}
	if dcr.NumOfPPS, err = br.ReadBits8(5); err != nil {
		return err
	}
	if b, err = br.ReadBytes(2); err != nil {
		return err
	}
	dcr.PPSLength = bele.BEUint16(b)

	nazalog.Debugf("%+v", dcr)

	// Offsets are computed as int so that large lengths cannot wrap around uint16.
	// SPS starts at 5 + 5 + 1 + 2 = 13.
	spsBegin := 13
	spsEnd := spsBegin + int(dcr.SPSLength)
	// PPS starts after the SPS plus 1 byte (numOfPPS) and 2 bytes (ppsLength).
	ppsBegin := spsEnd + 1 + 2
	ppsEnd := ppsBegin + int(dcr.PPSLength)
	if len(payload) < ppsEnd {
		return ErrAVC
	}

	// Results are deliberately ignored: both sub-parsers are incomplete and
	// TryParseSPS reports ErrAVC for syntax it does not implement yet.
	_ = TryParseSPS(payload[spsBegin:spsEnd])
	_ = TryParsePPS(payload[ppsBegin:ppsEnd])

	return nil
}

File diff suppressed because one or more lines are too long

@ -26,17 +26,17 @@ var (
NALUTypeSEISuffix uint8 = 40 // 0x28
)
func CalcNALUTypeReadable(nalu []byte) string {
b, ok := NALUTypeMapping[CalcNALUType(nalu)]
func ParseNALUTypeReadable(v uint8) string {
b, ok := NALUTypeMapping[ParseNALUType(v)]
if !ok {
return "unknown"
}
return b
}
func CalcNALUType(nalu []byte) uint8 {
func ParseNALUType(v uint8) uint8 {
// 6 bit in middle
// 0*** ***0
// or return (nalu[0] >> 1) & 0x3F
return (nalu[0] & 0x7E) >> 1
return (v & 0x7E) >> 1
}

@ -95,16 +95,6 @@ var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}
// AnnexB prefix
var nalStartCode = []byte{
0x00, 0x00, 0x00, 0x01,
}
// TODO chef:
var nalStartCode3 = []byte{
0x00, 0x00, 0x01,
}
// TS Packet Header
const (
syncByte uint8 = 0x47

@ -13,6 +13,8 @@ import (
"fmt"
"os"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/naza/pkg/unique"
"github.com/q191201771/lal/pkg/aac"
@ -48,7 +50,7 @@ type Muxer struct {
fragmentOP FragmentOP
opened bool
adts aac.ADTS
spspps []byte
spspps []byte // AnnexB
videoCC uint8
audioCC uint8
videoOut []byte // 帧
@ -122,7 +124,9 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
htype := msg.Payload[1]
if ftype == 1 && htype == 0 {
m.cacheSPSPPS(msg)
if err := m.cacheSPSPPS(msg); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", m.UniqueKey, err)
}
return
}
@ -143,31 +147,30 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
nazalog.Errorf("[%s] slice len not enough. i=%d, payload len=%d, nalBytes=%d", m.UniqueKey, i, len(msg.Payload), nalBytes)
return
}
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
nalType := avc.ParseNALUType(msg.Payload[i])
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
//nazalog.Warn("should not reach here.")
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
i += nalBytes
continue
}
if !audSent {
switch nalType {
case 1, 5, 6:
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
out = append(out, audNal...)
audSent = true
case 9:
case avc.NALUTypeAUD:
audSent = true
}
}
switch nalType {
case 1:
case avc.NALUTypeSlice:
spsppsSent = false
case 5:
case avc.NALUTypeIDRSlice:
if !spsppsSent {
out = m.appendSPSPPS(out)
}
@ -176,9 +179,9 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
}
if len(out) == 0 {
out = append(out, nalStartCode...)
out = append(out, avc.NALUStartCode4...)
} else {
out = append(out, nalStartCode3...)
out = append(out, avc.NALUStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
@ -241,9 +244,10 @@ func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) {
_ = m.adts.PutAACSequenceHeader(msg.Payload)
}
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) {
m.spspps = make([]byte, len(msg.Payload))
copy(m.spspps, msg.Payload)
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) error {
var err error
m.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload)
return err
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {
@ -252,24 +256,7 @@ func (m *Muxer) appendSPSPPS(out []byte) []byte {
return out
}
index := 10
nnals := m.spspps[index] & 0x1f
index++
for n := 0; ; n++ {
for ; nnals != 0; nnals-- {
length := int(bele.BEUint16(m.spspps[index:]))
index += 2
out = append(out, nalStartCode...)
out = append(out, m.spspps[index:index+length]...)
index += length
}
if n == 1 {
break
}
nnals = m.spspps[index]
index++
}
out = append(out, m.spspps...)
return out
}

@ -62,20 +62,35 @@ const (
NALUTypeFUA = 28
)
// Position of an RTP packet inside the frame it belongs to, as determined by
// calcPosition from the packet's payload.
const (
//PositionUnknown uint8 = 0
PositionTypeSingle uint8 = 1 // outer NALU type <= NALUTypeSingleMax
PositionTypeMultiStart uint8 = 2 // FU-A packet with the start (S) bit set
PositionTypeMultiMiddle uint8 = 3 // FU-A packet with neither the start nor the end bit set
PositionTypeMultiEnd uint8 = 4 // FU-A packet with the end (E) bit set (and no start bit)
)
type RTPHeader struct {
version uint8 // 2b
version uint8 // 2b *
padding uint8 // 1b
extension uint8 // 1
csrcCount uint8 // 4b
mark uint8 // 1b
mark uint8 // 1b *
packetType uint8 // 7b
seq uint16 // 16b
timestamp uint32 // 32b
ssrc uint32 // 32b
seq uint16 // 16b **
timestamp uint32 // 32b ****
ssrc uint32 // 32b ****
payloadOffset uint32
}
// RTPPacket bundles a parsed RTP header with the raw packet bytes it was parsed from.
type RTPPacket struct {
header RTPHeader
raw []byte // includes the header bytes; the payload starts at header.payloadOffset
positionType uint8 // presumably one of the PositionType* constants; zero when not set
}
func isAudio(packetType uint8) bool {
if packetType == RTPPacketTypeAAC {
return true
@ -102,3 +117,22 @@ func parseRTPPacket(b []byte) (h RTPHeader, err error) {
h.payloadOffset = RTPFixedHeaderLength
return
}
// compareSeq orders two 16-bit RTP sequence numbers, taking wrap-around into
// account.
//
// It returns 0 when a == b. Otherwise, when the numeric distance between the
// two values is below 16384, the numerically larger one is treated as the
// later one (1 if that is a, -1 if that is b). When the distance is 16384 or
// more, a wrap-around is assumed and the result is inverted.
func compareSeq(a, b uint16) int {
	if a == b {
		return 0
	}

	hi, lo, sign := a, b, 1
	if a < b {
		hi, lo, sign = b, a, -1
	}

	// a large gap means the smaller value has wrapped past the larger one
	if hi-lo >= 16384 {
		sign = -sign
	}
	return sign
}

@ -0,0 +1,195 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
// AVPacket is the unit handed to the OnAVPacketComposed callback: a payload
// together with the RTP timestamp it was taken from.
// TODO chef: move to package base
type AVPacket struct {
timestamp uint32 // copied from the RTP header timestamp of the source packet
payload []byte // RTP payload bytes (the raw packet from payloadOffset onwards)
}
// RTPPacketListItem is one node of the singly linked RTPPacketList.
type RTPPacketListItem struct {
packet RTPPacket
next *RTPPacketListItem // nil for the last node
}
// RTPPacketList is a singly linked list of RTP packets with a sentinel head node.
type RTPPacketList struct {
head RTPPacketListItem // sentinel; it does not hold an rtp packet itself
size int // actual number of elements
}
// RTPComposer collects RTP packets in sequence order so that they can be
// composed into AVPackets and delivered through a callback.
type RTPComposer struct {
maxSize int // supplied to NewRTPComposer; not consulted by the methods in this file yet
cb OnAVPacketComposed // invoked with each composed packet
list RTPPacketList // packets inserted by Feed, ordered via compareSeq
composedFlag bool // true once at least one packet has been composed
composedSeq uint16 // sequence number of the most recently composed packet; valid only if composedFlag is set
}
// OnAVPacketComposed is the callback type through which RTPComposer delivers a composed AVPacket.
type OnAVPacketComposed func(pkt AVPacket)
// NewRTPComposer creates a composer that remembers <maxSize> and reports
// composed packets through <cb>. All other state starts at its zero value.
func NewRTPComposer(maxSize int, cb OnAVPacketComposed) *RTPComposer {
	composer := new(RTPComposer)
	composer.maxSize = maxSize
	composer.cb = cb
	return composer
}
// Feed hands one RTP packet to the composer. Packets that are not newer than
// the last composed sequence number are dropped; all others are tagged with
// their position inside the frame and inserted into the ordered list.
func (r *RTPComposer) Feed(pkt RTPPacket) {
	if r.isStale(pkt.header.seq) {
		return
	}
	// calcPosition receives a copy, so its result has to be stored explicitly.
	pkt.positionType = calcPosition(pkt)
	r.insert(pkt)
}

// isStale reports whether an rtp packet with sequence number <seq> is out of
// date, i.e. not after the most recently composed packet. Nothing is stale
// before the first packet has been composed.
func (r *RTPComposer) isStale(seq uint16) bool {
	if !r.composedFlag {
		return false
	}
	return compareSeq(seq, r.composedSeq) <= 0
}

// calcPosition determines where the rtp packet sits inside its frame and
// returns the matching PositionType* value.
//
// It returns 0 (no PositionType constant) when the position cannot be
// determined: the payload is empty, a FU-A payload is shorter than two bytes,
// or the outer NALU type is neither a single NALU nor FU-A.
func calcPosition(pkt RTPPacket) uint8 {
	// TODO chef: only the h264 part is written for now

	// guard against packets that carry no payload at all
	if int(pkt.header.payloadOffset) >= len(pkt.raw) {
		return 0
	}
	b := pkt.raw[pkt.header.payloadOffset:]

	// rfc3984 5.3. NAL Unit Octet Usage
	//
	// +---------------+
	// |0|1|2|3|4|5|6|7|
	// +-+-+-+-+-+-+-+-+
	// |F|NRI| Type |
	// +---------------+

	outerNALUType := b[0] & 0x1F
	switch {
	case outerNALUType <= NALUTypeSingleMax:
		return PositionTypeSingle
	case outerNALUType == NALUTypeFUA:
		// rfc3984 5.8. Fragmentation Units (FUs)
		//
		// 0 1 2 3
		// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
		// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
		// | FU indicator | FU header | |
		// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
		// | |
		// | FU payload |
		// | |
		// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
		// | :...OPTIONAL RTP padding |
		// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
		//
		// FU indicator:
		// +---------------+
		// |0|1|2|3|4|5|6|7|
		// +-+-+-+-+-+-+-+-+
		// |F|NRI| Type |
		// +---------------+
		//
		// Fu header:
		// +---------------+
		// |0|1|2|3|4|5|6|7|
		// +-+-+-+-+-+-+-+-+
		// |S|E|R| Type |
		// +---------------+

		// the FU header is the second payload byte
		if len(b) < 2 {
			return 0
		}

		//fuIndicator := b[0]
		fuHeader := b[1]

		startCode := (fuHeader & 0x80) != 0
		endCode := (fuHeader & 0x40) != 0

		// the start bit takes precedence over the end bit
		if startCode {
			return PositionTypeMultiStart
		}
		if endCode {
			return PositionTypeMultiEnd
		}
		return PositionTypeMultiMiddle
	}

	return 0
}
// insert puts the rtp packet at the proper position of the list, keeping the
// list ordered by sequence number (as defined by compareSeq).
// A packet whose sequence number is already present is dropped.
func (r *RTPComposer) insert(pkt RTPPacket) {
	// Walk the real list through a pointer: working on a copy of r.list would
	// lose both the size update and any insertion directly behind the sentinel.
	p := &r.list.head
	for ; p.next != nil; p = p.next {
		res := compareSeq(pkt.header.seq, p.next.packet.header.seq)
		if res == 0 {
			// duplicate sequence number, nothing to insert
			return
		}
		if res < 0 {
			// pkt belongs in front of p.next
			break
		}
	}

	// p is now either the node to insert after, or the tail of the list.
	p.next = &RTPPacketListItem{
		packet: pkt,
		next:   p.next,
	}
	// count only packets that were actually linked in
	r.list.size++
}
// tryCompose is meant to walk the list from the head and compose as many
// consecutive, complete frames as possible.
// It is unfinished: it currently visits every node without doing anything.
func (r *RTPComposer) tryCompose() {
	node := r.list.head.next
	for node != nil {
		switch node.packet.positionType {
		case PositionTypeSingle:
			// nothing implemented yet
		}
		node = node.next
	}
}
// tryComposeOne checks, starting at the head of the list, whether one complete
// frame can be composed, and reports whether it did so.
//
// Only a single-packet frame at the head is handled: its payload is wrapped in
// an AVPacket, the composed state is updated, and the callback is invoked.
// The packet is not removed from the list here.
// TODO chef: add a parameter to tell apart the two modes: consecutive frames or skipping frames
func (r *RTPComposer) tryComposeOne() bool {
	first := r.list.head.next
	if first == nil {
		return false
	}

	if first.packet.positionType == PositionTypeSingle {
		hdr := first.packet.header
		composed := AVPacket{
			timestamp: hdr.timestamp,
			payload:   first.packet.raw[hdr.payloadOffset:],
		}
		// record the progress before handing the packet to the callback
		r.composedFlag = true
		r.composedSeq = hdr.seq
		r.cb(composed)
		return true
	}

	// PositionTypeMultiStart: to be continued
	return false
}

@ -34,14 +34,17 @@ func (r *RTPServer) OnReadUDPPacket(b []byte, addr string, err error) {
if err != nil {
nazalog.Errorf("read invalid rtp packet. err=%+v", err)
}
var rtpPacket RTPPacket
rtpPacket.header = h
rtpPacket.raw = b
switch h.packetType {
case RTPPacketTypeAAC:
s := r.getOrCreateSession(h)
s.FeedAACPacket(b, h)
s.FeedAACPacket(rtpPacket)
case RTPPacketTypeAVC:
nazalog.Debugf("header=%+v, length=%d", h, len(b))
s := r.getOrCreateSession(h)
s.FeedAVCPacket(b, h)
s.FeedAVCPacket(rtpPacket)
}
}

@ -14,6 +14,8 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 这个模块叫Stream可能更合适
type Session struct {
ssrc uint32
isAudio bool
@ -26,7 +28,8 @@ func NewSession(ssrc uint32, isAudio bool) *Session {
}
}
func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
func (s *Session) FeedAVCPacket(pkt RTPPacket) {
b := pkt.raw[pkt.header.payloadOffset:]
// h264
{
// rfc3984 5.3. NAL Unit Octet Usage
@ -37,7 +40,7 @@ func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
// |F|NRI| Type |
// +---------------+
outerNALUType := b[h.payloadOffset] & 0x1F
outerNALUType := b[0] & 0x1F
if outerNALUType <= NALUTypeSingleMax {
nazalog.Debugf("SINGLE. naluType=%d %s", outerNALUType, hex.Dump(b[12:32]))
} else if outerNALUType == NALUTypeFUA {
@ -70,8 +73,8 @@ func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
// |S|E|R| Type |
// +---------------+
//fuIndicator := b[h.payloadOffset]
fuHeader := b[h.payloadOffset+1]
//fuIndicator := b[0]
fuHeader := b[1]
startCode := (fuHeader & 0x80) != 0
endCode := (fuHeader & 0x40) != 0
@ -79,7 +82,7 @@ func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
//naluType := (fuIndicator & 0xE0) | (fuHeader & 0x1F)
naluType := fuHeader & 0x1F
nazalog.Debugf("FUA. outerNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", outerNALUType, naluType, startCode, endCode, hex.Dump(b[12:32]))
nazalog.Debugf("FUA. outerNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", outerNALUType, naluType, startCode, endCode, hex.Dump(b[0:16]))
} else {
nazalog.Errorf("error. type=%d", outerNALUType)
}
@ -87,6 +90,7 @@ func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
// TODO chef: to be continued
// 从SDP中获取SPSPPS等信息
// 将RTP包合并出视频帧
// 先做一个rtsp server接收rtsp的流录制成ES流吧
}
// h265
@ -108,10 +112,11 @@ func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
//}
}
func (s *Session) FeedAACPacket(b []byte, h RTPHeader) {
func (s *Session) FeedAACPacket(pkt RTPPacket) {
return
// TODO chef: 目前只实现了AAC MPEG4-GENERIC/44100/2
/*
// rfc3640 2.11. Global Structure of Payload Format
//
// +---------+-----------+-----------+---------------+
@ -134,17 +139,19 @@ func (s *Session) FeedAACPacket(b []byte, h RTPHeader) {
nazalog.Debugf("%s", hex.Dump(b[12:]))
// au header section
auHeaderLength := b[h.payloadOffset]<<8 + b[h.payloadOffset+1]
var auHeaderLength uint32
auHeaderLength = uint32(b[h.payloadOffset])<<8 + uint32(b[h.payloadOffset+1])
auHeaderLength = (auHeaderLength + 7) / 8
nazalog.Debugf("auHeaderLength=%d", auHeaderLength)
// no auxiliary section
pauh := h.payloadOffset + uint32(2) // au header pos
pau := h.payloadOffset + uint32(2) + uint32(auHeaderLength) // au pos
pau := h.payloadOffset + uint32(2) + auHeaderLength // au pos
auNum := uint32(auHeaderLength) / 2
for i := uint32(0); i < auNum; i++ {
auSize := uint32(b[pauh]<<8 | b[pauh+1]&0xF8) // 13bit
var auSize uint32
auSize = uint32(b[pauh])<<8 | uint32(b[pauh+1]&0xF8) // 13bit
auSize /= 8
auIndex := b[pauh+1] & 0x7
@ -154,6 +161,7 @@ func (s *Session) FeedAACPacket(b []byte, h RTPHeader) {
nazalog.Debugf("%d %d %s", auSize, auIndex, hex.Dump(b[pau:pau+auSize]))
pauh += 2
pau += uint32(auSize)
pau += auSize
}
*/
}

@ -89,7 +89,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) {
_, _ = conn.Write([]byte(resp))
case MethodAnnounce:
nazalog.Info("< R ANNOUNCE")
parseSDP(body)
ParseSDP(body)
resp := PackResponseAnnounce(headers[HeaderFieldCSeq])
_, _ = conn.Write([]byte(resp))
case MethodSetup:

@ -9,6 +9,7 @@
package rtsp
import (
"encoding/base64"
"errors"
"strconv"
"strings"
@ -36,43 +37,46 @@ type SDP struct {
type ARTPMap struct {
PayloadType int
EncodingName string
ClockRate string
ClockRate int
EncodingParameters string
}
type FmtP struct {
Mode string
type FmtPBase struct {
Format int // same as PayloadType
Parameters map[string]string // name -> value
}
func parseSDP(b []byte) SDP {
func ParseSDP(b []byte) SDP {
s := string(b)
lines := strings.Split(s, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "a=rtpmap") {
aRTPMap, err := parseARTPMap(line)
aRTPMap, err := ParseARTPMap(line)
nazalog.Debugf("%+v, %v", aRTPMap, err)
}
if strings.HasPrefix(line, "a=fmtp") {
fmtPBase, err := ParseFmtPBase(line)
nazalog.Debugf("%+v, %v", fmtPBase, err)
}
}
return SDP{}
}
func parseARTPMap(s string) (ret ARTPMap, err error) {
func ParseARTPMap(s string) (ret ARTPMap, err error) {
// rfc 3640 3.3.1. General
// rfc 3640 3.3.6. High Bit-rate AAC
//
// a=rtpmap:<payload type> <encoding name>/<clock rate>[/<encoding parameters>]
//
// example
// a=rtpmap:96 H264/90000
// a=rtpmap:97 MPEG4-GENERIC/44100/2
// example see unit test
items := strings.Split(s, ":")
items := strings.SplitN(s, ":", 2)
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.Split(items[1], " ")
items = strings.SplitN(items[1], " ", 2)
if len(items) != 2 {
err = ErrSDP
return
@ -81,28 +85,87 @@ func parseARTPMap(s string) (ret ARTPMap, err error) {
if err != nil {
return
}
items = strings.Split(items[1], "/")
items = strings.SplitN(items[1], "/", 3)
switch len(items) {
case 3:
ret.EncodingParameters = items[2]
fallthrough
case 2:
ret.EncodingName = items[0]
ret.ClockRate = items[1]
ret.ClockRate, err = strconv.Atoi(items[1])
if err != nil {
return
}
default:
err = ErrSDP
}
return
}
func parseFmtP(s string) (ret ARTPMap, err error) {
// ParseFmtPBase parses one "a=fmtp" line of an SDP into its format number and
// its name -> value parameters.
//
// rfc 3640 4.4.1. The a=fmtp Keyword
//
// a=fmtp:<format> <parameter name>=<value>[; <parameter name>=<value>]
//
// example:
// a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020
// a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210
//
// Whitespace around each parameter is ignored, and empty segments (for example
// the one produced by a trailing ';') are skipped.
//
// @return ErrSDP if the line has no ':' or no space after the format, or if a
// non-empty parameter has no '='; the strconv error if <format> is
// not a number. On error <ret> may be partially filled.
func ParseFmtPBase(s string) (ret FmtPBase, err error) {
	ret.Parameters = make(map[string]string)

	items := strings.SplitN(s, ":", 2)
	if len(items) != 2 {
		return ret, ErrSDP
	}

	items = strings.SplitN(items[1], " ", 2)
	if len(items) != 2 {
		return ret, ErrSDP
	}

	ret.Format, err = strconv.Atoi(items[0])
	if err != nil {
		return ret, err
	}

	for _, pp := range strings.Split(items[1], ";") {
		pp = strings.TrimSpace(pp)
		if pp == "" {
			// tolerate "name=value;" and "a=1;;b=2"
			continue
		}
		// split on the first '=' only: values such as base64 may contain '='
		kv := strings.SplitN(pp, "=", 2)
		if len(kv) != 2 {
			return ret, ErrSDP
		}
		ret.Parameters[kv[0]] = kv[1]
	}

	return ret, nil
}
// ParseSPSPPS extracts the SPS and PPS from the "sprop-parameter-sets"
// parameter of a parsed fmtp line.
//
// The parameter value is expected to be "<base64 sps>,<base64 pps>"; both
// parts are decoded with standard base64.
//
// @return ErrSDP if <f> is not the AVC format, the parameter is missing, or it
// contains no ','; otherwise the base64 decoding error, if any.
func ParseSPSPPS(f FmtPBase) (sps, pps []byte, err error) {
	if f.Format != RTPPacketTypeAVC {
		return nil, nil, ErrSDP
	}

	sets, found := f.Parameters["sprop-parameter-sets"]
	if !found {
		return nil, nil, ErrSDP
	}

	// everything before the first ',' is the SPS, everything after it the PPS
	comma := strings.IndexByte(sets, ',')
	if comma < 0 {
		return nil, nil, ErrSDP
	}
	spsB64, ppsB64 := sets[:comma], sets[comma+1:]

	sps, err = base64.StdEncoding.DecodeString(spsB64)
	if err != nil {
		return sps, nil, err
	}

	pps, err = base64.StdEncoding.DecodeString(ppsB64)
	return sps, pps, err
}

@ -0,0 +1,105 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"testing"
"github.com/q191201771/naza/pkg/assert"
)
var goldenSDP = "v=0" + "\r\n" +
"o=- 0 0 IN IP6 ::1" + "\r\n" +
"s=No Name" + "\r\n" +
"c=IN IP6 ::1" + "\r\n" +
"t=0 0" + "\r\n" +
"a=tool:libavformat 57.83.100" + "\r\n" +
"m=video 0 RTP/AVP 96" + "\r\n" +
"b=AS:212" + "\r\n" +
"a=rtpmap:96 H264/90000" + "\r\n" +
"a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020" + "\r\n" +
"a=control:streamid=0" + "\r\n" +
"m=audio 0 RTP/AVP 97" + "\r\n" +
"b=AS:30" + "\r\n" +
"a=rtpmap:97 MPEG4-GENERIC/44100/2" + "\r\n" +
"a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210" + "\r\n" +
"a=control:streamid=1" + "\r\n"
var goldenSPS = []byte{
0x67, 0x64, 0x00, 0x20, 0xAC, 0xD9, 0x40, 0xC0, 0x29, 0xB0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01, 0x00, 0x00, 0x03, 0x00, 0x32, 0x0F, 0x18, 0x31, 0x96,
}
var goldenPPS = []byte{
0x68, 0xEB, 0xEC, 0xB2, 0x2C,
}
func TestParseSDP(t *testing.T) {
ParseSDP([]byte(goldenSDP))
}
// TestParseARTPMap checks ParseARTPMap against rtpmap lines with and without
// the optional encoding parameters.
func TestParseARTPMap(t *testing.T) {
	cases := []struct {
		line string
		want ARTPMap
	}{
		{
			line: "rtpmap:96 H264/90000",
			want: ARTPMap{
				PayloadType:        96,
				EncodingName:       "H264",
				ClockRate:          90000,
				EncodingParameters: "",
			},
		},
		{
			line: "rtpmap:97 MPEG4-GENERIC/44100/2",
			want: ARTPMap{
				PayloadType:        97,
				EncodingName:       "MPEG4-GENERIC",
				ClockRate:          44100,
				EncodingParameters: "2",
			},
		},
	}

	for _, c := range cases {
		got, err := ParseARTPMap(c.line)
		assert.Equal(t, nil, err)
		assert.Equal(t, c.want, got)
	}
}
// TestParseFmtPBase checks ParseFmtPBase against one video and one audio
// fmtp line, including parameters separated by "; " and by ";".
func TestParseFmtPBase(t *testing.T) {
	cases := []struct {
		line string
		want FmtPBase
	}{
		{
			line: "a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020",
			want: FmtPBase{
				Format: 96,
				Parameters: map[string]string{
					"packetization-mode":   "1",
					"sprop-parameter-sets": "Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=",
					"profile-level-id":     "640020",
				},
			},
		},
		{
			line: "a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210",
			want: FmtPBase{
				Format: 97,
				Parameters: map[string]string{
					"profile-level-id": "1",
					"mode":             "AAC-hbr",
					"sizelength":       "13",
					"indexlength":      "3",
					"indexdeltalength": "3",
					"config":           "1210",
				},
			},
		},
	}

	for _, c := range cases {
		got, err := ParseFmtPBase(c.line)
		assert.Equal(t, nil, err)
		assert.Equal(t, c.want, got)
	}
}
// TestParseSPSPPS parses a video fmtp line and checks that the decoded
// sprop-parameter-sets match the golden SPS and PPS bytes.
func TestParseSPSPPS(t *testing.T) {
	const line = "a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020"

	fmtp, parseErr := ParseFmtPBase(line)
	assert.Equal(t, nil, parseErr)

	gotSPS, gotPPS, err := ParseSPSPPS(fmtp)
	assert.Equal(t, nil, err)
	assert.Equal(t, goldenSPS, gotSPS)
	assert.Equal(t, goldenPPS, gotPPS)
}
Loading…
Cancel
Save