From 37d8d1738d3746cb5cb100e8f6d84820f1fe3063 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 24 Jan 2021 11:57:51 +0800 Subject: [PATCH] messages: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [doc] 启动lal官方文档页: https://pengrl.com/lal - [doc] 新增文档《rtmp url,以及vhost》: http://pengrl.com/lal/#/RTMPURLVhost - [feat] demo,新增/app/demo/pullrtmp2pushrtmp,从远端服务器拉取RTMP流,并使用RTMP转推出去,支持1对n转推 - [fix] rtsp,setup信令header中的transport字段区分record和play,record时添加mode=record --- README.md | 146 ++++++++------------------- TEST.md | 46 --------- app/demo/README.md | 21 ---- app/demo/pullrtmp2pushrtmp/main.go | 38 +++++++ app/demo/pullrtmp2pushrtmp/tunnel.go | 119 ++++++++++++++++++++++ app/demo/pullrtsp/pullrtsp.go | 2 +- app/lalserver/main.go | 12 ++- conf/edge.conf.json | 6 +- conf/lalserver.conf.json | 2 +- conf/lalserver.conf.json.brief | 70 ------------- conf/lalserver.conf.json.tmpl | 2 +- conf/node2.conf.json | 2 +- pkg/base/rtmp_t.go | 7 ++ pkg/base/session.go | 38 +++---- pkg/base/version.go | 10 +- pkg/httpflv/client_pull_session.go | 4 +- pkg/logic/entry.go | 2 + pkg/logic/group.go | 7 +- pkg/rtmp/chunk_composer.go | 2 +- pkg/rtmp/client_session.go | 7 +- pkg/rtmp/handshake.go | 7 +- pkg/rtmp/server_session.go | 6 +- pkg/rtsp/client_command_session.go | 25 ++++- pkg/rtsp/pack.go | 28 +---- pkg/rtsp/rtsp.go | 13 ++- pkg/rtsp/server_command_session.go | 13 ++- 26 files changed, 312 insertions(+), 323 deletions(-) delete mode 100644 TEST.md delete mode 100644 app/demo/README.md create mode 100644 app/demo/pullrtmp2pushrtmp/main.go create mode 100644 app/demo/pullrtmp2pushrtmp/tunnel.go delete mode 100644 conf/lalserver.conf.json.brief diff --git a/README.md b/README.md index fb03c0b..081bed4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@

