1. httpflv 抽象出 readTag 函数 2. rtmp.ClientSession 读取 typeidAck

pull/200/head
q191201771 5 years ago
parent 994d9d2e14
commit 8136c5b620

@ -30,18 +30,21 @@ func main() {
})
err = ps.Push(rtmpPushURL)
errors.PanicIfErrorOccur(err)
log.Infof("push succ.")
log.Infof("push succ. url=%s", rtmpPushURL)
var totalBaseTS uint32
var prevTS uint32
var hasReadThisBaseTS bool
var thisBaseTS uint32
for {
for i := 0; ; i++ {
log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d",
i, totalBaseTS, prevTS, thisBaseTS)
var ffr httpflv.FlvFileReader
err = ffr.Open(flvFileName)
errors.PanicIfErrorOccur(err)
log.Infof("open succ.")
log.Infof("open succ. filename=%s", flvFileName)
flvHeader, err := ffr.ReadFlvHeader()
errors.PanicIfErrorOccur(err)
@ -75,7 +78,7 @@ func main() {
if tag.Header.T == httpflv.TagTypeMetadata {
if totalBaseTS == 0 {
// 第一个metadata直接发送
log.Debugf("CHEFERASEME write metadata.")
//log.Debugf("CHEFERASEME write metadata.")
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize)
err = ps.TmpWrite(chunks)
errors.PanicIfErrorOccur(err)
@ -87,11 +90,11 @@ func main() {
if hasReadThisBaseTS {
// 之前已经读到了这轮读文件的base值ts要减去base
log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS)
//log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS)
h.Timestamp = tag.Header.Timestamp - thisBaseTS + totalBaseTS
} else {
// 设置basets设置为上一轮读文件的值
log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS)
//log.Debugf("CHEFERASEME %+v %d %d %d.", tag.Header, tag.Header.Timestamp, thisBaseTS, totalBaseTS)
thisBaseTS = tag.Header.Timestamp
h.Timestamp = totalBaseTS
hasReadThisBaseTS = true
@ -107,9 +110,9 @@ func main() {
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize)
log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS)
//log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS)
time.Sleep(time.Duration(diff) * time.Millisecond)
log.Debug("send")
//log.Debug("send")
err = ps.TmpWrite(chunks)
errors.PanicIfErrorOccur(err)
prevTS = h.Timestamp

@ -139,23 +139,7 @@ func (session *PullSession) ReadFlvHeader() ([]byte, error) {
}
func (session *PullSession) ReadTag() (*Tag, error) {
rawHeader := make([]byte, TagHeaderSize)
if _, err := session.Conn.ReadAtLeast(rawHeader, TagHeaderSize); err != nil {
return nil, err
}
header := parseTagHeader(rawHeader)
needed := int(header.DataSize) + prevTagFieldSize
tag := &Tag{}
tag.Header = header
tag.Raw = make([]byte, TagHeaderSize+needed)
copy(tag.Raw, rawHeader)
if _, err := session.Conn.ReadAtLeast(tag.Raw[TagHeaderSize:], needed); err != nil {
return nil, err
}
return tag, nil
return readTag(session.Conn)
}
func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error {

@ -1,6 +1,8 @@
package httpflv
import "os"
import (
"os"
)
type FlvFileReader struct {
fp *os.File
@ -18,22 +20,7 @@ func (ffr *FlvFileReader) ReadFlvHeader() ([]byte, error) {
}
func (ffr *FlvFileReader) ReadTag() (*Tag, error) {
var err error
h, rawHeader, err := readTagHeader(ffr.fp)
if err != nil {
return nil, err
}
needed := int(h.DataSize) + prevTagFieldSize
tag := &Tag{}
tag.Header = h
tag.Raw = make([]byte, TagHeaderSize+needed)
copy(tag.Raw, rawHeader)
_, err = ffr.fp.Read(tag.Raw[TagHeaderSize:])
if err != nil {
return nil, err
}
return tag, nil
return readTag(ffr.fp)
}
func (ffr *FlvFileReader) Dispose() {

@ -133,6 +133,26 @@ func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) {
return
}
func readTag(rd io.Reader) (*Tag, error) {
rawHeader := make([]byte, TagHeaderSize)
if _, err := io.ReadAtLeast(rd, rawHeader, TagHeaderSize); err != nil {
return nil, err
}
header := parseTagHeader(rawHeader)
needed := int(header.DataSize) + prevTagFieldSize
tag := &Tag{}
tag.Header = header
tag.Raw = make([]byte, TagHeaderSize+needed)
copy(tag.Raw, rawHeader)
if _, err := io.ReadAtLeast(rd, tag.Raw[TagHeaderSize:], needed); err != nil {
return nil, err
}
return tag, nil
}
func (tag *Tag) cloneTag() *Tag {
res := &Tag{}
res.Header = tag.Header

@ -45,7 +45,7 @@ const (
// 单位毫秒如果为0则没有超时
type ClientSessionTimeout struct {
ConnectTimeoutMS int // 建立连接超时
DoTimeoutMS int // 从发起连接到收到publish或play信令结果的超时
DoTimeoutMS int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
ReadAVTimeoutMS int // 读取音视频数据的超时
WriteAVTimeoutMS int // 发送音视频数据的超时
}
@ -150,21 +150,29 @@ func (s *ClientSession) doMsg(stream *Stream) error {
return s.doProtocolControlMessage(stream)
case typeidCommandMessageAMF0:
return s.doCommandMessage(stream)
case typeidUserControl:
log.Warnf("read user control message, ignore. [%s]", s.UniqueKey)
case TypeidDataMessageAMF0:
return s.doDataMessageAMF0(stream)
case typeidAck:
return s.doAck(stream)
case typeidUserControl:
log.Warnf("read user control message, ignore. [%s]", s.UniqueKey)
case TypeidAudio:
fallthrough
case TypeidVideo:
s.obs.ReadRTMPAVMsgCB(stream.header, stream.timestampAbs, stream.msg.buf[stream.msg.b:stream.msg.e])
default:
log.Errorf("read unknown msg type id. [%s] typeid=%d", s.UniqueKey, stream.header)
log.Errorf("read unknown msg type id. [%s] typeid=%+v", s.UniqueKey, stream.header)
panic(0)
}
return nil
}
func (s *ClientSession) doAck(stream *Stream) error {
seqNum := bele.BEUint32(stream.msg.buf[stream.msg.b: stream.msg.e])
log.Infof("-----> Acknowledgement. [%s] ignore. sequence number=%d.", s.UniqueKey, seqNum)
return nil
}
func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
val, err := stream.msg.peekStringWithType()
if err != nil {

@ -23,6 +23,7 @@ const (
TypeidVideo = 9
TypeidDataMessageAMF0 = 18 // meta
typeidSetChunkSize = 1
typeidAck = 3
typeidUserControl = 4
typeidWinAckSize = 5
typeidBandwidth = 6

Loading…
Cancel
Save