[feat] gb28181: 基本完成PubSession

pull/202/head
q191201771 3 years ago
parent eedfe5525b
commit a036db8131

@ -14,6 +14,8 @@ import (
"github.com/q191201771/naza/pkg/nazabytes"
)
// ---------------------------------------------------------------------------------------------------------------------
type AvPacketPt int
const (
@ -37,11 +39,15 @@ func (a AvPacketPt) ReadableString() string {
return ""
}
// ---------------------------------------------------------------------------------------------------------------------
// AvPacket
//
// 不同场景使用时,字段含义可能不同。
// 使用AvPacket的地方应注明各字段的含义。
//
//
//
type AvPacket struct {
PayloadType AvPacketPt
Timestamp int64 // 如无特殊说明此字段是Dts
@ -58,6 +64,10 @@ func (packet *AvPacket) IsVideo() bool {
}
func (packet *AvPacket) DebugString() string {
return fmt.Sprintf("[%p] type=%s, timestamp=%d, len=%d, payload=%s",
packet, packet.PayloadType.ReadableString(), packet.Timestamp, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32)))
return fmt.Sprintf("[%p] type=%s, timestamp=%d, pts=%d, len=%d, payload=%s",
packet, packet.PayloadType.ReadableString(), packet.Timestamp, packet.Pts, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32)))
}
// ---------------------------------------------------------------------------------------------------------------------
type OnAvPacketFunc func(packet *AvPacket)

