messages:

- [feat] rtsp,pull支持interleaved(over tcp)模式。demo/pullrtsp2pushrtmp和demo/pullrtsp可通过运行参数选择是否使用over tcp模式
- [feat] rtsp,pull支持auth digest验证
- [feat] rtsp,pull支持定时发送GET_PARAMETER rtsp message进行保活(对端支持的情况下)
- [fix] rtsp,输入流音频不是AAC格式时,保证视频流可正常remux成其他封装协议
- [fix] rtsp,pull开始时发送dummy rtp/rtcp数据,保证对端能成功发送数据至本地
- [fix] rtsp,将以下包返回给上层:rtsp pub h265, single rtp packet, VPS, SPS, PPS, SEI
- [fix] rtsp,修改rtsp.AVPacketQueue的行为:当音频或者视频数量队列满了后,直接出队而不是丢弃
- [fix] sdp,修复解析及使用sdp错误的一些case
- [refactor] rtmp/httpflv/rtsp,统一所有PullSession:超时形式;Pull和Wait函数
- [fix] avc,尝试解析scaling matrix
pull/44/head
q191201771 4 years ago
parent 3ca25a22d2
commit 942b5da52d

@ -121,6 +121,7 @@ func main() {
nazalog.Debugf("%+v", buf.String())
}
case httpflv.TagTypeAudio:
nazalog.Debugf("header=%+v, %+v", tag.Header, tag.IsAACSeqHeader())
brAudio.Add(len(tag.Raw))
if tag.IsAACSeqHeader() {

@ -73,8 +73,7 @@ func pull(url string, filename string) {
}
session := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.ConnectTimeoutMS = 3000
option.PullTimeoutMS = 3000
option.PullTimeoutMS = 30000
option.ReadAVTimeoutMS = 10000
})
@ -89,7 +88,7 @@ func pull(url string, filename string) {
}
})
nazalog.Assert(nil, err)
err = <-session.Done()
err = <-session.Wait()
nazalog.Debug(err)
}

@ -48,7 +48,7 @@ func main() {
nazalog.Errorf("pull error. err=%+v", err)
os.Exit(-1)
}
err = <-pullSession.Done()
err = <-pullSession.Wait()
nazalog.Errorf("pull error. err=%+v", err)
}

