diff --git a/README.md b/README.md index e3ac884..25f02dc 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,89 @@ -

- -Live And Live - -
- - - -
- - -
- - -
-

- ---- - -lal是一个开源GoLang直播流媒体网络传输项目,包含三个主要组成部分: - -- lalserver:流媒体转发服务器。类似于`nginx-rtmp-module`等应用,但支持更多的协议,提供更丰富的功能。[lalserver简介](https://pengrl.com/lal/#/LALServer) -- demo:一些小应用,比如推、拉流客户端,压测工具,流分析工具,调度示例程序等。类似于ffmpeg、ffprobe等应用。[Demo简介](https://pengrl.com/lal/#/DEMO) -- pkg:流媒体协议库。类似于ffmpeg的libavformat等库。 - -**lal源码package架构图:** - -![lal源码package架构图](https://pengrl.com/lal/_media/lal_src_fullview_frame.jpeg?date=0124) - -**lalserver特性图:** - -![lalserver特性图](https://pengrl.com/lal/_media/lal_feature.jpeg?date=0124) - -了解更多请访问: - -* lal github地址: https://github.com/q191201771/lal -* lal 官方文档: https://pengrl.com/lal - * **/lalserver/** - * [简介](https://pengrl.com/lal/#/LALServer.md) - * [编译、运行、体验功能](https://pengrl.com/lal/#/QuickStart.md) - * [配置文件说明](https://pengrl.com/lal/#/ConfigBrief.md) - * [HTTP API接口](https://pengrl.com/lal/#/HTTPAPI.md) - * [HTTP Notify(Callback/Webhook)事件回调](https://pengrl.com/lal/#/HTTPNotify.md) - * [Demo简介](https://pengrl.com/lal/#/DEMO.md) - * [Changelog修改记录](https://pengrl.com/lal/#/CHANGELOG.md) - * [github star趋势图](https://pengrl.com/lal/#/StarChart.md) - * [第三方依赖](https://pengrl.com/lal/#/ThirdDeps.md) - * [联系作者](https://pengrl.com/lal/#/Author.md) - * **/技术文档/** - * [常见推拉流客户端使用方式](https://pengrl.com/lal/#/CommonClient.md) - * [连接类型之session pub/sub/push/pull](https://pengrl.com/lal/#/Session.md) - * [rtmp url,以及vhost](https://pengrl.com/lal/#/RTMPURLVhost.md) - * [ffplay播放rtsp花屏](https://pengrl.com/lal/#/RTSPFFPlayBlur.md) - * [FAQ](https://pengrl.com/lal/#/FAQ.md) - * **/待整理/** - * [性能测试](https://pengrl.com/lal/#/Test.md) - * [图稿](https://pengrl.com/lal/#/Drawing.md) - -联系作者: - -- email:191201771@qq.com -- 微信:q191201771 -- QQ:191201771 -- 微信群: 加我微信好友后,告诉我拉你进群 -- QQ群: 1090510973 + +# LAL + +[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases) +[![TravisCI](https://www.travis-ci.org/q191201771/lal.svg?branch=master)](https://www.travis-ci.org/q191201771/lal) +[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal) + +[中文文档](https://pengrl.com/lal/#/) + +LAL is a live stream broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g. RTMP/RTSP/HLS/HTTP[S]-FLV/HTTP-TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. + +`LAL` stands for `Live And Live` if you may wonder. + +## Install + +There are 2 ways of installing lal. + +### Prebuilt binaries + +Prebuilt binaries for Linux, macOS(Darwin), Windows are available in the [lal github releases page](https://github.com/q191201771/lal/releases). Naturally, using [the latest release binary](https://github.com/q191201771/lal/releases/latest) is the recommended way. The naming format is `lal__.zip`, e.g. `lal_v0.20.0_linux.zip` + +LAL could also be built from the source wherever the Go compiler toolchain can run, e.g. for other architectures including arm32 and mipsle which have been tested by the community. + +### Building from source + +First, make sure that Go version >= 1.13 + +For Unix-like user: + +```shell +$git clone https://github.com/q191201771/lal.git +$cd lal +$make build +``` + +Then all binaries go into the `./bin/` directory. That's it. + +For an experienced gopher, the only thing you should be concern is that `the main function` is under the `./app/lalserver` directory. So you can also: + +```shell +$git clone https://github.com/q191201771/lal.git +$cd lal/app/lalserver +$go build +``` + +Or using whatever IDEs you'd like. + +So far, the only direct and indirect **dependency** of lal is [naza(A basic Go utility library)](https://github.com/q191201771/lal.git) which is also written by myself. This leads to less dependency or version manager issues. + +## Using + +Running lalserver: + +``` +$./bin/lalserver -c ./conf/lalserver.conf.json +``` + +Using whatever clients you are familiar with to interact with lalserver. + +For instance, publish rtmp stream to lalserver via ffmpeg: + +```shell +$ffmpeg -re -i demo.flv -c:a copy -c:v copy -f flv rtmp://127.0.0.1:1935/live/test110 +``` + +Play multi protocol stream from lalserver via ffplay: + +```shell +$ffplay rtmp://127.0.0.1/live/test110 +$ffplay http://127.0.0.1:8080/live/test110.flv +$ffplay http://127.0.0.1:8081/hls/test110/playlist.m3u8 +$ffplay http://127.0.0.1:8081/hls/test110/record.m3u8 +$ffplay http://127.0.0.1:8082/live/test110.ts +``` + +## One more thing + +Besides a live stream broadcast server which named `lalserver` precisely, `project lal` even provides many other applications, e.g. push/pull/remux stream clients, bench tools, examples. Each subdirectory under the `./app/demo` directory represents a tiny demo. + +Our goals are not only a production server but also a simple package with a well-defined, user-facing API, so that users can build their own applications on it. + + +## Contact + +Bugs, questions, suggestions, anything related or not, feel free to contact me with [lal github issues](https://github.com/q191201771/lal/issues). + +## License + +MIT, see [License](https://github.com/q191201771/lal/blob/master/LICENSE). diff --git a/app/demo/benchrtmpconnect/benchrtmpconnect.go b/app/demo/benchrtmpconnect/benchrtmpconnect.go index 589d401..6fdcd34 100644 --- a/app/demo/benchrtmpconnect/benchrtmpconnect.go +++ b/app/demo/benchrtmpconnect/benchrtmpconnect.go @@ -55,11 +55,12 @@ func main() { b := time.Now() err := pullSession.Pull(u, func(msg base.RTMPMsg) { }) + cost := time.Now().Sub(b).Milliseconds() mu.Lock() if err == nil { - succCosts = append(succCosts, time.Now().Sub(b).Milliseconds()) + succCosts = append(succCosts, cost) } else { - failCosts = append(failCosts, time.Now().Sub(b).Milliseconds()) + failCosts = append(failCosts, cost) } mu.Unlock() wg.Done() diff --git a/app/lalserver/main.go b/app/lalserver/main.go index 29d36c4..28344b0 100644 --- a/app/lalserver/main.go +++ b/app/lalserver/main.go @@ -25,6 +25,8 @@ import ( var sm *logic.ServerManager func main() { + defer nazalog.Sync() + confFile := parseFlag() logic.Entry(confFile) } diff --git a/conf/edge.conf.json b/conf/edge.conf.json index a987246..067599f 100644 --- a/conf/edge.conf.json +++ b/conf/edge.conf.json @@ -1,4 +1,6 @@ { + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", + "conf_version": "v0.1.0", "rtmp": { "enable": true, "addr": ":1945", @@ -19,7 +21,7 @@ "out_path": "/tmp/lal/edge/", "fragment_duration_ms": 3000, "fragment_num": 6, - "cleanup_flag": true + "cleanup_mode": 1 }, "httpts": { "enable": true, diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index c47ad6b..b6dd260 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -1,4 +1,6 @@ { + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", + "conf_version": "v0.1.0", "rtmp": { "enable": true, "addr": ":1935", @@ -19,7 +21,7 @@ "out_path": "/tmp/lal/hls/", "fragment_duration_ms": 3000, "fragment_num": 6, - "cleanup_flag": true + "cleanup_mode": 1 }, "httpts": { "enable": true, diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index c47ad6b..b6dd260 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -1,4 +1,6 @@ { + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", + "conf_version": "v0.1.0", "rtmp": { "enable": true, "addr": ":1935", @@ -19,7 +21,7 @@ "out_path": "/tmp/lal/hls/", "fragment_duration_ms": 3000, "fragment_num": 6, - "cleanup_flag": true + "cleanup_mode": 1 }, "httpts": { "enable": true, diff --git a/conf/node2.conf.json b/conf/node2.conf.json index e91b5b2..f78f8f1 100644 --- a/conf/node2.conf.json +++ b/conf/node2.conf.json @@ -1,4 +1,6 @@ { + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", + "conf_version": "v0.1.0", "rtmp": { "enable": true, "addr": ":1955", diff --git a/conf/onlyrtmp.conf.json b/conf/onlyrtmp.conf.json index d3b9e72..3cf42c9 100644 --- a/conf/onlyrtmp.conf.json +++ b/conf/onlyrtmp.conf.json @@ -1,4 +1,6 @@ { + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", + "conf_version": "v0.1.0", "rtmp": { "enable": true, "addr": ":1935", @@ -19,7 +21,7 @@ "out_path": "/tmp/lal/hls/", "fragment_duration_ms": 3000, "fragment_num": 6, - "cleanup_flag": true + "cleanup_mode": 1 }, "httpts": { "enable": false, diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index 91beab5..554d64a 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -34,11 +34,26 @@ type MuxerObserver interface { type MuxerConfig struct { Enable bool `json:"enable"` // 如果false,说明hls功能没开,也即不写磁盘,但是MuxerObserver依然会回调 - OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录,注意,末尾需已'/'结束 + OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录,注意,末尾需以'/'结束 FragmentDurationMS int `json:"fragment_duration_ms"` FragmentNum int `json:"fragment_num"` + + // hls文件清理模式: + // 0 不删除m3u8+ts文件,可用于录制等场景 + // 1 在输入流结束后删除m3u8+ts文件 + // 注意,确切的删除时间是推流结束后的 * * 2的时间点 + // 推迟一小段时间删除,是为了避免输入流刚结束,hls的拉流端还没有拉取完 + // 2 推流过程中,持续删除过期的ts文件,只保留最近的 * 2个左右的ts文件 + // TODO chef: lalserver的模式1的逻辑是在上层做的,应该重构到hls模块中 + CleanupMode int `json:"cleanup_mode"` } +const ( + CleanupModeNever = 0 + CleanupModeInTheEnd = 1 + CleanupModeASAP = 2 +) + type Muxer struct { UniqueKey string @@ -58,9 +73,9 @@ type Muxer struct { audioCC uint8 fragTS uint64 // 新建立fragment时的时间戳,毫秒 * 90 - nfrags int // 大序号,增长到winfrags后,就增长frag + nfrags int // 大序号,增长到config.FragmentNum后,就增长frag frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 - frags []fragmentInfo // TS文件的环形队列,记录TS的信息,比如写M3U8文件时要用 2 * winfrags + 1 + frags []fragmentInfo // TS文件的固定大小环形队列,记录TS的信息 recordMaxFragDuration float64 streamer *Streamer @@ -82,7 +97,7 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) recordPlaylistFilename := getRecordM3U8Filename(op, streamName) recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename) - frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1 + frags := make([]fragmentInfo, 2*config.FragmentNum+1) m := &Muxer{ UniqueKey: uk, streamName: streamName, @@ -284,11 +299,29 @@ func (m *Muxer) closeFragment(isLast bool) error { } m.opened = false - //更新序号,为下个分片准备好 + + // 更新序号,为下个分片做准备 + // 注意,后面getFrag和getCurrFrag的调用,都依赖该处 m.incrFrag() m.writePlaylist(isLast) - m.writeRecordPlaylist(isLast) + + if m.config.CleanupMode == CleanupModeNever || m.config.CleanupMode == CleanupModeInTheEnd { + m.writeRecordPlaylist(isLast) + } + + if m.config.CleanupMode == CleanupModeASAP { + // 删除过期文件 + // 注意,此处获取的是环形队列该位置的上一轮残留下的信息 + // + frag := m.getCurrFrag() + if frag.filename != "" { + filenameWithPath := getTSFilenameWithPath(m.outPath, frag.filename) + if err := os.Remove(filenameWithPath); err != nil { + nazalog.Warnf("[%s] remove stale fragment file failed. filename=%s, err=%+v", m.UniqueKey, filenameWithPath, err) + } + } + } return nil } @@ -298,6 +331,8 @@ func (m *Muxer) writeRecordPlaylist(isLast bool) { return } + // 找出整个直播流从开始到结束最大的分片时长 + // 注意,由于前面已经incr过了,所以这里-1获取 //frag := m.getCurrFrag() currFrag := m.getFrag(m.nfrags - 1) if currFrag.duration > m.recordMaxFragDuration { diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index c88969f..20eabd6 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -10,6 +10,7 @@ package innertest import ( "bytes" + "encoding/json" "fmt" "io" "io/ioutil" @@ -77,8 +78,11 @@ func InnerTestEntry(t *testing.T) { go logic.Entry(confFile) time.Sleep(200 * time.Millisecond) - config, err := logic.LoadConf(confFile) - assert.Equal(t, nil, err) + var config logic.Config + rawContent, err := ioutil.ReadFile(confFile) + nazalog.Assert(nil, err) + err = json.Unmarshal(rawContent, &config) + nazalog.Assert(nil, err) _ = os.RemoveAll(config.HLSConfig.OutPath) diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 3e26071..f2a8ad0 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -9,19 +9,16 @@ package logic import ( - "encoding/json" - "io/ioutil" - "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/naza/pkg/nazajson" "github.com/q191201771/naza/pkg/nazalog" ) -const ConfigVersion = "0.0.1" +const ConfVersion = "v0.1.0" type Config struct { + ConfVersion string `json:"conf_version"` RTMPConfig RTMPConfig `json:"rtmp"` HTTPFLVConfig HTTPFLVConfig `json:"httpflv"` HLSConfig HLSConfig `json:"hls"` @@ -56,7 +53,6 @@ type HTTPTSConfig struct { type HLSConfig struct { SubListenAddr string `json:"sub_listen_addr"` hls.MuxerConfig - CleanupFlag bool `json:"cleanup_flag"` } type RTSPConfig struct { @@ -95,70 +91,3 @@ type PProfConfig struct { Enable bool `json:"enable"` Addr string `json:"addr"` } - -func LoadConf(confFile string) (*Config, error) { - var config Config - rawContent, err := ioutil.ReadFile(confFile) - if err != nil { - return nil, err - } - if err = json.Unmarshal(rawContent, &config); err != nil { - return nil, err - } - - j, err := nazajson.New(rawContent) - if err != nil { - return nil, err - } - - // 检查一级配置项 - keyFieldList := []string{ - "rtmp", - "httpflv", - "hls", - "httpts", - "rtsp", - "relay_push", - "relay_pull", - "http_api", - "http_notify", - "pprof", - "log", - } - for _, kf := range keyFieldList { - if !j.Exist(kf) { - nazalog.Warnf("missing config item %s", kf) - } - } - - // 配置不存在时,设置默认值 - if !j.Exist("log.level") { - config.LogConfig.Level = nazalog.LevelDebug - } - if !j.Exist("log.filename") { - config.LogConfig.Filename = "./logs/lalserver.log" - } - if !j.Exist("log.is_to_stdout") { - config.LogConfig.IsToStdout = true - } - if !j.Exist("log.is_rotate_daily") { - config.LogConfig.IsRotateDaily = true - } - if !j.Exist("log.short_file_flag") { - config.LogConfig.ShortFileFlag = true - } - if !j.Exist("log.timestamp_flag") { - config.LogConfig.TimestampFlag = true - } - if !j.Exist("log.timestamp_with_ms_flag") { - config.LogConfig.TimestampWithMSFlag = true - } - if !j.Exist("log.level_flag") { - config.LogConfig.LevelFlag = true - } - if !j.Exist("log.assert_behavior") { - config.LogConfig.AssertBehavior = nazalog.AssertError - } - - return &config, nil -} diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go index 9b26418..c42c095 100644 --- a/pkg/logic/entry.go +++ b/pkg/logic/entry.go @@ -9,10 +9,16 @@ package logic import ( + "encoding/json" "fmt" + "io/ioutil" "net/http" _ "net/http/pprof" "os" + "strings" + + "github.com/q191201771/lal/pkg/hls" + "github.com/q191201771/naza/pkg/nazajson" "github.com/q191201771/lal/pkg/base" @@ -27,8 +33,8 @@ var ( ) func Entry(confFile string) { - config = loadConf(confFile) - initLog(config.LogConfig) + LoadConfAndInitLog(confFile) + nazalog.Infof("args: %s", strings.Join(os.Args, " ")) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("version: %s", base.LALFullInfo) nazalog.Infof("github: %s", base.LALGithubSite) @@ -43,36 +49,129 @@ func Entry(confFile string) { sm.Dispose() }) - sm.RunLoop() + err := sm.RunLoop() + nazalog.Errorf("server manager loop break. err=%+v", err) } func Dispose() { sm.Dispose() } -func loadConf(confFile string) *Config { - config, err := LoadConf(confFile) +func LoadConfAndInitLog(confFile string) *Config { + // 读取配置文件并解析原始内容 + rawContent, err := ioutil.ReadFile(confFile) if err != nil { - nazalog.Errorf("load conf failed. file=%s err=%+v", confFile, err) - os.Exit(1) + _, _ = fmt.Fprintf(os.Stderr, "read conf file failed. file=%s err=%+v", confFile, err) + base.OSExitAndWaitPressIfWindows(1) + } + if err = json.Unmarshal(rawContent, &config); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "unmarshal conf file failed. file=%s err=%+v", confFile, err) + base.OSExitAndWaitPressIfWindows(1) + } + j, err := nazajson.New(rawContent) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "nazajson unmarshal conf file failed. file=%s err=%+v", confFile, err) + base.OSExitAndWaitPressIfWindows(1) } - nazalog.Infof("load conf file succ. file=%s content=%+v", confFile, config) - return config -} -func initLog(opt nazalog.Option) { + // 初始化日志,注意,这一步尽量提前,使得后续的日志内容按我们的日志配置输出 + // 日志配置项不存在时,设置默认值 + if !j.Exist("log.level") { + config.LogConfig.Level = nazalog.LevelDebug + } + if !j.Exist("log.filename") { + config.LogConfig.Filename = "./logs/lalserver.log" + } + if !j.Exist("log.is_to_stdout") { + config.LogConfig.IsToStdout = true + } + if !j.Exist("log.is_rotate_daily") { + config.LogConfig.IsRotateDaily = true + } + if !j.Exist("log.short_file_flag") { + config.LogConfig.ShortFileFlag = true + } + if !j.Exist("log.timestamp_flag") { + config.LogConfig.TimestampFlag = true + } + if !j.Exist("log.timestamp_with_ms_flag") { + config.LogConfig.TimestampWithMSFlag = true + } + if !j.Exist("log.level_flag") { + config.LogConfig.LevelFlag = true + } + if !j.Exist("log.assert_behavior") { + config.LogConfig.AssertBehavior = nazalog.AssertError + } if err := nazalog.Init(func(option *nazalog.Option) { - *option = opt + *option = config.LogConfig }); err != nil { _, _ = fmt.Fprintf(os.Stderr, "initial log failed. err=%+v\n", err) - os.Exit(1) + base.OSExitAndWaitPressIfWindows(1) } nazalog.Info("initial log succ.") + + // 打印Logo + nazalog.Info(` + __ ___ __ + / / / | / / + / / / /| | / / + / /___/ ___ |/ /___ +/_____/_/ |_/_____/ +`) + + // 检查配置版本号是否匹配 + if config.ConfVersion != ConfVersion { + nazalog.Warnf("config version invalid. conf version of lalserver=%s, conf version of config file=%s", + ConfVersion, config.ConfVersion) + } + + // 检查一级配置项 + keyFieldList := []string{ + "rtmp", + "httpflv", + "hls", + "httpts", + "rtsp", + "relay_push", + "relay_pull", + "http_api", + "http_notify", + "pprof", + "log", + } + for _, kf := range keyFieldList { + if !j.Exist(kf) { + nazalog.Warnf("missing config item %s", kf) + } + } + + // 配置不存在时,设置默认值 + if !j.Exist("hls.cleanup_mode") { + const defaultMode = hls.CleanupModeInTheEnd + nazalog.Warnf("config hls.cleanup_mode not exist. default is %d", defaultMode) + config.HLSConfig.CleanupMode = defaultMode + } + + // 把配置文件原始内容中的换行去掉,使得打印日志时紧凑一些 + lines := strings.Split(string(rawContent), "\n") + if len(lines) == 1 { + lines = strings.Split(string(rawContent), "\r\n") + } + var tlines []string + for _, l := range lines { + tlines = append(tlines, strings.TrimSpace(l)) + } + compactRawContent := strings.Join(tlines, " ") + nazalog.Infof("load conf file succ. filename=%s, raw content=%s parsed=%+v", confFile, compactRawContent, config) + + return config } func runWebPProf(addr string) { nazalog.Infof("start web pprof listen. addr=%s", addr) + //nazalog.Warn("start fgprof.") //http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler()) if err := http.ListenAndServe(addr, nil); err != nil { diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 43a8bcb..ded404d 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -657,10 +657,7 @@ func (group *Group) delRTSPPubSession(session *rtsp.PubSession) { group.rtspPubSession = nil group.delIn() } -func (group *Group) delRTSPSubSession(session *rtsp.SubSession) { - nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) - delete(group.rtspSubSessionSet, session) -} + func (group *Group) delRTMPPullSession(session *rtmp.PullSession) { nazalog.Debugf("[%s] [%s] del rtmp PullSession from group.", group.UniqueKey, session.UniqueKey()) @@ -684,6 +681,11 @@ func (group *Group) delHTTPTSSubSession(session *httpts.SubSession) { delete(group.httptsSubSessionSet, session) } +func (group *Group) delRTSPSubSession(session *rtsp.SubSession) { + nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey()) + delete(group.rtspSubSessionSet, session) +} + // TODO chef: 目前相当于其他类型往rtmp.AVMsg转了,考虑统一往一个通用类型转 // @param msg 调用结束后,内部不持有msg.Payload内存块 func (group *Group) broadcastRTMP(msg base.RTMPMsg) { @@ -1008,7 +1010,8 @@ func (group *Group) disposeHLSMuxer() { group.hlsMuxer.Dispose() // 添加延时任务,删除HLS文件 - if config.HLSConfig.Enable && config.HLSConfig.CleanupFlag { + if config.HLSConfig.Enable && + (config.HLSConfig.CleanupMode == hls.CleanupModeInTheEnd || config.HLSConfig.CleanupMode == hls.CleanupModeASAP) { defertaskthread.Go( config.HLSConfig.FragmentDurationMS*config.HLSConfig.FragmentNum*2, func(param ...interface{}) { diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index ebba6aa..434b54b 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -10,7 +10,6 @@ package logic import ( "fmt" - "os" "sync" "time" @@ -66,13 +65,12 @@ func NewServerManager() *ServerManager { return m } -func (sm *ServerManager) RunLoop() { +func (sm *ServerManager) RunLoop() error { httpNotify.OnServerStart() if sm.rtmpServer != nil { if err := sm.rtmpServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.rtmpServer.RunLoop(); err != nil { @@ -83,8 +81,7 @@ func (sm *ServerManager) RunLoop() { if sm.httpflvServer != nil { if err := sm.httpflvServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.httpflvServer.RunLoop(); err != nil { @@ -95,8 +92,7 @@ func (sm *ServerManager) RunLoop() { if sm.httptsServer != nil { if err := sm.httptsServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.httptsServer.RunLoop(); err != nil { @@ -107,8 +103,7 @@ func (sm *ServerManager) RunLoop() { if sm.hlsServer != nil { if err := sm.hlsServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.hlsServer.RunLoop(); err != nil { @@ -119,8 +114,7 @@ func (sm *ServerManager) RunLoop() { if sm.rtspServer != nil { if err := sm.rtspServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.rtspServer.RunLoop(); err != nil { @@ -131,8 +125,7 @@ func (sm *ServerManager) RunLoop() { if sm.httpAPIServer != nil { if err := sm.httpAPIServer.Listen(); err != nil { - nazalog.Error(err) - os.Exit(1) + return err } go func() { if err := sm.httpAPIServer.Runloop(); err != nil { @@ -153,7 +146,7 @@ func (sm *ServerManager) RunLoop() { for { select { case <-sm.exitChan: - return + return nil case <-t.C: count++ @@ -178,6 +171,8 @@ func (sm *ServerManager) RunLoop() { } } } + + // never reach here } func (sm *ServerManager) Dispose() { diff --git a/pkg/rtprtcp/rtp.go b/pkg/rtprtcp/rtp.go index 3a6eba3..58d254c 100644 --- a/pkg/rtprtcp/rtp.go +++ b/pkg/rtprtcp/rtp.go @@ -123,6 +123,9 @@ func ParseRTPPacket(b []byte) (pkt RTPPacket, err error) { } // 比较序号的值,内部处理序号翻转问题,见单元测试中的例子 +// @return 0 a和b相等 +// 1 a大于b +// -1 a小于b func CompareSeq(a, b uint16) int { if a == b { return 0 diff --git a/pkg/rtprtcp/rtp_unpack_container.go b/pkg/rtprtcp/rtp_unpack_container.go new file mode 100644 index 0000000..96b8317 --- /dev/null +++ b/pkg/rtprtcp/rtp_unpack_container.go @@ -0,0 +1,150 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtprtcp + +type RTPPacketListItem struct { + packet RTPPacket + next *RTPPacketListItem +} + +type RTPPacketList struct { + head RTPPacketListItem // 哨兵,自身不存放rtp包,第一个rtp包存在在head.next中 + size int // 实际元素个数 +} + +type RTPUnpackContainer struct { + maxSize int + + unpackerProtocol IRTPUnpackerProtocol + + list RTPPacketList + unpackedFlag bool // 是否成功合成过 + unpackedSeq uint16 // 成功合成的最后一个seq号 +} + +func NewRTPUnpackContainer(maxSize int, unpackerProtocol IRTPUnpackerProtocol) *RTPUnpackContainer { + return &RTPUnpackContainer{ + maxSize: maxSize, + unpackerProtocol: unpackerProtocol, + } +} + +// 输入收到的rtp包 +func (r *RTPUnpackContainer) Feed(pkt RTPPacket) { + // 过期的包 + if r.isStale(pkt.Header.Seq) { + return + } + + // 计算位置 + r.unpackerProtocol.CalcPositionIfNeeded(&pkt) + // 根据序号插入有序链表 + r.insert(pkt) + + // 尽可能多的合成顺序的帧 + count := 0 + for { + if !r.tryUnpackOneSequential() { + break + } + count++ + } + + // 合成顺序的帧成功了,直接返回 + if count > 0 { + return + } + + // 缓存达到最大值 + if r.list.size > r.maxSize { + // 尝试合成一帧发生跳跃的帧 + packed := r.tryUnpackOne() + + if !packed { + // 合成失败了,丢弃一包过期数据 + r.list.head.next = r.list.head.next.next + r.list.size-- + } else { + // 合成成功了,再次尝试,尽可能多的合成顺序的帧 + for { + if !r.tryUnpackOneSequential() { + break + } + } + } + } +} + +// 检查rtp包是否已经过期 +// +// @return true 表示过期 +// false 没过期 +// +func (r *RTPUnpackContainer) isStale(seq uint16) bool { + // 从来没有合成成功过 + if !r.unpackedFlag { + return false + } + // 序号太小 + return CompareSeq(seq, r.unpackedSeq) <= 0 +} + +// 将rtp包按seq排序插入队列中 +func (r *RTPUnpackContainer) insert(pkt RTPPacket) { + r.list.size++ + + p := &r.list.head + for ; p.next != nil; p = p.next { + res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq) + switch res { + case 0: + return + case 1: + // noop + case -1: + item := &RTPPacketListItem{ + packet: pkt, + next: p.next, + } + p.next = item + return + } + } + + item := &RTPPacketListItem{ + packet: pkt, + next: p.next, + } + p.next = item +} + +// 从队列头部,尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的 +func (r *RTPUnpackContainer) tryUnpackOneSequential() bool { + if r.unpackedFlag { + first := r.list.head.next + if first == nil { + return false + } + if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 { + return false + } + } + + return r.tryUnpackOne() +} + +// 从队列头部,尝试合成一个完整的帧。不保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的 +func (r *RTPUnpackContainer) tryUnpackOne() bool { + unpackedFlag, unpackedSeq := r.unpackerProtocol.TryUnpackOne(&r.list) + if unpackedFlag { + r.unpackedFlag = unpackedFlag + r.unpackedSeq = unpackedSeq + } + return unpackedFlag +} diff --git a/pkg/rtprtcp/rtp_unpacker.go b/pkg/rtprtcp/rtp_unpacker.go index 77bdbf9..6531eb0 100644 --- a/pkg/rtprtcp/rtp_unpacker.go +++ b/pkg/rtprtcp/rtp_unpacker.go @@ -10,31 +10,41 @@ package rtprtcp import ( "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazalog" ) -// 传入RTP包,合成帧数据,并回调。 -// 一路音频或一路视频各对应一个对象。 -// 目前支持AVC,HEVC和AAC MPEG4-GENERIC/44100/2 +// 传入RTP包,合成帧数据,并回调返回 +// 一路音频或一路视频各对应一个对象 -type RTPPacketListItem struct { - packet RTPPacket - next *RTPPacketListItem -} +var ( + _ IRTPUnpacker = &RTPUnpackContainer{} + _ IRTPUnpackContainer = &RTPUnpackContainer{} + _ IRTPUnpackerProtocol = &RTPUnpackerAAC{} + _ IRTPUnpackerProtocol = &RTPUnpackerAVCHEVC{} +) -type RTPPacketList struct { - head RTPPacketListItem // 哨兵,自身不存放rtp包,第一个rtp包存在在head.next中 - size int // 实际元素个数 +type IRTPUnpacker interface { + IRTPUnpackContainer } -type RTPUnpacker struct { - payloadType base.AVPacketPT - clockRate int - maxSize int - onAVPacket OnAVPacket +type IRTPUnpackContainer interface { + Feed(pkt RTPPacket) +} - list RTPPacketList - unpackedFlag bool - unpackedSeq uint16 +type IRTPUnpackerProtocol interface { + // 计算rtp包处于帧中的位置 + CalcPositionIfNeeded(pkt *RTPPacket) + + // 尝试合成一个完整帧 + // + // 从当前队列的第一个包开始合成 + // 如果一个rtp包对应一个完整帧,则合成一帧 + // 如果一个rtp包对应多个完整帧,则合成多帧 + // 如果多个rtp包对应一个完整帧,则尝试合成一帧 + // + // @return unpackedFlag 本次调用是否成功合成 + // @return unpackedSeq 如果成功合成,合成使用的最后一个seq号;如果失败,则为0 + TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) } // @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳,单位毫秒 @@ -46,138 +56,18 @@ type RTPUnpacker struct { // AVC或者HEVC是新申请的内存块,回调结束后,内部不再使用该内存块 type OnAVPacket func(pkt base.AVPacket) -func NewRTPUnpacker(payloadType base.AVPacketPT, clockRate int, maxSize int, onAVPacket OnAVPacket) *RTPUnpacker { - return &RTPUnpacker{ - payloadType: payloadType, - clockRate: clockRate, - maxSize: maxSize, - onAVPacket: onAVPacket, - } -} - -// 输入收到的rtp包 -func (r *RTPUnpacker) Feed(pkt RTPPacket) { - if r.isStale(pkt.Header.Seq) { - return - } - - r.calcPositionIfNeeded(&pkt) - r.insert(pkt) - - // 尽可能多的合成顺序的帧 - count := 0 - for { - if !r.unpackOneSequential() { - break - } - count++ - } - - // 合成顺序的帧成功了,直接返回 - if count > 0 { - return - } - - // 缓存达到最大值 - if r.list.size > r.maxSize { - // 尝试合成一帧发生跳跃的帧 - if !r.unpackOne() { - - // 合成失败了,丢弃过期数据 - r.list.head.next = r.list.head.next.next - r.list.size-- - } - - // 再次尝试,尽可能多的合成顺序的帧 - for { - if !r.unpackOneSequential() { - break - } - } - } -} - -// 检查rtp包是否已经过期 -// -// @return true 表示过期 -// false 没过期 -// -func (r *RTPUnpacker) isStale(seq uint16) bool { - if !r.unpackedFlag { - return false - } - return CompareSeq(seq, r.unpackedSeq) <= 0 -} - -// 计算rtp包处于帧中的位置 -func (r *RTPUnpacker) calcPositionIfNeeded(pkt *RTPPacket) { - switch r.payloadType { - case base.AVPacketPTAVC: - calcPositionIfNeededAVC(pkt) - case base.AVPacketPTHEVC: - calcPositionIfNeededHEVC(pkt) +// 目前支持AVC,HEVC和AAC MPEG4-GENERIC/44100/2,业务方也可以自己实现IRTPUnpackerProtocol,甚至是IRTPUnpackContainer +func DefaultRTPUnpackerFactory(payloadType base.AVPacketPT, clockRate int, maxSize int, onAVPacket OnAVPacket) IRTPUnpacker { + var protocol IRTPUnpackerProtocol + switch payloadType { case base.AVPacketPTAAC: - // noop - break - default: - // can't reach here - } -} - -// 将rtp包按seq排序插入队列中 -func (r *RTPUnpacker) insert(pkt RTPPacket) { - r.list.size++ - - p := &r.list.head - for ; p.next != nil; p = p.next { - res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq) - switch res { - case 0: - return - case 1: - // noop - case -1: - item := &RTPPacketListItem{ - packet: pkt, - next: p.next, - } - p.next = item - return - } - } - - item := &RTPPacketListItem{ - packet: pkt, - next: p.next, - } - p.next = item -} - -// 从队列头部,尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的 -func (r *RTPUnpacker) unpackOneSequential() bool { - if r.unpackedFlag { - first := r.list.head.next - if first == nil { - return false - } - if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 { - return false - } - } - - return r.unpackOne() -} - -// 从队列头部,尝试合成一个完整的帧。不保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的 -func (r *RTPUnpacker) unpackOne() bool { - switch r.payloadType { - case base.AVPacketPTAAC: - return r.unpackOneAAC() + protocol = NewRTPUnpackerAAC(payloadType, clockRate, onAVPacket) case base.AVPacketPTAVC: fallthrough case base.AVPacketPTHEVC: - return r.unpackOneAVCOrHEVC() + protocol = NewRTPUnpackerAVCHEVC(payloadType, clockRate, onAVPacket) + default: + nazalog.Fatalf("payload type not support yet. payloadType=%d", payloadType) } - - return false + return NewRTPUnpackContainer(maxSize, protocol) } diff --git a/pkg/rtprtcp/rtp_unpacker_aac.go b/pkg/rtprtcp/rtp_unpacker_aac.go index 4da3690..c0de3b2 100644 --- a/pkg/rtprtcp/rtp_unpacker_aac.go +++ b/pkg/rtprtcp/rtp_unpacker_aac.go @@ -8,18 +8,30 @@ package rtprtcp -import "github.com/q191201771/lal/pkg/base" +import ( + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazalog" +) -// AAC格式的流,尝试合成一个完整的帧 -func (r *RTPUnpacker) unpackOneAAC() bool { - first := r.list.head.next - if first == nil { - return false +type RTPUnpackerAAC struct { + payloadType base.AVPacketPT + clockRate int + onAVPacket OnAVPacket +} + +func NewRTPUnpackerAAC(payloadType base.AVPacketPT, clockRate int, onAVPacket OnAVPacket) *RTPUnpackerAAC { + return &RTPUnpackerAAC{ + payloadType: payloadType, + clockRate: clockRate, + onAVPacket: onAVPacket, } +} - // TODO chef: - // 2. 只处理了一个RTP包含多个音频包的情况,没有处理一个音频包跨越多个RTP包的情况(是否有这种情况) +func (unpacker *RTPUnpackerAAC) CalcPositionIfNeeded(pkt *RTPPacket) { + // noop +} +func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) { // rfc3640 2.11. Global Structure of Payload Format // // +---------+-----------+-----------+---------------+ @@ -38,46 +50,161 @@ func (r *RTPUnpacker) unpackOneAAC() bool { // // rfc3640 3.3.6. High Bit-rate AAC // + // rtp_parse_mp4_au() + // + // + // 3.2.3.1. Fragmentation + // + // A packet SHALL carry either one or more complete Access Units, or a + // single fragment of an Access Unit. Fragments of the same Access Unit + // have the same time stamp but different RTP sequence numbers. The + // marker bit in the RTP header is 1 on the last fragment of an Access + // Unit, and 0 on all other fragments. + // - b := first.packet.Raw[first.packet.Header.payloadOffset:] + p := list.head.next // first + if p == nil { + return false, 0 + } + b := p.packet.Raw[p.packet.Header.payloadOffset:] //nazalog.Debugf("%d, %d, %s", len(pkt.Raw), pkt.Header.timestamp, hex.Dump(b)) - // AU Header Section - var auHeaderLength uint32 - auHeaderLength = uint32(b[0])<<8 + uint32(b[1]) - auHeaderLength = (auHeaderLength + 7) / 8 - //nazalog.Debugf("auHeaderLength=%d", auHeaderLength) - - // no Auxiliary Section - - pauh := uint32(2) // AU Header pos - pau := uint32(2) + auHeaderLength // AU pos - auNum := uint32(auHeaderLength) / 2 - for i := uint32(0); i < auNum; i++ { - var auSize uint32 - auSize = uint32(b[pauh])<<8 | uint32(b[pauh+1]&0xF8) // 13bit - auSize /= 8 + aus := parseAU(b) - //auIndex := b[pauh+1] & 0x7 + if len(aus) == 1 { + if aus[0].size <= uint32(len(b[aus[0].pos:])) { + // one complete access unit + var outPkt base.AVPacket + outPkt.PayloadType = unpacker.payloadType + outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + outPkt.Payload = b[aus[0].pos : aus[0].pos+aus[0].size] + unpacker.onAVPacket(outPkt) + + list.head.next = p.next + list.size-- + return true, p.packet.Header.Seq + } + + // fragmented + // 注意,这里我们参考size和rtp包头中的timestamp,不参考rtp包头中的mark位 + + totalSize := aus[0].size + timestamp := p.packet.Header.Timestamp + + var as [][]byte + as = append(as, b[aus[0].pos:]) + cacheSize := uint32(len(b[aus[0].pos:])) + + seq := p.packet.Header.Seq + p = p.next + packetCount := 0 + for { + packetCount++ + if p == nil { + return false, 0 + } + if SubSeq(p.packet.Header.Seq, seq) != 1 { + return false, 0 + } + if p.packet.Header.Timestamp != timestamp { + nazalog.Errorf("fragments of the same access shall have the same timestamp. first=%d, curr=%d", + timestamp, p.packet.Header.Timestamp) + return false, 0 + } + + b = p.packet.Raw[p.packet.Header.payloadOffset:] + aus := parseAU(b) + if len(aus) != 1 { + nazalog.Errorf("shall be a single fragment. len(aus)=%d", len(aus)) + return false, 0 + } + if aus[0].size != totalSize { + nazalog.Errorf("fragments of the same access shall have the same size. first=%d, curr=%d", + totalSize, aus[0].size) + return false, 0 + } - // raw AAC frame - // pau, auSize - //nazalog.Debugf("%d %d %s", auSize, auIndex, hex.Dump(b[pau:pau+auSize])) + cacheSize += uint32(len(b[aus[0].pos:])) + seq = p.packet.Header.Seq + as = append(as, b[aus[0].pos:]) + if cacheSize < totalSize { + p = p.next + } else if cacheSize == totalSize { + var outPkt base.AVPacket + outPkt.PayloadType = unpacker.payloadType + outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + for _, a := range as { + outPkt.Payload = append(outPkt.Payload, a...) + } + unpacker.onAVPacket(outPkt) + + list.head.next = p.next + list.size -= packetCount + return true, p.packet.Header.Seq + } else { + nazalog.Errorf("cache size bigger then total size. cacheSize=%d, totalSize=%d", + cacheSize, totalSize) + return false, 0 + } + } + // can reach here + } + + // more complete access unit + for i := range aus { var outPkt base.AVPacket - outPkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000) - outPkt.Timestamp += i * uint32((1024*1000)/r.clockRate) - outPkt.Payload = b[pau : pau+auSize] - outPkt.PayloadType = r.payloadType + outPkt.PayloadType = unpacker.payloadType + outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + // TODO chef: 这里1024的含义 + outPkt.Timestamp += uint32(i * (1024 * 1000) / unpacker.clockRate) + outPkt.Payload = b[aus[i].pos : aus[i].pos+aus[i].size] + unpacker.onAVPacket(outPkt) + } - r.onAVPacket(outPkt) + list.head.next = p.next + list.size-- + return true, p.packet.Header.Seq +} + +type au struct { + size uint32 + pos uint32 +} + +func parseAU(b []byte) (ret []au) { + // AU Header Section + var auHeadersLength uint32 + auHeadersLength = uint32(b[0])<<8 + uint32(b[1]) + auHeadersLength = (auHeadersLength + 7) / 8 + + // TODO chef: 这里的2是写死的,正常是外部传入auSize和auIndex所占位数的和 + const auHeaderSize = 2 + nbAUHeaders := uint32(auHeadersLength) / auHeaderSize // 有多少个AU-Header + + pauh := uint32(2) // AU Header pos + pau := uint32(2) + auHeadersLength // AU pos + + for i := uint32(0); i < nbAUHeaders; i++ { + // TODO chef: auSize和auIndex所在的位数是写死的13bit,3bit,标准的做法应该从外部传入,比如从sdp中获取后传入 + auSize := uint32(b[pauh])<<8 | uint32(b[pauh+1]&0xF8) // 13bit + auSize /= 8 + // 注意,fragment时,auIndex并不可靠。见TestAACCase1 + //auIndex := b[pauh+1] & 0x7 + //nazalog.Debugf("~ %d %d", auSize, auIndex) + + ret = append(ret, au{ + size: auSize, + pos: pau, + }) pauh += 2 pau += auSize } - r.unpackedFlag = true - r.unpackedSeq = first.packet.Header.Seq - r.list.head.next = first.next - r.list.size-- - return true + if (nbAUHeaders > 1 && pau != uint32(len(b))) || + (nbAUHeaders == 1 && pau < uint32(len(b))) { + nazalog.Warnf("rtp packet size invalid. nbAUHeaders=%d, pau=%d, len(b)=%d", nbAUHeaders, pau, len(b)) + } + + return } diff --git a/pkg/rtprtcp/rtp_unpacker_avc.go b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go similarity index 60% rename from pkg/rtprtcp/rtp_unpacker_avc.go rename to pkg/rtprtcp/rtp_unpacker_avc_hevc.go index bbe8636..3066768 100644 --- a/pkg/rtprtcp/rtp_unpacker_avc.go +++ b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go @@ -1,4 +1,4 @@ -// Copyright 2020, Chef. All rights reserved. +// Copyright 2021, Chef. All rights reserved. // https://github.com/q191201771/lal // // Use of this source code is governed by a MIT-style license @@ -11,112 +11,59 @@ package rtprtcp import ( "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/nazalog" ) -func calcPositionIfNeededAVC(pkt *RTPPacket) { - b := pkt.Raw[pkt.Header.payloadOffset:] - - // rfc3984 5.3. NAL Unit Octet Usage - // - // +---------------+ - // |0|1|2|3|4|5|6|7| - // +-+-+-+-+-+-+-+-+ - // |F|NRI| Type | - // +---------------+ - - outerNALUType := avc.ParseNALUType(b[0]) - if outerNALUType <= NALUTypeAVCSingleMax { - pkt.positionType = PositionTypeSingle - return - } else if outerNALUType == NALUTypeAVCFUA { - - // rfc3984 5.8. Fragmentation Units (FUs) - // - // 0 1 2 3 - // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | FU indicator | FU header | | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | - // | | - // | FU payload | - // | | - // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | :...OPTIONAL RTP padding | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // - // FU indicator: - // +---------------+ - // |0|1|2|3|4|5|6|7| - // +-+-+-+-+-+-+-+-+ - // |F|NRI| Type | - // +---------------+ - // - // Fu header: - // +---------------+ - // |0|1|2|3|4|5|6|7| - // +-+-+-+-+-+-+-+-+ - // |S|E|R| Type | - // +---------------+ - - fuIndicator := b[0] - _ = fuIndicator - fuHeader := b[1] - - startCode := (fuHeader & 0x80) != 0 - endCode := (fuHeader & 0x40) != 0 - - if startCode { - pkt.positionType = PositionTypeFUAStart - return - } - - if endCode { - pkt.positionType = PositionTypeFUAEnd - return - } +type RTPUnpackerAVCHEVC struct { + payloadType base.AVPacketPT + clockRate int + onAVPacket OnAVPacket +} - pkt.positionType = PositionTypeFUAMiddle - return - } else if outerNALUType == NALUTypeAVCSTAPA { - pkt.positionType = PositionTypeSTAPA - } else { - nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType) +func NewRTPUnpackerAVCHEVC(payloadType base.AVPacketPT, clockRate int, onAVPacket OnAVPacket) *RTPUnpackerAVCHEVC { + return &RTPUnpackerAVCHEVC{ + payloadType: payloadType, + clockRate: clockRate, + onAVPacket: onAVPacket, } +} - return +func (unpacker *RTPUnpackerAVCHEVC) CalcPositionIfNeeded(pkt *RTPPacket) { + switch unpacker.payloadType { + case base.AVPacketPTAVC: + calcPositionIfNeededAVC(pkt) + case base.AVPacketPTHEVC: + calcPositionIfNeededHEVC(pkt) + } } -// AVC或HEVC格式的流,尝试合成一个完整的帧 -func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { - first := r.list.head.next +func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) { + first := list.head.next if first == nil { - return false + return false, 0 } switch first.packet.positionType { case PositionTypeSingle: var pkt base.AVPacket - pkt.PayloadType = r.payloadType - pkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000) + pkt.PayloadType = unpacker.payloadType + pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) pkt.Payload = make([]byte, len(first.packet.Raw)-int(first.packet.Header.payloadOffset)+4) bele.BEPutUint32(pkt.Payload, uint32(len(first.packet.Raw))-first.packet.Header.payloadOffset) copy(pkt.Payload[4:], first.packet.Raw[first.packet.Header.payloadOffset:]) - r.unpackedFlag = true - r.unpackedSeq = first.packet.Header.Seq - r.list.head.next = first.next - r.list.size-- - r.onAVPacket(pkt) - - return true + list.head.next = first.next + list.size-- + unpacker.onAVPacket(pkt) + return true, first.packet.Header.Seq case PositionTypeSTAPA: var pkt base.AVPacket - pkt.PayloadType = r.payloadType - pkt.Timestamp = first.packet.Header.Timestamp / uint32(r.clockRate/1000) + pkt.PayloadType = unpacker.payloadType + pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) // 跳过首字节,并且将多nalu前的2字节长度,替换成4字节长度 buf := first.packet.Raw[first.packet.Header.payloadOffset+1:] @@ -126,7 +73,7 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { for i := 0; i != len(buf); { if len(buf)-i < 2 { nazalog.Errorf("invalid STAP-A packet.") - return false + return false, 0 } naluSize := int(bele.BEUint16(buf[i:])) totalSize += 4 + naluSize @@ -143,23 +90,21 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { i += 2 + naluSize } - r.unpackedFlag = true - r.unpackedSeq = first.packet.Header.Seq - r.list.head.next = first.next - r.list.size-- - r.onAVPacket(pkt) + list.head.next = first.next + list.size-- + unpacker.onAVPacket(pkt) - return true + return true, first.packet.Header.Seq case PositionTypeFUAStart: prev := first p := first.next for { if prev == nil || p == nil { - return false + return false, 0 } if SubSeq(p.packet.Header.Seq, prev.packet.Header.Seq) != 1 { - return false + return false, 0 } if p.packet.positionType == PositionTypeFUAMiddle { @@ -168,12 +113,12 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { continue } else if p.packet.positionType == PositionTypeFUAEnd { var pkt base.AVPacket - pkt.PayloadType = r.payloadType - pkt.Timestamp = p.packet.Header.Timestamp / uint32(r.clockRate/1000) + pkt.PayloadType = unpacker.payloadType + pkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) var naluTypeLen int var naluType []byte - if r.payloadType == base.AVPacketPTAVC { + if unpacker.payloadType == base.AVPacketPTAVC { naluTypeLen = 1 naluType = make([]byte, naluTypeLen) @@ -204,7 +149,7 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { pkt.Payload = make([]byte, totalSize+4+naluTypeLen) bele.BEPutUint32(pkt.Payload, uint32(totalSize+naluTypeLen)) var index int - if r.payloadType == base.AVPacketPTAVC { + if unpacker.payloadType == base.AVPacketPTAVC { pkt.Payload[4] = naluType[0] index = 5 } else { @@ -225,17 +170,15 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { pp = pp.next } - r.unpackedFlag = true - r.unpackedSeq = p.packet.Header.Seq - r.list.head.next = p.next - r.list.size -= packetCount - r.onAVPacket(pkt) + list.head.next = p.next + list.size -= packetCount + unpacker.onAVPacket(pkt) - return true + return true, p.packet.Header.Seq } else { // 不应该出现其他类型 nazalog.Errorf("invalid position type. position=%d", p.packet.positionType) - return false + return false, 0 } } @@ -247,5 +190,150 @@ func (r *RTPUnpacker) unpackOneAVCOrHEVC() bool { nazalog.Errorf("invalid position. pos=%d", first.packet.positionType) } - return false + return false, 0 +} +func calcPositionIfNeededAVC(pkt *RTPPacket) { + b := pkt.Raw[pkt.Header.payloadOffset:] + + // rfc3984 5.3. NAL Unit Octet Usage + // + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |F|NRI| Type | + // +---------------+ + + outerNALUType := avc.ParseNALUType(b[0]) + if outerNALUType <= NALUTypeAVCSingleMax { + pkt.positionType = PositionTypeSingle + return + } else if outerNALUType == NALUTypeAVCFUA { + + // rfc3984 5.8. Fragmentation Units (FUs) + // + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | FU indicator | FU header | | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | + // | | + // | FU payload | + // | | + // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | :...OPTIONAL RTP padding | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // + // FU indicator: + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |F|NRI| Type | + // +---------------+ + // + // Fu header: + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |S|E|R| Type | + // +---------------+ + + fuIndicator := b[0] + _ = fuIndicator + fuHeader := b[1] + + startCode := (fuHeader & 0x80) != 0 + endCode := (fuHeader & 0x40) != 0 + + if startCode { + pkt.positionType = PositionTypeFUAStart + return + } + + if endCode { + pkt.positionType = PositionTypeFUAEnd + return + } + + pkt.positionType = PositionTypeFUAMiddle + return + } else if outerNALUType == NALUTypeAVCSTAPA { + pkt.positionType = PositionTypeSTAPA + } else { + nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType) + } + + return +} + +func calcPositionIfNeededHEVC(pkt *RTPPacket) { + b := pkt.Raw[pkt.Header.payloadOffset:] + + // +---------------+---------------+ + // |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // |F| Type | LayerId | TID | + // +-------------+-----------------+ + + outerNALUType := hevc.ParseNALUType(b[0]) + + switch outerNALUType { + case hevc.NALUTypeVPS: + fallthrough + case hevc.NALUTypeSPS: + fallthrough + case hevc.NALUTypePPS: + fallthrough + case hevc.NALUTypeSEI: + fallthrough + case hevc.NALUTypeSliceTrailR: + fallthrough + case hevc.NALUTypeSliceIDRNLP: + pkt.positionType = PositionTypeSingle + return + case NALUTypeHEVCFUA: + // Figure 1: The Structure of the HEVC NAL Unit Header + + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | PayloadHdr (Type=49) | FU header | DONL (cond) | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| + // | DONL (cond) | | + // |-+-+-+-+-+-+-+-+ | + // | FU payload | + // | | + // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | :...OPTIONAL RTP padding | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + // Figure 9: The Structure of an FU + + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |S|E| FuType | + // +---------------+ + + // Figure 10: The Structure of FU Header + + startCode := (b[2] & 0x80) != 0 + endCode := (b[2] & 0x40) != 0 + + if startCode { + pkt.positionType = PositionTypeFUAStart + return + } + + if endCode { + pkt.positionType = PositionTypeFUAEnd + return + } + + pkt.positionType = PositionTypeFUAMiddle + return + default: + // TODO chef: 没有实现 AP 48 + nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType) + } + } diff --git a/pkg/rtprtcp/rtp_unpacker_hevc.go b/pkg/rtprtcp/rtp_unpacker_hevc.go deleted file mode 100644 index d150dc2..0000000 --- a/pkg/rtprtcp/rtp_unpacker_hevc.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package rtprtcp - -import ( - "github.com/q191201771/lal/pkg/hevc" - "github.com/q191201771/naza/pkg/nazalog" -) - -func calcPositionIfNeededHEVC(pkt *RTPPacket) { - b := pkt.Raw[pkt.Header.payloadOffset:] - - // +---------------+---------------+ - // |0|1|2|3|4|5|6|7|0|1|2|3|4|5|6|7| - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // |F| Type | LayerId | TID | - // +-------------+-----------------+ - - outerNALUType := hevc.ParseNALUType(b[0]) - - switch outerNALUType { - case hevc.NALUTypeVPS: - fallthrough - case hevc.NALUTypeSPS: - fallthrough - case hevc.NALUTypePPS: - fallthrough - case hevc.NALUTypeSEI: - fallthrough - case hevc.NALUTypeSliceTrailR: - fallthrough - case hevc.NALUTypeSliceIDRNLP: - pkt.positionType = PositionTypeSingle - return - case NALUTypeHEVCFUA: - // Figure 1: The Structure of the HEVC NAL Unit Header - - // 0 1 2 3 - // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | PayloadHdr (Type=49) | FU header | DONL (cond) | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| - // | DONL (cond) | | - // |-+-+-+-+-+-+-+-+ | - // | FU payload | - // | | - // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - // | :...OPTIONAL RTP padding | - // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - // Figure 9: The Structure of an FU - - // +---------------+ - // |0|1|2|3|4|5|6|7| - // +-+-+-+-+-+-+-+-+ - // |S|E| FuType | - // +---------------+ - - // Figure 10: The Structure of FU Header - - startCode := (b[2] & 0x80) != 0 - endCode := (b[2] & 0x40) != 0 - - if startCode { - pkt.positionType = PositionTypeFUAStart - return - } - - if endCode { - pkt.positionType = PositionTypeFUAEnd - return - } - - pkt.positionType = PositionTypeFUAMiddle - return - default: - // TODO chef: 没有实现 AP 48 - nazalog.Errorf("unknown nalu type. outerNALUType=%d", outerNALUType) - } - -} - -// hevc rtp包合帧部分见func unpackOneAVCOrHEVC diff --git a/pkg/rtprtcp/rtp_unpacker_test.go b/pkg/rtprtcp/rtp_unpacker_test.go new file mode 100644 index 0000000..abb6c8f --- /dev/null +++ b/pkg/rtprtcp/rtp_unpacker_test.go @@ -0,0 +1,128 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtprtcp + +import ( + "encoding/hex" + "testing" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/assert" +) + +func TestAACCase1(t *testing.T) { + ss := []string{ + "80e10e9a56843e4cf0bdf2fe00102d10214e6c425f74815f92415f94415f924100000114008a004027f313d564a770026fd01203c9cbac420e1aecaa41efb4619391309daa7938b4b905989c5c293c024819e4234eb324934327e3f083c98c2220137bc9b5b2bae5809351710e4bfec0dec11fffe0ab6e568f8357fb74c3461e892fc3a4e22ac512fa73bdc13004b9d73e21c5221387922692939994e38ab1516ea2cfb64df8cffb5468a81e4704f255a5139f7f21d622b944e0b08221916d4ba6e10a31b2a148c38642d1c8b2b12a15681c0f0b589160feb32b43c7ebade413590916a934148ca7915d6250904e5172a3ecc612ab23fd4431b0eef591be52155e4a0ac8564108012689c4df89221892e50d8fca64a747bb2313908211004222ea6a848132e87935978e03845905ccc057c743c7424e64e2338bfe52090d4a5a961f35ec5e14befbc2b3d41f89bdfe949fc2dd40e141b13e397f84f7e1ef43f303df77407bdacc01c2832c3d3df65fe3bfde0001f37f85f4a021f27baf11e4f388fd5ce464337648fc89e6043a0c8268a44f0e024e112910c8b9bc282ee2e5587762b2752c08a422ac99558f904273f858d9de9938ef2502493ae3217c247410eb39395f084a117f6486161103c5cab0881d7c24ffd7123cdcac226bc292c2dc213d04f0b26dd864e6e149c3a4468a48d86124209e172166c5b331e434c8cae4276e191acc2682d692a51210c1ce2254102e048435919442449f2c57bb271364fbb67f1d79364ee004a28c88e75dd53210eef613186531f22908a5253a87e66423fe1bffef7dd95fed588acf815908808fefff2ffef581fe23b2b8e9fdfbd7e3f7071c05d40ec89401efbdf87beee9a9c1eff2d054c0f7ff5febc514514514514514514517d7fe0faf145145111fc0dd8218d9f9d064ee8895cb84ee848064e7593530495f87ffa276e76549a4e6c0e381270c24ea288e0e1d8ad270f0dc243210e413a84b160e085b10a416be12011022b31354b202020771fe376bf6afe387e67fe0fee8087bee102bf415fa0afd0400000e3e3e3fe001f8f8ff53eff60fafab04fe0877121cebad11c16048271174dc251cbdd3da84e0c9ba23dd312b26500020154a269d9c4520270444131665313103f0f7412d40de5c59cfff0f6383440fc588e986f10b35ddbfdb6be780f30eb25f49fd7e479062097adbc044ba3e0c956ef043c31d188a23d453e83ca71ca44df64805f4514916e71c7e760674293094980642012d104ba2cacd260456e2e93a8c1f492739f9dd445aba9241088de157cb3d976e20888a41b34808f3a4ca181528c989641e0a187dc5443494c896e4421711580a84276a12a229662108289464d105b1513f80841836f009c987f86969641eea943c86c475685fb09043ae947ac5d22ff6ff7c1d380a48885c2a7d03905668ec9fd1fcdf7fc700450094d16f034c90003df9240b851764658e101efb8ea83855071d6871c5126a8385556aa0db094e571d9dc7271352b846810c4e24000000000000038e60384c4f09800000021e17eb447ca381218ff036078221ec1e7843a466acc013c0130585c7518330993235855b71158610843210aa9ba05c22ea5984de5e1227e3e4d2dd942a1659f5320232b45bb6c6418df87b313c727fc3f1d170a6ecb0b74e4449209804a3bf270c98518182b79dd74412c24f7e4f45131a889442502df3928282006d0f165c6e4d39059e7404eb1092044c67267817686dc7dbe82281f1d2f65ddaf70e67250c77adbe39d09be32b8f0007d2f905140d3c34fb6784075ce77075dfb069da091017344c80b6b9a36ff1d93c22889d5a7ccaa2073ccf278e1f8e9c38e1c9a8f502b8e9c84938542ae3a7e1313c26240000000000000384c4f1cc4f09890000021f49fc3847d1b76e8613938122e759d2494b9e4634fc812ab747e3132beb075bf23fed67a0943a245082648a4d06e11166a25f110a24c0c993964042ff0a8d24e4108c489514499641221f84c013c06008e0301c2e0038e6009e0327c7326000001c", + "80e10e9b5684424cf0bdf2fe00102ce8217c4445fd4080000000000021e10fe2c047f365f8a423e48fb4c479f78f48f9e7c2647b178548f27f1693ecdec1b1f704b84989705ea8436bb5241b849b83253444f41189a621383b227b2aa4db0721db25369113cd2598864316d24dd1f1cb7c72df0b049c182460e0f8ee4b846115622efa44950683513c2cee38dcecd270f05c7544d4a274d44b2f90272ed9352894b51182c2583a846952e12092a6a2106913834eef824e0d42188a6455049a95c72590a6a232367c71404a9ab843f1c9a4df84e10e40f6838e5a2534e47187e12d71cd07093486821f1d7f1dc6123e3c83ed71c8c43198fe387207390b56b8e49a8ae13c8c6254e490934c8cdc6903e0091f00429d699b5041ef20f3f0a7218c3933e3c9da8dc25a231ad10c65a24fd011895489f0e4ee61c8d5cd1124d25630842ae1ff449e1c84f55608c489c2552106c10b812798918226672119f548e1cf334f99e2139b109c1391cc26a58c46f6289cc2108d9da924112f01ba2bd4f2081495314996e13246278f51291188032e431a4e12e13c7cc211884e428996e12228948915260896731b5314848c511246b16458923844939f9ae10f82a664aa46aa49e3c845292512a90cc61c844b84f0f548444ccb565725d50c89af92363889a7dd5267e912adf22f2127d4217e1910c32724d74867ea24de12318e4a6cd231ad71c3923c620739239c8cd3f0b1b8ecd2734e4669c81ce4ec1c81e3118d378edc2520e4621085c21021ea1493244267904c8424490151c499a254310912a913c82210920bc8049c71e4030f1841e102210f08f9543c2867f081008480424c2193d7ffde103fff7ffdffdc7e1c7e19cf081e103ffce10201fc80019543fc9c2a88843c2872a07f97f6003f93f943f3801fedfc9fb1fecf7f7e00ffb003cffdffec1ff7fd8ffb7e0ffee16717feffeff8200ffb07e7ffedfeffb201fc9ff6ff600000ff60f7fc705ef7fdbfdfdffbc0ff7ff60f7dff700f7ffec1ff7e3a2000000000008bbfa81000000000bd757353a04bb17e6748fdcefa6647ea8fd4223f3a7ab114f2227c33784f63025af4510f7f4e27773a4fe09f4e238fe2e470f9b2649a45f8827c9b9f12d5d5267b533da22439374822fb39d9e4819920b270842306411832099e31139c89e690d963f84a24423b7211309c905f2b1b8e908df2129a8e381201090861b12d906848c80916c8267a247315bb2b85392861acce44f0c81e1103d62038640272610f08d250c24715778e078edb210c39dd5c71e4a3ceceeace8524a571c2e3d7fff091544148e11e41181e3b1f84a79d23118e5e115c77098f15ffe90296704d6323125cbd0e4c864df533b2b3a28843d11280b211e3e3f8f9d55ffdcef1b852902ab8e978e14947449e1cef009c8804433b2a3c81eed121e1287a2928278370812718e46462498564a844cac823403f762741126225c7e3c8c44312b65135baefa5d244e5288c59b812884a55d9149c55138b23c389560d06b217f17fc6e12413ad4ab7a84961271646570925ac8de5607309c55105bbc3082e8115d0e12e13a13bff64e8442214110c4bbeb1189124c0900c093a41380193a2100a32a1484a4117b31f55c7af212a5112ac830bc8d284320c464141394fb3e511293a17ff849efe3a5c7caffd7fffffe63d781353f1fbf1ebfffec7f5bff5c75fc76463f5f1fdd923fd24d11394b3aaff5f959612e224d44bc8cac1ceebfff7ccf3a86739d1fc757e7c1ffecd73a2fff78f561ffe7ff84ce799e271c270872242112138e1091c848e4e3b13843f087e3a4e10e01c2c100027065f0b0785091972f85964a0cbe120f1d8201c2c1e1208108d1803848dc2cbe3b2f848dc7308425c600e39180384c271d97c725871d97c768f1c964a5d1e3b2c3204be3b2f85964e5d100e392c09e0301c7301c2e038e6240385c4f1cc07098000000e13001c72c71c6f096384c4f0b89e396385b01c2e2784c4871db1c771201c7359c270b0e3bc2f8ed700000000000e", + "80610e9c5684464cf0bdf2fe00102db8211a0455fd2003dbd44f3d9b002dd3e3cdef4097d20fbaa47f5b9c188fa9524fb075d2184b84f1fb320cdf93e3be3427dcee120512392e584b07048931a4f0e923073642e6dec4391c3e62ecd4133d32184ab32c421124114e9384e09c72c64fb1c2d1a8cf33442756a91261c963544a49c9cd9ba0909a7256ad119b3493dfc2e089435109782234d5c2d8278db0047619e20d81c2d3233b124605badade62233a5909d4c9406e769c4675b25025811916f8e378ed1e3ad216f177e21528909ef2640907d1201a846fdf22679008654bfc75fc2849bde40c4206792904226be44350811f3f9a7e7f1c092bab228ba4db0c84d41044d24dc571d0929b74922d909a19f833e4f24f791c24327a37926bc9bda45a426d9f2ac424186490de1024aedbe3b2c966aa708de16ef1cc713b61e3b348a0241eac7f0ff29c767135371e4e250dff84cef7484cba467cfe15e429c122b90420e0c848210590848a646e1091241090b210689182423228912138e12b1a240212418e4a1c3270864470adca84027210a8901d12279b6fc2255a112850894e7da6ea2c04b2d2091084a14eceb6485d21005d2565e49e4236615a0f26eba41e427225e3e9dc2bc9d9b645f88232e16777e3c7fffe45ef2407f1dbb9d41ffa24e793b242207f1d3f084a8884e4248d89b50108c488462134026725472094139139f8e412473ccd367f9f534fe127900c3230c248269549c7084c24261864e1be57864a1865786442195a191b91bb289c18c46e412583b96edcf7fc241e16e5bd83ac20f1c8dc76080138d18841824ae4621060d610495cc1dbf078e4125060f1d824a0c12706e0567078ec1e12e7097385e2bc773dc7727c733dc264f8e64c0009a327c2e78038ee7b8ee4c967b27c271fe39cd78e71f0000e178ff09c7f8e735000e178f871ce6bc2f1fe178f8709c7c384c971ce3fc274e0e3bc7f8ef1f00e39c7f84e9c04375bae3bac0e3bbae13580001c73a170bace1359c26eb85dd13dd6b385d6709c7f84e6bc271f0233b9a870bc7c384e3fc738f871ce6bc739a8709cd4236fe903aefae6bf1df8bd4e7384ec025f57e7cc012f89fdf88fe2c7e6490f055c27f0f37c46748215b5a4373009f60f7d10f1c4823c131043ddfdf09632313562f3b6148c6cd11e0d9120dcee74804e25f24190467064d449ab271365c2e1b8e6009e9701c25827560da5548448849b85c79778fc82d7c2a0943093420843864e6cd270e590c12b8ec3274d5c2349e2da4a8d1e16d920c9e15f2728849864570c8ad72941949177be5995292c9be0f1d8c46d4de38702318e466bf858846e41995244e4a8e09025cbb2acc97081cf95e4e9dc29ed8237152cd5bb5646ab320c3e0644b56592ffd89369117378ea0901c45c0e382b324130378475980b3cb8f4ad4421225b912d4194e0ddb071f412358b9091c23b2048e16a9124925021f1d270ad24493ff6224059ee2250f6a91808e3af221935253ba0b7440bac64a449b67852f082270961762eec844162bb246565905c2e393ff609ae7feb2ec3e543fe73214c248454306648248f0ffbbda57641cecf990f32a2d924f2ca8424d21332ff5a4a33ee93ddb24836492396ef2913c62442cfb26ed2d8aba99fc296c63d125c1635d65a90bdae44a524d2f1d9374b7f1a7c2ddc722d2fecffbd9e520879122ecd2b4f427085fbf122a2ee83fade3907f5c49652280930d0e103951776af2b44cec8ffeffab2bae5e5925a2ee5925a3f5a4d68ad9a4674e24a6d6eaa0e9d053b8e4ee115b6109d4c8aa771d6f1d500106b4841836ec621060918372dcc191b9078e4125060f1d824a0c1aca081283738ec1f7bef785171dc4f1cc071cc4f0b80e3b8000025b3c070b8900e3b89e39802789c0709bae3bc2f8e6e8000385dd709bae3bc2c00385dd071de17c26b384d6070bac0e13a871dd6709c2c38e6b38e6b0038eeb384e1604375bae39ac0e3bbae17580001c71fc26b385d6709bae17744b75ace1359c2eb384dd70bac023d0b741c26b0385d671dd6071cdd", + "80e10e9d5684464cf0bdf2fe00102db871cdd0709ba0e0", + } + + var pkts []RTPPacket + for _, s := range ss { + pkt, err := hexstream2rtppacket(s) + assert.Equal(t, nil, err) + pkts = append(pkts, pkt) + } + + expected := []base.AVPacket{ + { + Timestamp: 30239734, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[0].Raw[12+4:], + }, + { + Timestamp: 30239756, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[1].Raw[12+4:], + }, + { + Timestamp: 30239777, + PayloadType: base.AVPacketPTAAC, + Payload: append(pkts[2].Raw[12+4:], pkts[3].Raw[12+4:]...), + }, + } + var outPkts []base.AVPacket + unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 48000, 128, func(pkt base.AVPacket) { + outPkts = append(outPkts, pkt) + }) + for _, pkt := range pkts { + unpacker.Feed(pkt) + } + assert.Equal(t, expected, outPkts) +} + +func TestAACCase2(t *testing.T) { + ss := []string{ + "80e104558424aa36b06db689003000c000300030de04004c61766335372e3130372e31303000422008c11838211004608c1c212004608c1c", + "80e104568424b636b06db68900102d7020824ecd7ec9037f01bf9cdfe1af65fe2f8929dfff5754fd6be1a938fffbd772fdae7b717ad7cfff15d712f87c756ebec000025c1677100007f1aec2801869302bacc0003ecf821400b0a24b8fca00008c628be329f92424a4989200000001598ba8883e3d8a700000000c0dca33e12a4380000000000064f04e8520b013044271d16820000000000000834fdd38181d45ee0000000000000aec04ce29d0a41a9fe09352c00000000000000025a110233005e4f613687cf3000e8d0000fec7dcb47a7d482a131082024d2655350e6204812e36826cb0cd6c45100950191294826310282650139f33fcf5345951945c1ba584e00f071139b28839e4081a1c44d5389b49d704d4e26b06547d0b0c8162ecaad8b44d0cbf3a5020b7d4a69925d6d0c82265a8dc7376c2a9aa58af99d73d8e504b4796c377322153aaee260019f4b2a20900fe61db59003fe622264ca9f39f6d6c36ed9fc1ca84bbcb451e3b9789f5dfc5ba65d0cb0dda74503aa48962118439b889152a16595d624caa02214e0c5a9012b1e8326760a222c5114bec74e7f9fd3af736db38fa3f977887413632f7634bc5ad4f93a058c6fff3316573d9f2c834128c5caa7268749c22063642935a8a57371f5aa2c991ae925664268630db8b26621094cff8fd94bee0267844e6a4810741aad616577d4116df07ad91124998dfae2288248e720105dc8b1c84a356938445f66d3054a7273536e12cf5d88ebae01228ae945da7a0a1107109c29ddc84173c80da4254ca0436fbf1ea6ee1d669a21ddf64ca8c1459d4142d420e45aa62514feb5352c0722f16f04f1ab25183bafa0c9803dc44081f48a4350ede00bac2031ebbe1f2e05632091e060aca20d2dbc91864de8b3e21186722c310801ad884ce722f836fd2ad4372e750c991b2fd23338e7e791953890e1e5707a41121884d29054a210e0904088a679148884151284d9bc9497105970429181170237ad67655484244875ac02320a4cd66c79b900b52c624b011203dcfb8a7091c2b0500034a74d9b65c8717f31bf94dfca552b3ffc7f6fff63aea4037fff67ee4f8d17f7af13ffd9f75fdc5fc5bd03ebdfc21f97f68775e98d1d330b000095e21085408aa263fa21fe0f1f5b02644d14a21326d6b1c9d087293486067e55804f53b0258584433d972144393e713c25b2439442b4fbb219364cb5a11063f00479b9318367100035ef25f3f9318676010317b94999d59a880958f11298c9b59958241ee95444c69b3cf95184201e773936a7f3c412c960a4cd0c9c1273d7d931c37ff6d9d929418b8a94ba43b78934116c9e0bb4c426d6cef2ecc4128781f52a8239071492ce457325d8846086cf25da0f16d8b90c7802ea60d0d0b72d9e6fa8c9f17f2445c39d8fa80800c48e4958dad48ac846d8ad1463e5d72326c95518ff824593090c24165a9e3123b092535d0eb26ca00b30d320b26c8c15b2c03ab49513cb42200013203e03ed995c9b6f64407e93590fe3b91b46fd7e836649f75ee1adc19fb424b45d67997f6a4822f929ef35faddb5df5c53581758c62118564b229411952c81164aae379c084435068ec3223824d712cf19144e250d241082500b5a0c91e191b83c7916a0904a8472259a46942941249d81b785ff89201ea30130a2a474ce62261657760a99323e547108ade54a8e57faefd9549589f0217754f7b64165c0e5ddbc30a69d1b60a8358850e615eaffaf67cfb739b008aebe90e5cdb6599d9411eb29d4bf1193626020bbd249ada1104e39499436342c7e7d9ffd9bb473ee308539a47090c84f69025f2436938b648e12e11aa12260e053c981595e0918abb365111beb127dba50058d03203fe97ea545a7be2a222a70eecada1cf44545fc92fa001c46e917f9f3e4cc0b9b3cf647886082ec8b97fdc800446440b4d7ee15355c86a20181688e56792390929dffe60514948464f344321cdb324ecaacc19dc776c0e59f3bb4d7831c9ae5fed2662cc82f98bdf7ff8659e053949f8f4f8f04bb7bf96d2dcdb3f0af748fb7edfd5b0e0", + "80e104578424ba31b06db68900101710217c8fcffbffffffff1979833210cc1418840000000125d494cbd56b7a9f71939a407bb23c36490157b3716461bc99a2d721bb5f9322e942220d168209551277cedec9ac2411dde7ddd3f1098064caaae00452acec522331102b6bfa04f7f0f26031a48a98c3bb369e8df22dffd4fd97ffeec0fbb3c6e2b715faf8ef3c6542f86755595f618c773f9e743e6187bab6ae9ce8c9334bdfcef0fc4a22fad25b7d627d0658d1b0787baf71df6df58cd130f1ee8691a96c97c6d78c7696c2b6344b4c5a316341cbd9301eff71baad0e92a58c7f4ebad240852f1a802a7231f4f674e925b06f8efda7744cd6626a102f7e444844b6b10530e512da58e67a4b5823da6f230639396fb448c884a800b66ed7bd39648dceaa5ebc96f5db55e15e311a928a4c14d808318140ac573482c6d952f86ba62071addd4aba2ea4ec411290c5895734ac6aa71a227024b904ba9500f2625574a6ae33a4a3711301219b387e7a1cf996d1d5d21fcfd26d4684d449dd6a1f6edc67af2e0be6e611af2a0c0ad0450128040c82006f93201d01e8b1da15175910c8a08f31785396e7d69fd0115e62d246cc9c64e91c2c9a20232e347629151e210000000049752532f55adea7dc5a0194f8491726a28be359087d659d41678fa573b26eb25664ca0f575139ebf21f8d1bf4f5b5fdee41544239171dff3f4c688fa761594a14ddc40aa522794342bb1dd508bf1b8bb3f8e4a693ecb1cff09c786e084aabc551be968b928cd1519b62b14df3854c6026a84d74a584d3f4ccd9ae7330b4ab0dd1da78a3d355f9ec9718f772945b02f25c34e2db785eb9285f31d74e74003b98948261aaea04c173aae18453f37483fefb0792099cebbd230ee44e059980d83d983025601a7e9aa1a08850830de3e2991b508b305a280e99e6f3b3b6ae65b7e20cd36ba24180e55600cb2b0f56197f41b86417aca1257af9603ec6dd2e569d1c455b9f83c389629e4135e660ee55004e738babd3fedff03562b0896f85c87b7c9ddf0f48b9db3c60c8afcb500380", + "80e104588424be31b06db689001016d8211c8d2ff7fffffffd1975a43251621000000006af6ba26aa2e3f904d00271b1e487d009e4320471d8f204892ad2252d72b8700260e8b4113e20831048a5a0438f1b93832a048415ff73b5bf69631c8a4e4ca6eb89fc8454ab3558b7ed96e5557083eb2ac47779eb23fa2d680f806dfa4ffd6590df3f6e990132030626063e01e73386af34ea3ff2746f376e6c8fcc3f0574828a15f1375e386521f5b50f0f4aea7ffdb9e0d27caff771bbfd4788dcbd118e539df41f01be771cf33af56f9a35090fd7b57dfd8f557cb9e398b7df5f1ac4d4bb56f69cdd2211b6f49679293c43dc7b283840528ea9f573ee24cc7c055d4b9844491583e315d70af6dd5e73fdab59fd17d532f48dda3cb452cba838dccbbdc2b7d5b4699a8e565a4b71035f2a9bd97bb88b90057c240692aa50aca5486142176a21104b5b19c1a691b086d4118810597934c2302e49091073911ba492badc8dba7ec264c5d29602f779ff3b58a48c510a681f42e7cdfd3ecf37fa7e8f6f8eda069baed02641111cea27687eba9db1f5471d5bb4734a6ca7f63caa22f5c04b5df2a7a5f75000ca648557f5c4ee3bca7e079e382643a1a808cb851d8e67222204200000000d5ed744d545c7f916413a981219ae144e8cafae1383088cb0139c294c240e3f9c9dc1d324047aaf90dd06794b20bebd8f4d73ffefb80f3d6dfde7dddd613573e886de98f79a921d12eb49909e8b4415f66cdc863a11bf1aaada975bd2d91e20d7f58ca2a3219d9bb55d873c2c399a17028a2bdc09a513165845609d47ee11f4f5e8822407a13c1f876c728291c28931eb6004cc5e86a441a29503a82c2b848200ee55e3df0880cc00dbd92d204170376d1d2c19355727b70dd508350425a031cb73d7fee9bbd887878b6b4a3ebb056e59b44b1e7a4884f1e3cfdab2b3607e594e2cf47eb553f54696c0acb08ba8b411ed053572db39a5c885723bc0bd2d97d92b27b5381d5bd555ae9fba09cde8ea4e66a7bc1b3488deebff89ad1d8aac49546b79e301c0", + } + + var pkts []RTPPacket + for _, s := range ss { + pkt, err := hexstream2rtppacket(s) + assert.Equal(t, nil, err) + pkts = append(pkts, pkt) + } + + expected := []base.AVPacket{ + { + Timestamp: 69281105, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[0].Raw[12+2+6 : 12+2+6+24], + }, + { + Timestamp: 69281137, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[0].Raw[12+2+6+24 : 12+2+6+24+6], + }, + { + Timestamp: 69281169, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[0].Raw[12+2+6+24+6:], + }, + { + Timestamp: 69281201, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[1].Raw[12+4:], + }, + { + Timestamp: 69281233, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[2].Raw[12+4:], + }, + { + Timestamp: 69281265, + PayloadType: base.AVPacketPTAAC, + Payload: pkts[3].Raw[12+4:], + }, + } + var outPkts []base.AVPacket + unpacker := DefaultRTPUnpackerFactory(base.AVPacketPTAAC, 32000, 128, func(pkt base.AVPacket) { + //nazalog.Infof("out: %d, %d", pkt.Timestamp, len(pkt.Payload)) + outPkts = append(outPkts, pkt) + }) + for _, pkt := range pkts { + //nazalog.Infof("in: %+v %d", pkt.Header, len(pkt.Raw)) + unpacker.Feed(pkt) + } + assert.Equal(t, expected, outPkts) +} + +func hexstream2rtppacket(in string) (pkt RTPPacket, err error) { + var raw []byte + raw, err = hex.DecodeString(in) + if err != nil { + return + } + pkt, err = ParseRTPPacket(raw) + return +} diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 0931707..5ed029b 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -72,8 +72,8 @@ type BaseInSession struct { audioRRProducer *rtprtcp.RRProducer videoRRProducer *rtprtcp.RRProducer - audioUnpacker *rtprtcp.RTPUnpacker - videoUnpacker *rtprtcp.RTPUnpacker + audioUnpacker rtprtcp.IRTPUnpacker + videoUnpacker rtprtcp.IRTPUnpacker audioSSRC nazaatomic.Uint32 videoSSRC nazaatomic.Uint32 @@ -114,12 +114,12 @@ func (session *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicCo session.mu.Unlock() if session.sdpLogicCtx.IsAudioUnpackable() { - session.audioUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) + session.audioUnpacker = rtprtcp.DefaultRTPUnpackerFactory(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) } else { nazalog.Warnf("[%s] audio unpacker not support for this type yet.", session.uniqueKey) } if session.sdpLogicCtx.IsVideoUnpackable() { - session.videoUnpacker = rtprtcp.NewRTPUnpacker(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) + session.videoUnpacker = rtprtcp.DefaultRTPUnpackerFactory(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAVPacketUnpacked) } else { nazalog.Warnf("[%s] video unpacker not support this type yet.", session.uniqueKey) }