- package logic: 将 rtmp pub session 的音视频转发给httpflv sub session

- package httpflv: ServerSubSession 使用 naza connection
- package logic: 增加 `example_test`,加载flv文件后使用rtmp推流至服务器,然后分别使用rtmp和httpflv将流拉取下来,存成文件,判断和输入文件是否相等
pull/1/head
q191201771 5 years ago
parent 0076e5c828
commit 24f56530e9

6
.gitignore vendored

@ -15,5 +15,7 @@ coverage.html
/logs/ /logs/
/pkg/hls/ /pkg/hls/
/pkg/rtmp/testdata/test.flv /testdata
/pkg/rtmp/testdata/test.flv.bak /pkg/rtmp/testdata
/pkg/httpflv/testdata
/pkg/logic/testdata

@ -23,7 +23,7 @@ func main() {
var err error var err error
flvFileName, aacFileName, avcFileName := parseFlag() flvFileName, aacFileName, avcFileName := parseFlag()
var ffr httpflv.FlvFileReader var ffr httpflv.FLVFileReader
err = ffr.Open(flvFileName) err = ffr.Open(flvFileName)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
defer ffr.Dispose() defer ffr.Dispose()
@ -39,7 +39,7 @@ func main() {
defer vfp.Close() defer vfp.Close()
log.Infof("open es h264 file succ.") log.Infof("open es h264 file succ.")
_, err = ffr.ReadFlvHeader() _, err = ffr.ReadFLVHeader()
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
for { for {

@ -56,12 +56,12 @@ func main() {
log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d", log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d",
i, totalBaseTS, prevTS, thisBaseTS) i, totalBaseTS, prevTS, thisBaseTS)
var ffr httpflv.FlvFileReader var ffr httpflv.FLVFileReader
err = ffr.Open(flvFileName) err = ffr.Open(flvFileName)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
log.Infof("open succ. filename=%s", flvFileName) log.Infof("open succ. filename=%s", flvFileName)
flvHeader, err := ffr.ReadFlvHeader() flvHeader, err := ffr.ReadFLVHeader()
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
log.Infof("read flv header succ. %v", flvHeader) log.Infof("read flv header succ. %v", flvHeader)

@ -23,7 +23,7 @@ func main() {
ReadTimeoutMS: 0, ReadTimeoutMS: 0,
}) })
err := session.Pull(url, func(tag *httpflv.Tag) { err := session.Pull(url, func(tag *httpflv.Tag) {
log.Infof("ReadFlvTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) log.Infof("ReadFLVTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
}) })
if err != nil { if err != nil {
log.Error(err) log.Error(err)

@ -56,19 +56,19 @@ func main() {
var err error var err error
inFileName, outFileName := parseFlag() inFileName, outFileName := parseFlag()
var ffr httpflv.FlvFileReader var ffr httpflv.FLVFileReader
err = ffr.Open(inFileName) err = ffr.Open(inFileName)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
defer ffr.Dispose() defer ffr.Dispose()
log.Infof("open input flv file succ.") log.Infof("open input flv file succ.")
var ffw httpflv.FlvFileWriter var ffw httpflv.FLVFileWriter
err = ffw.Open(outFileName) err = ffw.Open(outFileName)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
defer ffw.Dispose() defer ffw.Dispose()
log.Infof("open output flv file succ.") log.Infof("open output flv file succ.")
flvHeader, err := ffr.ReadFlvHeader() flvHeader, err := ffr.ReadFLVHeader()
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
err = ffw.WriteRaw(flvHeader) err = ffw.WriteRaw(flvHeader)

@ -19,13 +19,13 @@ import (
) )
type Obs struct { type Obs struct {
w httpflv.FlvFileWriter w httpflv.FLVFileWriter
} }
func (obs *Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { func (obs *Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
log.Infof("%+v, abs ts=%d", header, timestampAbs) log.Infof("%+v, abs ts=%d", header, timestampAbs)
tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message) tag := logic.Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
err := obs.w.WriteTag(tag) err := obs.w.WriteTag(*tag)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
} }
@ -43,7 +43,7 @@ func main() {
err = obs.w.Open(outFileName) err = obs.w.Open(outFileName)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
//defer obs.w.Dispose() //defer obs.w.Dispose()
err = obs.w.WriteRaw(httpflv.FlvHeader) err = obs.w.WriteRaw(httpflv.FLVHeader)
log.FatalIfErrorNotNil(err) log.FatalIfErrorNotNil(err)
err = session.WaitLoop() err = session.WaitLoop()

@ -2,6 +2,9 @@
"rtmp": { "rtmp": {
"addr": ":19350" "addr": ":19350"
}, },
"httpflv": {
"sub_listen_addr": ":8080"
},
"log": { "log": {
"level": 1, "level": 1,
"filename": "./logs/lals.log", "filename": "./logs/lals.log",

@ -0,0 +1,18 @@
{
"rtmp": {
"addr": ":19350"
},
"httpflv": {
"sub_listen_addr": ":8080"
},
"log": {
"level": 1,
"filename": "./logs/lals.log",
"is_to_stdout": true,
"is_rotate_daily": true,
"short_file_flag": true
},
"pprof": {
"addr": ":10001"
}
}

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12 go 1.12
require github.com/q191201771/naza v0.4.3 require github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.4.3 h1:v7CDH3ONFyrptU8rpUeCakVdN3akxPSChHO4z3CRrAw= github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba h1:pVmUuAaWFm2Y/v32ZmV9KQLE86Opfc+7IP3h1RpuDF4=
github.com/q191201771/naza v0.4.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -15,7 +15,7 @@ import (
"github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/bele"
) )
var avcErr = errors.New("avc: fxxk") var ErrAVC = errors.New("lal.avc: fxxk")
var NaluStartCode = []byte{0x0, 0x0, 0x0, 0x1} var NaluStartCode = []byte{0x0, 0x0, 0x0, 0x1}
@ -34,7 +34,7 @@ func ParseAVCSeqHeader(payload []byte) (sps, pps []byte, err error) {
// TODO chef: check if read out of <payload> range // TODO chef: check if read out of <payload> range
if payload[0] != 0x17 || payload[1] != 0x00 || payload[2] != 0 || payload[3] != 0 || payload[4] != 0 { if payload[0] != 0x17 || payload[1] != 0x00 || payload[2] != 0 || payload[3] != 0 || payload[4] != 0 {
err = avcErr err = ErrAVC
return return
} }

@ -27,10 +27,10 @@ func TestCorner(t *testing.T) {
sps, pps, err := ParseAVCSeqHeader([]byte{0}) sps, pps, err := ParseAVCSeqHeader([]byte{0})
assert.Equal(t, nil, sps) assert.Equal(t, nil, sps)
assert.Equal(t, nil, pps) assert.Equal(t, nil, pps)
assert.Equal(t, err, avcErr) assert.Equal(t, err, ErrAVC)
b := &bytes.Buffer{} b := &bytes.Buffer{}
err = CaptureAVC(b, []byte{0x17, 0x0, 0x1}) err = CaptureAVC(b, []byte{0x17, 0x0, 0x1})
assert.Equal(t, nil, b.Bytes()) assert.Equal(t, nil, b.Bytes())
assert.Equal(t, err, avcErr) assert.Equal(t, err, ErrAVC)
} }

@ -13,7 +13,6 @@ import (
"net" "net"
"net/url" "net/url"
"strings" "strings"
"sync"
"time" "time"
"github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/connection"
@ -32,13 +31,12 @@ type PullSession struct {
config PullSessionConfig config PullSessionConfig
Conn connection.Connection Conn connection.Connection
closeOnce sync.Once
host string host string
uri string uri string
addr string addr string
readFlvTagCB ReadFlvTagCB readFLVTagCB ReadFLVTagCB
} }
func NewPullSession(config PullSessionConfig) *PullSession { func NewPullSession(config PullSessionConfig) *PullSession {
@ -50,7 +48,7 @@ func NewPullSession(config PullSessionConfig) *PullSession {
} }
} }
type ReadFlvTagCB func(tag *Tag) type ReadFLVTagCB func(tag *Tag)
// 阻塞直到拉流失败 // 阻塞直到拉流失败
// //
@ -58,8 +56,8 @@ type ReadFlvTagCB func(tag *Tag)
// http://{domain}/{app_name}/{stream_name}.flv // http://{domain}/{app_name}/{stream_name}.flv
// http://{ip}/{domain}/{app_name}/{stream_name}.flv // http://{ip}/{domain}/{app_name}/{stream_name}.flv
// //
// @param readFlvTagCB 读取到 flv tag 数据时回调。回调结束后PullSession不会再使用 <tag> 数据。 // @param readFLVTagCB 读取到 flv tag 数据时回调。回调结束后PullSession不会再使用 <tag> 数据。
func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error { func (session *PullSession) Pull(rawURL string, readFLVTagCB ReadFLVTagCB) error {
if err := session.Connect(rawURL); err != nil { if err := session.Connect(rawURL); err != nil {
return err return err
} }
@ -67,16 +65,12 @@ func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error
return err return err
} }
return session.runReadLoop(readFlvTagCB) return session.runReadLoop(readFLVTagCB)
} }
func (session *PullSession) Dispose(err error) { func (session *PullSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
if err := session.Conn.Close(); err != nil { _ = session.Conn.Close()
log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
} }
func (session *PullSession) Connect(rawURL string) error { func (session *PullSession) Connect(rawURL string) error {
@ -86,7 +80,7 @@ func (session *PullSession) Connect(rawURL string) error {
return err return err
} }
if url.Scheme != "http" || !strings.HasSuffix(url.Path, ".flv") { if url.Scheme != "http" || !strings.HasSuffix(url.Path, ".flv") {
return httpFlvErr return ErrHTTPFLV
} }
session.host = url.Host session.host = url.Host
@ -128,7 +122,7 @@ func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[
} }
if !strings.Contains(firstLine, "200") || len(headers) == 0 { if !strings.Contains(firstLine, "200") || len(headers) == 0 {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
log.Infof("-----> http response header. [%s]", session.UniqueKey) log.Infof("-----> http response header. [%s]", session.UniqueKey)
@ -136,7 +130,7 @@ func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[
return return
} }
func (session *PullSession) ReadFlvHeader() ([]byte, error) { func (session *PullSession) ReadFLVHeader() ([]byte, error) {
flvHeader := make([]byte, flvHeaderSize) flvHeader := make([]byte, flvHeaderSize)
_, err := session.Conn.ReadAtLeast(flvHeader, flvHeaderSize) _, err := session.Conn.ReadAtLeast(flvHeader, flvHeaderSize)
if err != nil { if err != nil {
@ -152,12 +146,12 @@ func (session *PullSession) ReadTag() (*Tag, error) {
return readTag(session.Conn) return readTag(session.Conn)
} }
func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error { func (session *PullSession) runReadLoop(readFLVTagCB ReadFLVTagCB) error {
if _, _, err := session.ReadHTTPRespHeader(); err != nil { if _, _, err := session.ReadHTTPRespHeader(); err != nil {
return err return err
} }
if _, err := session.ReadFlvHeader(); err != nil { if _, err := session.ReadFLVHeader(); err != nil {
return err return err
} }
@ -166,6 +160,6 @@ func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error {
if err != nil { if err != nil {
return err return err
} }
readFlvTagCB(tag) readFLVTagCB(tag)
} }
} }

@ -0,0 +1,50 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv_test
import (
"testing"
"github.com/q191201771/lal/pkg/httpflv"
log "github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef: 后续加个 httpflv post 在做完整流程测试吧
var (
serverAddr = ":10001"
pullURL = "http://127.0.0.1:10001/live/11111.flv"
)
type MockServerObserver struct {
}
func (so *MockServerObserver) NewHTTPFLVSubSessionCB(session *httpflv.SubSession) bool {
return true
}
func (so *MockServerObserver) DelHTTPFLVSubSessionCB(session *httpflv.SubSession) {
}
func TestExample(t *testing.T) {
var err error
var so MockServerObserver
s := httpflv.NewServer(&so, serverAddr)
go s.RunLoop()
pullSession := httpflv.NewPullSession(httpflv.PullSessionConfig{
ConnectTimeoutMS: 1000,
ReadTimeoutMS: 1000,
})
err = pullSession.Pull(pullURL, func(tag *httpflv.Tag) {
})
log.Debugf("pull failed. err=%+v", err)
}

@ -12,27 +12,27 @@ import (
"os" "os"
) )
type FlvFileReader struct { type FLVFileReader struct {
fp *os.File fp *os.File
} }
func (ffr *FlvFileReader) Open(filename string) (err error) { func (ffr *FLVFileReader) Open(filename string) (err error) {
ffr.fp, err = os.Open(filename) ffr.fp, err = os.Open(filename)
return return
} }
func (ffr *FlvFileReader) ReadFlvHeader() ([]byte, error) { func (ffr *FLVFileReader) ReadFLVHeader() ([]byte, error) {
flvHeader := make([]byte, flvHeaderSize) flvHeader := make([]byte, flvHeaderSize)
_, err := ffr.fp.Read(flvHeader) _, err := ffr.fp.Read(flvHeader)
return flvHeader, err return flvHeader, err
} }
// TODO chef: 返回 Tag 类型,对比 bench // TODO chef: 返回 Tag 类型,对比 bench
func (ffr *FlvFileReader) ReadTag() (*Tag, error) { func (ffr *FLVFileReader) ReadTag() (*Tag, error) {
return readTag(ffr.fp) return readTag(ffr.fp)
} }
func (ffr *FlvFileReader) Dispose() { func (ffr *FLVFileReader) Dispose() {
if ffr.fp != nil { if ffr.fp != nil {
_ = ffr.fp.Close() _ = ffr.fp.Close()
} }

@ -10,26 +10,26 @@ package httpflv
import "os" import "os"
type FlvFileWriter struct { type FLVFileWriter struct {
fp *os.File fp *os.File
} }
func (ffw *FlvFileWriter) Open(filename string) (err error) { func (ffw *FLVFileWriter) Open(filename string) (err error) {
ffw.fp, err = os.Create(filename) ffw.fp, err = os.Create(filename)
return return
} }
func (ffw *FlvFileWriter) WriteRaw(b []byte) (err error) { func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) {
_, err = ffw.fp.Write(b) _, err = ffw.fp.Write(b)
return return
} }
func (ffw *FlvFileWriter) WriteTag(tag Tag) (err error) { func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) {
_, err = ffw.fp.Write(tag.Raw) _, err = ffw.fp.Write(tag.Raw)
return return
} }
func (ffw *FlvFileWriter) Dispose() { func (ffw *FLVFileWriter) Dispose() {
if ffw.fp != nil { if ffw.fp != nil {
_ = ffw.fp.Close() _ = ffw.fp.Close()
} }

@ -1,215 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv
/*
// v1.0.0 版本之前不提供 httpflv 功能
import (
"bytes"
log "github.com/q191201771/naza/pkg/nazalog"
"sync"
)
type GOP struct {
tags []*Tag
//raw []byte
firstTimestamp uint32
}
type GOPCache struct {
gopNum int
metadata *Tag
avcSeqHeader *Tag
aacSeqHeader *Tag
gops []*GOP // TODO chef: maybe use other container to mock a queue
mutex sync.Mutex
}
// gopNum: 0 means only cache metadata, avc seq header, aac seq header
func NewGOPCache(gopNum int) *GOPCache {
return &GOPCache{
gopNum: gopNum,
}
}
func (c *GOPCache) Push(tag *Tag) {
c.mutex.Lock()
defer c.mutex.Unlock()
if tag.IsMetadata() {
// TODO chef: will this happen?
if c.metadata != nil {
log.Debugf("updating metadata.")
log.Debug(tag.Header, tag.Raw[TagHeaderSize:])
log.Debug(c.metadata.Header, c.metadata.Raw[TagHeaderSize:])
c.clearGOP()
}
c.metadata = tag
}
if tag.IsAVCKeySeqHeader() {
if c.avcSeqHeader == nil {
c.avcSeqHeader = tag
} else {
// TODO chef: compare nessary? if other way to update seq header and handle cache stuff?
if bytes.Compare(tag.Raw[TagHeaderSize:], c.avcSeqHeader.Raw[TagHeaderSize:]) == 0 {
// noop
} else {
log.Debugf("updating avc seq header.")
log.Debug(tag.Header, tag.Raw[TagHeaderSize:])
log.Debug(c.avcSeqHeader.Header, c.avcSeqHeader.Raw[TagHeaderSize:])
c.clearGOP()
c.avcSeqHeader = tag
}
}
}
if tag.IsAACSeqHeader() {
if c.aacSeqHeader == nil {
c.aacSeqHeader = tag
} else {
if bytes.Compare(tag.Raw[TagHeaderSize:], c.aacSeqHeader.Raw[TagHeaderSize:]) == 0 {
// noop
} else {
log.Debugf("updating aac seq header.")
c.clearGOP()
c.aacSeqHeader = tag
}
}
c.aacSeqHeader = tag
}
if c.gopNum == 0 {
return
}
if len(c.gops) == 0 {
if tag.IsAVCKeyNalu() {
gop := &GOP{}
gop.firstTimestamp = tag.Header.Timestamp
gop.tags = append(gop.tags, tag)
c.gops = append(c.gops, gop)
c.syncOldestKeyNaluTimestampToSeqHeader()
}
} else {
if tag.IsAVCKeyNalu() {
gop := &GOP{}
gop.firstTimestamp = tag.Header.Timestamp
gop.tags = append(gop.tags, tag)
c.gops = append(c.gops, gop)
if len(c.gops) > c.gopNum+1 {
c.gops = c.gops[1:]
c.syncOldestKeyNaluTimestampToSeqHeader()
}
} else {
c.gops[len(c.gops)-1].tags = append(c.gops[len(c.gops)-1].tags, tag)
}
}
}
func (c *GOPCache) WriteWholeThings(writer Writer) (hasKeyFrame bool) {
if tag := c.getMetadata(); tag != nil {
writer.WriteTag(tag)
}
avc := c.getAVCSeqHeader()
aac := c.getAACSeqHeader()
// TODO chef: if nessary to sort them by timestamp
if avc != nil && aac != nil {
if avc.Header.Timestamp <= aac.Header.Timestamp {
writer.WriteTag(avc)
writer.WriteTag(aac)
} else {
writer.WriteTag(aac)
writer.WriteTag(avc)
}
} else if avc != nil && aac == nil {
writer.WriteTag(avc)
} else if avc == nil && aac != nil {
writer.WriteTag(aac)
}
c.writeGOPs(writer, false)
return
}
func (c *GOPCache) ClearAll() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.metadata = nil
c.avcSeqHeader = nil
c.aacSeqHeader = nil
c.gops = nil
}
func (c *GOPCache) writeGOPs(write Writer, mustCompleted bool) bool {
c.mutex.Lock()
defer c.mutex.Unlock()
neededLen := len(c.gops)
if mustCompleted {
neededLen--
}
if neededLen <= 0 {
return false
}
for i := 0; i != neededLen; i++ {
for j := 0; j != len(c.gops[i].tags); j++ {
write.WriteTag(c.gops[i].tags[j])
}
}
return true
}
func (c *GOPCache) getMetadata() (res *Tag) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.metadata != nil {
res = c.metadata.cloneTag()
}
return
}
func (c *GOPCache) getAVCSeqHeader() (res *Tag) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.avcSeqHeader != nil {
res = c.avcSeqHeader.cloneTag()
}
return
}
func (c *GOPCache) getAACSeqHeader() (res *Tag) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.aacSeqHeader != nil {
res = c.aacSeqHeader.cloneTag()
}
return
}
func (c *GOPCache) clearGOP() {
log.Debug("clearGOP")
c.gops = nil
}
// TODO chef: if nessary
func (c *GOPCache) syncOldestKeyNaluTimestampToSeqHeader() {
ts := c.gops[0].firstTimestamp
if c.avcSeqHeader != nil {
c.avcSeqHeader.Header.Timestamp = ts
}
if c.aacSeqHeader != nil {
c.aacSeqHeader.Header.Timestamp = ts
}
}
*/

@ -1,163 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv
/*
// v1.0.0 版本之前不提供 httpflv 功能
// TODO chef: set me by config
var gopCacheNum = 2
// TODO chef: 所有新增对象的UniqueKey
// TODO chef: 将Observer方式改成 func CB方式
type GroupObserver interface {
ReadHTTPRespHeaderCB()
ReadFlvHeaderCB(flvHeader []byte)
ReadFlvTagCB(tag *Tag)
}
type Group struct {
appName string
streamName string
pullSession *PullSession
subSessionSet map[*SubSession]struct{}
gopCache *GOPCache
mutex sync.Mutex
obs GroupObserver
}
func NewGroup(appName string, streamName string) *Group {
return &Group{
appName: appName,
streamName: streamName,
subSessionSet: make(map[*SubSession]struct{}),
gopCache: NewGOPCache(gopCacheNum),
}
}
func (group *Group) RunLoop() {
}
func (group *Group) AddHTTPFlvSubSession(session *SubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
log.Debugf("add SubSession into group. [%s]", session.UniqueKey)
group.subSessionSet[session] = struct{}{}
go func() {
if err := session.RunLoop(); err != nil {
log.Debugf("SubSession loop done. [%s] err=%v", session.UniqueKey, err)
}
group.mutex.Lock()
defer group.mutex.Unlock()
log.Infof("del SubSession out of group. [%s]", session.UniqueKey)
delete(group.subSessionSet, session)
}()
// TODO chef: 在这里发送http和flv的头还是确保有数据了再发
session.WriteHTTPResponseHeader()
session.WriteFlvHeader()
if group.gopCache.WriteWholeThings(session) {
session.HasKeyFrame = true
}
}
func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) {
group.pullSession = NewPullSession(PullSessionConfig{
ConnectTimeoutMS: int(connectTimeout),
ReadTimeoutMS: int(readTimeout),
})
defer func() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pullSession = nil
log.Infof("del httpflv PullSession out of group. [%s]", group.pullSession.UniqueKey)
}()
log.Infof("<----- connect. [%s]", group.pullSession.UniqueKey)
url := fmt.Sprintf("http://%s/%s/%s.flv", addr, group.appName, group.streamName)
// TODO chef: impl cb
if err := group.pullSession.Pull(url, group.ReadFlvTagCB); err != nil {
}
//if err := group.pullSession.Pull(url, nil); err != nil {
//log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err)
//return
//}
//log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey)
//if err := group.pullSession.RunLoop(); err != nil {
// log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err)
// return
//}
}
func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.pullSession == nil && len(group.subSessionSet) == 0
}
func (group *Group) IsInExist() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return false
}
func (group *Group) SetObserver(obs GroupObserver) {
// 确保如果调用SetObserver那么调用发生在Pull之前就不对obs加锁保护了
group.obs = obs
}
// PullSessionObserver
func (group *Group) ReadHTTPRespHeaderCB() {
if group.obs != nil {
group.obs.ReadHTTPRespHeaderCB()
}
}
// PullSessionObserver
func (group *Group) ReadFlvHeaderCB(flvHeader []byte) {
if group.obs != nil {
group.obs.ReadFlvHeaderCB(flvHeader)
}
}
// PullSessionObserver
func (group *Group) ReadFlvTagCB(tag *Tag) {
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO chef: assume that write fast and would not block
for session := range group.subSessionSet {
// TODO chef: 如果一个流上只有音频永远没有视频该如何处理
if session.HasKeyFrame {
session.WriteRawPacket(tag.Raw)
} else {
if tag.IsMetadata() || tag.IsAVCKeySeqHeader() || tag.IsAACSeqHeader() || tag.IsAVCKeyNalu() {
if tag.IsAVCKeyNalu() {
session.HasKeyFrame = true
}
session.WriteRawPacket(tag.Raw)
}
}
}
group.gopCache.Push(tag)
if group.obs != nil {
group.obs.ReadFlvTagCB(tag)
}
}
*/

@ -18,16 +18,14 @@ type Writer interface {
WriteTag(tag *Tag) WriteTag(tag *Tag)
} }
var httpFlvErr = errors.New("httpflv: fxxk") var ErrHTTPFLV = errors.New("lal.httpflv: fxxk")
const ( const (
flvHeaderSize = 13 flvHeaderSize = 13
prevTagFieldSize = 4 prevTagFieldSize = 4
) )
var FlvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00}
var readBufSize = 16384
type LineReader interface { type LineReader interface {
ReadLine() (line []byte, isPrefix bool, err error) ReadLine() (line []byte, isPrefix bool, err error)
@ -44,7 +42,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string]
return return
} }
if len(line) == 0 || isPrefix { if len(line) == 0 || isPrefix {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
firstLine = string(line) firstLine = string(line)
@ -56,7 +54,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string]
break break
} }
if isPrefix { if isPrefix {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
if err != nil { if err != nil {
@ -66,7 +64,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string]
n += len(l) n += len(l)
pos := strings.Index(l, ":") pos := strings.Index(l, ":")
if pos == -1 { if pos == -1 {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
headers[strings.Trim(l[0:pos], " ")] = strings.Trim(l[pos+1:], " ") headers[strings.Trim(l[0:pos], " ")] = strings.Trim(l[pos+1:], " ")

@ -18,23 +18,22 @@ import (
type ServerObserver interface { type ServerObserver interface {
// 通知上层有新的拉流者 // 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接 // 返回值: true则允许拉流false则关闭连接
NewHTTPFlvSubSessionCB(session *SubSession) bool NewHTTPFLVSubSessionCB(session *SubSession) bool
DelHTTPFLVSubSessionCB(session *SubSession)
} }
type Server struct { type Server struct {
obs ServerObserver obs ServerObserver
addr string addr string
subWriteTimeout int64
m sync.Mutex m sync.Mutex
ln net.Listener ln net.Listener
} }
func NewServer(obs ServerObserver, addr string, subWriteTimeout int64) *Server { func NewServer(obs ServerObserver, addr string) *Server {
return &Server{ return &Server{
obs: obs, obs: obs,
addr: addr, addr: addr,
subWriteTimeout: subWriteTimeout,
} }
} }
@ -69,14 +68,18 @@ func (server *Server) Dispose() {
func (server *Server) handleConnect(conn net.Conn) { func (server *Server) handleConnect(conn net.Conn) {
log.Infof("accept a http flv connection. remoteAddr=%v", conn.RemoteAddr()) log.Infof("accept a http flv connection. remoteAddr=%v", conn.RemoteAddr())
session := NewSubSession(conn, server.subWriteTimeout) session := NewSubSession(conn)
if err := session.ReadRequest(); err != nil { if err := session.ReadRequest(); err != nil {
log.Errorf("read SubSession request error. [%s]", session.UniqueKey) log.Errorf("read SubSession request error. [%s]", session.UniqueKey)
return return
} }
log.Infof("-----> http request. [%s] uri=%s", session.UniqueKey, session.URI) log.Infof("-----> http request. [%s] uri=%s", session.UniqueKey, session.URI)
if !server.obs.NewHTTPFlvSubSessionCB(session) { if !server.obs.NewHTTPFLVSubSessionCB(session) {
session.Dispose(httpFlvErr) session.Dispose()
} }
err := session.RunLoop()
log.Debugf("httpflv sub session loop done. err=%v", err)
server.obs.DelHTTPFLVSubSessionCB(session)
} }

@ -9,14 +9,13 @@
package httpflv package httpflv
import ( import (
"bufio"
"net" "net"
url2 "net/url" url2 "net/url"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
"github.com/q191201771/naza/pkg/connection"
log "github.com/q191201771/naza/pkg/nazalog" log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique" "github.com/q191201771/naza/pkg/unique"
) )
@ -33,40 +32,33 @@ var flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr)
var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0x0, 0x0, 0x0, 0x0} var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0x0, 0x0, 0x0, 0x0}
var wChanSize = 1024 // TODO chef: 1024
type SubSession struct { type SubSession struct {
UniqueKey string UniqueKey string
writeTimeout int64
StartTick int64 StartTick int64
StreamName string StreamName string
AppName string AppName string
URI string URI string
Headers map[string]string Headers map[string]string
HasKeyFrame bool IsFresh bool
WaitKeyNalu bool
conn net.Conn
rb *bufio.Reader
wChan chan []byte
closeOnce sync.Once conn connection.Connection
exitChan chan struct{}
hasClosedFlag uint32
} }
func NewSubSession(conn net.Conn, writeTimeout int64) *SubSession { func NewSubSession(conn net.Conn) *SubSession {
uk := unique.GenUniqueKey("FLVSUB") uk := unique.GenUniqueKey("FLVSUB")
log.Infof("lifecycle new SubSession. [%s] remoteAddr=%s", uk, conn.RemoteAddr().String()) log.Infof("lifecycle new SubSession. [%s] remoteAddr=%s", uk, conn.RemoteAddr().String())
return &SubSession{ return &SubSession{
writeTimeout: writeTimeout,
conn: conn,
rb: bufio.NewReaderSize(conn, readBufSize),
wChan: make(chan []byte, wChanSize),
exitChan: make(chan struct{}),
UniqueKey: uk, UniqueKey: uk,
IsFresh: true,
WaitKeyNalu: true,
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
}),
} }
} }
@ -76,19 +68,19 @@ func (session *SubSession) ReadRequest() (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
session.Dispose(err) session.Dispose()
} }
}() }()
var firstLine string var firstLine string
_, firstLine, session.Headers, err = parseHTTPHeader(session.rb) _, firstLine, session.Headers, err = parseHTTPHeader(session.conn)
if err != nil { if err != nil {
return return
} }
items := strings.Split(string(firstLine), " ") items := strings.Split(string(firstLine), " ")
if len(items) != 3 || items[0] != "GET" { if len(items) != 3 || items[0] != "GET" {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
@ -99,19 +91,19 @@ func (session *SubSession) ReadRequest() (err error) {
return return
} }
if !strings.HasSuffix(urlObj.Path, ".flv") { if !strings.HasSuffix(urlObj.Path, ".flv") {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
items = strings.Split(urlObj.Path, "/") items = strings.Split(urlObj.Path, "/")
if len(items) != 3 { if len(items) != 3 {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
session.AppName = items[1] session.AppName = items[1]
items = strings.Split(items[2], ".") items = strings.Split(items[2], ".")
if len(items) < 2 { if len(items) < 2 {
err = httpFlvErr err = ErrHTTPFLV
return return
} }
session.StreamName = items[0] session.StreamName = items[0]
@ -120,15 +112,9 @@ func (session *SubSession) ReadRequest() (err error) {
} }
func (session *SubSession) RunLoop() error { func (session *SubSession) RunLoop() error {
go func() {
buf := make([]byte, 128) buf := make([]byte, 128)
if _, err := session.conn.Read(buf); err != nil { _, err := session.conn.Read(buf)
log.Errorf("read failed. [%s] err=%v", session.UniqueKey, err) return err
session.Dispose(err)
}
}()
return session.runWriteLoop()
} }
func (session *SubSession) WriteHTTPResponseHeader() { func (session *SubSession) WriteHTTPResponseHeader() {
@ -136,7 +122,7 @@ func (session *SubSession) WriteHTTPResponseHeader() {
session.WriteRawPacket(flvHTTPResponseHeader) session.WriteRawPacket(flvHTTPResponseHeader)
} }
func (session *SubSession) WriteFlvHeader() { func (session *SubSession) WriteFLVHeader() {
log.Infof("<----- http flv header. [%s]", session.UniqueKey) log.Infof("<----- http flv header. [%s]", session.UniqueKey)
session.WriteRawPacket(flvHeaderBuf13) session.WriteRawPacket(flvHeaderBuf13)
} }
@ -146,52 +132,9 @@ func (session *SubSession) WriteTag(tag *Tag) {
} }
func (session *SubSession) WriteRawPacket(pkt []byte) { func (session *SubSession) WriteRawPacket(pkt []byte) {
if session.hasClosed() { _, _ = session.conn.Write(pkt)
return
}
for {
select {
case session.wChan <- pkt:
return
default:
if session.hasClosed() {
return
}
}
}
}
func (session *SubSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose SubSession. [%s] reason=%v", session.UniqueKey, err)
atomic.StoreUint32(&session.hasClosedFlag, 1)
close(session.exitChan)
if err := session.conn.Close(); err != nil {
log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}
func (session *SubSession) runWriteLoop() error {
for {
select {
case <-session.exitChan:
return httpFlvErr
case pkt := <-session.wChan:
if session.hasClosed() {
return httpFlvErr
}
// TODO chef: use bufio.Writer
_, err := session.conn.Write(pkt)
if err != nil {
session.Dispose(err)
return err
}
}
}
} }
func (session *SubSession) hasClosed() bool { func (session *SubSession) Dispose() {
return atomic.LoadUint32(&session.hasClosedFlag) == 1 _ = session.conn.Close()
} }

@ -100,7 +100,7 @@ func IsAACSeqHeader(tag []byte) bool {
return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader
} }
func PackHTTPFlvTag(t uint8, timestamp uint32, in []byte) []byte { func PackHTTPFLVTag(t uint8, timestamp uint32, in []byte) []byte {
out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize) out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize)
out[0] = t out[0] = t
bele.BEPutUint24(out[1:], uint32(len(in))) bele.BEPutUint24(out[1:], uint32(len(in)))

@ -6,8 +6,8 @@
// //
// Author: Chef (191201771@qq.com) // Author: Chef (191201771@qq.com)
package logic package httpflv
import "errors" var readBufSize = 16384 // ClientPullSession 和 SubSession 读取数据时
var wChanSize = 1024 // SubSession 发送数据时 channel 的大小
var lalErr = errors.New("lal: fxxk") var subSessionWriteTimeoutMS = 10000

@ -18,34 +18,21 @@ import (
type Config struct { type Config struct {
RTMP RTMP `json:"rtmp"` RTMP RTMP `json:"rtmp"`
HTTPFLV HTTPFLV `json:"httpflv"`
Log log.Option `json:"log"` Log log.Option `json:"log"`
PProf PProf `json:"pprof"` PProf PProf `json:"pprof"`
// v1.0.0之前不提供
SubIdleTimeout int64 `json:"sub_idle_timeout"`
GOPCacheNum int `json:"gop_cache_number"`
HTTPFlv HTTPFlv `json:"httpflv"`
Pull Pull `json:"pull"`
} }
type RTMP struct { type RTMP struct {
Addr string `json:"addr"` Addr string `json:"addr"`
} }
type PProf struct { type HTTPFLV struct {
Addr string `json:"addr"`
}
type HTTPFlv struct {
SubListenAddr string `json:"sub_listen_addr"` SubListenAddr string `json:"sub_listen_addr"`
} }
type Pull struct { type PProf struct {
Type string `json:"type"`
Addr string `json:"addr"` Addr string `json:"addr"`
ConnectTimeout int64 `json:"connect_timeout"`
ReadTimeout int64 `json:"read_timeout"`
StopPullWhileNoSubTimeout int64 `json:"stop_pull_while_no_sub_timeout"`
} }
func LoadConf(confFile string) (*Config, error) { func LoadConf(confFile string) (*Config, error) {
@ -58,14 +45,15 @@ func LoadConf(confFile string) (*Config, error) {
return nil, err return nil, err
} }
// TODO chef: check item valid.
j, err := nazajson.New(rawContent) j, err := nazajson.New(rawContent)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !j.Exist("rtmp.addr") {
config.RTMP.Addr = ":1935" // 检查配置必须项
} // 暂时无
// 配置不存在时,设置默认值
if !j.Exist("log.level") { if !j.Exist("log.level") {
config.Log.Level = log.LevelDebug config.Log.Level = log.LevelDebug
} }

@ -0,0 +1,170 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"testing"
"time"
"github.com/q191201771/naza/pkg/nazaatomic"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/assert"
)
var (
tt *testing.T
confFile = "testdata/lals.default.conf.json"
rFLVFileName = "testdata/test.flv"
wFLVPullFileName = "testdata/flvpull.flv"
wRTMPPullFileName = "testdata/rtmppull.flv"
pushURL string
httpflvPullURL string
rtmpPullURL string
fileReader httpflv.FLVFileReader
HTTPFLVWriter httpflv.FLVFileWriter
RTMPWriter httpflv.FLVFileWriter
pushSession *rtmp.PushSession
httpflvPullSession *httpflv.PullSession
rtmpPullSession *rtmp.PullSession
fileTagCount nazaatomic.Uint32
httpflvPullTagCount nazaatomic.Uint32
rtmpPullTagCount nazaatomic.Uint32
)
type MockRTMPPullSessionObserver struct {
}
// TODO chef: httpflv 和 rtmp 两种协议的 pull 接口形式不统一
func (mrpso *MockRTMPPullSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
tag := Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
err := RTMPWriter.WriteTag(*tag)
assert.Equal(tt, nil, err)
rtmpPullTagCount.Increment()
}
func TestExample(t *testing.T) {
tt = t
var err error
err = fileReader.Open(rFLVFileName)
assert.Equal(t, nil, err)
config, err := LoadConf(confFile)
assert.IsNotNil(t, config)
assert.Equal(t, nil, err)
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/11111.flv", config.HTTPFLV.SubListenAddr)
rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr)
sm := NewServerManager(config)
go sm.RunLoop()
time.Sleep(200 * time.Millisecond)
err = HTTPFLVWriter.Open(wFLVPullFileName)
assert.Equal(t, nil, err)
err = HTTPFLVWriter.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err)
err = RTMPWriter.Open(wRTMPPullFileName)
assert.Equal(t, nil, err)
err = RTMPWriter.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err)
go func() {
var mrpso MockRTMPPullSessionObserver
rtmpPullSession = rtmp.NewPullSession(&mrpso, rtmp.PullSessionTimeout{
ReadAVTimeoutMS: 500,
})
err := rtmpPullSession.Pull(rtmpPullURL)
assert.Equal(t, nil, err)
}()
go func() {
httpflvPullSession = httpflv.NewPullSession(httpflv.PullSessionConfig{
ReadTimeoutMS: 500,
})
err := httpflvPullSession.Pull(httpflvPullURL, func(tag *httpflv.Tag) {
err := HTTPFLVWriter.WriteTag(*tag)
assert.Equal(t, nil, err)
httpflvPullTagCount.Increment()
})
nazalog.Error(err)
}()
time.Sleep(200 * time.Millisecond)
pushSession = rtmp.NewPushSession(rtmp.PushSessionTimeout{})
err = pushSession.Push(pushURL)
assert.Equal(t, nil, err)
_, err = fileReader.ReadFLVHeader()
assert.Equal(t, nil, err)
for {
tag, err := fileReader.ReadTag()
if err == io.EOF {
break
}
assert.Equal(t, nil, err)
fileTagCount.Increment()
h, _, m := Trans.FLVTag2RTMPMsg(*tag)
chunks := rtmp.Message2Chunks(m, &h)
err = pushSession.AsyncWrite(chunks)
assert.Equal(t, nil, err)
}
err = pushSession.Flush()
assert.Equal(t, nil, err)
time.Sleep(1 * time.Second)
fileReader.Dispose()
pushSession.Dispose()
httpflvPullSession.Dispose(nil)
rtmpPullSession.Dispose()
HTTPFLVWriter.Dispose()
RTMPWriter.Dispose()
sm.Dispose()
nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load())
compareFile()
}
func compareFile() {
r, err := ioutil.ReadFile(rFLVFileName)
assert.Equal(tt, nil, err)
w, err := ioutil.ReadFile(wFLVPullFileName)
assert.Equal(tt, nil, err)
res := bytes.Compare(r, w)
assert.Equal(tt, 0, res)
err = os.Remove(wFLVPullFileName)
assert.Equal(tt, nil, err)
w2, err := ioutil.ReadFile(wRTMPPullFileName)
assert.Equal(tt, nil, err)
res = bytes.Compare(r, w2)
assert.Equal(tt, 0, res)
err = os.Remove(wRTMPPullFileName)
assert.Equal(tt, nil, err)
}

@ -10,7 +10,6 @@ package logic
import ( import (
"sync" "sync"
"time"
"github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtmp"
@ -29,39 +28,36 @@ type Group struct {
mutex sync.Mutex mutex sync.Mutex
pubSession *rtmp.ServerSession pubSession *rtmp.ServerSession
pullSession *rtmp.PullSession pullSession *rtmp.PullSession
subSessionSet map[*rtmp.ServerSession]struct{} rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
// TODO chef: httpflvSubSessionSet map[*httpflv.SubSession]struct{}
// rtmp chunk格式
metadata []byte metadata []byte
avcKeySeqHeader []byte avcKeySeqHeader []byte
aacSeqHeader []byte aacSeqHeader []byte
// httpflv tag格式
// TODO chef: 如果没有开启httpflv监听可以不做格式转换节约CPU资源
metadataTag *httpflv.Tag
avcKeySeqHeaderTag *httpflv.Tag
aacSeqHeaderTag *httpflv.Tag
} }
var _ rtmp.PubSessionObserver = &Group{} var _ rtmp.PubSessionObserver = &Group{}
func NewGroup(appName string, streamName string) *Group { func NewGroup(appName string, streamName string) *Group {
uk := unique.GenUniqueKey("RTMPGROUP") uk := unique.GenUniqueKey("GROUP")
log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName)
return &Group{ return &Group{
UniqueKey: uk, UniqueKey: uk,
appName: appName, appName: appName,
streamName: streamName, streamName: streamName,
exitChan: make(chan struct{}, 1), exitChan: make(chan struct{}, 1),
subSessionSet: make(map[*rtmp.ServerSession]struct{}), rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
} }
} }
func (group *Group) RunLoop() { func (group *Group) RunLoop() {
t := time.NewTicker(200 * time.Millisecond) <-group.exitChan
defer t.Stop()
for {
select {
case <-group.exitChan:
break
case <-t.C:
//noop
}
}
} }
func (group *Group) Dispose(err error) { func (group *Group) Dispose(err error) {
@ -73,7 +69,10 @@ func (group *Group) Dispose(err error) {
if group.pubSession != nil { if group.pubSession != nil {
group.pubSession.Dispose() group.pubSession.Dispose()
} }
for session := range group.subSessionSet { for session := range group.rtmpSubSessionSet {
session.Dispose()
}
for session := range group.httpflvSubSessionSet {
session.Dispose() session.Dispose()
} }
} }
@ -92,16 +91,6 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
return true return true
} }
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.subSessionSet[session] = struct{}{}
// TODO chef: 多长没有拉流session存在的功能
//group.turnToEmptyTick = 0
}
func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock() group.mutex.Lock()
@ -110,49 +99,49 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
group.metadata = nil group.metadata = nil
group.avcKeySeqHeader = nil group.avcKeySeqHeader = nil
group.aacSeqHeader = nil group.aacSeqHeader = nil
group.metadataTag = nil
group.avcKeySeqHeaderTag = nil
group.aacSeqHeaderTag = nil
}
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpSubSessionSet[session] = struct{}{}
// TODO chef: 多长没有拉流session存在的功能
//group.turnToEmptyTick = 0
} }
func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) { func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) {
log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
delete(group.subSessionSet, session) delete(group.rtmpSubSessionSet, session)
} }
func (group *Group) AddHTTPFlvSubSession(session *httpflv.SubSession) { func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) {
panic("not impl") log.Debugf("add httpflv SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
session.WriteHTTPResponseHeader()
session.WriteFLVHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpflvSubSessionSet[session] = struct{}{}
} }
func (group *Group) Pull(addr string, connectTimeout int64) { func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
// TODO chef: config me, log.Debugf("del httpflv SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey)
// v1.0.0 版本之前先不提供去其他节点回源的功能 group.mutex.Lock()
panic("not impl yet") defer group.mutex.Unlock()
//group.pullSession = NewPullSession(group, PullSessionTimeout{ delete(group.httpflvSubSessionSet, session)
// ConnectTimeoutMS: int(connectTimeout),
//})
//
//defer func() {
// group.mutex.Lock()
// defer group.mutex.Unlock()
// log.Infof("del rtmp PullSession out of group. [%s] [%s]", group.UniqueKey, group.pullSession)
// group.pullSession = nil
//}()
//
//url := fmt.Sprintf("rtmp://%s/%s/%s", addr, group.appName, group.streamName)
//if err := group.pullSession.Pull(url); err != nil {
// log.Error(err)
//}
//if err := group.pullSession.WaitLoop(); err != nil {
// log.Debugf("rtmp PullSession loop done. [%s] [%s] err=%v", group.UniqueKey, group.pullSession.UniqueKey, err)
// return
//}
} }
func (group *Group) IsTotalEmpty() bool { func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
return group.pubSession == nil && len(group.subSessionSet) == 0 return group.pubSession == nil && len(group.rtmpSubSessionSet) == 0 && len(group.httpflvSubSessionSet) == 0
} }
func (group *Group) IsInExist() bool { func (group *Group) IsInExist() bool {
@ -165,13 +154,20 @@ func (group *Group) IsInExist() bool {
func (group *Group) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { func (group *Group) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
group.broadcastRTMP2RTMP(header, timestampAbs, message)
group.broadcastRTMP(header, timestampAbs, message)
} }
func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, message []byte) { func (group *Group) broadcastRTMP(header rtmp.Header, timestampAbs uint32, message []byte) {
//log.Infof("%+v", header) //log.Infof("%+v", header)
// # 1. 设置好头部信息
var currHeader rtmp.Header var (
currHeader rtmp.Header
absChunks []byte
currTag *httpflv.Tag
)
// # 1. 设置好 rtmp 头部信息
currHeader.MsgLen = uint32(len(message)) currHeader.MsgLen = uint32(len(message))
currHeader.Timestamp = timestampAbs currHeader.Timestamp = timestampAbs
currHeader.MsgTypeID = header.MsgTypeID currHeader.MsgTypeID = header.MsgTypeID
@ -188,10 +184,8 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32,
//prevHeader = group.prevVideoHeader //prevHeader = group.prevVideoHeader
} }
var absChunks []byte // # 2. 广播。遍历所有rtmp sub session决定是否转发
for session := range group.rtmpSubSessionSet {
// # 2. 广播。遍历所有sub session决定是否转发
for session := range group.subSessionSet {
// ## 2.1. 一个message广播给多个sub session时只做一次chunk切割 // ## 2.1. 一个message广播给多个sub session时只做一次chunk切割
if absChunks == nil { if absChunks == nil {
absChunks = rtmp.Message2Chunks(message, &currHeader) absChunks = rtmp.Message2Chunks(message, &currHeader)
@ -212,7 +206,7 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32,
session.IsFresh = false session.IsFresh = false
} }
// ## 2.3. 判断当前包的类型以及sub session的状态决定是否发送并更新sub session的状态 // ## 2.3. 判断当前包的类型以及sub session的状态决定是否发送并更新sub session的状态
switch header.MsgTypeID { switch header.MsgTypeID {
case rtmp.TypeidDataMessageAMF0: case rtmp.TypeidDataMessageAMF0:
session.AsyncWrite(absChunks) session.AsyncWrite(absChunks)
@ -232,69 +226,90 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32,
} }
} }
}
// # 3. 广播。遍历所有httpflv sub session决定是否转发
for session := range group.httpflvSubSessionSet {
// ## 3.1. 将当前 message 转换成 tag 格式
if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
} }
// # 3. 缓存 metadata 和 avc key seq header 和 aac seq header // ## 3.2. 如果是新的sub session发送已缓存的信息
// 由于可能没有订阅者所以message可能还没做chunk切割所以这里要做判断是否做chunk切割 if session.IsFresh {
// 发送缓存的头部信息
if group.metadataTag != nil {
log.Debugf("send cache metadata. [%s]", session.UniqueKey)
session.WriteTag(group.metadataTag)
}
if group.avcKeySeqHeaderTag != nil {
session.WriteTag(group.avcKeySeqHeaderTag)
}
if group.aacSeqHeaderTag != nil {
session.WriteTag(group.aacSeqHeaderTag)
}
session.IsFresh = false
}
// ## 3.3. 判断当前包的类型以及sub session的状态决定是否发送并更新sub session的状态
switch header.MsgTypeID {
case rtmp.TypeidDataMessageAMF0:
session.WriteTag(currTag)
case rtmp.TypeidAudio:
session.WriteTag(currTag)
case rtmp.TypeidVideo:
if session.WaitKeyNalu {
if message[0] == 0x17 && message[1] == 0x0 {
session.WriteTag(currTag)
}
if message[0] == 0x17 && message[1] == 0x1 {
session.WriteTag(currTag)
session.WaitKeyNalu = false
}
} else {
session.WriteTag(currTag)
}
}
}
// # 4. 缓存 rtmp 以及 httpflv 的 metadata 和 avc key seq header 和 aac seq header
// 由于可能没有订阅者,所以可能需要重新打包
switch header.MsgTypeID { switch header.MsgTypeID {
case rtmp.TypeidDataMessageAMF0: case rtmp.TypeidDataMessageAMF0:
if absChunks == nil { if absChunks == nil {
absChunks = rtmp.Message2Chunks(message, &currHeader) absChunks = rtmp.Message2Chunks(message, &currHeader)
} }
log.Debugf("cache metadata. [%s]", group.UniqueKey) if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
}
group.metadata = absChunks group.metadata = absChunks
group.metadataTag = currTag
log.Debugf("cache metadata. [%s]", group.UniqueKey)
case rtmp.TypeidVideo: case rtmp.TypeidVideo:
// TODO chef: magic number // TODO chef: magic number
if message[0] == 0x17 && message[1] == 0x0 { if message[0] == 0x17 && message[1] == 0x0 {
if absChunks == nil { if absChunks == nil {
absChunks = rtmp.Message2Chunks(message, &currHeader) absChunks = rtmp.Message2Chunks(message, &currHeader)
} }
log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) if currTag == nil {
currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
}
group.avcKeySeqHeader = absChunks group.avcKeySeqHeader = absChunks
group.avcKeySeqHeaderTag = currTag
log.Debugf("cache avc key seq header. [%s]", group.UniqueKey)
} }
case rtmp.TypeidAudio: case rtmp.TypeidAudio:
if (message[0]>>4) == 0x0a && message[1] == 0x0 { if (message[0]>>4) == 0x0a && message[1] == 0x0 {
if absChunks == nil { if absChunks == nil {
absChunks = rtmp.Message2Chunks(message, &currHeader) absChunks = rtmp.Message2Chunks(message, &currHeader)
} }
log.Debugf("cache aac seq header. [%s]", group.UniqueKey) if currTag == nil {
group.aacSeqHeader = absChunks currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
}
} }
group.aacSeqHeader = absChunks
group.aacSeqHeaderTag = currTag
log.Debugf("cache aac seq header. [%s]", group.UniqueKey)
} }
func (group *Group) pullIfNeeded() {
panic("not impl")
//if !gm.isInExist() {
// switch gm.config.Pull.Type {
// case "httpflv":
// go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout)
// case "rtmp":
// go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout)
// }
//}
}
func (group *Group) isInExist() bool {
panic("not impl")
//return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) ||
// (gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist())
}
// GroupObserver of httpflv.Group
func (group *Group) ReadHTTPRespHeaderCB() {
// noop
}
// GroupObserver of httpflv.Group
func (group *Group) ReadFlvHeaderCB(flvHeader []byte) {
// noop
} }
// GroupObserver of httpflv.Group
func (group *Group) ReadFlvTagCB(tag *httpflv.Tag) {
log.Info("ReadFlvTagCB")
// TODO chef: broadcast to rtmp.Group
} }

