messages:

- [fix] package rtsp: 修复rtsp aac可能出现崩溃。支持aac fragment格式(一个音频帧被拆分成多个rtp包)
- [feat] lalserver: 新增配置项hls.cleanup_mode,支持三种清理hls文件的模式,具体说明见 https://pengrl.com/lal/#/ConfigBrief
- [doc] 启用英文版本README.md作为github首页文档展示
- [refactor] package rtprtcp: 重构rtp unpacker,业务方可以使用默认的container,protocol策略,也可以自己实现特定的协议解析组包策略
- [refactor] lalserver: 整理配置文件加载与日志初始化部分的代码
- [doc] lalserver: 新增配置项conf_version,用于表示配置文件的版本号
- [doc] lalserver: 启动时日志中增加lal logo
- [doc] 新增文章《rtmp中的各种ID》,见 https://pengrl.com/lal/#/RTMPID
- [doc] 新增文章《rtmp handshake握手之简单模式和复杂模式》,见 https://pengrl.com/lal/#/RTMPHandshake
pull/53/head
q191201771 4 years ago
parent 12c53b8691
commit 2c5bb92601

@ -1,65 +1,89 @@
<p align="center">
<a title="logo" target="_blank" href="https://github.com/q191201771/lal">
<img alt="Live And Live" src="https://pengrl.com/lal/_media/lallogo.png">
</a>
<br>
<a title="TravisCI" target="_blank" href="https://www.travis-ci.org/q191201771/lal"><img src="https://www.travis-ci.org/q191201771/lal.svg?branch=master"></a>
<a title="codecov" target="_blank" href="https://codecov.io/gh/q191201771/lal"><img src="https://codecov.io/gh/q191201771/lal/branch/master/graph/badge.svg?style=flat-square"></a>
<a title="goreportcard" target="_blank" href="https://goreportcard.com/report/github.com/q191201771/lal"><img src="https://goreportcard.com/badge/github.com/q191201771/lal?style=flat-square"></a>
<br>
<a title="codeline" target="_blank" href="https://github.com/q191201771/lal"><img src="https://sloc.xyz/github/q191201771/lal/?category=code"></a>
<a title="license" target="_blank" href="https://github.com/q191201771/lal/blob/master/LICENSE"><img src="https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square"></a>
<br>
<a title="hits" target="_blank" href="https://github.com/q191201771/lal"><img src="https://hits.b3log.org/q191201771/lal.svg?style=flat-square"></a>
<a title="toplanguage" target="_blank" href="https://github.com/q191201771/lal"><img src="https://img.shields.io/github/languages/top/q191201771/lal.svg?style=flat-square"></a>
<br>
</p>
---
lal是一个开源GoLang直播流媒体网络传输项目包含三个主要组成部分
- lalserver流媒体转发服务器。类似于`nginx-rtmp-module`等应用,但支持更多的协议,提供更丰富的功能。[lalserver简介](https://pengrl.com/lal/#/LALServer)
- demo一些小应用比如推、拉流客户端压测工具流分析工具调度示例程序等。类似于ffmpeg、ffprobe等应用。[Demo简介](https://pengrl.com/lal/#/DEMO)
- pkg流媒体协议库。类似于ffmpeg的libavformat等库。
**lal源码package架构图**
![lal源码package架构图](https://pengrl.com/lal/_media/lal_src_fullview_frame.jpeg?date=0124)
**lalserver特性图**
![lalserver特性图](https://pengrl.com/lal/_media/lal_feature.jpeg?date=0124)
了解更多请访问:
* lal github地址: https://github.com/q191201771/lal
* lal 官方文档: https://pengrl.com/lal
* **/lalserver/**
* [简介](https://pengrl.com/lal/#/LALServer.md)
* [编译、运行、体验功能](https://pengrl.com/lal/#/QuickStart.md)
* [配置文件说明](https://pengrl.com/lal/#/ConfigBrief.md)
* [HTTP API接口](https://pengrl.com/lal/#/HTTPAPI.md)
* [HTTP Notify(Callback/Webhook)事件回调](https://pengrl.com/lal/#/HTTPNotify.md)
* [Demo简介](https://pengrl.com/lal/#/DEMO.md)
* [Changelog修改记录](https://pengrl.com/lal/#/CHANGELOG.md)
* [github star趋势图](https://pengrl.com/lal/#/StarChart.md)
* [第三方依赖](https://pengrl.com/lal/#/ThirdDeps.md)
* [联系作者](https://pengrl.com/lal/#/Author.md)
* **/技术文档/**
* [常见推拉流客户端使用方式](https://pengrl.com/lal/#/CommonClient.md)
* [连接类型之session pub/sub/push/pull](https://pengrl.com/lal/#/Session.md)
* [rtmp url以及vhost](https://pengrl.com/lal/#/RTMPURLVhost.md)
* [ffplay播放rtsp花屏](https://pengrl.com/lal/#/RTSPFFPlayBlur.md)
* [FAQ](https://pengrl.com/lal/#/FAQ.md)
* **/待整理/**
* [性能测试](https://pengrl.com/lal/#/Test.md)
* [图稿](https://pengrl.com/lal/#/Drawing.md)
联系作者:
- email191201771@qq.com
- 微信q191201771
- QQ191201771
- 微信群: 加我微信好友后,告诉我拉你进群
- QQ群 1090510973
# LAL
[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases)
[![TravisCI](https://www.travis-ci.org/q191201771/lal.svg?branch=master)](https://www.travis-ci.org/q191201771/lal)
[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal)
[中文文档](https://pengrl.com/lal/#/)
LAL is a live stream broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g. RTMP/RTSP/HLS/HTTP[S]-FLV/HTTP-TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
`LAL` stands for `Live And Live` if you may wonder.
## Install
There are 2 ways of installing lal.
### Prebuilt binaries
Prebuilt binaries for Linux, macOS(Darwin), Windows are available in the [lal github releases page](https://github.com/q191201771/lal/releases). Naturally, using [the latest release binary](https://github.com/q191201771/lal/releases/latest) is the recommended way. The naming format is `lal_<version>_<platform>.zip`, e.g. `lal_v0.20.0_linux.zip`
LAL could also be built from the source wherever the Go compiler toolchain can run, e.g. for other architectures including arm32 and mipsle which have been tested by the community.
### Building from source
First, make sure that Go version >= 1.13
For Unix-like user:
```shell
$git clone https://github.com/q191201771/lal.git
$cd lal
$make build
```
Then all binaries go into the `./bin/` directory. That's it.
For an experienced gopher, the only thing you should be concern is that `the main function` is under the `./app/lalserver` directory. So you can also:
```shell
$git clone https://github.com/q191201771/lal.git
$cd lal/app/lalserver
$go build
```
Or using whatever IDEs you'd like.
So far, the only direct and indirect **dependency** of lal is [naza(A basic Go utility library)](https://github.com/q191201771/lal.git) which is also written by myself. This leads to less dependency or version manager issues.
## Using
Running lalserver:
```
$./bin/lalserver -c ./conf/lalserver.conf.json
```
Using whatever clients you are familiar with to interact with lalserver.
For instance, publish rtmp stream to lalserver via ffmpeg:
```shell
$ffmpeg -re -i demo.flv -c:a copy -c:v copy -f flv rtmp://127.0.0.1:1935/live/test110
```
Play multi protocol stream from lalserver via ffplay:
```shell
$ffplay rtmp://127.0.0.1/live/test110
$ffplay http://127.0.0.1:8080/live/test110.flv
$ffplay http://127.0.0.1:8081/hls/test110/playlist.m3u8
$ffplay http://127.0.0.1:8081/hls/test110/record.m3u8
$ffplay http://127.0.0.1:8082/live/test110.ts
```
## One more thing
Besides a live stream broadcast server which named `lalserver` precisely, `project lal` even provides many other applications, e.g. push/pull/remux stream clients, bench tools, examples. Each subdirectory under the `./app/demo` directory represents a tiny demo.
Our goals are not only a production server but also a simple package with a well-defined, user-facing API, so that users can build their own applications on it.
## Contact
Bugs, questions, suggestions, anything related or not, feel free to contact me with [lal github issues](https://github.com/q191201771/lal/issues).
## License
MIT, see [License](https://github.com/q191201771/lal/blob/master/LICENSE).

@ -55,11 +55,12 @@ func main() {
b := time.Now()
err := pullSession.Pull(u, func(msg base.RTMPMsg) {
})
cost := time.Now().Sub(b).Milliseconds()
mu.Lock()
if err == nil {
succCosts = append(succCosts, time.Now().Sub(b).Milliseconds())
succCosts = append(succCosts, cost)
} else {
failCosts = append(failCosts, time.Now().Sub(b).Milliseconds())
failCosts = append(failCosts, cost)
}
mu.Unlock()
wg.Done()

@ -25,6 +25,8 @@ import (
var sm *logic.ServerManager
func main() {
defer nazalog.Sync()
confFile := parseFlag()
logic.Entry(confFile)
}

@ -1,4 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.0",
"rtmp": {
"enable": true,
"addr": ":1945",
@ -19,7 +21,7 @@
"out_path": "/tmp/lal/edge/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
"cleanup_flag": true
"cleanup_mode": 1
},
"httpts": {
"enable": true,

@ -1,4 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.0",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -19,7 +21,7 @@
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
"cleanup_flag": true
"cleanup_mode": 1
},
"httpts": {
"enable": true,

@ -1,4 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.0",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -19,7 +21,7 @@
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
"cleanup_flag": true
"cleanup_mode": 1
},
"httpts": {
"enable": true,

@ -1,4 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.0",
"rtmp": {
"enable": true,
"addr": ":1955",

@ -1,4 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.0",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -19,7 +21,7 @@
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
"cleanup_flag": true
"cleanup_mode": 1
},
"httpts": {
"enable": false,

@ -34,11 +34,26 @@ type MuxerObserver interface {
type MuxerConfig struct {
Enable bool `json:"enable"` // 如果false说明hls功能没开也即不写磁盘但是MuxerObserver依然会回调
OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录注意末尾需'/'结束
OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录注意末尾需'/'结束
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
// hls文件清理模式
// 0 不删除m3u8+ts文件可用于录制等场景
// 1 在输入流结束后删除m3u8+ts文件
// 注意,确切的删除时间是推流结束后的<fragment_duration_ms> * <fragment_num> * 2的时间点
// 推迟一小段时间删除是为了避免输入流刚结束hls的拉流端还没有拉取完
// 2 推流过程中持续删除过期的ts文件只保留最近的<fragment_num> * 2个左右的ts文件
// TODO chef: lalserver的模式1的逻辑是在上层做的应该重构到hls模块中
CleanupMode int `json:"cleanup_mode"`
}
const (
CleanupModeNever = 0
CleanupModeInTheEnd = 1
CleanupModeASAP = 2
)
type Muxer struct {
UniqueKey string
@ -58,9 +73,9 @@ type Muxer struct {
audioCC uint8
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
nfrags int // 大序号,增长到winfrags就增长frag
nfrags int // 大序号,增长到config.FragmentNum就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
frags []fragmentInfo // TS文件的固定大小环形队列记录TS的信息
recordMaxFragDuration float64
streamer *Streamer
@ -82,7 +97,7 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
recordPlaylistFilename := getRecordM3U8Filename(op, streamName)
recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename)
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
frags := make([]fragmentInfo, 2*config.FragmentNum+1)
m := &Muxer{
UniqueKey: uk,
streamName: streamName,
@ -284,11 +299,29 @@ func (m *Muxer) closeFragment(isLast bool) error {
}
m.opened = false
//更新序号,为下个分片准备好
// 更新序号,为下个分片做准备
// 注意后面getFrag和getCurrFrag的调用都依赖该处
m.incrFrag()
m.writePlaylist(isLast)
m.writeRecordPlaylist(isLast)
if m.config.CleanupMode == CleanupModeNever || m.config.CleanupMode == CleanupModeInTheEnd {
m.writeRecordPlaylist(isLast)
}
if m.config.CleanupMode == CleanupModeASAP {
// 删除过期文件
// 注意,此处获取的是环形队列该位置的上一轮残留下的信息
//
frag := m.getCurrFrag()
if frag.filename != "" {
filenameWithPath := getTSFilenameWithPath(m.outPath, frag.filename)
if err := os.Remove(filenameWithPath); err != nil {
nazalog.Warnf("[%s] remove stale fragment file failed. filename=%s, err=%+v", m.UniqueKey, filenameWithPath, err)
}
}
}
return nil
}
@ -298,6 +331,8 @@ func (m *Muxer) writeRecordPlaylist(isLast bool) {
return
}
// 找出整个直播流从开始到结束最大的分片时长
// 注意由于前面已经incr过了所以这里-1获取
//frag := m.getCurrFrag()
currFrag := m.getFrag(m.nfrags - 1)
if currFrag.duration > m.recordMaxFragDuration {

@ -10,6 +10,7 @@ package innertest
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -77,8 +78,11 @@ func InnerTestEntry(t *testing.T) {
go logic.Entry(confFile)
time.Sleep(200 * time.Millisecond)
config, err := logic.LoadConf(confFile)
assert.Equal(t, nil, err)
var config logic.Config
rawContent, err := ioutil.ReadFile(confFile)
nazalog.Assert(nil, err)
err = json.Unmarshal(rawContent, &config)
nazalog.Assert(nil, err)
_ = os.RemoveAll(config.HLSConfig.OutPath)

@ -9,19 +9,16 @@
package logic
import (
"encoding/json"
"io/ioutil"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazajson"
"github.com/q191201771/naza/pkg/nazalog"
)
const ConfigVersion = "0.0.1"
const ConfVersion = "v0.1.0"
type Config struct {
ConfVersion string `json:"conf_version"`
RTMPConfig RTMPConfig `json:"rtmp"`
HTTPFLVConfig HTTPFLVConfig `json:"httpflv"`
HLSConfig HLSConfig `json:"hls"`
@ -56,7 +53,6 @@ type HTTPTSConfig struct {
type HLSConfig struct {
SubListenAddr string `json:"sub_listen_addr"`
hls.MuxerConfig
CleanupFlag bool `json:"cleanup_flag"`
}
type RTSPConfig struct {
@ -95,70 +91,3 @@ type PProfConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
func LoadConf(confFile string) (*Config, error) {
var config Config
rawContent, err := ioutil.ReadFile(confFile)
if err != nil {
return nil, err
}
if err = json.Unmarshal(rawContent, &config); err != nil {
return nil, err
}
j, err := nazajson.New(rawContent)
if err != nil {
return nil, err
}
// 检查一级配置项
keyFieldList := []string{
"rtmp",
"httpflv",
"hls",
"httpts",
"rtsp",
"relay_push",
"relay_pull",
"http_api",
"http_notify",
"pprof",
"log",
}
for _, kf := range keyFieldList {
if !j.Exist(kf) {
nazalog.Warnf("missing config item %s", kf)
}
}
// 配置不存在时,设置默认值
if !j.Exist("log.level") {
config.LogConfig.Level = nazalog.LevelDebug
}
if !j.Exist("log.filename") {
config.LogConfig.Filename = "./logs/lalserver.log"
}
if !j.Exist("log.is_to_stdout") {
config.LogConfig.IsToStdout = true
}
if !j.Exist("log.is_rotate_daily") {
config.LogConfig.IsRotateDaily = true
}
if !j.Exist("log.short_file_flag") {
config.LogConfig.ShortFileFlag = true
}
if !j.Exist("log.timestamp_flag") {
config.LogConfig.TimestampFlag = true
}
if !j.Exist("log.timestamp_with_ms_flag") {
config.LogConfig.TimestampWithMSFlag = true
}
if !j.Exist("log.level_flag") {
config.LogConfig.LevelFlag = true
}
if !j.Exist("log.assert_behavior") {
config.LogConfig.AssertBehavior = nazalog.AssertError
}
return &config, nil
}

@ -9,10 +9,16 @@
package logic
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"strings"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazajson"
"github.com/q191201771/lal/pkg/base"
@ -27,8 +33,8 @@ var (
)
func Entry(confFile string) {
config = loadConf(confFile)
initLog(config.LogConfig)
LoadConfAndInitLog(confFile)
nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LALFullInfo)
nazalog.Infof("github: %s", base.LALGithubSite)
@ -43,36 +49,129 @@ func Entry(confFile string) {
sm.Dispose()
})
sm.RunLoop()
err := sm.RunLoop()
nazalog.Errorf("server manager loop break. err=%+v", err)
}
func Dispose() {
sm.Dispose()
}
func loadConf(confFile string) *Config {
config, err := LoadConf(confFile)
func LoadConfAndInitLog(confFile string) *Config {
// 读取配置文件并解析原始内容
rawContent, err := ioutil.ReadFile(confFile)
if err != nil {
nazalog.Errorf("load conf failed. file=%s err=%+v", confFile, err)
os.Exit(1)
_, _ = fmt.Fprintf(os.Stderr, "read conf file failed. file=%s err=%+v", confFile, err)
base.OSExitAndWaitPressIfWindows(1)
}
if err = json.Unmarshal(rawContent, &config); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "unmarshal conf file failed. file=%s err=%+v", confFile, err)
base.OSExitAndWaitPressIfWindows(1)
}
j, err := nazajson.New(rawContent)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "nazajson unmarshal conf file failed. file=%s err=%+v", confFile, err)
base.OSExitAndWaitPressIfWindows(1)
}
nazalog.Infof("load conf file succ. file=%s content=%+v", confFile, config)
return config
}
func initLog(opt nazalog.Option) {
// 初始化日志,注意,这一步尽量提前,使得后续的日志内容按我们的日志配置输出
// 日志配置项不存在时,设置默认值
if !j.Exist("log.level") {
config.LogConfig.Level = nazalog.LevelDebug
}
if !j.Exist("log.filename") {
config.LogConfig.Filename = "./logs/lalserver.log"
}
if !j.Exist("log.is_to_stdout") {
config.LogConfig.IsToStdout = true
}
if !j.Exist("log.is_rotate_daily") {
config.LogConfig.IsRotateDaily = true
}
if !j.Exist("log.short_file_flag") {
config.LogConfig.ShortFileFlag = true
}
if !j.Exist("log.timestamp_flag") {
config.LogConfig.TimestampFlag = true
}
if !j.Exist("log.timestamp_with_ms_flag") {
config.LogConfig.TimestampWithMSFlag = true
}
if !j.Exist("log.level_flag") {
config.LogConfig.LevelFlag = true
}
if !j.Exist("log.assert_behavior") {
config.LogConfig.AssertBehavior = nazalog.AssertError
}
if err := nazalog.Init(func(option *nazalog.Option) {
*option = opt
*option = config.LogConfig
}); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v\n", err)
os.Exit(1)
base.OSExitAndWaitPressIfWindows(1)
}
nazalog.Info("initial log succ.")
// 打印Logo
nazalog.Info(`
__ ___ __
/ / / | / /
/ / / /| | / /
/ /___/ ___ |/ /___
/_____/_/ |_/_____/
`)
// 检查配置版本号是否匹配
if config.ConfVersion != ConfVersion {
nazalog.Warnf("config version invalid. conf version of lalserver=%s, conf version of config file=%s",
ConfVersion, config.ConfVersion)
}
// 检查一级配置项
keyFieldList := []string{
"rtmp",
"httpflv",
"hls",
"httpts",
"rtsp",
"relay_push",
"relay_pull",
"http_api",
"http_notify",
"pprof",
"log",
}
for _, kf := range keyFieldList {
if !j.Exist(kf) {
nazalog.Warnf("missing config item %s", kf)
}
}
// 配置不存在时,设置默认值
if !j.Exist("hls.cleanup_mode") {
const defaultMode = hls.CleanupModeInTheEnd
nazalog.Warnf("config hls.cleanup_mode not exist. default is %d", defaultMode)
config.HLSConfig.CleanupMode = defaultMode
}
// 把配置文件原始内容中的换行去掉,使得打印日志时紧凑一些
lines := strings.Split(string(rawContent), "\n")
if len(lines) == 1 {
lines = strings.Split(string(rawContent), "\r\n")
}
var tlines []string
for _, l := range lines {
tlines = append(tlines, strings.TrimSpace(l))
}
compactRawContent := strings.Join(tlines, " ")
nazalog.Infof("load conf file succ. filename=%s, raw content=%s parsed=%+v", confFile, compactRawContent, config)
return config
}
func runWebPProf(addr string) {
nazalog.Infof("start web pprof listen. addr=%s", addr)
//nazalog.Warn("start fgprof.")
//http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
if err := http.ListenAndServe(addr, nil); err != nil {

@ -657,10 +657,7 @@ func (group *Group) delRTSPPubSession(session *rtsp.PubSession) {
group.rtspPubSession = nil
group.delIn()
}
func (group *Group) delRTSPSubSession(session *rtsp.SubSession) {
nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtspSubSessionSet, session)
}
func (group *Group) delRTMPPullSession(session *rtmp.PullSession) {
nazalog.Debugf("[%s] [%s] del rtmp PullSession from group.", group.UniqueKey, session.UniqueKey())
@ -684,6 +681,11 @@ func (group *Group) delHTTPTSSubSession(session *httpts.SubSession) {
delete(group.httptsSubSessionSet, session)
}
func (group *Group) delRTSPSubSession(session *rtsp.SubSession) {
nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
delete(group.rtspSubSessionSet, session)
}
// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了考虑统一往一个通用类型转
// @param msg 调用结束后内部不持有msg.Payload内存块
func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
@ -1008,7 +1010,8 @@ func (group *Group) disposeHLSMuxer() {
group.hlsMuxer.Dispose()
// 添加延时任务删除HLS文件
if config.HLSConfig.Enable && config.HLSConfig.CleanupFlag {
if config.HLSConfig.Enable &&
(config.HLSConfig.CleanupMode == hls.CleanupModeInTheEnd || config.HLSConfig.CleanupMode == hls.CleanupModeASAP) {
defertaskthread.Go(
config.HLSConfig.FragmentDurationMS*config.HLSConfig.FragmentNum*2,
func(param ...interface{}) {

@ -10,7 +10,6 @@ package logic
import (
"fmt"
"os"
"sync"
"time"
@ -66,13 +65,12 @@ func NewServerManager() *ServerManager {
return m
}
func (sm *ServerManager) RunLoop() {
func (sm *ServerManager) RunLoop() error {
httpNotify.OnServerStart()
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.rtmpServer.RunLoop(); err != nil {
@ -83,8 +81,7 @@ func (sm *ServerManager) RunLoop() {
if sm.httpflvServer != nil {
if err := sm.httpflvServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.httpflvServer.RunLoop(); err != nil {
@ -95,8 +92,7 @@ func (sm *ServerManager) RunLoop() {
if sm.httptsServer != nil {
if err := sm.httptsServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.httptsServer.RunLoop(); err != nil {
@ -107,8 +103,7 @@ func (sm *ServerManager) RunLoop() {
if sm.hlsServer != nil {
if err := sm.hlsServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.hlsServer.RunLoop(); err != nil {
@ -119,8 +114,7 @@ func (sm *ServerManager) RunLoop() {
if sm.rtspServer != nil {
if err := sm.rtspServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.rtspServer.RunLoop(); err != nil {
@ -131,8 +125,7 @@ func (sm *ServerManager) RunLoop() {
if sm.httpAPIServer != nil {
if err := sm.httpAPIServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
return err
}
go func() {
if err := sm.httpAPIServer.Runloop(); err != nil {
@ -153,7 +146,7 @@ func (sm *ServerManager) RunLoop() {
for {
select {
case <-sm.exitChan:
return
return nil
case <-t.C:
count++
@ -178,6 +171,8 @@ func (sm *ServerManager) RunLoop() {
}
}
}
// never reach here
}
func (sm *ServerManager) Dispose() {

@ -123,6 +123,9 @@ func ParseRTPPacket(b []byte) (pkt RTPPacket, err error) {
}
// 比较序号的值,内部处理序号翻转问题,见单元测试中的例子
// @return 0 a和b相等
// 1 a大于b
// -1 a小于b
func CompareSeq(a, b uint16) int {
if a == b {
return 0

@ -0,0 +1,150 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtprtcp
type RTPPacketListItem struct {
packet RTPPacket
next *RTPPacketListItem
}
type RTPPacketList struct {
head RTPPacketListItem // 哨兵自身不存放rtp包第一个rtp包存在在head.next中
size int // 实际元素个数
}
type RTPUnpackContainer struct {
maxSize int
unpackerProtocol IRTPUnpackerProtocol
list RTPPacketList
unpackedFlag bool // 是否成功合成过
unpackedSeq uint16 // 成功合成的最后一个seq号
}
func NewRTPUnpackContainer(maxSize int, unpackerProtocol IRTPUnpackerProtocol) *RTPUnpackContainer {
return &RTPUnpackContainer{
maxSize: maxSize,
unpackerProtocol: unpackerProtocol,
}
}
// 输入收到的rtp包
func (r *RTPUnpackContainer) Feed(pkt RTPPacket) {
// 过期的包
if r.isStale(pkt.Header.Seq) {
return
}
// 计算位置
r.unpackerProtocol.CalcPositionIfNeeded(&pkt)
// 根据序号插入有序链表
r.insert(pkt)
// 尽可能多的合成顺序的帧
count := 0
for {
if !r.tryUnpackOneSequential() {
break
}
count++
}
// 合成顺序的帧成功了,直接返回
if count > 0 {
return
}
// 缓存达到最大值
if r.list.size > r.maxSize {
// 尝试合成一帧发生跳跃的帧
packed := r.tryUnpackOne()
if !packed {
// 合成失败了,丢弃一包过期数据
r.list.head.next = r.list.head.next.next
r.list.size--
} else {
// 合成成功了,再次尝试,尽可能多的合成顺序的帧
for {
if !r.tryUnpackOneSequential() {
break
}
}
}
}
}
// 检查rtp包是否已经过期
//
// @return true 表示过期
// false 没过期
//
func (r *RTPUnpackContainer) isStale(seq uint16) bool {
// 从来没有合成成功过
if !r.unpackedFlag {
return false
}
// 序号太小
return CompareSeq(seq, r.unpackedSeq) <= 0
}
// 将rtp包按seq排序插入队列中
func (r *RTPUnpackContainer) insert(pkt RTPPacket) {
r.list.size++
p := &r.list.head
for ; p.next != nil; p = p.next {
res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq)
switch res {
case 0:
return
case 1:
// noop
case -1:
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
}
p.next = item
return
}
}
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
}
p.next = item
}
// 从队列头部尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的
func (r *RTPUnpackContainer) tryUnpackOneSequential() bool {
if r.unpackedFlag {
first := r.list.head.next
if first == nil {
return false
}
if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 {
return false
}
}
return r.tryUnpackOne()
}
// 从队列头部尝试合成一个完整的帧。不保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的
func (r *RTPUnpackContainer) tryUnpackOne() bool {
unpackedFlag, unpackedSeq := r.unpackerProtocol.TryUnpackOne(&r.list)
if unpackedFlag {
r.unpackedFlag = unpackedFlag
r.unpackedSeq = unpackedSeq
}
return unpackedFlag
}

@ -10,31 +10,41 @@ package rtprtcp
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)
// 传入RTP包合成帧数据并回调。
// 一路音频或一路视频各对应一个对象。
// 目前支持AVCHEVC和AAC MPEG4-GENERIC/44100/2
// 传入RTP包合成帧数据并回调返回
// 一路音频或一路视频各对应一个对象
type RTPPacketListItem struct {
packet RTPPacket
next *RTPPacketListItem
}
var (
_ IRTPUnpacker = &RTPUnpackContainer{}
_ IRTPUnpackContainer = &RTPUnpackContainer{}
_ IRTPUnpackerProtocol = &RTPUnpackerAAC{}
_ IRTPUnpackerProtocol = &RTPUnpackerAVCHEVC{}
)
type RTPPacketList struct {
head RTPPacketListItem // 哨兵自身不存放rtp包第一个rtp包存在在head.next中
size int // 实际元素个数
type IRTPUnpacker interface {
IRTPUnpackContainer
}
type RTPUnpacker struct {
payloadType base.AVPacketPT
clockRate int
maxSize int
onAVPacket OnAVPacket
type IRTPUnpackContainer interface {
Feed(pkt RTPPacket)
}
list RTPPacketList
unpackedFlag bool
unpackedSeq uint16
type IRTPUnpackerProtocol interface {
// 计算rtp包处于帧中的位置
CalcPositionIfNeeded(pkt *RTPPacket)
// 尝试合成一个完整帧
//
// 从当前队列的第一个包开始合成
// 如果一个rtp包对应一个完整帧则合成一帧
// 如果一个rtp包对应多个完整帧则合成多帧
// 如果多个rtp包对应一个完整帧则尝试合成一帧
//
// @return unpackedFlag 本次调用是否成功合成
// @return unpackedSeq 如果成功合成合成使用的最后一个seq号如果失败则为0
TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16)
}
// @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳单位毫秒
@ -46,138 +56,18 @@ type RTPUnpacker struct {
// AVC或者HEVC是新申请的内存块回调结束后内部不再使用该内存块
type OnAVPacket func(pkt base.AVPacket)
func NewRTPUnpacker(payloadType base.AVPacketPT, clockRate int, maxSize int, onAVPacket OnAVPacket) *RTPUnpacker {
return &RTPUnpacker{
payloadType: payloadType,
clockRate: clockRate,
maxSize: maxSize,
onAVPacket: onAVPacket,
}
}
// 输入收到的rtp包
func (r *RTPUnpacker) Feed(pkt RTPPacket) {
if r.isStale(pkt.Header.Seq) {
return
}
r.calcPositionIfNeeded(&pkt)
r.insert(pkt)
// 尽可能多的合成顺序的帧
count := 0
for {
if !r.unpackOneSequential() {
break
}
count++
}
// 合成顺序的帧成功了,直接返回
if count > 0 {
return
}
// 缓存达到最大值
if r.list.size > r.maxSize {
// 尝试合成一帧发生跳跃的帧
if !r.unpackOne() {
// 合成失败了,丢弃过期数据
r.list.head.next = r.list.head.next.next
r.list.size--
}
// 再次尝试,尽可能多的合成顺序的帧
for {
if !r.unpackOneSequential() {
break
}
}
}
}
// 检查rtp包是否已经过期
//
// @return true 表示过期
// false 没过期
//
func (r *RTPUnpacker) isStale(seq uint16) bool {
if !r.unpackedFlag {
return false
}
return CompareSeq(seq, r.unpackedSeq) <= 0
}
// 计算rtp包处于帧中的位置
func (r *RTPUnpacker) calcPositionIfNeeded(pkt *RTPPacket) {
switch r.payloadType {
case base.AVPacketPTAVC:
calcPositionIfNeededAVC(pkt)
case base.AVPacketPTHEVC:
calcPositionIfNeededHEVC(pkt)
// 目前支持AVCHEVC和AAC MPEG4-GENERIC/44100/2业务方也可以自己实现IRTPUnpackerProtocol甚至是IRTPUnpackContainer
func DefaultRTPUnpackerFactory(payloadType base.AVPacketPT, clockRate int, maxSize int, onAVPacket OnAVPacket) IRTPUnpacker {
var protocol IRTPUnpackerProtocol
switch payloadType {
case base.AVPacketPTAAC:
// noop
break
default:
// can't reach here
}
}
// 将rtp包按seq排序插入队列中
func (r *RTPUnpacker) insert(pkt RTPPacket) {
r.list.size++
p := &r.list.head
for ; p.next != nil; p = p.next {
res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq)
switch res {
case 0:
return
case 1:
// noop
case -1:
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
}
p.next = item
return
}
}
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
}
p.next = item
}
// 从队列头部尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的
func (r *RTPUnpacker) unpackOneSequential() bool {
if r.unpackedFlag {
first := r.list.head.next
if first == nil {
return false
}
if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 {
return false
}
}
return r.unpackOne()
}
// 从队列头部尝试合成一个完整的帧。不保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的
func (r *RTPUnpacker) unpackOne() bool {
switch r.payloadType {
case base.AVPacketPTAAC:
return r.unpackOneAAC()
protocol = NewRTPUnpackerAAC(payloadType, clockRate, onAVPacket)
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
return r.unpackOneAVCOrHEVC()
protocol = NewRTPUnpackerAVCHEVC(payloadType, clockRate, onAVPacket)
default:
nazalog.Fatalf("payload type not support yet. payloadType=%d", payloadType)
}
return false
return NewRTPUnpackContainer(maxSize, protocol)
}

@ -8,18 +8,30 @@
package rtprtcp
import "github.com/q191201771/lal/pkg/base"
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)
// AAC格式的流尝试合成一个完整的帧
func (r *RTPUnpacker) unpackOneAAC() bool {
first := r.list.head.next
if first == nil {
return false
type RTPUnpackerAAC struct {
payloadType base.AVPacketPT
clockRate int
onAVPacket OnAVPacket
}
func NewRTPUnpackerAAC(payloadType base.AVPacketPT, clockRate int, onAVPacket OnAVPacket) *RTPUnpackerAAC {
return &RTPUnpackerAAC{
payloadType: payloadType,
clockRate: clockRate,
onAVPacket: onAVPacket,
}
}
// TODO chef:
// 2. 只处理了一个RTP包含多个音频包的情况没有处理一个音频包跨越多个RTP包的情况是否有这种情况
func (unpacker *RTPUnpackerAAC) CalcPositionIfNeeded(pkt *RTPPacket) {
// noop
}
func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) {
// rfc3640 2.11. Global Structure of Payload Format
//
// +---------+-----------+-----------+---------------+
@ -38,46 +50,161 @@ func (r *RTPUnpacker) unpackOneAAC() bool {
//
// rfc3640 3.3.6. High Bit-rate AAC
//
// rtp_parse_mp4_au()
//
//
// 3.2.3.1. Fragmentation
//
// A packet SHALL carry either one or more complete Access Units, or a
// single fragment of an Access Unit. Fragments of the same Access Unit
// have the same time stamp but different RTP sequence numbers. The
// marker bit in the RTP header is 1 on the last fragment of an Access
// Unit, and 0 on all other fragments.
//
b := first.packet.Raw[first.packet.Header.payloadOffset:]
p := list.head.next // first
if p == nil {
return false, 0
}
b := p.packet.Raw[p.packet.Header.payloadOffset:]
//nazalog.Debugf("%d, %d, %s", len(pkt.Raw), pkt.Header.timestamp, hex.Dump(b))
// AU Header Section
var auHeaderLength uint32
auHeaderLength = uint32(b[0])<<8 + uint32(b[1])
auHeaderLength = (auHeaderLength + 7) / 8
//nazalog.Debugf("auHeaderLength=%d", auHeaderLength)
// no Auxiliary Section
pauh := uint32(2) // AU Header pos
pau := uint32(2) + auHeaderLength // AU pos
auNum := uint32(auHeaderLength) / 2
for i := uint32(0); i < auNum; i++ {
var auSize uint32
auSize = uint32(b[pauh])<<8 | uint32(b[pauh+1]&0xF8) // 13bit
auSize /= 8
aus := parseAU(b)
//auIndex := b[pauh+1] & 0x7
if len(aus) == 1 {
if aus[0].size <= uint32(len(b[aus[0].pos:])) {
// one complete access unit
var outPkt base.AVPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Payload = b[aus[0].pos : aus[0].pos+aus[0].size]
unpacker.onAVPacket(outPkt)
list.head.next = p.next
list.size--
return true, p.packet.Header.Seq
}
// fragmented
// 注意这里我们参考size和rtp包头中的timestamp不参考rtp包头中的mark位
totalSize := aus[0].size
timestamp := p.packet.Header.Timestamp
var as [][]byte
as = append(as, b[aus[0].pos:])
cacheSize := uint32(len(b[aus[0].pos:]))
seq := p.packet.Header.Seq
p = p.next
packetCount := 0
for {
packetCount++
if p == nil {
return false, 0
}
if SubSeq(p.packet.Header.Seq, seq) != 1 {
return false, 0
}
if p.packet.Header.Timestamp != timestamp {
nazalog.Errorf("fragments of the same access shall have the same timestamp. first=%d, curr=%d",
timestamp, p.packet.Header.Timestamp)
return false, 0
}
b = p.packet.Raw[p.packet.Header.payloadOffset:]
aus := parseAU(b)
if len(aus) != 1 {
nazalog.Errorf("shall be a single fragment. len(aus)=%d", len(aus))
return false, 0
}
if aus[0].size != totalSize {
nazalog.Errorf("fragments of the same access shall have the same size. first=%d, curr=%d",
totalSize, aus[0].size)
return false, 0
}
// raw AAC frame
// pau, auSize
//nazalog.Debugf("%d %d %s", auSize, auIndex, hex.Dump(b[pau:pau+auSize]))
cacheSize += uint32(len(b[aus[0].pos:]))
seq = p.packet.Header.Seq
as = append(as, b[aus[0].pos:])
if cacheSize < totalSize {
p = p.next
} else if cacheSize == totalSize {
var outPkt base.AVPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
for _, a := range as {
outPkt.Payload = append(outPkt.Payload, a...)
}
unpacker.onAVPacket(outPkt)
list.head.next = p.next
list.size -= packetCount
return true, p.packet.Header.Seq
} else {
nazalog.Errorf("cache size bigger then total size. cacheSize=%d, totalSize=%d",
cacheSize, totalSize)
return false, 0
}
}
// can reach here
}
// more complete access unit
for i := range aus {
var outPkt base.AVPacket
outPkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000)
outPkt.Timestamp += i * uint32((1024*1000)/r.clockRate)
outPkt.Payload = b[pau : pau+auSize]
outPkt.PayloadType = r.payloadType
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
// TODO chef: 这里1024的含义
outPkt.Timestamp += uint32(i * (1024 * 1000) / unpacker.clockRate)
outPkt.Payload = b[aus[i].pos : aus[i].pos+aus[i].size]
unpacker.onAVPacket(outPkt)
}
r.onAVPacket(outPkt)
list.head.next = p.next
list.size--
return true, p.packet.Header.Seq
}
type au struct {
size uint32
pos uint32
}
func parseAU(b []byte) (ret []au) {
// AU Header Section
var auHeadersLength uint32
auHeadersLength = uint32(b[0])<<8 + uint32(b[1])
auHeadersLength = (auHeadersLength + 7) / 8
// TODO chef: 这里的2是写死的正常是外部传入auSize和auIndex所占位数的和
const auHeaderSize = 2
nbAUHeaders := uint32(auHeadersLength) / auHeaderSize // 有多少个AU-Header
pauh := uint32(2) // AU Header pos
pau := uint32(2) + auHeadersLength // AU pos
for i := uint32(0); i < nbAUHeaders; i++ {
// TODO chef: auSize和auIndex所在的位数是写死的13bit3bit标准的做法应该从外部传入比如从sdp中获取后传入
auSize := uint32(b[pauh])<<8 | uint32(b[pauh+1]&0xF8) // 13bit
auSize /= 8
// 注意fragment时auIndex并不可靠。见TestAACCase1
//auIndex := b[pauh+1] & 0x7
//nazalog.Debugf("~ %d %d", auSize, auIndex)
ret = append(ret, au{
size: auSize,
pos: pau,
})
pauh += 2
pau += auSize
}
r.unpackedFlag = true
r.unpackedSeq = first.packet.Header.Seq
r.list.head.next = first.next
r.list.size--
return true
if (nbAUHeaders > 1 && pau != uint32(len(b))) ||
(nbAUHeaders == 1 && pau < uint32(len(b))) {
nazalog.Warnf("rtp packet size invalid. nbAUHeaders=%d, pau=%d, len(b)=%d", nbAUHeaders, pau, len(b))
}
return
}

@ -1,4 +1,4 @@
// Copyright 2020, Chef. All rights reserved.
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
@ -11,112 +11,59 @@ package rtprtcp
import (
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
func calcPositionIfNeededAVC(pkt *RTPPacket) {
b := pkt.Raw[pkt.Header.payloadOffset:]
// rfc3984 5.3. NAL Unit Octet Usage
//
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
outerNALUType := avc.ParseNALUType(b[0])
if outerNALUType <= NALUTypeAVCSingleMax {
pkt.positionType = PositionTypeSingle
return
} else if outerNALUType == NALUTypeAVCFUA {
// rfc3984 5.8. Fragmentation Units (FUs)
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | FU indicator | FU header | |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
// | |
// | FU payload |
// | |
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// FU indicator:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
//
// Fu header:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |S|E|R| Type |
// +---------------+
fuIndicator := b[0]
_ = fuIndicator
fuHeader := b[1]
startCode := (fuHeader & 0x80) != 0
endCode := (fuHeader & 0x40) != 0
if startCode {
pkt.positionType = PositionTypeFUAStart
return
}
if endCode {
pkt.positionType = PositionTypeFUAEnd
return
}
type RTPUnpackerAVCHEVC struct {
payloadType base.AVPacketPT
clockRate int
onAVPacket OnAVPacket
}
pkt.positionType = PositionTypeFUAMiddle
return
} else if outerNALUType == NALUTypeAVCSTAPA {
pkt.positionType = PositionTypeSTAPA
} else {
nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType)
func NewRTPUnpackerAVCHEVC(payloadType base.AVPacketPT, clockRate int, onAVPacket OnAVPacket) *RTPUnpackerAVCHEVC {
return &RTPUnpackerAVCHEVC{
payloadType: payloadType,
clockRate: clockRate,
onAVPacket: onAVPacket,
}
}
return
func (unpacker *RTPUnpackerAVCHEVC) CalcPositionIfNeeded(pkt *RTPPacket) {
switch unpacker.payloadType {
case base.AVPacketPTAVC:
calcPositionIfNeededAVC(pkt)
case base.AVPacketPTHEVC:
calcPositionIfNeededHEVC(pkt)
}
}
// AVC或HEVC格式的流尝试合成一个完整的帧
func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
first := r.list.head.next
func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) {
first := list.head.next
if first == nil {
return false
return false, 0
}
switch first.packet.positionType {
case PositionTypeSingle:
var pkt base.AVPacket
pkt.PayloadType = r.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000)
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Payload = make([]byte, len(first.packet.Raw)-int(first.packet.Header.payloadOffset)+4)
bele.BEPutUint32(pkt.Payload, uint32(len(first.packet.Raw))-first.packet.Header.payloadOffset)
copy(pkt.Payload[4:], first.packet.Raw[first.packet.Header.payloadOffset:])
r.unpackedFlag = true
r.unpackedSeq = first.packet.Header.Seq
r.list.head.next = first.next
r.list.size--
r.onAVPacket(pkt)
return true
list.head.next = first.next
list.size--
unpacker.onAVPacket(pkt)
return true, first.packet.Header.Seq
case PositionTypeSTAPA:
var pkt base.AVPacket
pkt.PayloadType = r.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000)
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
// 跳过首字节并且将多nalu前的2字节长度替换成4字节长度
buf := first.packet.Raw[first.packet.Header.payloadOffset+1:]
@ -126,7 +73,7 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
for i := 0; i != len(buf); {
if len(buf)-i < 2 {
nazalog.Errorf("invalid STAP-A packet.")
return false
return false, 0
}
naluSize := int(bele.BEUint16(buf[i:]))
totalSize += 4 + naluSize
@ -143,23 +90,21 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
i += 2 + naluSize
}
r.unpackedFlag = true
r.unpackedSeq = first.packet.Header.Seq
r.list.head.next = first.next
r.list.size--
r.onAVPacket(pkt)
list.head.next = first.next
list.size--
unpacker.onAVPacket(pkt)
return true
return true, first.packet.Header.Seq
case PositionTypeFUAStart:
prev := first
p := first.next
for {
if prev == nil || p == nil {
return false
return false, 0
}
if SubSeq(p.packet.Header.Seq, prev.packet.Header.Seq) != 1 {
return false
return false, 0
}
if p.packet.positionType == PositionTypeFUAMiddle {
@ -168,12 +113,12 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
continue
} else if p.packet.positionType == PositionTypeFUAEnd {
var pkt base.AVPacket
pkt.PayloadType = r.payloadType
pkt.Timestamp = p.packet.Header.Timestamp / uint32(r.clockRate/1000)
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
var naluTypeLen int
var naluType []byte
if r.payloadType == base.AVPacketPTAVC {
if unpacker.payloadType == base.AVPacketPTAVC {
naluTypeLen = 1
naluType = make([]byte, naluTypeLen)
@ -204,7 +149,7 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
pkt.Payload = make([]byte, totalSize+4+naluTypeLen)
bele.BEPutUint32(pkt.Payload, uint32(totalSize+naluTypeLen))
var index int
if r.payloadType == base.AVPacketPTAVC {
if unpacker.payloadType == base.AVPacketPTAVC {
pkt.Payload[4] = naluType[0]
index = 5
} else {
@ -225,17 +170,15 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
pp = pp.next
}
r.unpackedFlag = true
r.unpackedSeq = p.packet.Header.Seq
r.list.head.next = p.next
r.list.size -= packetCount
r.onAVPacket(pkt)
list.head.next = p.next
list.size -= packetCount
unpacker.onAVPacket(pkt)
return true
return true, p.packet.Header.Seq
} else {
// 不应该出现其他类型
nazalog.Errorf("invalid position type. position=%d", p.packet.positionType)
return false
return false, 0
}
}
@ -247,5 +190,150 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool {
nazalog.Errorf("invalid position. pos=%d", first.packet.positionType)
}
return false
return false, 0
}
func calcPositionIfNeededAVC(pkt *RTPPacket) {
b := pkt.Raw[pkt.Header.payloadOffset:]
// rfc3984 5.3. NAL Unit Octet Usage
//
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
outerNALUType := avc.ParseNALUType(b[0])
if outerNALUType <= NALUTypeAVCSingleMax {
pkt.positionType = PositionTypeSingle
return
} else if outerNALUType == NALUTypeAVCFUA {
// rfc3984 5.8. Fragmentation Units (FUs)
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | FU indicator | FU header | |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
// | |
// | FU payload |
// | |
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// FU indicator:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
//
// Fu header:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |S|E|R| Type |
// +---------------+
fuIndicator := b[0]
_ = fuIndicator
fuHeader := b[1]
startCode := (fuHeader & 0x80) != 0
endCode := (fuHeader & 0x40) != 0
if startCode {
pkt.positionType = PositionTypeFUAStart
return
}
if endCode {
pkt.positionType = PositionTypeFUAEnd
return
}
pkt.positionType = PositionTypeFUAMiddle
return
} else if outerNALUType == NALUTypeAVCSTAPA {
pkt.positionType = PositionTypeSTAPA
} else {
nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType)
}
return
}
func calcPositionIfNeededHEVC(pkt *RTPPacket) {
b := pkt.Raw[pkt.Header.payloadOffset:]
// +---------------+---------------+
// |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |F| Type | LayerId | TID |
// +-------------+-----------------+
outerNALUType := hevc.ParseNALUType(b[0])
switch outerNALUType {
case hevc.NALUTypeVPS:
fallthrough
case hevc.NALUTypeSPS:
fallthrough
case hevc.NALUTypePPS:
fallthrough
case hevc.NALUTypeSEI:
fallthrough
case hevc.NALUTypeSliceTrailR:
fallthrough
case hevc.NALUTypeSliceIDRNLP:
pkt.positionType = PositionTypeSingle
return
case NALUTypeHEVCFUA:
// Figure 1: The Structure of the HEVC NAL Unit Header
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | PayloadHdr (Type=49) | FU header | DONL (cond) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | DONL (cond) | |
// |-+-+-+-+-+-+-+-+ |
// | FU payload |
// | |
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Figure 9: The Structure of an FU
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |S|E| FuType |
// +---------------+
// Figure 10: The Structure of FU Header
startCode := (b[2] & 0x80) != 0
endCode := (b[2] & 0x40) != 0
if startCode {
pkt.positionType = PositionTypeFUAStart
return
}
if endCode {
pkt.positionType = PositionTypeFUAEnd
return
}
pkt.positionType = PositionTypeFUAMiddle
return
default:
// TODO chef: 没有实现 AP 48
nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType)
}
}

@ -1,89 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtprtcp
import (
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/nazalog"
)
func calcPositionIfNeededHEVC(pkt *RTPPacket) {
b := pkt.Raw[pkt.Header.payloadOffset:]
// +---------------+---------------+
// |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |F| Type | LayerId | TID |
// +-------------+-----------------+
outerNALUType := hevc.ParseNALUType(b[0])
switch outerNALUType {
case hevc.NALUTypeVPS:
fallthrough
case hevc.NALUTypeSPS:
fallthrough
case hevc.NALUTypePPS:
fallthrough
case hevc.NALUTypeSEI:
fallthrough
case hevc.NALUTypeSliceTrailR:
fallthrough
case hevc.NALUTypeSliceIDRNLP:
pkt.positionType = PositionTypeSingle
return
case NALUTypeHEVCFUA:
// Figure 1: The Structure of the HEVC NAL Unit Header
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | PayloadHdr (Type=49) | FU header | DONL (cond) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
// | DONL (cond) | |
// |-+-+-+-+-+-+-+-+ |
// | FU payload |
// | |
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// Figure 9: The Structure of an FU
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |S|E| FuType |
// +---------------+
// Figure 10: The Structure of FU Header
startCode := (b[2] & 0x80) != 0
endCode := (b[2] & 0x40) != 0
if startCode {
pkt.positionType = PositionTypeFUAStart
return
}
if endCode {
pkt.positionType = PositionTypeFUAEnd
return
}
pkt.positionType = PositionTypeFUAMiddle
return
default:
// TODO chef: 没有实现 AP 48
nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType)
}
}
// hevc rtp包合帧部分见func unpackOneAVCOrHEVC

@ -0,0 +1,128 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtprtcp
import (
"encoding/hex"
"testing"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/assert"
)
func TestAACCase1(t *testing.T) {
ss := []string{
"80e10e9a56843e4cf0bdf2fe00102d10214e6c425f74815f92415f94415f924100000114008a004027f313d564a770026fd01203c9cbac420e1aecaa41efb4619391309daa7938b4b905989c5c293c024819e4234eb324934327e3f083c98c2220137bc9b5b2bae5809351710e4bfec0dec11fffe0ab6e568f8357fb74c3461e892fc3a4e22ac512fa73bdc13004b9d73e21c5221387922692939994e38ab1516ea2cfb64df8cffb5468a81e4704f255a5139f7f21d622b944e0b08221916d4ba6e10a31b2a148c38642d1c8b2b12a15681c0f0b589160feb32b43c7ebade413590916a934148ca7915d6250904e5172a3ecc612ab23fd4431b0eef591be52155e4a0ac8564108012689c4df89221892e50d8fca64a747bb2313908211004222ea6a848132e87935978e03845905ccc057c743c7424e64e2338bfe52090d4a5a961f35ec5e14befbc2b3d41f89bdfe949fc2dd40e141b13e397f84f7e1ef43f303df77407bdacc01c2832c3d3df65fe3bfde0001f37f85f4a021f27baf11e4f388fd5ce464337648fc89e6043a0c8268a44f0e024e112910c8b9bc282ee2e5587762b2752c08a422ac99558f904273f858d9de9938ef2502493ae3217c247410eb39395f084a117f6486161103c5cab0881d7c24ffd7123cdcac226bc292c2dc213d04f0b26dd864e6e149c3a4468a48d86124209e172166c5b331e434c8cae4276e191acc2682d692a51210c1ce2254102e048435919442449f2c57bb271364fbb67f1d79364ee004a28c88e75dd53210eef613186531f22908a5253a87e66423fe1bffef7dd95fed588acf815908808fefff2ffef581fe23b2b8e9fdfbd7e3f7071c05d40ec89401efbdf87beee9a9c1eff2d054c0f7ff5febc514514514514514514517d7fe0faf145145111fc0dd8218d9f9d064ee8895cb84ee848064e7593530495f87ffa276e76549a4e6c0e381270c24ea288e0e1d8ad270f0dc243210e413a84b160e085b10a416be12011022b31354b202020771fe376bf6afe387e67fe0fee8087bee102bf415fa0afd0400000e3e3e3fe001f8f8ff53eff60fafab04fe0877121cebad11c16048271174dc251cbdd3da84e0c9ba23dd312b26500020154a269d9c4520270444131665313103f0f7412d40de5c59cfff0f6383440fc588e986f10b35ddbfdb6be780f30eb25f49fd7e479062097adbc044ba3e0c956ef043c31d188a23d453e83ca71ca44df64805f4514916e71c7e760674293094980642012d104ba2cacd260456e2e93a8c1f492739f9dd445aba9241088de157cb3d976e20888a41b34808f3a4ca181528c989641e0a187dc5443494c896e4421711580a84276a12a229662108289464d105b1513f80841836f009c987f86969641eea943c86c475685fb09043ae947ac5d22ff6ff7c1d380a48885c2a7d03905668ec9fd1fcdf7fc700450094d16f034c90003df9240b851764658e101efb8ea83855071d6871c5126a8385556aa0db094e571d9dc7271352b846810c4e24000000000000038e60384c4f09800000021e17eb447ca381218ff036078221ec1e7843a466acc013c0130585c7518330993235855b71158610843210aa9ba05c22ea5984de5e1227e3e4d2dd942a1659f5320232b45bb6c6418df87b313c727fc3f1d170a6ecb0b74e4449209804a3bf270c98518182b79dd74412c24f7e4f45131a889442502df3928282006d0f165c6e4d39059e7404eb1092044c67267817686dc7dbe82281f1d2f65ddaf70e67250c77adbe39d09be32b8f0007d2f905140d3c34fb6784075ce77075dfb069da091017344c80b6b9a36ff1d93c22889d5a7ccaa2073ccf278e1f8e9c38e1c9a8f502b8e9c84938542ae3a7e1313c26240000000000000384c4f1cc4f09890000021f49fc3847d1b76e8613938122e759d2494b9e4634fc812ab747e3132beb075bf23fed67a0943a245082648a4d06e11166a25f110a24c0c993964042ff0a8d24e4108c489514499641221f84c013c06008e0301c2e0038e6009e0327c7326000001c",
"80e10e9b5684424cf0bdf2fe00102ce8217c4445fd4080000000000021e10fe2c047f365f8a423e48fb4c479f78f48f9e7c2647b178548f27f1693ecdec1b1f704b84989705ea8436bb5241b849b83253444f41189a621383b227b2aa4db0721db25369113cd2598864316d24dd1f1cb7c72df0b049c182460e0f8ee4b846115622efa44950683513c2cee38dcecd270f05c7544d4a274d44b2f90272ed9352894b51182c2583a846952e12092a6a2106913834eef824e0d42188a6455049a95c72590a6a232367c71404a9ab843f1c9a4df84e10e40f6838e5a2534e47187e12d71cd07093486821f1d7f1dc6123e3c83ed71c8c43198fe387207390b56b8e49a8ae13c8c6254e490934c8cdc6903e0091f00429d699b5041ef20f3f0a7218c3933e3c9da8dc25a231ad10c65a24fd011895489f0e4ee61c8d5cd1124d25630842ae1ff449e1c84f55608c489c2552106c10b812798918226672119f548e1cf334f99e2139b109c1391cc26a58c46f6289cc2108d9da924112f01ba2bd4f2081495314996e13246278f51291188032e431a4e12e13c7cc211884e428996e12228948915260896731b5314848c511246b16458923844939f9ae10f82a664aa46aa49e3c845292512a90cc61c844b84f0f548444ccb565725d50c89af92363889a7dd5267e912adf22f2127d4217e1910c32724d74867ea24de12318e4a6cd231ad71c3923c620739239c8cd3f0b1b8ecd2734e4669c81ce4ec1c81e3118d378edc2520e4621085c21021ea1493244267904c8424490151c499a254310912a913c82210920bc8049c71e4030f1841e102210f08f9543c2867f081008480424c2193d7ffde103fff7ffdffdc7e1c7e19cf081e103ffce10201fc80019543fc9c2a88843c2872a07f97f6003f93f943f3801fedfc9fb1fecf7f7e00ffb003cffdffec1ff7fd8ffb7e0ffee16717feffeff8200ffb07e7ffedfeffb201fc9ff6ff600000ff60f7fc705ef7fdbfdfdffbc0ff7ff60f7dff700f7ffec1ff7e3a2000000000008bbfa81000000000bd757353a04bb17e6748fdcefa6647ea8fd4223f3a7ab114f2227c33784f63025af4510f7f4e27773a4fe09f4e238fe2e470f9b2649a45f8827c9b9f12d5d5267b533da22439374822fb39d9e4819920b270842306411832099e31139c89e690d963f84a24423b7211309c905f2b1b8e908df2129a8e381201090861b12d906848c80916c8267a247315bb2b85392861acce44f0c81e1103d62038640272610f08d250c24715778e078edb210c39dd5c71e4a3ceceeace8524a571c2e3d7fff091544148e11e41181e3b1f84a79d23118e5e115c77098f15ffe90296704d6323125cbd0e4c864df533b2b3a28843d11280b211e3e3f8f9d55ffdcef1b852902ab8e978e14947449e1cef009c8804433b2a3c81eed121e1287a2928278370812718e46462498564a844cac823403f762741126225c7e3c8c44312b65135baefa5d244e5288c59b812884a55d9149c55138b23c389560d06b217f17fc6e12413ad4ab7a84961271646570925ac8de5607309c55105bbc3082e8115d0e12e13a13bff64e8442214110c4bbeb1189124c0900c093a41380193a2100a32a1484a4117b31f55c7af212a5112ac830bc8d284320c464141394fb3e511293a17ff849efe3a5c7caffd7fffffe63d781353f1fbf1ebfffec7f5bff5c75fc76463f5f1fdd923fd24d11394b3aaff5f959612e224d44bc8cac1ceebfff7ccf3a86739d1fc757e7c1ffecd73a2fff78f561ffe7ff84ce799e271c270872242112138e1091c848e4e3b13843f087e3a4e10e01c2c100027065f0b0785091972f85964a0cbe120f1d8201c2c1e1208108d1803848dc2cbe3b2f848dc7308425c600e39180384c271d97c725871d97c768f1c964a5d1e3b2c3204be3b2f85964e5d100e392c09e0301c7301c2e038e6240385c4f1cc07098000000e13001c72c71c6f096384c4f0b89e396385b01c2e2784c4871db1c771201c7359c270b0e3bc2f8ed700000000000e",
"80610e9c5684464cf0bdf2fe00102db8211a0455fd2003dbd44f3d9b002dd3e3cdef4097d20fbaa47f5b9c188fa9524fb075d2184b84f1fb320cdf93e3be3427dcee120512392e584b07048931a4f0e923073642e6dec4391c3e62ecd4133d32184ab32c421124114e9384e09c72c64fb1c2d1a8cf33442756a91261c963544a49c9cd9ba0909a7256ad119b3493dfc2e089435109782234d5c2d8278db0047619e20d81c2d3233b124605badade62233a5909d4c9406e769c4675b25025811916f8e378ed1e3ad216f177e21528909ef2640907d1201a846fdf22679008654bfc75fc2849bde40c4206792904226be44350811f3f9a7e7f1c092bab228ba4db0c84d41044d24dc571d0929b74922d909a19f833e4f24f791c24327a37926bc9bda45a426d9f2ac424186490de1024aedbe3b2c966aa708de16ef1cc713b61e3b348a0241eac7f0ff29c767135371e4e250dff84cef7484cba467cfe15e429c122b90420e0c848210590848a646e1091241090b210689182423228912138e12b1a240212418e4a1c3270864470adca84027210a8901d12279b6fc2255a112850894e7da6ea2c04b2d2091084a14eceb6485d21005d2565e49e4236615a0f26eba41e427225e3e9dc2bc9d9b645f88232e16777e3c7fffe45ef2407f1dbb9d41ffa24e793b242207f1d3f084a8884e4248d89b50108c488462134026725472094139139f8e412473ccd367f9f534fe127900c3230c248269549c7084c24261864e1be57864a1865786442195a191b91bb289c18c46e412583b96edcf7fc241e16e5bd83ac20f1c8dc76080138d18841824ae4621060d610495cc1dbf078e4125060f1d824a0c12706e0567078ec1e12e7097385e2bc773dc7727c733dc264f8e64c0009a327c2e78038ee7b8ee4c967b27c271fe39cd78e71f0000e178ff09c7f8e735000e178f871ce6bc2f1fe178f8709c7c384c971ce3fc274e0e3bc7f8ef1f00e39c7f84e9c04375bae3bac0e3bbae13580001c73a170bace1359c26eb85dd13dd6b385d6709c7f84e6bc271f0233b9a870bc7c384e3fc738f871ce6bc739a8709cd4236fe903aefae6bf1df8bd4e7384ec025f57e7cc012f89fdf88fe2c7e6490f055c27f0f37c46748215b5a4373009f60f7d10f1c4823c131043ddfdf09632313562f3b6148c6cd11e0d9120dcee74804e25f24190467064d449ab271365c2e1b8e6009e9701c25827560da5548448849b85c79778fc82d7c2a0943093420843864e6cd270e590c12b8ec3274d5c2349e2da4a8d1e16d920c9e15f2728849864570c8ad72941949177be5995292c9be0f1d8c46d4de38702318e466bf858846e41995244e4a8e09025cbb2acc97081cf95e4e9dc29ed8237152cd5bb5646ab320c3e0644b56592ffd89369117378ea0901c45c0e382b324130378475980b3cb8f4ad4421225b912d4194e0ddb071f412358b9091c23b2048e16a9124925021f1d270ad24493ff6224059ee2250f6a91808e3af221935253ba0b7440bac64a449b67852f082270961762eec844162bb246565905c2e393ff609ae7feb2ec3e543fe73214c248454306648248f0ffbbda57641cecf990f32a2d924f2ca8424d21332ff5a4a33ee93ddb24836492396ef2913c62442cfb26ed2d8aba99fc296c63d125c1635d65a90bdae44a524d2f1d9374b7f1a7c2ddc722d2fecffbd9e520879122ecd2b4f427085fbf122a2ee83fade3907f5c49652280930d0e103951776af2b44cec8ffeffab2bae5e5925a2ee5925a3f5a4d68ad9a4674e24a6d6eaa0e9d053b8e4ee115b6109d4c8aa771d6f1d500106b4841836ec621060918372dcc191b9078e4125060f1d824a0c1aca081283738ec1f7bef785171dc4f1cc071cc4f0b80e3b8000025b3c070b8900e3b89e39802789c0709bae3bc2f8e6e8000385dd709bae3bc2c00385dd071de17c26b384d6070bac0e13a871dd6709c2c38e6b38e6b0038eeb384e1604375bae39ac0e3bbae17580001c71fc26b385d6709bae17744b75ace1359c2eb384dd70bac023d0b741c26b0385d671dd6071cdd",
"80e10e9d5684464cf0bdf2fe00102db871cdd0709ba0e0",
}
var pkts []RTPPacket
for _, s := range ss {
pkt, err := hexstream2rtppacket(s)
assert.Equal(t, nil, err)
pkts = append(pkts, pkt)
}
expected := []base.AVPacket{
{
Timestamp: 30239734,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+4:],
},
{
Timestamp: 30239756,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[1].Raw[12+4:],
},
{
Timestamp: 30239777,
PayloadType: base.AVPacketPTAAC,
Payload: append(pkts[2].Raw[12+4:], pkts[3].Raw[12+4:]...),
},
}
var outPkts []base.AVPacket
unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 48000, 128, func(pkt base.AVPacket) {
outPkts = append(outPkts, pkt)
})
for _, pkt := range pkts {
unpacker.Feed(pkt)
}
assert.Equal(t, expected, outPkts)
}
func TestAACCase2(t *testing.T) {
ss := []string{
"80e104558424aa36b06db689003000c000300030de04004c61766335372e3130372e31303000422008c11838211004608c1c212004608c1c",
"80e104568424b636b06db68900102d7020824ecd7ec9037f01bf9cdfe1af65fe2f8929dfff5754fd6be1a938fffbd772fdae7b717ad7cfff15d712f87c756ebec000025c1677100007f1aec2801869302bacc0003ecf821400b0a24b8fca00008c628be329f92424a4989200000001598ba8883e3d8a700000000c0dca33e12a4380000000000064f04e8520b013044271d16820000000000000834fdd38181d45ee0000000000000aec04ce29d0a41a9fe09352c00000000000000025a110233005e4f613687cf3000e8d0000fec7dcb47a7d482a131082024d2655350e6204812e36826cb0cd6c45100950191294826310282650139f33fcf5345951945c1ba584e00f071139b28839e4081a1c44d5389b49d704d4e26b06547d0b0c8162ecaad8b44d0cbf3a5020b7d4a69925d6d0c82265a8dc7376c2a9aa58af99d73d8e504b4796c377322153aaee260019f4b2a20900fe61db59003fe622264ca9f39f6d6c36ed9fc1ca84bbcb451e3b9789f5dfc5ba65d0cb0dda74503aa48962118439b889152a16595d624caa02214e0c5a9012b1e8326760a222c5114bec74e7f9fd3af736db38fa3f977887413632f7634bc5ad4f93a058c6fff3316573d9f2c834128c5caa7268749c22063642935a8a57371f5aa2c991ae925664268630db8b26621094cff8fd94bee0267844e6a4810741aad616577d4116df07ad91124998dfae2288248e720105dc8b1c84a356938445f66d3054a7273536e12cf5d88ebae01228ae945da7a0a1107109c29ddc84173c80da4254ca0436fbf1ea6ee1d669a21ddf64ca8c1459d4142d420e45aa62514feb5352c0722f16f04f1ab25183bafa0c9803dc44081f48a4350ede00bac2031ebbe1f2e05632091e060aca20d2dbc91864de8b3e21186722c310801ad884ce722f836fd2ad4372e750c991b2fd23338e7e791953890e1e5707a41121884d29054a210e0904088a679148884151284d9bc9497105970429181170237ad67655484244875ac02320a4cd66c79b900b52c624b011203dcfb8a7091c2b0500034a74d9b65c8717f31bf94dfca552b3ffc7f6fff63aea4037fff67ee4f8d17f7af13ffd9f75fdc5fc5bd03ebdfc21f97f68775e98d1d330b000095e21085408aa263fa21fe0f1f5b02644d14a21326d6b1c9d087293486067e55804f53b0258584433d972144393e713c25b2439442b4fbb219364cb5a11063f00479b9318367100035ef25f3f9318676010317b94999d59a880958f11298c9b59958241ee95444c69b3cf95184201e773936a7f3c412c960a4cd0c9c1273d7d931c37ff6d9d929418b8a94ba43b78934116c9e0bb4c426d6cef2ecc4128781f52a8239071492ce457325d8846086cf25da0f16d8b90c7802ea60d0d0b72d9e6fa8c9f17f2445c39d8fa80800c48e4958dad48ac846d8ad1463e5d72326c95518ff824593090c24165a9e3123b092535d0eb26ca00b30d320b26c8c15b2c03ab49513cb42200013203e03ed995c9b6f64407e93590fe3b91b46fd7e836649f75ee1adc19fb424b45d67997f6a4822f929ef35faddb5df5c53581758c62118564b229411952c81164aae379c084435068ec3223824d712cf19144e250d241082500b5a0c91e191b83c7916a0904a8472259a46942941249d81b785ff89201ea30130a2a474ce62261657760a99323e547108ade54a8e57faefd9549589f0217754f7b64165c0e5ddbc30a69d1b60a8358850e615eaffaf67cfb739b008aebe90e5cdb6599d9411eb29d4bf1193626020bbd249ada1104e39499436342c7e7d9ffd9bb473ee308539a47090c84f69025f2436938b648e12e11aa12260e053c981595e0918abb365111beb127dba50058d03203fe97ea545a7be2a222a70eecada1cf44545fc92fa001c46e917f9f3e4cc0b9b3cf647886082ec8b97fdc800446440b4d7ee15355c86a20181688e56792390929dffe60514948464f344321cdb324ecaacc19dc776c0e59f3bb4d7831c9ae5fed2662cc82f98bdf7ff8659e053949f8f4f8f04bb7bf96d2dcdb3f0af748fb7edfd5b0e0",
"80e104578424ba31b06db68900101710217c8fcffbffffffff1979833210cc1418840000000125d494cbd56b7a9f71939a407bb23c36490157b3716461bc99a2d721bb5f9322e942220d168209551277cedec9ac2411dde7ddd3f1098064caaae00452acec522331102b6bfa04f7f0f26031a48a98c3bb369e8df22dffd4fd97ffeec0fbb3c6e2b715faf8ef3c6542f86755595f618c773f9e743e6187bab6ae9ce8c9334bdfcef0fc4a22fad25b7d627d0658d1b0787baf71df6df58cd130f1ee8691a96c97c6d78c7696c2b6344b4c5a316341cbd9301eff71baad0e92a58c7f4ebad240852f1a802a7231f4f674e925b06f8efda7744cd6626a102f7e444844b6b10530e512da58e67a4b5823da6f230639396fb448c884a800b66ed7bd39648dceaa5ebc96f5db55e15e311a928a4c14d808318140ac573482c6d952f86ba62071addd4aba2ea4ec411290c5895734ac6aa71a227024b904ba9500f2625574a6ae33a4a3711301219b387e7a1cf996d1d5d21fcfd26d4684d449dd6a1f6edc67af2e0be6e611af2a0c0ad0450128040c82006f93201d01e8b1da15175910c8a08f31785396e7d69fd0115e62d246cc9c64e91c2c9a20232e347629151e210000000049752532f55adea7dc5a0194f8491726a28be359087d659d41678fa573b26eb25664ca0f575139ebf21f8d1bf4f5b5fdee41544239171dff3f4c688fa761594a14ddc40aa522794342bb1dd508bf1b8bb3f8e4a693ecb1cff09c786e084aabc551be968b928cd1519b62b14df3854c6026a84d74a584d3f4ccd9ae7330b4ab0dd1da78a3d355f9ec9718f772945b02f25c34e2db785eb9285f31d74e74003b98948261aaea04c173aae18453f37483fefb0792099cebbd230ee44e059980d83d983025601a7e9aa1a08850830de3e2991b508b305a280e99e6f3b3b6ae65b7e20cd36ba24180e55600cb2b0f56197f41b86417aca1257af9603ec6dd2e569d1c455b9f83c389629e4135e660ee55004e738babd3fedff03562b0896f85c87b7c9ddf0f48b9db3c60c8afcb500380",
"80e104588424be31b06db689001016d8211c8d2ff7fffffffd1975a43251621000000006af6ba26aa2e3f904d00271b1e487d009e4320471d8f204892ad2252d72b8700260e8b4113e20831048a5a0438f1b93832a048415ff73b5bf69631c8a4e4ca6eb89fc8454ab3558b7ed96e5557083eb2ac47779eb23fa2d680f806dfa4ffd6590df3f6e990132030626063e01e73386af34ea3ff2746f376e6c8fcc3f0574828a15f1375e386521f5b50f0f4aea7ffdb9e0d27caff771bbfd4788dcbd118e539df41f01be771cf33af56f9a35090fd7b57dfd8f557cb9e398b7df5f1ac4d4bb56f69cdd2211b6f49679293c43dc7b283840528ea9f573ee24cc7c055d4b9844491583e315d70af6dd5e73fdab59fd17d532f48dda3cb452cba838dccbbdc2b7d5b4699a8e565a4b71035f2a9bd97bb88b90057c240692aa50aca5486142176a21104b5b19c1a691b086d4118810597934c2302e49091073911ba492badc8dba7ec264c5d29602f779ff3b58a48c510a681f42e7cdfd3ecf37fa7e8f6f8eda069baed02641111cea27687eba9db1f5471d5bb4734a6ca7f63caa22f5c04b5df2a7a5f75000ca648557f5c4ee3bca7e079e382643a1a808cb851d8e67222204200000000d5ed744d545c7f916413a981219ae144e8cafae1383088cb0139c294c240e3f9c9dc1d324047aaf90dd06794b20bebd8f4d73ffefb80f3d6dfde7dddd613573e886de98f79a921d12eb49909e8b4415f66cdc863a11bf1aaada975bd2d91e20d7f58ca2a3219d9bb55d873c2c399a17028a2bdc09a513165845609d47ee11f4f5e8822407a13c1f876c728291c28931eb6004cc5e86a441a29503a82c2b848200ee55e3df0880cc00dbd92d204170376d1d2c19355727b70dd508350425a031cb73d7fee9bbd887878b6b4a3ebb056e59b44b1e7a4884f1e3cfdab2b3607e594e2cf47eb553f54696c0acb08ba8b411ed053572db39a5c885723bc0bd2d97d92b27b5381d5bd555ae9fba09cde8ea4e66a7bc1b3488deebff89ad1d8aac49546b79e301c0",
}
var pkts []RTPPacket
for _, s := range ss {
pkt, err := hexstream2rtppacket(s)
assert.Equal(t, nil, err)
pkts = append(pkts, pkt)
}
expected := []base.AVPacket{
{
Timestamp: 69281105,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6 : 12+2+6+24],
},
{
Timestamp: 69281137,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6+24 : 12+2+6+24+6],
},
{
Timestamp: 69281169,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[0].Raw[12+2+6+24+6:],
},
{
Timestamp: 69281201,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[1].Raw[12+4:],
},
{
Timestamp: 69281233,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[2].Raw[12+4:],
},
{
Timestamp: 69281265,
PayloadType: base.AVPacketPTAAC,
Payload: pkts[3].Raw[12+4:],
},
}
var outPkts []base.AVPacket
unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 32000, 128, func(pkt base.AVPacket) {
//nazalog.Infof("out: %d, %d", pkt.Timestamp, len(pkt.Payload))
outPkts = append(outPkts, pkt)
})
for _, pkt := range pkts {
//nazalog.Infof("in: %+v %d", pkt.Header, len(pkt.Raw))
unpacker.Feed(pkt)
}
assert.Equal(t, expected, outPkts)
}
func hexstream2rtppacket(in string) (pkt RTPPacket, err error) {
var raw []byte
raw, err = hex.DecodeString(in)
if err != nil {
return
}
pkt, err = ParseRTPPacket(raw)
return
}

@ -72,8 +72,8 @@ type BaseInSession struct {
audioRRProducer *rtprtcp.RRProducer
videoRRProducer *rtprtcp.RRProducer
audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioUnpacker rtprtcp.IRTPUnpacker
videoUnpacker rtprtcp.IRTPUnpacker
audioSSRC nazaatomic.Uint32
videoSSRC nazaatomic.Uint32
@ -114,12 +114,12 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo
session.mu.Unlock()
if session.sdpLogicCtx.IsAudioUnpackable() {
session.audioUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked)
session.audioUnpacker = rtprtcp.DefaultRTPUnpackerFactory(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] audio unpacker not support for this type yet.", session.uniqueKey)
}
if session.sdpLogicCtx.IsVideoUnpackable() {
session.videoUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked)
session.videoUnpacker = rtprtcp.DefaultRTPUnpackerFactory(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] video unpacker not support this type yet.", session.uniqueKey)
}

Loading…
Cancel
Save