1. [feat] 完成package hls的基础功能,并做了小范围重构 2. [feat] lals服务接入hls功能 3. [refactor] 将app目录下除lals的其他应用移入demo目录下 4. [feat] 新增两个demo:analyseflv和analysehls,分别用于拉取HTTP-FLV和HLS的流,并进行分析v0.12.3 -> CHANGELOG.md

pull/2/head
q191201771 5 years ago
parent d7e77299b2
commit 41cdddbe2f

1
.gitignore vendored

@ -10,7 +10,6 @@ coverage.html
/conf/self.conf.json
/tmp
/demo
/bin
/release
/.idea

@ -27,7 +27,7 @@ Go语言编写的直播流媒体网络传输服务器
---
已支持RTMPHTTP-FLVH264/AVCH265/HEVCAACGOP缓存。正在实现HLS部分。
已支持RTMPHTTP-FLVHLS(m3u8 + ts)H264/AVCH265/HEVCAACGOP缓存。
### README 目录
@ -84,6 +84,12 @@ $./bin/lals -c conf/lals.conf.json
"sub_listen_addr": ":8080", // HTTP-FLV拉流地址
"gop_num": 2
},
"hls": {
"sub_listen_addr": ":8081", // HLS监听地址
"out_path": "/tmp/lal/hls/", // HLS文件保存根目录
"fragment_duration_ms": 3000, // 单个TS文件切片时长
"fragment_num": 6 // M3U8文件列表中TS文件的数量
},
"log": {
"level": 1, // 日志级别1 debug, 2 info, 3 warn, 4 error, 5 fatal
"filename": "./logs/lals.log", // 日志输出文件
@ -105,37 +111,42 @@ $./bin/lals -c conf/lals.conf.json
### 三. 仓库目录框架
简单来说,源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。
简单来说,源码在`pkg/``app/``demo/`三个目录下。
- `pkg/`存放各package包供本repo的程序以及其他业务方使用
- `app/`重要程序的入口目前仅包含lals——基于lal编写的一个通用流媒体服务器程序
- `demo/`:存放各种基于`lal/pkg`开发的小程序(小工具),一个子目录是一个程序,详情见各源码文件中头部的说明
```
pkg/ ......源码包
pkg/ ......
|-- rtmp/ ......RTMP协议
|-- httpflv/ ......HTTP-FLV协议
|-- logic/ ......lals服务器的上层业务
|-- hls/ ......HLS协议
|-- logic/ ......lals服务器程序的上层业务逻辑
|-- aac/ ......音频AAC编码格式相关
|-- avc/ ......视频H264/AVC编码格式相关
|-- hevc/ ......视频H265/HEVC编码格式相关
app/ ......各种main包的源码文件一个子目录对应一个main包也即对应可生成一个可执行文件
|-- lals/ ......[最重要的]流媒体服务器
|-- flvfile2rtmppush ......// RTMP推流客户端读取本地FLV文件使用RTMP协议推送出去
//
// 支持循环推送文件推送完毕后可循环推送RTMP push流并不断开
// 支持推送多路流相当于一个RTMP推流压测工具
|-- rtmppull ......// RTMP拉流客户端从远端服务器拉取RTMP流存储为本地FLV文件
//
// 另外作为一个RTMP拉流压测工具已经支持
// 1. 对一路流拉取n份
// 2. 拉取n路流
|-- httpflvpull ......HTTP-FLV拉流客户端
|-- modflvfile ......修改本地FLV文件
|-- flvfile2es ......将本地FLV文件分离成H264/AVC和AAC的ES流文件
app/ ......
|-- lals/ ......流媒体服务器lals的main函数入口
demo/ ......
|-- analyseflvts ......
|-- analysehlsts ......
|-- flvfile2rtmppush ......
|-- rtmppull ......
|-- httpflvpull ......
|-- modflvfile ......
|-- flvfile2es ......
|-- learnts ......
|-- tscmp ......
bin/ ......可执行文件编译输出目录
conf/ ......配置文件目录
```
后续我再画些源码架构图。
目前唯一的第三方依赖我自己写的Go基础库 [github.com/q191201771/naza](https://github.com/q191201771/naza)
@ -151,16 +162,16 @@ conf/ ......配置文件目录
* 不依赖第三方代码
* 后续可快速集成各种网络传输协议,流媒体封装协议
#### 功能
#### lals服务器功能
- [x] **pub 接收推流:** RTMP
- [x] **sub 接收拉流:** RTMPHTTP-FLVHLS(m3u8+ts)
- [x] **音频编码格式:** AAC
- [x] **视频编码格式:** H264/AVCH265/HEVC
- [x] **GOP缓存** 用于秒开
TODO
- 接收RTMP推流 [DONE]
- 转发给RTMP拉流 [DONE]
- 转发给HTTP-FLV拉流 [DONE]
- AAC [DONE]
- H264/AVC [DONE]
- H265/HEVC [DONE]
- GOP缓存 [DONE]
- HLS
- RTMP转推
- RTMP回源
- HTTP-FLV回源

@ -1,98 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"encoding/hex"
"flag"
"os"
"time"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/naza/pkg/bitrate"
"github.com/q191201771/naza/pkg/nazaatomic"
log "github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 存储成 flv 文件
func main() {
url := parseFlag()
session := httpflv.NewPullSession()
abr := bitrate.New()
vbr := bitrate.New()
var runFlag nazaatomic.Bool
runFlag.Store(true)
go func() {
for runFlag.Load() {
time.Sleep(1 * time.Second)
}
}()
err := session.Pull(url, func(tag httpflv.Tag) {
switch tag.Header.Type {
case httpflv.TagTypeMetadata:
log.Info(hex.Dump(tag.Payload()))
case httpflv.TagTypeAudio:
abr.Add(len(tag.Raw))
case httpflv.TagTypeVideo:
log.Infof("onReadFLVTag. %+v, isSeqHeader=%t, isKeyNalu=%t", tag.Header, tag.IsVideoKeySeqHeader(), tag.IsVideoKeyNalu())
analysisVideoTag(tag)
vbr.Add(len(tag.Raw))
}
})
runFlag.Store(false)
log.Assert(nil, err)
}
const (
typeUnknown uint8 = 1
typeAVC uint8 = 2
typeHEVC uint8 = 3
)
var t uint8 = typeUnknown
func analysisVideoTag(tag httpflv.Tag) {
if tag.IsVideoKeySeqHeader() {
if tag.IsAVCKeySeqHeader() {
t = typeAVC
log.Info("AVC SH")
} else if tag.IsHEVCKeySeqHeader() {
t = typeHEVC
log.Info("HEVC SH")
}
} else {
body := tag.Raw[11:]
for i := 5; i != int(tag.Header.DataSize); {
naluLen := bele.BEUint32(body[i:])
switch t {
case typeAVC:
log.Infof("%s %s", avc.CalcNaluTypeReadable(body[i+4:]), hex.Dump(body[i+4:i+8]))
case typeHEVC:
log.Infof("%s %s", hevc.CalcNaluTypeReadable(body[i+4:]), hex.Dump(body[i+4:i+8]))
}
i = i + 4 + int(naluLen)
}
}
}
func parseFlag() string {
url := flag.String("i", "", "specify http-flv url")
flag.Parse()
if *url == "" {
flag.Usage()
os.Exit(1)
}
return *url
}

@ -32,6 +32,14 @@ do
fi
done
for file in `ls ${ROOT_DIR}/demo`
do
if [ -d ${ROOT_DIR}/demo/${file} ]; then
echo "build" ${ROOT_DIR}/demo/${file} "..."
cd ${ROOT_DIR}/demo/${file} && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${file}
fi
done
for file in `ls ${ROOT_DIR}/playground`
do
if [ -d ${ROOT_DIR}/playground/${file} ]; then

@ -7,6 +7,12 @@
"sub_listen_addr": ":8080",
"gop_num": 2
},
"hls": {
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"log": {
"level": 1,
"filename": "./logs/lals.log",

@ -7,6 +7,12 @@
"sub_listen_addr": ":8080",
"gop_num": 2
},
"hls": {
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"log": {
"level": 1,
"filename": "./logs/lals.log",

@ -0,0 +1,186 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"bytes"
"encoding/hex"
"flag"
"fmt"
"os"
"time"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/bitrate"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/naza/pkg/nazalog"
)
// 分析诊断HTTP-FLV流的时间戳。注意这个程序还没有完成。
// 功能:
// - 时间戳回退检查
// - 当音频时间戳出现回退时打error日志
// - 当视频时间戳出现回退时打error日志
// - 将音频和视频时间戳看成一个整体出现回退时打error日志
// - 定时打印:
// - 总体带宽
// - 音频带宽
// - 视频带宽
// - 视频DTS和PTS不相等的计数
// - H264
// - 打印每个tag的类型key seq header...
// - 打印每个tag中有多少个帧SPS PPS SEI IDR SLICE...
// - 打印每个SLICE的类型I、P、B...
// TODO
// - 解析metadata
// - 检查时间戳正向大的跳跃
var (
printStatFlag = true
printEveryTagFlag = false
printMetaData = true
analysisVideoTagFlag = false
)
func main() {
url := parseFlag()
session := httpflv.NewPullSession()
brTotal := bitrate.New()
brAudio := bitrate.New()
brVideo := bitrate.New()
prevAudioTS := int64(-1)
prevVideoTS := int64(-1)
prevTS := int64(-1)
videoCTSNotZeroCount := 0
go func() {
for {
time.Sleep(1 * time.Second)
if printStatFlag {
nazalog.Debugf("stat. total=%dKb/s, audio=%dKb/s, video=%dKb/s, videoCTSNotZeroCount=%d", int(brTotal.Rate()), int(brAudio.Rate()), int(brVideo.Rate()), videoCTSNotZeroCount)
}
}
}()
err := session.Pull(url, func(tag httpflv.Tag) {
if printEveryTagFlag {
debugLength := 32
if len(tag.Raw) < 32 {
debugLength = len(tag.Raw)
}
nazalog.Debugf("header=%+v, hex=%s", tag.Header, hex.Dump(tag.Raw[11:debugLength]))
}
brTotal.Add(len(tag.Raw))
switch tag.Header.Type {
case httpflv.TagTypeMetadata:
//nazalog.Debugf("----------\n", hex.Dump(tag.Raw))
if printMetaData {
// TODO chef: 这部分可以移入到rtmp package中
_, l, err := rtmp.AMF0.ReadString(tag.Raw[11:])
nazalog.Assert(nil, err)
kv, _, err := rtmp.AMF0.ReadObject(tag.Raw[11+l:])
nazalog.Assert(nil, err)
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("-----\ncount:%d\n", len(kv)))
for k, v := range kv {
buf.WriteString(fmt.Sprintf(" %s: %v\n", k, v))
}
nazalog.Debugf("%+v", buf.String())
}
case httpflv.TagTypeAudio:
brAudio.Add(len(tag.Raw))
if prevAudioTS != -1 && int64(tag.Header.Timestamp) < prevAudioTS {
nazalog.Errorf("audio timestamp error. header=%+v, prevAudioTS=%d, diff=%d", tag.Header, prevAudioTS, int64(tag.Header.Timestamp)-prevAudioTS)
}
if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS {
nazalog.Errorf("audio timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
}
prevAudioTS = int64(tag.Header.Timestamp)
prevTS = int64(tag.Header.Timestamp)
case httpflv.TagTypeVideo:
if analysisVideoTagFlag {
analysisVideoTag(tag)
}
videoCTS := bele.BEUint24(tag.Raw[13:])
if videoCTS != 0 {
videoCTSNotZeroCount++
}
brVideo.Add(len(tag.Raw))
if prevVideoTS != -1 && int64(tag.Header.Timestamp) < prevVideoTS {
nazalog.Errorf("video timestamp error. header=%+v, prevVideoTS=%d, diff=%d", tag.Header, prevVideoTS, int64(tag.Header.Timestamp)-prevVideoTS)
}
if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS {
nazalog.Errorf("video timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
}
prevVideoTS = int64(tag.Header.Timestamp)
prevTS = int64(tag.Header.Timestamp)
}
})
nazalog.Warn(err)
}
const (
typeUnknown uint8 = 1
typeAVC uint8 = 2
typeHEVC uint8 = 3
)
var t uint8 = typeUnknown
func analysisVideoTag(tag httpflv.Tag) {
var buf bytes.Buffer
if tag.IsVideoKeySeqHeader() {
if tag.IsAVCKeySeqHeader() {
t = typeAVC
buf.WriteString(" [AVC SeqHeader] ")
} else if tag.IsHEVCKeySeqHeader() {
t = typeHEVC
buf.WriteString(" [HEVC SeqHeader] ")
}
} else {
body := tag.Raw[11:]
for i := 5; i != int(tag.Header.DataSize); {
naluLen := bele.BEUint32(body[i:])
switch t {
case typeAVC:
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNaluTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:])))
case typeHEVC:
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNaluTypeReadable(body[i+4:])))
}
i = i + 4 + int(naluLen)
}
}
nazalog.Debug(buf.String())
}
func parseFlag() string {
url := flag.String("i", "", "specify http-flv url")
flag.Parse()
if *url == "" {
flag.Usage()
os.Exit(1)
}
return *url
}

