You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lal/pkg/hls/muxer.go

406 lines
9.0 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"bytes"
"fmt"
"os"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
type fragmentInfo struct {
id int // fragment的自增序号
duration float64 // 当前fragment中数据的时长单位秒
discont bool // #EXT-X-DISCONTINUITY
}
type MuxerConfig struct {
OutPath string `json:"out_path"`
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
}
type Muxer struct {
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
config *MuxerConfig
fragmentOP FragmentOP
opened bool
adts aac.ADTS
spspps []byte
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aaframe []byte
aframePTS uint64 // 最新音频帧的时间戳
}
func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
return &Muxer{
streamName: streamName,
outPath: op,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
config: config,
videoOut: videoOut,
aaframe: nil,
frags: frags,
}
}
func (m *Muxer) Start() {
nazalog.Infof("start hls muxer. streamName=%s", m.streamName)
m.ensureDir()
}
func (m *Muxer) Stop() {
nazalog.Infof("stop hls muxer. streamName=%s", m.streamName)
m.flushAudio()
m.closeFragment()
}
func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) {
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
m.feedAudio(msg)
case rtmp.TypeidVideo:
m.feedVideo(msg)
}
}
func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
if ftype == 1 && htype == 0 {
m.cacheSPSPPS(msg)
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := m.videoOut[0:0]
for i := 5; i != len(msg.Payload); {
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
nazalog.Warn("should not reach here.")
i += nalBytes
continue
}
if !audSent {
switch nalType {
case 1, 5, 6:
out = append(out, audNal...)
audSent = true
case 9:
audSent = true
}
}
switch nalType {
case 1:
spsppsSent = false
case 5:
if !spsppsSent {
out = m.appendSPSPPS(out)
}
spsppsSent = true
}
if len(out) == 0 {
out = append(out, nalStartCode...)
} else {
out = append(out, nalStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
var frame mpegTSFrame
frame.cc = m.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
boundary := frame.key && (!m.opened || m.adts.IsNil() || m.aaframe != nil)
m.updateFragment(frame.dts, boundary, 1)
if !m.opened {
nazalog.Warn("not opened.")
return
}
m.fragmentOP.WriteFrame(&frame, out)
m.videoCC = frame.cc
}
func (m *Muxer) feedAudio(msg rtmp.AVMsg) {
if msg.Payload[0]>>4 != 10 {
// TODO chef: HLS音频现在只做了h264的支持
return
}
if msg.Payload[1] == 0 {
m.cacheAACSeqHeader(msg)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
m.updateFragment(pts, m.spspps == nil, 2)
if m.aaframe == nil {
m.aframePTS = pts
}
adtsHeader := m.adts.GetADTS(uint16(msg.Header.MsgLen))
m.aaframe = append(m.aaframe, adtsHeader...)
m.aaframe = append(m.aaframe, msg.Payload[2:]...)
}
func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) {
m.adts.PutAACSequenceHeader(msg.Payload)
}
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) {
m.spspps = msg.Payload
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {
index := 10
nnals := m.spspps[index] & 0x1f
index++
for n := 0; ; n++ {
for ; nnals != 0; nnals-- {
length := int(bele.BEUint16(m.spspps[index:]))
index += 2
out = append(out, nalStartCode...)
out = append(out, m.spspps[index:index+length]...)
index += length
}
if n == 1 {
break
}
nnals = m.spspps[index]
index++
}
return out
}
func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
force := false
discont := true
var f *fragmentInfo
if m.opened {
f = m.getFrag(m.nfrags)
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", m.fragTS, ts)
force = true
} else {
// TODO chef: 考虑ts比fragTS小的情况
f.duration = float64(ts-m.fragTS) / 90000
discont = false
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
}
// 开启新的fragment
if boundary || force {
m.closeFragment()
m.openFragment(ts, discont)
}
// 音频已经缓存了一定时长的数据了,需要落盘了
if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
m.flushAudio()
}
}
func (m *Muxer) openFragment(ts uint64, discont bool) {
if m.opened {
return
}
id := m.getFragmentID()
filename := getTSFilename(m.outPath, m.streamName, id)
m.fragmentOP.OpenFile(filename)
m.opened = true
frag := m.getFrag(m.nfrags)
frag.discont = discont
frag.id = id
m.fragTS = ts
m.flushAudio()
}
func (m *Muxer) closeFragment() {
if !m.opened {
return
}
m.fragmentOP.CloseFile()
m.opened = false
m.nextFrag()
m.writePlaylist()
}
func (m *Muxer) writePlaylist() {
fp, err := os.Create(m.playlistFilenameBak)
nazalog.Assert(nil, err)
// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMS) / 1000
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
}
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n")
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag)))
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.frag))
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, getTSFilenameWithoutPath(m.streamName, frag.id)))
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(m.playlistFilenameBak, m.playlistFilename)
nazalog.Assert(nil, err)
}
// 创建文件夹,如果文件夹已经存在,老的文件夹会被删除
func (m *Muxer) ensureDir() {
err := os.RemoveAll(m.outPath)
nazalog.Assert(nil, err)
err = os.MkdirAll(m.outPath, 0777)
nazalog.Assert(nil, err)
}
func (m *Muxer) getFragmentID() int {
return m.frag + m.nfrags
}
func (m *Muxer) getFrag(n int) *fragmentInfo {
return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)]
}
// TODO chef: 这个函数重命名为incr更好些
func (m *Muxer) nextFrag() {
if m.nfrags == m.config.FragmentNum {
m.frag++
} else {
m.nfrags++
}
}
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (m *Muxer) flushAudio() {
if !m.opened {
nazalog.Warn("flushAudio by not opened.")
return
}
if m.aaframe == nil {
nazalog.Warn("flushAudio by aframe is nil.")
return
}
frame := &mpegTSFrame{
pts: m.aframePTS,
dts: m.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: m.audioCC,
key: false,
}
m.fragmentOP.WriteFrame(frame, m.aaframe)
m.audioCC = frame.cc
m.aaframe = nil
}