You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/remux/avpacket2rtmp.go

387 lines
11 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package remux
import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/bele"
)
// AvPacket2RtmpRemuxer converts AvPacket input into RTMP messages.
//
// Current AvPacket sources:
//
//   - RTSP: the sdp plus frames reassembled from rtp
//   - gb28181 ps: frames reassembled from rtp
//   - customize: streams that the application pushes into lalserver through the API
//   - webrtc should work in theory; to be verified once webrtc ingestion is added
type AvPacket2RtmpRemuxer struct {
	option    base.AvPacketStreamOption // input format options, adjustable through WithOption
	onRtmpMsg rtmp.OnReadRtmpAvMsg      // callback that receives every emitted RTMP message

	hasEmittedMetadata bool // set once the metadata message has been sent

	audioType, videoType base.AvPacketPt // codec types recorded by InitWithAvConfig

	// Parameter sets taken from AvPacket data, cached until a video seq header can be built.
	vps, sps, pps []byte

	hasAdts2Asc bool // set once the AAC seq header has been derived from an ADTS header
}
// NewAvPacket2RtmpRemuxer returns a remuxer that starts with the default
// stream option and with both the audio and the video type unknown.
//
// No output callback is installed here; set one with WithOnRtmpMsg.
func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer {
	remuxer := new(AvPacket2RtmpRemuxer)
	remuxer.option = base.DefaultApsOption
	remuxer.audioType = base.AvPacketPtUnknown
	remuxer.videoType = base.AvPacketPtUnknown
	return remuxer
}
// WithOption lets the caller modify the remuxer's stream option in place.
// modOption receives a pointer to the option held by the remuxer.
//
// TODO(chef): [refactor] return *AvPacket2RtmpRemuxer 202208
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *base.AvPacketStreamOption)) {
	current := &r.option
	modOption(current)
}
// WithOnRtmpMsg installs the callback that receives every RTMP message the
// remuxer emits, and returns the remuxer so that calls can be chained.
func (r *AvPacket2RtmpRemuxer) WithOnRtmpMsg(onRtmpMsg rtmp.OnReadRtmpAvMsg) *AvPacket2RtmpRemuxer {
	r.onRtmpMsg = onRtmpMsg

	return r
}
// ---------------------------------------------------------------------------------------------------------------------

// OnRtpPacket is intentionally empty: raw RTP packets are not used by this remuxer.
//
// OnRtpPacket, OnSdp and OnAvPacket together implement the RTSP data callback
// interface rtsp.IBaseInSessionObserver, which makes it easier to plug the
// remuxer into an RTSP session.
func (r *AvPacket2RtmpRemuxer) OnRtpPacket(pkt rtprtcp.RtpPacket) {
	// Nothing to do.
}
// OnSdp forwards the codec configuration carried by the sdp context
// (asc, vps, sps, pps) to InitWithAvConfig.
func (r *AvPacket2RtmpRemuxer) OnSdp(sdpCtx sdp.LogicContext) {
	asc := sdpCtx.Asc
	vps, sps, pps := sdpCtx.Vps, sdpCtx.Sps, sdpCtx.Pps
	r.InitWithAvConfig(asc, vps, sps, pps)
}
// OnAvPacket hands the packet straight to FeedAvPacket.
func (r *AvPacket2RtmpRemuxer) OnAvPacket(pkt base.AvPacket) {
	// Callback form of FeedAvPacket; no extra processing happens here.
	r.FeedAvPacket(pkt)
}
// ---------------------------------------------------------------------------------------------------------------------