@ -0,0 +1,127 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/q191201771/naza/pkg/lru"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
)
// 分析诊断HLS的时间戳。注意这个程序还没有完成。
//
// TODO chef: 有的代码考虑弄到pkg/hls中
type M3U8PullSession struct {
}
type frag struct {
extinf float64
filename string
}
func parseM3U8(content string) (ret []frag) {
var err error
lines := strings.Split(content, "\n")
var f frag
for _, line := range lines {
if strings.HasPrefix(line, "#EXTINF:") {
line = strings.TrimPrefix(line, "#EXTINF:")
line = strings.TrimSuffix(line, ",")
f.extinf, err = strconv.ParseFloat(line, 64)
nazalog.Assert(nil, err)
}
if strings.Index(line, ".ts") != -1 {
f.filename = line
ret = append(ret, f)
}
}
return
}
func getTSURL(m3u8URL string, tsFilename string) string {
index := strings.LastIndex(m3u8URL, "/")
nazalog.Assert(true, index != -1)
path := m3u8URL[:index+1]
return path + tsFilename
}
func main() {
m3u8URL := parseFlag()
nazalog.Infof("m3u8 url=%s", m3u8URL)
cache := lru.New(1024)
var m sync.Mutex
var frags []frag
go func() {
for {
content, err := nazahttp.GetHttpFile(m3u8URL, 3000)
if err != nil {
nazalog.Error(err)
return
}
//nazalog.Debugf("\n-----m3u8-----\n%s", string(content))
currFrags := parseM3U8(string(content))
//nazalog.Debugf("%+v", currFrags)
m.Lock()
for _, f := range currFrags {
if _, exist := cache.Get(f.filename); exist {
continue
}
cache.Put(f.filename, nil)
nazalog.Infof("> new frag. filename=%s", f.filename)
frags = append(frags, f)
}
m.Unlock()
time.Sleep(100 * time.Millisecond)
}
}()
for {
m.Lock()
currFrags := frags
frags = nil
m.Unlock()
for _, f := range currFrags {
nazalog.Infof("< new frag. filename=%s", f.filename)
tsURL := getTSURL(m3u8URL, f.filename)
nazalog.Debug(tsURL)
content, err := nazahttp.GetHttpFile(tsURL, 3000)
nazalog.Assert(nil, err)
nazalog.Debugf("TS len=%d", len(content))
}
time.Sleep(100 * time.Millisecond)
}
}
func parseFlag() string {
url := flag.String("i", "", "specify m3u8 url")
flag.Parse()
if *url == "" {
flag.Usage()
os.Exit(1)
}
return *url
}

