diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index 19cf516..6d3c602 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -31,6 +31,9 @@ var ErrAVC = errors.New("lal.avc: fxxk") var ( NALUStartCode3 = []byte{0x0, 0x0, 0x1} NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1} + + // aud nalu + AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0xf0} ) var NALUTypeMapping = map[uint8]string{ diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 3e568d5..9e326f1 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -17,11 +17,24 @@ const ( AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC ) -// 目前供package rtsp使用。以后可能被多个package使用。 -// 不排除不同package使用时,字段含义也不同的情况出现。 +// 不同场景使用时,字段含义可能不同。 // 使用AVPacket的地方,应注明各字段的含义。 type AVPacket struct { Timestamp uint32 PayloadType AVPacketPT Payload []byte } + +func (a AVPacketPT) ReadableString() string { + switch a { + case AVPacketPTUnknown: + return "unknown" + case AVPacketPTAVC: + return "avc" + case AVPacketPTHEVC: + return "hevc" + case AVPacketPTAAC: + return "aac" + } + return "" +} diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index 593a509..a1c45d3 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -62,7 +62,7 @@ const ( // AACAUDIODATA // AACPacketType UI8 // Data UI8[n] - RTMPSoundFormatAAC uint8 = 10 + RTMPSoundFormatAAC uint8 = 10 // 注意,视频的CodecID是后4位,音频是前4位 RTMPAACPacketTypeSeqHeader = 0 RTMPAACPacketTypeRaw = 1 ) @@ -72,7 +72,7 @@ type RTMPHeader struct { MsgLen uint32 // 不包含header的大小 MsgTypeID uint8 // 8 audio 9 video 18 metadata MsgStreamID int - TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳 + TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳,单位毫秒 } type RTMPMsg struct { diff --git a/pkg/hevc/hevc.go b/pkg/hevc/hevc.go index a103638..ee187ae 100644 --- a/pkg/hevc/hevc.go +++ b/pkg/hevc/hevc.go @@ -33,23 +33,37 @@ var ErrHEVC = errors.New("lal.hevc: fxxk") var ( NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1} + + // aud nalu + AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10} ) var NALUTypeMapping = map[uint8]string{ - NALUTypeSliceTrailR: "SLICE", - NALUTypeSliceIDR: "I", - NALUTypeSliceIDRNLP: "IDR", + NALUTypeSliceTrailN: "TrailN", + NALUTypeSliceTrailR: "TrailR", + NALUTypeSliceIDR: "IDR", + NALUTypeSliceIDRNLP: "IDRNLP", + NALUTypeSliceCRANUT: "CRANUT", + NALUTypeVPS: "VPS", + NALUTypeSPS: "SPS", + NALUTypePPS: "PPS", + NALUTypeAUD: "AUD", NALUTypeSEI: "SEI", - NALUTypeSEISuffix: "SEI", + NALUTypeSEISuffix: "SEISuffix", } + +// ISO_IEC_23008-2_2013.pdf +// Table 7-1 – NAL unit type codes and NAL unit type classes var ( NALUTypeSliceTrailN uint8 = 0 // 0x0 NALUTypeSliceTrailR uint8 = 1 // 0x01 NALUTypeSliceIDR uint8 = 19 // 0x13 NALUTypeSliceIDRNLP uint8 = 20 // 0x14 + NALUTypeSliceCRANUT uint8 = 21 // 0x15 NALUTypeVPS uint8 = 32 // 0x20 NALUTypeSPS uint8 = 33 // 0x21 NALUTypePPS uint8 = 34 // 0x22 + NALUTypeAUD uint8 = 35 // 0x23 NALUTypeSEI uint8 = 39 // 0x27 NALUTypeSEISuffix uint8 = 40 // 0x28 ) diff --git a/pkg/hls/fragment.go b/pkg/hls/fragment.go index ca9b613..8836dcb 100644 --- a/pkg/hls/fragment.go +++ b/pkg/hls/fragment.go @@ -10,8 +10,6 @@ package hls import ( "github.com/q191201771/naza/pkg/filesystemlayer" - - "github.com/q191201771/lal/pkg/mpegts" ) type Fragment struct { @@ -23,7 +21,6 @@ func (f *Fragment) OpenFile(filename string) (err error) { if err != nil { return } - err = f.WriteFile(mpegts.FixedFragmentHeader) return } diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index 1c7d599..90957c5 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -33,10 +33,6 @@ import ( var ErrHLS = errors.New("lal.hls: fxxk") -var audNal = []byte{ - 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, -} - const ( // TODO chef 这些在配置项中提供 negMaxfraglen uint64 = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位(毫秒*90) diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index d1ba3e8..aa04b30 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -24,6 +24,8 @@ import ( // 后续从架构上考虑,packet hls,mpegts,logic的分工 type MuxerObserver interface { + OnPATPMT(b []byte) + // @param rawFrame TS流,回调结束后,内部不再使用该内存块 // @param boundary 新的TS流接收者,应该从该标志为true时开始发送数据 // @@ -52,6 +54,7 @@ const ( CleanupModeASAP = 2 ) +// 输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流 type Muxer struct { UniqueKey string @@ -77,6 +80,7 @@ type Muxer struct { recordMaxFragDuration float64 streamer *Streamer + patpmt []byte } // 记录fragment的一些信息,注意,写m3u8文件时可能还需要用到历史fragment的信息 @@ -133,6 +137,11 @@ func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) { m.streamer.FeedRTMPMessage(msg) } +func (m *Muxer) OnPATPMT(b []byte) { + m.patpmt = b + m.observer.OnPATPMT(b) +} + func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { var boundary bool var packets []byte @@ -204,7 +213,13 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error { if m.opened { f := m.getCurrFrag() - // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment + // 以下情况,强制开启新的分片: + // 1. 当前时间戳 - 当前分片的初始时间戳 > 配置中单个ts分片时长的10倍 + // 原因可能是: + // 1. 当前包的时间戳发生了大的跳跃 + // 2. 一直没有I帧导致没有合适的时间重新切片,堆积的包达到阈值 + // 2. 往回跳跃超过了阈值 + // maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10) if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) { nazalog.Warnf("[%s] force fragment split. fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts) @@ -240,6 +255,9 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error { } // 开启新的fragment + // 此时的情况是,上层认为是合适的开启分片的时机(比如是I帧),并且 + // 1. 当前是第一个分片 + // 2. 当前不是第一个分片,但是上一个分片已经达到配置时长 if boundary { if err := m.closeFragment(false); err != nil { return err @@ -267,6 +285,9 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error { if err := m.fragment.OpenFile(filenameWithPath); err != nil { return err } + if err := m.fragment.WriteFile(m.patpmt); err != nil { + return err + } } m.opened = true diff --git a/pkg/hls/queue.go b/pkg/hls/queue.go new file mode 100644 index 0000000..5bbb1ac --- /dev/null +++ b/pkg/hls/queue.go @@ -0,0 +1,91 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/mpegts" +) + +// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 +// 一旦判断结束,该队列变成直进直出,不再有实际缓存 +type Queue struct { + maxMsgSize int + data []base.RTMPMsg + observer IQueueObserver + + audioCodecID int + videoCodecID int + done bool +} + +type IQueueObserver interface { + // 该回调一定发生在数据回调之前 + // TODO(chef) 这里可以考虑换成只通知drain,由上层完成FragmentHeader的组装逻辑 + OnPATPMT(b []byte) + + OnPop(msg base.RTMPMsg) +} + +// @param maxMsgSize 最大缓存多少个包 +func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { + return &Queue{ + maxMsgSize: maxMsgSize, + data: make([]base.RTMPMsg, maxMsgSize)[0:0], + observer: observer, + audioCodecID: -1, + videoCodecID: -1, + done: false, + } +} + +// @param msg 函数调用结束后,内部不持有该内存块 +func (q *Queue) Push(msg base.RTMPMsg) { + if q.done { + q.observer.OnPop(msg) + return + } + + q.data = append(q.data, msg.Clone()) + + switch msg.Header.MsgTypeID { + case base.RTMPTypeIDAudio: + q.audioCodecID = int(msg.Payload[0] >> 4) + case base.RTMPTypeIDVideo: + q.videoCodecID = int(msg.Payload[0] & 0xF) + } + + if q.videoCodecID != -1 && q.audioCodecID != -1 { + q.drain() + return + } + + if len(q.data) >= q.maxMsgSize { + q.drain() + return + } +} + +func (q *Queue) drain() { + switch q.videoCodecID { + case int(base.RTMPCodecIDAVC): + q.observer.OnPATPMT(mpegts.FixedFragmentHeader) + case int(base.RTMPCodecIDHEVC): + q.observer.OnPATPMT(mpegts.FixedFragmentHeaderHEVC) + default: + // TODO(chef) 正确处理只有音频或只有视频的情况 #56 + q.observer.OnPATPMT(mpegts.FixedFragmentHeader) + } + for i := range q.data { + q.observer.OnPop(q.data[i]) + } + q.data = nil + + q.done = true +} diff --git a/pkg/hls/queue_test.go b/pkg/hls/queue_test.go new file mode 100644 index 0000000..26b3025 --- /dev/null +++ b/pkg/hls/queue_test.go @@ -0,0 +1,58 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "testing" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/naza/pkg/assert" +) + +var ( + fh []byte + poped []base.RTMPMsg +) + +type qo struct { +} + +func (q *qo) OnPATPMT(b []byte) { + fh = b +} + +func (q *qo) OnPop(msg base.RTMPMsg) { + poped = append(poped, msg) +} + +func TestQueue(t *testing.T) { + goldenRTMPMsg := []base.RTMPMsg{ + { + Header: base.RTMPHeader{ + MsgTypeID: base.RTMPTypeIDAudio, + }, + Payload: []byte{0xAF}, + }, + { + Header: base.RTMPHeader{ + MsgTypeID: base.RTMPTypeIDVideo, + }, + Payload: []byte{0x17}, + }, + } + + q := &qo{} + queue := NewQueue(8, q) + for i := range goldenRTMPMsg { + queue.Push(goldenRTMPMsg[i]) + } + assert.Equal(t, mpegts.FixedFragmentHeader, fh) + assert.Equal(t, goldenRTMPMsg, poped) +} diff --git a/pkg/hls/streamer.go b/pkg/hls/streamer.go index 3dcb002..8fdb8be 100644 --- a/pkg/hls/streamer.go +++ b/pkg/hls/streamer.go @@ -12,12 +12,16 @@ import ( "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/nazalog" ) type StreamerObserver interface { + // @param b const只读内存块,上层可以持有,但是不允许修改 + OnPATPMT(b []byte) + // @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 // // @param frame: 各字段含义见mpegts.Frame结构体定义 @@ -27,12 +31,14 @@ type StreamerObserver interface { OnFrame(streamer *Streamer, frame *mpegts.Frame) } +// 输入rtmp流,回调转封装成AnnexB格式的流 type Streamer struct { UniqueKey string observer StreamerObserver + calcFragmentHeaderQueue *Queue videoOut []byte // AnnexB TODO chef: 优化这块buff - spspps []byte // AnnexB + spspps []byte // AnnexB 也可能是vps+sps+pps adts aac.ADTS audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS @@ -44,17 +50,27 @@ func NewStreamer(observer StreamerObserver) *Streamer { uk := base.GenUKStreamer() videoOut := make([]byte, 1024*1024) videoOut = videoOut[0:0] - return &Streamer{ + streamer := &Streamer{ UniqueKey: uk, observer: observer, videoOut: videoOut, } + streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer) + return streamer } // @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 // // TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 func (s *Streamer) FeedRTMPMessage(msg base.RTMPMsg) { + s.calcFragmentHeaderQueue.Push(msg) +} + +func (s *Streamer) OnPATPMT(b []byte) { + s.observer.OnPATPMT(b) +} + +func (s *Streamer) OnPop(msg base.RTMPMsg) { switch msg.Header.MsgTypeID { case base.RTMPTypeIDAudio: s.feedAudio(msg) @@ -80,21 +96,25 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { nazalog.Errorf("[%s] invalid video message length. len=%d", s.UniqueKey, len(msg.Payload)) return } - if msg.Payload[0]&0xF != base.RTMPCodecIDAVC { + codecID := msg.Payload[0] & 0xF + if codecID != base.RTMPCodecIDAVC && codecID != base.RTMPCodecIDHEVC { return } - ftype := msg.Payload[0] & 0xF0 >> 4 - htype := msg.Payload[1] - // 将数据转换成AnnexB // 如果是sps pps,缓存住,然后直接返回 - if ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeSeqHeader { - if err := s.cacheSPSPPS(msg); err != nil { + var err error + if msg.IsAVCKeySeqHeader() { + if s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload); err != nil { nazalog.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err) } return + } else if msg.IsHEVCKeySeqHeader() { + if s.spspps, err = hevc.VPSSPSPPSSeqHeader2AnnexB(msg.Payload); err != nil { + nazalog.Errorf("[%s] cache vpsspspps failed. err=%+v", s.UniqueKey, err) + } + return } cts := bele.BEUint24(msg.Payload[2:]) @@ -117,46 +137,71 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { return } - nalType := avc.ParseNALUType(msg.Payload[i]) + var nalType uint8 + switch codecID { + case base.RTMPCodecIDAVC: + nalType = avc.ParseNALUType(msg.Payload[i]) + case base.RTMPCodecIDHEVC: + nalType = hevc.ParseNALUType(msg.Payload[i]) + } - //nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts) + //nazalog.Debugf("[%s] naltype=%d, len=%d(%d), cts=%d, key=%t.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts, msg.IsVideoKeyNALU()) - // sps pps前面已经缓存过了,这里就不用处理了 - // aud有自己的生产逻辑,原流中的aud直接过滤掉 - if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD { + // 过滤掉原流中的sps pps aud + // sps pps前面已经缓存过了,后面有自己的写入逻辑 + // aud有自己的写入逻辑 + if (codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD)) || + (codecID == base.RTMPCodecIDHEVC && (nalType == hevc.NALUTypeVPS || nalType == hevc.NALUTypeSPS || nalType == hevc.NALUTypePPS || nalType == hevc.NALUTypeAUD)) { i += nalBytes continue } + // tag中的首个nalu前面写入aud if !audSent { - switch nalType { - case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI: - // 在前面写入aud - out = append(out, audNal...) - audSent = true - //case avc.NALUTypeAUD: - // // 上面aud已经continue跳过了,应该进不到这个分支,可以考虑删除这个分支代码 - // audSent = true + // 注意,因为前面已经过滤了sps pps aud的信息,所以这里可以认为都是需要用aud分隔的,不需要单独判断了 + //if codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSEI || nalType == avc.NALUTypeIDRSlice || nalType == avc.NALUTypeSlice) { + switch codecID { + case base.RTMPCodecIDAVC: + out = append(out, avc.AUDNALU...) + case base.RTMPCodecIDHEVC: + out = append(out, hevc.AUDNALU...) } + audSent = true } - switch nalType { - case avc.NALUTypeSlice: - spsppsSent = false - case avc.NALUTypeIDRSlice: - // 如果是首个关键帧,在前面写入sps pps - if !spsppsSent { - var err error - out, err = s.appendSPSPPS(out) - if err != nil { - nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) - return + // 关键帧前追加sps pps + if codecID == base.RTMPCodecIDAVC { + // h264的逻辑,一个tag中,多个连续的关键帧只追加一个,不连续则每个关键帧前都追加。为什么要这样处理 + switch nalType { + case avc.NALUTypeIDRSlice: + if !spsppsSent { + if out, err = s.appendSPSPPS(out); err != nil { + nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) + return + } } + spsppsSent = true + case avc.NALUTypeSlice: + // 这里只有P帧,没有SEI。为什么要这样处理 + spsppsSent = false + } + } else { + switch nalType { + case hevc.NALUTypeSliceIDR, hevc.NALUTypeSliceIDRNLP, hevc.NALUTypeSliceCRANUT: + if !spsppsSent { + if out, err = s.appendSPSPPS(out); err != nil { + nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) + return + } + } + spsppsSent = true + default: + // 这里简化了,只要不是关键帧,就刷新标志 + spsppsSent = false } - spsppsSent = true - } + // 如果写入了aud或spspps,则用start code3,否则start code4。为什么要这样处理 // 这里不知为什么要区分写入两种类型的start code if len(out) == 0 { out = append(out, avc.NALUStartCode4...) @@ -169,7 +214,6 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { i += nalBytes } - key := ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeNALU dts := uint64(msg.Header.TimestampAbs) * 90 if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByVideo < dts { @@ -180,7 +224,7 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { frame.CC = s.videoCC frame.DTS = dts frame.PTS = frame.DTS + uint64(cts)*90 - frame.Key = key + frame.Key = msg.IsVideoKeyNALU() frame.Raw = out frame.Pid = mpegts.PidVideo frame.Sid = mpegts.StreamIDVideo @@ -254,12 +298,6 @@ func (s *Streamer) cacheAACSeqHeader(msg base.RTMPMsg) error { return s.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:]) } -func (s *Streamer) cacheSPSPPS(msg base.RTMPMsg) error { - var err error - s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload) - return err -} - func (s *Streamer) appendSPSPPS(out []byte) ([]byte, error) { if s.spspps == nil { return out, ErrHLS diff --git a/pkg/hls/var.go b/pkg/hls/var.go new file mode 100644 index 0000000..f8f115f --- /dev/null +++ b/pkg/hls/var.go @@ -0,0 +1,13 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +var ( + calcFragmentHeaderQueueSize = 16 +) diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 05094d7..aec2281 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -14,8 +14,6 @@ import ( "strings" "time" - "github.com/q191201771/lal/pkg/mpegts" - "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/nazahttp" @@ -106,11 +104,6 @@ func (session *SubSession) WriteHTTPResponseHeader() { } } -func (session *SubSession) WriteFragmentHeader() { - nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(mpegts.FixedFragmentHeader) -} - func (session *SubSession) WriteRawPacket(pkt []byte) { if session.isWebSocket { wsHeader := base.WSHeader{ diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 2d1af95..f2d1251 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -71,7 +71,7 @@ type Group struct { recordMPEGTS *mpegts.FileWriter // rtmp pub/pull使用 - gopCache *GOPCache + rtmpGopCache *GOPCache httpflvGopCache *GOPCache // rtsp pub使用 @@ -80,6 +80,9 @@ type Group struct { sps []byte pps []byte + // mpegts使用 + patpmt []byte + // tickCount uint32 } @@ -120,7 +123,7 @@ func NewGroup(appName string, streamName string, pullEnable bool, pullURL string httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), httptsSubSessionSet: make(map[*httpts.SubSession]struct{}), rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}), - gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum), + rtmpGopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum), httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum), pullProxy: &pullProxy{}, url2PushProxy: url2PushProxy, @@ -391,7 +394,6 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) { func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) { nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) session.WriteHTTPResponseHeader() - session.WriteFragmentHeader() group.mutex.Lock() defer group.mutex.Unlock() @@ -478,6 +480,17 @@ func (group *Group) BroadcastRTMP(msg base.RTMPMsg) { group.broadcastRTMP(msg) } +// hls.Muxer +func (group *Group) OnPATPMT(b []byte) { + group.patpmt = b + + if group.recordMPEGTS != nil { + if err := group.recordMPEGTS.Write(b); err != nil { + nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) + } + } +} + // hls.Muxer func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 @@ -485,8 +498,9 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { for session := range group.httptsSubSessionSet { if session.IsFresh { if boundary { - session.IsFresh = false + session.WriteRawPacket(group.patpmt) session.WriteRawPacket(rawFrame) + session.IsFresh = false } } else { session.WriteRawPacket(rawFrame) @@ -542,6 +556,7 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) { // rtsp.PubSession func (group *Group) OnAVPacket(pkt base.AVPacket) { + //nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp) msg, err := remux.AVPacket2RTMPMsg(pkt) if err != nil { nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err) @@ -733,20 +748,20 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { // ## 3.1. 如果是新的 sub session,发送已缓存的信息 if session.IsFresh { // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 - if group.gopCache.Metadata != nil { + if group.rtmpGopCache.Metadata != nil { //nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.Metadata) + _ = session.Write(group.rtmpGopCache.Metadata) } - if group.gopCache.VideoSeqHeader != nil { + if group.rtmpGopCache.VideoSeqHeader != nil { //nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.VideoSeqHeader) + _ = session.Write(group.rtmpGopCache.VideoSeqHeader) } - if group.gopCache.AACSeqHeader != nil { + if group.rtmpGopCache.AACSeqHeader != nil { //nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.AACSeqHeader) + _ = session.Write(group.rtmpGopCache.AACSeqHeader) } - for i := 0; i < group.gopCache.GetGOPCount(); i++ { - for _, item := range group.gopCache.GetGOPDataAt(i) { + for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ { + for _, item := range group.rtmpGopCache.GetGOPDataAt(i) { _ = session.Write(item) } } @@ -766,17 +781,17 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { } if v.pushSession.IsFresh { - if group.gopCache.Metadata != nil { - _ = v.pushSession.Write(group.gopCache.Metadata) + if group.rtmpGopCache.Metadata != nil { + _ = v.pushSession.Write(group.rtmpGopCache.Metadata) } - if group.gopCache.VideoSeqHeader != nil { - _ = v.pushSession.Write(group.gopCache.VideoSeqHeader) + if group.rtmpGopCache.VideoSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) } - if group.gopCache.AACSeqHeader != nil { - _ = v.pushSession.Write(group.gopCache.AACSeqHeader) + if group.rtmpGopCache.AACSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.AACSeqHeader) } - for i := 0; i < group.gopCache.GetGOPCount(); i++ { - for _, item := range group.gopCache.GetGOPDataAt(i) { + for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ { + for _, item := range group.rtmpGopCache.GetGOPDataAt(i) { _ = v.pushSession.Write(item) } } @@ -821,7 +836,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { // # 6. 缓存关键信息,以及gop if config.RTMPConfig.Enable { - group.gopCache.Feed(msg, lcd.Get) + group.rtmpGopCache.Feed(msg, lcd.Get) } if config.HTTPFLVConfig.Enable { group.httpflvGopCache.Feed(msg, lrm2ft.Get) @@ -1085,8 +1100,12 @@ func (group *Group) delIn() { } } - group.gopCache.Clear() + group.rtmpGopCache.Clear() group.httpflvGopCache.Clear() + + // TODO(chef) 情况rtsp pub缓存的asc sps pps等数据 + + group.patpmt = nil } func (group *Group) disposeHLSMuxer() { diff --git a/pkg/mpegts/mpegts.go b/pkg/mpegts/mpegts.go index 2276f85..1deeebf 100644 --- a/pkg/mpegts/mpegts.go +++ b/pkg/mpegts/mpegts.go @@ -74,6 +74,68 @@ var FixedFragmentHeader = []byte{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, } +// 每个TS文件都以固定的PAT,PMT开始 +var FixedFragmentHeaderHEVC = []byte{ + /* TS */ + 0x47, 0x40, 0x00, 0x10, 0x00, + /* PSI */ + 0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PAT */ + 0x00, 0x01, 0xf0, 0x01, + /* CRC */ + 0x2e, 0x70, 0x19, 0x05, + + /* stuffing 167 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + + /* TS */ + 0x47, 0x50, 0x01, 0x10, 0x00, + /* PSI */ + 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PMT */ + 0xe1, 0x00, + 0xf0, 0x00, + //0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */ + 0x24, 0xe1, 0x00, 0xf0, 0x00, + 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ + /* CRC */ + //0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */ + 0xc7, 0x72, 0xb7, 0xcb, + /* stuffing 157 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, +} + // TS Packet Header const ( syncByte uint8 = 0x47 @@ -95,11 +157,13 @@ const ( const ( // ----------------------------------------------------------------------------- // - // 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax - // 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + // 0x0F AAC (ISO/IEC 13818-7 Audio with ADTS transport syntax) + // 0x1B AVC (video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video) + // 0x24 HEVC (HEVC video stream as defined in Rec. ITU-T H.265 | ISO/IEC 23008-2 MPEG-H Part 2) // ----------------------------------------------------------------------------- - streamTypeAAC uint8 = 0x0F - streamTypeAVC uint8 = 0x1B + streamTypeAAC uint8 = 0x0F + streamTypeAVC uint8 = 0x1B + streamTypeHEVC uint8 = 0x24 ) // PES diff --git a/pkg/mpegts/mpegts_test.go b/pkg/mpegts/mpegts_test.go index 94caee7..6bc11e9 100644 --- a/pkg/mpegts/mpegts_test.go +++ b/pkg/mpegts/mpegts_test.go @@ -21,7 +21,7 @@ func TestParseFixedTSPacket(t *testing.T) { pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:]) nazalog.Debugf("%+v", pat) - h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:]) + h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeaderHEVC[188:]) nazalog.Debugf("%+v", h) pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:]) nazalog.Debugf("%+v", pmt) diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go index 10b01ba..fc62baf 100644 --- a/pkg/mpegts/pack.go +++ b/pkg/mpegts/pack.go @@ -9,7 +9,7 @@ package mpegts type Frame struct { - PTS uint64 + PTS uint64 // =(毫秒 * 90) DTS uint64 CC uint8 // continuity_counter of TS Header @@ -36,6 +36,8 @@ type Frame struct { // type OnTSPacket func(packet []byte) +// AnnexB格式的流转换为mpegts packet +// // @param frame: 各字段含义见mpegts.Frame结构体定义 // frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用 // frame.Raw 函数调用结束后,内部不会持有该内存块