// InitWithAvConfig feeds out-of-band codec configuration into the remuxer.
//
// In rtsp scenarios the sps, pps and similar information is sometimes carried
// only in the sdp and sometimes inside the rtp packets. This method offers a
// way to pass in what the sdp provides; it does not have to be called when
// there is nothing to pass.
//
// A non-empty asc marks the audio as AAC. Non-empty sps and pps mark the video
// as HEVC when vps is non-empty too, otherwise as AVC. All needed seq headers
// are built before anything is emitted, so a build failure emits nothing.
// Headers are emitted audio first, then video, both with timestamp 0.
//
// None of the input slices are stored on the remuxer.
func (r *AvPacket2RtmpRemuxer) InitWithAvConfig(asc, vps, sps, pps []byte) {
	// An absent value may arrive as nil or as an empty slice, so presence is
	// decided by length rather than by a nil comparison.
	if len(asc) > 0 {
		r.audioType = base.AvPacketPtAac
	}
	if len(sps) > 0 && len(pps) > 0 {
		if len(vps) > 0 {
			r.videoType = base.AvPacketPtHevc
		} else {
			r.videoType = base.AvPacketPtAvc
		}
	}

	hasAudio := r.audioType != base.AvPacketPtUnknown
	hasVideo := r.videoType != base.AvPacketPtUnknown
	if !hasAudio && !hasVideo {
		Log.Warn("has no audio or video")
		return
	}

	var (
		err  error
		bAsh []byte
		bVsh []byte
	)

	if hasAudio {
		bAsh, err = aac.MakeAudioDataSeqHeaderWithAsc(asc)
		if err != nil {
			Log.Errorf("build aac seq header failed. err=%+v", err)
			return
		}
	}

	if hasVideo {
		if r.videoType == base.AvPacketPtHevc {
			bVsh, err = hevc.BuildSeqHeaderFromVpsSpsPps(vps, sps, pps)
			if err != nil {
				Log.Errorf("build hevc seq header failed. err=%+v", err)
				return
			}
		} else {
			bVsh, err = avc.BuildSeqHeaderFromSpsPps(sps, pps)
			if err != nil {
				Log.Errorf("build avc seq header failed. err=%+v", err)
				return
			}
		}
	}

	if hasAudio {
		r.emitRtmpAvMsg(true, bAsh, 0)
	}
	if hasVideo {
		r.emitRtmpAvMsg(false, bVsh, 0)
	}
}
// FeedAvPacket feeds one base.AvPacket into the remuxer and emits the
// resulting RTMP message(s) through the installed callback.
//
// @param pkt:
//
//   - AAC: raw AAC or ADTS-prefixed AAC, depending on option.AudioFormat
//   - H264/H265: AVCC or Annex B, depending on option.VideoFormat
//
// Video and AAC payload bytes are copied into newly allocated buffers before
// being emitted. Packets with any other payload type are logged and dropped.
func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
	switch pkt.PayloadType {
	case base.AvPacketPtAvc, base.AvPacketPtHevc:
		r.feedVideo(pkt)
	case base.AvPacketPtAac:
		r.feedAac(pkt)
	default:
		Log.Warnf("unsupported packet. type=%d", pkt.PayloadType)
	}
}

// feedVideo splits an AVC/HEVC packet into NALUs, caches parameter sets
// (emitting a video seq header once a full set is cached), and repacks the
// remaining NALUs into a single length-prefixed RTMP video message.
func (r *AvPacket2RtmpRemuxer) feedVideo(pkt base.AvPacket) {
	var (
		nals [][]byte
		err  error
	)
	if r.option.VideoFormat == base.AvPacketStreamVideoFormatAvcc {
		nals, err = avc.SplitNaluAvcc(pkt.Payload)
	} else {
		nals, err = avc.SplitNaluAnnexb(pkt.Payload)
	}
	if err != nil {
		Log.Errorf("iterate nalu failed. err=%+v", err)
		return
	}

	// 5 header bytes: frame type, packet type, and 3 bytes left at zero.
	const headerLen = 5
	// Each NALU is written with a 4-byte length prefix.
	const naluLenSize = 4

	// Size the buffer from the NALU list instead of len(pkt.Payload): an
	// Annex B start code may be only 3 bytes, shorter than the 4-byte length
	// prefix that replaces it, so the output can be larger than the input.
	size := headerLen
	for _, nal := range nals {
		size += naluLenSize + len(nal)
	}
	payload := make([]byte, size)
	pos := headerLen

	isHevc := pkt.PayloadType == base.AvPacketPtHevc

	for _, nal := range nals {
		if len(nal) == 0 {
			// Nothing to parse; also guards the nal[0] access below.
			continue
		}

		isParamSet := false
		isKey := false
		if isHevc {
			switch t := hevc.ParseNaluType(nal[0]); t {
			case hevc.NaluTypeVps:
				r.setVps(nal)
				isParamSet = true
			case hevc.NaluTypeSps:
				r.setSps(nal)
				isParamSet = true
			case hevc.NaluTypePps:
				r.setPps(nal)
				isParamSet = true
			default:
				isKey = hevc.IsIrapNalu(t)
			}
		} else {
			switch t := avc.ParseNaluType(nal[0]); t {
			case avc.NaluTypeSps:
				r.setSps(nal)
				isParamSet = true
			case avc.NaluTypePps:
				r.setPps(nal)
				isParamSet = true
			default:
				isKey = t == avc.NaluTypeIdrSlice
			}
		}

		if isParamSet {
			// Parameter sets are pulled out of the stream and cached; they are
			// never copied into the data message.
			r.emitVideoSeqHeaderIfReady(isHevc, pkt.Timestamp)
			continue
		}

		// Repack the actual data. The header bytes are rewritten for every
		// NALU, so the frame type reflects the last NALU written.
		// NOTE: prepending the cached parameter sets to key frames was once
		// guarded by AvPacket2RtmpRemuxerAddSpsPps2KeyFrameFlag and is disabled.
		if isHevc {
			if isKey {
				payload[0] = base.RtmpHevcKeyFrame
			} else {
				payload[0] = base.RtmpHevcInterFrame
			}
			payload[1] = base.RtmpHevcPacketTypeNalu
		} else {
			if isKey {
				payload[0] = base.RtmpAvcKeyFrame
			} else {
				payload[0] = base.RtmpAvcInterFrame
			}
			payload[1] = base.RtmpAvcPacketTypeNalu
		}
		bele.BePutUint32(payload[pos:], uint32(len(nal)))
		pos += naluLenSize
		copy(payload[pos:], nal)
		pos += len(nal)
	}

	// Only emit when at least one data NALU was written.
	if pos > headerLen {
		r.emitRtmpAvMsg(false, payload[:pos], pkt.Timestamp)
	}
}

