messages:

- [feat] Add H.265 support: HLS pull, HLS recording; HTTP-TS pull, MPEG-TS recording. See the H.265 support list at: https://pengrl.com/lal/#/LALServer (#65)
pull/78/head
q191201771 4 years ago
parent 7167d41fb5
commit 8bf0331b24

@ -31,6 +31,9 @@ var ErrAVC = errors.New("lal.avc: fxxk")
var (
NALUStartCode3 = []byte{0x0, 0x0, 0x1}
NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1}
// aud nalu
AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0xf0}
)
var NALUTypeMapping = map[uint8]string{

@ -17,11 +17,24 @@ const (
AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC
)
// Currently used by package rtsp. May be used by multiple packages in the future.
// It is possible that field meanings differ when used by different packages.
// Field meanings may differ between usage scenarios.
// Wherever AVPacket is used, the meaning of each field should be documented.
type AVPacket struct {
Timestamp uint32
PayloadType AVPacketPT
Payload []byte
}
func (a AVPacketPT) ReadableString() string {
switch a {
case AVPacketPTUnknown:
return "unknown"
case AVPacketPTAVC:
return "avc"
case AVPacketPTHEVC:
return "hevc"
case AVPacketPTAAC:
return "aac"
}
return ""
}

@ -62,7 +62,7 @@ const (
// AACAUDIODATA
// AACPacketType UI8
// Data UI8[n]
RTMPSoundFormatAAC uint8 = 10
RTMPSoundFormatAAC uint8 = 10 // note: for video the CodecID is the low 4 bits, for audio the SoundFormat is the high 4 bits
RTMPAACPacketTypeSeqHeader = 0
RTMPAACPacketTypeRaw = 1
)
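
As the comment above notes, the first byte of an RTMP tag payload splits differently for audio and video: audio carries SoundFormat in the high 4 bits, video carries FrameType in the high 4 bits and CodecID in the low 4 bits. A minimal standalone sketch of the two nibble reads (helper names are illustrative, not part of this package):

package main

import "fmt"

// First payload byte of an RTMP audio tag: SoundFormat(4) | SoundRate(2) | SoundSize(1) | SoundType(1).
func soundFormat(b uint8) uint8 { return b >> 4 }

// First payload byte of an RTMP video tag: FrameType(4) | CodecID(4).
func videoFrameTypeAndCodecID(b uint8) (frameType, codecID uint8) {
	return b >> 4, b & 0x0f
}

func main() {
	fmt.Println(soundFormat(0xAF)) // 10 -> RTMPSoundFormatAAC
	ft, cid := videoFrameTypeAndCodecID(0x17)
	fmt.Println(ft, cid) // 1 -> key frame, 7 -> AVC; HEVC streams carry base.RTMPCodecIDHEVC instead
}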
@ -72,7 +72,7 @@ type RTMPHeader struct {
MsgLen uint32 // size of the message body, excluding the header
MsgTypeID uint8 // 8 = audio, 9 = video, 18 = metadata
MsgStreamID int
TimestampAbs uint32 // calculated absolute timestamp on the stream
TimestampAbs uint32 // calculated absolute timestamp on the stream, in milliseconds
}
type RTMPMsg struct {

@ -33,23 +33,37 @@ var ErrHEVC = errors.New("lal.hevc: fxxk")
var (
NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1}
// aud nalu
AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10}
)
var NALUTypeMapping = map[uint8]string{
NALUTypeSliceTrailR: "SLICE",
NALUTypeSliceIDR: "I",
NALUTypeSliceIDRNLP: "IDR",
NALUTypeSliceTrailN: "TrailN",
NALUTypeSliceTrailR: "TrailR",
NALUTypeSliceIDR: "IDR",
NALUTypeSliceIDRNLP: "IDRNLP",
NALUTypeSliceCRANUT: "CRANUT",
NALUTypeVPS: "VPS",
NALUTypeSPS: "SPS",
NALUTypePPS: "PPS",
NALUTypeAUD: "AUD",
NALUTypeSEI: "SEI",
NALUTypeSEISuffix: "SEI",
NALUTypeSEISuffix: "SEISuffix",
}
// ISO_IEC_23008-2_2013.pdf
// Table 7-1 NAL unit type codes and NAL unit type classes
var (
NALUTypeSliceTrailN uint8 = 0 // 0x0
NALUTypeSliceTrailR uint8 = 1 // 0x01
NALUTypeSliceIDR uint8 = 19 // 0x13
NALUTypeSliceIDRNLP uint8 = 20 // 0x14
NALUTypeSliceCRANUT uint8 = 21 // 0x15
NALUTypeVPS uint8 = 32 // 0x20
NALUTypeSPS uint8 = 33 // 0x21
NALUTypePPS uint8 = 34 // 0x22
NALUTypeAUD uint8 = 35 // 0x23
NALUTypeSEI uint8 = 39 // 0x27
NALUTypeSEISuffix uint8 = 40 // 0x28
)
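
For reference, HEVC keeps nal_unit_type in bits 1..6 of the first NAL header byte, while H.264 keeps it in the low 5 bits; that is how the hevc.AUDNALU byte 0x46 above encodes type 35, and the avc.AUDNALU byte 0x09 earlier encodes type 9. A small self-contained sketch of the two extractions (standalone helpers, not the avc/hevc package implementations):

package main

import "fmt"

// parseHEVCNALUType reads nal_unit_type from the first byte of an HEVC NAL header:
// forbidden_zero_bit(1) | nal_unit_type(6) | start of nuh_layer_id(1).
func parseHEVCNALUType(b uint8) uint8 { return (b >> 1) & 0x3f }

// parseAVCNALUType reads nal_unit_type from an H.264 NAL header byte:
// forbidden_zero_bit(1) | nal_ref_idc(2) | nal_unit_type(5).
func parseAVCNALUType(b uint8) uint8 { return b & 0x1f }

func main() {
	fmt.Println(parseHEVCNALUType(0x46)) // 35 -> NALUTypeAUD (HEVC)
	fmt.Println(parseAVCNALUType(0x09))  // 9  -> AUD (H.264)
}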

@ -10,8 +10,6 @@ package hls
import (
"github.com/q191201771/naza/pkg/filesystemlayer"
"github.com/q191201771/lal/pkg/mpegts"
)
type Fragment struct {
@ -23,7 +21,6 @@ func (f *Fragment) OpenFile(filename string) (err error) {
if err != nil {
return
}
err = f.WriteFile(mpegts.FixedFragmentHeader)
return
}

@ -33,10 +33,6 @@ import (
var ErrHLS = errors.New("lal.hls: fxxk")
var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}
const (
// TODO chef: provide these via config options
negMaxfraglen uint64 = 1000 * 90 // if the current packet's timestamp rolls back to earlier than the current fragment's first timestamp by more than this, force a new fragment; unit: milliseconds * 90

@ -24,6 +24,8 @@ import (
// TODO: revisit the architectural split of responsibilities among packages hls, mpegts and logic
type MuxerObserver interface {
OnPATPMT(b []byte)
// @param rawFrame: TS stream data; the internals no longer use this memory block after the callback returns
// @param boundary: a new TS stream subscriber should start sending data only from the point where this flag is true
//
@ -52,6 +54,7 @@ const (
CleanupModeASAP = 2
)
// Takes an RTMP input stream, writes HLS (m3u8 + ts) to files, and also calls the TS stream back to the upper layer
type Muxer struct {
UniqueKey string
@ -77,6 +80,7 @@ type Muxer struct {
recordMaxFragDuration float64
streamer *Streamer
patpmt []byte
}
// Records information about a fragment. Note that writing the m3u8 file may still require information about historical fragments
@ -133,6 +137,11 @@ func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) {
m.streamer.FeedRTMPMessage(msg)
}
func (m *Muxer) OnPATPMT(b []byte) {
m.patpmt = b
m.observer.OnPATPMT(b)
}
func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
var boundary bool
var packets []byte
@ -204,7 +213,13 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error {
if m.opened {
f := m.getCurrFrag()
// If the current timestamp jumps far ahead, or jumps backwards past a threshold, force a new fragment
// Force opening a new fragment in the following cases:
// 1. current timestamp - initial timestamp of the current fragment > 10x the configured duration of a single ts fragment
//    Possible causes:
//    1. the current packet's timestamp made a large jump
//    2. no I-frame arrived for a long time, so there was no suitable point to cut a new fragment, and the accumulated packets crossed the threshold
// 2. the timestamp jumped backwards past the threshold
//
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("[%s] force fragment split. fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts)
@ -240,6 +255,9 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error {
}
// Open a new fragment
// At this point the upper layer considers it a suitable moment to start a fragment (e.g. an I-frame), and:
// 1. this is the first fragment, or
// 2. this is not the first fragment, but the previous fragment has reached the configured duration
if boundary {
if err := m.closeFragment(false); err != nil {
return err
@ -267,6 +285,9 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}
}
m.opened = true

@ -0,0 +1,91 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts"
)
// Caches some data from the start of the stream to determine whether it contains audio and/or video, and which codecs are used.
// Once that determination is made, the queue becomes pass-through and no longer buffers anything.
type Queue struct {
maxMsgSize int
data []base.RTMPMsg
observer IQueueObserver
audioCodecID int
videoCodecID int
done bool
}
type IQueueObserver interface {
// This callback always happens before the data callbacks
// TODO(chef): consider only notifying that the queue is drained and letting the upper layer assemble the FragmentHeader
OnPATPMT(b []byte)
OnPop(msg base.RTMPMsg)
}
// @param maxMsgSize: maximum number of messages to cache
func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue {
return &Queue{
maxMsgSize: maxMsgSize,
data: make([]base.RTMPMsg, maxMsgSize)[0:0],
observer: observer,
audioCodecID: -1,
videoCodecID: -1,
done: false,
}
}
// @param msg: the internals do not hold on to this memory block after the call returns
func (q *Queue) Push(msg base.RTMPMsg) {
if q.done {
q.observer.OnPop(msg)
return
}
q.data = append(q.data, msg.Clone())
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
q.audioCodecID = int(msg.Payload[0] >> 4)
case base.RTMPTypeIDVideo:
q.videoCodecID = int(msg.Payload[0] & 0xF)
}
if q.videoCodecID != -1 && q.audioCodecID != -1 {
q.drain()
return
}
if len(q.data) >= q.maxMsgSize {
q.drain()
return
}
}
func (q *Queue) drain() {
switch q.videoCodecID {
case int(base.RTMPCodecIDAVC):
q.observer.OnPATPMT(mpegts.FixedFragmentHeader)
case int(base.RTMPCodecIDHEVC):
q.observer.OnPATPMT(mpegts.FixedFragmentHeaderHEVC)
default:
// TODO(chef): handle audio-only and video-only streams correctly #56
q.observer.OnPATPMT(mpegts.FixedFragmentHeader)
}
for i := range q.data {
q.observer.OnPop(q.data[i])
}
q.data = nil
q.done = true
}

@ -0,0 +1,58 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"testing"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/assert"
)
var (
fh []byte
poped []base.RTMPMsg
)
type qo struct {
}
func (q *qo) OnPATPMT(b []byte) {
fh = b
}
func (q *qo) OnPop(msg base.RTMPMsg) {
poped = append(poped, msg)
}
func TestQueue(t *testing.T) {
goldenRTMPMsg := []base.RTMPMsg{
{
Header: base.RTMPHeader{
MsgTypeID: base.RTMPTypeIDAudio,
},
Payload: []byte{0xAF},
},
{
Header: base.RTMPHeader{
MsgTypeID: base.RTMPTypeIDVideo,
},
Payload: []byte{0x17},
},
}
q := &qo{}
queue := NewQueue(8, q)
for i := range goldenRTMPMsg {
queue.Push(goldenRTMPMsg[i])
}
assert.Equal(t, mpegts.FixedFragmentHeader, fh)
assert.Equal(t, goldenRTMPMsg, poped)
}

@ -12,12 +12,16 @@ import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
type StreamerObserver interface {
// @param b: const read-only memory block; the upper layer may hold it but must not modify it
OnPATPMT(b []byte)
// @param streamer: lets the upper layer read some of the streamer's internal state, e.g. whether sps/pps are cached, whether the audio cache queue has data, etc.
//
// @param frame: see the mpegts.Frame struct definition for each field's meaning
@ -27,12 +31,14 @@ type StreamerObserver interface {
OnFrame(streamer *Streamer, frame *mpegts.Frame)
}
// Takes an RTMP input stream and calls back a stream remuxed into AnnexB format
type Streamer struct {
UniqueKey string
observer StreamerObserver
calcFragmentHeaderQueue *Queue
videoOut []byte // AnnexB. TODO chef: optimize this buffer
spspps []byte // AnnexB
spspps []byte // AnnexB; may also be vps+sps+pps
adts aac.ADTS
audioCacheFrames []byte // cached audio frame data; note that it may contain multiple audio frames. TODO chef: optimize this buffer
audioCacheFirstFramePTS uint64 // timestamp of the first audio frame in audioCacheFrames. TODO chef: rename to DTS
@ -44,17 +50,27 @@ func NewStreamer(observer StreamerObserver) *Streamer {
uk := base.GenUKStreamer()
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
return &Streamer{
streamer := &Streamer{
UniqueKey: uk,
observer: observer,
videoOut: videoOut,
}
streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer)
return streamer
}
// @param msg: the function does not hold on to msg.Payload after the call returns
//
// TODO chef: consider returning an error to the upper layer when the data is bad, so it can proactively close the input stream connection
func (s *Streamer) FeedRTMPMessage(msg base.RTMPMsg) {
s.calcFragmentHeaderQueue.Push(msg)
}
func (s *Streamer) OnPATPMT(b []byte) {
s.observer.OnPATPMT(b)
}
func (s *Streamer) OnPop(msg base.RTMPMsg) {
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
s.feedAudio(msg)
@ -80,21 +96,25 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
nazalog.Errorf("[%s] invalid video message length. len=%d", s.UniqueKey, len(msg.Payload))
return
}
if msg.Payload[0]&0xF != base.RTMPCodecIDAVC {
codecID := msg.Payload[0] & 0xF
if codecID != base.RTMPCodecIDAVC && codecID != base.RTMPCodecIDHEVC {
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
// Convert the data to AnnexB
// If it is the sps/pps sequence header, cache it and return immediately
if ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeSeqHeader {
if err := s.cacheSPSPPS(msg); err != nil {
var err error
if msg.IsAVCKeySeqHeader() {
if s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err)
}
return
} else if msg.IsHEVCKeySeqHeader() {
if s.spspps, err = hevc.VPSSPSPPSSeqHeader2AnnexB(msg.Payload); err != nil {
nazalog.Errorf("[%s] cache vpsspspps failed. err=%+v", s.UniqueKey, err)
}
return
}
cts := bele.BEUint24(msg.Payload[2:])
@ -117,46 +137,71 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
return
}
nalType := avc.ParseNALUType(msg.Payload[i])
var nalType uint8
switch codecID {
case base.RTMPCodecIDAVC:
nalType = avc.ParseNALUType(msg.Payload[i])
case base.RTMPCodecIDHEVC:
nalType = hevc.ParseNALUType(msg.Payload[i])
}
//nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts)
//nazalog.Debugf("[%s] naltype=%d, len=%d(%d), cts=%d, key=%t.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts, msg.IsVideoKeyNALU())
// sps and pps were already cached earlier, nothing to handle here
// aud has its own generation logic; aud units in the original stream are filtered out directly
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
// Filter out sps, pps and aud from the original stream:
// sps/pps were cached earlier and have their own write logic later
// aud has its own write logic
if (codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD)) ||
(codecID == base.RTMPCodecIDHEVC && (nalType == hevc.NALUTypeVPS || nalType == hevc.NALUTypeSPS || nalType == hevc.NALUTypePPS || nalType == hevc.NALUTypeAUD)) {
i += nalBytes
continue
}
// Write an aud before the first nalu in the tag
if !audSent {
switch nalType {
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
// write the aud in front
out = append(out, audNal...)
audSent = true
//case avc.NALUTypeAUD:
// // the aud was already skipped via continue above, so this branch should be unreachable; consider deleting this branch
// audSent = true
// Note: since sps, pps and aud have already been filtered out above, everything here can be treated as needing an aud separator, so no separate check is required
//if codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSEI || nalType == avc.NALUTypeIDRSlice || nalType == avc.NALUTypeSlice) {
switch codecID {
case base.RTMPCodecIDAVC:
out = append(out, avc.AUDNALU...)
case base.RTMPCodecIDHEVC:
out = append(out, hevc.AUDNALU...)
}
audSent = true
}
switch nalType {
case avc.NALUTypeSlice:
spsppsSent = false
case avc.NALUTypeIDRSlice:
// If this is the first key frame, write sps pps in front of it
if !spsppsSent {
var err error
out, err = s.appendSPSPPS(out)
if err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
// Prepend sps pps before key frames
if codecID == base.RTMPCodecIDAVC {
// H.264 logic: if one tag contains multiple consecutive key frames, prepend only once; if they are not consecutive, prepend before each key frame. Why is it handled this way?
switch nalType {
case avc.NALUTypeIDRSlice:
if !spsppsSent {
if out, err = s.appendSPSPPS(out); err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
}
}
spsppsSent = true
case avc.NALUTypeSlice:
// Only P frames reach here, no SEI. Why is it handled this way?
spsppsSent = false
}
} else {
switch nalType {
case hevc.NALUTypeSliceIDR, hevc.NALUTypeSliceIDRNLP, hevc.NALUTypeSliceCRANUT:
if !spsppsSent {
if out, err = s.appendSPSPPS(out); err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
}
}
spsppsSent = true
default:
// Simplified here: any non-key frame resets the flag
spsppsSent = false
}
spsppsSent = true
}
// If an aud or sps/pps was already written, use the 3-byte start code, otherwise the 4-byte one. Why is it handled this way?
// It is unclear why two different start code types are written here
if len(out) == 0 {
out = append(out, avc.NALUStartCode4...)
@ -169,7 +214,6 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
i += nalBytes
}
key := ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeNALU
dts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByVideo < dts {
@ -180,7 +224,7 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
frame.CC = s.videoCC
frame.DTS = dts
frame.PTS = frame.DTS + uint64(cts)*90
frame.Key = key
frame.Key = msg.IsVideoKeyNALU()
frame.Raw = out
frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIDVideo
@ -254,12 +298,6 @@ func (s *Streamer) cacheAACSeqHeader(msg base.RTMPMsg) error {
return s.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:])
}
func (s *Streamer) cacheSPSPPS(msg base.RTMPMsg) error {
var err error
s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload)
return err
}
func (s *Streamer) appendSPSPPS(out []byte) ([]byte, error) {
if s.spspps == nil {
return out, ErrHLS

@ -0,0 +1,13 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
var (
calcFragmentHeaderQueueSize = 16
)

@ -14,8 +14,6 @@ import (
"strings"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazahttp"
@ -106,11 +104,6 @@ func (session *SubSession) WriteHTTPResponseHeader() {
}
}
func (session *SubSession) WriteFragmentHeader() {
nazalog.Debugf("[%s] > W http response header.", session.uniqueKey)
session.WriteRawPacket(mpegts.FixedFragmentHeader)
}
func (session *SubSession) WriteRawPacket(pkt []byte) {
if session.isWebSocket {
wsHeader := base.WSHeader{

@ -71,7 +71,7 @@ type Group struct {
recordMPEGTS *mpegts.FileWriter
// rtmp pub/pull使用
gopCache *GOPCache
rtmpGopCache *GOPCache
httpflvGopCache *GOPCache
// rtsp pub使用
@ -80,6 +80,9 @@ type Group struct {
sps []byte
pps []byte
// mpegts使用
patpmt []byte
//
tickCount uint32
}
@ -120,7 +123,7 @@ func NewGroup(appName string, streamName string, pullEnable bool, pullURL string
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
httptsSubSessionSet: make(map[*httpts.SubSession]struct{}),
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
rtmpGopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum),
pullProxy: &pullProxy{},
url2PushProxy: url2PushProxy,
@ -391,7 +394,6 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) {
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHTTPResponseHeader()
session.WriteFragmentHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
@ -478,6 +480,17 @@ func (group *Group) BroadcastRTMP(msg base.RTMPMsg) {
group.broadcastRTMP(msg)
}
// hls.Muxer
func (group *Group) OnPATPMT(b []byte) {
group.patpmt = b
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Write(b); err != nil {
nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
// hls.Muxer
func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
// The lock was already taken at the very beginning in Feed, so no locking is needed when this callback comes back up
@ -485,8 +498,9 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.IsFresh = false
session.WriteRawPacket(group.patpmt)
session.WriteRawPacket(rawFrame)
session.IsFresh = false
}
} else {
session.WriteRawPacket(rawFrame)
@ -542,6 +556,7 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) {
// rtsp.PubSession
func (group *Group) OnAVPacket(pkt base.AVPacket) {
//nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp)
msg, err := remux.AVPacket2RTMPMsg(pkt)
if err != nil {
nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err)
@ -733,20 +748,20 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
// ## 3.1. If it is a new sub session, send the cached information
if session.IsFresh {
// TODO chef: the header info and full gop could also be sent right when the SubSession joins
if group.gopCache.Metadata != nil {
if group.rtmpGopCache.Metadata != nil {
//nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.Metadata)
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
if group.rtmpGopCache.VideoSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.VideoSeqHeader)
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
if group.rtmpGopCache.AACSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.AACSeqHeader)
_ = session.Write(group.rtmpGopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ {
for _, item := range group.rtmpGopCache.GetGOPDataAt(i) {
_ = session.Write(item)
}
}
@ -766,17 +781,17 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
}
if v.pushSession.IsFresh {
if group.gopCache.Metadata != nil {
_ = v.pushSession.Write(group.gopCache.Metadata)
if group.rtmpGopCache.Metadata != nil {
_ = v.pushSession.Write(group.rtmpGopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.gopCache.VideoSeqHeader)
if group.rtmpGopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
_ = v.pushSession.Write(group.gopCache.AACSeqHeader)
if group.rtmpGopCache.AACSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ {
for _, item := range group.rtmpGopCache.GetGOPDataAt(i) {
_ = v.pushSession.Write(item)
}
}
@ -821,7 +836,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
// # 6. Cache key information and the gop
if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get)
group.rtmpGopCache.Feed(msg, lcd.Get)
}
if config.HTTPFLVConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
@ -1085,8 +1100,12 @@ func (group *Group) delIn() {
}
}
group.gopCache.Clear()
group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
// TODO(chef): clear the asc, sps, pps and other data cached for the rtsp pub session
group.patpmt = nil
}
func (group *Group) disposeHLSMuxer() {

@ -74,6 +74,68 @@ var FixedFragmentHeader = []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// Every TS file starts with this fixed PAT+PMT
var FixedFragmentHeaderHEVC = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PAT */
0x00, 0x01, 0xf0, 0x01,
/* CRC */
0x2e, 0x70, 0x19, 0x05,
/* stuffing 167 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
/* TS */
0x47, 0x50, 0x01, 0x10, 0x00,
/* PSI */
0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
//0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x24, 0xe1, 0x00, 0xf0, 0x00, /* hevc epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
//0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
0xc7, 0x72, 0xb7, 0xcb,
/* stuffing 157 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// TS Packet Header
const (
syncByte uint8 = 0x47
@ -95,11 +157,13 @@ const (
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// 0x0F AAC (ISO/IEC 13818-7 Audio with ADTS transport syntax)
// 0x1B AVC (video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video)
// 0x24 HEVC (HEVC video stream as defined in Rec. ITU-T H.265 | ISO/IEC 23008-2 MPEG-H Part 2)
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
streamTypeHEVC uint8 = 0x24
)
// PES

@ -21,7 +21,7 @@ func TestParseFixedTSPacket(t *testing.T) {
pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)
h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:])
h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeaderHEVC[188:])
nazalog.Debugf("%+v", h)
pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)

@ -9,7 +9,7 @@
package mpegts
type Frame struct {
PTS uint64
PTS uint64 // = (milliseconds * 90)
DTS uint64
CC uint8 // continuity_counter of TS Header
@ -36,6 +36,8 @@ type Frame struct {
//
type OnTSPacket func(packet []byte)
// Converts an AnnexB-format stream into mpegts packets
//
// @param frame: see the mpegts.Frame struct definition for each field's meaning
// frame.CC: note that the internals modify frame.CC; the caller may save the value after the call and pass it in on the next call
// frame.Raw: the internals do not hold on to this memory block after the call returns
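
The timestamp fields follow the same convention as the hls.Streamer video path: RTMP timestamps are milliseconds and the TS layer runs on a 90 kHz clock, so DTS = TimestampAbs * 90 and PTS = DTS + cts * 90. A tiny self-contained illustration of just that arithmetic (names are illustrative, not the package's API):

package main

import "fmt"

// toFrameTimestamps mirrors how hls.Streamer fills mpegts.Frame.DTS/PTS:
// timestampAbsMS stands in for msg.Header.TimestampAbs, ctsMS for the FLV composition time offset.
func toFrameTimestamps(timestampAbsMS, ctsMS uint32) (dts, pts uint64) {
	dts = uint64(timestampAbsMS) * 90 // milliseconds -> 90 kHz ticks
	pts = dts + uint64(ctsMS)*90
	return
}

func main() {
	dts, pts := toFrameTimestamps(40, 20)
	fmt.Println(dts, pts) // 3600 5400
}

The continuity counter is handled by the caller in the same spirit: it seeds frame.CC before packing and, as the comment above says, may save the (possibly advanced) value after the call for the next invocation.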
