[feat] 支持录制flv和ts文件 #14

pull/78/head
q191201771 4 years ago
parent 2e0615b496
commit d5d67a51a2

@ -9,7 +9,7 @@
[中文文档](https://pengrl.com/lal/#/) [中文文档](https://pengrl.com/lal/#/)
LAL is a audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client) And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client)

@ -1,6 +1,6 @@
{ {
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", "# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1", "conf_version": "v0.1.2",
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1935", "addr": ":1935",
@ -32,6 +32,12 @@
"enable": true, "enable": true,
"addr": ":5544" "addr": ":5544"
}, },
"record": {
"enable_flv": false,
"flv_out_path": "/tmp/lal/flv/",
"enable_mpegts": false,
"mpegts_out_path": "/tmp/lal/mpegts"
},
"relay_push": { "relay_push": {
"enable": false, "enable": false,
"addr_list":[ "addr_list":[

@ -1,6 +1,6 @@
{ {
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", "# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1", "conf_version": "v0.1.2",
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1935", "addr": ":1935",
@ -32,6 +32,12 @@
"enable": true, "enable": true,
"addr": ":5544" "addr": ":5544"
}, },
"record": {
"enable_flv": false,
"flv_out_path": "/tmp/lal/flv/",
"enable_mpegts": false,
"mpegts_out_path": "/tmp/lal/mpegts"
},
"relay_push": { "relay_push": {
"enable": false, "enable": false,
"addr_list":[ "addr_list":[

@ -1,6 +1,6 @@
{ {
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", "# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1", "conf_version": "v0.1.2",
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1955", "addr": ":1955",
@ -22,6 +22,10 @@
"rtsp": { "rtsp": {
"enable": false "enable": false
}, },
"record": {
"enable_flv": false,
"enable_mpegts": false
},
"relay_push": { "relay_push": {
"enable": false "enable": false
}, },

@ -1,6 +1,6 @@
{ {
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", "# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1", "conf_version": "v0.1.2",
"rtmp": { "rtmp": {
"enable": true, "enable": true,
"addr": ":1935", "addr": ":1935",
@ -32,6 +32,10 @@
"enable": false, "enable": false,
"addr": ":5544" "addr": ":5544"
}, },
"record": {
"enable_flv": false,
"enable_mpegts": false
},
"relay_push": { "relay_push": {
"enable": false, "enable": false,
"addr_list":[ "addr_list":[

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.13 go 1.13
require github.com/q191201771/naza v0.18.4 require github.com/q191201771/naza v0.18.5

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.18.4 h1:yk5EUz8q2FhCYg6pG5QcH9uXVJN5nmiXPu5Y5vlpArE= github.com/q191201771/naza v0.18.5 h1:new/bBkivdVqPZpaviseZc2Z4qZ9I4KpKvJUmXCXjEk=
github.com/q191201771/naza v0.18.4/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= github.com/q191201771/naza v0.18.5/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=

@ -1,3 +1,11 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: joestarzxh
package base package base
import ( import (

@ -33,8 +33,6 @@ import (
var ErrHLS = errors.New("lal.hls: fxxk") var ErrHLS = errors.New("lal.hls: fxxk")
var _ StreamerObserver = &Muxer{}
var audNal = []byte{ var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
} }

@ -92,8 +92,8 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M
uk := base.GenUKHLSMuxer() uk := base.GenUKHLSMuxer()
op := getMuxerOutPath(config.OutPath, streamName) op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName) playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
recordPlaylistFilename := getRecordM3U8Filename(op, streamName) recordPlaylistFilename := getRecordM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename) recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename)
frags := make([]fragmentInfo, 2*config.FragmentNum+1) frags := make([]fragmentInfo, 2*config.FragmentNum+1)
m := &Muxer{ m := &Muxer{

@ -10,6 +10,7 @@ package hls
import ( import (
"fmt" "fmt"
"path/filepath"
"strings" "strings"
) )
@ -56,25 +57,30 @@ func parseRequestInfo(uri string) (ri requestInfo) {
return return
} }
// <rootOutPath>/<ri.streamName>/<ri.fileName>
func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) { func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) {
filename := fmt.Sprintf("%s%s/%s", rootOutPath, ri.streamName, ri.fileName) filename := filepath.Join(rootOutPath, ri.streamName, ri.fileName)
return fslCtx.ReadFile(filename) return fslCtx.ReadFile(filename)
} }
// <rootOutPath>/<streamName>
func getMuxerOutPath(rootOutPath string, streamName string) string { func getMuxerOutPath(rootOutPath string, streamName string) string {
return fmt.Sprintf("%s%s/", rootOutPath, streamName) return filepath.Join(rootOutPath, streamName)
} }
func getM3U8Filename(outpath string, streamName string) string { // @param outPath 参考func getMuxerOutPath
return fmt.Sprintf("%s%s.m3u8", outpath, "playlist") func getM3U8Filename(outPath string, streamName string) string {
return filepath.Join(outPath, "playlist.m3u8")
} }
func getRecordM3U8Filename(outpath string, streamName string) string { // @param outPath 参考func getMuxerOutPath
return fmt.Sprintf("%s%s.m3u8", outpath, "record") func getRecordM3U8Filename(outPath string, streamName string) string {
return filepath.Join(outPath, "record.m3u8")
} }
// @param outPath 参考func getMuxerOutPath
func getTSFilenameWithPath(outpath string, filename string) string { func getTSFilenameWithPath(outpath string, filename string) string {
return fmt.Sprintf("%s%s", outpath, filename) return filepath.Join(outpath, filename)
} }
func getTSFilename(streamName string, id int, timestamp int) string { func getTSFilename(streamName string, id int, timestamp int) string {

@ -10,6 +10,8 @@ package httpflv
import "os" import "os"
// TODO chef: 结构体重命名为FileWriter文件名重命名为file_writer.go。所有写流文件的flv,hls,ts统一重构
type FLVFileWriter struct { type FLVFileWriter struct {
fp *os.File fp *os.File
} }
@ -20,17 +22,39 @@ func (ffw *FLVFileWriter) Open(filename string) (err error) {
} }
func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) { func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(b) _, err = ffw.fp.Write(b)
return return
} }
func (ffw *FLVFileWriter) WriteFLVHeader() (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(FLVHeader)
return
}
func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) { func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(tag.Raw) _, err = ffw.fp.Write(tag.Raw)
return return
} }
func (ffw *FLVFileWriter) Dispose() { func (ffw *FLVFileWriter) Dispose() error {
if ffw.fp != nil { if ffw.fp == nil {
_ = ffw.fp.Close() return ErrHTTPFLV
}
return ffw.fp.Close()
}
func (ffw *FLVFileWriter) Name() string {
if ffw.fp == nil {
return ""
} }
return ffw.fp.Name()
} }

@ -15,7 +15,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
) )
const ConfVersion = "v0.1.1" const ConfVersion = "v0.1.2"
type Config struct { type Config struct {
ConfVersion string `json:"conf_version"` ConfVersion string `json:"conf_version"`
@ -24,6 +24,7 @@ type Config struct {
HLSConfig HLSConfig `json:"hls"` HLSConfig HLSConfig `json:"hls"`
HTTPTSConfig HTTPTSConfig `json:"httpts"` HTTPTSConfig HTTPTSConfig `json:"httpts"`
RTSPConfig RTSPConfig `json:"rtsp"` RTSPConfig RTSPConfig `json:"rtsp"`
RecordConfig RecordConfig `json:"record"`
RelayPushConfig RelayPushConfig `json:"relay_push"` RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"` RelayPullConfig RelayPullConfig `json:"relay_pull"`
@ -61,6 +62,13 @@ type RTSPConfig struct {
Addr string `json:"addr"` Addr string `json:"addr"`
} }
type RecordConfig struct {
EnableFLV bool `json:"enable_flv"`
FLVOutPath string `json:"flv_out_path"`
EnableMPEGTS bool `json:"enable_mpegts"`
MPEGTSOutPath string `json:"mpegts_out_path"`
}
type RelayPushConfig struct { type RelayPushConfig struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
AddrList []string `json:"addr_list"` AddrList []string `json:"addr_list"`

@ -34,6 +34,9 @@ var (
func Entry(confFile string) { func Entry(confFile string) {
LoadConfAndInitLog(confFile) LoadConfAndInitLog(confFile)
if dir, err := os.Getwd(); err == nil {
nazalog.Infof("wd: %s", dir)
}
nazalog.Infof("args: %s", strings.Join(os.Args, " ")) nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LALFullInfo) nazalog.Infof("version: %s", base.LALFullInfo)
@ -45,6 +48,15 @@ func Entry(confFile string) {
hls.SetUseMemoryAsDiskFlag(true) hls.SetUseMemoryAsDiskFlag(true)
} }
if config.RecordConfig.EnableFLV {
if err := os.MkdirAll(config.RecordConfig.FLVOutPath, 0777); err != nil {
nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", config.RecordConfig.FLVOutPath, err)
}
if err := os.MkdirAll(config.RecordConfig.MPEGTSOutPath, 0777); err != nil {
nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MPEGTSOutPath, err)
}
}
sm = NewServerManager() sm = NewServerManager()
if config.PProfConfig.Enable { if config.PProfConfig.Enable {
@ -138,6 +150,7 @@ func LoadConfAndInitLog(confFile string) *Config {
"hls", "hls",
"httpts", "httpts",
"rtsp", "rtsp",
"record",
"relay_push", "relay_push",
"relay_pull", "relay_pull",
"http_api", "http_api",

@ -11,8 +11,12 @@ package logic
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"path/filepath"
"strings" "strings"
"sync" "sync"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/remux"
@ -46,30 +50,36 @@ type Group struct {
mutex sync.Mutex mutex sync.Mutex
// //
stat base.StatGroup stat base.StatGroup
// // pub
rtmpPubSession *rtmp.ServerSession rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession rtspPubSession *rtsp.PubSession
// // pull
pullEnable bool pullEnable bool
pullURL string pullURL string
pullProxy *pullProxy pullProxy *pullProxy
// // sub
rtmpSubSessionSet map[*rtmp.ServerSession]struct{} rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{} httpflvSubSessionSet map[*httpflv.SubSession]struct{}
httptsSubSessionSet map[*httpts.SubSession]struct{} httptsSubSessionSet map[*httpts.SubSession]struct{}
rtspSubSessionSet map[*rtsp.SubSession]struct{} rtspSubSessionSet map[*rtsp.SubSession]struct{}
// // push
url2PushProxy map[string]*pushProxy url2PushProxy map[string]*pushProxy
// // hls
hlsMuxer *hls.Muxer hlsMuxer *hls.Muxer
recordFLV *httpflv.FLVFileWriter
recordMPEGTS *mpegts.FileWriter
// rtmp pub/pull使用 // rtmp pub/pull使用
gopCache *GOPCache gopCache *GOPCache
httpflvGopCache *GOPCache httpflvGopCache *GOPCache
// rtsp pub使用 // rtsp pub使用
asc []byte asc []byte
vps []byte vps []byte
sps []byte sps []byte
pps []byte pps []byte
// //
tickCount uint32 tickCount uint32
} }
@ -482,6 +492,12 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
session.WriteRawPacket(rawFrame) session.WriteRawPacket(rawFrame)
} }
} }
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Write(rawFrame); err != nil {
nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
} }
// rtmp.PubSession or rtmp.PullSession // rtmp.PubSession or rtmp.PullSession
@ -796,7 +812,14 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
session.WriteRawPacket(lrm2ft.Get()) session.WriteRawPacket(lrm2ft.Get())
} }
// # 5. 缓存关键信息以及gop // # 5. 录制flv文件
if group.recordFLV != nil {
if err := group.recordFLV.WriteRaw(lrm2ft.Get()); err != nil {
nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}
// # 6. 缓存关键信息以及gop
if config.RTMPConfig.Enable { if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get) group.gopCache.Feed(msg, lcd.Get)
} }
@ -804,7 +827,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
group.httpflvGopCache.Feed(msg, lrm2ft.Get) group.httpflvGopCache.Feed(msg, lrm2ft.Get)
} }
// # 6. 记录stat // # 7. 记录stat
if group.stat.AudioCodec == "" { if group.stat.AudioCodec == "" {
if msg.IsAACSeqHeader() { if msg.IsAACSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAAC group.stat.AudioCodec = base.AudioCodecAAC
@ -986,6 +1009,48 @@ func (group *Group) addIn() {
if config.RelayPushConfig.Enable { if config.RelayPushConfig.Enable {
group.pushIfNeeded() group.pushIfNeeded()
} }
now := time.Now().Unix()
if config.RecordConfig.EnableFLV {
filename := fmt.Sprintf("%s-%d.flv", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.FLVOutPath, filename)
if group.recordFLV != nil {
nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFLV.Name())
if err := group.recordFLV.Dispose(); err != nil {
nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
}
}
group.recordFLV = &httpflv.FLVFileWriter{}
if err := group.recordFLV.Open(filenameWithPath); err != nil {
nazalog.Errorf("[%s] record flv open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
if err := group.recordFLV.WriteFLVHeader(); err != nil {
nazalog.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
}
if config.RecordConfig.EnableMPEGTS {
filename := fmt.Sprintf("%s-%d.ts", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.MPEGTSOutPath, filename)
if group.recordMPEGTS != nil {
nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMPEGTS.Name())
if err := group.recordMPEGTS.Dispose(); err != nil {
nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
}
}
group.recordMPEGTS = &mpegts.FileWriter{}
if err := group.recordMPEGTS.Create(filenameWithPath); err != nil {
nazalog.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
}
} }
func (group *Group) delIn() { func (group *Group) delIn() {
@ -1002,6 +1067,24 @@ func (group *Group) delIn() {
} }
} }
if config.RecordConfig.EnableFLV {
if group.recordFLV != nil {
if err := group.recordFLV.Dispose(); err != nil {
nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
}
group.recordFLV = nil
}
}
if config.RecordConfig.EnableMPEGTS {
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Dispose(); err != nil {
nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
}
group.recordMPEGTS = nil
}
}
group.gopCache.Clear() group.gopCache.Clear()
group.httpflvGopCache.Clear() group.httpflvGopCache.Clear()
} }

@ -0,0 +1,42 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package mpegts
import "os"
type FileWriter struct {
fp *os.File
}
func (fw *FileWriter) Create(filename string) (err error) {
fw.fp, err = os.Create(filename)
return
}
func (fw *FileWriter) Write(b []byte) (err error) {
if fw.fp == nil {
return ErrMPEGTS
}
_, err = fw.fp.Write(b)
return
}
func (fw *FileWriter) Dispose() error {
if fw.fp == nil {
return ErrMPEGTS
}
return fw.fp.Close()
}
func (fw *FileWriter) Name() string {
if fw.fp == nil {
return ""
}
return fw.fp.Name()
}

@ -8,8 +8,12 @@
package mpegts package mpegts
import "errors"
// MPEG: Moving Picture Experts Group // MPEG: Moving Picture Experts Group
var ErrMPEGTS = errors.New("lal.mpegts: fxxk")
// 每个TS文件都以固定的PATPMT开始 // 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{ var FixedFragmentHeader = []byte{
/* TS */ /* TS */

Loading…
Cancel
Save