@ -21,6 +21,7 @@ import (
)
// 将本地FLV文件分离成H264/AVC和AAC的ES流文件
//
// TODO chef 做HEVC的支持
func main() {

@ -0,0 +1,48 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"encoding/hex"
"flag"
"os"
"github.com/q191201771/lal/pkg/httpflv"
log "github.com/q191201771/naza/pkg/nazalog"
)
// 拉取HTTP-FLV的流
//
// TODO
// - 存储成flv文件
// - 拉取HTTP-FLV流进行分析参见另外一个demoanalyseflvts。 这个demo可能可以删除掉了。
func main() {
url := parseFlag()
session := httpflv.NewPullSession()
err := session.Pull(url, func(tag httpflv.Tag) {
switch tag.Header.Type {
case httpflv.TagTypeMetadata:
log.Info(hex.Dump(tag.Payload()))
case httpflv.TagTypeAudio:
case httpflv.TagTypeVideo:
}
})
log.Assert(nil, err)
}
func parseFlag() string {
url := flag.String("i", "", "specify http-flv url")
flag.Parse()
if *url == "" {
flag.Usage()
os.Exit(1)
}
return *url
}

@ -15,7 +15,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// 解析TS文件注意该程序还没有写完
// 学习如何解析TS文件。注意该程序还没有写完。
var (
pat hls.PAT
@ -85,7 +85,7 @@ func main() {
content, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
packets := hls.SplitTS(content)
packets := hls.SplitFragment2TSPackets(content)
for _, packet := range packets {
handlePacket(packet)

@ -17,7 +17,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// 比较两个TS文件注意该程序还没有写完
// 临时小工具比较两个TS文件。注意该程序还没有写完。
var filename1 = "/Volumes/Data/tmp/lal-4.ts"
var filename2 = "/Volumes/Data/tmp/nrm-4.ts"
@ -63,8 +63,8 @@ func main() {
content2, err := ioutil.ReadFile(filename2)
nazalog.Assert(nil, err)
tss1 := hls.SplitTS(content1)
tss2 := hls.SplitTS(content2)
tss1 := hls.SplitFragment2TSPackets(content1)
tss2 := hls.SplitFragment2TSPackets(content2)
nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.12.2
require github.com/q191201771/naza v0.12.3

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.12.2 h1:El5OSCPFrGGrZiyZ0aOvdystC15Ap7lC4MipVKdfVMY=
github.com/q191201771/naza v0.12.2/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.12.3 h1:0Z8hMa5RYNqsG1GjGfYyLFkuPLfuZ21iDx3BJEPK0p8=
github.com/q191201771/naza v0.12.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -13,9 +13,8 @@ import (
"io"
"math"
"github.com/q191201771/naza/pkg/nazabits"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabits"
)
var ErrAVC = errors.New("lal.avc: fxxk")
@ -61,42 +60,52 @@ const (
SliceTypeSI uint8 = 4 // TODO chef
)
func CalcNaluType(nalu []byte) uint8 {
return nalu[0] & 0x1f
}
func CalcSliceType(nalu []byte) uint8 {
c := nalu[1]
var leadingZeroBits uint
index := uint(6)
var leadingZeroBits int
index := 6 // can't unsigned
for ; index >= 0; index-- {
v := nazabits.GetBit8(c, index)
v := nazabits.GetBit8(c, uint(index))
if v == 0 {
leadingZeroBits++
} else {
break
}
}
rbLeadingZeroBits := nazabits.GetBits8(c, index-1, leadingZeroBits)
codeNum := uint(math.Pow(2, float64(leadingZeroBits))) - 1 + uint(rbLeadingZeroBits)
rbLeadingZeroBits := nazabits.GetBits8(c, uint(index-1), uint(leadingZeroBits))
codeNum := int(math.Pow(2, float64(leadingZeroBits))) - 1 + int(rbLeadingZeroBits)
if codeNum > 4 {
codeNum -= 5
}
return uint8(codeNum)
}
func CalcSliceTypeReadable(nalu []byte) string {
t := CalcSliceType(nalu)
ret, ok := SliceTypeMapping[t]
func CalcNaluTypeReadable(nalu []byte) string {
t := nalu[0] & 0x1f
ret, ok := NaluUintTypeMapping[t]
if !ok {
return "unknown"
}
return ret
}
func CalcNaluType(nalu []byte) uint8 {
return nalu[0] & 0x1f
}
func CalcSliceTypeReadable(nalu []byte) string {
naluType := CalcNaluType(nalu)
switch naluType {
case NaluUnitTypeSEI:
fallthrough
case NaluUintTypeSPS:
fallthrough
case NaluUintTypePPS:
return ""
}
func CalcNaluTypeReadable(nalu []byte) string {
t := nalu[0] & 0x1f
ret, ok := NaluUintTypeMapping[t]
t := CalcSliceType(nalu)
ret, ok := SliceTypeMapping[t]
if !ok {
return "unknown"
}

@ -14,25 +14,29 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 这个文件需要和session.go一起重构
type FragmentOP struct {
fp *os.File
}
type MPEGTSFrame struct {
type mpegTSFrame struct {
pts uint64
dts uint64
pid uint16
sid uint8
cc uint8
key bool
key bool // 关键帧
}
func mpegtsOpenFile(filename string) *os.File {
fp, err := os.Create(filename)
nazalog.Assert(nil, err)
mpegtsWriteFile(fp, FixedFragmentHeader)
return fp
func (f *FragmentOP) OpenFile(filename string) (err error) {
f.fp, err = os.Create(filename)
if err != nil {
return
}
f.writeFile(FixedFragmentHeader)
return nil
}
func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) {
//nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b))
wpos := 0 // 当前packet的写入位置
@ -47,7 +51,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
frame.cc++
// 每个packet都需要添加TS Header
// -----TS Header-----
// -----TS Header----------------
// sync_byte
// transport_error_indicator 0
// payload_unit_start_indicator
@ -56,6 +60,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
// transport_scrambling_control 0
// adaptation_field_control
// continuity_counter
// ------------------------------
packet[0] = syncByte // sync_byte
if first {
@ -72,7 +77,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
if first {
if frame.key {
// 关键帧的首个packet需要添加Adaptation
// -----Adaptation-----
// -----Adaptation-----------------------
// adaptation_field_length
// discontinuity_indicator 0
// random_access_indicator 1
@ -85,6 +90,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
// program_clock_reference_base
// reserved
// program_clock_reference_extension
// --------------------------------------
packet[3] |= 0x20 // adaptation_field_control 设置Adaptation
packet[4] = 7 // adaptation_field_length
packet[5] = 0x50 // random_access_indicator + PCR_flag
@ -93,7 +99,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
}
// 帧的首个packet需要添加PES Header
// -----PES Header-----
// -----PES Header------------
// packet_start_code_prefix
// stream_id
// PES_packet_length
@ -111,6 +117,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
// PES_CRC_flag 0
// PES_extension_flag 0
// PES_header_data_length
// ---------------------------
packet[wpos] = 0x00 // packet_start_code_prefix
packet[wpos+1] = 0x00 //
packet[wpos+2] = 0x01 //
@ -147,7 +154,6 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
if frame.pts != frame.dts {
mpegtsWritePTS(packet[wpos:], 1, frame.dts+delay)
wpos += 5
//nazalog.Debugf("%d %d", (frame.pts)/90, (frame.dts)/90)
}
first = false
@ -212,10 +218,18 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
lpos = rpos
}
mpegtsWriteFile(fp, packet)
f.writeFile(packet)
}
}
func (f *FragmentOP) CloseFile() {
_ = f.fp.Close()
}
func (f *FragmentOP) writeFile(b []byte) {
_, _ = f.fp.Write(b)
}
func mpegtsdWritePCR(out []byte, pcr uint64) {
out[0] = uint8(pcr >> 25)
out[1] = uint8(pcr >> 17)
@ -238,11 +252,3 @@ func mpegtsWritePTS(out []byte, fb uint8, pts uint64) {
out[3] = uint8(val >> 8)
out[4] = uint8(val)
}
func mpegtsWriteFile(fp *os.File, b []byte) {
_, _ = fp.Write(b)
}
func mpegtsCloseFile(fp *os.File) {
_ = fp.Close()
}

@ -9,27 +9,27 @@
package hls
// TODO
// package hls处于开发中阶段请不要使用第一步计划
// - 不提供各种配置项
// - 只支持H264和AAC
// - 先参照nginx rtmp module把功能实现再做重构
//
// - 支持HEVC
// - 检查所有的容错处理,是否会出现
// - 补充单元测试
// - 配置项
// - web服务
// - 清理文件
// - Server
// - Dispose
// - 超时时间
// - 测试windows平台
// - safari直接播放不了vlc和ffplay是可以的
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction
// #EXTM3U // 固定串
// #EXT-X-VERSION:3 // 固定串
// #EXT-X-MEDIA-SEQUENCE // m3u8文件中第一个TS文件的序号
// #EXT-X-MEDIA-SEQUENCE //
// #EXT-X-TARGETDURATION // 所有TS文件最长的时长
// #EXT-X-PLAYLIST-TYPE: EVENT
// #EXT-X-DISCONTINUITY //
// #EXTINF: // 时长 以及TS文件名
// #EXTINF: // 时长以及TS文件名
// 重构时,需要统一项目中数据的命名,比如,进来的数据称为Frame帧188字节的封装称为TSPacket包TS文件称为Fragment
// 进来的数据称为Frame帧188字节的封装称为TSPacket包TS文件称为Fragment
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
@ -154,13 +154,23 @@ const (
delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用?
// TODO chef 这些在配置项中提供
outPath = "/tmp/lal/hls/" // 切片文件输出目录
fraglen = 5000 // 单个TS时长单位毫秒
playlen = 30000 // m3u8列表时长
maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长强制切割新的fragment单位毫秒 * 90
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒 * 90
winfrags = playlen / fraglen // 多少个TS文件
maxAudioDelay uint64 = 300 // 单位毫秒
audioBufSize = 1024 * 1024
Sync = 2
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒 * 90
maxAudioDelay uint64 = 300 // 单位毫秒
appName = "hls"
)
func SplitFragment2TSPackets(content []byte) (ret [][]byte) {
if len(content)%188 != 0 {
return
}
for {
if len(content) == 0 {
break
}
ret = append(ret, content[0:188])
content = content[188:]
}
return
}

@ -0,0 +1,405 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"bytes"
"fmt"
"os"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
type fragmentInfo struct {
id int // fragment的自增序号
duration float64 // 当前fragment中数据的时长单位秒
discont bool // #EXT-X-DISCONTINUITY
}
type MuxerConfig struct {
OutPath string `json:"out_path"`
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
}
type Muxer struct {
streamName string
outPath string
playlistFilename string
playlistFilenameBak string
config *MuxerConfig
fragmentOP FragmentOP
opened bool
adts aac.ADTS
spspps []byte
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aaframe []byte
aframePTS uint64 // 最新音频帧的时间戳
}
func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
return &Muxer{
streamName: streamName,
outPath: op,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
config: config,
videoOut: videoOut,
aaframe: nil,
frags: frags,
}
}
func (m *Muxer) Start() {
nazalog.Infof("start hls muxer. streamName=%s", m.streamName)
m.ensureDir()
}
func (m *Muxer) Stop() {
nazalog.Infof("stop hls muxer. streamName=%s", m.streamName)
m.flushAudio()
m.closeFragment()
}
func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) {
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
m.feedAudio(msg)
case rtmp.TypeidVideo:
m.feedVideo(msg)
}
}
func (m *Muxer) feedVideo(msg rtmp.AVMsg) {
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
if ftype == 1 && htype == 0 {
m.cacheSPSPPS(msg)
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := m.videoOut[0:0]
for i := 5; i != len(msg.Payload); {
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
nazalog.Warn("should not reach here.")
i += nalBytes
continue
}
if !audSent {
switch nalType {
case 1, 5, 6:
out = append(out, audNal...)
audSent = true
case 9:
audSent = true
}
}
switch nalType {
case 1:
spsppsSent = false
case 5:
if !spsppsSent {
out = m.appendSPSPPS(out)
}
spsppsSent = true
}
if len(out) == 0 {
out = append(out, nalStartCode...)
} else {
out = append(out, nalStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
var frame mpegTSFrame
frame.cc = m.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
boundary := frame.key && (!m.opened || m.adts.IsNil() || m.aaframe != nil)
m.updateFragment(frame.dts, boundary, 1)
if !m.opened {
nazalog.Warn("not opened.")
return
}
m.fragmentOP.WriteFrame(&frame, out)
m.videoCC = frame.cc
}
func (m *Muxer) feedAudio(msg rtmp.AVMsg) {
if msg.Payload[0]>>4 != 10 {
// TODO chef: HLS音频现在只做了h264的支持
return
}
if msg.Payload[1] == 0 {
m.cacheAACSeqHeader(msg)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
m.updateFragment(pts, m.spspps == nil, 2)
if m.aaframe == nil {
m.aframePTS = pts
}
adtsHeader := m.adts.GetADTS(uint16(msg.Header.MsgLen))
m.aaframe = append(m.aaframe, adtsHeader...)
m.aaframe = append(m.aaframe, msg.Payload[2:]...)
}
func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) {
m.adts.PutAACSequenceHeader(msg.Payload)
}
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) {
m.spspps = msg.Payload
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {
index := 10
nnals := m.spspps[index] & 0x1f
index++
for n := 0; ; n++ {
for ; nnals != 0; nnals-- {
length := int(bele.BEUint16(m.spspps[index:]))
index += 2
out = append(out, nalStartCode...)
out = append(out, m.spspps[index:index+length]...)
index += length
}
if n == 1 {
break
}
nnals = m.spspps[index]
index++
}
return out
}
func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
force := false
discont := true
var f *fragmentInfo
if m.opened {
f = m.getFrag(m.nfrags)
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", m.fragTS, ts)
force = true
} else {
// TODO chef: 考虑ts比fragTS小的情况
f.duration = float64(ts-m.fragTS) / 90000
discont = false
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
}
// 开启新的fragment
if boundary || force {
m.closeFragment()
m.openFragment(ts, discont)
}
// 音频已经缓存了一定时长的数据了,需要落盘了
if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
m.flushAudio()
}
}
func (m *Muxer) openFragment(ts uint64, discont bool) {
if m.opened {
return
}
id := m.getFragmentID()
filename := getTSFilename(m.outPath, m.streamName, id)
m.fragmentOP.OpenFile(filename)
m.opened = true
frag := m.getFrag(m.nfrags)
frag.discont = discont
frag.id = id
m.fragTS = ts
m.flushAudio()
}
func (m *Muxer) closeFragment() {
if !m.opened {
return
}
m.fragmentOP.CloseFile()
m.opened = false
m.nextFrag()
m.writePlaylist()
}
func (m *Muxer) writePlaylist() {
fp, err := os.Create(m.playlistFilenameBak)
nazalog.Assert(nil, err)
// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMS) / 1000
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
}
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n")
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag)))
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.frag))
for i := 0; i < m.nfrags; i++ {
frag := m.getFrag(i)
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, getTSFilenameWithoutPath(m.streamName, frag.id)))
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(m.playlistFilenameBak, m.playlistFilename)
nazalog.Assert(nil, err)
}
// 创建文件夹,如果文件夹已经存在,老的文件夹会被删除
func (m *Muxer) ensureDir() {
err := os.RemoveAll(m.outPath)
nazalog.Assert(nil, err)
err = os.MkdirAll(m.outPath, 0777)
nazalog.Assert(nil, err)
}
func (m *Muxer) getFragmentID() int {
return m.frag + m.nfrags
}
func (m *Muxer) getFrag(n int) *fragmentInfo {
return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)]
}
// TODO chef: 这个函数重命名为incr更好些
func (m *Muxer) nextFrag() {
if m.nfrags == m.config.FragmentNum {
m.frag++
} else {
m.nfrags++
}
}
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (m *Muxer) flushAudio() {
if !m.opened {
nazalog.Warn("flushAudio by not opened.")
return
}
if m.aaframe == nil {
nazalog.Warn("flushAudio by aframe is nil.")
return
}
frame := &mpegTSFrame{
pts: m.aframePTS,
dts: m.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: m.audioCC,
key: false,
}
m.fragmentOP.WriteFrame(frame, m.aaframe)
m.audioCC = frame.cc
m.aaframe = nil
}

