1. [feat] 支持HTTP-TS长连接拉流 2. [feat] 在hls直播基础上,同时支持hls录制回放功能

pull/29/head
q191201771 5 years ago
parent cb5e9f82ee
commit 0f94f8d4b2

@ -87,7 +87,7 @@ func main() {
content, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
packets := hls.SplitFragment2TSPackets(content)
packets, _ := hls.SplitFragment2TSPackets(content)
for _, packet := range packets {
handlePacket(packet)

@ -21,8 +21,8 @@ import (
// 临时小工具比较两个TS文件。注意该程序还没有写完。
var filename1 = "/tmp/lal/hls/innertest/innertest-0.ts"
var filename2 = "/tmp/lal/hls/new/innertest-0.ts"
var filename1 = "/tmp/lal/hls/innertest/innertest-7.ts"
var filename2 = "/tmp/lal/hls/innertest.bak/innertest-7.ts"
func skipPacketFilter(tss [][]byte) (ret [][]byte) {
for _, ts := range tss {
@ -65,8 +65,8 @@ func main() {
content2, err := ioutil.ReadFile(filename2)
nazalog.Assert(nil, err)
tss1 := hls.SplitFragment2TSPackets(content1)
tss2 := hls.SplitFragment2TSPackets(content2)
tss1, _ := hls.SplitFragment2TSPackets(content1)
tss2, _ := hls.SplitFragment2TSPackets(content2)
nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
@ -87,7 +87,6 @@ func main() {
parsePacket(tss2[i])
nazalog.Debugf("\n%s", hex.Dump(tss1[i]))
nazalog.Debugf("\n%s", hex.Dump(tss2[i]))
//break
}
}

@ -6,16 +6,20 @@
},
"httpflv": {
"enable": true,
"sub_listen_addr": ":8082",
"sub_listen_addr": ":8090",
"gop_num": 2
},
"hls": {
"enable": false,
"sub_listen_addr": ":8083",
"sub_listen_addr": ":8091",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"httpts": {
"enable": true,
"sub_listen_addr": ":8092"
},
"relay_push": {
"enable": true,
"addr_list":[

@ -16,6 +16,10 @@
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"httpts": {
"enable": true,
"sub_listen_addr": ":8082"
},
"rtsp": {
"enable": true,
"addr": ":5544"

@ -16,6 +16,10 @@
"fragment_duration_ms": 3000, // 单个TS文件切片时长单位毫秒
"fragment_num": 6 // M3U8文件列表中TS文件的数量
},
"httpts": {
"enable": true, // 是否开启HTTP-TS服务的监听。注意这并不是HLS中的TS而是在一条HTTP长连接上持续性传输TS流
"sub_listen_addr": ":8082" // HTTP-TS拉流地址
},
"rtsp": {
"enable": true, // 是否开启rtsp服务的监听目前只支持rtsp推流
"addr": ":5544" // rtsp推流地址

@ -16,6 +16,10 @@
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"httpts": {
"enable": true,
"sub_listen_addr": ":8082"
},
"rtsp": {
"enable": true,
"addr": ":5544"

@ -55,7 +55,7 @@ const (
// AACAUDIODATA
// AACPacketType UI8
// Data UI8[n]
RTMPSoundFormatACC uint8 = 10
RTMPSoundFormatAAC uint8 = 10
RTMPAACPacketTypeSeqHeader = 0
RTMPAACPacketTypeRaw = 1
)
@ -98,5 +98,5 @@ func (msg RTMPMsg) IsVideoKeyNALU() bool {
}
func (msg RTMPMsg) IsAACSeqHeader() bool {
return msg.Header.MsgTypeID == RTMPTypeIDAudio && (msg.Payload[0]>>4) == RTMPSoundFormatACC && msg.Payload[1] == RTMPAACPacketTypeSeqHeader
return msg.Header.MsgTypeID == RTMPTypeIDAudio && (msg.Payload[0]>>4) == RTMPSoundFormatAAC && msg.Payload[1] == RTMPAACPacketTypeSeqHeader
}

@ -16,7 +16,7 @@ import "strings"
// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中
// 版本该变量由build脚本修改维护
var LALVersion = "v0.13.0"
var LALVersion = "v0.14.0"
var (
LALLibraryName = "lal"
@ -62,6 +62,9 @@ var (
// e.g. lal0.12.3
LALRTSPOptionsResponseServer string
// e.g. lal0.12.3
LALHTTPTSSubSessionServer string
)
// - rtmp handshake random buf
@ -86,6 +89,8 @@ var (
// - rtsp
// - Options response `Server:`
//
// - httpts sub
// - `server:`
func init() {
LALVersionDot = strings.TrimPrefix(LALVersion, "v")
@ -99,6 +104,7 @@ func init() {
LALHLSM3U8Server = LALLibraryName + LALVersionDot
LALHLSTSServer = LALLibraryName + LALVersionDot
LALRTSPOptionsResponseServer = LALLibraryName + LALVersionDot
LALHTTPTSSubSessionServer = LALLibraryName + LALVersionDot
LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot

@ -23,20 +23,15 @@ func (f *Fragment) OpenFile(filename string) (err error) {
if err != nil {
return
}
f.writeFile(mpegts.FixedFragmentHeader)
return nil
err = f.WriteFile(mpegts.FixedFragmentHeader)
return
}
func (f *Fragment) WriteFrame(frame *mpegts.Frame) {
mpegts.PackTSPacket(frame, func(packet []byte, cc uint8) {
f.writeFile(packet)
})
func (f *Fragment) WriteFile(b []byte) (err error) {
_, err = f.fp.Write(b)
return
}
func (f *Fragment) CloseFile() {
_ = f.fp.Close()
}
func (f *Fragment) writeFile(b []byte) {
_, _ = f.fp.Write(b)
func (f *Fragment) CloseFile() error {
return f.fp.Close()
}

@ -8,14 +8,14 @@
package hls
import "errors"
// TODO chef:
// - 支持HEVC
// - 检查所有的容错处理,是否会出现
// - 补充单元测试
// - 配置项
// - Server
// - 超时时间
// - 考虑删除过期的TS文件并考虑做一个全量TS的m3u8作为点播用
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction
@ -29,6 +29,8 @@ package hls
// 进来的数据称为Frame帧188字节的封装称为TSPacket包TS文件称为Fragment
var ErrHLS = errors.New("lal.hls: fxxk")
var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}
@ -40,8 +42,9 @@ const (
maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90
)
func SplitFragment2TSPackets(content []byte) (ret [][]byte) {
func SplitFragment2TSPackets(content []byte) (ret [][]byte, err error) {
if len(content)%188 != 0 {
err = ErrHLS
return
}
for {

@ -11,30 +11,32 @@ package hls
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"strconv"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/naza/pkg/unique"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
type fragmentInfo struct {
id int // fragment的自增序号
duration float64 // 当前fragment中数据的时长单位秒
discont bool // #EXT-X-DISCONTINUITY
// TODO chef: 转换TS流的功能通过回调供httpts使用也放在了Muxer中好处是hls和httpts可以共用一份TS流。
// 后续从架构上考虑packet hls,mpegts,logic的分工
type MuxerObserver interface {
// @param rawFrame TS流回调结束后内部不再使用该内存块
// @param boundary 新的TS流接收者应该从该标志为true时开始发送数据
//
OnTSPackets(rawFrame []byte, boundary bool)
}
type MuxerConfig struct {
Enable bool `json:"enable"` // 如果false说明hls功能没开也即不写磁盘但是MuxerObserver依然会回调
OutPath string `json:"out_path"`
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
@ -43,50 +45,61 @@ type MuxerConfig struct {
type Muxer struct {
UniqueKey string
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
recordPlayListFilename string
recordPlayListFilenameBak string
config *MuxerConfig
config *MuxerConfig
observer MuxerObserver
fragment Fragment
opened bool
adts aac.ADTS
spspps []byte // AnnexB
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
recordMaxFragDuration float64
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
streamer *Streamer
}
audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧
audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
type fragmentInfo struct {
id int // fragment的自增序号
duration float64 // 当前fragment中数据的时长单位秒
discont bool // #EXT-X-DISCONTINUITY
filename string
}
func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
// @param observer 可以为nil如果不为nilTS流将回调给上层
func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *Muxer {
uk := unique.GenUniqueKey("HLSMUXER")
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
recordPlaylistFilename := getRecordM3U8Filename(op, streamName)
recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename)
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
m := &Muxer{
UniqueKey: uk,
streamName: streamName,
outPath: op,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
config: config,
videoOut: videoOut,
audioCacheFrames: nil,
frags: frags,
UniqueKey: uk,
streamName: streamName,
outPath: op,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
recordPlayListFilename: recordPlaylistFilename,
recordPlayListFilenameBak: recordPlaylistFilenameBak,
config: config,
observer: observer,
frags: frags,
}
streamer := NewStreamer(m)
m.streamer = streamer
nazalog.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName)
return m
}
@ -98,278 +111,278 @@ func (m *Muxer) Start() {
func (m *Muxer) Dispose() {
nazalog.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey)
m.flushAudio()
m.closeFragment(true)
m.streamer.FlushAudio()
if err := m.closeFragment(true); err != nil {
nazalog.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err)
}
}
// @param msg 函数调用结束后内部不持有msg中的内存块
//
func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) {
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
m.feedAudio(msg)
case base.RTMPTypeIDVideo:
m.feedVideo(msg)
}
m.streamer.FeedRTMPMessage(msg)
}
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (m *Muxer) feedVideo(msg base.RTMPMsg) {
if len(msg.Payload) < 5 {
nazalog.Errorf("[%s] invalid video message length. len=%d", m.UniqueKey, len(msg.Payload))
return
}
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
// 将数据转换成AnnexB
func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
var boundary bool
var packets []byte
// 如果是sps pps缓存住然后直接返回
if ftype == 1 && htype == 0 {
if err := m.cacheSPSPPS(msg); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", m.UniqueKey, err)
if frame.Sid == mpegts.StreamIDAudio {
// 为了考虑没有视频的情况也能切片所以这里判断spspps为空时也建议生成fragment
boundary = !streamer.VideoSeqHeaderCached()
if err := m.updateFragment(frame.PTS, boundary); err != nil {
nazalog.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)
return
}
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := m.videoOut[0:0]
// tag中可能有多个NALU逐个获取
for i := 5; i != len(msg.Payload); {
if i+4 > len(msg.Payload) {
nazalog.Errorf("[%s] slice len not enough. i=%d, len=%d", m.UniqueKey, i, len(msg.Payload))
if !m.opened {
nazalog.Warnf("[%s] OnFrame A not opened.", m.UniqueKey)
return
}
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
if i+nalBytes > len(msg.Payload) {
nazalog.Errorf("[%s] slice len not enough. i=%d, payload len=%d, nalBytes=%d", m.UniqueKey, i, len(msg.Payload), nalBytes)
//nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
} else {
// 收到视频可能触发建立fragment的条件是
// 关键帧数据 &&
// ((没有收到过音频seq header) || -> 只有视频
// (收到过音频seq header && fragment没有打开) || -> 音视频都有且都已ready
// (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) -> 为什么音频缓存需不为空?
// )
boundary = frame.Key && (!streamer.AudioSeqHeaderCached() || !m.opened || !streamer.AudioCacheEmpty())
if err := m.updateFragment(frame.DTS, boundary); err != nil {
nazalog.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)
return
}
nalType := avc.ParseNALUType(msg.Payload[i])
//nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", m.UniqueKey, nalType, nalBytes, len(msg.Payload), cts)
// sps pps前面已经缓存过了这里就不用处理了
// aud有自己的生产逻辑原流中的aud直接过滤掉
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
i += nalBytes
continue
if !m.opened {
nazalog.Warnf("[%s] OnFrame V not opened.", m.UniqueKey)
return
}
if !audSent {
switch nalType {
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
// 在前面写入aud
out = append(out, audNal...)
audSent = true
//case avc.NALUTypeAUD:
// // 上面aud已经continue跳过了应该进不到这个分支可以考虑删除这个分支代码
// audSent = true
}
}
//nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
}
switch nalType {
case avc.NALUTypeSlice:
spsppsSent = false
case avc.NALUTypeIDRSlice:
// 如果是关键帧在前面写入sps pps
if !spsppsSent {
out = m.appendSPSPPS(out)
mpegts.PackTSPacket(frame, func(packet []byte) {
if m.config.Enable {
if err := m.fragment.WriteFile(packet); err != nil {
nazalog.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return
}
spsppsSent = true
}
// 这里不知为什么要区分写入两种类型的start code
if len(out) == 0 {
out = append(out, avc.NALUStartCode4...)
} else {
out = append(out, avc.NALUStartCode3...)
if m.observer != nil {
packets = append(packets, packet...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
key := ftype == 1
dts := uint64(msg.Header.TimestampAbs) * 90
boundary := key && (!m.opened || !m.adts.HasInited() || m.audioCacheFrames != nil)
m.updateFragment(dts, boundary, false)
if !m.opened {
nazalog.Warnf("[%s] not opened.", m.UniqueKey)
return
}
var frame mpegts.Frame
frame.CC = m.videoCC
frame.DTS = dts
frame.PTS = frame.DTS + uint64(cts)*90
frame.Key = key
frame.Raw = out
frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIDVideo
//nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
m.fragment.WriteFrame(&frame)
m.videoCC = frame.CC
}
func (m *Muxer) feedAudio(msg base.RTMPMsg) {
if len(msg.Payload) < 3 {
nazalog.Errorf("[%s] invalid audio message length. len=%d", m.UniqueKey, len(msg.Payload))
}
if msg.Payload[0]>>4 != 10 {
return
}
//nazalog.Debugf("[%s] hls: feedAudio. dts=%d len=%d", m.UniqueKey, msg.Header.TimestampAbs, len(msg.Payload))
if msg.Payload[1] == 0 {
m.cacheAACSeqHeader(msg)
return
}
if !m.adts.HasInited() {
nazalog.Warnf("[%s] feed audio message but aac seq header not exist.", m.UniqueKey)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
m.updateFragment(pts, m.spspps == nil, true)
if m.audioCacheFrames == nil {
m.audioCacheFirstFramePTS = pts
}
adtsHeader, _ := m.adts.CalcADTSHeader(uint16(msg.Header.MsgLen - 2))
m.audioCacheFrames = append(m.audioCacheFrames, adtsHeader...)
m.audioCacheFrames = append(m.audioCacheFrames, msg.Payload[2:]...)
}
func (m *Muxer) cacheAACSeqHeader(msg base.RTMPMsg) {
_ = m.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:])
}
func (m *Muxer) cacheSPSPPS(msg base.RTMPMsg) error {
var err error
m.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload)
return err
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {
if m.spspps == nil {
nazalog.Warnf("[%s] append spspps by not exist.", m.UniqueKey)
return out
})
if m.observer != nil {
m.observer.OnTSPackets(packets, boundary)
}
out = append(out, m.spspps...)
return out
}
// 决定是否开启新的TS切片文件注意可能已经有TS切片也可能没有这是第一个切片,以及落盘音频数据
// 决定是否开启新的TS切片文件注意可能已经有TS切片也可能没有这是第一个切片
//
// @param boundary 调用方认为可能是开启新TS切片的时间点
// @param isByAudio 触发该函数调用,是因为收到音频数据,还是视频数据
// @param boundary 调用方认为可能是开启新TS切片的时间点
//
func (m *Muxer) updateFragment(ts uint64, boundary bool, isByAudio bool) {
force := false
func (m *Muxer) updateFragment(ts uint64, boundary bool) error {
discont := true
var f *fragmentInfo
// 如果已经有TS切片检查是否需要强制开启新的切片以及切片是否发生跳跃
// 注意,音频和视频是在一起检查的
if m.opened {
f = m.getCurrFrag()
f := m.getCurrFrag()
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("[%s] force fragment split. fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts)
force = true
} else {
// TODO chef: 考虑ts比fragTS小的情况
f.duration = float64(ts-m.fragTS) / 90000
discont = false
if err := m.closeFragment(false); err != nil {
return err
}
if err := m.openFragment(ts, true); err != nil {
return err
}
}
// 已经有TS切片那么只有当前fragment的时长超过设置的TS切片阈值才开启新的切片
// 更新当前分片的时间长度
//
// TODO chef:
// f.duration也即写入m3u8中记录分片时间长度的做法我觉得有问题
// 此处用最新收到的数据更新f.duration
// 但是假设fragment翻滚数据可能是写入下一个分片中
// 是否就导致了f.duration和实际分片时间长度不一致
if ts > m.fragTS {
duration := float64(ts-m.fragTS) / 90000
if duration > f.duration {
f.duration = duration
}
}
discont = false
// 已经有TS切片切片时长没有达到设置的阈值则不开启新的切片
if f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
return nil
}
}
// 开启新的fragment
if boundary || force {
m.closeFragment(false)
m.openFragment(ts, discont)
if boundary {
if err := m.closeFragment(false); err != nil {
return err
}
if err := m.openFragment(ts, discont); err != nil {
return err
}
}
// 音频已经缓存了一定时长的数据了,需要落盘了
var maxAudioDelay uint64
if isByAudio {
maxAudioDelay = maxAudioCacheDelayByAudio
} else {
maxAudioDelay = maxAudioCacheDelayByVideo
}
if m.opened && m.audioCacheFrames != nil && ((m.audioCacheFirstFramePTS + maxAudioDelay) < ts) {
m.flushAudio()
}
return nil
}
// @param discont 不连续标志会在m3u8文件的fragment前增加`#EXT-X-DISCONTINUITY`
func (m *Muxer) openFragment(ts uint64, discont bool) {
//
func (m *Muxer) openFragment(ts uint64, discont bool) error {
if m.opened {
return
return ErrHLS
}
id := m.getFragmentID()
filename := getTSFilename(m.outPath, m.streamName, id)
_ = m.fragment.OpenFile(filename)
filename := getTSFilename(m.streamName, id, int(time.Now().Unix()))
filenameWithPath := getTSFilenameWithPath(m.outPath, filename)
if m.config.Enable {
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
}
m.opened = true
frag := m.getCurrFrag()
frag.discont = discont
frag.id = id
frag.filename = filename
frag.duration = 0
m.fragTS = ts
m.flushAudio()
// nrm said: start fragment with audio to make iPhone happy
m.streamer.FlushAudio()
return nil
}
func (m *Muxer) closeFragment(isLast bool) {
func (m *Muxer) closeFragment(isLast bool) error {
if !m.opened {
return
// 注意首次调用closeFragment时有可能opened为false
return nil
}
m.fragment.CloseFile()
if m.config.Enable {
if err := m.fragment.CloseFile(); err != nil {
return err
}
}
m.opened = false
//更新序号,为下个分片准备好
m.incrFrag()
m.writePlaylist(isLast)
m.writeRecordPlaylist(isLast)
return nil
}
func (m *Muxer) writeRecordPlaylist(isLast bool) {
if !m.config.Enable {
return
}
//frag := m.getCurrFrag()
frag := m.getFrag(m.nfrags - 1)
if frag.duration > m.recordMaxFragDuration {
m.recordMaxFragDuration = frag.duration + 0.5
}
fragLines := fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, frag.filename)
content, err := ioutil.ReadFile(m.recordPlayListFilename)
if err == nil {
// m3u8文件已经存在
content = bytes.TrimSuffix(content, []byte("#EXT-X-ENDLIST\n"))
// 更新 #EXT-X-TARGETDURATION
l := bytes.Index(content, []byte("#EXT-X-TARGETDURATION:"))
if l == -1 {
nazalog.Errorf("m3u8 file format invalid. file=%s", m.recordPlayListFilename)
return
}
r := bytes.Index(content[l:], []byte{'\n'})
if r == -1 {
nazalog.Errorf("m3u8 file format invalid. file=%s", m.recordPlayListFilename)
return
}
oldDurationStr := bytes.TrimPrefix(content[l:l+r], []byte("#EXT-X-TARGETDURATION:"))
oldDuration, err := strconv.Atoi(string(oldDurationStr))
if err != nil {
nazalog.Errorf("m3u8 file format invalid. file=%s", m.recordPlayListFilename)
return
}
if int(m.recordMaxFragDuration) > oldDuration {
tmpContent := make([]byte, l)
copy(tmpContent, content[:l])
tmpContent = append(tmpContent, []byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d", int(m.recordMaxFragDuration)))...)
tmpContent = append(tmpContent, content[l+r:]...)
content = tmpContent
}
if frag.discont {
content = append(content, []byte("#EXT-X-DISCONTINUITY\n")...)
}
content = append(content, []byte(fragLines)...)
content = append(content, []byte("#EXT-X-ENDLIST\n")...)
} else {
// m3u8文件不存在
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(m.recordMaxFragDuration)))
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", 0))
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fragLines)
buf.WriteString("#EXT-X-ENDLIST\n")
content = buf.Bytes()
}
var fp *os.File
if fp, err = os.Create(m.recordPlayListFilenameBak); err != nil {
nazalog.Errorf("[%s] create file failed. file=%s, err=%+v", m.UniqueKey, m.recordPlayListFilenameBak, err)
return
}
if _, err = fp.Write(content); err != nil {
nazalog.Errorf("[%s] write file failed. file=%s, err=%+v", m.UniqueKey, m.recordPlayListFilenameBak, err)
return
}
if err = fp.Close(); err != nil {
nazalog.Errorf("[%s] close file failed. file=%s, err=%+v", m.UniqueKey, m.recordPlayListFilenameBak, err)
return
}
if err = os.Rename(m.recordPlayListFilenameBak, m.recordPlayListFilename); err != nil {
nazalog.Errorf("[%s] rename file failed. err=%+v", m.UniqueKey, err)
return
}
}
func (m *Muxer) writePlaylist(isLast bool) {
fp, err := os.Create(m.playlistFilenameBak)
nazalog.Assert(nil, err)
if !m.config.Enable {
return
}
// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMS) / 1000
@ -395,25 +408,40 @@ func (m *Muxer) writePlaylist(isLast bool) {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, getTSFilenameWithoutPath(m.streamName, frag.id)))
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, frag.filename))
}
if isLast {
buf.WriteString("#EXT-X-ENDLIST\n")
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(m.playlistFilenameBak, m.playlistFilename)
nazalog.Assert(nil, err)
var fp *os.File
var err error
if fp, err = os.Create(m.playlistFilenameBak); err != nil {
nazalog.Errorf("[%s] create file failed. file=%s, err=%+v", m.UniqueKey, m.playlistFilenameBak, err)
return
}
if _, err = fp.Write(buf.Bytes()); err != nil {
nazalog.Errorf("[%s] write file failed. file=%s, err=%+v", m.UniqueKey, m.playlistFilenameBak, err)
return
}
if err = fp.Close(); err != nil {
nazalog.Errorf("[%s] close file failed. file=%s, err=%+v", m.UniqueKey, m.playlistFilenameBak, err)
return
}
if err = os.Rename(m.playlistFilenameBak, m.playlistFilename); err != nil {
nazalog.Errorf("[%s] rename file failed. err=%+v", m.UniqueKey, err)
return
}
}
// 创建文件夹,如果文件夹已经存在,老的文件夹会被删除
func (m *Muxer) ensureDir() {
err := os.RemoveAll(m.outPath)
nazalog.Assert(nil, err)
err = os.MkdirAll(m.outPath, 0777)
if !m.config.Enable {
return
}
//err := os.RemoveAll(m.outPath)
//nazalog.Assert(nil, err)
err := os.MkdirAll(m.outPath, 0777)
nazalog.Assert(nil, err)
}
@ -436,33 +464,3 @@ func (m *Muxer) incrFrag() {
m.nfrags++
}
}
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (m *Muxer) flushAudio() {
if !m.opened {
nazalog.Warnf("[%s] flushAudio by not opened.", m.UniqueKey)
return
}
if m.audioCacheFrames == nil {
return
}
var frame mpegts.Frame
frame.CC = m.audioCC
frame.DTS = m.audioCacheFirstFramePTS
frame.PTS = m.audioCacheFirstFramePTS
frame.Key = false
frame.Raw = m.audioCacheFrames
frame.Pid = mpegts.PidAudio
frame.Sid = mpegts.StreamIDAudio
//nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
m.fragment.WriteFrame(&frame)
m.audioCC = frame.CC
m.audioCacheFrames = nil
}

@ -25,8 +25,9 @@ import (
// rootPath="/tmp/lal/hls/"
//
// 则
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8
// http://127.0.0.1:8081/hls/test110/test110-0.ts -> /tmp/lal/hls/test110/test110-0.ts
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8
// http://127.0.0.1:8081/hls/test110/record.m3u8 -> /tmp/lal/hls/test110/record.m3u8
// http://127.0.0.1:8081/hls/test110/timestamp-0.ts -> /tmp/lal/hls/test110/timestamp-0.ts
type requestInfo struct {
fileName string
@ -35,9 +36,10 @@ type requestInfo struct {
}
// RequestURI example:
// uri -> fileName streamName fileType
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> playlist.m3u8 test110 m3u8
// http://127.0.0.1:8081/hls/test110/test110-0.ts -> test110-0.ts test110 ts
// uri -> fileName streamName fileType
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> playlist.m3u8 test110 m3u8
// http://127.0.0.1:8081/hls/test110/record.m3u8 -> record.m3u8 test110 m3u8
// http://127.0.0.1:8081/hls/test110/timestamp-0.ts -> timestamp-0.ts test110 ts
func parseRequestInfo(uri string) (ri requestInfo) {
ss := strings.Split(uri, "/")
if len(ss) < 2 {
@ -68,10 +70,14 @@ func getM3U8Filename(outpath string, streamName string) string {
return fmt.Sprintf("%s%s.m3u8", outpath, "playlist")
}
func getTSFilename(outpath string, streamName string, id int) string {
return fmt.Sprintf("%s%s-%d.ts", outpath, streamName, id)
func getRecordM3U8Filename(outpath string, streamName string) string {
return fmt.Sprintf("%s%s.m3u8", outpath, "record")
}
func getTSFilenameWithoutPath(streamName string, id int) string {
return fmt.Sprintf("%s-%d.ts", streamName, id)
func getTSFilenameWithPath(outpath string, filename string) string {
return fmt.Sprintf("%s%s", outpath, filename)
}
func getTSFilename(streamName string, id int, timestamp int) string {
return fmt.Sprintf("%d-%d.ts", timestamp, id)
}

@ -55,7 +55,6 @@ func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// TODO chef:
// - check appname in URI path
// - DIY 404 response body
ri := parseRequestInfo(req.RequestURI)
//nazalog.Debugf("%+v", ri)

@ -0,0 +1,271 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
type StreamerObserver interface {
// @param streamer: 供上层获取streamer内部的一些状态比如spspps是否已缓存音频缓存队列是否有数据等
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意回调结束后Streamer会保存frame.CC上层在TS打包完成后可通过frame.CC将cc值传递给Streamer
// frame.Raw 回调结束后,这块内存可能会被内部重复使用
//
OnFrame(streamer *Streamer, frame *mpegts.Frame)
}
type Streamer struct {
UniqueKey string
observer StreamerObserver
videoOut []byte // AnnexB TODO chef: 优化这块buff
spspps []byte // AnnexB
adts aac.ADTS
audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff
audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS
audioCC uint8
videoCC uint8
}
func NewStreamer(observer StreamerObserver) *Streamer {
uk := unique.GenUniqueKey("STREAMER")
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
return &Streamer{
UniqueKey: uk,
observer: observer,
videoOut: videoOut,
}
}
// @param msg msg.Payload 调用结束后,函数内部不会持有这块内存
//
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (s *Streamer) FeedRTMPMessage(msg base.RTMPMsg) {
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
s.feedAudio(msg)
case base.RTMPTypeIDVideo:
s.feedVideo(msg)
}
}
func (s *Streamer) AudioSeqHeaderCached() bool {
return s.adts.HasInited()
}
func (s *Streamer) VideoSeqHeaderCached() bool {
return s.spspps != nil
}
func (s *Streamer) AudioCacheEmpty() bool {
return s.audioCacheFrames == nil
}
func (s *Streamer) feedVideo(msg base.RTMPMsg) {
if len(msg.Payload) < 5 {
nazalog.Errorf("[%s] invalid video message length. len=%d", s.UniqueKey, len(msg.Payload))
return
}
if msg.Payload[0]&0xF != base.RTMPCodecIDAVC {
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
// 将数据转换成AnnexB
// 如果是sps pps缓存住然后直接返回
if ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeSeqHeader {
if err := s.cacheSPSPPS(msg); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err)
}
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := s.videoOut[0:0]
// tag中可能有多个NALU逐个获取
for i := 5; i != len(msg.Payload); {
if i+4 > len(msg.Payload) {
nazalog.Errorf("[%s] slice len not enough. i=%d, len=%d", s.UniqueKey, i, len(msg.Payload))
return
}
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
if i+nalBytes > len(msg.Payload) {
nazalog.Errorf("[%s] slice len not enough. i=%d, payload len=%d, nalBytes=%d", s.UniqueKey, i, len(msg.Payload), nalBytes)
return
}
nalType := avc.ParseNALUType(msg.Payload[i])
//nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts)
// sps pps前面已经缓存过了这里就不用处理了
// aud有自己的生产逻辑原流中的aud直接过滤掉
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
i += nalBytes
continue
}
if !audSent {
switch nalType {
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
// 在前面写入aud
out = append(out, audNal...)
audSent = true
//case avc.NALUTypeAUD:
// // 上面aud已经continue跳过了应该进不到这个分支可以考虑删除这个分支代码
// audSent = true
}
}
switch nalType {
case avc.NALUTypeSlice:
spsppsSent = false
case avc.NALUTypeIDRSlice:
// 如果是首个关键帧在前面写入sps pps
if !spsppsSent {
var err error
out, err = s.appendSPSPPS(out)
if err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
}
}
spsppsSent = true
}
// 这里不知为什么要区分写入两种类型的start code
if len(out) == 0 {
out = append(out, avc.NALUStartCode4...)
} else {
out = append(out, avc.NALUStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
key := ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeNALU
dts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByVideo < dts {
s.FlushAudio()
}
var frame mpegts.Frame
frame.CC = s.videoCC
frame.DTS = dts
frame.PTS = frame.DTS + uint64(cts)*90
frame.Key = key
frame.Raw = out
frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIDVideo
s.observer.OnFrame(s, &frame)
s.videoCC = frame.CC
}
func (s *Streamer) feedAudio(msg base.RTMPMsg) {
if len(msg.Payload) < 3 {
nazalog.Errorf("[%s] invalid audio message length. len=%d", s.UniqueKey, len(msg.Payload))
return
}
if msg.Payload[0]>>4 != base.RTMPSoundFormatAAC {
return
}
//nazalog.Debugf("[%s] hls: feedAudio. dts=%d len=%d", s.UniqueKey, msg.Header.TimestampAbs, len(msg.Payload))
if msg.Payload[1] == base.RTMPAACPacketTypeSeqHeader {
if err := s.cacheAACSeqHeader(msg); err != nil {
nazalog.Errorf("[%s] cache aac seq header failed. err=%+v", s.UniqueKey, err)
}
return
}
if !s.adts.HasInited() {
nazalog.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByAudio < pts {
s.FlushAudio()
}
if s.audioCacheFrames == nil {
s.audioCacheFirstFramePTS = pts
}
adtsHeader, _ := s.adts.CalcADTSHeader(uint16(msg.Header.MsgLen - 2))
s.audioCacheFrames = append(s.audioCacheFrames, adtsHeader...)
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...)
}
// 吐出音频数据的三种情况:
// 1. 收到音频或视频时,音频缓存队列已达到一定长度
// 2. 打开一个新的TS文件切片时
// 3. 输入流关闭时
func (s *Streamer) FlushAudio() {
if s.audioCacheFrames == nil {
return
}
var frame mpegts.Frame
frame.CC = s.audioCC
frame.DTS = s.audioCacheFirstFramePTS
frame.PTS = s.audioCacheFirstFramePTS
frame.Key = false
frame.Raw = s.audioCacheFrames
frame.Pid = mpegts.PidAudio
frame.Sid = mpegts.StreamIDAudio
s.observer.OnFrame(s, &frame)
s.audioCC = frame.CC
s.audioCacheFrames = nil
}
func (s *Streamer) cacheAACSeqHeader(msg base.RTMPMsg) error {
return s.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:])
}
func (s *Streamer) cacheSPSPPS(msg base.RTMPMsg) error {
var err error
s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload)
return err
}
func (s *Streamer) appendSPSPPS(out []byte) ([]byte, error) {
if s.spspps == nil {
return out, ErrHLS
}
out = append(out, s.spspps...)
return out, nil
}

@ -0,0 +1,13 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpts
import "errors"
var ErrHTTPTS = errors.New("lal.httpts: fxxk")

@ -0,0 +1,81 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpts
import (
"net"
log "github.com/q191201771/naza/pkg/nazalog"
)
type ServerObserver interface {
// 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接
OnNewHTTPTSSubSession(session *SubSession) bool
OnDelHTTPTSSubSession(session *SubSession)
}
type Server struct {
obs ServerObserver
addr string
ln net.Listener
}
func NewServer(obs ServerObserver, addr string) *Server {
return &Server{
obs: obs,
addr: addr,
}
}
func (server *Server) Listen() (err error) {
if server.ln, err = net.Listen("tcp", server.addr); err != nil {
return
}
log.Infof("start httpts server listen. addr=%s", server.addr)
return
}
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleConnect(conn)
}
}
func (server *Server) Dispose() {
if server.ln == nil {
return
}
if err := server.ln.Close(); err != nil {
log.Error(err)
}
}
func (server *Server) handleConnect(conn net.Conn) {
log.Infof("accept a httpts connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewSubSession(conn)
if err := session.ReadRequest(); err != nil {
log.Errorf("[%s] read httpts SubSession request error. err=%v", session.UniqueKey, err)
return
}
log.Debugf("[%s] < read http request. uri=%s", session.UniqueKey, session.URI)
if !server.obs.OnNewHTTPTSSubSession(session) {
session.Dispose()
}
err := session.RunLoop()
log.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey, err)
server.obs.OnDelHTTPTSSubSession(session)
}

@ -0,0 +1,143 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpts
import (
"net"
"net/url"
"strings"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique"
)
var tsHTTPResponseHeader []byte
type SubSession struct {
UniqueKey string
StartTick int64
StreamName string
AppName string
URI string
Headers map[string]string
IsFresh bool
conn connection.Connection
}
func NewSubSession(conn net.Conn) *SubSession {
uk := unique.GenUniqueKey("TSSUB")
s := &SubSession{
UniqueKey: uk,
IsFresh: true,
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
}),
}
nazalog.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
}
// TODO chef: read request timeout
func (session *SubSession) ReadRequest() (err error) {
session.StartTick = time.Now().Unix()
defer func() {
if err != nil {
session.Dispose()
}
}()
var (
requestLine string
method string
)
if requestLine, session.Headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil {
return
}
if method, session.URI, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil {
return
}
if method != "GET" {
err = ErrHTTPTS
return
}
var urlObj *url.URL
if urlObj, err = url.Parse(session.URI); err != nil {
return
}
if !strings.HasSuffix(urlObj.Path, ".ts") {
err = ErrHTTPTS
return
}
items := strings.Split(urlObj.Path, "/")
if len(items) != 3 {
err = ErrHTTPTS
return
}
session.AppName = items[1]
items = strings.Split(items[2], ".")
if len(items) < 2 {
err = ErrHTTPTS
return
}
session.StreamName = items[0]
return nil
}
func (session *SubSession) RunLoop() error {
buf := make([]byte, 128)
_, err := session.conn.Read(buf)
return err
}
func (session *SubSession) WriteHTTPResponseHeader() {
nazalog.Debugf("[%s] > W http response header.", session.UniqueKey)
session.WriteRawPacket(tsHTTPResponseHeader)
}
func (session *SubSession) WriteFragmentHeader() {
nazalog.Debugf("[%s] > W http response header.", session.UniqueKey)
session.WriteRawPacket(mpegts.FixedFragmentHeader)
}
func (session *SubSession) WriteRawPacket(pkt []byte) {
_, _ = session.conn.Write(pkt)
}
func (session *SubSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.UniqueKey)
_ = session.conn.Close()
}
func init() {
tsHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" +
"Server: " + base.LALHTTPTSSubSessionServer + "\r\n" +
"Cache-Control: no-cache\r\n" +
"Content-Type: video/mp2t\r\n" +
"Connection: close\r\n" +
"Expires: -1\r\n" +
"Pragma: no-cache\r\n" +
"\r\n"
tsHTTPResponseHeader = []byte(tsHTTPResponseHeaderStr)
}

@ -0,0 +1,13 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpts
var readBufSize = 256 //16384 // SubSession读取数据时
var wChanSize = 1024 // SubSession发送数据时channel的大小
var subSessionWriteTimeoutMS = 10000

@ -79,6 +79,8 @@ func InnerTestEntry(t *testing.T) {
config, err := logic.LoadConf(confFile)
assert.Equal(t, nil, err)
_ = os.RemoveAll(config.HLSConfig.OutPath)
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/innertest.flv", config.HTTPFLVConfig.SubListenAddr)
rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr)
@ -166,7 +168,7 @@ func InnerTestEntry(t *testing.T) {
err = filebatch.Walk(
fmt.Sprintf("%sinnertest", config.HLSConfig.OutPath),
false,
"",
".ts",
func(path string, info os.FileInfo, content []byte, err error) []byte {
allContent = append(allContent, content...)
fileNum++
@ -174,9 +176,9 @@ func InnerTestEntry(t *testing.T) {
})
assert.Equal(t, nil, err)
allContentMD5 := nazamd5.MD5(allContent)
assert.Equal(t, 9, fileNum)
assert.Equal(t, 2219443, len(allContent))
assert.Equal(t, "fbccb04325eb2876c173ec6295359fef", allContentMD5)
assert.Equal(t, 8, fileNum)
assert.Equal(t, 2219152, len(allContent))
assert.Equal(t, "48db6251d40c271fd11b05650f074e0f", allContentMD5)
}
func compareFile() {

@ -22,6 +22,7 @@ type Config struct {
RTMPConfig RTMPConfig `json:"rtmp"`
HTTPFLVConfig HTTPFLVConfig `json:"httpflv"`
HLSConfig HLSConfig `json:"hls"`
HTTPTSConfig HTTPTSConfig `json:"httpts"`
RTSPConfig RTSPConfig `json:"rtsp"`
RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"`
@ -42,9 +43,13 @@ type HTTPFLVConfig struct {
GOPNum int `json:"gop_num"`
}
type HLSConfig struct {
type HTTPTSConfig struct {
Enable bool `json:"enable"`
SubListenAddr string `json:"sub_listen_addr"`
}
type HLSConfig struct {
SubListenAddr string `json:"sub_listen_addr"`
hls.MuxerConfig
}
@ -84,9 +89,15 @@ func LoadConf(confFile string) (*Config, error) {
}
// 检查配置必须项
if !j.Exist("rtmp") || !j.Exist("httpflv") || !j.Exist("hls") || !j.Exist("rtsp") ||
!j.Exist("relay_push") || !j.Exist("relay_pull") ||
!j.Exist("pprof") || !j.Exist("log") {
if !j.Exist("rtmp") ||
!j.Exist("httpflv") ||
!j.Exist("hls") ||
!j.Exist("httpts") ||
!j.Exist("rtsp") ||
!j.Exist("relay_push") ||
!j.Exist("relay_pull") ||
!j.Exist("pprof") ||
!j.Exist("log") {
return &config, errors.New("missing key field in config file")
}

@ -12,6 +12,8 @@ import (
"fmt"
"sync"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/lal/pkg/aac"
@ -49,6 +51,7 @@ type Group struct {
isPulling bool
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
httptsSubSessionSet map[*httpts.SubSession]struct{}
hlsMuxer *hls.Muxer
url2PushProxy map[string]*pushProxy
gopCache *GOPCache
@ -87,6 +90,7 @@ func NewGroup(appName string, streamName string) *Group {
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
httptsSubSessionSet: make(map[*httpts.SubSession]struct{}),
gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum),
url2PushProxy: url2PushProxy,
@ -135,6 +139,11 @@ func (group *Group) Dispose() {
}
group.httpflvSubSessionSet = nil
for session := range group.httptsSubSessionSet {
session.Dispose()
}
group.httptsSubSessionSet = nil
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.hlsMuxer = nil
@ -202,6 +211,21 @@ func (group *Group) AddRTSPPubSession(session *rtsp.PubSession) bool {
return true
}
func (group *Group) DelRTSPPubSession(session *rtsp.PubSession) {
nazalog.Debugf("[%s] [%s] del PubSession from group.", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
if session != group.rtspPubSession {
nazalog.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey, group.rtmpPubSession)
return
}
group.rtspPubSession = nil
group.delIn()
}
func (group *Group) AddRTMPPullSession(session *rtmp.PullSession) {
nazalog.Debugf("[%s] [%s] add PullSession into group.", group.UniqueKey, session.UniqueKey())
@ -213,7 +237,7 @@ func (group *Group) AddRTMPPullSession(session *rtmp.PullSession) {
group.pullSession = session
if config.HLSConfig.Enable {
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig)
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
}
@ -265,6 +289,25 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
delete(group.httpflvSubSessionSet, session)
}
func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) {
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey)
session.WriteHTTPResponseHeader()
session.WriteFragmentHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httptsSubSessionSet[session] = struct{}{}
group.pullIfNeeded()
}
func (group *Group) DelHTTPTSSubSession(session *httpts.SubSession) {
nazalog.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
delete(group.httptsSubSessionSet, session)
}
func (group *Group) AddRTMPPushSession(url string, session *rtmp.PushSession) {
nazalog.Debugf("[%s] [%s] add rtmp PushSession into group.", group.UniqueKey, session.UniqueKey())
group.mutex.Lock()
@ -290,6 +333,22 @@ func (group *Group) IsTotalEmpty() bool {
return group.isTotalEmpty()
}
// hls.Muxer
func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.IsFresh = false
session.WriteRawPacket(rawFrame)
}
} else {
session.WriteRawPacket(rawFrame)
}
}
}
// rtmp.PubSession or rtmp.PullSession
func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) {
group.mutex.Lock()
@ -377,10 +436,12 @@ func (group *Group) StringifyStats() string {
group.mutex.Lock()
defer group.mutex.Unlock()
var pub string
if group.rtmpPubSession == nil {
pub = "none"
} else {
if group.rtmpPubSession != nil {
pub = group.rtmpPubSession.UniqueKey
} else if group.rtspPubSession != nil {
pub = group.rtspPubSession.UniqueKey
} else {
pub = "none"
}
var pull string
if group.pullSession == nil {
@ -395,8 +456,8 @@ func (group *Group) StringifyStats() string {
}
}
return fmt.Sprintf("[%s] stream name=%s, rtmp pub=%s, relay rtmp pull=%s, rtmp sub size=%d, httpflv sub size=%d, relay rtmp push size=%d",
group.UniqueKey, group.streamName, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), pushSize)
return fmt.Sprintf("[%s] stream name=%s, rtmp pub=%s, relay rtmp pull=%s, rtmp sub=%d, httpflv sub=%d, httpts sub=%d, relay rtmp push=%d",
group.UniqueKey, group.streamName, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), len(group.httptsSubSessionSet), pushSize)
}
func (group *Group) broadcastMetadataAndSeqHeader() {
@ -410,7 +471,7 @@ func (group *Group) broadcastMetadataAndSeqHeader() {
return
}
metadata, err := rtmp.BuildMetadata(int(ctx.Width), int(ctx.Height), int(base.RTMPSoundFormatACC), int(base.RTMPCodecIDAVC))
metadata, err := rtmp.BuildMetadata(int(ctx.Width), int(ctx.Height), int(base.RTMPSoundFormatAAC), int(base.RTMPCodecIDAVC))
if err != nil {
nazalog.Errorf("build metadata failed. err=%+v", err)
return
@ -576,8 +637,9 @@ func (group *Group) pullIfNeeded() {
if !config.RelayPullConfig.Enable {
return
}
// TODO chef: func IsOutEmpty?
// 没有sub订阅者
if len(group.rtmpSubSessionSet) == 0 && len(group.httpflvSubSessionSet) == 0 {
if len(group.rtmpSubSessionSet) == 0 && len(group.httpflvSubSessionSet) == 0 && len(group.httptsSubSessionSet) == 0 {
return
}
// 已有pub推流或pull回源
@ -659,6 +721,7 @@ func (group *Group) isTotalEmpty() bool {
return group.rtmpPubSession == nil && len(group.rtmpSubSessionSet) == 0 &&
group.rtspPubSession == nil &&
len(group.httpflvSubSessionSet) == 0 &&
len(group.httptsSubSessionSet) == 0 &&
group.hlsMuxer == nil &&
!group.hasPushSession() &&
group.pullSession == nil
@ -672,7 +735,10 @@ func (group *Group) isInEmpty() bool {
func (group *Group) addIn() {
if config.HLSConfig.Enable {
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig)
if group.hlsMuxer != nil {
nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
}
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}

@ -13,6 +13,8 @@ import (
"sync"
"time"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/lal/pkg/hls"
@ -26,6 +28,7 @@ type ServerManager struct {
rtmpServer *rtmp.Server
httpflvServer *httpflv.Server
hlsServer *hls.Server
httptsServer *httpts.Server
rtspServer *rtsp.Server
exitChan chan struct{}
@ -47,6 +50,9 @@ func NewServerManager() *ServerManager {
if config.HLSConfig.Enable {
m.hlsServer = hls.NewServer(config.HLSConfig.SubListenAddr, config.HLSConfig.OutPath)
}
if config.HTTPFLVConfig.Enable {
m.httptsServer = httpts.NewServer(m, config.HTTPTSConfig.SubListenAddr)
}
if config.RTSPConfig.Enable {
m.rtspServer = rtsp.NewServer(config.RTSPConfig.Addr, m)
}
@ -78,6 +84,18 @@ func (sm *ServerManager) RunLoop() {
}()
}
if sm.httptsServer != nil {
if err := sm.httptsServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
}
go func() {
if err := sm.httptsServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
}
if sm.hlsServer != nil {
if err := sm.hlsServer.Listen(); err != nil {
nazalog.Error(err)
@ -132,6 +150,9 @@ func (sm *ServerManager) Dispose() {
if sm.httpflvServer != nil {
sm.httpflvServer.Dispose()
}
if sm.httptsServer != nil {
sm.httptsServer.Dispose()
}
if sm.hlsServer != nil {
sm.hlsServer.Dispose()
}
@ -201,6 +222,25 @@ func (sm *ServerManager) OnDelHTTPFLVSubSession(session *httpflv.SubSession) {
}
}
// ServerObserver of httpts.Server
func (sm *ServerManager) OnNewHTTPTSSubSession(session *httpts.SubSession) bool {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.AddHTTPTSSubSession(session)
return true
}
// ServerObserver of httpts.Server
func (sm *ServerManager) OnDelHTTPTSSubSession(session *httpts.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName, session.StreamName)
if group != nil {
group.DelHTTPTSSubSession(session)
}
}
// ServerObserver of rtsp.Server
func (sm *ServerManager) OnNewRTSPPubSession(session *rtsp.PubSession) bool {
sm.mutex.Lock()
@ -212,6 +252,12 @@ func (sm *ServerManager) OnNewRTSPPubSession(session *rtsp.PubSession) bool {
// ServerObserver of rtsp.Server
func (sm *ServerManager) OnDelRTSPPubSession(session *rtsp.PubSession) {
// TODO chef: impl me
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup("", session.StreamName)
if group != nil {
group.DelRTSPPubSession(session)
}
}
func (sm *ServerManager) iterateGroup() {

@ -8,6 +8,8 @@
package mpegts
// MPEG: Moving Picture Experts Group
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */

@ -8,31 +8,37 @@
package mpegts
// MPEG: Moving Picture Experts Group
type Frame struct {
PTS uint64
DTS uint64
Pid uint16 // PID of PES Header
Sid uint8 // stream_id of PES Header
CC uint8 // continuity_counter of TS Header
CC uint8 // continuity_counter of TS Header
// PID of PES Header
// 音频 mpegts.PidAudio
// 视频 mpegts.PidVideo
Pid uint16
// stream_id of PES Header
// 音频 mpegts.StreamIDAudio
// 视频 mpegts.StreamIDVideo
Sid uint8
// 音频 全部为false
// 视频 关键帧为true非关键帧为false
Key bool
// 音频AAC 格式为2字节ADTS头加raw frame
// 视频AVC 格式为AnnexB
Raw []byte
}
// @param packet: 188字节大小的TS包注意一次Pack对应的多个TSPacket复用的是一块内存
// @param cc: 当前TS包的continuity_counter
//
type OnTSPacket func(packet []byte, cc uint8)
type OnTSPacket func(packet []byte)
// @param frame: frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保持CC的值供下次调用时使用
// frame.Pid 音频设置为mpegts.PidAudio视频设置为mpegts.PidVideo
// frame.Sid 音频设置为mpegts.StreamIDAudio视频设置为mpegts.StreamIDVideo
// frame.Key 是否是关键帧一般用于视频格式音频全部设置为false
//
// frame.Raw 如果是AVC类型格式为AnnexB
// 如果是AAC类型格式为2字节ADTS头加raw frame
// 函数内部不会持有该内存块
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保存CC的值供下次调用时使用
// frame.Raw 函数调用结束后,内部不会持有该内存块
//
// @param onTSPacket: 注意,一次函数调用,可能对应多次回调
//
@ -213,7 +219,7 @@ func PackTSPacket(frame *Frame, onTSPacket OnTSPacket) {
lpos = rpos
}
onTSPacket(packet, frame.CC)
onTSPacket(packet)
}
}

@ -51,17 +51,14 @@ func TestWrite(t *testing.T) {
assert.Equal(t, []byte{2, 0, 0, 0, 0, 0, 5, 6, 0, 0, 0, 0, 0, 0, 0, 1, 2}, buf.Bytes())
buf.Reset()
// 注意由于writeConnect中包含了版本信息是可变的所以不对结果做断言检查
err = packer.writeConnect(buf, "live", "rtmp://127.0.0.1/live", true)
assert.Equal(t, nil, err)
result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x82, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x20, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x3b, 0x20, 0x6c, 0x61, 0x6c, 0x30, 0x2e, 0x31, 0x33, 0x2e, 0x30, 0x29, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x15, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x00, 0x09}
assert.Equal(t, result, buf.Bytes())
buf.Reset()
// 注意由于writeConnect中包含了版本信息是可变的所以不对结果做断言检查
err = packer.writeConnectResult(buf, 1)
assert.Equal(t, nil, err)
result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0xd0, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x06, 0x66, 0x6d, 0x73, 0x56, 0x65, 0x72, 0x02, 0x00, 0x0d, 0x46, 0x4d, 0x53, 0x2f, 0x33, 0x2c, 0x30, 0x2c, 0x31, 0x2c, 0x31, 0x32, 0x33, 0x00, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x00, 0x40, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x03, 0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x00, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x02, 0x00, 0x1d, 0x4e, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x2e, 0x00, 0x0e, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x06, 0x30, 0x2c, 0x31, 0x33, 0x2c, 0x30, 0x00, 0x00, 0x09}
assert.Equal(t, result, buf.Bytes())
buf.Reset()
err = packer.writeCreateStream(buf)

@ -319,7 +319,7 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
ss := strings.Split(s.StreamNameWithRawQuery, "?")
s.StreamName = ss[0]
nazalog.Infof("[%s] < R play('%s').", s.StreamName, s.UniqueKey)
nazalog.Infof("[%s] < R play('%s').", s.UniqueKey, s.StreamName)
// TODO chef: start duration reset
nazalog.Infof("[%s] > W onStatus('NetStream.Play.Start').", s.UniqueKey)

Loading…
Cancel
Save