From 89af181710e2d9d44a6cec40fc3b6cecc599063c Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Mon, 2 Sep 2019 20:07:57 +0800 Subject: [PATCH] 1. stream.msgLen -> stream.header.MsgLen 2. rtmp stream name with url raw query 3. --- app/flvfile2rtmppush/flvfile2rtmppush.go | 139 +++++++++-------------- app/rtmppull/rtmppull.go | 4 +- pkg/httpflv/client_pull_session.go | 7 +- pkg/rtmp/chunk_composer.go | 22 ++-- pkg/rtmp/client_pull_session.go | 2 +- pkg/rtmp/client_push_session.go | 2 +- pkg/rtmp/client_session.go | 74 ++++++------ pkg/rtmp/group.go | 2 +- pkg/rtmp/stream.go | 1 - 9 files changed, 111 insertions(+), 142 deletions(-) diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/app/flvfile2rtmppush/flvfile2rtmppush.go index 45a93de..f983071 100644 --- a/app/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/flvfile2rtmppush/flvfile2rtmppush.go @@ -6,131 +6,98 @@ import ( "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/nezha/pkg/errors" "github.com/q191201771/nezha/pkg/log" + "io" "os" "time" ) // 将flv文件通过rtmp协议推送至rtmp服务器 // +// -r 表示当文件推送完毕后,是否循环推送 +// // Usage: -// ./bin/flvfile2rtmppush -i /tmp/test.flv -o rtmp://push.xxx.com/live/testttt +// ./bin/flvfile2rtmppush -r 1 -i /tmp/test.flv -o rtmp://push.xxx.com/live/testttt func main() { - flvFileName, rtmpPushURL := parseFlag() + var err error - var ffr httpflv.FlvFileReader - err := ffr.Open(flvFileName) - errors.PanicIfErrorOccur(err) - defer ffr.Dispose() - log.Infof("open succ.") - - flvHeader, err := ffr.ReadFlvHeader() - errors.PanicIfErrorOccur(err) - log.Infof("read flv header succ. %v", flvHeader) + flvFileName, rtmpPushURL, isRecursive := parseFlag() ps := rtmp.NewPushSession(5000) err = ps.Push(rtmpPushURL) errors.PanicIfErrorOccur(err) log.Infof("push succ.") - var prevTS uint32 - firstA := true - firstV := true - //var aPrevH *rtmp.Header - //var vPrevH *rtmp.Header + var baseTS int + var prevTS int - //for i := 0; i < 1000*1000; i++ { for { - tag, err := ffr.ReadTag() + var ffr httpflv.FlvFileReader + err = ffr.Open(flvFileName) errors.PanicIfErrorOccur(err) - //log.Infof("tag: %+v %v", tag.Header, tag.Raw[11:]) - //log.Infof("tag: %+v %d", tag.Header, len(tag.Raw)) - - // TODO chef: 转换代码放入lal某个包中 - var h rtmp.Header - h.MsgLen = int(tag.Header.DataSize) //len(tag.Raw)-httpflv.TagHeaderSize - h.Timestamp = int(tag.Header.Timestamp) - h.MsgTypeID = int(tag.Header.T) - h.MsgStreamID = rtmp.MSID1 - switch tag.Header.T { - case httpflv.TagTypeMetadata: - h.CSID = rtmp.CSIDAMF - case httpflv.TagTypeAudio: - h.CSID = rtmp.CSIDAudio - case httpflv.TagTypeVideo: - h.CSID = rtmp.CSIDVideo - } + log.Infof("open succ.") - // 把第一个音频和视频的时间戳改成0 - if tag.Header.T == httpflv.TagTypeAudio && firstA { - h.Timestamp = 0 - firstA = false - } - if tag.Header.T == httpflv.TagTypeVideo && firstV { - h.Timestamp = 0 - firstV = false - } - - //var chunks []byte - //if tag.Header.T == httpflv.TagTypeVideo { - // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, aPrevH, rtmp.LocalChunkSize) - // aPrevH = &h - //} - //if tag.Header.T == httpflv.TagTypeVideo { - // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, vPrevH, rtmp.LocalChunkSize) - // vPrevH = &h - //} - //if tag.Header.T == httpflv.TagTypeVideo { - // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, nil, rtmp.LocalChunkSize) - //} - chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize) - - // 第一个包直接发送 - if prevTS == 0 { - err = ps.TmpWrite(chunks) - errors.PanicIfErrorOccur(err) - prevTS = tag.Header.Timestamp - continue - } + flvHeader, err := ffr.ReadFlvHeader() + errors.PanicIfErrorOccur(err) + log.Infof("read flv header succ. %v", flvHeader) - // 相等或回退了直接发送 - if tag.Header.Timestamp <= prevTS { - err = ps.TmpWrite(chunks) + for { + tag, err := ffr.ReadTag() + if err == io.EOF { + log.Info("EOF") + break + } errors.PanicIfErrorOccur(err) - prevTS = tag.Header.Timestamp - continue - } - if tag.Header.Timestamp > prevTS { - diff := tag.Header.Timestamp - prevTS + // TODO chef: 转换代码放入lal某个包中 + var h rtmp.Header + h.MsgLen = int(tag.Header.DataSize) //len(tag.Raw)-httpflv.TagHeaderSize + h.Timestamp = int(tag.Header.Timestamp) + int(baseTS) + h.MsgTypeID = int(tag.Header.T) + h.MsgStreamID = rtmp.MSID1 + switch tag.Header.T { + case httpflv.TagTypeMetadata: + h.CSID = rtmp.CSIDAMF + case httpflv.TagTypeAudio: + h.CSID = rtmp.CSIDAudio + case httpflv.TagTypeVideo: + h.CSID = rtmp.CSIDVideo + } - // 跳跃超过了30秒,直接发送 - if diff > 30000 { - err = ps.TmpWrite(chunks) - errors.PanicIfErrorOccur(err) - prevTS = tag.Header.Timestamp - continue + var diff int + if h.Timestamp >= prevTS { + diff = int(h.Timestamp) - prevTS + } else { + h.Timestamp = prevTS } - // 睡眠后发送,睡眠时长为时间戳间隔 + chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize) + + log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS) time.Sleep(time.Duration(diff) * time.Millisecond) + log.Debug("send") err = ps.TmpWrite(chunks) errors.PanicIfErrorOccur(err) - prevTS = tag.Header.Timestamp - continue + prevTS = h.Timestamp } - panic("should not reach here.") + baseTS = prevTS + 1 + ffr.Dispose() + + if !isRecursive { + break + } } } -func parseFlag() (string, string) { +func parseFlag() (string, string, bool) { i := flag.String("i", "", "specify flv file") o := flag.String("o", "", "specify rtmp push url") + r := flag.Bool("r", false, "recursive push if reach end of file") flag.Parse() if *i == "" || *o == "" { flag.Usage() os.Exit(1) } - return *i, *o + return *i, *o, *r } diff --git a/app/rtmppull/rtmppull.go b/app/rtmppull/rtmppull.go index 2bde883..597c1c5 100644 --- a/app/rtmppull/rtmppull.go +++ b/app/rtmppull/rtmppull.go @@ -6,7 +6,6 @@ import ( "github.com/q191201771/nezha/pkg/errors" "github.com/q191201771/nezha/pkg/log" "os" - "time" ) type Obs struct { @@ -22,7 +21,8 @@ func main() { session := rtmp.NewPullSession(obs, 2000) err := session.Pull(url) errors.PanicIfErrorOccur(err) - time.Sleep(1 * time.Hour) + err := session.WaitLoop() + errors.PanicIfErrorOccur(err) } func parseFlag() string { diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 2574f48..d721248 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -90,12 +90,7 @@ func (session *PullSession) Connect(rawURL string) error { } // # 建立连接 - var conn net.Conn - if session.config.ConnectTimeoutMS == 0 { - conn, err = net.Dial("tcp", session.addr) - } else { - conn, err = net.DialTimeout("tcp", session.addr, time.Duration(session.config.ConnectTimeoutMS)*time.Millisecond) - } + conn, err := net.DialTimeout("tcp", session.addr, time.Duration(session.config.ConnectTimeoutMS)*time.Millisecond) if err != nil { return err } diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 28fa290..a3b6dec 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -69,11 +69,11 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 包头中为绝对时间戳 stream.header.Timestamp = int(bele.BEUint24(bootstrap)) stream.timestampAbs = stream.header.Timestamp - stream.msgLen = int(bele.BEUint24(bootstrap[3:])) + stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:])) stream.header.MsgTypeID = int(bootstrap[6]) stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:])) - stream.msg.reserve(stream.msgLen) + stream.msg.reserve(stream.header.MsgLen) case 1: if _, err := io.ReadAtLeast(reader, bootstrap[:7], 7); err != nil { return err @@ -81,10 +81,10 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { // 包头中为相对时间戳 stream.header.Timestamp = int(bele.BEUint24(bootstrap)) stream.timestampAbs += stream.header.Timestamp - stream.msgLen = int(bele.BEUint24(bootstrap[3:])) + stream.header.MsgLen = int(bele.BEUint24(bootstrap[3:])) stream.header.MsgTypeID = int(bootstrap[6]) - stream.msg.reserve(stream.msgLen) + stream.msg.reserve(stream.header.MsgLen) case 2: if _, err := io.ReadAtLeast(reader, bootstrap[:3], 3); err != nil { return err @@ -119,14 +119,13 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } } //stream.header.CSID = csid - //stream.header.MsgLen = stream.msgLen //log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.msgLen, stream.timestampAbs) var neededSize int - if stream.msgLen <= c.peerChunkSize { - neededSize = stream.msgLen + if stream.header.MsgLen <= c.peerChunkSize { + neededSize = stream.header.MsgLen } else { - neededSize = stream.msgLen - stream.msg.len() + neededSize = stream.header.MsgLen - stream.msg.len() if neededSize > c.peerChunkSize { neededSize = c.peerChunkSize } @@ -138,7 +137,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } stream.msg.produced(neededSize) - if stream.msg.len() == stream.msgLen { + if stream.msg.len() == stream.header.MsgLen { // 对端设置了chunk size if stream.header.MsgTypeID == typeidSetChunkSize { val := int(bele.BEUint32(stream.msg.buf)) @@ -146,14 +145,13 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error { } stream.header.CSID = csid - stream.header.MsgLen = stream.msgLen - //log.Debugf("CHEFGREPME %+v %d %d", stream.header, stream.timestampAbs, stream.msgLen) + //log.Debugf("CHEFGREPME %+v %d %d", stream.header, stream.timestampAbs, stream.header.MsgLen) if err := cb(stream); err != nil { return err } stream.msg.clear() } - if stream.msg.len() > stream.msgLen { + if stream.msg.len() > stream.header.MsgLen { panic(0) } } diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 31bf8ba..88acdd8 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -8,7 +8,7 @@ type PullSession struct { *ClientSession } -func NewPullSession(obs PullSessionObserver, connectTimeout int64) *PullSession { +func NewPullSession(obs PullSessionObserver, connectTimeout int) *PullSession { return &PullSession{ ClientSession: NewClientSession(CSTPullSession, obs, connectTimeout), } diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index a82c8d4..57d7592 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -4,7 +4,7 @@ type PushSession struct { *ClientSession } -func NewPushSession(connectTimeout int64) *PushSession { +func NewPushSession(connectTimeout int) *PushSession { return &PushSession{ ClientSession: NewClientSession(CSTPushSession, nil, connectTimeout), } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 4319971..c8c6836 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -17,19 +17,20 @@ import ( type ClientSession struct { UniqueKey string - t ClientSessionType - obs PullSessionObserver // only for PullSession - connectTimeout int64 - doResultChan chan struct{} - errChan chan error - packer *MessagePacker - chunkComposer *ChunkComposer - url *url.URL - tcURL string - appName string - streamName string - hs HandshakeClient - peerWinAckSize int + t ClientSessionType + obs PullSessionObserver // only for PullSession + stageCB StageCB + connectTimeoutMS int + doResultChan chan struct{} + errChan chan error + packer *MessagePacker + chunkComposer *ChunkComposer + url *url.URL + tcURL string + appName string + streamName string + hs HandshakeClient + peerWinAckSize int Conn connection.Connection //Conn net.Conn @@ -45,8 +46,19 @@ const ( CSTPushSession ) -// set if equal CSTPullSession -func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeout int64) *ClientSession { +type ClientSessionStage int + +const ( + CSSConnConnectStart ClientSessionStage = iota + CSSConnConnectSucc +) + +type StageCB func(stage ClientSessionStage) + +// @param t: session的类型,只能是推或者拉 +// @param obs: 回调结束后,buffer会被重复使用 +// @param connectTimeoutMS: 建立连接超时,单位毫秒 +func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeoutMS int) *ClientSession { var uk string switch t { case CSTPullSession: @@ -56,19 +68,19 @@ func NewClientSession(t ClientSessionType, obs PullSessionObserver, connectTimeo } return &ClientSession{ - t: t, - obs: obs, - connectTimeout: connectTimeout, - doResultChan: make(chan struct{}), - errChan: make(chan error), - packer: NewMessagePacker(), - chunkComposer: NewChunkComposer(), - UniqueKey: unique.GenUniqueKey(uk), - wChan: make(chan []byte, wChanSize), + t: t, + obs: obs, + connectTimeoutMS: connectTimeoutMS, + doResultChan: make(chan struct{}), + errChan: make(chan error), + packer: NewMessagePacker(), + chunkComposer: NewChunkComposer(), + UniqueKey: unique.GenUniqueKey(uk), + wChan: make(chan []byte, wChanSize), } } -// 阻塞直到收到服务端的 publish start / play start 信令 或者超时 +// 阻塞直到收到服务端返回的 publish start / play start 信令 或者超时 func (s *ClientSession) Do(rawURL string) error { if err := s.parseURL(rawURL); err != nil { return err @@ -91,16 +103,13 @@ func (s *ClientSession) Do(rawURL string) error { s.errChan <- s.runReadLoop() }() - t := time.NewTimer(time.Duration(s.connectTimeout) * time.Second) - var ret error select { case <-s.doResultChan: break - case <-t.C: - ret = rtmpErr + case ret = <-s.errChan: + break } - t.Stop() return ret } @@ -310,7 +319,8 @@ func (s *ClientSession) parseURL(rawURL string) error { return rtmpErr } s.appName = strs[0] - s.streamName = strs[1] + // 有的rtmp服务器会使用url后面的参数(比如说用于鉴权),这里把它带上 + s.streamName = strs[1] + "?" + s.url.RawQuery log.Debugf("%s %s %s %+v", s.tcURL, s.appName, s.streamName, *s.url) return nil @@ -339,7 +349,7 @@ func (s *ClientSession) tcpConnect() error { } var conn net.Conn - if conn, err = net.Dial("tcp", addr); err != nil { + if conn, err = net.DialTimeout("tcp", addr, time.Duration(s.connectTimeoutMS)*time.Millisecond); err != nil { return err } diff --git a/pkg/rtmp/group.go b/pkg/rtmp/group.go index 5d3c2e6..b714bcb 100644 --- a/pkg/rtmp/group.go +++ b/pkg/rtmp/group.go @@ -89,7 +89,7 @@ func (group *Group) DelSubSession(session *ServerSession) { } func (group *Group) Pull(addr string, connectTimeout int64) { - group.pullSession = NewPullSession(group, connectTimeout) + group.pullSession = NewPullSession(group, int(connectTimeout)) defer func() { group.mutex.Lock() diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index 9012764..55f5c2b 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -24,7 +24,6 @@ type StreamMsg struct { type Stream struct { header Header - msgLen int // TODO chef: needed? dup with Header's timestampAbs int msg StreamMsg