@ -12,6 +12,7 @@ import (
"flag"
"fmt"
"os"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
@ -31,14 +32,18 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
metadata, vsh, ash, err := remux.AVConfig2FLVTag(asc, vps, sps, pps)
metadata, ash, vsh, err := remux.AVConfig2FLVTag(asc, vps, sps, pps)
nazalog.Assert(nil, err)
err = w.WriteTag(*metadata)
nazalog.Assert(nil, err)
err = w.WriteTag(*vsh)
nazalog.Assert(nil, err)
err = w.WriteTag(*ash)
nazalog.Assert(nil, err)
if ash != nil {
err = w.WriteTag(*ash)
nazalog.Assert(nil, err)
}
if vsh != nil {
err = w.WriteTag(*vsh)
nazalog.Assert(nil, err)
}
}
func (o *Observer) OnAVPacket(pkt base.AVPacket) {
@ -49,7 +54,11 @@ func (o *Observer) OnAVPacket(pkt base.AVPacket) {
}
func main() {
inURL, outFilename := parseFlag()
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
inURL, outFilename, overTCP := parseFlag()
err := w.Open(outFilename)
nazalog.Assert(nil, err)
defer w.Dispose()
@ -57,23 +66,38 @@ func main() {
nazalog.Assert(nil, err)
o := &Observer{}
s := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMS = 5000
option.OverTCP = overTCP != 0
})
err = s.Pull(inURL)
nazalog.Error(err)
go func() {
for {
rtspPullSession.UpdateStat(1)
rtspStat := rtspPullSession.GetStat()
nazalog.Debugf("bitrate. rtsp pull=%dkbit/s", rtspStat.Bitrate)
time.Sleep(1 * time.Second)
}
}()
err = rtspPullSession.Pull(inURL)
nazalog.Assert(nil, err)
err = <-rtspPullSession.Wait()
nazalog.Infof("done. err=%+v", err)
}
func parseFlag() (inURL string, outFilename string) {
func parseFlag() (inURL string, outFilename string, overTCP int) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify ouput flv file")
t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv -t 0
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv -t 1
`)
os.Exit(1)
}
return *i, *o
return *i, *o, *t
}

@ -33,14 +33,18 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) {
}
func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) {
metadata, vsh, ash, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps)
metadata, ash, vsh, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps)
nazalog.Assert(nil, err)
err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(metadata.Payload, &metadata.Header))
nazalog.Assert(nil, err)
err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(vsh.Payload, &vsh.Header))
nazalog.Assert(nil, err)
err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(ash.Payload, &ash.Header))
nazalog.Assert(nil, err)
if ash != nil {
err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(ash.Payload, &ash.Header))
nazalog.Assert(nil, err)
}
if vsh != nil {
err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(vsh.Payload, &vsh.Header))
nazalog.Assert(nil, err)
}
}
func (o *Observer) OnAVPacket(pkt base.AVPacket) {
@ -55,10 +59,9 @@ func main() {
option.AssertBehavior = nazalog.AssertFatal
})
inURL, outURL := parseFlag()
inURL, outURL, overTCP := parseFlag()
rtmpPushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = 5000
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 5000
})
@ -67,13 +70,15 @@ func main() {
nazalog.Assert(nil, err)
go func() {
err := <-rtmpPushSession.Done()
nazalog.Assert(nil, err)
err := <-rtmpPushSession.Wait()
nazalog.Infof("push rtmp done. err=%+v", err)
os.Exit(1)
}()
o := &Observer{}
rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMS = 5000
option.OverTCP = overTCP != 0
})
go func() {
@ -89,18 +94,22 @@ func main() {
err = rtspPullSession.Pull(inURL)
nazalog.Assert(nil, err)
err = <-rtspPullSession.Wait()
nazalog.Infof("pull rtsp done. err=%+v", err)
}
func parseFlag() (inURL string, outFilename string) {
func parseFlag() (inURL string, outFilename string, overTCP int) {
i := flag.String("i", "", "specify pull rtsp url")
o := flag.String("o", "", "specify push rtmp url")
t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)")
flag.Parse()
if *i == "" || *o == "" {
flag.Usage()
_, _ = fmt.Fprintf(os.Stderr, `Example:
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220 -t 0
./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220 -t 1
`)
os.Exit(1)
}
return *i, *o
return *i, *o, *t
}

@ -131,7 +131,6 @@ func push(tags []httpflv.Tag, urls []string, isRecursive bool) {
for i := range urls {
ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = 3000
option.PushTimeoutMS = 5000
option.WriteAVTimeoutMS = 10000
})

@ -62,9 +62,9 @@ func (a *ADTS) InitWithAACAudioSpecificConfig(asc []byte) error {
// <1.6.3.3 samplingFrequencyIndex>, <page 35/110>
// <1.6.3.4 channelConfiguration>
// --------------------------------------------------------
// audio object type [5b] 2=AAC LC
// samplingFrequencyIndex [4b] 3=48000 4=44100
// channelConfiguration [4b] 2=left, right front speakers
// audio object type [5b] 1=AAC MAIN 2=AAC LC
// samplingFrequencyIndex [4b] 3=48000 4=44100 6=24000 5=32000 11=11025
// channelConfiguration [4b] 1=center front speaker 2=left, right front speakers
br := nazabits.NewBitReader(asc)
a.audioObjectType, _ = br.ReadBits8(5)
a.samplingFrequencyIndex, _ = br.ReadBits8(4)

@ -425,8 +425,12 @@ func ParseSPS(payload []byte) (Context, error) {
return Context{}, err
}
if flag == 1 {
nazalog.Debugf("scaling matrix present, not impl yet.")
return Context{}, ErrAVC
nazalog.Debugf("scaling matrix present.")
// TODO chef: 还没有正确实现只是针对特定case做了处理
_, err = br.ReadBits32(128)
if err != nil {
return Context{}, err
}
}
} else {
sps.ChromaFormatIdc = 1
@ -438,10 +442,14 @@ func ParseSPS(payload []byte) (Context, error) {
if err != nil {
return Context{}, err
}
if sps.Log2MaxFrameNumMinus4 > 12 {
return Context{}, ErrAVC
}
sps.PicOrderCntType, err = br.ReadGolomb()
if err != nil {
return Context{}, err
}
if sps.PicOrderCntType == 0 {
sps.Log2MaxPicOrderCntLsb, err = br.ReadGolomb()
sps.Log2MaxPicOrderCntLsb += 4
@ -579,3 +587,134 @@ func TryParseSeqHeader(payload []byte) error {
return err
}
//var defaultScaling4 = [][]uint8{
// {
// 6, 13, 20, 28, 13, 20, 28, 32,
// 20, 28, 32, 37, 28, 32, 37, 42,
// },
// {
// 10, 14, 20, 24, 14, 20, 24, 27,
// 20, 24, 27, 30, 24, 27, 30, 34,
// },
//}
//
//var defaultScaling8 = [][]uint8{
// {
// 6, 10, 13, 16, 18, 23, 25, 27,
// 10, 11, 16, 18, 23, 25, 27, 29,
// 13, 16, 18, 23, 25, 27, 29, 31,
// 16, 18, 23, 25, 27, 29, 31, 33,
// 18, 23, 25, 27, 29, 31, 33, 36,
// 23, 25, 27, 29, 31, 33, 36, 38,
// 25, 27, 29, 31, 33, 36, 38, 40,
// 27, 29, 31, 33, 36, 38, 40, 42,
// },
// {
// 9, 13, 15, 17, 19, 21, 22, 24,
// 13, 13, 17, 19, 21, 22, 24, 25,
// 15, 17, 19, 21, 22, 24, 25, 27,
// 17, 19, 21, 22, 24, 25, 27, 28,
// 19, 21, 22, 24, 25, 27, 28, 30,
// 21, 22, 24, 25, 27, 28, 30, 32,
// 22, 24, 25, 27, 28, 30, 32, 33,
// 24, 25, 27, 28, 30, 32, 33, 35,
// },
//}
//
//var ffZigzagDirect = []uint8{
// 0, 1, 8, 16, 9, 2, 3, 10,
// 17, 24, 32, 25, 18, 11, 4, 5,
// 12, 19, 26, 33, 40, 48, 41, 34,
// 27, 20, 13, 6, 7, 14, 21, 28,
// 35, 42, 49, 56, 57, 50, 43, 36,
// 29, 22, 15, 23, 30, 37, 44, 51,
// 58, 59, 52, 45, 38, 31, 39, 46,
// 53, 60, 61, 54, 47, 55, 62, 63,
//}
//
//var ffZigzagScan = []uint8{
// 0 + 0*4, 1 + 0*4, 0 + 1*4, 0 + 2*4,
// 1 + 1*4, 2 + 0*4, 3 + 0*4, 2 + 1*4,
// 1 + 2*4, 0 + 3*4, 1 + 3*4, 2 + 2*4,
// 3 + 1*4, 3 + 2*4, 2 + 3*4, 3 + 3*4,
//}
//
//func decodeScalingMatrices(reader *nazabits.BitReader) error {
// // 6 * 16
// var spsScalingMatrix4 = [][]uint8{
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// }
// // 6 * 64
// var spsScalingMatrix8 = [][]uint8{
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
// }
//
// fallback := [][]uint8{defaultScaling4[0], defaultScaling4[1], defaultScaling8[0], defaultScaling8[1]}
// decodeScalingList(reader, spsScalingMatrix4[0], 16, defaultScaling4[0], fallback[0])
// decodeScalingList(reader, spsScalingMatrix4[1], 16, defaultScaling4[0], spsScalingMatrix4[0])
// decodeScalingList(reader, spsScalingMatrix4[2], 16, defaultScaling4[0], spsScalingMatrix4[1])
// decodeScalingList(reader, spsScalingMatrix4[3], 16, defaultScaling4[1], fallback[1])
// decodeScalingList(reader, spsScalingMatrix4[4], 16, defaultScaling4[1], spsScalingMatrix4[3])
// decodeScalingList(reader, spsScalingMatrix4[4], 16, defaultScaling4[1], spsScalingMatrix4[3])
//
// decodeScalingList(reader, spsScalingMatrix8[0], 64, defaultScaling8[0], fallback[2])
// decodeScalingList(reader, spsScalingMatrix8[3], 64, defaultScaling8[1], fallback[3])
//
// return nil
//}
//
//func decodeScalingList(reader *nazabits.BitReader, factors []uint8, size int, jvtList []uint8, fallbackList []uint8) error {
// var (
// i = 0
// last = 8
// next = 8
// scan []uint8
// )
// if size == 16 {
// scan = ffZigzagScan
// } else {
// scan = ffZigzagDirect
// }
// flag, err := reader.ReadBit()
// if err != nil {
// return err
// }
// return nil
// if flag == 0 {
// for n := 0; n < size; n++ {
// factors[n] = fallbackList[n]
// }
// } else {
// for i = 0; i < size; i++ {
// if next != 0 {
// v, err := reader.ReadGolomb()
// if err != nil {
// return err
// }
// next = (last + int(v)) & 0xff
// }
// if i == 0 && next == 0 {
// for n := 0; n < size; n++ {
// factors[n] = jvtList[n]
// }
// break
// }
// if next != 0 {
// factors[scan[i]] = uint8(next)
// last = next
// } else {
// factors[scan[i]] = uint8(last)
// }
// }
// }
// return nil
//}

@ -134,3 +134,13 @@ func TestCorner(t *testing.T) {
assert.Equal(t, nil, b.Bytes())
assert.Equal(t, avc.ErrAVC, err)
}
func TestParsePPS_Case2(t *testing.T) {
in := []byte{0x67, 0x64, 0x00, 0x20, 0xad, 0x84, 0x01, 0x0c, 0x20, 0x08, 0x61, 0x00, 0x43, 0x08, 0x02, 0x18, 0x40, 0x10, 0xc2, 0x00, 0x84, 0x3b, 0x50, 0x28, 0x03, 0xcd, 0x37, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0c, 0xa1}
ctx, err := avc.ParseSPS(in)
assert.Equal(t, nil, err)
assert.Equal(t, uint8(100), ctx.Profile)
assert.Equal(t, uint8(32), ctx.Level)
assert.Equal(t, uint32(1280), ctx.Width)
assert.Equal(t, uint32(960), ctx.Height)
}

@ -11,9 +11,10 @@ package base
type AVPacketPT int
const (
AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC
AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC
AVPacketPTHEVC AVPacketPT = 98
AVPacketPTUnknown AVPacketPT = -1
AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC
AVPacketPTHEVC AVPacketPT = RTPPacketTypeHEVC
AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC
)
// 目前供package rtsp使用。以后可能被多个package使用。

@ -9,7 +9,11 @@
package base
const (
// 注意AVC和HEVC都可能使用96所以不能直接通过96判断是AVC还是HEVC
// 注意一般情况下AVC使用96AAC使用97HEVC使用98
// 但是我还遇到过:
// HEVC使用96
// AVC使用105
RTPPacketTypeAVCOrHEVC = 96
RTPPacketTypeAAC = 97
RTPPacketTypeHEVC = 98
)

@ -74,12 +74,15 @@ type ISessionURLContext interface {
// | UpdateStat()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | |
// | IsAlive()<all> | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | |
// | RunLoop() | √ | x√ | √ | √ | √ | x | - | x | x | x | x | |
// | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | |
// | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | |
// | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | |
// | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | |
//
// | Opt.PullTimeoutMS | - | - | - | - | - | - | - | - | x | √ | √ | |
// | Wait() | - | - | - | - | - | - | - | - | √ | √ | √ | |
//
// Dispose由外部调用表示主动关闭正常的session
// 外部调用Dispose后不应继续使用该session
// Dispose后RunLoop结束阻塞

@ -19,6 +19,8 @@ import (
// 见单元测试
// TODO chef: 考虑部分内容移入naza中
var ErrURL = errors.New("lal.url: fxxk")
const (
@ -37,6 +39,8 @@ type URLPathContext struct {
type URLContext struct {
Scheme string
Username string
Password string
StdHost string // host or host:port
HostWithPort string
Host string
@ -48,6 +52,8 @@ type URLContext struct {
PathWithoutLastItem string // 注意,没有前面的'/',也没有后面的'/'
LastItemOfPath string // 注意,没有前面的'/'
RawQuery string
RawURLWithoutUserInfo string
}
func ParseURLPath(path string) (ctx URLPathContext, err error) {
@ -69,6 +75,8 @@ func ParseURL(rawURL string, defaultPort int) (ctx URLContext, err error) {
ctx.Scheme = stdURL.Scheme
ctx.StdHost = stdURL.Host
ctx.Username = stdURL.User.Username()
ctx.Password, _ = stdURL.User.Password()
h, p, err := net.SplitHostPort(stdURL.Host)
if err != nil {
@ -102,6 +110,8 @@ func ParseURL(rawURL string, defaultPort int) (ctx URLContext, err error) {
ctx.PathWithoutLastItem = pathCtx.PathWithoutLastItem
ctx.LastItemOfPath = pathCtx.LastItemOfPath
ctx.RawQuery = pathCtx.RawQuery
ctx.RawURLWithoutUserInfo = fmt.Sprintf("%s://%s%s", ctx.Scheme, ctx.StdHost, ctx.PathWithRawQuery)
return ctx, nil
}

@ -30,133 +30,143 @@ func TestParseURL(t *testing.T) {
golden := map[in]base.URLContext{
// 常见urlurl中无端口另外设置默认端口
in{rawURL: "rtmp://127.0.0.1/live/test110", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/live/test110",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/live/test110",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110",
},
// 域名url
in{rawURL: "rtmp://localhost/live/test110", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "localhost",
HostWithPort: "localhost:1935",
Host: "localhost",
Port: 1935,
PathWithRawQuery: "/live/test110",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "",
Scheme: "rtmp",
StdHost: "localhost",
HostWithPort: "localhost:1935",
Host: "localhost",
Port: 1935,
PathWithRawQuery: "/live/test110",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://localhost/live/test110",
},
// 带参数url
in{rawURL: "rtmp://127.0.0.1/live/test110?a=1", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110?a=1",
},
// path多级
in{rawURL: "rtmp://127.0.0.1:19350/a/b/test110", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/a/b/test110",
Path: "/a/b/test110",
PathWithoutLastItem: "a/b",
LastItemOfPath: "test110",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/a/b/test110",
Path: "/a/b/test110",
PathWithoutLastItem: "a/b",
LastItemOfPath: "test110",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/a/b/test110",
},
// url中无端口没有设置默认端口
in{rawURL: "rtmp://127.0.0.1/live/test110?a=1", defaultPort: -1}: {
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1",
Host: "127.0.0.1",
Port: 0,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1",
Host: "127.0.0.1",
Port: 0,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110?a=1",
},
// url中有端口设置默认端口
in{rawURL: "rtmp://127.0.0.1:19350/live/test110?a=1", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live/test110?a=1",
Path: "/live/test110",
PathWithoutLastItem: "live",
LastItemOfPath: "test110",
RawQuery: "a=1",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live/test110?a=1",
},
// 无path
in{rawURL: "rtmp://127.0.0.1:19350", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "",
Path: "",
PathWithoutLastItem: "",
LastItemOfPath: "",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "",
Path: "",
PathWithoutLastItem: "",
LastItemOfPath: "",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350",
},
// 无path2
in{rawURL: "rtmp://127.0.0.1:19350/", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/",
Path: "/",
PathWithoutLastItem: "",
LastItemOfPath: "",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/",
Path: "/",
PathWithoutLastItem: "",
LastItemOfPath: "",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/",
},
// path不完整
in{rawURL: "rtmp://127.0.0.1:19350/live", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live",
Path: "/live",
PathWithoutLastItem: "",
LastItemOfPath: "live",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live",
Path: "/live",
PathWithoutLastItem: "",
LastItemOfPath: "live",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live",
},
// path不完整2
in{rawURL: "rtmp://127.0.0.1:19350/live/", defaultPort: 1935}: {
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live/",
Path: "/live/",
PathWithoutLastItem: "live",
LastItemOfPath: "",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1:19350",
HostWithPort: "127.0.0.1:19350",
Host: "127.0.0.1",
Port: 19350,
PathWithRawQuery: "/live/",
Path: "/live/",
PathWithoutLastItem: "live",
LastItemOfPath: "",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live/",
},
}
@ -171,16 +181,17 @@ func TestParseRTMPURL(t *testing.T) {
golden := map[string]base.URLContext{
// 其他测试见ParseURL
"rtmp://127.0.0.1/test110": {
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/test110",
Path: "/test110",
PathWithoutLastItem: "test110",
LastItemOfPath: "",
RawQuery: "",
Scheme: "rtmp",
StdHost: "127.0.0.1",
HostWithPort: "127.0.0.1:1935",
Host: "127.0.0.1",
Port: 1935,
PathWithRawQuery: "/test110",
Path: "/test110",
PathWithoutLastItem: "test110",
LastItemOfPath: "",
RawQuery: "",
RawURLWithoutUserInfo: "rtmp://127.0.0.1/test110",
},
}
for k, v := range golden {
@ -189,3 +200,29 @@ func TestParseRTMPURL(t *testing.T) {
assert.Equal(t, v, ctx, k)
}
}
func TestParseRTSPURL(t *testing.T) {
golden := map[string]base.URLContext{
// 其他测试见ParseURL
"rtsp://admin:P!@1988@127.0.0.1:5554/h264/ch33/main/av_stream": {
Scheme: "rtsp",
Username: "admin",
Password: "P!@1988",
StdHost: "127.0.0.1:5554",
HostWithPort: "127.0.0.1:5554",
Host: "127.0.0.1",
Port: 5554,
PathWithRawQuery: "/h264/ch33/main/av_stream",
Path: "/h264/ch33/main/av_stream",
PathWithoutLastItem: "h264/ch33/main",
LastItemOfPath: "av_stream",
RawQuery: "",
RawURLWithoutUserInfo: "rtsp://127.0.0.1:5554/h264/ch33/main/av_stream",
},
}
for k, v := range golden {
ctx, err := base.ParseRTSPURL(k)
assert.Equal(t, nil, err)
assert.Equal(t, v, ctx, k)
}
}

@ -9,6 +9,7 @@
package httpflv
import (
"context"
"fmt"
"net"
"time"
@ -22,13 +23,16 @@ import (
)
type PullSessionOption struct {
ConnectTimeoutMS int // TCP连接时超时单位毫秒如果为0则不设置超时
ReadTimeoutMS int // 接收数据超时单位毫秒如果为0则不设置超时
// 从调用Pull函数到接收音视频数据的前一步也即发送完HTTP请求的超时时间
// 如果为0则没有超时时间
PullTimeoutMS int
ReadTimeoutMS int // 接收数据超时单位毫秒如果为0则不设置超时
}
var defaultPullSessionOption = PullSessionOption{
ConnectTimeoutMS: 0,
ReadTimeoutMS: 0,
PullTimeoutMS: 10000,
ReadTimeoutMS: 0,
}
type PullSession struct {
@ -41,6 +45,8 @@ type PullSession struct {
stat base.StatSession
urlCtx base.URLContext
waitErrChan chan error
}
type ModPullSessionOption func(option *PullSessionOption)
@ -53,8 +59,9 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
uk := base.GenUniqueKey(base.UKPFLVPullSession)
s := &PullSession{
option: option,
UniqueKey: uk,
UniqueKey: uk,
option: option,
waitErrChan: make(chan error, 1),
}
nazalog.Infof("[%s] lifecycle new httpflv PullSession. session=%p", uk, s)
return s
@ -62,22 +69,59 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
type OnReadFLVTag func(tag Tag)
// 阻塞直到拉流失败
// 如果没有错误发生阻塞直到接收音视频数据的前一步也即发送完HTTP请求
//
// @param rawURL 支持如下两种格式。(当然,关键点是对端支持)
// http://{domain}/{app_name}/{stream_name}.flv
// http://{ip}/{domain}/{app_name}/{stream_name}.flv
// http://{domain}/{app_name}/{stream_name}.flv
// http://{ip}/{domain}/{app_name}/{stream_name}.flv
//
// @param onReadFLVTag 读取到 flv tag 数据时回调。回调结束后PullSession 不会再使用这块 <tag> 数据。
func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error {
if err := session.connect(rawURL); err != nil {
return err
var (
ctx context.Context
cancel context.CancelFunc
)
if session.option.PullTimeoutMS == 0 {
ctx, cancel = context.WithCancel(context.Background())
} else {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PullTimeoutMS)*time.Millisecond)
}
if err := session.writeHTTPRequest(); err != nil {
return err
defer cancel()
return session.pullContext(ctx, rawURL, onReadFLVTag)
}
// Pull成功后调用该函数可阻塞直到拉流结束
func (session *PullSession) Wait() <-chan error {
return session.waitErrChan
}
func (session *PullSession) pullContext(ctx context.Context, rawURL string, onReadFLVTag OnReadFLVTag) error {
errChan := make(chan error, 1)
go func() {
if err := session.connect(rawURL); err != nil {
errChan <- err
return
}
if err := session.writeHTTPRequest(); err != nil {
errChan <- err
return
}
errChan <- nil
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
if err != nil {
return err
}
}
return session.runReadLoop(onReadFLVTag)
go session.runReadLoop(onReadFLVTag)
return nil
}
func (session *PullSession) Dispose() {
@ -137,7 +181,7 @@ func (session *PullSession) connect(rawURL string) (err error) {
nazalog.Debugf("[%s] > tcp connect.", session.UniqueKey)
// # 建立连接
conn, err := net.DialTimeout("tcp", session.urlCtx.HostWithPort, time.Duration(session.option.ConnectTimeoutMS)*time.Millisecond)
conn, err := net.Dial("tcp", session.urlCtx.HostWithPort)
if err != nil {
return err
}
@ -188,19 +232,22 @@ func (session *PullSession) readTag() (Tag, error) {
return readTag(session.conn)
}
func (session *PullSession) runReadLoop(onReadFLVTag OnReadFLVTag) error {
func (session *PullSession) runReadLoop(onReadFLVTag OnReadFLVTag) {
if _, _, err := session.readHTTPRespHeader(); err != nil {
return err
session.waitErrChan <- err
return
}
if _, err := session.readFLVHeader(); err != nil {
return err
session.waitErrChan <- err
return
}
for {
tag, err := session.readTag()
if err != nil {
return err
session.waitErrChan <- err
return
}
onReadFLVTag(tag)
}

@ -114,7 +114,7 @@ func InnerTestEntry(t *testing.T) {
if err != nil {
nazalog.Error(err)
}
err = <-rtmpPullSession.Done()
err = <-rtmpPullSession.Wait()
nazalog.Debug(err)
}()

@ -848,7 +848,6 @@ func (group *Group) pullIfNeeded() {
go func() {
pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.ConnectTimeoutMS = relayPullConnectTimeoutMS
option.PullTimeoutMS = relayPullTimeoutMS
option.ReadAVTimeoutMS = relayPullReadAVTimeoutMS
})
@ -860,7 +859,7 @@ func (group *Group) pullIfNeeded() {
}
res := group.AddRTMPPullSession(pullSession)
if res {
err = <-pullSession.Done()
err = <-pullSession.Wait()
nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
group.DelRTMPPullSession(pullSession)
} else {
@ -901,7 +900,6 @@ func (group *Group) pushIfNeeded() {
go func(u, u2 string) {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.ConnectTimeoutMS = relayPushConnectTimeoutMS
option.PushTimeoutMS = relayPushTimeoutMS
option.WriteAVTimeoutMS = relayPushWriteAVTimeoutMS
})
@ -912,7 +910,7 @@ func (group *Group) pushIfNeeded() {
return
}
group.AddRTMPPushSession(u, pushSession)
err = <-pushSession.Done()
err = <-pushSession.Wait()
nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRTMPPushSession(u, pushSession)
}(url, urlWithParam)

@ -9,11 +9,9 @@
package logic
//var relayPushCheckIntervalMS = 1000
var relayPushConnectTimeoutMS = 5000
var relayPushTimeoutMS = 5000
var relayPushWriteAVTimeoutMS = 5000
var relayPullConnectTimeoutMS = 5000
var relayPullTimeoutMS = 5000
var relayPullReadAVTimeoutMS = 5000

@ -16,6 +16,7 @@ import (
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// @param asc 如果为nil则没有音频
@ -46,11 +47,14 @@ func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag
if isHEVC {
videocodecid = int(base.RTMPCodecIDHEVC)
var ctx hevc.Context
if err = hevc.ParseSPS(sps, &ctx); err != nil {
return
err = hevc.ParseSPS(sps, &ctx)
if err == nil {
width = int(ctx.PicWidthInLumaSamples)
height = int(ctx.PicHeightInLumaSamples)
} else {
// TODO chef: 如果解析错误先忽略将失败的case收集起来解决
nazalog.Warnf("parse sps failed.")
}
width = int(ctx.PicWidthInLumaSamples)
height = int(ctx.PicHeightInLumaSamples)
bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps)
if err != nil {
return
@ -59,11 +63,13 @@ func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag
videocodecid = int(base.RTMPCodecIDAVC)
var ctx avc.Context
ctx, err = avc.ParseSPS(sps)
if err != nil {
return
if err == nil {
width = int(ctx.Width)
height = int(ctx.Height)
} else {
// TODO chef: 如果解析错误先忽略将失败的case收集起来解决
nazalog.Warnf("parse sps failed.")
}
width = int(ctx.Width)
height = int(ctx.Height)
bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps)
if err != nil {
return

@ -19,15 +19,16 @@ type PullSession struct {
}
type PullSessionOption struct {
ConnectTimeoutMS int
PullTimeoutMS int
ReadAVTimeoutMS int
// 从调用Pull函数到接收音视频数据的前一步也即收到服务端返回的rtmp play对应结果的信令的超时时间
// 如果为0则没有超时时间
PullTimeoutMS int
ReadAVTimeoutMS int
}
var defaultPullSessionOption = PullSessionOption{
ConnectTimeoutMS: 0,
PullTimeoutMS: 0,
ReadAVTimeoutMS: 0,
PullTimeoutMS: 0,
ReadAVTimeoutMS: 0,
}
type ModPullSessionOption func(option *PullSessionOption)
@ -40,24 +41,23 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
return &PullSession{
core: NewClientSession(CSTPullSession, func(option *ClientSessionOption) {
option.ConnectTimeoutMS = opt.ConnectTimeoutMS
option.DoTimeoutMS = opt.PullTimeoutMS
option.ReadAVTimeoutMS = opt.ReadAVTimeoutMS
}),
}
}
// 建立rtmp play连接
// 阻塞直到收到服务端返回的rtmp publish对应结果的信令或发生错误
// 如果没有发生错误阻塞直到到接收音视频数据的前一步也即收到服务端返回的rtmp play对应结果的信令
//
// @param onReadRTMPAVMsg: 注意回调结束后内存块会被PullSession重复使用
func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error {
s.core.onReadRTMPAVMsg = onReadRTMPAVMsg
return s.core.DoWithTimeout(rawURL)
return s.core.Do(rawURL)
}
func (s *PullSession) Done() <-chan error {
return s.core.Done()
// Pull成功后调用该函数可阻塞直到拉流结束
func (s *PullSession) Wait() <-chan error {
return s.core.Wait()
}
func (s *PullSession) Dispose() {

@ -8,7 +8,9 @@
package rtmp
import "github.com/q191201771/lal/pkg/base"
import (
"github.com/q191201771/lal/pkg/base"
)
type PushSession struct {
IsFresh bool
@ -17,13 +19,14 @@ type PushSession struct {
}
type PushSessionOption struct {
ConnectTimeoutMS int
PushTimeoutMS int
// 从调用Push函数到可以发送音视频数据的前一步也即收到服务端返回的rtmp publish对应结果的信令的超时时间
// 如果为0则没有超时时间
PushTimeoutMS int
WriteAVTimeoutMS int
}
var defaultPushSessionOption = PushSessionOption{
ConnectTimeoutMS: 0,
PushTimeoutMS: 0,
WriteAVTimeoutMS: 0,
}
@ -38,17 +41,15 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
return &PushSession{
IsFresh: true,
core: NewClientSession(CSTPushSession, func(option *ClientSessionOption) {
option.ConnectTimeoutMS = opt.ConnectTimeoutMS
option.DoTimeoutMS = opt.PushTimeoutMS
option.WriteAVTimeoutMS = opt.WriteAVTimeoutMS
}),
}
}
// 建立rtmp publish连接
// 阻塞直到收到服务端返回的rtmp publish对应结果的信令或发生错误
// 如果没有错误发生阻塞到接收音视频数据的前一步也即收到服务端返回的rtmp publish对应结果的信令
func (s *PushSession) Push(rawURL string) error {
return s.core.DoWithTimeout(rawURL)
return s.core.Do(rawURL)
}
func (s *PushSession) AsyncWrite(msg []byte) error {
@ -87,8 +88,8 @@ func (s *PushSession) RawQuery() string {
return s.core.RawQuery()
}
func (s *PushSession) Done() <-chan error {
return s.core.Done()
func (s *PushSession) Wait() <-chan error {
return s.core.Wait()
}
func (s *PushSession) UniqueKey() string {

@ -9,6 +9,7 @@
package rtmp
import (
"context"
"errors"
"fmt"
"net"
@ -56,14 +57,12 @@ const (
type ClientSessionOption struct {
// 单位毫秒如果为0则没有超时
ConnectTimeoutMS int // 建立连接超时
DoTimeoutMS int // 从发起连接包含了建立连接的时间到收到publish或play信令结果的超时
ReadAVTimeoutMS int // 读取音视频数据的超时
WriteAVTimeoutMS int // 发送音视频数据的超时
}
var defaultClientSessOption = ClientSessionOption{
ConnectTimeoutMS: 0,
DoTimeoutMS: 0,
ReadAVTimeoutMS: 0,
WriteAVTimeoutMS: 0,
@ -103,7 +102,23 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
return s
}
func (s *ClientSession) Done() <-chan error {
// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) Do(rawURL string) error {
var (
ctx context.Context
cancel context.CancelFunc
)
if s.option.DoTimeoutMS == 0 {
ctx, cancel = context.WithCancel(context.Background())
} else {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(s.option.DoTimeoutMS)*time.Millisecond)
}
defer cancel()
return s.doContext(ctx, rawURL)
}
// Do成功后调用该函数可阻塞直到推流或拉流结束
func (s *ClientSession) Wait() <-chan error {
return s.conn.Done()
}
@ -169,62 +184,45 @@ func (s *ClientSession) IsAlive() (readAlive, writeAlive bool) {
return
}
// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) DoWithTimeout(rawURL string) error {
if s.option.DoTimeoutMS == 0 {
err := <-s.do(rawURL)
return err
}
t := time.NewTimer(time.Duration(s.option.DoTimeoutMS) * time.Millisecond)
defer t.Stop()
select {
// TODO chef: 这种写法执行不到超时
case err := <-s.do(rawURL):
return err
case <-t.C:
return ErrClientSessionTimeout
}
}
func (s *ClientSession) doContext(ctx context.Context, rawURL string) error {
errChan := make(chan error, 1)
func (s *ClientSession) do(rawURL string) <-chan error {
ch := make(chan error, 1)
if err := s.parseURL(rawURL); err != nil {
ch <- err
return ch
}
if err := s.tcpConnect(); err != nil {
ch <- err
return ch
}
go func() {
if err := s.parseURL(rawURL); err != nil {
errChan <- err
return
}
if err := s.tcpConnect(); err != nil {
errChan <- err
return
}
if err := s.handshake(); err != nil {
ch <- err
return ch
}
if err := s.handshake(); err != nil {
errChan <- err
return
}
log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize)
if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
ch <- err
return ch
}
log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize)
if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
errChan <- err
return
}
log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName())
if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil {
ch <- err
return ch
}
log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName())
if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil {
errChan <- err
return
}
go s.runReadLoop()
s.runReadLoop()
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-s.doResultChan:
ch <- nil
break
case err := <-s.conn.Done():
ch <- err
break
return nil
}
return ch
}
func (s *ClientSession) parseURL(rawURL string) (err error) {
@ -256,7 +254,7 @@ func (s *ClientSession) tcpConnect() error {
s.stat.RemoteAddr = s.urlCtx.HostWithPort
var conn net.Conn
if conn, err = net.DialTimeout("tcp", s.urlCtx.HostWithPort, time.Duration(s.option.ConnectTimeoutMS)*time.Millisecond); err != nil {
if conn, err = net.Dial("tcp", s.urlCtx.HostWithPort); err != nil {
return err
}
@ -286,6 +284,7 @@ func (s *ClientSession) handshake() error {
}
func (s *ClientSession) runReadLoop() {
// TODO chef: 这里是否应该主动关闭conn考虑对端发送非法协议数据增加一个对应的测试看看
_ = s.chunkComposer.RunLoop(s.conn, s.doMsg)
}

@ -100,6 +100,14 @@ func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error {
c.c0c1 = make([]byte, c0c1Len)
c.c0c1[0] = version
bele.BEPutUint32(c.c0c1[1:5], uint32(time.Now().UnixNano()))
//c.c0c1[1] = 0
//c.c0c1[2] = 0
//c.c0c1[3] = 0
//c.c0c1[4] = 0
//c.c0c1[5] = 9
//c.c0c1[6] = 0
//c.c0c1[7] = 124
//c.c0c1[8] = 2
random1528(c.c0c1[9:])
_, err := writer.Write(c.c0c1)

@ -111,18 +111,14 @@ func (r *RTPUnpacker) isStale(seq uint16) bool {
// 计算rtp包处于帧中的位置
func (r *RTPUnpacker) calcPositionIfNeeded(pkt *RTPPacket) {
switch pkt.Header.PacketType {
case base.RTPPacketTypeAVCOrHEVC:
switch r.payloadType {
case base.AVPacketPTAVC:
calcPositionIfNeededAVC(pkt)
case base.AVPacketPTHEVC:
calcPositionIfNeededHEVC(pkt)
default:
// can't reach here
}
case base.RTPPacketTypeAAC:
switch r.payloadType {
case base.AVPacketPTAVC:
calcPositionIfNeededAVC(pkt)
case base.AVPacketPTHEVC:
calcPositionIfNeededHEVC(pkt)
case base.AVPacketPTAAC:
// noop
break
default:
// can't reach here
}

@ -25,9 +25,17 @@ func calcPositionIfNeededHEVC(pkt *RTPPacket) {
outerNALUType := hevc.ParseNALUType(b[0])
switch outerNALUType {
case hevc.NALUTypeSliceIDRNLP:
case hevc.NALUTypeVPS:
fallthrough
case hevc.NALUTypeSPS:
fallthrough
case hevc.NALUTypePPS:
fallthrough
case hevc.NALUTypeSEI:
fallthrough
case hevc.NALUTypeSliceTrailR:
fallthrough
case hevc.NALUTypeSliceIDRNLP:
pkt.positionType = PositionTypeSingle
return
case NALUTypeHEVCFUA:

@ -0,0 +1,85 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"fmt"
"strings"
"github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazamd5"
)
// TODO chef: 考虑部分内容移入naza中
// TODO chef: 只支持Digest方式不支持Basic方式
const (
AuthTypeDigest = "Digest"
AuthAlgorithm = "MD5"
)
type Auth struct {
Username string
Password string
Typ string
Realm string
Nonce string
Algorithm string
}
func (a *Auth) FeedWWWAuthenticate(s, username, password string) {
a.Username = username
a.Password = password
s = strings.TrimPrefix(s, HeaderWWWAuthenticate)
s = strings.TrimSpace(s)
if strings.HasPrefix(s, AuthTypeDigest) {
a.Typ = AuthTypeDigest
}
a.Realm = a.getV(s, `realm="`)
a.Nonce = a.getV(s, `nonce="`)
a.Algorithm = a.getV(s, `algorithm="`)
if a.Typ != AuthTypeDigest {
nazalog.Warnf("FeedWWWAuthenticate type invalid, only support Digest. v=%s", s)
}
if a.Realm == "" {
nazalog.Warnf("FeedWWWAuthenticate realm invalid. v=%s", s)
}
if a.Nonce == "" {
nazalog.Warnf("FeedWWWAuthenticate realm invalid. v=%s", s)
}
if a.Algorithm != AuthAlgorithm {
nazalog.Warnf("FeedWWWAuthenticate algorithm invalid, only support MD5. v=%s", s)
}
}
func (a *Auth) MakeAuthorization(method, uri string) string {
if a.Username == "" || a.Nonce == "" {
return ""
}
ha1 := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s:%s", a.Username, a.Realm, a.Password)))
ha2 := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s", method, uri)))
response := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s:%s", ha1, a.Nonce, ha2)))
return fmt.Sprintf(`%s username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, a.Typ, a.Username, a.Realm, a.Nonce, uri, response, a.Algorithm)
}
func (a *Auth) getV(s string, pre string) string {
b := strings.Index(s, pre)
if b == -1 {
return ""
}
e := strings.Index(s[b+len(pre):], `"`)
if e == -1 {
return ""
}
return s[b+len(pre) : b+len(pre)+e]
}

@ -0,0 +1,16 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp_test
import (
"testing"
)
func TestAuth(t *testing.T) {
}

@ -11,7 +11,6 @@ package rtsp
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/circularqueue"
"github.com/q191201771/naza/pkg/nazalog"
)
// 处理音频和视频的时间戳:
@ -53,12 +52,15 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) {
a.videoBaseTS = int64(pkt.Timestamp)
}
pkt.Timestamp -= uint32(a.videoBaseTS)
_ = a.videoQueue.PushBack(pkt)
if a.videoQueue.Full() {
pkt, _ := a.videoQueue.Front()
_, _ = a.videoQueue.PopFront()
nazalog.Warnf("video queue full, drop front packet.")
ppkt := pkt.(base.AVPacket)
a.onAVPacket(ppkt)
return
}
_ = a.videoQueue.PushBack(pkt)
//nazalog.Debugf("AVQ v push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
case base.AVPacketPTAAC:
if a.audioBaseTS == -1 {
@ -66,11 +68,14 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) {
}
pkt.Timestamp -= uint32(a.audioBaseTS)
_ = a.audioQueue.PushBack(pkt)
if a.audioQueue.Full() {
pkt, _ := a.audioQueue.Front()
_, _ = a.audioQueue.PopFront()
nazalog.Warnf("audio queue full, drop front packet. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
ppkt := pkt.(base.AVPacket)
a.onAVPacket(ppkt)
return
}
_ = a.audioQueue.PushBack(pkt)
//nazalog.Debugf("AVQ a push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size())
} //switch loop