// emitVideoSeqHeaderIfReady emits a video seq header built from the cached
// parameter sets once all required ones are present, then clears the cache.
// When building fails the error is logged and the cache is left untouched.
func (r *AvPacket2RtmpRemuxer) emitVideoSeqHeaderIfReady(isHevc bool, timestamp int64) {
	// An empty cached value may be nil or [0:0], so test with len, not nil.
	if len(r.sps) == 0 || len(r.pps) == 0 {
		return
	}
	if isHevc && len(r.vps) == 0 {
		return
	}

	// All required parameter sets are cached: build the video seq header.
	//
	// TODO(chef): should we check that the parameter sets are contiguous,
	// e.g. by rtp seq relationship or by equal timestamps?
	var (
		bVsh []byte
		err  error
	)
	if isHevc {
		bVsh, err = hevc.BuildSeqHeaderFromVpsSpsPps(r.vps, r.sps, r.pps)
		if err != nil {
			Log.Errorf("build hevc seq header failed. err=%+v", err)
			return
		}
	} else {
		bVsh, err = avc.BuildSeqHeaderFromSpsPps(r.sps, r.pps)
		if err != nil {
			Log.Errorf("build avc seq header failed. err=%+v", err)
			return
		}
	}

	r.emitRtmpAvMsg(false, bVsh, timestamp)
	r.clearVideoSeqHeader()
}

// feedAac wraps one AAC packet into an RTMP audio message. For ADTS input the
// first packet is also used to derive and emit the AAC seq header.
func (r *AvPacket2RtmpRemuxer) feedAac(pkt base.AvPacket) {
	// Two bytes precede the AAC data: the audio tag byte and the AAC packet type.
	const rtmpAacHeaderLen = 2
	// TODO(chef): replace the magic number 0xAF.
	// NOTE(review): 0xAF looks like the FLV audio tag byte for AAC (44kHz,
	// 16-bit, stereo) — confirm against the FLV specification.
	const aacTagByte = 0xAF

	switch r.option.AudioFormat {
	case base.AvPacketStreamAudioFormatRawAac:
		payload := make([]byte, len(pkt.Payload)+rtmpAacHeaderLen)
		payload[0] = aacTagByte
		payload[1] = base.RtmpAacPacketTypeRaw
		copy(payload[rtmpAacHeaderLen:], pkt.Payload)
		r.emitRtmpAvMsg(true, payload, pkt.Timestamp)

	case base.AvPacketStreamAudioFormatAdtsAac:
		// NOTE(review): assumes a 7-byte ADTS header (no CRC) — confirm that
		// inputs never carry the 9-byte form.
		const adtsHeaderLen = 7
		if len(pkt.Payload) < adtsHeaderLen {
			Log.Errorf("adts aac packet too short. len=%d", len(pkt.Payload))
			return
		}

		if !r.hasAdts2Asc {
			bAsh, err := aac.MakeAudioDataSeqHeaderWithAdtsHeader(pkt.Payload)
			if err != nil {
				// Without a seq header the raw frames are not decodable, so
				// drop this packet and retry with the next one.
				Log.Errorf("%+v", err)
				return
			}
			r.emitRtmpAvMsg(true, bAsh, pkt.Timestamp)
			r.hasAdts2Asc = true
		}

		// Strip the ADTS header and prepend the RTMP AAC header.
		payload := make([]byte, len(pkt.Payload)-adtsHeaderLen+rtmpAacHeaderLen)
		payload[0] = aacTagByte
		payload[1] = base.RtmpAacPacketTypeRaw
		copy(payload[rtmpAacHeaderLen:], pkt.Payload[adtsHeaderLen:])
		r.emitRtmpAvMsg(true, payload, pkt.Timestamp)
	}
}
// ---------------------------------------------------------------------------------------------------------------------

