diff --git a/app/flvfile2rtmppush/main.go b/app/flvfile2rtmppush/main.go new file mode 100644 index 0000000..fb502e9 --- /dev/null +++ b/app/flvfile2rtmppush/main.go @@ -0,0 +1,140 @@ +package main + +import ( + "flag" + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/util/log" + "os" + "time" +) + +// 将flv文件通过rtmp协议推送至rtmp服务器 +// +// Usage: +// ./bin/flvfile2rtmppush -i /tmp/test.flv -o rtmp://push.xxx.com/live/testttt + +func main() { + flvFileName, rtmpPushURL := parseFlag() + + var ffr httpflv.FlvFileReader + err := ffr.Open(flvFileName) + panicIfErr(err) + + log.Infof("open succ.") + flvHeader, err := ffr.ReadFlvHeader() + panicIfErr(err) + log.Infof("read flv header succ. %v", flvHeader) + + ps := rtmp.NewPushSession(5000) + err = ps.Push(rtmpPushURL) + panicIfErr(err) + log.Infof("push succ.") + + var prevTS uint32 + firstA := true + firstV := true + //var aPrevH *rtmp.Header + //var vPrevH *rtmp.Header + + for i := 0; i < 1000*1000; i++ { + tag, err := ffr.ReadTag() + panicIfErr(err) + //log.Infof("tag: %+v %v", tag.Header, tag.Raw[11:]) + log.Infof("tag: %+v %d", tag.Header, len(tag.Raw)) + + // TODO chef: 转换代码放入lal某个包中 + var h rtmp.Header + h.MsgLen = int(tag.Header.DataSize) //len(tag.Raw)-httpflv.TagHeaderSize + h.Timestamp = int(tag.Header.Timestamp) + h.MsgTypeID = int(tag.Header.T) + h.MsgStreamID = rtmp.MSID1 + switch tag.Header.T { + case httpflv.TagTypeMetadata: + h.CSID = rtmp.CSIDAMF + case httpflv.TagTypeAudio: + h.CSID = rtmp.CSIDAudio + case httpflv.TagTypeVideo: + h.CSID = rtmp.CSIDVideo + } + + // 把第一个音频和视频的时间戳改成0 + if tag.Header.T == httpflv.TagTypeAudio && firstA { + h.Timestamp = 0 + firstA = false + } + if tag.Header.T == httpflv.TagTypeVideo && firstV { + h.Timestamp = 0 + firstV = false + } + + //var chunks []byte + //if tag.Header.T == httpflv.TagTypeVideo { + // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, aPrevH, rtmp.LocalChunkSize) + // aPrevH = &h + //} + //if tag.Header.T == httpflv.TagTypeVideo { + // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, vPrevH, rtmp.LocalChunkSize) + // vPrevH = &h + //} + //if tag.Header.T == httpflv.TagTypeVideo { + // chunks = rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, nil, rtmp.LocalChunkSize) + //} + chunks := rtmp.Message2Chunks(tag.Raw[11:11+h.MsgLen], &h, nil, rtmp.LocalChunkSize) + + // 第一个包直接发送 + if prevTS == 0 { + err = ps.TmpWrite(chunks) + panicIfErr(err) + prevTS = tag.Header.Timestamp + continue + } + + // 相等或回退了直接发送 + if tag.Header.Timestamp <= prevTS { + err = ps.TmpWrite(chunks) + panicIfErr(err) + prevTS = tag.Header.Timestamp + continue + } + + if tag.Header.Timestamp > prevTS { + diff := tag.Header.Timestamp - prevTS + + // 跳跃超过了30秒,直接发送 + if diff > 30000 { + err = ps.TmpWrite(chunks) + panicIfErr(err) + prevTS = tag.Header.Timestamp + continue + } + + // 睡眠后发送,睡眠时长为时间戳间隔 + time.Sleep(time.Duration(diff) * time.Millisecond) + err = ps.TmpWrite(chunks) + panicIfErr(err) + prevTS = tag.Header.Timestamp + continue + } + + panic("should never reach here.") + } + ffr.Dispose() +} + +func panicIfErr(err error) { + if err != nil { + panic(err) + } +} + +func parseFlag() (string, string) { + i := flag.String("i", "", "specify flv file") + o := flag.String("o", "", "specify rtmp push url") + flag.Parse() + if *i == "" || *o == "" { + flag.Usage() + os.Exit(1) + } + return *i, *o +} diff --git a/build.sh b/build.sh index 34b0979..ae4aa26 100755 --- a/build.sh +++ b/build.sh @@ -12,3 +12,7 @@ cd app/lal && \ -X 'github.com/q191201771/lal/pkg/util/bininfo.BuildTime=`date +'%Y.%m.%d.%H%M%S'`' \ -X 'github.com/q191201771/lal/pkg/util/bininfo.BuildGoVersion=`go version`' \ " -o ../../bin/lal + +cd - + +cd app/flvfile2rtmppush && go build -o ../../bin/flvfile2rtmppush diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 0a235a6..af1b23e 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -16,7 +16,7 @@ import ( var flvHeaderSize = 13 -var flvPrevTagFieldSize = 4 +var prevTagFieldSize = 4 type PullSessionStat struct { ReadCount int64 @@ -170,15 +170,16 @@ func (session *PullSession) readTag() (*Tag, error) { if err != nil { return nil, err } - session.ConnStat.Read(tagHeaderSize) + session.ConnStat.Read(TagHeaderSize) - needed := int(header.DataSize) + flvPrevTagFieldSize + needed := int(header.DataSize) + prevTagFieldSize tag := &Tag{} tag.Header = header - tag.Raw = make([]byte, tagHeaderSize+needed) + tag.Raw = make([]byte, TagHeaderSize+needed) copy(tag.Raw, rawHeader) - if _, err := io.ReadAtLeast(session.rb, tag.Raw[tagHeaderSize:], needed); err != nil { + // TODO chef: why ReadAtLeast??? + if _, err := io.ReadAtLeast(session.rb, tag.Raw[TagHeaderSize:], needed); err != nil { return nil, err } session.ConnStat.Read(needed) diff --git a/pkg/httpflv/flv_file_reader.go b/pkg/httpflv/flv_file_reader.go new file mode 100644 index 0000000..85f197b --- /dev/null +++ b/pkg/httpflv/flv_file_reader.go @@ -0,0 +1,43 @@ +package httpflv + +import "os" + +type FlvFileReader struct { + fp *os.File +} + +func (ffr *FlvFileReader) Open(filename string) (err error) { + ffr.fp, err = os.Open(filename) + return +} + +func (ffr *FlvFileReader) ReadFlvHeader() ([]byte, error) { + flvHeader := make([]byte, flvHeaderSize) + _, err := ffr.fp.Read(flvHeader) + return flvHeader, err +} + +func (ffr *FlvFileReader) ReadTag() (*Tag, error) { + var err error + h, rawHeader, err := readTagHeader(ffr.fp) + if err != nil { + return nil, err + } + needed := int(h.DataSize) + prevTagFieldSize + tag := &Tag{} + tag.Header = h + tag.Raw = make([]byte, TagHeaderSize+needed) + copy(tag.Raw, rawHeader) + + _, err = ffr.fp.Read(tag.Raw[TagHeaderSize:]) + if err != nil { + return nil, err + } + return tag, nil +} + +func (ffr *FlvFileReader) Dispose() { + if ffr.fp != nil { + _ = ffr.fp.Close() + } +} diff --git a/pkg/httpflv/gop_cache.go b/pkg/httpflv/gop_cache.go index 9480d70..f90acfa 100644 --- a/pkg/httpflv/gop_cache.go +++ b/pkg/httpflv/gop_cache.go @@ -37,8 +37,8 @@ func (c *GOPCache) Push(tag *Tag) { // TODO chef: will this happen? if c.metadata != nil { log.Debugf("updating metadata.") - log.Debug(tag.Header, tag.Raw[tagHeaderSize:]) - log.Debug(c.metadata.Header, c.metadata.Raw[tagHeaderSize:]) + log.Debug(tag.Header, tag.Raw[TagHeaderSize:]) + log.Debug(c.metadata.Header, c.metadata.Raw[TagHeaderSize:]) c.clearGOP() } c.metadata = tag @@ -48,12 +48,12 @@ func (c *GOPCache) Push(tag *Tag) { c.avcSeqHeader = tag } else { // TODO chef: compare nessary? if other way to update seq header and handle cache stuff? - if bytes.Compare(tag.Raw[tagHeaderSize:], c.avcSeqHeader.Raw[tagHeaderSize:]) == 0 { + if bytes.Compare(tag.Raw[TagHeaderSize:], c.avcSeqHeader.Raw[TagHeaderSize:]) == 0 { // noop } else { log.Debugf("updating avc seq header.") - log.Debug(tag.Header, tag.Raw[tagHeaderSize:]) - log.Debug(c.avcSeqHeader.Header, c.avcSeqHeader.Raw[tagHeaderSize:]) + log.Debug(tag.Header, tag.Raw[TagHeaderSize:]) + log.Debug(c.avcSeqHeader.Header, c.avcSeqHeader.Raw[TagHeaderSize:]) c.clearGOP() c.avcSeqHeader = tag } @@ -63,7 +63,7 @@ func (c *GOPCache) Push(tag *Tag) { if c.aacSeqHeader == nil { c.aacSeqHeader = tag } else { - if bytes.Compare(tag.Raw[tagHeaderSize:], c.aacSeqHeader.Raw[tagHeaderSize:]) == 0 { + if bytes.Compare(tag.Raw[TagHeaderSize:], c.aacSeqHeader.Raw[TagHeaderSize:]) == 0 { // noop } else { log.Debugf("updating aac seq header.") diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index 277c289..bf86917 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -6,7 +6,7 @@ import ( ) // TODO chef: make these const -const tagHeaderSize int = 11 +const TagHeaderSize int = 11 const prevTagSizeFieldSize int = 4 const ( @@ -60,15 +60,15 @@ func (tag *Tag) IsMetadata() bool { } func (tag *Tag) IsAVCKeySeqHeader() bool { - return tag.Header.T == TagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == isAVCKeySeqHeader + return tag.Header.T == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKey && tag.Raw[TagHeaderSize+1] == isAVCKeySeqHeader } func (tag *Tag) IsAVCKeyNalu() bool { - return tag.Header.T == TagTypeVideo && tag.Raw[tagHeaderSize] == AVCKey && tag.Raw[tagHeaderSize+1] == AVCPacketTypeNalu + return tag.Header.T == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKey && tag.Raw[TagHeaderSize+1] == AVCPacketTypeNalu } func (tag *Tag) IsAACSeqHeader() bool { - return tag.Header.T == TagTypeAudio && tag.Raw[tagHeaderSize]>>4 == SoundFormatAAC && tag.Raw[tagHeaderSize+1] == AACPacketTypeSeqHeader + return tag.Header.T == TagTypeAudio && tag.Raw[TagHeaderSize]>>4 == SoundFormatAAC && tag.Raw[TagHeaderSize+1] == AACPacketTypeSeqHeader } func IsMetadata(tag []byte) bool { @@ -76,19 +76,19 @@ func IsMetadata(tag []byte) bool { } func IsAVCKeySeqHeader(tag []byte) bool { - return tag[0] == TagTypeVideo && tag[tagHeaderSize] == AVCKey && tag[tagHeaderSize+1] == isAVCKeySeqHeader + return tag[0] == TagTypeVideo && tag[TagHeaderSize] == AVCKey && tag[TagHeaderSize+1] == isAVCKeySeqHeader } func IsAVCKeyNalu(tag []byte) bool { - return tag[0] == TagTypeVideo && tag[tagHeaderSize] == AVCKey && tag[tagHeaderSize+1] == AVCPacketTypeNalu + return tag[0] == TagTypeVideo && tag[TagHeaderSize] == AVCKey && tag[TagHeaderSize+1] == AVCPacketTypeNalu } func IsAACSeqHeader(tag []byte) bool { - return tag[0] == TagTypeAudio && tag[tagHeaderSize]>>4 == SoundFormatAAC && tag[tagHeaderSize+1] == AACPacketTypeSeqHeader + return tag[0] == TagTypeAudio && tag[TagHeaderSize]>>4 == SoundFormatAAC && tag[TagHeaderSize+1] == AACPacketTypeSeqHeader } func PackHTTPFlvTag(t uint8, timestamp int, in []byte) []byte { - out := make([]byte, tagHeaderSize+len(in)+prevTagSizeFieldSize) + out := make([]byte, TagHeaderSize+len(in)+prevTagSizeFieldSize) out[0] = t bele.BEPutUint24(out[1:], uint32(len(in))) bele.BEPutUint24(out[4:], uint32(timestamp&0xFFFFFF)) @@ -97,13 +97,13 @@ func PackHTTPFlvTag(t uint8, timestamp int, in []byte) []byte { out[9] = 0 out[10] = 0 copy(out[11:], in) - bele.BEPutUint32(out[tagHeaderSize+len(in):], uint32(tagHeaderSize+len(in))) + bele.BEPutUint32(out[TagHeaderSize+len(in):], uint32(TagHeaderSize+len(in))) return out } func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) { - rawHeader = make([]byte, tagHeaderSize) - if _, err = io.ReadAtLeast(rd, rawHeader, tagHeaderSize); err != nil { + rawHeader = make([]byte, TagHeaderSize) + if _, err = io.ReadAtLeast(rd, rawHeader, TagHeaderSize); err != nil { return } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 58f9c47..584e84e 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -106,6 +106,12 @@ func (s *ClientSession) WaitLoop() error { return <-s.errChan } +// TODO chef: mod to async +func (s *ClientSession) TmpWrite(b []byte) error { + _, err := s.Conn.Write(b) + return err +} + func (s *ClientSession) runReadLoop() error { return s.chunkComposer.RunLoop(s.rb, s.doMsg) } diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index 4de5d0f..958afd6 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -109,4 +109,3 @@ func (server *Server) DelRTMPSubSessionCB(session *SubSession) { group.DelSubSession(session) } - diff --git a/pkg/util/bininfo/bininfo.go b/pkg/util/bininfo/bininfo.go index cc31ce5..703c418 100644 --- a/pkg/util/bininfo/bininfo.go +++ b/pkg/util/bininfo/bininfo.go @@ -13,9 +13,9 @@ import ( // " var ( - GitCommitID string - BuildTime string - BuildGoVersion string + GitCommitID string + BuildTime string + BuildGoVersion string ) func StringifySingleLine() string { diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index 63735c8..343c7c3 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -89,7 +89,7 @@ func init() { if err != nil { os.Exit(1) } - err = log.SetAdditionalStackDepth(2) + err = log.SetAdditionalStackDepth(0) if err != nil { os.Exit(1) }