
- [doc] 启动lal官方文档页:
- [doc] 新增文档《rtmp url,以及vhost》:
- [feat] demo,新增/app/demo/pullrtmp2pushrtmp,从远端服务器拉取RTMP流,并使用RTMP转推出去,支持1对n转推
- [fix] rtsp,setup信令header中的transport字段区分record和play,record时添加mode=record
<p align="center">
<a title="logo" target="_blank" href="">
<img alt="Live And Live" src="">
</a>
<br>
<a title="TravisCI" target="_blank" href=""><img src=""></a> <a title="TravisCI" target="_blank" href=""><img src=""></a>
<br> <br>
<a title="codeline" target="_blank" href=""><img src=""></a> <a title="codeline" target="_blank" href=""><img src=""></a>
<a title="license" target="_blank" href=""><img src=""></a> <a title="license" target="_blank" href=""><img src=""></a>
<a title="lastcommit" target="_blank" href=""><img src=""></a>
<br> <br>
<a title="pr" target="_blank" href=""><img src=""></a>
<a title="hits" target="_blank" href=""><img src=""></a> <a title="hits" target="_blank" href=""><img src=""></a>
<a title="toplanguage" target="_blank" href=""><img src=""></a> <a title="toplanguage" target="_blank" href=""><img src=""></a>
<br> <br>
--- ---
lal是一个开源GoLang直播流媒体网络传输项目包含三个主要组成部分
- lalserver接收客户端推流转发给对应的拉流客户端也即承担直播场景中的源站、CDN边缘分发等角色。类似于`nginx-rtmp-module`、crtmpserver等应用。 - lalserver流媒体转发服务器。类似于`nginx-rtmp-module`等应用,但支持更多的协议,提供更丰富的功能。[lalserver简介](
- demo一些小应用比如推、拉流客户端压测工具流分析工具调度示例程序等。类似于ffmpeg、ffprobe等应用以及提供给业务方的使用示例。 - demo一些小应用比如推、拉流客户端压测工具流分析工具调度示例程序等。类似于ffmpeg、ffprobe等应用。[Demo简介](
- pkg流媒体协议部分业务方可使用它编写自身的应用。类似于ffmpeg的libavformat等库。 - pkg流媒体协议库。类似于ffmpeg的libavformat等库。
**`app/lalserver`服务器支持的协议:** **lal源码package架构图**
| - | sub rtmp | sub http(s)-flv | sub http-ts | sub hls | sub rtsp | relay push rtmp | ![lal源码package架构图](
| - | - | - | - | - | - | - |
| pub rtmp | ✔ | ✔ | ✔ | ✔ | X | ✔ | **lalserver特性图**
| pub rtsp | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| relay pull rtmp | ✔ | ✔ | ✔ | ✔ | X | . | ![lalserver特性图](
| 编码类型 | rtmp | rtsp | hls | http(s)-flv | http-ts | 了解更多请访问:
| - | - | - | - | - | - |
| aac | ✔ | ✔ | ✔ | ✔ | ✔ | * [lal github地址](
| avc/h264 | ✔ | ✔ | ✔ | ✔ | ✔ | * [lal 官方文档](
| hevc/h265 | ✔ | ✔ | X | ✔ | X | * **/lalserver/**
* [简介](
表格含义见: [《流媒体传输连接类型之session client, server, pub, sub, push, pull》]( * [编译、运行、体验功能](
* [配置文件说明](
**`app/lalserver`功能特性:** * [HTTP API接口](
* [HTTP Notify(Callback/Webhook)事件回调](
- [x] **全平台**。(依托Go语言):支持`(linux/macOS/windows)`多平台开发、调试、运行。支持交叉编译。生成的可执行文件(无任何库依赖)可独立运行。(开放源码的同时)提供各平台可执行文件,可(免编译)直接运行 * [Demo简介](
- [x] **高性能**。多核多线程扩展 * [Changelog修改记录](
- [x] **多种直播流封装协议**。支持RTMP/RTSP/HTTP-FLV/HTTP-TS/HLS不同封装协议支持相互转换 * [github star趋势图](
- [x] **多种编码格式**。视频支持H264/AVCH265/HEVC音频支持AAC * [第三方依赖](
- [x] **录制**。支持HLS录制(HLS直播与录制可同时开启) * [联系作者](
- [x] **HTTPS**。支持HTTPS-FLV拉流 * **/技术文档/**
- [x] **RTSP**。支持interleaved模式。支持digest auth验证。支持`GET_PARAMETER`。 * [常见推拉流客户端使用方式](
- [x] **HTTP API接口**。用于获取服务信息,向服务发送命令。见[《lal流媒体服务器的HTTP API接口》]( * [连接类型之session pub/sub/push/pull](
- [x] **HTTP Notify事件回调**。见[《lal HTTP Notify(or Callback or Webhook)事件回调》]( * [rtmp url以及vhost](
- [x] **分布式集群**。 * [FAQ](
- [x] **静态pull回源**。通过配置文件配置回源地址 * **/待整理/**
- [x] **静态push转推**。支持转推多个地址。通过配置文件配置转推地址 * [性能测试](
- [x] **CORS跨域**。支持HTTP-FLVHTTP-TSHLS跨域拉流 * [图稿](
- [x] **HTTP文件服务器**。比如HLS切片文件可直接播放不需要额外的HTTP文件服务器
联系作者:
-
- 微信q191201771
<img alt="Live And Live" src=""> - QQ191201771
### 编译,运行,体验功能
#### 编译
$export GO111MODULE=on && export GOPROXY=,,direct
[点我打开《github lal最新release版本页面》](下载对应平台编译好的二进制可执行文件的zip压缩包。
#### 运行
# 注意windows平台将路径分隔符`/`换成`\`
$./bin/lalserver -c conf/lalserver.conf.json
#### 体验功能
快速体验lalserver服务器见 [《常见推拉流客户端软件的使用方式》](
lalserver详细配置见 [《配置注释文档》](
### 第三方依赖
我自己写的Go基础库 [](
### 联系我
- 个人微信号: q191201771
- 个人QQ号 191201771
- 微信群: 加我微信好友后,告诉我拉你进群 - 微信群: 加我微信好友后,告诉我拉你进群
- QQ群 1090510973 - QQ群 1090510973
### 性能测试,测试过的第三方客户端
### 项目star趋势图
觉得项目还不错就点个star支持一下吧 :)
[![Stargazers over time](](

### 性能测试
| 推流数量 | CPU占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的16% | 104MB |
| 拉流数量 | CPU占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的30% | 120MB |
测试场景三: 持续推送n路RTMP流至lalserver使用RTMP协议从lalserver拉取n路流推拉流为1对1的关系
| 推流数量 | 拉流数量 | CPU占用 | 内存占用RES |
| - | - | - | - |
| 1000 | 1000 | 125% | 464MB |
* 测试机32核16Glalserver服务器和压测工具同时跑在这一个机器上
* 压测工具lal中的 `/app/demo/pushrtmp` 以及 `/app/demo/pullrtmp`
* 推流码率:使用`srs-bench`中的FLV文件大概200kbps
* lalserver版本基于 git commit: xxx
### 测试过的第三方客户端
- OBS 21.0.3(macos)
- OBS 24.0.3(win10 64 bit)
- ffmpeg 3.4.2(macos)
- srs-bench (macos srs项目配套的一个压测工具)
- pushrtmp (macos lal demo中的RTMP推流客户端)
- VLC 3.0.8(macos 10.15.1)
- VLC 3.0.8(win10)
- MPV 0.29.1(macos)
- ffmpeg 3.4.2(macos)
- srs-bench (macos srs项目配套的一个压测工具)

| demo | push rtmp | push rtsp | pull rtmp | pull httpflv | pull rtsp | 说明 |
| - | - | - | - | - | - | - |
| pushrtmp | ✔ | . | . | . | . | RTMP推流客户端压力测试工具 |
| pullrtmp | . | . | ✔ | . | . | RTMP拉流客户端压力测试工具 |
| pullrtmp2hls | . | . | ✔ | . | . | 从远端服务器拉取RTMP流存储为本地m3u8+ts文件 |
| pullhttpflv | . | . | . | ✔ | . | HTTP-FLV拉流客户端 |
| pullrtsp | . | . | . | . | ✔ | RTSP拉流客户端 |
| pullrtsp2pushrtsp | . | ✔ | . | . | ✔ | RTSP拉流并使用RTSP转推出去 |
| pullrtsp2pushrtmp | ✔ | . | . | . | ✔ | RTSP拉流并使用RTMP转推出去 |
| analyseflv | . | . | . | ✔ | . | 拉取HTTP-FLV流并进行分析 |
| demo | 说明 |
| dispatch | 简单演示如何实现一个简单的调度服务使得多个lalserver节点可以组成一个集群 |
| flvfile2es | 将本地FLV文件分离成H264/AVC和AAC的ES流文件 |
| modflvfile | 修改flv文件的一些信息比如某些tag的时间戳后另存文件 |

// Copyright 2021, Chef. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
// Author: Chef (
package main
import (
func main() {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify push rtmp url list, separated by a comma")
if *i == "" || *o == "" {
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp:// -o rtmp://
%s -i rtmp:// -o rtmp://,rtmp://
`, os.Args[0], os.Args[0])
ol := strings.Split(*o, ",")
PullRTMP2PushRTMP(*i, ol)
time.Sleep(60 * time.Second)

// Copyright 2021, Chef. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
// Author: Chef (
package main
import (
// 拉取一路rtmp流并使用rtmp转推出去可1对n转推
// 阻塞直至拉流或者(任意一路)推流失败或结束
// @param inURL 拉流rtmp url地址
// @param outURL 推流rtmp url地址列表
func PullRTMP2PushRTMP(inURL string, outURLList []string) error {
const (
pullTimeoutMS = 10000
pushTimeoutMS = 10000
tunnelUK := unique.GenUniqueKey("TUNNEL")
nazalog.Infof("[%s] new tunnel. inURL=%s, outURLList=%+v", tunnelUK, inURL, outURLList)
errChan := make(chan error, len(outURLList)+1)
rtmpMsgQ := make(chan base.RTMPMsg, 1024)
var pullSession *rtmp.PullSession
var pushSessionList []*rtmp.PushSession
defer func() {
if pullSession != nil {
nazalog.Infof("[%s] dispose pull session. [%s]", tunnelUK, pullSession.UniqueKey())
for _, s := range pushSessionList {
nazalog.Infof("[%s] dispose push session. [%s]", tunnelUK, s.UniqueKey())
for _, outURL := range outURLList {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMS = pushTimeoutMS
nazalog.Infof("[%s] start push. [%s] url=%s", tunnelUK, pushSession.UniqueKey(), outURL)
err := pushSession.Push(outURL)
if err != nil {
nazalog.Errorf("[%s] push error. [%s] err=%+v", tunnelUK, pushSession.UniqueKey(), err)
return err
nazalog.Infof("[%s] push succ. [%s]", tunnelUK, pushSession.UniqueKey())
pushSessionList = append(pushSessionList, pushSession)
go func(u string, s *rtmp.PushSession) {
err := <-s.Wait()
nazalog.Errorf("[%s] push wait error. [%s] err=%+v", tunnelUK, s.UniqueKey(), err)
errChan <- err
}(outURL, pushSession)
pullSession = rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMS = pullTimeoutMS
nazalog.Infof("[%s] start pull. [%s] url=%s", tunnelUK, pullSession.UniqueKey(), inURL)
err := pullSession.Pull(inURL, func(msg base.RTMPMsg) {
m := msg.Clone()
rtmpMsgQ <- m
if err != nil {
nazalog.Errorf("[%s] pull error. [%s] err=%+v", tunnelUK, pullSession.UniqueKey(), err)
return err
nazalog.Infof("[%s] pull succ. [%s]", tunnelUK, pullSession.UniqueKey())
go func(u string, s *rtmp.PullSession) {
err := <-s.Wait()
nazalog.Errorf("[%s] pull wait error. [%s] err=%+v", tunnelUK, s.UniqueKey(), err)
errChan <- err
}(inURL, pullSession)
debugWriteCount := 0
maxDebugWriteCount := 5
for {
select {
case err := <-errChan:
nazalog.Errorf("[%s] errChan. err=%+v", tunnelUK, err)
return err
case msg := <-rtmpMsgQ:
currHeader := remux.MakeDefaultRTMPHeader(msg.Header)
chunks := rtmp.Message2Chunks(msg.Payload, &currHeader)
if debugWriteCount < maxDebugWriteCount {
nazalog.Infof("[%s] write. header=%+v, %+v, %s", tunnelUK, msg.Header, currHeader, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 32)))
for _, pushSession := range pushSessionList {
err := pushSession.AsyncWrite(chunks)
if err != nil {
nazalog.Errorf("[%s] write error. err=%+v", tunnelUK, err)
return err

_, _ = fmt.Fprintf(os.Stderr, `Example: _, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtsp://localhost:5544/live/test110 -o out.flv -t 0 %s -i rtsp://localhost:5544/live/test110 -o out.flv -t 0
%s -i rtsp://localhost:5544/live/test110 -o out.flv -t 1 %s -i rtsp://localhost:5544/live/test110 -o out.flv -t 1
`, os.Args[0], os.Args[0])
base.OSExitAndWaitPressIfWindows(1) base.OSExitAndWaitPressIfWindows(1)
} }
return *i, *o, *t return *i, *o, *t

import ( import (
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"" ""
"" ""
} }
// 运行参数中没有配置文件,尝试从几个默认位置读取 // 运行参数中没有配置文件,尝试从几个默认位置读取
nazalog.Warnf("config file not specify in command line, try to load some common config file in common path.") nazalog.Warnf("config file did not specify in the command line, try to load it in the usual path.")
defaultConfigFileList := []string{ defaultConfigFileList := []string{
filepath.FromSlash("lalserver.conf.json"), filepath.FromSlash("lalserver.conf.json"),
filepath.FromSlash("./conf/lalserver.conf.json"), filepath.FromSlash("./conf/lalserver.conf.json"),
_, _ = fmt.Fprintf(os.Stderr, ` _, _ = fmt.Fprintf(os.Stderr, `
Example: Example:
%s -c %s %s -c %s
`, os.Args[0], filepath.FromSlash("./conf/lalserver.conf.json"))
base.OSExitAndWaitPressIfWindows(1) Github: %s
Doc: %s
`, os.Args[0], filepath.FromSlash("./conf/lalserver.conf.json"), base.LALGithubSite, base.LALDocSite)
return *cf return *cf
} }

{ {
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1945",
"gop_num": 2 "gop_num": 2
}, },
"httpflv": { "httpflv": {
"relay_push": { "relay_push": {
"enable": true, "enable": true,
"addr_list":[ "addr_list":[
""
] ]
}, },
"relay_pull": { "relay_pull": {
"enable": true, "enable": true,
"addr": ""
}, },
"http_api": { "http_api": {
"enable": true, "enable": true,

{ {
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1935",
"gop_num": 2 "gop_num": 2
}, },
"httpflv": { "httpflv": {

{ {
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1935",
"gop_num": 2 "gop_num": 2
}, },
"httpflv": { "httpflv": {

{ {
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1955",
"gop_num": 2 "gop_num": 2
}, },
"httpflv": { "httpflv": {

func (msg RTMPMsg) IsAACSeqHeader() bool { func (msg RTMPMsg) IsAACSeqHeader() bool {
return msg.Header.MsgTypeID == RTMPTypeIDAudio && (msg.Payload[0]>>4) == RTMPSoundFormatAAC && msg.Payload[1] == RTMPAACPacketTypeSeqHeader return msg.Header.MsgTypeID == RTMPTypeIDAudio && (msg.Payload[0]>>4) == RTMPSoundFormatAAC && msg.Payload[1] == RTMPAACPacketTypeSeqHeader
} }
func (msg RTMPMsg) Clone() (ret RTMPMsg) {
ret.Header = msg.Header
ret.Payload = make([]byte, len(msg.Payload))
copy(ret.Payload, msg.Payload)

// | struct | ServerSession | ServerSession | PushSession/ClientSession | PullSession/ClientSession | // | struct | ServerSession | ServerSession | PushSession/ClientSession | PullSession/ClientSession |
// //
// //
// | . | rtsp pub | rtsp sub | rtsp pull | rtsp push |
// | - | - | - | - | - |
// | file | server_pub_session.go | server_sub_session.go | client_pull_session.go | client_push_session.go |
// | struct | PubSession/ServerCommandSession/BaseInSession | SubSession/ServerCommandSession | PullSession/BaseInSession | PushSession |
// | . | flv sub | flv pull | // | . | flv sub | flv pull |
// | - | - | - | // | - | - | - |
// | file | server_sub_session.go | client_pull_session.go | // | file | server_sub_session.go | client_pull_session.go |
@ -61,27 +55,27 @@ type ISessionURLContext interface {
// | struct | SubSession | // | struct | SubSession |
// //
// //
// | . | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | - | rtmppush | rtmppull | flvpull | rtsppull | hls | // | . | rtmppub | rtsppub | rtmpsub | flvsub | tssub | rtspsub | - | rtmppush | rtmppull | flvpull | rtsppull | rtsppush | hls |
// | - | - | - | - | - | - | - | - | - | - | - | - | | // | - | - | - | - | - | - | - | - | - | - | - | - | | |
// | x | x | x | x | x | x | x | - | x | x | x | x | | // | x | x | x | x | x | x | x | - | x | x | x | x | | |
// | UniqueKey<all> | √ | √ | √ | √ | √ | √ | - | x√ | x√ | √ | √ | | // | UniqueKey<all> | √ | √ | √ | √ | √ | √ | - | x√ | x√ | √ | √ | | |
// | AppName()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | AppName()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | StreamName()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | StreamName()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | RawQuery()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | RawQuery()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | GetStat()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | GetStat()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | UpdateStat()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | UpdateStat()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | IsAlive()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | IsAlive()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | | // | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | | |
// | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | |
// | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | | // | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | | |
// | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | | // | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | | |
// //
// | Opt.PullTimeoutMS | - | - | - | - | - | - | - | - | x | √ | √ | | // | Opt.Push/PullTimeoutMS | - | - | - | - | - | - | - | √ | √ | √ | √ | √ | |
// | Wait() | - | - | - | - | - | - | - | - | √ | √ | √ | | // | Wait() | - | - | - | - | - | - | - | √ | √ | √ | √ | √ | |
// //
// Dispose由外部调用表示主动关闭正常的session // Dispose由外部调用表示主动关闭正常的session
// 外部调用Dispose后不应继续使用该session // 外部调用Dispose后不应继续使用该session

var ( var (
LALLibraryName = "lal" LALLibraryName = "lal"
LALGithubRepo = ""
LALGithubSite = ""
LALDocSite = ""
// e.g. lal v0.12.3 ( // e.g. lal v0.12.3 (
LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGithubRepo + ")"
// e.g. 0.12.3 // e.g. 0.12.3
LALVersionDot string LALVersionDot string
var ( var (
// 植入rtmp握手随机字符串中 // 植入rtmp握手随机字符串中
// e.g. lal v0.12.3 ( // e.g. lal v0.12.3 (
LALRTMPHandshakeWaterMark string //LALRTMPHandshakeWaterMark string
// 植入rtmp server中的connect result信令中 // 植入rtmp server中的connect result信令中
// 注意有两个object第一个object中的fmsVer我们保持通用公认的值在第二个object中植入 // 注意有两个object第一个object中的fmsVer我们保持通用公认的值在第二个object中植入
@ -121,5 +123,5 @@ func init() {
LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot
LALRTSPPullSessionUA = LALLibraryName + "/" + LALVersionDot LALRTSPPullSessionUA = LALLibraryName + "/" + LALVersionDot
LALRTMPHandshakeWaterMark = LALFullInfo //LALRTMPHandshakeWaterMark = LALFullInfo
} }

func (session *PullSession) Dispose() { func (session *PullSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose httpflv PullSession.", session.UniqueKey) nazalog.Infof("[%s] lifecycle dispose httpflv PullSession.", session.UniqueKey)
_ = session.conn.Close() if session.conn != nil {
_ = session.conn.Close()
} }
func (session *PullSession) UpdateStat(interval uint32) { func (session *PullSession) UpdateStat(interval uint32) {

initLog(config.LogConfig) initLog(config.LogConfig)
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LALFullInfo) nazalog.Infof("version: %s", base.LALFullInfo)
nazalog.Infof("github: %s", base.LALGithubSite)
nazalog.Infof("doc: %s", base.LALDocSite)
sm = NewServerManager() sm = NewServerManager()

lrm2ft LazyRTMPMsg2FLVTag lrm2ft LazyRTMPMsg2FLVTag
) )
//nazalog.Debugf("[%s] broadcaseRTMP. header=%+v, %s", group.UniqueKey, msg.Header, hex.Dump(nazastring.SubSliceSafety(msg.Payload, 7)))
// # 0. hls // # 0. hls
if config.HLSConfig.Enable && group.hlsMuxer != nil { if config.HLSConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRTMPMessage(msg) group.hlsMuxer.FeedRTMPMessage(msg)
// # 1. 设置好用于发送的 rtmp 头部信息 // # 1. 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRTMPHeader(msg.Header) currHeader := remux.MakeDefaultRTMPHeader(msg.Header)
if currHeader.MsgLen != uint32(len(msg.Payload)) { if currHeader.MsgLen != uint32(len(msg.Payload)) {
nazalog.Errorf("diff. msgLen=%d, payload len=%d, %+v", currHeader.MsgLen, len(msg.Payload), msg.Header) nazalog.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header)
} }
// # 2. 懒初始化rtmp chunk切片以及httpflv转换 // # 2. 懒初始化rtmp chunk切片以及httpflv转换
@ -708,12 +710,15 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
if session.IsFresh { if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.gopCache.Metadata != nil { if group.gopCache.Metadata != nil {
//nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey)
_ = session.AsyncWrite(group.gopCache.Metadata) _ = session.AsyncWrite(group.gopCache.Metadata)
} }
if group.gopCache.VideoSeqHeader != nil { if group.gopCache.VideoSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey)
_ = session.AsyncWrite(group.gopCache.VideoSeqHeader) _ = session.AsyncWrite(group.gopCache.VideoSeqHeader)
} }
if group.gopCache.AACSeqHeader != nil { if group.gopCache.AACSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey)
_ = session.AsyncWrite(group.gopCache.AACSeqHeader) _ = session.AsyncWrite(group.gopCache.AACSeqHeader)
} }
for i := 0; i < group.gopCache.GetGOPCount(); i++ { for i := 0; i < group.gopCache.GetGOPCount(); i++ {

stream.header.TimestampAbs += stream.timestamp stream.header.TimestampAbs += stream.timestamp
} }
absTsFlag = false absTsFlag = false
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header) //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v, c=%p", fmt, csid, stream.header, c)
if err := cb(stream); err != nil { if err := cb(stream); err != nil {
return err return err

@ -135,7 +135,9 @@ func (s *ClientSession) Flush() error {
func (s *ClientSession) Dispose() { func (s *ClientSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey) nazalog.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey)
_ = s.conn.Close() if s.conn != nil {
_ = s.conn.Close()
} }
func (s *ClientSession) AppName() string { func (s *ClientSession) AppName() string {
@ -210,7 +212,7 @@ func (s *ClientSession) doContext(ctx context.Context, rawURL string) error {
return return
} }
nazalog.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) nazalog.Infof("[%s] > W connect('%s'). tcUrl=%s", s.UniqueKey, s.appName(), s.tcURL())
if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil { if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil {
errChan <- err errChan <- err
return return
@ -251,6 +253,7 @@ func (s *ClientSession) streamNameWithRawQuery() string {
} }
func (s *ClientSession) tcpConnect() error { func (s *ClientSession) tcpConnect() error {
nazalog.Infof("[%s] > tcp connect.", s.UniqueKey)
var err error var err error
s.stat.RemoteAddr = s.urlCtx.HostWithPort s.stat.RemoteAddr = s.urlCtx.HostWithPort

"bytes" "bytes"
"crypto/hmac" "crypto/hmac"
"crypto/sha256" "crypto/sha256"
"io" "io"
"time" "time"
"" ""
"" ""
) )
func init() { func init() {
random1528Buf = make([]byte, 1528) random1528Buf = make([]byte, 1528)
hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark) //hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark)
copy(random1528Buf, []byte(hack)) //copy(random1528Buf, []byte(hack))
} }

if err != nil { if err != nil {
return err return err
} }
nazalog.Infof("[%s] < R connect('%s').", s.UniqueKey, s.appName) tcUrl, err := val.FindString("tcUrl")
if err != nil {
nazalog.Warnf("[%s] tcUrl not exist.", s.UniqueKey)
nazalog.Infof("[%s] < R connect('%s'). tcUrl=%s", s.UniqueKey, s.appName, tcUrl), val), val)

func (session *ClientCommandSession) Dispose() error { func (session *ClientCommandSession) Dispose() error {
nazalog.Infof("[%s] lifecycle dispose rtsp ClientCommandSession. session=%p", session.UniqueKey, session) nazalog.Infof("[%s] lifecycle dispose rtsp ClientCommandSession. session=%p", session.UniqueKey, session)
if session.conn == nil {
return nil
return session.conn.Close() return session.conn.Close()
} }
@ -396,8 +399,15 @@ func (session *ClientCommandSession) writeOneSetup(setupURI string) error {
return err return err
} }
var htv string
switch session.t {
case CCSTPushSession:
htv = fmt.Sprintf(HeaderTransportClientRecordTmpl, lRTPPort, lRTCPPort)
case CCSTPullSession:
htv = fmt.Sprintf(HeaderTransportClientPlayTmpl, lRTPPort, lRTCPPort)
headers := map[string]string{ headers := map[string]string{
HeaderTransport: fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", lRTPPort, lRTCPPort), HeaderTransport: htv,
} }
ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "") ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "")
if err != nil { if err != nil {
rtcpChannel := + 1 rtcpChannel := + 1 += 2 += 2
var htv string
switch session.t {
case CCSTPushSession:
htv = fmt.Sprintf(HeaderTransportClientRecordTCPTmpl, rtpChannel, rtcpChannel)
case CCSTPullSession:
htv = fmt.Sprintf(HeaderTransportClientPlayTCPTmpl, rtpChannel, rtcpChannel)
headers := map[string]string{ headers := map[string]string{
HeaderTransport: fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", rtpChannel, rtcpChannel), HeaderTransport: htv,
} }
ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "") ctx, err := session.writeCmdReadResp(MethodSetup, setupURI, headers, "")
if err != nil { if err != nil {
func (session *ClientCommandSession) writePlay() error { func (session *ClientCommandSession) writePlay() error {
headers := map[string]string{ headers := map[string]string{
HeaderRange: "npt=0.000-", HeaderRange: HeaderRangeDefault,
} }
_, err := session.writeCmdReadResp(MethodPlay, session.urlCtx.RawURLWithoutUserInfo, headers, "") _, err := session.writeCmdReadResp(MethodPlay, session.urlCtx.RawURLWithoutUserInfo, headers, "")
return err return err
func (session *ClientCommandSession) writeRecord() error { func (session *ClientCommandSession) writeRecord() error {
headers := map[string]string{ headers := map[string]string{
HeaderRange: "npt=0.000-", HeaderRange: HeaderRangeDefault,
} }
_, err := session.writeCmdReadResp(MethodRecord, session.urlCtx.RawURLWithoutUserInfo, headers, "") _, err := session.writeCmdReadResp(MethodRecord, session.urlCtx.RawURLWithoutUserInfo, headers, "")
return err return err

"%s" "%s"
// rfc2326 10.4 SETUP // rfc2326 10.4 SETUP
// TODO chef: mode=record这个是咋作用是应该pub有sub没有吗我的pack实现没有严格区分
// CSeq, Date, Session, Transport(client_port, server_rtp_port, server_rtcp_port)
var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"Date: %s\r\n" +
"Session: %s\r\n" +
"Transport:RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d\r\n" +
// CSeq, Date, Session, Transport // CSeq, Date, Session, Transport
var ResponseSetupTCPTmpl = "RTSP/1.0 200 OK\r\n" + var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" + "CSeq: %s\r\n" +
"Date: %s\r\n" + "Date: %s\r\n" +
"Session: %s\r\n" + "Session: %s\r\n" +
@ -100,22 +90,10 @@ func PackResponseDescribe(cseq, sdp string) string {
return fmt.Sprintf(ResponseDescribeTmpl, cseq, date, len(sdp), sdp) return fmt.Sprintf(ResponseDescribeTmpl, cseq, date, len(sdp), sdp)
} }
// @param transportC: func PackResponseSetup(cseq string, htv string) string {
// pub example:
// RTP/AVP/UDP;unicast;client_port=24254-24255;mode=record
// RTP/AVP/UDP;unicast;client_port=24256-24257;mode=record
// sub example:
// RTP/AVP/UDP;unicast;client_port=9420-9421
func PackResponseSetup(cseq string, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort uint16) string {
date := time.Now().Format(time.RFC1123)
return fmt.Sprintf(ResponseSetupTmpl, cseq, date, sessionID, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort)
func PackResponseSetupTCP(cseq string, ts string) string {
date := time.Now().Format(time.RFC1123) date := time.Now().Format(time.RFC1123)
return fmt.Sprintf(ResponseSetupTCPTmpl, cseq, date, sessionID, ts) return fmt.Sprintf(ResponseSetupTmpl, cseq, date, sessionID, htv)
} }
func PackResponseRecord(cseq string) string { func PackResponseRecord(cseq string) string {

HeaderAccept = "Accept" HeaderAccept = "Accept"
HeaderUserAgent = "User-Agent" HeaderUserAgent = "User-Agent"
HeaderCSeq = "CSeq" HeaderCSeq = "CSeq"
HeaderContentLength = "Content-Length"
HeaderTransport = "Transport" HeaderTransport = "Transport"
HeaderSession = "Session" HeaderSession = "Session"
HeaderRange = "Range" HeaderRange = "Range"
HeaderContentLength = "Content-Length"
HeaderWWWAuthenticate = "WWW-Authenticate" HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderAuthorization = "Authorization" HeaderAuthorization = "Authorization"
HeaderPublic = "Public" HeaderPublic = "Public"
// header value // header value
HeaderAcceptApplicationSDP = "application/sdp" HeaderAcceptApplicationSDP = "application/sdp"
HeaderRangeDefault = "npt=0.000-"
HeaderTransportClientPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d" // localRTPPort, localRTCPPort
HeaderTransportClientPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d" // rtpChannel, rtcpChannel
HeaderTransportClientRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;mode=record"
HeaderTransportClientRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record"
HeaderTransportServerPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d"
//HeaderTransportServerPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d"
HeaderTransportServerRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d;mode=record"
//HeaderTransportServerRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record"
) )
const ( const (

import ( import (
"bufio" "bufio"
"net" "net"
"strings" "strings"
@ -251,9 +252,9 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC
host, _, _ := net.SplitHostPort(remoteAddr) host, _, _ := net.SplitHostPort(remoteAddr)
// 是否为interleaved模式 // 是否为interleaved模式
ts := requestCtx.Headers[HeaderTransport] htv := requestCtx.Headers[HeaderTransport]
if strings.Contains(ts, TransportFieldInterleaved) { if strings.Contains(htv, TransportFieldInterleaved) {
rtpChannel, rtcpChannel, err := parseRTPRTCPChannel(ts) rtpChannel, rtcpChannel, err := parseRTPRTCPChannel(htv)
if err != nil { if err != nil {
nazalog.Errorf("[%s] parse rtp rtcp channel error. err=%+v", session.UniqueKey, err) nazalog.Errorf("[%s] parse rtp rtcp channel error. err=%+v", session.UniqueKey, err)
return err return err
@ -273,7 +274,7 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC
return ErrRTSP return ErrRTSP
} }
resp := PackResponseSetupTCP(requestCtx.Headers[HeaderCSeq], ts) resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], htv)
_, err = session.conn.Write([]byte(resp)) _, err = session.conn.Write([]byte(resp))
return err return err
} }
@ -296,17 +297,19 @@ func (session *ServerCommandSession) handleSetup(requestCtx nazahttp.HTTPReqMsgC
nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err) nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err)
return err return err
} }
htv = fmt.Sprintf(HeaderTransportServerRecordTmpl, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort)
} else if session.subSession != nil { } else if session.subSession != nil {
if err = session.subSession.SetupWithConn(requestCtx.URI, rtpConn, rtcpConn); err != nil { if err = session.subSession.SetupWithConn(requestCtx.URI, rtpConn, rtcpConn); err != nil {
nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err) nazalog.Errorf("[%s] setup conn error. err=%+v", session.UniqueKey, err)
return err return err
} }
htv = fmt.Sprintf(HeaderTransportServerPlayTmpl, rRTPPort, rRTCPPort, lRTPPort, lRTCPPort)
} else { } else {
nazalog.Errorf("[%s] setup but session not exist.", session.UniqueKey) nazalog.Errorf("[%s] setup but session not exist.", session.UniqueKey)
return ErrRTSP return ErrRTSP
} }
resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], rRTPPort, rRTCPPort, lRTPPort, lRTCPPort) resp := PackResponseSetup(requestCtx.Headers[HeaderCSeq], htv)
_, err = session.conn.Write([]byte(resp)) _, err = session.conn.Write([]byte(resp))
return err return err
} }
