- package rtmp: 结构体的属性重命名 AVMsg.Message -> AVMsg.Payload

- app/flvfile2rtmppush: 支持推送多路 rtmp 流,相当于一个压测工具
- app/rtmppull: 支持对特定的一路流并发拉取多份,相当于一个压测工具
- README 中补充性能测试结果
pull/1/head
q191201771 5 years ago
parent ef3ac243d1
commit e89ff9992b

@ -3,7 +3,7 @@
<img alt="Wide" src="https://pengrl.com/images/other/lallogo.png">
</a>
<br>
Go语言编写的流媒体 库 / 客户端 / 服务端
Go语言编写的直播流媒体网络传输服务器
<br><br>
<a title="TravisCI" target="_blank" href="https://www.travis-ci.org/q191201771/lal"><img src="https://www.travis-ci.org/q191201771/lal.svg?branch=master"></a>
<a title="codecov" target="_blank" href="https://codecov.io/gh/q191201771/lal"><img src="https://codecov.io/gh/q191201771/lal/branch/master/graph/badge.svg?style=flat-square"></a>
@ -27,20 +27,40 @@ Go语言编写的流媒体 库 / 客户端 / 服务端
---
Go语言编写的流媒体 库 / 客户端 / 服务器。目前 rtmp / http-flv 部分基本完成了。
Go语言编写的直播流媒体网络传输服务器。本项目遵循的原则或者说最终目标是:
#### 源码框架
* ~~没有蛀。。~~
* 可读可维护。框架清晰,模块化,按业务逻辑层,协议层,传输层分层。
* 可快速集成各种协议rtmp / http-flv / hls, rtp / rtcp / webrtc, quic, srt, over tcp, over udp...
* 高性能
简单来说,主要源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。
目前 rtmp / http-flv 部分基本完成了。第一个目标大版本会实现直播源站以及直播 CDN 分发相关的功能。
### README 目录
* 源码框架
* 编译和运行
* 配置文件说明
* 性能测试
* 测试过的第三方客户端
* Roadmap
* 联系我
### 源码框架
简单来说,源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。
```
app/ ......各种main包的源码文件一个子目录对应一个main包即对应可生成一个可执行文件
|-- lals/ ......[最重要的] 流媒体服务器
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
|-- flvfile2rtmppush ......rtmp推流客户端输入是本地flv文件文件推送完毕后可循环推送rtmp push流并不断开
|-- flvfile2rtmppush ......// rtmp 推流客户端,读取本地 flv 文件,使用 rtmp 协议推送出去
//
// 支持循环推送文件推送完毕后可循环推送rtmp push 流并不断开)
// 支持推送多路流:相当于一个 rtmp 推流压测工具
|-- httpflvpull ......http-flv拉流客户端
|-- modflvfile ......修改本地flv文件
|-- rtmppull ......rtmp拉流客户端存储为本地flv文件
|-- modflvfile ......修改本地flv文件
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
pkg/ ......源码包
|-- aac/ ......音频aac编解码格式相关
|-- avc/ ......视频avc h264编解码格式相关
@ -53,7 +73,7 @@ conf/ ......配置文件目录
目前唯一的第三方依赖(我自己写的 Go 基础库): [github.com/q191201771/naza](https://github.com/q191201771/naza)
#### 编译和运行
### 编译和运行
```
# 不使用 Go module
@ -69,7 +89,7 @@ $git clone https://github.com/q191201771/lal.git && cd lal && ./build.sh
$./bin/lals -c conf/lals.conf.json
```
#### 配置文件说明
### 配置文件说明
```
{
@ -97,7 +117,28 @@ $./bin/lals -c conf/lals.conf.json
- [rtmp/var.go](https://github.com/q191201771/lal/blob/master/pkg/rtmp/var.go)
- [httpflv/var.go](https://github.com/q191201771/lal/blob/master/pkg/httpflv/var.go)
#### 测试过的客户端
### 性能测试
测试场景一:持续推送 n 路 rtmp 流至 lals没有拉流
| 推流数量 | CPU 占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的16% | 104MB |
测试场景二持续推送1路 rtmp 流至 lals使用 rtmp 协议从 lals 拉取 n 路流
| 拉流数量 | CPU 占用 | 内存占用RES |
| - | - | - |
| 1000 | 占单个核的30% | 120MB |
* 测试机32核16Glals 服务器和压测工具同时跑在这一个机器上)
* 压测工具lal 中的 `/app/flvfile2rtmppush` 以及 `/app/rtmppull`
* 推流码率:使用 `srs-bench` 中的 flv 文件大概200kbps
* lals 版本:基于 git commit: fc0b04651af53a68758f41e5dfccdb7838e55a45
*由于测试机是台共用的机器,上面还跑了许多其他服务,这里列的只是个粗略的数据,还待做更多的性能分析以及优化。如果你对性能感兴趣,欢迎进行测试并将结果反馈给我。*
### 测试过的第三方客户端
```
推流端:
@ -113,11 +154,11 @@ $./bin/lals -c conf/lals.conf.json
- srs-bench (srs项目配套的一个压测工具)
```
#### roadmap
### Roadmap
**有建议、意见、bug、功能等等欢迎提 issue 啊100% 会回复的。**
lals 服务器目标版本roadmap如下:
lals 服务器目标版本功能如下:
**v1.0.0**
@ -157,20 +198,12 @@ lals 服务器目标版本roadmap如下
- hls
- h265
最终目标:
性能ok框架清晰代码对于任何新手来说都是可读可维护的。
* 实现一个支持多种流媒体协议比如rtmp, http-flv, hls, rtp/rtcp 等多种底层传输协议比如tcp, udp, srt, quic 等)的服务器
* 所有协议都以模块化的库形式提供给需要的用户使用
* 提供多种协议的推流客户端、拉流客户端或者说演示demo
#### 文档
### 文档
* [rtmp handshake | rtmp握手简单模式和复杂模式](https://pengrl.com/p/20027/)
* [rtmp协议中的chunk stream id, message stream id, transaction id, message type id](https://pengrl.com/p/25610/)
#### 联系我
### 联系我
欢迎扫码加我微信,进行技术交流或扯淡。

@ -53,19 +53,19 @@ func main() {
case httpflv.TagTypeAudio:
aac.CaptureAAC(afp, payload)
case httpflv.TagTypeVideo:
avc.CaptureAVC(vfp, payload)
_ = avc.CaptureAVC(vfp, payload)
}
}
}
func parseFlag() (string, string, string) {
flv := flag.String("i", "", "specify flv file")
aac := flag.String("a", "", "specify es aac file")
avc := flag.String("v", "", "specify es h264 file")
a := flag.String("a", "", "specify es aac file")
v := flag.String("v", "", "specify es h264 file")
flag.Parse()
if *flv == "" || *avc == "" || *aac == "" {
if *flv == "" || *a == "" || *v == "" {
flag.Usage()
os.Exit(1)
}
return *flv, *aac, *avc
return *flv, *a, *v
}

@ -13,6 +13,8 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/q191201771/lal/pkg/logic"
@ -23,28 +25,77 @@ import (
log "github.com/q191201771/naza/pkg/nazalog"
)
//rtmp推流客户端输入是本地flv文件文件推送完毕后可循环推送rtmp push流并不断开
// rtmp 推流客户端,读取本地 flv 文件,使用 rtmp 协议推送出去
//
// -r 为1时表示当文件推送完毕后是否循环推送rtmp push流并不断开
// 支持循环推送文件推送完毕后可循环推送rtmp push 流并不断开)
// 支持推送多路流:相当于一个 rtmp 推流压测工具
//
// Usage:
// ./bin/flvfile2rtmppush -r 1 -i /tmp/test.flv -o rtmp://push.xxx.com/live/testttt
// Usage of ./bin/flvfile2rtmppush:
// -i string
// specify flv file
// -n int
// num of push connection (default 1)
// -o string
// specify rtmp push url
// -r recursive push if reach end of file
// -v show bin info
// Example:
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r
// ./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000
func main() {
var err error
log.Info(bininfo.StringifySingleLine())
flvFileName, rtmpPushURL, isRecursive := parseFlag()
filename, urlTmpl, num, isRecursive := parseFlag()
urls := collect(urlTmpl, num)
log.Info(bininfo.StringifySingleLine())
tags := readAllTag(filename)
log.Debug(urls, num)
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = 3000
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 10000
})
err = ps.Push(rtmpPushURL)
push(tags, urls, isRecursive)
log.Info("bye.")
}
// readAllTag 预读取 flv 文件中的所有 tag缓存在内存中
func readAllTag(filename string) (ret []httpflv.Tag) {
var ffr httpflv.FLVFileReader
err := ffr.Open(filename)
log.FatalIfErrorNotNil(err)
log.Infof("push succ. url=%s", rtmpPushURL)
log.Infof("open succ. filename=%s", filename)
for {
tag, err := ffr.ReadTag()
if err == io.EOF {
log.Info("EOF")
break
}
log.FatalIfErrorNotNil(err)
ret = append(ret, tag)
}
log.Infof("read all tag done. num=%d", len(ret))
return
}
func push(tags []httpflv.Tag, urls []string, isRecursive bool) {
if len(tags) == 0 || len(urls) == 0 {
return
}
var err error
var psList []*rtmp.PushSession
for i := range urls {
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = 3000
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 10000
})
err = ps.Push(urls[i])
log.FatalIfErrorNotNil(err)
log.Infof("push succ. url=%s", urls[i])
psList = append(psList, ps)
}
var totalBaseTS uint32
var prevTS uint32
@ -58,21 +109,9 @@ func main() {
log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d",
i, totalBaseTS, prevTS, thisBaseTS)
var ffr httpflv.FLVFileReader
err = ffr.Open(flvFileName)
log.FatalIfErrorNotNil(err)
log.Infof("open succ. filename=%s", flvFileName)
hasReadThisBaseTS = false
for {
tag, err := ffr.ReadTag()
if err == io.EOF {
log.Info("EOF")
break
}
log.FatalIfErrorNotNil(err)
for _, tag := range tags {
h := logic.Trans.FLVTagHeader2RTMPHeader(tag.Header)
if tag.IsMetadata() {
@ -81,8 +120,10 @@ func main() {
//log.Debugf("CHEFERASEME write metadata.")
h.TimestampAbs = 0
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h)
err = ps.AsyncWrite(chunks)
log.FatalIfErrorNotNil(err)
for _, ps := range psList {
err = ps.AsyncWrite(chunks)
log.FatalIfErrorNotNil(err)
}
} else {
// noop
}
@ -122,14 +163,15 @@ func main() {
hasTraceFirstTagTS = true
}
err = ps.AsyncWrite(chunks)
log.FatalIfErrorNotNil(err)
for _, ps := range psList {
err = ps.AsyncWrite(chunks)
log.FatalIfErrorNotNil(err)
}
prevTS = h.TimestampAbs
}
totalBaseTS = prevTS + 1
ffr.Dispose()
if !isRecursive {
break
@ -137,11 +179,20 @@ func main() {
}
}
func parseFlag() (string, string, bool) {
func collect(urlTmpl string, num int) (urls []string) {
for i := 0; i < num; i++ {
url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1)
urls = append(urls, url)
}
return
}
func parseFlag() (filename string, urlTmpl string, num int, isRecursive bool) {
v := flag.Bool("v", false, "show bin info")
i := flag.String("i", "", "specify flv file")
o := flag.String("o", "", "specify rtmp push url")
r := flag.Bool("r", false, "recursive push if reach end of file")
n := flag.Int("n", 1, "num of push connection")
flag.Parse()
if *v {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
@ -149,7 +200,12 @@ func parseFlag() (string, string, bool) {
}
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test -r
./bin/flvfile2rtmppush -i testdata/test.flv -o rtmp://127.0.0.1:19350/live/test_{i} -r -n 1000
`)
os.Exit(1)
}
return *i, *o, *r
return *i, *o, *n, *r
}

@ -10,7 +10,11 @@ package main
import (
"flag"
"fmt"
"os"
"strconv"
"strings"
"sync"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/logic"
@ -19,18 +23,33 @@ import (
)
func main() {
urlTmpl, fileNameTmpl, num := parseFlag()
urls, filenames := connect(urlTmpl, fileNameTmpl, num)
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(index int) {
pull(urls[index], filenames[index])
wg.Done()
}(i)
}
wg.Wait()
}
func pull(url string, filename string) {
var (
w httpflv.FLVFileWriter
err error
)
url, outFileName := parseFlag()
err = w.Open(outFileName)
log.FatalIfErrorNotNil(err)
defer w.Dispose()
err = w.WriteRaw(httpflv.FLVHeader)
log.FatalIfErrorNotNil(err)
if filename != "" {
err = w.Open(filename)
log.FatalIfErrorNotNil(err)
defer w.Dispose()
err = w.WriteRaw(httpflv.FLVHeader)
log.FatalIfErrorNotNil(err)
}
session := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.ConnectTimeoutMS = 3000
@ -39,21 +58,38 @@ func main() {
})
err = session.Pull(url, func(msg rtmp.AVMsg) {
log.Infof("%+v, abs ts=%d", msg.Header, msg.Header.TimestampAbs)
tag := logic.Trans.RTMPMsg2FLVTag(msg)
err := w.WriteTag(*tag)
log.FatalIfErrorNotNil(err)
//log.Infof("%+v, abs ts=%d", msg.Header, msg.Header.TimestampAbs)
if filename != "" {
tag := logic.Trans.RTMPMsg2FLVTag(msg)
err := w.WriteTag(*tag)
log.FatalIfErrorNotNil(err)
}
})
log.FatalIfErrorNotNil(err)
}
func parseFlag() (string, string) {
func connect(urlTmpl string, fileNameTmpl string, num int) (urls []string, filenames []string) {
for i := 0; i < num; i++ {
url := strings.Replace(urlTmpl, "{i}", strconv.Itoa(i), -1)
urls = append(urls, url)
filename := strings.Replace(fileNameTmpl, "{i}", strconv.Itoa(i), -1)
filenames = append(filenames, filename)
}
return
}
func parseFlag() (urlTmpl string, fileNameTmpl string, num int) {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify ouput flv file")
n := flag.Int("n", 1, "num of pull connection")
flag.Parse()
if *i == "" || *o == "" {
if *i == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/rtmppull -i rtmp://127.0.0.1:19350/live/test -o out.flv
./bin/rtmppull -i rtmp://127.0.0.1:19350/live/test -n 1000
`)
os.Exit(1)
}
return *i, *o
return *i, *o, *n
}

