pull/241/head
q191201771 2 years ago
parent bb1b3bc312
commit 3c675d563c

@ -62,7 +62,7 @@ func (packet *AvPacket) IsVideo() bool {
func (packet *AvPacket) DebugString() string {
return fmt.Sprintf("[%p] type=%s, timestamp=%d, pts=%d, len=%d, payload=%s",
packet, packet.PayloadType.ReadableString(), packet.Timestamp, packet.Pts, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 64)))
packet, packet.PayloadType.ReadableString(), packet.Timestamp, packet.Pts, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 8)))
}
// ---------------------------------------------------------------------------------------------------------------------

@ -12,6 +12,7 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/naza/pkg/nazaatomic"
"github.com/q191201771/naza/pkg/nazalog"
)
type CustomizePubSessionContext struct {
@ -25,11 +26,13 @@ type CustomizePubSessionContext struct {
}
func NewCustomizePubSessionContext(streamName string) *CustomizePubSessionContext {
return &CustomizePubSessionContext{
s := &CustomizePubSessionContext{
uniqueKey: base.GenUkCustomizePubSession(),
streamName: streamName,
remuxer: remux.NewAvPacket2RtmpRemuxer(),
}
nazalog.Infof("[%s] NewCustomizePubSessionContext.", s.uniqueKey)
return s
}
func (ctx *CustomizePubSessionContext) WithOnRtmpMsg(onRtmpMsg func(msg base.RtmpMsg)) *CustomizePubSessionContext {
@ -46,6 +49,7 @@ func (ctx *CustomizePubSessionContext) StreamName() string {
}
func (ctx *CustomizePubSessionContext) Dispose() {
nazalog.Infof("[%s] CustomizePubSessionContext::Dispose.", ctx.uniqueKey)
ctx.disposeFlag.Store(true)
}
@ -59,6 +63,7 @@ func (ctx *CustomizePubSessionContext) FeedAudioSpecificConfig(asc []byte) error
if ctx.disposeFlag.Load() {
return base.ErrDisposedInStream
}
//nazalog.Debugf("[%s] FeedAudioSpecificConfig. asc=%s", ctx.uniqueKey, hex.Dump(asc))
ctx.remuxer.InitWithAvConfig(asc, nil, nil, nil)
return nil
}
@ -67,6 +72,7 @@ func (ctx *CustomizePubSessionContext) FeedAvPacket(packet base.AvPacket) error
if ctx.disposeFlag.Load() {
return base.ErrDisposedInStream
}
//nazalog.Debugf("[%s] FeedAvPacket. packet=%s", ctx.uniqueKey, packet.DebugString())
ctx.remuxer.FeedAvPacket(packet)
return nil
}

@ -17,6 +17,7 @@ import (
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"math"
)
@ -51,7 +52,7 @@ type IRtmp2MpegtsRemuxerObserver interface {
// Rtmp2MpegtsRemuxer 输入rtmp流输出mpegts流
type Rtmp2MpegtsRemuxer struct {
UniqueKey string
uk string
observer IRtmp2MpegtsRemuxerObserver
filter *rtmp2MpegtsFilter
@ -108,7 +109,7 @@ type Rtmp2MpegtsRemuxer struct {
func NewRtmp2MpegtsRemuxer(observer IRtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer {
uk := base.GenUkRtmp2MpegtsRemuxer()
r := &Rtmp2MpegtsRemuxer{
UniqueKey: uk,
uk: uk,
observer: observer,
basicAudioDts: math.MaxUint64,
basicAudioPts: math.MaxUint64,
@ -119,6 +120,9 @@ func NewRtmp2MpegtsRemuxer(observer IRtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRem
r.videoOut = make([]byte, initialVideoOutBufferSize)
r.videoOut = r.videoOut[0:0]
r.filter = newRtmp2MpegtsFilter(calcFragmentHeaderQueueSize, r)
nazalog.Debugf("[%s] NewRtmp2MpegtsRemuxer", r.uk)
return r
}
@ -163,6 +167,10 @@ func (s *Rtmp2MpegtsRemuxer) FlushAudio() {
s.audioCc = frame.Cc
}
func (s *Rtmp2MpegtsRemuxer) UniqueKey() string {
return s.uk
}
// ----- implement of iRtmp2MpegtsFilterObserver ----------------------------------------------------------------------------------------------------------------
// onPatPmt onPop
@ -185,7 +193,7 @@ func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) {
func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
if len(msg.Payload) <= 5 {
Log.Warnf("[%s] rtmp msg too short, ignore. header=%+v, payload=%s", s.UniqueKey, msg.Header, hex.Dump(msg.Payload))
Log.Warnf("[%s] rtmp msg too short, ignore. header=%+v, payload=%s", s.uk, msg.Header, hex.Dump(msg.Payload))
return
}
@ -200,12 +208,12 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
var err error
if msg.IsAvcKeySeqHeader() {
if s.spspps, err = avc.SpsPpsSeqHeader2Annexb(msg.Payload); err != nil {
Log.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err)
Log.Errorf("[%s] cache spspps failed. err=%+v", s.uk, err)
}
return
} else if msg.IsHevcKeySeqHeader() {
if s.spspps, err = hevc.VpsSpsPpsSeqHeader2Annexb(msg.Payload); err != nil {
Log.Errorf("[%s] cache vpsspspps failed. err=%+v", s.UniqueKey, err)
Log.Errorf("[%s] cache vpsspspps failed. err=%+v", s.uk, err)
}
return
}
@ -219,7 +227,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
// msg中可能有多个NALU逐个获取
nals, err := avc.SplitNaluAvcc(msg.Payload[5:])
if err != nil {
Log.Errorf("[%s] iterate nalu failed. err=%+v, header=%+v, payload=%s", err, s.UniqueKey, msg.Header, hex.Dump(nazabytes.Prefix(msg.Payload, 32)))
Log.Errorf("[%s] iterate nalu failed. err=%+v, header=%+v, payload=%s", err, s.uk, msg.Header, hex.Dump(nazabytes.Prefix(msg.Payload, 32)))
return
}
@ -308,7 +316,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
case avc.NaluTypeIdrSlice:
if !spsppsSent {
if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil {
Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
Log.Warnf("[%s] append spspps by not exist.", s.uk)
return
}
}
@ -322,7 +330,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
if hevc.IsIrapNalu(nalType) {
if !spsppsSent {
if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil {
Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
Log.Warnf("[%s] append spspps by not exist.", s.uk)
return
}
}
@ -370,24 +378,24 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) {
func (s *Rtmp2MpegtsRemuxer) feedAudio(msg base.RtmpMsg) {
if len(msg.Payload) <= 2 {
Log.Warnf("[%s] rtmp msg too short, ignore. header=%+v, payload=%s", s.UniqueKey, msg.Header, hex.Dump(msg.Payload))
Log.Warnf("[%s] rtmp msg too short, ignore. header=%+v, payload=%s", s.uk, msg.Header, hex.Dump(msg.Payload))
return
}
if msg.Payload[0]>>4 != base.RtmpSoundFormatAac {
return
}
//Log.Debugf("[%s] hls: feedAudio. dts=%d len=%d", s.UniqueKey, msg.Header.TimestampAbs, len(msg.Payload))
//Log.Debugf("[%s] hls: feedAudio. dts=%d len=%d", s.uk, msg.Header.TimestampAbs, len(msg.Payload))
if msg.Payload[1] == base.RtmpAacPacketTypeSeqHeader {
if err := s.cacheAacSeqHeader(msg); err != nil {
Log.Errorf("[%s] cache aac seq header failed. err=%+v", s.UniqueKey, err)
Log.Errorf("[%s] cache aac seq header failed. err=%+v", s.uk, err)
}
return
}
if !s.audioSeqHeaderCached() {
Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey)
Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.uk)
return
}
@ -480,8 +488,8 @@ func (s *Rtmp2MpegtsRemuxer) adjustDtsPts(frame *mpegts.Frame) {
if s.basicAudioPts == math.MaxUint64 {
s.basicAudioPts = frame.Pts
}
frame.Dts = subSafe(frame.Dts, s.basicAudioDts, s.UniqueKey, frame)
frame.Pts = subSafe(frame.Pts, s.basicAudioPts, s.UniqueKey, frame)
frame.Dts = subSafe(frame.Dts, s.basicAudioDts, s.uk, frame)
frame.Pts = subSafe(frame.Pts, s.basicAudioPts, s.uk, frame)
} else if frame.Sid == mpegts.StreamIdVideo {
if s.basicVideoDts == math.MaxUint64 {
s.basicVideoDts = frame.Dts
@ -489,8 +497,8 @@ func (s *Rtmp2MpegtsRemuxer) adjustDtsPts(frame *mpegts.Frame) {
if s.basicVideoPts == math.MaxUint64 {
s.basicVideoPts = frame.Pts
}
frame.Dts = subSafe(frame.Dts, s.basicVideoDts, s.UniqueKey, frame)
frame.Pts = subSafe(frame.Pts, s.basicVideoPts, s.UniqueKey, frame)
frame.Dts = subSafe(frame.Dts, s.basicVideoDts, s.uk, frame)
frame.Pts = subSafe(frame.Pts, s.basicVideoPts, s.uk, frame)
}
}

Loading…
Cancel
Save