diff --git a/README.md b/README.md index e9881f7..1f0df01 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ - [x] **HTTP文件服务器**。比如HLS切片文件可直接播放,不需要额外的HTTP文件服务器 - [x] **秒开播放**。GOP缓冲 -除了lalserver,还提供一些基于lal开发的demo: [《lal/app/demo》](https://github.com/q191201771/lal/blob/master/app/demo/README.md) +除了lalserver,还提供一些基于lal开发的demo(比如客户端程序): [《lal/app/demo》](https://github.com/q191201771/lal/blob/master/app/demo/README.md) Wide diff --git a/app/demo/README.md b/app/demo/README.md index fc3f1fe..286d2d4 100644 --- a/app/demo/README.md +++ b/app/demo/README.md @@ -1,13 +1,14 @@ `/app/demo`示例程序功能简介: -| demo | push rtmp | pull rtmp | pull httpflv | pull rtsp | 说明 | -| - | - | - | - | - | - | -| pushrtmp | ✔ | . | . | . | RTMP推流客户端;压力测试工具 | -| pullrtmp | . | ✔ | . | . | RTMP拉流客户端;压力测试工具 | -| pullrtmp2hls | . | ✔ | . | . | 从远端服务器拉取RTMP流,存储为本地m3u8+ts文件 | -| pullhttpflv | . | . | ✔ | . | HTTP-FLV拉流客户端 | -| pullrtsp | . | . | . | ✔ | RTSP拉流客户端 | -| pullrtsp2pushrtmp | ✔ | . | . | ✔ | RTSP拉流,并使用RTMP转推出去 | -| analyseflv | . | . | ✔ | . | 拉取HTTP-FLV流,并进行分析 | +| demo | push rtmp | push rtsp | pull rtmp | pull httpflv | pull rtsp | 说明 | +| - | - | - | - | - | - | - | +| pushrtmp | ✔ | . | . | . | . | RTMP推流客户端;压力测试工具 | +| pullrtmp | . | . | ✔ | . | . | RTMP拉流客户端;压力测试工具 | +| pullrtmp2hls | . | . | ✔ | . | . | 从远端服务器拉取RTMP流,存储为本地m3u8+ts文件 | +| pullhttpflv | . | . | . | ✔ | . | HTTP-FLV拉流客户端 | +| pullrtsp | . | . | . | . | ✔ | RTSP拉流客户端 | +| pullrtsp2pushrtsp | . | ✔ | . | . | ✔ | RTSP拉流,并使用RTSP转推出去 | +| pullrtsp2pushrtmp | ✔ | . | . | . | ✔ | RTSP拉流,并使用RTMP转推出去 | +| analyseflv | . | . | . | ✔ | . | 拉取HTTP-FLV流,并进行分析 | (更具体的功能参加各源码文件的头部说明) diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index 5c82ad9..ee951af 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -72,6 +72,7 @@ func main() { }) go func() { + time.Sleep(3 * time.Second) for { rtspPullSession.UpdateStat(1) rtspStat := rtspPullSession.GetStat() diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index b9a09fa..fd96251 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -82,6 +82,7 @@ func main() { }) go func() { + time.Sleep(3 * time.Second) for { rtspPullSession.UpdateStat(1) rtspStat := rtspPullSession.GetStat() diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go new file mode 100644 index 0000000..c651e7e --- /dev/null +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -0,0 +1,101 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/rtsp" + "github.com/q191201771/naza/pkg/nazalog" +) + +var rtpPacketChan = make(chan rtprtcp.RTPPacket, 1024) + +type Observer struct { +} + +func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { + rtpPacketChan <- pkt +} + +func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { + // noop +} + +func (o *Observer) OnAVPacket(pkt base.AVPacket) { + // noop +} + +func main() { + _ = nazalog.Init(func(option *nazalog.Option) { + option.AssertBehavior = nazalog.AssertFatal + }) + + inURL, outURL := parseFlag() + + o := &Observer{} + rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { + option.PullTimeoutMS = 5000 + option.OverTCP = false + }) + + rtspPushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { + option.PushTimeoutMS = 5000 + option.OverTCP = false + }) + + go func() { + time.Sleep(3 * time.Second) + for { + rtspPullSession.UpdateStat(1) + rtspStat := rtspPullSession.GetStat() + nazalog.Debugf("bitrate. rtsp pull=%dkbit/s, rtsp push=", rtspStat.Bitrate) + time.Sleep(1 * time.Second) + } + }() + + err := rtspPullSession.Pull(inURL) + nazalog.Assert(nil, err) + rawSDP, sdpLogicCtx := rtspPullSession.GetSDP() + + err = rtspPushSession.Push(outURL, rawSDP, sdpLogicCtx) + nazalog.Assert(nil, err) + + for { + select { + case err = <-rtspPullSession.Wait(): + nazalog.Infof("pull rtsp done. err=%+v", err) + return + case err = <-rtspPushSession.Wait(): + nazalog.Infof("push rtsp done. err=%+v", err) + return + case pkt := <-rtpPacketChan: + rtspPushSession.WriteRTPPacket(pkt) + } + } +} + +func parseFlag() (inURL string, outURL string) { + i := flag.String("i", "", "specify pull rtsp url") + o := flag.String("o", "", "specify push rtmp url") + flag.Parse() + if *i == "" || *o == "" { + flag.Usage() + _, _ = fmt.Fprintf(os.Stderr, `Example: + ./bin/pullrtsp2pushrtsp -i rtsp://localhost:5544/live/test110 -o rtsp://localhost:5544/live/test220 +`) + os.Exit(1) + } + return *i, *o +} diff --git a/pkg/base/unique.go b/pkg/base/unique.go index b9bf878..9e36a36 100644 --- a/pkg/base/unique.go +++ b/pkg/base/unique.go @@ -17,6 +17,7 @@ const ( UKPRTSPServerCommandSession = "RTSPSRVCMD" UKPRTSPPubSession = "RTSPPUB" UKPRTSPSubSession = "RTSPSUB" + UKPRTSPPushSession = "RTSPPUSH" UKPRTSPPullSession = "RTSPPULL" UKPFLVSubSession = "FLVSUB" UKPTSSubSession = "TSSUB" diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 69e6099..5e16127 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -77,6 +77,8 @@ type OnReadFLVTag func(tag Tag) // // @param onReadFLVTag 读取到 flv tag 数据时回调。回调结束后,PullSession 不会再使用这块 数据。 func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error { + nazalog.Debugf("[%s] pull. url=%s", session.UniqueKey, rawURL) + var ( ctx context.Context cancel context.CancelFunc diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 8de9b3e..23b5e15 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -481,7 +481,6 @@ func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) { group.mutex.Lock() defer group.mutex.Unlock() - //nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1]) group.broadcastRTMP(msg) } diff --git a/pkg/remux/avpacket2rtmp.go b/pkg/remux/avpacket2rtmp.go index a7c3bb7..6073be7 100644 --- a/pkg/remux/avpacket2rtmp.go +++ b/pkg/remux/avpacket2rtmp.go @@ -17,9 +17,8 @@ import ( "github.com/q191201771/naza/pkg/bele" ) -// @param asc 如果为nil,则没有音频 -// @param vps 如果为nil,则是H264,如果不为nil,则是H265 // @return 返回的内存块为新申请的独立内存块 +// func AVConfig2RTMPMsg(asc, vps, sps, pps []byte) (metadata, ash, vsh *base.RTMPMsg, err error) { var bMetadata []byte var bVsh []byte diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 0b03bed..91253aa 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -19,7 +19,7 @@ import ( "github.com/q191201771/naza/pkg/bele" "github.com/q191201771/naza/pkg/connection" - log "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazalog" ) var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout") @@ -98,12 +98,14 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) StartTime: time.Now().Format("2006-01-02 15:04:05.999"), }, } - log.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s) + nazalog.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s) return s } // 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 func (s *ClientSession) Do(rawURL string) error { + nazalog.Debugf("[%s] Do. url=%s", s.UniqueKey, rawURL) + var ( ctx context.Context cancel context.CancelFunc @@ -132,7 +134,7 @@ func (s *ClientSession) Flush() error { } func (s *ClientSession) Dispose() { - log.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey) + nazalog.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey) _ = s.conn.Close() } @@ -202,13 +204,13 @@ func (s *ClientSession) doContext(ctx context.Context, rawURL string) error { return } - log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize) + nazalog.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize) if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil { errChan <- err return } - log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) + nazalog.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil { errChan <- err return @@ -266,7 +268,7 @@ func (s *ClientSession) tcpConnect() error { } func (s *ClientSession) handshake() error { - log.Infof("[%s] > W Handshake C0+C1.", s.UniqueKey) + nazalog.Infof("[%s] > W Handshake C0+C1.", s.UniqueKey) if err := s.hc.WriteC0C1(s.conn); err != nil { return err } @@ -274,9 +276,9 @@ func (s *ClientSession) handshake() error { if err := s.hc.ReadS0S1S2(s.conn); err != nil { return err } - log.Infof("[%s] < R Handshake S0+S1+S2.", s.UniqueKey) + nazalog.Infof("[%s] < R Handshake S0+S1+S2.", s.UniqueKey) - log.Infof("[%s] > W Handshake C2.", s.UniqueKey) + nazalog.Infof("[%s] > W Handshake C2.", s.UniqueKey) if err := s.hc.WriteC2(s.conn); err != nil { return err } @@ -303,13 +305,13 @@ func (s *ClientSession) doMsg(stream *Stream) error { case base.RTMPTypeIDAck: return s.doAck(stream) case base.RTMPTypeIDUserControl: - log.Warnf("[%s] read user control message, ignore.", s.UniqueKey) + nazalog.Warnf("[%s] read user control message, ignore.", s.UniqueKey) case base.RTMPTypeIDAudio: fallthrough case base.RTMPTypeIDVideo: s.onReadRTMPAVMsg(stream.toAVMsg()) default: - log.Errorf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) + nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) panic(0) } return nil @@ -317,7 +319,7 @@ func (s *ClientSession) doMsg(stream *Stream) error { func (s *ClientSession) doAck(stream *Stream) error { seqNum := bele.BEUint32(stream.msg.buf[stream.msg.b:stream.msg.e]) - log.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.UniqueKey, seqNum) + nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.UniqueKey, seqNum) return nil } @@ -329,7 +331,7 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error { switch val { case "|RtmpSampleAccess": - log.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey) + nazalog.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey) return nil default: } @@ -350,13 +352,13 @@ func (s *ClientSession) doCommandMessage(stream *Stream) error { switch cmd { case "onBWDone": - log.Warnf("[%s] < R onBWDone. ignore.", s.UniqueKey) + nazalog.Warnf("[%s] < R onBWDone. ignore.", s.UniqueKey) case "_result": return s.doResultMessage(stream, tid) case "onStatus": return s.doOnStatusMessage(stream, tid) default: - log.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) + nazalog.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) } return nil @@ -378,18 +380,18 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error { case CSTPushSession: switch code { case "NetStream.Publish.Start": - log.Infof("[%s] < R onStatus('NetStream.Publish.Start').", s.UniqueKey) + nazalog.Infof("[%s] < R onStatus('NetStream.Publish.Start').", s.UniqueKey) s.notifyDoResultSucc() default: - log.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code) + nazalog.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code) } case CSTPullSession: switch code { case "NetStream.Play.Start": - log.Infof("[%s] < R onStatus('NetStream.Play.Start').", s.UniqueKey) + nazalog.Infof("[%s] < R onStatus('NetStream.Play.Start').", s.UniqueKey) s.notifyDoResultSucc() default: - log.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code) + nazalog.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code) } } @@ -413,13 +415,13 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { } switch code { case "NetConnection.Connect.Success": - log.Infof("[%s] < R _result(\"NetConnection.Connect.Success\").", s.UniqueKey) - log.Infof("[%s] > W createStream().", s.UniqueKey) + nazalog.Infof("[%s] < R _result(\"NetConnection.Connect.Success\").", s.UniqueKey) + nazalog.Infof("[%s] > W createStream().", s.UniqueKey) if err := s.packer.writeCreateStream(s.conn); err != nil { return err } default: - log.Errorf("[%s] unknown code. code=%v", s.UniqueKey, code) + nazalog.Errorf("[%s] unknown code. code=%v", s.UniqueKey, code) } case tidClientCreateStream: err := stream.msg.readNull() @@ -430,21 +432,21 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error { if err != nil { return err } - log.Infof("[%s] < R _result().", s.UniqueKey) + nazalog.Infof("[%s] < R _result().", s.UniqueKey) switch s.t { case CSTPullSession: - log.Infof("[%s] > W play('%s').", s.UniqueKey, s.streamNameWithRawQuery()) + nazalog.Infof("[%s] > W play('%s').", s.UniqueKey, s.streamNameWithRawQuery()) if err := s.packer.writePlay(s.conn, s.streamNameWithRawQuery(), sid); err != nil { return err } case CSTPushSession: - log.Infof("[%s] > W publish('%s').", s.UniqueKey, s.streamNameWithRawQuery()) + nazalog.Infof("[%s] > W publish('%s').", s.UniqueKey, s.streamNameWithRawQuery()) if err := s.packer.writePublish(s.conn, s.appName(), s.streamNameWithRawQuery(), sid); err != nil { return err } } default: - log.Errorf("[%s] unknown tid. tid=%d", s.UniqueKey, tid) + nazalog.Errorf("[%s] unknown tid. tid=%d", s.UniqueKey, tid) } return nil } @@ -458,15 +460,15 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { switch stream.header.MsgTypeID { case base.RTMPTypeIDWinAckSize: s.peerWinAckSize = val - log.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey, s.peerWinAckSize) + nazalog.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey, s.peerWinAckSize) case base.RTMPTypeIDBandwidth: // TODO chef: 是否需要关注这个信令 - log.Debugf("[%s] < R Set Peer Bandwidth. ignore.", s.UniqueKey) + nazalog.Debugf("[%s] < R Set Peer Bandwidth. ignore.", s.UniqueKey) case base.RTMPTypeIDSetChunkSize: // composer内部会自动更新peer chunk size. - log.Infof("[%s] < R Set Chunk Size %d.", s.UniqueKey, val) + nazalog.Infof("[%s] < R Set Chunk Size %d.", s.UniqueKey, val) default: - log.Errorf("[%s] read unknown protocol control message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) + nazalog.Errorf("[%s] read unknown protocol control message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) } return nil } diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index 47d1439..ff3f6d5 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -10,7 +10,6 @@ package rtsp import ( "net" - "strings" "sync" "sync/atomic" @@ -35,8 +34,10 @@ type BaseCommandSession interface { type BaseInSessionObserver interface { OnRTPPacket(pkt rtprtcp.RTPPacket) - // @param asc: AAC AudioSpecificConfig,注意,如果不存在音频,则为nil - // @param vps: 视频为H264时为nil,视频为H265时不为nil + // @param asc: AAC AudioSpecificConfig,注意,如果不存在音频或音频不为AAC,则为nil + // @param vps, sps, pps 如果都为nil,则没有视频,如果sps, pps不为nil,则vps不为nil是H265,vps为nil是H264 + // + // 注意,4个参数可能同时为nil OnAVConfig(asc, vps, sps, pps []byte) // @param pkt: pkt结构体中字段含义见rtprtcp.OnAVPacket @@ -82,21 +83,21 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) s.sdpLogicCtx = sdpLogicCtx s.m.Unlock() - if isSupportType(s.sdpLogicCtx.AudioPayloadType) { - s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) + if s.sdpLogicCtx.IsAudioUnpackable() { + s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.GetAudioPayloadTypeBase(), s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) } else { - nazalog.Warnf("[%s] audio unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin) + nazalog.Warnf("[%s] audio unpacker not support for this type yet.", s.UniqueKey) } - if isSupportType(s.sdpLogicCtx.VideoPayloadType) { - s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) + if s.sdpLogicCtx.IsVideoUnpackable() { + s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.GetVideoPayloadTypeBase(), s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) } else { - nazalog.Warnf("[%s] video unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin) + nazalog.Warnf("[%s] video unpacker not support this type yet.", s.UniqueKey) } s.audioRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.AudioClockRate) s.videoRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.VideoClockRate) - if isSupportType(s.sdpLogicCtx.AudioPayloadType) && isSupportType(s.sdpLogicCtx.VideoPayloadType) { + if s.sdpLogicCtx.IsAudioUnpackable() && s.sdpLogicCtx.IsVideoUnpackable() { s.avPacketQueue = NewAVPacketQueue(s.onAVPacket) } @@ -109,17 +110,14 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) func (s *BaseInSession) SetObserver(observer BaseInSessionObserver) { s.observer = observer - // TODO chef: 这里的判断应该去掉 - if s.sdpLogicCtx.ASC != nil && s.sdpLogicCtx.SPS != nil { - s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS) - } + s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS) } func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { - if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { + if s.sdpLogicCtx.IsAudioURI(uri) { s.audioRTPConn = rtpConn s.audioRTCPConn = rtcpConn - } else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) { + } else if s.sdpLogicCtx.IsVideoURI(uri) { s.videoRTPConn = rtpConn s.videoRTCPConn = rtcpConn } else { @@ -133,11 +131,11 @@ func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDP } func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int) error { - if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { + if s.sdpLogicCtx.IsAudioURI(uri) { s.audioRTPChannel = rtpChannel s.audioRTCPChannel = rtcpChannel return nil - } else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) { + } else if s.sdpLogicCtx.IsVideoURI(uri) { s.videoRTPChannel = rtpChannel s.videoRTCPChannel = rtcpChannel return nil @@ -337,7 +335,7 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error { } packetType := int(b[1] & 0x7F) - if packetType != s.sdpLogicCtx.AudioPayloadTypeOrigin && packetType != s.sdpLogicCtx.VideoPayloadTypeOrigin { + if !s.sdpLogicCtx.IsPayloadTypeOrigin(packetType) { return ErrRTSP } @@ -352,16 +350,8 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error { pkt.Header = h pkt.Raw = b - switch packetType { - case s.sdpLogicCtx.VideoPayloadTypeOrigin: - s.videoSSRC = h.SSRC - s.observer.OnRTPPacket(pkt) - s.videoRRProducer.FeedRTPPacket(h.Seq) - - if s.videoUnpacker != nil { - s.videoUnpacker.Feed(pkt) - } - case s.sdpLogicCtx.AudioPayloadTypeOrigin: + // 接收数据时,保证了sdp的原始类型对应 + if s.sdpLogicCtx.IsAudioPayloadTypeOrigin(packetType) { s.audioSSRC = h.SSRC s.observer.OnRTPPacket(pkt) s.audioRRProducer.FeedRTPPacket(h.Seq) @@ -369,7 +359,15 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error { if s.audioUnpacker != nil { s.audioUnpacker.Feed(pkt) } - default: + } else if s.sdpLogicCtx.IsVideoPayloadTypeOrigin(packetType) { + s.videoSSRC = h.SSRC + s.observer.OnRTPPacket(pkt) + s.videoRRProducer.FeedRTPPacket(h.Seq) + + if s.videoUnpacker != nil { + s.videoUnpacker.Feed(pkt) + } + } else { // 因为前面已经判断过type了,所以永远不会走到这 } diff --git a/pkg/rtsp/client_pull_session.go b/pkg/rtsp/client_pull_session.go index facde4b..f1813eb 100644 --- a/pkg/rtsp/client_pull_session.go +++ b/pkg/rtsp/client_pull_session.go @@ -99,6 +99,8 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp // 如果没有错误发生,阻塞直到接收音视频数据的前一步,也即收到rtsp play response func (session *PullSession) Pull(rawURL string) error { + nazalog.Debugf("[%s] pull. url=%s", session.UniqueKey, rawURL) + var ( ctx context.Context cancel context.CancelFunc @@ -150,6 +152,10 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { return session.baseInSession.IsAlive() } +func (session *PullSession) GetSDP() ([]byte, sdp.LogicContext) { + return session.baseInSession.GetSDP() +} + func (session *PullSession) pullContext(ctx context.Context, rawURL string) error { errChan := make(chan error, 1) @@ -346,24 +352,27 @@ func (session *PullSession) writeDescribe() error { } func (session *PullSession) writeSetup() error { - if session.baseInSession.sdpLogicCtx.VideoAControl != "" { + if session.baseInSession.sdpLogicCtx.HasVideoAControl() { + uri := session.baseInSession.sdpLogicCtx.MakeVideoSetupURI(session.urlCtx.RawURLWithoutUserInfo) if session.option.OverTCP { - if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil { + if err := session.writeOneSetupTCP(uri); err != nil { return err } } else { - if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil { + if err := session.writeOneSetup(uri); err != nil { return err } } } - if session.baseInSession.sdpLogicCtx.AudioAControl != "" { + // can't else if + if session.baseInSession.sdpLogicCtx.HasAudioAControl() { + uri := session.baseInSession.sdpLogicCtx.MakeAudioSetupURI(session.urlCtx.RawURLWithoutUserInfo) if session.option.OverTCP { - if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil { + if err := session.writeOneSetupTCP(uri); err != nil { return err } } else { - if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil { + if err := session.writeOneSetup(uri); err != nil { return err } } @@ -371,8 +380,7 @@ func (session *PullSession) writeSetup() error { return nil } -func (session *PullSession) writeOneSetup(aControl string) error { - setupURI := makeSetupURI(session.urlCtx, aControl) +func (session *PullSession) writeOneSetup(setupURI string) error { rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2() if err != nil { return err @@ -423,8 +431,7 @@ func (session *PullSession) writeOneSetup(aControl string) error { return nil } -func (session *PullSession) writeOneSetupTCP(aControl string) error { - setupURI := makeSetupURI(session.urlCtx, aControl) +func (session *PullSession) writeOneSetupTCP(setupURI string) error { rtpChannel := session.channel rtcpChannel := session.channel + 1 session.channel += 2 diff --git a/pkg/rtsp/client_push_session.go b/pkg/rtsp/client_push_session.go new file mode 100644 index 0000000..038e3f5 --- /dev/null +++ b/pkg/rtsp/client_push_session.go @@ -0,0 +1,385 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp + +import ( + "context" + "fmt" + "net" + "strings" + "time" + + "github.com/q191201771/lal/pkg/rtprtcp" + "github.com/q191201771/lal/pkg/sdp" + "github.com/q191201771/naza/pkg/nazanet" + + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/connection" + "github.com/q191201771/naza/pkg/nazahttp" + "github.com/q191201771/naza/pkg/nazalog" +) + +// TODO chef: +// - 将push和pull中重复的代码抽象出来 +// - 将push和sub中重复的代码抽象出来 +// - push有的功能没实现,需要参考pull和sub + +const ( + pushReadBufSize = 256 +) + +type PushSessionOption struct { + PushTimeoutMS int + OverTCP bool +} + +var defaultPushSessionOption = PushSessionOption{ + PushTimeoutMS: 10000, + OverTCP: false, +} + +type PushSession struct { + UniqueKey string + option PushSessionOption + + CmdConn connection.Connection + + cseq int + sessionID string + channel int + + rawURL string + urlCtx base.URLContext + + waitErrChan chan error + + methodGetParameterSupported bool + + auth Auth + + rawSDP []byte + sdpLogicCtx sdp.LogicContext + + audioRTPConn *nazanet.UDPConnection + videoRTPConn *nazanet.UDPConnection + audioRTCPConn *nazanet.UDPConnection + videoRTCPConn *nazanet.UDPConnection +} + +type ModPushSessionOption func(option *PushSessionOption) + +func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { + option := defaultPushSessionOption + for _, fn := range modOptions { + fn(&option) + } + + uk := base.GenUniqueKey(base.UKPRTSPPushSession) + s := &PushSession{ + UniqueKey: uk, + option: option, + waitErrChan: make(chan error, 1), + } + nazalog.Infof("[%s] lifecycle new rtsp PushSession. session=%p", uk, s) + return s +} + +func (session *PushSession) Push(rawURL string, rawSDP []byte, sdpLogicCtx sdp.LogicContext) error { + nazalog.Debugf("[%s] push. url=%s", session.UniqueKey, rawURL) + + session.rawSDP = rawSDP + session.sdpLogicCtx = sdpLogicCtx + + var ( + ctx context.Context + cancel context.CancelFunc + ) + if session.option.PushTimeoutMS == 0 { + ctx, cancel = context.WithCancel(context.Background()) + } else { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PushTimeoutMS)*time.Millisecond) + } + defer cancel() + return session.pushContext(ctx, rawURL) +} + +func (session *PushSession) Wait() <-chan error { + return session.waitErrChan +} + +func (session *PushSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { + // 发送数据时,保证和sdp的原始类型对应 + t := int(packet.Header.PacketType) + if session.sdpLogicCtx.IsAudioPayloadTypeOrigin(t) { + if session.audioRTPConn != nil { + _ = session.audioRTPConn.Write(packet.Raw) + } + } else if session.sdpLogicCtx.IsVideoPayloadTypeOrigin(t) { + if session.videoRTPConn != nil { + _ = session.videoRTPConn.Write(packet.Raw) + } + } else { + nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", session.UniqueKey, t) + } +} + +func (session *PushSession) pushContext(ctx context.Context, rawURL string) error { + errChan := make(chan error, 1) + + go func() { + if err := session.connect(rawURL); err != nil { + errChan <- err + return + } + + if err := session.writeOptions(); err != nil { + errChan <- err + return + } + + if err := session.writeAnnounce(); err != nil { + errChan <- err + return + } + + if err := session.writeSetup(); err != nil { + errChan <- err + return + } + + if err := session.writeRecord(); err != nil { + errChan <- err + return + } + + }() + + return nil +} + +func (session *PushSession) connect(rawURL string) (err error) { + session.rawURL = rawURL + + session.urlCtx, err = base.ParseRTSPURL(rawURL) + if err != nil { + return err + } + + nazalog.Debugf("[%s] > tcp connect.", session.UniqueKey) + + // # 建立连接 + conn, err := net.Dial("tcp", session.urlCtx.HostWithPort) + if err != nil { + return err + } + session.CmdConn = connection.New(conn, func(option *connection.Option) { + option.ReadBufSize = pullReadBufSize + }) + + nazalog.Debugf("[%s] < tcp connect. laddr=%s", session.UniqueKey, conn.LocalAddr().String()) + + // TODO + return nil +} + +func (session *PushSession) writeOptions() error { + session.cseq++ + req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, "") + nazalog.Debugf("[%s] > write options.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode) + + session.handleOptionMethods(ctx) + if err := session.handleAuth(ctx); err != nil { + return err + } + + return nil +} + +func (session *PushSession) writeAnnounce() error { + session.cseq++ + auth := session.auth.MakeAuthorization(MethodDescribe, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestAnnounce(session.urlCtx.RawURLWithoutUserInfo, session.cseq, string(session.rawSDP), auth) + nazalog.Debugf("[%s] > write announce.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body)) + + return nil +} + +func (session *PushSession) writeSetup() error { + if session.sdpLogicCtx.HasVideoAControl() { + uri := session.sdpLogicCtx.MakeVideoSetupURI(session.urlCtx.RawURLWithoutUserInfo) + if session.option.OverTCP { + if err := session.writeOneSetupTCP(uri); err != nil { + return err + } + } else { + if err := session.writeOneSetup(uri); err != nil { + return err + } + } + } + // can't else if + if session.sdpLogicCtx.HasAudioAControl() { + uri := session.sdpLogicCtx.MakeAudioSetupURI(session.urlCtx.RawURLWithoutUserInfo) + if session.option.OverTCP { + if err := session.writeOneSetupTCP(uri); err != nil { + return err + } + } else { + if err := session.writeOneSetup(uri); err != nil { + return err + } + } + } + return nil +} + +func (session *PushSession) writeOneSetup(setupURI string) error { + rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2() + if err != nil { + return err + } + + session.cseq++ + auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestSetup(setupURI, session.cseq, int(rtpPort), int(rtcpPort), session.sessionID, auth) + nazalog.Debugf("[%s] > write setup.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx) + + session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0] + + srvRTPPort, srvRTCPPort, err := parseServerPort(ctx.Headers[HeaderFieldTransport]) + if err != nil { + return err + } + + rtpConn, err := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) { + option.Conn = rtpC + option.RAddr = net.JoinHostPort(session.urlCtx.Host, fmt.Sprintf("%d", srvRTPPort)) + option.MaxReadPacketSize = rtprtcp.MaxRTPRTCPPacketSize + }) + if err != nil { + return err + } + + rtcpConn, err := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) { + option.Conn = rtcpC + option.RAddr = net.JoinHostPort(session.urlCtx.Host, fmt.Sprintf("%d", srvRTCPPort)) + option.MaxReadPacketSize = rtprtcp.MaxRTPRTCPPacketSize + }) + if err != nil { + return err + } + + if err := session.setupWithConn(setupURI, rtpConn, rtcpConn); err != nil { + return err + } + + return nil +} + +func (session *PushSession) writeRecord() error { + session.cseq++ + auth := session.auth.MakeAuthorization(MethodRecord, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestRecord(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID, auth) + nazalog.Debugf("[%s] > write record.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body)) + + return nil +} + +func (session *PushSession) setupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { + if session.sdpLogicCtx.IsAudioURI(uri) { + session.audioRTPConn = rtpConn + session.audioRTCPConn = rtcpConn + } else if session.sdpLogicCtx.IsVideoURI(uri) { + session.videoRTPConn = rtpConn + session.videoRTCPConn = rtcpConn + } else { + return ErrRTSP + } + + go rtpConn.RunLoop(session.onReadUDPPacket) + go rtcpConn.RunLoop(session.onReadUDPPacket) + + return nil +} + +func (session *PushSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool { + // TODO chef: impl me + //nazalog.Errorf("[%s] SubSession::onReadUDPPacket. %s", s.UniqueKey, hex.Dump(b)) + return true +} + +func (session *PushSession) writeOneSetupTCP(setupURI string) error { + panic("not impl") + return nil +} + +func (session *PushSession) handleOptionMethods(ctx nazahttp.HTTPRespMsgCtx) { + methods := ctx.Headers["Public"] + if methods == "" { + return + } + + if strings.Contains(methods, MethodGetParameter) { + session.methodGetParameterSupported = true + } +} + +func (session *PushSession) handleAuth(ctx nazahttp.HTTPRespMsgCtx) error { + if ctx.Headers[HeaderWWWAuthenticate] == "" { + return nil + } + + session.auth.FeedWWWAuthenticate(ctx.Headers[HeaderWWWAuthenticate], session.urlCtx.Username, session.urlCtx.Password) + auth := session.auth.MakeAuthorization(MethodOptions, session.urlCtx.RawURLWithoutUserInfo) + + session.cseq++ + req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth) + nazalog.Debugf("[%s] > write options.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode) + return nil +} diff --git a/pkg/rtsp/pack.go b/pkg/rtsp/pack.go index 4b1d6fc..13fa0c5 100644 --- a/pkg/rtsp/pack.go +++ b/pkg/rtsp/pack.go @@ -104,20 +104,34 @@ func PackRequestOptions(uri string, cseq int, auth string) string { if auth != "" { headers[HeaderAuthorization] = auth } - return packRequest(MethodOptions, uri, headers) + return packRequest(MethodOptions, uri, headers, "") +} + +// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 +func PackRequestAnnounce(uri string, cseq int, sdp string, auth string) string { + headers := map[string]string{ + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderFieldContentLength: fmt.Sprintf("%d", len(sdp)), + HeaderAccept: HeaderAcceptApplicationSDP, + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodAnnounce, uri, headers, sdp) } // @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 func PackRequestDescribe(uri string, cseq int, auth string) string { headers := map[string]string{ - HeaderAccept: "application/sdp", + HeaderAccept: HeaderAcceptApplicationSDP, HeaderFieldCSeq: fmt.Sprintf("%d", cseq), HeaderUserAgent: base.LALRTSPPullSessionUA, } if auth != "" { headers[HeaderAuthorization] = auth } - return packRequest(MethodDescribe, uri, headers) + return packRequest(MethodDescribe, uri, headers, "") } // @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段 @@ -134,7 +148,7 @@ func PackRequestSetup(uri string, cseq int, rtpClientPort int, rtcpClientPort in if auth != "" { headers[HeaderAuthorization] = auth } - return packRequest(MethodSetup, uri, headers) + return packRequest(MethodSetup, uri, headers, "") } // @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段 @@ -151,7 +165,20 @@ func PackRequestSetupTCP(uri string, cseq int, rtpChannel int, rtcpChannel int, if auth != "" { headers[HeaderAuthorization] = auth } - return packRequest(MethodSetup, uri, headers) + return packRequest(MethodSetup, uri, headers, "") +} + +func PackRequestRecord(uri string, cseq int, sessionID string, auth string) string { + headers := map[string]string{ + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderFieldRange: "npt=0.000-", + HeaderFieldSession: sessionID, + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodRecord, uri, headers, "") } func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string { @@ -164,7 +191,7 @@ func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string if auth != "" { headers[HeaderAuthorization] = auth } - return packRequest(MethodPlay, uri, headers) + return packRequest(MethodPlay, uri, headers, "") } func PackRequestGetParameter(uri string, cseq int, sessionID string) string { @@ -215,11 +242,17 @@ func PackResponseTeardown(cseq string) string { return fmt.Sprintf(ResponseTeardownTmpl, cseq) } -func packRequest(method, uri string, headers map[string]string) (ret string) { +// @param body 可以为空 +func packRequest(method, uri string, headers map[string]string, body string) (ret string) { ret = method + " " + uri + " RTSP/1.0\r\n" for k, v := range headers { ret += k + ": " + v + "\r\n" } ret += "\r\n" + + if body != "" { + ret += body + } + return ret } diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index 71218a1..e606d6e 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -46,15 +46,19 @@ const ( MethodGetParameter = "GET_PARAMETER" ) +// TODO chef: 这里有的有Field,有的没有,命名需要统一一下 const ( - HeaderAccept = "Accept" - HeaderUserAgent = "User-Agent" - HeaderFieldCSeq = "CSeq" - HeaderFieldTransport = "Transport" - HeaderFieldSession = "Session" - HeaderFieldRange = "Range" - HeaderWWWAuthenticate = "WWW-Authenticate" - HeaderAuthorization = "Authorization" + HeaderAccept = "Accept" + HeaderUserAgent = "User-Agent" + HeaderFieldCSeq = "CSeq" + HeaderFieldTransport = "Transport" + HeaderFieldSession = "Session" + HeaderFieldRange = "Range" + HeaderFieldContentLength = "Content-Length" + HeaderWWWAuthenticate = "WWW-Authenticate" + HeaderAuthorization = "Authorization" + + HeaderAcceptApplicationSDP = "application/sdp" ) const ( @@ -166,18 +170,6 @@ func parseTransport(setupTransport string, key string) (first, second uint16, er return uint16(iFirst), uint16(iSecond), err } -func isSupportType(t base.AVPacketPT) bool { - switch t { - case base.AVPacketPTAAC: - fallthrough - case base.AVPacketPTAVC: - fallthrough - case base.AVPacketPTHEVC: - return true - } - return false -} - func makeSetupURI(urlCtx base.URLContext, aControl string) string { if strings.HasPrefix(aControl, "rtsp://") { return aControl diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index 3958cac..ee2dc44 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -10,7 +10,6 @@ package rtsp import ( "net" - "strings" "sync/atomic" "time" @@ -76,10 +75,10 @@ func (s *SubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) { } func (s *SubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error { - if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { + if s.sdpLogicCtx.IsAudioURI(uri) { s.audioRTPConn = rtpConn s.audioRTCPConn = rtcpConn - } else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) { + } else if s.sdpLogicCtx.IsVideoURI(uri) { s.videoRTPConn = rtpConn s.videoRTCPConn = rtcpConn } else { @@ -95,11 +94,11 @@ func (s *SubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPCon func (s *SubSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error { s.stat.RemoteAddr = remoteAddr - if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { + if s.sdpLogicCtx.IsAudioURI(uri) { s.audioRTPChannel = rtpChannel s.audioRTCPChannel = rtcpChannel return nil - } else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) { + } else if s.sdpLogicCtx.IsVideoURI(uri) { s.videoRTPChannel = rtpChannel s.videoRTCPChannel = rtcpChannel return nil @@ -176,23 +175,24 @@ func (s *SubSession) RawQuery() string { func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { atomic.AddUint64(&s.currConnStat.WroteBytesSum, uint64(len(packet.Raw))) - switch packet.Header.PacketType { - case uint8(s.sdpLogicCtx.VideoPayloadTypeOrigin): - if s.videoRTPConn != nil { - _ = s.videoRTPConn.Write(packet.Raw) - } - if s.videoRTPChannel != -1 { - _ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw) - } - case uint8(s.sdpLogicCtx.AudioPayloadTypeOrigin): + // 发送数据时,保证和sdp的原始类型对应 + t := int(packet.Header.PacketType) + if s.sdpLogicCtx.IsAudioPayloadTypeOrigin(t) { if s.audioRTPConn != nil { _ = s.audioRTPConn.Write(packet.Raw) } if s.audioRTPChannel != -1 { _ = s.cmdSession.Write(s.audioRTPChannel, packet.Raw) } - default: - nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", s.UniqueKey, packet.Header.PacketType) + } else if s.sdpLogicCtx.IsVideoPayloadTypeOrigin(t) { + if s.videoRTPConn != nil { + _ = s.videoRTPConn.Write(packet.Raw) + } + if s.videoRTPChannel != -1 { + _ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw) + } + } else { + nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", s.UniqueKey, t) } } diff --git a/pkg/sdp/avconfig.go b/pkg/sdp/avconfig.go new file mode 100644 index 0000000..4343f06 --- /dev/null +++ b/pkg/sdp/avconfig.go @@ -0,0 +1,92 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package sdp + +import ( + "encoding/base64" + "strconv" + "strings" + + "github.com/q191201771/lal/pkg/base" +) + +func ParseASC(a *AFmtPBase) ([]byte, error) { + if a.Format != base.RTPPacketTypeAAC { + return nil, ErrSDP + } + + v, ok := a.Parameters["config"] + if !ok { + return nil, ErrSDP + } + if len(v) < 4 || (len(v)%2) != 0 { + return nil, ErrSDP + } + l := len(v) / 2 + r := make([]byte, l) + for i := 0; i < l; i++ { + b, err := strconv.ParseInt(v[i*2:i*2+2], 16, 0) + if err != nil { + return nil, ErrSDP + } + r[i] = uint8(b) + } + + return r, nil +} + +func ParseVPSSPSPPS(a *AFmtPBase) (vps, sps, pps []byte, err error) { + v, ok := a.Parameters["sprop-vps"] + if !ok { + return nil, nil, nil, ErrSDP + } + if vps, err = base64.StdEncoding.DecodeString(v); err != nil { + return nil, nil, nil, err + } + + v, ok = a.Parameters["sprop-sps"] + if !ok { + return nil, nil, nil, ErrSDP + } + if sps, err = base64.StdEncoding.DecodeString(v); err != nil { + return nil, nil, nil, err + } + + v, ok = a.Parameters["sprop-pps"] + if !ok { + return nil, nil, nil, ErrSDP + } + if pps, err = base64.StdEncoding.DecodeString(v); err != nil { + return nil, nil, nil, err + } + + return +} + +// 解析AVC/H264的sps,pps +// 例子见单元测试 +func ParseSPSPPS(a *AFmtPBase) (sps, pps []byte, err error) { + v, ok := a.Parameters["sprop-parameter-sets"] + if !ok { + return nil, nil, ErrSDP + } + + items := strings.SplitN(v, ",", 2) + if len(items) != 2 { + return nil, nil, ErrSDP + } + + sps, err = base64.StdEncoding.DecodeString(items[0]) + if err != nil { + return nil, nil, ErrSDP + } + + pps, err = base64.StdEncoding.DecodeString(items[1]) + return +} diff --git a/pkg/sdp/logic.go b/pkg/sdp/logic.go new file mode 100644 index 0000000..6fd741a --- /dev/null +++ b/pkg/sdp/logic.go @@ -0,0 +1,165 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package sdp + +import ( + "fmt" + "strings" + + "github.com/q191201771/naza/pkg/nazalog" + + "github.com/q191201771/lal/pkg/base" +) + +type LogicContext struct { + AudioClockRate int + VideoClockRate int + + audioPayloadTypeBase base.AVPacketPT // lal内部定义的类型 + videoPayloadTypeBase base.AVPacketPT + + audioPayloadTypeOrigin int // 原始类型,sdp或rtp中的类型 + videoPayloadTypeOrigin int + audioAControl string + videoAControl string + + ASC []byte + VPS []byte + SPS []byte + PPS []byte + + // 没有用上的 + hasAudio bool + hasVideo bool +} + +func (lc *LogicContext) IsAudioPayloadTypeOrigin(t int) bool { + return lc.audioPayloadTypeOrigin == t +} + +func (lc *LogicContext) IsVideoPayloadTypeOrigin(t int) bool { + return lc.videoPayloadTypeOrigin == t +} + +func (lc *LogicContext) IsPayloadTypeOrigin(t int) bool { + return lc.audioPayloadTypeOrigin == t || lc.videoPayloadTypeOrigin == t +} + +func (lc *LogicContext) IsAudioUnpackable() bool { + return lc.audioPayloadTypeBase == base.AVPacketPTAAC +} + +func (lc *LogicContext) IsVideoUnpackable() bool { + return lc.videoPayloadTypeBase == base.AVPacketPTAVC || + lc.videoPayloadTypeBase == base.AVPacketPTHEVC +} + +func (lc *LogicContext) IsAudioURI(uri string) bool { + return lc.audioAControl != "" && strings.HasSuffix(uri, lc.audioAControl) +} + +func (lc *LogicContext) IsVideoURI(uri string) bool { + return lc.videoAControl != "" && strings.HasSuffix(uri, lc.videoAControl) +} + +func (lc *LogicContext) HasAudioAControl() bool { + return lc.audioAControl != "" +} + +func (lc *LogicContext) HasVideoAControl() bool { + return lc.videoAControl != "" +} + +func (lc *LogicContext) MakeAudioSetupURI(uri string) string { + return lc.makeSetupURI(uri, lc.audioAControl) +} + +func (lc *LogicContext) MakeVideoSetupURI(uri string) string { + return lc.makeSetupURI(uri, lc.videoAControl) +} + +func (lc *LogicContext) GetAudioPayloadTypeBase() base.AVPacketPT { + return lc.audioPayloadTypeBase +} + +func (lc *LogicContext) GetVideoPayloadTypeBase() base.AVPacketPT { + return lc.videoPayloadTypeBase +} + +func (lc *LogicContext) makeSetupURI(uri string, aControl string) string { + if strings.HasPrefix(aControl, "rtsp://") { + return aControl + } + return fmt.Sprintf("%s/%s", uri, aControl) +} + +func ParseSDP2LogicContext(b []byte) (LogicContext, error) { + var ret LogicContext + + c, err := ParseSDP2RawContext(b) + if err != nil { + return ret, err + } + + for _, md := range c.MediaDescList { + switch md.M.Media { + case "audio": + ret.hasAudio = true + ret.AudioClockRate = md.ARTPMap.ClockRate + ret.audioAControl = md.AControl.Value + + ret.audioPayloadTypeOrigin = md.ARTPMap.PayloadType + if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC { + ret.audioPayloadTypeBase = base.AVPacketPTAAC + if md.AFmtPBase != nil { + ret.ASC, err = ParseASC(md.AFmtPBase) + if err != nil { + return ret, err + } + } else { + nazalog.Warnf("aac afmtp not exist.") + } + } else { + ret.audioPayloadTypeBase = base.AVPacketPTUnknown + } + case "video": + ret.hasVideo = true + ret.VideoClockRate = md.ARTPMap.ClockRate + ret.videoAControl = md.AControl.Value + + ret.videoPayloadTypeOrigin = md.ARTPMap.PayloadType + switch md.ARTPMap.EncodingName { + case ARTPMapEncodingNameH264: + ret.videoPayloadTypeBase = base.AVPacketPTAVC + if md.AFmtPBase != nil { + ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtPBase) + if err != nil { + return ret, err + } + } else { + nazalog.Warnf("avc afmtp not exist.") + } + case ARTPMapEncodingNameH265: + ret.videoPayloadTypeBase = base.AVPacketPTHEVC + if md.AFmtPBase != nil { + ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtPBase) + if err != nil { + return ret, err + } + } else { + nazalog.Warnf("hevc afmtp not exist.") + } + default: + ret.videoPayloadTypeBase = base.AVPacketPTUnknown + } + } + } + + return ret, nil +} diff --git a/pkg/sdp/raw.go b/pkg/sdp/raw.go new file mode 100644 index 0000000..0a6205a --- /dev/null +++ b/pkg/sdp/raw.go @@ -0,0 +1,203 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package sdp + +import ( + "strconv" + "strings" +) + +type RawContext struct { + MediaDescList []MediaDesc +} + +type MediaDesc struct { + M M + ARTPMap ARTPMap + AFmtPBase *AFmtPBase + AControl AControl +} + +type M struct { + Media string +} + +type ARTPMap struct { + PayloadType int + EncodingName string + ClockRate int + EncodingParameters string +} + +type AFmtPBase struct { + Format int // same as PayloadType + Parameters map[string]string // name -> value +} + +type AControl struct { + Value string +} + +// 例子见单元测试 +func ParseSDP2RawContext(b []byte) (RawContext, error) { + var ( + sdpCtx RawContext + md *MediaDesc + ) + + s := string(b) + lines := strings.Split(s, "\r\n") + for _, line := range lines { + if strings.HasPrefix(line, "m=") { + m, err := ParseM(line) + if err != nil { + return sdpCtx, err + } + if md != nil { + sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) + } + md = &MediaDesc{ + M: m, + } + } + if strings.HasPrefix(line, "a=rtpmap") { + aRTPMap, err := ParseARTPMap(line) + if err != nil { + return sdpCtx, err + } + if md == nil { + continue + } + md.ARTPMap = aRTPMap + } + if strings.HasPrefix(line, "a=fmtp") { + aFmtPBase, err := ParseAFmtPBase(line) + if err != nil { + return sdpCtx, err + } + if md == nil { + continue + } + md.AFmtPBase = &aFmtPBase + } + if strings.HasPrefix(line, "a=control") { + aControl, err := ParseAControl(line) + if err != nil { + return sdpCtx, err + } + if md == nil { + continue + } + md.AControl = aControl + } + } + if md != nil { + sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) + } + + return sdpCtx, nil +} + +func ParseM(s string) (ret M, err error) { + ss := strings.TrimPrefix(s, "m=") + items := strings.Split(ss, " ") + if len(items) < 1 { + return ret, ErrSDP + } + ret.Media = items[0] + return +} + +// 例子见单元测试 +func ParseARTPMap(s string) (ret ARTPMap, err error) { + // rfc 3640 3.3.1. General + // rfc 3640 3.3.6. High Bit-rate AAC + // + // a=rtpmap: /[/] + // + + items := strings.SplitN(s, ":", 2) + if len(items) != 2 { + err = ErrSDP + return + } + items = strings.SplitN(items[1], " ", 2) + if len(items) != 2 { + err = ErrSDP + return + } + ret.PayloadType, err = strconv.Atoi(items[0]) + if err != nil { + return + } + items = strings.SplitN(items[1], "/", 3) + switch len(items) { + case 3: + ret.EncodingParameters = items[2] + fallthrough + case 2: + ret.EncodingName = items[0] + ret.ClockRate, err = strconv.Atoi(items[1]) + if err != nil { + return + } + default: + err = ErrSDP + } + return +} + +// 例子见单元测试 +func ParseAFmtPBase(s string) (ret AFmtPBase, err error) { + // rfc 3640 4.4.1. The a=fmtp Keyword + // + // a=fmtp: =[; =] + // + + ret.Parameters = make(map[string]string) + + items := strings.SplitN(s, ":", 2) + if len(items) != 2 { + err = ErrSDP + return + } + + items = strings.SplitN(items[1], " ", 2) + if len(items) != 2 { + err = ErrSDP + return + } + + ret.Format, err = strconv.Atoi(items[0]) + if err != nil { + return + } + + items = strings.Split(items[1], ";") + for _, pp := range items { + pp = strings.TrimSpace(pp) + kv := strings.SplitN(pp, "=", 2) + if len(kv) != 2 { + err = ErrSDP + return + } + ret.Parameters[kv[0]] = kv[1] + } + + return +} + +func ParseAControl(s string) (ret AControl, err error) { + if !strings.HasPrefix(s, "a=control:") { + err = ErrSDP + return + } + ret.Value = strings.TrimPrefix(s, "a=control:") + return +} diff --git a/pkg/sdp/sdp.go b/pkg/sdp/sdp.go index b57faaa..836eef8 100644 --- a/pkg/sdp/sdp.go +++ b/pkg/sdp/sdp.go @@ -9,12 +9,7 @@ package sdp import ( - "encoding/base64" "errors" - "strconv" - "strings" - - "github.com/q191201771/lal/pkg/base" ) // rfc4566 @@ -26,337 +21,3 @@ const ( ARTPMapEncodingNameH264 = "H264" ARTPMapEncodingNameAAC = "MPEG4-GENERIC" ) - -type LogicContext struct { - HasAudio bool - HasVideo bool - AudioClockRate int - VideoClockRate int - AudioPayloadTypeOrigin int - VideoPayloadTypeOrigin int - AudioPayloadType base.AVPacketPT - VideoPayloadType base.AVPacketPT - AudioAControl string - VideoAControl string - ASC []byte - VPS []byte - SPS []byte - PPS []byte -} - -type MediaDesc struct { - M M - ARTPMap ARTPMap - AFmtBase AFmtPBase - AControl AControl -} - -type RawContext struct { - MediaDescList []MediaDesc -} - -type M struct { - Media string -} - -type ARTPMap struct { - PayloadType int - EncodingName string - ClockRate int - EncodingParameters string -} - -type AFmtPBase struct { - Format int // same as PayloadType - Parameters map[string]string // name -> value -} - -type AControl struct { - Value string -} - -func ParseSDP2LogicContext(b []byte) (LogicContext, error) { - var ret LogicContext - - c, err := ParseSDP2RawContext(b) - if err != nil { - return ret, err - } - - for _, md := range c.MediaDescList { - switch md.M.Media { - case "audio": - ret.HasAudio = true - ret.AudioClockRate = md.ARTPMap.ClockRate - ret.AudioAControl = md.AControl.Value - - ret.AudioPayloadTypeOrigin = md.ARTPMap.PayloadType - if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC { - ret.AudioPayloadType = base.AVPacketPTAAC - ret.ASC, err = ParseASC(md.AFmtBase) - if err != nil { - return ret, err - } - } else { - ret.AudioPayloadType = base.AVPacketPTUnknown - } - case "video": - ret.HasVideo = true - ret.VideoClockRate = md.ARTPMap.ClockRate - ret.VideoAControl = md.AControl.Value - - ret.VideoPayloadTypeOrigin = md.ARTPMap.PayloadType - switch md.ARTPMap.EncodingName { - case ARTPMapEncodingNameH264: - ret.VideoPayloadType = base.AVPacketPTAVC - ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtBase) - if err != nil { - return ret, err - } - case ARTPMapEncodingNameH265: - ret.VideoPayloadType = base.AVPacketPTHEVC - ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtBase) - if err != nil { - return ret, err - } - default: - ret.VideoPayloadType = base.AVPacketPTUnknown - } - } - } - - return ret, nil -} - -// 例子见单元测试 -func ParseSDP2RawContext(b []byte) (RawContext, error) { - var ( - sdpCtx RawContext - md *MediaDesc - ) - - s := string(b) - lines := strings.Split(s, "\r\n") - for _, line := range lines { - if strings.HasPrefix(line, "m=") { - m, err := ParseM(line) - if err != nil { - return sdpCtx, err - } - if md != nil { - sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) - } - md = &MediaDesc{ - M: m, - } - } - if strings.HasPrefix(line, "a=rtpmap") { - aRTPMap, err := ParseARTPMap(line) - if err != nil { - return sdpCtx, err - } - if md == nil { - continue - } - md.ARTPMap = aRTPMap - } - if strings.HasPrefix(line, "a=fmtp") { - aFmtPBase, err := ParseAFmtPBase(line) - if err != nil { - return sdpCtx, err - } - if md == nil { - continue - } - md.AFmtBase = aFmtPBase - } - if strings.HasPrefix(line, "a=control") { - aControl, err := ParseAControl(line) - if err != nil { - return sdpCtx, err - } - if md == nil { - continue - } - md.AControl = aControl - } - } - if md != nil { - sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) - } - - return sdpCtx, nil -} - -func ParseM(s string) (ret M, err error) { - ss := strings.TrimPrefix(s, "m=") - items := strings.Split(ss, " ") - if len(items) < 1 { - return ret, ErrSDP - } - ret.Media = items[0] - return -} - -// 例子见单元测试 -func ParseARTPMap(s string) (ret ARTPMap, err error) { - // rfc 3640 3.3.1. General - // rfc 3640 3.3.6. High Bit-rate AAC - // - // a=rtpmap: /[/] - // - - items := strings.SplitN(s, ":", 2) - if len(items) != 2 { - err = ErrSDP - return - } - items = strings.SplitN(items[1], " ", 2) - if len(items) != 2 { - err = ErrSDP - return - } - ret.PayloadType, err = strconv.Atoi(items[0]) - if err != nil { - return - } - items = strings.SplitN(items[1], "/", 3) - switch len(items) { - case 3: - ret.EncodingParameters = items[2] - fallthrough - case 2: - ret.EncodingName = items[0] - ret.ClockRate, err = strconv.Atoi(items[1]) - if err != nil { - return - } - default: - err = ErrSDP - } - return -} - -// 例子见单元测试 -func ParseAFmtPBase(s string) (ret AFmtPBase, err error) { - // rfc 3640 4.4.1. The a=fmtp Keyword - // - // a=fmtp: =[; =] - // - - ret.Parameters = make(map[string]string) - - items := strings.SplitN(s, ":", 2) - if len(items) != 2 { - err = ErrSDP - return - } - - items = strings.SplitN(items[1], " ", 2) - if len(items) != 2 { - err = ErrSDP - return - } - - ret.Format, err = strconv.Atoi(items[0]) - if err != nil { - return - } - - items = strings.Split(items[1], ";") - for _, pp := range items { - pp = strings.TrimSpace(pp) - kv := strings.SplitN(pp, "=", 2) - if len(kv) != 2 { - err = ErrSDP - return - } - ret.Parameters[kv[0]] = kv[1] - } - - return -} - -func ParseAControl(s string) (ret AControl, err error) { - if !strings.HasPrefix(s, "a=control:") { - err = ErrSDP - return - } - ret.Value = strings.TrimPrefix(s, "a=control:") - return -} - -func ParseASC(a AFmtPBase) ([]byte, error) { - if a.Format != base.RTPPacketTypeAAC { - return nil, ErrSDP - } - - v, ok := a.Parameters["config"] - if !ok { - return nil, ErrSDP - } - if len(v) < 4 || (len(v)%2) != 0 { - return nil, ErrSDP - } - l := len(v) / 2 - r := make([]byte, l) - for i := 0; i < l; i++ { - b, err := strconv.ParseInt(v[i*2:i*2+2], 16, 0) - if err != nil { - return nil, ErrSDP - } - r[i] = uint8(b) - } - - return r, nil -} - -func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) { - v, ok := a.Parameters["sprop-vps"] - if !ok { - return nil, nil, nil, ErrSDP - } - if vps, err = base64.StdEncoding.DecodeString(v); err != nil { - return nil, nil, nil, err - } - - v, ok = a.Parameters["sprop-sps"] - if !ok { - return nil, nil, nil, ErrSDP - } - if sps, err = base64.StdEncoding.DecodeString(v); err != nil { - return nil, nil, nil, err - } - - v, ok = a.Parameters["sprop-pps"] - if !ok { - return nil, nil, nil, ErrSDP - } - if pps, err = base64.StdEncoding.DecodeString(v); err != nil { - return nil, nil, nil, err - } - - return -} - -// 解析AVC/H264的sps,pps -// 例子见单元测试 -func ParseSPSPPS(a AFmtPBase) (sps, pps []byte, err error) { - v, ok := a.Parameters["sprop-parameter-sets"] - if !ok { - return nil, nil, ErrSDP - } - - items := strings.SplitN(v, ",", 2) - if len(items) != 2 { - return nil, nil, ErrSDP - } - - sps, err = base64.StdEncoding.DecodeString(items[0]) - if err != nil { - return nil, nil, ErrSDP - } - - pps, err = base64.StdEncoding.DecodeString(items[1]) - return -} diff --git a/pkg/sdp/sdp_test.go b/pkg/sdp/sdp_test.go index 6718355..7404227 100644 --- a/pkg/sdp/sdp_test.go +++ b/pkg/sdp/sdp_test.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package sdp_test +package sdp import ( "encoding/hex" @@ -15,8 +15,6 @@ import ( "github.com/q191201771/lal/pkg/base" - "github.com/q191201771/lal/pkg/sdp" - "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/assert" @@ -48,13 +46,13 @@ var goldenPPS = []byte{ } func TestParseSDP2RawContext(t *testing.T) { - sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP)) + sdpCtx, err := ParseSDP2RawContext([]byte(goldenSDP)) assert.Equal(t, nil, err) nazalog.Debugf("sdp=%+v", sdpCtx) } func TestParseARTPMap(t *testing.T) { - golden := map[string]sdp.ARTPMap{ + golden := map[string]ARTPMap{ "rtpmap:96 H264/90000": { PayloadType: 96, EncodingName: "H264", @@ -75,14 +73,14 @@ func TestParseARTPMap(t *testing.T) { }, } for in, out := range golden { - actual, err := sdp.ParseARTPMap(in) + actual, err := ParseARTPMap(in) assert.Equal(t, nil, err) assert.Equal(t, out, actual) } } func TestParseFmtPBase(t *testing.T) { - golden := map[string]sdp.AFmtPBase{ + golden := map[string]AFmtPBase{ "a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020": { Format: 96, Parameters: map[string]string{ @@ -112,7 +110,7 @@ func TestParseFmtPBase(t *testing.T) { }, } for in, out := range golden { - actual, err := sdp.ParseAFmtPBase(in) + actual, err := ParseAFmtPBase(in) assert.Equal(t, nil, err) assert.Equal(t, out, actual) } @@ -120,9 +118,9 @@ func TestParseFmtPBase(t *testing.T) { func TestParseSPSPPS(t *testing.T) { s := "a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020" - f, err := sdp.ParseAFmtPBase(s) + f, err := ParseAFmtPBase(s) assert.Equal(t, nil, err) - sps, pps, err := sdp.ParseSPSPPS(f) + sps, pps, err := ParseSPSPPS(&f) assert.Equal(t, nil, err) assert.Equal(t, goldenSPS, sps) assert.Equal(t, goldenPPS, pps) @@ -130,9 +128,9 @@ func TestParseSPSPPS(t *testing.T) { func TestParseASC(t *testing.T) { s := "a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210" - f, err := sdp.ParseAFmtPBase(s) + f, err := ParseAFmtPBase(s) assert.Equal(t, nil, err) - asc, err := sdp.ParseASC(f) + asc, err := ParseASC(&f) assert.Equal(t, nil, err) assert.Equal(t, []byte{0x12, 0x10}, asc) } @@ -143,9 +141,9 @@ func TestParseASC(t *testing.T) { //[]byte{0x44, 0x01, 0xc1, 0x72, 0xb4, 0x62, 0x40} func TestParseVPSSPSPPS(t *testing.T) { s := "a=fmtp:96 sprop-vps=QAEMAf//AWAAAAMAkAAAAwAAAwA/ugJA; sprop-sps=QgEBAWAAAAMAkAAAAwAAAwA/oAUCAXHy5bpKTC8BAQAAAwABAAADAA8I; sprop-pps=RAHAc8GJ" - f, err := sdp.ParseAFmtPBase(s) + f, err := ParseAFmtPBase(s) assert.Equal(t, nil, err) - vps, sps, pps, err := sdp.ParseVPSSPSPPS(f) + vps, sps, pps, err := ParseVPSSPSPPS(&f) assert.Equal(t, nil, err) nazalog.Debugf("%s", hex.Dump(vps)) nazalog.Debugf("%s", hex.Dump(sps)) @@ -153,18 +151,18 @@ func TestParseVPSSPSPPS(t *testing.T) { } func TestParseSDP2LogicContext(t *testing.T) { - ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP)) + ctx, err := ParseSDP2LogicContext([]byte(goldenSDP)) assert.Equal(t, nil, err) - assert.Equal(t, true, ctx.HasAudio) - assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, true, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) assert.Equal(t, 44100, ctx.AudioClockRate) assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin) - assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) - assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) - assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) - assert.Equal(t, "streamid=1", ctx.AudioAControl) - assert.Equal(t, "streamid=0", ctx.VideoAControl) + assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(97)) + assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96)) + assert.Equal(t, base.AVPacketPTAAC, ctx.GetAudioPayloadTypeBase()) + assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase()) + assert.Equal(t, "streamid=1", ctx.audioAControl) + assert.Equal(t, "streamid=0", ctx.videoAControl) assert.IsNotNil(t, ctx.ASC) assert.Equal(t, nil, ctx.VPS) assert.IsNotNil(t, ctx.SPS) @@ -188,14 +186,14 @@ a=rtpmap:98 H265/90000 a=fmtp:98 profile-id=1;sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBaoAWCAJBY2uSTL5A=;sprop-pps=RAHA8vA8kA==;sprop-vps=QAEMAf//AWAAAAMAsAAAAwAAAwBarAk= a=recvonly` golden = strings.ReplaceAll(golden, "\n", "\r\n") - ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + ctx, err := ParseSDP2LogicContext([]byte(golden)) assert.Equal(t, nil, err) - assert.Equal(t, false, ctx.HasAudio) - assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, false, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, 98, ctx.VideoPayloadTypeOrigin) - assert.Equal(t, base.AVPacketPTHEVC, ctx.VideoPayloadType) - assert.Equal(t, "trackID=0", ctx.VideoAControl) + assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(98)) + assert.Equal(t, base.AVPacketPTHEVC, ctx.GetVideoPayloadTypeBase()) + assert.Equal(t, "trackID=0", ctx.videoAControl) assert.Equal(t, nil, ctx.ASC) assert.IsNotNil(t, ctx.VPS) assert.IsNotNil(t, ctx.SPS) @@ -225,18 +223,18 @@ a=rtpmap:97 MPEG4-GENERIC/48000 a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1188 a=recvonly` golden = strings.ReplaceAll(golden, "\n", "\r\n") - ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + ctx, err := ParseSDP2LogicContext([]byte(golden)) assert.Equal(t, nil, err) - assert.Equal(t, true, ctx.HasAudio) - assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, true, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) assert.Equal(t, 48000, ctx.AudioClockRate) assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin) - assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) - assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) - assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) - assert.Equal(t, "trackID=1", ctx.AudioAControl) - assert.Equal(t, "trackID=0", ctx.VideoAControl) + assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(97)) + assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96)) + assert.Equal(t, base.AVPacketPTAAC, ctx.GetAudioPayloadTypeBase()) + assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase()) + assert.Equal(t, "trackID=1", ctx.audioAControl) + assert.Equal(t, "trackID=0", ctx.videoAControl) assert.IsNotNil(t, ctx.ASC) assert.Equal(t, nil, ctx.VPS) assert.IsNotNil(t, ctx.SPS) @@ -269,18 +267,18 @@ b=AS:10 a=Media_header:MEDIAINFO=494D4B48020100000400000111710110401F000000FA000000000000000000000000000000000000; a=appversion:1.0` golden = strings.ReplaceAll(golden, "\n", "\r\n") - ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + ctx, err := ParseSDP2LogicContext([]byte(golden)) assert.Equal(t, nil, err) - assert.Equal(t, true, ctx.HasAudio) - assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, true, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) assert.Equal(t, 8000, ctx.AudioClockRate) assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, 8, ctx.AudioPayloadTypeOrigin) - assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) + assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(8)) + assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96)) //assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) - assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) - assert.Equal(t, "trackID=audio", ctx.AudioAControl) - assert.Equal(t, "trackID=video", ctx.VideoAControl) + assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase()) + assert.Equal(t, "trackID=audio", ctx.audioAControl) + assert.Equal(t, "trackID=video", ctx.videoAControl) assert.Equal(t, nil, ctx.ASC) assert.Equal(t, nil, ctx.VPS) assert.IsNotNil(t, ctx.SPS) @@ -305,16 +303,60 @@ a=rtpmap:107 vnd.onvif.metadata/90000 a=fmtp:107 DecoderTag=h3c-v3 RTCP=0 a=recvonly` golden = strings.ReplaceAll(golden, "\n", "\r\n") - ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + ctx, err := ParseSDP2LogicContext([]byte(golden)) assert.Equal(t, nil, err) - assert.Equal(t, false, ctx.HasAudio) - assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, false, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, 105, ctx.VideoPayloadTypeOrigin) - assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) - assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.VideoAControl) + assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(105)) + assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase()) + assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.videoAControl) assert.Equal(t, nil, ctx.VPS) assert.IsNotNil(t, ctx.SPS) assert.IsNotNil(t, ctx.PPS) nazalog.Debugf("%+v", ctx) } + +func TestCase6(t *testing.T) { + golden := `v=0 +o=- 1109162014219182 0 IN IP4 0.0.0.0 +s=HIK Media Server V3.4.96 +i=HIK Media Server Session Description : standard +e=NONE +c=IN IP4 0.0.0.0 +t=0 0 +a=control:* +b=AS:2058 +a=range:npt=now- +m=video 0 RTP/AVP 96 +i=Video Media +a=rtpmap:96 H265/90000 +a=control:trackID=video +b=AS:2048 +m=audio 0 RTP/AVP 8 +i=Audio Media +a=rtpmap:8 PCMA/8000 +a=control:trackID=audio +b=AS:10 +a=Media_header:MEDIAINFO=494D4B48020100000400050011710110401F000000FA000000000000000000000000000000000000; +a=appversion:1.0` + + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + assert.Equal(t, 8000, ctx.AudioClockRate) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, base.AVPacketPTUnknown, ctx.audioPayloadTypeBase) + assert.Equal(t, base.AVPacketPTHEVC, ctx.videoPayloadTypeBase) + assert.Equal(t, 8, ctx.audioPayloadTypeOrigin) + assert.Equal(t, 96, ctx.videoPayloadTypeOrigin) + assert.Equal(t, "trackID=audio", ctx.audioAControl) + assert.Equal(t, "trackID=video", ctx.videoAControl) + assert.Equal(t, nil, ctx.ASC) + assert.Equal(t, nil, ctx.VPS) + assert.Equal(t, nil, ctx.SPS) + assert.Equal(t, nil, ctx.PPS) + assert.Equal(t, true, ctx.hasAudio) + assert.Equal(t, true, ctx.hasVideo) + nazalog.Debugf("%+v", ctx) +}