* 部分rtmp回源代码
* 部分rtmp server代码
* 部分rtmp服务端握手代码
pull/200/head
q191201771 6 years ago
parent c6c4c9e28a
commit 763cc241a6

2
.gitignore vendored

@ -10,3 +10,5 @@
/lal_linux
/build_linux.sh
/test.sh
/util/crudetimer.go

@ -20,8 +20,11 @@ $./lal -c conf/lal.conf.json -l conf/log.dev.xml
"httpflv": {
"sub_listen_addr": ":8080" // http-flv拉流地址
},
"rtmp": {
"addr": ":8081" // rtmp服务器监听端口NOTICE rtmp服务器部分正在开发中
}
"pull": { // 如果设置则当客户端连接lal拉流而lal上该流不存在时lal会去该配置中的地址回源拉流至本地再转发给客户端
"type": "httpflv", // 回源类型,支持"httpflv",正在实现"rtmp"
"type": "httpflv", // 回源类型,支持"httpflv""rtmp"
"addr": "pull.xxx.com", // 回源地址
"connect_timeout": 2, // 回源连接超时时间
"read_timeout": 10, // 回源读取数据超时时间
@ -44,3 +47,7 @@ TODO 日志配置文件说明
#### 依赖
* cihub/seelog
#### roadmap
正在实现rtmp服务器部分

@ -4,11 +4,14 @@
"httpflv": {
"sub_listen_addr": ":8080"
},
"rtmp": {
"addr": ":8081"
},
"pull": {
"type": "httpflv",
"addr": "pull.xxx.com",
"addr": "xxx.pull.com",
"connect_timeout": 2,
"read_timeout": 10,
"read_timeout": 1,
"stop_pull_while_no_sub_timeout": 3000
}
}

