[refactor] base.AvPacket的时间戳类型修改为int64, avpacket_stream.go移入package base

pull/156/head
q191201771 3 years ago
parent d2b80a4c47
commit 8482b9e0c6

@ -13,7 +13,6 @@ import (
"fmt"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/nazalog"
"io/ioutil"
"os"
@ -25,24 +24,8 @@ import (
"github.com/q191201771/naza/pkg/bininfo"
)
// lal/app/demo/customize_lalserver
//
// [what]
// 演示业务方如何通过lalserver的插件功能将自身的流输入到lalserver中
//
// [why]
// 业务方的流输入到lalserver后,就可以使用lalserver的功能了,比如录制功能,使用lalserver所支持的协议从lalserver拉流,等等
//
// 提示:插件功能是基于代码层面的,和与lalserver建立连接将流发送到lalserver是两种不同的方式,但是流进入lalserver后,效果是一样的
//
// 提示这个demo可以看成是业务方基于lal实现的一个定制化功能增强版的lalserver应用
// 换句话说它并不强制要求在lal的github repo下
//
// [how]
// demo的具体功能是:分别读取一个h264 es流文件和一个aac es流文件,并将音视频流输入到lalserver中
//
// 注意,其实lalserver并不关心业务方的流的来源(比如网络or文件or其他),也不关心流的原始格式,
// 业务方只要将流转换成lalserver所要求的格式,调用相应的接口传入数据即可
// 文档见 <lalserver二次开发 - pub接入自定义流>
// https://pengrl.com/lal/#/customize_pub
//
func main() {
@ -94,8 +77,8 @@ func showHowToCustomizePub(lals logic.ILalServer) {
session, err := lals.AddCustomizePubSession(customizePubStreamName)
nazalog.Assert(nil, err)
// 2. 配置session
session.WithOption(func(option *remux.AvPacketStreamOption) {
option.VideoFormat = remux.AvPacketStreamVideoFormatAnnexb
session.WithOption(func(option *base.AvPacketStreamOption) {
option.VideoFormat = base.AvPacketStreamVideoFormatAnnexb
})
asc, err := aac.MakeAscWithAdtsHeader(audioContent[:aac.AdtsHeaderLength])
@ -105,13 +88,13 @@ func showHowToCustomizePub(lals logic.ILalServer) {
// 4. 按时间戳间隔匀速发送音频和视频
startRealTime := time.Now()
startTs := uint32(0)
startTs := int64(0)
for i := range packets {
diffTs := time.Duration(packets[i].Timestamp - startTs)
diffReal := time.Duration(time.Now().Sub(startRealTime).Milliseconds())
diffTs := packets[i].Timestamp - startTs
diffReal := time.Now().Sub(startRealTime).Milliseconds()
//nazalog.Debugf("%d: %s, %d, %d", i, packets[i].DebugString(), diffTs, diffReal)
if diffReal < diffTs {
time.Sleep((diffTs - diffReal) * time.Millisecond)
time.Sleep(time.Duration(diffTs-diffReal) * time.Millisecond)
}
session.FeedAvPacket(packets[i])
}
@ -135,7 +118,7 @@ func readAudioPacketsFromFile(filename string) (audioContent []byte, audioPacket
packet := base.AvPacket{
PayloadType: base.AvPacketPtAac,
Timestamp: uint32(timestamp),
Timestamp: int64(timestamp),
Payload: audioContent[pos+aac.AdtsHeaderLength : pos+int(ctx.AdtsLength)],
}
@ -164,7 +147,7 @@ func readVideoPacketsFromFile(filename string) (videoContent []byte, videoPacket
// 将nal数据转换为lalserver要求的格式输入
packet := base.AvPacket{
PayloadType: base.AvPacketPtAvc,
Timestamp: uint32(timestamp),
Timestamp: int64(timestamp),
Payload: append(avc.NaluStartCode4, nal...),
}

@ -44,7 +44,7 @@ func (a AvPacketPt) ReadableString() string {
//
type AvPacket struct {
PayloadType AvPacketPt
Timestamp uint32 // TODO(chef): 改成int64
Timestamp int64
Payload []byte
}

@ -6,32 +6,44 @@
//
// Author: Chef (191201771@qq.com)
package remux
package base
import "github.com/q191201771/lal/pkg/base"
type AvPacketStreamVideoFormat int
type (
AvPacketStreamAudioFormat int
AvPacketStreamVideoFormat int
)
const (
AvPacketStreamAudioFormatUnknown AvPacketStreamAudioFormat = 0
AvPacketStreamAudioFormatRawAac AvPacketStreamAudioFormat = 1
AvPacketStreamVideoFormatUnknown AvPacketStreamVideoFormat = 0
AvPacketStreamVideoFormatAvcc AvPacketStreamVideoFormat = 1
AvPacketStreamVideoFormatAnnexb AvPacketStreamVideoFormat = 2
)
type AvPacketStreamOption struct {
VideoFormat AvPacketStreamVideoFormat
AudioFormat AvPacketStreamAudioFormat
VideoFormat AvPacketStreamVideoFormat // 视频流的格式,注意,不是指编码格式,而是编码格式确定后,流的格式
}
var DefaultApsOption = AvPacketStreamOption{
AudioFormat: AvPacketStreamAudioFormatRawAac,
VideoFormat: AvPacketStreamVideoFormatAvcc,
}
type IAvPacketStream interface {
// WithOption 修改配置项
//
WithOption(modOption func(option *AvPacketStreamOption))
// FeedAudioSpecificConfig 传入音频AAC的初始化数据
//
// @param asc: AudioSpecificConfig。含义可参考 aac.AscContext, aac.MakeAscWithAdtsHeader 等内容
// @param asc:
//
// AudioSpecificConfig。含义可参考 aac.AscContext, aac.MakeAscWithAdtsHeader 等内容。
// 注意,调用 FeedAvPacket 传入AAC音频数据前需要先调用 FeedAudioSpecificConfig。
// FeedAudioSpecificConfig 在最开始总共调用一次,后面就可以一直调用 FeedAvPacket
//
FeedAudioSpecificConfig(asc []byte)
@ -46,11 +58,12 @@ type IAvPacketStream interface {
// Payload: 音视频数据,格式如下
//
// 如果是音频AAC,格式是裸数据,不需要adts头。
// 注意,调用 FeedAvPacket 传入音频数据前,需要先调用 FeedAudioSpecificConfig
// 注意,调用 FeedAvPacket 传入AAC音频数据前需要先调用 FeedAudioSpecificConfig。
// FeedAudioSpecificConfig 在最开始总共调用一次,后面就可以一直调用 FeedAvPacket
//
// 如果是视频,支持Avcc和Annexb两种格式。
// Avcc也即[<4字节长度 + nal>...]Annexb也即[<4字节start code 00 00 00 01 + nal>...]。
// 注意sps和pps也通过 FeedAvPacket 传入。sps和pps可以单独调用 FeedAvPacket也可以sps+pps+I帧组合在一起调用一次 FeedAvPacket
//
FeedAvPacket(packet base.AvPacket)
FeedAvPacket(packet AvPacket)
}

@ -137,9 +137,7 @@ var (
// ---------------------------------------------------------------------------------------------------------------------
var _ logic.ICustomizePubSessionContext = &logic.CustomizePubSessionContext{}
var _ remux.IAvPacketStream = &logic.CustomizePubSessionContext{}
//var _ remux.IAvPacketStream = &remux.AvPacket2RtmpRemuxer{}
var _ base.IAvPacketStream = &logic.CustomizePubSessionContext{}
// ---------------------------------------------------------------------------------------------------------------------

@ -44,7 +44,7 @@ func (ctx *CustomizePubSessionContext) StreamName() string {
// ---------------------------------------------------------------------------------------------------------------------
func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *remux.AvPacketStreamOption)) {
func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *base.AvPacketStreamOption)) {
ctx.remuxer.WithOption(modOption)
}

@ -10,7 +10,6 @@ package logic
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"path/filepath"
)
@ -20,9 +19,11 @@ type ILalServer interface {
RunLoop() error
Dispose()
// AddCustomizePubSession 定制化功能。业务方可以将自己的流输入到 ILalServer 中
// AddCustomizePubSession 定制化增强功能。业务方可以将自己的流输入到 ILalServer 中
//
// @example 示例见 lal/app/demo/customize_lalserver
//
// @doc 文档见 <lalserver二次开发 - pub接入自定义流> https://pengrl.com/lal/#/customize_pub
//
AddCustomizePubSession(streamName string) (ICustomizePubSessionContext, error)
@ -53,12 +54,11 @@ func NewLalServer(modOption ...ModOption) ILalServer {
// ---------------------------------------------------------------------------------------------------------------------
type ICustomizePubSessionContext interface {
// IAvPacketStream 传入音视频数据相关的接口。具体见 remux.IAvPacketStream
// IAvPacketStream 传入音视频数据相关的接口。详细说明见 base.IAvPacketStream
//
remux.IAvPacketStream
base.IAvPacketStream
UniqueKey() string
StreamName() string
}

@ -20,13 +20,14 @@ import (
)
// AvPacket2RtmpRemuxer AvPacket转换为RTMP
//
// 目前AvPacket来自
// - RTSP的sdp以及rtp的合帧包
// - 业务方通过接口向lalserver输入的流
// - 理论上也支持webrtc后续接入webrtc时再验证
//
type AvPacket2RtmpRemuxer struct {
option AvPacketStreamOption
option base.AvPacketStreamOption
onRtmpMsg rtmp.OnReadRtmpAvMsg
hasEmittedMetadata bool
@ -40,13 +41,13 @@ type AvPacket2RtmpRemuxer struct {
func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer {
return &AvPacket2RtmpRemuxer{
option: DefaultApsOption,
option: base.DefaultApsOption,
audioType: base.AvPacketPtUnknown,
videoType: base.AvPacketPtUnknown,
}
}
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *AvPacketStreamOption)) {
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *base.AvPacketStreamOption)) {
modOption(&r.option)
}
@ -150,7 +151,7 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
case base.AvPacketPtHevc:
var nals [][]byte
var err error
if r.option.VideoFormat == AvPacketStreamVideoFormatAvcc {
if r.option.VideoFormat == base.AvPacketStreamVideoFormatAvcc {
nals, err = avc.SplitNaluAvcc(pkt.Payload)
} else {
nals, err = avc.SplitNaluAnnexb(pkt.Payload)
@ -257,7 +258,7 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
// ---------------------------------------------------------------------------------------------------------------------
func (r *AvPacket2RtmpRemuxer) emitRtmpAvMsg(isAudio bool, payload []byte, timestamp uint32) {
func (r *AvPacket2RtmpRemuxer) emitRtmpAvMsg(isAudio bool, payload []byte, timestamp int64) {
if !r.hasEmittedMetadata {
// TODO(chef): 此处简化了从sps中获取宽高写入metadata的逻辑
audiocodecid := -1
@ -301,7 +302,7 @@ func (r *AvPacket2RtmpRemuxer) emitRtmpAvMsg(isAudio bool, payload []byte, times
}
msg.Header.MsgLen = uint32(len(payload))
msg.Header.TimestampAbs = timestamp
msg.Header.TimestampAbs = uint32(timestamp)
msg.Payload = payload
r.onRtmpMsg(msg)

@ -178,7 +178,7 @@ func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
packer = r.getAudioPacker()
if packer != nil {
rtppkts = packer.Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
Timestamp: int64(msg.Header.TimestampAbs),
PayloadType: r.audioPt,
Payload: msg.Payload[2:],
})
@ -187,7 +187,7 @@ func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
packer = r.getVideoPacker()
if packer != nil {
rtppkts = r.getVideoPacker().Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
Timestamp: int64(msg.Header.TimestampAbs),
PayloadType: r.videoPt,
Payload: msg.Payload[5:],
})

@ -75,7 +75,7 @@ func (unpacker *RtpUnpackerAac) TryUnpackOne(list *RtpPacketList) (unpackedFlag
// one complete access unit
var outPkt base.AvPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = int64(p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
outPkt.Payload = b[aus[0].pos : aus[0].pos+aus[0].size]
unpacker.onAvPacket(outPkt)
@ -131,7 +131,7 @@ func (unpacker *RtpUnpackerAac) TryUnpackOne(list *RtpPacketList) (unpackedFlag
} else if cacheSize == totalSize {
var outPkt base.AvPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = int64(p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
for _, a := range as {
outPkt.Payload = append(outPkt.Payload, a...)
}
@ -153,9 +153,9 @@ func (unpacker *RtpUnpackerAac) TryUnpackOne(list *RtpPacketList) (unpackedFlag
for i := range aus {
var outPkt base.AvPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = int64(p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
// TODO chef: 这里1024的含义
outPkt.Timestamp += uint32(i * (1024 * 1000) / unpacker.clockRate)
outPkt.Timestamp += int64(uint32(i * (1024 * 1000) / unpacker.clockRate))
outPkt.Payload = b[aus[i].pos : aus[i].pos+aus[i].size]
unpacker.onAvPacket(outPkt)
}

@ -48,7 +48,7 @@ func (unpacker *RtpUnpackerAvcHevc) TryUnpackOne(list *RtpPacketList) (unpackedF
case PositionTypeSingle:
var pkt base.AvPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = int64(first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
pkt.Payload = make([]byte, len(first.Packet.Raw)-int(first.Packet.Header.payloadOffset)+4)
bele.BePutUint32(pkt.Payload, uint32(len(first.Packet.Raw))-first.Packet.Header.payloadOffset)
@ -62,7 +62,7 @@ func (unpacker *RtpUnpackerAvcHevc) TryUnpackOne(list *RtpPacketList) (unpackedF
case PositionTypeStapa:
var pkt base.AvPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = int64(first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
// 跳过首字节并且将多nalu前的2字节长度替换成4字节长度
buf := first.Packet.Raw[first.Packet.Header.payloadOffset+1:]
@ -113,7 +113,7 @@ func (unpacker *RtpUnpackerAvcHevc) TryUnpackOne(list *RtpPacketList) (unpackedF
} else if p.Packet.positionType == PositionTypeFuaEnd {
var pkt base.AvPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = int64(p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000))
var naluTypeLen int
var naluType []byte

@ -49,7 +49,7 @@ func (a *AvPacketQueue) Feed(pkt base.AvPacket) {
fallthrough
case base.AvPacketPtHevc:
// 时间戳回退了
if int64(pkt.Timestamp) < a.videoBaseTs {
if pkt.Timestamp < a.videoBaseTs {
Log.Warnf("video ts rotate. pktTS=%d, audioBaseTs=%d, videoBaseTs=%d, audioQueue=%d, videoQueue=%d",
pkt.Timestamp, a.audioBaseTs, a.videoBaseTs, a.audioQueue.Size(), a.videoQueue.Size())
a.videoBaseTs = -1
@ -58,14 +58,14 @@ func (a *AvPacketQueue) Feed(pkt base.AvPacket) {
}
// 第一次
if a.videoBaseTs == -1 {
a.videoBaseTs = int64(pkt.Timestamp)
a.videoBaseTs = pkt.Timestamp
}
// 根据基准调节
pkt.Timestamp -= uint32(a.videoBaseTs)
pkt.Timestamp -= a.videoBaseTs
_ = a.videoQueue.PushBack(pkt)
case base.AvPacketPtAac:
if int64(pkt.Timestamp) < a.audioBaseTs {
if pkt.Timestamp < a.audioBaseTs {
Log.Warnf("audio ts rotate. pktTS=%d, audioBaseTs=%d, videoBaseTs=%d, audioQueue=%d, videoQueue=%d",
pkt.Timestamp, a.audioBaseTs, a.videoBaseTs, a.audioQueue.Size(), a.videoQueue.Size())
a.videoBaseTs = -1
@ -73,9 +73,9 @@ func (a *AvPacketQueue) Feed(pkt base.AvPacket) {
a.PopAllByForce()
}
if a.audioBaseTs == -1 {
a.audioBaseTs = int64(pkt.Timestamp)
a.audioBaseTs = pkt.Timestamp
}
pkt.Timestamp -= uint32(a.audioBaseTs)
pkt.Timestamp -= a.audioBaseTs
_ = a.audioQueue.PushBack(pkt)
}

@ -199,14 +199,14 @@ func TestAvPacketQueue(t *testing.T) {
func a(t uint32) base.AvPacket {
return base.AvPacket{
PayloadType: base.AvPacketPtAac,
Timestamp: t,
Timestamp: int64(t),
}
}
func v(t uint32) base.AvPacket {
return base.AvPacket{
PayloadType: base.AvPacketPtAvc,
Timestamp: t,
Timestamp: int64(t),
}
}

Loading…
Cancel
Save