diff --git a/README.md b/README.md index 6df1626..07001e7 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@

Wide
-Go语言编写的流媒体 库 / 客户端 / 服务器 +Go语言编写的流媒体 库 / 客户端 / 服务端

@@ -32,22 +32,23 @@ Go语言编写的流媒体 库 / 客户端 / 服务器 ``` app/ ......各种main包的源码文件,一个子目录对应一个main包,即对应可生成一个可执行文件 |-- lal/ ......[最重要的] 流媒体服务器 -|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 |-- flvfile2rtmppush ......将本地flv文件使用rtmp推送出去 -|-- modflvfile ......修改本地flv文件 |-- rtmppull ......rtmp拉流客户端 -bin/ ......可执行文件输出目录 -conf/ ......配置文件目录 -demo/ ......各种demo类型的main包,一个子目录对应一个demo,即对应可生成一个可执行文件 +|-- httpflvpull ......http-flv拉流客户端 +|-- modflvfile ......修改本地flv文件 +|-- flvfile2es ......将本地flv文件分离成h264/avc es流文件以及aac es流文件 pkg/ ......源码包 -|-- httpflv/ ......http flv协议 +|-- httpflv/ ......http-flv协议 |-- rtmp/ ......rtmp协议 |-- util/ ......帮助类包 |-- bele/ ......大小端操作 |-- bininfo/ ......可执行文件版本等信息 |-- connstat/ ......连接超时信息 + |-- errors/ ......错误处理 |-- log/ ......日志 |-- unique/ ......对象唯一ID +bin/ ......可执行文件输出目录 +conf/ ......配置文件目录 ``` #### 编译和运行 @@ -64,43 +65,26 @@ $./bin/lal -c conf/lal.conf.json ``` { - "sub_idle_timeout": 10, // 往客户端发送数据时的超时时间 - "gop_cache_num": 2, // gop缓存个数,如果设置为0,则只缓存seq header,不缓存gop数据 - "httpflv": { - "sub_listen_addr": ":8080" // http-flv拉流地址 - }, "rtmp": { - "addr": ":8081" // rtmp服务器监听端口,NOTICE rtmp服务器部分正在开发中 - } - "pull": { // 如果设置,则当客户端连接lal拉流而lal上该流不存在时,lal会去该配置中的地址回源拉流至本地再转发给客户端 - "type": "httpflv", // 回源类型,支持"httpflv" 或 "rtmp" - "addr": "pull.xxx.com", // 回源地址 - "connect_timeout": 2, // 回源连接超时时间 - "read_timeout": 10, // 回源读取数据超时时间 - "stop_pull_while_no_sub_timeout": 3000 // 回源的流超过多长时间没有客户端播放,则关闭回源的流 + "addr": ":19350" // rtmp服务监听的端口 } } ``` -TODO 日志配置文件说明 +#### roadmap -#### 简单压力测试 +第一阶段:实现rtmp转发服务器 -在一台双核腾讯云主机,以后会做更详细的测试以及性能优化。 +最终目标: -| ~ | httpflv pull | httpflv sub | 平均%CPU | 入带宽 | 出带宽 | 内存RES | -| - | - | - | - | - | - | - | -| ~ | 1 | 300 | 8.8% | 1.5Mb | 450Mb | 36m | -| ~ | 300 | 300->0 | 18% | 450Mb | ->0Mb | 1.3g | +* 实现一个支持多种流媒体协议(比如rtmp, http-flv, hls, rtp/rtcp 等),多种底层传输协议(比如tcp, udp, srt, quic 等)的服务器 +* 所有协议都以模块化的库形式提供给需要的用户使用 +* 提供多种协议的推流客户端、拉流客户端,或者说演示demo #### 依赖 目前不依赖任何第三方库 -#### roadmap - -正在实现rtmp服务器部分 - #### 文档 * [rtmp handshake | rtmp握手简单模式和复杂模式](https://pengrl.com/p/20027/) diff --git a/app/httpflvpull/httpflvpull.go b/app/httpflvpull/httpflvpull.go new file mode 100644 index 0000000..2387a3c --- /dev/null +++ b/app/httpflvpull/httpflvpull.go @@ -0,0 +1,47 @@ +package main + +import ( + "flag" + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/util/log" + "os" +) + +type Obs struct { +} + +func (obs *Obs) ReadHTTPRespHeaderCB() { + log.Info("ReadHTTPRespHeaderCB") +} + +func (obs *Obs) ReadFlvHeaderCB(flvHeader []byte) { + log.Info("ReadFlvHeaderCB") +} + +func (obs *Obs) ReadFlvTagCB(tag *httpflv.Tag) { + log.Infof("ReadFlvTagCB %+v %d %d", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) +} + +func main() { + url := parseFlag() + var obs Obs + session := httpflv.NewPullSession(&obs, 0, 0) + err := session.Connect(url) + if err != nil { + log.Error(err) + return + } + err = session.RunLoop() + log.Error(err) +} + + +func parseFlag() string { + url := flag.String("i", "", "specify rtmp url") + flag.Parse() + if *url == "" { + flag.Usage() + os.Exit(1) + } + return *url +} diff --git a/app/modflvfile/modflvfile.go b/app/modflvfile/modflvfile.go index 772d23e..e6f40f1 100644 --- a/app/modflvfile/modflvfile.go +++ b/app/modflvfile/modflvfile.go @@ -18,17 +18,25 @@ var countA int var countV int func hookTag(tag *httpflv.Tag) { + log.Infof("%+v", tag.Header) if tag.Header.T == httpflv.TagTypeAudio { - if countA < 3 { - httpflv.ModTagTimestamp(tag, 16777205) - } - countA++ + + //if countA < 3 { + // httpflv.ModTagTimestamp(tag, 16777205) + //} + //countA++ } if tag.Header.T == httpflv.TagTypeVideo { - if countV < 3 { - httpflv.ModTagTimestamp(tag, 16777205) + //if countV < 3 { + // httpflv.ModTagTimestamp(tag, 16777205) + //} + //countV++ + if tag.IsAVCKeySeqHeader() { + log.Info("key seq header.") + } + if tag.IsAVCKeyNalu() { + log.Info("key nalu.") } - countV++ } } diff --git a/build.sh b/build.sh index 6d26526..749e3bc 100755 --- a/build.sh +++ b/build.sh @@ -20,6 +20,8 @@ cd ${ROOT_DIR}/app/flvfile2es && go build -o ${ROOT_DIR}/bin/flvfile2es cd ${ROOT_DIR}/app/flvfile2rtmppush && go build -o ${ROOT_DIR}/bin/flvfile2rtmppush +cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull + cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull diff --git a/build_linux.sh b/build_linux.sh index e536959..68c7b1e 100755 --- a/build_linux.sh +++ b/build_linux.sh @@ -20,6 +20,8 @@ cd ${ROOT_DIR}/app/flvfile2es && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR} cd ${ROOT_DIR}/app/flvfile2rtmppush && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/flvfile2rtmppush +cd ${ROOT_DIR}/app/httpflvpull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/httpflvpull + cd ${ROOT_DIR}/app/modflvfile && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/modflvfile cd ${ROOT_DIR}/app/rtmppull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/rtmppull diff --git a/pkg/httpflv/httpflv.go b/pkg/httpflv/httpflv.go index de1a00e..076c3f0 100644 --- a/pkg/httpflv/httpflv.go +++ b/pkg/httpflv/httpflv.go @@ -34,7 +34,7 @@ func parseHTTPHeader(r *bufio.Reader) (n int, firstLine string, headers map[stri for { line, isPrefix, err = r.ReadLine() - if len(line) == 0 { + if len(line) == 0 { // 读到一个空的 \r\n 表示http头全部读取完毕了 break } if isPrefix { diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 7cfc23f..eafec6f 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -12,9 +12,12 @@ import ( "time" ) + // rtmp客户端类型连接的底层实现 // rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession type ClientSession struct { + UniqueKey string + t ClientSessionType obs PullSessionObserver // only for PullSession connectTimeout int64 @@ -27,12 +30,12 @@ type ClientSession struct { appName string streamName string hs HandshakeClient + peerWinAckSize int + Conn net.Conn rb *bufio.Reader wb *bufio.Writer - peerWinAckSize int - - UniqueKey string + wChan chan []byte } type ClientSessionType int @@ -61,6 +64,7 @@ func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeo packer: NewMessagePacker(), chunkComposer: NewChunkComposer(), UniqueKey: unique.GenUniqueKey(uk), + wChan: make(chan []byte, wChanSize), } } diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index 523448d..e0d037d 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -48,9 +48,11 @@ const ( MSID1 = 1 // publish、play、onStatus 以及 音视频数据 ) +// TODO chef var ( readBufSize = 4096 writeBufSize = 4096 + wChanSize = 1024 ) var windowAcknowledgementSize = 5000000 diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index a62f185..89fa5a0 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -15,7 +15,6 @@ import ( // TODO chef: 没有进化成Pub Sub时的超时释放 -var wChanSize = 1024 // TODO chef type ServerSessionObserver interface { NewRTMPPubSessionCB(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听 @@ -51,8 +50,6 @@ type ServerSession struct { chunkComposer *ChunkComposer packer *MessagePacker - // to be continued - // TODO chef: 添加Dispose,以及chan发送 conn net.Conn rb *bufio.Reader wb *bufio.Writer @@ -88,6 +85,7 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { func (s *ServerSession) RunLoop() (err error) { if err = s.handshake(); err != nil { + s.dispose(err) return err } diff --git a/pkg/util/bininfo/bininfo.go b/pkg/util/bininfo/bininfo.go index 703c418..7cb696c 100644 --- a/pkg/util/bininfo/bininfo.go +++ b/pkg/util/bininfo/bininfo.go @@ -19,11 +19,23 @@ var ( ) func StringifySingleLine() string { - return fmt.Sprintf("GitCommitID: %s. BuildTime: %s. GoVersion: %s. runtime: %s/%s", + return fmt.Sprintf("GitCommitID=%s. BuildTime=%s. GoVersion=%s. runtime=%s/%s.", GitCommitID, BuildTime, BuildGoVersion, runtime.GOOS, runtime.GOARCH) } func StringifyMultiLine() string { - return fmt.Sprintf("GitCommitID: %s\nBuildTime: %s\nGoVersion: %s\nruntime: %s/%s", + return fmt.Sprintf("GitCommitID=%s\nBuildTime=%s\nGoVersion=%s\nruntime=%s/%s.", GitCommitID, BuildTime, BuildGoVersion, runtime.GOOS, runtime.GOARCH) } + +func init() { + if GitCommitID == "" { + GitCommitID = "unknown" + } + if BuildTime == "" { + BuildTime = "unknown" + } + if BuildGoVersion == "" { + BuildGoVersion = "unknown" + } +}