-Live And Live +Live And Live
@@ -9,9 +9,7 @@
-
-
@@ -19,104 +17,48 @@ --- -lal是一个开源GoLang流媒体项目,包含三个主要组成部分: - -- lalserver:接收客户端推流,转发给对应的拉流客户端,也即承担直播场景中的源站、CDN边缘分发等角色。类似于`nginx-rtmp-module`、crtmpserver等应用。 -- demo:一些小应用,比如推、拉流客户端,压测工具,流分析工具,调度示例程序等。类似于ffmpeg、ffprobe等应用,以及提供给业务方的使用示例。 -- pkg:流媒体协议部分,业务方可使用它编写自身的应用。类似于ffmpeg的libavformat等库。 - -**`app/lalserver`服务器支持的协议:** - -| - | sub rtmp | sub http(s)-flv | sub http-ts | sub hls | sub rtsp | relay push rtmp | -| - | - | - | - | - | - | - | -| pub rtmp | ✔ | ✔ | ✔ | ✔ | X | ✔ | -| pub rtsp | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | -| relay pull rtmp | ✔ | ✔ | ✔ | ✔ | X | . | - -| 编码类型 | rtmp | rtsp | hls | http(s)-flv | http-ts | -| - | - | - | - | - | - | -| aac | ✔ | ✔ | ✔ | ✔ | ✔ | -| avc/h264 | ✔ | ✔ | ✔ | ✔ | ✔ | -| hevc/h265 | ✔ | ✔ | X | ✔ | X | - -表格含义见: [《流媒体传输连接类型之session client, server, pub, sub, push, pull》](https://pengrl.com/p/20080) - -**`app/lalserver`功能特性:** - -- [x] **全平台**。(依托Go语言):支持`(linux/macOS/windows)`多平台开发、调试、运行。支持交叉编译。生成的可执行文件(无任何库依赖)可独立运行。(开放源码的同时)提供各平台可执行文件,可(免编译)直接运行 -- [x] **高性能**。多核多线程扩展 -- [x] **多种直播流封装协议**。支持RTMP/RTSP/HTTP-FLV/HTTP-TS/HLS,不同封装协议支持相互转换 -- [x] **多种编码格式**。视频支持H264/AVC,H265/HEVC,音频支持AAC -- [x] **录制**。支持HLS录制(HLS直播与录制可同时开启) -- [x] **HTTPS**。支持HTTPS-FLV拉流 -- [x] **RTSP**。支持interleaved模式。支持digest auth验证。支持`GET_PARAMETER`。 -- [x] **HTTP API接口**。用于获取服务信息,向服务发送命令。见[《lal流媒体服务器的HTTP API接口》](https://pengrl.com/p/20100) -- [x] **HTTP Notify事件回调**。见[《lal HTTP Notify(or Callback or Webhook)事件回调》](https://pengrl.com/p/20101) -- [x] **分布式集群**。 -- [x] **静态pull回源**。通过配置文件配置回源地址 -- [x] **静态push转推**。支持转推多个地址。通过配置文件配置转推地址 -- [x] **CORS跨域**。支持HTTP-FLV,HTTP-TS,HLS跨域拉流 -- [x] **HTTP文件服务器**。比如HLS切片文件可直接播放,不需要额外的HTTP文件服务器 -- [x] **秒开播放**。GOP缓冲 - -除了lalserver,还提供一些基于lal开发的demo(比如客户端程序): [《lal/app/demo》](https://github.com/q191201771/lal/blob/master/app/demo/README.md) - -Live And Live - -发行版本日志:[《CHANGELOG.md》](https://github.com/q191201771/lal/blob/master/CHANGELOG.md) - -### 编译,运行,体验功能 - -#### 编译 - -方式1,从源码自行编译 - -```shell -$export GO111MODULE=on && export GOPROXY=https://goproxy.cn,https://goproxy.io,direct -$make -``` - -方式2,直接下载编译好的二进制可执行文件 - -[点我打开《github lal最新release版本页面》](https://github.com/q191201771/lal/releases/latest),下载对应平台编译好的二进制可执行文件的zip压缩包。 - -#### 运行 - -```shell -# 注意,windows平台将路径分隔符`/`换成`\` -$./bin/lalserver -c conf/lalserver.conf.json -``` - -#### 体验功能 - -快速体验lalserver服务器见: [《常见推拉流客户端软件的使用方式》](https://pengrl.com/p/20051/) - -lalserver详细配置见: [《配置注释文档》](https://github.com/q191201771/lal/blob/master/conf/lalserver.conf.json.brief) - -### 第三方依赖 - -我自己写的Go基础库: [github.com/q191201771/naza](https://github.com/q191201771/naza) - -### 联系我 - -- 个人微信号: q191201771 -- 个人QQ号: 191201771 +lal是一个开源GoLang直播流媒体网络传输项目,包含三个主要组成部分: + +- lalserver:流媒体转发服务器。类似于`nginx-rtmp-module`等应用,但支持更多的协议,提供更丰富的功能。[lalserver简介](https://pengrl.com/lal/#/LALServer) +- demo:一些小应用,比如推、拉流客户端,压测工具,流分析工具,调度示例程序等。类似于ffmpeg、ffprobe等应用。[Demo简介](https://pengrl.com/lal/#/DEMO) +- pkg:流媒体协议库。类似于ffmpeg的libavformat等库。 + +**lal源码package架构图:** + +![lal源码package架构图](https://pengrl.com/lal/_media/lal_src_fullview_frame.jpeg) + +**lalserver特性图:** + +![lalserver特性图](https://pengrl.com/lal/_media/lal_feature.jpeg) + +了解更多请访问: + +* [lal github地址](https://github.com/q191201771/lal): https://github.com/q191201771/lal +* [lal 官方文档](https://pengrl.com/lal): https://pengrl.com/lal + * **/lalserver/** + * [简介](https://pengrl.com/lal/#/LALServer.md) + * [编译、运行、体验功能](https://pengrl.com/lal/#/QuickStart.md) + * [配置文件说明](https://pengrl.com/lal/#/ConfigBrief.md) + * [HTTP API接口](https://pengrl.com/lal/#/HTTPAPI.md) + * [HTTP Notify(Callback/Webhook)事件回调](https://pengrl.com/lal/#/HTTPNotify.md) + * [Demo简介](https://pengrl.com/lal/#/DEMO.md) + * [Changelog修改记录](https://pengrl.com/lal/#/CHANGELOG.md) + * [github star趋势图](https://pengrl.com/lal/#/StarChart.md) + * [第三方依赖](https://pengrl.com/lal/#/ThirdDeps.md) + * [联系作者](https://pengrl.com/lal/#/Author.md) + * **/技术文档/** + * [常见推拉流客户端使用方式](https://pengrl.com/lal/#/CommonClient.md) + * [连接类型之session pub/sub/push/pull](https://pengrl.com/lal/#/Session.md) + * [rtmp url,以及vhost](https://pengrl.com/lal/#/RTMPURLVhost.md) + * [FAQ](https://pengrl.com/lal/#/FAQ.md) + * **/待整理/** + * [性能测试](https://pengrl.com/lal/#/Test.md) + * [图稿](https://pengrl.com/lal/#/Drawing.md) + +联系作者: + +- email:191201771@qq.com +- 微信:q191201771 +- QQ:191201771 - 微信群: 加我微信好友后,告诉我拉你进群 - QQ群: 1090510973 - -欢迎任何技术和非技术的交流。 - -目前lal正在收集新一轮需求中。 - -并且,lal也十分欢迎开源贡献者的加入。提PR前请先阅读:[《yoko版本PR规范》](https://pengrl.com/p/20070/) - -### 性能测试,测试过的第三方客户端 - -见[《TEST.md》](https://github.com/q191201771/lal/blob/master/TEST.md) - -### 项目star趋势图 - -觉得项目还不错,就点个star支持一下吧 :) - -[![Stargazers over time](https://starchart.cc/q191201771/lal.svg)](https://starchart.cc/q191201771/lal) - diff --git a/TEST.md b/TEST.md deleted file mode 100644 index 171630d..0000000 --- a/TEST.md +++ /dev/null @@ -1,46 +0,0 @@ -### 性能测试 - -测试场景一:持续推送n路RTMP流至lalserver(没有拉流) - -| 推流数量 | CPU占用 | 内存占用(RES) | -| - | - | - | -| 1000 | (占单个核的)16% | 104MB | - -测试场景二:持续推送1路RTMP流至lalserver,使用RTMP协议从lalserver拉取n路流 - -| 拉流数量 | CPU占用 | 内存占用(RES) | -| - | - | - | -| 1000 | (占单个核的)30% | 120MB | - -测试场景三: 持续推送n路RTMP流至lalserver,使用RTMP协议从lalserver拉取n路流(推拉流为1对1的关系) - -| 推流数量 | 拉流数量 | CPU占用 | 内存占用(RES) | -| - | - | - | - | -| 1000 | 1000 | 125% | 464MB | - -* 测试机:32核16G(lalserver服务器和压测工具同时跑在这一个机器上) -* 压测工具:lal中的 `/app/demo/pushrtmp` 以及 `/app/demo/pullrtmp` -* 推流码率:使用`srs-bench`中的FLV文件,大概200kbps -* lalserver版本:基于 git commit: xxx - -*由于测试机是台共用的机器,上面还跑了许多其他服务,这里列的只是个粗略的数据,还待做更多的性能分析以及优化。如果你对性能感兴趣,欢迎进行测试并将结果反馈给我。* - -性能和可读,有时是矛盾的,存在取舍。我会保持思考,尽量平衡好两者。 - -### 测试过的第三方客户端 - -``` -RTMP推流端: -- OBS 21.0.3(macos) -- OBS 24.0.3(win10 64 bit) -- ffmpeg 3.4.2(macos) -- srs-bench (macos srs项目配套的一个压测工具) -- pushrtmp (macos lal demo中的RTMP推流客户端) - -RTMP / HTTP-FLV 拉流端: -- VLC 3.0.8(macos 10.15.1) -- VLC 3.0.8(win10) -- MPV 0.29.1(macos) -- ffmpeg 3.4.2(macos) -- srs-bench (macos srs项目配套的一个压测工具) -``` diff --git a/app/demo/README.md b/app/demo/README.md deleted file mode 100644 index 0f45b70..0000000 --- a/app/demo/README.md +++ /dev/null @@ -1,21 +0,0 @@ -`/app/demo`示例程序功能简介: - -| demo | push rtmp | push rtsp | pull rtmp | pull httpflv | pull rtsp | 说明 | -| - | - | - | - | - | - | - | -| pushrtmp | ✔ | . | . | . | . | RTMP推流客户端;压力测试工具 | -| pullrtmp | . | . | ✔ | . | . | RTMP拉流客户端;压力测试工具 | -| pullrtmp2hls | . | . | ✔ | . | . | 从远端服务器拉取RTMP流,存储为本地m3u8+ts文件 | -| pullhttpflv | . | . | . | ✔ | . | HTTP-FLV拉流客户端 | -| pullrtsp | . | . | . | . | ✔ | RTSP拉流客户端 | -| pullrtsp2pushrtsp | . | ✔ | . | . | ✔ | RTSP拉流,并使用RTSP转推出去 | -| pullrtsp2pushrtmp | ✔ | . | . | . | ✔ | RTSP拉流,并使用RTMP转推出去 | -| analyseflv | . | . | . | ✔ | . | 拉取HTTP-FLV流,并进行分析 | - -. - -| demo | 说明 | -| dispatch | 简单演示如何实现一个简单的调度服务,使得多个lalserver节点可以组成一个集群 | -| flvfile2es | 将本地FLV文件分离成H264/AVC和AAC的ES流文件 | -| modflvfile | 修改flv文件的一些信息(比如某些tag的时间戳)后另存文件 | - -(更具体的功能参加各源码文件的头部说明) diff --git a/app/demo/pullrtmp2pushrtmp/main.go b/app/demo/pullrtmp2pushrtmp/main.go new file mode 100644 index 0000000..a3ed4fb --- /dev/null +++ b/app/demo/pullrtmp2pushrtmp/main.go @@ -0,0 +1,38 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/q191201771/naza/pkg/nazalog" +) + +func main() { + i := flag.String("i", "", "specify pull rtmp url") + o := flag.String("o", "", "specify push rtmp url list, separated by a comma") + flag.Parse() + if *i == "" || *o == "" { + flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + %s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220 + %s -i rtmp://127.0.0.1/live/test110 -o rtmp://127.0.0.1/live/test220,rtmp://127.0.0.1/live/test330 +`, os.Args[0], os.Args[0]) + } + + ol := strings.Split(*o, ",") + PullRTMP2PushRTMP(*i, ol) + + nazalog.Info("done.") + time.Sleep(60 * time.Second) +} diff --git a/app/demo/pullrtmp2pushrtmp/tunnel.go b/app/demo/pullrtmp2pushrtmp/tunnel.go new file mode 100644 index 0000000..426e803 --- /dev/null +++ b/app/demo/pullrtmp2pushrtmp/tunnel.go @@ -0,0 +1,119 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "encoding/hex" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazastring" + "github.com/q191201771/naza/pkg/unique" +) + +// 拉取一路rtmp流,并使用rtmp转推出去,可1对n转推 +// 阻塞直至拉流或者(任意一路)推流失败或结束 +// +// @param inURL 拉流rtmp url地址 +// @param outURL 推流rtmp url地址列表 +// +func PullRTMP2PushRTMP(inURL string, outURLList []string) error { + const ( + pullTimeoutMS = 10000 + pushTimeoutMS = 10000 + ) + + tunnelUK := unique.GenUniqueKey("TUNNEL") + nazalog.Infof("[%s] new tunnel. inURL=%s, outURLList=%+v", tunnelUK, inURL, outURLList) + + errChan := make(chan error, len(outURLList)+1) + rtmpMsgQ := make(chan base.RTMPMsg, 1024) + + var pullSession *rtmp.PullSession + var pushSessionList []*rtmp.PushSession + + defer func() { + if pullSession != nil { + nazalog.Infof("[%s] dispose pull session. [%s]", tunnelUK, pullSession.UniqueKey()) + pullSession.Dispose() + } + for _, s := range pushSessionList { + nazalog.Infof("[%s] dispose push session. [%s]", tunnelUK, s.UniqueKey()) + s.Dispose() + } + }() + + for _, outURL := range outURLList { + pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.PushTimeoutMS = pushTimeoutMS + }) + nazalog.Infof("[%s] start push. [%s] url=%s", tunnelUK, pushSession.UniqueKey(), outURL) + + err := pushSession.Push(outURL) + if err != nil { + nazalog.Errorf("[%s] push error. [%s] err=%+v", tunnelUK, pushSession.UniqueKey(), err) + return err + } + nazalog.Infof("[%s] push succ. [%s]", tunnelUK, pushSession.UniqueKey()) + + pushSessionList = append(pushSessionList, pushSession) + go func(u string, s *rtmp.PushSession) { + err := <-s.Wait() + nazalog.Errorf("[%s] push wait error. [%s] err=%+v", tunnelUK, s.UniqueKey(), err) + errChan <- err + }(outURL, pushSession) + } + + pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { + option.PullTimeoutMS = pullTimeoutMS + }) + nazalog.Infof("[%s] start pull. [%s] url=%s", tunnelUK, pullSession.UniqueKey(), inURL) + + err := pullSession.Pull(inURL, func(msg base.RTMPMsg) { + m := msg.Clone() + rtmpMsgQ <- m + }) + if err != nil { + nazalog.Errorf("[%s] pull error. [%s] err=%+v", tunnelUK, pullSession.UniqueKey(), err) + return err + } + nazalog.Infof("[%s] pull succ. [%s]", tunnelUK, pullSession.UniqueKey()) + + go func(u string, s *rtmp.PullSession) { + err := <-s.Wait() + nazalog.Errorf("[%s] pull wait error. [%s] err=%+v", tunnelUK, s.UniqueKey(), err) + errChan <- err + }(inURL, pullSession) + + debugWriteCount := 0 + maxDebugWriteCount := 5 + for { + select { + case err := <-errChan: + nazalog.Errorf("[%s] errChan. err=%+v", tunnelUK, err) + return err + case msg := <-rtmpMsgQ: + currHeader := remux.MakeDefaultRTMPHeader(msg.Header) + chunks := rtmp.Message2Chunks(msg.Payload, &currHeader) + if debugWriteCount < maxDebugWriteCount { + nazalog.Infof("[%s] write. header=%+v, %+v, %s", tunnelUK, msg.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 32))) + debugWriteCount++ + } + for _, pushSession := range pushSessionList { + err := pushSession.AsyncWrite(chunks) + if err != nil { + nazalog.Errorf("[%s] write error. err=%+v", tunnelUK, err) + return err + } + } + } + } +} diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index 2f6460a..3c887ad 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -102,7 +102,7 @@ func parseFlag() (inURL string, outFilename string, overTCP int) { _, _ = fmt.Fprintf(os.Stderr, `Example: %s -i rtsp://localhost:5544/live/test110 -o out.flv -t 0 %s -i rtsp://localhost:5544/live/test110 -o out.flv -t 1 -`, os.Args[0], os.Args[1]) +`, os.Args[0], os.Args[0]) base.OSExitAndWaitPressIfWindows(1) } return *i, *o, *t diff --git a/app/lalserver/main.go b/app/lalserver/main.go index 7d1076d..29d36c4 100644 --- a/app/lalserver/main.go +++ b/app/lalserver/main.go @@ -11,10 +11,11 @@ package main import ( "flag" "fmt" - "github.com/q191201771/naza/pkg/nazalog" "os" "path/filepath" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/logic" @@ -45,7 +46,7 @@ func parseFlag() string { } // 运行参数中没有配置文件,尝试从几个默认位置读取 - nazalog.Warnf("config file not specify in command line, try to load some common config file in common path.") + nazalog.Warnf("config file did not specify in the command line, try to load it in the usual path.") defaultConfigFileList := []string{ filepath.FromSlash("lalserver.conf.json"), filepath.FromSlash("./conf/lalserver.conf.json"), @@ -66,7 +67,10 @@ func parseFlag() string { _, _ = fmt.Fprintf(os.Stderr, ` Example: %s -c %s -`, os.Args[0], filepath.FromSlash("./conf/lalserver.conf.json")) - base.OSExitAndWaitPressIfWindows(1) + +Github: %s +Doc: %s +`, os.Args[0], filepath.FromSlash("./conf/lalserver.conf.json"), base.LALGithubSite, base.LALDocSite) + base.OSExitAndWaitPressIfWindows(1) return *cf } diff --git a/conf/edge.conf.json b/conf/edge.conf.json index 3573e67..a987246 100644 --- a/conf/edge.conf.json +++ b/conf/edge.conf.json @@ -1,7 +1,7 @@ { "rtmp": { "enable": true, - "addr": ":19450", + "addr": ":1945", "gop_num": 2 }, "httpflv": { @@ -32,12 +32,12 @@ "relay_push": { "enable": true, "addr_list":[ - "127.0.0.1:19350" + "127.0.0.1:1935" ] }, "relay_pull": { "enable": true, - "addr": "127.0.0.1:19350" + "addr": "127.0.0.1:1935" }, "http_api": { "enable": true, diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 648f942..c47ad6b 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -1,7 +1,7 @@ { "rtmp": { "enable": true, - "addr": ":19350", + "addr": ":1935", "gop_num": 2 }, "httpflv": { diff --git a/conf/lalserver.conf.json.brief b/conf/lalserver.conf.json.brief deleted file mode 100644 index 8535e8a..0000000 --- a/conf/lalserver.conf.json.brief +++ /dev/null @@ -1,70 +0,0 @@ -{ - "rtmp": { - "enable": true, // 是否开启rtmp服务的监听 - "addr": ":19350", // RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 - "gop_num": 2 // RTMP拉流的GOP缓存数量,加速秒开 - }, - "httpflv": { - "enable": true, // 是否开启HTTP-FLV服务的监听 - "sub_listen_addr": ":8080", // HTTP-FLV拉流地址 - "enable_https": true, // 是否开启HTTPS-FLV监听 - "https_addr": ":4433", // HTTPS-FLV拉流地址 - "https_cert_file": "./conf/cert.pem", // cert文件地址 - "https_key_file": "./conf/key.pem", // key文件地址 - "gop_num": 2 - }, - "hls": { - "enable": true, // 是否开启HLS服务的监听 - "sub_listen_addr": ":8081", // HLS监听地址 - "out_path": "/tmp/lal/hls/", // HLS文件保存根目录 - "fragment_duration_ms": 3000, // 单个TS文件切片时长,单位毫秒 - "fragment_num": 6, // M3U8文件列表中TS文件的数量 - "cleanup_flag": true // 输入流结束后,是否清理hls的文件。 - // 注意,如果为true,文件将在输入流结束后的 * * 2 的时间点清理 - }, - "httpts": { - "enable": true, // 是否开启HTTP-TS服务的监听。注意,这并不是HLS中的TS,而是在一条HTTP长连接上持续性传输TS流 - "sub_listen_addr": ":8082" // HTTP-TS拉流地址 - }, - "rtsp": { - "enable": true, // 是否开启rtsp服务的监听,目前只支持rtsp推流 - "addr": ":5544" // rtsp推流地址 - }, - "relay_push": { - "enable": false, // 是否开启中继转推功能,开启后,自身接收到的所有流都会转推出去 - "addr_list":[ // 中继转推的对端地址,支持填写多个地址,做1对n的转推。格式举例 "127.0.0.1:19351" - ] - }, - "relay_pull": { - "enable": false, // 是否开启回源拉流功能,开启后,当自身接收到拉流请求,而流不存在时,会从其他服务器拉取这个流到本地 - "addr": "" // 回源拉流的地址。格式举例 "127.0.0.1:19351" - }, - "http_api": { - "enable": true, // 是否开启HTTP API接口 - "addr": ":8083" // 监听地址 - }, - "server_id": "1", // 当前lalserver唯一ID。多个lalserver HTTP Notify同一个地址时,可通过该ID区分 - "http_notify": { - "enable": true, // 是否开启HTTP Notify事件回调 - "update_interval_sec": 5, // update事件回调间隔,单位毫秒 - "on_server_start": "http://127.0.0.1:10101/on_server_start", // 各事件HTTP Notify事件回调地址 - "on_update": "http://127.0.0.1:10101/on_update", - "on_pub_start": "http://127.0.0.1:10101/on_pub_start", - "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", - "on_sub_start": "http://127.0.0.1:10101/on_sub_start", - "on_sub_stop": "http://127.0.0.1:10101/on_sub_stop", - "on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect" - }, - "pprof": { - "enable": true, // 是否开启Go pprof web服务的监听 - "addr": ":8084" // Go pprof web地址 - }, - "log": { - "level": 1, // 日志级别,1 debug, 2 info, 3 warn, 4 error, 5 fatal - "filename": "./logs/lalserver.log", // 日志输出文件 - "is_to_stdout": true, // 是否打印至标志控制台输出 - "is_rotate_daily": true, // 日志按天翻滚 - "short_file_flag": true, // 日志末尾是否携带源码文件名以及行号的信息 - "assert_behavior": 1 // 日志断言的行为,1 只打印错误日志 2 打印并退出程序 3 打印并panic - } -} diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index f3edb41..0bab291 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -1,7 +1,7 @@ { "rtmp": { "enable": true, - "addr": ":19350", + "addr": ":1935", "gop_num": 2 }, "httpflv": { diff --git a/conf/node2.conf.json b/conf/node2.conf.json index 6084da5..e91b5b2 100644 --- a/conf/node2.conf.json +++ b/conf/node2.conf.json @@ -1,7 +1,7 @@ { "rtmp": { "enable": true, - "addr": ":19550", + "addr": ":1955", "gop_num": 2 }, "httpflv": { diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index 715e098..a0c4c52 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -106,3 +106,10 @@ func (msg RTMPMsg) IsVideoKeyNALU() bool { func (msg RTMPMsg) IsAACSeqHeader() bool { return msg.Header.MsgTypeID == RTMPTypeIDAudio && (msg.Payload[0]>>4) == RTMPSoundFormatAAC && msg.Payload[1] == RTMPAACPacketTypeSeqHeader } + +func (msg RTMPMsg) Clone() (ret RTMPMsg) { + ret.Header = msg.Header + ret.Payload = make([]byte, len(msg.Payload)) + copy(ret.Payload, msg.Payload) + return +} diff --git a/pkg/base/session.go b/pkg/base/session.go index 9f6667c..58c9125 100644 --- a/pkg/base/session.go +++ b/pkg/base/session.go @@ -43,12 +43,6 @@ type ISessionURLContext interface { // | struct | ServerSession | ServerSession | PushSession/ClientSession | PullSession/ClientSession | // // -// | . | rtsp pub | rtsp sub | rtsp pull | rtsp push | -// | - | - | - | - | - | -// | file | server_pub_session.go | server_sub_session.go | client_pull_session.go | client_push_session.go | -// | struct | PubSession/ServerCommandSession/BaseInSession | SubSession/ServerCommandSession | PullSession/BaseInSession | PushSession | -// -// // | . | flv sub | flv pull | // | - | - | - | // | file | server_sub_session.go | client_pull_session.go | @@ -61,27 +55,27 @@ type ISessionURLContext interface { // | struct | SubSession | // // -// | . | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | - | rtmppush | rtmppull | flvpull | rtsppull | hls | -// | - | - | - | - | - | - | - | - | - | - | - | - | | -// | x | x | x | x | x | x | x | - | x | x | x | x | | -// | UniqueKey | √ | √ | √ | √ | √ | √ | - | x√ | x√ | √ | √ | | +// | . | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | - | rtmppush | rtmppull | flvpull | rtsppull | rtsppush | hls | +// | - | - | - | - | - | - | - | - | - | - | - | - | | | +// | x | x | x | x | x | x | x | - | x | x | x | x | | | +// | UniqueKey | √ | √ | √ | √ | √ | √ | - | x√ | x√ | √ | √ | | | -// | AppName() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | -// | StreamName() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | -// | RawQuery() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | +// | AppName() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | +// | StreamName() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | +// | RawQuery() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | -// | GetStat() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | -// | UpdateStat() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | -// | IsAlive() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | +// | GetStat() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | +// | UpdateStat() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | +// | IsAlive() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | -// | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | | -// | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | +// | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | | | +// | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | | -// | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | | -// | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | | +// | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | | | +// | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | | | // -// | Opt.PullTimeoutMS | - | - | - | - | - | - | - | - | x | √ | √ | | -// | Wait() | - | - | - | - | - | - | - | - | √ | √ | √ | | +// | Opt.Push/PullTimeoutMS | - | - | - | - | - | - | - | √ | √ | √ | √ | √ | | +// | Wait() | - | - | - | - | - | - | - | √ | √ | √ | √ | √ | | // // Dispose由外部调用,表示主动关闭正常的session // 外部调用Dispose后,不应继续使用该session diff --git a/pkg/base/version.go b/pkg/base/version.go index a057495..0aba89d 100644 --- a/pkg/base/version.go +++ b/pkg/base/version.go @@ -20,10 +20,12 @@ const LALVersion = "v0.18.0" var ( LALLibraryName = "lal" - LALGitHubRepo = "github.com/q191201771/lal" + LALGithubRepo = "github.com/q191201771/lal" + LALGithubSite = "https://github.com/q191201771/lal" + LALDocSite = "https://pengrl.com/lal" // e.g. lal v0.12.3 (github.com/q191201771/lal) - LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGitHubRepo + ")" + LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGithubRepo + ")" // e.g. 0.12.3 LALVersionDot string @@ -35,7 +37,7 @@ var ( var ( // 植入rtmp握手随机字符串中 // e.g. lal v0.12.3 (github.com/q191201771/lal) - LALRTMPHandshakeWaterMark string + //LALRTMPHandshakeWaterMark string // 植入rtmp server中的connect result信令中 // 注意,有两个object,第一个object中的fmsVer我们保持通用公认的值,在第二个object中植入 @@ -121,5 +123,5 @@ func init() { LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot LALRTSPPullSessionUA = LALLibraryName + "/" + LALVersionDot - LALRTMPHandshakeWaterMark = LALFullInfo + //LALRTMPHandshakeWaterMark = LALFullInfo } diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 5e16127..b101806 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -128,7 +128,9 @@ func (session *PullSession) pullContext(ctx context.Context, rawURL string, onRe func (session *PullSession) Dispose() { nazalog.Infof("[%s] lifecycle dispose httpflv PullSession.", session.UniqueKey) - _ = session.conn.Close() + if session.conn != nil { + _ = session.conn.Close() + } } func (session *PullSession) UpdateStat(interval uint32) { diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go index 96d8f3c..2c767ed 100644 --- a/pkg/logic/entry.go +++ b/pkg/logic/entry.go @@ -30,6 +30,8 @@ func Entry(confFile string) { initLog(config.LogConfig) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("version: %s", base.LALFullInfo) + nazalog.Infof("github: %s", base.LALGithubSite) + nazalog.Infof("doc: %s", base.LALDocSite) sm = NewServerManager() diff --git a/pkg/logic/group.go b/pkg/logic/group.go index b3daa92..6d438b7 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -687,6 +687,8 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { lrm2ft LazyRTMPMsg2FLVTag ) + //nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7))) + // # 0. hls if config.HLSConfig.Enable && group.hlsMuxer != nil { group.hlsMuxer.FeedRTMPMessage(msg) @@ -695,7 +697,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { // # 1. 设置好用于发送的 rtmp 头部信息 currHeader := remux.MakeDefaultRTMPHeader(msg.Header) if currHeader.MsgLen != uint32(len(msg.Payload)) { - nazalog.Errorf("diff. msgLen=%d, payload len=%d, %+v", currHeader.MsgLen, len(msg.Payload), msg.Header) + nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header) } // # 2. 懒初始化rtmp chunk切片,以及httpflv转换 @@ -708,12 +710,15 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { if session.IsFresh { // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 if group.gopCache.Metadata != nil { + //nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey) _ = session.AsyncWrite(group.gopCache.Metadata) } if group.gopCache.VideoSeqHeader != nil { + //nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey) _ = session.AsyncWrite(group.gopCache.VideoSeqHeader) } if group.gopCache.AACSeqHeader != nil { + //nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey) _ = session.AsyncWrite(group.gopCache.AACSeqHeader) } for i := 0; i < group.gopCache.GetGOPCount(); i++ { diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 059688e..8cb75f8 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -166,7 +166,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { stream.header.TimestampAbs += stream.timestamp } absTsFlag = false - //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header) + //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c) if err := cb(stream); err != nil { return err diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 91253aa..50f09a6 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -135,7 +135,9 @@ func (s *ClientSession) Flush() error { func (s *ClientSession) Dispose() { nazalog.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey) - _ = s.conn.Close() + if s.conn != nil { + _ = s.conn.Close() + } } func (s *ClientSession) AppName() string { @@ -210,7 +212,7 @@ func (s *ClientSession) doContext(ctx context.Context, rawURL string) error { return } - nazalog.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) + nazalog.Infof("[%s] > W connect('%s'). tcUrl=%s", s.UniqueKey, s.appName(), s.tcURL()) if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil { errChan <- err return @@ -251,6 +253,7 @@ func (s *ClientSession) streamNameWithRawQuery() string { } func (s *ClientSession) tcpConnect() error { + nazalog.Infof("[%s] > tcp connect.", s.UniqueKey) var err error s.stat.RemoteAddr = s.urlCtx.HostWithPort diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index c5916a9..969a789 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -12,12 +12,9 @@ import ( "bytes" "crypto/hmac" "crypto/sha256" - "fmt" "io" "time" - "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/nazalog" ) @@ -294,6 +291,6 @@ func random1528(out []byte) { func init() { random1528Buf = make([]byte, 1528) - hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark) - copy(random1528Buf, []byte(hack)) + //hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark) + //copy(random1528Buf, []byte(hack)) } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index d17d15c..ab648fe 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -330,7 +330,11 @@ func (s *ServerSession) doConnect(tid int, stream *Stream) error { if err != nil { return err } - nazalog.Infof("[%s] < R connect('%s').", s.UniqueKey, s.appName) + tcUrl, err := val.FindString("tcUrl") + if err != nil { + nazalog.Warnf("[%s] tcUrl not exist.", s.UniqueKey) + } + nazalog.Infof("[%s] < R connect('%s'). tcUrl=%s", s.UniqueKey, s.appName, tcUrl) s.observer.OnRTMPConnect(s, val) diff --git a/pkg/rtsp/client_command_session.go b/pkg/rtsp/client_command_session.go index bddf21f..345b3e7 100644 --- a/pkg/rtsp/client_command_session.go +++ b/pkg/rtsp/client_command_session.go @@ -129,6 +129,9 @@ func (session *ClientCommandSession) Wait() <-chan error { func (session *ClientCommandSession) Dispose() error { nazalog.Infof("[%s] lifecycle dispose rtsp ClientCommandSession. session=%p", session.UniqueKey, session) + if session.conn == nil { + return nil + } return session.conn.Close() } @@ -396,8 +399,15 @@ func (session *ClientCommandSession) writeOneSetup(setupURI string) error { return err } + var htv string + switch session.t { + case CCSTPushSession: + htv = fmt.Sprintf(HeaderTransportClientRecordTmpl, lRTPPort, lRTCPPort) + case CCSTPullSession: + htv = fmt.Sprintf(HeaderTransportClientPlayTmpl, lRTPPort, lRTCPPort) + } headers := map[string]string{ - HeaderTransport: fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", lRTPPort, lRTCPPort), + HeaderTransport: htv, } ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "") if err != nil { @@ -441,8 +451,15 @@ func (session *ClientCommandSession) writeOneSetupTCP(setupURI string) error { rtcpChannel := session.channel + 1 session.channel += 2 + var htv string + switch session.t { + case CCSTPushSession: + htv = fmt.Sprintf(HeaderTransportClientRecordTCPTmpl, rtpChannel, rtcpChannel) + case CCSTPullSession: + htv = fmt.Sprintf(HeaderTransportClientPlayTCPTmpl, rtpChannel, rtcpChannel) + } headers := map[string]string{ - HeaderTransport: fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", rtpChannel, rtcpChannel), + HeaderTransport: htv, } ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "") if err != nil { @@ -458,7 +475,7 @@ func (session *ClientCommandSession) writeOneSetupTCP(setupURI string) error { func (session *ClientCommandSession) writePlay() error { headers := map[string]string{ - HeaderRange: "npt=0.000-", + HeaderRange: HeaderRangeDefault, } _, err := session.writeCmdReadResp(MethodPlay, session.urlCtx.RawURLWithoutUserInfo, headers, "") return err @@ -466,7 +483,7 @@ func (session *ClientCommandSession) writePlay() error { func (session *ClientCommandSession) writeRecord() error { headers := map[string]string{ - HeaderRange: "npt=0.000-", + HeaderRange: HeaderRangeDefault, } _, err := session.writeCmdReadResp(MethodRecord, session.urlCtx.RawURLWithoutUserInfo, headers, "") return err diff --git a/pkg/rtsp/pack.go b/pkg/rtsp/pack.go index 20cdc8c..d81df50 100644 --- a/pkg/rtsp/pack.go +++ b/pkg/rtsp/pack.go @@ -44,18 +44,8 @@ var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" + "%s" // rfc2326 10.4 SETUP -// TODO chef: mode=record,这个是咋作用,是应该pub有sub没有吗,我的pack实现没有严格区分 - -// CSeq, Date, Session, Transport(client_port, server_rtp_port, server_rtcp_port) -var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" + - "CSeq: %s\r\n" + - "Date: %s\r\n" + - "Session: %s\r\n" + - "Transport:RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d\r\n" + - "\r\n" - // CSeq, Date, Session, Transport -var ResponseSetupTCPTmpl = "RTSP/1.0 200 OK\r\n" + +var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "Date: %s\r\n" + "Session: %s\r\n" + @@ -100,22 +90,10 @@ func PackResponseDescribe(cseq, sdp string) string { return fmt.Sprintf(ResponseDescribeTmpl, cseq, date, len(sdp), sdp) } -// @param transportC: -// pub example: -// RTP/AVP/UDP;unicast;client_port=24254-24255;mode=record -// RTP/AVP/UDP;unicast;client_port=24256-24257;mode=record -// sub example: -// RTP/AVP/UDP;unicast;client_port=9420-9421 -func PackResponseSetup(cseq string, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort uint16) string { - date := time.Now().Format(time.RFC1123) - - return fmt.Sprintf(ResponseSetupTmpl, cseq, date, sessionID, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort) -} - -func PackResponseSetupTCP(cseq string, ts string) string { +func PackResponseSetup(cseq string, htv string) string { date := time.Now().Format(time.RFC1123) - return fmt.Sprintf(ResponseSetupTCPTmpl, cseq, date, sessionID, ts) + return fmt.Sprintf(ResponseSetupTmpl, cseq, date, sessionID, htv) } func PackResponseRecord(cseq string) string { diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index aea32ba..8b1e8be 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -44,16 +44,25 @@ const ( HeaderAccept = "Accept" HeaderUserAgent = "User-Agent" HeaderCSeq = "CSeq" + HeaderContentLength = "Content-Length" HeaderTransport = "Transport" HeaderSession = "Session" HeaderRange = "Range" - HeaderContentLength = "Content-Length" HeaderWWWAuthenticate = "WWW-Authenticate" HeaderAuthorization = "Authorization" HeaderPublic = "Public" // header value - HeaderAcceptApplicationSDP = "application/sdp" + HeaderAcceptApplicationSDP = "application/sdp" + HeaderRangeDefault = "npt=0.000-" + HeaderTransportClientPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d" // localRTPPort, localRTCPPort + HeaderTransportClientPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d" // rtpChannel, rtcpChannel + HeaderTransportClientRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;mode=record" + HeaderTransportClientRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record" + HeaderTransportServerPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d" + //HeaderTransportServerPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d" + HeaderTransportServerRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d;mode=record" + //HeaderTransportServerRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record" ) const ( diff --git a/pkg/rtsp/server_command_session.go b/pkg/rtsp/server_command_session.go index d94c693..f0bd8d5 100644 --- a/pkg/rtsp/server_command_session.go +++ b/pkg/rtsp/server_command_session.go @@ -10,6 +10,7 @@ package rtsp import ( "bufio" + "fmt" "net" "strings" @@ -251,9 +252,9 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC host, _, _ := net.SplitHostPort(remoteAddr) // 是否为interleaved模式 - ts := requestCtx.Headers[HeaderTransport] - if strings.Contains(ts, TransportFieldInterleaved) { - rtpChannel, rtcpChannel, err := parseRTPRTCPChannel(ts) + htv := requestCtx.Headers[HeaderTransport] + if strings.Contains(htv, TransportFieldInterleaved) { + rtpChannel, rtcpChannel, err := parseRTPRTCPChannel(htv) if err != nil { nazalog.Errorf("[%s] parse rtp rtcp channel error. err=%+v", session.UniqueKey, err) return err @@ -273,7 +274,7 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC return ErrRTSP } - resp := PackResponseSetupTCP(requestCtx.Headers[HeaderCSeq], ts) + resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], htv) _, err = session.conn.Write([]byte(resp)) return err } @@ -296,17 +297,19 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err) return err } + htv = fmt.Sprintf(HeaderTransportServerRecordTmpl, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort) } else if session.subSession != nil { if err = session.subSession.SetupWithConn(requestCtx.URI, rtpConn, rtcpConn); err != nil { nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err) return err } + htv = fmt.Sprintf(HeaderTransportServerPlayTmpl, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort) } else { nazalog.Errorf("[%s] setup but session not exist.", session.UniqueKey) return ErrRTSP } - resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], rRTPPort, rRTCPPort, lRTPPort, lRTCPPort) + resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], htv) _, err = session.conn.Write([]byte(resp)) return err }