@ -10,6 +10,7 @@ type Config struct {
GOPCacheNum int `json:"gop_cache_number"`
HTTPFlv HTTPFlv `json:"httpflv"`
RTMP RTMP `json:"rtmp"`
Pull Pull `json:"pull"`
}
@ -17,6 +18,10 @@ type HTTPFlv struct {
SubListenAddr string `json:"sub_listen_addr"`
}
type RTMP struct {
Addr string `json:"addr"`
}
type Pull struct {
Type string `json:"type"`
Addr string `json:"addr"`

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/q191201771/lal/httpflv"
"github.com/q191201771/lal/log"
"github.com/q191201771/lal/rtmp"
"github.com/q191201771/lal/util"
"sync"
"time"
@ -20,12 +21,13 @@ type Group struct {
appName string
streamName string
exitChan chan bool
pullSession *httpflv.PullSession
subSessionList map[*httpflv.SubSession]bool
turnToEmptyTick int64 // trace while sub session list turn to empty
gopCache *httpflv.GOPCache
mutex sync.Mutex
exitChan chan bool
rtmpPullSession *rtmp.PullSession
httpFlvPullSession *httpflv.PullSession
subSessionList map[*httpflv.SubSession]bool
turnToEmptyTick int64 // trace while sub session list turn to empty
gopCache *httpflv.GOPCache
mutex sync.Mutex
UniqueKey string
}
@ -59,9 +61,9 @@ func (group *Group) RunLoop() {
// TODO chef: do timeout stuff. and do it fast.
group.mutex.Lock()
if group.pullSession != nil {
if isReadTimeout, _ := group.pullSession.ConnStat.Check(now); isReadTimeout {
log.Warnf("pull session read timeout. [%s]", group.pullSession.UniqueKey)
if group.httpFlvPullSession != nil {
if isReadTimeout, _ := group.httpFlvPullSession.ConnStat.Check(now); isReadTimeout {
log.Warnf("pull session read timeout. [%s]", group.httpFlvPullSession.UniqueKey)
group.disposePullSession(lalErr)
}
}
@ -79,10 +81,10 @@ func (group *Group) RunLoop() {
if group.config.Pull.StopPullWhileNoSubTimeout != 0 {
group.mutex.Lock()
if group.pullSession != nil && group.turnToEmptyTick != 0 && len(group.subSessionList) == 0 &&
if group.httpFlvPullSession != nil && group.turnToEmptyTick != 0 && len(group.subSessionList) == 0 &&
now-group.turnToEmptyTick > group.config.Pull.StopPullWhileNoSubTimeout {
log.Infof("stop pull while no SubSession. [%s]", group.pullSession.UniqueKey)
log.Infof("stop pull while no SubSession. [%s]", group.httpFlvPullSession.UniqueKey)
group.disposePullSession(lalErr)
}
group.mutex.Unlock()
@ -124,45 +126,28 @@ func (group *Group) AddSubSession(session *httpflv.SubSession) {
group.mutex.Unlock()
}
func (group *Group) PullIfNeeded(httpFlvPullAddr string) {
func (group *Group) PullIfNeeded() {
group.mutex.Lock()
if group.pullSession != nil {
if group.isInExist() {
return
}
pullSession := httpflv.NewPullSession(group, group.config.Pull.ConnectTimeout, group.config.Pull.ReadTimeout)
group.pullSession = pullSession
switch group.config.Pull.Type {
case "httpflv":
group.httpFlvPullSession = httpflv.NewPullSession(group, group.config.Pull.ConnectTimeout, group.config.Pull.ReadTimeout)
go group.pullByHTTPFlv()
case "rtmp":
group.rtmpPullSession = rtmp.NewPullSession(group, group.config.Pull.ConnectTimeout)
go group.pullByRTMP()
default:
log.Errorf("unknown pull type. type=%s", group.config.Pull.Type)
}
group.mutex.Unlock()
go func() {
defer func() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pullSession = nil
log.Infof("del PullSession out of group. [%s]", pullSession.UniqueKey)
}()
log.Infof("<----- connect. [%s]", pullSession.UniqueKey)
url := fmt.Sprintf("http://%s/%s/%s.flv", httpFlvPullAddr, group.appName, group.streamName)
err := pullSession.Connect(url)
if err != nil {
log.Errorf("-----> connect error. [%s] err=%v", pullSession.UniqueKey, err)
return
}
log.Infof("-----> connect succ. [%s]", pullSession.UniqueKey)
err = pullSession.RunLoop()
if err != nil {
log.Debugf("PullSession loop done. [%s] err=%v", pullSession.UniqueKey, err)
return
}
}()
}
func (group *Group) IsTotalEmpty() bool {
group.mutex.Lock()
defer group.mutex.Unlock()
return group.pullSession == nil && len(group.subSessionList) == 0
return group.httpFlvPullSession == nil && len(group.subSessionList) == 0
}
func (group *Group) ReadHTTPRespHeaderCB() {
@ -193,12 +178,71 @@ func (group *Group) ReadTagCB(tag *httpflv.Tag) {
group.gopCache.Push(tag)
}
func (group *Group) ReadAvMessageCB(t int, timestampAbs int, message []byte) {
//log.Info(t)
group.mutex.Lock()
defer group.mutex.Unlock()
flvTag := httpflv.PackHTTPFlvTag(uint8(t), timestampAbs, message)
for session := range group.subSessionList {
if session.HasKeyFrame {
session.WritePacket(flvTag)
} else {
if httpflv.IsMetadata(flvTag) || httpflv.IsAVCKeySeqHeader(flvTag) || httpflv.IsAACSeqHeader(flvTag) || httpflv.IsAVCKeyNalu(flvTag) {
if httpflv.IsAVCKeyNalu(flvTag) {
session.HasKeyFrame = true
}
session.WritePacket(flvTag)
}
}
}
}
func (group *Group) pullByHTTPFlv() {
defer func() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.httpFlvPullSession = nil
log.Infof("del httpflv PullSession out of group. [%s]", group.httpFlvPullSession.UniqueKey)
}()
log.Infof("<----- connect. [%s]", group.httpFlvPullSession.UniqueKey)
url := fmt.Sprintf("http://%s/%s/%s.flv", group.config.Pull.Addr, group.appName, group.streamName)
if err := group.httpFlvPullSession.Connect(url); err != nil {
log.Errorf("-----> connect error. [%s] err=%v", group.httpFlvPullSession.UniqueKey, err)
return
}
log.Infof("-----> connect succ. [%s]", group.httpFlvPullSession.UniqueKey)
if err := group.httpFlvPullSession.RunLoop(); err != nil {
log.Debugf("PullSession loop done. [%s] err=%v", group.httpFlvPullSession.UniqueKey, err)
return
}
}
func (group *Group) pullByRTMP() {
defer func() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.rtmpPullSession = nil
log.Infof("del rtmp PullSession out of group.")
}()
url := fmt.Sprintf("rtmp://%s/%s/%s", group.config.Pull.Addr, group.appName, group.streamName)
if err := group.rtmpPullSession.Pull(url); err != nil {
log.Error(err)
}
if err := group.rtmpPullSession.WaitLoop(); err != nil {
log.Debugf("rtmp PullSession loop done. [%s] err=%v", group.rtmpPullSession.UniqueKey, err)
return
}
}
func (group *Group) disposePullSession(err error) {
group.pullSession.Dispose(err)
group.pullSession = nil
group.httpFlvPullSession.Dispose(err)
group.httpFlvPullSession = nil
group.gopCache.ClearAll()
}
func (group *Group) isInExist() bool {
return group.pullSession != nil
return group.httpFlvPullSession != nil || group.rtmpPullSession != nil
}

@ -31,13 +31,13 @@ func (server *Server) RunLoop() error {
if err != nil {
return err
}
log.Infof("listen. addr=%s", server.addr)
log.Infof("start httpflv listen. addr=%s", server.addr)
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleSubSessionConnect(conn)
go server.handleConnect(conn)
}
}
@ -47,7 +47,7 @@ func (server *Server) Dispose() {
}
}
func (server *Server) handleSubSessionConnect(conn net.Conn) {
func (server *Server) handleConnect(conn net.Conn) {
log.Infof("accept a http flv connection. remoteAddr=%v", conn.RemoteAddr())
session := NewSubSession(conn, server.subWriteTimeout)
if err := session.ReadRequest(); err != nil {

@ -7,11 +7,12 @@ import (
// TODO chef: make these const
const tagHeaderSize int = 11
const prevTagSizeFieldSize int = 4
var (
tagTypeMetadata uint8 = 18
tagTypeVideo uint8 = 9
tagTypeAudio uint8 = 8
TagTypeMetadata uint8 = 18
TagTypeVideo uint8 = 9
TagTypeAudio uint8 = 8
)
var (
@ -54,32 +55,62 @@ type Tag struct {
Raw []byte
}
func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) {
rawHeader = make([]byte, tagHeaderSize)
if _, err = io.ReadAtLeast(rd, rawHeader, tagHeaderSize); err != nil {
return
}
h.T = rawHeader[0]
h.DataSize = bele.BEUint24(rawHeader[1:])
h.Timestamp = (uint32(rawHeader[7]) << 24) + bele.BEUint24(rawHeader[4:])
return
}
func (tag *Tag) IsMetadata() bool {
return tag.Header.T == tagTypeMetadata
return tag.Header.T == TagTypeMetadata
}
func (tag *Tag) IsAVCKeySeqHeader() bool {
return tag.Header.T == tagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == isAVCKeySeqHeader
return tag.Header.T == TagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == isAVCKeySeqHeader
}
func (tag *Tag) IsAVCKeyNalu() bool {
return tag.Header.T == tagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == AVCPacketTypeNalu
return tag.Header.T == TagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == AVCPacketTypeNalu
}
func (tag *Tag) IsAACSeqHeader() bool {
return tag.Header.T == tagTypeAudio && tag.Raw[tagHeaderSize]>>4 == SoundFormatAAC && tag.Raw[tagHeaderSize+1] == AACPacketTypeSeqHeader
return tag.Header.T == TagTypeAudio && tag.Raw[tagHeaderSize]>>4 == SoundFormatAAC && tag.Raw[tagHeaderSize+1] == AACPacketTypeSeqHeader
}
func IsMetadata(tag []byte) bool {
return tag[0] == TagTypeMetadata
}
func IsAVCKeySeqHeader(tag []byte) bool {
return tag[0] == TagTypeVideo && tag[tagHeaderSize] == AVCKey && tag[tagHeaderSize+1] == isAVCKeySeqHeader
}
func IsAVCKeyNalu(tag []byte) bool {
return tag[0] == TagTypeVideo && tag[tagHeaderSize] == AVCKey && tag[tagHeaderSize+1] == AVCPacketTypeNalu
}
func IsAACSeqHeader(tag []byte) bool {
return tag[0] == TagTypeAudio && tag[tagHeaderSize]>>4 == SoundFormatAAC && tag[tagHeaderSize+1] == AACPacketTypeSeqHeader
}
func PackHTTPFlvTag(t uint8, timestamp int, in []byte) []byte {
out := make([]byte, tagHeaderSize+len(in)+prevTagSizeFieldSize)
out[0] = t
bele.BEPutUint24(out[1:], uint32(len(in)))
bele.BEPutUint24(out[4:], uint32(timestamp&0xFFFFFF))
out[7] = uint8(timestamp >> 24)
out[8] = 0
out[9] = 0
out[10] = 0
copy(out[11:], in)
bele.BEPutUint32(out[tagHeaderSize+len(in):], uint32(tagHeaderSize+len(in)))
return out
}
func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) {
rawHeader = make([]byte, tagHeaderSize)
if _, err = io.ReadAtLeast(rd, rawHeader, tagHeaderSize); err != nil {
return
}
h.T = rawHeader[0]
h.DataSize = bele.BEUint24(rawHeader[1:])
h.Timestamp = (uint32(rawHeader[7]) << 24) + bele.BEUint24(rawHeader[4:])
return
}
func (tag *Tag) cloneTag() *Tag {

@ -3,6 +3,7 @@ package main
import (
"github.com/q191201771/lal/httpflv"
"github.com/q191201771/lal/log"
"github.com/q191201771/lal/rtmp"
"sync"
"time"
)
@ -10,10 +11,11 @@ import (
type Manager struct {
config *Config
server *httpflv.Server
groups map[string]*Group // TODO chef: with appName
mutex sync.Mutex
exitChan chan bool
httpFlvServer *httpflv.Server
rtmpServer *rtmp.Server
groups map[string]*Group // TODO chef: with appName
mutex sync.Mutex
exitChan chan bool
}
func NewManager(config *Config) *Manager {
@ -23,13 +25,21 @@ func NewManager(config *Config) *Manager {
exitChan: make(chan bool),
}
s := httpflv.NewServer(m, config.HTTPFlv.SubListenAddr, config.SubIdleTimeout)
m.server = s
m.httpFlvServer = s
rtmpServer := rtmp.NewServer(config.RTMP.Addr)
m.rtmpServer = rtmpServer
return m
}
func (manager *Manager) RunLoop() {
go func() {
if err := manager.server.RunLoop(); err != nil {
if err := manager.httpFlvServer.RunLoop(); err != nil {
log.Error(err)
}
}()
go func() {
if err := manager.rtmpServer.RunLoop(); err != nil {
log.Error(err)
}
}()
@ -54,7 +64,8 @@ func (manager *Manager) RunLoop() {
func (manager *Manager) Dispose() {
log.Debug("Dispose manager.")
manager.server.Dispose()
manager.httpFlvServer.Dispose()
manager.rtmpServer.Dispose()
manager.exitChan <- true
manager.mutex.Lock()
defer manager.mutex.Unlock()
@ -67,12 +78,7 @@ func (manager *Manager) Dispose() {
func (manager *Manager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession) {
group := manager.getOrCreateGroup(session.AppName, session.StreamName)
group.AddSubSession(session)
switch manager.config.Pull.Type {
case "httpflv":
group.PullIfNeeded(manager.config.Pull.Addr)
default:
log.Errorf("unknown pull type. type=%s", manager.config.Pull.Type)
}
group.PullIfNeeded()
}
func (manager *Manager) check() {

@ -2,21 +2,24 @@ package rtmp
import (
"bufio"
"encoding/hex"
"github.com/q191201771/lal/bele"
"github.com/q191201771/lal/log"
"io"
"net"
"net/url"
"strings"
"time"
)
var readBufSize = 4096
var writeBufSize = 4096
// rtmp客户端类型连接的底层实现
// rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession
type ClientSession struct {
t ClientSessionType
obs PullSessionObserver
doResultChan chan error
connectTimeout int64
doResultChan chan struct{}
errChan chan error
packer *MessagePacker
csid2stream map[int]*Stream
peerChunkSize int
@ -29,6 +32,8 @@ type ClientSession struct {
rb *bufio.Reader
wb *bufio.Writer
peerWinAckSize int
UniqueKey string
}
type ClientSessionType int
@ -39,18 +44,31 @@ const (
)
// set <obs> if <t> equal CSTPullSession
func NewClientSession(t ClientSessionType, obs PullSessionObserver) *ClientSession {
func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeout int64) *ClientSession {
var uk string
switch t {
case CSTPullSession:
uk = "RTMPPULL"
case CSTPushSession:
uk = "RTMPPUSH"
default:
panic("unreached.")
}
return &ClientSession{
t: t,
obs: obs,
doResultChan: make(chan error),
packer: NewMessagePacker(),
csid2stream: make(map[int]*Stream),
peerChunkSize: defaultChunkSize,
t: t,
obs: obs,
connectTimeout: connectTimeout,
doResultChan: make(chan struct{}),
errChan: make(chan error),
packer: NewMessagePacker(),
csid2stream: make(map[int]*Stream),
peerChunkSize: defaultChunkSize,
UniqueKey: uk,
}
}
// block until server reply play start or other error occur.
// block until server reply publish / play start or timeout.
func (s *ClientSession) Do(rawURL string) error {
if err := s.parseURL(rawURL); err != nil {
return err
@ -70,13 +88,24 @@ func (s *ClientSession) Do(rawURL string) error {
}
go func() {
err := s.runReadLoop()
s.doResultChan <- err
s.errChan <- s.runReadLoop()
}()
doResult := <-s.doResultChan
t := time.NewTimer(time.Duration(s.connectTimeout) * time.Second)
var ret error
select {
case <-s.doResultChan:
break
case <-t.C:
ret = rtmpErr
}
t.Stop()
return ret
}
return doResult
func (s *ClientSession) WaitLoop() error {
return <-s.errChan
}
func (s *ClientSession) runReadLoop() error {
@ -196,7 +225,7 @@ func (s *ClientSession) doMsg(stream *Stream) error {
case typeidUserControl:
log.Warn("user control message. ignore.")
case typeidDataMessageAMF0:
fallthrough
return s.doDataMessageAMF0(stream)
case typeidAudio:
fallthrough
case typeidVideo:
@ -207,6 +236,23 @@ func (s *ClientSession) doMsg(stream *Stream) error {
return nil
}
func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
val, err := stream.msg.peekStringWithType()
if err != nil {
return err
}
switch val {
case "|RtmpSampleAccess": // TODO chef: handle this?
return nil
default:
log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
}
s.obs.ReadAvMessageCB(stream.header.msgTypeID, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
return nil
}
func (s *ClientSession) doCommandMessage(stream *Stream) error {
cmd, err := stream.msg.readStringWithType()
if err != nil {
@ -249,7 +295,7 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error {
switch code {
case "NetStream.Publish.Start":
log.Info("-----> onStatus('NetStream.Publish.Start')")
s.notifyPullResultSucc()
s.notifyDoResultSucc()
default:
log.Errorf("unknown code. code=%s", code)
}
@ -257,7 +303,7 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error {
switch code {
case "NetStream.Play.Start":
log.Info("-----> onStatus('NetStream.Play.Start')")
s.notifyPullResultSucc()
s.notifyDoResultSucc()
default:
log.Errorf("unknown code. code=%s", code)
}
@ -363,13 +409,13 @@ func (s *ClientSession) parseURL(rawURL string) error {
}
func (s *ClientSession) handshake() error {
if err := s.hs.writeC0C1(s.Conn); err != nil {
if err := s.hs.WriteC0C1(s.Conn); err != nil {
return err
}
if err := s.hs.readS0S1S2(s.rb); err != nil {
if err := s.hs.ReadS0S1S2(s.rb); err != nil {
return err
}
if err := s.hs.writeC2(s.Conn); err != nil {
if err := s.hs.WriteC2(s.Conn); err != nil {
return err
}
return nil
@ -401,6 +447,6 @@ func (s *ClientSession) getOrCreateStream(csid int) *Stream {
return stream
}
func (s *ClientSession) notifyPullResultSucc() {
s.doResultChan <- nil
func (s *ClientSession) notifyDoResultSucc() {
s.doResultChan <- struct{}{}
}

@ -2,26 +2,73 @@ package rtmp
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"github.com/q191201771/lal/bele"
"github.com/q191201771/lal/log"
"io"
"time"
)
// rtmp握手分为两种模式简单模式和复杂模式
// 本源码文件中
// HandshakeClient作为客户端握手只实现了简单模式
// HandshakeServer作为服务端握手实现了简单模式和复杂模式
// TODO chef: HandshakeClient with complex mode.
var version = uint8(3)
//var c1Len = 1536
var c0c1Len = 1537
var s0s1Len = 1537
var c2Len = 1536
var s2Len = 1536
var s0s1s2Len = 3073
var serverVersion = []byte{
0x0D, 0x0E, 0x0A, 0x0D,
}
// 30+32
var clientKey = []byte{
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ',
'0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1,
0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
}
// 36+32
var serverKey = []byte{
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ',
'S', 'e', 'r', 'v', 'e', 'r', ' ',
'0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1,
0x02, 0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
}
var clientPartKeyLen = 30
var serverPartKeyLen = 36
var serverFullKeyLen = 68
var keyLen = 32
type HandshakeClient struct {
c0c1 []byte
c2 []byte
}
func (c *HandshakeClient) writeC0C1(writer io.Writer) error {
type HandshakeServer struct {
isSimpleMode bool
s0s1s2 []byte
}
func (c *HandshakeClient) WriteC0C1(writer io.Writer) error {
c.c0c1 = make([]byte, c0c1Len)
c.c0c1[0] = version
bele.BEPutUint32(c.c0c1[1:5], uint32(time.Now().Unix()))
@ -33,7 +80,7 @@ func (c *HandshakeClient) writeC0C1(writer io.Writer) error {
return err
}
func (c *HandshakeClient) readS0S1S2(reader io.Reader) error {
func (c *HandshakeClient) ReadS0S1S2(reader io.Reader) error {
s0s1s2 := make([]byte, s0s1s2Len)
if _, err := io.ReadAtLeast(reader, s0s1s2, s0s1s2Len); err != nil {
return err
@ -50,8 +97,153 @@ func (c *HandshakeClient) readS0S1S2(reader io.Reader) error {
return nil
}
func (c *HandshakeClient) writeC2(write io.Writer) error {
func (c *HandshakeClient) WriteC2(write io.Writer) error {
_, err := write.Write(c.c2)
log.Infof("<----- Handshake C2")
return err
}
func (s *HandshakeServer) ReadC0C1(reader io.Reader) error {
c0c1 := make([]byte, c0c1Len)
if _, err := io.ReadAtLeast(reader, c0c1, c0c1Len); err != nil {
return err
}
log.Infof("-----> Handshake C0+C1")
s.s0s1s2 = make([]byte, s0s1s2Len)
if err := s.parseChallenge(c0c1); err != nil {
return err
}
var readC1Timestamp uint32
if s.isSimpleMode {
readC1Timestamp = bele.BEUint32(c0c1[1:])
}
s.s0s1s2[0] = version
s1 := s.s0s1s2[1:]
writeS1Timestamp := uint32(time.Now().Unix())
bele.BEPutUint32(s1, writeS1Timestamp)
if s.isSimpleMode {
bele.BEPutUint32(s1[4:], 0)
// TODO chef: random s1 1528
// s2
bele.BEPutUint32(s.s0s1s2[s0s1Len:], readC1Timestamp)
bele.BEPutUint32(s.s0s1s2[s0s1Len+4:], writeS1Timestamp)
// TODO chef: random
} else {
copy(s1[4:], serverVersion)
// TODO chef: random s1
offs := int(s1[8]) + int(s.s0s1s2[9]) + int(s1[10]) + int(s1[11])
offs = (offs % 728) + 12
makeDigestWithoutCenterKey(s1, offs, serverKey[:serverPartKeyLen], s1[offs:])
}
return nil
}
func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error {
_, err := write.Write(s.s0s1s2)
log.Infof("<----- Handshake S0S1S2")
return err
}
func (s *HandshakeServer) ReadC2(reader io.Reader) error {
c2 := make([]byte, c2Len)
if _, err := io.ReadAtLeast(reader, c2, c2Len); err != nil {
return err
}
log.Infof("-----> Handshake C2")
return nil
}
func (s *HandshakeServer) parseChallenge(c0c1 []byte) error {
if c0c1[0] != version {
return rtmpErr
}
//peerEpoch := bele.BEUint32(c0c1[1:])
ver := bele.BEUint32(c0c1[5:])
if ver == 0 {
log.Debug("handshake simple mode.")
s.isSimpleMode = true
return nil
}
// assume digest in second-half
// | peer epoch | ver | | digest |
// | 4 | 4 | 764 | 764 |
// offs [0~727] + 764 + 8 + 4
// [776~1503]
offs := findDigest(c0c1[1:], 764+8, clientKey[:clientPartKeyLen])
if offs == -1 {
// try digest in first-half
// | peer epoch | ver | digest | |
// offs [0~727] + 8 + 4
// [12~739]
offs = findDigest(c0c1[1:], 8, clientKey[:clientPartKeyLen])
}
if offs == -1 {
log.Warn("get digest offs failed. roll back to try simple handshake.")
s.isSimpleMode = true
return nil
}
s.isSimpleMode = false
// use c0c1 digest to make a new digest
digest := makeDigest(c0c1[1+offs:1+offs+keyLen], serverKey[:serverFullKeyLen])
// hardcode reply offs
replyOffs := s2Len - keyLen
// use new digest as key to make another digest.
digest2 := make([]byte, keyLen)
makeDigestWithoutCenterKey(s.s0s1s2[s0s1Len:], replyOffs, digest, digest2)
// copy digest2 to s2
copy(s.s0s1s2[s0s1Len+replyOffs:], digest2)
return nil
}
// <b> could be `c1`
func findDigest(b []byte, base int, key []byte) int {
// calc offs
offs := int(b[base]) + int(b[base+1]) + int(b[base+2]) + int(b[base+3])
offs = (offs % 728) + base + 4
// calc digest
digest := make([]byte, keyLen)
makeDigestWithoutCenterKey(b, offs, key, digest)
// compare origin digest in buffer with calced digest
if bytes.Compare(digest, b[offs:offs+keyLen]) == 0 {
return offs
}
return -1
}
// <b> could be `c1` or `s2`
func makeDigestWithoutCenterKey(b []byte, offs int, key []byte, out []byte) {
mac := hmac.New(sha256.New, key)
// left
if offs != 0 {
mac.Write(b[:offs])
}
// right
if len(b)-offs-keyLen > 0 {
mac.Write(b[offs+keyLen:])
}
copy(out, mac.Sum(nil))
}
func makeDigest(b []byte, key []byte) []byte {
mac := hmac.New(sha256.New, key)
mac.Write(b)
return mac.Sum(nil)
}

@ -12,9 +12,9 @@ type PullSession struct {
*ClientSession
}
func NewPullSession(obs PullSessionObserver) *PullSession {
func NewPullSession(obs PullSessionObserver, connectTimeout int64) *PullSession {
return &PullSession{
ClientSession: NewClientSession(CSTPullSession, obs),
ClientSession: NewClientSession(CSTPullSession, obs, connectTimeout),
}
}

@ -4,9 +4,9 @@ type PushSession struct {
*ClientSession
}
func NewPushSession() *PushSession {
func NewPushSession(connectTimeout int64) *PushSession {
return &PushSession{
ClientSession: NewClientSession(CSTPushSession, nil),
ClientSession: NewClientSession(CSTPushSession, nil, connectTimeout),
}
}

@ -27,3 +27,6 @@ var tidClientPublish = 3
var maxTimestampInMessageHeader = 0xFFFFFF
var defaultChunkSize = 128
var readBufSize = 4096
var writeBufSize = 4096

@ -0,0 +1,45 @@
package rtmp
import (
"github.com/q191201771/lal/log"
"net"
)
type Server struct {
addr string
ln net.Listener
}
func NewServer(addr string) *Server {
return &Server{
addr: addr,
}
}
func (server *Server) RunLoop() error {
var err error
server.ln, err = net.Listen("tcp", server.addr)
if err != nil {
return err
}
log.Infof("start rtmp listen. addr=%s", server.addr)
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleConnect(conn)
}
}
func (server *Server) Dispose() {
if err := server.ln.Close(); err != nil {
log.Error(err)
}
}
func (server *Server) handleConnect(conn net.Conn) {
log.Infof("accept a rtmp connection. remoteAddr=%v", conn.RemoteAddr())
session := NewServerSession(conn)
session.RunLoop()
}

@ -0,0 +1,41 @@
package rtmp
import (
"bufio"
"net"
)
type ServerSession struct {
conn net.Conn
rb *bufio.Reader
wb *bufio.Writer
hs HandshakeServer
}
func NewServerSession(conn net.Conn) *ServerSession {
return &ServerSession{
conn: conn,
rb: bufio.NewReaderSize(conn, readBufSize),
wb: bufio.NewWriterSize(conn, writeBufSize),
}
}
func (s *ServerSession) RunLoop() error {
if err := s.handshake(); err != nil {
return err
}
return nil
}
func (s *ServerSession) handshake() error {
if err := s.hs.ReadC0C1(s.rb); err != nil {
return err
}
if err := s.hs.WriteS0S1S2(s.conn); err != nil {
return err
}
if err := s.hs.ReadC2(s.rb); err != nil {
return err
}
return nil
}

@ -66,6 +66,11 @@ func (msg *StreamMsg) clear() {
msg.e = 0
}
func (msg *StreamMsg) peekStringWithType() (string, error) {
str, _, err := AMF0.readStringWithType(msg.buf[msg.b:msg.e])
return str, err
}
func (msg *StreamMsg) readStringWithType() (string, error) {
str, l, err := AMF0.readStringWithType(msg.buf[msg.b:msg.e])
if err == nil {

@ -1,12 +1,11 @@
package util
import (
"github.com/q191201771/lal/log"
"sync/atomic"
"time"
)
// 高性能场景下实现长连接读写超时功能
// 高性能场景下实现长连接流数据读写超时功能
// 不使用Go的库函数SetDeadline
// 也不在每次读写数据时使用time now获取读写时间
// 允许出现两秒左右的误差
@ -45,27 +44,21 @@ func (cs *ConnStat) Write(n int) {
// 检查时传入当前时间戳。检查频率应该小于超时阈值。频率越低,则越精确
func (cs *ConnStat) Check(now int64) (isReadTimeout bool, isWriteTimeout bool) {
if cs.readTimeout == 0 {
if cs.readTimeout == 0 { // 没有设置,则不用检查
isReadTimeout = false
log.Info("tag1")
} else {
log.Info("tag2")
trb := atomic.LoadUint64(&cs.totalReadByte)
if trb == 0 { // 历史从来没有收到过数据
isReadTimeout = (now - cs.lastReadActiveTick) > cs.readTimeout
log.Info("tag3")
} else {
if trb-cs.prevTotalReadByte > 0 { // 距离上次检查有收到过数据
isReadTimeout = false
cs.lastReadActiveTick = now
log.Info("tag4")
} else {
isReadTimeout = (now - cs.lastReadActiveTick) > cs.readTimeout
log.Info("tag5")
}
}
cs.prevTotalReadByte = trb
log.Info("tag6")
}
if cs.writeTimeout == 0 {

Loading…
Cancel
Save