@ -0,0 +1,22 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"errors"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
)
var ErrLogic = errors.New("lal.logic: fxxk")
var _ rtmp.ServerObserver = &ServerManager{}
var _ httpflv.ServerObserver = &ServerManager{}
var _ rtmp.PubSessionObserver = &Group{}

@ -20,14 +20,13 @@ import (
type ServerManager struct { type ServerManager struct {
config *Config config *Config
httpFlvServer *httpflv.Server httpflvServer *httpflv.Server
rtmpServer *rtmp.Server rtmpServer *rtmp.Server
groupMap map[string]*Group // TODO chef: with appName
mutex sync.Mutex
exitChan chan struct{} exitChan chan struct{}
}
var _ rtmp.ServerObserver = &ServerManager{} mutex sync.Mutex
groupMap map[string]*Group // TODO chef: with appName
}
func NewServerManager(config *Config) *ServerManager { func NewServerManager(config *Config) *ServerManager {
m := &ServerManager{ m := &ServerManager{
@ -35,8 +34,8 @@ func NewServerManager(config *Config) *ServerManager {
groupMap: make(map[string]*Group), groupMap: make(map[string]*Group),
exitChan: make(chan struct{}), exitChan: make(chan struct{}),
} }
if len(config.HTTPFlv.SubListenAddr) != 0 { if len(config.HTTPFLV.SubListenAddr) != 0 {
m.httpFlvServer = httpflv.NewServer(m, config.HTTPFlv.SubListenAddr, config.SubIdleTimeout) m.httpflvServer = httpflv.NewServer(m, config.HTTPFLV.SubListenAddr)
} }
if len(config.RTMP.Addr) != 0 { if len(config.RTMP.Addr) != 0 {
m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr) m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr)
@ -45,9 +44,9 @@ func NewServerManager(config *Config) *ServerManager {
} }
func (sm *ServerManager) RunLoop() { func (sm *ServerManager) RunLoop() {
if sm.httpFlvServer != nil { if sm.httpflvServer != nil {
go func() { go func() {
if err := sm.httpFlvServer.RunLoop(); err != nil { if err := sm.httpflvServer.RunLoop(); err != nil {
log.Error(err) log.Error(err)
} }
}() }()
@ -82,8 +81,8 @@ func (sm *ServerManager) RunLoop() {
func (sm *ServerManager) Dispose() { func (sm *ServerManager) Dispose() {
log.Debug("dispose server manager.") log.Debug("dispose server manager.")
if sm.httpFlvServer != nil { if sm.httpflvServer != nil {
sm.httpFlvServer.Dispose() sm.httpflvServer.Dispose()
} }
if sm.rtmpServer != nil { if sm.rtmpServer != nil {
sm.rtmpServer.Dispose() sm.rtmpServer.Dispose()
@ -91,9 +90,8 @@ func (sm *ServerManager) Dispose() {
sm.mutex.Lock() sm.mutex.Lock()
for _, group := range sm.groupMap { for _, group := range sm.groupMap {
group.Dispose(lalErr) group.Dispose(ErrLogic)
} }
sm.groupMap = nil
sm.mutex.Unlock() sm.mutex.Unlock()
sm.exitChan <- struct{}{} sm.exitChan <- struct{}{}
@ -101,51 +99,73 @@ func (sm *ServerManager) Dispose() {
// ServerObserver of rtmp.Server // ServerObserver of rtmp.Server
func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool { func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName, session.StreamName) group := sm.getOrCreateGroup(session.AppName, session.StreamName)
return group.AddRTMPPubSession(session) return group.AddRTMPPubSession(session)
} }
// ServerObserver of rtmp.Server // ServerObserver of rtmp.Server
func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool { func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) {
group := sm.getOrCreateGroup(session.AppName, session.StreamName) sm.mutex.Lock()
group.AddRTMPSubSession(session) defer sm.mutex.Unlock()
return true group := sm.getGroup(session.AppName, session.StreamName)
if group != nil {
group.DelRTMPPubSession(session)
}
} }
// ServerObserver of rtmp.Server // ServerObserver of rtmp.Server
func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) { func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName, session.StreamName) group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.DelRTMPPubSession(session) group.AddRTMPSubSession(session)
return true
} }
// ServerObserver of rtmp.Server // ServerObserver of rtmp.Server
func (sm *ServerManager) DelRTMPSubSessionCB(session *rtmp.ServerSession) { func (sm *ServerManager) DelRTMPSubSessionCB(session *rtmp.ServerSession) {
group := sm.getOrCreateGroup(session.AppName, session.StreamName) sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName, session.StreamName)
if group != nil {
group.DelRTMPSubSession(session) group.DelRTMPSubSession(session)
} }
}
// ServerObserver of httpflv.Server // ServerObserver of httpflv.Server
func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession) bool { func (sm *ServerManager) NewHTTPFLVSubSessionCB(session *httpflv.SubSession) bool {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName, session.StreamName) group := sm.getOrCreateGroup(session.AppName, session.StreamName)
group.AddHTTPFlvSubSession(session) group.AddHTTPFLVSubSession(session)
return true return true
} }
// ServerObserver of httpflv.Server
func (sm *ServerManager) DelHTTPFLVSubSessionCB(session *httpflv.SubSession) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName, session.StreamName)
if group != nil {
group.DelHTTPFLVSubSession(session)
}
}
func (sm *ServerManager) check() { func (sm *ServerManager) check() {
sm.mutex.Lock() sm.mutex.Lock()
defer sm.mutex.Unlock() defer sm.mutex.Unlock()
for k, group := range sm.groupMap { for k, group := range sm.groupMap {
if group.IsTotalEmpty() { if group.IsTotalEmpty() {
log.Infof("erase empty group manager. [%s]", group.UniqueKey) log.Infof("erase empty group manager. [%s]", group.UniqueKey)
group.Dispose(lalErr) group.Dispose(ErrLogic)
delete(sm.groupMap, k) delete(sm.groupMap, k)
} }
} }
} }
func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group {
sm.mutex.Lock()
defer sm.mutex.Unlock()
group, exist := sm.groupMap[streamName] group, exist := sm.groupMap[streamName]
if !exist { if !exist {
group = NewGroup(appName, streamName) group = NewGroup(appName, streamName)
@ -154,3 +174,11 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr
go group.RunLoop() go group.RunLoop()
return group return group
} }
func (sm *ServerManager) getGroup(appName string, streamName string) *Group {
group, exist := sm.groupMap[streamName]
if !exist {
return nil
}
return group
}

@ -18,11 +18,11 @@ var Trans trans
type trans struct { type trans struct {
} }
//// TODO chef: rtmp msg 也弄成结构体 // 注意tag -> message [nocopy]
func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) { func (t trans) FLVTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) {
header.MsgLen = tag.Header.DataSize header.MsgLen = tag.Header.DataSize
header.MsgTypeID = tag.Header.T header.MsgTypeID = tag.Header.T
header.MsgStreamID = rtmp.MSID1 // TODO header.MsgStreamID = rtmp.MSID1
switch tag.Header.T { switch tag.Header.T {
case httpflv.TagTypeMetadata: case httpflv.TagTypeMetadata:
header.CSID = rtmp.CSIDAMF header.CSID = rtmp.CSIDAMF
@ -37,11 +37,12 @@ func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs
return return
} }
func (t trans) RTMPMsg2FlvTag(header rtmp.Header, timestampAbs uint32, message []byte) httpflv.Tag { // 注意message -> tag [copy]
func (t trans) RTMPMsg2FLVTag(header rtmp.Header, timestampAbs uint32, message []byte) *httpflv.Tag {
var tag httpflv.Tag var tag httpflv.Tag
tag.Header.T = header.MsgTypeID tag.Header.T = header.MsgTypeID
tag.Header.DataSize = header.MsgLen tag.Header.DataSize = header.MsgLen
tag.Header.Timestamp = timestampAbs tag.Header.Timestamp = timestampAbs
tag.Raw = httpflv.PackHTTPFlvTag(header.MsgTypeID, timestampAbs, message) tag.Raw = httpflv.PackHTTPFLVTag(header.MsgTypeID, timestampAbs, message)
return tag return &tag
} }

@ -22,8 +22,8 @@ import (
) )
var ( var (
ErrAMFInvalidType = errors.New("lal.AMF0: invalid type") ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type")
ErrAMFTooShort = errors.New("lal.AMF0: too short") ErrAMFTooShort = errors.New("lal.rtmp: too short to unmarshal amf0 data")
) )
const ( const (

@ -67,9 +67,9 @@ func TestAmf0_WriteString_ReadString(t *testing.T) {
func TestAmf0_WriteObject_ReadObject(t *testing.T) { func TestAmf0_WriteObject_ReadObject(t *testing.T) {
out := &bytes.Buffer{} out := &bytes.Buffer{}
objs := []ObjectPair{ objs := []ObjectPair{
{"air", 3}, {Key: "air", Value: 3},
{"ban", "cat"}, {Key: "ban", Value: "cat"},
{"dog", true}, {Key: "dog", Value: true},
} }
err := AMF0.WriteObject(out, objs) err := AMF0.WriteObject(out, objs)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
@ -153,9 +153,9 @@ func TestAMF0Corner(t *testing.T) {
assert.IsNotNil(t, err) assert.IsNotNil(t, err)
objs = []ObjectPair{ objs = []ObjectPair{
{"air", 3}, {Key: "air", Value: 3},
{"ban", "cat"}, {Key: "ban", Value: "cat"},
{"dog", true}, {Key: "dog", Value: true},
} }
for i := uint32(0); i < 14; i++ { for i := uint32(0); i < 14; i++ {
mw = mockwriter.NewMockWriter(mockwriter.WriterTypeDoNothing) mw = mockwriter.NewMockWriter(mockwriter.WriterTypeDoNothing)
@ -244,7 +244,7 @@ func TestAMF0Corner(t *testing.T) {
recover() recover()
}() }()
objs = []ObjectPair{ objs = []ObjectPair{
{"key", []byte{1}}, {Key: "key", Value: []byte{1}},
} }
_ = AMF0.WriteObject(mw, objs) _ = AMF0.WriteObject(mw, objs)
} }
@ -252,9 +252,9 @@ func TestAMF0Corner(t *testing.T) {
func BenchmarkAmf0_ReadObject(b *testing.B) { func BenchmarkAmf0_ReadObject(b *testing.B) {
out := &bytes.Buffer{} out := &bytes.Buffer{}
objs := []ObjectPair{ objs := []ObjectPair{
{"air", 3}, {Key: "air", Value: 3},
{"ban", "cat"}, {Key: "ban", Value: "cat"},
{"dog", true}, {Key: "dog", Value: true},
} }
_ = AMF0.WriteObject(out, objs) _ = AMF0.WriteObject(out, objs)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -265,9 +265,9 @@ func BenchmarkAmf0_ReadObject(b *testing.B) {
func BenchmarkAmf0_WriteObject(b *testing.B) { func BenchmarkAmf0_WriteObject(b *testing.B) {
out := &bytes.Buffer{} out := &bytes.Buffer{}
objs := []ObjectPair{ objs := []ObjectPair{
{"air", 3}, {Key: "air", Value: 3},
{"ban", "cat"}, {Key: "ban", Value: "cat"},
{"dog", true}, {Key: "dog", Value: true},
} }
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_ = AMF0.WriteObject(out, objs) _ = AMF0.WriteObject(out, objs)

@ -112,7 +112,7 @@ func calcHeader(header *Header, prevHeader *Header, out []byte) int {
func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSize int) []byte { func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSize int) []byte {
//if header.CSID < minCSID || header.CSID > maxCSID { //if header.CSID < minCSID || header.CSID > maxCSID {
// return nil, rtmpErr // return nil, ErrRTMP
//} //}
// 计算chunk数量最后一个chunk的大小 // 计算chunk数量最后一个chunk的大小

@ -22,7 +22,7 @@ import (
"github.com/q191201771/naza/pkg/unique" "github.com/q191201771/naza/pkg/unique"
) )
var ErrClientSessionTimeout = errors.New("rtmp.ClientSession timeout") var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
// rtmp客户端类型连接的底层实现 // rtmp客户端类型连接的底层实现
// rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession // rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession
@ -251,7 +251,7 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error {
} }
code, ok := infos["code"] code, ok := infos["code"]
if !ok { if !ok {
return rtmpErr return ErrRTMP
} }
switch s.t { switch s.t {
case CSTPushSession: case CSTPushSession:
@ -288,7 +288,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
} }
code, ok := infos["code"].(string) code, ok := infos["code"].(string)
if !ok { if !ok {
return rtmpErr return ErrRTMP
} }
switch code { switch code {
case "NetConnection.Connect.Success": case "NetConnection.Connect.Success":
@ -330,7 +330,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
if stream.msg.len() < 4 { if stream.msg.len() < 4 {
return rtmpErr return ErrRTMP
} }
val := int(bele.BEUint32(stream.msg.buf)) val := int(bele.BEUint32(stream.msg.buf))
@ -356,16 +356,16 @@ func (s *ClientSession) parseURL(rawURL string) error {
return err return err
} }
if s.url.Scheme != "rtmp" || len(s.url.Host) == 0 || len(s.url.Path) == 0 || s.url.Path[0] != '/' { if s.url.Scheme != "rtmp" || len(s.url.Host) == 0 || len(s.url.Path) == 0 || s.url.Path[0] != '/' {
return rtmpErr return ErrRTMP
} }
index := strings.LastIndexByte(rawURL, '/') index := strings.LastIndexByte(rawURL, '/')
if index == -1 { if index == -1 {
return rtmpErr return ErrRTMP
} }
s.tcURL = rawURL[:index] s.tcURL = rawURL[:index]
strs := strings.Split(s.url.Path[1:], "/") strs := strings.Split(s.url.Path[1:], "/")
if len(strs) != 2 { if len(strs) != 2 {
return rtmpErr return ErrRTMP
} }
s.appName = strs[0] s.appName = strs[0]
// 有的rtmp服务器会使用url后面的参数比如说用于鉴权这里把它带上 // 有的rtmp服务器会使用url后面的参数比如说用于鉴权这里把它带上

@ -29,16 +29,16 @@ var (
serverAddr = ":10001" serverAddr = ":10001"
pushURL = "rtmp://127.0.0.1:10001/live/test" pushURL = "rtmp://127.0.0.1:10001/live/test"
pullURL = "rtmp://127.0.0.1:10001/live/test" pullURL = "rtmp://127.0.0.1:10001/live/test"
rFlvFile = "testdata/test.flv" rFLVFile = "testdata/test.flv"
wFlvFile = "testdata/out.flv" wFLVFile = "testdata/out.flv"
wgNum = 4 // FlvFileReader -> [push -> pub -> sub -> pull] -> FlvFileWriter wgNum = 4 // FLVFileReader -> [push -> pub -> sub -> pull] -> FLVFileWriter
) )
var ( var (
pubSessionObs MockPubSessionObserver pubSessionObs MockPubSessionObserver
subSession *rtmp.ServerSession subSession *rtmp.ServerSession
wg sync.WaitGroup wg sync.WaitGroup
w httpflv.FlvFileWriter w httpflv.FLVFileWriter
// //
rc uint32 rc uint32
bc uint32 bc uint32
@ -83,13 +83,10 @@ func (pso *MockPubSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestamp
switch header.MsgTypeID { switch header.MsgTypeID {
case rtmp.TypeidDataMessageAMF0: case rtmp.TypeidDataMessageAMF0:
currHeader.CSID = rtmp.CSIDAMF currHeader.CSID = rtmp.CSIDAMF
//prevHeader = nil
case rtmp.TypeidAudio: case rtmp.TypeidAudio:
currHeader.CSID = rtmp.CSIDAudio currHeader.CSID = rtmp.CSIDAudio
//prevHeader = group.prevAudioHeader
case rtmp.TypeidVideo: case rtmp.TypeidVideo:
currHeader.CSID = rtmp.CSIDVideo currHeader.CSID = rtmp.CSIDVideo
//prevHeader = group.prevVideoHeader
} }
var absChunks []byte var absChunks []byte
absChunks = rtmp.Message2Chunks(message, &currHeader) absChunks = rtmp.Message2Chunks(message, &currHeader)
@ -100,19 +97,16 @@ type MockPullSessionObserver struct {
} }
func (pso *MockPullSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { func (pso *MockPullSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {
tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message) tag := logic.Trans.RTMPMsg2FLVTag(header, timestampAbs, message)
w.WriteTag(tag) w.WriteTag(*tag)
//wg.Done()
atomic.AddUint32(&wc, 1) atomic.AddUint32(&wc, 1)
} }
func TestExample(t *testing.T) { func TestExample(t *testing.T) {
var err error var err error
var r httpflv.FlvFileReader var r httpflv.FLVFileReader
err = r.Open(rFlvFile) err = r.Open(rFLVFile)
//assert.Equal(t, nil, err)
// 测试文件不存在,则不做后面的测试了
if err != nil { if err != nil {
return return
} }
@ -137,12 +131,12 @@ func TestExample(t *testing.T) {
err = pushSession.Push(pushURL) err = pushSession.Push(pushURL)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
err = w.Open(wFlvFile) err = w.Open(wFLVFile)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
err = w.WriteRaw(httpflv.FlvHeader) err = w.WriteRaw(httpflv.FLVHeader)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
_, err = r.ReadFlvHeader() _, err = r.ReadFLVHeader()
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
for { for {
tag, err := r.ReadTag() tag, err := r.ReadTag()
@ -151,13 +145,11 @@ func TestExample(t *testing.T) {
} }
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
rc++ rc++
//wg.Add(1) h, _, m := logic.Trans.FLVTag2RTMPMsg(*tag)
h, _, m := logic.Trans.FlvTag2RTMPMsg(*tag)
chunks := rtmp.Message2Chunks(m, &h) chunks := rtmp.Message2Chunks(m, &h)
err = pushSession.AsyncWrite(chunks) err = pushSession.AsyncWrite(chunks)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
} }
//wg.Wait()
r.Dispose() r.Dispose()
wg.Done() wg.Done()
@ -187,12 +179,12 @@ func TestExample(t *testing.T) {
} }
func compareFile(t *testing.T) { func compareFile(t *testing.T) {
r, err := ioutil.ReadFile(rFlvFile) r, err := ioutil.ReadFile(rFLVFile)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
w, err := ioutil.ReadFile(wFlvFile) w, err := ioutil.ReadFile(wFLVFile)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
res := bytes.Compare(r, w) res := bytes.Compare(r, w)
assert.Equal(t, 0, res) assert.Equal(t, 0, res)
err = os.Remove(wFlvFile) err = os.Remove(wFLVFile)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
} }

@ -107,7 +107,7 @@ func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error {
return err return err
} }
//if s0s1s2[0] != version { //if s0s1s2[0] != version {
// return rtmpErr // return ErrRTMP
//} //}
// use s2 as c2 // use s2 as c2
c.c2 = append(c.c2, s0s1s2[s0s1Len:]...) c.c2 = append(c.c2, s0s1s2[s0s1Len:]...)
@ -144,7 +144,7 @@ func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error {
return err return err
} }
//if s0s1s2[0] != version { //if s0s1s2[0] != version {
// return rtmpErr // return ErrRTMP
//} //}
// TODO chef: 这里复杂模式的 c2 构造没有完全按照规范 // TODO chef: 这里复杂模式的 c2 构造没有完全按照规范
// nginx rtmp module 作为 server 端时,不会校验 c2 内容 // nginx rtmp module 作为 server 端时,不会校验 c2 内容
@ -222,7 +222,7 @@ func (s *HandshakeServer) ReadC2(reader io.Reader) error {
func parseChallenge(c0c1 []byte) []byte { func parseChallenge(c0c1 []byte) []byte {
//if c0c1[0] != version { //if c0c1[0] != version {
// return nil, rtmpErr // return nil, ErrRTMP
//} //}
ver := bele.BEUint32(c0c1[5:]) ver := bele.BEUint32(c0c1[5:])
if ver == 0 { if ver == 0 {

@ -10,7 +10,7 @@ package rtmp
import "errors" import "errors"
var rtmpErr = errors.New("rtmp: fxxk") var ErrRTMP = errors.New("lal.rtmp: fxxk")
const ( const (
CSIDAMF = 5 CSIDAMF = 5

@ -17,8 +17,8 @@ import (
type ServerObserver interface { type ServerObserver interface {
NewRTMPPubSessionCB(session *ServerSession) bool // 返回true则允许推流返回false则强制关闭这个连接 NewRTMPPubSessionCB(session *ServerSession) bool // 返回true则允许推流返回false则强制关闭这个连接
NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流返回false则强制关闭这个连接
DelRTMPPubSessionCB(session *ServerSession) DelRTMPPubSessionCB(session *ServerSession)
NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流返回false则强制关闭这个连接
DelRTMPSubSessionCB(session *ServerSession) DelRTMPSubSessionCB(session *ServerSession)
} }

@ -146,7 +146,7 @@ func (s *ServerSession) doMsg(stream *Stream) error {
case TypeidVideo: case TypeidVideo:
if s.t != ServerSessionTypePub { if s.t != ServerSessionTypePub {
log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey) log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey)
return rtmpErr return ErrRTMP
} }
//log.Infof("t:%d ts:%d len:%d", stream.header.MsgTypeID, stream.timestampAbs, stream.msg.e - stream.msg.b) //log.Infof("t:%d ts:%d len:%d", stream.header.MsgTypeID, stream.timestampAbs, stream.msg.e - stream.msg.b)
s.avObs.ReadRTMPAVMsgCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) s.avObs.ReadRTMPAVMsgCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
@ -166,7 +166,7 @@ func (s *ServerSession) doACK(stream *Stream) error {
func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
if s.t != ServerSessionTypePub { if s.t != ServerSessionTypePub {
log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey) log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey)
return rtmpErr return ErrRTMP
} }
val, err := stream.msg.peekStringWithType() val, err := stream.msg.peekStringWithType()
@ -187,7 +187,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
return err return err
} }
if val != "onMetaData" { if val != "onMetaData" {
return rtmpErr return ErrRTMP
} }
case "onMetaData": case "onMetaData":
// noop // noop
@ -241,7 +241,7 @@ func (s *ServerSession) doConnect(tid int, stream *Stream) error {
var ok bool var ok bool
s.AppName, ok = val["app"].(string) s.AppName, ok = val["app"].(string)
if !ok { if !ok {
return rtmpErr return ErrRTMP
} }
log.Infof("-----> connect('%s'). [%s]", s.AppName, s.UniqueKey) log.Infof("-----> connect('%s'). [%s]", s.AppName, s.UniqueKey)
@ -294,10 +294,6 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
log.Debugf("[%s] pubType=%s", s.UniqueKey, pubType) log.Debugf("[%s] pubType=%s", s.UniqueKey, pubType)
log.Infof("-----> publish('%s') [%s]", s.StreamName, s.UniqueKey) log.Infof("-----> publish('%s') [%s]", s.StreamName, s.UniqueKey)
// 回调放在回复客户端信令之前
s.t = ServerSessionTypePub
s.obs.NewRTMPPubSessionCB(s)
log.Infof("<---- onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey) log.Infof("<---- onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey)
if err := s.packer.writeOnStatusPublish(s.conn, MSID1); err != nil { if err := s.packer.writeOnStatusPublish(s.conn, MSID1); err != nil {
return err return err
@ -306,6 +302,9 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
// 回复完信令后修改 connection 的属性 // 回复完信令后修改 connection 的属性
s.ModConnProps() s.ModConnProps()
s.t = ServerSessionTypePub
s.obs.NewRTMPPubSessionCB(s)
return nil return nil
} }
@ -323,10 +322,6 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
log.Infof("-----> play('%s'). [%s]", s.StreamName, s.UniqueKey) log.Infof("-----> play('%s'). [%s]", s.StreamName, s.UniqueKey)
// TODO chef: start duration reset // TODO chef: start duration reset
// 回调放在回复客户端信令之前
s.t = ServerSessionTypeSub
s.obs.NewRTMPSubSessionCB(s)
log.Infof("<----onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey) log.Infof("<----onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey)
if err := s.packer.writeOnStatusPlay(s.conn, MSID1); err != nil { if err := s.packer.writeOnStatusPlay(s.conn, MSID1); err != nil {
return err return err
@ -335,12 +330,16 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
// 回复完信令后修改 connection 的属性 // 回复完信令后修改 connection 的属性
s.ModConnProps() s.ModConnProps()
s.t = ServerSessionTypeSub
s.obs.NewRTMPSubSessionCB(s)
return nil return nil
} }
func (s *ServerSession) ModConnProps() { func (s *ServerSession) ModConnProps() {
s.conn.ModWriteChanSize(wChanSize) s.conn.ModWriteChanSize(wChanSize)
s.conn.ModWriteBufSize(writeBufSize) // TODO chef: naza.connection 这种方式会导致最后一点数据发送不出去,我们应该使用更好的方式
//s.conn.ModWriteBufSize(writeBufSize)
switch s.t { switch s.t {
case ServerSessionTypePub: case ServerSessionTypePub:

@ -6,6 +6,7 @@ if command -v add_go_license >/dev/null 2>&1; then
else else
echo 'CHEFNOTICEME add_go_license not exist!' echo 'CHEFNOTICEME add_go_license not exist!'
fi fi
echo '-----gofmt-----' echo '-----gofmt-----'
if command -v gofmt >/dev/null 2>&1; then if command -v gofmt >/dev/null 2>&1; then
gofmt -l ./ gofmt -l ./
@ -33,15 +34,26 @@ done
# 跑 go test 生成测试覆盖率 # 跑 go test 生成测试覆盖率
echo "-----CI coverage-----" echo "-----CI coverage-----"
if [ ! -f "pkg/rtmp/testdata/test.flv" ]; then if [ ! -f "./testdata/test.flv" ]; then
echo "CHEFERASEME test.flv not exist." if [ ! -d "./testdata" ]; then
if [ ! -d "pkg/rtmp/testdata" ]; then mkdir "./testdata"
echo "CHEFERASEME mkdir."
mkdir "pkg/rtmp/testdata"
fi fi
wget https://pengrl.com/images/other/source.200kbps.768x320.flv -O pkg/rtmp/testdata/test.flv wget https://pengrl.com/images/other/source.200kbps.768x320.flv -O ./testdata/test.flv
else fi
echo "CHEFERASEME test.flv exist." if [ ! -f "./pkg/rtmp/testdata/test.flv" ]; then
if [ ! -d "./pkg/rtmp/testdata" ]; then
mkdir "./pkg/rtmp/testdata"
fi
cp ./testdata/test.flv ./pkg/rtmp/testdata/test.flv
fi
if [ ! -f "./pkg/logic/testdata/test.flv" ]; then
if [ ! -d "./pkg/logic/testdata" ]; then
mkdir "./pkg/logic/testdata"
fi
cp ./testdata/test.flv ./pkg/logic/testdata/test.flv
fi
if [ ! -f "./pkg/logic/testdata/lals.default.conf.json" ]; then
cp ./conf/lals.default.conf.json ./pkg/logic/testdata/lals.default.conf.json
fi fi
echo "" > coverage.txt echo "" > coverage.txt

Loading…
Cancel
Save