Merge pull request #11 from q191201771/master

更新lal到最新
pull/99/head
joestarzxh 4 years ago committed by GitHub
commit d4f261d73f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

2
.gitignore vendored

@ -1,5 +1,3 @@
/app/demo/pullrtmp2pushrtsp/
profile.out
coverage.html
*.aac

@ -1,133 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"strconv"
"strings"
"sync"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/lru"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
)
// 分析诊断HLS的时间戳。注意这个程序还没有完成。
//
// TODO chef: 有的代码考虑弄到pkg/hls中
type M3u8PullSession struct {
}
type frag struct {
extinf float64
filename string
}
func parseM3u8(content string) (ret []frag) {
var err error
lines := strings.Split(content, "\n")
var f frag
for _, line := range lines {
if strings.HasPrefix(line, "#EXTINF:") {
line = strings.TrimPrefix(line, "#EXTINF:")
line = strings.TrimSuffix(line, ",")
f.extinf, err = strconv.ParseFloat(line, 64)
nazalog.Assert(nil, err)
}
if strings.Index(line, ".ts") != -1 {
f.filename = line
ret = append(ret, f)
}
}
return
}
func getTsUrl(m3u8Url string, tsFilename string) string {
index := strings.LastIndex(m3u8Url, "/")
nazalog.Assert(true, index != -1)
path := m3u8Url[:index+1]
return path + tsFilename
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
defer nazalog.Sync()
m3u8Url := parseFlag()
nazalog.Infof("m3u8 url=%s", m3u8Url)
cache := lru.New(1024)
var m sync.Mutex
var frags []frag
go func() {
for {
content, err := nazahttp.GetHttpFile(m3u8Url, 3000)
if err != nil {
nazalog.Error(err)
return
}
//nazalog.Debugf("\n-----m3u8-----\n%s", string(content))
currFrags := parseM3u8(string(content))
//nazalog.Debugf("%+v", currFrags)
m.Lock()
for _, f := range currFrags {
if _, exist := cache.Get(f.filename); exist {
continue
}
cache.Put(f.filename, nil)
nazalog.Infof("> new frag. filename=%s", f.filename)
frags = append(frags, f)
}
m.Unlock()
time.Sleep(100 * time.Millisecond)
}
}()
for {
m.Lock()
currFrags := frags
frags = nil
m.Unlock()
for _, f := range currFrags {
nazalog.Infof("< new frag. filename=%s", f.filename)
tsUrl := getTsUrl(m3u8Url, f.filename)
nazalog.Debug(tsUrl)
content, err := nazahttp.GetHttpFile(tsUrl, 3000)
nazalog.Assert(nil, err)
nazalog.Debugf("TS len=%d", len(content))
}
time.Sleep(100 * time.Millisecond)
}
}
func parseFlag() string {
url := flag.String("i", "", "specify m3u8 url")
flag.Parse()
if *url == "" {
flag.Usage()
base.OsExitAndWaitPressIfWindows(1)
}
return *url
}

