messages:

- [feat] rtsp,实现PushSession
- [feat] demo,新增app/demo/pullrtsp2pushrtsp,可拉取rtsp流,并使用rtsp转推出去
- [refactor] rtsp,重构部分逻辑,聚合至sdp.LogicContext中
pull/44/head
q191201771 4 years ago
parent 7dcd4a6b73
commit 766573741e

@ -53,7 +53,7 @@
- [x] **HTTP文件服务器**。比如HLS切片文件可直接播放不需要额外的HTTP文件服务器
- [x] **秒开播放**。GOP缓冲
除了lalserver还提供一些基于lal开发的demo [《lal/app/demo》](https://github.com/q191201771/lal/blob/master/app/demo/README.md)
除了lalserver还提供一些基于lal开发的demo(比如客户端程序) [《lal/app/demo》](https://github.com/q191201771/lal/blob/master/app/demo/README.md)
<img alt="Wide" src="https://pengrl.com/images/other/lalmodule.jpg?date=0829">

@ -1,13 +1,14 @@
`/app/demo`示例程序功能简介:
| demo | push rtmp | pull rtmp | pull httpflv | pull rtsp | 说明 |
| - | - | - | - | - | - |
| pushrtmp | ✔ | . | . | . | RTMP推流客户端压力测试工具 |
| pullrtmp | . | ✔ | . | . | RTMP拉流客户端压力测试工具 |
| pullrtmp2hls | . | ✔ | . | . | 从远端服务器拉取RTMP流存储为本地m3u8+ts文件 |
| pullhttpflv | . | . | ✔ | . | HTTP-FLV拉流客户端 |
| pullrtsp | . | . | . | ✔ | RTSP拉流客户端 |
| pullrtsp2pushrtmp | ✔ | . | . | ✔ | RTSP拉流并使用RTMP转推出去 |
| analyseflv | . | . | ✔ | . | 拉取HTTP-FLV流并进行分析 |
| demo | push rtmp | push rtsp | pull rtmp | pull httpflv | pull rtsp | 说明 |
| - | - | - | - | - | - | - |
| pushrtmp | ✔ | . | . | . | . | RTMP推流客户端压力测试工具 |
| pullrtmp | . | . | ✔ | . | . | RTMP拉流客户端压力测试工具 |
| pullrtmp2hls | . | . | ✔ | . | . | 从远端服务器拉取RTMP流存储为本地m3u8+ts文件 |
| pullhttpflv | . | . | . | ✔ | . | HTTP-FLV拉流客户端 |
| pullrtsp | . | . | . | . | ✔ | RTSP拉流客户端 |
| pullrtsp2pushrtsp | . | ✔ | . | . | ✔ | RTSP拉流并使用RTSP转推出去 |
| pullrtsp2pushrtmp | ✔ | . | . | . | ✔ | RTSP拉流并使用RTMP转推出去 |
| analyseflv | . | . | . | ✔ | . | 拉取HTTP-FLV流并进行分析 |
(更具体的功能参加各源码文件的头部说明)

@ -72,6 +72,7 @@ func main() {
})
go func() {
time.Sleep(3 * time.Second)
for {
rtspPullSession.UpdateStat(1)
rtspStat := rtspPullSession.GetStat()

@ -82,6 +82,7 @@ func main() {
})
go func() {
time.Sleep(3 * time.Second)
for {
rtspPullSession.UpdateStat(1)
rtspStat := rtspPullSession.GetStat()

@ -0,0 +1,101 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package main
import (
"flag"
"fmt"
"os"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/naza/pkg/nazalog"
)
var rtpPacketChan = make(chan rtprtcp.RTPPacket, 1024)
type Observer struct {
}
func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
rtpPacketChan <- pkt
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
// noop
}
func (o *Observer) OnAVPacket(pkt base.AVPacket) {
// noop
}
func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
inURL, outURL := parseFlag()
o := &Observer{}
rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMS = 5000
option.OverTCP = false
})
rtspPushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) {
option.PushTimeoutMS = 5000
option.OverTCP = false
})
go func() {
time.Sleep(3 * time.Second)
for {
rtspPullSession.UpdateStat(1)
rtspStat := rtspPullSession.GetStat()
nazalog.Debugf("bitrate. rtsp pull=%dkbit/s, rtsp push=", rtspStat.Bitrate)
time.Sleep(1 * time.Second)
}
}()
err := rtspPullSession.Pull(inURL)
nazalog.Assert(nil, err)
rawSDP, sdpLogicCtx := rtspPullSession.GetSDP()
err = rtspPushSession.Push(outURL, rawSDP, sdpLogicCtx)
nazalog.Assert(nil, err)
for {
select {
case err = <-rtspPullSession.Wait():
nazalog.Infof("pull rtsp done. err=%+v", err)
return
case err = <-rtspPushSession.Wait():
nazalog.Infof("push rtsp done. err=%+v", err)
return
case pkt := <-rtpPacketChan:
rtspPushSession.WriteRTPPacket(pkt)
}
}
}
func parseFlag() (inURL string, outURL string) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify push rtmp url")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/pullrtsp2pushrtsp -i rtsp://localhost:5544/live/test110 -o rtsp://localhost:5544/live/test220
`)
os.Exit(1)
}
return *i, *o
}

@ -17,6 +17,7 @@ const (
UKPRTSPServerCommandSession = "RTSPSRVCMD"
UKPRTSPPubSession = "RTSPPUB"
UKPRTSPSubSession = "RTSPSUB"
UKPRTSPPushSession = "RTSPPUSH"
UKPRTSPPullSession = "RTSPPULL"
UKPFLVSubSession = "FLVSUB"
UKPTSSubSession = "TSSUB"

@ -77,6 +77,8 @@ type OnReadFLVTag func(tag Tag)
//
// @param onReadFLVTag 读取到 flv tag 数据时回调。回调结束后PullSession 不会再使用这块 <tag> 数据。
func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error {
nazalog.Debugf("[%s] pull. url=%s", session.UniqueKey, rawURL)
var (
ctx context.Context
cancel context.CancelFunc

@ -481,7 +481,6 @@ func (group *Group) OnReadRTMPAVMsg(msg base.RTMPMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
//nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1])
group.broadcastRTMP(msg)
}

@ -17,9 +17,8 @@ import (
"github.com/q191201771/naza/pkg/bele"
)
// @param asc 如果为nil则没有音频
// @param vps 如果为nil则是H264如果不为nil则是H265
// @return 返回的内存块为新申请的独立内存块
//
func AVConfig2RTMPMsg(asc, vps, sps, pps []byte) (metadata, ash, vsh *base.RTMPMsg, err error) {
var bMetadata []byte
var bVsh []byte

@ -19,7 +19,7 @@ import (
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/connection"
log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazalog"
)
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
@ -98,12 +98,14 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
}
log.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s)
nazalog.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s)
return s
}
// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) Do(rawURL string) error {
nazalog.Debugf("[%s] Do. url=%s", s.UniqueKey, rawURL)
var (
ctx context.Context
cancel context.CancelFunc
@ -132,7 +134,7 @@ func (s *ClientSession) Flush() error {
}
func (s *ClientSession) Dispose() {
log.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey)
nazalog.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey)
_ = s.conn.Close()
}
@ -202,13 +204,13 @@ func (s *ClientSession) doContext(ctx context.Context, rawURL string) error {
return
}
log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize)
nazalog.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize)
if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
errChan <- err
return
}
log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName())
nazalog.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName())
if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil {
errChan <- err
return
@ -266,7 +268,7 @@ func (s *ClientSession) tcpConnect() error {
}
func (s *ClientSession) handshake() error {
log.Infof("[%s] > W Handshake C0+C1.", s.UniqueKey)
nazalog.Infof("[%s] > W Handshake C0+C1.", s.UniqueKey)
if err := s.hc.WriteC0C1(s.conn); err != nil {
return err
}
@ -274,9 +276,9 @@ func (s *ClientSession) handshake() error {
if err := s.hc.ReadS0S1S2(s.conn); err != nil {
return err
}
log.Infof("[%s] < R Handshake S0+S1+S2.", s.UniqueKey)
nazalog.Infof("[%s] < R Handshake S0+S1+S2.", s.UniqueKey)
log.Infof("[%s] > W Handshake C2.", s.UniqueKey)
nazalog.Infof("[%s] > W Handshake C2.", s.UniqueKey)
if err := s.hc.WriteC2(s.conn); err != nil {
return err
}
@ -303,13 +305,13 @@ func (s *ClientSession) doMsg(stream *Stream) error {
case base.RTMPTypeIDAck:
return s.doAck(stream)
case base.RTMPTypeIDUserControl:
log.Warnf("[%s] read user control message, ignore.", s.UniqueKey)
nazalog.Warnf("[%s] read user control message, ignore.", s.UniqueKey)
case base.RTMPTypeIDAudio:
fallthrough
case base.RTMPTypeIDVideo:
s.onReadRTMPAVMsg(stream.toAVMsg())
default:
log.Errorf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
nazalog.Errorf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
panic(0)
}
return nil
@ -317,7 +319,7 @@ func (s *ClientSession) doMsg(stream *Stream) error {
func (s *ClientSession) doAck(stream *Stream) error {
seqNum := bele.BEUint32(stream.msg.buf[stream.msg.b:stream.msg.e])
log.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.UniqueKey, seqNum)
nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.UniqueKey, seqNum)
return nil
}
@ -329,7 +331,7 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
switch val {
case "|RtmpSampleAccess":
log.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey)
nazalog.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey)
return nil
default:
}
@ -350,13 +352,13 @@ func (s *ClientSession) doCommandMessage(stream *Stream) error {
switch cmd {
case "onBWDone":
log.Warnf("[%s] < R onBWDone. ignore.", s.UniqueKey)
nazalog.Warnf("[%s] < R onBWDone. ignore.", s.UniqueKey)
case "_result":
return s.doResultMessage(stream, tid)
case "onStatus":
return s.doOnStatusMessage(stream, tid)
default:
log.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString())
nazalog.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString())
}
return nil
@ -378,18 +380,18 @@ func (s *ClientSession) doOnStatusMessage(stream *Stream, tid int) error {
case CSTPushSession:
switch code {
case "NetStream.Publish.Start":
log.Infof("[%s] < R onStatus('NetStream.Publish.Start').", s.UniqueKey)
nazalog.Infof("[%s] < R onStatus('NetStream.Publish.Start').", s.UniqueKey)
s.notifyDoResultSucc()
default:
log.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code)
nazalog.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code)
}
case CSTPullSession:
switch code {
case "NetStream.Play.Start":
log.Infof("[%s] < R onStatus('NetStream.Play.Start').", s.UniqueKey)
nazalog.Infof("[%s] < R onStatus('NetStream.Play.Start').", s.UniqueKey)
s.notifyDoResultSucc()
default:
log.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code)
nazalog.Errorf("[%s] read on status message but code field unknown. code=%s", s.UniqueKey, code)
}
}
@ -413,13 +415,13 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
}
switch code {
case "NetConnection.Connect.Success":
log.Infof("[%s] < R _result(\"NetConnection.Connect.Success\").", s.UniqueKey)
log.Infof("[%s] > W createStream().", s.UniqueKey)
nazalog.Infof("[%s] < R _result(\"NetConnection.Connect.Success\").", s.UniqueKey)
nazalog.Infof("[%s] > W createStream().", s.UniqueKey)
if err := s.packer.writeCreateStream(s.conn); err != nil {
return err
}
default:
log.Errorf("[%s] unknown code. code=%v", s.UniqueKey, code)
nazalog.Errorf("[%s] unknown code. code=%v", s.UniqueKey, code)
}
case tidClientCreateStream:
err := stream.msg.readNull()
@ -430,21 +432,21 @@ func (s *ClientSession) doResultMessage(stream *Stream, tid int) error {
if err != nil {
return err
}
log.Infof("[%s] < R _result().", s.UniqueKey)
nazalog.Infof("[%s] < R _result().", s.UniqueKey)
switch s.t {
case CSTPullSession:
log.Infof("[%s] > W play('%s').", s.UniqueKey, s.streamNameWithRawQuery())
nazalog.Infof("[%s] > W play('%s').", s.UniqueKey, s.streamNameWithRawQuery())
if err := s.packer.writePlay(s.conn, s.streamNameWithRawQuery(), sid); err != nil {
return err
}
case CSTPushSession:
log.Infof("[%s] > W publish('%s').", s.UniqueKey, s.streamNameWithRawQuery())
nazalog.Infof("[%s] > W publish('%s').", s.UniqueKey, s.streamNameWithRawQuery())
if err := s.packer.writePublish(s.conn, s.appName(), s.streamNameWithRawQuery(), sid); err != nil {
return err
}
}
default:
log.Errorf("[%s] unknown tid. tid=%d", s.UniqueKey, tid)
nazalog.Errorf("[%s] unknown tid. tid=%d", s.UniqueKey, tid)
}
return nil
}
@ -458,15 +460,15 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error {
switch stream.header.MsgTypeID {
case base.RTMPTypeIDWinAckSize:
s.peerWinAckSize = val
log.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey, s.peerWinAckSize)
nazalog.Infof("[%s] < R Window Acknowledgement Size: %d", s.UniqueKey, s.peerWinAckSize)
case base.RTMPTypeIDBandwidth:
// TODO chef: 是否需要关注这个信令
log.Debugf("[%s] < R Set Peer Bandwidth. ignore.", s.UniqueKey)
nazalog.Debugf("[%s] < R Set Peer Bandwidth. ignore.", s.UniqueKey)
case base.RTMPTypeIDSetChunkSize:
// composer内部会自动更新peer chunk size.
log.Infof("[%s] < R Set Chunk Size %d.", s.UniqueKey, val)
nazalog.Infof("[%s] < R Set Chunk Size %d.", s.UniqueKey, val)
default:
log.Errorf("[%s] read unknown protocol control message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
nazalog.Errorf("[%s] read unknown protocol control message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
}
return nil
}

@ -10,7 +10,6 @@ package rtsp
import (
"net"
"strings"
"sync"
"sync/atomic"
@ -35,8 +34,10 @@ type BaseCommandSession interface {
type BaseInSessionObserver interface {
OnRTPPacket(pkt rtprtcp.RTPPacket)
// @param asc: AAC AudioSpecificConfig注意如果不存在音频则为nil
// @param vps: 视频为H264时为nil视频为H265时不为nil
// @param asc: AAC AudioSpecificConfig注意如果不存在音频或音频不为AAC则为nil
// @param vps, sps, pps 如果都为nil则没有视频如果sps, pps不为nil则vps不为nil是H265vps为nil是H264
//
// 注意4个参数可能同时为nil
OnAVConfig(asc, vps, sps, pps []byte)
// @param pkt: pkt结构体中字段含义见rtprtcp.OnAVPacket
@ -82,21 +83,21 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext)
s.sdpLogicCtx = sdpLogicCtx
s.m.Unlock()
if isSupportType(s.sdpLogicCtx.AudioPayloadType) {
s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
if s.sdpLogicCtx.IsAudioUnpackable() {
s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.GetAudioPayloadTypeBase(), s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] audio unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin)
nazalog.Warnf("[%s] audio unpacker not support for this type yet.", s.UniqueKey)
}
if isSupportType(s.sdpLogicCtx.VideoPayloadType) {
s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
if s.sdpLogicCtx.IsVideoUnpackable() {
s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.GetVideoPayloadTypeBase(), s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] video unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin)
nazalog.Warnf("[%s] video unpacker not support this type yet.", s.UniqueKey)
}
s.audioRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.AudioClockRate)
s.videoRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.VideoClockRate)
if isSupportType(s.sdpLogicCtx.AudioPayloadType) && isSupportType(s.sdpLogicCtx.VideoPayloadType) {
if s.sdpLogicCtx.IsAudioUnpackable() && s.sdpLogicCtx.IsVideoUnpackable() {
s.avPacketQueue = NewAVPacketQueue(s.onAVPacket)
}
@ -109,17 +110,14 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext)
func (s *BaseInSession) SetObserver(observer BaseInSessionObserver) {
s.observer = observer
// TODO chef: 这里的判断应该去掉
if s.sdpLogicCtx.ASC != nil && s.sdpLogicCtx.SPS != nil {
s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS)
}
s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS)
}
func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {
if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
if s.sdpLogicCtx.IsAudioURI(uri) {
s.audioRTPConn = rtpConn
s.audioRTCPConn = rtcpConn
} else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) {
} else if s.sdpLogicCtx.IsVideoURI(uri) {
s.videoRTPConn = rtpConn
s.videoRTCPConn = rtcpConn
} else {
@ -133,11 +131,11 @@ func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDP
}
func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int) error {
if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
if s.sdpLogicCtx.IsAudioURI(uri) {
s.audioRTPChannel = rtpChannel
s.audioRTCPChannel = rtcpChannel
return nil
} else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) {
} else if s.sdpLogicCtx.IsVideoURI(uri) {
s.videoRTPChannel = rtpChannel
s.videoRTCPChannel = rtcpChannel
return nil
@ -337,7 +335,7 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error {
}
packetType := int(b[1] & 0x7F)
if packetType != s.sdpLogicCtx.AudioPayloadTypeOrigin && packetType != s.sdpLogicCtx.VideoPayloadTypeOrigin {
if !s.sdpLogicCtx.IsPayloadTypeOrigin(packetType) {
return ErrRTSP
}
@ -352,16 +350,8 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error {
pkt.Header = h
pkt.Raw = b
switch packetType {
case s.sdpLogicCtx.VideoPayloadTypeOrigin:
s.videoSSRC = h.SSRC
s.observer.OnRTPPacket(pkt)
s.videoRRProducer.FeedRTPPacket(h.Seq)
if s.videoUnpacker != nil {
s.videoUnpacker.Feed(pkt)
}
case s.sdpLogicCtx.AudioPayloadTypeOrigin:
// 接收数据时保证了sdp的原始类型对应
if s.sdpLogicCtx.IsAudioPayloadTypeOrigin(packetType) {
s.audioSSRC = h.SSRC
s.observer.OnRTPPacket(pkt)
s.audioRRProducer.FeedRTPPacket(h.Seq)
@ -369,7 +359,15 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error {
if s.audioUnpacker != nil {
s.audioUnpacker.Feed(pkt)
}
default:
} else if s.sdpLogicCtx.IsVideoPayloadTypeOrigin(packetType) {
s.videoSSRC = h.SSRC
s.observer.OnRTPPacket(pkt)
s.videoRRProducer.FeedRTPPacket(h.Seq)
if s.videoUnpacker != nil {
s.videoUnpacker.Feed(pkt)
}
} else {
// 因为前面已经判断过type了所以永远不会走到这
}

@ -99,6 +99,8 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp
// 如果没有错误发生阻塞直到接收音视频数据的前一步也即收到rtsp play response
func (session *PullSession) Pull(rawURL string) error {
nazalog.Debugf("[%s] pull. url=%s", session.UniqueKey, rawURL)
var (
ctx context.Context
cancel context.CancelFunc
@ -150,6 +152,10 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
func (session *PullSession) GetSDP() ([]byte, sdp.LogicContext) {
return session.baseInSession.GetSDP()
}
func (session *PullSession) pullContext(ctx context.Context, rawURL string) error {
errChan := make(chan error, 1)
@ -346,24 +352,27 @@ func (session *PullSession) writeDescribe() error {
}
func (session *PullSession) writeSetup() error {
if session.baseInSession.sdpLogicCtx.VideoAControl != "" {
if session.baseInSession.sdpLogicCtx.HasVideoAControl() {
uri := session.baseInSession.sdpLogicCtx.MakeVideoSetupURI(session.urlCtx.RawURLWithoutUserInfo)
if session.option.OverTCP {
if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil {
if err := session.writeOneSetupTCP(uri); err != nil {
return err
}
} else {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil {
if err := session.writeOneSetup(uri); err != nil {
return err
}
}
}
if session.baseInSession.sdpLogicCtx.AudioAControl != "" {
// can't else if
if session.baseInSession.sdpLogicCtx.HasAudioAControl() {
uri := session.baseInSession.sdpLogicCtx.MakeAudioSetupURI(session.urlCtx.RawURLWithoutUserInfo)
if session.option.OverTCP {
if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil {
if err := session.writeOneSetupTCP(uri); err != nil {
return err
}
} else {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil {
if err := session.writeOneSetup(uri); err != nil {
return err
}
}
@ -371,8 +380,7 @@ func (session *PullSession) writeSetup() error {
return nil
}
func (session *PullSession) writeOneSetup(aControl string) error {
setupURI := makeSetupURI(session.urlCtx, aControl)
func (session *PullSession) writeOneSetup(setupURI string) error {
rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2()
if err != nil {
return err
@ -423,8 +431,7 @@ func (session *PullSession) writeOneSetup(aControl string) error {
return nil
}
func (session *PullSession) writeOneSetupTCP(aControl string) error {
setupURI := makeSetupURI(session.urlCtx, aControl)
func (session *PullSession) writeOneSetupTCP(setupURI string) error {
rtpChannel := session.channel
rtcpChannel := session.channel + 1
session.channel += 2

@ -0,0 +1,385 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazanet"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/nazalog"
)
// TODO chef:
// - 将push和pull中重复的代码抽象出来
// - 将push和sub中重复的代码抽象出来
// - push有的功能没实现需要参考pull和sub
const (
pushReadBufSize = 256
)
type PushSessionOption struct {
PushTimeoutMS int
OverTCP bool
}
var defaultPushSessionOption = PushSessionOption{
PushTimeoutMS: 10000,
OverTCP: false,
}
type PushSession struct {
UniqueKey string
option PushSessionOption
CmdConn connection.Connection
cseq int
sessionID string
channel int
rawURL string
urlCtx base.URLContext
waitErrChan chan error
methodGetParameterSupported bool
auth Auth
rawSDP []byte
sdpLogicCtx sdp.LogicContext
audioRTPConn *nazanet.UDPConnection
videoRTPConn *nazanet.UDPConnection
audioRTCPConn *nazanet.UDPConnection
videoRTCPConn *nazanet.UDPConnection
}
type ModPushSessionOption func(option *PushSessionOption)
func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
option := defaultPushSessionOption
for _, fn := range modOptions {
fn(&option)
}
uk := base.GenUniqueKey(base.UKPRTSPPushSession)
s := &PushSession{
UniqueKey: uk,
option: option,
waitErrChan: make(chan error, 1),
}
nazalog.Infof("[%s] lifecycle new rtsp PushSession. session=%p", uk, s)
return s
}
func (session *PushSession) Push(rawURL string, rawSDP []byte, sdpLogicCtx sdp.LogicContext) error {
nazalog.Debugf("[%s] push. url=%s", session.UniqueKey, rawURL)
session.rawSDP = rawSDP
session.sdpLogicCtx = sdpLogicCtx
var (
ctx context.Context
cancel context.CancelFunc
)
if session.option.PushTimeoutMS == 0 {
ctx, cancel = context.WithCancel(context.Background())
} else {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PushTimeoutMS)*time.Millisecond)
}
defer cancel()
return session.pushContext(ctx, rawURL)
}
func (session *PushSession) Wait() <-chan error {
return session.waitErrChan
}
func (session *PushSession) WriteRTPPacket(packet rtprtcp.RTPPacket) {
// 发送数据时保证和sdp的原始类型对应
t := int(packet.Header.PacketType)
if session.sdpLogicCtx.IsAudioPayloadTypeOrigin(t) {
if session.audioRTPConn != nil {
_ = session.audioRTPConn.Write(packet.Raw)
}
} else if session.sdpLogicCtx.IsVideoPayloadTypeOrigin(t) {
if session.videoRTPConn != nil {
_ = session.videoRTPConn.Write(packet.Raw)
}
} else {
nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", session.UniqueKey, t)
}
}
func (session *PushSession) pushContext(ctx context.Context, rawURL string) error {
errChan := make(chan error, 1)
go func() {
if err := session.connect(rawURL); err != nil {
errChan <- err
return
}
if err := session.writeOptions(); err != nil {
errChan <- err
return
}
if err := session.writeAnnounce(); err != nil {
errChan <- err
return
}
if err := session.writeSetup(); err != nil {
errChan <- err
return
}
if err := session.writeRecord(); err != nil {
errChan <- err
return
}
}()
return nil
}
func (session *PushSession) connect(rawURL string) (err error) {
session.rawURL = rawURL
session.urlCtx, err = base.ParseRTSPURL(rawURL)
if err != nil {
return err
}
nazalog.Debugf("[%s] > tcp connect.", session.UniqueKey)
// # 建立连接
conn, err := net.Dial("tcp", session.urlCtx.HostWithPort)
if err != nil {
return err
}
session.CmdConn = connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = pullReadBufSize
})
nazalog.Debugf("[%s] < tcp connect. laddr=%s", session.UniqueKey, conn.LocalAddr().String())
// TODO
return nil
}
func (session *PushSession) writeOptions() error {
session.cseq++
req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, "")
nazalog.Debugf("[%s] > write options.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode)
session.handleOptionMethods(ctx)
if err := session.handleAuth(ctx); err != nil {
return err
}
return nil
}
func (session *PushSession) writeAnnounce() error {
session.cseq++
auth := session.auth.MakeAuthorization(MethodDescribe, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestAnnounce(session.urlCtx.RawURLWithoutUserInfo, session.cseq, string(session.rawSDP), auth)
nazalog.Debugf("[%s] > write announce.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body))
return nil
}
func (session *PushSession) writeSetup() error {
if session.sdpLogicCtx.HasVideoAControl() {
uri := session.sdpLogicCtx.MakeVideoSetupURI(session.urlCtx.RawURLWithoutUserInfo)
if session.option.OverTCP {
if err := session.writeOneSetupTCP(uri); err != nil {
return err
}
} else {
if err := session.writeOneSetup(uri); err != nil {
return err
}
}
}
// can't else if
if session.sdpLogicCtx.HasAudioAControl() {
uri := session.sdpLogicCtx.MakeAudioSetupURI(session.urlCtx.RawURLWithoutUserInfo)
if session.option.OverTCP {
if err := session.writeOneSetupTCP(uri); err != nil {
return err
}
} else {
if err := session.writeOneSetup(uri); err != nil {
return err
}
}
}
return nil
}
func (session *PushSession) writeOneSetup(setupURI string) error {
rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2()
if err != nil {
return err
}
session.cseq++
auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestSetup(setupURI, session.cseq, int(rtpPort), int(rtcpPort), session.sessionID, auth)
nazalog.Debugf("[%s] > write setup.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx)
session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0]
srvRTPPort, srvRTCPPort, err := parseServerPort(ctx.Headers[HeaderFieldTransport])
if err != nil {
return err
}
rtpConn, err := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) {
option.Conn = rtpC
option.RAddr = net.JoinHostPort(session.urlCtx.Host, fmt.Sprintf("%d", srvRTPPort))
option.MaxReadPacketSize = rtprtcp.MaxRTPRTCPPacketSize
})
if err != nil {
return err
}
rtcpConn, err := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) {
option.Conn = rtcpC
option.RAddr = net.JoinHostPort(session.urlCtx.Host, fmt.Sprintf("%d", srvRTCPPort))
option.MaxReadPacketSize = rtprtcp.MaxRTPRTCPPacketSize
})
if err != nil {
return err
}
if err := session.setupWithConn(setupURI, rtpConn, rtcpConn); err != nil {
return err
}
return nil
}
func (session *PushSession) writeRecord() error {
session.cseq++
auth := session.auth.MakeAuthorization(MethodRecord, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestRecord(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID, auth)
nazalog.Debugf("[%s] > write record.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body))
return nil
}
func (session *PushSession) setupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {
if session.sdpLogicCtx.IsAudioURI(uri) {
session.audioRTPConn = rtpConn
session.audioRTCPConn = rtcpConn
} else if session.sdpLogicCtx.IsVideoURI(uri) {
session.videoRTPConn = rtpConn
session.videoRTCPConn = rtcpConn
} else {
return ErrRTSP
}
go rtpConn.RunLoop(session.onReadUDPPacket)
go rtcpConn.RunLoop(session.onReadUDPPacket)
return nil
}
func (session *PushSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool {
// TODO chef: impl me
//nazalog.Errorf("[%s] SubSession::onReadUDPPacket. %s", s.UniqueKey, hex.Dump(b))
return true
}
func (session *PushSession) writeOneSetupTCP(setupURI string) error {
panic("not impl")
return nil
}
func (session *PushSession) handleOptionMethods(ctx nazahttp.HTTPRespMsgCtx) {
methods := ctx.Headers["Public"]
if methods == "" {
return
}
if strings.Contains(methods, MethodGetParameter) {
session.methodGetParameterSupported = true
}
}
func (session *PushSession) handleAuth(ctx nazahttp.HTTPRespMsgCtx) error {
if ctx.Headers[HeaderWWWAuthenticate] == "" {
return nil
}
session.auth.FeedWWWAuthenticate(ctx.Headers[HeaderWWWAuthenticate], session.urlCtx.Username, session.urlCtx.Password)
auth := session.auth.MakeAuthorization(MethodOptions, session.urlCtx.RawURLWithoutUserInfo)
session.cseq++
req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth)
nazalog.Debugf("[%s] > write options.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode)
return nil
}

@ -104,20 +104,34 @@ func PackRequestOptions(uri string, cseq int, auth string) string {
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodOptions, uri, headers)
return packRequest(MethodOptions, uri, headers, "")
}
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestAnnounce(uri string, cseq int, sdp string, auth string) string {
headers := map[string]string{
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderFieldContentLength: fmt.Sprintf("%d", len(sdp)),
HeaderAccept: HeaderAcceptApplicationSDP,
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodAnnounce, uri, headers, sdp)
}
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestDescribe(uri string, cseq int, auth string) string {
headers := map[string]string{
HeaderAccept: "application/sdp",
HeaderAccept: HeaderAcceptApplicationSDP,
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodDescribe, uri, headers)
return packRequest(MethodDescribe, uri, headers, "")
}
// @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段
@ -134,7 +148,7 @@ func PackRequestSetup(uri string, cseq int, rtpClientPort int, rtcpClientPort in
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodSetup, uri, headers)
return packRequest(MethodSetup, uri, headers, "")
}
// @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段
@ -151,7 +165,20 @@ func PackRequestSetupTCP(uri string, cseq int, rtpChannel int, rtcpChannel int,
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodSetup, uri, headers)
return packRequest(MethodSetup, uri, headers, "")
}
func PackRequestRecord(uri string, cseq int, sessionID string, auth string) string {
headers := map[string]string{
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderFieldRange: "npt=0.000-",
HeaderFieldSession: sessionID,
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodRecord, uri, headers, "")
}
func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string {
@ -164,7 +191,7 @@ func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodPlay, uri, headers)
return packRequest(MethodPlay, uri, headers, "")
}
func PackRequestGetParameter(uri string, cseq int, sessionID string) string {
@ -215,11 +242,17 @@ func PackResponseTeardown(cseq string) string {
return fmt.Sprintf(ResponseTeardownTmpl, cseq)
}
func packRequest(method, uri string, headers map[string]string) (ret string) {
// @param body 可以为空
func packRequest(method, uri string, headers map[string]string, body string) (ret string) {
ret = method + " " + uri + " RTSP/1.0\r\n"
for k, v := range headers {
ret += k + ": " + v + "\r\n"
}
ret += "\r\n"
if body != "" {
ret += body
}
return ret
}

@ -46,15 +46,19 @@ const (
MethodGetParameter = "GET_PARAMETER"
)
// TODO chef: 这里有的有Field有的没有命名需要统一一下
const (
HeaderAccept = "Accept"
HeaderUserAgent = "User-Agent"
HeaderFieldCSeq = "CSeq"
HeaderFieldTransport = "Transport"
HeaderFieldSession = "Session"
HeaderFieldRange = "Range"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderAuthorization = "Authorization"
HeaderAccept = "Accept"
HeaderUserAgent = "User-Agent"
HeaderFieldCSeq = "CSeq"
HeaderFieldTransport = "Transport"
HeaderFieldSession = "Session"
HeaderFieldRange = "Range"
HeaderFieldContentLength = "Content-Length"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderAuthorization = "Authorization"
HeaderAcceptApplicationSDP = "application/sdp"
)
const (
@ -166,18 +170,6 @@ func parseTransport(setupTransport string, key string) (first, second uint16, er
return uint16(iFirst), uint16(iSecond), err
}
func isSupportType(t base.AVPacketPT) bool {
switch t {
case base.AVPacketPTAAC:
fallthrough
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
return true
}
return false
}
func makeSetupURI(urlCtx base.URLContext, aControl string) string {
if strings.HasPrefix(aControl, "rtsp://") {
return aControl

@ -10,7 +10,6 @@ package rtsp
import (
"net"
"strings"
"sync/atomic"
"time"
@ -76,10 +75,10 @@ func (s *SubSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) {
}
func (s *SubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPConnection) error {
if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
if s.sdpLogicCtx.IsAudioURI(uri) {
s.audioRTPConn = rtpConn
s.audioRTCPConn = rtcpConn
} else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) {
} else if s.sdpLogicCtx.IsVideoURI(uri) {
s.videoRTPConn = rtpConn
s.videoRTCPConn = rtcpConn
} else {
@ -95,11 +94,11 @@ func (s *SubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPCon
func (s *SubSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error {
s.stat.RemoteAddr = remoteAddr
if s.sdpLogicCtx.AudioAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
if s.sdpLogicCtx.IsAudioURI(uri) {
s.audioRTPChannel = rtpChannel
s.audioRTCPChannel = rtcpChannel
return nil
} else if s.sdpLogicCtx.VideoAControl != "" && strings.HasSuffix(uri, s.sdpLogicCtx.VideoAControl) {
} else if s.sdpLogicCtx.IsVideoURI(uri) {
s.videoRTPChannel = rtpChannel
s.videoRTCPChannel = rtcpChannel
return nil
@ -176,23 +175,24 @@ func (s *SubSession) RawQuery() string {
func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) {
atomic.AddUint64(&s.currConnStat.WroteBytesSum, uint64(len(packet.Raw)))
switch packet.Header.PacketType {
case uint8(s.sdpLogicCtx.VideoPayloadTypeOrigin):
if s.videoRTPConn != nil {
_ = s.videoRTPConn.Write(packet.Raw)
}
if s.videoRTPChannel != -1 {
_ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw)
}
case uint8(s.sdpLogicCtx.AudioPayloadTypeOrigin):
// 发送数据时保证和sdp的原始类型对应
t := int(packet.Header.PacketType)
if s.sdpLogicCtx.IsAudioPayloadTypeOrigin(t) {
if s.audioRTPConn != nil {
_ = s.audioRTPConn.Write(packet.Raw)
}
if s.audioRTPChannel != -1 {
_ = s.cmdSession.Write(s.audioRTPChannel, packet.Raw)
}
default:
nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", s.UniqueKey, packet.Header.PacketType)
} else if s.sdpLogicCtx.IsVideoPayloadTypeOrigin(t) {
if s.videoRTPConn != nil {
_ = s.videoRTPConn.Write(packet.Raw)
}
if s.videoRTPChannel != -1 {
_ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw)
}
} else {
nazalog.Errorf("[%s] write rtp packet but type invalid. type=%d", s.UniqueKey, t)
}
}

@ -0,0 +1,92 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package sdp
import (
"encoding/base64"
"strconv"
"strings"
"github.com/q191201771/lal/pkg/base"
)
func ParseASC(a *AFmtPBase) ([]byte, error) {
if a.Format != base.RTPPacketTypeAAC {
return nil, ErrSDP
}
v, ok := a.Parameters["config"]
if !ok {
return nil, ErrSDP
}
if len(v) < 4 || (len(v)%2) != 0 {
return nil, ErrSDP
}
l := len(v) / 2
r := make([]byte, l)
for i := 0; i < l; i++ {
b, err := strconv.ParseInt(v[i*2:i*2+2], 16, 0)
if err != nil {
return nil, ErrSDP
}
r[i] = uint8(b)
}
return r, nil
}
func ParseVPSSPSPPS(a *AFmtPBase) (vps, sps, pps []byte, err error) {
v, ok := a.Parameters["sprop-vps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if vps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
v, ok = a.Parameters["sprop-sps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if sps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
v, ok = a.Parameters["sprop-pps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if pps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
return
}
// 解析AVC/H264的spspps
// 例子见单元测试
func ParseSPSPPS(a *AFmtPBase) (sps, pps []byte, err error) {
v, ok := a.Parameters["sprop-parameter-sets"]
if !ok {
return nil, nil, ErrSDP
}
items := strings.SplitN(v, ",", 2)
if len(items) != 2 {
return nil, nil, ErrSDP
}
sps, err = base64.StdEncoding.DecodeString(items[0])
if err != nil {
return nil, nil, ErrSDP
}
pps, err = base64.StdEncoding.DecodeString(items[1])
return
}

@ -0,0 +1,165 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package sdp
import (
"fmt"
"strings"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/lal/pkg/base"
)
type LogicContext struct {
AudioClockRate int
VideoClockRate int
audioPayloadTypeBase base.AVPacketPT // lal内部定义的类型
videoPayloadTypeBase base.AVPacketPT
audioPayloadTypeOrigin int // 原始类型sdp或rtp中的类型
videoPayloadTypeOrigin int
audioAControl string
videoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
// 没有用上的
hasAudio bool
hasVideo bool
}
func (lc *LogicContext) IsAudioPayloadTypeOrigin(t int) bool {
return lc.audioPayloadTypeOrigin == t
}
func (lc *LogicContext) IsVideoPayloadTypeOrigin(t int) bool {
return lc.videoPayloadTypeOrigin == t
}
func (lc *LogicContext) IsPayloadTypeOrigin(t int) bool {
return lc.audioPayloadTypeOrigin == t || lc.videoPayloadTypeOrigin == t
}
func (lc *LogicContext) IsAudioUnpackable() bool {
return lc.audioPayloadTypeBase == base.AVPacketPTAAC
}
func (lc *LogicContext) IsVideoUnpackable() bool {
return lc.videoPayloadTypeBase == base.AVPacketPTAVC ||
lc.videoPayloadTypeBase == base.AVPacketPTHEVC
}
func (lc *LogicContext) IsAudioURI(uri string) bool {
return lc.audioAControl != "" && strings.HasSuffix(uri, lc.audioAControl)
}
func (lc *LogicContext) IsVideoURI(uri string) bool {
return lc.videoAControl != "" && strings.HasSuffix(uri, lc.videoAControl)
}
func (lc *LogicContext) HasAudioAControl() bool {
return lc.audioAControl != ""
}
func (lc *LogicContext) HasVideoAControl() bool {
return lc.videoAControl != ""
}
func (lc *LogicContext) MakeAudioSetupURI(uri string) string {
return lc.makeSetupURI(uri, lc.audioAControl)
}
func (lc *LogicContext) MakeVideoSetupURI(uri string) string {
return lc.makeSetupURI(uri, lc.videoAControl)
}
func (lc *LogicContext) GetAudioPayloadTypeBase() base.AVPacketPT {
return lc.audioPayloadTypeBase
}
func (lc *LogicContext) GetVideoPayloadTypeBase() base.AVPacketPT {
return lc.videoPayloadTypeBase
}
func (lc *LogicContext) makeSetupURI(uri string, aControl string) string {
if strings.HasPrefix(aControl, "rtsp://") {
return aControl
}
return fmt.Sprintf("%s/%s", uri, aControl)
}
func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
var ret LogicContext
c, err := ParseSDP2RawContext(b)
if err != nil {
return ret, err
}
for _, md := range c.MediaDescList {
switch md.M.Media {
case "audio":
ret.hasAudio = true
ret.AudioClockRate = md.ARTPMap.ClockRate
ret.audioAControl = md.AControl.Value
ret.audioPayloadTypeOrigin = md.ARTPMap.PayloadType
if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC {
ret.audioPayloadTypeBase = base.AVPacketPTAAC
if md.AFmtPBase != nil {
ret.ASC, err = ParseASC(md.AFmtPBase)
if err != nil {
return ret, err
}
} else {
nazalog.Warnf("aac afmtp not exist.")
}
} else {
ret.audioPayloadTypeBase = base.AVPacketPTUnknown
}
case "video":
ret.hasVideo = true
ret.VideoClockRate = md.ARTPMap.ClockRate
ret.videoAControl = md.AControl.Value
ret.videoPayloadTypeOrigin = md.ARTPMap.PayloadType
switch md.ARTPMap.EncodingName {
case ARTPMapEncodingNameH264:
ret.videoPayloadTypeBase = base.AVPacketPTAVC
if md.AFmtPBase != nil {
ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtPBase)
if err != nil {
return ret, err
}
} else {
nazalog.Warnf("avc afmtp not exist.")
}
case ARTPMapEncodingNameH265:
ret.videoPayloadTypeBase = base.AVPacketPTHEVC
if md.AFmtPBase != nil {
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtPBase)
if err != nil {
return ret, err
}
} else {
nazalog.Warnf("hevc afmtp not exist.")
}
default:
ret.videoPayloadTypeBase = base.AVPacketPTUnknown
}
}
}
return ret, nil
}

@ -0,0 +1,203 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package sdp
import (
"strconv"
"strings"
)
type RawContext struct {
MediaDescList []MediaDesc
}
type MediaDesc struct {
M M
ARTPMap ARTPMap
AFmtPBase *AFmtPBase
AControl AControl
}
type M struct {
Media string
}
type ARTPMap struct {
PayloadType int
EncodingName string
ClockRate int
EncodingParameters string
}
type AFmtPBase struct {
Format int // same as PayloadType
Parameters map[string]string // name -> value
}
type AControl struct {
Value string
}
// 例子见单元测试
func ParseSDP2RawContext(b []byte) (RawContext, error) {
var (
sdpCtx RawContext
md *MediaDesc
)
s := string(b)
lines := strings.Split(s, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "m=") {
m, err := ParseM(line)
if err != nil {
return sdpCtx, err
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
md = &MediaDesc{
M: m,
}
}
if strings.HasPrefix(line, "a=rtpmap") {
aRTPMap, err := ParseARTPMap(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.ARTPMap = aRTPMap
}
if strings.HasPrefix(line, "a=fmtp") {
aFmtPBase, err := ParseAFmtPBase(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.AFmtPBase = &aFmtPBase
}
if strings.HasPrefix(line, "a=control") {
aControl, err := ParseAControl(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.AControl = aControl
}
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
return sdpCtx, nil
}
func ParseM(s string) (ret M, err error) {
ss := strings.TrimPrefix(s, "m=")
items := strings.Split(ss, " ")
if len(items) < 1 {
return ret, ErrSDP
}
ret.Media = items[0]
return
}
// 例子见单元测试
func ParseARTPMap(s string) (ret ARTPMap, err error) {
// rfc 3640 3.3.1. General
// rfc 3640 3.3.6. High Bit-rate AAC
//
// a=rtpmap:<payload type> <encoding name>/<clock rate>[/<encoding parameters>]
//
items := strings.SplitN(s, ":", 2)
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.SplitN(items[1], " ", 2)
if len(items) != 2 {
err = ErrSDP
return
}
ret.PayloadType, err = strconv.Atoi(items[0])
if err != nil {
return
}
items = strings.SplitN(items[1], "/", 3)
switch len(items) {
case 3:
ret.EncodingParameters = items[2]
fallthrough
case 2:
ret.EncodingName = items[0]
ret.ClockRate, err = strconv.Atoi(items[1])
if err != nil {
return
}
default:
err = ErrSDP
}
return
}
// 例子见单元测试
func ParseAFmtPBase(s string) (ret AFmtPBase, err error) {
// rfc 3640 4.4.1. The a=fmtp Keyword
//
// a=fmtp:<format> <parameter name>=<value>[; <parameter name>=<value>]
//
ret.Parameters = make(map[string]string)
items := strings.SplitN(s, ":", 2)
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.SplitN(items[1], " ", 2)
if len(items) != 2 {
err = ErrSDP
return
}
ret.Format, err = strconv.Atoi(items[0])
if err != nil {
return
}
items = strings.Split(items[1], ";")
for _, pp := range items {
pp = strings.TrimSpace(pp)
kv := strings.SplitN(pp, "=", 2)
if len(kv) != 2 {
err = ErrSDP
return
}
ret.Parameters[kv[0]] = kv[1]
}
return
}
func ParseAControl(s string) (ret AControl, err error) {
if !strings.HasPrefix(s, "a=control:") {
err = ErrSDP
return
}
ret.Value = strings.TrimPrefix(s, "a=control:")
return
}

@ -9,12 +9,7 @@
package sdp
import (
"encoding/base64"
"errors"
"strconv"
"strings"
"github.com/q191201771/lal/pkg/base"
)
// rfc4566
@ -26,337 +21,3 @@ const (
ARTPMapEncodingNameH264 = "H264"
ARTPMapEncodingNameAAC = "MPEG4-GENERIC"
)
type LogicContext struct {
HasAudio bool
HasVideo bool
AudioClockRate int
VideoClockRate int
AudioPayloadTypeOrigin int
VideoPayloadTypeOrigin int
AudioPayloadType base.AVPacketPT
VideoPayloadType base.AVPacketPT
AudioAControl string
VideoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
}
type MediaDesc struct {
M M
ARTPMap ARTPMap
AFmtBase AFmtPBase
AControl AControl
}
type RawContext struct {
MediaDescList []MediaDesc
}
type M struct {
Media string
}
type ARTPMap struct {
PayloadType int
EncodingName string
ClockRate int
EncodingParameters string
}
type AFmtPBase struct {
Format int // same as PayloadType
Parameters map[string]string // name -> value
}
type AControl struct {
Value string
}
func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
var ret LogicContext
c, err := ParseSDP2RawContext(b)
if err != nil {
return ret, err
}
for _, md := range c.MediaDescList {
switch md.M.Media {
case "audio":
ret.HasAudio = true
ret.AudioClockRate = md.ARTPMap.ClockRate
ret.AudioAControl = md.AControl.Value
ret.AudioPayloadTypeOrigin = md.ARTPMap.PayloadType
if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC {
ret.AudioPayloadType = base.AVPacketPTAAC
ret.ASC, err = ParseASC(md.AFmtBase)
if err != nil {
return ret, err
}
} else {
ret.AudioPayloadType = base.AVPacketPTUnknown
}
case "video":
ret.HasVideo = true
ret.VideoClockRate = md.ARTPMap.ClockRate
ret.VideoAControl = md.AControl.Value
ret.VideoPayloadTypeOrigin = md.ARTPMap.PayloadType
switch md.ARTPMap.EncodingName {
case ARTPMapEncodingNameH264:
ret.VideoPayloadType = base.AVPacketPTAVC
ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtBase)
if err != nil {
return ret, err
}
case ARTPMapEncodingNameH265:
ret.VideoPayloadType = base.AVPacketPTHEVC
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtBase)
if err != nil {
return ret, err
}
default:
ret.VideoPayloadType = base.AVPacketPTUnknown
}
}
}
return ret, nil
}
// 例子见单元测试
func ParseSDP2RawContext(b []byte) (RawContext, error) {
var (
sdpCtx RawContext
md *MediaDesc
)
s := string(b)
lines := strings.Split(s, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "m=") {
m, err := ParseM(line)
if err != nil {
return sdpCtx, err
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
md = &MediaDesc{
M: m,
}
}
if strings.HasPrefix(line, "a=rtpmap") {
aRTPMap, err := ParseARTPMap(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.ARTPMap = aRTPMap
}
if strings.HasPrefix(line, "a=fmtp") {
aFmtPBase, err := ParseAFmtPBase(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.AFmtBase = aFmtPBase
}
if strings.HasPrefix(line, "a=control") {
aControl, err := ParseAControl(line)
if err != nil {
return sdpCtx, err
}
if md == nil {
continue
}
md.AControl = aControl
}
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
return sdpCtx, nil
}
func ParseM(s string) (ret M, err error) {
ss := strings.TrimPrefix(s, "m=")
items := strings.Split(ss, " ")
if len(items) < 1 {
return ret, ErrSDP
}
ret.Media = items[0]
return
}
// 例子见单元测试
func ParseARTPMap(s string) (ret ARTPMap, err error) {
// rfc 3640 3.3.1. General
// rfc 3640 3.3.6. High Bit-rate AAC
//
// a=rtpmap:<payload type> <encoding name>/<clock rate>[/<encoding parameters>]
//
items := strings.SplitN(s, ":", 2)
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.SplitN(items[1], " ", 2)
if len(items) != 2 {
err = ErrSDP
return
}
ret.PayloadType, err = strconv.Atoi(items[0])
if err != nil {
return
}
items = strings.SplitN(items[1], "/", 3)
switch len(items) {
case 3:
ret.EncodingParameters = items[2]
fallthrough
case 2:
ret.EncodingName = items[0]
ret.ClockRate, err = strconv.Atoi(items[1])
if err != nil {
return
}
default:
err = ErrSDP
}
return
}
// 例子见单元测试
func ParseAFmtPBase(s string) (ret AFmtPBase, err error) {
// rfc 3640 4.4.1. The a=fmtp Keyword
//
// a=fmtp:<format> <parameter name>=<value>[; <parameter name>=<value>]
//
ret.Parameters = make(map[string]string)
items := strings.SplitN(s, ":", 2)
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.SplitN(items[1], " ", 2)
if len(items) != 2 {
err = ErrSDP
return
}
ret.Format, err = strconv.Atoi(items[0])
if err != nil {
return
}
items = strings.Split(items[1], ";")
for _, pp := range items {
pp = strings.TrimSpace(pp)
kv := strings.SplitN(pp, "=", 2)
if len(kv) != 2 {
err = ErrSDP
return
}
ret.Parameters[kv[0]] = kv[1]
}
return
}
func ParseAControl(s string) (ret AControl, err error) {
if !strings.HasPrefix(s, "a=control:") {
err = ErrSDP
return
}
ret.Value = strings.TrimPrefix(s, "a=control:")
return
}
func ParseASC(a AFmtPBase) ([]byte, error) {
if a.Format != base.RTPPacketTypeAAC {
return nil, ErrSDP
}
v, ok := a.Parameters["config"]
if !ok {
return nil, ErrSDP
}
if len(v) < 4 || (len(v)%2) != 0 {
return nil, ErrSDP
}
l := len(v) / 2
r := make([]byte, l)
for i := 0; i < l; i++ {
b, err := strconv.ParseInt(v[i*2:i*2+2], 16, 0)
if err != nil {
return nil, ErrSDP
}
r[i] = uint8(b)
}
return r, nil
}
func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) {
v, ok := a.Parameters["sprop-vps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if vps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
v, ok = a.Parameters["sprop-sps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if sps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
v, ok = a.Parameters["sprop-pps"]
if !ok {
return nil, nil, nil, ErrSDP
}
if pps, err = base64.StdEncoding.DecodeString(v); err != nil {
return nil, nil, nil, err
}
return
}
// 解析AVC/H264的spspps
// 例子见单元测试
func ParseSPSPPS(a AFmtPBase) (sps, pps []byte, err error) {
v, ok := a.Parameters["sprop-parameter-sets"]
if !ok {
return nil, nil, ErrSDP
}
items := strings.SplitN(v, ",", 2)
if len(items) != 2 {
return nil, nil, ErrSDP
}
sps, err = base64.StdEncoding.DecodeString(items[0])
if err != nil {
return nil, nil, ErrSDP
}
pps, err = base64.StdEncoding.DecodeString(items[1])
return
}

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package sdp_test
package sdp
import (
"encoding/hex"
@ -15,8 +15,6 @@ import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/sdp"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/assert"
@ -48,13 +46,13 @@ var goldenPPS = []byte{
}
func TestParseSDP2RawContext(t *testing.T) {
sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP))
sdpCtx, err := ParseSDP2RawContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
nazalog.Debugf("sdp=%+v", sdpCtx)
}
func TestParseARTPMap(t *testing.T) {
golden := map[string]sdp.ARTPMap{
golden := map[string]ARTPMap{
"rtpmap:96 H264/90000": {
PayloadType: 96,
EncodingName: "H264",
@ -75,14 +73,14 @@ func TestParseARTPMap(t *testing.T) {
},
}
for in, out := range golden {
actual, err := sdp.ParseARTPMap(in)
actual, err := ParseARTPMap(in)
assert.Equal(t, nil, err)
assert.Equal(t, out, actual)
}
}
func TestParseFmtPBase(t *testing.T) {
golden := map[string]sdp.AFmtPBase{
golden := map[string]AFmtPBase{
"a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020": {
Format: 96,
Parameters: map[string]string{
@ -112,7 +110,7 @@ func TestParseFmtPBase(t *testing.T) {
},
}
for in, out := range golden {
actual, err := sdp.ParseAFmtPBase(in)
actual, err := ParseAFmtPBase(in)
assert.Equal(t, nil, err)
assert.Equal(t, out, actual)
}
@ -120,9 +118,9 @@ func TestParseFmtPBase(t *testing.T) {
func TestParseSPSPPS(t *testing.T) {
s := "a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020"
f, err := sdp.ParseAFmtPBase(s)
f, err := ParseAFmtPBase(s)
assert.Equal(t, nil, err)
sps, pps, err := sdp.ParseSPSPPS(f)
sps, pps, err := ParseSPSPPS(&f)
assert.Equal(t, nil, err)
assert.Equal(t, goldenSPS, sps)
assert.Equal(t, goldenPPS, pps)
@ -130,9 +128,9 @@ func TestParseSPSPPS(t *testing.T) {
func TestParseASC(t *testing.T) {
s := "a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210"
f, err := sdp.ParseAFmtPBase(s)
f, err := ParseAFmtPBase(s)
assert.Equal(t, nil, err)
asc, err := sdp.ParseASC(f)
asc, err := ParseASC(&f)
assert.Equal(t, nil, err)
assert.Equal(t, []byte{0x12, 0x10}, asc)
}
@ -143,9 +141,9 @@ func TestParseASC(t *testing.T) {
//[]byte{0x44, 0x01, 0xc1, 0x72, 0xb4, 0x62, 0x40}
func TestParseVPSSPSPPS(t *testing.T) {
s := "a=fmtp:96 sprop-vps=QAEMAf//AWAAAAMAkAAAAwAAAwA/ugJA; sprop-sps=QgEBAWAAAAMAkAAAAwAAAwA/oAUCAXHy5bpKTC8BAQAAAwABAAADAA8I; sprop-pps=RAHAc8GJ"
f, err := sdp.ParseAFmtPBase(s)
f, err := ParseAFmtPBase(s)
assert.Equal(t, nil, err)
vps, sps, pps, err := sdp.ParseVPSSPSPPS(f)
vps, sps, pps, err := ParseVPSSPSPPS(&f)
assert.Equal(t, nil, err)
nazalog.Debugf("%s", hex.Dump(vps))
nazalog.Debugf("%s", hex.Dump(sps))
@ -153,18 +151,18 @@ func TestParseVPSSPSPPS(t *testing.T) {
}
func TestParseSDP2LogicContext(t *testing.T) {
ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP))
ctx, err := ParseSDP2LogicContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, true, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
assert.Equal(t, 44100, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "streamid=1", ctx.AudioAControl)
assert.Equal(t, "streamid=0", ctx.VideoAControl)
assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(97))
assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96))
assert.Equal(t, base.AVPacketPTAAC, ctx.GetAudioPayloadTypeBase())
assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase())
assert.Equal(t, "streamid=1", ctx.audioAControl)
assert.Equal(t, "streamid=0", ctx.videoAControl)
assert.IsNotNil(t, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
@ -188,14 +186,14 @@ a=rtpmap:98 H265/90000
a=fmtp:98 profile-id=1;sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBaoAWCAJBY2uSTL5A=;sprop-pps=RAHA8vA8kA==;sprop-vps=QAEMAf//AWAAAAMAsAAAAwAAAwBarAk=
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, false, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, false, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 98, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTHEVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=0", ctx.VideoAControl)
assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(98))
assert.Equal(t, base.AVPacketPTHEVC, ctx.GetVideoPayloadTypeBase())
assert.Equal(t, "trackID=0", ctx.videoAControl)
assert.Equal(t, nil, ctx.ASC)
assert.IsNotNil(t, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
@ -225,18 +223,18 @@ a=rtpmap:97 MPEG4-GENERIC/48000
a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1188
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, true, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
assert.Equal(t, 48000, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=1", ctx.AudioAControl)
assert.Equal(t, "trackID=0", ctx.VideoAControl)
assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(97))
assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96))
assert.Equal(t, base.AVPacketPTAAC, ctx.GetAudioPayloadTypeBase())
assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase())
assert.Equal(t, "trackID=1", ctx.audioAControl)
assert.Equal(t, "trackID=0", ctx.videoAControl)
assert.IsNotNil(t, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
@ -269,18 +267,18 @@ b=AS:10
a=Media_header:MEDIAINFO=494D4B48020100000400000111710110401F000000FA000000000000000000000000000000000000;
a=appversion:1.0`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, true, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
assert.Equal(t, 8000, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 8, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, true, ctx.IsAudioPayloadTypeOrigin(8))
assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(96))
//assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=audio", ctx.AudioAControl)
assert.Equal(t, "trackID=video", ctx.VideoAControl)
assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase())
assert.Equal(t, "trackID=audio", ctx.audioAControl)
assert.Equal(t, "trackID=video", ctx.videoAControl)
assert.Equal(t, nil, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
@ -305,16 +303,60 @@ a=rtpmap:107 vnd.onvif.metadata/90000
a=fmtp:107 DecoderTag=h3c-v3 RTCP=0
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, false, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, false, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 105, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.VideoAControl)
assert.Equal(t, true, ctx.IsVideoPayloadTypeOrigin(105))
assert.Equal(t, base.AVPacketPTAVC, ctx.GetVideoPayloadTypeBase())
assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.videoAControl)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
nazalog.Debugf("%+v", ctx)
}
func TestCase6(t *testing.T) {
golden := `v=0
o=- 1109162014219182 0 IN IP4 0.0.0.0
s=HIK Media Server V3.4.96
i=HIK Media Server Session Description : standard
e=NONE
c=IN IP4 0.0.0.0
t=0 0
a=control:*
b=AS:2058
a=range:npt=now-
m=video 0 RTP/AVP 96
i=Video Media
a=rtpmap:96 H265/90000
a=control:trackID=video
b=AS:2048
m=audio 0 RTP/AVP 8
i=Audio Media
a=rtpmap:8 PCMA/8000
a=control:trackID=audio
b=AS:10
a=Media_header:MEDIAINFO=494D4B48020100000400050011710110401F000000FA000000000000000000000000000000000000;
a=appversion:1.0`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, 8000, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, base.AVPacketPTUnknown, ctx.audioPayloadTypeBase)
assert.Equal(t, base.AVPacketPTHEVC, ctx.videoPayloadTypeBase)
assert.Equal(t, 8, ctx.audioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.videoPayloadTypeOrigin)
assert.Equal(t, "trackID=audio", ctx.audioAControl)
assert.Equal(t, "trackID=video", ctx.videoAControl)
assert.Equal(t, nil, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.Equal(t, nil, ctx.SPS)
assert.Equal(t, nil, ctx.PPS)
assert.Equal(t, true, ctx.hasAudio)
assert.Equal(t, true, ctx.hasVideo)
nazalog.Debugf("%+v", ctx)
}

Loading…
Cancel
Save