From d5d67a51a23fa12f654afe5391150f0fff1739a8 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 24 Apr 2021 19:37:39 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20=E6=94=AF=E6=8C=81=E5=BD=95=E5=88=B6fl?= =?UTF-8?q?v=E5=92=8Cts=E6=96=87=E4=BB=B6=20#14?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- conf/lalserver.conf.json | 8 ++- conf/lalserver.conf.json.tmpl | 8 ++- conf/node2.conf.json | 6 ++- conf/onlyrtmp.conf.json | 6 ++- go.mod | 2 +- go.sum | 4 +- pkg/base/websocket.go | 8 +++ pkg/hls/hls.go | 2 - pkg/hls/muxer.go | 2 +- pkg/hls/path.go | 20 ++++--- pkg/httpflv/flv_file_writer.go | 30 +++++++++-- pkg/logic/config.go | 10 +++- pkg/logic/entry.go | 13 +++++ pkg/logic/group.go | 97 +++++++++++++++++++++++++++++++--- pkg/mpegts/file_writer.go | 42 +++++++++++++++ pkg/mpegts/mpegts.go | 4 ++ 17 files changed, 235 insertions(+), 29 deletions(-) create mode 100644 pkg/mpegts/file_writer.go diff --git a/README.md b/README.md index ab09ab6..790c0a6 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ [中文文档](https://pengrl.com/lal/#/) -LAL is a audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. +LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client) diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 40e7f56..e20b939 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.1", + "conf_version": "v0.1.2", "rtmp": { "enable": true, "addr": ":1935", @@ -32,6 +32,12 @@ "enable": true, "addr": ":5544" }, + "record": { + "enable_flv": false, + "flv_out_path": "/tmp/lal/flv/", + "enable_mpegts": false, + "mpegts_out_path": "/tmp/lal/mpegts" + }, "relay_push": { "enable": false, "addr_list":[ diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index 40e7f56..e20b939 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.1", + "conf_version": "v0.1.2", "rtmp": { "enable": true, "addr": ":1935", @@ -32,6 +32,12 @@ "enable": true, "addr": ":5544" }, + "record": { + "enable_flv": false, + "flv_out_path": "/tmp/lal/flv/", + "enable_mpegts": false, + "mpegts_out_path": "/tmp/lal/mpegts" + }, "relay_push": { "enable": false, "addr_list":[ diff --git a/conf/node2.conf.json b/conf/node2.conf.json index 6fb2cfd..63c5a84 100644 --- a/conf/node2.conf.json +++ b/conf/node2.conf.json @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.1", + "conf_version": "v0.1.2", "rtmp": { "enable": true, "addr": ":1955", @@ -22,6 +22,10 @@ "rtsp": { "enable": false }, + "record": { + "enable_flv": false, + "enable_mpegts": false + }, "relay_push": { "enable": false }, diff --git a/conf/onlyrtmp.conf.json b/conf/onlyrtmp.conf.json index 4a8a9f9..6cce7f5 100644 --- a/conf/onlyrtmp.conf.json +++ b/conf/onlyrtmp.conf.json @@ -1,6 +1,6 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.1", + "conf_version": "v0.1.2", "rtmp": { "enable": true, "addr": ":1935", @@ -32,6 +32,10 @@ "enable": false, "addr": ":5544" }, + "record": { + "enable_flv": false, + "enable_mpegts": false + }, "relay_push": { "enable": false, "addr_list":[ diff --git a/go.mod b/go.mod index 951afa1..9415bc2 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.13 -require github.com/q191201771/naza v0.18.4 +require github.com/q191201771/naza v0.18.5 diff --git a/go.sum b/go.sum index db42625..e169668 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.18.4 h1:yk5EUz8q2FhCYg6pG5QcH9uXVJN5nmiXPu5Y5vlpArE= -github.com/q191201771/naza v0.18.4/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= +github.com/q191201771/naza v0.18.5 h1:new/bBkivdVqPZpaviseZc2Z4qZ9I4KpKvJUmXCXjEk= +github.com/q191201771/naza v0.18.5/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go index c3f9fde..a265630 100644 --- a/pkg/base/websocket.go +++ b/pkg/base/websocket.go @@ -1,3 +1,11 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: joestarzxh + package base import ( diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index df9d524..1c7d599 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -33,8 +33,6 @@ import ( var ErrHLS = errors.New("lal.hls: fxxk") -var _ StreamerObserver = &Muxer{} - var audNal = []byte{ 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, } diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index 13d7f98..d1ba3e8 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -92,8 +92,8 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M uk := base.GenUKHLSMuxer() op := getMuxerOutPath(config.OutPath, streamName) playlistFilename := getM3U8Filename(op, streamName) - playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) recordPlaylistFilename := getRecordM3U8Filename(op, streamName) + playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename) frags := make([]fragmentInfo, 2*config.FragmentNum+1) m := &Muxer{ diff --git a/pkg/hls/path.go b/pkg/hls/path.go index f856f0d..0c2c0b3 100644 --- a/pkg/hls/path.go +++ b/pkg/hls/path.go @@ -10,6 +10,7 @@ package hls import ( "fmt" + "path/filepath" "strings" ) @@ -56,25 +57,30 @@ func parseRequestInfo(uri string) (ri requestInfo) { return } +// // func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) { - filename := fmt.Sprintf("%s%s/%s", rootOutPath, ri.streamName, ri.fileName) + filename := filepath.Join(rootOutPath, ri.streamName, ri.fileName) return fslCtx.ReadFile(filename) } +// / func getMuxerOutPath(rootOutPath string, streamName string) string { - return fmt.Sprintf("%s%s/", rootOutPath, streamName) + return filepath.Join(rootOutPath, streamName) } -func getM3U8Filename(outpath string, streamName string) string { - return fmt.Sprintf("%s%s.m3u8", outpath, "playlist") +// @param outPath 参考func getMuxerOutPath +func getM3U8Filename(outPath string, streamName string) string { + return filepath.Join(outPath, "playlist.m3u8") } -func getRecordM3U8Filename(outpath string, streamName string) string { - return fmt.Sprintf("%s%s.m3u8", outpath, "record") +// @param outPath 参考func getMuxerOutPath +func getRecordM3U8Filename(outPath string, streamName string) string { + return filepath.Join(outPath, "record.m3u8") } +// @param outPath 参考func getMuxerOutPath func getTSFilenameWithPath(outpath string, filename string) string { - return fmt.Sprintf("%s%s", outpath, filename) + return filepath.Join(outpath, filename) } func getTSFilename(streamName string, id int, timestamp int) string { diff --git a/pkg/httpflv/flv_file_writer.go b/pkg/httpflv/flv_file_writer.go index 49b94b7..9382f09 100644 --- a/pkg/httpflv/flv_file_writer.go +++ b/pkg/httpflv/flv_file_writer.go @@ -10,6 +10,8 @@ package httpflv import "os" +// TODO chef: 结构体重命名为FileWriter,文件名重命名为file_writer.go。所有写流文件的(flv,hls,ts)统一重构 + type FLVFileWriter struct { fp *os.File } @@ -20,17 +22,39 @@ func (ffw *FLVFileWriter) Open(filename string) (err error) { } func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) { + if ffw.fp == nil { + return ErrHTTPFLV + } _, err = ffw.fp.Write(b) return } +func (ffw *FLVFileWriter) WriteFLVHeader() (err error) { + if ffw.fp == nil { + return ErrHTTPFLV + } + _, err = ffw.fp.Write(FLVHeader) + return +} + func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) { + if ffw.fp == nil { + return ErrHTTPFLV + } _, err = ffw.fp.Write(tag.Raw) return } -func (ffw *FLVFileWriter) Dispose() { - if ffw.fp != nil { - _ = ffw.fp.Close() +func (ffw *FLVFileWriter) Dispose() error { + if ffw.fp == nil { + return ErrHTTPFLV + } + return ffw.fp.Close() +} + +func (ffw *FLVFileWriter) Name() string { + if ffw.fp == nil { + return "" } + return ffw.fp.Name() } diff --git a/pkg/logic/config.go b/pkg/logic/config.go index cc99a2c..a0caa03 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -15,7 +15,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -const ConfVersion = "v0.1.1" +const ConfVersion = "v0.1.2" type Config struct { ConfVersion string `json:"conf_version"` @@ -24,6 +24,7 @@ type Config struct { HLSConfig HLSConfig `json:"hls"` HTTPTSConfig HTTPTSConfig `json:"httpts"` RTSPConfig RTSPConfig `json:"rtsp"` + RecordConfig RecordConfig `json:"record"` RelayPushConfig RelayPushConfig `json:"relay_push"` RelayPullConfig RelayPullConfig `json:"relay_pull"` @@ -61,6 +62,13 @@ type RTSPConfig struct { Addr string `json:"addr"` } +type RecordConfig struct { + EnableFLV bool `json:"enable_flv"` + FLVOutPath string `json:"flv_out_path"` + EnableMPEGTS bool `json:"enable_mpegts"` + MPEGTSOutPath string `json:"mpegts_out_path"` +} + type RelayPushConfig struct { Enable bool `json:"enable"` AddrList []string `json:"addr_list"` diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go index 8b4c1ae..f0b4284 100644 --- a/pkg/logic/entry.go +++ b/pkg/logic/entry.go @@ -34,6 +34,9 @@ var ( func Entry(confFile string) { LoadConfAndInitLog(confFile) + if dir, err := os.Getwd(); err == nil { + nazalog.Infof("wd: %s", dir) + } nazalog.Infof("args: %s", strings.Join(os.Args, " ")) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("version: %s", base.LALFullInfo) @@ -45,6 +48,15 @@ func Entry(confFile string) { hls.SetUseMemoryAsDiskFlag(true) } + if config.RecordConfig.EnableFLV { + if err := os.MkdirAll(config.RecordConfig.FLVOutPath, 0777); err != nil { + nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", config.RecordConfig.FLVOutPath, err) + } + if err := os.MkdirAll(config.RecordConfig.MPEGTSOutPath, 0777); err != nil { + nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MPEGTSOutPath, err) + } + } + sm = NewServerManager() if config.PProfConfig.Enable { @@ -138,6 +150,7 @@ func LoadConfAndInitLog(confFile string) *Config { "hls", "httpts", "rtsp", + "record", "relay_push", "relay_pull", "http_api", diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 31c23ca..2d1af95 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -11,8 +11,12 @@ package logic import ( "encoding/json" "fmt" + "path/filepath" "strings" "sync" + "time" + + "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/lal/pkg/remux" @@ -46,30 +50,36 @@ type Group struct { mutex sync.Mutex // stat base.StatGroup - // + // pub rtmpPubSession *rtmp.ServerSession rtspPubSession *rtsp.PubSession - // + // pull pullEnable bool pullURL string pullProxy *pullProxy - // + // sub rtmpSubSessionSet map[*rtmp.ServerSession]struct{} httpflvSubSessionSet map[*httpflv.SubSession]struct{} httptsSubSessionSet map[*httpts.SubSession]struct{} rtspSubSessionSet map[*rtsp.SubSession]struct{} - // + // push url2PushProxy map[string]*pushProxy - // + // hls hlsMuxer *hls.Muxer + + recordFLV *httpflv.FLVFileWriter + recordMPEGTS *mpegts.FileWriter + // rtmp pub/pull使用 gopCache *GOPCache httpflvGopCache *GOPCache + // rtsp pub使用 asc []byte vps []byte sps []byte pps []byte + // tickCount uint32 } @@ -482,6 +492,12 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { session.WriteRawPacket(rawFrame) } } + + if group.recordMPEGTS != nil { + if err := group.recordMPEGTS.Write(rawFrame); err != nil { + nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err) + } + } } // rtmp.PubSession or rtmp.PullSession @@ -796,7 +812,14 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { session.WriteRawPacket(lrm2ft.Get()) } - // # 5. 缓存关键信息,以及gop + // # 5. 录制flv文件 + if group.recordFLV != nil { + if err := group.recordFLV.WriteRaw(lrm2ft.Get()); err != nil { + nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err) + } + } + + // # 6. 缓存关键信息,以及gop if config.RTMPConfig.Enable { group.gopCache.Feed(msg, lcd.Get) } @@ -804,7 +827,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { group.httpflvGopCache.Feed(msg, lrm2ft.Get) } - // # 6. 记录stat + // # 7. 记录stat if group.stat.AudioCodec == "" { if msg.IsAACSeqHeader() { group.stat.AudioCodec = base.AudioCodecAAC @@ -986,6 +1009,48 @@ func (group *Group) addIn() { if config.RelayPushConfig.Enable { group.pushIfNeeded() } + + now := time.Now().Unix() + if config.RecordConfig.EnableFLV { + filename := fmt.Sprintf("%s-%d.flv", group.streamName, now) + filenameWithPath := filepath.Join(config.RecordConfig.FLVOutPath, filename) + if group.recordFLV != nil { + nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordFLV.Name()) + if err := group.recordFLV.Dispose(); err != nil { + nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) + } + } + group.recordFLV = &httpflv.FLVFileWriter{} + if err := group.recordFLV.Open(filenameWithPath); err != nil { + nazalog.Errorf("[%s] record flv open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFLV = nil + } + if err := group.recordFLV.WriteFLVHeader(); err != nil { + nazalog.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFLV = nil + } + } + + if config.RecordConfig.EnableMPEGTS { + filename := fmt.Sprintf("%s-%d.ts", group.streamName, now) + filenameWithPath := filepath.Join(config.RecordConfig.MPEGTSOutPath, filename) + if group.recordMPEGTS != nil { + nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s", + group.UniqueKey, filenameWithPath, group.recordMPEGTS.Name()) + if err := group.recordMPEGTS.Dispose(); err != nil { + nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) + } + } + group.recordMPEGTS = &mpegts.FileWriter{} + if err := group.recordMPEGTS.Create(filenameWithPath); err != nil { + nazalog.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v", + group.UniqueKey, filenameWithPath, err) + group.recordFLV = nil + } + } } func (group *Group) delIn() { @@ -1002,6 +1067,24 @@ func (group *Group) delIn() { } } + if config.RecordConfig.EnableFLV { + if group.recordFLV != nil { + if err := group.recordFLV.Dispose(); err != nil { + nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err) + } + group.recordFLV = nil + } + } + + if config.RecordConfig.EnableMPEGTS { + if group.recordMPEGTS != nil { + if err := group.recordMPEGTS.Dispose(); err != nil { + nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err) + } + group.recordMPEGTS = nil + } + } + group.gopCache.Clear() group.httpflvGopCache.Clear() } diff --git a/pkg/mpegts/file_writer.go b/pkg/mpegts/file_writer.go new file mode 100644 index 0000000..09f4c12 --- /dev/null +++ b/pkg/mpegts/file_writer.go @@ -0,0 +1,42 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package mpegts + +import "os" + +type FileWriter struct { + fp *os.File +} + +func (fw *FileWriter) Create(filename string) (err error) { + fw.fp, err = os.Create(filename) + return +} + +func (fw *FileWriter) Write(b []byte) (err error) { + if fw.fp == nil { + return ErrMPEGTS + } + _, err = fw.fp.Write(b) + return +} + +func (fw *FileWriter) Dispose() error { + if fw.fp == nil { + return ErrMPEGTS + } + return fw.fp.Close() +} + +func (fw *FileWriter) Name() string { + if fw.fp == nil { + return "" + } + return fw.fp.Name() +} diff --git a/pkg/mpegts/mpegts.go b/pkg/mpegts/mpegts.go index 4a1592e..2276f85 100644 --- a/pkg/mpegts/mpegts.go +++ b/pkg/mpegts/mpegts.go @@ -8,8 +8,12 @@ package mpegts +import "errors" + // MPEG: Moving Picture Experts Group +var ErrMPEGTS = errors.New("lal.mpegts: fxxk") + // 每个TS文件都以固定的PAT,PMT开始 var FixedFragmentHeader = []byte{ /* TS */