Merge pull request #8 from q191201771/master

更新lal
pull/78/head
joestarzxh 4 years ago committed by GitHub
commit fdcb601be8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,12 +4,12 @@
[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases)
[![TravisCI](https://www.travis-ci.org/q191201771/lal.svg?branch=master)](https://www.travis-ci.org/q191201771/lal)
[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal)
[![wechat](https://img.shields.io/:微信-q191201771-blue.svg)](https://pengrl.com/images/yoko_vx.jpeg)
![wechat](https://img.shields.io/:微信-q191201771-blue.svg)
![qqgroup](https://img.shields.io/:QQ群-1090510973-blue.svg)
[中文文档](https://pengrl.com/lal/#/)
LAL is a live stream broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g. RTMP/RTSP/HLS/HTTP[S]-FLV/HTTP-TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
LAL is an audio/video live streaming broadcast server written in Go. It's sort of like `nginx-rtmp-module`, but easier to use and with more features, e.g RTMP, RTSP(RTP/RTCP), HLS, HTTP[S]-FLV/HTTP-TS, WebSocket-FLV/TS, H264/H265/AAC, relay, cluster, record, HTTP API/Notify, GOP cache.
And [more than a server, act as package and client](https://github.com/q191201771/lal#more-than-a-server-act-as-package-and-client)

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1",
"conf_version": "v0.1.2",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -32,6 +32,12 @@
"enable": true,
"addr": ":5544"
},
"record": {
"enable_flv": false,
"flv_out_path": "/tmp/lal/flv/",
"enable_mpegts": false,
"mpegts_out_path": "/tmp/lal/mpegts"
},
"relay_push": {
"enable": false,
"addr_list":[

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1",
"conf_version": "v0.1.2",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -32,6 +32,12 @@
"enable": true,
"addr": ":5544"
},
"record": {
"enable_flv": false,
"flv_out_path": "/tmp/lal/flv/",
"enable_mpegts": false,
"mpegts_out_path": "/tmp/lal/mpegts"
},
"relay_push": {
"enable": false,
"addr_list":[

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1",
"conf_version": "v0.1.2",
"rtmp": {
"enable": true,
"addr": ":1955",
@ -22,6 +22,10 @@
"rtsp": {
"enable": false
},
"record": {
"enable_flv": false,
"enable_mpegts": false
},
"relay_push": {
"enable": false
},

@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.1.1",
"conf_version": "v0.1.2",
"rtmp": {
"enable": true,
"addr": ":1935",
@ -32,6 +32,10 @@
"enable": false,
"addr": ":5544"
},
"record": {
"enable_flv": false,
"enable_mpegts": false
},
"relay_push": {
"enable": false,
"addr_list":[

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.13
require github.com/q191201771/naza v0.18.4
require github.com/q191201771/naza v0.18.5

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.18.4 h1:yk5EUz8q2FhCYg6pG5QcH9uXVJN5nmiXPu5Y5vlpArE=
github.com/q191201771/naza v0.18.4/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/q191201771/naza v0.18.5 h1:new/bBkivdVqPZpaviseZc2Z4qZ9I4KpKvJUmXCXjEk=
github.com/q191201771/naza v0.18.5/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=

@ -37,7 +37,7 @@ var (
var (
// 植入rtmp握手随机字符串中
// e.g. lal v0.12.3 (github.com/q191201771/lal)
//LALRTMPHandshakeWaterMark string
LALRTMPHandshakeWaterMark string
// 植入rtmp server中的connect result信令中
// 注意有两个object第一个object中的fmsVer我们保持通用公认的值在第二个object中植入
@ -123,5 +123,5 @@ func init() {
LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot
LALRTSPPullSessionUA = LALLibraryName + "/" + LALVersionDot
//LALRTMPHandshakeWaterMark = LALFullInfo
LALRTMPHandshakeWaterMark = LALFullInfo
}

@ -1,3 +1,11 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: joestarzxh
package base
import (

@ -33,8 +33,6 @@ import (
var ErrHLS = errors.New("lal.hls: fxxk")
var _ StreamerObserver = &Muxer{}
var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}

@ -92,8 +92,8 @@ func NewMuxer(streamName string, config *MuxerConfig, observer MuxerObserver) *M
uk := base.GenUKHLSMuxer()
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
recordPlaylistFilename := getRecordM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
recordPlaylistFilenameBak := fmt.Sprintf("%s.bak", recordPlaylistFilename)
frags := make([]fragmentInfo, 2*config.FragmentNum+1)
m := &Muxer{

@ -10,6 +10,7 @@ package hls
import (
"fmt"
"path/filepath"
"strings"
)
@ -56,25 +57,30 @@ func parseRequestInfo(uri string) (ri requestInfo) {
return
}
// <rootOutPath>/<ri.streamName>/<ri.fileName>
func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) {
filename := fmt.Sprintf("%s%s/%s", rootOutPath, ri.streamName, ri.fileName)
filename := filepath.Join(rootOutPath, ri.streamName, ri.fileName)
return fslCtx.ReadFile(filename)
}
// <rootOutPath>/<streamName>
func getMuxerOutPath(rootOutPath string, streamName string) string {
return fmt.Sprintf("%s%s/", rootOutPath, streamName)
return filepath.Join(rootOutPath, streamName)
}
func getM3U8Filename(outpath string, streamName string) string {
return fmt.Sprintf("%s%s.m3u8", outpath, "playlist")
// @param outPath 参考func getMuxerOutPath
func getM3U8Filename(outPath string, streamName string) string {
return filepath.Join(outPath, "playlist.m3u8")
}
func getRecordM3U8Filename(outpath string, streamName string) string {
return fmt.Sprintf("%s%s.m3u8", outpath, "record")
// @param outPath 参考func getMuxerOutPath
func getRecordM3U8Filename(outPath string, streamName string) string {
return filepath.Join(outPath, "record.m3u8")
}
// @param outPath 参考func getMuxerOutPath
func getTSFilenameWithPath(outpath string, filename string) string {
return fmt.Sprintf("%s%s", outpath, filename)
return filepath.Join(outpath, filename)
}
func getTSFilename(streamName string, id int, timestamp int) string {

@ -10,6 +10,8 @@ package httpflv
import "os"
// TODO chef: 结构体重命名为FileWriter文件名重命名为file_writer.go。所有写流文件的flv,hls,ts统一重构
type FLVFileWriter struct {
fp *os.File
}
@ -20,17 +22,39 @@ func (ffw *FLVFileWriter) Open(filename string) (err error) {
}
func (ffw *FLVFileWriter) WriteRaw(b []byte) (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(b)
return
}
func (ffw *FLVFileWriter) WriteFLVHeader() (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(FLVHeader)
return
}
func (ffw *FLVFileWriter) WriteTag(tag Tag) (err error) {
if ffw.fp == nil {
return ErrHTTPFLV
}
_, err = ffw.fp.Write(tag.Raw)
return
}
func (ffw *FLVFileWriter) Dispose() {
if ffw.fp != nil {
_ = ffw.fp.Close()
func (ffw *FLVFileWriter) Dispose() error {
if ffw.fp == nil {
return ErrHTTPFLV
}
return ffw.fp.Close()
}
func (ffw *FLVFileWriter) Name() string {
if ffw.fp == nil {
return ""
}
return ffw.fp.Name()
}

@ -15,7 +15,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
const ConfVersion = "v0.1.1"
const ConfVersion = "v0.1.2"
type Config struct {
ConfVersion string `json:"conf_version"`
@ -24,6 +24,7 @@ type Config struct {
HLSConfig HLSConfig `json:"hls"`
HTTPTSConfig HTTPTSConfig `json:"httpts"`
RTSPConfig RTSPConfig `json:"rtsp"`
RecordConfig RecordConfig `json:"record"`
RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"`
@ -61,6 +62,13 @@ type RTSPConfig struct {
Addr string `json:"addr"`
}
type RecordConfig struct {
EnableFLV bool `json:"enable_flv"`
FLVOutPath string `json:"flv_out_path"`
EnableMPEGTS bool `json:"enable_mpegts"`
MPEGTSOutPath string `json:"mpegts_out_path"`
}
type RelayPushConfig struct {
Enable bool `json:"enable"`
AddrList []string `json:"addr_list"`

@ -34,6 +34,9 @@ var (
func Entry(confFile string) {
LoadConfAndInitLog(confFile)
if dir, err := os.Getwd(); err == nil {
nazalog.Infof("wd: %s", dir)
}
nazalog.Infof("args: %s", strings.Join(os.Args, " "))
nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine())
nazalog.Infof("version: %s", base.LALFullInfo)
@ -45,6 +48,15 @@ func Entry(confFile string) {
hls.SetUseMemoryAsDiskFlag(true)
}
if config.RecordConfig.EnableFLV {
if err := os.MkdirAll(config.RecordConfig.FLVOutPath, 0777); err != nil {
nazalog.Errorf("record flv mkdir error. path=%s, err=%+v", config.RecordConfig.FLVOutPath, err)
}
if err := os.MkdirAll(config.RecordConfig.MPEGTSOutPath, 0777); err != nil {
nazalog.Errorf("record mpegts mkdir error. path=%s, err=%+v", config.RecordConfig.MPEGTSOutPath, err)
}
}
sm = NewServerManager()
if config.PProfConfig.Enable {
@ -138,6 +150,7 @@ func LoadConfAndInitLog(confFile string) *Config {
"hls",
"httpts",
"rtsp",
"record",
"relay_push",
"relay_pull",
"http_api",

@ -11,8 +11,12 @@ package logic
import (
"encoding/json"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/remux"
@ -46,30 +50,36 @@ type Group struct {
mutex sync.Mutex
//
stat base.StatGroup
//
// pub
rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession
//
// pull
pullEnable bool
pullURL string
pullProxy *pullProxy
//
// sub
rtmpSubSessionSet map[*rtmp.ServerSession]struct{}
httpflvSubSessionSet map[*httpflv.SubSession]struct{}
httptsSubSessionSet map[*httpts.SubSession]struct{}
rtspSubSessionSet map[*rtsp.SubSession]struct{}
//
// push
url2PushProxy map[string]*pushProxy
//
// hls
hlsMuxer *hls.Muxer
recordFLV *httpflv.FLVFileWriter
recordMPEGTS *mpegts.FileWriter
// rtmp pub/pull使用
gopCache *GOPCache
httpflvGopCache *GOPCache
// rtsp pub使用
asc []byte
vps []byte
sps []byte
pps []byte
//
tickCount uint32
}
@ -482,6 +492,12 @@ func (group *Group) OnTSPackets(rawFrame []byte, boundary bool) {
session.WriteRawPacket(rawFrame)
}
}
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Write(rawFrame); err != nil {
nazalog.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
}
// rtmp.PubSession or rtmp.PullSession
@ -796,7 +812,14 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
session.WriteRawPacket(lrm2ft.Get())
}
// # 5. 缓存关键信息以及gop
// # 5. 录制flv文件
if group.recordFLV != nil {
if err := group.recordFLV.WriteRaw(lrm2ft.Get()); err != nil {
nazalog.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}
// # 6. 缓存关键信息以及gop
if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get)
}
@ -804,7 +827,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
// # 6. 记录stat
// # 7. 记录stat
if group.stat.AudioCodec == "" {
if msg.IsAACSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAAC
@ -986,6 +1009,48 @@ func (group *Group) addIn() {
if config.RelayPushConfig.Enable {
group.pushIfNeeded()
}
now := time.Now().Unix()
if config.RecordConfig.EnableFLV {
filename := fmt.Sprintf("%s-%d.flv", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.FLVOutPath, filename)
if group.recordFLV != nil {
nazalog.Errorf("[%s] record flv but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordFLV.Name())
if err := group.recordFLV.Dispose(); err != nil {
nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
}
}
group.recordFLV = &httpflv.FLVFileWriter{}
if err := group.recordFLV.Open(filenameWithPath); err != nil {
nazalog.Errorf("[%s] record flv open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
if err := group.recordFLV.WriteFLVHeader(); err != nil {
nazalog.Errorf("[%s] record flv write flv header failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
}
if config.RecordConfig.EnableMPEGTS {
filename := fmt.Sprintf("%s-%d.ts", group.streamName, now)
filenameWithPath := filepath.Join(config.RecordConfig.MPEGTSOutPath, filename)
if group.recordMPEGTS != nil {
nazalog.Errorf("[%s] record mpegts but already exist. new filename=%s, old filename=%s",
group.UniqueKey, filenameWithPath, group.recordMPEGTS.Name())
if err := group.recordMPEGTS.Dispose(); err != nil {
nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
}
}
group.recordMPEGTS = &mpegts.FileWriter{}
if err := group.recordMPEGTS.Create(filenameWithPath); err != nil {
nazalog.Errorf("[%s] record mpegts open file failed. filename=%s, err=%+v",
group.UniqueKey, filenameWithPath, err)
group.recordFLV = nil
}
}
}
func (group *Group) delIn() {
@ -1002,6 +1067,24 @@ func (group *Group) delIn() {
}
}
if config.RecordConfig.EnableFLV {
if group.recordFLV != nil {
if err := group.recordFLV.Dispose(); err != nil {
nazalog.Errorf("[%s] record flv dispose error. err=%+v", group.UniqueKey, err)
}
group.recordFLV = nil
}
}
if config.RecordConfig.EnableMPEGTS {
if group.recordMPEGTS != nil {
if err := group.recordMPEGTS.Dispose(); err != nil {
nazalog.Errorf("[%s] record mpegts dispose error. err=%+v", group.UniqueKey, err)
}
group.recordMPEGTS = nil
}
}
group.gopCache.Clear()
group.httpflvGopCache.Clear()
}

@ -144,8 +144,8 @@ var _ hls.MuxerObserver = &Group{}
var _ rtsp.BaseInSessionObserver = &Group{} //
var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.HandshakeClient = &rtmp.HandshakeClientSimple{}
var _ rtmp.HandshakeClient = &rtmp.HandshakeClientComplex{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientComplex{}
var _ rtsp.ServerCommandSessionObserver = &rtsp.Server{}
var _ rtsp.ClientCommandSessionObserver = &rtsp.PushSession{}

@ -0,0 +1,42 @@
// Copyright 2019, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package mpegts
import "os"
type FileWriter struct {
fp *os.File
}
func (fw *FileWriter) Create(filename string) (err error) {
fw.fp, err = os.Create(filename)
return
}
func (fw *FileWriter) Write(b []byte) (err error) {
if fw.fp == nil {
return ErrMPEGTS
}
_, err = fw.fp.Write(b)
return
}
func (fw *FileWriter) Dispose() error {
if fw.fp == nil {
return ErrMPEGTS
}
return fw.fp.Close()
}
func (fw *FileWriter) Name() string {
if fw.fp == nil {
return ""
}
return fw.fp.Name()
}

@ -8,8 +8,12 @@
package mpegts
import "errors"
// MPEG: Moving Picture Experts Group
var ErrMPEGTS = errors.New("lal.mpegts: fxxk")
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */

@ -297,15 +297,20 @@ func (s *ClientSession) handshake() error {
return err
}
if err := s.hc.ReadS0S1S2(s.conn); err != nil {
if err := s.hc.ReadS0S1(s.conn); err != nil {
return err
}
nazalog.Infof("[%s] < R Handshake S0+S1+S2.", s.uniqueKey)
nazalog.Infof("[%s] < R Handshake S0+S1.", s.uniqueKey)
nazalog.Infof("[%s] > W Handshake C2.", s.uniqueKey)
if err := s.hc.WriteC2(s.conn); err != nil {
return err
}
if err := s.hc.ReadS2(s.conn); err != nil {
return err
}
nazalog.Infof("[%s] < R Handshake S2.", s.uniqueKey)
return nil
}

@ -12,9 +12,12 @@ import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"fmt"
"io"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -24,24 +27,26 @@ import (
const version = uint8(3)
const (
c0c1Len = 1537
c2Len = 1536
s0s1Len = 1537
s1Len = 1536
s2Len = 1536
c0c1Len = 1537
s0s1Len = 1537
s0s1s2Len = 3073
)
const (
clientPartKeyLen = 30
clientFullKeyLen = 62
serverPartKeyLen = 36
serverFullKeyLen = 68
keyLen = 32
)
var (
clientVersion = []byte{0x0C, 0x00, 0x0D, 0x0E}
serverVersion = []byte{0x0D, 0x0E, 0x0A, 0x0D}
clientVersionMockFromFFMPEG = []byte{9, 0, 124, 2} // emulated Flash client version - 9.0.124.2 on Linux
clientVersion = []byte{0x0C, 0x00, 0x0D, 0x0E}
serverVersion = []byte{0x0D, 0x0E, 0x0A, 0x0D}
)
// 30+32
@ -69,20 +74,19 @@ var serverKey = []byte{
var random1528Buf []byte
type HandshakeClient interface {
type IHandshakeClient interface {
WriteC0C1(writer io.Writer) error
ReadS0S1S2(reader io.Reader) error
ReadS0S1(reader io.Reader) error
WriteC2(writer io.Writer) error
ReadS2(reader io.Reader) error
}
type HandshakeClientSimple struct {
c0c1 []byte
c2 []byte
buf []byte
}
type HandshakeClientComplex struct {
c0c1 []byte
c2 []byte
buf []byte
}
type HandshakeServer struct {
@ -91,77 +95,78 @@ type HandshakeServer struct {
}
func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error {
c.c0c1 = make([]byte, c0c1Len)
c.c0c1[0] = version
bele.BEPutUint32(c.c0c1[1:5], uint32(time.Now().UnixNano()))
//c.c0c1[1] = 0
//c.c0c1[2] = 0
//c.c0c1[3] = 0
//c.c0c1[4] = 0
//c.c0c1[5] = 9
//c.c0c1[6] = 0
//c.c0c1[7] = 124
//c.c0c1[8] = 2
random1528(c.c0c1[9:])
_, err := writer.Write(c.c0c1)
c.buf = make([]byte, c0c1Len)
c.buf[0] = version
bele.BEPutUint32(c.buf[1:5], uint32(time.Now().UnixNano()))
bele.BEPutUint32(c.buf[5:9], 0) // 4字节模式串保持为0标识是简单模式
random1528(c.buf[9:])
_, err := writer.Write(c.buf)
return err
}
func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error {
s0s1s2 := make([]byte, s0s1s2Len)
if _, err := io.ReadAtLeast(reader, s0s1s2, s0s1s2Len); err != nil {
return err
}
//if s0s1s2[0] != version {
// return ErrRTMP
//}
// use s2 as c2
c.c2 = append(c.c2, s0s1s2[s0s1Len:]...)
func (c *HandshakeClientSimple) ReadS0S1(reader io.Reader) error {
_, err := io.ReadAtLeast(reader, c.buf, s0s1Len)
return err
}
return nil
func (c *HandshakeClientSimple) WriteC2(writer io.Writer) error {
// use s1 as c2
_, err := writer.Write(c.buf[1:])
return err
}
func (c *HandshakeClientSimple) WriteC2(write io.Writer) error {
_, err := write.Write(c.c2)
func (c *HandshakeClientSimple) ReadS2(reader io.Reader) error {
_, err := io.ReadAtLeast(reader, c.buf, s2Len)
return err
}
func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error {
c.c0c1 = make([]byte, c0c1Len)
c.c0c1[0] = version
bele.BEPutUint32(c.c0c1[1:5], uint32(time.Now().UnixNano()))
//
copy(c.c0c1[5:], clientVersion)
random1528(c.c0c1[9:])
// offset
c.c0c1[9] = 0
c.c0c1[10] = 0
c.c0c1[11] = 0
c.c0c1[12] = 0
// digest
makeDigestWithoutCenterPart(c.c0c1[1:], 12, clientKey[:clientPartKeyLen], c.c0c1[13:])
_, err := writer.Write(c.c0c1)
c.buf = make([]byte, c0c1Len)
c.buf[0] = version
// mock ffmpeg
bele.BEPutUint32(c.buf[1:5], 0)
copy(c.buf[5:9], clientVersionMockFromFFMPEG)
random1528(c.buf[9:])
offs := int(c.buf[9]) + int(c.buf[10]) + int(c.buf[11]) + int(c.buf[12])
offs = (offs % 728) + 12
makeDigestWithoutCenterPart(c.buf[1:c0c1Len], offs, clientKey[:clientPartKeyLen], c.buf[1+offs:])
_, err := writer.Write(c.buf)
return err
}
func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error {
s0s1s2 := make([]byte, s0s1s2Len)
if _, err := io.ReadAtLeast(reader, s0s1s2, s0s1s2Len); err != nil {
func (c *HandshakeClientComplex) ReadS0S1(reader io.Reader) error {
s0s1 := make([]byte, s0s1Len)
if _, err := io.ReadAtLeast(reader, s0s1, s0s1Len); err != nil {
return err
}
//if s0s1s2[0] != version {
// return ErrRTMP
//}
// TODO chef: 这里复杂模式的 c2 构造没有完全按照规范
// nginx rtmp module 作为 server 端时,不会校验 c2 内容
c.c2 = append(c.c2, s0s1s2[s0s1Len:]...)
c2key := parseChallenge(s0s1, serverKey[:serverPartKeyLen], clientKey[:clientFullKeyLen])
// simple mode
if c2key == nil {
// use s1 as c2
copy(c.buf, s0s1[1:])
return nil
}
// complex mode
random1528(c.buf)
replayOffs := c2Len - keyLen
makeDigestWithoutCenterPart(c.buf[:c2Len], replayOffs, c2key, c.buf[replayOffs:replayOffs+keyLen])
return nil
}
func (c *HandshakeClientComplex) WriteC2(write io.Writer) error {
_, err := write.Write(c.c2)
func (c *HandshakeClientComplex) WriteC2(writer io.Writer) error {
_, err := writer.Write(c.buf[:c2Len])
return err
}
func (c *HandshakeClientComplex) ReadS2(reader io.Reader) error {
_, err := io.ReadAtLeast(reader, c.buf, s2Len)
return err
}
@ -173,7 +178,7 @@ func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) {
s.s0s1s2 = make([]byte, s0s1s2Len)
s2key := parseChallenge(c0c1)
s2key := parseChallenge(c0c1, clientKey[:clientPartKeyLen], serverKey[:serverFullKeyLen])
s.isSimpleMode = len(s2key) == 0
s.s0s1s2[0] = version
@ -208,8 +213,8 @@ func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) {
return nil
}
func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error {
_, err := write.Write(s.s0s1s2)
func (s *HandshakeServer) WriteS0S1S2(writer io.Writer) error {
_, err := writer.Write(s.s0s1s2)
return err
}
@ -221,19 +226,21 @@ func (s *HandshakeServer) ReadC2(reader io.Reader) error {
return nil
}
func parseChallenge(c0c1 []byte) []byte {
//if c0c1[0] != version {
// c0c1 clientPartKey serverFullKey
// s0s1 serverPartKey clientFullKey
func parseChallenge(b []byte, peerKey []byte, key []byte) []byte {
//if b[0] != version {
// return nil, ErrRTMP
//}
ver := bele.BEUint32(c0c1[5:])
ver := bele.BEUint32(b[5:])
if ver == 0 {
nazalog.Debug("handshake simple mode.")
return nil
}
offs := findDigest(c0c1[1:], 764+8, clientKey[:clientPartKeyLen])
offs := findDigest(b[1:], 764+8, peerKey)
if offs == -1 {
offs = findDigest(c0c1[1:], 8, clientKey[:clientPartKeyLen])
offs = findDigest(b[1:], 8, peerKey)
}
if offs == -1 {
nazalog.Warn("get digest offs failed. roll back to try simple handshake.")
@ -242,20 +249,21 @@ func parseChallenge(c0c1 []byte) []byte {
nazalog.Debug("handshake complex mode.")
// use c0c1 digest to make a new digest
digest := makeDigest(c0c1[1+offs:1+offs+keyLen], serverKey[:serverFullKeyLen])
digest := makeDigest(b[1+offs:1+offs+keyLen], key)
return digest
}
func findDigest(c1 []byte, base int, key []byte) int {
// @param b c1或s1
func findDigest(b []byte, base int, key []byte) int {
// calc offs
offs := int(c1[base]) + int(c1[base+1]) + int(c1[base+2]) + int(c1[base+3])
offs := int(b[base]) + int(b[base+1]) + int(b[base+2]) + int(b[base+3])
offs = (offs % 728) + base + 4
// calc digest
digest := make([]byte, keyLen)
makeDigestWithoutCenterPart(c1, offs, key, digest)
makeDigestWithoutCenterPart(b, offs, key, digest)
// compare origin digest in buffer with calced digest
if bytes.Compare(digest, c1[offs:offs+keyLen]) == 0 {
if bytes.Compare(digest, b[offs:offs+keyLen]) == 0 {
return offs
}
return -1
@ -288,6 +296,8 @@ func random1528(out []byte) {
func init() {
random1528Buf = make([]byte, 1528)
//hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark)
//copy(random1528Buf, []byte(hack))
hack := []byte(fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark))
for i := 0; i < 1528; i += len(hack) {
copy(random1528Buf[i:], hack)
}
}

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save