You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/hls/muxer.go

422 lines
9.7 KiB
Go

// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"bytes"
"fmt"
"os"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
type fragmentInfo struct {
id int // fragment的自增序号
duration float64 // 当前fragment中数据的时长单位秒
discont bool // #EXT-X-DISCONTINUITY
}
type MuxerConfig struct {
Enable bool `json:"enable"`
OutPath string `json:"out_path"`
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
}
type Muxer struct {
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
config *MuxerConfig
fragmentOP FragmentOP
opened bool
adts aac.ADTS
spspps []byte
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aaframe []byte
aframePTS uint64 // 最新音频帧的时间戳
}
func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
return &Muxer{
streamName: streamName,
outPath: op,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
config: config,
videoOut: videoOut,
aaframe: nil,
frags: frags,
}
}
func (m *Muxer) Start() {
nazalog.Infof("start hls muxer. streamName=%s", m.streamName)
m.ensureDir()
}
func (m *Muxer) Stop() {
nazalog.Infof("stop hls muxer. streamName=%s", m.streamName)
m.flushAudio()
m.closeFragment()
}
func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) {
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
m.feedAudio(msg)
case rtmp.TypeidVideo:
m.feedVideo(msg)
}
}
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
if len(msg.Payload) < 5 {
nazalog.Errorf("invalid video message length. len=%d", len(msg.Payload))
return
}
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
if ftype == 1 && htype == 0 {
m.cacheSPSPPS(msg)
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := m.videoOut[0:0]
for i := 5; i != len(msg.Payload); {
if i+4 > len(msg.Payload) {
nazalog.Errorf("slice len not enough. i=%d, len=%d", i, len(msg.Payload))
return
}
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
if i+nalBytes > len(msg.Payload) {
nazalog.Errorf("slice len not enough. i=%d, payload len=%d, nalBytes=%d", i, len(msg.Payload), nalBytes)
return
}
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
nazalog.Warn("should not reach here.")
i += nalBytes
continue
}
if !audSent {
switch nalType {
case 1, 5, 6:
out = append(out, audNal...)
audSent = true
case 9:
audSent = true
}
}
switch nalType {
case 1:
spsppsSent = false
case 5:
if !spsppsSent {
out = m.appendSPSPPS(out)
}
spsppsSent = true
}
if len(out) == 0 {
out = append(out, nalStartCode...)
} else {
out = append(out, nalStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
var frame mpegTSFrame
frame.cc = m.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
boundary := frame.key && (!m.opened || m.adts.IsNil() || m.aaframe != nil)
m.updateFragment(frame.dts, boundary, 1)
if !m.opened {
nazalog.Warn("not opened.")
return
}
m.fragmentOP.WriteFrame(&frame, out)
m.videoCC = frame.cc
}
func (m *Muxer) feedAudio(msg rtmp.AVMsg) {
if len(msg.Payload) < 3 {
nazalog.Errorf("invalid audio message length. len=%d", len(msg.Payload))
}
if msg.Payload[0]>>4 != 10 {
// TODO chef: HLS音频现在只做了h264的支持
return
}
if msg.Payload[1] == 0 {
m.cacheAACSeqHeader(msg)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
m.updateFragment(pts, m.spspps == nil, 2)
if m.aaframe == nil {
m.aframePTS = pts
}
adtsHeader := m.adts.GetADTS(uint16(msg.Header.MsgLen))
m.aaframe = append(m.aaframe, adtsHeader...)
m.aaframe = append(m.aaframe, msg.Payload[2:]...)
}
func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) {
m.adts.PutAACSequenceHeader(msg.Payload)
}
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) {
m.spspps = msg.Payload
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {
index := 10
nnals := m.spspps[index] & 0x1f
index++
for n := 0; ; n++ {
for ; nnals != 0; nnals-- {
length := int(bele.BEUint16(m.spspps[index:]))
index += 2
out = append(out, nalStartCode...)
out = append(out, m.spspps[index:index+length]...)
index += length
}
if n == 1 {
break
}
nnals = m.spspps[index]
index++
}
return out
}
func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
force := false
discont := true
var f *fragmentInfo
if m.opened {
f = m.getFrag(m.nfrags)
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", m.fragTS, ts)
force = true
} else {
// TODO chef: 考虑ts比fragTS小的情况
f.duration = float64(ts-m.fragTS) / 90000
discont = false
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
}
// 开启新的fragment
if boundary || force {
m.closeFragment()
m.openFragment(ts, discont)
}
// 音频已经缓存了一定时长的数据了,需要落盘了
if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
m.flushAudio()
}
}
func (m *Muxer) openFragment(ts uint64, discont bool) {
if m.opened {
return
}
id := m.getFragmentID()
filename := getTSFilename(m.outPath, m.streamName, id)
m.fragmentOP.OpenFile(filename)
m.opened = true
frag := m.getFrag(m.nfrags)
frag.discont = discont
frag.id = id
m.fragTS = ts
m.flushAudio()
}
func (m *Muxer) closeFragment() {
if !m.opened {
return
}
m.fragmentOP.CloseFile()
m.opened = false
m.nextFrag()
m.writePlaylist()
}
func (m *Muxer) writePlaylist() {
fp, err := os.Create(m.playlistFilenameBak)
nazalog.Assert(nil, err)
// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMS) / 1000
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
}
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n")
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag)))
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.frag))
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, getTSFilenameWithoutPath(m.streamName, frag.id)))
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(m.playlistFilenameBak, m.playlistFilename)
nazalog.Assert(nil, err)
}
// 创建文件夹,如果文件夹已经存在,老的文件夹会被删除
func (m *Muxer) ensureDir() {
err := os.RemoveAll(m.outPath)
nazalog.Assert(nil, err)
err = os.MkdirAll(m.outPath, 0777)
nazalog.Assert(nil, err)
}
func (m *Muxer) getFragmentID() int {
return m.frag + m.nfrags
}
func (m *Muxer) getFrag(n int) *fragmentInfo {
return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)]
}
// TODO chef: 这个函数重命名为incr更好些
func (m *Muxer) nextFrag() {
if m.nfrags == m.config.FragmentNum {
m.frag++
} else {
m.nfrags++
}
}
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (m *Muxer) flushAudio() {
if !m.opened {
nazalog.Warn("flushAudio by not opened.")
return
}
if m.aaframe == nil {
nazalog.Warn("flushAudio by aframe is nil.")
return
}
frame := &mpegTSFrame{
pts: m.aframePTS,
dts: m.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: m.audioCC,
key: false,
}
m.fragmentOP.WriteFrame(frame, m.aaframe)
m.audioCC = frame.cc
m.aaframe = nil
}