diff --git a/README.md b/README.md index 0ee1274..5336e03 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Wide
-Go语言编写的流媒体 库 / 客户端 / 服务端 +Go语言编写的直播流媒体网络传输服务器

@@ -27,20 +27,40 @@ Go语言编写的流媒体 库 / 客户端 / 服务端 --- -Go语言编写的流媒体 库 / 客户端 / 服务器。目前 rtmp / http-flv 部分基本完成了。 +Go语言编写的直播流媒体网络传输服务器。本项目遵循的原则或者说最终目标是: -#### 源码框架 +* ~~没有蛀。。~~ +* 可读可维护。框架清晰,模块化,按业务逻辑层,协议层,传输层分层。 +* 可快速集成各种协议(rtmp / http-flv / hls, rtp / rtcp / webrtc, quic, srt, over tcp, over udp...) +* 高性能 -简单来说,主要源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。 +目前 rtmp / http-flv 部分基本完成了。第一个目标大版本会实现直播源站以及直播 CDN 分发相关的功能。 + +### README 目录 + +* 源码框架 +* 编译和运行 +* 配置文件说明 +* 性能测试 +* 测试过的第三方客户端 +* Roadmap +* 联系我 + +### 源码框架 + +简单来说,源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。 ``` app/ ......各种main包的源码文件,一个子目录对应一个main包,即对应可生成一个可执行文件 |-- lals/ ......[最重要的] 流媒体服务器 -|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 -|-- flvfile2rtmppush ......rtmp推流客户端,输入是本地flv文件,文件推送完毕后,可循环推送(rtmp push流并不断开) +|-- flvfile2rtmppush ......// rtmp 推流客户端,读取本地 flv 文件,使用 rtmp 协议推送出去 + // + // 支持循环推送:文件推送完毕后,可循环推送(rtmp push 流并不断开) + // 支持推送多路流:相当于一个 rtmp 推流压测工具 |-- httpflvpull ......http-flv拉流客户端 -|-- modflvfile ......修改本地flv文件 |-- rtmppull ......rtmp拉流客户端,存储为本地flv文件 +|-- modflvfile ......修改本地flv文件 +|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 pkg/ ......源码包 |-- aac/ ......音频aac编解码格式相关 |-- avc/ ......视频avc h264编解码格式相关 @@ -53,7 +73,7 @@ conf/ ......配置文件目录 目前唯一的第三方依赖(我自己写的 Go 基础库): [github.com/q191201771/naza](https://github.com/q191201771/naza) -#### 编译和运行 +### 编译和运行 ``` # 不使用 Go module @@ -69,7 +89,7 @@ $git clone https://github.com/q191201771/lal.git && cd lal && ./build.sh $./bin/lals -c conf/lals.conf.json ``` -#### 配置文件说明 +### 配置文件说明 ``` { @@ -97,7 +117,28 @@ $./bin/lals -c conf/lals.conf.json - [rtmp/var.go](https://github.com/q191201771/lal/blob/master/pkg/rtmp/var.go) - [httpflv/var.go](https://github.com/q191201771/lal/blob/master/pkg/httpflv/var.go) -#### 测试过的客户端 +### 性能测试 + +测试场景一:持续推送 n 路 rtmp 流至 lals(没有拉流) + +| 推流数量 | CPU 占用 | 内存占用(RES) | +| - | - | - | +| 1000 | (占单个核的)16% | 104MB | + +测试场景二:持续推送1路 rtmp 流至 lals,使用 rtmp 协议从 lals 拉取 n 路流 + +| 拉流数量 | CPU 占用 | 内存占用(RES) | +| - | - | - | +| 1000 | (占单个核的)30% | 120MB | + +* 测试机:32核16G(lals 服务器和压测工具同时跑在这一个机器上) +* 压测工具:lal 中的 `/app/flvfile2rtmppush` 以及 `/app/rtmppull` +* 推流码率:使用 `srs-bench` 中的 flv 文件,大概200kbps +* lals 版本:基于 git commit: fc0b04651af53a68758f41e5dfccdb7838e55a45 + +*由于测试机是台共用的机器,上面还跑了许多其他服务,这里列的只是个粗略的数据,还待做更多的性能分析以及优化。如果你对性能感兴趣,欢迎进行测试并将结果反馈给我。* + +### 测试过的第三方客户端 ``` 推流端: @@ -113,11 +154,11 @@ $./bin/lals -c conf/lals.conf.json - srs-bench (srs项目配套的一个压测工具) ``` -#### roadmap +### Roadmap **有建议、意见、bug、功能等等欢迎提 issue 啊,100% 会回复的。** -lals 服务器目标版本roadmap如下: +lals 服务器目标版本功能如下: **v1.0.0** @@ -157,20 +198,12 @@ lals 服务器目标版本roadmap如下: - hls - h265 -最终目标: - -性能ok,框架清晰,代码对于任何新手来说都是可读可维护的。 - -* 实现一个支持多种流媒体协议(比如rtmp, http-flv, hls, rtp/rtcp 等),多种底层传输协议(比如tcp, udp, srt, quic 等)的服务器 -* 所有协议都以模块化的库形式提供给需要的用户使用 -* 提供多种协议的推流客户端、拉流客户端,或者说演示demo - -#### 文档 +### 文档 * [rtmp handshake | rtmp握手简单模式和复杂模式](https://pengrl.com/p/20027/) * [rtmp协议中的chunk stream id, message stream id, transaction id, message type id](https://pengrl.com/p/25610/) -#### 联系我 +### 联系我 欢迎扫码加我微信,进行技术交流或扯淡。 diff --git a/app/flvfile2es/flvfile2es.go b/app/flvfile2es/flvfile2es.go index 65fec54..6f8f6d0 100644 --- a/app/flvfile2es/flvfile2es.go +++ b/app/flvfile2es/flvfile2es.go @@ -53,19 +53,19 @@ func main() { case httpflv.TagTypeAudio: aac.CaptureAAC(afp, payload) case httpflv.TagTypeVideo: - avc.CaptureAVC(vfp, payload) + _ = avc.CaptureAVC(vfp, payload) } } } func parseFlag() (string, string, string) { flv := flag.String("i", "", "specify flv file") - aac := flag.String("a", "", "specify es aac file") - avc := flag.String("v", "", "specify es h264 file") + a := flag.String("a", "", "specify es aac file") + v := flag.String("v", "", "specify es h264 file") flag.Parse() - if *flv == "" || *avc == "" || *aac == "" { + if *flv == "" || *a == "" || *v == "" { flag.Usage() os.Exit(1) } - return *flv, *aac, *avc + return *flv, *a, *v } diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/app/flvfile2rtmppush/flvfile2rtmppush.go index 4ea7e1f..5305e8b 100644 --- a/app/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/flvfile2rtmppush/flvfile2rtmppush.go @@ -13,6 +13,8 @@ import ( "fmt" "io" "os" + "strconv" + "strings" "time" "github.com/q191201771/lal/pkg/logic" @@ -23,28 +25,77 @@ import ( log "github.com/q191201771/naza/pkg/nazalog" ) -//rtmp推流客户端,输入是本地flv文件,文件推送完毕后,可循环推送(rtmp push流并不断开) +// rtmp 推流客户端,读取本地 flv 文件,使用 rtmp 协议推送出去 // -// -r 为1时表示当文件推送完毕后,是否循环推送(rtmp push流并不断开) +// 支持循环推送:文件推送完毕后,可循环推送(rtmp push 流并不断开) +// 支持推送多路流:相当于一个 rtmp 推流压测工具 // -// Usage: -// ./bin/flvfile2rtmppush -r 1 -i /tmp/test.flv -o rtmp://push.xxx.com/live/testttt +// Usage of ./bin/flvfile2rtmppush: +// -i string +// specify flv file +// -n int +// num of push connection (default 1) +// -o string +// specify rtmp push url +// -r recursive push if reach end of file +// -v show bin info +// Example: +// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test +// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r +// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000 func main() { - var err error + log.Info(bininfo.StringifySingleLine()) - flvFileName, rtmpPushURL, isRecursive := parseFlag() + filename, urlTmpl, num, isRecursive := parseFlag() + urls := collect(urlTmpl, num) - log.Info(bininfo.StringifySingleLine()) + tags := readAllTag(filename) + log.Debug(urls, num) - ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.ConnectTimeoutMS = 3000 - option.PushTimeoutMS = 5000 - option.WriteAVTimeoutMS = 10000 - }) - err = ps.Push(rtmpPushURL) + push(tags, urls, isRecursive) + log.Info("bye.") +} + +// readAllTag 预读取 flv 文件中的所有 tag,缓存在内存中 +func readAllTag(filename string) (ret []httpflv.Tag) { + var ffr httpflv.FLVFileReader + err := ffr.Open(filename) log.FatalIfErrorNotNil(err) - log.Infof("push succ. url=%s", rtmpPushURL) + log.Infof("open succ. filename=%s", filename) + + for { + tag, err := ffr.ReadTag() + if err == io.EOF { + log.Info("EOF") + break + } + log.FatalIfErrorNotNil(err) + ret = append(ret, tag) + } + log.Infof("read all tag done. num=%d", len(ret)) + return +} + +func push(tags []httpflv.Tag, urls []string, isRecursive bool) { + if len(tags) == 0 || len(urls) == 0 { + return + } + + var err error + var psList []*rtmp.PushSession + + for i := range urls { + ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.ConnectTimeoutMS = 3000 + option.PushTimeoutMS = 5000 + option.WriteAVTimeoutMS = 10000 + }) + err = ps.Push(urls[i]) + log.FatalIfErrorNotNil(err) + log.Infof("push succ. url=%s", urls[i]) + psList = append(psList, ps) + } var totalBaseTS uint32 var prevTS uint32 @@ -58,21 +109,9 @@ func main() { log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d", i, totalBaseTS, prevTS, thisBaseTS) - var ffr httpflv.FLVFileReader - err = ffr.Open(flvFileName) - log.FatalIfErrorNotNil(err) - log.Infof("open succ. filename=%s", flvFileName) - hasReadThisBaseTS = false - for { - tag, err := ffr.ReadTag() - if err == io.EOF { - log.Info("EOF") - break - } - log.FatalIfErrorNotNil(err) - + for _, tag := range tags { h := logic.Trans.FLVTagHeader2RTMPHeader(tag.Header) if tag.IsMetadata() { @@ -81,8 +120,10 @@ func main() { //log.Debugf("CHEFERASEME write metadata.") h.TimestampAbs = 0 chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h) - err = ps.AsyncWrite(chunks) - log.FatalIfErrorNotNil(err) + for _, ps := range psList { + err = ps.AsyncWrite(chunks) + log.FatalIfErrorNotNil(err) + } } else { // noop } @@ -122,14 +163,15 @@ func main() { hasTraceFirstTagTS = true } - err = ps.AsyncWrite(chunks) - log.FatalIfErrorNotNil(err) + for _, ps := range psList { + err = ps.AsyncWrite(chunks) + log.FatalIfErrorNotNil(err) + } prevTS = h.TimestampAbs } totalBaseTS = prevTS + 1 - ffr.Dispose() if !isRecursive { break @@ -137,11 +179,20 @@ func main() { } } -func parseFlag() (string, string, bool) { +func collect(urlTmpl string, num int) (urls []string) { + for i := 0; i < num; i++ { + url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1) + urls = append(urls, url) + } + return +} + +func parseFlag() (filename string, urlTmpl string, num int, isRecursive bool) { v := flag.Bool("v", false, "show bin info") i := flag.String("i", "", "specify flv file") o := flag.String("o", "", "specify rtmp push url") r := flag.Bool("r", false, "recursive push if reach end of file") + n := flag.Int("n", 1, "num of push connection") flag.Parse() if *v { _, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine()) @@ -149,7 +200,12 @@ func parseFlag() (string, string, bool) { } if *i == "" || *o == "" { flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test + ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r + ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000 +`) os.Exit(1) } - return *i, *o, *r + return *i, *o, *n, *r } diff --git a/app/rtmppull/rtmppull.go b/app/rtmppull/rtmppull.go index f2f7bd8..303dc61 100644 --- a/app/rtmppull/rtmppull.go +++ b/app/rtmppull/rtmppull.go @@ -10,7 +10,11 @@ package main import ( "flag" + "fmt" "os" + "strconv" + "strings" + "sync" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/logic" @@ -19,18 +23,33 @@ import ( ) func main() { + urlTmpl, fileNameTmpl, num := parseFlag() + urls, filenames := connect(urlTmpl, fileNameTmpl, num) + + var wg sync.WaitGroup + wg.Add(num) + for i := 0; i < num; i++ { + go func(index int) { + pull(urls[index], filenames[index]) + wg.Done() + }(i) + } + wg.Wait() +} + +func pull(url string, filename string) { var ( w httpflv.FLVFileWriter err error ) - url, outFileName := parseFlag() - - err = w.Open(outFileName) - log.FatalIfErrorNotNil(err) - defer w.Dispose() - err = w.WriteRaw(httpflv.FLVHeader) - log.FatalIfErrorNotNil(err) + if filename != "" { + err = w.Open(filename) + log.FatalIfErrorNotNil(err) + defer w.Dispose() + err = w.WriteRaw(httpflv.FLVHeader) + log.FatalIfErrorNotNil(err) + } session := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { option.ConnectTimeoutMS = 3000 @@ -39,21 +58,38 @@ func main() { }) err = session.Pull(url, func(msg rtmp.AVMsg) { - log.Infof("%+v, abs ts=%d", msg.Header, msg.Header.TimestampAbs) - tag := logic.Trans.RTMPMsg2FLVTag(msg) - err := w.WriteTag(*tag) - log.FatalIfErrorNotNil(err) + //log.Infof("%+v, abs ts=%d", msg.Header, msg.Header.TimestampAbs) + if filename != "" { + tag := logic.Trans.RTMPMsg2FLVTag(msg) + err := w.WriteTag(*tag) + log.FatalIfErrorNotNil(err) + } }) log.FatalIfErrorNotNil(err) } -func parseFlag() (string, string) { +func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) { + for i := 0; i < num; i++ { + url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1) + urls = append(urls, url) + filename := strings.Replace(fileNameTmpl, "{i}", strconv.Itoa(i), -1) + filenames = append(filenames, filename) + } + return +} + +func parseFlag() (urlTmpl string, fileNameTmpl string, num int) { i := flag.String("i", "", "specify pull rtmp url") o := flag.String("o", "", "specify ouput flv file") + n := flag.Int("n", 1, "num of pull connection") flag.Parse() - if *i == "" || *o == "" { + if *i == "" { flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + ./bin/rtmppull -i rtmp://127.0.0.1:19350/live/test -o out.flv + ./bin/rtmppull -i rtmp://127.0.0.1:19350/live/test -n 1000 +`) os.Exit(1) } - return *i, *o + return *i, *o, *n } diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 438f9ab..13004c7 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -78,8 +78,8 @@ func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error return session.runReadLoop(onReadFLVTag) } -func (session *PullSession) Dispose(err error) { - log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) +func (session *PullSession) Dispose() { + log.Infof("lifecycle dispose PullSession. [%s]", session.UniqueKey) _ = session.Conn.Close() } diff --git a/pkg/logic/example_test.go b/pkg/logic/example_test.go index 6086672..b855193 100644 --- a/pkg/logic/example_test.go +++ b/pkg/logic/example_test.go @@ -125,7 +125,7 @@ func TestExample(t *testing.T) { assert.Equal(t, nil, err) fileTagCount.Increment() msg := logic.Trans.FLVTag2RTMPMsg(tag) - chunks := rtmp.Message2Chunks(msg.Message, &msg.Header) + chunks := rtmp.Message2Chunks(msg.Payload, &msg.Header) err = pushSession.AsyncWrite(chunks) assert.Equal(t, nil, err) } @@ -136,7 +136,7 @@ func TestExample(t *testing.T) { fileReader.Dispose() pushSession.Dispose() - httpflvPullSession.Dispose(nil) + httpflvPullSession.Dispose() rtmpPullSession.Dispose() HTTPFLVWriter.Dispose() RTMPWriter.Dispose() diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 26b3ccd..d841082 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -60,7 +60,7 @@ func (group *Group) RunLoop() { <-group.exitChan } -func (group *Group) Dispose(err error) { +func (group *Group) Dispose() { log.Infof("lifecycle dispose group. [%s]", group.UniqueKey) group.exitChan <- struct{}{} @@ -174,7 +174,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { for session := range group.rtmpSubSessionSet { // ## 2.1. 一个message广播给多个sub session时,只做一次chunk切割 if absChunks == nil { - absChunks = rtmp.Message2Chunks(msg.Message, &currHeader) + absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader) } // ## 2.2. 如果是新的sub session,发送已缓存的信息 @@ -200,10 +200,10 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { session.AsyncWrite(absChunks) case rtmp.TypeidVideo: if session.WaitKeyNalu { - if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 { + if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 { session.AsyncWrite(absChunks) } - if msg.Message[0] == 0x17 && msg.Message[1] == 0x1 { + if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x1 { session.AsyncWrite(absChunks) session.WaitKeyNalu = false } @@ -245,10 +245,10 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { session.WriteTag(currTag) case rtmp.TypeidVideo: if session.WaitKeyNalu { - if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 { + if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 { session.WriteTag(currTag) } - if msg.Message[0] == 0x17 && msg.Message[1] == 0x1 { + if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x1 { session.WriteTag(currTag) session.WaitKeyNalu = false } @@ -264,7 +264,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { switch msg.Header.MsgTypeID { case rtmp.TypeidDataMessageAMF0: if absChunks == nil { - absChunks = rtmp.Message2Chunks(msg.Message, &currHeader) + absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader) } if currTag == nil { currTag = Trans.RTMPMsg2FLVTag(msg) @@ -274,9 +274,9 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { log.Debugf("cache metadata. [%s]", group.UniqueKey) case rtmp.TypeidVideo: // TODO chef: magic number - if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 { + if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 { if absChunks == nil { - absChunks = rtmp.Message2Chunks(msg.Message, &currHeader) + absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader) } if currTag == nil { currTag = Trans.RTMPMsg2FLVTag(msg) @@ -286,9 +286,9 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) } case rtmp.TypeidAudio: - if (msg.Message[0]>>4) == 0x0a && msg.Message[1] == 0x0 { + if (msg.Payload[0]>>4) == 0x0a && msg.Payload[1] == 0x0 { if absChunks == nil { - absChunks = rtmp.Message2Chunks(msg.Message, &currHeader) + absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader) } if currTag == nil { currTag = Trans.RTMPMsg2FLVTag(msg) diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 0a8c1f0..78b5566 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -90,7 +90,7 @@ func (sm *ServerManager) Dispose() { sm.mutex.Lock() for _, group := range sm.groupMap { - group.Dispose(ErrLogic) + group.Dispose() } sm.mutex.Unlock() @@ -159,7 +159,7 @@ func (sm *ServerManager) check() { for k, group := range sm.groupMap { if group.IsTotalEmpty() { log.Infof("erase empty group manager. [%s]", group.UniqueKey) - group.Dispose(ErrLogic) + group.Dispose() delete(sm.groupMap, k) } } diff --git a/pkg/logic/trans.go b/pkg/logic/trans.go index 0d48060..2bcd53d 100644 --- a/pkg/logic/trans.go +++ b/pkg/logic/trans.go @@ -55,7 +55,7 @@ func (t trans) MakeDefaultRTMPHeader(in rtmp.Header) (out rtmp.Header) { // 音视频内存块不发生拷贝 func (t trans) FLVTag2RTMPMsg(tag httpflv.Tag) (msg rtmp.AVMsg) { msg.Header = t.FLVTagHeader2RTMPHeader(tag.Header) - msg.Message = tag.Raw[11 : 11+msg.Header.MsgLen] + msg.Payload = tag.Raw[11 : 11+msg.Header.MsgLen] return } @@ -65,6 +65,6 @@ func (t trans) RTMPMsg2FLVTag(msg rtmp.AVMsg) *httpflv.Tag { tag.Header.Type = msg.Header.MsgTypeID tag.Header.DataSize = msg.Header.MsgLen tag.Header.Timestamp = msg.Header.TimestampAbs - tag.Raw = httpflv.PackHTTPFLVTag(msg.Header.MsgTypeID, msg.Header.TimestampAbs, msg.Message) + tag.Raw = httpflv.PackHTTPFLVTag(msg.Header.MsgTypeID, msg.Header.TimestampAbs, msg.Payload) return &tag } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 9660502..0e61bc0 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -73,7 +73,6 @@ var defaultClientSessOption = ClientSessionOption{ type ModClientSessionOption func(option *ClientSessionOption) // @param t: session的类型,只能是推或者拉 -// @param timeout: 设置各种超时 func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession { var uk string switch t { diff --git a/pkg/rtmp/example_test.go b/pkg/rtmp/example_test.go index 6565163..aab5fdc 100644 --- a/pkg/rtmp/example_test.go +++ b/pkg/rtmp/example_test.go @@ -80,7 +80,7 @@ func (pso *MockPubSessionObserver) OnReadRTMPAVMsg(msg rtmp.AVMsg) { // 转发 currHeader := logic.Trans.MakeDefaultRTMPHeader(msg.Header) var absChunks []byte - absChunks = rtmp.Message2Chunks(msg.Message, &currHeader) + absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader) subSession.AsyncWrite(absChunks) } @@ -131,7 +131,7 @@ func TestExample(t *testing.T) { //log.Debugf("send tag. %d", tag.Header.Timestamp) msg := logic.Trans.FLVTag2RTMPMsg(tag) //log.Debugf("send msg. %d %d", msg.Header.Timestamp, msg.Header.TimestampAbs) - chunks := rtmp.Message2Chunks(msg.Message, &msg.Header) + chunks := rtmp.Message2Chunks(msg.Payload, &msg.Header) //log.Debugf("%s", hex.Dump(chunks[:16])) err = pushSession.AsyncWrite(chunks) assert.Equal(t, nil, err) diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index c36ed57..a098a44 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -13,9 +13,10 @@ import "errors" var ErrRTMP = errors.New("lal.rtmp: fxxk") const ( - CSIDAMF = 5 - CSIDAudio = 6 - CSIDVideo = 7 + CSIDAMF = 5 + CSIDAudio = 6 + CSIDVideo = 7 + csidProtocolControl = 2 csidOverConnection = 3 csidOverStream = 5 @@ -25,9 +26,10 @@ const ( ) const ( - TypeidAudio = uint8(8) - TypeidVideo = uint8(9) - TypeidDataMessageAMF0 = uint8(18) // meta + TypeidAudio = uint8(8) + TypeidVideo = uint8(9) + TypeidDataMessageAMF0 = uint8(18) // meta + typeidSetChunkSize = uint8(1) typeidAck = uint8(3) typeidUserControl = uint8(4) @@ -58,7 +60,7 @@ const ( type AVMsg struct { Header Header - Message []byte // 不包含 rtmp 头 + Payload []byte // 不包含 rtmp 头 } type AVMsgObserver interface { diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index 61c0af7..604410b 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -44,7 +44,7 @@ func NewStream() *Stream { func (stream *Stream) toAVMsg() AVMsg { return AVMsg{ Header: stream.header, - Message: stream.msg.buf[stream.msg.b:stream.msg.e], + Payload: stream.msg.buf[stream.msg.b:stream.msg.e], } }