@ -0,0 +1,77 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"fmt"
"io/ioutil"
"strings"
)
// 本文件聚合以下功能:
// - 生成HLSm3u8文件+ts文件文件命名规则以及文件存放规则
// - HTTP请求HLS时request URI和文件路径的映射规则
// HTTP请求URI格式已经文件路径的映射规则
//
// 假设
// 流名称="test110"
// rootPath="/tmp/lal/hls/"
//
// 则
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8
// http://127.0.0.1:8081/hls/test110/test110-0.ts -> /tmp/lal/hls/test110/test110-0.ts
type requestInfo struct {
fileName string
streamName string
fileType string
}
// RequestURI example:
// uri -> fileName streamName fileType
// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> playlist.m3u8 test110 m3u8
// http://127.0.0.1:8081/hls/test110/test110-0.ts -> test110-0.ts test110 ts
func parseRequestInfo(uri string) (ri requestInfo) {
ss := strings.Split(uri, "/")
if len(ss) < 2 {
return
}
ri.streamName = ss[len(ss)-2]
ri.fileName = ss[len(ss)-1]
ss = strings.Split(ri.fileName, ".")
if len(ss) < 2 {
return
}
ri.fileType = ss[len(ss)-1]
return
}
func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) {
filename := fmt.Sprintf("%s%s/%s", rootOutPath, ri.streamName, ri.fileName)
return ioutil.ReadFile(filename)
}
func getMuxerOutPath(rootOutPath string, streamName string) string {
return fmt.Sprintf("%s%s/", rootOutPath, streamName)
}
func getM3U8Filename(outpath string, streamName string) string {
return fmt.Sprintf("%s%s.m3u8", outpath, "playlist")
}
func getTSFilename(outpath string, streamName string, id int) string {
return fmt.Sprintf("%s%s-%d.ts", outpath, streamName, id)
}
func getTSFilenameWithoutPath(streamName string, id int) string {
return fmt.Sprintf("%s-%d.ts", streamName, id)
}

