- [feat] gb28181 PubSession接入logic.Group基本完成

pull/208/head
q191201771 3 years ago
parent 7bdc841317
commit ff0864254f

@ -1,6 +1,6 @@
# LAL
[![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos%20%7C%20windows-green.svg)](https://github.com/q191201771/lal)
[![Platform](https://img.shields.io/badge/platform-linux%20%7C%20windows%20%7C%20macos-green.svg)](https://github.com/q191201771/lal)
[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases)
[![CI](https://github.com/q191201771/lal/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/q191201771/lal/actions/workflows/ci.yml)
[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal)

@ -16,6 +16,7 @@ type (
const (
AvPacketStreamAudioFormatUnknown AvPacketStreamAudioFormat = 0
AvPacketStreamAudioFormatRawAac AvPacketStreamAudioFormat = 1
AvPacketStreamAudioFormatAdtsAac AvPacketStreamAudioFormat = 2
AvPacketStreamVideoFormatUnknown AvPacketStreamVideoFormat = 0
AvPacketStreamVideoFormatAvcc AvPacketStreamVideoFormat = 1

@ -149,6 +149,8 @@ func (s *BasicSessionStat) UniqueKey() string {
// ---------------------------------------------------------------------------------------------------------------------
// updateStat 根据两次调用间隔计算bitrate
//
func (s *BasicSessionStat) updateStat(readBytesSum, wroteBytesSum uint64, typ string, intervalSec uint32) {
rDiff := readBytesSum - s.prevConnStat.ReadBytesSum
s.stat.ReadBitrate = int(rDiff * 8 / 1024 / uint64(intervalSec))
@ -168,6 +170,8 @@ func (s *BasicSessionStat) updateStat(readBytesSum, wroteBytesSum uint64, typ st
s.prevConnStat.WroteBytesSum = wroteBytesSum
}
// isAlive 根据两次调用间隔计算是否存活
//
func (s *BasicSessionStat) isAlive(readBytesSum, wroteBytesSum uint64) (readAlive, writeAlive bool) {
if s.staleStat == nil {
s.staleStat = new(connection.Stat)

@ -0,0 +1,107 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"encoding/hex"
"fmt"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabytes"
"os"
"path/filepath"
"time"
)
// TODO(chef): [refactor] move to naza 202208
type DumpFile struct {
file *os.File
}
type DumpFileMessage struct {
Ver uint32
Typ uint32
Len uint32
Timestamp uint32
Body []byte
}
func NewDumpFile() *DumpFile {
return &DumpFile{}
}
func (d *DumpFile) OpenToWrite(filename string) (err error) {
dir := filepath.Dir(filename)
if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
d.file, err = os.Create(filename)
return
}
func (d *DumpFile) OpenToRead(filename string) (err error) {
d.file, err = os.Open(filename)
return
}
func (d *DumpFile) Write(b []byte) error {
_, err := d.file.Write(d.pack(b))
return err
}
func (d *DumpFile) ReadOneMessage() (m DumpFileMessage, err error) {
m.Ver, err = bele.ReadBeUint32(d.file)
if err != nil {
return
}
m.Typ, err = bele.ReadBeUint32(d.file)
if err != nil {
return
}
m.Len, err = bele.ReadBeUint32(d.file)
if err != nil {
return
}
m.Timestamp, err = bele.ReadBeUint32(d.file)
if err != nil {
return
}
m.Body = make([]byte, m.Len)
_, err = d.file.Read(m.Body)
// TODO(chef): [opt] 检查Ver等值 202208
// TODO(chef): [opt] check Read return value 202208
return
}
func (d *DumpFile) Close() error {
if d.file == nil {
return nil
}
return d.file.Close()
}
// ---------------------------------------------------------------------------------------------------------------------
func (m *DumpFileMessage) DebugString() string {
return fmt.Sprintf("ver: %d, typ: %d, len: %d, timestamp: %d, len: %d, hex: %s",
m.Ver, m.Typ, m.Len, m.Timestamp, len(m.Body), hex.Dump(nazabytes.Prefix(m.Body, 16)))
}
// ---------------------------------------------------------------------------------------------------------------------
func (d *DumpFile) pack(b []byte) []byte {
ret := make([]byte, len(b)+16)
bele.BePutUint32(ret, 1) // Ver
bele.BePutUint32(ret[4:], 1) // Typ
bele.BePutUint32(ret[8:], uint32(len(b))) // Len
//bele.BePutUint32(ret[12:], 0) // Timestamp
bele.BePutUint32(ret[12:], uint32(time.Now().Unix())) // Timestamp
copy(ret[16:], b)
return ret
}

@ -41,6 +41,7 @@ type ApiCtrlStartRtpPubReq struct {
StreamName string `json:"stream_name"`
Port int `json:"port"`
TimeoutMs int `json:"timeout_ms"`
DebugDumpPacket string `json:"debug_dump_packet"`
}
// ----- response ------------------------------------------------------------------------------------------------------

@ -11,15 +11,14 @@ package gb28181
import (
"errors"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
)
// TODO(chef): [feat] http api start_rtp_pub 202207
// TODO(chef): [feat] http api stop_rtp_pub 202207
// TODO(chef): [feat] http api /api/stat/all_rtp_pub不过这个可以用已有的all_group代替 202207
// TODO(chef): [feat] pub接入group 202207
// TODO(chef): [feat] 超时自动关闭 202207
// TODO(chef): [test] 保存rtp数据用于回放分析 202206
// TODO(chef): [perf] 优化ps解析内存块 202207
// TODO(chef): [opt] avpkt转rtmp时可能需要接一个缓存队列 202208
// TODO(chef): [opt] 端口被占用时HTTP API返回错误 202208
var (
Log = nazalog.GetGlobalLogger()
@ -30,3 +29,14 @@ var (
var ErrGb28181 = errors.New("lal.gb28181: fxxk")
var maxUnpackRtpListSize = 1024
var (
defaultPubSessionPortMin = uint16(30000)
defaultPubSessionPortMax = uint16(60000)
)
var defaultUdpConnPoll *nazanet.AvailUdpConnPool
func init() {
defaultUdpConnPoll = nazanet.NewAvailUdpConnPool(defaultPubSessionPortMin, defaultPubSessionPortMax)
}

@ -20,10 +20,11 @@ type PubSession struct {
streamName string
conn *nazanet.UdpConnection
sessionStat base.BasicSessionStat
hookOnReadUdpPacket nazanet.OnReadUdpPacket
disposeOnce sync.Once
conn *nazanet.UdpConnection
sessionStat base.BasicSessionStat
}
func NewPubSession() *PubSession {
@ -33,6 +34,10 @@ func NewPubSession() *PubSession {
}
}
// WithOnAvPacket 设置音视频的回调
//
// @param onAvPacket 见 PsUnpacker.WithOnAvPacket 的注释
//
func (session *PubSession) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PubSession {
session.unpacker.WithOnAvPacket(onAvPacket)
return session
@ -43,15 +48,48 @@ func (session *PubSession) WithStreamName(streamName string) *PubSession {
return session
}
// WithHookReadUdpPacket
//
// 将udp接收数据返回给上层。
// 注意,底层的解析逻辑依然走。
// 可以用这个方式来截取数据进行调试。
//
func (session *PubSession) WithHookReadUdpPacket(fn nazanet.OnReadUdpPacket) *PubSession {
session.hookOnReadUdpPacket = fn
return session
}
// RunLoop
//
// @param addr: 如果为空,则内部选择一个可用的地址
//
func (session *PubSession) RunLoop(addr string) error {
var uconn *net.UDPConn
var err error
if addr == "" {
uconn, _, err = defaultUdpConnPoll.Acquire()
if err != nil {
return err
}
}
session.conn, err = nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.LAddr = addr
option.Conn = uconn
})
if err != nil {
return err
}
err = session.conn.RunLoop(func(b []byte, raddr *net.UDPAddr, err error) bool {
if len(b) == 0 && err != nil {
return false
}
if session.hookOnReadUdpPacket != nil {
session.hookOnReadUdpPacket(b, raddr, err)
}
session.sessionStat.AddReadBytes(len(b))
session.unpacker.FeedRtpPacket(b)
return true
@ -112,12 +150,10 @@ func (session *PubSession) IsAlive() (readAlive, writeAlive bool) {
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
func (session *PubSession) dispose(err error) error {
var retErr error
session.disposeOnce.Do(func() {
Log.Infof("[%s] lifecycle dispose rtmp ServerSession. err=%+v", session.UniqueKey(), err)
Log.Infof("[%s] lifecycle dispose gb28181 PubSession. err=%+v", session.UniqueKey(), err)
if session.conn == nil {
retErr = base.ErrSessionNotStarted
return

@ -9,8 +9,8 @@
package gb28181
import (
"encoding/hex"
"fmt"
"github.com/q191201771/lal/pkg/aac"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
@ -21,6 +21,24 @@ import (
)
func TestPubSession(t *testing.T) {
// 重放业务方的流
// 步骤:
// 1. 业务方提供的lalserver录制下来的dump file
// 2. 启动lalserver
// 3. 调用HTTP API
// 4. 执行该测试
//testDumpFile("127.0.0.1:10002", "/tmp/test.psdata")
// 读取一大堆.ps文件并使用udp发送到`addr`地址外部的比如外部自己启动lalserver
// 步骤:
// 1. 启动lalserver
// 2. 调用HTTP API
// 3. 执行该测试
//helpUdpSend("127.0.0.1:10002")
// 读取一大堆.ps文件并使用udp发送到`addr`地址内部启动了PubSession做接收
// 步骤:
// 1. 执行该测试
//testPubSession()
}
@ -45,6 +63,8 @@ func testPubSession() {
session := NewPubSession().WithOnAvPacket(func(packet *base.AvPacket) {
nazalog.Infof("[test2] onAvPacket. packet=%s", packet.DebugString())
if packet.IsAudio() {
aac.NewAdtsHeaderContext(packet.Payload)
_, _ = fp2.Write(packet.Payload)
} else if packet.IsVideo() {
_, _ = fp.Write(packet.Payload)
@ -54,22 +74,47 @@ func testPubSession() {
go func() {
time.Sleep(100 * time.Millisecond)
helpUdpSend(addr)
}()
runErr := session.RunLoop(addr)
nazalog.Assert(nil, runErr)
}
func helpUdpSend(addr string) {
conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
for i := 1; i < 1000; i++ {
//filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i)
filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i)
for i := 1; i < 10000; i++ {
filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i)
//filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i)
b, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:]))
//nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:]))
conn.Write(b)
}
}()
}
runErr := session.RunLoop(addr)
nazalog.Assert(nil, runErr)
func testDumpFile(addr string, filename string) {
conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
df := base.NewDumpFile()
err = df.OpenToRead(filename)
nazalog.Assert(nil, err)
for {
m, err := df.ReadOneMessage()
if err != nil {
nazalog.Errorf("%+v", err)
break
}
nazalog.Debugf("%s", m.DebugString())
conn.Write(m.Body)
}
}

@ -59,6 +59,15 @@ func NewPsUnpacker() *PsUnpacker {
return p
}
// WithOnAvPacket
//
// @param onAvPacket: 回调函数中 base.AvPacket 字段说明:
// PayloadType AvPacketPt 见 base.AvPacketPt
// Timestamp int64 dts单位毫秒
// Pts int64 pts单位毫秒
// Payload []byte 对于视频h264和h265是AnnexB格式
// 对于音频AAC是前面携带adts的格式
//
func (p *PsUnpacker) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PsUnpacker {
p.onAvPacket = onAvPacket
return p
@ -71,14 +80,14 @@ func (p *PsUnpacker) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PsUnpacker
// @param b: rtp包注意包含rtp包头部分
//
func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
nazalog.Debugf("> FeedRtpPacket. len=%d", len(b))
//nazalog.Debugf("> FeedRtpPacket. len=%d", len(b))
ipkt, err := rtprtcp.ParseRtpPacket(b)
if err != nil {
return err
}
nazalog.Debugf("h=%+v", ipkt.Header)
//nazalog.Debugf("h=%+v", ipkt.Header)
var isStartPositionFn = func(pkt rtprtcp.RtpPacket) bool {
body := pkt.Body()
@ -144,7 +153,7 @@ func (p *PsUnpacker) FeedRtpPacket(b []byte) error {
// FeedRtpBody 注意,传入的数据应该是连续的,属于完整帧的
//
func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
nazalog.Debugf("> FeedRtpBody. len=%d, prev buf=%d", len(rtpBody), p.buf.Len())
//nazalog.Debugf("> FeedRtpBody. len=%d, prev buf=%d", len(rtpBody), p.buf.Len())
p.buf.Write(rtpBody)
// ISO/IEC iso13818-1
//
@ -160,28 +169,28 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
var consumed int
switch code {
case psPackStartCodePackHeader:
nazalog.Debugf("----------pack header----------")
//nazalog.Debugf("----------pack header----------")
consumed = parsePackHeader(rb, i)
case psPackStartCodeSystemHeader:
nazalog.Debugf("----------system header----------")
//nazalog.Debugf("----------system header----------")
// 2.5.3.5 System header
// Table 2-32 - Program Stream system header
//
consumed = parsePackStreamBody(rb, i)
case psPackStartCodeProgramStreamMap:
nazalog.Debugf("----------program stream map----------")
//nazalog.Debugf("----------program stream map----------")
consumed = p.parsePsm(rb, i)
case psPackStartCodeAudioStream:
nazalog.Debugf("----------audio stream----------")
//nazalog.Debugf("----------audio stream----------")
consumed = p.parseAvStream(int(code), rtpts, rb, i)
case psPackStartCodeVideoStream:
nazalog.Debugf("----------video stream----------")
//nazalog.Debugf("----------video stream----------")
consumed = p.parseAvStream(int(code), rtpts, rb, i)
case psPackStartCodePackEnd:
nazalog.Errorf("----------skip----------. %s", hex.Dump(nazabytes.Prefix(rb[i-4:], 32)))
consumed = 0
case psPackStartCodeHikStream:
nazalog.Debugf("----------hik stream----------")
//nazalog.Debugf("----------hik stream----------")
consumed = parsePackStreamBody(rb, i)
case psPackStartCodePesPrivate2:
fallthrough
@ -206,7 +215,7 @@ func (p *PsUnpacker) FeedRtpBody(rtpBody []byte, rtpts uint32) {
return
}
p.buf.Skip(i + consumed)
nazalog.Debugf("skip. %d", i+consumed)
//nazalog.Debugf("skip. %d", i+consumed)
}
}
@ -241,7 +250,7 @@ func (p *PsUnpacker) parsePsm(rb []byte, index int) int {
// elementary_stream_map_length
esml := int(bele.BeUint16(rb[i:]))
nazalog.Debugf("l=%d, esml=%d", l, esml)
//nazalog.Debugf("l=%d, esml=%d", l, esml)
i += 2
if len(rb[i:]) < esml+4 {
@ -276,7 +285,7 @@ func (p *PsUnpacker) parsePsm(rb []byte, index int) int {
}
}
esil := int(bele.BeUint16(rb[i:]))
nazalog.Debugf("streamType=%d, streamId=%d, esil=%d", streamType, streamId, esil)
//nazalog.Debugf("streamType=%d, streamId=%d, esil=%d", streamType, streamId, esil)
i += 2 + esil
esml = esml - 4 - esil
}
@ -296,7 +305,7 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
}
i += 2
nazalog.Debugf("parseAvStream. code=%d, expected=%d, actual=%d", code, length, len(rb)-i)
//nazalog.Debugf("parseAvStream. code=%d, expected=%d, actual=%d", code, length, len(rb)-i)
if len(rb)-i < length {
return -1
@ -321,12 +330,12 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
i += phdl
nazalog.Debugf("parseAvStream. code=%d, length=%d, pts=%d, dts=%d", code, length, pts, dts)
//nazalog.Debugf("parseAvStream. code=%d, length=%d, pts=%d, dts=%d", code, length, pts, dts)
if code == psPackStartCodeAudioStream {
// 注意,处理音频的逻辑和处理视频的类似,参考处理视频的注释
if p.audioStreamType == StreamTypeAAC {
nazalog.Debugf("audio code=%d, length=%d, ptsDtsFlag=%d, phdl=%d, pts=%d, dts=%d,type=%d", code, length, ptsDtsFlag, phdl, pts, dts, p.audioStreamType)
//nazalog.Debugf("audio code=%d, length=%d, ptsDtsFlag=%d, phdl=%d, pts=%d, dts=%d,type=%d", code, length, ptsDtsFlag, phdl, pts, dts, p.audioStreamType)
if pts == -1 {
if p.preAudioPts == -1 {
if p.preAudioRtpts == -1 {
@ -335,8 +344,8 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
if p.preAudioRtpts != int64(rtpts) {
p.onAvPacket(&base.AvPacket{
PayloadType: p.audioPayloadType,
Timestamp: p.preAudioDts,
Pts: p.preAudioPts,
Timestamp: p.preAudioDts / 90,
Pts: p.preAudioPts / 90,
Payload: p.audioBuf,
})
p.audioBuf = nil
@ -351,8 +360,8 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
if pts != p.preAudioPts && p.preAudioPts >= 0 {
p.onAvPacket(&base.AvPacket{
PayloadType: p.audioPayloadType,
Timestamp: p.preAudioDts,
Pts: p.preAudioPts,
Timestamp: p.preAudioDts / 90,
Pts: p.preAudioPts / 90,
Payload: p.audioBuf,
})
p.audioBuf = nil
@ -483,8 +492,8 @@ func (p *PsUnpacker) iterateNaluByStartCode(code int, pts, dts int64) {
p.onAvPacket(&base.AvPacket{
PayloadType: p.videoPayloadType,
Timestamp: dts,
Pts: pts,
Timestamp: dts / 90,
Pts: pts / 90,
Payload: nalu,
})
@ -572,5 +581,5 @@ func readPts(b []byte) (fb uint8, pts int64) {
}
func defaultOnAvPacket(packet *base.AvPacket) {
// noop
}

@ -11,6 +11,7 @@ package gb28181
import (
"encoding/hex"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/nazamd5"
"io/ioutil"
"os"
@ -111,16 +112,15 @@ var hevcNalu = []byte{
func TestPsUnpacker2(t *testing.T) {
// 解析别人提供的一些测试数据,开发阶段用
//test1()
//test2()
}
func test1() {
nazalog.Debugf("[test1] > test1")
// 读取raw文件(包连在一起不包含rtp header)存取h264文件
b, err := ioutil.ReadFile("/tmp/udp.raw")
//b, err := ioutil.ReadFile("/tmp/udp.raw")
b, err := ioutil.ReadFile("/Volumes/T7/new/avfile/ka_at_13sec.ps")
nazalog.Assert(nil, err)
fp, err := os.Create("/tmp/udp.h264")
@ -132,13 +132,21 @@ func test1() {
return
}
nazalog.Debugf("[test1] onVideo. length=%d", len(packet.Payload))
nazalog.Debugf("[test1] onVideo. %s, %s", hevc.ParseNaluTypeReadable(packet.Payload[4]), packet.DebugString())
if waitingSps {
if packet.PayloadType == base.AvPacketPtAvc {
if avc.ParseNaluType(packet.Payload[4]) == avc.NaluTypeSps {
waitingSps = false
} else {
return
}
} else if packet.PayloadType == base.AvPacketPtHevc {
if hevc.ParseNaluType(packet.Payload[4]) == hevc.NaluTypeSps {
waitingSps = false
} else {
return
}
}
}
_, _ = fp.Write(packet.Payload)
})

@ -33,13 +33,13 @@ import (
// | . | rtmp pub | ps pub |
// | 添加到group中 | Y | Y |
// | 到输出流的转换路径关系 | Y | Y |
// | 删除 | Y |
// | 删除 | Y | Y |
// | group.hasPubSession() | Y | Y |
// | group.disposeInactiveSessions()检查超时并清理 | Y | Y |
// | group.Dispose()时销毁 | Y |
// | group.GetStat()时获取信息 | Y |
// | group.KickSession()时踢出 | Y |
// | group.disposeInactiveSessions()检查超时并清理 | Y |
// | group.updateAllSessionStat()更新信息 | Y |
// | group.hasPubSession() | Y |
// | group.inSessionUniqueKey() | Y |
// ---------------------------------------------------------------------------------------------------------------------
@ -93,7 +93,8 @@ type Group struct {
dummyAudioFilter *remux.DummyAudioFilter
// ps pub使用
psPubTimeoutSec uint32 // 超时时间
psPubPrevInactiveCheckTick uint32 // 上次检查时间
psPubPrevInactiveCheckTick int64 // 上次检查时间
psPubDumpFile *base.DumpFile
// rtmp sub使用
rtmpGopCache *remux.GopCache
// httpflv sub使用
@ -145,6 +146,7 @@ func NewGroup(appName string, streamName string, config *Config, observer IGroup
rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum),
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum),
httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum),
psPubPrevInactiveCheckTick: -1,
}
g.initRelayPushByConfig()
@ -389,7 +391,25 @@ func (group *Group) OutSessionNum() int {
// TODO chef: [refactor] 梳理和naza.Connection超时重复部分
//
func (group *Group) disposeInactiveSessions(tickCount uint32) {
// to be continued
if group.psPubSession != nil {
if group.psPubTimeoutSec == 0 {
// noop
// 没有超时逻辑
} else {
if group.psPubPrevInactiveCheckTick == -1 ||
tickCount-uint32(group.psPubPrevInactiveCheckTick) >= group.psPubTimeoutSec {
if readAlive, _ := group.psPubSession.IsAlive(); !readAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.psPubSession.UniqueKey())
group.psPubSession.Dispose()
}
group.psPubPrevInactiveCheckTick = int64(tickCount)
}
}
}
// 以下都是以 checkSessionAliveIntervalSec 为间隔的清理逻辑
if tickCount%checkSessionAliveIntervalSec != 0 {
return
@ -487,7 +507,8 @@ func (group *Group) updateAllSessionStat() {
}
func (group *Group) hasPubSession() bool {
return group.rtmpPubSession != nil || group.rtspPubSession != nil || group.customizePubSession != nil
return group.rtmpPubSession != nil || group.rtspPubSession != nil || group.customizePubSession != nil ||
group.psPubSession != nil
}
func (group *Group) hasSubSession() bool {

@ -11,6 +11,8 @@ package logic
import (
"fmt"
"github.com/q191201771/lal/pkg/gb28181"
"github.com/q191201771/naza/pkg/nazalog"
"net"
"time"
"github.com/q191201771/lal/pkg/base"
@ -116,12 +118,32 @@ func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) {
}
pubSession := gb28181.NewPubSession().WithStreamName(req.StreamName).WithOnAvPacket(group.OnAvPacketFromPsPubSession)
if req.DebugDumpPacket != "" {
group.psPubDumpFile = base.NewDumpFile()
if err := group.psPubDumpFile.OpenToWrite(req.DebugDumpPacket); err != nil {
Log.Errorf("%+v", err)
}
}
pubSession.WithHookReadUdpPacket(func(b []byte, raddr *net.UDPAddr, err error) bool {
if group.psPubDumpFile != nil {
group.psPubDumpFile.Write(b)
}
return true
})
Log.Debugf("[%s] [%s] add RTP PubSession into group.", group.UniqueKey, pubSession.UniqueKey())
group.psPubSession = pubSession
group.psPubTimeoutSec = uint32(req.TimeoutMs / 1000)
group.addIn()
group.rtsp2RtmpRemuxer = remux.NewAvPacket2RtmpRemuxer()
group.rtsp2RtmpRemuxer.WithOption(func(option *base.AvPacketStreamOption) {
option.VideoFormat = base.AvPacketStreamVideoFormatAnnexb
option.AudioFormat = base.AvPacketStreamAudioFormatAdtsAac
})
group.rtsp2RtmpRemuxer.WithOnRtmpMsg(group.onRtmpMsgFromRemux)
if group.shouldStartRtspRemuxer() {
group.rtmp2RtspRemuxer = remux.NewRtmp2RtspRemuxer(
group.onSdpFromRemux,
@ -130,8 +152,13 @@ func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) {
}
go func() {
addr := fmt.Sprintf(":%d", req.Port)
pubSession.RunLoop(addr)
var addr string
if req.Port != 0 {
addr = fmt.Sprintf(":%d", req.Port)
}
err := pubSession.RunLoop(addr)
nazalog.Debugf("[%s] [%s] ps PubSession run loop exit, err=%v", group.UniqueKey, pubSession.UniqueKey(), err)
group.DelPsPubSession(pubSession)
}()
}
@ -204,6 +231,12 @@ func (group *Group) AddRtspPullSession(session *rtsp.PullSession) error {
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) DelPsPubSession(session *gb28181.PubSession) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.delPsPubSession(session)
}
func (group *Group) DelCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -260,6 +293,18 @@ func (group *Group) DelRtspPullSession(session *rtsp.PullSession) {
// ---------------------------------------------------------------------------------------------------------------------
func (group *Group) delPsPubSession(session *gb28181.PubSession) {
Log.Debugf("[%s] [%s] del ps PubSession from group.", group.UniqueKey, session.UniqueKey())
if session != group.psPubSession {
Log.Warnf("[%s] del ps pub session but not match. del session=%s, group session=%p",
group.UniqueKey, session.UniqueKey(), group.customizePubSession)
return
}
group.delIn()
}
func (group *Group) delCustomizePubSession(sessionCtx ICustomizePubSessionContext) {
Log.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, sessionCtx.UniqueKey())
@ -338,10 +383,15 @@ func (group *Group) delIn() {
group.rtmpPubSession = nil
group.rtspPubSession = nil
group.customizePubSession = nil
group.psPubSession = nil
group.rtsp2RtmpRemuxer = nil
group.rtmp2RtspRemuxer = nil
group.dummyAudioFilter = nil
if group.psPubDumpFile != nil {
group.psPubDumpFile.Close()
group.psPubDumpFile = nil
}
group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
group.httptsGopCache.Clear()

@ -197,7 +197,7 @@ func (h *HttpApiServer) ctrlStartRtpPubHandler(w http.ResponseWriter, req *http.
info.Port = 0
}
if !j.Exist("timeout_ms") {
info.TimeoutMs = 10000
info.TimeoutMs = 60000
}
Log.Infof("http api start rtp pub. req info=%+v", info)

@ -21,9 +21,11 @@ import (
// AvPacket2RtmpRemuxer AvPacket转换为RTMP
//
// 目前AvPacket来自
// - RTSP的sdp以及rtp的合帧包
// - 业务方通过接口向lalserver输入的流
// 目前AvPacket来自:
//
// - RTSP: sdp以及rtp的合帧包
// - gb28181 ps: rtp的合帧包
// - customize: 业务方通过接口向lalserver输入的流
// - 理论上也支持webrtc后续接入webrtc时再验证
//
type AvPacket2RtmpRemuxer struct {
@ -37,6 +39,8 @@ type AvPacket2RtmpRemuxer struct {
vps []byte // 从AvPacket数据中获取
sps []byte
pps []byte
hasAdts2Asc bool
}
func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer {
@ -47,6 +51,10 @@ func NewAvPacket2RtmpRemuxer() *AvPacket2RtmpRemuxer {
}
}
// WithOption
//
// TODO(chef): [refactor] 返回*AvPacket2RtmpRemuxer 202208
//
func (r *AvPacket2RtmpRemuxer) WithOption(modOption func(option *base.AvPacketStreamOption)) {
modOption(&r.option)
}
@ -138,7 +146,7 @@ func (r *AvPacket2RtmpRemuxer) InitWithAvConfig(asc, vps, sps, pps []byte) {
//
// @param pkt:
//
// - 如果是aac格式是裸数据不需要adts头
// - 如果是aac格式是裸数据或带adts头具体取决于前面的配置
// - 如果是h264格式是avcc或Annexb具体取决于前面的配置
//
// 内部不持有该内存块
@ -271,6 +279,7 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
}
case base.AvPacketPtAac:
if r.option.AudioFormat == base.AvPacketStreamAudioFormatRawAac {
length := len(pkt.Payload) + 2
payload := make([]byte, length)
// TODO(chef) 处理此处的魔数0xAF
@ -278,6 +287,26 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
payload[1] = base.RtmpAacPacketTypeRaw
copy(payload[2:], pkt.Payload)
r.emitRtmpAvMsg(true, payload, pkt.Timestamp)
} else if r.option.AudioFormat == base.AvPacketStreamAudioFormatAdtsAac {
if !r.hasAdts2Asc {
adts, err := aac.MakeAudioDataSeqHeaderWithAdtsHeader(pkt.Payload)
if err != nil {
Log.Errorf("%+v", err)
}
r.emitRtmpAvMsg(true, adts, pkt.Timestamp)
r.hasAdts2Asc = true
}
length := len(pkt.Payload) - 5 // -7+2
payload := make([]byte, length)
payload[0] = 0xAF
payload[1] = base.RtmpAacPacketTypeRaw
copy(payload[7:], pkt.Payload)
r.emitRtmpAvMsg(true, payload, pkt.Timestamp)
}
default:
Log.Warnf("unsupported packet. type=%d", pkt.PayloadType)
}

Loading…
Cancel
Save