rtmp推流支持enhanced RTMP

pull/276/head
ZSC714725 2 years ago
parent 2ec47bd16b
commit f01c7b4010

@ -72,6 +72,12 @@ const (
RtmpHevcPacketTypeSeqHeader = RtmpAvcPacketTypeSeqHeader
RtmpHevcPacketTypeNalu = RtmpAvcPacketTypeNalu
// enhanced-rtmp packetType https://github.com/veovera/enhanced-rtmp
RtmpExPacketTypeSequenceStart uint8 = 0
RtmpExPacketTypeCodedFrames uint8 = 1 // CompositionTime不为0时有这个类型
RtmpExPacketTypeSequenceEnd uint8 = 2
RtmpExPacketTypeCodedFramesX uint8 = 3
RtmpAvcKeyFrame = RtmpFrameTypeKey<<4 | RtmpCodecIdAvc
RtmpHevcKeyFrame = RtmpFrameTypeKey<<4 | RtmpCodecIdHevc
RtmpAvcInterFrame = RtmpFrameTypeInter<<4 | RtmpCodecIdAvc
@ -114,7 +120,30 @@ func (msg RtmpMsg) IsAvcKeySeqHeader() bool {
}
func (msg RtmpMsg) IsHevcKeySeqHeader() bool {
return msg.Header.MsgTypeId == RtmpTypeIdVideo && msg.Payload[0] == RtmpHevcKeyFrame && msg.Payload[1] == RtmpHevcPacketTypeSeqHeader
if msg.Header.MsgTypeId != RtmpTypeIdVideo {
return false
}
isExtHeader := msg.Payload[0] & 0x80
if isExtHeader != 0 {
packetType := msg.Payload[0] & 0x0f
if msg.Payload[1] == 'h' && msg.Payload[2] == 'v' && msg.Payload[3] == 'c' && msg.Payload[4] == '1' && packetType == RtmpExPacketTypeSequenceStart {
return true
}
} else {
return msg.Payload[0] == RtmpHevcKeyFrame && msg.Payload[1] == RtmpHevcPacketTypeSeqHeader
}
return false
}
func (msg RtmpMsg) IsEnhanced() bool {
isExtHeader := msg.Payload[0] & 0x80
if isExtHeader != 0 {
return true
}
return false
}
func (msg RtmpMsg) IsVideoKeySeqHeader() bool {
@ -129,6 +158,34 @@ func (msg RtmpMsg) IsHevcKeyNalu() bool {
return msg.Header.MsgTypeId == RtmpTypeIdVideo && msg.Payload[0] == RtmpHevcKeyFrame && msg.Payload[1] == RtmpHevcPacketTypeNalu
}
func (msg RtmpMsg) IsEnchanedHevcNalu() bool {
isExtHeader := msg.Payload[0] & 0x80
if isExtHeader != 0 {
packetType := msg.Payload[0] & 0x0f
if packetType == RtmpExPacketTypeCodedFrames || packetType == RtmpExPacketTypeCodedFramesX {
return true
}
}
return false
}
func (msg RtmpMsg) GetEnchanedHevcNaluIndex() int {
isExtHeader := msg.Payload[0] & 0x80
if isExtHeader != 0 {
packetType := msg.Payload[0] & 0x0f
switch packetType {
case RtmpExPacketTypeCodedFrames:
// NALU前面有3个字节CompositionTime
return 5 + 3
case RtmpExPacketTypeCodedFramesX:
return 5
}
}
return 0
}
func (msg RtmpMsg) IsVideoKeyNalu() bool {
return msg.IsAvcKeyNalu() || msg.IsHevcKeyNalu()
}
@ -138,7 +195,16 @@ func (msg RtmpMsg) IsAacSeqHeader() bool {
}
func (msg RtmpMsg) VideoCodecId() uint8 {
return msg.Payload[0] & 0xF
isExtHeader := msg.Payload[0] & 0x80
if isExtHeader == 0 {
return msg.Payload[0] & 0xF
}
if msg.Payload[1] == 'h' && msg.Payload[2] == 'v' && msg.Payload[3] == 'c' && msg.Payload[4] == '1' {
return RtmpCodecIdHevc
}
return RtmpCodecIdAvc
}
func (msg RtmpMsg) AudioCodecId() uint8 {

@ -169,6 +169,21 @@ func VpsSpsPpsSeqHeader2Annexb(payload []byte) ([]byte, error) {
return ret, nil
}
func VpsSpsPpsEnhancedSeqHeader2Annexb(payload []byte) ([]byte, error) {
vps, sps, pps, err := ParseVpsSpsPpsFromEnhancedSeqHeader(payload)
if err != nil {
return nil, err
}
var ret []byte
ret = append(ret, NaluStartCode4...)
ret = append(ret, vps...)
ret = append(ret, NaluStartCode4...)
ret = append(ret, sps...)
ret = append(ret, NaluStartCode4...)
ret = append(ret, pps...)
return ret, nil
}
func BuildVpsSpsPps2Annexb(vps, sps, pps []byte) ([]byte, error) {
ctx := newContext()
if err := ParseVps(vps, ctx); err != nil {
@ -203,6 +218,16 @@ func ParseVpsSpsPpsFromSeqHeader(payload []byte) (vps, sps, pps []byte, err erro
return
}
func ParseVpsSpsPpsFromEnhancedSeqHeader(payload []byte) (vps, sps, pps []byte, err error) {
packetType := payload[0] & 0x0f
if packetType == 0 {
return parseVpsSpsPpsFromRecord(payload)
}
return nil, nil, nil, nazaerrors.Wrap(base.ErrHevc)
}
// ParseVpsSpsPpsFromSeqHeaderWithoutMalloc
//
// 从HVCC格式的Seq Header中得到VPSSPSPPS内存块。
@ -226,6 +251,10 @@ func ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(payload []byte) (vps, sps, pps []b
return nil, nil, nil, nazaerrors.Wrap(base.ErrHevc)
}
return parseVpsSpsPpsFromRecord(payload)
}
func parseVpsSpsPpsFromRecord(payload []byte) (vps, sps, pps []byte, err error) {
index := 27
if numOfArrays := payload[index]; numOfArrays != 3 && numOfArrays != 4 {
return nil, nil, nil, nazaerrors.Wrap(base.ErrHevc)

@ -428,7 +428,14 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
}
}
if msg.IsHevcKeySeqHeader() {
_, sps, _, err := hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
var sps []byte
var err error
if msg.IsEnhanced() {
_, sps, _, err = hevc.ParseVpsSpsPpsFromEnhancedSeqHeader(msg.Payload)
} else {
_, sps, _, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
}
if err == nil {
var ctx hevc.Context
err = hevc.ParseSps(sps, &ctx)

@ -10,6 +10,7 @@ package remux
import (
"encoding/hex"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
@ -193,7 +194,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
return
}
codecId := msg.Payload[0] & 0xF
codecId := msg.VideoCodecId()
if codecId != base.RtmpCodecIdAvc && codecId != base.RtmpCodecIdHevc {
return
}
@ -208,9 +209,16 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
}
return
} else if msg.IsHevcKeySeqHeader() {
if s.spspps, err = hevc.VpsSpsPpsSeqHeader2Annexb(msg.Payload); err != nil {
Log.Errorf("[%s] cache vpsspspps failed. err=%+v", s.uk, err)
if msg.IsEnhanced() {
if s.spspps, err = hevc.VpsSpsPpsEnhancedSeqHeader2Annexb(msg.Payload); err != nil {
Log.Errorf("[%s] cache vpsspspps failed. err=%+v", s.uk, err)
}
} else {
if s.spspps, err = hevc.VpsSpsPpsSeqHeader2Annexb(msg.Payload); err != nil {
Log.Errorf("[%s] cache vpsspspps failed. err=%+v", s.uk, err)
}
}
return
}
@ -221,7 +229,13 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
s.resetVideoOutBuffer()
// msg中可能有多个NALU逐个获取
nals, err := avc.SplitNaluAvcc(msg.Payload[5:])
var nals [][]byte
if codecId == base.RtmpCodecIdHevc && msg.IsEnchanedHevcNalu() {
index := msg.GetEnchanedHevcNaluIndex()
nals, err = avc.SplitNaluAvcc(msg.Payload[index:])
} else {
nals, err = avc.SplitNaluAvcc(msg.Payload[5:])
}
if err != nil {
Log.Errorf("[%s] iterate nalu failed. err=%+v, header=%+v, payload=%s", err, s.uk, msg.Header, hex.Dump(nazabytes.Prefix(msg.Payload, 32)))
return

@ -114,7 +114,12 @@ func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
r.sps, r.pps, err = avc.ParseSpsPpsFromSeqHeader(msg.Payload)
Log.Assert(nil, err)
} else if msg.IsHevcKeySeqHeader() {
r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
if msg.IsEnhanced() {
r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromEnhancedSeqHeader(msg.Payload)
} else {
r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
}
Log.Assert(nil, err)
}
r.doAnalyze()
@ -232,7 +237,14 @@ func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
case base.RtmpTypeIdVideo:
packer = r.getVideoPacker()
if packer != nil {
payload := msg.Payload[5:]
var payload []byte
if msg.VideoCodecId() == base.RtmpCodecIdHevc && msg.IsEnchanedHevcNalu() {
index := msg.GetEnchanedHevcNaluIndex()
payload = msg.Payload[index:]
} else {
payload = msg.Payload[5:]
}
if RtspRemuxerAddSpsPps2KeyFrameFlag {
if msg.IsAvcKeyNalu() && r.sps != nil && r.pps != nil {
payload = h2645.JoinNaluAvcc(r.sps, r.pps, msg.Payload[9:])

Loading…
Cancel
Save