@ -13,6 +13,10 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
type IStatable interface {
GetStat() connection.Stat // TODO(chef): [refactor] 考虑为 nazanet.UdpConnection 实现这个接口
}
// BasicSessionStat
//
// 包含两部分功能:
@ -76,6 +80,10 @@ func NewBasicSessionStat(sessionType SessionType, remoteAddr string) BasicSessio
s.stat.SessionId = GenUkFlvSubSession()
s.stat.BaseType = SessionBaseTypeSubStr
s.stat.Protocol = SessionProtocolFlvStr
case SessionTypePsPub:
s.stat.SessionId = GenUkPsPubSession()
s.stat.BaseType = SessionBaseTypePubStr
s.stat.Protocol = SessionProtocolPsStr
}
return s
}
@ -102,7 +110,7 @@ func (s *BasicSessionStat) UpdateStat(intervalSec uint32) {
s.updateStat(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load(), s.stat.BaseType, intervalSec)
}
func (s *BasicSessionStat) UpdateStatWitchConn(conn connection.Connection, intervalSec uint32) {
func (s *BasicSessionStat) UpdateStatWitchConn(conn IStatable, intervalSec uint32) {
currStat := conn.GetStat()
s.updateStat(currStat.ReadBytesSum, currStat.WroteBytesSum, s.stat.BaseType, intervalSec)
}
@ -113,7 +121,7 @@ func (s *BasicSessionStat) GetStat() StatSession {
return s.stat
}
func (s *BasicSessionStat) GetStatWithConn(conn connection.Connection) StatSession {
func (s *BasicSessionStat) GetStatWithConn(conn IStatable) StatSession {
connStat := conn.GetStat()
s.stat.ReadBytesSum = connStat.ReadBytesSum
s.stat.WroteBytesSum = connStat.WroteBytesSum
@ -124,7 +132,7 @@ func (s *BasicSessionStat) IsAlive() (readAlive, writeAlive bool) {
return s.isAlive(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load())
}
func (s *BasicSessionStat) IsAliveWitchConn(conn connection.Connection) (readAlive, writeAlive bool) {
func (s *BasicSessionStat) IsAliveWitchConn(conn IStatable) (readAlive, writeAlive bool) {
currStat := conn.GetStat()
return s.isAlive(currStat.ReadBytesSum, currStat.WroteBytesSum)
}

@ -38,12 +38,14 @@ const (
SessionTypeFlvSub SessionType = SessionProtocolFlv<<8 | SessionBaseTypeSub
SessionTypeFlvPull SessionType = SessionProtocolFlv<<8 | SessionBaseTypePull
SessionTypeTsSub SessionType = SessionProtocolTs<<8 | SessionBaseTypeSub
SessionTypePsPub SessionType = SessionProtocolPs<<8 | SessionBaseTypePub
SessionProtocolCustomize = 1
SessionProtocolRtmp = 2
SessionProtocolRtsp = 3
SessionProtocolFlv = 4
SessionProtocolTs = 5
SessionProtocolPs = 6
SessionBaseTypePubSub = 1
SessionBaseTypePub = 2
@ -56,6 +58,7 @@ const (
SessionProtocolRtspStr = "RTSP"
SessionProtocolFlvStr = "FLV"
SessionProtocolTsStr = "TS"
SessionProtocolPsStr = "PS"
SessionBaseTypePubSubStr = "PUBSUB"
SessionBaseTypePubStr = "PUB"
@ -64,38 +67,6 @@ const (
SessionBaseTypePullStr = "PULL"
)
//func (protocol SessionProtocol) Stringify() string {
// switch protocol {
// case SessionProtocolCustomize:
// return SessionProtocolCustomizeStr
// case SessionProtocolRtmp:
// return SessionProtocolRtmpStr
// case SessionProtocolRtsp:
// return SessionProtocolRtspStr
// case SessionProtocolFlv:
// return SessionProtocolFlvStr
// case SessionProtocolTs:
// return SessionProtocolTsStr
// }
// return "INVALID"
//}
//
//func (typ SessionBaseType) Stringify() string {
// switch typ {
// case SessionBaseTypePubSub:
// return SessionBaseTypePubSubStr
// case SessionBaseTypePub:
// return SessionBaseTypePubStr
// case SessionBaseTypeSub:
// return SessionBaseTypeSubStr
// case SessionBaseTypePush:
// return SessionBaseTypePushStr
// case SessionBaseTypePull:
// return SessionBaseTypePullStr
// }
// return "INVALID"
//}
type ISession interface {
ISessionUrlContext
IObject
@ -218,13 +189,6 @@ type IObject interface {
UniqueKey() string
}
//type ISessionType interface {
// Protocol() string
// BaseType() string
//
// //UniqueKey() string
//}
// TODO chef: rtmp.ClientSession修改为BaseClientSession更好些
// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame

@ -22,6 +22,7 @@ const (
UkPreFlvSubSession = SessionProtocolFlvStr + SessionBaseTypePubSubStr // "FLVSUB"
UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL"
UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB"
UkPrePsPubSession = SessionProtocolPsStr + SessionBaseTypePubStr // "PSPUB"
UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层
@ -82,6 +83,10 @@ func GenUkFlvPullSession() string {
return siUkFlvPullSession.GenUniqueKey()
}
func GenUkPsPubSession() string {
return siUkPsPubSession.GenUniqueKey()
}
func GenUkGroup() string {
return siUkGroup.GenUniqueKey()
}
@ -107,6 +112,7 @@ var (
siUkFlvSubSession *unique.SingleGenerator
siUkTsSubSession *unique.SingleGenerator
siUkFlvPullSession *unique.SingleGenerator
siUkPsPubSession *unique.SingleGenerator
siUkGroup *unique.SingleGenerator
siUkHlsMuxer *unique.SingleGenerator
@ -126,6 +132,7 @@ func init() {
siUkFlvSubSession = unique.NewSingleGenerator(UkPreFlvSubSession)
siUkTsSubSession = unique.NewSingleGenerator(UkPreTsSubSession)
siUkFlvPullSession = unique.NewSingleGenerator(UkPreFlvPullSession)
siUkPsPubSession = unique.NewSingleGenerator(UkPrePsPubSession)
siUkGroup = unique.NewSingleGenerator(UkPreGroup)
siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer)

@ -8,13 +8,20 @@
package gb28181
import "errors"
import (
"errors"
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO(chef): gb28181 package处于开发中阶段请不使用
// TODO(chef): [opt] rtp排序 202206
// TODO(chef): [test] 保存rtp数据用于回放分析 202206
// TODO(chef): [perf] 优化ps解析内存块 202207
var (
Log = nazalog.GetGlobalLogger()
)
// ErrGb28181 TODO(chef): [refactor] move to pkg base 202207
//
var ErrGb28181 = errors.New("lal.gb28181: fxxk")

@ -27,17 +27,15 @@ const (
)
type PsStreamType int
const (
StreamTypeH264 PsStreamType = 0x1b
StreamTypeH265 = 0x24
StreamTypeAAC = 0x0f
StreamTypeG711A = 0x90 //PCMA
StreamTypeG7221 = 0x92
StreamTypeG7231 = 0x93
StreamTypeG729 = 0x99
StreamTypeUnknown = -1
StreamTypeH264 uint8 = 0x1b
StreamTypeH265 = 0x24
StreamTypeAAC = 0x0f
StreamTypeG711A = 0x90 //PCMA
StreamTypeG7221 = 0x92
StreamTypeG7231 = 0x93
StreamTypeG729 = 0x99
StreamTypeUnknown = -1
)
const psBufInitSize = 4096

@ -9,18 +9,38 @@
package gb28181
import (
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazanet"
"net"
"sync"
)
type PubSession struct {
conn *nazanet.UdpConnection
rtprtcp.IRtpUnpacker
unpacker *PsUnpacker
streamName string
conn *nazanet.UdpConnection
sessionStat base.BasicSessionStat
disposeOnce sync.Once
}
func NewPubSession() *PubSession {
return &PubSession{}
return &PubSession{
unpacker: NewPsUnpacker(),
sessionStat: base.NewBasicSessionStat(base.SessionTypePsPub, ""),
}
}
func (session *PubSession) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PubSession {
session.unpacker.WithOnAvPacket(onAvPacket)
return session
}
func (session *PubSession) WithStreamName(streamName string) *PubSession {
session.streamName = streamName
return session
}
func (session *PubSession) RunLoop(addr string) error {
@ -32,7 +52,77 @@ func (session *PubSession) RunLoop(addr string) error {
return err
}
err = session.conn.RunLoop(func(b []byte, raddr *net.UDPAddr, err error) bool {
session.sessionStat.AddReadBytes(len(b))
session.unpacker.FeedRtpPacket(b)
return true
})
return err
}
// ----- IServerSessionLifecycle ---------------------------------------------------------------------------------------
func (session *PubSession) Dispose() error {
return session.dispose(nil)
}
// ----- ISessionUrlContext --------------------------------------------------------------------------------------------
func (session *PubSession) Url() string {
Log.Warnf("[%s] PubSession.Url() is not implemented", session.UniqueKey())
return "invalid"
}
func (session *PubSession) AppName() string {
Log.Warnf("[%s] PubSession.AppName() is not implemented", session.UniqueKey())
return "invalid"
}
func (session *PubSession) StreamName() string {
// 如果stream name没有设置则使用session的unique key作为stream name
if session.streamName == "" {
return session.UniqueKey()
}
return session.streamName
}
func (session *PubSession) RawQuery() string {
Log.Warnf("[%s] PubSession.RawQuery() is not implemented", session.UniqueKey())
return "invalid"
}
// ----- IObject -------------------------------------------------------------------------------------------------------
func (session *PubSession) UniqueKey() string {
return session.sessionStat.UniqueKey()
}
// ----- ISessionStat --------------------------------------------------------------------------------------------------
func (session *PubSession) UpdateStat(intervalSec uint32) {
session.sessionStat.UpdateStat(intervalSec)
}
func (session *PubSession) GetStat() base.StatSession {
return session.sessionStat.GetStat()
}
func (session *PubSession) IsAlive() (readAlive, writeAlive bool) {
return session.sessionStat.IsAlive()
}
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
func (session *PubSession) dispose(err error) error {
var retErr error
session.disposeOnce.Do(func() {
Log.Infof("[%s] lifecycle dispose rtmp ServerSession. err=%+v", session.UniqueKey(), err)
if session.conn == nil {
retErr = base.ErrSessionNotStarted
return
}
retErr = session.conn.Dispose()
})
return retErr
}

@ -0,0 +1,75 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package gb28181
import (
"encoding/hex"
"fmt"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazanet"
"io/ioutil"
"os"
"testing"
"time"
)
func TestPubSession(t *testing.T) {
//testPubSession()
}
func testPubSession() {
// 一个udp包一个文件按行分隔hex stream格式如下
// 8060 0000 0000 0000 0beb c567 0000 01ba
// 46ab 1ea9 4401 0139 9ffe ffff 0094 ab0d
fp, err := os.Create("/tmp/udp2.h264")
nazalog.Assert(nil, err)
defer fp.Close()
fp2, err := os.Create("/tmp/udp2.aac")
nazalog.Assert(nil, err)
defer fp2.Close()
pool := nazanet.NewAvailUdpConnPool(1024, 10240)
port, err := pool.Peek()
nazalog.Assert(nil, err)
addr := fmt.Sprintf("127.0.0.1:%d", port)
session := NewPubSession().WithOnAvPacket(func(packet *base.AvPacket) {
nazalog.Infof("[test2] onAvPacket. packet=%s", packet.DebugString())
if packet.IsAudio() {
_, _ = fp2.Write(packet.Payload)
} else if packet.IsVideo() {
_, _ = fp.Write(packet.Payload)
}
})
go func() {
time.Sleep(100 * time.Millisecond)
conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) {
option.RAddr = addr
})
nazalog.Assert(nil, err)
for i := 1; i < 1000; i++ {
//filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i)
filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i)
b, err := ioutil.ReadFile(filename)
nazalog.Assert(nil, err)
nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:]))
conn.Write(b)
}
}()
runErr := session.RunLoop(addr)
nazalog.Assert(nil, runErr)
}

@ -11,53 +11,38 @@ package gb28181
import (
"bytes"
"encoding/hex"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/h2645"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabits"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"
)
type onAudioFn func(payload []byte, dts int64, pts int64)
type onVideoFn func(payload []byte, dts int64, pts int64)
type IPsUnpackerObserver interface {
OnAudio(payload []byte, dts int64, pts int64)
// OnVideo
//
// @param payload: annexb格式
//
OnVideo(payload []byte, dts int64, pts int64)
}
// PsUnpacker 解析ps(Progream Stream)流
// PsUnpacker 解析ps(Program Stream)流
//
type PsUnpacker struct {
list rtprtcp.RtpPacketList
buf *nazabytes.Buffer
videoBuf []byte
audioBuf []byte
videoBuf []byte
videoStreamType byte
audioStreamType byte
audioStreamType uint8
videoStreamType uint8
audioPayloadType base.AvPacketPt
videoPayloadType base.AvPacketPt
preVideoPts int64
preAudioPts int64
preVideoDts int64
preVideoPts int64
preAudioDts int64
preVideoDts int64
preVideoRtpts int64
preAudioRtpts int64
preVideoRtpts int64
onAudio onAudioFn
onVideo onVideoFn
onAvPacket base.OnAvPacketFunc
}
func NewPsUnpacker() *PsUnpacker {
@ -67,36 +52,18 @@ func NewPsUnpacker() *PsUnpacker {
preAudioPts: -1,
preVideoRtpts: -1,
preAudioRtpts: -1,
onAudio: defaultOnAudio,
onVideo: defaultOnVideo,
onAvPacket: defaultOnAvPacket,
}
p.list.InitMaxSize(maxUnpackRtpListSize)
return p
}
// WithOnAudio
//
// TODO(chef): [refactor] 使用AvPacket 202206
//
func (p *PsUnpacker) WithOnAudio(onAudio func(payload []byte, dts int64, pts int64)) *PsUnpacker {
p.onAudio = onAudio
func (p *PsUnpacker) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PsUnpacker {
p.onAvPacket = onAvPacket
return p
}
func (p *PsUnpacker) WithOnVideo(onVideo func(payload []byte, dts int64, pts int64)) *PsUnpacker {
p.onVideo = onVideo
return p
}
func (p *PsUnpacker) VideoStreamType() byte {
return p.videoStreamType
}
func (p *PsUnpacker) AudioStreamType() byte {
return p.audioStreamType
}
// FeedRtpPacket
//
// 注意,内部会处理丢包、乱序等问题
@ -289,8 +256,24 @@ func (p *PsUnpacker) parsePsm(rb []byte, index int) int {
// elementary_stream_info_length
if streamId >= 0xe0 && streamId <= 0xef {
p.videoStreamType = streamType
switch p.videoStreamType {
case StreamTypeH264:
p.videoPayloadType = base.AvPacketPtAvc
case StreamTypeH265:
p.videoPayloadType = base.AvPacketPtHevc
default:
p.videoPayloadType = base.AvPacketPtUnknown
}
} else if streamId >= 0xc0 && streamId <= 0xdf {
p.audioStreamType = streamType
switch p.audioStreamType {
case StreamTypeAAC:
p.audioPayloadType = base.AvPacketPtAac
default:
p.audioPayloadType = base.AvPacketPtUnknown
}
}
esil := int(bele.BeUint16(rb[i:]))
nazalog.Debugf("streamType=%d, streamId=%d, esil=%d", streamType, streamId, esil)
@ -350,7 +333,12 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
// noop
} else {
if p.preAudioRtpts != int64(rtpts) {
p.onAudio(p.audioBuf, p.preAudioDts, p.preAudioPts)
p.onAvPacket(&base.AvPacket{
PayloadType: p.audioPayloadType,
Timestamp: p.preAudioDts,
Pts: p.preAudioPts,
Payload: p.audioBuf,
})
p.audioBuf = nil
} else {
// noop
@ -361,7 +349,12 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int)
}
} else {
if pts != p.preAudioPts && p.preAudioPts >= 0 {
p.onAudio(p.audioBuf, p.preAudioDts, p.preAudioPts)
p.onAvPacket(&base.AvPacket{
PayloadType: p.audioPayloadType,
Timestamp: p.preAudioDts,
Pts: p.preAudioPts,
Payload: p.audioBuf,
})
p.audioBuf = nil
} else {
// noop
@ -487,12 +480,14 @@ func (p *PsUnpacker) iterateNaluByStartCode(code int, pts, dts int64) {
nalu = p.videoBuf[startPos:]
}
startPos = nextPos
if p.videoStreamType == StreamTypeH265 {
nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, hevc.ParseNaluTypeReadable(nalu[preLeading]))
} else {
nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, avc.ParseNaluTypeReadable(nalu[preLeading]))
}
p.onVideo(nalu, dts, pts)
p.onAvPacket(&base.AvPacket{
PayloadType: p.videoPayloadType,
Timestamp: dts,
Pts: pts,
Payload: nalu,
})
if nextPos >= 0 {
preLeading = leading
}
@ -576,9 +571,6 @@ func readPts(b []byte) (fb uint8, pts int64) {
return
}
func defaultOnAudio(payload []byte, dts int64, pts int64) {
// noop
}
func defaultOnVideo(payload []byte, dts int64, pts int64) {
// noop
func defaultOnAvPacket(packet *base.AvPacket) {
}

@ -10,7 +10,7 @@ package gb28181
import (
"encoding/hex"
"fmt"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazamd5"
"io/ioutil"
"os"
@ -66,9 +66,7 @@ var goldenRtpList = []string{
}
func TestPsUnpacker(t *testing.T) {
unpacker := NewPsUnpacker().WithOnVideo(func(payload []byte, dts int64, pts int64) {
})
unpacker := NewPsUnpacker()
for i, item := range goldenRtpList {
nazalog.Debugf("%d", i)
@ -129,16 +127,20 @@ func test1() {
nazalog.Assert(nil, err)
waitingSps := true
unpacker := NewPsUnpacker().WithOnVideo(func(payload []byte, dts int64, pts int64) {
nazalog.Debugf("[test1] onVideo. length=%d", len(payload))
unpacker := NewPsUnpacker().WithOnAvPacket(func(packet *base.AvPacket) {
if !packet.IsVideo() {
return
}
nazalog.Debugf("[test1] onVideo. length=%d", len(packet.Payload))
if waitingSps {
if avc.ParseNaluType(payload[4]) == avc.NaluTypeSps {
if avc.ParseNaluType(packet.Payload[4]) == avc.NaluTypeSps {
waitingSps = false
} else {
return
}
}
_, _ = fp.Write(payload)
_, _ = fp.Write(packet.Payload)
})
unpacker.FeedRtpBody(b, 0)
@ -147,38 +149,3 @@ func test1() {
nazalog.Assert(nil, err)
nazalog.Assert("fd8dbe365152e212bf8cbabb7a99c1aa", nazamd5.Md5(out))
}
func test2() {
// 一个udp包一个文件按行分隔hex stream格式如下
// 8060 0000 0000 0000 0beb c567 0000 01ba
// 46ab 1ea9 4401 0139 9ffe ffff 0094 ab0d
fp, err := os.Create("/tmp/udp2.h264")
nazalog.Assert(nil, err)
defer fp.Close()
fp2, err := os.Create("/tmp/udp2.aac")
nazalog.Assert(nil, err)
defer fp2.Close()
unpacker := NewPsUnpacker().WithOnAudio(func(payload []byte, dts int64, pts int64) {
nazalog.Infof("[test2] onAudio. length=%d, dts=%d", len(payload), dts)
_, _ = fp2.Write(payload)
}).WithOnVideo(func(payload []byte, dts int64, pts int64) {
nazalog.Infof("[test2] onVideo. length=%d, dts=%d", len(payload), dts)
_, _ = fp.Write(payload)
})
for i := 1; i < 1000; i++ {
//filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i)
filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i)
b, err := ioutil.ReadFile(filename)
if err != nil {
nazalog.Errorf("%+v", err)
return
}
nazalog.Debugf("[test2] %d: %s", i, hex.EncodeToString(b[12:]))
unpacker.FeedRtpPacket(b)
}
}

@ -17,11 +17,11 @@ import (
"github.com/q191201771/lal/pkg/remux"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/connection"
)
// TODO(chef):
// 规范检查
// 1. 所有interface以I开头
// TODO(chef): 检查所有 interface是否以I开头 202207
// TODO(chef): 增加 gb28181.PubSession 202207
var (
_ base.ISession = &rtmp.ServerSession{}
@ -189,3 +189,5 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{}
var _ base.IStatable = connection.New(nil)

@ -135,10 +135,14 @@ func (s *ServerSession) Flush() error {
return s.conn.Flush()
}
// ----- IServerSessionLifecycle ---------------------------------------------------------------------------------------
func (s *ServerSession) Dispose() error {
return s.dispose(nil)
}
// ----- ISessionUrlContext --------------------------------------------------------------------------------------------
func (s *ServerSession) Url() string {
return s.url
}
@ -155,6 +159,8 @@ func (s *ServerSession) RawQuery() string {
return s.rawQuery
}
// ----- IObject -------------------------------------------------------------------------------------------------------
func (s *ServerSession) UniqueKey() string {
return s.sessionStat.UniqueKey()
}

@ -1,3 +1,11 @@
// Copyright 2022, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtprtcp
type RtpPacketListItem struct {
@ -23,7 +31,6 @@ type RtpPacketList struct {
maxSize int
}
// IsStale 是否过期
//
func (l *RtpPacketList) IsStale(seq uint16) bool {
@ -112,7 +119,6 @@ func (l *RtpPacketList) IsFirstSequential() bool {
return SubSeq(first.Packet.Header.Seq, l.unpackedSeq) == 1
}
// SetUnpackedSeq 设置最新的合成帧成功的包序号
//
func (l *RtpPacketList) SetUnpackedSeq(seq uint16) {

@ -8,11 +8,10 @@
package rtprtcp
type RtpUnpackContainer struct {
unpackerProtocol IRtpUnpackerProtocol
list RtpPacketList
list RtpPacketList
}
func NewRtpUnpackContainer(maxSize int, unpackerProtocol IRtpUnpackerProtocol) *RtpUnpackContainer {

@ -229,7 +229,7 @@ func (session *ClientCommandSession) doContext(ctx context.Context, rawUrl strin
select {
case <-ctx.Done():
_ = session.dispose(nil)
_ = session.dispose(ctx.Err())
return ctx.Err()
case err := <-errChan:
if err != nil {

Loading…
Cancel
Save