(1) [refactor] DumpFile增加文件头信息,每个数据块包含类型 (2) [feat] demo/pullrtsp和CutomizePub支持DumpFile (3) [test] 整理DumpFile的测试

pull/248/head
q191201771 2 years ago
parent f5b97d9087
commit 46c0b47911

@ -11,6 +11,8 @@ package main
import (
"flag"
"fmt"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"os"
"time"
@ -21,14 +23,52 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// pullrtsp 拉取rtsp流然后存储为flv文件或者dump文件进行分析
// TODO(chef): dump功能整理成flag参数 202211
// TODO(chef): dump中加入sdp 202211
var remuxer *remux.AvPacket2RtmpRemuxer
var dump *base.DumpFile
type Observer struct{}
func (o *Observer) OnSdp(sdpCtx sdp.LogicContext) {
nazalog.Debugf("OnSdp %+v", sdpCtx)
if dump != nil {
dump.WriteWithType(sdpCtx.RawSdp, base.DumpTypeRtspSdpData)
}
remuxer.OnSdp(sdpCtx)
}
func (o *Observer) OnRtpPacket(pkt rtprtcp.RtpPacket) {
if dump != nil {
dump.WriteWithType(pkt.Raw, base.DumpTypeRtspRtpData)
}
remuxer.OnRtpPacket(pkt)
}
func (o *Observer) OnAvPacket(pkt base.AvPacket) {
//nazalog.Debugf("OnAvPacket %+v", pkt.DebugString())
remuxer.OnAvPacket(pkt)
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
option.IsToStdout = true
option.Filename = "pullrtsp.log"
})
defer nazalog.Sync()
base.LogoutStartInfo()
inUrl, outFilename, overTcp := parseFlag()
inUrl, outFilename, overTcp, debugDumpPacket := parseFlag()
if debugDumpPacket != "" {
dump = base.NewDumpFile()
err := dump.OpenToWrite(debugDumpPacket)
nazalog.Assert(nil, err)
}
var fileWriter httpflv.FlvFileWriter
err := fileWriter.Open(outFilename)
@ -37,11 +77,14 @@ func main() {
err = fileWriter.WriteRaw(httpflv.FlvHeader)
nazalog.Assert(nil, err)
remuxer := remux.NewAvPacket2RtmpRemuxer().WithOnRtmpMsg(func(msg base.RtmpMsg) {
remuxer = remux.NewAvPacket2RtmpRemuxer().WithOnRtmpMsg(func(msg base.RtmpMsg) {
err = fileWriter.WriteTag(*remux.RtmpMsg2FlvTag(msg))
nazalog.Assert(nil, err)
})
pullSession := rtsp.NewPullSession(remuxer, func(option *rtsp.PullSessionOption) {
var observer Observer
pullSession := rtsp.NewPullSession(&observer, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 5000
option.OverTcp = overTcp != 0
})
@ -68,18 +111,22 @@ func main() {
nazalog.Infof("< pullSession.Wait(). err=%+v", err)
}
func parseFlag() (inUrl string, outFilename string, overTcp int) {
func parseFlag() (inUrl string, outFilename string, overTcp int, debugDumpPacket string) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify output flv file")
t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)")
d := flag.String("d", "", "specify debug dump packet filename")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
%s -i rtsp://localhost:5544/live/test110 -o out.flv -t 0
%s -i rtsp://localhost:5544/live/test110 -o out.flv -t 1
`, os.Args[0], os.Args[0])
%s -i rtsp://localhost:5544/live/test110 -o outpullrtsp.flv -t 0
%s -i rtsp://localhost:5544/live/test110 -o outpullrtsp.flv -t 1
%s -i rtsp://localhost:5544/live/test110 -o outpullrtsp.flv -t 0 -d outpullrtsp.laldump
`, os.Args[0], os.Args[0], os.Args[0])
base.OsExitAndWaitPressIfWindows(1)
}
return *i, *o, *t
return *i, *o, *t, *d
}

@ -13,6 +13,7 @@ import (
"fmt"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
"os"
"path/filepath"
"time"
@ -22,20 +23,49 @@ import (
//
// lal中的支持情况列表
// 支持情况 | 协议 | 类型 | 应用 | 开关手段 | 方式
// 已支持 | ps | pub | lalserver | http-api参数 | hook到logic中
// 未支持 | rtsp | pull | lalserver | http-api参数 | hook到logic中
// 未支持 | customize pub |
//
// | 支持情况 | 协议 | 类型 | 应用 | 开关手段 | 方式 | 测试(dump, parse) |
// | 已支持 | ps | pub | lalserver | http-api参数 | hook到logic中 | 00 |
// | 已支持 | rtsp | pull | lalserver | http-api参数 | 回调到logic中 | 00 |
// | 已支持 | rtsp | pull | demo/pullrtsp | 运行参数 | 回调到上层逻辑 | 11 |
// | 已支持 | customize pub | pub | lalserver | 参数 | 接口提供选项 | 00 |
// | 未支持 | rtmp | | | | | |
const (
DumpTypeDefault uint32 = 0
DumpTypePsRtpData uint32 = 1 // 1
DumpTypeRtspRtpData uint32 = 17 // 1+16
DumpTypeRtspSdpData uint32 = 18
DumpTypeCustomizePubData uint32 = 33 // 1+16*2
DumpTypeCustomizePubAudioSpecificConfigData uint32 = 34
DumpTypeInnerFileHeaderData uint32 = 49 // 1+16*3
)
func (d *DumpFile) WriteAvPacket(packet AvPacket, typ uint32) error {
out := make([]byte, 4+8+8+len(packet.Payload))
bele.BePutUint32(out, uint32(packet.PayloadType))
bele.BePutUint64(out[4:], uint64(packet.Timestamp))
bele.BePutUint64(out[12:], uint64(packet.Pts))
copy(out[20:], packet.Payload)
return d.WriteWithType(out, typ)
}
// ---------------------------------------------------------------------------------------------------------------------
const (
writeVer uint32 = 3
)
type DumpFile struct {
file *os.File
}
type DumpFileMessage struct {
Ver uint32
Ver uint32 // 制造数据时的代码版本
Typ uint32
Len uint32
Timestamp uint32
Len uint32 // Body 的长度
Timestamp uint64 // 写入时的时间戳
Reserve uint32
Body []byte
}
@ -49,7 +79,7 @@ func (d *DumpFile) OpenToWrite(filename string) (err error) {
return err
}
d.file, err = os.Create(filename)
return
return d.WriteWithType([]byte(LalFullInfo), DumpTypeInnerFileHeaderData)
}
func (d *DumpFile) OpenToRead(filename string) (err error) {
@ -57,8 +87,8 @@ func (d *DumpFile) OpenToRead(filename string) (err error) {
return
}
func (d *DumpFile) Write(b []byte) error {
_, err := d.file.Write(d.pack(b))
func (d *DumpFile) WriteWithType(b []byte, typ uint32) error {
_, err := d.file.Write(d.pack(b, typ))
return err
}
@ -67,6 +97,11 @@ func (d *DumpFile) ReadOneMessage() (m DumpFileMessage, err error) {
if err != nil {
return
}
if m.Ver < writeVer {
nazalog.Warnf("invalid ver. ver=%d", m.Ver)
}
m.Typ, err = bele.ReadBeUint32(d.file)
if err != nil {
return
@ -75,14 +110,17 @@ func (d *DumpFile) ReadOneMessage() (m DumpFileMessage, err error) {
if err != nil {
return
}
m.Timestamp, err = bele.ReadBeUint32(d.file)
m.Timestamp, err = bele.ReadBeUint64(d.file)
if err != nil {
return
}
m.Reserve, err = bele.ReadBeUint32(d.file)
if err != nil {
return
}
m.Body = make([]byte, m.Len)
_, err = d.file.Read(m.Body)
// TODO(chef): [opt] 检查Ver等值 202208
// TODO(chef): [opt] check Read return value 202208
return
}
@ -102,13 +140,20 @@ func (m *DumpFileMessage) DebugString() string {
// ---------------------------------------------------------------------------------------------------------------------
func (d *DumpFile) pack(b []byte) []byte {
ret := make([]byte, len(b)+16)
bele.BePutUint32(ret, 1) // Ver
bele.BePutUint32(ret[4:], 1) // Typ
bele.BePutUint32(ret[8:], uint32(len(b))) // Len
//bele.BePutUint32(ret[12:], 0) // Timestamp
bele.BePutUint32(ret[12:], uint32(time.Now().Unix())) // Timestamp
copy(ret[16:], b)
func (d *DumpFile) pack(b []byte, typ uint32) []byte {
// TODO(chef): [perf] 优化这块内存 202211
ret := make([]byte, len(b)+24)
i := 0
bele.BePutUint32(ret[i:], writeVer) // Ver
i += 4
bele.BePutUint32(ret[i:], typ) // Typ
i += 4
bele.BePutUint32(ret[i:], uint32(len(b))) // Len
i += 4
bele.BePutUint64(ret[i:], uint64(time.Now().UnixMilli())) // Timestamp
i += 8
copy(ret[i:], "LALD")
i += 4
copy(ret[i:], b)
return ret
}

@ -1,169 +0,0 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package gb28181
import (
"encoding/hex"
"fmt"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
"io/ioutil"
"net"
"os"
"testing"
"time"
)
// 测试:
// 1. 测试 TestPubSession
// 1.1 测试指定端口
// 1.2 测试随机端口
// 1.3 测试重复端口
// 2. 测试 TestReplayPubSession
func TestReplayPubSession(t *testing.T) {
// 重放业务方的流
// 步骤:
// 1. 业务方提供的lalserver录制下来的dump file修改下面的文件名变量filename
// 2. 启动lalserver
// 3. 调用HTTP API
// curl -H "Content-Type:application/json" -X POST -d '{"stream_name": "test110", "port": 10002, "timeout_ms": 10000}' http://127.0.0.1:8083/api/ctrl/start_rtp_pub
// 4. 执行该测试
// go test -test.run TestReplayPubSession
//
filename := "/tmp/record.psdata"
isTcpFlag := 1
b, err := ioutil.ReadFile(filename)
if len(b) == 0 || err != nil {
return
}
testPushFile("127.0.0.1:10002", filename, isTcpFlag)
}
func TestPubSession(t *testing.T) {
// 读取一大堆.ps文件并使用udp发送到`addr`地址外部的比如外部自己启动lalserver
// 步骤:
// 1. 启动lalserver
// 2. 调用HTTP API
// curl -H "Content-Type:application/json" -X POST -d '{"stream_name": "test110", "port": 10002, "timeout_ms": 10000, "debug_dump_packet": "/tmp/test110.psdata"}' http://127.0.0.1:8083/api/ctrl/start_rtp_pub
// 3. 执行该测试
// go test -test.run TestPubSession
//helpUdpSend("127.0.0.1:10002")
// 读取一大堆.ps文件并使用udp发送到`addr`地址内部启动了PubSession做接收
// 步骤:
// 1. 执行该测试
//testPubSession()
}
// ---------------------------------------------------------------------------------------------------------------------
func testPubSession() {
// 一个udp包一个文件按行分隔hex stream格式如下
// 8060 0000 0000 0000 0beb c567 0000 01ba
// 46ab 1ea9 4401 0139 9ffe ffff 0094 ab0d
fp, err := os.Create("/tmp/udp2.h264")
nazalog.Assert(nil, err)
defer fp.Close()
fp2, err := os.Create("/tmp/udp2.aac")
nazalog.Assert(nil, err)
defer fp2.Close()
pool := nazanet.NewAvailUdpConnPool(1024, 10240)
port, err := pool.Peek()
nazalog.Assert(nil, err)
addr := fmt.Sprintf("127.0.0.1:%d", port)
session := NewPubSession().WithOnAvPacket(func(packet *base.AvPacket) {
nazalog.Infof("[test2] onAvPacket. packet=%s", packet.DebugString())
if packet.IsAudio() {
aac.NewAdtsHeaderContext(packet.Payload)
_, _ = fp2.Write(packet.Payload)
} else if packet.IsVideo() {
_, _ = fp.Write(packet.Payload)
}
})
go func() {
time.Sleep(100 * time.Millisecond)
helpUdpSend(addr)
}()
_, runErr := session.Listen(int(port), false)
nazalog.Assert(nil, runErr)
runErr = session.RunLoop()
nazalog.Assert(nil, runErr)
}
func helpUdpSend(addr string) {
conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
for i := 1; i < 10000; i++ {
filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i)
//filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i)
b, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:]))
conn.Write(b)
}
}
func testPushFile(addr string, filename string, isTcpFlag int) {
var udpConn *nazanet.UdpConnection
var tcpConn net.Conn
var err error
if isTcpFlag != 0 {
tcpConn, err = net.Dial("tcp", addr)
nazalog.Assert(nil, err)
} else {
udpConn, err = nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
}
df := base.NewDumpFile()
err = df.OpenToRead(filename)
nazalog.Assert(nil, err)
lb := make([]byte, 2)
for {
m, err := df.ReadOneMessage()
if err != nil {
nazalog.Errorf("%+v", err)
break
}
nazalog.Debugf("%s", m.DebugString())
if isTcpFlag != 0 {
bele.BePutUint16(lb, uint16(m.Len))
_, err = tcpConn.Write(lb)
nazalog.Assert(nil, err)
_, err = tcpConn.Write(m.Body)
nazalog.Assert(nil, err)
} else {
udpConn.Write(m.Body)
}
//time.Sleep(10 * time.Millisecond)
}
}

@ -8,4 +8,8 @@
package innertest
// TODO(chef): [refactor] 把gb28181 ps的测试也挪过来 202209
import "testing"
func TestDump_CutomizePub(t *testing.T) {
}

@ -0,0 +1,91 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package innertest
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
"io/ioutil"
"net"
"testing"
)
// TestRe_PsPubSession
//
// 重放业务方的ps流。
//
// 本测试函数模拟客户端读取业务方对的dumpfile重新推送给lalserver。
//
// 步骤:
//
// 1. 让业务方提供lalserver录制下来的dumpfile文件
// 2. 将dumpfile存放在下面filename变量处或者修改下面filename变量值
// 3. 启动lalserver
// 4. 调用HTTP API
// curl -H "Content-Type:application/json" -X POST -d '{"stream_name": "test110", "port": 10002, "timeout_ms": 10000}' http://127.0.0.1:8083/api/ctrl/start_rtp_pub
// 5. 执行该测试
// go test -test.run TestDump_PsPub
func TestDump_PsPub(t *testing.T) {
filename := "/tmp/record.psdata"
isTcpFlag := 1
b, err := ioutil.ReadFile(filename)
if len(b) == 0 || err != nil {
return
}
testPushFile("127.0.0.1:10002", filename, isTcpFlag)
}
// ---------------------------------------------------------------------------------------------------------------------
// testPushFile 创建udp客户端向 addr 地址发送 filename 文件中的包
func testPushFile(addr string, filename string, isTcpFlag int) {
var udpConn *nazanet.UdpConnection
var tcpConn net.Conn
var err error
if isTcpFlag != 0 {
tcpConn, err = net.Dial("tcp", addr)
nazalog.Assert(nil, err)
} else {
udpConn, err = nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
}
df := base.NewDumpFile()
err = df.OpenToRead(filename)
nazalog.Assert(nil, err)
lb := make([]byte, 2)
for {
m, err := df.ReadOneMessage()
if err != nil {
nazalog.Errorf("%+v", err)
break
}
nazalog.Debugf("%s", m.DebugString())
if isTcpFlag != 0 {
bele.BePutUint16(lb, uint16(m.Len))
_, err = tcpConn.Write(lb)
nazalog.Assert(nil, err)
_, err = tcpConn.Write(m.Body)
nazalog.Assert(nil, err)
} else {
udpConn.Write(m.Body)
}
//time.Sleep(10 * time.Millisecond)
}
}

@ -0,0 +1,98 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package innertest
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
"io"
"testing"
)
// TestDump_Rtsp
//
// 重放业务方的rtsp流。
//
// 本测试函数模拟客户端读取业务方对的dumpfile解析为rtp合帧写flv文件
//
// 步骤:
//
// 1. 让业务方提供lalserver录制下来的dumpfile文件
// 2. 将dumpfile存放在下面filename变量处或者修改下面filename变量值
// 3. 执行该测试
// go test -test.run TestDump_Rtsp
func TestDump_Rtsp(t *testing.T) {
// TODO(chef): [test] 合帧测试,只有音频部分,没有视频部分 202211
filename := "/tmp/outpullrtsp.laldump"
outFlvFilename := "/tmp/outtestdumprtsp.flv"
// 初始化输出的flv文件
var fileWriter httpflv.FlvFileWriter
err := fileWriter.Open(outFlvFilename)
nazalog.Assert(nil, err)
defer fileWriter.Dispose()
err = fileWriter.WriteRaw(httpflv.FlvHeader)
nazalog.Assert(nil, err)
// 初始化remuxer
remuxer := remux.NewAvPacket2RtmpRemuxer().WithOnRtmpMsg(func(msg base.RtmpMsg) {
nazalog.Debugf("remuxer. %s", msg.DebugString())
err = fileWriter.WriteTag(*remux.RtmpMsg2FlvTag(msg))
nazalog.Assert(nil, err)
})
var ctx sdp.LogicContext
var unpacker rtprtcp.IRtpUnpacker
df := base.NewDumpFile()
err = df.OpenToRead(filename)
nazalog.Assert(nil, err)
for {
m, err := df.ReadOneMessage()
nazalog.Debugf("< ReadOneMessage. %+v, %+v", m, err)
if err == io.EOF {
return
}
nazalog.Assert(nil, err)
if m.Typ == base.DumpTypeInnerFileHeaderData {
continue
}
if m.Typ != base.DumpTypeRtspRtpData && m.Typ != base.DumpTypeRtspSdpData {
nazalog.Errorf("unknown type. typ=%d", m.Typ)
return
}
if m.Typ == base.DumpTypeRtspSdpData {
ctx, err = sdp.ParseSdp2LogicContext([]byte(m.Body))
nazalog.Debugf("parse sdp, %+v, %+v", ctx, err)
remuxer.OnSdp(ctx)
unpacker = rtprtcp.DefaultRtpUnpackerFactory(ctx.GetAudioPayloadTypeBase(), ctx.AudioClockRate, 1024, func(pkt base.AvPacket) {
nazalog.Debugf("unpacker. %s", pkt.DebugString())
remuxer.OnAvPacket(pkt)
})
continue
}
pkt, err := rtprtcp.ParseRtpPacket(m.Body)
nazalog.Debugf("< ParseRtpPacket. %+v, %+v", pkt, err)
nazalog.Assert(nil, err)
if ctx.IsAudioPayloadTypeOrigin(int(pkt.Header.PacketType)) {
unpacker.Feed(pkt)
}
}
}

@ -15,12 +15,20 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
type CustomizePubSessionOption struct {
DebugDumpPacket string
}
type ModCustomizePubSessionOptionFn func(option *CustomizePubSessionOption)
type CustomizePubSessionContext struct {
uniqueKey string
streamName string
remuxer *remux.AvPacket2RtmpRemuxer
onRtmpMsg func(msg base.RtmpMsg)
option CustomizePubSessionOption
dumpFile *base.DumpFile
disposeFlag nazaatomic.Bool
}
@ -41,6 +49,16 @@ func (ctx *CustomizePubSessionContext) WithOnRtmpMsg(onRtmpMsg func(msg base.Rtm
return ctx
}
func (ctx *CustomizePubSessionContext) WithCustomizePubSessionContextOption(modFn func(option *CustomizePubSessionOption)) *CustomizePubSessionContext {
modFn(&ctx.option)
if ctx.option.DebugDumpPacket != "" {
ctx.dumpFile = base.NewDumpFile()
err := ctx.dumpFile.OpenToWrite(ctx.option.DebugDumpPacket)
nazalog.Assert(nil, err)
}
return ctx
}
func (ctx *CustomizePubSessionContext) UniqueKey() string {
return ctx.uniqueKey
}
@ -54,7 +72,7 @@ func (ctx *CustomizePubSessionContext) Dispose() {
ctx.disposeFlag.Store(true)
}
// ---------------------------------------------------------------------------------------------------------------------
// -----implement of base.IAvPacketStream ------------------------------------------------------------------------------
func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *base.AvPacketStreamOption)) {
ctx.remuxer.WithOption(modOption)
@ -62,18 +80,22 @@ func (ctx *CustomizePubSessionContext) WithOption(modOption func(option *base.Av
func (ctx *CustomizePubSessionContext) FeedAudioSpecificConfig(asc []byte) error {
if ctx.disposeFlag.Load() {
nazalog.Errorf("[%s] FeedAudioSpecificConfig while CustomizePubSessionContext disposed.", ctx.uniqueKey)
return base.ErrDisposedInStream
}
//nazalog.Debugf("[%s] FeedAudioSpecificConfig. asc=%s", ctx.uniqueKey, hex.Dump(asc))
ctx.dumpFile.WriteWithType(asc, base.DumpTypeCustomizePubAudioSpecificConfigData)
ctx.remuxer.InitWithAvConfig(asc, nil, nil, nil)
return nil
}
func (ctx *CustomizePubSessionContext) FeedAvPacket(packet base.AvPacket) error {
if ctx.disposeFlag.Load() {
nazalog.Errorf("[%s] FeedAudioSpecificConfig while CustomizePubSessionContext disposed.", ctx.uniqueKey)
return base.ErrDisposedInStream
}
//nazalog.Debugf("[%s] FeedAvPacket. packet=%s", ctx.uniqueKey, packet.DebugString())
ctx.dumpFile.WriteAvPacket(packet, base.DumpTypeCustomizePubData)
ctx.remuxer.FeedAvPacket(packet)
return nil
}

@ -67,7 +67,7 @@ func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
defer group.mutex.Unlock()
group.feedRtpPacket(pkt)
if group.rtspPullDumpFile != nil {
group.rtspPullDumpFile.Write(pkt.Raw)
group.rtspPullDumpFile.WriteWithType(pkt.Raw, base.DumpTypeRtspRtpData)
}
}

@ -112,7 +112,7 @@ func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) (ret base.ApiCtr
pubSession := gb28181.NewPubSession().WithStreamName(req.StreamName).WithOnAvPacket(group.OnAvPacketFromPsPubSession)
pubSession.WithHookReadPacket(func(b []byte) {
if group.psPubDumpFile != nil {
group.psPubDumpFile.Write(b)
group.psPubDumpFile.WriteWithType(b, base.DumpTypePsRtpData)
}
})

Loading…
Cancel
Save