diff --git a/.gitignore b/.gitignore index 62cb2a2..8eb8f72 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,7 @@ coverage.html /logs/ /pkg/hls/ -/pkg/rtmp/testdata/test.flv -/pkg/rtmp/testdata/test.flv.bak +/testdata +/pkg/rtmp/testdata +/pkg/httpflv/testdata +/pkg/logic/testdata diff --git a/app/flvfile2es/flvfile2es.go b/app/flvfile2es/flvfile2es.go index 86121ed..1489013 100644 --- a/app/flvfile2es/flvfile2es.go +++ b/app/flvfile2es/flvfile2es.go @@ -23,7 +23,7 @@ func main() { var err error flvFileName, aacFileName, avcFileName := parseFlag() - var ffr httpflv.FlvFileReader + var ffr httpflv.FLVFileReader err = ffr.Open(flvFileName) log.FatalIfErrorNotNil(err) defer ffr.Dispose() @@ -39,7 +39,7 @@ func main() { defer vfp.Close() log.Infof("open es h264 file succ.") - _, err = ffr.ReadFlvHeader() + _, err = ffr.ReadFLVHeader() log.FatalIfErrorNotNil(err) for { diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/app/flvfile2rtmppush/flvfile2rtmppush.go index c18c499..93d6e3e 100644 --- a/app/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/flvfile2rtmppush/flvfile2rtmppush.go @@ -56,12 +56,12 @@ func main() { log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d", i, totalBaseTS, prevTS, thisBaseTS) - var ffr httpflv.FlvFileReader + var ffr httpflv.FLVFileReader err = ffr.Open(flvFileName) log.FatalIfErrorNotNil(err) log.Infof("open succ. filename=%s", flvFileName) - flvHeader, err := ffr.ReadFlvHeader() + flvHeader, err := ffr.ReadFLVHeader() log.FatalIfErrorNotNil(err) log.Infof("read flv header succ. %v", flvHeader) diff --git a/app/httpflvpull/httpflvpull.go b/app/httpflvpull/httpflvpull.go index b174719..e197065 100644 --- a/app/httpflvpull/httpflvpull.go +++ b/app/httpflvpull/httpflvpull.go @@ -23,7 +23,7 @@ func main() { ReadTimeoutMS: 0, }) err := session.Pull(url, func(tag *httpflv.Tag) { - log.Infof("ReadFlvTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) + log.Infof("ReadFLVTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) }) if err != nil { log.Error(err) diff --git a/app/modflvfile/modflvfile.go b/app/modflvfile/modflvfile.go index 83f7236..09643fa 100644 --- a/app/modflvfile/modflvfile.go +++ b/app/modflvfile/modflvfile.go @@ -56,19 +56,19 @@ func main() { var err error inFileName, outFileName := parseFlag() - var ffr httpflv.FlvFileReader + var ffr httpflv.FLVFileReader err = ffr.Open(inFileName) log.FatalIfErrorNotNil(err) defer ffr.Dispose() log.Infof("open input flv file succ.") - var ffw httpflv.FlvFileWriter + var ffw httpflv.FLVFileWriter err = ffw.Open(outFileName) log.FatalIfErrorNotNil(err) defer ffw.Dispose() log.Infof("open output flv file succ.") - flvHeader, err := ffr.ReadFlvHeader() + flvHeader, err := ffr.ReadFLVHeader() log.FatalIfErrorNotNil(err) err = ffw.WriteRaw(flvHeader) diff --git a/app/rtmppull/rtmppull.go b/app/rtmppull/rtmppull.go index 1d521c9..959687e 100644 --- a/app/rtmppull/rtmppull.go +++ b/app/rtmppull/rtmppull.go @@ -19,13 +19,13 @@ import ( ) type Obs struct { - w httpflv.FlvFileWriter + w httpflv.FLVFileWriter } func (obs *Obs) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { log.Infof("%+v, abs ts=%d", header, timestampAbs) - tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message) - err := obs.w.WriteTag(tag) + tag := logic.Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + err := obs.w.WriteTag(*tag) log.FatalIfErrorNotNil(err) } @@ -43,7 +43,7 @@ func main() { err = obs.w.Open(outFileName) log.FatalIfErrorNotNil(err) //defer obs.w.Dispose() - err = obs.w.WriteRaw(httpflv.FlvHeader) + err = obs.w.WriteRaw(httpflv.FLVHeader) log.FatalIfErrorNotNil(err) err = session.WaitLoop() diff --git a/conf/lals.conf.json b/conf/lals.conf.json index c0811dd..5bc8d82 100644 --- a/conf/lals.conf.json +++ b/conf/lals.conf.json @@ -2,6 +2,9 @@ "rtmp": { "addr": ":19350" }, + "httpflv": { + "sub_listen_addr": ":8080" + }, "log": { "level": 1, "filename": "./logs/lals.log", diff --git a/conf/lals.default.conf.json b/conf/lals.default.conf.json new file mode 100644 index 0000000..5bc8d82 --- /dev/null +++ b/conf/lals.default.conf.json @@ -0,0 +1,18 @@ +{ + "rtmp": { + "addr": ":19350" + }, + "httpflv": { + "sub_listen_addr": ":8080" + }, + "log": { + "level": 1, + "filename": "./logs/lals.log", + "is_to_stdout": true, + "is_rotate_daily": true, + "short_file_flag": true + }, + "pprof": { + "addr": ":10001" + } +} diff --git a/go.mod b/go.mod index 10700e7..a5930ab 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.4.3 +require github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba diff --git a/go.sum b/go.sum index fdff201..bcef056 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.4.3 h1:v7CDH3ONFyrptU8rpUeCakVdN3akxPSChHO4z3CRrAw= -github.com/q191201771/naza v0.4.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba h1:pVmUuAaWFm2Y/v32ZmV9KQLE86Opfc+7IP3h1RpuDF4= +github.com/q191201771/naza v0.4.4-0.20191022073946-f1b3e6ae4eba/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index d4de9ee..0cc7318 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -15,7 +15,7 @@ import ( "github.com/q191201771/naza/pkg/bele" ) -var avcErr = errors.New("avc: fxxk") +var ErrAVC = errors.New("lal.avc: fxxk") var NaluStartCode = []byte{0x0, 0x0, 0x0, 0x1} @@ -34,7 +34,7 @@ func ParseAVCSeqHeader(payload []byte) (sps, pps []byte, err error) { // TODO chef: check if read out of range if payload[0] != 0x17 || payload[1] != 0x00 || payload[2] != 0 || payload[3] != 0 || payload[4] != 0 { - err = avcErr + err = ErrAVC return } diff --git a/pkg/avc/avc_test.go b/pkg/avc/avc_test.go index 4ec816c..d3f7e4d 100644 --- a/pkg/avc/avc_test.go +++ b/pkg/avc/avc_test.go @@ -27,10 +27,10 @@ func TestCorner(t *testing.T) { sps, pps, err := ParseAVCSeqHeader([]byte{0}) assert.Equal(t, nil, sps) assert.Equal(t, nil, pps) - assert.Equal(t, err, avcErr) + assert.Equal(t, err, ErrAVC) b := &bytes.Buffer{} err = CaptureAVC(b, []byte{0x17, 0x0, 0x1}) assert.Equal(t, nil, b.Bytes()) - assert.Equal(t, err, avcErr) + assert.Equal(t, err, ErrAVC) } diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 2cbd589..3a35e22 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -13,7 +13,6 @@ import ( "net" "net/url" "strings" - "sync" "time" "github.com/q191201771/naza/pkg/connection" @@ -31,14 +30,13 @@ type PullSession struct { config PullSessionConfig - Conn connection.Connection - closeOnce sync.Once + Conn connection.Connection host string uri string addr string - readFlvTagCB ReadFlvTagCB + readFLVTagCB ReadFLVTagCB } func NewPullSession(config PullSessionConfig) *PullSession { @@ -50,7 +48,7 @@ func NewPullSession(config PullSessionConfig) *PullSession { } } -type ReadFlvTagCB func(tag *Tag) +type ReadFLVTagCB func(tag *Tag) // 阻塞直到拉流失败 // @@ -58,8 +56,8 @@ type ReadFlvTagCB func(tag *Tag) // http://{domain}/{app_name}/{stream_name}.flv // http://{ip}/{domain}/{app_name}/{stream_name}.flv // -// @param readFlvTagCB 读取到 flv tag 数据时回调。回调结束后,PullSession不会再使用 数据。 -func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error { +// @param readFLVTagCB 读取到 flv tag 数据时回调。回调结束后,PullSession不会再使用 数据。 +func (session *PullSession) Pull(rawURL string, readFLVTagCB ReadFLVTagCB) error { if err := session.Connect(rawURL); err != nil { return err } @@ -67,16 +65,12 @@ func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error return err } - return session.runReadLoop(readFlvTagCB) + return session.runReadLoop(readFLVTagCB) } func (session *PullSession) Dispose(err error) { - session.closeOnce.Do(func() { - log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) - if err := session.Conn.Close(); err != nil { - log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err) - } - }) + log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) + _ = session.Conn.Close() } func (session *PullSession) Connect(rawURL string) error { @@ -86,7 +80,7 @@ func (session *PullSession) Connect(rawURL string) error { return err } if url.Scheme != "http" || !strings.HasSuffix(url.Path, ".flv") { - return httpFlvErr + return ErrHTTPFLV } session.host = url.Host @@ -128,7 +122,7 @@ func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[ } if !strings.Contains(firstLine, "200") || len(headers) == 0 { - err = httpFlvErr + err = ErrHTTPFLV return } log.Infof("-----> http response header. [%s]", session.UniqueKey) @@ -136,7 +130,7 @@ func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[ return } -func (session *PullSession) ReadFlvHeader() ([]byte, error) { +func (session *PullSession) ReadFLVHeader() ([]byte, error) { flvHeader := make([]byte, flvHeaderSize) _, err := session.Conn.ReadAtLeast(flvHeader, flvHeaderSize) if err != nil { @@ -152,12 +146,12 @@ func (session *PullSession) ReadTag() (*Tag, error) { return readTag(session.Conn) } -func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error { +func (session *PullSession) runReadLoop(readFLVTagCB ReadFLVTagCB) error { if _, _, err := session.ReadHTTPRespHeader(); err != nil { return err } - if _, err := session.ReadFlvHeader(); err != nil { + if _, err := session.ReadFLVHeader(); err != nil { return err } @@ -166,6 +160,6 @@ func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error { if err != nil { return err } - readFlvTagCB(tag) + readFLVTagCB(tag) } } diff --git a/pkg/httpflv/example_test.go b/pkg/httpflv/example_test.go new file mode 100644 index 0000000..e523e18 --- /dev/null +++ b/pkg/httpflv/example_test.go @@ -0,0 +1,50 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package httpflv_test + +import ( + "testing" + + "github.com/q191201771/lal/pkg/httpflv" + log "github.com/q191201771/naza/pkg/nazalog" +) + +// TODO chef: 后续加个 httpflv post 在做完整流程测试吧 + +var ( + serverAddr = ":10001" + pullURL = "http://127.0.0.1:10001/live/11111.flv" +) + +type MockServerObserver struct { +} + +func (so *MockServerObserver) NewHTTPFLVSubSessionCB(session *httpflv.SubSession) bool { + return true +} + +func (so *MockServerObserver) DelHTTPFLVSubSessionCB(session *httpflv.SubSession) { + +} +func TestExample(t *testing.T) { + var err error + + var so MockServerObserver + s := httpflv.NewServer(&so, serverAddr) + go s.RunLoop() + + pullSession := httpflv.NewPullSession(httpflv.PullSessionConfig{ + ConnectTimeoutMS: 1000, + ReadTimeoutMS: 1000, + }) + err = pullSession.Pull(pullURL, func(tag *httpflv.Tag) { + + }) + log.Debugf("pull failed. err=%+v", err) +} diff --git a/pkg/httpflv/flv_file_reader.go b/pkg/httpflv/flv_file_reader.go index 0def978..dbd71fb 100644 --- a/pkg/httpflv/flv_file_reader.go +++ b/pkg/httpflv/flv_file_reader.go @@ -12,27 +12,27 @@ import ( "os" ) -type FlvFileReader struct { +type FLVFileReader struct { fp *os.File } -func (ffr *FlvFileReader) Open(filename string) (err error) { +func (ffr *FLVFileReader) Open(filename string) (err error) { ffr.fp, err = os.Open(filename) return } -func (ffr *FlvFileReader) ReadFlvHeader() ([]byte, error) { +func (ffr *FLVFileReader) ReadFLVHeader() ([]byte, error) { flvHeader := make([]byte, flvHeaderSize) _, err := ffr.fp.Read(flvHeader) return flvHeader, err } // TODO chef: 返回 Tag 类型,对比 bench -func (ffr *FlvFileReader) ReadTag() (*Tag, error) { +func (ffr *FLVFileReader) ReadTag() (*Tag, error) { return readTag(ffr.fp) } -func (ffr *FlvFileReader) Dispose() { +func (ffr *FLVFileReader) Dispose() { if ffr.fp != nil { _ = ffr.fp.Close() } diff --git a/pkg/httpflv/flv_file_writer.go b/pkg/httpflv/flv_file_writer.go index da4a104..49b94b7 100644 --- a/pkg/httpflv/flv_file_writer.go +++ b/pkg/httpflv/flv_file_writer.go @@ -10,26 +10,26 @@ package httpflv import "os" -type FlvFileWriter struct { +type FLVFileWriter struct { fp *os.File } -func (ffw *FlvFileWriter) Open(filename string) (err error) { +func (ffw *FLVFileWriter) Open(filename string) (err error) { ffw.fp, err = os.Create(filename) return } -func (ffw *FlvFileWriter) WriteRaw(b []byte) (err error) { +func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) { _, err = ffw.fp.Write(b) return } -func (ffw *FlvFileWriter) WriteTag(tag Tag) (err error) { +func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) { _, err = ffw.fp.Write(tag.Raw) return } -func (ffw *FlvFileWriter) Dispose() { +func (ffw *FLVFileWriter) Dispose() { if ffw.fp != nil { _ = ffw.fp.Close() } diff --git a/pkg/httpflv/gop_cache.go b/pkg/httpflv/gop_cache.go deleted file mode 100644 index 50b3564..0000000 --- a/pkg/httpflv/gop_cache.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2019, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package httpflv - -/* -// v1.0.0 版本之前不提供 httpflv 功能 - -import ( - "bytes" - log "github.com/q191201771/naza/pkg/nazalog" - "sync" -) - -type GOP struct { - tags []*Tag - //raw []byte - firstTimestamp uint32 -} - -type GOPCache struct { - gopNum int - - metadata *Tag - avcSeqHeader *Tag - aacSeqHeader *Tag - gops []*GOP // TODO chef: maybe use other container to mock a queue - mutex sync.Mutex -} - -// gopNum: 0 means only cache metadata, avc seq header, aac seq header -func NewGOPCache(gopNum int) *GOPCache { - return &GOPCache{ - gopNum: gopNum, - } -} - -func (c *GOPCache) Push(tag *Tag) { - c.mutex.Lock() - defer c.mutex.Unlock() - - if tag.IsMetadata() { - // TODO chef: will this happen? - if c.metadata != nil { - log.Debugf("updating metadata.") - log.Debug(tag.Header, tag.Raw[TagHeaderSize:]) - log.Debug(c.metadata.Header, c.metadata.Raw[TagHeaderSize:]) - c.clearGOP() - } - c.metadata = tag - } - if tag.IsAVCKeySeqHeader() { - if c.avcSeqHeader == nil { - c.avcSeqHeader = tag - } else { - // TODO chef: compare nessary? if other way to update seq header and handle cache stuff? - if bytes.Compare(tag.Raw[TagHeaderSize:], c.avcSeqHeader.Raw[TagHeaderSize:]) == 0 { - // noop - } else { - log.Debugf("updating avc seq header.") - log.Debug(tag.Header, tag.Raw[TagHeaderSize:]) - log.Debug(c.avcSeqHeader.Header, c.avcSeqHeader.Raw[TagHeaderSize:]) - c.clearGOP() - c.avcSeqHeader = tag - } - } - } - if tag.IsAACSeqHeader() { - if c.aacSeqHeader == nil { - c.aacSeqHeader = tag - } else { - if bytes.Compare(tag.Raw[TagHeaderSize:], c.aacSeqHeader.Raw[TagHeaderSize:]) == 0 { - // noop - } else { - log.Debugf("updating aac seq header.") - c.clearGOP() - c.aacSeqHeader = tag - } - } - c.aacSeqHeader = tag - } - - if c.gopNum == 0 { - return - } - - if len(c.gops) == 0 { - if tag.IsAVCKeyNalu() { - gop := &GOP{} - gop.firstTimestamp = tag.Header.Timestamp - gop.tags = append(gop.tags, tag) - c.gops = append(c.gops, gop) - c.syncOldestKeyNaluTimestampToSeqHeader() - } - } else { - if tag.IsAVCKeyNalu() { - gop := &GOP{} - gop.firstTimestamp = tag.Header.Timestamp - gop.tags = append(gop.tags, tag) - c.gops = append(c.gops, gop) - if len(c.gops) > c.gopNum+1 { - c.gops = c.gops[1:] - c.syncOldestKeyNaluTimestampToSeqHeader() - } - } else { - c.gops[len(c.gops)-1].tags = append(c.gops[len(c.gops)-1].tags, tag) - } - } -} - -func (c *GOPCache) WriteWholeThings(writer Writer) (hasKeyFrame bool) { - if tag := c.getMetadata(); tag != nil { - writer.WriteTag(tag) - } - - avc := c.getAVCSeqHeader() - aac := c.getAACSeqHeader() - // TODO chef: if nessary to sort them by timestamp - if avc != nil && aac != nil { - if avc.Header.Timestamp <= aac.Header.Timestamp { - writer.WriteTag(avc) - writer.WriteTag(aac) - } else { - writer.WriteTag(aac) - writer.WriteTag(avc) - } - } else if avc != nil && aac == nil { - writer.WriteTag(avc) - } else if avc == nil && aac != nil { - writer.WriteTag(aac) - } - c.writeGOPs(writer, false) - return -} - -func (c *GOPCache) ClearAll() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.metadata = nil - c.avcSeqHeader = nil - c.aacSeqHeader = nil - c.gops = nil -} - -func (c *GOPCache) writeGOPs(write Writer, mustCompleted bool) bool { - c.mutex.Lock() - defer c.mutex.Unlock() - - neededLen := len(c.gops) - if mustCompleted { - neededLen-- - } - if neededLen <= 0 { - return false - } - - for i := 0; i != neededLen; i++ { - for j := 0; j != len(c.gops[i].tags); j++ { - write.WriteTag(c.gops[i].tags[j]) - } - } - return true -} - -func (c *GOPCache) getMetadata() (res *Tag) { - c.mutex.Lock() - defer c.mutex.Unlock() - if c.metadata != nil { - res = c.metadata.cloneTag() - } - return -} - -func (c *GOPCache) getAVCSeqHeader() (res *Tag) { - c.mutex.Lock() - defer c.mutex.Unlock() - - if c.avcSeqHeader != nil { - res = c.avcSeqHeader.cloneTag() - } - return -} - -func (c *GOPCache) getAACSeqHeader() (res *Tag) { - c.mutex.Lock() - defer c.mutex.Unlock() - - if c.aacSeqHeader != nil { - res = c.aacSeqHeader.cloneTag() - } - return -} - -func (c *GOPCache) clearGOP() { - log.Debug("clearGOP") - c.gops = nil -} - -// TODO chef: if nessary -func (c *GOPCache) syncOldestKeyNaluTimestampToSeqHeader() { - ts := c.gops[0].firstTimestamp - if c.avcSeqHeader != nil { - c.avcSeqHeader.Header.Timestamp = ts - } - if c.aacSeqHeader != nil { - c.aacSeqHeader.Header.Timestamp = ts - } -} - -*/ diff --git a/pkg/httpflv/group.go b/pkg/httpflv/group.go deleted file mode 100644 index 3ee7edf..0000000 --- a/pkg/httpflv/group.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2019, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package httpflv - -/* -// v1.0.0 版本之前不提供 httpflv 功能 - -// TODO chef: set me by config -var gopCacheNum = 2 - -// TODO chef: 所有新增对象的UniqueKey - -// TODO chef: 将Observer方式改成 func CB方式 -type GroupObserver interface { - ReadHTTPRespHeaderCB() - ReadFlvHeaderCB(flvHeader []byte) - ReadFlvTagCB(tag *Tag) -} - -type Group struct { - appName string - streamName string - - pullSession *PullSession - subSessionSet map[*SubSession]struct{} - gopCache *GOPCache - mutex sync.Mutex - - obs GroupObserver -} - -func NewGroup(appName string, streamName string) *Group { - return &Group{ - appName: appName, - streamName: streamName, - subSessionSet: make(map[*SubSession]struct{}), - gopCache: NewGOPCache(gopCacheNum), - } -} - -func (group *Group) RunLoop() { - -} - -func (group *Group) AddHTTPFlvSubSession(session *SubSession) { - group.mutex.Lock() - defer group.mutex.Unlock() - log.Debugf("add SubSession into group. [%s]", session.UniqueKey) - group.subSessionSet[session] = struct{}{} - - go func() { - if err := session.RunLoop(); err != nil { - log.Debugf("SubSession loop done. [%s] err=%v", session.UniqueKey, err) - } - - group.mutex.Lock() - defer group.mutex.Unlock() - log.Infof("del SubSession out of group. [%s]", session.UniqueKey) - delete(group.subSessionSet, session) - }() - - // TODO chef: 在这里发送http和flv的头,还是确保有数据了再发 - session.WriteHTTPResponseHeader() - session.WriteFlvHeader() - if group.gopCache.WriteWholeThings(session) { - session.HasKeyFrame = true - } -} - -func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) { - group.pullSession = NewPullSession(PullSessionConfig{ - ConnectTimeoutMS: int(connectTimeout), - ReadTimeoutMS: int(readTimeout), - }) - - defer func() { - group.mutex.Lock() - defer group.mutex.Unlock() - group.pullSession = nil - log.Infof("del httpflv PullSession out of group. [%s]", group.pullSession.UniqueKey) - }() - - log.Infof("<----- connect. [%s]", group.pullSession.UniqueKey) - url := fmt.Sprintf("http://%s/%s/%s.flv", addr, group.appName, group.streamName) - // TODO chef: impl cb - if err := group.pullSession.Pull(url, group.ReadFlvTagCB); err != nil { - - } - //if err := group.pullSession.Pull(url, nil); err != nil { - //log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err) - //return - //} - //log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey) - - //if err := group.pullSession.RunLoop(); err != nil { - // log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err) - // return - //} -} - -func (group *Group) IsTotalEmpty() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return group.pullSession == nil && len(group.subSessionSet) == 0 -} - -func (group *Group) IsInExist() bool { - group.mutex.Lock() - defer group.mutex.Unlock() - return false -} - -func (group *Group) SetObserver(obs GroupObserver) { - // 确保如果调用SetObserver,那么调用发生在Pull之前,就不对obs加锁保护了 - group.obs = obs -} - -// PullSessionObserver -func (group *Group) ReadHTTPRespHeaderCB() { - if group.obs != nil { - group.obs.ReadHTTPRespHeaderCB() - } -} - -// PullSessionObserver -func (group *Group) ReadFlvHeaderCB(flvHeader []byte) { - if group.obs != nil { - group.obs.ReadFlvHeaderCB(flvHeader) - } -} - -// PullSessionObserver -func (group *Group) ReadFlvTagCB(tag *Tag) { - group.mutex.Lock() - defer group.mutex.Unlock() - // TODO chef: assume that write fast and would not block - for session := range group.subSessionSet { - // TODO chef: 如果一个流上只有音频永远没有视频该如何处理 - if session.HasKeyFrame { - session.WriteRawPacket(tag.Raw) - } else { - if tag.IsMetadata() || tag.IsAVCKeySeqHeader() || tag.IsAACSeqHeader() || tag.IsAVCKeyNalu() { - if tag.IsAVCKeyNalu() { - session.HasKeyFrame = true - } - session.WriteRawPacket(tag.Raw) - } - } - } - group.gopCache.Push(tag) - - if group.obs != nil { - group.obs.ReadFlvTagCB(tag) - } -} - -*/ diff --git a/pkg/httpflv/httpflv.go b/pkg/httpflv/httpflv.go index 51c908c..028d1f3 100644 --- a/pkg/httpflv/httpflv.go +++ b/pkg/httpflv/httpflv.go @@ -18,16 +18,14 @@ type Writer interface { WriteTag(tag *Tag) } -var httpFlvErr = errors.New("httpflv: fxxk") +var ErrHTTPFLV = errors.New("lal.httpflv: fxxk") const ( flvHeaderSize = 13 prevTagFieldSize = 4 ) -var FlvHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} - -var readBufSize = 16384 +var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} type LineReader interface { ReadLine() (line []byte, isPrefix bool, err error) @@ -44,7 +42,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string] return } if len(line) == 0 || isPrefix { - err = httpFlvErr + err = ErrHTTPFLV return } firstLine = string(line) @@ -56,7 +54,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string] break } if isPrefix { - err = httpFlvErr + err = ErrHTTPFLV return } if err != nil { @@ -66,7 +64,7 @@ func parseHTTPHeader(r LineReader) (n int, firstLine string, headers map[string] n += len(l) pos := strings.Index(l, ":") if pos == -1 { - err = httpFlvErr + err = ErrHTTPFLV return } headers[strings.Trim(l[0:pos], " ")] = strings.Trim(l[pos+1:], " ") diff --git a/pkg/httpflv/server.go b/pkg/httpflv/server.go index dd879bc..2aace70 100644 --- a/pkg/httpflv/server.go +++ b/pkg/httpflv/server.go @@ -18,23 +18,22 @@ import ( type ServerObserver interface { // 通知上层有新的拉流者 // 返回值: true则允许拉流,false则关闭连接 - NewHTTPFlvSubSessionCB(session *SubSession) bool + NewHTTPFLVSubSessionCB(session *SubSession) bool + DelHTTPFLVSubSessionCB(session *SubSession) } type Server struct { - obs ServerObserver - addr string - subWriteTimeout int64 + obs ServerObserver + addr string m sync.Mutex ln net.Listener } -func NewServer(obs ServerObserver, addr string, subWriteTimeout int64) *Server { +func NewServer(obs ServerObserver, addr string) *Server { return &Server{ - obs: obs, - addr: addr, - subWriteTimeout: subWriteTimeout, + obs: obs, + addr: addr, } } @@ -69,14 +68,18 @@ func (server *Server) Dispose() { func (server *Server) handleConnect(conn net.Conn) { log.Infof("accept a http flv connection. remoteAddr=%v", conn.RemoteAddr()) - session := NewSubSession(conn, server.subWriteTimeout) + session := NewSubSession(conn) if err := session.ReadRequest(); err != nil { log.Errorf("read SubSession request error. [%s]", session.UniqueKey) return } log.Infof("-----> http request. [%s] uri=%s", session.UniqueKey, session.URI) - if !server.obs.NewHTTPFlvSubSessionCB(session) { - session.Dispose(httpFlvErr) + if !server.obs.NewHTTPFLVSubSessionCB(session) { + session.Dispose() } + + err := session.RunLoop() + log.Debugf("httpflv sub session loop done. err=%v", err) + server.obs.DelHTTPFLVSubSessionCB(session) } diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 8a067f3..36c2014 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -9,14 +9,13 @@ package httpflv import ( - "bufio" "net" url2 "net/url" "strings" - "sync" - "sync/atomic" "time" + "github.com/q191201771/naza/pkg/connection" + log "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/unique" ) @@ -33,40 +32,33 @@ var flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr) var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0x0, 0x0, 0x0, 0x0} -var wChanSize = 1024 // TODO chef: 1024 - type SubSession struct { UniqueKey string - writeTimeout int64 - StartTick int64 StreamName string AppName string URI string Headers map[string]string - HasKeyFrame bool + IsFresh bool + WaitKeyNalu bool - conn net.Conn - rb *bufio.Reader - wChan chan []byte - - closeOnce sync.Once - exitChan chan struct{} - hasClosedFlag uint32 + conn connection.Connection } -func NewSubSession(conn net.Conn, writeTimeout int64) *SubSession { +func NewSubSession(conn net.Conn) *SubSession { uk := unique.GenUniqueKey("FLVSUB") log.Infof("lifecycle new SubSession. [%s] remoteAddr=%s", uk, conn.RemoteAddr().String()) return &SubSession{ - writeTimeout: writeTimeout, - conn: conn, - rb: bufio.NewReaderSize(conn, readBufSize), - wChan: make(chan []byte, wChanSize), - exitChan: make(chan struct{}), - UniqueKey: uk, + UniqueKey: uk, + IsFresh: true, + WaitKeyNalu: true, + conn: connection.New(conn, func(option *connection.Option) { + option.ReadBufSize = readBufSize + option.WriteChanSize = wChanSize + option.WriteTimeoutMS = subSessionWriteTimeoutMS + }), } } @@ -76,19 +68,19 @@ func (session *SubSession) ReadRequest() (err error) { defer func() { if err != nil { - session.Dispose(err) + session.Dispose() } }() var firstLine string - _, firstLine, session.Headers, err = parseHTTPHeader(session.rb) + _, firstLine, session.Headers, err = parseHTTPHeader(session.conn) if err != nil { return } items := strings.Split(string(firstLine), " ") if len(items) != 3 || items[0] != "GET" { - err = httpFlvErr + err = ErrHTTPFLV return } @@ -99,19 +91,19 @@ func (session *SubSession) ReadRequest() (err error) { return } if !strings.HasSuffix(urlObj.Path, ".flv") { - err = httpFlvErr + err = ErrHTTPFLV return } items = strings.Split(urlObj.Path, "/") if len(items) != 3 { - err = httpFlvErr + err = ErrHTTPFLV return } session.AppName = items[1] items = strings.Split(items[2], ".") if len(items) < 2 { - err = httpFlvErr + err = ErrHTTPFLV return } session.StreamName = items[0] @@ -120,15 +112,9 @@ func (session *SubSession) ReadRequest() (err error) { } func (session *SubSession) RunLoop() error { - go func() { - buf := make([]byte, 128) - if _, err := session.conn.Read(buf); err != nil { - log.Errorf("read failed. [%s] err=%v", session.UniqueKey, err) - session.Dispose(err) - } - }() - - return session.runWriteLoop() + buf := make([]byte, 128) + _, err := session.conn.Read(buf) + return err } func (session *SubSession) WriteHTTPResponseHeader() { @@ -136,7 +122,7 @@ func (session *SubSession) WriteHTTPResponseHeader() { session.WriteRawPacket(flvHTTPResponseHeader) } -func (session *SubSession) WriteFlvHeader() { +func (session *SubSession) WriteFLVHeader() { log.Infof("<----- http flv header. [%s]", session.UniqueKey) session.WriteRawPacket(flvHeaderBuf13) } @@ -146,52 +132,9 @@ func (session *SubSession) WriteTag(tag *Tag) { } func (session *SubSession) WriteRawPacket(pkt []byte) { - if session.hasClosed() { - return - } - for { - select { - case session.wChan <- pkt: - return - default: - if session.hasClosed() { - return - } - } - } -} - -func (session *SubSession) Dispose(err error) { - session.closeOnce.Do(func() { - log.Infof("lifecycle dispose SubSession. [%s] reason=%v", session.UniqueKey, err) - atomic.StoreUint32(&session.hasClosedFlag, 1) - close(session.exitChan) - if err := session.conn.Close(); err != nil { - log.Errorf("conn close error. [%s] err=%v", session.UniqueKey, err) - } - }) -} - -func (session *SubSession) runWriteLoop() error { - for { - select { - case <-session.exitChan: - return httpFlvErr - case pkt := <-session.wChan: - if session.hasClosed() { - return httpFlvErr - } - - // TODO chef: use bufio.Writer - _, err := session.conn.Write(pkt) - if err != nil { - session.Dispose(err) - return err - } - } - } + _, _ = session.conn.Write(pkt) } -func (session *SubSession) hasClosed() bool { - return atomic.LoadUint32(&session.hasClosedFlag) == 1 +func (session *SubSession) Dispose() { + _ = session.conn.Close() } diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index ec06214..cb0bb33 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -100,7 +100,7 @@ func IsAACSeqHeader(tag []byte) bool { return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader } -func PackHTTPFlvTag(t uint8, timestamp uint32, in []byte) []byte { +func PackHTTPFLVTag(t uint8, timestamp uint32, in []byte) []byte { out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize) out[0] = t bele.BEPutUint24(out[1:], uint32(len(in))) diff --git a/pkg/logic/lal.go b/pkg/httpflv/var.go similarity index 53% rename from pkg/logic/lal.go rename to pkg/httpflv/var.go index b98da5e..43c92d1 100644 --- a/pkg/logic/lal.go +++ b/pkg/httpflv/var.go @@ -6,8 +6,8 @@ // // Author: Chef (191201771@qq.com) -package logic +package httpflv -import "errors" - -var lalErr = errors.New("lal: fxxk") +var readBufSize = 16384 // ClientPullSession 和 SubSession 读取数据时 +var wChanSize = 1024 // SubSession 发送数据时 channel 的大小 +var subSessionWriteTimeoutMS = 10000 diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 6d76502..40b117a 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -17,35 +17,22 @@ import ( ) type Config struct { - RTMP RTMP `json:"rtmp"` - Log log.Option `json:"log"` - PProf PProf `json:"pprof"` - - // v1.0.0之前不提供 - SubIdleTimeout int64 `json:"sub_idle_timeout"` - GOPCacheNum int `json:"gop_cache_number"` - HTTPFlv HTTPFlv `json:"httpflv"` - Pull Pull `json:"pull"` + RTMP RTMP `json:"rtmp"` + HTTPFLV HTTPFLV `json:"httpflv"` + Log log.Option `json:"log"` + PProf PProf `json:"pprof"` } type RTMP struct { Addr string `json:"addr"` } -type PProf struct { - Addr string `json:"addr"` -} - -type HTTPFlv struct { +type HTTPFLV struct { SubListenAddr string `json:"sub_listen_addr"` } -type Pull struct { - Type string `json:"type"` - Addr string `json:"addr"` - ConnectTimeout int64 `json:"connect_timeout"` - ReadTimeout int64 `json:"read_timeout"` - StopPullWhileNoSubTimeout int64 `json:"stop_pull_while_no_sub_timeout"` +type PProf struct { + Addr string `json:"addr"` } func LoadConf(confFile string) (*Config, error) { @@ -58,14 +45,15 @@ func LoadConf(confFile string) (*Config, error) { return nil, err } - // TODO chef: check item valid. j, err := nazajson.New(rawContent) if err != nil { return nil, err } - if !j.Exist("rtmp.addr") { - config.RTMP.Addr = ":1935" - } + + // 检查配置必须项 + // 暂时无 + + // 配置不存在时,设置默认值 if !j.Exist("log.level") { config.Log.Level = log.LevelDebug } diff --git a/pkg/logic/example_test.go b/pkg/logic/example_test.go new file mode 100644 index 0000000..cd3b92d --- /dev/null +++ b/pkg/logic/example_test.go @@ -0,0 +1,170 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/q191201771/naza/pkg/nazaatomic" + "github.com/q191201771/naza/pkg/nazalog" + + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/rtmp" + + "github.com/q191201771/naza/pkg/assert" +) + +var ( + tt *testing.T + + confFile = "testdata/lals.default.conf.json" + + rFLVFileName = "testdata/test.flv" + wFLVPullFileName = "testdata/flvpull.flv" + wRTMPPullFileName = "testdata/rtmppull.flv" + + pushURL string + httpflvPullURL string + rtmpPullURL string + + fileReader httpflv.FLVFileReader + HTTPFLVWriter httpflv.FLVFileWriter + RTMPWriter httpflv.FLVFileWriter + + pushSession *rtmp.PushSession + httpflvPullSession *httpflv.PullSession + rtmpPullSession *rtmp.PullSession + + fileTagCount nazaatomic.Uint32 + httpflvPullTagCount nazaatomic.Uint32 + rtmpPullTagCount nazaatomic.Uint32 +) + +type MockRTMPPullSessionObserver struct { +} + +// TODO chef: httpflv 和 rtmp 两种协议的 pull 接口形式不统一 +func (mrpso *MockRTMPPullSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { + tag := Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + err := RTMPWriter.WriteTag(*tag) + assert.Equal(tt, nil, err) + rtmpPullTagCount.Increment() +} + +func TestExample(t *testing.T) { + tt = t + + var err error + + err = fileReader.Open(rFLVFileName) + assert.Equal(t, nil, err) + + config, err := LoadConf(confFile) + assert.IsNotNil(t, config) + assert.Equal(t, nil, err) + + pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr) + httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/11111.flv", config.HTTPFLV.SubListenAddr) + rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr) + + sm := NewServerManager(config) + go sm.RunLoop() + + time.Sleep(200 * time.Millisecond) + + err = HTTPFLVWriter.Open(wFLVPullFileName) + assert.Equal(t, nil, err) + err = HTTPFLVWriter.WriteRaw(httpflv.FLVHeader) + assert.Equal(t, nil, err) + + err = RTMPWriter.Open(wRTMPPullFileName) + assert.Equal(t, nil, err) + err = RTMPWriter.WriteRaw(httpflv.FLVHeader) + assert.Equal(t, nil, err) + + go func() { + var mrpso MockRTMPPullSessionObserver + rtmpPullSession = rtmp.NewPullSession(&mrpso, rtmp.PullSessionTimeout{ + ReadAVTimeoutMS: 500, + }) + err := rtmpPullSession.Pull(rtmpPullURL) + assert.Equal(t, nil, err) + }() + + go func() { + httpflvPullSession = httpflv.NewPullSession(httpflv.PullSessionConfig{ + ReadTimeoutMS: 500, + }) + err := httpflvPullSession.Pull(httpflvPullURL, func(tag *httpflv.Tag) { + err := HTTPFLVWriter.WriteTag(*tag) + assert.Equal(t, nil, err) + httpflvPullTagCount.Increment() + }) + nazalog.Error(err) + }() + + time.Sleep(200 * time.Millisecond) + + pushSession = rtmp.NewPushSession(rtmp.PushSessionTimeout{}) + err = pushSession.Push(pushURL) + assert.Equal(t, nil, err) + + _, err = fileReader.ReadFLVHeader() + assert.Equal(t, nil, err) + for { + tag, err := fileReader.ReadTag() + if err == io.EOF { + break + } + assert.Equal(t, nil, err) + fileTagCount.Increment() + h, _, m := Trans.FLVTag2RTMPMsg(*tag) + chunks := rtmp.Message2Chunks(m, &h) + err = pushSession.AsyncWrite(chunks) + assert.Equal(t, nil, err) + } + err = pushSession.Flush() + assert.Equal(t, nil, err) + + time.Sleep(1 * time.Second) + + fileReader.Dispose() + pushSession.Dispose() + httpflvPullSession.Dispose(nil) + rtmpPullSession.Dispose() + HTTPFLVWriter.Dispose() + RTMPWriter.Dispose() + sm.Dispose() + + nazalog.Debugf("count. %d %d %d", fileTagCount.Load(), httpflvPullTagCount.Load(), rtmpPullTagCount.Load()) + compareFile() +} + +func compareFile() { + r, err := ioutil.ReadFile(rFLVFileName) + assert.Equal(tt, nil, err) + w, err := ioutil.ReadFile(wFLVPullFileName) + assert.Equal(tt, nil, err) + res := bytes.Compare(r, w) + assert.Equal(tt, 0, res) + err = os.Remove(wFLVPullFileName) + assert.Equal(tt, nil, err) + w2, err := ioutil.ReadFile(wRTMPPullFileName) + assert.Equal(tt, nil, err) + res = bytes.Compare(r, w2) + assert.Equal(tt, 0, res) + err = os.Remove(wRTMPPullFileName) + assert.Equal(tt, nil, err) +} diff --git a/pkg/logic/group.go b/pkg/logic/group.go index c36e465..2ecdd40 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -10,7 +10,6 @@ package logic import ( "sync" - "time" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" @@ -26,42 +25,39 @@ type Group struct { exitChan chan struct{} - mutex sync.Mutex - pubSession *rtmp.ServerSession - pullSession *rtmp.PullSession - subSessionSet map[*rtmp.ServerSession]struct{} - // TODO chef: + mutex sync.Mutex + pubSession *rtmp.ServerSession + pullSession *rtmp.PullSession + rtmpSubSessionSet map[*rtmp.ServerSession]struct{} + httpflvSubSessionSet map[*httpflv.SubSession]struct{} + // rtmp chunk格式 metadata []byte avcKeySeqHeader []byte aacSeqHeader []byte + // httpflv tag格式 + // TODO chef: 如果没有开启httpflv监听,可以不做格式转换,节约CPU资源 + metadataTag *httpflv.Tag + avcKeySeqHeaderTag *httpflv.Tag + aacSeqHeaderTag *httpflv.Tag } var _ rtmp.PubSessionObserver = &Group{} func NewGroup(appName string, streamName string) *Group { - uk := unique.GenUniqueKey("RTMPGROUP") + uk := unique.GenUniqueKey("GROUP") log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) return &Group{ - UniqueKey: uk, - appName: appName, - streamName: streamName, - exitChan: make(chan struct{}, 1), - subSessionSet: make(map[*rtmp.ServerSession]struct{}), + UniqueKey: uk, + appName: appName, + streamName: streamName, + exitChan: make(chan struct{}, 1), + rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}), + httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), } } func (group *Group) RunLoop() { - t := time.NewTicker(200 * time.Millisecond) - defer t.Stop() - - for { - select { - case <-group.exitChan: - break - case <-t.C: - //noop - } - } + <-group.exitChan } func (group *Group) Dispose(err error) { @@ -73,7 +69,10 @@ func (group *Group) Dispose(err error) { if group.pubSession != nil { group.pubSession.Dispose() } - for session := range group.subSessionSet { + for session := range group.rtmpSubSessionSet { + session.Dispose() + } + for session := range group.httpflvSubSessionSet { session.Dispose() } } @@ -92,16 +91,6 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool { return true } -func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { - log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) - group.mutex.Lock() - defer group.mutex.Unlock() - group.subSessionSet[session] = struct{}{} - - // TODO chef: 多长没有拉流session存在的功能 - //group.turnToEmptyTick = 0 -} - func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() @@ -110,49 +99,49 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { group.metadata = nil group.avcKeySeqHeader = nil group.aacSeqHeader = nil + group.metadataTag = nil + group.avcKeySeqHeaderTag = nil + group.aacSeqHeaderTag = nil +} + +func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { + log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + group.mutex.Lock() + defer group.mutex.Unlock() + group.rtmpSubSessionSet[session] = struct{}{} + // TODO chef: 多长没有拉流session存在的功能 + //group.turnToEmptyTick = 0 } func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) { log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() defer group.mutex.Unlock() - delete(group.subSessionSet, session) + delete(group.rtmpSubSessionSet, session) } -func (group *Group) AddHTTPFlvSubSession(session *httpflv.SubSession) { - panic("not impl") +func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) { + log.Debugf("add httpflv SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + session.WriteHTTPResponseHeader() + session.WriteFLVHeader() + + group.mutex.Lock() + defer group.mutex.Unlock() + group.httpflvSubSessionSet[session] = struct{}{} } -func (group *Group) Pull(addr string, connectTimeout int64) { - // TODO chef: config me, - // v1.0.0 版本之前先不提供去其他节点回源的功能 - panic("not impl yet") - //group.pullSession = NewPullSession(group, PullSessionTimeout{ - // ConnectTimeoutMS: int(connectTimeout), - //}) - // - //defer func() { - // group.mutex.Lock() - // defer group.mutex.Unlock() - // log.Infof("del rtmp PullSession out of group. [%s] [%s]", group.UniqueKey, group.pullSession) - // group.pullSession = nil - //}() - // - //url := fmt.Sprintf("rtmp://%s/%s/%s", addr, group.appName, group.streamName) - //if err := group.pullSession.Pull(url); err != nil { - // log.Error(err) - //} - //if err := group.pullSession.WaitLoop(); err != nil { - // log.Debugf("rtmp PullSession loop done. [%s] [%s] err=%v", group.UniqueKey, group.pullSession.UniqueKey, err) - // return - //} +func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) { + log.Debugf("del httpflv SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + group.mutex.Lock() + defer group.mutex.Unlock() + delete(group.httpflvSubSessionSet, session) } func (group *Group) IsTotalEmpty() bool { group.mutex.Lock() defer group.mutex.Unlock() - return group.pubSession == nil && len(group.subSessionSet) == 0 + return group.pubSession == nil && len(group.rtmpSubSessionSet) == 0 && len(group.httpflvSubSessionSet) == 0 } func (group *Group) IsInExist() bool { @@ -165,13 +154,20 @@ func (group *Group) IsInExist() bool { func (group *Group) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { group.mutex.Lock() defer group.mutex.Unlock() - group.broadcastRTMP2RTMP(header, timestampAbs, message) + + group.broadcastRTMP(header, timestampAbs, message) } -func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, message []byte) { +func (group *Group) broadcastRTMP(header rtmp.Header, timestampAbs uint32, message []byte) { //log.Infof("%+v", header) - // # 1. 设置好头部信息 - var currHeader rtmp.Header + + var ( + currHeader rtmp.Header + absChunks []byte + currTag *httpflv.Tag + ) + + // # 1. 设置好 rtmp 头部信息 currHeader.MsgLen = uint32(len(message)) currHeader.Timestamp = timestampAbs currHeader.MsgTypeID = header.MsgTypeID @@ -188,10 +184,8 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, //prevHeader = group.prevVideoHeader } - var absChunks []byte - - // # 2. 广播。遍历所有sub session,决定是否转发 - for session := range group.subSessionSet { + // # 2. 广播。遍历所有rtmp sub session,决定是否转发 + for session := range group.rtmpSubSessionSet { // ## 2.1. 一个message广播给多个sub session时,只做一次chunk切割 if absChunks == nil { absChunks = rtmp.Message2Chunks(message, &currHeader) @@ -212,7 +206,7 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, session.IsFresh = false } - // ## 2.3. 判断当前包的类型,以及sub session的状态,决定是否发送并更新sub session的状态 + // ## 2.3. 判断当前包的类型,以及sub session的状态,决定是否发送,并更新sub session的状态 switch header.MsgTypeID { case rtmp.TypeidDataMessageAMF0: session.AsyncWrite(absChunks) @@ -232,69 +226,90 @@ func (group *Group) broadcastRTMP2RTMP(header rtmp.Header, timestampAbs uint32, } } + } + // # 3. 广播。遍历所有httpflv sub session,决定是否转发 + for session := range group.httpflvSubSessionSet { + // ## 3.1. 将当前 message 转换成 tag 格式 + if currTag == nil { + currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + } + + // ## 3.2. 如果是新的sub session,发送已缓存的信息 + if session.IsFresh { + // 发送缓存的头部信息 + if group.metadataTag != nil { + log.Debugf("send cache metadata. [%s]", session.UniqueKey) + session.WriteTag(group.metadataTag) + } + if group.avcKeySeqHeaderTag != nil { + session.WriteTag(group.avcKeySeqHeaderTag) + } + if group.aacSeqHeaderTag != nil { + session.WriteTag(group.aacSeqHeaderTag) + } + session.IsFresh = false + } + + // ## 3.3. 判断当前包的类型,以及sub session的状态,决定是否发送,并更新sub session的状态 + switch header.MsgTypeID { + case rtmp.TypeidDataMessageAMF0: + session.WriteTag(currTag) + case rtmp.TypeidAudio: + session.WriteTag(currTag) + case rtmp.TypeidVideo: + if session.WaitKeyNalu { + if message[0] == 0x17 && message[1] == 0x0 { + session.WriteTag(currTag) + } + if message[0] == 0x17 && message[1] == 0x1 { + session.WriteTag(currTag) + session.WaitKeyNalu = false + } + } else { + session.WriteTag(currTag) + } + + } } - // # 3. 缓存 metadata 和 avc key seq header 和 aac seq header - // 由于可能没有订阅者,所以message可能还没做chunk切割,所以这里要做判断是否做chunk切割 + // # 4. 缓存 rtmp 以及 httpflv 的 metadata 和 avc key seq header 和 aac seq header + // 由于可能没有订阅者,所以可能需要重新打包 switch header.MsgTypeID { case rtmp.TypeidDataMessageAMF0: if absChunks == nil { absChunks = rtmp.Message2Chunks(message, &currHeader) } - log.Debugf("cache metadata. [%s]", group.UniqueKey) + if currTag == nil { + currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + } group.metadata = absChunks + group.metadataTag = currTag + log.Debugf("cache metadata. [%s]", group.UniqueKey) case rtmp.TypeidVideo: // TODO chef: magic number if message[0] == 0x17 && message[1] == 0x0 { if absChunks == nil { absChunks = rtmp.Message2Chunks(message, &currHeader) } - log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) + if currTag == nil { + currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + } group.avcKeySeqHeader = absChunks + group.avcKeySeqHeaderTag = currTag + log.Debugf("cache avc key seq header. [%s]", group.UniqueKey) } case rtmp.TypeidAudio: if (message[0]>>4) == 0x0a && message[1] == 0x0 { if absChunks == nil { absChunks = rtmp.Message2Chunks(message, &currHeader) } - log.Debugf("cache aac seq header. [%s]", group.UniqueKey) + if currTag == nil { + currTag = Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + } group.aacSeqHeader = absChunks + group.aacSeqHeaderTag = currTag + log.Debugf("cache aac seq header. [%s]", group.UniqueKey) } } } - -func (group *Group) pullIfNeeded() { - panic("not impl") - //if !gm.isInExist() { - // switch gm.config.Pull.Type { - // case "httpflv": - // go gm.httpFlvGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout, gm.config.Pull.ReadTimeout) - // case "rtmp": - // go gm.rtmpGroup.Pull(gm.config.Pull.Addr, gm.config.Pull.ConnectTimeout) - // } - //} -} - -func (group *Group) isInExist() bool { - panic("not impl") - //return (gm.rtmpGroup != nil && gm.rtmpGroup.IsInExist()) || - // (gm.httpFlvGroup != nil && gm.httpFlvGroup.IsInExist()) -} - -// GroupObserver of httpflv.Group -func (group *Group) ReadHTTPRespHeaderCB() { - // noop -} - -// GroupObserver of httpflv.Group -func (group *Group) ReadFlvHeaderCB(flvHeader []byte) { - // noop -} - -// GroupObserver of httpflv.Group -func (group *Group) ReadFlvTagCB(tag *httpflv.Tag) { - log.Info("ReadFlvTagCB") - - // TODO chef: broadcast to rtmp.Group -} diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go new file mode 100644 index 0000000..57aced5 --- /dev/null +++ b/pkg/logic/logic.go @@ -0,0 +1,22 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "errors" + + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/rtmp" +) + +var ErrLogic = errors.New("lal.logic: fxxk") + +var _ rtmp.ServerObserver = &ServerManager{} +var _ httpflv.ServerObserver = &ServerManager{} +var _ rtmp.PubSessionObserver = &Group{} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index a74e941..0a8c1f0 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -20,14 +20,13 @@ import ( type ServerManager struct { config *Config - httpFlvServer *httpflv.Server + httpflvServer *httpflv.Server rtmpServer *rtmp.Server - groupMap map[string]*Group // TODO chef: with appName - mutex sync.Mutex exitChan chan struct{} -} -var _ rtmp.ServerObserver = &ServerManager{} + mutex sync.Mutex + groupMap map[string]*Group // TODO chef: with appName +} func NewServerManager(config *Config) *ServerManager { m := &ServerManager{ @@ -35,8 +34,8 @@ func NewServerManager(config *Config) *ServerManager { groupMap: make(map[string]*Group), exitChan: make(chan struct{}), } - if len(config.HTTPFlv.SubListenAddr) != 0 { - m.httpFlvServer = httpflv.NewServer(m, config.HTTPFlv.SubListenAddr, config.SubIdleTimeout) + if len(config.HTTPFLV.SubListenAddr) != 0 { + m.httpflvServer = httpflv.NewServer(m, config.HTTPFLV.SubListenAddr) } if len(config.RTMP.Addr) != 0 { m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr) @@ -45,9 +44,9 @@ func NewServerManager(config *Config) *ServerManager { } func (sm *ServerManager) RunLoop() { - if sm.httpFlvServer != nil { + if sm.httpflvServer != nil { go func() { - if err := sm.httpFlvServer.RunLoop(); err != nil { + if err := sm.httpflvServer.RunLoop(); err != nil { log.Error(err) } }() @@ -82,8 +81,8 @@ func (sm *ServerManager) RunLoop() { func (sm *ServerManager) Dispose() { log.Debug("dispose server manager.") - if sm.httpFlvServer != nil { - sm.httpFlvServer.Dispose() + if sm.httpflvServer != nil { + sm.httpflvServer.Dispose() } if sm.rtmpServer != nil { sm.rtmpServer.Dispose() @@ -91,9 +90,8 @@ func (sm *ServerManager) Dispose() { sm.mutex.Lock() for _, group := range sm.groupMap { - group.Dispose(lalErr) + group.Dispose(ErrLogic) } - sm.groupMap = nil sm.mutex.Unlock() sm.exitChan <- struct{}{} @@ -101,51 +99,73 @@ func (sm *ServerManager) Dispose() { // ServerObserver of rtmp.Server func (sm *ServerManager) NewRTMPPubSessionCB(session *rtmp.ServerSession) bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() group := sm.getOrCreateGroup(session.AppName, session.StreamName) return group.AddRTMPPubSession(session) } // ServerObserver of rtmp.Server -func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool { - group := sm.getOrCreateGroup(session.AppName, session.StreamName) - group.AddRTMPSubSession(session) - return true +func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + group := sm.getGroup(session.AppName, session.StreamName) + if group != nil { + group.DelRTMPPubSession(session) + } } // ServerObserver of rtmp.Server -func (sm *ServerManager) DelRTMPPubSessionCB(session *rtmp.ServerSession) { +func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession) bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() group := sm.getOrCreateGroup(session.AppName, session.StreamName) - group.DelRTMPPubSession(session) + group.AddRTMPSubSession(session) + return true } // ServerObserver of rtmp.Server func (sm *ServerManager) DelRTMPSubSessionCB(session *rtmp.ServerSession) { - group := sm.getOrCreateGroup(session.AppName, session.StreamName) - group.DelRTMPSubSession(session) + sm.mutex.Lock() + defer sm.mutex.Unlock() + group := sm.getGroup(session.AppName, session.StreamName) + if group != nil { + group.DelRTMPSubSession(session) + } } // ServerObserver of httpflv.Server -func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession) bool { +func (sm *ServerManager) NewHTTPFLVSubSessionCB(session *httpflv.SubSession) bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() group := sm.getOrCreateGroup(session.AppName, session.StreamName) - group.AddHTTPFlvSubSession(session) + group.AddHTTPFLVSubSession(session) return true } +// ServerObserver of httpflv.Server +func (sm *ServerManager) DelHTTPFLVSubSessionCB(session *httpflv.SubSession) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + group := sm.getGroup(session.AppName, session.StreamName) + if group != nil { + group.DelHTTPFLVSubSession(session) + } +} + func (sm *ServerManager) check() { sm.mutex.Lock() defer sm.mutex.Unlock() for k, group := range sm.groupMap { if group.IsTotalEmpty() { log.Infof("erase empty group manager. [%s]", group.UniqueKey) - group.Dispose(lalErr) + group.Dispose(ErrLogic) delete(sm.groupMap, k) } } } func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { - sm.mutex.Lock() - defer sm.mutex.Unlock() group, exist := sm.groupMap[streamName] if !exist { group = NewGroup(appName, streamName) @@ -154,3 +174,11 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr go group.RunLoop() return group } + +func (sm *ServerManager) getGroup(appName string, streamName string) *Group { + group, exist := sm.groupMap[streamName] + if !exist { + return nil + } + return group +} diff --git a/pkg/logic/trans.go b/pkg/logic/trans.go index 5cf7985..a1c7aef 100644 --- a/pkg/logic/trans.go +++ b/pkg/logic/trans.go @@ -18,11 +18,11 @@ var Trans trans type trans struct { } -//// TODO chef: rtmp msg 也弄成结构体 -func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) { +// 注意,tag -> message [nocopy] +func (t trans) FLVTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs uint32, message []byte) { header.MsgLen = tag.Header.DataSize header.MsgTypeID = tag.Header.T - header.MsgStreamID = rtmp.MSID1 // TODO + header.MsgStreamID = rtmp.MSID1 switch tag.Header.T { case httpflv.TagTypeMetadata: header.CSID = rtmp.CSIDAMF @@ -37,11 +37,12 @@ func (t trans) FlvTag2RTMPMsg(tag httpflv.Tag) (header rtmp.Header, timestampAbs return } -func (t trans) RTMPMsg2FlvTag(header rtmp.Header, timestampAbs uint32, message []byte) httpflv.Tag { +// 注意,message -> tag [copy] +func (t trans) RTMPMsg2FLVTag(header rtmp.Header, timestampAbs uint32, message []byte) *httpflv.Tag { var tag httpflv.Tag tag.Header.T = header.MsgTypeID tag.Header.DataSize = header.MsgLen tag.Header.Timestamp = timestampAbs - tag.Raw = httpflv.PackHTTPFlvTag(header.MsgTypeID, timestampAbs, message) - return tag + tag.Raw = httpflv.PackHTTPFLVTag(header.MsgTypeID, timestampAbs, message) + return &tag } diff --git a/pkg/rtmp/amf0.go b/pkg/rtmp/amf0.go index 9bc27f1..e98a4fb 100644 --- a/pkg/rtmp/amf0.go +++ b/pkg/rtmp/amf0.go @@ -22,8 +22,8 @@ import ( ) var ( - ErrAMFInvalidType = errors.New("lal.AMF0: invalid type") - ErrAMFTooShort = errors.New("lal.AMF0: too short") + ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type") + ErrAMFTooShort = errors.New("lal.rtmp: too short to unmarshal amf0 data") ) const ( diff --git a/pkg/rtmp/amf0_test.go b/pkg/rtmp/amf0_test.go index 68399df..5b80eac 100644 --- a/pkg/rtmp/amf0_test.go +++ b/pkg/rtmp/amf0_test.go @@ -67,9 +67,9 @@ func TestAmf0_WriteString_ReadString(t *testing.T) { func TestAmf0_WriteObject_ReadObject(t *testing.T) { out := &bytes.Buffer{} objs := []ObjectPair{ - {"air", 3}, - {"ban", "cat"}, - {"dog", true}, + {Key: "air", Value: 3}, + {Key: "ban", Value: "cat"}, + {Key: "dog", Value: true}, } err := AMF0.WriteObject(out, objs) assert.Equal(t, nil, err) @@ -153,9 +153,9 @@ func TestAMF0Corner(t *testing.T) { assert.IsNotNil(t, err) objs = []ObjectPair{ - {"air", 3}, - {"ban", "cat"}, - {"dog", true}, + {Key: "air", Value: 3}, + {Key: "ban", Value: "cat"}, + {Key: "dog", Value: true}, } for i := uint32(0); i < 14; i++ { mw = mockwriter.NewMockWriter(mockwriter.WriterTypeDoNothing) @@ -244,7 +244,7 @@ func TestAMF0Corner(t *testing.T) { recover() }() objs = []ObjectPair{ - {"key", []byte{1}}, + {Key: "key", Value: []byte{1}}, } _ = AMF0.WriteObject(mw, objs) } @@ -252,9 +252,9 @@ func TestAMF0Corner(t *testing.T) { func BenchmarkAmf0_ReadObject(b *testing.B) { out := &bytes.Buffer{} objs := []ObjectPair{ - {"air", 3}, - {"ban", "cat"}, - {"dog", true}, + {Key: "air", Value: 3}, + {Key: "ban", Value: "cat"}, + {Key: "dog", Value: true}, } _ = AMF0.WriteObject(out, objs) for i := 0; i < b.N; i++ { @@ -265,9 +265,9 @@ func BenchmarkAmf0_ReadObject(b *testing.B) { func BenchmarkAmf0_WriteObject(b *testing.B) { out := &bytes.Buffer{} objs := []ObjectPair{ - {"air", 3}, - {"ban", "cat"}, - {"dog", true}, + {Key: "air", Value: 3}, + {Key: "ban", Value: "cat"}, + {Key: "dog", Value: true}, } for i := 0; i < b.N; i++ { _ = AMF0.WriteObject(out, objs) diff --git a/pkg/rtmp/chunk_divider.go b/pkg/rtmp/chunk_divider.go index 2aa14d5..42ea3a0 100644 --- a/pkg/rtmp/chunk_divider.go +++ b/pkg/rtmp/chunk_divider.go @@ -112,7 +112,7 @@ func calcHeader(header *Header, prevHeader *Header, out []byte) int { func message2Chunks(message []byte, header *Header, prevHeader *Header, chunkSize int) []byte { //if header.CSID < minCSID || header.CSID > maxCSID { - // return nil, rtmpErr + // return nil, ErrRTMP //} // 计算chunk数量,最后一个chunk的大小 diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index d2871f5..9c530b4 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -22,7 +22,7 @@ import ( "github.com/q191201771/naza/pkg/unique" ) -var ErrClientSessionTimeout = errors.New("rtmp.ClientSession timeout") +var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout") // rtmp客户端类型连接的底层实现 // rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession @@ -251,7 +251,7 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error { } code, ok := infos["code"] if !ok { - return rtmpErr + return ErrRTMP } switch s.t { case CSTPushSession: @@ -288,7 +288,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { } code, ok := infos["code"].(string) if !ok { - return rtmpErr + return ErrRTMP } switch code { case "NetConnection.Connect.Success": @@ -330,7 +330,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { if stream.msg.len() < 4 { - return rtmpErr + return ErrRTMP } val := int(bele.BEUint32(stream.msg.buf)) @@ -356,16 +356,16 @@ func (s *ClientSession) parseURL(rawURL string) error { return err } if s.url.Scheme != "rtmp" || len(s.url.Host) == 0 || len(s.url.Path) == 0 || s.url.Path[0] != '/' { - return rtmpErr + return ErrRTMP } index := strings.LastIndexByte(rawURL, '/') if index == -1 { - return rtmpErr + return ErrRTMP } s.tcURL = rawURL[:index] strs := strings.Split(s.url.Path[1:], "/") if len(strs) != 2 { - return rtmpErr + return ErrRTMP } s.appName = strs[0] // 有的rtmp服务器会使用url后面的参数(比如说用于鉴权),这里把它带上 diff --git a/pkg/rtmp/example_test.go b/pkg/rtmp/example_test.go index 14e4938..094f22a 100644 --- a/pkg/rtmp/example_test.go +++ b/pkg/rtmp/example_test.go @@ -29,16 +29,16 @@ var ( serverAddr = ":10001" pushURL = "rtmp://127.0.0.1:10001/live/test" pullURL = "rtmp://127.0.0.1:10001/live/test" - rFlvFile = "testdata/test.flv" - wFlvFile = "testdata/out.flv" - wgNum = 4 // FlvFileReader -> [push -> pub -> sub -> pull] -> FlvFileWriter + rFLVFile = "testdata/test.flv" + wFLVFile = "testdata/out.flv" + wgNum = 4 // FLVFileReader -> [push -> pub -> sub -> pull] -> FLVFileWriter ) var ( pubSessionObs MockPubSessionObserver subSession *rtmp.ServerSession wg sync.WaitGroup - w httpflv.FlvFileWriter + w httpflv.FLVFileWriter // rc uint32 bc uint32 @@ -83,13 +83,10 @@ func (pso *MockPubSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestamp switch header.MsgTypeID { case rtmp.TypeidDataMessageAMF0: currHeader.CSID = rtmp.CSIDAMF - //prevHeader = nil case rtmp.TypeidAudio: currHeader.CSID = rtmp.CSIDAudio - //prevHeader = group.prevAudioHeader case rtmp.TypeidVideo: currHeader.CSID = rtmp.CSIDVideo - //prevHeader = group.prevVideoHeader } var absChunks []byte absChunks = rtmp.Message2Chunks(message, &currHeader) @@ -100,19 +97,16 @@ type MockPullSessionObserver struct { } func (pso *MockPullSessionObserver) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) { - tag := logic.Trans.RTMPMsg2FlvTag(header, timestampAbs, message) - w.WriteTag(tag) - //wg.Done() + tag := logic.Trans.RTMPMsg2FLVTag(header, timestampAbs, message) + w.WriteTag(*tag) atomic.AddUint32(&wc, 1) } func TestExample(t *testing.T) { var err error - var r httpflv.FlvFileReader - err = r.Open(rFlvFile) - //assert.Equal(t, nil, err) - // 测试文件不存在,则不做后面的测试了 + var r httpflv.FLVFileReader + err = r.Open(rFLVFile) if err != nil { return } @@ -137,12 +131,12 @@ func TestExample(t *testing.T) { err = pushSession.Push(pushURL) assert.Equal(t, nil, err) - err = w.Open(wFlvFile) + err = w.Open(wFLVFile) assert.Equal(t, nil, err) - err = w.WriteRaw(httpflv.FlvHeader) + err = w.WriteRaw(httpflv.FLVHeader) assert.Equal(t, nil, err) - _, err = r.ReadFlvHeader() + _, err = r.ReadFLVHeader() assert.Equal(t, nil, err) for { tag, err := r.ReadTag() @@ -151,13 +145,11 @@ func TestExample(t *testing.T) { } assert.Equal(t, nil, err) rc++ - //wg.Add(1) - h, _, m := logic.Trans.FlvTag2RTMPMsg(*tag) + h, _, m := logic.Trans.FLVTag2RTMPMsg(*tag) chunks := rtmp.Message2Chunks(m, &h) err = pushSession.AsyncWrite(chunks) assert.Equal(t, nil, err) } - //wg.Wait() r.Dispose() wg.Done() @@ -187,12 +179,12 @@ func TestExample(t *testing.T) { } func compareFile(t *testing.T) { - r, err := ioutil.ReadFile(rFlvFile) + r, err := ioutil.ReadFile(rFLVFile) assert.Equal(t, nil, err) - w, err := ioutil.ReadFile(wFlvFile) + w, err := ioutil.ReadFile(wFLVFile) assert.Equal(t, nil, err) res := bytes.Compare(r, w) assert.Equal(t, 0, res) - err = os.Remove(wFlvFile) + err = os.Remove(wFLVFile) assert.Equal(t, nil, err) } diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index 715032c..ba0754e 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -107,7 +107,7 @@ func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error { return err } //if s0s1s2[0] != version { - // return rtmpErr + // return ErrRTMP //} // use s2 as c2 c.c2 = append(c.c2, s0s1s2[s0s1Len:]...) @@ -144,7 +144,7 @@ func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error { return err } //if s0s1s2[0] != version { - // return rtmpErr + // return ErrRTMP //} // TODO chef: 这里复杂模式的 c2 构造没有完全按照规范 // nginx rtmp module 作为 server 端时,不会校验 c2 内容 @@ -222,7 +222,7 @@ func (s *HandshakeServer) ReadC2(reader io.Reader) error { func parseChallenge(c0c1 []byte) []byte { //if c0c1[0] != version { - // return nil, rtmpErr + // return nil, ErrRTMP //} ver := bele.BEUint32(c0c1[5:]) if ver == 0 { diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index ad16abb..e7a02de 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -10,7 +10,7 @@ package rtmp import "errors" -var rtmpErr = errors.New("rtmp: fxxk") +var ErrRTMP = errors.New("lal.rtmp: fxxk") const ( CSIDAMF = 5 diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 2c11e47..94f87fd 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -17,8 +17,8 @@ import ( type ServerObserver interface { NewRTMPPubSessionCB(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接 - NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接 DelRTMPPubSessionCB(session *ServerSession) + NewRTMPSubSessionCB(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接 DelRTMPSubSessionCB(session *ServerSession) } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index 140dc63..f5df930 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -146,7 +146,7 @@ func (s *ServerSession) doMsg(stream *Stream) error { case TypeidVideo: if s.t != ServerSessionTypePub { log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey) - return rtmpErr + return ErrRTMP } //log.Infof("t:%d ts:%d len:%d", stream.header.MsgTypeID, stream.timestampAbs, stream.msg.e - stream.msg.b) s.avObs.ReadRTMPAVMsgCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) @@ -166,7 +166,7 @@ func (s *ServerSession) doACK(stream *Stream) error { func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { if s.t != ServerSessionTypePub { log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey) - return rtmpErr + return ErrRTMP } val, err := stream.msg.peekStringWithType() @@ -187,7 +187,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { return err } if val != "onMetaData" { - return rtmpErr + return ErrRTMP } case "onMetaData": // noop @@ -241,7 +241,7 @@ func (s *ServerSession) doConnect(tid int, stream *Stream) error { var ok bool s.AppName, ok = val["app"].(string) if !ok { - return rtmpErr + return ErrRTMP } log.Infof("-----> connect('%s'). [%s]", s.AppName, s.UniqueKey) @@ -294,10 +294,6 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) { log.Debugf("[%s] pubType=%s", s.UniqueKey, pubType) log.Infof("-----> publish('%s') [%s]", s.StreamName, s.UniqueKey) - // 回调放在回复客户端信令之前 - s.t = ServerSessionTypePub - s.obs.NewRTMPPubSessionCB(s) - log.Infof("<---- onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey) if err := s.packer.writeOnStatusPublish(s.conn, MSID1); err != nil { return err @@ -306,6 +302,9 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) { // 回复完信令后修改 connection 的属性 s.ModConnProps() + s.t = ServerSessionTypePub + s.obs.NewRTMPPubSessionCB(s) + return nil } @@ -323,10 +322,6 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) { log.Infof("-----> play('%s'). [%s]", s.StreamName, s.UniqueKey) // TODO chef: start duration reset - // 回调放在回复客户端信令之前 - s.t = ServerSessionTypeSub - s.obs.NewRTMPSubSessionCB(s) - log.Infof("<----onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey) if err := s.packer.writeOnStatusPlay(s.conn, MSID1); err != nil { return err @@ -335,12 +330,16 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) { // 回复完信令后修改 connection 的属性 s.ModConnProps() + s.t = ServerSessionTypeSub + s.obs.NewRTMPSubSessionCB(s) + return nil } func (s *ServerSession) ModConnProps() { s.conn.ModWriteChanSize(wChanSize) - s.conn.ModWriteBufSize(writeBufSize) + // TODO chef: naza.connection 这种方式会导致最后一点数据发送不出去,我们应该使用更好的方式 + //s.conn.ModWriteBufSize(writeBufSize) switch s.t { case ServerSessionTypePub: diff --git a/test.sh b/test.sh index 2f8b316..8aba434 100755 --- a/test.sh +++ b/test.sh @@ -6,6 +6,7 @@ if command -v add_go_license >/dev/null 2>&1; then else echo 'CHEFNOTICEME add_go_license not exist!' fi + echo '-----gofmt-----' if command -v gofmt >/dev/null 2>&1; then gofmt -l ./ @@ -33,15 +34,26 @@ done # 跑 go test 生成测试覆盖率 echo "-----CI coverage-----" -if [ ! -f "pkg/rtmp/testdata/test.flv" ]; then - echo "CHEFERASEME test.flv not exist." - if [ ! -d "pkg/rtmp/testdata" ]; then - echo "CHEFERASEME mkdir." - mkdir "pkg/rtmp/testdata" +if [ ! -f "./testdata/test.flv" ]; then + if [ ! -d "./testdata" ]; then + mkdir "./testdata" fi - wget https://pengrl.com/images/other/source.200kbps.768x320.flv -O pkg/rtmp/testdata/test.flv -else - echo "CHEFERASEME test.flv exist." + wget https://pengrl.com/images/other/source.200kbps.768x320.flv -O ./testdata/test.flv +fi +if [ ! -f "./pkg/rtmp/testdata/test.flv" ]; then + if [ ! -d "./pkg/rtmp/testdata" ]; then + mkdir "./pkg/rtmp/testdata" + fi + cp ./testdata/test.flv ./pkg/rtmp/testdata/test.flv +fi +if [ ! -f "./pkg/logic/testdata/test.flv" ]; then + if [ ! -d "./pkg/logic/testdata" ]; then + mkdir "./pkg/logic/testdata" + fi + cp ./testdata/test.flv ./pkg/logic/testdata/test.flv +fi +if [ ! -f "./pkg/logic/testdata/lals.default.conf.json" ]; then + cp ./conf/lals.default.conf.json ./pkg/logic/testdata/lals.default.conf.json fi echo "" > coverage.txt