@ -0,0 +1,72 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"net/http"
"github.com/q191201771/naza/pkg/nazalog"
)
type Server struct {
addr string
outPath string
}
func NewServer(addr string, outPath string) *Server {
return &Server{
addr: addr,
outPath: outPath,
}
}
func (s *Server) RunLoop() error {
nazalog.Infof("start hls listen. addr=%s", s.addr)
return http.ListenAndServe(s.addr, s)
}
func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
//nazalog.Debugf("%+v", req)
// TODO chef:
// - check appname in URI path
// - DIY 404 response body
ri := parseRequestInfo(req.RequestURI)
//nazalog.Debugf("%+v", ri)
if ri.fileName == "" || ri.streamName == "" || (ri.fileType != "m3u8" && ri.fileType != "ts") {
nazalog.Warnf("%+v", ri)
resp.WriteHeader(404)
return
}
content, err := readFileContent(s.outPath, ri)
if err != nil {
nazalog.Warnf("%+v", err)
resp.WriteHeader(404)
return
}
switch ri.fileType {
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
//resp.Header().Add("Content-Type", "application/vnd.apple.mpegurl")
case "ts":
resp.Header().Add("Content-Type", "video/mp2t")
}
resp.Header().Add("Cache-Control", "no-cache")
//resp.Header().Add("Access-Control-Allow-Origin", "*")
//resp.Header().Add("Access-Control-Allow-Credentials", "true")
//resp.Header().Add("Access-Control-Allow-Methods", "*")
//resp.Header().Add("Access-Control-Allow-Headers", "Content-Type,Access-Token")
//resp.Header().Add("Access-Control-Allow-Expose-Headers", "*")
_, _ = resp.Write(content)
return
}

