[feat] 新增demo pullrtmp2pushrtsp

pull/99/head
q191201771
parent 77c624ab03
commit cfabc9043c

2
.gitignore vendored

@ -1,5 +1,3 @@
/app/demo/pullrtmp2pushrtsp/
profile.out
coverage.html
*.aac

@ -70,8 +70,8 @@ func parseFlag() (url string, hlsOutPath string, fragmentDurationMs int, fragmen
flag.Usage()
eo := filepath.FromSlash("./pullrtmp2hls/")
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp://127.0.0.1:19350/live/test110 -o %s
%s -i rtmp://127.0.0.1:19350/live/test110 -o %s -d 5000 -n 5
%s -i rtmp://127.0.0.1:1935/live/test110 -o %s
%s -i rtmp://127.0.0.1:1935/live/test110 -o %s -d 5000 -n 5
`, os.Args[0], eo, os.Args[0], eo)
base.OsExitAndWaitPressIfWindows(1)
}

@ -0,0 +1,75 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"os"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
)
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
inRtmpUrl, outRtspUrl, overTcp := parseFlag()
pullSession := rtmp.NewPullSession()
pushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) {
option.OverTcp = overTcp == 1
})
remuxer := remux.NewRtmp2RtspRemuxer(
func(rawSdp []byte, sdpCtx sdp.LogicContext) {
// remuxer完成前期工作生成sdp并开始push
nazalog.Info("start push.")
err := pushSession.Push(outRtspUrl, rawSdp, sdpCtx)
nazalog.Assert(nil, err)
nazalog.Info("push succ.")
},
pushSession.WriteRtpPacket, // remuxer的数据给push发送
)
nazalog.Info("start pull.")
err := pullSession.Pull(inRtmpUrl, remuxer.FeedRtmpMsg) // pull接收的数据放入remuxer中
nazalog.Assert(nil, err)
nazalog.Info("pull succ.")
select {
case err := <-pullSession.WaitChan():
nazalog.Fatalf("pull stopped. err=%+v", err)
case err := <-pushSession.WaitChan():
nazalog.Fatalf("push stopped. err=%+v", err)
}
}
func parseFlag() (inRtmpUrl string, outRtspUrl string, overTcp int) {
i := flag.String("i", "", "specify pull rtmp url")
o := flag.String("o", "", "specify push rtsp url")
t := flag.Int("t", 0, "specify rtsp interleaved mode(rtp/rtcp over tcp)")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 0
%s -i rtmp://localhost:1935/live/test110 -o rtsp://localhost:5544/live/test220 -t 1
`, os.Args[0], os.Args[0])
base.OsExitAndWaitPressIfWindows(1)
}
return *i, *o, *t
}

@ -75,7 +75,7 @@ func main() {
}
}
func parseFlag() (inUrl string, outFilename string, overTcp int) {
func parseFlag() (inUrl string, outUrl string, overTcp int) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify push rtmp url")
t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)")

