diff --git a/CHANGELOG.md b/CHANGELOG.md index b5fc19c..24e315a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +#### v0.22.0 (2021-05-03) + +- [feat] 录制新增支持:flv和mpegts文件。 录制支持列表见: https://pengrl.com/lal/#/LALServer (#14) +- [feat] h265新增支持: hls拉流,hls录制;http-ts拉流,mpegts录制。h265支持列表见: https://pengrl.com/lal/#/LALServer (#65) +- [feat] 拉流新增支持:websocket-flv,websocket-ts。拉流协议支持列表见: https://pengrl.com/lal/#/LALServer +- [feat] hls: 支持内存切片。 (#50) +- [fix] rtmp ClientSession握手,c2的发送时机,由收到s0s1s2改为收到s0s1就发送,解决握手失败的case。 (#42) +- [fix] rtsp h265: 转rtmp时处理错误导致无法播放 +- [fix] rtsp h265: ffmpeg向lalserver推送rtsp h265报错。 (#55) +- [test] travis ci: 自动化单元测试os增加osx, windows, arch增加arm64, ppc64le, s390x。 (#59) +- [feat] rtmp ClientSession支持配置使用简单握手,复杂握手 (#68) + #### v0.21.0 (2021-04-11) - [feat] package rtmp: 支持Aggregate Message diff --git a/README.md b/README.md index 790c0a6..6c615ad 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ [中文文档](https://pengrl.com/lal/#/) -LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. +LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]/WebSocket[s]-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache. And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client) diff --git a/app/demo/pullrtmp2hls/pullrtmp2hls.go b/app/demo/pullrtmp2hls/pullrtmp2hls.go index 1c3cfd4..83ba6ad 100644 --- a/app/demo/pullrtmp2hls/pullrtmp2hls.go +++ b/app/demo/pullrtmp2hls/pullrtmp2hls.go @@ -31,7 +31,6 @@ func main() { url, hlsOutPath, fragmentDurationMS, fragmentNum) hlsMuxerConfig := hls.MuxerConfig{ - Enable: true, OutPath: hlsOutPath, FragmentDurationMS: fragmentDurationMS, FragmentNum: fragmentNum, @@ -43,7 +42,7 @@ func main() { } streamName := ctx.LastItemOfPath - hlsMuexer := hls.NewMuxer(streamName, &hlsMuxerConfig, nil) + hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil) hlsMuexer.Start() pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { diff --git a/app/lalserver/main.go b/app/lalserver/main.go index 28344b0..ebb6462 100644 --- a/app/lalserver/main.go +++ b/app/lalserver/main.go @@ -28,7 +28,8 @@ func main() { defer nazalog.Sync() confFile := parseFlag() - logic.Entry(confFile) + logic.Init(confFile) + logic.RunLoop() } func parseFlag() string { diff --git a/conf/ConfigBrief.md b/conf/ConfigBrief.md new file mode 100644 index 0000000..68e266d --- /dev/null +++ b/conf/ConfigBrief.md @@ -0,0 +1,86 @@ +``` +{ + "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", // 配置文件对应的文档说明链接,在程序中没实际用途 + "conf_version": "0.2.0", // 配置文件版本号,业务方不应该手动修改,程序中会检查该版本号是否与代码中声明的一致 + "rtmp": { + "enable": true, // 是否开启rtmp服务的监听 + "addr": ":19350", // RTMP服务监听的端口,客户端向lalserver推拉流都是这个地址 + "gop_num": 2 // RTMP拉流的GOP缓存数量,加速秒开 + }, + "default_http": { // http监听相关的默认配置,如果hls, httpflv, httpts中没有单独配置以下配置项,则使用default_http中的配置 + // 注意,hls, httpflv, httpts服务是否开启,不由此处决定 + "http_listen_addr": ":8080", // HTTP监听地址 + "https_listen_addr": ":4433", // HTTPS监听地址 + "https_cert_file": "./conf/cert.pem", // HTTPS的本地cert文件地址 + "https_key_file": "./conf/key.pem" // HTTPS的本地key文件地址 + }, + "httpflv": { + "enable": true, // 是否开启HTTP-FLV服务的监听 + "enable_https": true, // 是否开启HTTPS-FLV监听 + "gop_num": 2 + }, + "hls": { + "enable": true, // 是否开启HLS服务的监听 + "out_path": "/tmp/lal/hls/", // HLS文件保存根目录 + "fragment_duration_ms": 3000, // 单个TS文件切片时长,单位毫秒 + "fragment_num": 6, // m3u8文件列表中ts文件的数量 + "cleanup_mode": 1, // HLS文件清理模式: + // 0 不删除m3u8+ts文件,可用于录制等场景 + // 1 在输入流结束后删除m3u8+ts文件 + // 注意,确切的删除时间是推流结束后的 * * 2的时间点 + // 推迟一小段时间删除,是为了避免输入流刚结束,HLS的拉流端还没有拉取完 + // 2 推流过程中,持续删除过期的ts文件,只保留最近的 * 2个左右的ts文件 + "use_memory_as_disk_flag": false // 是否使用内存取代磁盘,保存m3u8+ts文件 + }, + "httpts": { + "enable": true // 是否开启HTTP-TS服务的监听。注意,这并不是HLS中的TS,而是在一条HTTP长连接上持续性传输TS流 + }, + "rtsp": { + "enable": true, // 是否开启rtsp服务的监听,目前只支持rtsp推流 + "addr": ":5544" // rtsp推流地址 + }, + "record": { + "enable_flv": true, // 是否开启flv录制 + "flv_out_path": "/tmp/lal/flv/", // flv录制目录 + "enable_mpegts": true, // 是否开启mpegts录制。注意,此处是长ts文件录制,hls录制由上面的hls配置控制 + "mpegts_out_path": "/tmp/lal/mpegts" // mpegts录制目录 + }, + "relay_push": { + "enable": false, // 是否开启中继转推功能,开启后,自身接收到的所有流都会转推出去 + "addr_list":[ // 中继转推的对端地址,支持填写多个地址,做1对n的转推。格式举例 "127.0.0.1:19351" + ] + }, + "relay_pull": { + "enable": false, // 是否开启回源拉流功能,开启后,当自身接收到拉流请求,而流不存在时,会从其他服务器拉取这个流到本地 + "addr": "" // 回源拉流的地址。格式举例 "127.0.0.1:19351" + }, + "http_api": { + "enable": true, // 是否开启HTTP API接口 + "addr": ":8083" // 监听地址 + }, + "server_id": "1", // 当前lalserver唯一ID。多个lalserver HTTP Notify同一个地址时,可通过该ID区分 + "http_notify": { + "enable": true, // 是否开启HTTP Notify事件回调 + "update_interval_sec": 5, // update事件回调间隔,单位毫秒 + "on_server_start": "http://127.0.0.1:10101/on_server_start", // 各事件HTTP Notify事件回调地址 + "on_update": "http://127.0.0.1:10101/on_update", + "on_pub_start": "http://127.0.0.1:10101/on_pub_start", + "on_pub_stop": "http://127.0.0.1:10101/on_pub_stop", + "on_sub_start": "http://127.0.0.1:10101/on_sub_start", + "on_sub_stop": "http://127.0.0.1:10101/on_sub_stop", + "on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect" + }, + "pprof": { + "enable": true, // 是否开启Go pprof web服务的监听 + "addr": ":8084" // Go pprof web地址 + }, + "log": { + "level": 1, // 日志级别,0 trace, 1 debug, 2 info, 3 warn, 4 error, 5 fatal + "filename": "./logs/lalserver.log", // 日志输出文件 + "is_to_stdout": true, // 是否打印至标志控制台输出 + "is_rotate_daily": true, // 日志按天翻滚 + "short_file_flag": true, // 日志末尾是否携带源码文件名以及行号的信息 + "assert_behavior": 1 // 日志断言的行为,1 只打印错误日志 2 打印并退出程序 3 打印并panic + } +} +``` diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index e20b939..534c70d 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -1,23 +1,25 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.2", + "conf_version": "v0.2.0", "rtmp": { "enable": true, "addr": ":1935", "gop_num": 2 }, + "default_http": { + "http_listen_addr": ":8080", + "https_listen_addr": ":4433", + "https_cert_file": "./conf/cert.pem", + "https_key_file": "./conf/key.pem" + }, "httpflv": { "enable": true, - "sub_listen_addr": ":8080", "enable_https": false, - "https_addr": ":4433", - "https_cert_file": "./conf/cert.pem", - "https_key_file": "./conf/key.pem", "gop_num": 2 }, "hls": { "enable": true, - "sub_listen_addr": ":8081", + "enable_https": false, "out_path": "/tmp/lal/hls/", "fragment_duration_ms": 3000, "fragment_num": 6, @@ -26,7 +28,7 @@ }, "httpts": { "enable": true, - "sub_listen_addr": ":8082" + "enable_https":false }, "rtsp": { "enable": true, diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index e20b939..cb004da 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -1,23 +1,25 @@ { "# doc of config": "https://pengrl.com/lal/#/ConfigBrief", - "conf_version": "v0.1.2", + "conf_version": "v0.2.0", "rtmp": { "enable": true, "addr": ":1935", "gop_num": 2 }, + "default_http": { + "http_listen_addr": ":8080", + "https_listen_addr": ":4433", + "https_cert_file": "./conf/cert.pem", + "https_key_file": "./conf/key.pem" + }, "httpflv": { "enable": true, - "sub_listen_addr": ":8080", "enable_https": false, - "https_addr": ":4433", - "https_cert_file": "./conf/cert.pem", - "https_key_file": "./conf/key.pem", "gop_num": 2 }, "hls": { "enable": true, - "sub_listen_addr": ":8081", + "enable_https": false, "out_path": "/tmp/lal/hls/", "fragment_duration_ms": 3000, "fragment_num": 6, @@ -26,7 +28,7 @@ }, "httpts": { "enable": true, - "sub_listen_addr": ":8082" + "enable_https":false }, "rtsp": { "enable": true, diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index 19cf516..6d3c602 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -31,6 +31,9 @@ var ErrAVC = errors.New("lal.avc: fxxk") var ( NALUStartCode3 = []byte{0x0, 0x0, 0x1} NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1} + + // aud nalu + AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0xf0} ) var NALUTypeMapping = map[uint8]string{ diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 3e568d5..9e326f1 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -17,11 +17,24 @@ const ( AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC ) -// 目前供package rtsp使用。以后可能被多个package使用。 -// 不排除不同package使用时,字段含义也不同的情况出现。 +// 不同场景使用时,字段含义可能不同。 // 使用AVPacket的地方,应注明各字段的含义。 type AVPacket struct { Timestamp uint32 PayloadType AVPacketPT Payload []byte } + +func (a AVPacketPT) ReadableString() string { + switch a { + case AVPacketPTUnknown: + return "unknown" + case AVPacketPTAVC: + return "avc" + case AVPacketPTHEVC: + return "hevc" + case AVPacketPTAAC: + return "aac" + } + return "" +} diff --git a/pkg/base/http_server.go b/pkg/base/http_server.go new file mode 100644 index 0000000..5ad9358 --- /dev/null +++ b/pkg/base/http_server.go @@ -0,0 +1,149 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "crypto/tls" + "errors" + "net" + "net/http" + "reflect" + + "github.com/q191201771/naza/pkg/nazaerrors" +) + +// TODO(chef) +// - 考虑移入naza中 +// - 考虑增加一个pattern全部未命中的mux回调 + +var ( + ErrAddrEmpty = errors.New("lal.base: http server addr empty") + ErrMultiRegistForPattern = errors.New("lal.base: http server multiple registrations for pattern") +) + +const ( + NetworkTCP = "tcp" +) + +type LocalAddrCtx struct { + IsHTTPS bool + Addr string + CertFile string + KeyFile string + + Network string // 默认为NetworkTCP +} + +type HTTPServerManager struct { + addr2ServerCtx map[string]*ServerCtx +} + +type ServerCtx struct { + addrCtx LocalAddrCtx + listener net.Listener + httpServer http.Server + mux *http.ServeMux + pattern2Handler map[string]Handler +} + +func NewHTTPServerManager() *HTTPServerManager { + return &HTTPServerManager{ + addr2ServerCtx: make(map[string]*ServerCtx), + } +} + +type Handler func(http.ResponseWriter, *http.Request) + +// @param pattern 必须以`/`开始,并以`/`结束 +func (s *HTTPServerManager) AddListen(addrCtx LocalAddrCtx, pattern string, handler Handler) error { + var ( + ctx *ServerCtx + mux *http.ServeMux + ok bool + ) + + if addrCtx.Addr == "" { + return ErrAddrEmpty + } + + ctx, ok = s.addr2ServerCtx[addrCtx.Addr] + if !ok { + l, err := listen(addrCtx) + if err != nil { + return err + } + mux = http.NewServeMux() + ctx = &ServerCtx{ + addrCtx: addrCtx, + listener: l, + httpServer: http.Server{ + Handler: mux, + }, + mux: mux, + pattern2Handler: make(map[string]Handler), + } + s.addr2ServerCtx[addrCtx.Addr] = ctx + } + + // 路径相同,比较回调函数是否相同 + // 如果回调函数也相同,意味着重复绑定,这种情况是允许的 + // 如果回调函数不同,返回错误 + if prevHandler, ok := ctx.pattern2Handler[pattern]; ok { + if reflect.ValueOf(prevHandler).Pointer() == reflect.ValueOf(handler).Pointer() { + return nil + } else { + return ErrMultiRegistForPattern + } + } + ctx.pattern2Handler[pattern] = handler + + ctx.mux.HandleFunc(pattern, handler) + return nil +} + +func (s *HTTPServerManager) RunLoop() error { + errChan := make(chan error, len(s.addr2ServerCtx)) + + for _, v := range s.addr2ServerCtx { + go func(ctx *ServerCtx) { + errChan <- ctx.httpServer.Serve(ctx.listener) + + _ = ctx.httpServer.Close() + }(v) + } + + // 阻塞直到接到第一个error + return <-errChan +} + +func (s *HTTPServerManager) Dispose() error { + var es []error + for _, v := range s.addr2ServerCtx { + err := v.httpServer.Close() + es = append(es, err) + } + return nazaerrors.CombineErrors(es...) +} + +func listen(ctx LocalAddrCtx) (net.Listener, error) { + if ctx.Network == "" { + ctx.Network = NetworkTCP + } + + if !ctx.IsHTTPS { + return net.Listen(ctx.Network, ctx.Addr) + } + + cert, err := tls.LoadX509KeyPair(ctx.CertFile, ctx.KeyFile) + if err != nil { + return nil, err + } + tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} + return tls.Listen(ctx.Network, ctx.Addr, tlsConfig) +} diff --git a/pkg/base/http_server_test.go b/pkg/base/http_server_test.go new file mode 100644 index 0000000..68114ac --- /dev/null +++ b/pkg/base/http_server_test.go @@ -0,0 +1,49 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "testing" +) + +func TestHTTPServerManager(t *testing.T) { + //var err error + // + //var fnFLV = func(writer http.ResponseWriter, request *http.Request) { + // nazalog.Debugf("> fnFLV. %+v, %+v", writer, request) + // conn, bio, err := writer.(http.Hijacker).Hijack() + // if err != nil { + // nazalog.Errorf("hijack failed. err=%+v", err) + // return + // } + // if bio.Reader.Buffered() != 0 || bio.Writer.Buffered() != 0 { + // nazalog.Errorf("hijack but buffer not empty. rb=%d, wb=%d", bio.Reader.Buffered(), bio.Writer.Buffered()) + // } + // nazalog.Debugf("%+v, %+v, %+v", conn, bio, err) + //} + // + //sm := NewHTTPServerManager() + // + //err = sm.AddListen( + // LocalAddrCtx{IsHTTPS: false, Addr: ":8080"}, + // "/live/", + // fnFLV, + //) + //assert.Equal(t, nil, err) + // + //err = sm.AddListen( + // LocalAddrCtx{IsHTTPS: true, Addr: ":4433", CertFile: "../../conf/cert.pem", KeyFile: "../../conf/key.pem"}, + // "/live/", + // fnFLV, + //) + //assert.Equal(t, nil, err) + // + //err = sm.RunLoop() + //assert.Equal(t, nil, err) +} diff --git a/pkg/base/http_sub_session.go b/pkg/base/http_sub_session.go new file mode 100644 index 0000000..d897594 --- /dev/null +++ b/pkg/base/http_sub_session.go @@ -0,0 +1,149 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import ( + "net" + "strings" + "time" + + "github.com/q191201771/naza/pkg/connection" +) + +type HTTPSubSession struct { + HTTPSubSessionOption + + suffix string + conn connection.Connection + prevConnStat connection.Stat + staleStat *connection.Stat + stat StatSession +} + +type HTTPSubSessionOption struct { + Conn net.Conn + ConnModOption connection.ModOption + UK string // unique key + Protocol string + URLCtx URLContext + IsWebSocket bool + WebSocketKey string +} + +func NewHTTPSubSession(option HTTPSubSessionOption) *HTTPSubSession { + s := &HTTPSubSession{ + HTTPSubSessionOption: option, + conn: connection.New(option.Conn, option.ConnModOption), + stat: StatSession{ + Protocol: option.Protocol, + SessionID: option.UK, + StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + RemoteAddr: option.Conn.RemoteAddr().String(), + }, + } + return s +} + +func (session *HTTPSubSession) RunLoop() error { + buf := make([]byte, 128) + _, err := session.conn.Read(buf) + return err +} + +func (session *HTTPSubSession) WriteHTTPResponseHeader(b []byte) { + if session.IsWebSocket { + session.Write(UpdateWebSocketHeader(session.WebSocketKey)) + } else { + session.Write(b) + } +} + +func (session *HTTPSubSession) Write(b []byte) { + if session.IsWebSocket { + wsHeader := WSHeader{ + Fin: true, + Rsv1: false, + Rsv2: false, + Rsv3: false, + Opcode: WSO_Binary, + PayloadLength: uint64(len(b)), + Masked: false, + } + session.write(MakeWSFrameHeader(wsHeader)) + } + session.write(b) +} + +func (session *HTTPSubSession) Dispose() error { + return session.conn.Close() +} + +func (session *HTTPSubSession) URL() string { + return session.URLCtx.URL +} + +func (session *HTTPSubSession) AppName() string { + return session.URLCtx.PathWithoutLastItem +} + +func (session *HTTPSubSession) StreamName() string { + var suffix string + switch session.Protocol { + case ProtocolHTTPFLV: + suffix = ".flv" + case ProtocolHTTPTS: + suffix = ".ts" + default: + Logger.Warnf("[%s] acquire stream name but protocol unknown.", session.UK) + } + return strings.TrimSuffix(session.URLCtx.LastItemOfPath, suffix) +} + +func (session *HTTPSubSession) RawQuery() string { + return session.URLCtx.RawQuery +} + +func (session *HTTPSubSession) UniqueKey() string { + return session.UK +} + +func (session *HTTPSubSession) GetStat() StatSession { + currStat := session.conn.GetStat() + session.stat.ReadBytesSum = currStat.ReadBytesSum + session.stat.WroteBytesSum = currStat.WroteBytesSum + return session.stat +} + +func (session *HTTPSubSession) UpdateStat(intervalSec uint32) { + currStat := session.conn.GetStat() + rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum + session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) + wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum + session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) + session.stat.Bitrate = session.stat.WriteBitrate + session.prevConnStat = currStat +} + +func (session *HTTPSubSession) IsAlive() (readAlive, writeAlive bool) { + currStat := session.conn.GetStat() + if session.staleStat == nil { + session.staleStat = new(connection.Stat) + *session.staleStat = currStat + return true, true + } + + readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0) + writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0) + *session.staleStat = currStat + return +} + +func (session *HTTPSubSession) write(b []byte) { + _, _ = session.conn.Write(b) +} diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index 593a509..a1c45d3 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -62,7 +62,7 @@ const ( // AACAUDIODATA // AACPacketType UI8 // Data UI8[n] - RTMPSoundFormatAAC uint8 = 10 + RTMPSoundFormatAAC uint8 = 10 // 注意,视频的CodecID是后4位,音频是前4位 RTMPAACPacketTypeSeqHeader = 0 RTMPAACPacketTypeRaw = 1 ) @@ -72,7 +72,7 @@ type RTMPHeader struct { MsgLen uint32 // 不包含header的大小 MsgTypeID uint8 // 8 audio 9 video 18 metadata MsgStreamID int - TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳 + TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳,单位毫秒 } type RTMPMsg struct { diff --git a/pkg/base/url.go b/pkg/base/url.go index 82c7854..29b211c 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -134,11 +134,11 @@ func ParseRTMPURL(rawURL string) (ctx URLContext, err error) { } func ParseHTTPFLVURL(rawURL string, isHTTPS bool) (ctx URLContext, err error) { - return parsehttpURL(rawURL, isHTTPS, ".flv") + return ParseHTTPURL(rawURL, isHTTPS, ".flv") } func ParseHTTPTSURL(rawURL string, isHTTPS bool) (ctx URLContext, err error) { - return parsehttpURL(rawURL, isHTTPS, ".ts") + return ParseHTTPURL(rawURL, isHTTPS, ".ts") } func ParseRTSPURL(rawURL string) (ctx URLContext, err error) { @@ -184,7 +184,7 @@ func parseURLPath(stdURL *url.URL) (ctx URLPathContext, err error) { return ctx, nil } -func parsehttpURL(rawURL string, isHTTPS bool, suffix string) (ctx URLContext, err error) { +func ParseHTTPURL(rawURL string, isHTTPS bool, suffix string) (ctx URLContext, err error) { var defaultPort int if isHTTPS { defaultPort = DefaultHTTPSPort diff --git a/pkg/base/var.go b/pkg/base/var.go new file mode 100644 index 0000000..215e6e6 --- /dev/null +++ b/pkg/base/var.go @@ -0,0 +1,15 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +import "github.com/q191201771/naza/pkg/nazalog" + +var ( + Logger = nazalog.GetGlobalLogger() +) diff --git a/pkg/base/version.go b/pkg/base/version.go index ac39f58..f9fffb4 100644 --- a/pkg/base/version.go +++ b/pkg/base/version.go @@ -16,7 +16,7 @@ import "strings" // 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中 // 版本,该变量由外部脚本修改维护 -const LALVersion = "v0.21.0" +const LALVersion = "v0.22.0" var ( LALLibraryName = "lal" diff --git a/pkg/hevc/hevc.go b/pkg/hevc/hevc.go index a103638..ee187ae 100644 --- a/pkg/hevc/hevc.go +++ b/pkg/hevc/hevc.go @@ -33,23 +33,37 @@ var ErrHEVC = errors.New("lal.hevc: fxxk") var ( NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1} + + // aud nalu + AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10} ) var NALUTypeMapping = map[uint8]string{ - NALUTypeSliceTrailR: "SLICE", - NALUTypeSliceIDR: "I", - NALUTypeSliceIDRNLP: "IDR", + NALUTypeSliceTrailN: "TrailN", + NALUTypeSliceTrailR: "TrailR", + NALUTypeSliceIDR: "IDR", + NALUTypeSliceIDRNLP: "IDRNLP", + NALUTypeSliceCRANUT: "CRANUT", + NALUTypeVPS: "VPS", + NALUTypeSPS: "SPS", + NALUTypePPS: "PPS", + NALUTypeAUD: "AUD", NALUTypeSEI: "SEI", - NALUTypeSEISuffix: "SEI", + NALUTypeSEISuffix: "SEISuffix", } + +// ISO_IEC_23008-2_2013.pdf +// Table 7-1 – NAL unit type codes and NAL unit type classes var ( NALUTypeSliceTrailN uint8 = 0 // 0x0 NALUTypeSliceTrailR uint8 = 1 // 0x01 NALUTypeSliceIDR uint8 = 19 // 0x13 NALUTypeSliceIDRNLP uint8 = 20 // 0x14 + NALUTypeSliceCRANUT uint8 = 21 // 0x15 NALUTypeVPS uint8 = 32 // 0x20 NALUTypeSPS uint8 = 33 // 0x21 NALUTypePPS uint8 = 34 // 0x22 + NALUTypeAUD uint8 = 35 // 0x23 NALUTypeSEI uint8 = 39 // 0x27 NALUTypeSEISuffix uint8 = 40 // 0x28 ) diff --git a/pkg/hls/fragment.go b/pkg/hls/fragment.go index ca9b613..8836dcb 100644 --- a/pkg/hls/fragment.go +++ b/pkg/hls/fragment.go @@ -10,8 +10,6 @@ package hls import ( "github.com/q191201771/naza/pkg/filesystemlayer" - - "github.com/q191201771/lal/pkg/mpegts" ) type Fragment struct { @@ -23,7 +21,6 @@ func (f *Fragment) OpenFile(filename string) (err error) { if err != nil { return } - err = f.WriteFile(mpegts.FixedFragmentHeader) return } diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index 1c7d599..90957c5 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -33,10 +33,6 @@ import ( var ErrHLS = errors.New("lal.hls: fxxk") -var audNal = []byte{ - 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, -} - const ( // TODO chef 这些在配置项中提供 negMaxfraglen uint64 = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位(毫秒*90) diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index d1ba3e8..7910661 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -24,6 +24,8 @@ import ( // 后续从架构上考虑,packet hls,mpegts,logic的分工 type MuxerObserver interface { + OnPATPMT(b []byte) + // @param rawFrame TS流,回调结束后,内部不再使用该内存块 // @param boundary 新的TS流接收者,应该从该标志为true时开始发送数据 // @@ -31,7 +33,6 @@ type MuxerObserver interface { } type MuxerConfig struct { - Enable bool `json:"enable"` // 如果false,说明hls功能没开,也即不写文件,但是MuxerObserver依然会回调 OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录,注意,末尾需以'/'结束 FragmentDurationMS int `json:"fragment_duration_ms"` FragmentNum int `json:"fragment_num"` @@ -52,6 +53,7 @@ const ( CleanupModeASAP = 2 ) +// 输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流 type Muxer struct { UniqueKey string @@ -63,6 +65,7 @@ type Muxer struct { recordPlayListFilenameBak string // const after init config *MuxerConfig + enable bool observer MuxerObserver fragment Fragment @@ -77,6 +80,7 @@ type Muxer struct { recordMaxFragDuration float64 streamer *Streamer + patpmt []byte } // 记录fragment的一些信息,注意,写m3u8文件时可能还需要用到历史fragment的信息 @@ -87,8 +91,9 @@ type fragmentInfo struct { filename string } +// @param enable 如果false,说明hls功能没开,也即不写文件,但是MuxerObserver依然会回调 // @param observer 可以为nil,如果不为nil,TS流将回调给上层 -func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *Muxer { +func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer MuxerObserver) *Muxer { uk := base.GenUKHLSMuxer() op := getMuxerOutPath(config.OutPath, streamName) playlistFilename := getM3U8Filename(op, streamName) @@ -104,6 +109,7 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M playlistFilenameBak: playlistFilenameBak, recordPlayListFilename: recordPlaylistFilename, recordPlayListFilenameBak: recordPlaylistFilenameBak, + enable: enable, config: config, observer: observer, frags: frags, @@ -133,6 +139,11 @@ func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) { m.streamer.FeedRTMPMessage(msg) } +func (m *Muxer) OnPATPMT(b []byte) { + m.patpmt = b + m.observer.OnPATPMT(b) +} + func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { var boundary bool var packets []byte @@ -173,7 +184,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) { } mpegts.PackTSPacket(frame, func(packet []byte) { - if m.config.Enable { + if m.enable { if err := m.fragment.WriteFile(packet); err != nil { nazalog.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err) return @@ -204,7 +215,13 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error { if m.opened { f := m.getCurrFrag() - // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment + // 以下情况,强制开启新的分片: + // 1. 当前时间戳 - 当前分片的初始时间戳 > 配置中单个ts分片时长的10倍 + // 原因可能是: + // 1. 当前包的时间戳发生了大的跳跃 + // 2. 一直没有I帧导致没有合适的时间重新切片,堆积的包达到阈值 + // 2. 往回跳跃超过了阈值 + // maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10) if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) { nazalog.Warnf("[%s] force fragment split. fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts) @@ -240,6 +257,9 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error { } // 开启新的fragment + // 此时的情况是,上层认为是合适的开启分片的时机(比如是I帧),并且 + // 1. 当前是第一个分片 + // 2. 当前不是第一个分片,但是上一个分片已经达到配置时长 if boundary { if err := m.closeFragment(false); err != nil { return err @@ -263,10 +283,13 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error { filename := getTSFilename(m.streamName, id, int(time.Now().Unix())) filenameWithPath := getTSFilenameWithPath(m.outPath, filename) - if m.config.Enable { + if m.enable { if err := m.fragment.OpenFile(filenameWithPath); err != nil { return err } + if err := m.fragment.WriteFile(m.patpmt); err != nil { + return err + } } m.opened = true @@ -290,7 +313,7 @@ func (m *Muxer) closeFragment(isLast bool) error { return nil } - if m.config.Enable { + if m.enable { if err := m.fragment.CloseFile(); err != nil { return err } @@ -325,7 +348,7 @@ func (m *Muxer) closeFragment(isLast bool) error { } func (m *Muxer) writeRecordPlaylist(isLast bool) { - if !m.config.Enable { + if !m.enable { return } @@ -380,7 +403,7 @@ func (m *Muxer) writeRecordPlaylist(isLast bool) { } func (m *Muxer) writePlaylist(isLast bool) { - if !m.config.Enable { + if !m.enable { return } @@ -421,7 +444,7 @@ func (m *Muxer) writePlaylist(isLast bool) { } func (m *Muxer) ensureDir() { - if !m.config.Enable { + if !m.enable { return } //err := fslCtx.RemoveAll(m.outPath) diff --git a/pkg/hls/queue.go b/pkg/hls/queue.go new file mode 100644 index 0000000..5bbb1ac --- /dev/null +++ b/pkg/hls/queue.go @@ -0,0 +1,91 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/mpegts" +) + +// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 +// 一旦判断结束,该队列变成直进直出,不再有实际缓存 +type Queue struct { + maxMsgSize int + data []base.RTMPMsg + observer IQueueObserver + + audioCodecID int + videoCodecID int + done bool +} + +type IQueueObserver interface { + // 该回调一定发生在数据回调之前 + // TODO(chef) 这里可以考虑换成只通知drain,由上层完成FragmentHeader的组装逻辑 + OnPATPMT(b []byte) + + OnPop(msg base.RTMPMsg) +} + +// @param maxMsgSize 最大缓存多少个包 +func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { + return &Queue{ + maxMsgSize: maxMsgSize, + data: make([]base.RTMPMsg, maxMsgSize)[0:0], + observer: observer, + audioCodecID: -1, + videoCodecID: -1, + done: false, + } +} + +// @param msg 函数调用结束后,内部不持有该内存块 +func (q *Queue) Push(msg base.RTMPMsg) { + if q.done { + q.observer.OnPop(msg) + return + } + + q.data = append(q.data, msg.Clone()) + + switch msg.Header.MsgTypeID { + case base.RTMPTypeIDAudio: + q.audioCodecID = int(msg.Payload[0] >> 4) + case base.RTMPTypeIDVideo: + q.videoCodecID = int(msg.Payload[0] & 0xF) + } + + if q.videoCodecID != -1 && q.audioCodecID != -1 { + q.drain() + return + } + + if len(q.data) >= q.maxMsgSize { + q.drain() + return + } +} + +func (q *Queue) drain() { + switch q.videoCodecID { + case int(base.RTMPCodecIDAVC): + q.observer.OnPATPMT(mpegts.FixedFragmentHeader) + case int(base.RTMPCodecIDHEVC): + q.observer.OnPATPMT(mpegts.FixedFragmentHeaderHEVC) + default: + // TODO(chef) 正确处理只有音频或只有视频的情况 #56 + q.observer.OnPATPMT(mpegts.FixedFragmentHeader) + } + for i := range q.data { + q.observer.OnPop(q.data[i]) + } + q.data = nil + + q.done = true +} diff --git a/pkg/hls/queue_test.go b/pkg/hls/queue_test.go new file mode 100644 index 0000000..26b3025 --- /dev/null +++ b/pkg/hls/queue_test.go @@ -0,0 +1,58 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "testing" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/naza/pkg/assert" +) + +var ( + fh []byte + poped []base.RTMPMsg +) + +type qo struct { +} + +func (q *qo) OnPATPMT(b []byte) { + fh = b +} + +func (q *qo) OnPop(msg base.RTMPMsg) { + poped = append(poped, msg) +} + +func TestQueue(t *testing.T) { + goldenRTMPMsg := []base.RTMPMsg{ + { + Header: base.RTMPHeader{ + MsgTypeID: base.RTMPTypeIDAudio, + }, + Payload: []byte{0xAF}, + }, + { + Header: base.RTMPHeader{ + MsgTypeID: base.RTMPTypeIDVideo, + }, + Payload: []byte{0x17}, + }, + } + + q := &qo{} + queue := NewQueue(8, q) + for i := range goldenRTMPMsg { + queue.Push(goldenRTMPMsg[i]) + } + assert.Equal(t, mpegts.FixedFragmentHeader, fh) + assert.Equal(t, goldenRTMPMsg, poped) +} diff --git a/pkg/hls/server.go b/pkg/hls/server_handler.go similarity index 71% rename from pkg/hls/server.go rename to pkg/hls/server_handler.go index 0d1679c..915d096 100644 --- a/pkg/hls/server.go +++ b/pkg/hls/server_handler.go @@ -9,7 +9,6 @@ package hls import ( - "net" "net/http" "github.com/q191201771/lal/pkg/base" @@ -17,40 +16,40 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -type Server struct { - addr string +type ServerHandler struct { outPath string - ln net.Listener - httpSrv *http.Server + //addr string + //ln net.Listener + //httpSrv *http.Server } -func NewServer(addr string, outPath string) *Server { - return &Server{ - addr: addr, +func NewServerHandler(outPath string) *ServerHandler { + return &ServerHandler{ outPath: outPath, } } -func (s *Server) Listen() (err error) { - if s.ln, err = net.Listen("tcp", s.addr); err != nil { - return - } - s.httpSrv = &http.Server{Addr: s.addr, Handler: s} - nazalog.Infof("start hls server listen. addr=%s", s.addr) - return -} - -func (s *Server) RunLoop() error { - return s.httpSrv.Serve(s.ln) -} - -func (s *Server) Dispose() { - if err := s.httpSrv.Close(); err != nil { - nazalog.Error(err) - } -} +// +//func (s *Server) Listen() (err error) { +// if s.ln, err = net.Listen("tcp", s.addr); err != nil { +// return +// } +// s.httpSrv = &http.Server{Addr: s.addr, Handler: s} +// nazalog.Infof("start hls server listen. addr=%s", s.addr) +// return +//} +// +//func (s *Server) RunLoop() error { +// return s.httpSrv.Serve(s.ln) +//} +// +//func (s *Server) Dispose() { +// if err := s.httpSrv.Close(); err != nil { +// nazalog.Error(err) +// } +//} -func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { //nazalog.Debugf("%+v", req) // TODO chef: diff --git a/pkg/hls/streamer.go b/pkg/hls/streamer.go index 3dcb002..8fdb8be 100644 --- a/pkg/hls/streamer.go +++ b/pkg/hls/streamer.go @@ -12,12 +12,16 @@ import ( "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/nazalog" ) type StreamerObserver interface { + // @param b const只读内存块,上层可以持有,但是不允许修改 + OnPATPMT(b []byte) + // @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 // // @param frame: 各字段含义见mpegts.Frame结构体定义 @@ -27,12 +31,14 @@ type StreamerObserver interface { OnFrame(streamer *Streamer, frame *mpegts.Frame) } +// 输入rtmp流,回调转封装成AnnexB格式的流 type Streamer struct { UniqueKey string observer StreamerObserver + calcFragmentHeaderQueue *Queue videoOut []byte // AnnexB TODO chef: 优化这块buff - spspps []byte // AnnexB + spspps []byte // AnnexB 也可能是vps+sps+pps adts aac.ADTS audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS @@ -44,17 +50,27 @@ func NewStreamer(observer StreamerObserver) *Streamer { uk := base.GenUKStreamer() videoOut := make([]byte, 1024*1024) videoOut = videoOut[0:0] - return &Streamer{ + streamer := &Streamer{ UniqueKey: uk, observer: observer, videoOut: videoOut, } + streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer) + return streamer } // @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 // // TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 func (s *Streamer) FeedRTMPMessage(msg base.RTMPMsg) { + s.calcFragmentHeaderQueue.Push(msg) +} + +func (s *Streamer) OnPATPMT(b []byte) { + s.observer.OnPATPMT(b) +} + +func (s *Streamer) OnPop(msg base.RTMPMsg) { switch msg.Header.MsgTypeID { case base.RTMPTypeIDAudio: s.feedAudio(msg) @@ -80,21 +96,25 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { nazalog.Errorf("[%s] invalid video message length. len=%d", s.UniqueKey, len(msg.Payload)) return } - if msg.Payload[0]&0xF != base.RTMPCodecIDAVC { + codecID := msg.Payload[0] & 0xF + if codecID != base.RTMPCodecIDAVC && codecID != base.RTMPCodecIDHEVC { return } - ftype := msg.Payload[0] & 0xF0 >> 4 - htype := msg.Payload[1] - // 将数据转换成AnnexB // 如果是sps pps,缓存住,然后直接返回 - if ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeSeqHeader { - if err := s.cacheSPSPPS(msg); err != nil { + var err error + if msg.IsAVCKeySeqHeader() { + if s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload); err != nil { nazalog.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err) } return + } else if msg.IsHEVCKeySeqHeader() { + if s.spspps, err = hevc.VPSSPSPPSSeqHeader2AnnexB(msg.Payload); err != nil { + nazalog.Errorf("[%s] cache vpsspspps failed. err=%+v", s.UniqueKey, err) + } + return } cts := bele.BEUint24(msg.Payload[2:]) @@ -117,46 +137,71 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { return } - nalType := avc.ParseNALUType(msg.Payload[i]) + var nalType uint8 + switch codecID { + case base.RTMPCodecIDAVC: + nalType = avc.ParseNALUType(msg.Payload[i]) + case base.RTMPCodecIDHEVC: + nalType = hevc.ParseNALUType(msg.Payload[i]) + } - //nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts) + //nazalog.Debugf("[%s] naltype=%d, len=%d(%d), cts=%d, key=%t.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts, msg.IsVideoKeyNALU()) - // sps pps前面已经缓存过了,这里就不用处理了 - // aud有自己的生产逻辑,原流中的aud直接过滤掉 - if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD { + // 过滤掉原流中的sps pps aud + // sps pps前面已经缓存过了,后面有自己的写入逻辑 + // aud有自己的写入逻辑 + if (codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD)) || + (codecID == base.RTMPCodecIDHEVC && (nalType == hevc.NALUTypeVPS || nalType == hevc.NALUTypeSPS || nalType == hevc.NALUTypePPS || nalType == hevc.NALUTypeAUD)) { i += nalBytes continue } + // tag中的首个nalu前面写入aud if !audSent { - switch nalType { - case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI: - // 在前面写入aud - out = append(out, audNal...) - audSent = true - //case avc.NALUTypeAUD: - // // 上面aud已经continue跳过了,应该进不到这个分支,可以考虑删除这个分支代码 - // audSent = true + // 注意,因为前面已经过滤了sps pps aud的信息,所以这里可以认为都是需要用aud分隔的,不需要单独判断了 + //if codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSEI || nalType == avc.NALUTypeIDRSlice || nalType == avc.NALUTypeSlice) { + switch codecID { + case base.RTMPCodecIDAVC: + out = append(out, avc.AUDNALU...) + case base.RTMPCodecIDHEVC: + out = append(out, hevc.AUDNALU...) } + audSent = true } - switch nalType { - case avc.NALUTypeSlice: - spsppsSent = false - case avc.NALUTypeIDRSlice: - // 如果是首个关键帧,在前面写入sps pps - if !spsppsSent { - var err error - out, err = s.appendSPSPPS(out) - if err != nil { - nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) - return + // 关键帧前追加sps pps + if codecID == base.RTMPCodecIDAVC { + // h264的逻辑,一个tag中,多个连续的关键帧只追加一个,不连续则每个关键帧前都追加。为什么要这样处理 + switch nalType { + case avc.NALUTypeIDRSlice: + if !spsppsSent { + if out, err = s.appendSPSPPS(out); err != nil { + nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) + return + } } + spsppsSent = true + case avc.NALUTypeSlice: + // 这里只有P帧,没有SEI。为什么要这样处理 + spsppsSent = false + } + } else { + switch nalType { + case hevc.NALUTypeSliceIDR, hevc.NALUTypeSliceIDRNLP, hevc.NALUTypeSliceCRANUT: + if !spsppsSent { + if out, err = s.appendSPSPPS(out); err != nil { + nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey) + return + } + } + spsppsSent = true + default: + // 这里简化了,只要不是关键帧,就刷新标志 + spsppsSent = false } - spsppsSent = true - } + // 如果写入了aud或spspps,则用start code3,否则start code4。为什么要这样处理 // 这里不知为什么要区分写入两种类型的start code if len(out) == 0 { out = append(out, avc.NALUStartCode4...) @@ -169,7 +214,6 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { i += nalBytes } - key := ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeNALU dts := uint64(msg.Header.TimestampAbs) * 90 if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByVideo < dts { @@ -180,7 +224,7 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) { frame.CC = s.videoCC frame.DTS = dts frame.PTS = frame.DTS + uint64(cts)*90 - frame.Key = key + frame.Key = msg.IsVideoKeyNALU() frame.Raw = out frame.Pid = mpegts.PidVideo frame.Sid = mpegts.StreamIDVideo @@ -254,12 +298,6 @@ func (s *Streamer) cacheAACSeqHeader(msg base.RTMPMsg) error { return s.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:]) } -func (s *Streamer) cacheSPSPPS(msg base.RTMPMsg) error { - var err error - s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload) - return err -} - func (s *Streamer) appendSPSPPS(out []byte) ([]byte, error) { if s.spspps == nil { return out, ErrHLS diff --git a/pkg/hls/var.go b/pkg/hls/var.go new file mode 100644 index 0000000..f8f115f --- /dev/null +++ b/pkg/hls/var.go @@ -0,0 +1,13 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +var ( + calcFragmentHeaderQueueSize = 16 +) diff --git a/pkg/httpflv/server.go b/pkg/httpflv/server.go deleted file mode 100644 index caa114b..0000000 --- a/pkg/httpflv/server.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2019, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package httpflv - -import ( - "crypto/tls" - "net" - "sync" - - "github.com/q191201771/naza/pkg/nazalog" -) - -type ServerObserver interface { - // 通知上层有新的拉流者 - // 返回值: true则允许拉流,false则关闭连接 - OnNewHTTPFLVSubSession(session *SubSession) bool - - OnDelHTTPFLVSubSession(session *SubSession) -} - -type ServerConfig struct { - Enable bool `json:"enable"` - SubListenAddr string `json:"sub_listen_addr"` - EnableHTTPS bool `json:"enable_https"` - HTTPSAddr string `json:"https_addr"` - HTTPSCertFile string `json:"https_cert_file"` - HTTPSKeyFile string `json:"https_key_file"` -} - -type Server struct { - observer ServerObserver - config ServerConfig - ln net.Listener - httpsLn net.Listener -} - -// TODO chef: 监听太难看了,考虑直接传入Listener对象,或直接路由进来,使得不同server可以共用端口 - -func NewServer(observer ServerObserver, config ServerConfig) *Server { - return &Server{ - observer: observer, - config: config, - } -} - -func (server *Server) Listen() (err error) { - if server.config.Enable { - if server.ln, err = net.Listen("tcp", server.config.SubListenAddr); err != nil { - return - } - nazalog.Infof("start httpflv server listen. addr=%s", server.config.SubListenAddr) - } - - if server.config.EnableHTTPS { - var cert tls.Certificate - cert, err = tls.LoadX509KeyPair(server.config.HTTPSCertFile, server.config.HTTPSKeyFile) - if err != nil { - return err - } - tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} - if server.httpsLn, err = tls.Listen("tcp", server.config.HTTPSAddr, tlsConfig); err != nil { - return - } - nazalog.Infof("start httpsflv server listen. addr=%s", server.config.HTTPSAddr) - } - - return -} - -func (server *Server) RunLoop() error { - var wg sync.WaitGroup - - // TODO chef: 临时这么搞,错误值丢失了,重构一下 - - if server.ln != nil { - wg.Add(1) - go func() { - for { - conn, err := server.ln.Accept() - if err != nil { - break - } - go server.handleConnect(conn, "http") - } - wg.Done() - }() - } - - if server.httpsLn != nil { - wg.Add(1) - go func() { - for { - conn, err := server.httpsLn.Accept() - if err != nil { - break - } - go server.handleConnect(conn, "https") - } - wg.Done() - }() - } - - wg.Wait() - return nil -} - -func (server *Server) Dispose() { - if server.ln != nil { - if err := server.ln.Close(); err != nil { - nazalog.Error(err) - } - } - - if server.httpsLn != nil { - if err := server.httpsLn.Close(); err != nil { - nazalog.Error(err) - } - } -} - -func (server *Server) handleConnect(conn net.Conn, scheme string) { - nazalog.Infof("accept a httpflv connection. remoteAddr=%s", conn.RemoteAddr().String()) - session := NewSubSession(conn, scheme) - if err := session.ReadRequest(); err != nil { - nazalog.Errorf("[%s] read httpflv SubSession request error. err=%v", session.uniqueKey, err) - return - } - nazalog.Debugf("[%s] < read http request. url=%s", session.uniqueKey, session.URL()) - - if !server.observer.OnNewHTTPFLVSubSession(session) { - session.Dispose() - } - - err := session.RunLoop() - nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.uniqueKey, err) - server.observer.OnDelHTTPFLVSubSession(session) -} diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 432ff64..6c123e9 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -9,15 +9,10 @@ package httpflv import ( - "fmt" "net" - "strings" - "time" "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/naza/pkg/nazahttp" - "github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/nazalog" @@ -26,171 +21,48 @@ import ( var flvHTTPResponseHeader []byte type SubSession struct { - uniqueKey string - IsFresh bool - - scheme string - - pathWithRawQuery string - headers map[string]string - urlCtx base.URLContext - - conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession - isWebSocket bool + *base.HTTPSubSession // 直接使用它提供的函数 + IsFresh bool } -func NewSubSession(conn net.Conn, scheme string) *SubSession { +func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession { uk := base.GenUKFLVSubSession() s := &SubSession{ - uniqueKey: uk, - scheme: scheme, - IsFresh: true, - conn: connection.New(conn, func(option *connection.Option) { - option.ReadBufSize = readBufSize - option.WriteChanSize = wChanSize - option.WriteTimeoutMS = subSessionWriteTimeoutMS + base.NewHTTPSubSession(base.HTTPSubSessionOption{ + Conn: conn, + ConnModOption: func(option *connection.Option) { + option.WriteChanSize = SubSessionWriteChanSize + option.WriteTimeoutMS = SubSessionWriteTimeoutMS + }, + UK: uk, + Protocol: base.ProtocolHTTPFLV, + URLCtx: urlCtx, + IsWebSocket: isWebSocket, + WebSocketKey: websocketKey, }), - stat: base.StatSession{ - Protocol: base.ProtocolHTTPFLV, - SessionID: uk, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), - RemoteAddr: conn.RemoteAddr().String(), - }, + true, } nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s } -// TODO chef: read request timeout -func (session *SubSession) ReadRequest() (err error) { - defer func() { - if err != nil { - session.Dispose() - } - }() - - var requestLine string - if requestLine, session.headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil { - return - } - if _, session.pathWithRawQuery, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil { - return - } - - rawURL := fmt.Sprintf("%s://%s%s", session.scheme, session.headers["Host"], session.pathWithRawQuery) - _ = rawURL - - session.urlCtx, err = base.ParseHTTPFLVURL(rawURL, session.scheme == "https") - if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { - session.isWebSocket = true - //回复升级为websocket - session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) - } - return -} - -func (session *SubSession) RunLoop() error { - buf := make([]byte, 128) - _, err := session.conn.Read(buf) - return err -} - func (session *SubSession) WriteHTTPResponseHeader() { - nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - if session.isWebSocket { - - } else { - session.WriteRawPacket(flvHTTPResponseHeader) - } + nazalog.Debugf("[%s] > W http response header.", session.UniqueKey()) + session.HTTPSubSession.WriteHTTPResponseHeader(flvHTTPResponseHeader) } func (session *SubSession) WriteFLVHeader() { - nazalog.Debugf("[%s] > W http flv header.", session.uniqueKey) - session.WriteRawPacket(FLVHeader) - + nazalog.Debugf("[%s] > W http flv header.", session.UniqueKey()) + session.Write(FLVHeader) } func (session *SubSession) WriteTag(tag *Tag) { - session.WriteRawPacket(tag.Raw) - -} - -func (session *SubSession) WriteRawPacket(pkt []byte) { - if session.isWebSocket { - wsHeader := base.WSHeader{ - Fin: true, - Rsv1: false, - Rsv2: false, - Rsv3: false, - Opcode: base.WSO_Binary, - PayloadLength: uint64(len(pkt)), - Masked: false, - } - session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) - } - session.writeRawPacket(pkt) -} -func (session *SubSession) writeRawPacket(pkt []byte) { - _, _ = session.conn.Write(pkt) + session.Write(tag.Raw) } func (session *SubSession) Dispose() error { - nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.uniqueKey) - return session.conn.Close() -} - -func (session *SubSession) URL() string { - return session.urlCtx.URL -} - -func (session *SubSession) AppName() string { - return session.urlCtx.PathWithoutLastItem -} - -func (session *SubSession) StreamName() string { - return strings.TrimSuffix(session.urlCtx.LastItemOfPath, ".flv") -} - -func (session *SubSession) RawQuery() string { - return session.urlCtx.RawQuery -} - -func (session *SubSession) UniqueKey() string { - return session.uniqueKey -} - -func (session *SubSession) GetStat() base.StatSession { - currStat := session.conn.GetStat() - session.stat.ReadBytesSum = currStat.ReadBytesSum - session.stat.WroteBytesSum = currStat.WroteBytesSum - return session.stat -} - -func (session *SubSession) UpdateStat(intervalSec uint32) { - currStat := session.conn.GetStat() - rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.WriteBitrate - session.prevConnStat = currStat -} - -func (session *SubSession) IsAlive() (readAlive, writeAlive bool) { - currStat := session.conn.GetStat() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - *session.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0) - *session.staleStat = currStat - return + nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.UniqueKey()) + return session.HTTPSubSession.Dispose() } func init() { diff --git a/pkg/httpflv/var.go b/pkg/httpflv/var.go index 4f450bf..29d6d80 100644 --- a/pkg/httpflv/var.go +++ b/pkg/httpflv/var.go @@ -8,8 +8,10 @@ package httpflv -var readBufSize = 256 //16384 // ClientPullSession 和 SubSession 读取数据时 -var wChanSize = 1024 // SubSession 发送数据时 channel 的大小 -var subSessionWriteTimeoutMS = 10000 +var ( + SubSessionWriteChanSize = 1024 // SubSession发送数据时channel的大小 + SubSessionWriteTimeoutMS = 10000 + FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} +) -var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00} +var readBufSize = 256 //16384 // ClientPullSession读取数据时 diff --git a/pkg/httpts/server.go b/pkg/httpts/server.go deleted file mode 100644 index 978dfd0..0000000 --- a/pkg/httpts/server.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package httpts - -import ( - "net" - - log "github.com/q191201771/naza/pkg/nazalog" -) - -type ServerObserver interface { - // 通知上层有新的拉流者 - // 返回值: true则允许拉流,false则关闭连接 - OnNewHTTPTSSubSession(session *SubSession) bool - - OnDelHTTPTSSubSession(session *SubSession) -} - -type Server struct { - observer ServerObserver - addr string - ln net.Listener -} - -func NewServer(observer ServerObserver, addr string) *Server { - return &Server{ - observer: observer, - addr: addr, - } -} - -func (server *Server) Listen() (err error) { - if server.ln, err = net.Listen("tcp", server.addr); err != nil { - return - } - log.Infof("start httpts server listen. addr=%s", server.addr) - return -} - -func (server *Server) RunLoop() error { - for { - conn, err := server.ln.Accept() - if err != nil { - return err - } - go server.handleConnect(conn) - } -} - -func (server *Server) Dispose() { - if server.ln == nil { - return - } - if err := server.ln.Close(); err != nil { - log.Error(err) - } -} - -func (server *Server) handleConnect(conn net.Conn) { - log.Infof("accept a httpts connection. remoteAddr=%s", conn.RemoteAddr().String()) - session := NewSubSession(conn, "http") - if err := session.ReadRequest(); err != nil { - log.Errorf("[%s] read httpts SubSession request error. err=%v", session.uniqueKey, err) - return - } - log.Debugf("[%s] < read http request. url=%s", session.uniqueKey, session.URL()) - - if !server.observer.OnNewHTTPTSSubSession(session) { - session.Dispose() - } - - err := session.RunLoop() - log.Debugf("[%s] httpts sub session loop done. err=%v", session.uniqueKey, err) - server.observer.OnDelHTTPTSSubSession(session) -} diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 05094d7..8b5a1bf 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -9,180 +9,49 @@ package httpts import ( - "fmt" "net" - "strings" - "time" - - "github.com/q191201771/lal/pkg/mpegts" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/connection" - "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/nazalog" ) var tsHTTPResponseHeader []byte type SubSession struct { - uniqueKey string - IsFresh bool - - scheme string - - pathWithRawQuery string - headers map[string]string - urlCtx base.URLContext - - conn connection.Connection - prevConnStat connection.Stat - staleStat *connection.Stat - stat base.StatSession - isWebSocket bool + *base.HTTPSubSession // 直接使用它提供的函数 + IsFresh bool } -func NewSubSession(conn net.Conn, scheme string) *SubSession { +func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession { uk := base.GenUKTSSubSession() s := &SubSession{ - uniqueKey: uk, - scheme: scheme, - IsFresh: true, - conn: connection.New(conn, func(option *connection.Option) { - option.ReadBufSize = readBufSize - option.WriteChanSize = wChanSize - option.WriteTimeoutMS = subSessionWriteTimeoutMS + base.NewHTTPSubSession(base.HTTPSubSessionOption{ + Conn: conn, + ConnModOption: func(option *connection.Option) { + option.WriteChanSize = SubSessionWriteChanSize + option.WriteTimeoutMS = SubSessionWriteTimeoutMS + }, + UK: uk, + Protocol: base.ProtocolHTTPTS, + URLCtx: urlCtx, + IsWebSocket: isWebSocket, + WebSocketKey: websocketKey, }), - stat: base.StatSession{ - Protocol: base.ProtocolHTTPTS, - SessionID: uk, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), - RemoteAddr: conn.RemoteAddr().String(), - }, + true, } nazalog.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s } -// TODO chef: read request timeout -func (session *SubSession) ReadRequest() (err error) { - defer func() { - if err != nil { - session.Dispose() - } - }() - - var requestLine string - if requestLine, session.headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil { - return - } - if _, session.pathWithRawQuery, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil { - return - } - - rawURL := fmt.Sprintf("%s://%s%s", session.scheme, session.headers["Host"], session.pathWithRawQuery) - _ = rawURL - - session.urlCtx, err = base.ParseHTTPTSURL(rawURL, session.scheme == "https") - if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { - session.isWebSocket = true - //回复升级为websocket - session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) - } - return -} - -func (session *SubSession) RunLoop() error { - buf := make([]byte, 128) - _, err := session.conn.Read(buf) - return err -} - func (session *SubSession) WriteHTTPResponseHeader() { - nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - if session.isWebSocket { - - } else { - session.WriteRawPacket(tsHTTPResponseHeader) - } -} - -func (session *SubSession) WriteFragmentHeader() { - nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(mpegts.FixedFragmentHeader) + nazalog.Debugf("[%s] > W http response header.", session.UniqueKey()) + session.HTTPSubSession.WriteHTTPResponseHeader(tsHTTPResponseHeader) } -func (session *SubSession) WriteRawPacket(pkt []byte) { - if session.isWebSocket { - wsHeader := base.WSHeader{ - Fin: true, - Rsv1: false, - Rsv2: false, - Rsv3: false, - Opcode: base.WSO_Binary, - PayloadLength: uint64(len(pkt)), - Masked: false, - } - session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) - } - session.writeRawPacket(pkt) -} -func (session *SubSession) writeRawPacket(pkt []byte) { - _, _ = session.conn.Write(pkt) -} func (session *SubSession) Dispose() error { - nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.uniqueKey) - return session.conn.Close() -} - -func (session *SubSession) UpdateStat(intervalSec uint32) { - currStat := session.conn.GetStat() - rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum - session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec)) - wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum - session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec)) - session.stat.Bitrate = session.stat.WriteBitrate - session.prevConnStat = currStat -} - -func (session *SubSession) GetStat() base.StatSession { - connStat := session.conn.GetStat() - session.stat.ReadBytesSum = connStat.ReadBytesSum - session.stat.WroteBytesSum = connStat.WroteBytesSum - return session.stat -} - -func (session *SubSession) IsAlive() (readAlive, writeAlive bool) { - currStat := session.conn.GetStat() - if session.staleStat == nil { - session.staleStat = new(connection.Stat) - *session.staleStat = currStat - return true, true - } - - readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0) - writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0) - *session.staleStat = currStat - return -} - -func (session *SubSession) URL() string { - return session.urlCtx.URL -} - -func (session *SubSession) AppName() string { - return session.urlCtx.PathWithoutLastItem -} - -func (session *SubSession) StreamName() string { - return strings.TrimSuffix(session.urlCtx.LastItemOfPath, ".ts") -} - -func (session *SubSession) RawQuery() string { - return session.urlCtx.RawQuery -} - -func (session *SubSession) UniqueKey() string { - return session.uniqueKey + nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.UniqueKey()) + return session.HTTPSubSession.Dispose() } func init() { diff --git a/pkg/httpts/var.go b/pkg/httpts/var.go index 4b8e640..a4cf988 100644 --- a/pkg/httpts/var.go +++ b/pkg/httpts/var.go @@ -8,6 +8,7 @@ package httpts -var readBufSize = 256 //16384 // SubSession读取数据时 -var wChanSize = 1024 // SubSession发送数据时channel的大小 -var subSessionWriteTimeoutMS = 10000 +var ( + SubSessionWriteChanSize = 1024 + SubSessionWriteTimeoutMS = 10000 +) diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 20eabd6..cff45a7 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -10,7 +10,6 @@ package innertest import ( "bytes" - "encoding/json" "fmt" "io" "io/ioutil" @@ -75,19 +74,16 @@ func InnerTestEntry(t *testing.T) { var err error - go logic.Entry(confFile) + logic.Init(confFile) + go logic.RunLoop() time.Sleep(200 * time.Millisecond) - var config logic.Config - rawContent, err := ioutil.ReadFile(confFile) - nazalog.Assert(nil, err) - err = json.Unmarshal(rawContent, &config) - nazalog.Assert(nil, err) + config := logic.GetConfig() _ = os.RemoveAll(config.HLSConfig.OutPath) pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr) - httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/innertest.flv", config.HTTPFLVConfig.SubListenAddr) + httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/innertest.flv", config.HTTPFLVConfig.HTTPListenAddr) rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr) err = fileReader.Open(rFLVFileName) diff --git a/pkg/logic/config.go b/pkg/logic/config.go index a0caa03..d569d8d 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -9,24 +9,23 @@ package logic import ( - "github.com/q191201771/lal/pkg/httpflv" - "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/naza/pkg/nazalog" ) -const ConfVersion = "v0.1.2" +const ConfVersion = "v0.2.0" type Config struct { - ConfVersion string `json:"conf_version"` - RTMPConfig RTMPConfig `json:"rtmp"` - HTTPFLVConfig HTTPFLVConfig `json:"httpflv"` - HLSConfig HLSConfig `json:"hls"` - HTTPTSConfig HTTPTSConfig `json:"httpts"` - RTSPConfig RTSPConfig `json:"rtsp"` - RecordConfig RecordConfig `json:"record"` - RelayPushConfig RelayPushConfig `json:"relay_push"` - RelayPullConfig RelayPullConfig `json:"relay_pull"` + ConfVersion string `json:"conf_version"` + RTMPConfig RTMPConfig `json:"rtmp"` + DefaultHTTPConfig DefaultHTTPConfig `json:"default_http"` + HTTPFLVConfig HTTPFLVConfig `json:"httpflv"` + HLSConfig HLSConfig `json:"hls"` + HTTPTSConfig HTTPTSConfig `json:"httpts"` + RTSPConfig RTSPConfig `json:"rtsp"` + RecordConfig RecordConfig `json:"record"` + RelayPushConfig RelayPushConfig `json:"relay_push"` + RelayPullConfig RelayPullConfig `json:"relay_pull"` HTTPAPIConfig HTTPAPIConfig `json:"http_api"` ServerID string `json:"server_id"` @@ -41,19 +40,24 @@ type RTMPConfig struct { GOPNum int `json:"gop_num"` } +type DefaultHTTPConfig struct { + CommonHTTPAddrConfig +} + type HTTPFLVConfig struct { - httpflv.ServerConfig + CommonHTTPServerConfig + GOPNum int `json:"gop_num"` } type HTTPTSConfig struct { - Enable bool `json:"enable"` - SubListenAddr string `json:"sub_listen_addr"` + CommonHTTPServerConfig } type HLSConfig struct { - SubListenAddr string `json:"sub_listen_addr"` - UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"` + CommonHTTPServerConfig + + UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"` hls.MuxerConfig } @@ -100,3 +104,17 @@ type PProfConfig struct { Enable bool `json:"enable"` Addr string `json:"addr"` } + +type CommonHTTPServerConfig struct { + CommonHTTPAddrConfig + + Enable bool `json:"enable"` + EnableHTTPS bool `json:"enable_https"` +} + +type CommonHTTPAddrConfig struct { + HTTPListenAddr string `json:"http_listen_addr"` + HTTPSListenAddr string `json:"https_listen_addr"` + HTTPSCertFile string `json:"https_cert_file"` + HTTPSKeyFile string `json:"https_key_file"` +} diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go index f0b4284..4b1332f 100644 --- a/pkg/logic/entry.go +++ b/pkg/logic/entry.go @@ -32,11 +32,16 @@ var ( sm *ServerManager ) -func Entry(confFile string) { +// TODO(chef) 临时供innertest使用,后面应该重构 +func GetConfig() *Config { + return config +} + +func Init(confFile string) { LoadConfAndInitLog(confFile) - if dir, err := os.Getwd(); err == nil { - nazalog.Infof("wd: %s", dir) - } + + dir, _ := os.Getwd() + nazalog.Infof("wd: %s", dir) nazalog.Infof("args: %s", strings.Join(os.Args, " ")) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) nazalog.Infof("version: %s", base.LALFullInfo) @@ -56,7 +61,9 @@ func Entry(confFile string) { nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MPEGTSOutPath, err) } } +} +func RunLoop() { sm = NewServerManager() if config.PProfConfig.Enable { @@ -164,6 +171,11 @@ func LoadConfAndInitLog(confFile string) *Config { } } + // 如果具体的HTTP应用没有设置HTTP监听相关的配置,则尝试使用全局配置 + mergeCommonHTTPAddrConfig(&config.HTTPFLVConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig) + mergeCommonHTTPAddrConfig(&config.HTTPTSConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig) + mergeCommonHTTPAddrConfig(&config.HLSConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig) + // 配置不存在时,设置默认值 if !j.Exist("hls.cleanup_mode") { const defaultMode = hls.CleanupModeInTheEnd @@ -197,3 +209,18 @@ func runWebPProf(addr string) { return } } + +func mergeCommonHTTPAddrConfig(dst, src *CommonHTTPAddrConfig) { + if dst.HTTPListenAddr == "" && src.HTTPListenAddr != "" { + dst.HTTPListenAddr = src.HTTPListenAddr + } + if dst.HTTPSListenAddr == "" && src.HTTPSListenAddr != "" { + dst.HTTPSListenAddr = src.HTTPSListenAddr + } + if dst.HTTPSCertFile == "" && src.HTTPSCertFile != "" { + dst.HTTPSCertFile = src.HTTPSCertFile + } + if dst.HTTPSKeyFile == "" && src.HTTPSKeyFile != "" { + dst.HTTPSKeyFile = src.HTTPSKeyFile + } +} diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 2d1af95..3a3f9d5 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -71,7 +71,7 @@ type Group struct { recordMPEGTS *mpegts.FileWriter // rtmp pub/pull使用 - gopCache *GOPCache + rtmpGopCache *GOPCache httpflvGopCache *GOPCache // rtsp pub使用 @@ -80,6 +80,9 @@ type Group struct { sps []byte pps []byte + // mpegts使用 + patpmt []byte + // tickCount uint32 } @@ -120,7 +123,7 @@ func NewGroup(appName string, streamName string, pullEnable bool, pullURL string httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), httptsSubSessionSet: make(map[*httpts.SubSession]struct{}), rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}), - gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum), + rtmpGopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum), httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum), pullProxy: &pullProxy{}, url2PushProxy: url2PushProxy, @@ -389,9 +392,8 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) { // 这里应该也要考虑触发hls muxer开启 // 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) { - nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey()) + nazalog.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey()) session.WriteHTTPResponseHeader() - session.WriteFragmentHeader() group.mutex.Lock() defer group.mutex.Unlock() @@ -478,6 +480,17 @@ func (group *Group) BroadcastRTMP(msg base.RTMPMsg) { group.broadcastRTMP(msg) } +// hls.Muxer +func (group *Group) OnPATPMT(b []byte) { + group.patpmt = b + + if group.recordMPEGTS != nil { + if err := group.recordMPEGTS.Write(b); err != nil { + nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err) + } + } +} + // hls.Muxer func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 @@ -485,11 +498,12 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) { for session := range group.httptsSubSessionSet { if session.IsFresh { if boundary { + session.Write(group.patpmt) + session.Write(rawFrame) session.IsFresh = false - session.WriteRawPacket(rawFrame) } } else { - session.WriteRawPacket(rawFrame) + session.Write(rawFrame) } } @@ -542,6 +556,7 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) { // rtsp.PubSession func (group *Group) OnAVPacket(pkt base.AVPacket) { + //nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp) msg, err := remux.AVPacket2RTMPMsg(pkt) if err != nil { nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err) @@ -733,20 +748,20 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { // ## 3.1. 如果是新的 sub session,发送已缓存的信息 if session.IsFresh { // TODO chef: 头信息和full gop也可以在SubSession刚加入时发送 - if group.gopCache.Metadata != nil { + if group.rtmpGopCache.Metadata != nil { //nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.Metadata) + _ = session.Write(group.rtmpGopCache.Metadata) } - if group.gopCache.VideoSeqHeader != nil { + if group.rtmpGopCache.VideoSeqHeader != nil { //nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.VideoSeqHeader) + _ = session.Write(group.rtmpGopCache.VideoSeqHeader) } - if group.gopCache.AACSeqHeader != nil { + if group.rtmpGopCache.AACSeqHeader != nil { //nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey) - _ = session.Write(group.gopCache.AACSeqHeader) + _ = session.Write(group.rtmpGopCache.AACSeqHeader) } - for i := 0; i < group.gopCache.GetGOPCount(); i++ { - for _, item := range group.gopCache.GetGOPDataAt(i) { + for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ { + for _, item := range group.rtmpGopCache.GetGOPDataAt(i) { _ = session.Write(item) } } @@ -766,17 +781,17 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { } if v.pushSession.IsFresh { - if group.gopCache.Metadata != nil { - _ = v.pushSession.Write(group.gopCache.Metadata) + if group.rtmpGopCache.Metadata != nil { + _ = v.pushSession.Write(group.rtmpGopCache.Metadata) } - if group.gopCache.VideoSeqHeader != nil { - _ = v.pushSession.Write(group.gopCache.VideoSeqHeader) + if group.rtmpGopCache.VideoSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader) } - if group.gopCache.AACSeqHeader != nil { - _ = v.pushSession.Write(group.gopCache.AACSeqHeader) + if group.rtmpGopCache.AACSeqHeader != nil { + _ = v.pushSession.Write(group.rtmpGopCache.AACSeqHeader) } - for i := 0; i < group.gopCache.GetGOPCount(); i++ { - for _, item := range group.gopCache.GetGOPDataAt(i) { + for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ { + for _, item := range group.rtmpGopCache.GetGOPDataAt(i) { _ = v.pushSession.Write(item) } } @@ -792,24 +807,24 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { for session := range group.httpflvSubSessionSet { if session.IsFresh { if group.httpflvGopCache.Metadata != nil { - session.WriteRawPacket(group.httpflvGopCache.Metadata) + session.Write(group.httpflvGopCache.Metadata) } if group.httpflvGopCache.VideoSeqHeader != nil { - session.WriteRawPacket(group.httpflvGopCache.VideoSeqHeader) + session.Write(group.httpflvGopCache.VideoSeqHeader) } if group.httpflvGopCache.AACSeqHeader != nil { - session.WriteRawPacket(group.httpflvGopCache.AACSeqHeader) + session.Write(group.httpflvGopCache.AACSeqHeader) } for i := 0; i < group.httpflvGopCache.GetGOPCount(); i++ { for _, item := range group.httpflvGopCache.GetGOPDataAt(i) { - session.WriteRawPacket(item) + session.Write(item) } } session.IsFresh = false } - session.WriteRawPacket(lrm2ft.Get()) + session.Write(lrm2ft.Get()) } // # 5. 录制flv文件 @@ -821,7 +836,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { // # 6. 缓存关键信息,以及gop if config.RTMPConfig.Enable { - group.gopCache.Feed(msg, lcd.Get) + group.rtmpGopCache.Feed(msg, lcd.Get) } if config.HTTPFLVConfig.Enable { group.httpflvGopCache.Feed(msg, lrm2ft.Get) @@ -1002,7 +1017,8 @@ func (group *Group) addIn() { if group.hlsMuxer != nil { nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer) } - group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig, group) + enable := config.HLSConfig.Enable || config.HLSConfig.EnableHTTPS + group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &config.HLSConfig.MuxerConfig, group) group.hlsMuxer.Start() } @@ -1085,8 +1101,12 @@ func (group *Group) delIn() { } } - group.gopCache.Clear() + group.rtmpGopCache.Clear() group.httpflvGopCache.Clear() + + // TODO(chef) 情况rtsp pub缓存的asc sps pps等数据 + + group.patpmt = nil } func (group *Group) disposeHLSMuxer() { diff --git a/pkg/logic/http_server_handler.go b/pkg/logic/http_server_handler.go new file mode 100644 index 0000000..d52af27 --- /dev/null +++ b/pkg/logic/http_server_handler.go @@ -0,0 +1,113 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "fmt" + "net/http" + "strings" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/httpts" + "github.com/q191201771/naza/pkg/nazalog" +) + +type HTTPServerHandlerObserver interface { + // 通知上层有新的拉流者 + // 返回值: true则允许拉流,false则关闭连接 + OnNewHTTPFLVSubSession(session *httpflv.SubSession) bool + + OnDelHTTPFLVSubSession(session *httpflv.SubSession) + + OnNewHTTPTSSubSession(session *httpts.SubSession) bool + OnDelHTTPTSSubSession(session *httpts.SubSession) +} + +type HTTPServerHandler struct { + observer HTTPServerHandlerObserver +} + +func NewHTTPServerHandler(observer HTTPServerHandlerObserver) *HTTPServerHandler { + return &HTTPServerHandler{ + observer: observer, + } +} + +func (h *HTTPServerHandler) ServeSubSession(writer http.ResponseWriter, req *http.Request) { + var ( + isHTTPS bool + scheme string + ) + // TODO(chef) 这里scheme直接使用http和https,没有考虑ws和wss,注意,后续的逻辑可能会依赖此处 + if req.TLS == nil { + isHTTPS = false + scheme = "http" + } else { + isHTTPS = true + scheme = "https" + } + rawURL := fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI) + + conn, bio, err := writer.(http.Hijacker).Hijack() + if err != nil { + nazalog.Errorf("hijack failed. err=%+v", err) + return + } + if bio.Reader.Buffered() != 0 || bio.Writer.Buffered() != 0 { + nazalog.Errorf("hijack but buffer not empty. rb=%d, wb=%d", bio.Reader.Buffered(), bio.Writer.Buffered()) + } + + var ( + isWebSocket bool + webSocketKey string + ) + if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" { + isWebSocket = true + webSocketKey = req.Header.Get("Sec-WebSocket-Key") + } + + if strings.HasSuffix(rawURL, ".flv") { + urlCtx, err := base.ParseHTTPURL(rawURL, isHTTPS, ".flv") + if err != nil { + nazalog.Errorf("parse http url failed. err=%+v", err) + _ = conn.Close() + return + } + + session := httpflv.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey) + nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.URL()) + if !h.observer.OnNewHTTPFLVSubSession(session) { + session.Dispose() + } + err = session.RunLoop() + nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.UniqueKey(), err) + h.observer.OnDelHTTPFLVSubSession(session) + return + } + + if strings.HasSuffix(rawURL, ".ts") { + urlCtx, err := base.ParseHTTPURL(rawURL, isHTTPS, ".ts") + if err != nil { + nazalog.Errorf("parse http url failed. err=%+v", err) + _ = conn.Close() + return + } + + session := httpts.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey) + nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.URL()) + if !h.observer.OnNewHTTPTSSubSession(session) { + session.Dispose() + } + err = session.RunLoop() + nazalog.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey(), err) + h.observer.OnDelHTTPTSSubSession(session) + return + } +} diff --git a/pkg/logic/iface_impl.go b/pkg/logic/iface_impl.go index b97ef9f..8a99459 100644 --- a/pkg/logic/iface_impl.go +++ b/pkg/logic/iface_impl.go @@ -17,6 +17,8 @@ import ( "github.com/q191201771/lal/pkg/rtsp" ) +// TODO(chef) add base.HTTPSubSession + // client.pub: rtmp, rtsp // client.sub: rtmp, rtsp, flv, ts // server.push: rtmp, rtsp @@ -132,8 +134,7 @@ var ( var _ rtmp.ServerObserver = &ServerManager{} var _ rtsp.ServerObserver = &ServerManager{} -var _ httpflv.ServerObserver = &ServerManager{} -var _ httpts.ServerObserver = &ServerManager{} +var _ HTTPServerHandlerObserver = &ServerManager{} var _ HTTPAPIServerObserver = &ServerManager{} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 434b54b..83662a9 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -13,24 +13,25 @@ import ( "sync" "time" + "github.com/q191201771/lal/pkg/hls" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/lal/pkg/rtsp" - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/naza/pkg/nazalog" ) type ServerManager struct { + httpServerManager *base.HTTPServerManager + httpServerHandler *HTTPServerHandler + hlsServerHandler *hls.ServerHandler + rtmpServer *rtmp.Server - httpflvServer *httpflv.Server - hlsServer *hls.Server - httptsServer *httpts.Server rtspServer *rtsp.Server httpAPIServer *HTTPAPIServer exitChan chan struct{} @@ -44,18 +45,18 @@ func NewServerManager() *ServerManager { groupMap: make(map[string]*Group), exitChan: make(chan struct{}), } + + if config.HTTPFLVConfig.Enable || config.HTTPFLVConfig.EnableHTTPS || + config.HTTPTSConfig.Enable || config.HTTPTSConfig.EnableHTTPS || + config.HLSConfig.Enable || config.HLSConfig.EnableHTTPS { + m.httpServerManager = base.NewHTTPServerManager() + m.httpServerHandler = NewHTTPServerHandler(m) + m.hlsServerHandler = hls.NewServerHandler(config.HLSConfig.OutPath) + } + if config.RTMPConfig.Enable { m.rtmpServer = rtmp.NewServer(m, config.RTMPConfig.Addr) } - if config.HTTPFLVConfig.Enable || config.HTTPFLVConfig.EnableHTTPS { - m.httpflvServer = httpflv.NewServer(m, config.HTTPFLVConfig.ServerConfig) - } - if config.HLSConfig.Enable { - m.hlsServer = hls.NewServer(config.HLSConfig.SubListenAddr, config.HLSConfig.OutPath) - } - if config.HTTPTSConfig.Enable { - m.httptsServer = httpts.NewServer(m, config.HTTPTSConfig.SubListenAddr) - } if config.RTSPConfig.Enable { m.rtspServer = rtsp.NewServer(config.RTSPConfig.Addr, m) } @@ -68,50 +69,72 @@ func NewServerManager() *ServerManager { func (sm *ServerManager) RunLoop() error { httpNotify.OnServerStart() - if sm.rtmpServer != nil { - if err := sm.rtmpServer.Listen(); err != nil { - return err + var addMux = func(config CommonHTTPServerConfig, pattern string, handler base.Handler, name string) error { + if config.Enable { + err := sm.httpServerManager.AddListen( + base.LocalAddrCtx{Addr: config.HTTPListenAddr}, + pattern, + handler, + ) + if err != nil { + nazalog.Infof("add http listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HTTPListenAddr, pattern, err) + return err + } + nazalog.Infof("add http listen for %s. addr=%s, pattern=%s", name, config.HTTPListenAddr, pattern) } - go func() { - if err := sm.rtmpServer.RunLoop(); err != nil { - nazalog.Error(err) + if config.EnableHTTPS { + err := sm.httpServerManager.AddListen( + base.LocalAddrCtx{IsHTTPS: true, Addr: config.HTTPSListenAddr, CertFile: config.HTTPSCertFile, KeyFile: config.HTTPSKeyFile}, + pattern, + handler, + ) + if err != nil { + nazalog.Infof("add https listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HTTPListenAddr, pattern, err) + return err } - }() + nazalog.Infof("add https listen for %s. addr=%s, pattern=%s", name, config.HTTPSListenAddr, pattern) + } + return nil } - if sm.httpflvServer != nil { - if err := sm.httpflvServer.Listen(); err != nil { - return err - } - go func() { - if err := sm.httpflvServer.RunLoop(); err != nil { - nazalog.Error(err) - } - }() + if err := addMux(config.HTTPFLVConfig.CommonHTTPServerConfig, HTTPFLVURLPath, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil { + return err + } + if err := addMux(config.HTTPTSConfig.CommonHTTPServerConfig, HTTPTSURLPath, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil { + return err + } + if err := addMux(config.HTTPTSConfig.CommonHTTPServerConfig, HLSURLPath, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil { + return err } - if sm.httptsServer != nil { - if err := sm.httptsServer.Listen(); err != nil { - return err + go func() { + if err := sm.httpServerManager.RunLoop(); err != nil { + nazalog.Error(err) } - go func() { - if err := sm.httptsServer.RunLoop(); err != nil { - nazalog.Error(err) - } - }() - } + }() - if sm.hlsServer != nil { - if err := sm.hlsServer.Listen(); err != nil { + if sm.rtmpServer != nil { + if err := sm.rtmpServer.Listen(); err != nil { return err } go func() { - if err := sm.hlsServer.RunLoop(); err != nil { + if err := sm.rtmpServer.RunLoop(); err != nil { nazalog.Error(err) } }() } + //if sm.hlsServer != nil { + // if err := sm.hlsServer.Listen(); err != nil { + // return err + // } + // go func() { + // if err := sm.hlsServer.RunLoop(); err != nil { + // nazalog.Error(err) + // } + // }() + //} + if sm.rtspServer != nil { if err := sm.rtspServer.Listen(); err != nil { return err @@ -177,18 +200,15 @@ func (sm *ServerManager) RunLoop() error { func (sm *ServerManager) Dispose() { nazalog.Debug("dispose server manager.") + + // TODO(chef) add httpServer + if sm.rtmpServer != nil { sm.rtmpServer.Dispose() } - if sm.httpflvServer != nil { - sm.httpflvServer.Dispose() - } - if sm.httptsServer != nil { - sm.httptsServer.Dispose() - } - if sm.hlsServer != nil { - sm.hlsServer.Dispose() - } + //if sm.hlsServer != nil { + // sm.hlsServer.Dispose() + //} sm.mutex.Lock() for _, group := range sm.groupMap { diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 7fa7f3b..c0bf77d 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -8,6 +8,12 @@ package logic +var ( + HTTPFLVURLPath = "/live/" + HTTPTSURLPath = "/live/" + HLSURLPath = "/hls/" +) + //var relayPushCheckIntervalMS = 1000 var relayPushTimeoutMS = 5000 var relayPushWriteAVTimeoutMS = 5000 diff --git a/pkg/mpegts/mpegts.go b/pkg/mpegts/mpegts.go index 2276f85..1deeebf 100644 --- a/pkg/mpegts/mpegts.go +++ b/pkg/mpegts/mpegts.go @@ -74,6 +74,68 @@ var FixedFragmentHeader = []byte{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, } +// 每个TS文件都以固定的PAT,PMT开始 +var FixedFragmentHeaderHEVC = []byte{ + /* TS */ + 0x47, 0x40, 0x00, 0x10, 0x00, + /* PSI */ + 0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PAT */ + 0x00, 0x01, 0xf0, 0x01, + /* CRC */ + 0x2e, 0x70, 0x19, 0x05, + + /* stuffing 167 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + + /* TS */ + 0x47, 0x50, 0x01, 0x10, 0x00, + /* PSI */ + 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PMT */ + 0xe1, 0x00, + 0xf0, 0x00, + //0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */ + 0x24, 0xe1, 0x00, 0xf0, 0x00, + 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ + /* CRC */ + //0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */ + 0xc7, 0x72, 0xb7, 0xcb, + /* stuffing 157 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, +} + // TS Packet Header const ( syncByte uint8 = 0x47 @@ -95,11 +157,13 @@ const ( const ( // ----------------------------------------------------------------------------- // - // 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax - // 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + // 0x0F AAC (ISO/IEC 13818-7 Audio with ADTS transport syntax) + // 0x1B AVC (video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video) + // 0x24 HEVC (HEVC video stream as defined in Rec. ITU-T H.265 | ISO/IEC 23008-2 MPEG-H Part 2) // ----------------------------------------------------------------------------- - streamTypeAAC uint8 = 0x0F - streamTypeAVC uint8 = 0x1B + streamTypeAAC uint8 = 0x0F + streamTypeAVC uint8 = 0x1B + streamTypeHEVC uint8 = 0x24 ) // PES diff --git a/pkg/mpegts/mpegts_test.go b/pkg/mpegts/mpegts_test.go index 94caee7..6bc11e9 100644 --- a/pkg/mpegts/mpegts_test.go +++ b/pkg/mpegts/mpegts_test.go @@ -21,7 +21,7 @@ func TestParseFixedTSPacket(t *testing.T) { pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:]) nazalog.Debugf("%+v", pat) - h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:]) + h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeaderHEVC[188:]) nazalog.Debugf("%+v", h) pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:]) nazalog.Debugf("%+v", pmt) diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go index 10b01ba..fc62baf 100644 --- a/pkg/mpegts/pack.go +++ b/pkg/mpegts/pack.go @@ -9,7 +9,7 @@ package mpegts type Frame struct { - PTS uint64 + PTS uint64 // =(毫秒 * 90) DTS uint64 CC uint8 // continuity_counter of TS Header @@ -36,6 +36,8 @@ type Frame struct { // type OnTSPacket func(packet []byte) +// AnnexB格式的流转换为mpegts packet +// // @param frame: 各字段含义见mpegts.Frame结构体定义 // frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用 // frame.Raw 函数调用结束后,内部不会持有该内存块 diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 6108ec2..3275d26 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -23,7 +23,8 @@ type PullSessionOption struct { // 如果为0,则没有超时时间 PullTimeoutMS int - ReadAVTimeoutMS int + ReadAVTimeoutMS int + HandshakeComplexFlag bool } var defaultPullSessionOption = PullSessionOption{ @@ -43,6 +44,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { core: NewClientSession(CSTPullSession, func(option *ClientSessionOption) { option.DoTimeoutMS = opt.PullTimeoutMS option.ReadAVTimeoutMS = opt.ReadAVTimeoutMS + option.HandshakeComplexFlag = opt.HandshakeComplexFlag }), } } diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index 60c64b1..7a787a1 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -21,7 +21,8 @@ type PushSessionOption struct { // 如果为0,则没有超时时间 PushTimeoutMS int - WriteAVTimeoutMS int + WriteAVTimeoutMS int + HandshakeComplexFlag bool } var defaultPushSessionOption = PushSessionOption{ @@ -41,6 +42,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { core: NewClientSession(CSTPushSession, func(option *ClientSessionOption) { option.DoTimeoutMS = opt.PushTimeoutMS option.WriteAVTimeoutMS = opt.WriteAVTimeoutMS + option.HandshakeComplexFlag = opt.HandshakeComplexFlag }), } } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 30319d9..224f5f3 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -38,7 +38,7 @@ type ClientSession struct { packer *MessagePacker chunkComposer *ChunkComposer urlCtx base.URLContext - hc HandshakeClientSimple + hc IHandshakeClient peerWinAckSize int conn connection.Connection @@ -63,15 +63,17 @@ const ( type ClientSessionOption struct { // 单位毫秒,如果为0,则没有超时 - DoTimeoutMS int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时 - ReadAVTimeoutMS int // 读取音视频数据的超时 - WriteAVTimeoutMS int // 发送音视频数据的超时 + DoTimeoutMS int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时 + ReadAVTimeoutMS int // 读取音视频数据的超时 + WriteAVTimeoutMS int // 发送音视频数据的超时 + HandshakeComplexFlag bool // 握手是否使用复杂模式 } var defaultClientSessOption = ClientSessionOption{ - DoTimeoutMS: 0, - ReadAVTimeoutMS: 0, - WriteAVTimeoutMS: 0, + DoTimeoutMS: 0, + ReadAVTimeoutMS: 0, + WriteAVTimeoutMS: 0, + HandshakeComplexFlag: false, } type ModClientSessionOption func(option *ClientSessionOption) @@ -91,6 +93,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) fn(&option) } + var hc IHandshakeClient + if option.HandshakeComplexFlag { + hc = &HandshakeClientComplex{} + } else { + hc = &HandshakeClientSimple{} + } + s := &ClientSession{ uniqueKey: uk, t: t, @@ -104,6 +113,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) StartTime: time.Now().Format("2006-01-02 15:04:05.999"), }, debugLogReadUserCtrlMsgMax: 5, + hc: hc, } nazalog.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s) return s diff --git a/pkg/rtprtcp/rtp_unpack_container.go b/pkg/rtprtcp/rtp_unpack_container.go index 96b8317..412500a 100644 --- a/pkg/rtprtcp/rtp_unpack_container.go +++ b/pkg/rtprtcp/rtp_unpack_container.go @@ -9,13 +9,13 @@ package rtprtcp type RTPPacketListItem struct { - packet RTPPacket - next *RTPPacketListItem + Packet RTPPacket + Next *RTPPacketListItem } type RTPPacketList struct { - head RTPPacketListItem // 哨兵,自身不存放rtp包,第一个rtp包存在在head.next中 - size int // 实际元素个数 + Head RTPPacketListItem // 哨兵,自身不存放rtp包,第一个rtp包存在在head.next中 + Size int // 实际元素个数 } type RTPUnpackContainer struct { @@ -62,14 +62,14 @@ func (r *RTPUnpackContainer) Feed(pkt RTPPacket) { } // 缓存达到最大值 - if r.list.size > r.maxSize { + if r.list.Size > r.maxSize { // 尝试合成一帧发生跳跃的帧 packed := r.tryUnpackOne() if !packed { // 合成失败了,丢弃一包过期数据 - r.list.head.next = r.list.head.next.next - r.list.size-- + r.list.Head.Next = r.list.Head.Next.Next + r.list.Size-- } else { // 合成成功了,再次尝试,尽可能多的合成顺序的帧 for { @@ -97,11 +97,11 @@ func (r *RTPUnpackContainer) isStale(seq uint16) bool { // 将rtp包按seq排序插入队列中 func (r *RTPUnpackContainer) insert(pkt RTPPacket) { - r.list.size++ + r.list.Size++ - p := &r.list.head - for ; p.next != nil; p = p.next { - res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq) + p := &r.list.Head + for ; p.Next != nil; p = p.Next { + res := CompareSeq(pkt.Header.Seq, p.Next.Packet.Header.Seq) switch res { case 0: return @@ -109,29 +109,29 @@ func (r *RTPUnpackContainer) insert(pkt RTPPacket) { // noop case -1: item := &RTPPacketListItem{ - packet: pkt, - next: p.next, + Packet: pkt, + Next: p.Next, } - p.next = item + p.Next = item return } } item := &RTPPacketListItem{ - packet: pkt, - next: p.next, + Packet: pkt, + Next: p.Next, } - p.next = item + p.Next = item } // 从队列头部,尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的 func (r *RTPUnpackContainer) tryUnpackOneSequential() bool { if r.unpackedFlag { - first := r.list.head.next + first := r.list.Head.Next if first == nil { return false } - if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 { + if SubSeq(first.Packet.Header.Seq, r.unpackedSeq) != 1 { return false } } diff --git a/pkg/rtprtcp/rtp_unpacker_aac.go b/pkg/rtprtcp/rtp_unpacker_aac.go index c0de3b2..8cf0d0b 100644 --- a/pkg/rtprtcp/rtp_unpacker_aac.go +++ b/pkg/rtprtcp/rtp_unpacker_aac.go @@ -62,11 +62,11 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag // Unit, and 0 on all other fragments. // - p := list.head.next // first + p := list.Head.Next // first if p == nil { return false, 0 } - b := p.packet.Raw[p.packet.Header.payloadOffset:] + b := p.Packet.Raw[p.Packet.Header.payloadOffset:] //nazalog.Debugf("%d, %d, %s", len(pkt.Raw), pkt.Header.timestamp, hex.Dump(b)) aus := parseAU(b) @@ -76,43 +76,43 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag // one complete access unit var outPkt base.AVPacket outPkt.PayloadType = unpacker.payloadType - outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) outPkt.Payload = b[aus[0].pos : aus[0].pos+aus[0].size] unpacker.onAVPacket(outPkt) - list.head.next = p.next - list.size-- - return true, p.packet.Header.Seq + list.Head.Next = p.Next + list.Size-- + return true, p.Packet.Header.Seq } // fragmented // 注意,这里我们参考size和rtp包头中的timestamp,不参考rtp包头中的mark位 totalSize := aus[0].size - timestamp := p.packet.Header.Timestamp + timestamp := p.Packet.Header.Timestamp var as [][]byte as = append(as, b[aus[0].pos:]) cacheSize := uint32(len(b[aus[0].pos:])) - seq := p.packet.Header.Seq - p = p.next + seq := p.Packet.Header.Seq + p = p.Next packetCount := 0 for { packetCount++ if p == nil { return false, 0 } - if SubSeq(p.packet.Header.Seq, seq) != 1 { + if SubSeq(p.Packet.Header.Seq, seq) != 1 { return false, 0 } - if p.packet.Header.Timestamp != timestamp { + if p.Packet.Header.Timestamp != timestamp { nazalog.Errorf("fragments of the same access shall have the same timestamp. first=%d, curr=%d", - timestamp, p.packet.Header.Timestamp) + timestamp, p.Packet.Header.Timestamp) return false, 0 } - b = p.packet.Raw[p.packet.Header.payloadOffset:] + b = p.Packet.Raw[p.Packet.Header.payloadOffset:] aus := parseAU(b) if len(aus) != 1 { nazalog.Errorf("shall be a single fragment. len(aus)=%d", len(aus)) @@ -125,22 +125,22 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag } cacheSize += uint32(len(b[aus[0].pos:])) - seq = p.packet.Header.Seq + seq = p.Packet.Header.Seq as = append(as, b[aus[0].pos:]) if cacheSize < totalSize { - p = p.next + p = p.Next } else if cacheSize == totalSize { var outPkt base.AVPacket outPkt.PayloadType = unpacker.payloadType - outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) for _, a := range as { outPkt.Payload = append(outPkt.Payload, a...) } unpacker.onAVPacket(outPkt) - list.head.next = p.next - list.size -= packetCount - return true, p.packet.Header.Seq + list.Head.Next = p.Next + list.Size -= packetCount + return true, p.Packet.Header.Seq } else { nazalog.Errorf("cache size bigger then total size. cacheSize=%d, totalSize=%d", cacheSize, totalSize) @@ -154,16 +154,16 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag for i := range aus { var outPkt base.AVPacket outPkt.PayloadType = unpacker.payloadType - outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) // TODO chef: 这里1024的含义 outPkt.Timestamp += uint32(i * (1024 * 1000) / unpacker.clockRate) outPkt.Payload = b[aus[i].pos : aus[i].pos+aus[i].size] unpacker.onAVPacket(outPkt) } - list.head.next = p.next - list.size-- - return true, p.packet.Header.Seq + list.Head.Next = p.Next + list.Size-- + return true, p.Packet.Header.Seq } type au struct { diff --git a/pkg/rtprtcp/rtp_unpacker_avc_hevc.go b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go index 2083822..4eb435b 100644 --- a/pkg/rtprtcp/rtp_unpacker_avc_hevc.go +++ b/pkg/rtprtcp/rtp_unpacker_avc_hevc.go @@ -40,33 +40,33 @@ func (unpacker *RTPUnpackerAVCHEVC) CalcPositionIfNeeded(pkt *RTPPacket) { } func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) { - first := list.head.next + first := list.Head.Next if first == nil { return false, 0 } - switch first.packet.positionType { + switch first.Packet.positionType { case PositionTypeSingle: var pkt base.AVPacket pkt.PayloadType = unpacker.payloadType - pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) - pkt.Payload = make([]byte, len(first.packet.Raw)-int(first.packet.Header.payloadOffset)+4) - bele.BEPutUint32(pkt.Payload, uint32(len(first.packet.Raw))-first.packet.Header.payloadOffset) - copy(pkt.Payload[4:], first.packet.Raw[first.packet.Header.payloadOffset:]) + pkt.Payload = make([]byte, len(first.Packet.Raw)-int(first.Packet.Header.payloadOffset)+4) + bele.BEPutUint32(pkt.Payload, uint32(len(first.Packet.Raw))-first.Packet.Header.payloadOffset) + copy(pkt.Payload[4:], first.Packet.Raw[first.Packet.Header.payloadOffset:]) - list.head.next = first.next - list.size-- + list.Head.Next = first.Next + list.Size-- unpacker.onAVPacket(pkt) - return true, first.packet.Header.Seq + return true, first.Packet.Header.Seq case PositionTypeSTAPA: var pkt base.AVPacket pkt.PayloadType = unpacker.payloadType - pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) // 跳过首字节,并且将多nalu前的2字节长度,替换成4字节长度 - buf := first.packet.Raw[first.packet.Header.payloadOffset+1:] + buf := first.Packet.Raw[first.Packet.Header.payloadOffset+1:] // 使用两次遍历,第一次遍历找出总大小,第二次逐个拷贝,目的是使得内存块一次就申请好,不用动态扩容造成额外性能开销 totalSize := 0 @@ -90,31 +90,31 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF i += 2 + naluSize } - list.head.next = first.next - list.size-- + list.Head.Next = first.Next + list.Size-- unpacker.onAVPacket(pkt) - return true, first.packet.Header.Seq + return true, first.Packet.Header.Seq case PositionTypeFUAStart: prev := first - p := first.next + p := first.Next for { if prev == nil || p == nil { return false, 0 } - if SubSeq(p.packet.Header.Seq, prev.packet.Header.Seq) != 1 { + if SubSeq(p.Packet.Header.Seq, prev.Packet.Header.Seq) != 1 { return false, 0 } - if p.packet.positionType == PositionTypeFUAMiddle { + if p.Packet.positionType == PositionTypeFUAMiddle { prev = p - p = p.next + p = p.Next continue - } else if p.packet.positionType == PositionTypeFUAEnd { + } else if p.Packet.positionType == PositionTypeFUAEnd { var pkt base.AVPacket pkt.PayloadType = unpacker.payloadType - pkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000) + pkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000) var naluTypeLen int var naluType []byte @@ -122,14 +122,14 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF naluTypeLen = 1 naluType = make([]byte, naluTypeLen) - fuIndicator := first.packet.Raw[first.packet.Header.payloadOffset] - fuHeader := first.packet.Raw[first.packet.Header.payloadOffset+1] + fuIndicator := first.Packet.Raw[first.Packet.Header.payloadOffset] + fuHeader := first.Packet.Raw[first.Packet.Header.payloadOffset+1] naluType[0] = (fuIndicator & 0xE0) | (fuHeader & 0x1F) } else { naluTypeLen = 2 naluType = make([]byte, naluTypeLen) - buf := first.packet.Raw[first.packet.Header.payloadOffset:] + buf := first.Packet.Raw[first.Packet.Header.payloadOffset:] fuType := buf[2] & 0x3f naluType[0] = (buf[0] & 0x81) | (fuType << 1) naluType[1] = buf[1] @@ -139,11 +139,11 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF totalSize := 0 pp := first for { - totalSize += len(pp.packet.Raw) - int(pp.packet.Header.payloadOffset) - (naluTypeLen + 1) + totalSize += len(pp.Packet.Raw) - int(pp.Packet.Header.payloadOffset) - (naluTypeLen + 1) if pp == p { break } - pp = pp.next + pp = pp.Next } pkt.Payload = make([]byte, totalSize+4+naluTypeLen) @@ -160,24 +160,24 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF packetCount := 0 pp = first for { - copy(pkt.Payload[index:], pp.packet.Raw[int(pp.packet.Header.payloadOffset)+(naluTypeLen+1):]) - index += len(pp.packet.Raw) - int(pp.packet.Header.payloadOffset) - (naluTypeLen + 1) + copy(pkt.Payload[index:], pp.Packet.Raw[int(pp.Packet.Header.payloadOffset)+(naluTypeLen+1):]) + index += len(pp.Packet.Raw) - int(pp.Packet.Header.payloadOffset) - (naluTypeLen + 1) packetCount++ if pp == p { break } - pp = pp.next + pp = pp.Next } - list.head.next = p.next - list.size -= packetCount + list.Head.Next = p.Next + list.Size -= packetCount unpacker.onAVPacket(pkt) - return true, p.packet.Header.Seq + return true, p.Packet.Header.Seq } else { // 不应该出现其他类型 - nazalog.Errorf("invalid position type. position=%d", p.packet.positionType) + nazalog.Errorf("invalid position type. position=%d", p.Packet.positionType) return false, 0 } } @@ -187,7 +187,7 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF case PositionTypeFUAEnd: // noop default: - nazalog.Errorf("invalid position. pos=%d", first.packet.positionType) + nazalog.Errorf("invalid position. pos=%d", first.Packet.positionType) } return false, 0