From 44443c1c1d35b655f2c5a70061bdf314110e735a Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 10 Jul 2021 10:25:13 +0800 Subject: [PATCH 1/5] prev commit patch --- go.mod | 2 +- go.sum | 4 ++-- pkg/rtsp/auth.go | 10 +++++----- pkg/rtsp/client_command_session.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 92ba118..f10918a 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.13 -require github.com/q191201771/naza v0.19.3 +require github.com/q191201771/naza v0.20.0 diff --git a/go.sum b/go.sum index 0914ec0..8e42348 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.19.3 h1:cvjBztFJjvqbr8uSFkFFKq39HgA4ryC7z6qTY4y8ayI= -github.com/q191201771/naza v0.19.3/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= +github.com/q191201771/naza v0.20.0 h1:VRessjnJDQgclflzUUMy8qIzLYtblaLv2po6wb4rMNM= +github.com/q191201771/naza v0.20.0/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= diff --git a/pkg/rtsp/auth.go b/pkg/rtsp/auth.go index 7b8627b..18479f4 100644 --- a/pkg/rtsp/auth.go +++ b/pkg/rtsp/auth.go @@ -38,14 +38,14 @@ func (a *Auth) FeedWwwAuthenticate(auths []string, username, password string) { a.Username = username a.Password = password //目前只处理第一个 - var s string - if len(auths)>0{ + var s string + if len(auths) > 0 { s = auths[0] - }else{ + } else { return } - s = strings.TrimPrefix(s, HeaderWwwAuthenticate) - s = strings.TrimSpace(s) + s = strings.TrimPrefix(s, HeaderWwwAuthenticate) + s = strings.TrimSpace(s) if strings.HasPrefix(s, AuthTypeBasic) { a.Typ = AuthTypeBasic return diff --git a/pkg/rtsp/client_command_session.go b/pkg/rtsp/client_command_session.go index 2783540..fbdd65c 100644 --- a/pkg/rtsp/client_command_session.go +++ b/pkg/rtsp/client_command_session.go @@ -343,7 +343,7 @@ func (session *ClientCommandSession) writeOptions() error { method := ctx.Headers.Get(HeaderPublic) - if method== "" { + if method == "" { return nil } if strings.Contains(method, MethodGetParameter) { From cbe2e8e2b3fc1477c3211ea3c94f800b388e2b10 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 10 Jul 2021 10:32:50 +0800 Subject: [PATCH 2/5] =?UTF-8?q?[refactor]=20=E5=88=A0=E9=99=A4=E5=87=A0?= =?UTF-8?q?=E4=B8=AA=E6=B2=A1=E7=94=A8=E7=9A=84demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/demo/learnhls/learnhls.go | 133 ---------------------------------- app/demo/learnts/learnts.go | 100 ------------------------- app/demo/tscmp/tscmp.go | 98 ------------------------- 3 files changed, 331 deletions(-) delete mode 100644 app/demo/learnhls/learnhls.go delete mode 100644 app/demo/learnts/learnts.go delete mode 100644 app/demo/tscmp/tscmp.go diff --git a/app/demo/learnhls/learnhls.go b/app/demo/learnhls/learnhls.go deleted file mode 100644 index f0abcc3..0000000 --- a/app/demo/learnhls/learnhls.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package main - -import ( - "flag" - "strconv" - "strings" - "sync" - "time" - - "github.com/q191201771/lal/pkg/base" - - "github.com/q191201771/naza/pkg/lru" - "github.com/q191201771/naza/pkg/nazahttp" - "github.com/q191201771/naza/pkg/nazalog" -) - -// 分析诊断HLS的时间戳。注意,这个程序还没有完成。 -// -// TODO chef: 有的代码考虑弄到pkg/hls中 - -type M3u8PullSession struct { -} - -type frag struct { - extinf float64 - filename string -} - -func parseM3u8(content string) (ret []frag) { - var err error - - lines := strings.Split(content, "\n") - var f frag - for _, line := range lines { - if strings.HasPrefix(line, "#EXTINF:") { - line = strings.TrimPrefix(line, "#EXTINF:") - line = strings.TrimSuffix(line, ",") - f.extinf, err = strconv.ParseFloat(line, 64) - nazalog.Assert(nil, err) - } - if strings.Index(line, ".ts") != -1 { - f.filename = line - ret = append(ret, f) - } - } - return -} - -func getTsUrl(m3u8Url string, tsFilename string) string { - index := strings.LastIndex(m3u8Url, "/") - nazalog.Assert(true, index != -1) - path := m3u8Url[:index+1] - return path + tsFilename -} - -func main() { - _ = nazalog.Init(func(option *nazalog.Option) { - option.AssertBehavior = nazalog.AssertFatal - }) - defer nazalog.Sync() - - m3u8Url := parseFlag() - nazalog.Infof("m3u8 url=%s", m3u8Url) - - cache := lru.New(1024) - - var m sync.Mutex - var frags []frag - - go func() { - for { - content, err := nazahttp.GetHttpFile(m3u8Url, 3000) - if err != nil { - nazalog.Error(err) - return - } - //nazalog.Debugf("\n-----m3u8-----\n%s", string(content)) - - currFrags := parseM3u8(string(content)) - //nazalog.Debugf("%+v", currFrags) - - m.Lock() - for _, f := range currFrags { - if _, exist := cache.Get(f.filename); exist { - continue - } - cache.Put(f.filename, nil) - - nazalog.Infof("> new frag. filename=%s", f.filename) - frags = append(frags, f) - } - m.Unlock() - - time.Sleep(100 * time.Millisecond) - } - }() - - for { - m.Lock() - currFrags := frags - frags = nil - m.Unlock() - - for _, f := range currFrags { - nazalog.Infof("< new frag. filename=%s", f.filename) - tsUrl := getTsUrl(m3u8Url, f.filename) - nazalog.Debug(tsUrl) - content, err := nazahttp.GetHttpFile(tsUrl, 3000) - nazalog.Assert(nil, err) - nazalog.Debugf("TS len=%d", len(content)) - } - - time.Sleep(100 * time.Millisecond) - } -} - -func parseFlag() string { - url := flag.String("i", "", "specify m3u8 url") - flag.Parse() - if *url == "" { - flag.Usage() - base.OsExitAndWaitPressIfWindows(1) - } - return *url -} diff --git a/app/demo/learnts/learnts.go b/app/demo/learnts/learnts.go deleted file mode 100644 index ca91dc0..0000000 --- a/app/demo/learnts/learnts.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package main - -import ( - "io/ioutil" - - "github.com/q191201771/lal/pkg/mpegts" - - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/naza/pkg/nazalog" -) - -// 学习如何解析TS文件。注意,该程序还没有写完。 - -var ( - pat mpegts.Pat - pmt mpegts.Pmt - pid2stream map[uint16]*Stream -) - -type Stream struct { -} - -var filename = "/Volumes/Data/nrm-0.ts" - -func handlePacket(packet []byte) { - h := mpegts.ParseTsPacketHeader(packet) - index := 4 - nazalog.Debugf("%+v", h) - - var adaptation mpegts.TsPacketAdaptation - switch h.Adaptation { - case mpegts.AdaptationFieldControlNo: - // noop - case mpegts.AdaptationFieldControlFollowed: - adaptation = mpegts.ParseTsPacketAdaptation(packet[4:]) - index++ - default: - nazalog.Warn(h.Adaptation) - } - index += int(adaptation.Length) - - if h.Pid == mpegts.PidPat { - if h.PayloadUnitStart == 1 { - index++ - } - pat = mpegts.ParsePat(packet[index:]) - nazalog.Debugf("%+v", pat) - return - } - - if pat.SearchPid(h.Pid) { - if h.PayloadUnitStart == 1 { - index++ - } - pmt = mpegts.ParsePmt(packet[index:]) - nazalog.Debugf("%+v", pmt) - - for _, ele := range pmt.ProgramElements { - pid2stream[ele.Pid] = &Stream{} - } - return - } - - _, ok := pid2stream[h.Pid] - if !ok { - nazalog.Warn(h.Pid) - } - - // 判断是否有PES - if h.PayloadUnitStart == 1 { - pes, length := mpegts.ParsePes(packet[index:]) - nazalog.Debugf("%+v, %d", pes, length) - } -} - -func main() { - _ = nazalog.Init(func(option *nazalog.Option) { - option.AssertBehavior = nazalog.AssertFatal - }) - defer nazalog.Sync() - - pid2stream = make(map[uint16]*Stream) - - content, err := ioutil.ReadFile(filename) - nazalog.Assert(nil, err) - - packets, _ := hls.SplitFragment2TsPackets(content) - - for _, packet := range packets { - handlePacket(packet) - } -} diff --git a/app/demo/tscmp/tscmp.go b/app/demo/tscmp/tscmp.go deleted file mode 100644 index 6d844bb..0000000 --- a/app/demo/tscmp/tscmp.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package main - -import ( - "bytes" - "encoding/hex" - "io/ioutil" - - "github.com/q191201771/lal/pkg/mpegts" - - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/naza/pkg/nazalog" -) - -// 临时小工具,比较两个TS文件。注意,该程序还没有写完。 - -var filename1 = "/tmp/lal/hls/innertest/innertest-7.ts" -var filename2 = "/tmp/lal/hls/innertest.bak/innertest-7.ts" - -func skipPacketFilter(tss [][]byte) (ret [][]byte) { - for _, ts := range tss { - h := mpegts.ParseTsPacketHeader(ts) - if h.Pid == mpegts.PidAudio { - continue - } - ret = append(ret, ts) - } - return -} - -func parsePacket(packet []byte) { - h := mpegts.ParseTsPacketHeader(packet) - nazalog.Debugf("%+v", h) - index := 4 - - var adaptation mpegts.TsPacketAdaptation - switch h.Adaptation { - case mpegts.AdaptationFieldControlNo: - // noop - case mpegts.AdaptationFieldControlFollowed: - adaptation = mpegts.ParseTsPacketAdaptation(packet[4:]) - index++ - default: - nazalog.Warn(h.Adaptation) - } - index += int(adaptation.Length) - - if h.PayloadUnitStart == 1 && h.Pid == 256 { - pes, length := mpegts.ParsePes(packet[index:]) - nazalog.Debugf("%+v, %d", pes, length) - } -} - -func main() { - _ = nazalog.Init(func(option *nazalog.Option) { - option.AssertBehavior = nazalog.AssertFatal - }) - defer nazalog.Sync() - - content1, err := ioutil.ReadFile(filename1) - nazalog.Assert(nil, err) - - content2, err := ioutil.ReadFile(filename2) - nazalog.Assert(nil, err) - - tss1, _ := hls.SplitFragment2TsPackets(content1) - tss2, _ := hls.SplitFragment2TsPackets(content2) - - nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) - - //tss1 = skipPacketFilter(tss1) - //tss2 = skipPacketFilter(tss2) - - nazalog.Debugf("after skip. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) - - m := len(tss1) - if m > len(tss2) { - m = len(tss2) - } - - for i := 0; i < m; i++ { - if !bytes.Equal(tss1[i], tss2[i]) { - nazalog.Debug(i) - parsePacket(tss1[i]) - parsePacket(tss2[i]) - nazalog.Debugf("\n%s", hex.Dump(tss1[i])) - nazalog.Debugf("\n%s", hex.Dump(tss2[i])) - } - } - -} From 68470dea1f68e9ecf396b3216ce5ab1513820992 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 10 Jul 2021 10:35:30 +0800 Subject: [PATCH 3/5] =?UTF-8?q?[fix]=20rtsp=E5=85=BC=E5=AE=B9=EF=BC=8C?= =?UTF-8?q?=E6=9C=89=E7=9A=84=E6=91=84=E5=83=8F=E5=A4=B4sdp=E4=B8=AD?= =?UTF-8?q?=E5=8C=85=E5=90=ABaac=EF=BC=8C=E4=BD=86=E6=98=AF=E6=B2=A1?= =?UTF-8?q?=E6=9C=89config=E5=AD=97=E6=AE=B5=EF=BC=88=E5=90=8E=E7=BB=AD?= =?UTF-8?q?=E4=B9=9F=E6=B2=A1=E6=9C=89aac=20rtp=E5=8C=85=EF=BC=89=EF=BC=8C?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E8=BF=9E=E6=8E=A5=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/sdp/logic.go | 4 ++-- pkg/sdp/sdp_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/sdp/logic.go b/pkg/sdp/logic.go index 0b64fa2..d4f1d33 100644 --- a/pkg/sdp/logic.go +++ b/pkg/sdp/logic.go @@ -52,7 +52,7 @@ func (lc *LogicContext) IsPayloadTypeOrigin(t int) bool { } func (lc *LogicContext) IsAudioUnpackable() bool { - return lc.audioPayloadTypeBase == base.AvPacketPtAac + return lc.audioPayloadTypeBase == base.AvPacketPtAac && lc.Asc != nil } func (lc *LogicContext) IsVideoUnpackable() bool { @@ -120,7 +120,7 @@ func ParseSdp2LogicContext(b []byte) (LogicContext, error) { if md.AFmtPBase != nil { ret.Asc, err = ParseAsc(md.AFmtPBase) if err != nil { - return ret, err + nazalog.Warnf("parse asc from afmtp failed. err=%+v", err) } } else { nazalog.Warnf("aac afmtp not exist.") diff --git a/pkg/sdp/sdp_test.go b/pkg/sdp/sdp_test.go index 2156907..fad1b12 100644 --- a/pkg/sdp/sdp_test.go +++ b/pkg/sdp/sdp_test.go @@ -458,3 +458,28 @@ a=control:track3 assert.Equal(t, nil, err) _ = ctx } + +// sdp aac中a=fmtp缺少config字段,这个case的实际情况是后续也没有aac的rtp包 +func TestCase10(t *testing.T) { + golden := `v=0 +o=- 0 0 IN IP4 0.0.0.0 +s=rtsp_demo +t=0 0 +a=control:rtsp://10.10.10.188:554/stream0 +a=range:npt=0- +m=video 0 RTP/AVP 96 +c=IN IP4 0.0.0.0 +a=rtpmap:96 H264/90000 +a=fmtp:96 packetization-mode=1;sprop-parameter-sets=Z00AKp2oHgCJ+WbgICAgQA==,aO48gA== +a=control:rtsp://10.10.10.188:554/stream0/track1 +m=audio 0 RTP/AVP 97 +c=IN IP4 0.0.0.0 +a=rtpmap:97 MPEG4-GENERIC/44100/2 +a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3 +a=control:rtsp://10.10.10.188:554/stream0/track2 +` + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := ParseSdp2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + _ = ctx +} From 77c624ab0311e2c4b2a617fb25a2e07e841ed262 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 10 Jul 2021 20:01:39 +0800 Subject: [PATCH 4/5] =?UTF-8?q?[fix]=20aac=20rtp=E6=89=93=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rtprtcp/rtp_packer_payload_aac.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rtprtcp/rtp_packer_payload_aac.go b/pkg/rtprtcp/rtp_packer_payload_aac.go index 08df8fb..e372410 100644 --- a/pkg/rtprtcp/rtp_packer_payload_aac.go +++ b/pkg/rtprtcp/rtp_packer_payload_aac.go @@ -39,8 +39,8 @@ func (r *RtpPackerPayloadAac) Pack(in []byte, maxSize int) (out [][]byte) { item := make([]byte, 4+len(in)) item[0] = 0 item[1] = uint8(auHeadersLength*8 - 7) - item[2] = uint8(len(in) >> 5 & 0xFF) - item[3] = uint8(len(in) & 0x1F) + item[2] = uint8(len(in) >> 5) + item[3] = uint8((len(in) & 0x1F) << 3) copy(item[4:], in) out = append(out, item) return From cfabc9043c225c031127876bc59e8f285bd91fd9 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 10 Jul 2021 20:04:33 +0800 Subject: [PATCH 5/5] =?UTF-8?q?[feat]=20=E6=96=B0=E5=A2=9Edemo=20pullrtmp2?= =?UTF-8?q?pushrtsp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 - app/demo/pullrtmp2hls/pullrtmp2hls.go | 4 +- .../pullrtmp2pushrtsp/pullrtmp2pushrtsp.go | 75 +++++++ .../pullrtsp2pushrtmp/pullrtsp2pushrtmp.go | 2 +- pkg/aac/aac.go | 18 +- pkg/remux/rtmp2rtsp.go | 210 ++++++++++++++++++ pkg/rtprtcp/rtp_packer_payload_avc_hevc.go | 9 +- pkg/rtprtcp/rtp_unpacker.go | 2 +- pkg/rtprtcp/rtp_unpacker_aac.go | 2 +- pkg/rtsp/base_in_session.go | 4 +- pkg/sdp/pack.go | 54 +++-- 11 files changed, 350 insertions(+), 32 deletions(-) create mode 100644 app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go create mode 100644 pkg/remux/rtmp2rtsp.go diff --git a/.gitignore b/.gitignore index 2176dcf..e34c671 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,3 @@ -/app/demo/pullrtmp2pushrtsp/ - profile.out coverage.html *.aac diff --git a/app/demo/pullrtmp2hls/pullrtmp2hls.go b/app/demo/pullrtmp2hls/pullrtmp2hls.go index 153a230..cfd7045 100644 --- a/app/demo/pullrtmp2hls/pullrtmp2hls.go +++ b/app/demo/pullrtmp2hls/pullrtmp2hls.go @@ -70,8 +70,8 @@ func parseFlag() (url string, hlsOutPath string, fragmentDurationMs int, fragmen flag.Usage() eo := filepath.FromSlash("./pullrtmp2hls/") _, _ = fmt.Fprintf(os.Stderr, `Example: - %s -i rtmp://127.0.0.1:19350/live/test110 -o %s - %s -i rtmp://127.0.0.1:19350/live/test110 -o %s -d 5000 -n 5 + %s -i rtmp://127.0.0.1:1935/live/test110 -o %s + %s -i rtmp://127.0.0.1:1935/live/test110 -o %s -d 5000 -n 5 `, os.Args[0], eo, os.Args[0], eo) base.OsExitAndWaitPressIfWindows(1) } diff --git a/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go new file mode 100644 index 0000000..027f60b --- /dev/null +++ b/app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go @@ -0,0 +1,75 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/remux" + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/lal/pkg/rtsp" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/naza/pkg/nazalog" +) + +func main() { + _ = nazalog.Init(func(option *nazalog.Option) { + option.AssertBehavior = nazalog.AssertFatal + }) + + inRtmpUrl, outRtspUrl, overTcp := parseFlag() + + pullSession := rtmp.NewPullSession() + pushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { + option.OverTcp = overTcp == 1 + }) + + remuxer := remux.NewRtmp2RtspRemuxer( + func(rawSdp []byte, sdpCtx sdp.LogicContext) { + // remuxer完成前期工作,生成sdp并开始push + nazalog.Info("start push.") + err := pushSession.Push(outRtspUrl, rawSdp, sdpCtx) + nazalog.Assert(nil, err) + nazalog.Info("push succ.") + + }, + pushSession.WriteRtpPacket, // remuxer的数据给push发送 + ) + + nazalog.Info("start pull.") + err := pullSession.Pull(inRtmpUrl, remuxer.FeedRtmpMsg) // pull接收的数据放入remuxer中 + nazalog.Assert(nil, err) + nazalog.Info("pull succ.") + + select { + case err := <-pullSession.WaitChan(): + nazalog.Fatalf("pull stopped. err=%+v", err) + case err := <-pushSession.WaitChan(): + nazalog.Fatalf("push stopped. err=%+v", err) + } +} + +func parseFlag() (inRtmpUrl string, outRtspUrl string, overTcp int) { + i := flag.String("i", "", "specify pull rtmp url") + o := flag.String("o", "", "specify push rtsp url") + t := flag.Int("t", 0, "specify rtsp interleaved mode(rtp/rtcp over tcp)") + flag.Parse() + if *i == "" || *o == "" { + flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + %s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 0 + %s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 1 +`, os.Args[0], os.Args[0]) + base.OsExitAndWaitPressIfWindows(1) + } + return *i, *o, *t +} diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index 75a3b16..c95637b 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -75,7 +75,7 @@ func main() { } } -func parseFlag() (inUrl string, outFilename string, overTcp int) { +func parseFlag() (inUrl string, outUrl string, overTcp int) { i := flag.String("i", "", "specify pull rtsp url") o := flag.String("o", "", "specify push rtmp url") t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)") diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index e2cf6d2..f9a7f5f 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -28,8 +28,14 @@ import ( var ErrAac = errors.New("lal.aac: fxxk") const ( - minAscLength = 2 AdtsHeaderLength = 7 + + AscSamplingFrequencyIndex48000 = 3 + AscSamplingFrequencyIndex44100 = 4 +) + +const ( + minAscLength = 2 ) // @@ -150,6 +156,16 @@ func (ascCtx *AscContext) PackToAdtsHeader(out []byte, frameLength int) error { return nil } +func (ascCtx *AscContext) GetSamplingFrequency() (int, error) { + switch ascCtx.SamplingFrequencyIndex { + case AscSamplingFrequencyIndex48000: + return 48000, nil + case AscSamplingFrequencyIndex44100: + return 44100, nil + } + return -1, ErrAac +} + type AdtsHeaderContext struct { AscCtx AscContext diff --git a/pkg/remux/rtmp2rtsp.go b/pkg/remux/rtmp2rtsp.go new file mode 100644 index 0000000..04469e1 --- /dev/null +++ b/pkg/remux/rtmp2rtsp.go @@ -0,0 +1,210 @@ +// Copyright 2021, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package remux + +import ( + "math/rand" + "time" + + "github.com/q191201771/lal/pkg/aac" + "github.com/q191201771/lal/pkg/avc" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/hevc" + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/naza/pkg/nazalog" +) + +// TODO(chef): refactor 将analyze部分独立出来作为一个filter + +var ( + // config + // TODO(chef): 提供option,另外还有ssrc和pt都支持自定义 + maxAnalyzeAvMsgSize = 16 +) + +// 提供rtmp数据向sdp+rtp数据的转换 +type Rtmp2RtspRemuxer struct { + onSdp OnSdp + onRtpPacket OnRtpPacket + + analyzeDone bool + msgCache []base.RtmpMsg + vps, sps, pps, asc []byte + audioPt base.AvPacketPt + videoPt base.AvPacketPt + + audioSsrc uint32 + videoSsrc uint32 + audioPacker *rtprtcp.RtpPacker + videoPacker *rtprtcp.RtpPacker +} + +type OnSdp func(rawSdp []byte, sdpCtx sdp.LogicContext) +type OnRtpPacket func(pkt rtprtcp.RtpPacket) + +func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer { + return &Rtmp2RtspRemuxer{ + onSdp: onSdp, + onRtpPacket: onRtpPacket, + audioPt: base.AvPacketPtUnknown, + videoPt: base.AvPacketPtUnknown, + } +} + +// @param msg: 函数调用结束后,内部不持有`msg`内存块 +// +func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) { + var err error + + if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata { + // noop + return + } + + // 我们需要先接收一部分rtmp数据,得到音频头、视频头 + // 并且考虑,流中只有音频或只有视频的情况 + // 我们把前面这个阶段叫做Analyze分析阶段 + + if !r.analyzeDone { + if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() { + if msg.IsAvcKeySeqHeader() { + r.sps, r.pps, err = avc.ParseSpsPpsFromSeqHeader(msg.Payload) + nazalog.Assert(nil, err) + } else if msg.IsHevcKeySeqHeader() { + r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload) + nazalog.Assert(nil, err) + } + r.doAnalyze() + return + } + + if msg.IsAacSeqHeader() { + r.asc = msg.Clone().Payload[2:] + r.doAnalyze() + return + } + + r.msgCache = append(r.msgCache, msg.Clone()) + r.doAnalyze() + return + } + + // 正常阶段 + + // 音视频头已通过sdp回调,rtp数据中不再包含音视频头 + if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() || msg.IsAacSeqHeader() { + return + } + + r.remux(msg) +} + +func (r *Rtmp2RtspRemuxer) doAnalyze() { + nazalog.Assert(false, r.analyzeDone) + + if r.isAnalyzeEnough() { + if r.sps != nil && r.pps != nil { + if r.vps != nil { + r.videoPt = base.AvPacketPtHevc + } else { + r.videoPt = base.AvPacketPtAvc + } + } + if r.asc != nil { + r.audioPt = base.AvPacketPtAac + } + + // 回调sdp + ctx, rawSdp, err := sdp.Pack(r.vps, r.sps, r.pps, r.asc) + nazalog.Assert(nil, err) + r.onSdp(rawSdp, ctx) + + // 分析阶段缓存的数据 + for i := range r.msgCache { + r.remux(r.msgCache[i]) + } + r.msgCache = nil + + r.analyzeDone = true + } +} + +// 是否应该退出Analyze阶段 +func (r *Rtmp2RtspRemuxer) isAnalyzeEnough() bool { + // 音视频头都收集好了 + if r.sps != nil && r.pps != nil && r.asc != nil { + return true + } + + // 达到分析包数阈值了 + if len(r.msgCache) >= maxAnalyzeAvMsgSize { + return true + } + + return false +} + +func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) { + var rtppkts []rtprtcp.RtpPacket + switch msg.Header.MsgTypeId { + case base.RtmpTypeIdAudio: + rtppkts = r.getAudioPacker().Pack(base.AvPacket{ + Timestamp: msg.Header.TimestampAbs, + PayloadType: r.audioPt, + Payload: msg.Payload[2:], + }) + case base.RtmpTypeIdVideo: + rtppkts = r.getVideoPacker().Pack(base.AvPacket{ + Timestamp: msg.Header.TimestampAbs, + PayloadType: r.videoPt, + Payload: msg.Payload[5:], + }) + } + + for i := range rtppkts { + r.onRtpPacket(rtppkts[i]) + } +} + +func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker { + if r.audioPacker == nil { + // TODO(chef): ssrc随机产生,并且整个lal没有在setup信令中传递ssrc + r.audioSsrc = rand.Uint32() + + // TODO(chef): 如果rtmp不是以音视频头开始,也可能收到了帧数据,但是头不存在,目前该remux没有做过多容错判断,后续要加上,或者在输入层保证 + ascCtx, err := aac.NewAscContext(r.asc) + if err != nil { + nazalog.Errorf("parse asc failed. err=%+v", err) + } + clockRate, err := ascCtx.GetSamplingFrequency() + if err != nil { + nazalog.Errorf("get sampling frequency failed. err=%+v", err) + } + + pp := rtprtcp.NewRtpPackerPayloadAac() + r.audioPacker = rtprtcp.NewRtpPacker(pp, clockRate, r.audioSsrc) + } + return r.audioPacker +} + +func (r *Rtmp2RtspRemuxer) getVideoPacker() *rtprtcp.RtpPacker { + if r.videoPacker == nil { + r.videoSsrc = rand.Uint32() + pp := rtprtcp.NewRtpPackerPayloadAvcHevc(r.videoPt, func(option *rtprtcp.RtpPackerPayloadAvcHevcOption) { + option.Typ = rtprtcp.RtpPackerPayloadAvcHevcTypeAvcc + }) + r.videoPacker = rtprtcp.NewRtpPacker(pp, 90000, r.videoSsrc) + } + return r.videoPacker +} + +func init() { + rand.Seed(time.Now().UnixNano()) +} diff --git a/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go b/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go index b4ebb8e..1625830 100644 --- a/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go +++ b/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go @@ -20,9 +20,6 @@ const ( RtpPackerPayloadAvcHevcTypeNalu RtpPackerPayloadAvcHevcType = 1 RtpPackerPayloadAvcHevcTypeAvcc = 2 RtpPackerPayloadAvcHevcTypeAnnexb = 3 - RtpPackerPayloadHevcTypeNalu = RtpPackerPayloadAvcHevcTypeNalu // hevc的外层格式和avc是一样的 - RtpPackerPayloadHevcTypeAvcc = RtpPackerPayloadAvcHevcTypeAvcc - RtpPackerPayloadHevcTypeAnnexb = RtpPackerPayloadAvcHevcTypeAnnexb ) type RtpPackerPayloadAvcHevcOption struct { @@ -41,14 +38,14 @@ type RtpPackerPayloadAvcHevc struct { type ModRtpPackerPayloadAvcHevcOption func(option *RtpPackerPayloadAvcHevcOption) func NewRtpPackerPayloadAvc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc { - return newRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...) + return NewRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...) } func NewRtpPackerPayloadHevc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc { - return newRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...) + return NewRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...) } -func newRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc { +func NewRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc { option := defaultRtpPackerPayloadAvcHevcOption for _, fn := range modOptions { fn(&option) diff --git a/pkg/rtprtcp/rtp_unpacker.go b/pkg/rtprtcp/rtp_unpacker.go index 84c63a6..ff7f5c7 100644 --- a/pkg/rtprtcp/rtp_unpacker.go +++ b/pkg/rtprtcp/rtp_unpacker.go @@ -60,7 +60,7 @@ type IRtpUnpackerProtocol interface { // 假如sps和pps是一个stapA包,则合并结果为一个AvPacket type OnAvPacket func(pkt base.AvPacket) -// 目前支持AVC,HEVC和AAC MPEG4-GENERIC/44100/2,业务方也可以自己实现IRtpUnpackerProtocol,甚至是IRtpUnpackContainer +// 目前支持AVC,HEVC和AAC MPEG4-GENERIC,业务方也可以自己实现IRtpUnpackerProtocol,甚至是IRtpUnpackContainer func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSize int, onAvPacket OnAvPacket) IRtpUnpacker { var protocol IRtpUnpackerProtocol switch payloadType { diff --git a/pkg/rtprtcp/rtp_unpacker_aac.go b/pkg/rtprtcp/rtp_unpacker_aac.go index c0262f2..2cd14ed 100644 --- a/pkg/rtprtcp/rtp_unpacker_aac.go +++ b/pkg/rtprtcp/rtp_unpacker_aac.go @@ -203,7 +203,7 @@ func parseAu(b []byte) (ret []au) { if (nbAuHeaders > 1 && pau != uint32(len(b))) || (nbAuHeaders == 1 && pau < uint32(len(b))) { - nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d", nbAuHeaders, pau, len(b)) + nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d, auHeadersLength=%d", nbAuHeaders, pau, len(b), auHeadersLength) } return diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 1e7313d..b4d939a 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -114,12 +114,12 @@ func (session *BaseInSession) InitWithSdp(rawSdp []byte, sdpLogicCtx sdp.LogicCo if session.sdpLogicCtx.IsAudioUnpackable() { session.audioUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked) } else { - nazalog.Warnf("[%s] audio unpacker not support for this type yet.", session.uniqueKey) + nazalog.Warnf("[%s] audio unpacker not support for this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx) } if session.sdpLogicCtx.IsVideoUnpackable() { session.videoUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked) } else { - nazalog.Warnf("[%s] video unpacker not support this type yet.", session.uniqueKey) + nazalog.Warnf("[%s] video unpacker not support this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx) } session.audioRrProducer = rtprtcp.NewRrProducer(session.sdpLogicCtx.AudioClockRate) diff --git a/pkg/sdp/pack.go b/pkg/sdp/pack.go index 87f0227..8d4b5ed 100644 --- a/pkg/sdp/pack.go +++ b/pkg/sdp/pack.go @@ -12,13 +12,16 @@ import ( "encoding/base64" "encoding/hex" "fmt" + "strings" + + "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/base" ) func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) { + // 判断音频、视频是否存在,以及视频是H264还是H265 var hasAudio, hasVideo, isHevc bool - if sps != nil && pps != nil { hasVideo = true if vps != nil { @@ -34,24 +37,43 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) { return } + // 判断AAC的采样率 + var samplingFrequency int + if asc != nil { + var ascCtx *aac.AscContext + ascCtx, err = aac.NewAscContext(asc) + if err != nil { + return + } + samplingFrequency, err = ascCtx.GetSamplingFrequency() + if err != nil { + return + } + } + sdpStr := fmt.Sprintf(`v=0 - o=- 0 0 IN IP4 127.0.0.1 - s=No Name - c=IN IP4 127.0.0.1 - t=0 0 - a=tool:%s +o=- 0 0 IN IP4 127.0.0.1 +s=No Name +c=IN IP4 127.0.0.1 +t=0 0 +a=tool:%s `, base.LalPackSdp) streamid := 0 if hasVideo { if isHevc { - + tmpl := `m=video 0 RTP/AVP 98 +a=rtpmap:98 H265/90000 +a=fmtp:98 profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s +a=control:streamid=%d +` + sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), base64.StdEncoding.EncodeToString(vps), streamid) } else { tmpl := `m=video 0 RTP/AVP 96 - a=rtpmap:96 H264/90000 - a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016 - a=control:streamid=%d +a=rtpmap:96 H264/90000 +a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016 +a=control:streamid=%d ` sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), streamid) } @@ -61,15 +83,15 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) { if hasAudio { tmpl := `m=audio 0 RTP/AVP 97 - b=AS:128 - a=rtpmap:97 MPEG4-GENERIC/44100/2 - a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s - a=control:streamid=%d +b=AS:128 +a=rtpmap:97 MPEG4-GENERIC/%d/2 +a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s +a=control:streamid=%d ` - sdpStr += fmt.Sprintf(tmpl, hex.EncodeToString(asc), streamid) + sdpStr += fmt.Sprintf(tmpl, samplingFrequency, hex.EncodeToString(asc), streamid) } - raw = []byte(sdpStr) + raw = []byte(strings.ReplaceAll(sdpStr, "\n", "\r\n")) ctx, err = ParseSdp2LogicContext(raw) return }