@ -28,8 +28,14 @@ import (
var ErrAac = errors.New("lal.aac: fxxk")
const (
minAscLength = 2
AdtsHeaderLength = 7
AscSamplingFrequencyIndex48000 = 3
AscSamplingFrequencyIndex44100 = 4
)
const (
minAscLength = 2
)
// <ISO_IEC_14496-3.pdf>
@ -150,6 +156,16 @@ func (ascCtx *AscContext) PackToAdtsHeader(out []byte, frameLength int) error {
return nil
}
func (ascCtx *AscContext) GetSamplingFrequency() (int, error) {
switch ascCtx.SamplingFrequencyIndex {
case AscSamplingFrequencyIndex48000:
return 48000, nil
case AscSamplingFrequencyIndex44100:
return 44100, nil
}
return -1, ErrAac
}
type AdtsHeaderContext struct {
AscCtx AscContext

@ -0,0 +1,210 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package remux
import (
"math/rand"
"time"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO(chef): refactor 将analyze部分独立出来作为一个filter
var (
// config
// TODO(chef): 提供option另外还有ssrc和pt都支持自定义
maxAnalyzeAvMsgSize = 16
)
// 提供rtmp数据向sdp+rtp数据的转换
type Rtmp2RtspRemuxer struct {
onSdp OnSdp
onRtpPacket OnRtpPacket
analyzeDone bool
msgCache []base.RtmpMsg
vps, sps, pps, asc []byte
audioPt base.AvPacketPt
videoPt base.AvPacketPt
audioSsrc uint32
videoSsrc uint32
audioPacker *rtprtcp.RtpPacker
videoPacker *rtprtcp.RtpPacker
}
type OnSdp func(rawSdp []byte, sdpCtx sdp.LogicContext)
type OnRtpPacket func(pkt rtprtcp.RtpPacket)
func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer {
return &Rtmp2RtspRemuxer{
onSdp: onSdp,
onRtpPacket: onRtpPacket,
audioPt: base.AvPacketPtUnknown,
videoPt: base.AvPacketPtUnknown,
}
}
// @param msg: 函数调用结束后,内部不持有`msg`内存块
//
func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
var err error
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
// noop
return
}
// 我们需要先接收一部分rtmp数据得到音频头、视频头
// 并且考虑,流中只有音频或只有视频的情况
// 我们把前面这个阶段叫做Analyze分析阶段
if !r.analyzeDone {
if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() {
if msg.IsAvcKeySeqHeader() {
r.sps, r.pps, err = avc.ParseSpsPpsFromSeqHeader(msg.Payload)
nazalog.Assert(nil, err)
} else if msg.IsHevcKeySeqHeader() {
r.vps, r.sps, r.pps, err = hevc.ParseVpsSpsPpsFromSeqHeader(msg.Payload)
nazalog.Assert(nil, err)
}
r.doAnalyze()
return
}
if msg.IsAacSeqHeader() {
r.asc = msg.Clone().Payload[2:]
r.doAnalyze()
return
}
r.msgCache = append(r.msgCache, msg.Clone())
r.doAnalyze()
return
}
// 正常阶段
// 音视频头已通过sdp回调rtp数据中不再包含音视频头
if msg.IsAvcKeySeqHeader() || msg.IsHevcKeySeqHeader() || msg.IsAacSeqHeader() {
return
}
r.remux(msg)
}
func (r *Rtmp2RtspRemuxer) doAnalyze() {
nazalog.Assert(false, r.analyzeDone)
if r.isAnalyzeEnough() {
if r.sps != nil && r.pps != nil {
if r.vps != nil {
r.videoPt = base.AvPacketPtHevc
} else {
r.videoPt = base.AvPacketPtAvc
}
}
if r.asc != nil {
r.audioPt = base.AvPacketPtAac
}
// 回调sdp
ctx, rawSdp, err := sdp.Pack(r.vps, r.sps, r.pps, r.asc)
nazalog.Assert(nil, err)
r.onSdp(rawSdp, ctx)
// 分析阶段缓存的数据
for i := range r.msgCache {
r.remux(r.msgCache[i])
}
r.msgCache = nil
r.analyzeDone = true
}
}
// 是否应该退出Analyze阶段
func (r *Rtmp2RtspRemuxer) isAnalyzeEnough() bool {
// 音视频头都收集好了
if r.sps != nil && r.pps != nil && r.asc != nil {
return true
}
// 达到分析包数阈值了
if len(r.msgCache) >= maxAnalyzeAvMsgSize {
return true
}
return false
}
func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
var rtppkts []rtprtcp.RtpPacket
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
rtppkts = r.getAudioPacker().Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
PayloadType: r.audioPt,
Payload: msg.Payload[2:],
})
case base.RtmpTypeIdVideo:
rtppkts = r.getVideoPacker().Pack(base.AvPacket{
Timestamp: msg.Header.TimestampAbs,
PayloadType: r.videoPt,
Payload: msg.Payload[5:],
})
}
for i := range rtppkts {
r.onRtpPacket(rtppkts[i])
}
}
func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker {
if r.audioPacker == nil {
// TODO(chef): ssrc随机产生并且整个lal没有在setup信令中传递ssrc
r.audioSsrc = rand.Uint32()
// TODO(chef): 如果rtmp不是以音视频头开始也可能收到了帧数据但是头不存在目前该remux没有做过多容错判断后续要加上或者在输入层保证
ascCtx, err := aac.NewAscContext(r.asc)
if err != nil {
nazalog.Errorf("parse asc failed. err=%+v", err)
}
clockRate, err := ascCtx.GetSamplingFrequency()
if err != nil {
nazalog.Errorf("get sampling frequency failed. err=%+v", err)
}
pp := rtprtcp.NewRtpPackerPayloadAac()
r.audioPacker = rtprtcp.NewRtpPacker(pp, clockRate, r.audioSsrc)
}
return r.audioPacker
}
func (r *Rtmp2RtspRemuxer) getVideoPacker() *rtprtcp.RtpPacker {
if r.videoPacker == nil {
r.videoSsrc = rand.Uint32()
pp := rtprtcp.NewRtpPackerPayloadAvcHevc(r.videoPt, func(option *rtprtcp.RtpPackerPayloadAvcHevcOption) {
option.Typ = rtprtcp.RtpPackerPayloadAvcHevcTypeAvcc
})
r.videoPacker = rtprtcp.NewRtpPacker(pp, 90000, r.videoSsrc)
}
return r.videoPacker
}
func init() {
rand.Seed(time.Now().UnixNano())
}

