*) 新增 rtmp.PubSession 和 rtmp.SubSession *) 新增 rtmp/handshake_test.go *) 新增 rtmp/chunkdivider.go。用于将业务层的message切割成rtmp chunk

pull/200/head
q191201771 6 years ago
parent d9d752ce52
commit 8f52174fbd

@ -47,6 +47,7 @@ TODO 日志配置文件说明
#### 依赖 #### 依赖
* cihub/seelog * cihub/seelog
* stretchr/testify/assert
#### roadmap #### roadmap

@ -16,11 +16,11 @@ type Group struct {
streamName string streamName string
exitChan chan bool exitChan chan bool
rtmpPubSession *rtmp.ServerSession rtmpPubSession *rtmp.PubSession
rtmpPullSession *rtmp.PullSession rtmpPullSession *rtmp.PullSession
httpFlvPullSession *httpflv.PullSession httpFlvPullSession *httpflv.PullSession
httpFlvSubSessionList map[*httpflv.SubSession]struct{} httpFlvSubSessionList map[*httpflv.SubSession]struct{}
rtmpSubSessionList map[*rtmp.ServerSession]struct{} rtmpSubSessionList map[*rtmp.SubSession]struct{}
turnToEmptyTick int64 // trace while sub session list turn to empty turnToEmptyTick int64 // trace while sub session list turn to empty
gopCache *httpflv.GOPCache gopCache *httpflv.GOPCache
mutex sync.Mutex mutex sync.Mutex
@ -38,7 +38,7 @@ func NewGroup(appName string, streamName string, config *Config) *Group {
streamName: streamName, streamName: streamName,
exitChan: make(chan bool), exitChan: make(chan bool),
httpFlvSubSessionList: make(map[*httpflv.SubSession]struct{}), httpFlvSubSessionList: make(map[*httpflv.SubSession]struct{}),
rtmpSubSessionList: make(map[*rtmp.ServerSession]struct{}), rtmpSubSessionList: make(map[*rtmp.SubSession]struct{}),
gopCache: httpflv.NewGOPCache(config.GOPCacheNum), gopCache: httpflv.NewGOPCache(config.GOPCacheNum),
UniqueKey: uk, UniqueKey: uk,
} }
@ -124,7 +124,7 @@ func (group *Group) AddHTTPFlvSubSession(session *httpflv.SubSession) {
} }
} }
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { func (group *Group) AddRTMPSubSession(session *rtmp.SubSession) {
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
log.Debugf("add SubSession into group. [%s]", session.UniqueKey) log.Debugf("add SubSession into group. [%s]", session.UniqueKey)
@ -132,7 +132,7 @@ func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
group.turnToEmptyTick = 0 group.turnToEmptyTick = 0
} }
func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) { func (group *Group) AddRTMPPubSession(session *rtmp.PubSession) {
// TODO chef: 如果已经存在输入,应该拒绝掉这次推流 // TODO chef: 如果已经存在输入,应该拒绝掉这次推流
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
@ -170,15 +170,12 @@ func (group *Group) IsTotalEmpty() bool {
} }
func (group *Group) ReadHTTPRespHeaderCB() { func (group *Group) ReadHTTPRespHeaderCB() {
//log.Debugf("ReadHTTPRespHeaderCb. [%s]", group.UniqueKey)
} }
func (group *Group) ReadFlvHeaderCB(flvHeader []byte) { func (group *Group) ReadFlvHeaderCB(flvHeader []byte) {
//log.Debugf("ReadFlvHeaderCb. [%s]", group.UniqueKey)
} }
func (group *Group) ReadTagCB(tag *httpflv.Tag) { func (group *Group) ReadTagCB(tag *httpflv.Tag) {
//log.Debug(header.t, header.timestamp)
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
// TODO chef: assume that write fast and would not block // TODO chef: assume that write fast and would not block
@ -198,11 +195,16 @@ func (group *Group) ReadTagCB(tag *httpflv.Tag) {
group.gopCache.Push(tag) group.gopCache.Push(tag)
} }
func (group *Group) ReadAVMessageCB(t int, timestampAbs int, message []byte) { func (group *Group) ReadAVMessageCB(header rtmp.Header, timestampAbs int, message []byte) {
//log.Info(t) //log.Info(t)
group.mutex.Lock() group.mutex.Lock()
defer group.mutex.Unlock() defer group.mutex.Unlock()
flvTag := httpflv.PackHTTPFlvTag(uint8(t), timestampAbs, message)
//for session := range group.rtmpSubSessionList {
//
//}
flvTag := httpflv.PackHTTPFlvTag(uint8(header.MsgTypeID), timestampAbs, message)
for session := range group.httpFlvSubSessionList { for session := range group.httpFlvSubSessionList {
if session.HasKeyFrame { if session.HasKeyFrame {
session.WritePacket(flvTag) session.WritePacket(flvTag)

@ -114,16 +114,6 @@ func (session *PullSession) Dispose(err error) {
}) })
} }
//func (session *PullSession) GetStat() (now PullSessionStat, diff PullSessionStat) {
// session.statMutex.Lock()
// defer session.statMutex.Unlock()
// now = session.stat
// diff.ReadCount = session.stat.ReadCount - session.prevStat.ReadCount
// diff.ReadByte = session.stat.ReadByte - session.prevStat.ReadByte
// session.prevStat = session.stat
// return
//}
func (session *PullSession) runReadLoop() error { func (session *PullSession) runReadLoop() error {
if err := session.readHTTPRespHeader(); err != nil { if err := session.readHTTPRespHeader(); err != nil {
return err return err

@ -27,6 +27,8 @@ var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0
var wChanSize = 1024 // TODO chef: 1024 var wChanSize = 1024 // TODO chef: 1024
type SubSession struct { type SubSession struct {
UniqueKey string
ConnStat util.ConnStat ConnStat util.ConnStat
writeTimeout int64 writeTimeout int64
@ -45,8 +47,6 @@ type SubSession struct {
closeOnce sync.Once closeOnce sync.Once
exitChan chan struct{} exitChan chan struct{}
hasClosedFlag uint32 hasClosedFlag uint32
UniqueKey string
} }
func NewSubSession(conn net.Conn, writeTimeout int64) *SubSession { func NewSubSession(conn net.Conn, writeTimeout int64) *SubSession {
@ -142,7 +142,6 @@ func (session *SubSession) WritePacket(pkt []byte) {
if session.hasClosed() { if session.hasClosed() {
return return
} }
//session.addWannaWriteStat(len(pkt))
for { for {
select { select {
case session.wChan <- pkt: case session.wChan <- pkt:

@ -81,13 +81,13 @@ func (manager *Manager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession) {
group.PullIfNeeded() group.PullIfNeeded()
} }
func (manager *Manager) NewRTMPSubSessionCB(session *rtmp.ServerSession) { func (manager *Manager) NewRTMPSubSessionCB(session *rtmp.SubSession) {
group := manager.getOrCreateGroup(session.AppName, session.StreamName) group := manager.getOrCreateGroup(session.AppName, session.StreamName)
group.AddRTMPSubSession(session) group.AddRTMPSubSession(session)
group.PullIfNeeded() group.PullIfNeeded()
} }
func (manager *Manager) NewRTMPPubSessionCB(session *rtmp.ServerSession) { func (manager *Manager) NewRTMPPubSessionCB(session *rtmp.PubSession) {
group := manager.getOrCreateGroup(session.AppName, session.StreamName) group := manager.getOrCreateGroup(session.AppName, session.StreamName)
group.AddRTMPPubSession(session) group.AddRTMPPubSession(session)
} }

@ -5,29 +5,31 @@ import (
"io" "io"
) )
type Composer struct { // 读取chunk并组织chunk生成message返回给上层
type ChunkComposer struct {
peerChunkSize int peerChunkSize int
csid2stream map[int]*Stream csid2stream map[int]*Stream
} }
func NewComposer() *Composer { func NewChunkComposer() *ChunkComposer {
return &Composer{ return &ChunkComposer{
peerChunkSize: defaultChunkSize, peerChunkSize: defaultChunkSize,
csid2stream: make(map[int]*Stream), csid2stream: make(map[int]*Stream),
} }
} }
func (c *Composer) SetPeerChunkSize(val int) { func (c *ChunkComposer) SetPeerChunkSize(val int) {
c.peerChunkSize = val c.peerChunkSize = val
} }
func (c *Composer) GetPeerChunkSize() int { func (c *ChunkComposer) GetPeerChunkSize() int {
return c.peerChunkSize return c.peerChunkSize
} }
type CompleteMessageCB func(stream *Stream) error type CompleteMessageCB func(stream *Stream) error
func (c *Composer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
bootstrap := make([]byte, 11) bootstrap := make([]byte, 11)
for { for {
@ -65,7 +67,7 @@ func (c *Composer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
stream.header.timestamp = int(bele.BEUint24(bootstrap)) stream.header.timestamp = int(bele.BEUint24(bootstrap))
stream.timestampAbs = stream.header.timestamp stream.timestampAbs = stream.header.timestamp
stream.msgLen = int(bele.BEUint24(bootstrap[3:])) stream.msgLen = int(bele.BEUint24(bootstrap[3:]))
stream.header.msgTypeID = int(bootstrap[6]) stream.header.MsgTypeID = int(bootstrap[6])
stream.header.msgStreamID = int(bele.LEUint32(bootstrap[7:])) stream.header.msgStreamID = int(bele.LEUint32(bootstrap[7:]))
stream.msg.reserve(stream.msgLen) stream.msg.reserve(stream.msgLen)
@ -76,7 +78,7 @@ func (c *Composer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
stream.header.timestamp = int(bele.BEUint24(bootstrap)) stream.header.timestamp = int(bele.BEUint24(bootstrap))
stream.timestampAbs += stream.header.timestamp stream.timestampAbs += stream.header.timestamp
stream.msgLen = int(bele.BEUint24(bootstrap[3:])) stream.msgLen = int(bele.BEUint24(bootstrap[3:]))
stream.header.msgTypeID = int(bootstrap[6]) stream.header.MsgTypeID = int(bootstrap[6])
stream.msg.reserve(stream.msgLen) stream.msg.reserve(stream.msgLen)
case 2: case 2:
@ -125,10 +127,12 @@ func (c *Composer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
stream.msg.produced(neededSize) stream.msg.produced(neededSize)
if stream.msg.len() == stream.msgLen { if stream.msg.len() == stream.msgLen {
if stream.header.msgTypeID == typeidSetChunkSize { if stream.header.MsgTypeID == typeidSetChunkSize {
val := int(bele.BEUint32(stream.msg.buf)) val := int(bele.BEUint32(stream.msg.buf))
c.SetPeerChunkSize(val) c.SetPeerChunkSize(val)
} }
stream.header.csid = csid
stream.header.msgLen = stream.msgLen
if err := cb(stream); err != nil { if err := cb(stream); err != nil {
return err return err
} }
@ -140,7 +144,7 @@ func (c *Composer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
} }
} }
func (c *Composer) getOrCreateStream(csid int) *Stream { func (c *ChunkComposer) getOrCreateStream(csid int) *Stream {
stream, exist := c.csid2stream[csid] stream, exist := c.csid2stream[csid]
if !exist { if !exist {
stream = NewStream() stream = NewStream()

@ -0,0 +1,3 @@
package rtmp
//func Message2Chunks(message []byte, )

@ -0,0 +1,15 @@
package rtmp
type PullSession struct {
*ClientSession
}
func NewPullSession(obs AVMessageObserver, connectTimeout int64) *PullSession {
return &PullSession{
ClientSession: NewClientSession(CSTPullSession, obs, connectTimeout),
}
}
func (s *PullSession) Pull(rawURL string) error {
return s.Do(rawURL)
}

@ -12,16 +12,18 @@ import (
"time" "time"
) )
var chunkSize = 4096
// rtmp客户端类型连接的底层实现 // rtmp客户端类型连接的底层实现
// rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession // rtmp包的使用者应该优先使用基于ClientSession实现的PushSession和PullSession
type ClientSession struct { type ClientSession struct {
t ClientSessionType t ClientSessionType
obs PullSessionObserver obs AVMessageObserver // only for PullSession
connectTimeout int64 connectTimeout int64
doResultChan chan struct{} doResultChan chan struct{}
errChan chan error errChan chan error
packer *MessagePacker packer *MessagePacker
composer *Composer chunkComposer *ChunkComposer
url *url.URL url *url.URL
tcURL string tcURL string
appName string appName string
@ -43,7 +45,7 @@ const (
) )
// set <obs> if <t> equal CSTPullSession // set <obs> if <t> equal CSTPullSession
func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeout int64) *ClientSession { func NewClientSession(t ClientSessionType, obs AVMessageObserver, connectTimeout int64) *ClientSession {
var uk string var uk string
switch t { switch t {
case CSTPullSession: case CSTPullSession:
@ -61,7 +63,7 @@ func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeo
doResultChan: make(chan struct{}), doResultChan: make(chan struct{}),
errChan: make(chan error), errChan: make(chan error),
packer: NewMessagePacker(), packer: NewMessagePacker(),
composer: NewComposer(), chunkComposer: NewChunkComposer(),
UniqueKey: util.GenUniqueKey(uk), UniqueKey: util.GenUniqueKey(uk),
} }
} }
@ -107,11 +109,11 @@ func (s *ClientSession) WaitLoop() error {
} }
func (s *ClientSession) runReadLoop() error { func (s *ClientSession) runReadLoop() error {
return s.composer.RunLoop(s.rb, s.doMsg) return s.chunkComposer.RunLoop(s.rb, s.doMsg)
} }
func (s *ClientSession) doMsg(stream *Stream) error { func (s *ClientSession) doMsg(stream *Stream) error {
switch stream.header.msgTypeID { switch stream.header.MsgTypeID {
case typeidWinAckSize: case typeidWinAckSize:
fallthrough fallthrough
case typeidBandwidth: case typeidBandwidth:
@ -121,15 +123,15 @@ func (s *ClientSession) doMsg(stream *Stream) error {
case typeidCommandMessageAMF0: case typeidCommandMessageAMF0:
return s.doCommandMessage(stream) return s.doCommandMessage(stream)
case typeidUserControl: case typeidUserControl:
log.Warn("user control message. ignore.") log.Warn("read user control message, ignore. [%s]", s.UniqueKey)
case typeidDataMessageAMF0: case typeidDataMessageAMF0:
return s.doDataMessageAMF0(stream) return s.doDataMessageAMF0(stream)
case typeidAudio: case typeidAudio:
fallthrough fallthrough
case typeidVideo: case typeidVideo:
s.obs.ReadAVMessageCB(stream.header.msgTypeID, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) s.obs.ReadAVMessageCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
default: default:
log.Errorf("unknown msg type id. typeid=%d", stream.header.msgTypeID) log.Errorf("read unknown msg type id. [%s] typeid=%d", s.UniqueKey, stream.header)
panic(0) panic(0)
} }
return nil return nil
@ -149,7 +151,7 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
log.Error(val) log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
} }
s.obs.ReadAVMessageCB(stream.header.msgTypeID, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) s.obs.ReadAVMessageCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
return nil return nil
} }
@ -166,13 +168,13 @@ func (s *ClientSession) doCommandMessage(stream *Stream) error {
switch cmd { switch cmd {
case "onBWDone": case "onBWDone":
log.Warn("-----> onBWDone. ignore") log.Warnf("-----> onBWDone. ignore. [%s]", s.UniqueKey)
case "_result": case "_result":
return s.doResultMessage(stream, tid) return s.doResultMessage(stream, tid)
case "onStatus": case "onStatus":
return s.doOnStatusMessage(stream, tid) return s.doOnStatusMessage(stream, tid)
default: default:
log.Errorf("unknown cmd. cmd=%s", cmd) log.Errorf("read unknown cmd. [%s] cmd=%s", s.UniqueKey, cmd)
} }
return nil return nil
@ -194,18 +196,18 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error {
case CSTPushSession: case CSTPushSession:
switch code { switch code {
case "NetStream.Publish.Start": case "NetStream.Publish.Start":
log.Info("-----> onStatus('NetStream.Publish.Start')") log.Infof("-----> onStatus('NetStream.Publish.Start'). [%s]", s.UniqueKey)
s.notifyDoResultSucc() s.notifyDoResultSucc()
default: default:
log.Errorf("unknown code. code=%s", code) log.Errorf("read on status message but code field unknown. [%s] code=%s", s.UniqueKey, code)
} }
case CSTPullSession: case CSTPullSession:
switch code { switch code {
case "NetStream.Play.Start": case "NetStream.Play.Start":
log.Info("-----> onStatus('NetStream.Play.Start')") log.Infof("-----> onStatus('NetStream.Play.Start'). [%s]", s.UniqueKey)
s.notifyDoResultSucc() s.notifyDoResultSucc()
default: default:
log.Errorf("unknown code. code=%s", code) log.Errorf("read on status message but code field unknown. [%s] code=%s", s.UniqueKey, code)
} }
} }
@ -229,12 +231,12 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
} }
switch code { switch code {
case "NetConnection.Connect.Success": case "NetConnection.Connect.Success":
log.Info("-----> _result(\"NetConnection.Connect.Success\")") log.Infof("-----> _result(\"NetConnection.Connect.Success\"). [%s]", s.UniqueKey)
if err := s.packer.writeCreateStream(s.Conn); err != nil { if err := s.packer.writeCreateStream(s.Conn); err != nil {
return err return err
} }
default: default:
log.Errorf("unknown code. code=%s", code) log.Errorf("unknown code. [%s] code=%s", s.UniqueKey, code)
} }
case tidClientCreateStream: case tidClientCreateStream:
err := stream.msg.readNull() err := stream.msg.readNull()
@ -245,7 +247,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
if err != nil { if err != nil {
return err return err
} }
log.Info("-----> _result()") log.Infof("-----> _result(). [%s]", s.UniqueKey)
switch s.t { switch s.t {
case CSTPullSession: case CSTPullSession:
if err := s.packer.writePlay(s.Conn, s.streamName, sid); err != nil { if err := s.packer.writePlay(s.Conn, s.streamName, sid); err != nil {
@ -257,7 +259,7 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
} }
} }
default: default:
log.Errorf("unknown tid. tid=%d", tid) log.Errorf("unknown tid. [%s] tid=%d", s.UniqueKey, tid)
} }
return nil return nil
} }
@ -268,17 +270,17 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
} }
val := int(bele.BEUint32(stream.msg.buf)) val := int(bele.BEUint32(stream.msg.buf))
switch stream.header.msgTypeID { switch stream.header.MsgTypeID {
case typeidWinAckSize: case typeidWinAckSize:
s.peerWinAckSize = val s.peerWinAckSize = val
log.Infof("-----> Window Acknowledgement Size: %d", s.peerWinAckSize) log.Infof("-----> Window Acknowledgement Size: %d. [%s]", s.peerWinAckSize, s.UniqueKey)
case typeidBandwidth: case typeidBandwidth:
log.Warn("-----> Set Peer Bandwidth. ignore") log.Warnf("-----> Set Peer Bandwidth. ignore. [%s]", s.UniqueKey)
case typeidSetChunkSize: case typeidSetChunkSize:
// composer内部会自动更新peer chunk size. // composer内部会自动更新peer chunk size.
log.Infof("-----> Set Chunk Size %d", val) log.Infof("-----> Set Chunk Size %d. [%s]", val, s.UniqueKey)
default: default:
log.Errorf("unknown msg type id. id=%d", stream.header.msgTypeID) log.Errorf("unknown msg type id. [%s] id=%d", s.UniqueKey, stream.header.MsgTypeID)
} }
return nil return nil
} }

@ -12,6 +12,8 @@ import (
// TODO chef: doc // TODO chef: doc
// TODO chef: HandshakeClient with complex mode
const version = uint8(3) const version = uint8(3)
const ( const (

@ -0,0 +1,26 @@
package rtmp
import (
"bytes"
"github.com/stretchr/testify/assert"
"testing"
)
func TestAll(t *testing.T) {
var err error
var hc HandshakeClient
var hs HandshakeServer
b := &bytes.Buffer{}
err = hc.WriteC0C1(b)
assert.Equal(t, nil, err, "fxxk.")
err = hs.ReadC0C1(b)
assert.Equal(t, nil, err, "fxxk.")
err = hs.WriteS0S1S2(b)
assert.Equal(t, nil, err, "fxxk.")
err = hc.ReadS0S1S2(b)
assert.Equal(t, nil, err, "fxxk.")
err = hc.WriteC2(b)
assert.Equal(t, nil, err, "fxxk.")
err = hs.ReadC2(b)
assert.Equal(t, nil, err, "fxxk.")
}

@ -1,23 +0,0 @@
package rtmp
var chunkSize = 4096
type PullSessionObserver interface {
// @param t: 8 audio, 9 video, 18 meta
// after cb, PullSession will use <message>
ReadAVMessageCB(t int, timestampAbs int, message []byte)
}
type PullSession struct {
*ClientSession
}
func NewPullSession(obs PullSessionObserver, connectTimeout int64) *PullSession {
return &PullSession{
ClientSession: NewClientSession(CSTPullSession, obs, connectTimeout),
}
}
func (s *PullSession) Pull(rawURL string) error {
return s.Do(rawURL)
}

@ -6,25 +6,31 @@ import (
var rtmpErr = errors.New("rtmp error") var rtmpErr = errors.New("rtmp error")
var csidProtocolControl = 2 const (
var csidOverConnection = 3 csidProtocolControl = 2
var csidOverStream = 5 csidOverConnection = 3
csidOverStream = 5
var typeidSetChunkSize = 1 )
var typeidUserControl = 4
var typeidWinAckSize = 5 const (
var typeidBandwidth = 6 typeidSetChunkSize = 1
var typeidAudio = 8 typeidUserControl = 4
var typeidVideo = 9 typeidWinAckSize = 5
var typeidDataMessageAMF0 = 18 // meta typeidBandwidth = 6
var typeidCommandMessageAMF0 = 20 typeidAudio = 8
typeidVideo = 9
var tidClientConnect = 1 typeidDataMessageAMF0 = 18 // meta
var tidClientCreateStream = 2 typeidCommandMessageAMF0 = 20
var tidClientPlay = 3 )
var tidClientPublish = 3
const (
var maxTimestampInMessageHeader = 0xFFFFFF tidClientConnect = 1
tidClientCreateStream = 2
tidClientPlay = 3
tidClientPublish = 3
)
const maxTimestampInMessageHeader = 0xFFFFFF
var defaultChunkSize = 128 var defaultChunkSize = 128
@ -36,3 +42,11 @@ var peerBandwidth = 5000000
var localChunkSize = 4096 var localChunkSize = 4096
var msid = 1 var msid = 1
// 接收到音视频类型数据时的回调函数。目前被PullSession以及PubSession使用。
type AVMessageObserver interface {
// @param header:
// @param timestampAbs: 绝对时间戳
// @param message: 不包含头内容。回调结束后PullSession会继续使用这块内存。
ReadAVMessageCB(header Header, timestampAbs int, message []byte)
}

@ -6,8 +6,8 @@ import (
) )
type ServerObserver interface { type ServerObserver interface {
NewRTMPPubSessionCB(session *ServerSession) NewRTMPPubSessionCB(session *PubSession)
NewRTMPSubSessionCB(session *ServerSession) NewRTMPSubSessionCB(session *SubSession)
} }
type Server struct { type Server struct {
@ -52,14 +52,14 @@ func (server *Server) handleConnect(conn net.Conn) {
session.RunLoop() session.RunLoop()
} }
func (server *Server) NewRTMPPubSessionCB(session *ServerSession) { func (server *Server) NewRTMPPubSessionCB(session *PubSession) {
server.obs.NewRTMPPubSessionCB(session) server.obs.NewRTMPPubSessionCB(session)
} }
func (server *Server) NewRTMPSubSessionCB(session *ServerSession) { func (server *Server) NewRTMPSubSessionCB(session *SubSession) {
server.obs.NewRTMPSubSessionCB(session) server.obs.NewRTMPSubSessionCB(session)
} }
func (server *Server) ReadAVMessageCB(t int, timestampAbs int, message []byte) { func (server *Server) ReadAVMessageCB(header Header, timestampAbs int, message []byte) {
} }

@ -0,0 +1,15 @@
package rtmp
type PubSession struct {
*ServerSession
}
func NewPubSession(ss *ServerSession) *PubSession {
return &PubSession{
ss,
}
}
func (s *ServerSession) SetAVMessageObserver(obs AVMessageObserver) {
s.avObs = obs
}

@ -6,17 +6,16 @@ import (
"github.com/q191201771/lal/log" "github.com/q191201771/lal/log"
"github.com/q191201771/lal/util" "github.com/q191201771/lal/util"
"net" "net"
"strings"
) )
type ServerSessionObserver interface { // TODO chef: PubSession SubSession
NewRTMPPubSessionCB(session *ServerSession)
NewRTMPSubSessionCB(session *ServerSession) // TODO chef: 没有进化成Pub Sub时的超时释放
}
type AVMessageObserver interface { type ServerSessionObserver interface {
// @param t: 8 audio, 9 video, 18 meta NewRTMPPubSessionCB(session *PubSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
// after cb, PullSession will use <message> NewRTMPSubSessionCB(session *SubSession)
ReadAVMessageCB(t int, timestampAbs int, message []byte)
} }
type ServerSessionType int type ServerSessionType int
@ -32,27 +31,29 @@ type ServerSession struct {
StreamName string StreamName string
UniqueKey string UniqueKey string
obs ServerSessionObserver obs ServerSessionObserver
avObs AVMessageObserver conn net.Conn
conn net.Conn rb *bufio.Reader
rb *bufio.Reader wb *bufio.Writer
wb *bufio.Writer t ServerSessionType
t ServerSessionType hs HandshakeServer
hs HandshakeServer chunkComposer *ChunkComposer
composer *Composer packer *MessagePacker
packer *MessagePacker
// for PubSession
avObs AVMessageObserver
} }
func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
return &ServerSession{ return &ServerSession{
obs: obs, obs: obs,
conn: conn, conn: conn,
rb: bufio.NewReaderSize(conn, readBufSize), rb: bufio.NewReaderSize(conn, readBufSize),
wb: bufio.NewWriterSize(conn, writeBufSize), wb: bufio.NewWriterSize(conn, writeBufSize),
t: ServerSessionTypeInit, t: ServerSessionTypeInit,
composer: NewComposer(), chunkComposer: NewChunkComposer(),
packer: NewMessagePacker(), packer: NewMessagePacker(),
UniqueKey: util.GenUniqueKey("RTMPSERVER"), UniqueKey: util.GenUniqueKey("RTMPSERVER"),
} }
} }
@ -60,11 +61,11 @@ func (s *ServerSession) RunLoop() error {
if err := s.handshake(); err != nil { if err := s.handshake(); err != nil {
return err return err
} }
return s.composer.RunLoop(s.rb, s.doMsg) return s.chunkComposer.RunLoop(s.rb, s.doMsg)
} }
func (s *ServerSession) SetAVMessageObserver(obs AVMessageObserver) { func (s *ServerSession) WriteMessage() {
s.avObs = obs
} }
func (s *ServerSession) handshake() error { func (s *ServerSession) handshake() error {
@ -82,7 +83,7 @@ func (s *ServerSession) handshake() error {
func (s *ServerSession) doMsg(stream *Stream) error { func (s *ServerSession) doMsg(stream *Stream) error {
//log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header) //log.Debugf("%d %d %v", stream.header.msgTypeID, stream.msgLen, stream.header)
switch stream.header.msgTypeID { switch stream.header.MsgTypeID {
case typeidSetChunkSize: case typeidSetChunkSize:
// TODO chef: // TODO chef:
case typeidCommandMessageAMF0: case typeidCommandMessageAMF0:
@ -92,13 +93,22 @@ func (s *ServerSession) doMsg(stream *Stream) error {
case typeidAudio: case typeidAudio:
fallthrough fallthrough
case typeidVideo: case typeidVideo:
s.avObs.ReadAVMessageCB(stream.header.msgTypeID, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) if s.t != ServerSessionTypePub {
log.Error("read audio/video message but server session not pub type.")
return rtmpErr
}
s.avObs.ReadAVMessageCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
} }
return nil return nil
} }
func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
if s.t != ServerSessionTypePub {
log.Error("read audio/video message but server session not pub type.")
return rtmpErr
}
val, err := stream.msg.peekStringWithType() val, err := stream.msg.peekStringWithType()
if err != nil { if err != nil {
return err return err
@ -127,7 +137,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
return nil return nil
} }
s.avObs.ReadAVMessageCB(stream.header.msgTypeID, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) s.avObs.ReadAVMessageCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
return nil return nil
} }
@ -219,7 +229,10 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) error {
return err return err
} }
s.t = ServerSessionTypePub s.t = ServerSessionTypePub
s.obs.NewRTMPPubSessionCB(s) newUniqueKey := strings.Replace(s.UniqueKey, "RTMPSERVER", "RTMPPUB", 1)
log.Infof("session unique key upgrade. %s -> %s", s.UniqueKey, newUniqueKey)
s.UniqueKey = newUniqueKey
s.obs.NewRTMPPubSessionCB(NewPubSession(s))
return nil return nil
} }
@ -238,6 +251,9 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) error {
return err return err
} }
s.t = ServerSessionTypeSub s.t = ServerSessionTypeSub
s.obs.NewRTMPSubSessionCB(s) newUniqueKey := strings.Replace(s.UniqueKey, "RTMPSERVER", "RTMPSUB", 1)
log.Infof("session unique key upgrade. %s -> %s", s.UniqueKey, newUniqueKey)
s.UniqueKey = newUniqueKey
s.obs.NewRTMPSubSessionCB(NewSubSession(s))
return nil return nil
} }

@ -0,0 +1,11 @@
package rtmp
type SubSession struct {
*ServerSession
}
func NewSubSession(ss *ServerSession) *SubSession {
return &SubSession{
ss,
}
}

@ -5,11 +5,12 @@ import "github.com/q191201771/lal/log"
var initMsgLen = 4096 var initMsgLen = 4096
type Header struct { type Header struct {
//csid int csid int
//msgLen int msgLen int
timestamp int timestamp int // NOTICE 是header中的时间戳可能是绝对的也可能是相对的。
msgTypeID int // 如果需要绝对时间戳应该使用Stream中的timestampAbs
MsgTypeID int // 8 audio 9 video 18 metadata
msgStreamID int msgStreamID int
} }

Loading…
Cancel
Save