@ -1,100 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"io/ioutil"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
// 学习如何解析TS文件。注意该程序还没有写完。
var (
pat mpegts.Pat
pmt mpegts.Pmt
pid2stream map[uint16]*Stream
)
type Stream struct {
}
var filename = "/Volumes/Data/nrm-0.ts"
func handlePacket(packet []byte) {
h := mpegts.ParseTsPacketHeader(packet)
index := 4
nazalog.Debugf("%+v", h)
var adaptation mpegts.TsPacketAdaptation
switch h.Adaptation {
case mpegts.AdaptationFieldControlNo:
// noop
case mpegts.AdaptationFieldControlFollowed:
adaptation = mpegts.ParseTsPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
}
index += int(adaptation.Length)
if h.Pid == mpegts.PidPat {
if h.PayloadUnitStart == 1 {
index++
}
pat = mpegts.ParsePat(packet[index:])
nazalog.Debugf("%+v", pat)
return
}
if pat.SearchPid(h.Pid) {
if h.PayloadUnitStart == 1 {
index++
}
pmt = mpegts.ParsePmt(packet[index:])
nazalog.Debugf("%+v", pmt)
for _, ele := range pmt.ProgramElements {
pid2stream[ele.Pid] = &Stream{}
}
return
}
_, ok := pid2stream[h.Pid]
if !ok {
nazalog.Warn(h.Pid)
}
// 判断是否有PES
if h.PayloadUnitStart == 1 {
pes, length := mpegts.ParsePes(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
defer nazalog.Sync()
pid2stream = make(map[uint16]*Stream)
content, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
packets, _ := hls.SplitFragment2TsPackets(content)
for _, packet := range packets {
handlePacket(packet)
}
}

@ -70,8 +70,8 @@ func parseFlag() (url string, hlsOutPath string, fragmentDurationMs int, fragmen
flag.Usage()
eo := filepath.FromSlash("./pullrtmp2hls/")
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp://127.0.0.1:19350/live/test110 -o %s
%s -i rtmp://127.0.0.1:19350/live/test110 -o %s -d 5000 -n 5
%s -i rtmp://127.0.0.1:1935/live/test110 -o %s
%s -i rtmp://127.0.0.1:1935/live/test110 -o %s -d 5000 -n 5
`, os.Args[0], eo, os.Args[0], eo)
base.OsExitAndWaitPressIfWindows(1)
}

@ -0,0 +1,75 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"os"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
)
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
inRtmpUrl, outRtspUrl, overTcp := parseFlag()
pullSession := rtmp.NewPullSession()
pushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) {
option.OverTcp = overTcp == 1
})
remuxer := remux.NewRtmp2RtspRemuxer(
func(rawSdp []byte, sdpCtx sdp.LogicContext) {
// remuxer完成前期工作生成sdp并开始push
nazalog.Info("start push.")
err := pushSession.Push(outRtspUrl, rawSdp, sdpCtx)
nazalog.Assert(nil, err)
nazalog.Info("push succ.")
},
pushSession.WriteRtpPacket, // remuxer的数据给push发送
)
nazalog.Info("start pull.")
err := pullSession.Pull(inRtmpUrl, remuxer.FeedRtmpMsg) // pull接收的数据放入remuxer中
nazalog.Assert(nil, err)
nazalog.Info("pull succ.")
select {
case err := <-pullSession.WaitChan():
nazalog.Fatalf("pull stopped. err=%+v", err)
case err := <-pushSession.WaitChan():
nazalog.Fatalf("push stopped. err=%+v", err)
}
}
func parseFlag() (inRtmpUrl string, outRtspUrl string, overTcp int) {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify push rtsp url")
t := flag.Int("t", 0, "specify rtsp interleaved mode(rtp/rtcp over tcp)")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 0
%s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 1
`, os.Args[0], os.Args[0])
base.OsExitAndWaitPressIfWindows(1)
}
return *i, *o, *t
}

@ -75,7 +75,7 @@ func main() {
}
}
func parseFlag() (inUrl string, outFilename string, overTcp int) {
func parseFlag() (inUrl string, outUrl string, overTcp int) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify push rtmp url")
t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)")