@ -20,9 +20,6 @@ const (
RtpPackerPayloadAvcHevcTypeNalu RtpPackerPayloadAvcHevcType = 1
RtpPackerPayloadAvcHevcTypeAvcc = 2
RtpPackerPayloadAvcHevcTypeAnnexb = 3
RtpPackerPayloadHevcTypeNalu = RtpPackerPayloadAvcHevcTypeNalu // hevc的外层格式和avc是一样的
RtpPackerPayloadHevcTypeAvcc = RtpPackerPayloadAvcHevcTypeAvcc
RtpPackerPayloadHevcTypeAnnexb = RtpPackerPayloadAvcHevcTypeAnnexb
)
type RtpPackerPayloadAvcHevcOption struct {
@ -41,14 +38,14 @@ type RtpPackerPayloadAvcHevc struct {
type ModRtpPackerPayloadAvcHevcOption func(option *RtpPackerPayloadAvcHevcOption)
func NewRtpPackerPayloadAvc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
return newRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...)
return NewRtpPackerPayloadAvcHevc(base.AvPacketPtAvc, modOptions...)
}
func NewRtpPackerPayloadHevc(modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
return newRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...)
return NewRtpPackerPayloadAvcHevc(base.AvPacketPtHevc, modOptions...)
}
func newRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
func NewRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRtpPackerPayloadAvcHevcOption) *RtpPackerPayloadAvcHevc {
option := defaultRtpPackerPayloadAvcHevcOption
for _, fn := range modOptions {
fn(&option)

@ -60,7 +60,7 @@ type IRtpUnpackerProtocol interface {
// 假如sps和pps是一个stapA包则合并结果为一个AvPacket
type OnAvPacket func(pkt base.AvPacket)
// 目前支持AVCHEVC和AAC MPEG4-GENERIC/44100/2业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
// 目前支持AVCHEVC和AAC MPEG4-GENERIC业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSize int, onAvPacket OnAvPacket) IRtpUnpacker {
var protocol IRtpUnpackerProtocol
switch payloadType {

@ -203,7 +203,7 @@ func parseAu(b []byte) (ret []au) {
if (nbAuHeaders > 1 && pau != uint32(len(b))) ||
(nbAuHeaders == 1 && pau < uint32(len(b))) {
nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d", nbAuHeaders, pau, len(b))
nazalog.Warnf("rtp packet size invalid. nbAuHeaders=%d, pau=%d, len(b)=%d, auHeadersLength=%d", nbAuHeaders, pau, len(b), auHeadersLength)
}
return

@ -114,12 +114,12 @@ func (session *BaseInSession) InitWithSdp(rawSdp []byte, sdpLogicCtx sdp.LogicCo
if session.sdpLogicCtx.IsAudioUnpackable() {
session.audioUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetAudioPayloadTypeBase(), session.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked)
} else {
nazalog.Warnf("[%s] audio unpacker not support for this type yet.", session.uniqueKey)
nazalog.Warnf("[%s] audio unpacker not support for this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx)
}
if session.sdpLogicCtx.IsVideoUnpackable() {
session.videoUnpacker = rtprtcp.DefaultRtpUnpackerFactory(session.sdpLogicCtx.GetVideoPayloadTypeBase(), session.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, session.onAvPacketUnpacked)
} else {
nazalog.Warnf("[%s] video unpacker not support this type yet.", session.uniqueKey)
nazalog.Warnf("[%s] video unpacker not support this type yet. logicCtx=%+v", session.uniqueKey, session.sdpLogicCtx)
}
session.audioRrProducer = rtprtcp.NewRrProducer(session.sdpLogicCtx.AudioClockRate)

@ -12,13 +12,16 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"strings"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/base"
)
func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
// 判断音频、视频是否存在以及视频是H264还是H265
var hasAudio, hasVideo, isHevc bool
if sps != nil && pps != nil {
hasVideo = true
if vps != nil {
@ -34,24 +37,43 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
return
}
// 判断AAC的采样率
var samplingFrequency int
if asc != nil {
var ascCtx *aac.AscContext
ascCtx, err = aac.NewAscContext(asc)
if err != nil {
return
}
samplingFrequency, err = ascCtx.GetSamplingFrequency()
if err != nil {
return
}
}
sdpStr := fmt.Sprintf(`v=0
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:%s
o=- 0 0 IN IP4 127.0.0.1
s=No Name
c=IN IP4 127.0.0.1
t=0 0
a=tool:%s
`, base.LalPackSdp)
streamid := 0
if hasVideo {
if isHevc {
tmpl := `m=video 0 RTP/AVP 98
a=rtpmap:98 H265/90000
a=fmtp:98 profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), base64.StdEncoding.EncodeToString(vps), streamid)
} else {
tmpl := `m=video 0 RTP/AVP 96
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016
a=control:streamid=%d
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1; sprop-parameter-sets=%s,%s; profile-level-id=640016
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, base64.StdEncoding.EncodeToString(sps), base64.StdEncoding.EncodeToString(pps), streamid)
}
@ -61,15 +83,15 @@ func Pack(vps, sps, pps, asc []byte) (ctx LogicContext, raw []byte, err error) {
if hasAudio {
tmpl := `m=audio 0 RTP/AVP 97
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/44100/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s
a=control:streamid=%d
b=AS:128
a=rtpmap:97 MPEG4-GENERIC/%d/2
a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s
a=control:streamid=%d
`
sdpStr += fmt.Sprintf(tmpl, hex.EncodeToString(asc), streamid)
sdpStr += fmt.Sprintf(tmpl, samplingFrequency, hex.EncodeToString(asc), streamid)
}
raw = []byte(sdpStr)
raw = []byte(strings.ReplaceAll(sdpStr, "\n", "\r\n"))
ctx, err = ParseSdp2LogicContext(raw)
return
}

Loading…
Cancel
Save