@ -1,405 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"bytes"
"encoding/hex"
"fmt"
"os"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
type Frag struct {
id uint64
keyID uint64
duration float64 // 当前fragment中数据的时长单位秒
active bool
discont bool // #EXT-X-DISCONTINUITY
}
type Session struct {
streamName string
playlistFilename string
playlistFilenameBak string
adts aac.ADTS
//aacSeqHeader []byte
spspps []byte
videoCC uint8
audioCC uint8
opened bool
videoOut []byte // 帧
fp *os.File
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []Frag // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aaframe []byte
//aframeBase uint64 // 上一个音频帧的时间戳
//aframeNum uint64
aframePTS uint64 // 最新音频帧的时间戳
}
func NewSession(streamName string) *Session {
playlistFilename := fmt.Sprintf("%s%s.m3u8", outPath, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]Frag, 2*winfrags+1) // TODO chef: 为什么是 * 2 + 1
return &Session{
videoOut: videoOut,
aaframe: nil,
frags: frags,
streamName: streamName,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
}
}
func (s *Session) Start() {
}
func (s *Session) Stop() {
s.flushAudio()
s.closeFragment()
}
func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) {
// TODO chef: to be continued
// HLS还没有开发完
return
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
s.feedAudio(msg)
case rtmp.TypeidVideo:
s.feedVideo(msg)
}
}
func (s *Session) feedVideo(msg rtmp.AVMsg) {
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
if ftype == 1 && htype == 0 {
s.cacheSPSPPS(msg)
return
}
cts := bele.BEUint24(msg.Payload[2:])
audSent := false
spsppsSent := false
// 优化这块buffer
out := s.videoOut[0:0]
for i := 5; i != len(msg.Payload); {
nalBytes := int(bele.BEUint32(msg.Payload[i:]))
i += 4
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
nazalog.Warn("should not reach here.")
i += nalBytes
continue
}
if !audSent {
switch nalType {
case 1, 5, 6:
out = append(out, audNal...)
audSent = true
case 9:
audSent = true
}
}
switch nalType {
case 1:
spsppsSent = false
case 5:
if !spsppsSent {
out = s.appendSPSPPS(out)
}
spsppsSent = true
}
if len(out) == 0 {
out = append(out, nalStartCode...)
} else {
out = append(out, nalStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
var frame MPEGTSFrame
frame.cc = s.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
boundary := frame.key && (!s.opened || s.adts.IsNil() || s.aaframe != nil)
s.updateFragment(frame.dts, boundary, 1)
if !s.opened {
nazalog.Warn("not opened.")
return
}
mpegtsWriteFrame(s.fp, &frame, out)
s.videoCC = frame.cc
}
func (s *Session) feedAudio(msg rtmp.AVMsg) {
if msg.Payload[0]>>4 != 10 {
// TODO chef: HLS音频现在只做了h264的支持
return
}
if msg.Payload[1] == 0 {
s.cacheAACSeqHeader(msg)
return
}
pts := uint64(msg.Header.TimestampAbs) * 90
s.updateFragment(pts, s.spspps == nil, 2)
if s.aaframe == nil {
s.aframePTS = pts
}
adtsHeader := s.adts.GetADTS(uint16(msg.Header.MsgLen))
s.aaframe = append(s.aaframe, adtsHeader...)
s.aaframe = append(s.aaframe, msg.Payload[2:]...)
}
func (s *Session) cacheAACSeqHeader(msg rtmp.AVMsg) {
nazalog.Debug("cacheAACSeqHeader")
s.adts.PutAACSequenceHeader(msg.Payload)
}
func (s *Session) cacheSPSPPS(msg rtmp.AVMsg) {
nazalog.Debugf("cacheSPSPPS. %s", hex.Dump(msg.Payload))
s.spspps = msg.Payload
}
func (s *Session) appendSPSPPS(out []byte) []byte {
index := 10
nnals := s.spspps[index] & 0x1f
index++
nazalog.Debugf("SPS number: %d", nnals)
for n := 0; ; n++ {
for ; nnals != 0; nnals-- {
len := int(bele.BEUint16(s.spspps[index:]))
nazalog.Debugf("header NAL length:%d", len)
index += 2
out = append(out, nalStartCode...)
out = append(out, s.spspps[index:index+len]...)
index += len
}
if n == 1 {
break
}
nnals = s.spspps[index]
nazalog.Debugf("PPS number: %d", nnals)
index++
}
return out
}
func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) {
force := false
discont := true
var f *Frag
if s.opened {
f = s.getFrag(s.nfrags)
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
if (ts > s.fragTS && ts-s.fragTS > maxfraglen) || (s.fragTS > ts && s.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", s.fragTS, ts)
force = true
} else {
// TODO chef: 考虑ts比fragTS小的情况
f.duration = float64(ts-s.fragTS) / 90000
discont = false
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < fraglen/float64(1000) {
boundary = false
}
// 开启新的fragment
if boundary || force {
s.closeFragment()
s.openFragment(ts, discont)
}
// 音频已经缓存了一定时长的数据了,需要落盘了
//nazalog.Debugf("CHEFERASEME 05191839, flush_rate=%d, size=%d, aframe_pts=%d, ts=%d",
// flushRate, len(s.aaframe), s.aframePTS, ts)
if s.opened && s.aaframe != nil && ((s.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
//nazalog.Debugf("CHEFERASEME 05191839.")
s.flushAudio()
}
}
func (s *Session) openFragment(ts uint64, discont bool) {
if s.opened {
return
}
s.ensureDir()
id := s.getFragmentID()
filename := fmt.Sprintf("%s%s-%d.ts", outPath, s.streamName, id)
s.fp = mpegtsOpenFile(filename)
s.opened = true
frag := s.getFrag(s.nfrags)
frag.active = true
frag.discont = discont
frag.id = uint64(id)
s.fragTS = ts
s.flushAudio()
}
func (s *Session) closeFragment() {
if !s.opened {
return
}
mpegtsCloseFile(s.fp)
s.opened = false
s.nextFrag()
s.writePlaylist()
}
func (s *Session) writePlaylist() {
fp, err := os.Create(s.playlistFilenameBak)
nazalog.Assert(nil, err)
// 找出时长最长的fragment
maxFrag := float64(fraglen / 1000)
for i := 0; i < s.nfrags; i++ {
frag := s.getFrag(i)
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
}
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n", s.frag))
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETRATION:%d\n", int(maxFrag)))
for i := 0; i < s.nfrags; i++ {
frag := s.getFrag(i)
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s-%d.ts\n", frag.duration, s.streamName, frag.id))
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(s.playlistFilenameBak, s.playlistFilename)
nazalog.Assert(nil, err)
}
func (s *Session) ensureDir() {
err := os.MkdirAll(outPath, 0777)
nazalog.Assert(nil, err)
}
func (s *Session) getFragmentID() int {
return s.frag + s.nfrags
}
func (s *Session) getFrag(n int) *Frag {
return &s.frags[(s.frag+n)%(winfrags*2+1)]
}
// TODO chef: 这个函数重命名为incr更好些
func (s *Session) nextFrag() {
if s.nfrags == winfrags {
s.frag++
} else {
s.nfrags++
}
}
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (s *Session) flushAudio() {
if !s.opened {
nazalog.Warn("flushAudio by not opened.")
return
}
if s.aaframe == nil {
nazalog.Warn("flushAudio by aframe is nil.")
return
}
frame := &MPEGTSFrame{
pts: s.aframePTS,
dts: s.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: s.audioCC,
key: false,
}
mpegtsWriteFrame(s.fp, frame, s.aaframe)
s.audioCC = frame.cc
s.aaframe = nil
}

@ -1,24 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import "github.com/q191201771/naza/pkg/nazalog"
func SplitTS(content []byte) (ret [][]byte) {
for {
if len(content) < 188 {
nazalog.Assert(0, len(content))
break
}
ret = append(ret, content[0:188])
content = content[188:]
}
return
}

@ -8,9 +8,12 @@
package logic
import "github.com/q191201771/lal/pkg/hls"
type Config struct {
RTMP RTMP `json:"rtmp"`
HTTPFLV HTTPFLV `json:"httpflv"`
HLS HLS `json:"hls"`
}
type RTMP struct {
@ -22,3 +25,8 @@ type HTTPFLV struct {
SubListenAddr string `json:"sub_listen_addr"`
GOPNum int `json:"gop_num"`
}
type HLS struct {
SubListenAddr string `json:"sub_listen_addr"`
*hls.MuxerConfig
}

@ -17,6 +17,8 @@ import (
"testing"
"time"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/httpflv"
@ -34,6 +36,7 @@ var (
rtmpAddr = ":19350"
httpflvAddr = ":8080"
hlsAddr = ":10001"
rFLVFileName = "testdata/test.flv"
wFLVPullFileName = "testdata/flvpull.flv"
@ -67,6 +70,14 @@ func TestExample(t *testing.T) {
config := logic.Config{
RTMP: logic.RTMP{Addr: rtmpAddr},
HTTPFLV: logic.HTTPFLV{SubListenAddr: httpflvAddr},
HLS: logic.HLS{
SubListenAddr: hlsAddr,
MuxerConfig: &hls.MuxerConfig{
OutPath: "/tmp/lal/hls/",
FragmentDurationMS: 3000,
FragmentNum: 6,
},
},
}
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr)

@ -25,13 +25,15 @@ type Group struct {
appName string
streamName string
hlsConfig *hls.MuxerConfig
exitChan chan struct{}
mutex sync.Mutex
pubSession *rtmp.ServerSession
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
hlsSession *hls.Session
hlsMuxer *hls.Muxer
gopCache *GOPCache
// TODO chef: 如果没有开启httpflv监听可以不做格式转换节约CPU资源
httpflvGopCache *GOPCache
@ -39,13 +41,14 @@ type Group struct {
var _ rtmp.PubSessionObserver = &Group{}
func NewGroup(appName string, streamName string, rtmpGOPNum int, httpflvGOPNum int) *Group {
func NewGroup(appName string, streamName string, rtmpGOPNum int, httpflvGOPNum int, hlsConfig *hls.MuxerConfig) *Group {
uk := unique.GenUniqueKey("GROUP")
log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName)
return &Group{
UniqueKey: uk,
appName: appName,
streamName: streamName,
hlsConfig: hlsConfig,
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
@ -84,8 +87,8 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
}
group.pubSession = session
group.hlsSession = hls.NewSession(group.streamName)
group.hlsSession.Start()
group.hlsMuxer = hls.NewMuxer(group.streamName, group.hlsConfig)
group.hlsMuxer.Start()
group.mutex.Unlock()
session.SetPubSessionObserver(group)
@ -97,7 +100,7 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pubSession = nil
group.hlsSession.Stop()
group.hlsMuxer.Stop()
group.gopCache.Clear()
group.httpflvGopCache.Clear()
@ -156,7 +159,7 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
//log.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1])
group.broadcastRTMP(msg)
group.hlsSession.FeedRTMPMessage(msg)
group.hlsMuxer.FeedRTMPMessage(msg)
}
func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {

@ -12,6 +12,8 @@ import (
"sync"
"time"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
log "github.com/q191201771/naza/pkg/nazalog"
@ -22,6 +24,7 @@ type ServerManager struct {
httpflvServer *httpflv.Server
rtmpServer *rtmp.Server
hlsServer *hls.Server
exitChan chan struct{}
mutex sync.Mutex
@ -40,6 +43,9 @@ func NewServerManager(config *Config) *ServerManager {
if len(config.RTMP.Addr) != 0 {
m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr)
}
if len(config.HLS.SubListenAddr) != 0 {
m.hlsServer = hls.NewServer(config.HLS.SubListenAddr, config.HLS.OutPath)
}
return m
}
@ -60,6 +66,14 @@ func (sm *ServerManager) RunLoop() {
}()
}
if sm.hlsServer != nil {
go func() {
if err := sm.hlsServer.RunLoop(); err != nil {
log.Error(err)
}
}()
}
t := time.NewTicker(1 * time.Second)
defer t.Stop()
var count uint32
@ -168,7 +182,7 @@ func (sm *ServerManager) check() {
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
group, exist := sm.groupMap[streamName]
if !exist {
group = NewGroup(appName, streamName, sm.config.RTMP.GOPNum, sm.config.HTTPFLV.GOPNum)
group = NewGroup(appName, streamName, sm.config.RTMP.GOPNum, sm.config.HTTPFLV.GOPNum, sm.config.HLS.MuxerConfig)
sm.groupMap[streamName] = group
}
go group.RunLoop()

@ -214,6 +214,10 @@ func (amf0) ReadNull(b []byte) (int, error) {
return 1, nil
}
// TODO chef:
// 考虑将map改成数组
// - Go的map是顺序随机的使用map也即丢失了原始数据的顺序性
// - 如果Object中存在key相同的情况先不讨论是否合法如果业务方非要这么用可能会覆盖导致丢失
func (amf0) ReadObject(b []byte) (map[string]interface{}, int, error) {
if len(b) < 1 {
return nil, 0, ErrAMFTooShort

@ -19,6 +19,8 @@ import (
log "github.com/q191201771/naza/pkg/nazalog"
)
// https://pengrl.com/p/20027
const version = uint8(3)
const (

@ -28,6 +28,7 @@ type Header struct {
TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳
}
// TODO chef: 将这个buffer实现和bytes.Buffer做比较考虑将它放入naza package中
type StreamMsg struct {
buf []byte
b uint32

Loading…
Cancel
Save