diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 65becf8..b737da7 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -14,6 +14,8 @@ import ( "github.com/q191201771/naza/pkg/nazabytes" ) +// --------------------------------------------------------------------------------------------------------------------- + type AvPacketPt int const ( @@ -37,11 +39,15 @@ func (a AvPacketPt) ReadableString() string { return "" } +// --------------------------------------------------------------------------------------------------------------------- + // AvPacket // // 不同场景使用时,字段含义可能不同。 // 使用AvPacket的地方,应注明各字段的含义。 // +// +// type AvPacket struct { PayloadType AvPacketPt Timestamp int64 // 如无特殊说明,此字段是Dts @@ -58,6 +64,10 @@ func (packet *AvPacket) IsVideo() bool { } func (packet *AvPacket) DebugString() string { - return fmt.Sprintf("[%p] type=%s, timestamp=%d, len=%d, payload=%s", - packet, packet.PayloadType.ReadableString(), packet.Timestamp, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32))) + return fmt.Sprintf("[%p] type=%s, timestamp=%d, pts=%d, len=%d, payload=%s", + packet, packet.PayloadType.ReadableString(), packet.Timestamp, packet.Pts, len(packet.Payload), hex.Dump(nazabytes.Prefix(packet.Payload, 32))) } + +// --------------------------------------------------------------------------------------------------------------------- + +type OnAvPacketFunc func(packet *AvPacket) diff --git a/pkg/base/basic_session_stat.go b/pkg/base/basic_session_stat.go index 7abeb92..ab1cef4 100644 --- a/pkg/base/basic_session_stat.go +++ b/pkg/base/basic_session_stat.go @@ -13,6 +13,10 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) +type IStatable interface { + GetStat() connection.Stat // TODO(chef): [refactor] 考虑为 nazanet.UdpConnection 实现这个接口 +} + // BasicSessionStat // // 包含两部分功能: @@ -76,6 +80,10 @@ func NewBasicSessionStat(sessionType SessionType, remoteAddr string) BasicSessio s.stat.SessionId = GenUkFlvSubSession() s.stat.BaseType = SessionBaseTypeSubStr s.stat.Protocol = SessionProtocolFlvStr + case SessionTypePsPub: + s.stat.SessionId = GenUkPsPubSession() + s.stat.BaseType = SessionBaseTypePubStr + s.stat.Protocol = SessionProtocolPsStr } return s } @@ -102,7 +110,7 @@ func (s *BasicSessionStat) UpdateStat(intervalSec uint32) { s.updateStat(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load(), s.stat.BaseType, intervalSec) } -func (s *BasicSessionStat) UpdateStatWitchConn(conn connection.Connection, intervalSec uint32) { +func (s *BasicSessionStat) UpdateStatWitchConn(conn IStatable, intervalSec uint32) { currStat := conn.GetStat() s.updateStat(currStat.ReadBytesSum, currStat.WroteBytesSum, s.stat.BaseType, intervalSec) } @@ -113,7 +121,7 @@ func (s *BasicSessionStat) GetStat() StatSession { return s.stat } -func (s *BasicSessionStat) GetStatWithConn(conn connection.Connection) StatSession { +func (s *BasicSessionStat) GetStatWithConn(conn IStatable) StatSession { connStat := conn.GetStat() s.stat.ReadBytesSum = connStat.ReadBytesSum s.stat.WroteBytesSum = connStat.WroteBytesSum @@ -124,7 +132,7 @@ func (s *BasicSessionStat) IsAlive() (readAlive, writeAlive bool) { return s.isAlive(s.currConnStat.ReadBytesSum.Load(), s.currConnStat.WroteBytesSum.Load()) } -func (s *BasicSessionStat) IsAliveWitchConn(conn connection.Connection) (readAlive, writeAlive bool) { +func (s *BasicSessionStat) IsAliveWitchConn(conn IStatable) (readAlive, writeAlive bool) { currStat := conn.GetStat() return s.isAlive(currStat.ReadBytesSum, currStat.WroteBytesSum) } diff --git a/pkg/base/t_session.go b/pkg/base/t_session.go index eb59a48..e0e95d3 100644 --- a/pkg/base/t_session.go +++ b/pkg/base/t_session.go @@ -38,12 +38,14 @@ const ( SessionTypeFlvSub SessionType = SessionProtocolFlv<<8 | SessionBaseTypeSub SessionTypeFlvPull SessionType = SessionProtocolFlv<<8 | SessionBaseTypePull SessionTypeTsSub SessionType = SessionProtocolTs<<8 | SessionBaseTypeSub + SessionTypePsPub SessionType = SessionProtocolPs<<8 | SessionBaseTypePub SessionProtocolCustomize = 1 SessionProtocolRtmp = 2 SessionProtocolRtsp = 3 SessionProtocolFlv = 4 SessionProtocolTs = 5 + SessionProtocolPs = 6 SessionBaseTypePubSub = 1 SessionBaseTypePub = 2 @@ -56,6 +58,7 @@ const ( SessionProtocolRtspStr = "RTSP" SessionProtocolFlvStr = "FLV" SessionProtocolTsStr = "TS" + SessionProtocolPsStr = "PS" SessionBaseTypePubSubStr = "PUBSUB" SessionBaseTypePubStr = "PUB" @@ -64,38 +67,6 @@ const ( SessionBaseTypePullStr = "PULL" ) -//func (protocol SessionProtocol) Stringify() string { -// switch protocol { -// case SessionProtocolCustomize: -// return SessionProtocolCustomizeStr -// case SessionProtocolRtmp: -// return SessionProtocolRtmpStr -// case SessionProtocolRtsp: -// return SessionProtocolRtspStr -// case SessionProtocolFlv: -// return SessionProtocolFlvStr -// case SessionProtocolTs: -// return SessionProtocolTsStr -// } -// return "INVALID" -//} -// -//func (typ SessionBaseType) Stringify() string { -// switch typ { -// case SessionBaseTypePubSub: -// return SessionBaseTypePubSubStr -// case SessionBaseTypePub: -// return SessionBaseTypePubStr -// case SessionBaseTypeSub: -// return SessionBaseTypeSubStr -// case SessionBaseTypePush: -// return SessionBaseTypePushStr -// case SessionBaseTypePull: -// return SessionBaseTypePullStr -// } -// return "INVALID" -//} - type ISession interface { ISessionUrlContext IObject @@ -218,13 +189,6 @@ type IObject interface { UniqueKey() string } -//type ISessionType interface { -// Protocol() string -// BaseType() string -// -// //UniqueKey() string -//} - // TODO chef: rtmp.ClientSession修改为BaseClientSession更好些 // TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame diff --git a/pkg/base/t_unique.go b/pkg/base/t_unique.go index f3bb42d..79fe9ce 100644 --- a/pkg/base/t_unique.go +++ b/pkg/base/t_unique.go @@ -22,6 +22,7 @@ const ( UkPreFlvSubSession = SessionProtocolFlvStr + SessionBaseTypePubSubStr // "FLVSUB" UkPreFlvPullSession = SessionProtocolFlvStr + SessionBaseTypePullStr // "FLVPULL" UkPreTsSubSession = SessionProtocolTsStr + SessionBaseTypePubSubStr // "TSSUB" + UkPrePsPubSession = SessionProtocolPsStr + SessionBaseTypePubStr // "PSPUB" UkPreRtspServerCommandSession = "RTSPSRVCMD" // 这个不暴露给上层 @@ -82,6 +83,10 @@ func GenUkFlvPullSession() string { return siUkFlvPullSession.GenUniqueKey() } +func GenUkPsPubSession() string { + return siUkPsPubSession.GenUniqueKey() +} + func GenUkGroup() string { return siUkGroup.GenUniqueKey() } @@ -107,6 +112,7 @@ var ( siUkFlvSubSession *unique.SingleGenerator siUkTsSubSession *unique.SingleGenerator siUkFlvPullSession *unique.SingleGenerator + siUkPsPubSession *unique.SingleGenerator siUkGroup *unique.SingleGenerator siUkHlsMuxer *unique.SingleGenerator @@ -126,6 +132,7 @@ func init() { siUkFlvSubSession = unique.NewSingleGenerator(UkPreFlvSubSession) siUkTsSubSession = unique.NewSingleGenerator(UkPreTsSubSession) siUkFlvPullSession = unique.NewSingleGenerator(UkPreFlvPullSession) + siUkPsPubSession = unique.NewSingleGenerator(UkPrePsPubSession) siUkGroup = unique.NewSingleGenerator(UkPreGroup) siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer) diff --git a/pkg/gb28181/gb28181.go b/pkg/gb28181/gb28181.go index c6149fd..d4b0518 100644 --- a/pkg/gb28181/gb28181.go +++ b/pkg/gb28181/gb28181.go @@ -8,13 +8,20 @@ package gb28181 -import "errors" +import ( + "errors" + "github.com/q191201771/naza/pkg/nazalog" +) // TODO(chef): gb28181 package处于开发中阶段,请不使用 // TODO(chef): [opt] rtp排序 202206 // TODO(chef): [test] 保存rtp数据,用于回放分析 202206 // TODO(chef): [perf] 优化ps解析,内存块 202207 +var ( + Log = nazalog.GetGlobalLogger() +) + // ErrGb28181 TODO(chef): [refactor] move to pkg base 202207 // var ErrGb28181 = errors.New("lal.gb28181: fxxk") diff --git a/pkg/gb28181/ps.go b/pkg/gb28181/ps.go index b0b0893..522bf37 100644 --- a/pkg/gb28181/ps.go +++ b/pkg/gb28181/ps.go @@ -27,17 +27,15 @@ const ( ) -type PsStreamType int - const ( - StreamTypeH264 PsStreamType = 0x1b - StreamTypeH265 = 0x24 - StreamTypeAAC = 0x0f - StreamTypeG711A = 0x90 //PCMA - StreamTypeG7221 = 0x92 - StreamTypeG7231 = 0x93 - StreamTypeG729 = 0x99 - StreamTypeUnknown = -1 + StreamTypeH264 uint8 = 0x1b + StreamTypeH265 = 0x24 + StreamTypeAAC = 0x0f + StreamTypeG711A = 0x90 //PCMA + StreamTypeG7221 = 0x92 + StreamTypeG7231 = 0x93 + StreamTypeG729 = 0x99 + StreamTypeUnknown = -1 ) const psBufInitSize = 4096 diff --git a/pkg/gb28181/pub_session.go b/pkg/gb28181/pub_session.go index aeb6a63..c370702 100644 --- a/pkg/gb28181/pub_session.go +++ b/pkg/gb28181/pub_session.go @@ -9,18 +9,38 @@ package gb28181 import ( - "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazanet" "net" + "sync" ) type PubSession struct { - conn *nazanet.UdpConnection - rtprtcp.IRtpUnpacker + unpacker *PsUnpacker + + streamName string + + conn *nazanet.UdpConnection + sessionStat base.BasicSessionStat + + disposeOnce sync.Once } func NewPubSession() *PubSession { - return &PubSession{} + return &PubSession{ + unpacker: NewPsUnpacker(), + sessionStat: base.NewBasicSessionStat(base.SessionTypePsPub, ""), + } +} + +func (session *PubSession) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PubSession { + session.unpacker.WithOnAvPacket(onAvPacket) + return session +} + +func (session *PubSession) WithStreamName(streamName string) *PubSession { + session.streamName = streamName + return session } func (session *PubSession) RunLoop(addr string) error { @@ -32,7 +52,77 @@ func (session *PubSession) RunLoop(addr string) error { return err } err = session.conn.RunLoop(func(b []byte, raddr *net.UDPAddr, err error) bool { + session.sessionStat.AddReadBytes(len(b)) + session.unpacker.FeedRtpPacket(b) return true }) return err } + +// ----- IServerSessionLifecycle --------------------------------------------------------------------------------------- + +func (session *PubSession) Dispose() error { + return session.dispose(nil) +} + +// ----- ISessionUrlContext -------------------------------------------------------------------------------------------- + +func (session *PubSession) Url() string { + Log.Warnf("[%s] PubSession.Url() is not implemented", session.UniqueKey()) + return "invalid" +} + +func (session *PubSession) AppName() string { + Log.Warnf("[%s] PubSession.AppName() is not implemented", session.UniqueKey()) + return "invalid" +} + +func (session *PubSession) StreamName() string { + // 如果stream name没有设置,则使用session的unique key作为stream name + if session.streamName == "" { + return session.UniqueKey() + } + return session.streamName +} + +func (session *PubSession) RawQuery() string { + Log.Warnf("[%s] PubSession.RawQuery() is not implemented", session.UniqueKey()) + return "invalid" +} + +// ----- IObject ------------------------------------------------------------------------------------------------------- + +func (session *PubSession) UniqueKey() string { + return session.sessionStat.UniqueKey() +} + +// ----- ISessionStat -------------------------------------------------------------------------------------------------- + +func (session *PubSession) UpdateStat(intervalSec uint32) { + session.sessionStat.UpdateStat(intervalSec) +} + +func (session *PubSession) GetStat() base.StatSession { + return session.sessionStat.GetStat() +} + +func (session *PubSession) IsAlive() (readAlive, writeAlive bool) { + return session.sessionStat.IsAlive() +} + +// --------------------------------------------------------------------------------------------------------------------- + +// --------------------------------------------------------------------------------------------------------------------- + +func (session *PubSession) dispose(err error) error { + var retErr error + session.disposeOnce.Do(func() { + Log.Infof("[%s] lifecycle dispose rtmp ServerSession. err=%+v", session.UniqueKey(), err) + if session.conn == nil { + retErr = base.ErrSessionNotStarted + return + } + retErr = session.conn.Dispose() + }) + return retErr +} diff --git a/pkg/gb28181/pub_session_test.go b/pkg/gb28181/pub_session_test.go new file mode 100644 index 0000000..4df7f3d --- /dev/null +++ b/pkg/gb28181/pub_session_test.go @@ -0,0 +1,75 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package gb28181 + +import ( + "encoding/hex" + "fmt" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazanet" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestPubSession(t *testing.T) { + //testPubSession() +} + +func testPubSession() { + // 一个udp包一个文件,按行分隔,hex stream格式如下 + // 8060 0000 0000 0000 0beb c567 0000 01ba + // 46ab 1ea9 4401 0139 9ffe ffff 0094 ab0d + + fp, err := os.Create("/tmp/udp2.h264") + nazalog.Assert(nil, err) + defer fp.Close() + + fp2, err := os.Create("/tmp/udp2.aac") + nazalog.Assert(nil, err) + defer fp2.Close() + + pool := nazanet.NewAvailUdpConnPool(1024, 10240) + port, err := pool.Peek() + nazalog.Assert(nil, err) + addr := fmt.Sprintf("127.0.0.1:%d", port) + + session := NewPubSession().WithOnAvPacket(func(packet *base.AvPacket) { + nazalog.Infof("[test2] onAvPacket. packet=%s", packet.DebugString()) + if packet.IsAudio() { + _, _ = fp2.Write(packet.Payload) + } else if packet.IsVideo() { + _, _ = fp.Write(packet.Payload) + } + }) + + go func() { + time.Sleep(100 * time.Millisecond) + + conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) { + option.RAddr = addr + }) + nazalog.Assert(nil, err) + + for i := 1; i < 1000; i++ { + //filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i) + filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i) + b, err := ioutil.ReadFile(filename) + nazalog.Assert(nil, err) + nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:])) + + conn.Write(b) + } + }() + + runErr := session.RunLoop(addr) + nazalog.Assert(nil, runErr) +} diff --git a/pkg/gb28181/unpack.go b/pkg/gb28181/unpack.go index fe88ba2..3c5425b 100644 --- a/pkg/gb28181/unpack.go +++ b/pkg/gb28181/unpack.go @@ -11,53 +11,38 @@ package gb28181 import ( "bytes" "encoding/hex" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/h2645" "github.com/q191201771/lal/pkg/rtprtcp" - "github.com/q191201771/lal/pkg/hevc" - - "github.com/q191201771/lal/pkg/avc" - "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/nazabits" "github.com/q191201771/naza/pkg/nazabytes" "github.com/q191201771/naza/pkg/nazalog" ) -type onAudioFn func(payload []byte, dts int64, pts int64) -type onVideoFn func(payload []byte, dts int64, pts int64) - -type IPsUnpackerObserver interface { - OnAudio(payload []byte, dts int64, pts int64) - - // OnVideo - // - // @param payload: annexb格式 - // - OnVideo(payload []byte, dts int64, pts int64) -} - -// PsUnpacker 解析ps(Progream Stream)流 +// PsUnpacker 解析ps(Program Stream)流 // type PsUnpacker struct { list rtprtcp.RtpPacketList buf *nazabytes.Buffer - videoBuf []byte audioBuf []byte + videoBuf []byte - videoStreamType byte - audioStreamType byte + audioStreamType uint8 + videoStreamType uint8 + audioPayloadType base.AvPacketPt + videoPayloadType base.AvPacketPt - preVideoPts int64 preAudioPts int64 - preVideoDts int64 + preVideoPts int64 preAudioDts int64 + preVideoDts int64 - preVideoRtpts int64 preAudioRtpts int64 + preVideoRtpts int64 - onAudio onAudioFn - onVideo onVideoFn + onAvPacket base.OnAvPacketFunc } func NewPsUnpacker() *PsUnpacker { @@ -67,36 +52,18 @@ func NewPsUnpacker() *PsUnpacker { preAudioPts: -1, preVideoRtpts: -1, preAudioRtpts: -1, - onAudio: defaultOnAudio, - onVideo: defaultOnVideo, + onAvPacket: defaultOnAvPacket, } p.list.InitMaxSize(maxUnpackRtpListSize) return p } -// WithOnAudio -// -// TODO(chef): [refactor] 使用AvPacket 202206 -// -func (p *PsUnpacker) WithOnAudio(onAudio func(payload []byte, dts int64, pts int64)) *PsUnpacker { - p.onAudio = onAudio +func (p *PsUnpacker) WithOnAvPacket(onAvPacket base.OnAvPacketFunc) *PsUnpacker { + p.onAvPacket = onAvPacket return p } -func (p *PsUnpacker) WithOnVideo(onVideo func(payload []byte, dts int64, pts int64)) *PsUnpacker { - p.onVideo = onVideo - return p -} - -func (p *PsUnpacker) VideoStreamType() byte { - return p.videoStreamType -} - -func (p *PsUnpacker) AudioStreamType() byte { - return p.audioStreamType -} - // FeedRtpPacket // // 注意,内部会处理丢包、乱序等问题 @@ -289,8 +256,24 @@ func (p *PsUnpacker) parsePsm(rb []byte, index int) int { // elementary_stream_info_length if streamId >= 0xe0 && streamId <= 0xef { p.videoStreamType = streamType + + switch p.videoStreamType { + case StreamTypeH264: + p.videoPayloadType = base.AvPacketPtAvc + case StreamTypeH265: + p.videoPayloadType = base.AvPacketPtHevc + default: + p.videoPayloadType = base.AvPacketPtUnknown + } } else if streamId >= 0xc0 && streamId <= 0xdf { p.audioStreamType = streamType + + switch p.audioStreamType { + case StreamTypeAAC: + p.audioPayloadType = base.AvPacketPtAac + default: + p.audioPayloadType = base.AvPacketPtUnknown + } } esil := int(bele.BeUint16(rb[i:])) nazalog.Debugf("streamType=%d, streamId=%d, esil=%d", streamType, streamId, esil) @@ -350,7 +333,12 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int) // noop } else { if p.preAudioRtpts != int64(rtpts) { - p.onAudio(p.audioBuf, p.preAudioDts, p.preAudioPts) + p.onAvPacket(&base.AvPacket{ + PayloadType: p.audioPayloadType, + Timestamp: p.preAudioDts, + Pts: p.preAudioPts, + Payload: p.audioBuf, + }) p.audioBuf = nil } else { // noop @@ -361,7 +349,12 @@ func (p *PsUnpacker) parseAvStream(code int, rtpts uint32, rb []byte, index int) } } else { if pts != p.preAudioPts && p.preAudioPts >= 0 { - p.onAudio(p.audioBuf, p.preAudioDts, p.preAudioPts) + p.onAvPacket(&base.AvPacket{ + PayloadType: p.audioPayloadType, + Timestamp: p.preAudioDts, + Pts: p.preAudioPts, + Payload: p.audioBuf, + }) p.audioBuf = nil } else { // noop @@ -487,12 +480,14 @@ func (p *PsUnpacker) iterateNaluByStartCode(code int, pts, dts int64) { nalu = p.videoBuf[startPos:] } startPos = nextPos - if p.videoStreamType == StreamTypeH265 { - nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, hevc.ParseNaluTypeReadable(nalu[preLeading])) - } else { - nazalog.Debugf("Video code=%d, length=%d,pts=%d, dts=%d, type=%s", code, len(nalu), pts, dts, avc.ParseNaluTypeReadable(nalu[preLeading])) - } - p.onVideo(nalu, dts, pts) + + p.onAvPacket(&base.AvPacket{ + PayloadType: p.videoPayloadType, + Timestamp: dts, + Pts: pts, + Payload: nalu, + }) + if nextPos >= 0 { preLeading = leading } @@ -576,9 +571,6 @@ func readPts(b []byte) (fb uint8, pts int64) { return } -func defaultOnAudio(payload []byte, dts int64, pts int64) { - // noop -} -func defaultOnVideo(payload []byte, dts int64, pts int64) { - // noop +func defaultOnAvPacket(packet *base.AvPacket) { + } diff --git a/pkg/gb28181/unpack_test.go b/pkg/gb28181/unpack_test.go index 5f374ad..7d6bed3 100644 --- a/pkg/gb28181/unpack_test.go +++ b/pkg/gb28181/unpack_test.go @@ -10,7 +10,7 @@ package gb28181 import ( "encoding/hex" - "fmt" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazamd5" "io/ioutil" "os" @@ -66,9 +66,7 @@ var goldenRtpList = []string{ } func TestPsUnpacker(t *testing.T) { - unpacker := NewPsUnpacker().WithOnVideo(func(payload []byte, dts int64, pts int64) { - - }) + unpacker := NewPsUnpacker() for i, item := range goldenRtpList { nazalog.Debugf("%d", i) @@ -129,16 +127,20 @@ func test1() { nazalog.Assert(nil, err) waitingSps := true - unpacker := NewPsUnpacker().WithOnVideo(func(payload []byte, dts int64, pts int64) { - nazalog.Debugf("[test1] onVideo. length=%d", len(payload)) + unpacker := NewPsUnpacker().WithOnAvPacket(func(packet *base.AvPacket) { + if !packet.IsVideo() { + return + } + + nazalog.Debugf("[test1] onVideo. length=%d", len(packet.Payload)) if waitingSps { - if avc.ParseNaluType(payload[4]) == avc.NaluTypeSps { + if avc.ParseNaluType(packet.Payload[4]) == avc.NaluTypeSps { waitingSps = false } else { return } } - _, _ = fp.Write(payload) + _, _ = fp.Write(packet.Payload) }) unpacker.FeedRtpBody(b, 0) @@ -147,38 +149,3 @@ func test1() { nazalog.Assert(nil, err) nazalog.Assert("fd8dbe365152e212bf8cbabb7a99c1aa", nazamd5.Md5(out)) } - -func test2() { - // 一个udp包一个文件,按行分隔,hex stream格式如下 - // 8060 0000 0000 0000 0beb c567 0000 01ba - // 46ab 1ea9 4401 0139 9ffe ffff 0094 ab0d - - fp, err := os.Create("/tmp/udp2.h264") - nazalog.Assert(nil, err) - defer fp.Close() - - fp2, err := os.Create("/tmp/udp2.aac") - nazalog.Assert(nil, err) - defer fp2.Close() - - unpacker := NewPsUnpacker().WithOnAudio(func(payload []byte, dts int64, pts int64) { - nazalog.Infof("[test2] onAudio. length=%d, dts=%d", len(payload), dts) - _, _ = fp2.Write(payload) - }).WithOnVideo(func(payload []byte, dts int64, pts int64) { - nazalog.Infof("[test2] onVideo. length=%d, dts=%d", len(payload), dts) - _, _ = fp.Write(payload) - }) - - for i := 1; i < 1000; i++ { - //filename := fmt.Sprintf("/tmp/rtp-h264-aac/%d.ps", i) - filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i) - b, err := ioutil.ReadFile(filename) - if err != nil { - nazalog.Errorf("%+v", err) - return - } - - nazalog.Debugf("[test2] %d: %s", i, hex.EncodeToString(b[12:])) - unpacker.FeedRtpPacket(b) - } -} diff --git a/pkg/innertest/iface_impl.go b/pkg/innertest/iface_impl.go index ac14f77..623ada3 100644 --- a/pkg/innertest/iface_impl.go +++ b/pkg/innertest/iface_impl.go @@ -17,11 +17,11 @@ import ( "github.com/q191201771/lal/pkg/remux" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/lal/pkg/rtsp" + "github.com/q191201771/naza/pkg/connection" ) -// TODO(chef): -// 规范检查 -// 1. 所有interface以I开头 +// TODO(chef): 检查所有 interface是否以I开头 202207 +// TODO(chef): 增加 gb28181.PubSession 202207 var ( _ base.ISession = &rtmp.ServerSession{} @@ -189,3 +189,5 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{} var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{} + +var _ base.IStatable = connection.New(nil) diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index 7a8cf84..9069f01 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -135,10 +135,14 @@ func (s *ServerSession) Flush() error { return s.conn.Flush() } +// ----- IServerSessionLifecycle --------------------------------------------------------------------------------------- + func (s *ServerSession) Dispose() error { return s.dispose(nil) } +// ----- ISessionUrlContext -------------------------------------------------------------------------------------------- + func (s *ServerSession) Url() string { return s.url } @@ -155,6 +159,8 @@ func (s *ServerSession) RawQuery() string { return s.rawQuery } +// ----- IObject ------------------------------------------------------------------------------------------------------- + func (s *ServerSession) UniqueKey() string { return s.sessionStat.UniqueKey() } diff --git a/pkg/rtprtcp/rtp_packet_list.go b/pkg/rtprtcp/rtp_packet_list.go index a127c19..2dad82b 100644 --- a/pkg/rtprtcp/rtp_packet_list.go +++ b/pkg/rtprtcp/rtp_packet_list.go @@ -1,3 +1,11 @@ +// Copyright 2022, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + package rtprtcp type RtpPacketListItem struct { @@ -23,7 +31,6 @@ type RtpPacketList struct { maxSize int } - // IsStale 是否过期 // func (l *RtpPacketList) IsStale(seq uint16) bool { @@ -112,7 +119,6 @@ func (l *RtpPacketList) IsFirstSequential() bool { return SubSeq(first.Packet.Header.Seq, l.unpackedSeq) == 1 } - // SetUnpackedSeq 设置最新的合成帧成功的包序号 // func (l *RtpPacketList) SetUnpackedSeq(seq uint16) { diff --git a/pkg/rtprtcp/rtp_unpack_container.go b/pkg/rtprtcp/rtp_unpack_container.go index 7a61db5..bade73d 100644 --- a/pkg/rtprtcp/rtp_unpack_container.go +++ b/pkg/rtprtcp/rtp_unpack_container.go @@ -8,11 +8,10 @@ package rtprtcp - type RtpUnpackContainer struct { unpackerProtocol IRtpUnpackerProtocol - list RtpPacketList + list RtpPacketList } func NewRtpUnpackContainer(maxSize int, unpackerProtocol IRtpUnpackerProtocol) *RtpUnpackContainer { diff --git a/pkg/rtsp/client_command_session.go b/pkg/rtsp/client_command_session.go index a8e3c5a..a6eb011 100644 --- a/pkg/rtsp/client_command_session.go +++ b/pkg/rtsp/client_command_session.go @@ -229,7 +229,7 @@ func (session *ClientCommandSession) doContext(ctx context.Context, rawUrl strin select { case <-ctx.Done(): - _ = session.dispose(nil) + _ = session.dispose(ctx.Err()) return ctx.Err() case err := <-errChan: if err != nil {