1). app/flvfile2rtmppush 优化平稳推流 2). app/modflvfile 使用 log.FatalIfErrorNotNil 3). rtmp.ClientSession bugfix 信令中的 streamName 携带 url 参数,匹配 group 时不携带 4). rtmp.ServerSession 中处理 typeidAck

pull/200/head
q191201771 6 years ago
parent 1394663101
commit 8830ef6db6

@ -39,6 +39,10 @@ func main() {
var prevTS uint32
var hasReadThisBaseTS bool
var thisBaseTS uint32
var hasTraceFirstTagTS bool
var firstTagTS uint32
var firstTagTick int64
for i := 0; ; i++ {
log.Infof(" > round. i=%d, totalBaseTS=%d, prevTS=%d, thisBaseTS=%d",
@ -103,21 +107,30 @@ func main() {
hasReadThisBaseTS = true
}
var diff uint32
if h.Timestamp >= prevTS {
diff = h.Timestamp - prevTS
} else {
if h.Timestamp < prevTS {
// ts比上一个包的还小直接设置为上一包的值并且不sleep直接发送
h.Timestamp = prevTS
}
chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, rtmp.LocalChunkSize)
//log.Debugf("before send. diff=%d, ts=%d, prevTS=%d", diff, h.Timestamp, prevTS)
time.Sleep(time.Duration(diff) * time.Millisecond)
//log.Debug("send")
if hasTraceFirstTagTS {
n := time.Now().UnixNano() / 1000000
diffTick := n - firstTagTick
diffTS := h.Timestamp - firstTagTS
//log.Infof("%d %d %d %d", n, diffTick, diffTS, int64(diffTS) - diffTick)
if diffTick < int64(diffTS) {
time.Sleep(time.Duration(int64(diffTS) - diffTick) * time.Millisecond)
}
} else {
firstTagTick = time.Now().UnixNano() / 1000000
firstTagTS = h.Timestamp
hasTraceFirstTagTS = true
}
err = ps.TmpWrite(chunks)
log.FatalIfErrorNotNil(err)
prevTS = h.Timestamp
}

@ -3,7 +3,6 @@ package main
import (
"flag"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/nezha/pkg/errors"
"github.com/q191201771/nezha/pkg/log"
"io"
"os"
@ -16,6 +15,7 @@ import (
var countA int
var countV int
var exitFlag bool
func hookTag(tag *httpflv.Tag) {
log.Infof("%+v", tag.Header)
@ -25,6 +25,9 @@ func hookTag(tag *httpflv.Tag) {
// httpflv.ModTagTimestamp(tag, 16777205)
//}
//countA++
if tag.IsAACSeqHeader() {
log.Info("aac header.")
}
}
if tag.Header.T == httpflv.TagTypeVideo {
//if countV < 3 {
@ -46,21 +49,21 @@ func main() {
var ffr httpflv.FlvFileReader
err = ffr.Open(inFileName)
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
defer ffr.Dispose()
log.Infof("open input flv file succ.")
var ffw httpflv.FlvFileWriter
err = ffw.Open(outFileName)
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
defer ffw.Dispose()
log.Infof("open output flv file succ.")
flvHeader, err := ffr.ReadFlvHeader()
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
err = ffw.WriteRaw(flvHeader)
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
//for i:=0; i < 10; i++{
for {
@ -69,13 +72,16 @@ func main() {
log.Infof("EOF.")
break
}
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
if tag.Header.T == 9 && tag.Header.DataSize == 68 && tag.Header.Timestamp == 677764 {
break
}
//log.Infof("> hook. %+v", tag)
hookTag(tag)
//log.Infof("< hook. %+v", tag)
err = ffw.WriteRaw(tag.Raw)
errors.PanicIfErrorOccur(err)
log.FatalIfErrorNotNil(err)
}
}

@ -28,6 +28,7 @@ type ClientSession struct {
tcURL string
appName string
streamName string
streamNameWithRawQuery string
hs HandshakeClient
peerWinAckSize int
@ -286,11 +287,11 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
log.Infof("-----> _result(). [%s]", s.UniqueKey)
switch s.t {
case CSTPullSession:
if err := s.packer.writePlay(s.Conn, s.streamName, sid); err != nil {
if err := s.packer.writePlay(s.Conn, s.streamNameWithRawQuery, sid); err != nil {
return err
}
case CSTPushSession:
if err := s.packer.writePublish(s.Conn, s.appName, s.streamName, sid); err != nil {
if err := s.packer.writePublish(s.Conn, s.appName, s.streamNameWithRawQuery, sid); err != nil {
return err
}
}
@ -341,8 +342,13 @@ func (s *ClientSession) parseURL(rawURL string) error {
}
s.appName = strs[0]
// 有的rtmp服务器会使用url后面的参数比如说用于鉴权这里把它带上
s.streamName = strs[1] + "?" + s.url.RawQuery
log.Debugf("%s %s %s %+v", s.tcURL, s.appName, s.streamName, *s.url)
s.streamName = strs[1]
if s.url.RawQuery == "" {
s.streamNameWithRawQuery = s.streamName
} else {
s.streamNameWithRawQuery = s.streamName + "?" + s.url.RawQuery
}
log.Debugf("%s %s %s %+v", s.tcURL, s.appName, s.streamNameWithRawQuery, *s.url)
return nil
}

@ -3,6 +3,7 @@ package rtmp
import (
"bufio"
"encoding/hex"
"github.com/q191201771/nezha/pkg/bele"
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/nezha/pkg/unique"
"net"
@ -162,6 +163,8 @@ func (s *ServerSession) doMsg(stream *Stream) error {
return s.doCommandMessage(stream)
case TypeidDataMessageAMF0:
return s.doDataMessageAMF0(stream)
case typeidAck:
return s.doACK(stream)
case TypeidAudio:
fallthrough
case TypeidVideo:
@ -178,6 +181,12 @@ func (s *ServerSession) doMsg(stream *Stream) error {
return nil
}
func (s *ServerSession) doACK(stream *Stream) error {
seqNum := bele.BEUint32(stream.msg.buf[stream.msg.b:stream.msg.e])
log.Infof("-----> Acknowledgement. [%s] ignore. sequence number=%d.", s.UniqueKey, seqNum)
return nil
}
func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
if s.t != ServerSessionTypePub {
log.Errorf("read audio/video message but server session not pub type. [%s]", s.UniqueKey)

Loading…
Cancel
Save