- [refactor] app/lals重命名为app/lalserver,避免描述时容易和lal造成混淆
 - [refactor] 将app/lalserver的大部分逻辑代码移入pkg/logic中
 - [test] 将所有package的Server、Session等内容的实例测试收缩至package innertest中,多个package都可以共用它做单元测试
 - [refactor] lalserver配置中增加显式enable字段,用于开启关闭特定协议
 - [refactor] 各package的Server对象增加独立的Listen函数,使得绑定监听端口失败时上层可以第一时间感知
 - [feat] demo/analyseflv,增加I帧间隔检查,增加metadata分析
 - [fix] package avc: 修复函数CalcSliceType解析I、P、B帧类型的bug
 - [refactor] package aac: 函数ADTS::PutAACSequenceHeader检查输入切片长度
 - [reafactor] package aac: 删除函数CaptureAAC
 - [feat] 增加demo/learnrtsp,pkg/rtsp,开始学习rtsp
pull/8/head
q191201771 5 years ago
parent 7e804f5f9e
commit 9af3b44753

8
.gitignore vendored

@ -2,6 +2,9 @@ profile.out
coverage.html
*.aac
*.h264
*.flv
logs
testdata
/pre-commit.sh
/coverage.txt
@ -14,9 +17,4 @@ coverage.html
/release
/.idea
/.trash
/logs
/playground
/testdata
/pkg/rtmp/testdata
/pkg/httpflv/testdata
/pkg/logic/testdata