@ -82,13 +82,21 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext)
s.sdpLogicCtx = sdpLogicCtx
s.m.Unlock()
s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
if isSupportType(s.sdpLogicCtx.AudioPayloadType) {
s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] audio unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin)
}
if isSupportType(s.sdpLogicCtx.VideoPayloadType) {
s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked)
} else {
nazalog.Warnf("[%s] video unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin)
}
s.audioRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.AudioClockRate)
s.videoRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.VideoClockRate)
if s.sdpLogicCtx.AudioPayloadType != 0 && s.sdpLogicCtx.VideoPayloadType != 0 {
if s.sdpLogicCtx.HasAudio && s.sdpLogicCtx.HasVideo {
s.avPacketQueue = NewAVPacketQueue(s.onAVPacket)
}
@ -97,9 +105,11 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext)
}
}
// 如果没有设置回调监听对象可以通过该函数设置调用方保证调用该函数发生在调用InitWithSDP之后
func (s *BaseInSession) SetObserver(observer BaseInSessionObserver) {
s.observer = observer
// TODO chef: 这里的判断应该去掉
if s.sdpLogicCtx.ASC != nil && s.sdpLogicCtx.SPS != nil {
s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS)
}
@ -122,7 +132,7 @@ func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDP
return nil
}
func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error {
func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int) error {
if strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) {
s.audioRTPChannel = rtpChannel
s.audioRTCPChannel = rtcpChannel
@ -213,6 +223,22 @@ func (s *BaseInSession) IsAlive() (readAlive, writeAlive bool) {
return
}
// 发现pull时需要先给对端发送数据才能收到数据
func (s *BaseInSession) WriteRTPRTCPDummy() {
if s.videoRTPConn != nil {
_ = s.videoRTPConn.Write(dummyRTPPacket)
}
if s.videoRTCPConn != nil {
_ = s.videoRTCPConn.Write(dummyRTCPPacket)
}
if s.audioRTPConn != nil {
_ = s.audioRTPConn.Write(dummyRTPPacket)
}
if s.audioRTCPConn != nil {
_ = s.audioRTCPConn.Write(dummyRTCPPacket)
}
}
// callback by RTPUnpacker
func (s *BaseInSession) onAVPacketUnpacked(pkt base.AVPacket) {
if s.avPacketQueue != nil {
@ -311,8 +337,8 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error {
return ErrRTSP
}
packetType := b[1] & 0x7F
if packetType != base.RTPPacketTypeAVCOrHEVC && packetType != base.RTPPacketTypeAAC {
packetType := int(b[1] & 0x7F)
if packetType != s.sdpLogicCtx.AudioPayloadTypeOrigin && packetType != s.sdpLogicCtx.VideoPayloadTypeOrigin {
return ErrRTSP
}
@ -328,16 +354,22 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error {
pkt.Raw = b
switch packetType {
case base.RTPPacketTypeAVCOrHEVC:
case s.sdpLogicCtx.VideoPayloadTypeOrigin:
s.videoSSRC = h.SSRC
s.observer.OnRTPPacket(pkt)
s.videoUnpacker.Feed(pkt)
s.videoRRProducer.FeedRTPPacket(h.Seq)
case base.RTPPacketTypeAAC:
if s.videoUnpacker != nil {
s.videoUnpacker.Feed(pkt)
}
case s.sdpLogicCtx.AudioPayloadTypeOrigin:
s.audioSSRC = h.SSRC
s.observer.OnRTPPacket(pkt)
s.audioUnpacker.Feed(pkt)
s.audioRRProducer.FeedRTPPacket(h.Seq)
if s.audioUnpacker != nil {
s.audioUnpacker.Feed(pkt)
}
default:
// 因为前面已经判断过type了所以永远不会走到这
}

@ -9,9 +9,11 @@
package rtsp
import (
"bufio"
"context"
"fmt"
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
@ -24,7 +26,8 @@ import (
)
const (
pullReadBufSize = 256
pullReadBufSize = 256
writeGetParameterIntervalSec = 10
)
type PullSessionObserver interface {
@ -32,11 +35,16 @@ type PullSessionObserver interface {
}
type PullSessionOption struct {
PullTimeoutMS int // 从调用Pull函数到收到rtsp play response接收音视频数据的前一步的超时时间
// 从调用Pull函数到接收音视频数据的前一步也即收到rtsp play response的超时时间
// 如果为0则没有超时时间
PullTimeoutMS int
OverTCP bool // 是否使用interleaved模式也即是否通过rtsp command tcp连接传输rtp/rtcp数据
}
var defaultPullSessionOption = PullSessionOption{
PullTimeoutMS: 10000,
OverTCP: false,
}
type PullSession struct {
@ -48,9 +56,16 @@ type PullSession struct {
cseq int
sessionID string
channel int
rawURL string
urlCtx base.URLContext
waitErrChan chan error
methodGetParameterSupported bool
auth Auth
}
type ModPullSessionOption func(option *PullSessionOption)
@ -63,8 +78,9 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp
uk := base.GenUniqueKey(base.UKPRTSPPullSession)
s := &PullSession{
UniqueKey: uk,
option: option,
UniqueKey: uk,
option: option,
waitErrChan: make(chan error, 1),
}
baseInSession := &BaseInSession{
UniqueKey: uk,
@ -81,6 +97,7 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp
return s
}
// 如果没有错误发生阻塞直到接收音视频数据的前一步也即收到rtsp play response
func (session *PullSession) Pull(rawURL string) error {
var (
ctx context.Context
@ -95,9 +112,15 @@ func (session *PullSession) Pull(rawURL string) error {
return session.pullContext(ctx, rawURL)
}
// Pull成功后调用该函数可阻塞直到拉流结束
func (session *PullSession) Wait() <-chan error {
return session.waitErrChan
}
func (session *PullSession) Write(channel int, b []byte) error {
return nil
}
func (session *PullSession) Dispose() error {
return nil
}
@ -105,9 +128,11 @@ func (session *PullSession) Dispose() error {
func (session *PullSession) AppName() string {
return session.urlCtx.PathWithoutLastItem
}
func (session *PullSession) StreamName() string {
return session.urlCtx.LastItemOfPath
}
func (session *PullSession) RawQuery() string {
return session.urlCtx.RawQuery
}
@ -166,9 +191,91 @@ func (session *PullSession) pullContext(ctx context.Context, rawURL string) erro
}
}
dummy := make([]byte, 1) // 用于接收TCP对端关闭FIN信号
_, err := session.CmdConn.Read(dummy)
return err
go session.runReadLoop()
return nil
}
func (session *PullSession) runReadLoop() {
if !session.methodGetParameterSupported {
// TCP模式需要收取数据进行处理
if session.option.OverTCP {
var r = bufio.NewReader(session.CmdConn)
for {
isInterleaved, packet, channel, err := readInterleaved(r)
if err != nil {
session.waitErrChan <- err
return
}
if isInterleaved {
session.baseInSession.HandleInterleavedPacket(packet, int(channel))
}
}
}
// not over tcp
// 接收TCP对端关闭FIN信号
dummy := make([]byte, 1)
_, err := session.CmdConn.Read(dummy)
session.waitErrChan <- err
return
}
// 对端支持get_parameter需要定时向对端发送get_parameter进行保活
var r = bufio.NewReader(session.CmdConn)
t := time.NewTicker(writeGetParameterIntervalSec * time.Millisecond)
defer t.Stop()
if session.option.OverTCP {
for {
select {
case <-t.C:
session.cseq++
req := PackRequestGetParameter(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
session.waitErrChan <- err
return
}
default:
// noop
}
isInterleaved, packet, channel, err := readInterleaved(r)
if err != nil {
session.waitErrChan <- err
return
}
if isInterleaved {
session.baseInSession.HandleInterleavedPacket(packet, int(channel))
} else {
if _, err := nazahttp.ReadHTTPResponseMessage(r); err != nil {
session.waitErrChan <- err
return
}
}
}
}
// not over tcp
for {
select {
case <-t.C:
session.cseq++
req := PackRequestGetParameter(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
session.waitErrChan <- err
return
}
if _, err := nazahttp.ReadHTTPResponseMessage(r); err != nil {
session.waitErrChan <- err
return
}
default:
// noop
}
}
}
func (session *PullSession) connect(rawURL string) (err error) {
@ -195,7 +302,7 @@ func (session *PullSession) connect(rawURL string) (err error) {
func (session *PullSession) writeOptions() error {
session.cseq++
req := PackRequestOptions(session.rawURL, session.cseq)
req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, "")
nazalog.Debugf("[%s] > write options.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
@ -205,12 +312,19 @@ func (session *PullSession) writeOptions() error {
return err
}
nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode)
session.handleOptionMethods(ctx)
if err := session.handleAuth(ctx); err != nil {
return err
}
return nil
}
func (session *PullSession) writeDescribe() error {
session.cseq++
req := PackRequestDescribe(session.rawURL, session.cseq)
auth := session.auth.MakeAuthorization(MethodDescribe, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestDescribe(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth)
nazalog.Debugf("[%s] > write describe.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
@ -221,39 +335,52 @@ func (session *PullSession) writeDescribe() error {
}
nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body))
sdpCtx, err := sdp.ParseSDP2LogicContext(ctx.Body)
sdpLogicCtx, err := sdp.ParseSDP2LogicContext(ctx.Body)
if err != nil {
return err
}
session.baseInSession.InitWithSDP(ctx.Body, sdpCtx)
session.baseInSession.InitWithSDP(ctx.Body, sdpLogicCtx)
return nil
}
func (session *PullSession) writeSetup() error {
if session.baseInSession.sdpLogicCtx.VideoAControl != "" {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil {
return err
if session.option.OverTCP {
if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil {
return err
}
} else {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil {
return err
}
}
}
if session.baseInSession.sdpLogicCtx.AudioAControl != "" {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil {
return err
if session.option.OverTCP {
if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil {
return err
}
} else {
if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil {
return err
}
}
}
return nil
}
func (session *PullSession) writeOneSetup(aControl string) error {
setupURI := fmt.Sprintf("%s/%s", session.rawURL, aControl)
setupURI := makeSetupURI(session.urlCtx, aControl)
rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2()
if err != nil {
return err
}
session.cseq++
req := PackRequestSetup(setupURI, session.cseq, session.sessionID, int(rtpPort), int(rtcpPort))
auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestSetup(setupURI, session.cseq, int(rtpPort), int(rtcpPort), session.sessionID, auth)
nazalog.Debugf("[%s] > write setup.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
@ -264,7 +391,7 @@ func (session *PullSession) writeOneSetup(aControl string) error {
}
nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx)
session.sessionID = ctx.Headers[HeaderFieldSession]
session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0]
srvRTPPort, srvRTCPPort, err := parseServerPort(ctx.Headers[HeaderFieldTransport])
if err != nil {
@ -296,9 +423,42 @@ func (session *PullSession) writeOneSetup(aControl string) error {
return nil
}
func (session *PullSession) writeOneSetupTCP(aControl string) error {
setupURI := makeSetupURI(session.urlCtx, aControl)
rtpChannel := session.channel
rtcpChannel := session.channel + 1
session.channel += 2
session.cseq++
auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestSetupTCP(setupURI, session.cseq, rtpChannel, rtcpChannel, session.sessionID, auth)
nazalog.Debugf("[%s] > write setup.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx)
session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0]
// TODO chef: 这里没有解析回传的channel id了因为我假定了它和request中的是一致的
if err := session.baseInSession.SetupWithChannel(setupURI, rtpChannel, rtcpChannel); err != nil {
return err
}
return nil
}
func (session *PullSession) writePlay() error {
session.baseInSession.WriteRTPRTCPDummy()
session.cseq++
req := PackRequestPlay(session.rawURL, session.cseq, session.sessionID)
auth := session.auth.MakeAuthorization(MethodPlay, session.urlCtx.RawURLWithoutUserInfo)
req := PackRequestPlay(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID, auth)
nazalog.Debugf("[%s] > write play.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
@ -310,3 +470,36 @@ func (session *PullSession) writePlay() error {
nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode)
return nil
}
func (session *PullSession) handleOptionMethods(ctx nazahttp.HTTPRespMsgCtx) {
methods := ctx.Headers["Public"]
if methods == "" {
return
}
if strings.Contains(methods, MethodGetParameter) {
session.methodGetParameterSupported = true
}
}
func (session *PullSession) handleAuth(ctx nazahttp.HTTPRespMsgCtx) error {
if ctx.Headers[HeaderWWWAuthenticate] == "" {
return nil
}
session.auth.FeedWWWAuthenticate(ctx.Headers[HeaderWWWAuthenticate], session.urlCtx.Username, session.urlCtx.Password)
auth := session.auth.MakeAuthorization(MethodOptions, session.urlCtx.RawURLWithoutUserInfo)
session.cseq++
req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth)
nazalog.Debugf("[%s] > write options.", session.UniqueKey)
if _, err := session.CmdConn.Write([]byte(req)); err != nil {
return err
}
ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn)
if err != nil {
return err
}
nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode)
return nil
}

@ -16,11 +16,6 @@ import (
)
// rfc2326 10.1 OPTIONS
// uri CSeq
var RequestOptionsTmpl = "OPTIONS %s RTSP/1.0\r\n" +
"CSeq: %d\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// CSeq
var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" +
@ -30,18 +25,14 @@ var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" +
"\r\n"
// rfc2326 10.3 ANNOUNCE
//var RequestAnnounceTmpl = "not impl"
// CSeq
var ResponseAnnounceTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"\r\n"
// rfc2326 10.2 DESCRIBE
// uri CSeq
var RequestDescribeTmpl = "DESCRIBE %s RTSP/1.0\r\n" +
"Accept: application/sdp\r\n" +
"CSeq: %d\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// CSeq, Date, Content-Length,
var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" +
@ -53,20 +44,7 @@ var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" +
"%s"
// rfc2326 10.4 SETUP
// uri CSeq RTPPort RTCPPort
var RequestSetupTmpl = "SETUP %s RTSP/1.0\r\n" +
"CSeq: %d\r\n" +
"Transport: RTP/AVP/UDP;unicast;client_port=%d-%d\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// uri CSeq Session RTPPort RTCPPort
var RequestSetupWithSessionTmpl = "SETUP %s RTSP/1.0\r\n" +
"CSeq: %d\r\n" +
"Session: %s\r\n" +
"Transport: RTP/AVP/UDP;unicast;client_port=%d-%d\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// TODO chef: mode=record这个是咋作用是应该pub有sub没有吗我的pack实现没有严格区分
// CSeq, Date, Session, Transport(client_port, server_rtp_port, server_rtcp_port)
var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" +
@ -85,6 +63,8 @@ var ResponseSetupTCPTmpl = "RTSP/1.0 200 OK\r\n" +
"\r\n"
// rfc2326 10.11 RECORD
//var RequestRecordTmpl = "not impl"
// CSeq, Session
var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
@ -92,13 +72,6 @@ var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" +
"\r\n"
// rfc2326 10.5 PLAY
// uri CSeq Session
var RequestPlayTmpl = "PLAY %s RTSP/1.0\r\n" +
"CSeq: %d\r\n" +
"Range: npt=0.000-\r\n" +
"Session: %s\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// CSeq Date
var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" +
@ -106,30 +79,96 @@ var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" +
"Date: %s\r\n" +
"\r\n"
// rfc2326 10.8 GET_PARAMETER
// uri CSeq Session
var RequestGetParameterTmpl = "GET_PARAMETER %s RTSP/1.0\r\n" +
"CSeq: %d\r\n" +
"Session: %s\r\n" +
"User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" +
"\r\n"
// rfc2326 10.7 TEARDOWN
//var RequestTeardownTmpl = "not impl"
// CSeq
var ResponseTeardownTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"\r\n"
func PackRequestOptions(uri string, cseq int) string {
return fmt.Sprintf(RequestOptionsTmpl, uri, cseq)
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestOptions(uri string, cseq int, auth string) string {
headers := map[string]string{
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodOptions, uri, headers)
}
func PackRequestDescribe(uri string, cseq int) string {
return fmt.Sprintf(RequestDescribeTmpl, uri, cseq)
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestDescribe(uri string, cseq int, auth string) string {
headers := map[string]string{
HeaderAccept: "application/sdp",
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodDescribe, uri, headers)
}
// @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段
func PackRequestSetup(uri string, cseq int, sessionID string, rtpClientPort int, rtcpClientPort int) string {
if sessionID == "" {
return fmt.Sprintf(RequestSetupTmpl, uri, cseq, rtpClientPort, rtcpClientPort)
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestSetup(uri string, cseq int, rtpClientPort int, rtcpClientPort int, sessionID string, auth string) string {
headers := map[string]string{
HeaderFieldTransport: fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", rtpClientPort, rtcpClientPort),
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if sessionID != "" {
headers[HeaderFieldSession] = sessionID
}
return fmt.Sprintf(RequestSetupWithSessionTmpl, uri, cseq, sessionID, rtpClientPort, rtcpClientPort)
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodSetup, uri, headers)
}
func PackRequestPlay(uri string, cseq int, sessionID string) string {
return fmt.Sprintf(RequestPlayTmpl, uri, cseq, sessionID)
// @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段
// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段
func PackRequestSetupTCP(uri string, cseq int, rtpChannel int, rtcpChannel int, sessionID string, auth string) string {
headers := map[string]string{
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderFieldTransport: fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", rtpChannel, rtcpChannel),
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if sessionID != "" {
headers[HeaderFieldSession] = sessionID
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodSetup, uri, headers)
}
func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string {
headers := map[string]string{
HeaderFieldCSeq: fmt.Sprintf("%d", cseq),
HeaderFieldRange: "npt=0.000-",
HeaderFieldSession: sessionID,
HeaderUserAgent: base.LALRTSPPullSessionUA,
}
if auth != "" {
headers[HeaderAuthorization] = auth
}
return packRequest(MethodPlay, uri, headers)
}
func PackRequestGetParameter(uri string, cseq int, sessionID string) string {
return fmt.Sprintf(RequestGetParameterTmpl, uri, cseq, sessionID)
}
func PackResponseOptions(cseq string) string {
@ -175,3 +214,12 @@ func PackResponsePlay(cseq string) string {
func PackResponseTeardown(cseq string) string {
return fmt.Sprintf(ResponseTeardownTmpl, cseq)
}
func packRequest(method, uri string, headers map[string]string) (ret string) {
ret = method + " " + uri + " RTSP/1.0\r\n"
for k, v := range headers {
ret += k + ": " + v + "\r\n"
}
ret += "\r\n"
return ret
}

@ -15,6 +15,8 @@ import (
"strconv"
"strings"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtprtcp"
"github.com/q191201771/naza/pkg/nazalog"
@ -29,25 +31,30 @@ import (
// - stat
// - pub和sub存在一些重复代码
// - sub缺少主动发送sr
// - queue的策略当一条流没数据之后
// - 用context重写其它pull session
// - pull session回调有observer interface和on func回调两种方式是否需要统一
var ErrRTSP = errors.New("lal.rtsp: fxxk")
const (
MethodOptions = "OPTIONS"
MethodAnnounce = "ANNOUNCE"
MethodDescribe = "DESCRIBE"
MethodSetup = "SETUP"
MethodRecord = "RECORD"
MethodPlay = "PLAY"
MethodTeardown = "TEARDOWN"
MethodOptions = "OPTIONS"
MethodAnnounce = "ANNOUNCE"
MethodDescribe = "DESCRIBE"
MethodSetup = "SETUP"
MethodRecord = "RECORD"
MethodPlay = "PLAY"
MethodTeardown = "TEARDOWN"
MethodGetParameter = "GET_PARAMETER"
)
const (
HeaderFieldCSeq = "CSeq"
HeaderFieldTransport = "Transport"
HeaderFieldSession = "Session"
HeaderAccept = "Accept"
HeaderUserAgent = "User-Agent"
HeaderFieldCSeq = "CSeq"
HeaderFieldTransport = "Transport"
HeaderFieldSession = "Session"
HeaderFieldRange = "Range"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderAuthorization = "Authorization"
)
const (
@ -64,12 +71,23 @@ var (
// TODO chef: 参考协议标准,不要使用固定值
sessionID = "191201771"
minServerPort = uint16(8000)
maxServerPort = uint16(16000)
minServerPort = uint16(30000)
maxServerPort = uint16(60000)
unpackerItemMaxSize = 1024
serverCommandSessionReadBufSize = 256
dummyRTPPacket = []byte{
0x80, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}
dummyRTCPPacket = []byte{
0x80, 0xc9, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
}
)
var availUDPConnPool *nazanet.AvailUDPConnPool
@ -148,6 +166,25 @@ func parseTransport(setupTransport string, key string) (first, second uint16, er
return uint16(iFirst), uint16(iSecond), err
}
func isSupportType(t base.AVPacketPT) bool {
switch t {
case base.AVPacketPTAAC:
fallthrough
case base.AVPacketPTAVC:
fallthrough
case base.AVPacketPTHEVC:
return true
}
return false
}
func makeSetupURI(urlCtx base.URLContext, aControl string) string {
if strings.HasPrefix(aControl, "rtsp://") {
return aControl
}
return fmt.Sprintf("%s/%s", urlCtx.RawURLWithoutUserInfo, aControl)
}
func init() {
availUDPConnPool = nazanet.NewAvailUDPConnPool(minServerPort, maxServerPort)
}
@ -177,9 +214,6 @@ func init() {
// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/UDP;unicast;client_port=32184-32185;mode=record User-Agent:Lavf57.83.100], body= - server.go:95
// read http request. method=RECORD, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95
// read http request. method=TEARDOWN, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:6 Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95
// read udp packet failed. err=read udp [::]:8002: use of closed network connection - server_pub_session.go:199
// read udp packet failed. err=read udp [::]:8003: use of closed network connection - server_pub_session.go:199
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
@ -221,6 +255,11 @@ func init() {
// ---------------------------------------------------------------------------------------------------------------------
// SUB(rtp over tcp)
//
// read http request. method=OPTIONS, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:1 User-Agent:Lavf57.83.100], body= - server_command_session.go:136
// read http request. method=DESCRIBE, uri=rtsp://localhost:5544/live/test110, headers=map[Accept:application/sdp CSeq:2 User-Agent:Lavf57.83.100], body= - server_command_session.go:136
// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=0, headers=map[CSeq:3 Transport:RTP/AVP/TCP;unicast;interleaved=0-1 User-Agent:Lavf57.83.100], body= - server_command_session.go:136
// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/TCP;unicast;interleaved=2-3 User-Agent:Lavf57.83.100], body= - server_command_session.go:136
// read http request. method=PLAY, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server_command_session.go:136
// ---------------------------------------------------------------------------------------------------------------------
// 8000 video rtp

@ -227,8 +227,8 @@ func (s *ServerCommandSession) handleDescribe(requestCtx nazahttp.HTTPReqMsgCtx)
return ErrRTSP
}
ctx, _ := sdp.ParseSDP2LogicContext(rawSDP)
s.subSession.InitWithSDP(rawSDP, ctx)
sdpLogicCtx, _ := sdp.ParseSDP2LogicContext(rawSDP)
s.subSession.InitWithSDP(rawSDP, sdpLogicCtx)
resp := PackResponseDescribe(requestCtx.Headers[HeaderFieldCSeq], string(rawSDP))
_, err = s.conn.Write([]byte(resp))

@ -65,7 +65,7 @@ func (s *PubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPCon
}
func (s *PubSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error {
return s.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel, remoteAddr)
return s.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel)
}
func (s *PubSession) Dispose() {

@ -178,14 +178,14 @@ func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) {
atomic.AddUint64(&s.currConnStat.WroteBytesSum, uint64(len(packet.Raw)))
switch packet.Header.PacketType {
case base.RTPPacketTypeAVCOrHEVC:
case uint8(s.sdpLogicCtx.VideoPayloadTypeOrigin):
if s.videoRTPConn != nil {
_ = s.videoRTPConn.Write(packet.Raw)
}
if s.videoRTPChannel != -1 {
_ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw)
}
case base.RTPPacketTypeAAC:
case uint8(s.sdpLogicCtx.AudioPayloadTypeOrigin):
if s.audioRTPConn != nil {
_ = s.audioRTPConn.Write(packet.Raw)
}

@ -22,26 +22,41 @@ import (
var ErrSDP = errors.New("lal.sdp: fxxk")
const (
ARTPMapEncodingName = "H265"
ARTPMapEncodingNameH265 = "H265"
ARTPMapEncodingNameH264 = "H264"
ARTPMapEncodingNameAAC = "MPEG4-GENERIC"
)
type LogicContext struct {
AudioClockRate int
VideoClockRate int
AudioPayloadType base.AVPacketPT
VideoPayloadType base.AVPacketPT
AudioAControl string
VideoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
HasAudio bool
HasVideo bool
AudioClockRate int
VideoClockRate int
AudioPayloadTypeOrigin int
VideoPayloadTypeOrigin int
AudioPayloadType base.AVPacketPT
VideoPayloadType base.AVPacketPT
AudioAControl string
VideoAControl string
ASC []byte
VPS []byte
SPS []byte
PPS []byte
}
type MediaDesc struct {
M M
ARTPMap ARTPMap
AFmtBase AFmtPBase
AControl AControl
}
type RawContext struct {
ARTPMapList []ARTPMap
AFmtPBaseList []AFmtPBase
AControlList []AControl
MediaDescList []MediaDesc
}
type M struct {
Media string
}
type ARTPMap struct {
@ -68,47 +83,45 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
return ret, err
}
for i, item := range c.ARTPMapList {
switch item.PayloadType {
case base.RTPPacketTypeAVCOrHEVC:
ret.VideoClockRate = item.ClockRate
if item.EncodingName == ARTPMapEncodingName {
ret.VideoPayloadType = base.AVPacketPTHEVC
for _, md := range c.MediaDescList {
switch md.M.Media {
case "audio":
ret.HasAudio = true
ret.AudioClockRate = md.ARTPMap.ClockRate
ret.AudioAControl = md.AControl.Value
ret.AudioPayloadTypeOrigin = md.ARTPMap.PayloadType
if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC {
ret.AudioPayloadType = base.AVPacketPTAAC
ret.ASC, err = ParseASC(md.AFmtBase)
if err != nil {
return ret, err
}
} else {
ret.VideoPayloadType = base.AVPacketPTAVC
}
if i < len(c.AControlList) {
ret.VideoAControl = c.AControlList[i].Value
}
case base.RTPPacketTypeAAC:
ret.AudioClockRate = item.ClockRate
ret.AudioPayloadType = base.AVPacketPTAAC
if i < len(c.AControlList) {
ret.AudioAControl = c.AControlList[i].Value
ret.AudioPayloadType = base.AVPacketPTUnknown
}
default:
return ret, ErrSDP
}
}
for _, item := range c.AFmtPBaseList {
switch item.Format {
case base.RTPPacketTypeAVCOrHEVC:
if ret.VideoPayloadType == base.AVPacketPTHEVC {
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(item)
} else {
ret.SPS, ret.PPS, err = ParseSPSPPS(item)
}
if err != nil {
return ret, err
}
case base.RTPPacketTypeAAC:
ret.ASC, err = ParseASC(item)
if err != nil {
return ret, err
case "video":
ret.HasVideo = true
ret.VideoClockRate = md.ARTPMap.ClockRate
ret.VideoAControl = md.AControl.Value
ret.VideoPayloadTypeOrigin = md.ARTPMap.PayloadType
switch md.ARTPMap.EncodingName {
case ARTPMapEncodingNameH264:
ret.VideoPayloadType = base.AVPacketPTAVC
ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtBase)
if err != nil {
return ret, err
}
case ARTPMapEncodingNameH265:
ret.VideoPayloadType = base.AVPacketPTHEVC
ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtBase)
if err != nil {
return ret, err
}
default:
ret.VideoPayloadType = base.AVPacketPTUnknown
}
default:
return ret, ErrSDP
}
}
@ -117,37 +130,74 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) {
// 例子见单元测试
func ParseSDP2RawContext(b []byte) (RawContext, error) {
var sdpCtx RawContext
var (
sdpCtx RawContext
md *MediaDesc
)
s := string(b)
lines := strings.Split(s, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "m=") {
m, err := ParseM(line)
if err != nil {
return sdpCtx, err
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
md = &MediaDesc{
M: m,
}
}
if strings.HasPrefix(line, "a=rtpmap") {
aRTPMap, err := ParseARTPMap(line)
if err != nil {
return sdpCtx, err
}
sdpCtx.ARTPMapList = append(sdpCtx.ARTPMapList, aRTPMap)
if md == nil {
continue
}
md.ARTPMap = aRTPMap
}
if strings.HasPrefix(line, "a=fmtp") {
aFmtPBase, err := ParseAFmtPBase(line)
if err != nil {
return sdpCtx, err
}
sdpCtx.AFmtPBaseList = append(sdpCtx.AFmtPBaseList, aFmtPBase)
if md == nil {
continue
}
md.AFmtBase = aFmtPBase
}
if strings.HasPrefix(line, "a=control") {
aControl, err := ParseAControl(line)
if err != nil {
return sdpCtx, err
}
sdpCtx.AControlList = append(sdpCtx.AControlList, aControl)
if md == nil {
continue
}
md.AControl = aControl
}
}
if md != nil {
sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md)
}
return sdpCtx, nil
}
func ParseM(s string) (ret M, err error) {
ss := strings.TrimPrefix(s, "m=")
items := strings.Split(ss, " ")
if len(items) < 1 {
return ret, ErrSDP
}
ret.Media = items[0]
return
}
// 例子见单元测试
func ParseARTPMap(s string) (ret ARTPMap, err error) {
// rfc 3640 3.3.1. General
@ -262,10 +312,6 @@ func ParseASC(a AFmtPBase) ([]byte, error) {
}
func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) {
if a.Format != base.RTPPacketTypeAVCOrHEVC {
return nil, nil, nil, ErrSDP
}
v, ok := a.Parameters["sprop-vps"]
if !ok {
return nil, nil, nil, ErrSDP
@ -296,10 +342,6 @@ func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) {
// 解析AVC/H264的spspps
// 例子见单元测试
func ParseSPSPPS(a AFmtPBase) (sps, pps []byte, err error) {
if a.Format != base.RTPPacketTypeAVCOrHEVC {
return nil, nil, ErrSDP
}
v, ok := a.Parameters["sprop-parameter-sets"]
if !ok {
return nil, nil, ErrSDP

@ -10,6 +10,7 @@ package sdp_test
import (
"encoding/hex"
"strings"
"testing"
"github.com/q191201771/lal/pkg/base"
@ -46,17 +47,6 @@ var goldenPPS = []byte{
0x68, 0xEB, 0xEC, 0xB2, 0x2C,
}
func TestParseSDP2LogicContext(t *testing.T) {
ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
assert.Equal(t, 44100, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "streamid=1", ctx.AudioAControl)
assert.Equal(t, "streamid=0", ctx.VideoAControl)
}
func TestParseSDP2RawContext(t *testing.T) {
sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
@ -161,3 +151,170 @@ func TestParseVPSSPSPPS(t *testing.T) {
nazalog.Debugf("%s", hex.Dump(sps))
nazalog.Debugf("%s", hex.Dump(pps))
}
func TestParseSDP2LogicContext(t *testing.T) {
ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, 44100, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "streamid=1", ctx.AudioAControl)
assert.Equal(t, "streamid=0", ctx.VideoAControl)
assert.IsNotNil(t, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
}
func TestCase2(t *testing.T) {
golden := `v=0
o=- 2252316233 2252316233 IN IP4 0.0.0.0
s=Media Server
c=IN IP4 0.0.0.0
t=0 0
a=control:*
a=packetization-supported:DH
a=rtppayload-supported:DH
a=range:npt=now-
m=video 0 RTP/AVP 98
a=control:trackID=0
a=framerate:25.000000
a=rtpmap:98 H265/90000
a=fmtp:98 profile-id=1;sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBaoAWCAJBY2uSTL5A=;sprop-pps=RAHA8vA8kA==;sprop-vps=QAEMAf//AWAAAAMAsAAAAwAAAwBarAk=
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, false, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 98, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTHEVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=0", ctx.VideoAControl)
assert.Equal(t, nil, ctx.ASC)
assert.IsNotNil(t, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
nazalog.Debugf("%+v", ctx)
}
func TestCase3(t *testing.T) {
golden := `v=0
o=- 2252310609 2252310609 IN IP4 0.0.0.0
s=Media Server
c=IN IP4 0.0.0.0
t=0 0
a=control:*
a=packetization-supported:DH
a=rtppayload-supported:DH
a=range:npt=now-
m=video 0 RTP/AVP 96
a=control:trackID=0
a=framerate:25.000000
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=4D002A;sprop-parameter-sets=Z00AKp2oHgCJ+WbgICAoAAAfQAAGGoQgAA==,aO48gAA=
a=recvonly
m=audio 0 RTP/AVP 97
a=control:trackID=1
a=rtpmap:97 MPEG4-GENERIC/48000
a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1188
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, 48000, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=1", ctx.AudioAControl)
assert.Equal(t, "trackID=0", ctx.VideoAControl)
assert.IsNotNil(t, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
nazalog.Debugf("%+v", ctx)
}
func TestCase4(t *testing.T) {
golden := `v=0
o=- 1109162014219182 0 IN IP4 0.0.0.0
s=HIK Media Server V3.4.103
i=HIK Media Server Session Description : standard
e=NONE
c=IN IP4 0.0.0.0
t=0 0
a=control:*
b=AS:1034
a=range:npt=now-
m=video 0 RTP/AVP 96
i=Video Media
a=rtpmap:96 H264/90000
a=fmtp:96 profile-level-id=4D0014;packetization-mode=0;sprop-parameter-sets=Z2QAIK2EAQwgCGEAQwgCGEAQwgCEO1AoA803AQEBQAAAAwBAAAAMoQ==,aO48sA==
a=control:trackID=video
b=AS:1024
m=audio 0 RTP/AVP 8
i=Audio Media
a=rtpmap:8 PCMA/8000
a=control:trackID=audio
b=AS:10
a=Media_header:MEDIAINFO=494D4B48020100000400000111710110401F000000FA000000000000000000000000000000000000;
a=appversion:1.0`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, true, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, 8000, ctx.AudioClockRate)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 8, ctx.AudioPayloadTypeOrigin)
assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin)
//assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "trackID=audio", ctx.AudioAControl)
assert.Equal(t, "trackID=video", ctx.VideoAControl)
assert.Equal(t, nil, ctx.ASC)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
nazalog.Debugf("%+v", ctx)
}
func TestCase5(t *testing.T) {
golden := `v=0
o=- 1001 1 IN IP4 192.168.0.221
s=VCP IPC Realtime stream
m=video 0 RTP/AVP 105
c=IN IP4 192.168.0.221
a=control:rtsp://192.168.0.221/media/video1/video
a=rtpmap:105 H264/90000
a=fmtp:105 profile-level-id=64002a; packetization-mode=1; sprop-parameter-sets=Z2QAKq2EAQwgCGEAQwgCGEAQwgCEO1A8ARPyzcBAQFAAAD6AAAnECEA=,aO4xshs=
a=recvonly
m=application 0 RTP/AVP 107
c=IN IP4 192.168.0.221
a=control:rtsp://192.168.0.221/media/video1/metadata
a=rtpmap:107 vnd.onvif.metadata/90000
a=fmtp:107 DecoderTag=h3c-v3 RTCP=0
a=recvonly`
golden = strings.ReplaceAll(golden, "\n", "\r\n")
ctx, err := sdp.ParseSDP2LogicContext([]byte(golden))
assert.Equal(t, nil, err)
assert.Equal(t, false, ctx.HasAudio)
assert.Equal(t, true, ctx.HasVideo)
assert.Equal(t, 90000, ctx.VideoClockRate)
assert.Equal(t, 105, ctx.VideoPayloadTypeOrigin)
assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType)
assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.VideoAControl)
assert.Equal(t, nil, ctx.VPS)
assert.IsNotNil(t, ctx.SPS)
assert.IsNotNil(t, ctx.PPS)
nazalog.Debugf("%+v", ctx)
}

Loading…
Cancel
Save