From 8136c5b620ff60a50ce71b2c5fff91e92763c27e Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Tue, 3 Sep 2019 16:01:34 +0800 Subject: [PATCH] =?UTF-8?q?1.=20httpflv=20=E6=8A=BD=E8=B1=A1=E5=87=BA=20re?= =?UTF-8?q?adTag=20=E5=87=BD=E6=95=B0=202.=20rtmp.ClientSession=20?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=20typeidAck?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/flvfile2rtmppush/flvfile2rtmppush.go | 19 +++++++++++-------- pkg/httpflv/client_pull_session.go | 18 +----------------- pkg/httpflv/flv_file_reader.go | 21 ++++----------------- pkg/httpflv/tag.go | 20 ++++++++++++++++++++ pkg/rtmp/client_session.go | 16 ++++++++++++---- pkg/rtmp/rtmp.go | 1 + 6 files changed, 49 insertions(+), 46 deletions(-) diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/app/flvfile2rtmppush/flvfile2rtmppush.go index 9671475..28298c9 100644 --- a/app/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/flvfile2rtmppush/flvfile2rtmppush.go @@ -30,18 +30,21 @@ func main() { }) err = ps.Push(rtmpPushURL) errors.PanicIfErrorOccur(err) - log.Infof("push succ.") + log.Infof("push succ. url=%s", rtmpPushURL) var totalBaseTS uint32 var prevTS uint32 var hasReadThisBaseTS bool var thisBaseTS uint32 - for { + for i := 0; ; i++ { + log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d", + i, totalBaseTS, prevTS, thisBaseTS) + var ffr httpflv.FlvFileReader err = ffr.Open(flvFileName) errors.PanicIfErrorOccur(err) - log.Infof("open succ.") + log.Infof("open succ. filename=%s", flvFileName) flvHeader, err := ffr.ReadFlvHeader() errors.PanicIfErrorOccur(err) @@ -75,7 +78,7 @@ func main() { if tag.Header.T == httpflv.TagTypeMetadata { if totalBaseTS == 0 { // 第一个metadata直接发送 - log.Debugf("CHEFERASEME write metadata.") + //log.Debugf("CHEFERASEME write metadata.") chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize) err = ps.TmpWrite(chunks) errors.PanicIfErrorOccur(err) @@ -87,11 +90,11 @@ func main() { if hasReadThisBaseTS { // 之前已经读到了这轮读文件的base值,ts要减去base - log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS) + //log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS) h.Timestamp = tag.Header.Timestamp - thisBaseTS + totalBaseTS } else { // 设置base,ts设置为上一轮读文件的值 - log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS) + //log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS) thisBaseTS = tag.Header.Timestamp h.Timestamp = totalBaseTS hasReadThisBaseTS = true @@ -107,9 +110,9 @@ func main() { chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize) - log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS) + //log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS) time.Sleep(time.Duration(diff) * time.Millisecond) - log.Debug("send") + //log.Debug("send") err = ps.TmpWrite(chunks) errors.PanicIfErrorOccur(err) prevTS = h.Timestamp diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index d721248..f10c61a 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -139,23 +139,7 @@ func (session *PullSession) ReadFlvHeader() ([]byte, error) { } func (session *PullSession) ReadTag() (*Tag, error) { - rawHeader := make([]byte, TagHeaderSize) - if _, err := session.Conn.ReadAtLeast(rawHeader, TagHeaderSize); err != nil { - return nil, err - } - header := parseTagHeader(rawHeader) - - needed := int(header.DataSize) + prevTagFieldSize - tag := &Tag{} - tag.Header = header - tag.Raw = make([]byte, TagHeaderSize+needed) - copy(tag.Raw, rawHeader) - - if _, err := session.Conn.ReadAtLeast(tag.Raw[TagHeaderSize:], needed); err != nil { - return nil, err - } - - return tag, nil + return readTag(session.Conn) } func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error { diff --git a/pkg/httpflv/flv_file_reader.go b/pkg/httpflv/flv_file_reader.go index 85f197b..cb6f0c1 100644 --- a/pkg/httpflv/flv_file_reader.go +++ b/pkg/httpflv/flv_file_reader.go @@ -1,6 +1,8 @@ package httpflv -import "os" +import ( + "os" +) type FlvFileReader struct { fp *os.File @@ -18,22 +20,7 @@ func (ffr *FlvFileReader) ReadFlvHeader() ([]byte, error) { } func (ffr *FlvFileReader) ReadTag() (*Tag, error) { - var err error - h, rawHeader, err := readTagHeader(ffr.fp) - if err != nil { - return nil, err - } - needed := int(h.DataSize) + prevTagFieldSize - tag := &Tag{} - tag.Header = h - tag.Raw = make([]byte, TagHeaderSize+needed) - copy(tag.Raw, rawHeader) - - _, err = ffr.fp.Read(tag.Raw[TagHeaderSize:]) - if err != nil { - return nil, err - } - return tag, nil + return readTag(ffr.fp) } func (ffr *FlvFileReader) Dispose() { diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index b034671..3c912d2 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -133,6 +133,26 @@ func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) { return } +func readTag(rd io.Reader) (*Tag, error) { + rawHeader := make([]byte, TagHeaderSize) + if _, err := io.ReadAtLeast(rd, rawHeader, TagHeaderSize); err != nil { + return nil, err + } + header := parseTagHeader(rawHeader) + + needed := int(header.DataSize) + prevTagFieldSize + tag := &Tag{} + tag.Header = header + tag.Raw = make([]byte, TagHeaderSize+needed) + copy(tag.Raw, rawHeader) + + if _, err := io.ReadAtLeast(rd, tag.Raw[TagHeaderSize:], needed); err != nil { + return nil, err + } + + return tag, nil +} + func (tag *Tag) cloneTag() *Tag { res := &Tag{} res.Header = tag.Header diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 18b5dff..3f126f4 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -45,7 +45,7 @@ const ( // 单位毫秒,如果为0,则没有超时 type ClientSessionTimeout struct { ConnectTimeoutMS int // 建立连接超时 - DoTimeoutMS int // 从发起连接到收到publish或play信令结果的超时 + DoTimeoutMS int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时 ReadAVTimeoutMS int // 读取音视频数据的超时 WriteAVTimeoutMS int // 发送音视频数据的超时 } @@ -150,21 +150,29 @@ func (s *ClientSession) doMsg(stream *Stream) error { return s.doProtocolControlMessage(stream) case typeidCommandMessageAMF0: return s.doCommandMessage(stream) - case typeidUserControl: - log.Warnf("read user control message, ignore. [%s]", s.UniqueKey) case TypeidDataMessageAMF0: return s.doDataMessageAMF0(stream) + case typeidAck: + return s.doAck(stream) + case typeidUserControl: + log.Warnf("read user control message, ignore. [%s]", s.UniqueKey) case TypeidAudio: fallthrough case TypeidVideo: s.obs.ReadRTMPAVMsgCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e]) default: - log.Errorf("read unknown msg type id. [%s] typeid=%d", s.UniqueKey, stream.header) + log.Errorf("read unknown msg type id. [%s] typeid=%+v", s.UniqueKey, stream.header) panic(0) } return nil } +func (s *ClientSession) doAck(stream *Stream) error { + seqNum := bele.BEUint32(stream.msg.buf[stream.msg.b: stream.msg.e]) + log.Infof("-----> Acknowledgement. [%s] ignore. sequence number=%d.", s.UniqueKey, seqNum) + return nil +} + func (s *ClientSession) doDataMessageAMF0(stream *Stream) error { val, err := stream.msg.peekStringWithType() if err != nil { diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index ae46fda..9417f2f 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -23,6 +23,7 @@ const ( TypeidVideo = 9 TypeidDataMessageAMF0 = 18 // meta typeidSetChunkSize = 1 + typeidAck = 3 typeidUserControl = 4 typeidWinAckSize = 5 typeidBandwidth = 6