// emitRtmpAvMsg delivers payload to the callback as an RTMP audio message
// (isAudio true) or video message (isAudio false) carrying the given timestamp.
//
// Before the very first message, a metadata message is built from the recorded
// audio/video types and delivered first. If building the metadata fails, the
// error is logged and nothing is delivered for this call.
func (r *AvPacket2RtmpRemuxer) emitRtmpAvMsg(isAudio bool, payload []byte, timestamp int64) {
	if !r.hasEmittedMetadata {
		// TODO(chef): simplified here; width and height are not extracted
		// from the sps and written into the metadata.
		audioCodecID, videoCodecID := -1, -1
		if r.audioType == base.AvPacketPtAac {
			audioCodecID = int(base.RtmpSoundFormatAac)
		}
		if r.videoType == base.AvPacketPtAvc {
			videoCodecID = int(base.RtmpCodecIdAvc)
		} else if r.videoType == base.AvPacketPtHevc {
			videoCodecID = int(base.RtmpCodecIdHevc)
		}

		meta, err := rtmp.BuildMetadata(-1, -1, audioCodecID, videoCodecID)
		if err != nil {
			Log.Errorf("build metadata failed. err=%+v", err)
			return
		}

		var metaMsg base.RtmpMsg
		metaMsg.Header.Csid = rtmp.CsidAmf
		metaMsg.Header.MsgLen = uint32(len(meta))
		metaMsg.Header.MsgTypeId = base.RtmpTypeIdMetadata
		metaMsg.Header.MsgStreamId = rtmp.Msid1
		metaMsg.Header.TimestampAbs = 0
		metaMsg.Payload = meta
		r.onRtmpMsg(metaMsg)

		r.hasEmittedMetadata = true
	}

	avMsg := base.RtmpMsg{
		Header: base.RtmpHeader{
			MsgLen:       uint32(len(payload)),
			MsgStreamId:  rtmp.Msid1,
			TimestampAbs: uint32(timestamp),
		},
		Payload: payload,
	}
	switch {
	case isAudio:
		avMsg.Header.Csid = rtmp.CsidAudio
		avMsg.Header.MsgTypeId = base.RtmpTypeIdAudio
	default:
		avMsg.Header.Csid = rtmp.CsidVideo
		avMsg.Header.MsgTypeId = base.RtmpTypeIdVideo
	}
	r.onRtmpMsg(avMsg)
}
// setVps replaces the cached vps with a copy of b, reusing the existing
// backing storage when it is large enough.
func (r *AvPacket2RtmpRemuxer) setVps(b []byte) {
	r.vps = append(r.vps[:0], b...)
}
// setSps replaces the cached sps with a copy of b, reusing the existing
// backing storage when it is large enough.
func (r *AvPacket2RtmpRemuxer) setSps(b []byte) {
	r.sps = append(r.sps[:0], b...)
}
// setPps replaces the cached pps with a copy of b, reusing the existing
// backing storage when it is large enough.
func (r *AvPacket2RtmpRemuxer) setPps(b []byte) {
	r.pps = append(r.pps[:0], b...)
}
// clearVideoSeqHeader truncates the cached vps, sps and pps to zero length
// while keeping their backing storage for reuse.
func (r *AvPacket2RtmpRemuxer) clearVideoSeqHeader() {
	r.vps, r.sps, r.pps = r.vps[:0], r.sps[:0], r.pps[:0]
}