@ -1,98 +0,0 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"bytes"
"encoding/hex"
"io/ioutil"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
// 临时小工具比较两个TS文件。注意该程序还没有写完。
var filename1 = "/tmp/lal/hls/innertest/innertest-7.ts"
var filename2 = "/tmp/lal/hls/innertest.bak/innertest-7.ts"
func skipPacketFilter(tss [][]byte) (ret [][]byte) {
for _, ts := range tss {
h := mpegts.ParseTsPacketHeader(ts)
if h.Pid == mpegts.PidAudio {
continue
}
ret = append(ret, ts)
}
return
}
func parsePacket(packet []byte) {
h := mpegts.ParseTsPacketHeader(packet)
nazalog.Debugf("%+v", h)
index := 4
var adaptation mpegts.TsPacketAdaptation
switch h.Adaptation {
case mpegts.AdaptationFieldControlNo:
// noop
case mpegts.AdaptationFieldControlFollowed:
adaptation = mpegts.ParseTsPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
}
index += int(adaptation.Length)
if h.PayloadUnitStart == 1 && h.Pid == 256 {
pes, length := mpegts.ParsePes(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
defer nazalog.Sync()
content1, err := ioutil.ReadFile(filename1)
nazalog.Assert(nil, err)
content2, err := ioutil.ReadFile(filename2)
nazalog.Assert(nil, err)
tss1, _ := hls.SplitFragment2TsPackets(content1)
tss2, _ := hls.SplitFragment2TsPackets(content2)
nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
//tss1 = skipPacketFilter(tss1)
//tss2 = skipPacketFilter(tss2)
nazalog.Debugf("after skip. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
m := len(tss1)
if m > len(tss2) {
m = len(tss2)
}
for i := 0; i < m; i++ {
if !bytes.Equal(tss1[i], tss2[i]) {
nazalog.Debug(i)
parsePacket(tss1[i])
parsePacket(tss2[i])
nazalog.Debugf("\n%s", hex.Dump(tss1[i]))
nazalog.Debugf("\n%s", hex.Dump(tss2[i]))
}
}
}

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.13
require github.com/q191201771/naza v0.19.3
require github.com/q191201771/naza v0.20.0

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.19.3 h1:cvjBztFJjvqbr8uSFkFFKq39HgA4ryC7z6qTY4y8ayI=
github.com/q191201771/naza v0.19.3/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=
github.com/q191201771/naza v0.20.0 h1:VRessjnJDQgclflzUUMy8qIzLYtblaLv2po6wb4rMNM=
github.com/q191201771/naza v0.20.0/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0=

@ -28,8 +28,14 @@ import (
var ErrAac = errors.New("lal.aac: fxxk")
const (
minAscLength = 2
AdtsHeaderLength = 7
AscSamplingFrequencyIndex48000 = 3
AscSamplingFrequencyIndex44100 = 4
)
const (
minAscLength = 2
)
// <ISO_IEC_14496-3.pdf>
@ -150,6 +156,16 @@ func (ascCtx *AscContext) PackToAdtsHeader(out []byte, frameLength int) error {
return nil
}
func (ascCtx *AscContext) GetSamplingFrequency() (int, error) {
switch ascCtx.SamplingFrequencyIndex {
case AscSamplingFrequencyIndex48000:
return 48000, nil
case AscSamplingFrequencyIndex44100:
return 44100, nil
}
return -1, ErrAac
}
type AdtsHeaderContext struct {
AscCtx AscContext

@ -0,0 +1,210 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package remux
import (
"math/rand"
"time"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO(chef): refactor 将analyze部分独立出来作为一个filter
var (
// config
// TODO(chef): 提供option另外还有ssrc和pt都支持自定义
maxAnalyzeAvMsgSize = 16
)
// 提供rtmp数据向sdp+rtp数据的转换
type Rtmp2RtspRemuxer struct {
onSdp OnSdp
onRtpPacket OnRtpPacket
analyzeDone bool
msgCache []base.RtmpMsg
vps, sps, pps, asc []byte
audioPt base.AvPacketPt
videoPt base.AvPacketPt
audioSsrc uint32
videoSsrc uint32
audioPacker *rtprtcp.RtpPacker
videoPacker *rtprtcp.RtpPacker
}
type OnSdp func(rawSdp []byte, sdpCtx sdp.LogicContext)
type OnRtpPacket func(pkt rtprtcp.RtpPacket)
func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer {
return &Rtmp2RtspRemuxer{
onSdp: onSdp,
onRtpPacket: onRtpPacket,
audioPt: base.AvPacketPtUnknown,
videoPt: base.AvPacketPtUnknown,
}
}
// @param msg: 函数调用结束后,内部不持有`msg`内存块
//
func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
var err error
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
// noop
return
}
// 我们需要先接收一部分rtmp数据得到音频头、视频头
// 并且考虑,流中只有音频或只有视频的情况
// 我们把前面这个阶段叫做Analyze分析阶段
if !r.analyzeDone {
if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() {
if msg.IsAvcKeySeqHeader() {
r.sps, r.pps, err = avc.ParseSpsPpsFromSeqHeader(msg.Payload)
nazalog.Assert(nil, err)
} else if msg.IsHevcKeySeqHeader() {
r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
nazalog.Assert(nil, err)
}
r.doAnalyze()
return
}
if msg.IsAacSeqHeader() {
r.asc = msg.Clone().Payload[2:]
r.doAnalyze()
return
}
r.msgCache = append(r.msgCache, msg.Clone())
r.doAnalyze()
return
}
// 正常阶段
// 音视频头已通过sdp回调rtp数据中不再包含音视频头
if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() || msg.IsAacSeqHeader() {
return
}
r.remux(msg)
}
func (r *Rtmp2RtspRemuxer) doAnalyze() {
nazalog.Assert(false, r.analyzeDone)
if r.isAnalyzeEnough() {
if r.sps != nil && r.pps != nil {
if r.vps != nil {
r.videoPt = base.AvPacketPtHevc
} else {
r.videoPt = base.AvPacketPtAvc
}
}
if r.asc != nil {
r.audioPt = base.AvPacketPtAac
}
// 回调sdp
ctx, rawSdp, err := sdp.Pack(r.vps, r.sps, r.pps, r.asc)
nazalog.Assert(nil, err)
r.onSdp(rawSdp, ctx)
// 分析阶段缓存的数据
for i := range r.msgCache {
r.remux(r.msgCache[i])
}
r.msgCache = nil
r.analyzeDone = true
}
}
// 是否应该退出Analyze阶段
func (r *Rtmp2RtspRemuxer) isAnalyzeEnough() bool {
// 音视频头都收集好了
if r.sps != nil && r.pps != nil && r.asc != nil {
return true
}
// 达到分析包数阈值了
if len(r.msgCache) >= maxAnalyzeAvMsgSize {
return true
}
return false
}
func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
var rtppkts []rtprtcp.RtpPacket
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
rtppkts = r.getAudioPacker().Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
PayloadType: r.audioPt,
Payload: msg.Payload[2:],
})
case base.RtmpTypeIdVideo:
rtppkts = r.getVideoPacker().Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
PayloadType: r.videoPt,
Payload: msg.Payload[5:],
})
}
for i := range rtppkts {
r.onRtpPacket(rtppkts[i])
}
}
func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker {
if r.audioPacker == nil {
// TODO(chef): ssrc随机产生并且整个lal没有在setup信令中传递ssrc
r.audioSsrc = rand.Uint32()
// TODO(chef): 如果rtmp不是以音视频头开始也可能收到了帧数据但是头不存在目前该remux没有做过多容错判断后续要加上或者在输入层保证
ascCtx, err := aac.NewAscContext(r.asc)
if err != nil {
nazalog.Errorf("parse asc failed. err=%+v", err)
}
clockRate, err := ascCtx.GetSamplingFrequency()
if err != nil {
nazalog.Errorf("get sampling frequency failed. err=%+v", err)
}
pp := rtprtcp.NewRtpPackerPayloadAac()
r.audioPacker = rtprtcp.NewRtpPacker(pp, clockRate, r.audioSsrc)
}
return r.audioPacker
}
func (r *Rtmp2RtspRemuxer) getVideoPacker() *rtprtcp.RtpPacker {
if r.videoPacker == nil {
r.videoSsrc = rand.Uint32()
pp := rtprtcp.NewRtpPackerPayloadAvcHevc(r.videoPt, func(option *rtprtcp.RtpPackerPayloadAvcHevcOption) {
option.Typ = rtprtcp.RtpPackerPayloadAvcHevcTypeAvcc
})
r.videoPacker = rtprtcp.NewRtpPacker(pp, 90000, r.videoSsrc)
}
return r.videoPacker
}
func init() {
rand.Seed(time.Now().UnixNano())
}

