增加 /app/httpflvpull , http-flv拉流客户端

pull/200/head
q191201771 6 years ago
parent 3a12daab80
commit b770eeb48b

@ -1,7 +1,7 @@
<p align="center">
<img alt="Wide" src="https://pengrl.com/images/other/lallogo.png">
<br>
Go语言编写的流媒体 库 / 客户端 / 服务
Go语言编写的流媒体 库 / 客户端 / 服务
<br><br>
<a title="TravisCI" target="_blank" href="https://www.travis-ci.org/q191201771/lal"><img src="https://www.travis-ci.org/q191201771/lal.svg?branch=master"></a>
<a title="codecov" target="_blank" href="https://codecov.io/gh/q191201771/lal"><img src="https://codecov.io/gh/q191201771/lal/branch/master/graph/badge.svg?style=flat-square"></a>
@ -32,22 +32,23 @@ Go语言编写的流媒体 库 / 客户端 / 服务器
```
app/ ......各种main包的源码文件一个子目录对应一个main包即对应可生成一个可执行文件
|-- lal/ ......[最重要的] 流媒体服务器
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
|-- flvfile2rtmppush ......将本地flv文件使用rtmp推送出去
|-- modflvfile ......修改本地flv文件
|-- rtmppull ......rtmp拉流客户端
bin/ ......可执行文件输出目录
conf/ ......配置文件目录
demo/ ......各种demo类型的main包一个子目录对应一个demo即对应可生成一个可执行文件
|-- httpflvpull ......http-flv拉流客户端
|-- modflvfile ......修改本地flv文件
|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件
pkg/ ......源码包
|-- httpflv/ ......http flv协议
|-- httpflv/ ......http-flv协议
|-- rtmp/ ......rtmp协议
|-- util/ ......帮助类包
|-- bele/ ......大小端操作
|-- bininfo/ ......可执行文件版本等信息
|-- connstat/ ......连接超时信息
|-- errors/ ......错误处理
|-- log/ ......日志
|-- unique/ ......对象唯一ID
bin/ ......可执行文件输出目录
conf/ ......配置文件目录
```
#### 编译和运行
@ -64,43 +65,26 @@ $./bin/lal -c conf/lal.conf.json
```
{
"sub_idle_timeout": 10, // 往客户端发送数据时的超时时间
"gop_cache_num": 2, // gop缓存个数如果设置为0则只缓存seq header不缓存gop数据
"httpflv": {
"sub_listen_addr": ":8080" // http-flv拉流地址
},
"rtmp": {
"addr": ":8081" // rtmp服务器监听端口NOTICE rtmp服务器部分正在开发中
}
"pull": { // 如果设置则当客户端连接lal拉流而lal上该流不存在时lal会去该配置中的地址回源拉流至本地再转发给客户端
"type": "httpflv", // 回源类型,支持"httpflv" 或 "rtmp"
"addr": "pull.xxx.com", // 回源地址
"connect_timeout": 2, // 回源连接超时时间
"read_timeout": 10, // 回源读取数据超时时间
"stop_pull_while_no_sub_timeout": 3000 // 回源的流超过多长时间没有客户端播放,则关闭回源的流
"addr": ":19350" // rtmp服务监听的端口
}
}
```
TODO 日志配置文件说明
#### roadmap
#### 简单压力测试
第一阶段实现rtmp转发服务器
在一台双核腾讯云主机,以后会做更详细的测试以及性能优化。
最终目标:
| ~ | httpflv pull | httpflv sub | 平均%CPU | 入带宽 | 出带宽 | 内存RES |
| - | - | - | - | - | - | - |
| ~ | 1 | 300 | 8.8% | 1.5Mb | 450Mb | 36m |
| ~ | 300 | 300->0 | 18% | 450Mb | ->0Mb | 1.3g |
* 实现一个支持多种流媒体协议比如rtmp, http-flv, hls, rtp/rtcp 等多种底层传输协议比如tcp, udp, srt, quic 等)的服务器
* 所有协议都以模块化的库形式提供给需要的用户使用
* 提供多种协议的推流客户端、拉流客户端或者说演示demo
#### 依赖
目前不依赖任何第三方库
#### roadmap
正在实现rtmp服务器部分
#### 文档
* [rtmp handshake | rtmp握手简单模式和复杂模式](https://pengrl.com/p/20027/)

@ -0,0 +1,47 @@
package main
import (
"flag"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/util/log"
"os"
)
type Obs struct {
}
func (obs *Obs) ReadHTTPRespHeaderCB() {
log.Info("ReadHTTPRespHeaderCB")
}
func (obs *Obs) ReadFlvHeaderCB(flvHeader []byte) {
log.Info("ReadFlvHeaderCB")
}
func (obs *Obs) ReadFlvTagCB(tag *httpflv.Tag) {
log.Infof("ReadFlvTagCB %+v %d %d", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
}
func main() {
url := parseFlag()
var obs Obs
session := httpflv.NewPullSession(&obs, 0, 0)
err := session.Connect(url)
if err != nil {
log.Error(err)
return
}
err = session.RunLoop()
log.Error(err)
}
func parseFlag() string {
url := flag.String("i", "", "specify rtmp url")
flag.Parse()
if *url == "" {
flag.Usage()
os.Exit(1)
}
return *url
}

@ -18,17 +18,25 @@ var countA int
var countV int
func hookTag(tag *httpflv.Tag) {
log.Infof("%+v", tag.Header)
if tag.Header.T == httpflv.TagTypeAudio {
if countA < 3 {
httpflv.ModTagTimestamp(tag, 16777205)
}
countA++
//if countA < 3 {
// httpflv.ModTagTimestamp(tag, 16777205)
//}
//countA++
}
if tag.Header.T == httpflv.TagTypeVideo {
if countV < 3 {
httpflv.ModTagTimestamp(tag, 16777205)
//if countV < 3 {
// httpflv.ModTagTimestamp(tag, 16777205)
//}
//countV++
if tag.IsAVCKeySeqHeader() {
log.Info("key seq header.")
}
if tag.IsAVCKeyNalu() {
log.Info("key nalu.")
}
countV++
}
}

@ -20,6 +20,8 @@ cd ${ROOT_DIR}/app/flvfile2es && go build -o ${ROOT_DIR}/bin/flvfile2es
cd ${ROOT_DIR}/app/flvfile2rtmppush && go build -o ${ROOT_DIR}/bin/flvfile2rtmppush
cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull
cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile
cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull

@ -20,6 +20,8 @@ cd ${ROOT_DIR}/app/flvfile2es && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}
cd ${ROOT_DIR}/app/flvfile2rtmppush && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/flvfile2rtmppush
cd ${ROOT_DIR}/app/httpflvpull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/httpflvpull
cd ${ROOT_DIR}/app/modflvfile && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/modflvfile
cd ${ROOT_DIR}/app/rtmppull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/rtmppull

@ -34,7 +34,7 @@ func parseHTTPHeader(r *bufio.Reader) (n int, firstLine string, headers map[stri
for {
line, isPrefix, err = r.ReadLine()
if len(line) == 0 {
if len(line) == 0 { // 读到一个空的 \r\n 表示http头全部读取完毕了
break
}
if isPrefix {

@ -12,9 +12,12 @@ import (
"time"
)
// rtmp客户端类型连接的底层实现
// rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession
type ClientSession struct {
UniqueKey string
t ClientSessionType
obs PullSessionObserver // only for PullSession
connectTimeout int64
@ -27,12 +30,12 @@ type ClientSession struct {
appName string
streamName string
hs HandshakeClient
peerWinAckSize int
Conn net.Conn
rb *bufio.Reader
wb *bufio.Writer
peerWinAckSize int
UniqueKey string
wChan chan []byte
}
type ClientSessionType int
@ -61,6 +64,7 @@ func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeo
packer: NewMessagePacker(),
chunkComposer: NewChunkComposer(),
UniqueKey: unique.GenUniqueKey(uk),
wChan: make(chan []byte, wChanSize),
}
}

@ -48,9 +48,11 @@ const (
MSID1 = 1 // publish、play、onStatus 以及 音视频数据
)
// TODO chef
var (
readBufSize = 4096
writeBufSize = 4096
wChanSize = 1024
)
var windowAcknowledgementSize = 5000000

@ -15,7 +15,6 @@ import (
// TODO chef: 没有进化成Pub Sub时的超时释放
var wChanSize = 1024 // TODO chef
type ServerSessionObserver interface {
NewRTMPPubSessionCB(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
@ -51,8 +50,6 @@ type ServerSession struct {
chunkComposer *ChunkComposer
packer *MessagePacker
// to be continued
// TODO chef: 添加Dispose以及chan发送
conn net.Conn
rb *bufio.Reader
wb *bufio.Writer
@ -88,6 +85,7 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
func (s *ServerSession) RunLoop() (err error) {
if err = s.handshake(); err != nil {
s.dispose(err)
return err
}

@ -19,11 +19,23 @@ var (
)
func StringifySingleLine() string {
return fmt.Sprintf("GitCommitID: %s. BuildTime: %s. GoVersion: %s. runtime: %s/%s",
return fmt.Sprintf("GitCommitID=%s. BuildTime=%s. GoVersion=%s. runtime=%s/%s.",
GitCommitID, BuildTime, BuildGoVersion, runtime.GOOS, runtime.GOARCH)
}
func StringifyMultiLine() string {
return fmt.Sprintf("GitCommitID: %s\nBuildTime: %s\nGoVersion: %s\nruntime: %s/%s",
return fmt.Sprintf("GitCommitID=%s\nBuildTime=%s\nGoVersion=%s\nruntime=%s/%s.",
GitCommitID, BuildTime, BuildGoVersion, runtime.GOOS, runtime.GOARCH)
}
func init() {
if GitCommitID == "" {
GitCommitID = "unknown"
}
if BuildTime == "" {
BuildTime = "unknown"
}
if BuildGoVersion == "" {
BuildGoVersion = "unknown"
}
}

Loading…
Cancel
Save