Merge pull request #9 from q191201771/master

同步最新
pull/78/head
joestarzxh 4 years ago committed by GitHub
commit e5df0b4802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,15 @@
#### v0.22.0 (2021-05-03)
- [feat] 录制新增支持flv和mpegts文件。 录制支持列表见: https://pengrl.com/lal/#/LALServer (#14)
- [feat] h265新增支持 hls拉流hls录制http-ts拉流mpegts录制。h265支持列表见 https://pengrl.com/lal/#/LALServer (#65)
- [feat] 拉流新增支持websocket-flvwebsocket-ts。拉流协议支持列表见 https://pengrl.com/lal/#/LALServer
- [feat] hls: 支持内存切片。 (#50)
- [fix] rtmp ClientSession握手c2的发送时机由收到s0s1s2改为收到s0s1就发送解决握手失败的case。 (#42)
- [fix] rtsp h265: 转rtmp时处理错误导致无法播放
- [fix] rtsp h265: ffmpeg向lalserver推送rtsp h265报错。 (#55)
- [test] travis ci: 自动化单元测试os增加osx, windows, arch增加arm64, ppc64le, s390x。 (#59)
- [feat] rtmp ClientSession支持配置使用简单握手复杂握手 (#68)
#### v0.21.0 (2021-04-11)
- [feat] package rtmp: 支持Aggregate Message

@ -9,7 +9,7 @@
[中文文档](https://pengrl.com/lal/#/)
LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]/WebSocket[s]-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client)

@ -31,7 +31,6 @@ func main() {
url, hlsOutPath, fragmentDurationMS, fragmentNum)
hlsMuxerConfig := hls.MuxerConfig{
Enable: true,
OutPath: hlsOutPath,
FragmentDurationMS: fragmentDurationMS,
FragmentNum: fragmentNum,
@ -43,7 +42,7 @@ func main() {
}
streamName := ctx.LastItemOfPath
hlsMuexer := hls.NewMuxer(streamName, &hlsMuxerConfig, nil)
hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil)
hlsMuexer.Start()
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {

@ -28,7 +28,8 @@ func main() {
defer nazalog.Sync()
confFile := parseFlag()
logic.Entry(confFile)
logic.Init(confFile)
logic.RunLoop()
}
func parseFlag() string {

@ -0,0 +1,86 @@
```
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", // 配置文件对应的文档说明链接,在程序中没实际用途
"conf_version": "0.2.0", // 配置文件版本号,业务方不应该手动修改,程序中会检查该版本号是否与代码中声明的一致
"rtmp": {
"enable": true, // 是否开启rtmp服务的监听
"addr": ":19350", // RTMP服务监听的端口客户端向lalserver推拉流都是这个地址
"gop_num": 2 // RTMP拉流的GOP缓存数量加速秒开
},
"default_http": { // http监听相关的默认配置如果hls, httpflv, httpts中没有单独配置以下配置项则使用default_http中的配置
// 注意hls, httpflv, httpts服务是否开启不由此处决定
"http_listen_addr": ":8080", // HTTP监听地址
"https_listen_addr": ":4433", // HTTPS监听地址
"https_cert_file": "./conf/cert.pem", // HTTPS的本地cert文件地址
"https_key_file": "./conf/key.pem" // HTTPS的本地key文件地址
},
"httpflv": {
"enable": true, // 是否开启HTTP-FLV服务的监听
"enable_https": true, // 是否开启HTTPS-FLV监听
"gop_num": 2
},
"hls": {
"enable": true, // 是否开启HLS服务的监听
"out_path": "/tmp/lal/hls/", // HLS文件保存根目录
"fragment_duration_ms": 3000, // 单个TS文件切片时长单位毫秒
"fragment_num": 6, // m3u8文件列表中ts文件的数量
"cleanup_mode": 1, // HLS文件清理模式
// 0 不删除m3u8+ts文件可用于录制等场景
// 1 在输入流结束后删除m3u8+ts文件
// 注意,确切的删除时间是推流结束后的<fragment_duration_ms> * <fragment_num> * 2的时间点
// 推迟一小段时间删除是为了避免输入流刚结束HLS的拉流端还没有拉取完
// 2 推流过程中持续删除过期的ts文件只保留最近的<fragment_num> * 2个左右的ts文件
"use_memory_as_disk_flag": false // 是否使用内存取代磁盘保存m3u8+ts文件
},
"httpts": {
"enable": true // 是否开启HTTP-TS服务的监听。注意这并不是HLS中的TS而是在一条HTTP长连接上持续性传输TS流
},
"rtsp": {
"enable": true, // 是否开启rtsp服务的监听目前只支持rtsp推流
"addr": ":5544" // rtsp推流地址
},
"record": {
"enable_flv": true, // 是否开启flv录制
"flv_out_path": "/tmp/lal/flv/", // flv录制目录
"enable_mpegts": true, // 是否开启mpegts录制。注意此处是长ts文件录制hls录制由上面的hls配置控制
"mpegts_out_path": "/tmp/lal/mpegts" // mpegts录制目录
},
"relay_push": {
"enable": false, // 是否开启中继转推功能,开启后,自身接收到的所有流都会转推出去
"addr_list":[ // 中继转推的对端地址支持填写多个地址做1对n的转推。格式举例 "127.0.0.1:19351"
]
},
"relay_pull": {
"enable": false, // 是否开启回源拉流功能,开启后,当自身接收到拉流请求,而流不存在时,会从其他服务器拉取这个流到本地
"addr": "" // 回源拉流的地址。格式举例 "127.0.0.1:19351"
},
"http_api": {
"enable": true, // 是否开启HTTP API接口
"addr": ":8083" // 监听地址
},
"server_id": "1", // 当前lalserver唯一ID。多个lalserver HTTP Notify同一个地址时可通过该ID区分
"http_notify": {
"enable": true, // 是否开启HTTP Notify事件回调
"update_interval_sec": 5, // update事件回调间隔单位毫秒
"on_server_start": "http://127.0.0.1:10101/on_server_start", // 各事件HTTP Notify事件回调地址
"on_update": "http://127.0.0.1:10101/on_update",
"on_pub_start": "http://127.0.0.1:10101/on_pub_start",
"on_pub_stop": "http://127.0.0.1:10101/on_pub_stop",
"on_sub_start": "http://127.0.0.1:10101/on_sub_start",
"on_sub_stop": "http://127.0.0.1:10101/on_sub_stop",
"on_rtmp_connect": "http://127.0.0.1:10101/on_rtmp_connect"
},
"pprof": {
"enable": true, // 是否开启Go pprof web服务的监听
"addr": ":8084" // Go pprof web地址
},
"log": {
"level": 1, // 日志级别0 trace, 1 debug, 2 info, 3 warn, 4 error, 5 fatal
"filename": "./logs/lalserver.log", // 日志输出文件
"is_to_stdout": true, // 是否打印至标志控制台输出
"is_rotate_daily": true, // 日志按天翻滚
"short_file_flag": true, // 日志末尾是否携带源码文件名以及行号的信息
"assert_behavior": 1 // 日志断言的行为1 只打印错误日志 2 打印并退出程序 3 打印并panic
}
}
```

@ -1,23 +1,25 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.2",
"conf_version": "v0.2.0",
"rtmp": {
"enable": true,
"addr": ":1935",
"gop_num": 2
},
"default_http": {
"http_listen_addr": ":8080",
"https_listen_addr": ":4433",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem"
},
"httpflv": {
"enable": true,
"sub_listen_addr": ":8080",
"enable_https": false,
"https_addr": ":4433",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem",
"gop_num": 2
},
"hls": {
"enable": true,
"sub_listen_addr": ":8081",
"enable_https": false,
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
@ -26,7 +28,7 @@
},
"httpts": {
"enable": true,
"sub_listen_addr": ":8082"
"enable_https":false
},
"rtsp": {
"enable": true,

@ -1,23 +1,25 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.2",
"conf_version": "v0.2.0",
"rtmp": {
"enable": true,
"addr": ":1935",
"gop_num": 2
},
"default_http": {
"http_listen_addr": ":8080",
"https_listen_addr": ":4433",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem"
},
"httpflv": {
"enable": true,
"sub_listen_addr": ":8080",
"enable_https": false,
"https_addr": ":4433",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem",
"gop_num": 2
},
"hls": {
"enable": true,
"sub_listen_addr": ":8081",
"enable_https": false,
"out_path": "/tmp/lal/hls/",
"fragment_duration_ms": 3000,
"fragment_num": 6,
@ -26,7 +28,7 @@
},
"httpts": {
"enable": true,
"sub_listen_addr": ":8082"
"enable_https":false
},
"rtsp": {
"enable": true,

@ -31,6 +31,9 @@ var ErrAVC = errors.New("lal.avc: fxxk")
var (
NALUStartCode3 = []byte{0x0, 0x0, 0x1}
NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1}
// aud nalu
AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x09, 0xf0}
)
var NALUTypeMapping = map[uint8]string{

@ -17,11 +17,24 @@ const (
AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC
)
// 目前供package rtsp使用。以后可能被多个package使用。
// 不排除不同package使用时字段含义也不同的情况出现。
// 不同场景使用时,字段含义可能不同。
// 使用AVPacket的地方应注明各字段的含义。
type AVPacket struct {
Timestamp uint32
PayloadType AVPacketPT
Payload []byte
}
func (a AVPacketPT) ReadableString() string {
switch a {
case AVPacketPTUnknown:
return "unknown"
case AVPacketPTAVC:
return "avc"
case AVPacketPTHEVC:
return "hevc"
case AVPacketPTAAC:
return "aac"
}
return ""
}

@ -0,0 +1,149 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"crypto/tls"
"errors"
"net"
"net/http"
"reflect"
"github.com/q191201771/naza/pkg/nazaerrors"
)
// TODO(chef)
// - 考虑移入naza中
// - 考虑增加一个pattern全部未命中的mux回调
var (
ErrAddrEmpty = errors.New("lal.base: http server addr empty")
ErrMultiRegistForPattern = errors.New("lal.base: http server multiple registrations for pattern")
)
const (
NetworkTCP = "tcp"
)
type LocalAddrCtx struct {
IsHTTPS bool
Addr string
CertFile string
KeyFile string
Network string // 默认为NetworkTCP
}
type HTTPServerManager struct {
addr2ServerCtx map[string]*ServerCtx
}
type ServerCtx struct {
addrCtx LocalAddrCtx
listener net.Listener
httpServer http.Server
mux *http.ServeMux
pattern2Handler map[string]Handler
}
func NewHTTPServerManager() *HTTPServerManager {
return &HTTPServerManager{
addr2ServerCtx: make(map[string]*ServerCtx),
}
}
type Handler func(http.ResponseWriter, *http.Request)
// @param pattern 必须以`/`开始,并以`/`结束
func (s *HTTPServerManager) AddListen(addrCtx LocalAddrCtx, pattern string, handler Handler) error {
var (
ctx *ServerCtx
mux *http.ServeMux
ok bool
)
if addrCtx.Addr == "" {
return ErrAddrEmpty
}
ctx, ok = s.addr2ServerCtx[addrCtx.Addr]
if !ok {
l, err := listen(addrCtx)
if err != nil {
return err
}
mux = http.NewServeMux()
ctx = &ServerCtx{
addrCtx: addrCtx,
listener: l,
httpServer: http.Server{
Handler: mux,
},
mux: mux,
pattern2Handler: make(map[string]Handler),
}
s.addr2ServerCtx[addrCtx.Addr] = ctx
}
// 路径相同,比较回调函数是否相同
// 如果回调函数也相同,意味着重复绑定,这种情况是允许的
// 如果回调函数不同,返回错误
if prevHandler, ok := ctx.pattern2Handler[pattern]; ok {
if reflect.ValueOf(prevHandler).Pointer() == reflect.ValueOf(handler).Pointer() {
return nil
} else {
return ErrMultiRegistForPattern
}
}
ctx.pattern2Handler[pattern] = handler
ctx.mux.HandleFunc(pattern, handler)
return nil
}
func (s *HTTPServerManager) RunLoop() error {
errChan := make(chan error, len(s.addr2ServerCtx))
for _, v := range s.addr2ServerCtx {
go func(ctx *ServerCtx) {
errChan <- ctx.httpServer.Serve(ctx.listener)
_ = ctx.httpServer.Close()
}(v)
}
// 阻塞直到接到第一个error
return <-errChan
}
func (s *HTTPServerManager) Dispose() error {
var es []error
for _, v := range s.addr2ServerCtx {
err := v.httpServer.Close()
es = append(es, err)
}
return nazaerrors.CombineErrors(es...)
}
func listen(ctx LocalAddrCtx) (net.Listener, error) {
if ctx.Network == "" {
ctx.Network = NetworkTCP
}
if !ctx.IsHTTPS {
return net.Listen(ctx.Network, ctx.Addr)
}
cert, err := tls.LoadX509KeyPair(ctx.CertFile, ctx.KeyFile)
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}
return tls.Listen(ctx.Network, ctx.Addr, tlsConfig)
}

@ -0,0 +1,49 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"testing"
)
func TestHTTPServerManager(t *testing.T) {
//var err error
//
//var fnFLV = func(writer http.ResponseWriter, request *http.Request) {
// nazalog.Debugf("> fnFLV. %+v, %+v", writer, request)
// conn, bio, err := writer.(http.Hijacker).Hijack()
// if err != nil {
// nazalog.Errorf("hijack failed. err=%+v", err)
// return
// }
// if bio.Reader.Buffered() != 0 || bio.Writer.Buffered() != 0 {
// nazalog.Errorf("hijack but buffer not empty. rb=%d, wb=%d", bio.Reader.Buffered(), bio.Writer.Buffered())
// }
// nazalog.Debugf("%+v, %+v, %+v", conn, bio, err)
//}
//
//sm := NewHTTPServerManager()
//
//err = sm.AddListen(
// LocalAddrCtx{IsHTTPS: false, Addr: ":8080"},
// "/live/",
// fnFLV,
//)
//assert.Equal(t, nil, err)
//
//err = sm.AddListen(
// LocalAddrCtx{IsHTTPS: true, Addr: ":4433", CertFile: "../../conf/cert.pem", KeyFile: "../../conf/key.pem"},
// "/live/",
// fnFLV,
//)
//assert.Equal(t, nil, err)
//
//err = sm.RunLoop()
//assert.Equal(t, nil, err)
}

@ -0,0 +1,149 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"net"
"strings"
"time"
"github.com/q191201771/naza/pkg/connection"
)
type HTTPSubSession struct {
HTTPSubSessionOption
suffix string
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat StatSession
}
type HTTPSubSessionOption struct {
Conn net.Conn
ConnModOption connection.ModOption
UK string // unique key
Protocol string
URLCtx URLContext
IsWebSocket bool
WebSocketKey string
}
func NewHTTPSubSession(option HTTPSubSessionOption) *HTTPSubSession {
s := &HTTPSubSession{
HTTPSubSessionOption: option,
conn: connection.New(option.Conn, option.ConnModOption),
stat: StatSession{
Protocol: option.Protocol,
SessionID: option.UK,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: option.Conn.RemoteAddr().String(),
},
}
return s
}
func (session *HTTPSubSession) RunLoop() error {
buf := make([]byte, 128)
_, err := session.conn.Read(buf)
return err
}
func (session *HTTPSubSession) WriteHTTPResponseHeader(b []byte) {
if session.IsWebSocket {
session.Write(UpdateWebSocketHeader(session.WebSocketKey))
} else {
session.Write(b)
}
}
func (session *HTTPSubSession) Write(b []byte) {
if session.IsWebSocket {
wsHeader := WSHeader{
Fin: true,
Rsv1: false,
Rsv2: false,
Rsv3: false,
Opcode: WSO_Binary,
PayloadLength: uint64(len(b)),
Masked: false,
}
session.write(MakeWSFrameHeader(wsHeader))
}
session.write(b)
}
func (session *HTTPSubSession) Dispose() error {
return session.conn.Close()
}
func (session *HTTPSubSession) URL() string {
return session.URLCtx.URL
}
func (session *HTTPSubSession) AppName() string {
return session.URLCtx.PathWithoutLastItem
}
func (session *HTTPSubSession) StreamName() string {
var suffix string
switch session.Protocol {
case ProtocolHTTPFLV:
suffix = ".flv"
case ProtocolHTTPTS:
suffix = ".ts"
default:
Logger.Warnf("[%s] acquire stream name but protocol unknown.", session.UK)
}
return strings.TrimSuffix(session.URLCtx.LastItemOfPath, suffix)
}
func (session *HTTPSubSession) RawQuery() string {
return session.URLCtx.RawQuery
}
func (session *HTTPSubSession) UniqueKey() string {
return session.UK
}
func (session *HTTPSubSession) GetStat() StatSession {
currStat := session.conn.GetStat()
session.stat.ReadBytesSum = currStat.ReadBytesSum
session.stat.WroteBytesSum = currStat.WroteBytesSum
return session.stat
}
func (session *HTTPSubSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.WriteBitrate
session.prevConnStat = currStat
}
func (session *HTTPSubSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
*session.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0)
*session.staleStat = currStat
return
}
func (session *HTTPSubSession) write(b []byte) {
_, _ = session.conn.Write(b)
}

@ -62,7 +62,7 @@ const (
// AACAUDIODATA
// AACPacketType UI8
// Data UI8[n]
RTMPSoundFormatAAC uint8 = 10
RTMPSoundFormatAAC uint8 = 10 // 注意视频的CodecID是后4位音频是前4位
RTMPAACPacketTypeSeqHeader = 0
RTMPAACPacketTypeRaw = 1
)
@ -72,7 +72,7 @@ type RTMPHeader struct {
MsgLen uint32 // 不包含header的大小
MsgTypeID uint8 // 8 audio 9 video 18 metadata
MsgStreamID int
TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳
TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳,单位毫秒
}
type RTMPMsg struct {

@ -134,11 +134,11 @@ func ParseRTMPURL(rawURL string) (ctx URLContext, err error) {
}
func ParseHTTPFLVURL(rawURL string, isHTTPS bool) (ctx URLContext, err error) {
return parsehttpURL(rawURL, isHTTPS, ".flv")
return ParseHTTPURL(rawURL, isHTTPS, ".flv")
}
func ParseHTTPTSURL(rawURL string, isHTTPS bool) (ctx URLContext, err error) {
return parsehttpURL(rawURL, isHTTPS, ".ts")
return ParseHTTPURL(rawURL, isHTTPS, ".ts")
}
func ParseRTSPURL(rawURL string) (ctx URLContext, err error) {
@ -184,7 +184,7 @@ func parseURLPath(stdURL *url.URL) (ctx URLPathContext, err error) {
return ctx, nil
}
func parsehttpURL(rawURL string, isHTTPS bool, suffix string) (ctx URLContext, err error) {
func ParseHTTPURL(rawURL string, isHTTPS bool, suffix string) (ctx URLContext, err error) {
var defaultPort int
if isHTTPS {
defaultPort = DefaultHTTPSPort

@ -0,0 +1,15 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import "github.com/q191201771/naza/pkg/nazalog"
var (
Logger = nazalog.GetGlobalLogger()
)

@ -16,7 +16,7 @@ import "strings"
// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中
// 版本,该变量由外部脚本修改维护
const LALVersion = "v0.21.0"
const LALVersion = "v0.22.0"
var (
LALLibraryName = "lal"

@ -33,23 +33,37 @@ var ErrHEVC = errors.New("lal.hevc: fxxk")
var (
NALUStartCode4 = []byte{0x0, 0x0, 0x0, 0x1}
// aud nalu
AUDNALU = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10}
)
var NALUTypeMapping = map[uint8]string{
NALUTypeSliceTrailR: "SLICE",
NALUTypeSliceIDR: "I",
NALUTypeSliceIDRNLP: "IDR",
NALUTypeSliceTrailN: "TrailN",
NALUTypeSliceTrailR: "TrailR",
NALUTypeSliceIDR: "IDR",
NALUTypeSliceIDRNLP: "IDRNLP",
NALUTypeSliceCRANUT: "CRANUT",
NALUTypeVPS: "VPS",
NALUTypeSPS: "SPS",
NALUTypePPS: "PPS",
NALUTypeAUD: "AUD",
NALUTypeSEI: "SEI",
NALUTypeSEISuffix: "SEI",
NALUTypeSEISuffix: "SEISuffix",
}
// ISO_IEC_23008-2_2013.pdf
// Table 7-1 NAL unit type codes and NAL unit type classes
var (
NALUTypeSliceTrailN uint8 = 0 // 0x0
NALUTypeSliceTrailR uint8 = 1 // 0x01
NALUTypeSliceIDR uint8 = 19 // 0x13
NALUTypeSliceIDRNLP uint8 = 20 // 0x14
NALUTypeSliceCRANUT uint8 = 21 // 0x15
NALUTypeVPS uint8 = 32 // 0x20
NALUTypeSPS uint8 = 33 // 0x21
NALUTypePPS uint8 = 34 // 0x22
NALUTypeAUD uint8 = 35 // 0x23
NALUTypeSEI uint8 = 39 // 0x27
NALUTypeSEISuffix uint8 = 40 // 0x28
)

@ -10,8 +10,6 @@ package hls
import (
"github.com/q191201771/naza/pkg/filesystemlayer"
"github.com/q191201771/lal/pkg/mpegts"
)
type Fragment struct {
@ -23,7 +21,6 @@ func (f *Fragment) OpenFile(filename string) (err error) {
if err != nil {
return
}
err = f.WriteFile(mpegts.FixedFragmentHeader)
return
}

@ -33,10 +33,6 @@ import (
var ErrHLS = errors.New("lal.hls: fxxk")
var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}
const (
// TODO chef 这些在配置项中提供
negMaxfraglen uint64 = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒*90

@ -24,6 +24,8 @@ import (
// 后续从架构上考虑packet hls,mpegts,logic的分工
type MuxerObserver interface {
OnPATPMT(b []byte)
// @param rawFrame TS流回调结束后内部不再使用该内存块
// @param boundary 新的TS流接收者应该从该标志为true时开始发送数据
//
@ -31,7 +33,6 @@ type MuxerObserver interface {
}
type MuxerConfig struct {
Enable bool `json:"enable"` // 如果false说明hls功能没开也即不写文件但是MuxerObserver依然会回调
OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录注意末尾需以'/'结束
FragmentDurationMS int `json:"fragment_duration_ms"`
FragmentNum int `json:"fragment_num"`
@ -52,6 +53,7 @@ const (
CleanupModeASAP = 2
)
// 输入rtmp流转出hls(m3u8+ts)至文件中并回调给上层转出ts流
type Muxer struct {
UniqueKey string
@ -63,6 +65,7 @@ type Muxer struct {
recordPlayListFilenameBak string // const after init
config *MuxerConfig
enable bool
observer MuxerObserver
fragment Fragment
@ -77,6 +80,7 @@ type Muxer struct {
recordMaxFragDuration float64
streamer *Streamer
patpmt []byte
}
// 记录fragment的一些信息注意写m3u8文件时可能还需要用到历史fragment的信息
@ -87,8 +91,9 @@ type fragmentInfo struct {
filename string
}
// @param enable 如果false说明hls功能没开也即不写文件但是MuxerObserver依然会回调
// @param observer 可以为nil如果不为nilTS流将回调给上层
func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *Muxer {
func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer MuxerObserver) *Muxer {
uk := base.GenUKHLSMuxer()
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
@ -104,6 +109,7 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M
playlistFilenameBak: playlistFilenameBak,
recordPlayListFilename: recordPlaylistFilename,
recordPlayListFilenameBak: recordPlaylistFilenameBak,
enable: enable,
config: config,
observer: observer,
frags: frags,
@ -133,6 +139,11 @@ func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) {
m.streamer.FeedRTMPMessage(msg)
}
func (m *Muxer) OnPATPMT(b []byte) {
m.patpmt = b
m.observer.OnPATPMT(b)
}
func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
var boundary bool
var packets []byte
@ -173,7 +184,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame) {
}
mpegts.PackTSPacket(frame, func(packet []byte) {
if m.config.Enable {
if m.enable {
if err := m.fragment.WriteFile(packet); err != nil {
nazalog.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return
@ -204,7 +215,13 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error {
if m.opened {
f := m.getCurrFrag()
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
// 以下情况,强制开启新的分片:
// 1. 当前时间戳 - 当前分片的初始时间戳 > 配置中单个ts分片时长的10倍
// 原因可能是:
// 1. 当前包的时间戳发生了大的跳跃
// 2. 一直没有I帧导致没有合适的时间重新切片堆积的包达到阈值
// 2. 往回跳跃超过了阈值
//
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("[%s] force fragment split. fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts)
@ -240,6 +257,9 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool) error {
}
// 开启新的fragment
// 此时的情况是上层认为是合适的开启分片的时机比如是I帧并且
// 1. 当前是第一个分片
// 2. 当前不是第一个分片,但是上一个分片已经达到配置时长
if boundary {
if err := m.closeFragment(false); err != nil {
return err
@ -263,10 +283,13 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
filename := getTSFilename(m.streamName, id, int(time.Now().Unix()))
filenameWithPath := getTSFilenameWithPath(m.outPath, filename)
if m.config.Enable {
if m.enable {
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}
}
m.opened = true
@ -290,7 +313,7 @@ func (m *Muxer) closeFragment(isLast bool) error {
return nil
}
if m.config.Enable {
if m.enable {
if err := m.fragment.CloseFile(); err != nil {
return err
}
@ -325,7 +348,7 @@ func (m *Muxer) closeFragment(isLast bool) error {
}
func (m *Muxer) writeRecordPlaylist(isLast bool) {
if !m.config.Enable {
if !m.enable {
return
}
@ -380,7 +403,7 @@ func (m *Muxer) writeRecordPlaylist(isLast bool) {
}
func (m *Muxer) writePlaylist(isLast bool) {
if !m.config.Enable {
if !m.enable {
return
}
@ -421,7 +444,7 @@ func (m *Muxer) writePlaylist(isLast bool) {
}
func (m *Muxer) ensureDir() {
if !m.config.Enable {
if !m.enable {
return
}
//err := fslCtx.RemoveAll(m.outPath)

@ -0,0 +1,91 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts"
)
// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式
// 一旦判断结束,该队列变成直进直出,不再有实际缓存
type Queue struct {
maxMsgSize int
data []base.RTMPMsg
observer IQueueObserver
audioCodecID int
videoCodecID int
done bool
}
type IQueueObserver interface {
// 该回调一定发生在数据回调之前
// TODO(chef) 这里可以考虑换成只通知drain由上层完成FragmentHeader的组装逻辑
OnPATPMT(b []byte)
OnPop(msg base.RTMPMsg)
}
// @param maxMsgSize 最大缓存多少个包
func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue {
return &Queue{
maxMsgSize: maxMsgSize,
data: make([]base.RTMPMsg, maxMsgSize)[0:0],
observer: observer,
audioCodecID: -1,
videoCodecID: -1,
done: false,
}
}
// @param msg 函数调用结束后,内部不持有该内存块
func (q *Queue) Push(msg base.RTMPMsg) {
if q.done {
q.observer.OnPop(msg)
return
}
q.data = append(q.data, msg.Clone())
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
q.audioCodecID = int(msg.Payload[0] >> 4)
case base.RTMPTypeIDVideo:
q.videoCodecID = int(msg.Payload[0] & 0xF)
}
if q.videoCodecID != -1 && q.audioCodecID != -1 {
q.drain()
return
}
if len(q.data) >= q.maxMsgSize {
q.drain()
return
}
}
func (q *Queue) drain() {
switch q.videoCodecID {
case int(base.RTMPCodecIDAVC):
q.observer.OnPATPMT(mpegts.FixedFragmentHeader)
case int(base.RTMPCodecIDHEVC):
q.observer.OnPATPMT(mpegts.FixedFragmentHeaderHEVC)
default:
// TODO(chef) 正确处理只有音频或只有视频的情况 #56
q.observer.OnPATPMT(mpegts.FixedFragmentHeader)
}
for i := range q.data {
q.observer.OnPop(q.data[i])
}
q.data = nil
q.done = true
}

@ -0,0 +1,58 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
import (
"testing"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/assert"
)
var (
fh []byte
poped []base.RTMPMsg
)
type qo struct {
}
func (q *qo) OnPATPMT(b []byte) {
fh = b
}
func (q *qo) OnPop(msg base.RTMPMsg) {
poped = append(poped, msg)
}
func TestQueue(t *testing.T) {
goldenRTMPMsg := []base.RTMPMsg{
{
Header: base.RTMPHeader{
MsgTypeID: base.RTMPTypeIDAudio,
},
Payload: []byte{0xAF},
},
{
Header: base.RTMPHeader{
MsgTypeID: base.RTMPTypeIDVideo,
},
Payload: []byte{0x17},
},
}
q := &qo{}
queue := NewQueue(8, q)
for i := range goldenRTMPMsg {
queue.Push(goldenRTMPMsg[i])
}
assert.Equal(t, mpegts.FixedFragmentHeader, fh)
assert.Equal(t, goldenRTMPMsg, poped)
}

@ -9,7 +9,6 @@
package hls
import (
"net"
"net/http"
"github.com/q191201771/lal/pkg/base"
@ -17,40 +16,40 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
type Server struct {
addr string
type ServerHandler struct {
outPath string
ln net.Listener
httpSrv *http.Server
//addr string
//ln net.Listener
//httpSrv *http.Server
}
func NewServer(addr string, outPath string) *Server {
return &Server{
addr: addr,
func NewServerHandler(outPath string) *ServerHandler {
return &ServerHandler{
outPath: outPath,
}
}
func (s *Server) Listen() (err error) {
if s.ln, err = net.Listen("tcp", s.addr); err != nil {
return
}
s.httpSrv = &http.Server{Addr: s.addr, Handler: s}
nazalog.Infof("start hls server listen. addr=%s", s.addr)
return
}
func (s *Server) RunLoop() error {
return s.httpSrv.Serve(s.ln)
}
func (s *Server) Dispose() {
if err := s.httpSrv.Close(); err != nil {
nazalog.Error(err)
}
}
//
//func (s *Server) Listen() (err error) {
// if s.ln, err = net.Listen("tcp", s.addr); err != nil {
// return
// }
// s.httpSrv = &http.Server{Addr: s.addr, Handler: s}
// nazalog.Infof("start hls server listen. addr=%s", s.addr)
// return
//}
//
//func (s *Server) RunLoop() error {
// return s.httpSrv.Serve(s.ln)
//}
//
//func (s *Server) Dispose() {
// if err := s.httpSrv.Close(); err != nil {
// nazalog.Error(err)
// }
//}
func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
//nazalog.Debugf("%+v", req)
// TODO chef:

@ -12,12 +12,16 @@ import (
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
type StreamerObserver interface {
// @param b const只读内存块上层可以持有但是不允许修改
OnPATPMT(b []byte)
// @param streamer: 供上层获取streamer内部的一些状态比如spspps是否已缓存音频缓存队列是否有数据等
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
@ -27,12 +31,14 @@ type StreamerObserver interface {
OnFrame(streamer *Streamer, frame *mpegts.Frame)
}
// 输入rtmp流回调转封装成AnnexB格式的流
type Streamer struct {
UniqueKey string
observer StreamerObserver
calcFragmentHeaderQueue *Queue
videoOut []byte // AnnexB TODO chef: 优化这块buff
spspps []byte // AnnexB
spspps []byte // AnnexB 也可能是vps+sps+pps
adts aac.ADTS
audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff
audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS
@ -44,17 +50,27 @@ func NewStreamer(observer StreamerObserver) *Streamer {
uk := base.GenUKStreamer()
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
return &Streamer{
streamer := &Streamer{
UniqueKey: uk,
observer: observer,
videoOut: videoOut,
}
streamer.calcFragmentHeaderQueue = NewQueue(calcFragmentHeaderQueueSize, streamer)
return streamer
}
// @param msg msg.Payload 调用结束后,函数内部不会持有这块内存
//
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (s *Streamer) FeedRTMPMessage(msg base.RTMPMsg) {
s.calcFragmentHeaderQueue.Push(msg)
}
func (s *Streamer) OnPATPMT(b []byte) {
s.observer.OnPATPMT(b)
}
func (s *Streamer) OnPop(msg base.RTMPMsg) {
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
s.feedAudio(msg)
@ -80,21 +96,25 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
nazalog.Errorf("[%s] invalid video message length. len=%d", s.UniqueKey, len(msg.Payload))
return
}
if msg.Payload[0]&0xF != base.RTMPCodecIDAVC {
codecID := msg.Payload[0] & 0xF
if codecID != base.RTMPCodecIDAVC && codecID != base.RTMPCodecIDHEVC {
return
}
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
// 将数据转换成AnnexB
// 如果是sps pps缓存住然后直接返回
if ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeSeqHeader {
if err := s.cacheSPSPPS(msg); err != nil {
var err error
if msg.IsAVCKeySeqHeader() {
if s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", s.UniqueKey, err)
}
return
} else if msg.IsHEVCKeySeqHeader() {
if s.spspps, err = hevc.VPSSPSPPSSeqHeader2AnnexB(msg.Payload); err != nil {
nazalog.Errorf("[%s] cache vpsspspps failed. err=%+v", s.UniqueKey, err)
}
return
}
cts := bele.BEUint24(msg.Payload[2:])
@ -117,46 +137,71 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
return
}
nalType := avc.ParseNALUType(msg.Payload[i])
var nalType uint8
switch codecID {
case base.RTMPCodecIDAVC:
nalType = avc.ParseNALUType(msg.Payload[i])
case base.RTMPCodecIDHEVC:
nalType = hevc.ParseNALUType(msg.Payload[i])
}
//nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts)
//nazalog.Debugf("[%s] naltype=%d, len=%d(%d), cts=%d, key=%t.", s.UniqueKey, nalType, nalBytes, len(msg.Payload), cts, msg.IsVideoKeyNALU())
// sps pps前面已经缓存过了这里就不用处理了
// aud有自己的生产逻辑原流中的aud直接过滤掉
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
// 过滤掉原流中的sps pps aud
// sps pps前面已经缓存过了后面有自己的写入逻辑
// aud有自己的写入逻辑
if (codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD)) ||
(codecID == base.RTMPCodecIDHEVC && (nalType == hevc.NALUTypeVPS || nalType == hevc.NALUTypeSPS || nalType == hevc.NALUTypePPS || nalType == hevc.NALUTypeAUD)) {
i += nalBytes
continue
}
// tag中的首个nalu前面写入aud
if !audSent {
switch nalType {
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
// 在前面写入aud
out = append(out, audNal...)
audSent = true
//case avc.NALUTypeAUD:
// // 上面aud已经continue跳过了应该进不到这个分支可以考虑删除这个分支代码
// audSent = true
// 注意因为前面已经过滤了sps pps aud的信息所以这里可以认为都是需要用aud分隔的不需要单独判断了
//if codecID == base.RTMPCodecIDAVC && (nalType == avc.NALUTypeSEI || nalType == avc.NALUTypeIDRSlice || nalType == avc.NALUTypeSlice) {
switch codecID {
case base.RTMPCodecIDAVC:
out = append(out, avc.AUDNALU...)
case base.RTMPCodecIDHEVC:
out = append(out, hevc.AUDNALU...)
}
audSent = true
}
switch nalType {
case avc.NALUTypeSlice:
spsppsSent = false
case avc.NALUTypeIDRSlice:
// 如果是首个关键帧在前面写入sps pps
if !spsppsSent {
var err error
out, err = s.appendSPSPPS(out)
if err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
// 关键帧前追加sps pps
if codecID == base.RTMPCodecIDAVC {
// h264的逻辑一个tag中多个连续的关键帧只追加一个不连续则每个关键帧前都追加。为什么要这样处理
switch nalType {
case avc.NALUTypeIDRSlice:
if !spsppsSent {
if out, err = s.appendSPSPPS(out); err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
}
}
spsppsSent = true
case avc.NALUTypeSlice:
// 这里只有P帧没有SEI。为什么要这样处理
spsppsSent = false
}
} else {
switch nalType {
case hevc.NALUTypeSliceIDR, hevc.NALUTypeSliceIDRNLP, hevc.NALUTypeSliceCRANUT:
if !spsppsSent {
if out, err = s.appendSPSPPS(out); err != nil {
nazalog.Warnf("[%s] append spspps by not exist.", s.UniqueKey)
return
}
}
spsppsSent = true
default:
// 这里简化了,只要不是关键帧,就刷新标志
spsppsSent = false
}
spsppsSent = true
}
// 如果写入了aud或spspps则用start code3否则start code4。为什么要这样处理
// 这里不知为什么要区分写入两种类型的start code
if len(out) == 0 {
out = append(out, avc.NALUStartCode4...)
@ -169,7 +214,6 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
i += nalBytes
}
key := ftype == base.RTMPFrameTypeKey && htype == base.RTMPAVCPacketTypeNALU
dts := uint64(msg.Header.TimestampAbs) * 90
if s.audioCacheFrames != nil && s.audioCacheFirstFramePTS+maxAudioCacheDelayByVideo < dts {
@ -180,7 +224,7 @@ func (s *Streamer) feedVideo(msg base.RTMPMsg) {
frame.CC = s.videoCC
frame.DTS = dts
frame.PTS = frame.DTS + uint64(cts)*90
frame.Key = key
frame.Key = msg.IsVideoKeyNALU()
frame.Raw = out
frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIDVideo
@ -254,12 +298,6 @@ func (s *Streamer) cacheAACSeqHeader(msg base.RTMPMsg) error {
return s.adts.InitWithAACAudioSpecificConfig(msg.Payload[2:])
}
func (s *Streamer) cacheSPSPPS(msg base.RTMPMsg) error {
var err error
s.spspps, err = avc.SPSPPSSeqHeader2AnnexB(msg.Payload)
return err
}
func (s *Streamer) appendSPSPPS(out []byte) ([]byte, error) {
if s.spspps == nil {
return out, ErrHLS

@ -0,0 +1,13 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package hls
var (
calcFragmentHeaderQueueSize = 16
)

@ -1,143 +0,0 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpflv
import (
"crypto/tls"
"net"
"sync"
"github.com/q191201771/naza/pkg/nazalog"
)
type ServerObserver interface {
// 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接
OnNewHTTPFLVSubSession(session *SubSession) bool
OnDelHTTPFLVSubSession(session *SubSession)
}
type ServerConfig struct {
Enable bool `json:"enable"`
SubListenAddr string `json:"sub_listen_addr"`
EnableHTTPS bool `json:"enable_https"`
HTTPSAddr string `json:"https_addr"`
HTTPSCertFile string `json:"https_cert_file"`
HTTPSKeyFile string `json:"https_key_file"`
}
type Server struct {
observer ServerObserver
config ServerConfig
ln net.Listener
httpsLn net.Listener
}
// TODO chef: 监听太难看了考虑直接传入Listener对象或直接路由进来使得不同server可以共用端口
func NewServer(observer ServerObserver, config ServerConfig) *Server {
return &Server{
observer: observer,
config: config,
}
}
func (server *Server) Listen() (err error) {
if server.config.Enable {
if server.ln, err = net.Listen("tcp", server.config.SubListenAddr); err != nil {
return
}
nazalog.Infof("start httpflv server listen. addr=%s", server.config.SubListenAddr)
}
if server.config.EnableHTTPS {
var cert tls.Certificate
cert, err = tls.LoadX509KeyPair(server.config.HTTPSCertFile, server.config.HTTPSKeyFile)
if err != nil {
return err
}
tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}
if server.httpsLn, err = tls.Listen("tcp", server.config.HTTPSAddr, tlsConfig); err != nil {
return
}
nazalog.Infof("start httpsflv server listen. addr=%s", server.config.HTTPSAddr)
}
return
}
func (server *Server) RunLoop() error {
var wg sync.WaitGroup
// TODO chef: 临时这么搞,错误值丢失了,重构一下
if server.ln != nil {
wg.Add(1)
go func() {
for {
conn, err := server.ln.Accept()
if err != nil {
break
}
go server.handleConnect(conn, "http")
}
wg.Done()
}()
}
if server.httpsLn != nil {
wg.Add(1)
go func() {
for {
conn, err := server.httpsLn.Accept()
if err != nil {
break
}
go server.handleConnect(conn, "https")
}
wg.Done()
}()
}
wg.Wait()
return nil
}
func (server *Server) Dispose() {
if server.ln != nil {
if err := server.ln.Close(); err != nil {
nazalog.Error(err)
}
}
if server.httpsLn != nil {
if err := server.httpsLn.Close(); err != nil {
nazalog.Error(err)
}
}
}
func (server *Server) handleConnect(conn net.Conn, scheme string) {
nazalog.Infof("accept a httpflv connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewSubSession(conn, scheme)
if err := session.ReadRequest(); err != nil {
nazalog.Errorf("[%s] read httpflv SubSession request error. err=%v", session.uniqueKey, err)
return
}
nazalog.Debugf("[%s] < read http request. url=%s", session.uniqueKey, session.URL())
if !server.observer.OnNewHTTPFLVSubSession(session) {
session.Dispose()
}
err := session.RunLoop()
nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.uniqueKey, err)
server.observer.OnDelHTTPFLVSubSession(session)
}

@ -9,15 +9,10 @@
package httpflv
import (
"fmt"
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog"
@ -26,171 +21,48 @@ import (
var flvHTTPResponseHeader []byte
type SubSession struct {
uniqueKey string
IsFresh bool
scheme string
pathWithRawQuery string
headers map[string]string
urlCtx base.URLContext
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
isWebSocket bool
*base.HTTPSubSession // 直接使用它提供的函数
IsFresh bool
}
func NewSubSession(conn net.Conn, scheme string) *SubSession {
func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession {
uk := base.GenUKFLVSubSession()
s := &SubSession{
uniqueKey: uk,
scheme: scheme,
IsFresh: true,
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
base.NewHTTPSubSession(base.HTTPSubSessionOption{
Conn: conn,
ConnModOption: func(option *connection.Option) {
option.WriteChanSize = SubSessionWriteChanSize
option.WriteTimeoutMS = SubSessionWriteTimeoutMS
},
UK: uk,
Protocol: base.ProtocolHTTPFLV,
URLCtx: urlCtx,
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,
}),
stat: base.StatSession{
Protocol: base.ProtocolHTTPFLV,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
true,
}
nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
}
// TODO chef: read request timeout
func (session *SubSession) ReadRequest() (err error) {
defer func() {
if err != nil {
session.Dispose()
}
}()
var requestLine string
if requestLine, session.headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil {
return
}
if _, session.pathWithRawQuery, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil {
return
}
rawURL := fmt.Sprintf("%s://%s%s", session.scheme, session.headers["Host"], session.pathWithRawQuery)
_ = rawURL
session.urlCtx, err = base.ParseHTTPFLVURL(rawURL, session.scheme == "https")
if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" {
session.isWebSocket = true
//回复升级为websocket
session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"]))
}
return
}
func (session *SubSession) RunLoop() error {
buf := make([]byte, 128)
_, err := session.conn.Read(buf)
return err
}
func (session *SubSession) WriteHTTPResponseHeader() {
nazalog.Debugf("[%s] > W http response header.", session.uniqueKey)
if session.isWebSocket {
} else {
session.WriteRawPacket(flvHTTPResponseHeader)
}
nazalog.Debugf("[%s] > W http response header.", session.UniqueKey())
session.HTTPSubSession.WriteHTTPResponseHeader(flvHTTPResponseHeader)
}
func (session *SubSession) WriteFLVHeader() {
nazalog.Debugf("[%s] > W http flv header.", session.uniqueKey)
session.WriteRawPacket(FLVHeader)
nazalog.Debugf("[%s] > W http flv header.", session.UniqueKey())
session.Write(FLVHeader)
}
func (session *SubSession) WriteTag(tag *Tag) {
session.WriteRawPacket(tag.Raw)
}
func (session *SubSession) WriteRawPacket(pkt []byte) {
if session.isWebSocket {
wsHeader := base.WSHeader{
Fin: true,
Rsv1: false,
Rsv2: false,
Rsv3: false,
Opcode: base.WSO_Binary,
PayloadLength: uint64(len(pkt)),
Masked: false,
}
session.writeRawPacket(base.MakeWSFrameHeader(wsHeader))
}
session.writeRawPacket(pkt)
}
func (session *SubSession) writeRawPacket(pkt []byte) {
_, _ = session.conn.Write(pkt)
session.Write(tag.Raw)
}
func (session *SubSession) Dispose() error {
nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.uniqueKey)
return session.conn.Close()
}
func (session *SubSession) URL() string {
return session.urlCtx.URL
}
func (session *SubSession) AppName() string {
return session.urlCtx.PathWithoutLastItem
}
func (session *SubSession) StreamName() string {
return strings.TrimSuffix(session.urlCtx.LastItemOfPath, ".flv")
}
func (session *SubSession) RawQuery() string {
return session.urlCtx.RawQuery
}
func (session *SubSession) UniqueKey() string {
return session.uniqueKey
}
func (session *SubSession) GetStat() base.StatSession {
currStat := session.conn.GetStat()
session.stat.ReadBytesSum = currStat.ReadBytesSum
session.stat.WroteBytesSum = currStat.WroteBytesSum
return session.stat
}
func (session *SubSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.WriteBitrate
session.prevConnStat = currStat
}
func (session *SubSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
*session.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0)
*session.staleStat = currStat
return
nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.UniqueKey())
return session.HTTPSubSession.Dispose()
}
func init() {

@ -8,8 +8,10 @@
package httpflv
var readBufSize = 256 //16384 // ClientPullSession 和 SubSession 读取数据时
var wChanSize = 1024 // SubSession 发送数据时 channel 的大小
var subSessionWriteTimeoutMS = 10000
var (
SubSessionWriteChanSize = 1024 // SubSession发送数据时channel的大小
SubSessionWriteTimeoutMS = 10000
FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00}
)
var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00}
var readBufSize = 256 //16384 // ClientPullSession读取数据时

@ -1,81 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package httpts
import (
"net"
log "github.com/q191201771/naza/pkg/nazalog"
)
type ServerObserver interface {
// 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接
OnNewHTTPTSSubSession(session *SubSession) bool
OnDelHTTPTSSubSession(session *SubSession)
}
type Server struct {
observer ServerObserver
addr string
ln net.Listener
}
func NewServer(observer ServerObserver, addr string) *Server {
return &Server{
observer: observer,
addr: addr,
}
}
func (server *Server) Listen() (err error) {
if server.ln, err = net.Listen("tcp", server.addr); err != nil {
return
}
log.Infof("start httpts server listen. addr=%s", server.addr)
return
}
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleConnect(conn)
}
}
func (server *Server) Dispose() {
if server.ln == nil {
return
}
if err := server.ln.Close(); err != nil {
log.Error(err)
}
}
func (server *Server) handleConnect(conn net.Conn) {
log.Infof("accept a httpts connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewSubSession(conn, "http")
if err := session.ReadRequest(); err != nil {
log.Errorf("[%s] read httpts SubSession request error. err=%v", session.uniqueKey, err)
return
}
log.Debugf("[%s] < read http request. url=%s", session.uniqueKey, session.URL())
if !server.observer.OnNewHTTPTSSubSession(session) {
session.Dispose()
}
err := session.RunLoop()
log.Debugf("[%s] httpts sub session loop done. err=%v", session.uniqueKey, err)
server.observer.OnDelHTTPTSSubSession(session)
}

@ -9,180 +9,49 @@
package httpts
import (
"fmt"
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
)
var tsHTTPResponseHeader []byte
type SubSession struct {
uniqueKey string
IsFresh bool
scheme string
pathWithRawQuery string
headers map[string]string
urlCtx base.URLContext
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
stat base.StatSession
isWebSocket bool
*base.HTTPSubSession // 直接使用它提供的函数
IsFresh bool
}
func NewSubSession(conn net.Conn, scheme string) *SubSession {
func NewSubSession(conn net.Conn, urlCtx base.URLContext, isWebSocket bool, websocketKey string) *SubSession {
uk := base.GenUKTSSubSession()
s := &SubSession{
uniqueKey: uk,
scheme: scheme,
IsFresh: true,
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
base.NewHTTPSubSession(base.HTTPSubSessionOption{
Conn: conn,
ConnModOption: func(option *connection.Option) {
option.WriteChanSize = SubSessionWriteChanSize
option.WriteTimeoutMS = SubSessionWriteTimeoutMS
},
UK: uk,
Protocol: base.ProtocolHTTPTS,
URLCtx: urlCtx,
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,
}),
stat: base.StatSession{
Protocol: base.ProtocolHTTPTS,
SessionID: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
true,
}
nazalog.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
}
// TODO chef: read request timeout
func (session *SubSession) ReadRequest() (err error) {
defer func() {
if err != nil {
session.Dispose()
}
}()
var requestLine string
if requestLine, session.headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil {
return
}
if _, session.pathWithRawQuery, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil {
return
}
rawURL := fmt.Sprintf("%s://%s%s", session.scheme, session.headers["Host"], session.pathWithRawQuery)
_ = rawURL
session.urlCtx, err = base.ParseHTTPTSURL(rawURL, session.scheme == "https")
if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" {
session.isWebSocket = true
//回复升级为websocket
session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"]))
}
return
}
func (session *SubSession) RunLoop() error {
buf := make([]byte, 128)
_, err := session.conn.Read(buf)
return err
}
func (session *SubSession) WriteHTTPResponseHeader() {
nazalog.Debugf("[%s] > W http response header.", session.uniqueKey)
if session.isWebSocket {
} else {
session.WriteRawPacket(tsHTTPResponseHeader)
}
}
func (session *SubSession) WriteFragmentHeader() {
nazalog.Debugf("[%s] > W http response header.", session.uniqueKey)
session.WriteRawPacket(mpegts.FixedFragmentHeader)
nazalog.Debugf("[%s] > W http response header.", session.UniqueKey())
session.HTTPSubSession.WriteHTTPResponseHeader(tsHTTPResponseHeader)
}
func (session *SubSession) WriteRawPacket(pkt []byte) {
if session.isWebSocket {
wsHeader := base.WSHeader{
Fin: true,
Rsv1: false,
Rsv2: false,
Rsv3: false,
Opcode: base.WSO_Binary,
PayloadLength: uint64(len(pkt)),
Masked: false,
}
session.writeRawPacket(base.MakeWSFrameHeader(wsHeader))
}
session.writeRawPacket(pkt)
}
func (session *SubSession) writeRawPacket(pkt []byte) {
_, _ = session.conn.Write(pkt)
}
func (session *SubSession) Dispose() error {
nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.uniqueKey)
return session.conn.Close()
}
func (session *SubSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
session.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
wDiff := currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.WriteBitrate = int(wDiff * 8 / 1024 / uint64(intervalSec))
session.stat.Bitrate = session.stat.WriteBitrate
session.prevConnStat = currStat
}
func (session *SubSession) GetStat() base.StatSession {
connStat := session.conn.GetStat()
session.stat.ReadBytesSum = connStat.ReadBytesSum
session.stat.WroteBytesSum = connStat.WroteBytesSum
return session.stat
}
func (session *SubSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {
session.staleStat = new(connection.Stat)
*session.staleStat = currStat
return true, true
}
readAlive = !(currStat.ReadBytesSum-session.staleStat.ReadBytesSum == 0)
writeAlive = !(currStat.WroteBytesSum-session.staleStat.WroteBytesSum == 0)
*session.staleStat = currStat
return
}
func (session *SubSession) URL() string {
return session.urlCtx.URL
}
func (session *SubSession) AppName() string {
return session.urlCtx.PathWithoutLastItem
}
func (session *SubSession) StreamName() string {
return strings.TrimSuffix(session.urlCtx.LastItemOfPath, ".ts")
}
func (session *SubSession) RawQuery() string {
return session.urlCtx.RawQuery
}
func (session *SubSession) UniqueKey() string {
return session.uniqueKey
nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.UniqueKey())
return session.HTTPSubSession.Dispose()
}
func init() {

@ -8,6 +8,7 @@
package httpts
var readBufSize = 256 //16384 // SubSession读取数据时
var wChanSize = 1024 // SubSession发送数据时channel的大小
var subSessionWriteTimeoutMS = 10000
var (
SubSessionWriteChanSize = 1024
SubSessionWriteTimeoutMS = 10000
)

@ -10,7 +10,6 @@ package innertest
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -75,19 +74,16 @@ func InnerTestEntry(t *testing.T) {
var err error
go logic.Entry(confFile)
logic.Init(confFile)
go logic.RunLoop()
time.Sleep(200 * time.Millisecond)
var config logic.Config
rawContent, err := ioutil.ReadFile(confFile)
nazalog.Assert(nil, err)
err = json.Unmarshal(rawContent, &config)
nazalog.Assert(nil, err)
config := logic.GetConfig()
_ = os.RemoveAll(config.HLSConfig.OutPath)
pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/innertest.flv", config.HTTPFLVConfig.SubListenAddr)
httpflvPullURL = fmt.Sprintf("http://127.0.0.1%s/live/innertest.flv", config.HTTPFLVConfig.HTTPListenAddr)
rtmpPullURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/innertest", config.RTMPConfig.Addr)
err = fileReader.Open(rFLVFileName)

@ -9,24 +9,23 @@
package logic
import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
const ConfVersion = "v0.1.2"
const ConfVersion = "v0.2.0"
type Config struct {
ConfVersion string `json:"conf_version"`
RTMPConfig RTMPConfig `json:"rtmp"`
HTTPFLVConfig HTTPFLVConfig `json:"httpflv"`
HLSConfig HLSConfig `json:"hls"`
HTTPTSConfig HTTPTSConfig `json:"httpts"`
RTSPConfig RTSPConfig `json:"rtsp"`
RecordConfig RecordConfig `json:"record"`
RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"`
ConfVersion string `json:"conf_version"`
RTMPConfig RTMPConfig `json:"rtmp"`
DefaultHTTPConfig DefaultHTTPConfig `json:"default_http"`
HTTPFLVConfig HTTPFLVConfig `json:"httpflv"`
HLSConfig HLSConfig `json:"hls"`
HTTPTSConfig HTTPTSConfig `json:"httpts"`
RTSPConfig RTSPConfig `json:"rtsp"`
RecordConfig RecordConfig `json:"record"`
RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"`
HTTPAPIConfig HTTPAPIConfig `json:"http_api"`
ServerID string `json:"server_id"`
@ -41,19 +40,24 @@ type RTMPConfig struct {
GOPNum int `json:"gop_num"`
}
type DefaultHTTPConfig struct {
CommonHTTPAddrConfig
}
type HTTPFLVConfig struct {
httpflv.ServerConfig
CommonHTTPServerConfig
GOPNum int `json:"gop_num"`
}
type HTTPTSConfig struct {
Enable bool `json:"enable"`
SubListenAddr string `json:"sub_listen_addr"`
CommonHTTPServerConfig
}
type HLSConfig struct {
SubListenAddr string `json:"sub_listen_addr"`
UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"`
CommonHTTPServerConfig
UseMemoryAsDiskFlag bool `json:"use_memory_as_disk_flag"`
hls.MuxerConfig
}
@ -100,3 +104,17 @@ type PProfConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
type CommonHTTPServerConfig struct {
CommonHTTPAddrConfig
Enable bool `json:"enable"`
EnableHTTPS bool `json:"enable_https"`
}
type CommonHTTPAddrConfig struct {
HTTPListenAddr string `json:"http_listen_addr"`
HTTPSListenAddr string `json:"https_listen_addr"`
HTTPSCertFile string `json:"https_cert_file"`
HTTPSKeyFile string `json:"https_key_file"`
}

@ -32,11 +32,16 @@ var (
sm *ServerManager
)
func Entry(confFile string) {
// TODO(chef) 临时供innertest使用后面应该重构
func GetConfig() *Config {
return config
}
func Init(confFile string) {
LoadConfAndInitLog(confFile)
if dir, err := os.Getwd(); err == nil {
nazalog.Infof("wd: %s", dir)
}
dir, _ := os.Getwd()
nazalog.Infof("wd: %s", dir)
nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LALFullInfo)
@ -56,7 +61,9 @@ func Entry(confFile string) {
nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MPEGTSOutPath, err)
}
}
}
func RunLoop() {
sm = NewServerManager()
if config.PProfConfig.Enable {
@ -164,6 +171,11 @@ func LoadConfAndInitLog(confFile string) *Config {
}
}
// 如果具体的HTTP应用没有设置HTTP监听相关的配置则尝试使用全局配置
mergeCommonHTTPAddrConfig(&config.HTTPFLVConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig)
mergeCommonHTTPAddrConfig(&config.HTTPTSConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig)
mergeCommonHTTPAddrConfig(&config.HLSConfig.CommonHTTPAddrConfig, &config.DefaultHTTPConfig.CommonHTTPAddrConfig)
// 配置不存在时,设置默认值
if !j.Exist("hls.cleanup_mode") {
const defaultMode = hls.CleanupModeInTheEnd
@ -197,3 +209,18 @@ func runWebPProf(addr string) {
return
}
}
func mergeCommonHTTPAddrConfig(dst, src *CommonHTTPAddrConfig) {
if dst.HTTPListenAddr == "" && src.HTTPListenAddr != "" {
dst.HTTPListenAddr = src.HTTPListenAddr
}
if dst.HTTPSListenAddr == "" && src.HTTPSListenAddr != "" {
dst.HTTPSListenAddr = src.HTTPSListenAddr
}
if dst.HTTPSCertFile == "" && src.HTTPSCertFile != "" {
dst.HTTPSCertFile = src.HTTPSCertFile
}
if dst.HTTPSKeyFile == "" && src.HTTPSKeyFile != "" {
dst.HTTPSKeyFile = src.HTTPSKeyFile
}
}

@ -71,7 +71,7 @@ type Group struct {
recordMPEGTS *mpegts.FileWriter
// rtmp pub/pull使用
gopCache *GOPCache
rtmpGopCache *GOPCache
httpflvGopCache *GOPCache
// rtsp pub使用
@ -80,6 +80,9 @@ type Group struct {
sps []byte
pps []byte
// mpegts使用
patpmt []byte
//
tickCount uint32
}
@ -120,7 +123,7 @@ func NewGroup(appName string, streamName string, pullEnable bool, pullURL string
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
httptsSubSessionSet: make(map[*httpts.SubSession]struct{}),
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
rtmpGopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum),
httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum),
pullProxy: &pullProxy{},
url2PushProxy: url2PushProxy,
@ -389,9 +392,8 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) {
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
nazalog.Debugf("[%s] [%s] add httpts SubSession into group.", group.UniqueKey, session.UniqueKey())
session.WriteHTTPResponseHeader()
session.WriteFragmentHeader()
group.mutex.Lock()
defer group.mutex.Unlock()
@ -478,6 +480,17 @@ func (group *Group) BroadcastRTMP(msg base.RTMPMsg) {
group.broadcastRTMP(msg)
}
// hls.Muxer
func (group *Group) OnPATPMT(b []byte) {
group.patpmt = b
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Write(b); err != nil {
nazalog.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
// hls.Muxer
func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了
@ -485,11 +498,12 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(rawFrame)
session.IsFresh = false
session.WriteRawPacket(rawFrame)
}
} else {
session.WriteRawPacket(rawFrame)
session.Write(rawFrame)
}
}
@ -542,6 +556,7 @@ func (group *Group) OnAVConfig(asc, vps, sps, pps []byte) {
// rtsp.PubSession
func (group *Group) OnAVPacket(pkt base.AVPacket) {
//nazalog.Tracef("[%s] > Group::OnAVPacket. type=%s, ts=%d", group.UniqueKey, pkt.PayloadType.ReadableString(), pkt.Timestamp)
msg, err := remux.AVPacket2RTMPMsg(pkt)
if err != nil {
nazalog.Errorf("[%s] remux av packet to rtmp msg failed. err=+%v", group.UniqueKey, err)
@ -733,20 +748,20 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
// ## 3.1. 如果是新的 sub session发送已缓存的信息
if session.IsFresh {
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
if group.gopCache.Metadata != nil {
if group.rtmpGopCache.Metadata != nil {
//nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.Metadata)
_ = session.Write(group.rtmpGopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
if group.rtmpGopCache.VideoSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.VideoSeqHeader)
_ = session.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
if group.rtmpGopCache.AACSeqHeader != nil {
//nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey)
_ = session.Write(group.gopCache.AACSeqHeader)
_ = session.Write(group.rtmpGopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ {
for _, item := range group.rtmpGopCache.GetGOPDataAt(i) {
_ = session.Write(item)
}
}
@ -766,17 +781,17 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
}
if v.pushSession.IsFresh {
if group.gopCache.Metadata != nil {
_ = v.pushSession.Write(group.gopCache.Metadata)
if group.rtmpGopCache.Metadata != nil {
_ = v.pushSession.Write(group.rtmpGopCache.Metadata)
}
if group.gopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.gopCache.VideoSeqHeader)
if group.rtmpGopCache.VideoSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.VideoSeqHeader)
}
if group.gopCache.AACSeqHeader != nil {
_ = v.pushSession.Write(group.gopCache.AACSeqHeader)
if group.rtmpGopCache.AACSeqHeader != nil {
_ = v.pushSession.Write(group.rtmpGopCache.AACSeqHeader)
}
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
for _, item := range group.gopCache.GetGOPDataAt(i) {
for i := 0; i < group.rtmpGopCache.GetGOPCount(); i++ {
for _, item := range group.rtmpGopCache.GetGOPDataAt(i) {
_ = v.pushSession.Write(item)
}
}
@ -792,24 +807,24 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
for session := range group.httpflvSubSessionSet {
if session.IsFresh {
if group.httpflvGopCache.Metadata != nil {
session.WriteRawPacket(group.httpflvGopCache.Metadata)
session.Write(group.httpflvGopCache.Metadata)
}
if group.httpflvGopCache.VideoSeqHeader != nil {
session.WriteRawPacket(group.httpflvGopCache.VideoSeqHeader)
session.Write(group.httpflvGopCache.VideoSeqHeader)
}
if group.httpflvGopCache.AACSeqHeader != nil {
session.WriteRawPacket(group.httpflvGopCache.AACSeqHeader)
session.Write(group.httpflvGopCache.AACSeqHeader)
}
for i := 0; i < group.httpflvGopCache.GetGOPCount(); i++ {
for _, item := range group.httpflvGopCache.GetGOPDataAt(i) {
session.WriteRawPacket(item)
session.Write(item)
}
}
session.IsFresh = false
}
session.WriteRawPacket(lrm2ft.Get())
session.Write(lrm2ft.Get())
}
// # 5. 录制flv文件
@ -821,7 +836,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
// # 6. 缓存关键信息以及gop
if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get)
group.rtmpGopCache.Feed(msg, lcd.Get)
}
if config.HTTPFLVConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
@ -1002,7 +1017,8 @@ func (group *Group) addIn() {
if group.hlsMuxer != nil {
nazalog.Errorf("[%s] hls muxer exist while addIn. muxer=%+v", group.UniqueKey, group.hlsMuxer)
}
group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig, group)
enable := config.HLSConfig.Enable || config.HLSConfig.EnableHTTPS
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &config.HLSConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
@ -1085,8 +1101,12 @@ func (group *Group) delIn() {
}
}
group.gopCache.Clear()
group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
// TODO(chef) 情况rtsp pub缓存的asc sps pps等数据
group.patpmt = nil
}
func (group *Group) disposeHLSMuxer() {

@ -0,0 +1,113 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package logic
import (
"fmt"
"net/http"
"strings"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/naza/pkg/nazalog"
)
type HTTPServerHandlerObserver interface {
// 通知上层有新的拉流者
// 返回值: true则允许拉流false则关闭连接
OnNewHTTPFLVSubSession(session *httpflv.SubSession) bool
OnDelHTTPFLVSubSession(session *httpflv.SubSession)
OnNewHTTPTSSubSession(session *httpts.SubSession) bool
OnDelHTTPTSSubSession(session *httpts.SubSession)
}
type HTTPServerHandler struct {
observer HTTPServerHandlerObserver
}
func NewHTTPServerHandler(observer HTTPServerHandlerObserver) *HTTPServerHandler {
return &HTTPServerHandler{
observer: observer,
}
}
func (h *HTTPServerHandler) ServeSubSession(writer http.ResponseWriter, req *http.Request) {
var (
isHTTPS bool
scheme string
)
// TODO(chef) 这里scheme直接使用http和https没有考虑ws和wss注意后续的逻辑可能会依赖此处
if req.TLS == nil {
isHTTPS = false
scheme = "http"
} else {
isHTTPS = true
scheme = "https"
}
rawURL := fmt.Sprintf("%s://%s%s", scheme, req.Host, req.RequestURI)
conn, bio, err := writer.(http.Hijacker).Hijack()
if err != nil {
nazalog.Errorf("hijack failed. err=%+v", err)
return
}
if bio.Reader.Buffered() != 0 || bio.Writer.Buffered() != 0 {
nazalog.Errorf("hijack but buffer not empty. rb=%d, wb=%d", bio.Reader.Buffered(), bio.Writer.Buffered())
}
var (
isWebSocket bool
webSocketKey string
)
if req.Header.Get("Connection") == "Upgrade" && req.Header.Get("Upgrade") == "websocket" {
isWebSocket = true
webSocketKey = req.Header.Get("Sec-WebSocket-Key")
}
if strings.HasSuffix(rawURL, ".flv") {
urlCtx, err := base.ParseHTTPURL(rawURL, isHTTPS, ".flv")
if err != nil {
nazalog.Errorf("parse http url failed. err=%+v", err)
_ = conn.Close()
return
}
session := httpflv.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey)
nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.URL())
if !h.observer.OnNewHTTPFLVSubSession(session) {
session.Dispose()
}
err = session.RunLoop()
nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.UniqueKey(), err)
h.observer.OnDelHTTPFLVSubSession(session)
return
}
if strings.HasSuffix(rawURL, ".ts") {
urlCtx, err := base.ParseHTTPURL(rawURL, isHTTPS, ".ts")
if err != nil {
nazalog.Errorf("parse http url failed. err=%+v", err)
_ = conn.Close()
return
}
session := httpts.NewSubSession(conn, urlCtx, isWebSocket, webSocketKey)
nazalog.Debugf("[%s] < read http request. url=%s", session.UniqueKey(), session.URL())
if !h.observer.OnNewHTTPTSSubSession(session) {
session.Dispose()
}
err = session.RunLoop()
nazalog.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey(), err)
h.observer.OnDelHTTPTSSubSession(session)
return
}
}

@ -17,6 +17,8 @@ import (
"github.com/q191201771/lal/pkg/rtsp"
)
// TODO(chef) add base.HTTPSubSession
// client.pub: rtmp, rtsp
// client.sub: rtmp, rtsp, flv, ts
// server.push: rtmp, rtsp
@ -132,8 +134,7 @@ var (
var _ rtmp.ServerObserver = &ServerManager{}
var _ rtsp.ServerObserver = &ServerManager{}
var _ httpflv.ServerObserver = &ServerManager{}
var _ httpts.ServerObserver = &ServerManager{}
var _ HTTPServerHandlerObserver = &ServerManager{}
var _ HTTPAPIServerObserver = &ServerManager{}

@ -13,24 +13,25 @@ import (
"sync"
"time"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
)
type ServerManager struct {
httpServerManager *base.HTTPServerManager
httpServerHandler *HTTPServerHandler
hlsServerHandler *hls.ServerHandler
rtmpServer *rtmp.Server
httpflvServer *httpflv.Server
hlsServer *hls.Server
httptsServer *httpts.Server
rtspServer *rtsp.Server
httpAPIServer *HTTPAPIServer
exitChan chan struct{}
@ -44,18 +45,18 @@ func NewServerManager() *ServerManager {
groupMap: make(map[string]*Group),
exitChan: make(chan struct{}),
}
if config.HTTPFLVConfig.Enable || config.HTTPFLVConfig.EnableHTTPS ||
config.HTTPTSConfig.Enable || config.HTTPTSConfig.EnableHTTPS ||
config.HLSConfig.Enable || config.HLSConfig.EnableHTTPS {
m.httpServerManager = base.NewHTTPServerManager()
m.httpServerHandler = NewHTTPServerHandler(m)
m.hlsServerHandler = hls.NewServerHandler(config.HLSConfig.OutPath)
}
if config.RTMPConfig.Enable {
m.rtmpServer = rtmp.NewServer(m, config.RTMPConfig.Addr)
}
if config.HTTPFLVConfig.Enable || config.HTTPFLVConfig.EnableHTTPS {
m.httpflvServer = httpflv.NewServer(m, config.HTTPFLVConfig.ServerConfig)
}
if config.HLSConfig.Enable {
m.hlsServer = hls.NewServer(config.HLSConfig.SubListenAddr, config.HLSConfig.OutPath)
}
if config.HTTPTSConfig.Enable {
m.httptsServer = httpts.NewServer(m, config.HTTPTSConfig.SubListenAddr)
}
if config.RTSPConfig.Enable {
m.rtspServer = rtsp.NewServer(config.RTSPConfig.Addr, m)
}
@ -68,50 +69,72 @@ func NewServerManager() *ServerManager {
func (sm *ServerManager) RunLoop() error {
httpNotify.OnServerStart()
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
return err
var addMux = func(config CommonHTTPServerConfig, pattern string, handler base.Handler, name string) error {
if config.Enable {
err := sm.httpServerManager.AddListen(
base.LocalAddrCtx{Addr: config.HTTPListenAddr},
pattern,
handler,
)
if err != nil {
nazalog.Infof("add http listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HTTPListenAddr, pattern, err)
return err
}
nazalog.Infof("add http listen for %s. addr=%s, pattern=%s", name, config.HTTPListenAddr, pattern)
}
go func() {
if err := sm.rtmpServer.RunLoop(); err != nil {
nazalog.Error(err)
if config.EnableHTTPS {
err := sm.httpServerManager.AddListen(
base.LocalAddrCtx{IsHTTPS: true, Addr: config.HTTPSListenAddr, CertFile: config.HTTPSCertFile, KeyFile: config.HTTPSKeyFile},
pattern,
handler,
)
if err != nil {
nazalog.Infof("add https listen for %s failed. addr=%s, pattern=%s, err=%+v", name, config.HTTPListenAddr, pattern, err)
return err
}
}()
nazalog.Infof("add https listen for %s. addr=%s, pattern=%s", name, config.HTTPSListenAddr, pattern)
}
return nil
}
if sm.httpflvServer != nil {
if err := sm.httpflvServer.Listen(); err != nil {
return err
}
go func() {
if err := sm.httpflvServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
if err := addMux(config.HTTPFLVConfig.CommonHTTPServerConfig, HTTPFLVURLPath, sm.httpServerHandler.ServeSubSession, "httpflv"); err != nil {
return err
}
if err := addMux(config.HTTPTSConfig.CommonHTTPServerConfig, HTTPTSURLPath, sm.httpServerHandler.ServeSubSession, "httpts"); err != nil {
return err
}
if err := addMux(config.HTTPTSConfig.CommonHTTPServerConfig, HLSURLPath, sm.hlsServerHandler.ServeHTTP, "hls"); err != nil {
return err
}
if sm.httptsServer != nil {
if err := sm.httptsServer.Listen(); err != nil {
return err
go func() {
if err := sm.httpServerManager.RunLoop(); err != nil {
nazalog.Error(err)
}
go func() {
if err := sm.httptsServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
}
}()
if sm.hlsServer != nil {
if err := sm.hlsServer.Listen(); err != nil {
if sm.rtmpServer != nil {
if err := sm.rtmpServer.Listen(); err != nil {
return err
}
go func() {
if err := sm.hlsServer.RunLoop(); err != nil {
if err := sm.rtmpServer.RunLoop(); err != nil {
nazalog.Error(err)
}
}()
}
//if sm.hlsServer != nil {
// if err := sm.hlsServer.Listen(); err != nil {
// return err
// }
// go func() {
// if err := sm.hlsServer.RunLoop(); err != nil {
// nazalog.Error(err)
// }
// }()
//}
if sm.rtspServer != nil {
if err := sm.rtspServer.Listen(); err != nil {
return err
@ -177,18 +200,15 @@ func (sm *ServerManager) RunLoop() error {
func (sm *ServerManager) Dispose() {
nazalog.Debug("dispose server manager.")
// TODO(chef) add httpServer
if sm.rtmpServer != nil {
sm.rtmpServer.Dispose()
}
if sm.httpflvServer != nil {
sm.httpflvServer.Dispose()
}
if sm.httptsServer != nil {
sm.httptsServer.Dispose()
}
if sm.hlsServer != nil {
sm.hlsServer.Dispose()
}
//if sm.hlsServer != nil {
// sm.hlsServer.Dispose()
//}
sm.mutex.Lock()
for _, group := range sm.groupMap {

@ -8,6 +8,12 @@
package logic
var (
HTTPFLVURLPath = "/live/"
HTTPTSURLPath = "/live/"
HLSURLPath = "/hls/"
)
//var relayPushCheckIntervalMS = 1000
var relayPushTimeoutMS = 5000
var relayPushWriteAVTimeoutMS = 5000

@ -74,6 +74,68 @@ var FixedFragmentHeader = []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeaderHEVC = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PAT */
0x00, 0x01, 0xf0, 0x01,
/* CRC */
0x2e, 0x70, 0x19, 0x05,
/* stuffing 167 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
/* TS */
0x47, 0x50, 0x01, 0x10, 0x00,
/* PSI */
0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
//0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x24, 0xe1, 0x00, 0xf0, 0x00,
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
//0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
0xc7, 0x72, 0xb7, 0xcb,
/* stuffing 157 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// TS Packet Header
const (
syncByte uint8 = 0x47
@ -95,11 +157,13 @@ const (
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// 0x0F AAC (ISO/IEC 13818-7 Audio with ADTS transport syntax)
// 0x1B AVC (video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video)
// 0x24 HEVC (HEVC video stream as defined in Rec. ITU-T H.265 | ISO/IEC 23008-2 MPEG-H Part 2)
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
streamTypeHEVC uint8 = 0x24
)
// PES

@ -21,7 +21,7 @@ func TestParseFixedTSPacket(t *testing.T) {
pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)
h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:])
h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeaderHEVC[188:])
nazalog.Debugf("%+v", h)
pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)

@ -9,7 +9,7 @@
package mpegts
type Frame struct {
PTS uint64
PTS uint64 // =(毫秒 * 90)
DTS uint64
CC uint8 // continuity_counter of TS Header
@ -36,6 +36,8 @@ type Frame struct {
//
type OnTSPacket func(packet []byte)
// AnnexB格式的流转换为mpegts packet
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保存CC的值供下次调用时使用
// frame.Raw 函数调用结束后,内部不会持有该内存块

@ -23,7 +23,8 @@ type PullSessionOption struct {
// 如果为0则没有超时时间
PullTimeoutMS int
ReadAVTimeoutMS int
ReadAVTimeoutMS int
HandshakeComplexFlag bool
}
var defaultPullSessionOption = PullSessionOption{
@ -43,6 +44,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
core: NewClientSession(CSTPullSession, func(option *ClientSessionOption) {
option.DoTimeoutMS = opt.PullTimeoutMS
option.ReadAVTimeoutMS = opt.ReadAVTimeoutMS
option.HandshakeComplexFlag = opt.HandshakeComplexFlag
}),
}
}

@ -21,7 +21,8 @@ type PushSessionOption struct {
// 如果为0则没有超时时间
PushTimeoutMS int
WriteAVTimeoutMS int
WriteAVTimeoutMS int
HandshakeComplexFlag bool
}
var defaultPushSessionOption = PushSessionOption{
@ -41,6 +42,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
core: NewClientSession(CSTPushSession, func(option *ClientSessionOption) {
option.DoTimeoutMS = opt.PushTimeoutMS
option.WriteAVTimeoutMS = opt.WriteAVTimeoutMS
option.HandshakeComplexFlag = opt.HandshakeComplexFlag
}),
}
}

@ -38,7 +38,7 @@ type ClientSession struct {
packer *MessagePacker
chunkComposer *ChunkComposer
urlCtx base.URLContext
hc HandshakeClientSimple
hc IHandshakeClient
peerWinAckSize int
conn connection.Connection
@ -63,15 +63,17 @@ const (
type ClientSessionOption struct {
// 单位毫秒如果为0则没有超时
DoTimeoutMS int // 从发起连接包含了建立连接的时间到收到publish或play信令结果的超时
ReadAVTimeoutMS int // 读取音视频数据的超时
WriteAVTimeoutMS int // 发送音视频数据的超时
DoTimeoutMS int // 从发起连接包含了建立连接的时间到收到publish或play信令结果的超时
ReadAVTimeoutMS int // 读取音视频数据的超时
WriteAVTimeoutMS int // 发送音视频数据的超时
HandshakeComplexFlag bool // 握手是否使用复杂模式
}
var defaultClientSessOption = ClientSessionOption{
DoTimeoutMS: 0,
ReadAVTimeoutMS: 0,
WriteAVTimeoutMS: 0,
DoTimeoutMS: 0,
ReadAVTimeoutMS: 0,
WriteAVTimeoutMS: 0,
HandshakeComplexFlag: false,
}
type ModClientSessionOption func(option *ClientSessionOption)
@ -91,6 +93,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
fn(&option)
}
var hc IHandshakeClient
if option.HandshakeComplexFlag {
hc = &HandshakeClientComplex{}
} else {
hc = &HandshakeClientSimple{}
}
s := &ClientSession{
uniqueKey: uk,
t: t,
@ -104,6 +113,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
debugLogReadUserCtrlMsgMax: 5,
hc: hc,
}
nazalog.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s)
return s

@ -9,13 +9,13 @@
package rtprtcp
type RTPPacketListItem struct {
packet RTPPacket
next *RTPPacketListItem
Packet RTPPacket
Next *RTPPacketListItem
}
type RTPPacketList struct {
head RTPPacketListItem // 哨兵自身不存放rtp包第一个rtp包存在在head.next中
size int // 实际元素个数
Head RTPPacketListItem // 哨兵自身不存放rtp包第一个rtp包存在在head.next中
Size int // 实际元素个数
}
type RTPUnpackContainer struct {
@ -62,14 +62,14 @@ func (r *RTPUnpackContainer) Feed(pkt RTPPacket) {
}
// 缓存达到最大值
if r.list.size > r.maxSize {
if r.list.Size > r.maxSize {
// 尝试合成一帧发生跳跃的帧
packed := r.tryUnpackOne()
if !packed {
// 合成失败了,丢弃一包过期数据
r.list.head.next = r.list.head.next.next
r.list.size--
r.list.Head.Next = r.list.Head.Next.Next
r.list.Size--
} else {
// 合成成功了,再次尝试,尽可能多的合成顺序的帧
for {
@ -97,11 +97,11 @@ func (r *RTPUnpackContainer) isStale(seq uint16) bool {
// 将rtp包按seq排序插入队列中
func (r *RTPUnpackContainer) insert(pkt RTPPacket) {
r.list.size++
r.list.Size++
p := &r.list.head
for ; p.next != nil; p = p.next {
res := CompareSeq(pkt.Header.Seq, p.next.packet.Header.Seq)
p := &r.list.Head
for ; p.Next != nil; p = p.Next {
res := CompareSeq(pkt.Header.Seq, p.Next.Packet.Header.Seq)
switch res {
case 0:
return
@ -109,29 +109,29 @@ func (r *RTPUnpackContainer) insert(pkt RTPPacket) {
// noop
case -1:
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
Packet: pkt,
Next: p.Next,
}
p.next = item
p.Next = item
return
}
}
item := &RTPPacketListItem{
packet: pkt,
next: p.next,
Packet: pkt,
Next: p.Next,
}
p.next = item
p.Next = item
}
// 从队列头部尝试合成一个完整的帧。保证这次合成的帧的首个seq和上次合成帧的尾部seq是连续的
func (r *RTPUnpackContainer) tryUnpackOneSequential() bool {
if r.unpackedFlag {
first := r.list.head.next
first := r.list.Head.Next
if first == nil {
return false
}
if SubSeq(first.packet.Header.Seq, r.unpackedSeq) != 1 {
if SubSeq(first.Packet.Header.Seq, r.unpackedSeq) != 1 {
return false
}
}

@ -62,11 +62,11 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag
// Unit, and 0 on all other fragments.
//
p := list.head.next // first
p := list.Head.Next // first
if p == nil {
return false, 0
}
b := p.packet.Raw[p.packet.Header.payloadOffset:]
b := p.Packet.Raw[p.Packet.Header.payloadOffset:]
//nazalog.Debugf("%d, %d, %s", len(pkt.Raw), pkt.Header.timestamp, hex.Dump(b))
aus := parseAU(b)
@ -76,43 +76,43 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag
// one complete access unit
var outPkt base.AVPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Payload = b[aus[0].pos : aus[0].pos+aus[0].size]
unpacker.onAVPacket(outPkt)
list.head.next = p.next
list.size--
return true, p.packet.Header.Seq
list.Head.Next = p.Next
list.Size--
return true, p.Packet.Header.Seq
}
// fragmented
// 注意这里我们参考size和rtp包头中的timestamp不参考rtp包头中的mark位
totalSize := aus[0].size
timestamp := p.packet.Header.Timestamp
timestamp := p.Packet.Header.Timestamp
var as [][]byte
as = append(as, b[aus[0].pos:])
cacheSize := uint32(len(b[aus[0].pos:]))
seq := p.packet.Header.Seq
p = p.next
seq := p.Packet.Header.Seq
p = p.Next
packetCount := 0
for {
packetCount++
if p == nil {
return false, 0
}
if SubSeq(p.packet.Header.Seq, seq) != 1 {
if SubSeq(p.Packet.Header.Seq, seq) != 1 {
return false, 0
}
if p.packet.Header.Timestamp != timestamp {
if p.Packet.Header.Timestamp != timestamp {
nazalog.Errorf("fragments of the same access shall have the same timestamp. first=%d, curr=%d",
timestamp, p.packet.Header.Timestamp)
timestamp, p.Packet.Header.Timestamp)
return false, 0
}
b = p.packet.Raw[p.packet.Header.payloadOffset:]
b = p.Packet.Raw[p.Packet.Header.payloadOffset:]
aus := parseAU(b)
if len(aus) != 1 {
nazalog.Errorf("shall be a single fragment. len(aus)=%d", len(aus))
@ -125,22 +125,22 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag
}
cacheSize += uint32(len(b[aus[0].pos:]))
seq = p.packet.Header.Seq
seq = p.Packet.Header.Seq
as = append(as, b[aus[0].pos:])
if cacheSize < totalSize {
p = p.next
p = p.Next
} else if cacheSize == totalSize {
var outPkt base.AVPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
for _, a := range as {
outPkt.Payload = append(outPkt.Payload, a...)
}
unpacker.onAVPacket(outPkt)
list.head.next = p.next
list.size -= packetCount
return true, p.packet.Header.Seq
list.Head.Next = p.Next
list.Size -= packetCount
return true, p.Packet.Header.Seq
} else {
nazalog.Errorf("cache size bigger then total size. cacheSize=%d, totalSize=%d",
cacheSize, totalSize)
@ -154,16 +154,16 @@ func (unpacker *RTPUnpackerAAC) TryUnpackOne(list *RTPPacketList) (unpackedFlag
for i := range aus {
var outPkt base.AVPacket
outPkt.PayloadType = unpacker.payloadType
outPkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
outPkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
// TODO chef: 这里1024的含义
outPkt.Timestamp += uint32(i * (1024 * 1000) / unpacker.clockRate)
outPkt.Payload = b[aus[i].pos : aus[i].pos+aus[i].size]
unpacker.onAVPacket(outPkt)
}
list.head.next = p.next
list.size--
return true, p.packet.Header.Seq
list.Head.Next = p.Next
list.Size--
return true, p.Packet.Header.Seq
}
type au struct {

@ -40,33 +40,33 @@ func (unpacker *RTPUnpackerAVCHEVC) CalcPositionIfNeeded(pkt *RTPPacket) {
}
func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedFlag bool, unpackedSeq uint16) {
first := list.head.next
first := list.Head.Next
if first == nil {
return false, 0
}
switch first.packet.positionType {
switch first.Packet.positionType {
case PositionTypeSingle:
var pkt base.AVPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Payload = make([]byte, len(first.packet.Raw)-int(first.packet.Header.payloadOffset)+4)
bele.BEPutUint32(pkt.Payload, uint32(len(first.packet.Raw))-first.packet.Header.payloadOffset)
copy(pkt.Payload[4:], first.packet.Raw[first.packet.Header.payloadOffset:])
pkt.Payload = make([]byte, len(first.Packet.Raw)-int(first.Packet.Header.payloadOffset)+4)
bele.BEPutUint32(pkt.Payload, uint32(len(first.Packet.Raw))-first.Packet.Header.payloadOffset)
copy(pkt.Payload[4:], first.Packet.Raw[first.Packet.Header.payloadOffset:])
list.head.next = first.next
list.size--
list.Head.Next = first.Next
list.Size--
unpacker.onAVPacket(pkt)
return true, first.packet.Header.Seq
return true, first.Packet.Header.Seq
case PositionTypeSTAPA:
var pkt base.AVPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = first.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = first.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
// 跳过首字节并且将多nalu前的2字节长度替换成4字节长度
buf := first.packet.Raw[first.packet.Header.payloadOffset+1:]
buf := first.Packet.Raw[first.Packet.Header.payloadOffset+1:]
// 使用两次遍历,第一次遍历找出总大小,第二次逐个拷贝,目的是使得内存块一次就申请好,不用动态扩容造成额外性能开销
totalSize := 0
@ -90,31 +90,31 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF
i += 2 + naluSize
}
list.head.next = first.next
list.size--
list.Head.Next = first.Next
list.Size--
unpacker.onAVPacket(pkt)
return true, first.packet.Header.Seq
return true, first.Packet.Header.Seq
case PositionTypeFUAStart:
prev := first
p := first.next
p := first.Next
for {
if prev == nil || p == nil {
return false, 0
}
if SubSeq(p.packet.Header.Seq, prev.packet.Header.Seq) != 1 {
if SubSeq(p.Packet.Header.Seq, prev.Packet.Header.Seq) != 1 {
return false, 0
}
if p.packet.positionType == PositionTypeFUAMiddle {
if p.Packet.positionType == PositionTypeFUAMiddle {
prev = p
p = p.next
p = p.Next
continue
} else if p.packet.positionType == PositionTypeFUAEnd {
} else if p.Packet.positionType == PositionTypeFUAEnd {
var pkt base.AVPacket
pkt.PayloadType = unpacker.payloadType
pkt.Timestamp = p.packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
pkt.Timestamp = p.Packet.Header.Timestamp / uint32(unpacker.clockRate/1000)
var naluTypeLen int
var naluType []byte
@ -122,14 +122,14 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF
naluTypeLen = 1
naluType = make([]byte, naluTypeLen)
fuIndicator := first.packet.Raw[first.packet.Header.payloadOffset]
fuHeader := first.packet.Raw[first.packet.Header.payloadOffset+1]
fuIndicator := first.Packet.Raw[first.Packet.Header.payloadOffset]
fuHeader := first.Packet.Raw[first.Packet.Header.payloadOffset+1]
naluType[0] = (fuIndicator & 0xE0) | (fuHeader & 0x1F)
} else {
naluTypeLen = 2
naluType = make([]byte, naluTypeLen)
buf := first.packet.Raw[first.packet.Header.payloadOffset:]
buf := first.Packet.Raw[first.Packet.Header.payloadOffset:]
fuType := buf[2] & 0x3f
naluType[0] = (buf[0] & 0x81) | (fuType << 1)
naluType[1] = buf[1]
@ -139,11 +139,11 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF
totalSize := 0
pp := first
for {
totalSize += len(pp.packet.Raw) - int(pp.packet.Header.payloadOffset) - (naluTypeLen + 1)
totalSize += len(pp.Packet.Raw) - int(pp.Packet.Header.payloadOffset) - (naluTypeLen + 1)
if pp == p {
break
}
pp = pp.next
pp = pp.Next
}
pkt.Payload = make([]byte, totalSize+4+naluTypeLen)
@ -160,24 +160,24 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF
packetCount := 0
pp = first
for {
copy(pkt.Payload[index:], pp.packet.Raw[int(pp.packet.Header.payloadOffset)+(naluTypeLen+1):])
index += len(pp.packet.Raw) - int(pp.packet.Header.payloadOffset) - (naluTypeLen + 1)
copy(pkt.Payload[index:], pp.Packet.Raw[int(pp.Packet.Header.payloadOffset)+(naluTypeLen+1):])
index += len(pp.Packet.Raw) - int(pp.Packet.Header.payloadOffset) - (naluTypeLen + 1)
packetCount++
if pp == p {
break
}
pp = pp.next
pp = pp.Next
}
list.head.next = p.next
list.size -= packetCount
list.Head.Next = p.Next
list.Size -= packetCount
unpacker.onAVPacket(pkt)
return true, p.packet.Header.Seq
return true, p.Packet.Header.Seq
} else {
// 不应该出现其他类型
nazalog.Errorf("invalid position type. position=%d", p.packet.positionType)
nazalog.Errorf("invalid position type. position=%d", p.Packet.positionType)
return false, 0
}
}
@ -187,7 +187,7 @@ func (unpacker *RTPUnpackerAVCHEVC) TryUnpackOne(list *RTPPacketList) (unpackedF
case PositionTypeFUAEnd:
// noop
default:
nazalog.Errorf("invalid position. pos=%d", first.packet.positionType)
nazalog.Errorf("invalid position. pos=%d", first.Packet.positionType)
}
return false, 0

Loading…
Cancel
Save