@ -39,8 +39,8 @@ func (r *RtpPackerPayloadAac) Pack(in []byte, maxSize int) (out [][]byte) {
item := make([]byte, 4+len(in))
item[0] = 0
item[1] = uint8(auHeadersLength*8 - 7)
item[2] = uint8(len(in) >> 5 & 0xFF)
item[3] = uint8(len(in) & 0x1F)
item[2] = uint8(len(in) >> 5)
item[3] = uint8((len(in) & 0x1F) << 3)
copy(item[4:], in)
out = append(out, item)
return

@ -20,9 +20,6 @@ const (
RtpPackerPayloadAvcHevcTypeNalu RtpPackerPayloadAvcHevcType = 1
RtpPackerPayloadAvcHevcTypeAvcc = 2
RtpPackerPayloadAvcHevcTypeAnnexb = 3
RtpPackerPayloadHevcTypeNalu = RtpPackerPayloadAvcHevcTypeNalu // hevc的外层格式和avc是一样的
RtpPackerPayloadHevcTypeAvcc = RtpPackerPayloadAvcHevcTypeAvcc
RtpPackerPayloadHevcTypeAnnexb = RtpPackerPayloadAvcHevcTypeAnnexb
)
type RtpPackerPayloadAvcHevcOption struct {
@ -41,14 +38,14 @@ type RtpPackerPayloadAvcHevc struct {
type ModRtpPackerPayloadAvcHevcOption func(option *RtpPackerPayloadAvcHevcOption)
func NewRtpPackerPayloadAvc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
return newRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...)
return NewRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...)
}
func NewRtpPackerPayloadHevc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
return newRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...)
return NewRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...)
}
func newRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
func NewRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
option := defaultRtpPackerPayloadAvcHevcOption
for _, fn := range modOptions {
fn(&option)

@ -60,7 +60,7 @@ type IRtpUnpackerProtocol interface {
// 假如sps和pps是一个stapA包则合并结果为一个AvPacket
type OnAvPacket func(pkt base.AvPacket)
// 目前支持AVCHEVC和AAC MPEG4-GENERIC/44100/2业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
// 目前支持AVCHEVC和AAC MPEG4-GENERIC业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSize int, onAvPacket OnAvPacket) IRtpUnpacker {
var protocol IRtpUnpackerProtocol
switch payloadType {

@ -203,7 +203,7 @@ func parseAu(b []byte) (ret []au) {
if (nbAuHeaders > 1 && pau != uint32(len(b))) ||
(nbAuHeaders == 1 && pau < uint32(len(b))) {
nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d", nbAuHeaders, pau, len(b))
nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d, auHeadersLength=%d", nbAuHeaders, pau, len(b), auHeadersLength)
}
return

@ -38,14 +38,14 @@ func (a *Auth) FeedWwwAuthenticate(auths []string, username, password string) {
a.Username = username
a.Password = password
//目前只处理第一个
var s string
if len(auths)>0{
var s string
if len(auths) > 0 {
s = auths[0]
}else{
} else {
return
}
s = strings.TrimPrefix(s, HeaderWwwAuthenticate)
s = strings.TrimSpace(s)
s = strings.TrimPrefix(s, HeaderWwwAuthenticate)
s = strings.TrimSpace(s)
if strings.HasPrefix(s, AuthTypeBasic) {
a.Typ = AuthTypeBasic
return

@ -114,12 +114,12 @@ func (session *BaseInSession) InitWithSdp(rawSdp []byte, sdpLogicCtx sdp.LogicCo
if session.sdpLogicCtx.IsAudioUnpackable() {
session.audioUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked)
} else {
nazalog.Warnf("[%s] audio unpacker not support for this type yet.", session.uniqueKey)
nazalog.Warnf("[%s] audio unpacker not support for this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx)
}
if session.sdpLogicCtx.IsVideoUnpackable() {
session.videoUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked)
} else {
nazalog.Warnf("[%s] video unpacker not support this type yet.", session.uniqueKey)
nazalog.Warnf("[%s] video unpacker not support this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx)
}
session.audioRrProducer = rtprtcp.NewRrProducer(session.sdpLogicCtx.AudioClockRate)

@ -343,7 +343,7 @@ func (session *ClientCommandSession) writeOptions() error {
method := ctx.Headers.Get(HeaderPublic)
if method== "" {
if method == "" {
return nil
}
if strings.Contains(method, MethodGetParameter) {

@ -52,7 +52,7 @@ func (lc *LogicContext) IsPayloadTypeOrigin(t int) bool {
}
func (lc *LogicContext) IsAudioUnpackable() bool {
return lc.audioPayloadTypeBase == base.AvPacketPtAac
return lc.audioPayloadTypeBase == base.AvPacketPtAac && lc.Asc != nil
}
func (lc *LogicContext) IsVideoUnpackable() bool {
@ -120,7 +120,7 @@ func ParseSdp2LogicContext(b []byte) (LogicContext, error) {
if md.AFmtPBase != nil {
ret.Asc, err = ParseAsc(md.AFmtPBase)
if err != nil {
return ret, err
nazalog.Warnf("parse asc from afmtp failed. err=%+v", err)
}
} else {
nazalog.Warnf("aac afmtp not exist.")

@ -12,13 +12,16 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"strings"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/base"
)
func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
// 判断音频、视频是否存在以及视频是H264还是H265
var hasAudio, hasVideo, isHevc bool
if sps != nil && pps != nil {
hasVideo = true
if vps != nil {
@ -34,24 +37,43 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
return
}
// 判断AAC的采样率
var samplingFrequency int
if asc != nil {
var ascCtx *aac.AscContext
ascCtx, err = aac.NewAscContext(asc)
if err != nil {
return
}
samplingFrequency, err = ascCtx.GetSamplingFrequency()
if err != nil {
return
}
}
sdpStr := fmt.Sprintf(`v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:%s
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:%s
`, base.LalPackSdp)
streamid := 0
if hasVideo {
if isHevc {
tmpl := `m=video 0 RTP/AVP 98
a=rtpmap:98 H265/90000
a=fmtp:98 profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), base64.StdEncoding.EncodeToString(vps), streamid)
} else {
tmpl := `m=video 0 RTP/AVP 96
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016
a=control:streamid=%d
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), streamid)
}
@ -61,15 +83,15 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
if hasAudio {
tmpl := `m=audio 0 RTP/AVP 97
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s
a=control:streamid=%d
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/%d/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, hex.EncodeToString(asc), streamid)
sdpStr += fmt.Sprintf(tmpl, samplingFrequency, hex.EncodeToString(asc), streamid)
}
raw = []byte(sdpStr)
raw = []byte(strings.ReplaceAll(sdpStr, "\n", "\r\n"))
ctx, err = ParseSdp2LogicContext(raw)
return
}

@ -458,3 +458,28 @@ a=control:track3
assert.Equal(t, nil, err)
_ = ctx
}
// sdp aac中a=fmtp缺少config字段这个case的实际情况是后续也没有aac的rtp包
func TestCase10(t *testing.T) {
golden := `v=0
o=- 0 0 IN IP4 0.0.0.0
s=rtsp_demo
t=0 0
a=control:rtsp://10.10.10.188:554/stream0
a=range:npt=0-
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;sprop-parameter-sets=Z00AKp2oHgCJ+WbgICAgQA==,aO48gA==
a=control:rtsp://10.10.10.188:554/stream0/track1
m=audio 0 RTP/AVP 97
c=IN IP4 0.0.0.0
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3
a=control:rtsp://10.10.10.188:554/stream0/track2
`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := ParseSdp2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
_ = ctx
}

Loading…
Cancel
Save