@ -78,8 +78,8 @@ func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error
return session.runReadLoop(onReadFLVTag)
}
func (session *PullSession) Dispose(err error) {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
func (session *PullSession) Dispose() {
log.Infof("lifecycle dispose PullSession. [%s]", session.UniqueKey)
_ = session.Conn.Close()
}

@ -125,7 +125,7 @@ func TestExample(t *testing.T) {
assert.Equal(t, nil, err)
fileTagCount.Increment()
msg := logic.Trans.FLVTag2RTMPMsg(tag)
chunks := rtmp.Message2Chunks(msg.Message, &msg.Header)
chunks := rtmp.Message2Chunks(msg.Payload, &msg.Header)
err = pushSession.AsyncWrite(chunks)
assert.Equal(t, nil, err)
}
@ -136,7 +136,7 @@ func TestExample(t *testing.T) {
fileReader.Dispose()
pushSession.Dispose()
httpflvPullSession.Dispose(nil)
httpflvPullSession.Dispose()
rtmpPullSession.Dispose()
HTTPFLVWriter.Dispose()
RTMPWriter.Dispose()

@ -60,7 +60,7 @@ func (group *Group) RunLoop() {
<-group.exitChan
}
func (group *Group) Dispose(err error) {
func (group *Group) Dispose() {
log.Infof("lifecycle dispose group. [%s]", group.UniqueKey)
group.exitChan <- struct{}{}
@ -174,7 +174,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
for session := range group.rtmpSubSessionSet {
// ## 2.1. 一个message广播给多个sub session时只做一次chunk切割
if absChunks == nil {
absChunks = rtmp.Message2Chunks(msg.Message, &currHeader)
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
}
// ## 2.2. 如果是新的sub session发送已缓存的信息
@ -200,10 +200,10 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
session.AsyncWrite(absChunks)
case rtmp.TypeidVideo:
if session.WaitKeyNalu {
if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 {
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 {
session.AsyncWrite(absChunks)
}
if msg.Message[0] == 0x17 && msg.Message[1] == 0x1 {
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x1 {
session.AsyncWrite(absChunks)
session.WaitKeyNalu = false
}
@ -245,10 +245,10 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
session.WriteTag(currTag)
case rtmp.TypeidVideo:
if session.WaitKeyNalu {
if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 {
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 {
session.WriteTag(currTag)
}
if msg.Message[0] == 0x17 && msg.Message[1] == 0x1 {
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x1 {
session.WriteTag(currTag)
session.WaitKeyNalu = false
}
@ -264,7 +264,7 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
switch msg.Header.MsgTypeID {
case rtmp.TypeidDataMessageAMF0:
if absChunks == nil {
absChunks = rtmp.Message2Chunks(msg.Message, &currHeader)
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
}
if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(msg)
@ -274,9 +274,9 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
log.Debugf("cache metadata. [%s]", group.UniqueKey)
case rtmp.TypeidVideo:
// TODO chef: magic number
if msg.Message[0] == 0x17 && msg.Message[1] == 0x0 {
if msg.Payload[0] == 0x17 && msg.Payload[1] == 0x0 {
if absChunks == nil {
absChunks = rtmp.Message2Chunks(msg.Message, &currHeader)
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
}
if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(msg)
@ -286,9 +286,9 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) {
log.Debugf("cache avc key seq header. [%s]", group.UniqueKey)
}
case rtmp.TypeidAudio:
if (msg.Message[0]>>4) == 0x0a && msg.Message[1] == 0x0 {
if (msg.Payload[0]>>4) == 0x0a && msg.Payload[1] == 0x0 {
if absChunks == nil {
absChunks = rtmp.Message2Chunks(msg.Message, &currHeader)
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
}
if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(msg)

@ -90,7 +90,7 @@ func (sm *ServerManager) Dispose() {
sm.mutex.Lock()
for _, group := range sm.groupMap {
group.Dispose(ErrLogic)
group.Dispose()
}
sm.mutex.Unlock()
@ -159,7 +159,7 @@ func (sm *ServerManager) check() {
for k, group := range sm.groupMap {
if group.IsTotalEmpty() {
log.Infof("erase empty group manager. [%s]", group.UniqueKey)
group.Dispose(ErrLogic)
group.Dispose()
delete(sm.groupMap, k)
}
}

@ -55,7 +55,7 @@ func (t trans) MakeDefaultRTMPHeader(in rtmp.Header) (out rtmp.Header) {
// 音视频内存块不发生拷贝
func (t trans) FLVTag2RTMPMsg(tag httpflv.Tag) (msg rtmp.AVMsg) {
msg.Header = t.FLVTagHeader2RTMPHeader(tag.Header)
msg.Message = tag.Raw[11 : 11+msg.Header.MsgLen]
msg.Payload = tag.Raw[11 : 11+msg.Header.MsgLen]
return
}
@ -65,6 +65,6 @@ func (t trans) RTMPMsg2FLVTag(msg rtmp.AVMsg) *httpflv.Tag {
tag.Header.Type = msg.Header.MsgTypeID
tag.Header.DataSize = msg.Header.MsgLen
tag.Header.Timestamp = msg.Header.TimestampAbs
tag.Raw = httpflv.PackHTTPFLVTag(msg.Header.MsgTypeID, msg.Header.TimestampAbs, msg.Message)
tag.Raw = httpflv.PackHTTPFLVTag(msg.Header.MsgTypeID, msg.Header.TimestampAbs, msg.Payload)
return &tag
}

@ -73,7 +73,6 @@ var defaultClientSessOption = ClientSessionOption{
type ModClientSessionOption func(option *ClientSessionOption)
// @param t: session的类型只能是推或者拉
// @param timeout: 设置各种超时
func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession {
var uk string
switch t {

@ -80,7 +80,7 @@ func (pso *MockPubSessionObserver) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
// 转发
currHeader := logic.Trans.MakeDefaultRTMPHeader(msg.Header)
var absChunks []byte
absChunks = rtmp.Message2Chunks(msg.Message, &currHeader)
absChunks = rtmp.Message2Chunks(msg.Payload, &currHeader)
subSession.AsyncWrite(absChunks)
}
@ -131,7 +131,7 @@ func TestExample(t *testing.T) {
//log.Debugf("send tag. %d", tag.Header.Timestamp)
msg := logic.Trans.FLVTag2RTMPMsg(tag)
//log.Debugf("send msg. %d %d", msg.Header.Timestamp, msg.Header.TimestampAbs)
chunks := rtmp.Message2Chunks(msg.Message, &msg.Header)
chunks := rtmp.Message2Chunks(msg.Payload, &msg.Header)
//log.Debugf("%s", hex.Dump(chunks[:16]))
err = pushSession.AsyncWrite(chunks)
assert.Equal(t, nil, err)

@ -13,9 +13,10 @@ import "errors"
var ErrRTMP = errors.New("lal.rtmp: fxxk")
const (
CSIDAMF = 5
CSIDAudio = 6
CSIDVideo = 7
CSIDAMF = 5
CSIDAudio = 6
CSIDVideo = 7
csidProtocolControl = 2
csidOverConnection = 3
csidOverStream = 5
@ -25,9 +26,10 @@ const (
)
const (
TypeidAudio = uint8(8)
TypeidVideo = uint8(9)
TypeidDataMessageAMF0 = uint8(18) // meta
TypeidAudio = uint8(8)
TypeidVideo = uint8(9)
TypeidDataMessageAMF0 = uint8(18) // meta
typeidSetChunkSize = uint8(1)
typeidAck = uint8(3)
typeidUserControl = uint8(4)
@ -58,7 +60,7 @@ const (
type AVMsg struct {
Header Header
Message []byte // 不包含 rtmp 头
Payload []byte // 不包含 rtmp 头
}
type AVMsgObserver interface {

@ -44,7 +44,7 @@ func NewStream() *Stream {
func (stream *Stream) toAVMsg() AVMsg {
return AVMsg{
Header: stream.header,
Message: stream.msg.buf[stream.msg.b:stream.msg.e],
Payload: stream.msg.buf[stream.msg.b:stream.msg.e],
}
}

Loading…
Cancel
Save