@ -65,56 +65,55 @@ $./build.sh
#### 运行
```shell
$./bin/lals -c conf/lals.conf.json
$./bin/lalserver -c conf/lalserver.conf.json
```
#### 体验功能
快速体验lals服务器见[常见推拉流客户端软件的使用方式](https://pengrl.com/p/20051/)
快速体验lalserver服务器见:[常见推拉流客户端软件的使用方式](https://pengrl.com/p/20051/)
### 二. 配置文件说明
```
{
"rtmp": {
"addr": ":19350", // RTMP服务监听的端口客户端向lals推拉流都是这个地址
"enable": true, // 是否开启rtmp服务的监听
"addr": ":19350", // RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2 // RTMP拉流的GOP缓存数量加速秒开
},
"httpflv": {
"enable": true, // 是否开启HTTP-FLV服务的监听
"sub_listen_addr": ":8080", // HTTP-FLV拉流地址
"gop_num": 2
},
"hls": {
"enable": true, // 是否开启HLS服务的监听
"sub_listen_addr": ":8081", // HLS监听地址
"out_path": "/tmp/lal/hls/", // HLS文件保存根目录
"fragment_duration_ms": 3000, // 单个TS文件切片时长
"fragment_duration_ms": 3000, // 单个TS文件切片时长,单位毫秒
"fragment_num": 6 // M3U8文件列表中TS文件的数量
},
"pprof": {
"enable": true, // 是否开启Go pprof web服务的监听
"addr": ":10001" // Go pprof web地址
},
"log": {
"level": 1, // 日志级别1 debug, 2 info, 3 warn, 4 error, 5 fatal
"filename": "./logs/lals.log", // 日志输出文件
"filename": "./logs/lalserver.log", // 日志输出文件
"is_to_stdout": true, // 是否打印至标志控制台输出
"is_rotate_daily": true, // 日志按天翻滚
"short_file_flag": true, // 日志末尾是否携带源码文件名以及行号的信息
"assert_behavior": 1 // 日志断言的行为1 只打印错误日志 2 打印并退出程序 3 打印并panic
},
"pprof": {
"addr": ":10001" // Go pprof web地址
}
}
```
其它放在代码中的配置:
- [rtmp/var.go](https://github.com/q191201771/lal/blob/master/pkg/rtmp/var.go)
- [httpflv/var.go](https://github.com/q191201771/lal/blob/master/pkg/httpflv/var.go)
### 三. 仓库目录框架
简单来说,源码在`pkg/``app/``demo/`三个目录下。
- `pkg/`存放各package包供本repo的程序以及其他业务方使用
- `app/`重要程序的入口目前仅包含lals——基于lal编写的一个通用流媒体服务器程序
- `app/`重要程序的入口目前仅包含lalserver——基于lal编写的一个通用流媒体服务器程序
- `demo/`:存放各种基于`lal/pkg`开发的小程序(小工具),一个子目录是一个程序,详情见各源码文件中头部的说明
```
@ -122,13 +121,13 @@ pkg/ ......
|-- rtmp/ ......RTMP协议
|-- httpflv/ ......HTTP-FLV协议
|-- hls/ ......HLS协议
|-- logic/ ......lals服务器程序的上层业务逻辑
|-- logic/ ......lalserver服务器程序的上层业务逻辑
|-- aac/ ......音频AAC编码格式相关
|-- avc/ ......视频H264/AVC编码格式相关
|-- hevc/ ......视频H265/HEVC编码格式相关
app/ ......
|-- lals/ ......流媒体服务器lals的main函数入口
|-- lalserver/ ......流媒体服务器lalserver的main函数入口
demo/ ......
|-- analyseflv ......
@ -162,34 +161,31 @@ conf/ ......配置文件目录
* 不依赖第三方代码
* 后续可快速集成各种网络传输协议,流媒体封装协议
#### lals服务器功能
#### lalserver服务器功能
- [x] **pub 接收推流:** RTMP
- [x] **sub 接收拉流:** RTMPHTTP-FLVHLS(m3u8+ts)
- [x] **音频编码格式:** AAC
- [x] **视频编码格式:** H264/AVCH265/HEVC
- [x] **GOP缓存** 用于秒开
TODO
- RTMP转推
- RTMP回源
- HTTP-FLV回源
- 静态转推、回源
- 动态转推、回源
- rtsp
- rtp/rtcp
- webrtc
- udp quic
- udp srt
- udp kcp
- 分布式。提供与外部调度系统交互的接口。应对多级分发场景,或平级源站类型场景
- 调整框架代码
- 各种推流、拉流客户端兼容性测试
- 和其它主流服务器的性能对比测试
- 整理日志
- 稳定性测试
- mp4
- [ ] RTMP转推
- [ ] RTMP回源
- [ ] HTTP-FLV回源
- [ ] 静态转推、回源
- [ ] 动态转推、回源
- [ ] rtsp
- [ ] rtp/rtcp
- [ ] webrtc
- [ ] udp quic
- [ ] udp srt
- [ ] udp kcp
- [ ] mp4
- [ ] 分布式。提供与外部调度系统交互的接口。应对多级分发场景,或平级源站类型场景
- [ ] 调整框架代码
- [ ] 各种推流、拉流客户端兼容性测试
- [ ] 和其它主流服务器的性能对比测试
- [ ] 整理日志
- [ ] 稳定性测试
### 五. 文档

@ -1,27 +1,27 @@
### 性能测试
测试场景一持续推送n路RTMP流至lals没有拉流
测试场景一持续推送n路RTMP流至lalserver(没有拉流)
| 推流数量 | CPU占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的16% | 104MB |
测试场景二持续推送1路RTMP流至lals使用RTMP协议从lals拉取n路流
测试场景二持续推送1路RTMP流至lalserver使用RTMP协议从lalserver拉取n路流
| 拉流数量 | CPU占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的30% | 120MB |
测试场景三: 持续推送n路RTMP流至lals使用RTMP协议从lals拉取n路流推拉流为1对1的关系
测试场景三: 持续推送n路RTMP流至lalserver使用RTMP协议从lalserver拉取n路流推拉流为1对1的关系
| 推流数量 | 拉流数量 | CPU占用 | 内存占用RES |
| - | - | - | - |
| 1000 | 1000 | 125% | 464MB |
* 测试机32核16Glals服务器和压测工具同时跑在这一个机器上
* 压测工具lal中的 `/app/flvfile2rtmppush` 以及 `/app/rtmppull`
* 测试机32核16Glalserver服务器和压测工具同时跑在这一个机器上)
* 压测工具lal中的 `/demo/flvfile2rtmppush` 以及 `/demo/rtmppull`
* 推流码率:使用`srs-bench`中的FLV文件大概200kbps
* lals版本基于 git commit: xxx
* lalserver版本:基于 git commit: xxx
*由于测试机是台共用的机器,上面还跑了许多其他服务,这里列的只是个粗略的数据,还待做更多的性能分析以及优化。如果你对性能感兴趣,欢迎进行测试并将结果反馈给我。*
@ -35,7 +35,7 @@ RTMP推流端
- OBS 24.0.3(win10 64 bit)
- ffmpeg 3.4.2(macos)
- srs-bench (macos srs项目配套的一个压测工具)
- flvfile2rtmppush (macos lal app中的RTMP推流客户端)
- flvfile2rtmppush (macos lal demo中的RTMP推流客户端)
RTMP / HTTP-FLV 拉流端:
- VLC 3.0.8(macos 10.15.1)

@ -1,70 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"encoding/json"
"io/ioutil"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/naza/pkg/nazajson"
log "github.com/q191201771/naza/pkg/nazalog"
)
type Config struct {
logic.Config
Log log.Option `json:"log"`
PProf PProfConfig `json:"pprof"`
}
type PProfConfig struct {
Addr string `json:"addr"`
}
func LoadConf(confFile string) (*Config, error) {
var config Config
rawContent, err := ioutil.ReadFile(confFile)
if err != nil {
return nil, err
}
if err = json.Unmarshal(rawContent, &config); err != nil {
return nil, err
}
j, err := nazajson.New(rawContent)
if err != nil {
return nil, err
}
// 检查配置必须项
// 暂时无
// 配置不存在时,设置默认值
if !j.Exist("log.level") {
config.Log.Level = log.LevelDebug
}
if !j.Exist("log.filename") {
config.Log.Filename = "./logs/lals.log"
}
if !j.Exist("log.is_to_stdout") {
config.Log.IsToStdout = true
}
if !j.Exist("log.is_rotate_daily") {
config.Log.IsRotateDaily = true
}
if !j.Exist("log.short_file_flag") {
config.Log.ShortFileFlag = true
}
if !j.Exist("log.assert_behavior") {
config.Log.AssertBehavior = log.AssertError
}
return &config, nil
}

@ -1,86 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/naza/pkg/bininfo"
log "github.com/q191201771/naza/pkg/nazalog"
)
var sm *logic.ServerManager
func main() {
confFile := parseFlag()
config := loadConf(confFile)
initLog(config.Log)
log.Infof("bininfo: %s", bininfo.StringifySingleLine())
sm = logic.NewServerManager(&config.Config)
if config.PProf.Addr != "" {
go runWebPProf(config.PProf.Addr)
}
go runSignalHandler()
sm.RunLoop()
}
func parseFlag() string {
binInfoFlag := flag.Bool("v", false, "show bin info")
cf := flag.String("c", "", "specify conf file")
flag.Parse()
if *binInfoFlag {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
os.Exit(0)
}
if *cf == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `
Example:
./bin/lals -c ./conf/lals.conf.json
`)
os.Exit(1)
}
return *cf
}
func loadConf(confFile string) *Config {
config, err := LoadConf(confFile)
if err != nil {
log.Errorf("load conf failed. file=%s err=%+v", confFile, err)
os.Exit(1)
}
log.Infof("load conf file succ. file=%s content=%+v", confFile, config)
return config
}
func initLog(opt log.Option) {
if err := log.Init(func(option *log.Option) {
*option = opt
}); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v\n", err)
os.Exit(1)
}
log.Info("initial log succ.")
}
func runWebPProf(addr string) {
log.Infof("start web pprof listen. addr=%s", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error(err)
return
}
}

@ -0,0 +1,44 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"os"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/naza/pkg/bininfo"
)
var sm *logic.ServerManager
func main() {
confFile := parseFlag()
logic.Entry(confFile)
}
func parseFlag() string {
binInfoFlag := flag.Bool("v", false, "show bin info")
cf := flag.String("c", "", "specify conf file")
flag.Parse()
if *binInfoFlag {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
os.Exit(0)
}
if *cf == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `
Example:
./bin/lalserver -c ./conf/lalserver.conf.json
`)
os.Exit(1)
}
return *cf
}

@ -49,6 +49,6 @@ do
done
${ROOT_DIR}/${OUT_DIR}/lals -v &&
${ROOT_DIR}/${OUT_DIR}/lalserver -v &&
ls -lrt ${ROOT_DIR}/${OUT_DIR} &&
echo 'build done.'

@ -1,27 +1,31 @@
{
"rtmp": {
"enable": true,
"addr": ":19350",
"gop_num": 2
},
"httpflv": {
"enable": true,
"sub_listen_addr": ":8080",
"gop_num": 2
},
"hls": {
"enable": true,
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"pprof": {
"enable": true,
"addr": ":10001"
},
"log": {
"level": 1,
"filename": "./logs/lals.log",
"filename": "./logs/lalserver.log",
"is_to_stdout": true,
"is_rotate_daily": true,
"short_file_flag": true,
"assert_behavior": 1
},
"pprof": {
"addr": ":10001"
}
}

@ -1,27 +1,31 @@
{
"rtmp": {
"enable": true,
"addr": ":19350",
"gop_num": 2
},
"httpflv": {
"enable": true,
"sub_listen_addr": ":8080",
"gop_num": 2
},
"hls": {
"enable": true,
"sub_listen_addr": ":8081",
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6
},
"pprof": {
"enable": true,
"addr": ":10001"
},
"log": {
"level": 1,
"filename": "./logs/lals.log",
"filename": "./logs/lalserver.log",
"is_to_stdout": true,
"is_rotate_daily": true,
"short_file_flag": true,
"assert_behavior": 1
},
"pprof": {
"addr": ":10001"
}
}

@ -38,22 +38,34 @@ import (
// - 音频带宽
// - 视频带宽
// - 视频DTS和PTS不相等的计数
// - I帧间隔时间
// - H264
// - 打印每个tag的类型key seq header...
// - 打印每个tag中有多少个帧SPS PPS SEI IDR SLICE...
// - 打印每个SLICE的类型I、P、B...
// 解析metadata信息并打印
// TODO
// - 解析metadata
// - 检查时间戳正向大的跳跃
// - 打印GOP中帧数量
// - slice_num?
var (
timestampCheckFlag = true
printStatFlag = true
printEveryTagFlag = false
printMetaData = true
analysisVideoTagFlag = false
)
var (
prevAudioTS = int64(-1)
prevVideoTS = int64(-1)
prevTS = int64(-1)
prevIDRTS = int64(-1)
diffIDRTS = int64(-1)
)
func main() {
url := parseFlag()
session := httpflv.NewPullSession()
@ -62,17 +74,14 @@ func main() {
brAudio := bitrate.New()
brVideo := bitrate.New()
prevAudioTS := int64(-1)
prevVideoTS := int64(-1)
prevTS := int64(-1)
videoCTSNotZeroCount := 0
go func() {
for {
time.Sleep(1 * time.Second)
if printStatFlag {
nazalog.Debugf("stat. total=%dKb/s, audio=%dKb/s, video=%dKb/s, videoCTSNotZeroCount=%d", int(brTotal.Rate()), int(brAudio.Rate()), int(brVideo.Rate()), videoCTSNotZeroCount)
nazalog.Debugf("stat. total=%dKb/s, audio=%dKb/s, video=%dKb/s, videoCTSNotZeroCount=%d, diffIDRTS=%d",
int(brTotal.Rate()), int(brAudio.Rate()), int(brVideo.Rate()), videoCTSNotZeroCount, diffIDRTS)
}
}
}()
@ -107,18 +116,18 @@ func main() {
case httpflv.TagTypeAudio:
brAudio.Add(len(tag.Raw))
if timestampCheckFlag {
if prevAudioTS != -1 && int64(tag.Header.Timestamp) < prevAudioTS {
nazalog.Errorf("audio timestamp error. header=%+v, prevAudioTS=%d, diff=%d", tag.Header, prevAudioTS, int64(tag.Header.Timestamp)-prevAudioTS)
nazalog.Errorf("audio timestamp error, less than prev audio timestamp. header=%+v, prevAudioTS=%d, diff=%d", tag.Header, prevAudioTS, int64(tag.Header.Timestamp)-prevAudioTS)
}
if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS {
nazalog.Errorf("audio timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
nazalog.Warnf("audio timestamp error. less than prev global timestamp. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
}
}
prevAudioTS = int64(tag.Header.Timestamp)
prevTS = int64(tag.Header.Timestamp)
case httpflv.TagTypeVideo:
if analysisVideoTagFlag {
analysisVideoTag(tag)
}
videoCTS := bele.BEUint24(tag.Raw[13:])
if videoCTS != 0 {
@ -127,11 +136,13 @@ func main() {
brVideo.Add(len(tag.Raw))
if timestampCheckFlag {
if prevVideoTS != -1 && int64(tag.Header.Timestamp) < prevVideoTS {
nazalog.Errorf("video timestamp error. header=%+v, prevVideoTS=%d, diff=%d", tag.Header, prevVideoTS, int64(tag.Header.Timestamp)-prevVideoTS)
nazalog.Errorf("video timestamp error, less than prev video timestamp. header=%+v, prevVideoTS=%d, diff=%d", tag.Header, prevVideoTS, int64(tag.Header.Timestamp)-prevVideoTS)
}
if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS {
nazalog.Errorf("video timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
nazalog.Warnf("video timestamp error, less than prev global timestamp. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS)
}
}
prevVideoTS = int64(tag.Header.Timestamp)
prevTS = int64(tag.Header.Timestamp)
@ -165,6 +176,12 @@ func analysisVideoTag(tag httpflv.Tag) {
naluLen := bele.BEUint32(body[i:])
switch t {
case typeAVC:
if avc.CalcNaluType(body[i+4:]) == avc.NaluUnitTypeIDRSlice {
if prevIDRTS != int64(-1) {
diffIDRTS = int64(tag.Header.Timestamp) - prevIDRTS
}
prevIDRTS = int64(tag.Header.Timestamp)
}
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNaluTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:])))
case typeHEVC:
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNaluTypeReadable(body[i+4:])))
@ -172,8 +189,10 @@ func analysisVideoTag(tag httpflv.Tag) {
i = i + 4 + int(naluLen)
}
}
if analysisVideoTagFlag {
nazalog.Debug(buf.String())
}
}
func parseFlag() string {
url := flag.String("i", "", "specify http-flv url")

@ -71,7 +71,7 @@ func main() {
go func() {
for {
content, err := nazahttp.GetHttpFile(m3u8URL, 3000)
content, err := nazahttp.GetHTTPFile(m3u8URL, 3000)
if err != nil {
nazalog.Error(err)
return
@ -107,7 +107,7 @@ func main() {
nazalog.Infof("< new frag. filename=%s", f.filename)
tsURL := getTSURL(m3u8URL, f.filename)
nazalog.Debug(tsURL)
content, err := nazahttp.GetHttpFile(tsURL, 3000)
content, err := nazahttp.GetHTTPFile(tsURL, 3000)
nazalog.Assert(nil, err)
nazalog.Debugf("TS len=%d", len(content))
}

@ -44,6 +44,8 @@ func main() {
defer vfp.Close()
log.Infof("open es h264 file succ.")
var adts aac.ADTS
for {
tag, err := ffr.ReadTag()
if err == io.EOF {
@ -56,7 +58,14 @@ func main() {
switch tag.Header.Type {
case httpflv.TagTypeAudio:
aac.CaptureAAC(afp, payload)
if payload[1] == 0 {
err = adts.PutAACSequenceHeader(payload)
log.Assert(nil, err)
return
}
_, _ = afp.Write(adts.GetADTS(uint16(len(payload))))
_, _ = afp.Write(payload[2:])
case httpflv.TagTypeVideo:
_ = avc.CaptureAVC(vfp, payload)
}

@ -0,0 +1,22 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/nazalog"
)
func main() {
s := rtsp.NewServer(":5544")
err := s.Listen()
nazalog.Assert(nil, err)
err = s.RunLoop()
nazalog.Error(err)
}

@ -20,9 +20,9 @@ echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/VERSION.txt
echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/VERSION.txt
echo ${v} >> ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/VERSION.txt
cp conf/lals.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf
cp conf/lals.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf
cp conf/lals.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/conf
cp conf/lalserver.conf.json ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/conf
GitCommitLog=`git log --pretty=oneline -n 1`
# 将 log 原始字符串中的单引号替换成双引号
@ -44,15 +44,15 @@ export GOARCH=amd64
echo "build linux..."
export GOOS=linux
cd ${ROOT_DIR}/app/lals && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/bin/lals
cd ${ROOT_DIR}/app/lalserver && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}linux/bin/lalserver
echo "build macos..."
export GOOS=darwin
cd ${ROOT_DIR}/app/lals && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/bin/lals
cd ${ROOT_DIR}/app/lalserver && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}macos/bin/lalserver
echo "build windows..."
export GOOS=windows
cd ${ROOT_DIR}/app/lals && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/bin/lals
cd ${ROOT_DIR}/app/lalserver && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${prefix}windows/bin/lalserver.exe
cd ${ROOT_DIR}/${OUT_DIR}
zip -r ${prefix}linux.zip ${prefix}linux

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.12.3
require github.com/q191201771/naza v0.13.0

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.12.3 h1:0Z8hMa5RYNqsG1GjGfYyLFkuPLfuZ21iDx3BJEPK0p8=
github.com/q191201771/naza v0.12.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.13.0 h1:tHgsMlMu9dHGmL26cGpFJDeP1qdFwbXVJHPg6IlAuvo=
github.com/q191201771/naza v0.13.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -10,17 +10,14 @@ package aac
import (
"encoding/hex"
"io"
"errors"
"github.com/q191201771/naza/pkg/nazabits"
log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 把Seq Header头两字节的解析和ADTS的内容分离开
// TODO chef: 这个全局变量没有并发保证,删除掉
var adts ADTS
var ErrAAC = errors.New("lal.aac: fxxk")
// Audio Data Transport Stream
type ADTS struct {
@ -33,7 +30,13 @@ type ADTS struct {
// 传入AAC Sequence Header调用GetADTS时需要使用
// @param <payload> rtmp message payload包含前面2个字节
func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
func (a *ADTS) PutAACSequenceHeader(payload []byte) error {
if len(payload) < 4 {
nazalog.Warnf("aac seq header length invalid. len=%d", len(payload))
return ErrAAC
}
// TODO chef: 把Seq Header头两字节的解析和ADTS的内容分离开
// <spec-video_file_format_spec_v10.pdf>, <Audio tags, AUDIODATA>, <page 10/48>
// ----------------------------------------------------------------------------
// soundFormat [4b] 10=AAC
@ -42,12 +45,12 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
// soundType [1b] 0=sndMono, 1=sndStereo. AAC always 1
// aacPackageType [8b] 0=seq header, 1=AAC raw
br := nazabits.NewBitReader(payload)
soundFormat := br.ReadBits8(4)
soundRate := br.ReadBits8(2)
soundSize := br.ReadBits8(1)
soundType := br.ReadBits8(1)
aacPacketType := br.ReadBits8(8)
log.Debugf("%s %d %d %d %d %d", hex.Dump(payload[:4]), soundFormat, soundRate, soundSize, soundType, aacPacketType)
soundFormat, _ := br.ReadBits8(4)
soundRate, _ := br.ReadBits8(2)
soundSize, _ := br.ReadBits8(1)
soundType, _ := br.ReadBits8(1)
aacPacketType, _ := br.ReadBits8(8)
nazalog.Debugf("%s %d %d %d %d %d", hex.Dump(payload[:4]), soundFormat, soundRate, soundSize, soundType, aacPacketType)
// <ISO_IEC_14496-3.pdf>
// <1.6.2.1 AudioSpecificConfig>, <page 33/110>
@ -58,17 +61,21 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
// audio object type [5b] 2=AAC LC
// samplingFrequencyIndex [4b] 3=48000 4=44100
// channelConfiguration [4b] 2=left, right front speakers
obj.audioObjectType = br.ReadBits8(5)
obj.samplingFrequencyIndex = br.ReadBits8(4)
obj.channelConfiguration = br.ReadBits8(4)
log.Debugf("%+v", obj)
a.audioObjectType, _ = br.ReadBits8(5)
a.samplingFrequencyIndex, _ = br.ReadBits8(4)
a.channelConfiguration, _ = br.ReadBits8(4)
nazalog.Debugf("%+v", a)
obj.adtsHeader = make([]byte, 7)
if a.adtsHeader == nil {
a.adtsHeader = make([]byte, 7)
}
return nil
}
// 获取ADTS头注意由于ADTS头依赖包的长度而每个包的长度不同所以生成的每个包的ADTS头也不同
// @param <length> rtmp message payload长度包含前面2个字节
func (obj *ADTS) GetADTS(length uint16) []byte {
func (a *ADTS) GetADTS(length uint16) []byte {
// <ISO_IEC_14496-3.pdf>
// <1.A.2.2.1 Fixed Header of ADTS>, <page 75/110>
// <1.A.2.2.2 Variable Header of ADTS>, <page 76/110>
@ -95,33 +102,21 @@ func (obj *ADTS) GetADTS(length uint16) []byte {
// 减去头两字节再加上自身adts头的7个字节
length += 5
bw := nazabits.NewBitWriter(obj.adtsHeader)
bw := nazabits.NewBitWriter(a.adtsHeader)
bw.WriteBits16(12, 0xFFF) // Syncword 0(8) 1(4)
bw.WriteBits8(4, 0x1) // ID, Layer, protection_absent 1(4)
bw.WriteBits8(2, obj.audioObjectType-1) // 2(2)
bw.WriteBits8(4, obj.samplingFrequencyIndex) // 2(4)
bw.WriteBits8(2, a.audioObjectType-1) // 2(2)
bw.WriteBits8(4, a.samplingFrequencyIndex) // 2(4)
bw.WriteBits8(1, 0) // private_bit 2(1)
bw.WriteBits8(3, obj.channelConfiguration) // 2(1) 3(2)
bw.WriteBits8(3, a.channelConfiguration) // 2(1) 3(2)
bw.WriteBits8(4, 0) // origin/copy, home, copyright_identification_bit, copyright_identification_start 3(4)
bw.WriteBits16(13, length) // 3(2) 4(8) 5(3)
bw.WriteBits16(11, 0x7FF) // adts_buffer_fullness 5(5) 6(6)
bw.WriteBits8(2, 0) // no_raw_data_blocks_in_frame 6(2)
return obj.adtsHeader
}
func (obj *ADTS) IsNil() bool {
return obj.adtsHeader == nil
}
// 将 rtmp AAC 传入,输出带 ADTS 头的 AAC ES流
// @param <payload> rtmp message payload部分
func CaptureAAC(w io.Writer, payload []byte) {
if payload[1] == 0 {
adts.PutAACSequenceHeader(payload)
return
return a.adtsHeader
}
_, _ = w.Write(adts.GetADTS(uint16(len(payload))))
_, _ = w.Write(payload[2:])
func (a *ADTS) IsNil() bool {
return a.adtsHeader == nil
}

@ -9,16 +9,16 @@
package aac
import (
"bytes"
"testing"
"github.com/q191201771/naza/pkg/assert"
)
func TestCaptureAAC(t *testing.T) {
b := &bytes.Buffer{}
CaptureAAC(b, []byte{0xaf, 0x0, 0x11, 0x90})
CaptureAAC(b, []byte{0xaf, 0x1, 0x21, 0x2b, 0x94, 0xa5, 0xb6, 0xa, 0xe1, 0x63, 0x21, 0x88, 0xa2, 0x10, 0x4b, 0xdf, 0x9, 0x25, 0xb4, 0xd6, 0xe3, 0x4a, 0xd, 0xe3, 0xa3, 0x64, 0x8d, 0x1, 0x31, 0x80, 0x98, 0x8b, 0xdc, 0x79, 0x3e, 0x2d, 0xd8, 0xed, 0x68, 0xe0, 0xe5, 0xb2, 0x44, 0x13, 0x4, 0x53, 0xbf, 0x28, 0x92, 0xe5, 0xfa, 0x7d, 0x86, 0x78, 0x40, 0x78, 0x4c, 0xb5, 0xe, 0x15, 0x21, 0xc3, 0x57, 0x1a, 0x63, 0x8d, 0xe, 0xc, 0x69, 0xb5, 0x91, 0xd0, 0x52, 0xe, 0x1, 0xa8, 0x67, 0x3e, 0xf9, 0x4e, 0xa2, 0xdb, 0x8b, 0x4a, 0x52, 0x4a, 0xd0, 0x7d, 0x34, 0x4, 0x4f, 0x8d, 0x11, 0xd3, 0xd, 0x20, 0x98, 0x55, 0x86, 0x9, 0xfb, 0xe5, 0xdd, 0x28, 0xd9, 0x4c, 0xde, 0x40, 0x89, 0x26, 0x0, 0xd4, 0x14, 0xcb, 0x6a, 0xc5, 0x91, 0x48, 0xb5, 0xcf, 0x20, 0x6b, 0xbb, 0x16, 0x1b, 0x6b, 0xf4, 0x65, 0x32, 0x5a, 0x8d, 0x1a, 0xe0, 0xa9, 0xf2, 0xf4, 0x71, 0x7e, 0xb8, 0x6f, 0x93, 0xbc, 0x2, 0xf1, 0x36, 0x2b, 0x4e, 0x96, 0x7f, 0x6d, 0x7c, 0xc5, 0x8a, 0x6e, 0xed, 0x6, 0xa9, 0x7f, 0xbd, 0x97, 0x25, 0xb1, 0xa9, 0xac, 0x70, 0xba, 0x58, 0xd7, 0x31, 0x53, 0x94, 0x5f, 0xa5, 0x8f, 0x74, 0x35, 0xea, 0x64, 0x74, 0x6f, 0x19, 0x94, 0x11, 0x46, 0x99, 0x89, 0x80, 0x1c, 0x8a, 0x22, 0x52, 0xcf, 0x9, 0x43, 0x31, 0xc, 0x48, 0x63, 0x18, 0x25, 0xcf, 0x60, 0xcf, 0xc6, 0x46, 0x74, 0x35, 0xbd, 0xa7, 0x7c, 0x66, 0xaa, 0xf7, 0x97, 0x34, 0x4, 0x12, 0x30, 0x49, 0xae, 0x39, 0xb4, 0xfa, 0x74, 0x58, 0x72, 0x23, 0x8d, 0xdc, 0xaa, 0x58, 0x7c, 0xb5, 0x1c, 0xe9, 0x55, 0xd9, 0x55, 0x8c, 0x4e, 0x51, 0xd4, 0xa8, 0xb4, 0x76, 0x61, 0x55, 0xd0, 0xea, 0x55, 0x39, 0xda, 0x9, 0x1b, 0x52, 0x79, 0xbd, 0x8d, 0xff, 0xb8, 0xcb, 0xa0, 0xf4, 0xc2, 0xe3, 0xfc, 0x87, 0x80, 0x6c, 0xa8, 0xa6, 0x4e, 0x8d, 0x10, 0x9a, 0xc9, 0x3b, 0x8e, 0x52, 0x34, 0x55, 0x20, 0xa9, 0xa4, 0xb2, 0xf0, 0xf0, 0xb0, 0x29, 0x5c, 0xa7, 0xea, 0xc6, 0x11, 0x91, 0xa0, 0x10, 0x3, 0x77, 0xc3, 0xe8, 0xa7, 0xd1, 0x8b, 0xdc, 0x35, 0xc2, 0x95, 0x6f, 0x25, 0xec, 0xbb, 0x8a, 0x8a, 0xf5, 0xd6, 0x59, 0x9c, 0xa2, 0x8b, 0xc, 0x15, 0x5d, 0x50, 0xdb, 0xf2, 0xda, 0x79, 0xd6, 0xb8, 0xd5, 0x94, 0x99, 0xb9, 0x7a, 0x67, 0x8e, 0xd2, 0x6a, 0x58, 0x88, 0x68, 0xa4, 0xc2, 0x17, 0xdd, 0x5a, 0xf1, 0xd1, 0xe3, 0xc7, 0x3e, 0x76, 0x2e, 0x65, 0xc5, 0xc9, 0x3, 0x80})
expected := []byte{0xff, 0xf1, 0x4c, 0x80, 0x2d, 0x9f, 0xfc, 0x21, 0x2b, 0x94, 0xa5, 0xb6, 0xa, 0xe1, 0x63, 0x21, 0x88, 0xa2, 0x10, 0x4b, 0xdf, 0x9, 0x25, 0xb4, 0xd6, 0xe3, 0x4a, 0xd, 0xe3, 0xa3, 0x64, 0x8d, 0x1, 0x31, 0x80, 0x98, 0x8b, 0xdc, 0x79, 0x3e, 0x2d, 0xd8, 0xed, 0x68, 0xe0, 0xe5, 0xb2, 0x44, 0x13, 0x4, 0x53, 0xbf, 0x28, 0x92, 0xe5, 0xfa, 0x7d, 0x86, 0x78, 0x40, 0x78, 0x4c, 0xb5, 0xe, 0x15, 0x21, 0xc3, 0x57, 0x1a, 0x63, 0x8d, 0xe, 0xc, 0x69, 0xb5, 0x91, 0xd0, 0x52, 0xe, 0x1, 0xa8, 0x67, 0x3e, 0xf9, 0x4e, 0xa2, 0xdb, 0x8b, 0x4a, 0x52, 0x4a, 0xd0, 0x7d, 0x34, 0x4, 0x4f, 0x8d, 0x11, 0xd3, 0xd, 0x20, 0x98, 0x55, 0x86, 0x9, 0xfb, 0xe5, 0xdd, 0x28, 0xd9, 0x4c, 0xde, 0x40, 0x89, 0x26, 0x0, 0xd4, 0x14, 0xcb, 0x6a, 0xc5, 0x91, 0x48, 0xb5, 0xcf, 0x20, 0x6b, 0xbb, 0x16, 0x1b, 0x6b, 0xf4, 0x65, 0x32, 0x5a, 0x8d, 0x1a, 0xe0, 0xa9, 0xf2, 0xf4, 0x71, 0x7e, 0xb8, 0x6f, 0x93, 0xbc, 0x2, 0xf1, 0x36, 0x2b, 0x4e, 0x96, 0x7f, 0x6d, 0x7c, 0xc5, 0x8a, 0x6e, 0xed, 0x6, 0xa9, 0x7f, 0xbd, 0x97, 0x25, 0xb1, 0xa9, 0xac, 0x70, 0xba, 0x58, 0xd7, 0x31, 0x53, 0x94, 0x5f, 0xa5, 0x8f, 0x74, 0x35, 0xea, 0x64, 0x74, 0x6f, 0x19, 0x94, 0x11, 0x46, 0x99, 0x89, 0x80, 0x1c, 0x8a, 0x22, 0x52, 0xcf, 0x9, 0x43, 0x31, 0xc, 0x48, 0x63, 0x18, 0x25, 0xcf, 0x60, 0xcf, 0xc6, 0x46, 0x74, 0x35, 0xbd, 0xa7, 0x7c, 0x66, 0xaa, 0xf7, 0x97, 0x34, 0x4, 0x12, 0x30, 0x49, 0xae, 0x39, 0xb4, 0xfa, 0x74, 0x58, 0x72, 0x23, 0x8d, 0xdc, 0xaa, 0x58, 0x7c, 0xb5, 0x1c, 0xe9, 0x55, 0xd9, 0x55, 0x8c, 0x4e, 0x51, 0xd4, 0xa8, 0xb4, 0x76, 0x61, 0x55, 0xd0, 0xea, 0x55, 0x39, 0xda, 0x9, 0x1b, 0x52, 0x79, 0xbd, 0x8d, 0xff, 0xb8, 0xcb, 0xa0, 0xf4, 0xc2, 0xe3, 0xfc, 0x87, 0x80, 0x6c, 0xa8, 0xa6, 0x4e, 0x8d, 0x10, 0x9a, 0xc9, 0x3b, 0x8e, 0x52, 0x34, 0x55, 0x20, 0xa9, 0xa4, 0xb2, 0xf0, 0xf0, 0xb0, 0x29, 0x5c, 0xa7, 0xea, 0xc6, 0x11, 0x91, 0xa0, 0x10, 0x3, 0x77, 0xc3, 0xe8, 0xa7, 0xd1, 0x8b, 0xdc, 0x35, 0xc2, 0x95, 0x6f, 0x25, 0xec, 0xbb, 0x8a, 0x8a, 0xf5, 0xd6, 0x59, 0x9c, 0xa2, 0x8b, 0xc, 0x15, 0x5d, 0x50, 0xdb, 0xf2, 0xda, 0x79, 0xd6, 0xb8, 0xd5, 0x94, 0x99, 0xb9, 0x7a, 0x67, 0x8e, 0xd2, 0x6a, 0x58, 0x88, 0x68, 0xa4, 0xc2, 0x17, 0xdd, 0x5a, 0xf1, 0xd1, 0xe3, 0xc7, 0x3e, 0x76, 0x2e, 0x65, 0xc5, 0xc9, 0x3, 0x80}
assert.Equal(t, expected, b.Bytes())
func TestADTS(t *testing.T) {
var adts ADTS
err := adts.PutAACSequenceHeader([]byte{0xaf, 0x0, 0x11, 0x90})
assert.Equal(t, nil, err)
data := []byte{0xaf, 0x1, 0x21, 0x2b, 0x94, 0xa5, 0xb6, 0xa, 0xe1, 0x63, 0x21, 0x88, 0xa2, 0x10, 0x4b, 0xdf, 0x9, 0x25, 0xb4, 0xd6, 0xe3, 0x4a, 0xd, 0xe3, 0xa3, 0x64, 0x8d, 0x1, 0x31, 0x80, 0x98, 0x8b, 0xdc, 0x79, 0x3e, 0x2d, 0xd8, 0xed, 0x68, 0xe0, 0xe5, 0xb2, 0x44, 0x13, 0x4, 0x53, 0xbf, 0x28, 0x92, 0xe5, 0xfa, 0x7d, 0x86, 0x78, 0x40, 0x78, 0x4c, 0xb5, 0xe, 0x15, 0x21, 0xc3, 0x57, 0x1a, 0x63, 0x8d, 0xe, 0xc, 0x69, 0xb5, 0x91, 0xd0, 0x52, 0xe, 0x1, 0xa8, 0x67, 0x3e, 0xf9, 0x4e, 0xa2, 0xdb, 0x8b, 0x4a, 0x52, 0x4a, 0xd0, 0x7d, 0x34, 0x4, 0x4f, 0x8d, 0x11, 0xd3, 0xd, 0x20, 0x98, 0x55, 0x86, 0x9, 0xfb, 0xe5, 0xdd, 0x28, 0xd9, 0x4c, 0xde, 0x40, 0x89, 0x26, 0x0, 0xd4, 0x14, 0xcb, 0x6a, 0xc5, 0x91, 0x48, 0xb5, 0xcf, 0x20, 0x6b, 0xbb, 0x16, 0x1b, 0x6b, 0xf4, 0x65, 0x32, 0x5a, 0x8d, 0x1a, 0xe0, 0xa9, 0xf2, 0xf4, 0x71, 0x7e, 0xb8, 0x6f, 0x93, 0xbc, 0x2, 0xf1, 0x36, 0x2b, 0x4e, 0x96, 0x7f, 0x6d, 0x7c, 0xc5, 0x8a, 0x6e, 0xed, 0x6, 0xa9, 0x7f, 0xbd, 0x97, 0x25, 0xb1, 0xa9, 0xac, 0x70, 0xba, 0x58, 0xd7, 0x31, 0x53, 0x94, 0x5f, 0xa5, 0x8f, 0x74, 0x35, 0xea, 0x64, 0x74, 0x6f, 0x19, 0x94, 0x11, 0x46, 0x99, 0x89, 0x80, 0x1c, 0x8a, 0x22, 0x52, 0xcf, 0x9, 0x43, 0x31, 0xc, 0x48, 0x63, 0x18, 0x25, 0xcf, 0x60, 0xcf, 0xc6, 0x46, 0x74, 0x35, 0xbd, 0xa7, 0x7c, 0x66, 0xaa, 0xf7, 0x97, 0x34, 0x4, 0x12, 0x30, 0x49, 0xae, 0x39, 0xb4, 0xfa, 0x74, 0x58, 0x72, 0x23, 0x8d, 0xdc, 0xaa, 0x58, 0x7c, 0xb5, 0x1c, 0xe9, 0x55, 0xd9, 0x55, 0x8c, 0x4e, 0x51, 0xd4, 0xa8, 0xb4, 0x76, 0x61, 0x55, 0xd0, 0xea, 0x55, 0x39, 0xda, 0x9, 0x1b, 0x52, 0x79, 0xbd, 0x8d, 0xff, 0xb8, 0xcb, 0xa0, 0xf4, 0xc2, 0xe3, 0xfc, 0x87, 0x80, 0x6c, 0xa8, 0xa6, 0x4e, 0x8d, 0x10, 0x9a, 0xc9, 0x3b, 0x8e, 0x52, 0x34, 0x55, 0x20, 0xa9, 0xa4, 0xb2, 0xf0, 0xf0, 0xb0, 0x29, 0x5c, 0xa7, 0xea, 0xc6, 0x11, 0x91, 0xa0, 0x10, 0x3, 0x77, 0xc3, 0xe8, 0xa7, 0xd1, 0x8b, 0xdc, 0x35, 0xc2, 0x95, 0x6f, 0x25, 0xec, 0xbb, 0x8a, 0x8a, 0xf5, 0xd6, 0x59, 0x9c, 0xa2, 0x8b, 0xc, 0x15, 0x5d, 0x50, 0xdb, 0xf2, 0xda, 0x79, 0xd6, 0xb8, 0xd5, 0x94, 0x99, 0xb9, 0x7a, 0x67, 0x8e, 0xd2, 0x6a, 0x58, 0x88, 0x68, 0xa4, 0xc2, 0x17, 0xdd, 0x5a, 0xf1, 0xd1, 0xe3, 0xc7, 0x3e, 0x76, 0x2e, 0x65, 0xc5, 0xc9, 0x3, 0x80}
data2 := adts.GetADTS(uint16(len(data)))
assert.Equal(t, []byte{0xff, 0xf1, 0x4c, 0x80, 0x2d, 0x9f, 0xfc}, data2)
}

@ -11,7 +11,6 @@ package avc
import (
"errors"
"io"
"math"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabits"
@ -64,24 +63,23 @@ func CalcNaluType(nalu []byte) uint8 {
return nalu[0] & 0x1f
}
// TODO chef: 考虑将error返回给上层
func CalcSliceType(nalu []byte) uint8 {
c := nalu[1]
var leadingZeroBits int
index := 6 // can't unsigned
for ; index >= 0; index-- {
v := nazabits.GetBit8(c, uint(index))
if v == 0 {
leadingZeroBits++
} else {
break
br := nazabits.NewBitReader(nalu[1:])
// first_mb_in_slice
_, err := br.ReadGolomb()
if err != nil {
return 0
}
sliceType, err := br.ReadGolomb()
if err != nil {
return 0
}
rbLeadingZeroBits := nazabits.GetBits8(c, uint(index-1), uint(leadingZeroBits))
codeNum := int(math.Pow(2, float64(leadingZeroBits))) - 1 + int(rbLeadingZeroBits)
if codeNum > 4 {
codeNum -= 5
// TODO chef: 检查非法数据slice type范围 [0, 9]
if sliceType > 4 {
sliceType -= 5
}
return uint8(codeNum)
return uint8(sliceType)
}
func CalcNaluTypeReadable(nalu []byte) string {
@ -157,6 +155,8 @@ func ParseAVCSeqHeader(payload []byte) (sps, pps []byte, err error) {
return
}
// TODO chef: 和HLS中的代码有重复合并一下
// 将rtmp avc数据转换成avc裸流
// @param <payload> rtmp message的payload部分 或者 flv tag的payload部分
func CaptureAVC(w io.Writer, payload []byte) error {

@ -34,3 +34,22 @@ func TestCorner(t *testing.T) {
assert.Equal(t, nil, b.Bytes())
assert.Equal(t, err, ErrAVC)
}
func TestCalcSliceType(t *testing.T) {
data := []struct {
in []byte
out uint8
}{
{[]byte{0x65, 0x88, 0x82}, SliceTypeI},
{[]byte{0x65, 0x88, 0x84}, SliceTypeI},
{[]byte{0x41, 0x9a, 0x26}, SliceTypeP},
{[]byte{0x41, 0x9a, 0x46}, SliceTypeP},
{[]byte{0x41, 0x9a, 0x24}, SliceTypeP},
{[]byte{0x41, 0x9e, 0x42}, SliceTypeB},
}
for _, item := range data {
assert.Equal(t, CalcSliceType(item.in), item.out)
}
}

@ -10,8 +10,6 @@ package hls
import (
"os"
"github.com/q191201771/naza/pkg/nazalog"
)
type FragmentOP struct {
@ -136,7 +134,6 @@ func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) {
pesSize := rpos + int(headerSize) + 3 // PES Header剩余3字节 + PTS/PTS长度 + 整个帧的长度
if pesSize > 0xFFFF {
nazalog.Warnf("pes size too large. pesSize=%d", pesSize)
pesSize = 0
}

@ -8,16 +8,15 @@
package hls
// TODO
// 声明本package参考了c语言实现的开源项目nginx-rtmp-module
// TODO chef:
// - 支持HEVC
// - 检查所有的容错处理,是否会出现
// - 补充单元测试
// - 配置项
// - Server
// - Dispose
// - 超时时间
// - 测试windows平台
// - safari直接播放不了vlc和ffplay是可以的
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction

@ -11,6 +11,8 @@ package hls_test
import (
"testing"
"github.com/q191201771/lal/pkg/innertest"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -26,3 +28,7 @@ func TestParseFixedTSPacket(t *testing.T) {
pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)
}
func TestHls(t *testing.T) {
innertest.InnerTestEntry(t)
}

@ -28,6 +28,7 @@ type fragmentInfo struct {
}
type MuxerConfig struct {
Enable bool `json:"enable"`
OutPath string `json:"out_path"`
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`

@ -9,6 +9,7 @@
package hls
import (
"net"
"net/http"
"github.com/q191201771/naza/pkg/nazalog"
@ -17,6 +18,8 @@ import (
type Server struct {
addr string
outPath string
ln net.Listener
httpSrv *http.Server
}
func NewServer(addr string, outPath string) *Server {
@ -26,9 +29,23 @@ func NewServer(addr string, outPath string) *Server {
}
}
func (s *Server) Listen() (err error) {
if s.ln, err = net.Listen("tcp", s.addr); err != nil {
return
}
s.httpSrv = &http.Server{Addr: s.addr, Handler: s}
nazalog.Infof("start hls server listen. addr=%s", s.addr)
return
}
func (s *Server) RunLoop() error {
nazalog.Infof("start hls listen. addr=%s", s.addr)
return http.ListenAndServe(s.addr, s)
return s.httpSrv.Serve(s.ln)
}
func (s *Server) Dispose() {
if err := s.httpSrv.Close(); err != nil {
nazalog.Error(err)
}
}
func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@ -57,16 +74,20 @@ func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
switch ri.fileType {
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
//resp.Header().Add("Content-Type", "application/vnd.apple.mpegurl")
case "ts":
resp.Header().Add("Content-Type", "video/mp2t")
}
resp.Header().Add("Cache-Control", "no-cache")
_, _ = resp.Write(content)
return
}
// m3u8文件用这个也行
//resp.Header().Add("Content-Type", "application/vnd.apple.mpegurl")
//resp.Header().Add("Access-Control-Allow-Origin", "*")
//resp.Header().Add("Access-Control-Allow-Credentials", "true")
//resp.Header().Add("Access-Control-Allow-Methods", "*")
//resp.Header().Add("Access-Control-Allow-Headers", "Content-Type,Access-Token")
//resp.Header().Add("Access-Control-Allow-Expose-Headers", "*")
_, _ = resp.Write(content)
return
}

@ -10,7 +10,6 @@ package hls
import (
"github.com/q191201771/naza/pkg/nazabits"
"github.com/q191201771/naza/pkg/nazalog"
)
// ------------------------------------------------
@ -57,22 +56,22 @@ type TSPacketAdaptation struct {
// 解析4字节TS Packet header
func ParseTSPacketHeader(b []byte) (h TSPacketHeader) {
// TODO chef: 检查长度
br := nazabits.NewBitReader(b)
h.Sync = br.ReadBits8(8)
nazalog.Assert(uint8(0x47), h.Sync)
h.Err = br.ReadBits8(1)
h.PayloadUnitStart = br.ReadBits8(1)
h.Prio = br.ReadBits8(1)
h.Pid = br.ReadBits16(13)
h.Scra = br.ReadBits8(2)
h.Adaptation = br.ReadBits8(2)
h.CC = br.ReadBits8(4)
h.Sync, _ = br.ReadBits8(8)
h.Err, _ = br.ReadBits8(1)
h.PayloadUnitStart, _ = br.ReadBits8(1)
h.Prio, _ = br.ReadBits8(1)
h.Pid, _ = br.ReadBits16(13)
h.Scra, _ = br.ReadBits8(2)
h.Adaptation, _ = br.ReadBits8(2)
h.CC, _ = br.ReadBits8(4)
return
}
// TODO chef
func ParseTSPacketAdaptation(b []byte) (f TSPacketAdaptation) {
br := nazabits.NewBitReader(b)
f.Length = br.ReadBits8(8)
f.Length, _ = br.ReadBits8(8)
return
}

@ -52,29 +52,30 @@ type PATProgramElement struct {
}
func ParsePAT(b []byte) (pat PAT) {
// TODO chef: 检查长度
br := nazabits.NewBitReader(b)
pat.tid = br.ReadBits8(8)
pat.ssi = br.ReadBits8(1)
br.ReadBits8(3)
pat.sl = br.ReadBits16(12)
pat.tsi = br.ReadBits16(16)
br.ReadBits8(2)
pat.vn = br.ReadBits8(5)
pat.cni = br.ReadBits8(1)
pat.sn = br.ReadBits8(8)
pat.lsn = br.ReadBits8(8)
pat.tid, _ = br.ReadBits8(8)
pat.ssi, _ = br.ReadBits8(1)
_, _ = br.ReadBits8(3)
pat.sl, _ = br.ReadBits16(12)
pat.tsi, _ = br.ReadBits16(16)
_, _ = br.ReadBits8(2)
pat.vn, _ = br.ReadBits8(5)
pat.cni, _ = br.ReadBits8(1)
pat.sn, _ = br.ReadBits8(8)
pat.lsn, _ = br.ReadBits8(8)
length := pat.sl - 9
for i := uint16(0); i < length; i += 4 {
var ppe PATProgramElement
ppe.pn = br.ReadBits16(16)
br.ReadBits8(3)
ppe.pn, _ = br.ReadBits16(16)
_, _ = br.ReadBits8(3)
// TODO chef if pn == 0
ppe.pmpid = br.ReadBits16(13)
ppe.pmpid, _ = br.ReadBits16(13)
pat.ppes = append(pat.ppes, ppe)
}
pat.crc32 = br.ReadBits32(32)
pat.crc32, _ = br.ReadBits32(32)
return
}

@ -49,16 +49,16 @@ type PES struct {
func ParsePES(b []byte) (pes PES, length int) {
br := nazabits.NewBitReader(b)
pes.pscp = br.ReadBits32(24)
pes.sid = br.ReadBits8(8)
pes.ppl = br.ReadBits16(16)
pes.pscp, _ = br.ReadBits32(24)
pes.sid, _ = br.ReadBits8(8)
pes.ppl, _ = br.ReadBits16(16)
pes.pad1 = br.ReadBits8(8)
pes.ptsDtsFlag = br.ReadBits8(2)
pes.pad2 = br.ReadBits8(6)
pes.phdl = br.ReadBits8(8)
pes.pad1, _ = br.ReadBits8(8)
pes.ptsDtsFlag, _ = br.ReadBits8(2)
pes.pad2, _ = br.ReadBits8(6)
pes.phdl, _ = br.ReadBits8(8)
br.ReadBytes(uint(pes.phdl))
_, _ = br.ReadBytes(uint(pes.phdl))
length = 9 + int(pes.phdl)
// 处理得不是特别标准

@ -63,36 +63,36 @@ type PMTProgramElement struct {
func ParsePMT(b []byte) (pmt PMT) {
br := nazabits.NewBitReader(b)
pmt.tid = br.ReadBits8(8)
pmt.ssi = br.ReadBits8(1)
br.ReadBits8(3)
pmt.sl = br.ReadBits16(12)
pmt.tid, _ = br.ReadBits8(8)
pmt.ssi, _ = br.ReadBits8(1)
_, _ = br.ReadBits8(3)
pmt.sl, _ = br.ReadBits16(12)
len := pmt.sl - 13
pmt.pn = br.ReadBits16(16)
br.ReadBits8(2)
pmt.vn = br.ReadBits8(5)
pmt.cni = br.ReadBits8(1)
pmt.sn = br.ReadBits8(8)
pmt.lsn = br.ReadBits8(8)
br.ReadBits8(3)
pmt.pp = br.ReadBits16(13)
br.ReadBits8(4)
pmt.pil = br.ReadBits16(12)
pmt.pn, _ = br.ReadBits16(16)
_, _ = br.ReadBits8(2)
pmt.vn, _ = br.ReadBits8(5)
pmt.cni, _ = br.ReadBits8(1)
pmt.sn, _ = br.ReadBits8(8)
pmt.lsn, _ = br.ReadBits8(8)
_, _ = br.ReadBits8(3)
pmt.pp, _ = br.ReadBits16(13)
_, _ = br.ReadBits8(4)
pmt.pil, _ = br.ReadBits16(12)
if pmt.pil != 0 {
nazalog.Warn(pmt.pil)
br.ReadBytes(uint(pmt.pil))
_, _ = br.ReadBytes(uint(pmt.pil))
}
for i := uint16(0); i < len; i += 5 {
var ppe PMTProgramElement
ppe.StreamType = br.ReadBits8(8)
br.ReadBits8(3)
ppe.Pid = br.ReadBits16(13)
br.ReadBits8(4)
ppe.Length = br.ReadBits16(12)
ppe.StreamType, _ = br.ReadBits8(8)
_, _ = br.ReadBits8(3)
ppe.Pid, _ = br.ReadBits16(13)
_, _ = br.ReadBits8(4)
ppe.Length, _ = br.ReadBits16(12)
if ppe.Length != 0 {
nazalog.Warn(ppe.Length)
br.ReadBits32(uint(ppe.Length))
_, _ = br.ReadBits32(uint(ppe.Length))
}
pmt.ProgramElements = append(pmt.ProgramElements, ppe)
}

@ -1,49 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv_test
import (
"testing"
"github.com/q191201771/lal/pkg/httpflv"
log "github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 后续加个 httpflv post 在做完整流程测试吧
var (
serverAddr = ":10001"
pullURL = "http://127.0.0.1:10001/live/11111.flv"
)
type MockServerObserver struct {
}
func (so *MockServerObserver) NewHTTPFLVSubSessionCB(session *httpflv.SubSession) bool {
return true
}
func (so *MockServerObserver) DelHTTPFLVSubSessionCB(session *httpflv.SubSession) {
}
func TestExample(t *testing.T) {
var err error
var so MockServerObserver
s := httpflv.NewServer(&so, serverAddr)
go s.RunLoop()
pullSession := httpflv.NewPullSession(func(option *httpflv.PullSessionOption) {
option.ConnectTimeoutMS = 1000
option.ReadTimeoutMS = 1000
})
err = pullSession.Pull(pullURL, func(tag httpflv.Tag) {
})
log.Debugf("pull failed. err=%+v", err)
}

@ -0,0 +1,19 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv_test
import (
"testing"
"github.com/q191201771/lal/pkg/innertest"
)
func TestRTMP(t *testing.T) {
innertest.InnerTestEntry(t)
}

@ -10,7 +10,6 @@ package httpflv
import (
"net"
"sync"
log "github.com/q191201771/naza/pkg/nazalog"
)
@ -26,8 +25,6 @@ type ServerObserver interface {
type Server struct {
obs ServerObserver
addr string
m sync.Mutex
ln net.Listener
}
@ -38,18 +35,15 @@ func NewServer(obs ServerObserver, addr string) *Server {
}
}
func (server *Server) RunLoop() error {
var err error
server.m.Lock()
server.ln, err = net.Listen("tcp", server.addr)
server.m.Unlock()
if err != nil {
return err
func (server *Server) Listen() (err error) {
if server.ln, err = net.Listen("tcp", server.addr); err != nil {
return
}
log.Infof("start httpflv server listen. addr=%s", server.addr)
return
}
log.Infof("start httpflv listen. addr=%s", server.addr)
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
@ -60,8 +54,6 @@ func (server *Server) RunLoop() error {
}
func (server *Server) Dispose() {
server.m.Lock()
defer server.m.Unlock()
if server.ln == nil {
return
}

@ -1,4 +1,4 @@
// Copyright 2019, Chef. All rights reserved.
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package logic_test
package innertest
import (
"bytes"
@ -14,29 +14,29 @@ import (
"io"
"io/ioutil"
"os"
"syscall"
"testing"
"time"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazaatomic"
"github.com/q191201771/naza/pkg/nazalog"
)
// 读取 flv 文件,使用 rtmp 协议推送至服务端,之后分别用 rtmp 协议以及 httpflv 协议从服务端拉流再将拉取的流保存为flv文件
// 最后用三份 flv 文件做对比,看是否完全一致。
// 开启了一个lalserver
// 读取flv文件使用rtmp协议推送至服务端
// 分别用rtmp协议以及httpflv协议从服务端拉流再将拉取的流保存为flv文件
// 对比三份flv文件看是否完全一致
// TODO chef: 加上hls的检查
var (
tt *testing.T
rtmpAddr = ":19350"
httpflvAddr = ":8080"
hlsAddr = ":10001"
confFile = "testdata/lalserver.default.conf.json"
rFLVFileName = "testdata/test.flv"
wFLVPullFileName = "testdata/flvpull.flv"
@ -47,8 +47,8 @@ var (
rtmpPullURL string
fileReader httpflv.FLVFileReader
HTTPFLVWriter httpflv.FLVFileWriter
RTMPWriter httpflv.FLVFileWriter
httpFLVWriter httpflv.FLVFileWriter
rtmpWriter httpflv.FLVFileWriter
pushSession *rtmp.PushSession
httpflvPullSession *httpflv.PullSession
@ -59,44 +59,32 @@ var (
rtmpPullTagCount nazaatomic.Uint32
)
func TestExample(t *testing.T) {
func InnerTestEntry(t *testing.T) {
tt = t
var err error
err = fileReader.Open(rFLVFileName)
assert.Equal(t, nil, err)
config := logic.Config{
RTMP: logic.RTMP{Addr: rtmpAddr},
HTTPFLV: logic.HTTPFLV{SubListenAddr: httpflvAddr},
HLS: logic.HLS{
SubListenAddr: hlsAddr,
MuxerConfig: &hls.MuxerConfig{
OutPath: "/tmp/lal/hls/",
FragmentDurationMS: 3000,
FragmentNum: 6,
},
},
}
go logic.Entry(confFile)
time.Sleep(200 * time.Millisecond)
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/11111.flv", config.HTTPFLV.SubListenAddr)
rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr)
config, err := logic.LoadConf(confFile)
assert.Equal(t, nil, err)
sm := logic.NewServerManager(&config)
go sm.RunLoop()
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMPConfig.Addr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/11111.flv", config.HTTPFLVConfig.SubListenAddr)
rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMPConfig.Addr)
time.Sleep(200 * time.Millisecond)
err = fileReader.Open(rFLVFileName)
assert.Equal(t, nil, err)
err = HTTPFLVWriter.Open(wFLVPullFileName)
err = httpFLVWriter.Open(wFLVPullFileName)
assert.Equal(t, nil, err)
err = HTTPFLVWriter.WriteRaw(httpflv.FLVHeader)
err = httpFLVWriter.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err)
err = RTMPWriter.Open(wRTMPPullFileName)
err = rtmpWriter.Open(wRTMPPullFileName)
assert.Equal(t, nil, err)
err = RTMPWriter.WriteRaw(httpflv.FLVHeader)
err = rtmpWriter.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err)
go func() {
@ -105,7 +93,7 @@ func TestExample(t *testing.T) {
})
err := rtmpPullSession.Pull(rtmpPullURL, func(msg rtmp.AVMsg) {
tag := logic.Trans.RTMPMsg2FLVTag(msg)
err := RTMPWriter.WriteTag(*tag)
err := rtmpWriter.WriteTag(*tag)
assert.Equal(tt, nil, err)
rtmpPullTagCount.Increment()
})
@ -117,7 +105,7 @@ func TestExample(t *testing.T) {
option.ReadTimeoutMS = 500
})
err := httpflvPullSession.Pull(httpflvPullURL, func(tag httpflv.Tag) {
err := HTTPFLVWriter.WriteTag(tag)
err := httpFLVWriter.WriteTag(tag)
assert.Equal(t, nil, err)
httpflvPullTagCount.Increment()
})
@ -151,9 +139,9 @@ func TestExample(t *testing.T) {
pushSession.Dispose()
httpflvPullSession.Dispose()
rtmpPullSession.Dispose()
HTTPFLVWriter.Dispose()
RTMPWriter.Dispose()
sm.Dispose()
httpFLVWriter.Dispose()
rtmpWriter.Dispose()
_ = syscall.Kill(syscall.Getpid(), syscall.SIGUSR1)
nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load())
compareFile()
@ -162,12 +150,14 @@ func TestExample(t *testing.T) {
func compareFile() {
r, err := ioutil.ReadFile(rFLVFileName)
assert.Equal(tt, nil, err)
w, err := ioutil.ReadFile(wFLVPullFileName)
assert.Equal(tt, nil, err)
res := bytes.Compare(r, w)
assert.Equal(tt, 0, res)
err = os.Remove(wFLVPullFileName)
assert.Equal(tt, nil, err)
w2, err := ioutil.ReadFile(wRTMPPullFileName)
assert.Equal(tt, nil, err)
res = bytes.Compare(r, w2)

@ -8,25 +8,86 @@
package logic
import "github.com/q191201771/lal/pkg/hls"
import (
"encoding/json"
"errors"
"io/ioutil"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazajson"
"github.com/q191201771/naza/pkg/nazalog"
)
type Config struct {
RTMP RTMP `json:"rtmp"`
HTTPFLV HTTPFLV `json:"httpflv"`
HLS HLS `json:"hls"`
RTMPConfig RTMPConfig `json:"rtmp"`
HTTPFLVConfig HTTPFLVConfig `json:"httpflv"`
HLSConfig HLSConfig `json:"hls"`
PProfConfig PProfConfig `json:"pprof"`
LogConfig nazalog.Option `json:"log"`
}
type RTMP struct {
type RTMPConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
GOPNum int `json:"gop_num"`
}
type HTTPFLV struct {
type HTTPFLVConfig struct {
Enable bool `json:"enable"`
SubListenAddr string `json:"sub_listen_addr"`
GOPNum int `json:"gop_num"`
}
type HLS struct {
type HLSConfig struct {
SubListenAddr string `json:"sub_listen_addr"`
*hls.MuxerConfig
}
type PProfConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
func LoadConf(confFile string) (*Config, error) {
var config Config
rawContent, err := ioutil.ReadFile(confFile)
if err != nil {
return nil, err
}
if err = json.Unmarshal(rawContent, &config); err != nil {
return nil, err
}
j, err := nazajson.New(rawContent)
if err != nil {
return nil, err
}
// 检查配置必须项
if !j.Exist("rtmp") || !j.Exist("httpflv") || !j.Exist("hls") || !j.Exist("log") || !j.Exist("pprof") {
return &config, errors.New("missing key field in config file")
}
// 配置不存在时,设置默认值
if !j.Exist("log.level") {
config.LogConfig.Level = nazalog.LevelDebug
}
if !j.Exist("log.filename") {
config.LogConfig.Filename = "./logs/lalserver.log"
}
if !j.Exist("log.is_to_stdout") {
config.LogConfig.IsToStdout = true
}
if !j.Exist("log.is_rotate_daily") {
config.LogConfig.IsRotateDaily = true
}
if !j.Exist("log.short_file_flag") {
config.LogConfig.ShortFileFlag = true
}
if !j.Exist("log.assert_behavior") {
config.LogConfig.AssertBehavior = nazalog.AssertError
}
return &config, nil
}

@ -0,0 +1,68 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"github.com/q191201771/naza/pkg/bininfo"
"github.com/q191201771/naza/pkg/nazalog"
)
var (
config *Config
)
func Entry(confFile string) {
config = loadConf(confFile)
initLog(config.LogConfig)
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
sm := NewServerManager(config)
if config.PProfConfig.Enable {
go runWebPProf(config.PProfConfig.Addr)
}
go runSignalHandler(func() {
sm.Dispose()
})
sm.RunLoop()
}
func loadConf(confFile string) *Config {
config, err := LoadConf(confFile)
if err != nil {
nazalog.Errorf("load conf failed. file=%s err=%+v", confFile, err)
os.Exit(1)
}
nazalog.Infof("load conf file succ. file=%s content=%+v", confFile, config)
return config
}
func initLog(opt nazalog.Option) {
if err := nazalog.Init(func(option *nazalog.Option) {
*option = opt
}); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v\n", err)
os.Exit(1)
}
nazalog.Info("initial log succ.")
}
func runWebPProf(addr string) {
nazalog.Infof("start web pprof listen. addr=%s", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
nazalog.Error(err)
return
}
}

@ -1,9 +1,18 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"testing"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/assert"
"testing"
)
func TestGOPCache_Feed(t *testing.T) {

@ -87,8 +87,10 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
}
group.pubSession = session
if group.hlsConfig.Enable {
group.hlsMuxer = hls.NewMuxer(group.streamName, group.hlsConfig)
group.hlsMuxer.Start()
}
group.mutex.Unlock()
session.SetPubSessionObserver(group)
@ -100,7 +102,9 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pubSession = nil
if group.hlsConfig.Enable {
group.hlsMuxer.Stop()
}
group.gopCache.Clear()
group.httpflvGopCache.Clear()
@ -159,8 +163,10 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
//log.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1])
group.broadcastRTMP(msg)
if group.hlsConfig.Enable {
group.hlsMuxer.FeedRTMPMessage(msg)
}
}
func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
//log.Infof("%+v", msg.Header)

@ -0,0 +1,19 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic_test
import (
"testing"
"github.com/q191201771/lal/pkg/innertest"
)
func TestLogic(t *testing.T) {
innertest.InnerTestEntry(t)
}

@ -9,6 +9,7 @@
package logic
import (
"os"
"sync"
"time"
@ -16,14 +17,14 @@ import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazalog"
)
type ServerManager struct {
config *Config
httpflvServer *httpflv.Server
rtmpServer *rtmp.Server
httpflvServer *httpflv.Server
hlsServer *hls.Server
exitChan chan struct{}
@ -37,39 +38,51 @@ func NewServerManager(config *Config) *ServerManager {
groupMap: make(map[string]*Group),
exitChan: make(chan struct{}),
}
if len(config.HTTPFLV.SubListenAddr) != 0 {
m.httpflvServer = httpflv.NewServer(m, config.HTTPFLV.SubListenAddr)
if config.RTMPConfig.Enable {
m.rtmpServer = rtmp.NewServer(m, config.RTMPConfig.Addr)
}
if len(config.RTMP.Addr) != 0 {
m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr)
if config.HTTPFLVConfig.Enable {
m.httpflvServer = httpflv.NewServer(m, config.HTTPFLVConfig.SubListenAddr)
}
if len(config.HLS.SubListenAddr) != 0 {
m.hlsServer = hls.NewServer(config.HLS.SubListenAddr, config.HLS.OutPath)
if config.HLSConfig.Enable {
m.hlsServer = hls.NewServer(config.HLSConfig.SubListenAddr, config.HLSConfig.OutPath)
}
return m
}
func (sm *ServerManager) RunLoop() {
if sm.httpflvServer != nil {
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
}
go func() {
if err := sm.httpflvServer.RunLoop(); err != nil {
log.Error(err)
if err := sm.rtmpServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
}
if sm.rtmpServer != nil {
if sm.httpflvServer != nil {
if err := sm.httpflvServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
}
go func() {
if err := sm.rtmpServer.RunLoop(); err != nil {
log.Error(err)
if err := sm.httpflvServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
}
if sm.hlsServer != nil {
if err := sm.hlsServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
}
go func() {
if err := sm.hlsServer.RunLoop(); err != nil {
log.Error(err)
nazalog.Error(err)
}
}()
}
@ -86,7 +99,7 @@ func (sm *ServerManager) RunLoop() {
count++
if (count % 10) == 0 {
sm.mutex.Lock()
log.Infof("group size:%d", len(sm.groupMap))
nazalog.Debugf("group size:%d", len(sm.groupMap))
sm.mutex.Unlock()
}
}
@ -94,12 +107,15 @@ func (sm *ServerManager) RunLoop() {
}
func (sm *ServerManager) Dispose() {
log.Debug("dispose server manager.")
nazalog.Debug("dispose server manager.")
if sm.rtmpServer != nil {
sm.rtmpServer.Dispose()
}
if sm.httpflvServer != nil {
sm.httpflvServer.Dispose()
}
if sm.rtmpServer != nil {
sm.rtmpServer.Dispose()
if sm.hlsServer != nil {
sm.hlsServer.Dispose()
}
sm.mutex.Lock()
@ -172,7 +188,7 @@ func (sm *ServerManager) check() {
defer sm.mutex.Unlock()
for k, group := range sm.groupMap {
if group.IsTotalEmpty() {
log.Infof("erase empty group manager. [%s]", group.UniqueKey)
nazalog.Infof("erase empty group manager. [%s]", group.UniqueKey)
group.Dispose()
delete(sm.groupMap, k)
}
@ -182,7 +198,7 @@ func (sm *ServerManager) check() {
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
group, exist := sm.groupMap[streamName]
if !exist {
group = NewGroup(appName, streamName, sm.config.RTMP.GOPNum, sm.config.HTTPFLV.GOPNum, sm.config.HLS.MuxerConfig)
group = NewGroup(appName, streamName, sm.config.RTMPConfig.GOPNum, sm.config.HTTPFLVConfig.GOPNum, sm.config.HLSConfig.MuxerConfig)
sm.groupMap[streamName] = group
}
go group.RunLoop()

@ -8,7 +8,7 @@
// +build linux darwin netbsd freebsd openbsd dragonfly
package main
package logic
import (
"os"
@ -18,10 +18,10 @@ import (
log "github.com/q191201771/naza/pkg/nazalog"
)
func runSignalHandler() {
func runSignalHandler(cb func()) {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2)
s := <-c
log.Infof("recv signal. s=%+v", s)
sm.Dispose()
cb()
}

@ -8,8 +8,8 @@
// +build windows
package main
package logic
func runSignalHandler() {
func runSignalHandler(cb func()) {
}

@ -188,7 +188,7 @@ func (c *ChunkComposer) getOrCreateStream(csid int) *Stream {
// 临时存放一些rtmp推流case在这便于理解以及修改后回归用
//
// 场景ffmpeg推送test.flv至lals
// 场景ffmpeg推送test.flv至lalserver
// 关注点message超过chunk时fmt和timestamp的值
//
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:143 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:520} csid:6 len:143 ts:520

@ -219,11 +219,12 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
}
switch val {
case "|RtmpSampleAccess": // TODO chef: handle this?
return nil
default:
case "|RtmpSampleAccess":
// TODO chef: handle this?
log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
return nil
default:
}
s.onReadRTMPAVMsg(stream.toAVMsg())
return nil

@ -1,176 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtmp_test
import (
"bytes"
"io"
"io/ioutil"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/assert"
log "github.com/q191201771/naza/pkg/nazalog"
)
// 读取 flv 文件,使用 rtmp 协议发送至服务端,再使用 rtmp 协议从服务端拉流,转换 flv 格式存储为文件,
// 检查两份 flv 文件是否完全一致。
var (
serverAddr = ":10001"
pushURL = "rtmp://127.0.0.1:10001/live/test"
pullURL = "rtmp://127.0.0.1:10001/live/test"
rFLVFile = "testdata/test.flv"
wFLVFile = "testdata/out.flv"
wgNum = 4 // FLVFileReader -> [push -> pub -> sub -> pull] -> FLVFileWriter
)
var (
pubSessionObs MockPubSessionObserver
pullSession *rtmp.PullSession
subSession *rtmp.ServerSession
wg sync.WaitGroup
w httpflv.FLVFileWriter
//
rc uint32
bc uint32
wc uint32
)
type MockServerObserver struct {
}
func (so *MockServerObserver) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool {
log.Debug("NewRTMPPubSessionCB")
session.SetPubSessionObserver(&pubSessionObs)
return true
}
func (so *MockServerObserver) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool {
log.Debug("NewRTMPSubSessionCB")
subSession = session
return true
}
func (so *MockServerObserver) DelRTMPPubSessionCB(session *rtmp.ServerSession) {
log.Debug("DelRTMPPubSessionCB")
subSession.Flush()
subSession.Dispose()
wg.Done()
}
func (so *MockServerObserver) DelRTMPSubSessionCB(session *rtmp.ServerSession) {
log.Debug("DelRTMPSubSessionCB")
wg.Done()
}
type MockPubSessionObserver struct {
}
func (pso *MockPubSessionObserver) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
bc++
// 转发
currHeader := logic.Trans.MakeDefaultRTMPHeader(msg.Header)
var absChunks []byte
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
subSession.AsyncWrite(absChunks)
}
func TestExample(t *testing.T) {
var err error
var r httpflv.FLVFileReader
err = r.Open(rFLVFile)
if err != nil {
return
}
wg.Add(wgNum)
var so MockServerObserver
s := rtmp.NewServer(&so, serverAddr)
go s.RunLoop()
// 等待 server 开始监听
time.Sleep(100 * time.Millisecond)
go func() {
pullSession = rtmp.NewPullSession()
err = pullSession.Pull(pullURL, func(msg rtmp.AVMsg) {
tag := logic.Trans.RTMPMsg2FLVTag(msg)
w.WriteTag(*tag)
atomic.AddUint32(&wc, 1)
})
log.Error(err)
}()
pushSession := rtmp.NewPushSession()
err = pushSession.Push(pushURL)
assert.Equal(t, nil, err)
err = w.Open(wFLVFile)
assert.Equal(t, nil, err)
err = w.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err)
for {
tag, err := r.ReadTag()
if err == io.EOF {
break
}
assert.Equal(t, nil, err)
rc++
//log.Debugf("send tag. %d", tag.Header.Timestamp)
msg := logic.Trans.FLVTag2RTMPMsg(tag)
//log.Debugf("send msg. %d %d", msg.Header.Timestamp, msg.Header.TimestampAbs)
chunks := rtmp.Message2Chunks(msg.Payload, &msg.Header)
//log.Debugf("%s", hex.Dump(chunks[:16]))
err = pushSession.AsyncWrite(chunks)
assert.Equal(t, nil, err)
}
r.Dispose()
wg.Done()
err = pushSession.Flush()
assert.Equal(t, nil, err)
pushSession.Dispose()
wg.Done()
wg.Wait()
// 等待 pull 完成
for {
if atomic.LoadUint32(&wc) == rc {
break
}
time.Sleep(1 * time.Nanosecond)
}
//time.Sleep(1 * time.Second)
pullSession.Dispose()
w.Dispose()
s.Dispose()
log.Debugf("rc=%d, bc=%d, wc=%d", rc, bc, atomic.LoadUint32(&wc))
compareFile(t)
}
func compareFile(t *testing.T) {
r, err := ioutil.ReadFile(rFLVFile)
assert.Equal(t, nil, err)
w, err := ioutil.ReadFile(wFLVFile)
assert.Equal(t, nil, err)
res := bytes.Compare(r, w)
assert.Equal(t, 0, res)
//err = os.Remove(wFLVFile)
assert.Equal(t, nil, err)
}

@ -0,0 +1,19 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtmp_test
import (
"testing"
"github.com/q191201771/lal/pkg/innertest"
)
func TestRTMP(t *testing.T) {
innertest.InnerTestEntry(t)
}

@ -10,7 +10,6 @@ package rtmp
import (
"net"
"sync"
log "github.com/q191201771/naza/pkg/nazalog"
)
@ -25,7 +24,6 @@ type ServerObserver interface {
type Server struct {
obs ServerObserver
addr string
m sync.Mutex
ln net.Listener
}
@ -36,15 +34,15 @@ func NewServer(obs ServerObserver, addr string) *Server {
}
}
func (server *Server) RunLoop() error {
var err error
server.m.Lock()
server.ln, err = net.Listen("tcp", server.addr)
server.m.Unlock()
if err != nil {
return err
func (server *Server) Listen() (err error) {
if server.ln, err = net.Listen("tcp", server.addr); err != nil {
return
}
log.Infof("start rtmp server listen. addr=%s", server.addr)
return
}
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
@ -55,8 +53,6 @@ func (server *Server) RunLoop() error {
}
func (server *Server) Dispose() {
server.m.Lock()
defer server.m.Unlock()
if server.ln == nil {
return
}

@ -0,0 +1,26 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"fmt"
"time"
)
var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\nCSeq: %s\r\nPublic:DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n\r\n"
var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\nCSeq: %s\r\nDate: %s\r\nSession: 47112344\r\nTransport:%s;server_port=6256-6257"
func PackResponseOptions(cseq string) string {
return fmt.Sprintf(ResponseOptionsTmpl, cseq)
}
func PackResponseSetup(cseq string, transportC string) string {
date := time.Now().Format(time.RFC1123)
return fmt.Sprintf(ResponseSetupTmpl, cseq, date, transportC)
}

@ -0,0 +1,23 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
// 注意正在学习以及实现rtsp请不要使用这个package
// rfc2326
const (
MethodOptions = "OPTIONS"
MethodSetup = "SETUP"
)
const (
HeaderFieldCSeq = "CSeq"
HeaderFieldTransport = "Transport"
)

@ -0,0 +1,78 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"bufio"
"net"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
)
type Server struct {
addr string
ln net.Listener
}
func NewServer(addr string) *Server {
return &Server{
addr: addr,
}
}
func (s *Server) Listen() (err error) {
s.ln, err = net.Listen("tcp", s.addr)
if err != nil {
return
}
nazalog.Infof("start hls server listen. addr=%s", s.addr)
return
}
func (s *Server) RunLoop() error {
for {
conn, err := s.ln.Accept()
if err != nil {
return err
}
go s.handleTCPConnect(conn)
}
}
func (s *Server) handleTCPConnect(conn net.Conn) {
nazalog.Debugf("handleTCPConnect. conn=%p", conn)
r := bufio.NewReader(conn)
for {
requestLine, headers, err := nazahttp.ReadHTTPHeader(r)
if err != nil {
nazalog.Error(err)
break
}
nazalog.Debugf("requestLine=%s, headers=%+v", requestLine, headers)
method, _, _, err := nazahttp.ParseHTTPRequestLine(requestLine)
if err != nil {
nazalog.Error(err)
break
}
// TODO chef: header field not exist?
switch method {
case MethodOptions:
resp := PackResponseOptions(headers[HeaderFieldCSeq])
_, _ = conn.Write([]byte(resp))
case MethodSetup:
resp := PackResponseSetup(headers[HeaderFieldCSeq], headers[HeaderFieldTransport])
_, _ = conn.Write([]byte(resp))
default:
nazalog.Error(method)
}
}
}

@ -34,6 +34,7 @@ done
# 跑 go test 生成测试覆盖率
echo "-----CI coverage-----"
# 从网上下载测试用的flv文件
if [ ! -f "./testdata/test.flv" ]; then
if [ ! -d "./testdata" ]; then
mkdir "./testdata"
@ -43,28 +44,57 @@ if [ ! -f "./testdata/test.flv" ]; then
wget https://pengrl.com/images/other/source.200kbps.768x320.flv -O ./testdata/test.flv
fi
fi
# 将测试的flv文件分别拷贝到logicrtmphttpflvhls的testdata目录下
if [ ! -f "./pkg/logic/testdata/test.flv" ]; then
if [ ! -d "./pkg/logic/testdata" ]; then
mkdir "./pkg/logic/testdata"
fi
cp ./testdata/test.flv ./pkg/logic/testdata/test.flv
fi
if [ ! -f "./pkg/rtmp/testdata/test.flv" ]; then
if [ ! -d "./pkg/rtmp/testdata" ]; then
mkdir "./pkg/rtmp/testdata"
fi
cp ./testdata/test.flv ./pkg/rtmp/testdata/test.flv
fi
if [ ! -f "./pkg/logic/testdata/test.flv" ]; then
if [ ! -d "./pkg/logic/testdata" ]; then
mkdir "./pkg/logic/testdata"
if [ ! -f "./pkg/httpflv/testdata/test.flv" ]; then
if [ ! -d "./pkg/httpflv/testdata" ]; then
mkdir "./pkg/httpflv/testdata"
fi
cp ./testdata/test.flv ./pkg/logic/testdata/test.flv
cp ./testdata/test.flv ./pkg/httpflv/testdata/test.flv
fi
if [ ! -f "./pkg/hls/testdata/test.flv" ]; then
if [ ! -d "./pkg/hls/testdata" ]; then
mkdir "./pkg/hls/testdata"
fi
if [ ! -f "./pkg/logic/testdata/lals.default.conf.json" ]; then
cp ./conf/lals.default.conf.json ./pkg/logic/testdata/lals.default.conf.json
cp ./testdata/test.flv ./pkg/hls/testdata/test.flv
fi
# 将配置文件分别拷贝到logicrtmphttpflvhls的testdata目录下
if [ ! -f "./pkg/logic/testdata/lalserver.default.conf.json" ]; then
cp ./conf/lalserver.default.conf.json ./pkg/logic/testdata/lalserver.default.conf.json
fi
if [ ! -f "./pkg/rtmp/testdata/lalserver.default.conf.json" ]; then
cp ./conf/lalserver.default.conf.json ./pkg/rtmp/testdata/lalserver.default.conf.json
fi
if [ ! -f "./pkg/httpflv/testdata/lalserver.default.conf.json" ]; then
cp ./conf/lalserver.default.conf.json ./pkg/httpflv/testdata/lalserver.default.conf.json
fi
if [ ! -f "./pkg/hls/testdata/lalserver.default.conf.json" ]; then
cp ./conf/lalserver.default.conf.json ./pkg/hls/testdata/lalserver.default.conf.json
fi
echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor | grep pkg); do
for d in $(go list ./... | grep -v vendor | grep pkg | grep -v innertest); do
go test -race -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
fi
done
rm -rf ./pkg/logic/logs ./pkg/rtmp/logs ./pkg/httpflv/logs ./pkg/hls/logs
#rm -rf ./pkg/logic/testdata ./pkg/rtmp/testdata ./pkg/httpflv/testdata ./pkg/hls/testdata
echo 'done.'

Loading…
Cancel
Save