diff --git a/app/demo/analyseflv/analyseflv.go b/app/demo/analyseflv/analyseflv.go index 06a4e13..230b676 100644 --- a/app/demo/analyseflv/analyseflv.go +++ b/app/demo/analyseflv/analyseflv.go @@ -121,6 +121,7 @@ func main() { nazalog.Debugf("%+v", buf.String()) } case httpflv.TagTypeAudio: + nazalog.Debugf("header=%+v, %+v", tag.Header, tag.IsAACSeqHeader()) brAudio.Add(len(tag.Raw)) if tag.IsAACSeqHeader() { diff --git a/app/demo/pullrtmp/pullrtmp.go b/app/demo/pullrtmp/pullrtmp.go index 845f8b9..c9ab738 100644 --- a/app/demo/pullrtmp/pullrtmp.go +++ b/app/demo/pullrtmp/pullrtmp.go @@ -73,8 +73,7 @@ func pull(url string, filename string) { } session := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { - option.ConnectTimeoutMS = 3000 - option.PullTimeoutMS = 3000 + option.PullTimeoutMS = 30000 option.ReadAVTimeoutMS = 10000 }) @@ -89,7 +88,7 @@ func pull(url string, filename string) { } }) nazalog.Assert(nil, err) - err = <-session.Done() + err = <-session.Wait() nazalog.Debug(err) } diff --git a/app/demo/pullrtmp2hls/pullrtmp2hls.go b/app/demo/pullrtmp2hls/pullrtmp2hls.go index 6d729f8..d7904f2 100644 --- a/app/demo/pullrtmp2hls/pullrtmp2hls.go +++ b/app/demo/pullrtmp2hls/pullrtmp2hls.go @@ -48,7 +48,7 @@ func main() { nazalog.Errorf("pull error. err=%+v", err) os.Exit(-1) } - err = <-pullSession.Done() + err = <-pullSession.Wait() nazalog.Errorf("pull error. err=%+v", err) } diff --git a/app/demo/pullrtsp/pullrtsp.go b/app/demo/pullrtsp/pullrtsp.go index 6b40d3f..5c82ad9 100644 --- a/app/demo/pullrtsp/pullrtsp.go +++ b/app/demo/pullrtsp/pullrtsp.go @@ -12,6 +12,7 @@ import ( "flag" "fmt" "os" + "time" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/httpflv" @@ -31,14 +32,18 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { } func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { - metadata, vsh, ash, err := remux.AVConfig2FLVTag(asc, vps, sps, pps) + metadata, ash, vsh, err := remux.AVConfig2FLVTag(asc, vps, sps, pps) nazalog.Assert(nil, err) err = w.WriteTag(*metadata) nazalog.Assert(nil, err) - err = w.WriteTag(*vsh) - nazalog.Assert(nil, err) - err = w.WriteTag(*ash) - nazalog.Assert(nil, err) + if ash != nil { + err = w.WriteTag(*ash) + nazalog.Assert(nil, err) + } + if vsh != nil { + err = w.WriteTag(*vsh) + nazalog.Assert(nil, err) + } } func (o *Observer) OnAVPacket(pkt base.AVPacket) { @@ -49,7 +54,11 @@ func (o *Observer) OnAVPacket(pkt base.AVPacket) { } func main() { - inURL, outFilename := parseFlag() + _ = nazalog.Init(func(option *nazalog.Option) { + option.AssertBehavior = nazalog.AssertFatal + }) + inURL, outFilename, overTCP := parseFlag() + err := w.Open(outFilename) nazalog.Assert(nil, err) defer w.Dispose() @@ -57,23 +66,38 @@ func main() { nazalog.Assert(nil, err) o := &Observer{} - s := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { + rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { option.PullTimeoutMS = 5000 + option.OverTCP = overTCP != 0 }) - err = s.Pull(inURL) - nazalog.Error(err) + + go func() { + for { + rtspPullSession.UpdateStat(1) + rtspStat := rtspPullSession.GetStat() + nazalog.Debugf("bitrate. rtsp pull=%dkbit/s", rtspStat.Bitrate) + time.Sleep(1 * time.Second) + } + }() + + err = rtspPullSession.Pull(inURL) + nazalog.Assert(nil, err) + err = <-rtspPullSession.Wait() + nazalog.Infof("done. err=%+v", err) } -func parseFlag() (inURL string, outFilename string) { +func parseFlag() (inURL string, outFilename string, overTCP int) { i := flag.String("i", "", "specify pull rtsp url") o := flag.String("o", "", "specify ouput flv file") + t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)") flag.Parse() if *i == "" || *o == "" { flag.Usage() _, _ = fmt.Fprintf(os.Stderr, `Example: - ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv + ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv -t 0 + ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o out.flv -t 1 `) os.Exit(1) } - return *i, *o + return *i, *o, *t } diff --git a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go index dbf1365..b9a09fa 100644 --- a/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go +++ b/app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go @@ -33,14 +33,18 @@ func (o *Observer) OnRTPPacket(pkt rtprtcp.RTPPacket) { } func (o *Observer) OnAVConfig(asc, vps, sps, pps []byte) { - metadata, vsh, ash, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps) + metadata, ash, vsh, err := remux.AVConfig2RTMPMsg(asc, vps, sps, pps) nazalog.Assert(nil, err) err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(metadata.Payload, &metadata.Header)) nazalog.Assert(nil, err) - err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(vsh.Payload, &vsh.Header)) - nazalog.Assert(nil, err) - err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(ash.Payload, &ash.Header)) - nazalog.Assert(nil, err) + if ash != nil { + err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(ash.Payload, &ash.Header)) + nazalog.Assert(nil, err) + } + if vsh != nil { + err = rtmpPushSession.AsyncWrite(rtmp.Message2Chunks(vsh.Payload, &vsh.Header)) + nazalog.Assert(nil, err) + } } func (o *Observer) OnAVPacket(pkt base.AVPacket) { @@ -55,10 +59,9 @@ func main() { option.AssertBehavior = nazalog.AssertFatal }) - inURL, outURL := parseFlag() + inURL, outURL, overTCP := parseFlag() rtmpPushSession = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.ConnectTimeoutMS = 5000 option.PushTimeoutMS = 5000 option.WriteAVTimeoutMS = 5000 }) @@ -67,13 +70,15 @@ func main() { nazalog.Assert(nil, err) go func() { - err := <-rtmpPushSession.Done() - nazalog.Assert(nil, err) + err := <-rtmpPushSession.Wait() + nazalog.Infof("push rtmp done. err=%+v", err) + os.Exit(1) }() o := &Observer{} rtspPullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { option.PullTimeoutMS = 5000 + option.OverTCP = overTCP != 0 }) go func() { @@ -89,18 +94,22 @@ func main() { err = rtspPullSession.Pull(inURL) nazalog.Assert(nil, err) + err = <-rtspPullSession.Wait() + nazalog.Infof("pull rtsp done. err=%+v", err) } -func parseFlag() (inURL string, outFilename string) { +func parseFlag() (inURL string, outFilename string, overTCP int) { i := flag.String("i", "", "specify pull rtsp url") o := flag.String("o", "", "specify push rtmp url") + t := flag.Int("t", 0, "specify interleaved mode(rtp/rtcp over tcp)") flag.Parse() if *i == "" || *o == "" { flag.Usage() _, _ = fmt.Fprintf(os.Stderr, `Example: - ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220 + ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220 -t 0 + ./bin/pullrtsp -i rtsp://localhost:5544/live/test110 -o rtmp://localhost:19350/live/test220 -t 1 `) os.Exit(1) } - return *i, *o + return *i, *o, *t } diff --git a/app/demo/pushrtmp/pushrtmp.go b/app/demo/pushrtmp/pushrtmp.go index dc437c1..691d2df 100644 --- a/app/demo/pushrtmp/pushrtmp.go +++ b/app/demo/pushrtmp/pushrtmp.go @@ -131,7 +131,6 @@ func push(tags []httpflv.Tag, urls []string, isRecursive bool) { for i := range urls { ps := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.ConnectTimeoutMS = 3000 option.PushTimeoutMS = 5000 option.WriteAVTimeoutMS = 10000 }) diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index cb49198..73060c9 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -62,9 +62,9 @@ func (a *ADTS) InitWithAACAudioSpecificConfig(asc []byte) error { // <1.6.3.3 samplingFrequencyIndex>, // <1.6.3.4 channelConfiguration> // -------------------------------------------------------- - // audio object type [5b] 2=AAC LC - // samplingFrequencyIndex [4b] 3=48000 4=44100 - // channelConfiguration [4b] 2=left, right front speakers + // audio object type [5b] 1=AAC MAIN 2=AAC LC + // samplingFrequencyIndex [4b] 3=48000 4=44100 6=24000 5=32000 11=11025 + // channelConfiguration [4b] 1=center front speaker 2=left, right front speakers br := nazabits.NewBitReader(asc) a.audioObjectType, _ = br.ReadBits8(5) a.samplingFrequencyIndex, _ = br.ReadBits8(4) diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index 1fd567d..2fa3b32 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -425,8 +425,12 @@ func ParseSPS(payload []byte) (Context, error) { return Context{}, err } if flag == 1 { - nazalog.Debugf("scaling matrix present, not impl yet.") - return Context{}, ErrAVC + nazalog.Debugf("scaling matrix present.") + // TODO chef: 还没有正确实现,只是针对特定case做了处理 + _, err = br.ReadBits32(128) + if err != nil { + return Context{}, err + } } } else { sps.ChromaFormatIdc = 1 @@ -438,10 +442,14 @@ func ParseSPS(payload []byte) (Context, error) { if err != nil { return Context{}, err } + if sps.Log2MaxFrameNumMinus4 > 12 { + return Context{}, ErrAVC + } sps.PicOrderCntType, err = br.ReadGolomb() if err != nil { return Context{}, err } + if sps.PicOrderCntType == 0 { sps.Log2MaxPicOrderCntLsb, err = br.ReadGolomb() sps.Log2MaxPicOrderCntLsb += 4 @@ -579,3 +587,134 @@ func TryParseSeqHeader(payload []byte) error { return err } + +//var defaultScaling4 = [][]uint8{ +// { +// 6, 13, 20, 28, 13, 20, 28, 32, +// 20, 28, 32, 37, 28, 32, 37, 42, +// }, +// { +// 10, 14, 20, 24, 14, 20, 24, 27, +// 20, 24, 27, 30, 24, 27, 30, 34, +// }, +//} +// +//var defaultScaling8 = [][]uint8{ +// { +// 6, 10, 13, 16, 18, 23, 25, 27, +// 10, 11, 16, 18, 23, 25, 27, 29, +// 13, 16, 18, 23, 25, 27, 29, 31, +// 16, 18, 23, 25, 27, 29, 31, 33, +// 18, 23, 25, 27, 29, 31, 33, 36, +// 23, 25, 27, 29, 31, 33, 36, 38, +// 25, 27, 29, 31, 33, 36, 38, 40, +// 27, 29, 31, 33, 36, 38, 40, 42, +// }, +// { +// 9, 13, 15, 17, 19, 21, 22, 24, +// 13, 13, 17, 19, 21, 22, 24, 25, +// 15, 17, 19, 21, 22, 24, 25, 27, +// 17, 19, 21, 22, 24, 25, 27, 28, +// 19, 21, 22, 24, 25, 27, 28, 30, +// 21, 22, 24, 25, 27, 28, 30, 32, +// 22, 24, 25, 27, 28, 30, 32, 33, +// 24, 25, 27, 28, 30, 32, 33, 35, +// }, +//} +// +//var ffZigzagDirect = []uint8{ +// 0, 1, 8, 16, 9, 2, 3, 10, +// 17, 24, 32, 25, 18, 11, 4, 5, +// 12, 19, 26, 33, 40, 48, 41, 34, +// 27, 20, 13, 6, 7, 14, 21, 28, +// 35, 42, 49, 56, 57, 50, 43, 36, +// 29, 22, 15, 23, 30, 37, 44, 51, +// 58, 59, 52, 45, 38, 31, 39, 46, +// 53, 60, 61, 54, 47, 55, 62, 63, +//} +// +//var ffZigzagScan = []uint8{ +// 0 + 0*4, 1 + 0*4, 0 + 1*4, 0 + 2*4, +// 1 + 1*4, 2 + 0*4, 3 + 0*4, 2 + 1*4, +// 1 + 2*4, 0 + 3*4, 1 + 3*4, 2 + 2*4, +// 3 + 1*4, 3 + 2*4, 2 + 3*4, 3 + 3*4, +//} +// +//func decodeScalingMatrices(reader *nazabits.BitReader) error { +// // 6 * 16 +// var spsScalingMatrix4 = [][]uint8{ +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// } +// // 6 * 64 +// var spsScalingMatrix8 = [][]uint8{ +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, +// } +// +// fallback := [][]uint8{defaultScaling4[0], defaultScaling4[1], defaultScaling8[0], defaultScaling8[1]} +// decodeScalingList(reader, spsScalingMatrix4[0], 16, defaultScaling4[0], fallback[0]) +// decodeScalingList(reader, spsScalingMatrix4[1], 16, defaultScaling4[0], spsScalingMatrix4[0]) +// decodeScalingList(reader, spsScalingMatrix4[2], 16, defaultScaling4[0], spsScalingMatrix4[1]) +// decodeScalingList(reader, spsScalingMatrix4[3], 16, defaultScaling4[1], fallback[1]) +// decodeScalingList(reader, spsScalingMatrix4[4], 16, defaultScaling4[1], spsScalingMatrix4[3]) +// decodeScalingList(reader, spsScalingMatrix4[4], 16, defaultScaling4[1], spsScalingMatrix4[3]) +// +// decodeScalingList(reader, spsScalingMatrix8[0], 64, defaultScaling8[0], fallback[2]) +// decodeScalingList(reader, spsScalingMatrix8[3], 64, defaultScaling8[1], fallback[3]) +// +// return nil +//} +// +//func decodeScalingList(reader *nazabits.BitReader, factors []uint8, size int, jvtList []uint8, fallbackList []uint8) error { +// var ( +// i = 0 +// last = 8 +// next = 8 +// scan []uint8 +// ) +// if size == 16 { +// scan = ffZigzagScan +// } else { +// scan = ffZigzagDirect +// } +// flag, err := reader.ReadBit() +// if err != nil { +// return err +// } +// return nil +// if flag == 0 { +// for n := 0; n < size; n++ { +// factors[n] = fallbackList[n] +// } +// } else { +// for i = 0; i < size; i++ { +// if next != 0 { +// v, err := reader.ReadGolomb() +// if err != nil { +// return err +// } +// next = (last + int(v)) & 0xff +// } +// if i == 0 && next == 0 { +// for n := 0; n < size; n++ { +// factors[n] = jvtList[n] +// } +// break +// } +// if next != 0 { +// factors[scan[i]] = uint8(next) +// last = next +// } else { +// factors[scan[i]] = uint8(last) +// } +// } +// } +// return nil +//} diff --git a/pkg/avc/avc_test.go b/pkg/avc/avc_test.go index 336d7a8..17f17d6 100644 --- a/pkg/avc/avc_test.go +++ b/pkg/avc/avc_test.go @@ -134,3 +134,13 @@ func TestCorner(t *testing.T) { assert.Equal(t, nil, b.Bytes()) assert.Equal(t, avc.ErrAVC, err) } + +func TestParsePPS_Case2(t *testing.T) { + in := []byte{0x67, 0x64, 0x00, 0x20, 0xad, 0x84, 0x01, 0x0c, 0x20, 0x08, 0x61, 0x00, 0x43, 0x08, 0x02, 0x18, 0x40, 0x10, 0xc2, 0x00, 0x84, 0x3b, 0x50, 0x28, 0x03, 0xcd, 0x37, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0c, 0xa1} + ctx, err := avc.ParseSPS(in) + assert.Equal(t, nil, err) + assert.Equal(t, uint8(100), ctx.Profile) + assert.Equal(t, uint8(32), ctx.Level) + assert.Equal(t, uint32(1280), ctx.Width) + assert.Equal(t, uint32(960), ctx.Height) +} diff --git a/pkg/base/avpacket.go b/pkg/base/avpacket.go index 9a20a91..3e568d5 100644 --- a/pkg/base/avpacket.go +++ b/pkg/base/avpacket.go @@ -11,9 +11,10 @@ package base type AVPacketPT int const ( - AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC - AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC - AVPacketPTHEVC AVPacketPT = 98 + AVPacketPTUnknown AVPacketPT = -1 + AVPacketPTAVC AVPacketPT = RTPPacketTypeAVCOrHEVC + AVPacketPTHEVC AVPacketPT = RTPPacketTypeHEVC + AVPacketPTAAC AVPacketPT = RTPPacketTypeAAC ) // 目前供package rtsp使用。以后可能被多个package使用。 diff --git a/pkg/base/rtprtcp.go b/pkg/base/rtprtcp.go index e59ee34..d5da899 100644 --- a/pkg/base/rtprtcp.go +++ b/pkg/base/rtprtcp.go @@ -9,7 +9,11 @@ package base const ( - // 注意,AVC和HEVC都可能使用96,所以不能直接通过96判断是AVC还是HEVC + // 注意,一般情况下,AVC使用96,AAC使用97,HEVC使用98 + // 但是我还遇到过: + // HEVC使用96 + // AVC使用105 RTPPacketTypeAVCOrHEVC = 96 RTPPacketTypeAAC = 97 + RTPPacketTypeHEVC = 98 ) diff --git a/pkg/base/session.go b/pkg/base/session.go index 7bd5df8..a564948 100644 --- a/pkg/base/session.go +++ b/pkg/base/session.go @@ -74,12 +74,15 @@ type ISessionURLContext interface { // | UpdateStat() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | IsAlive() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | -// | RunLoop() | √ | x√ | √ | √ | √ | x√ | - | x | x | x | x | | +// | RunLoop() | √ | x√ | √ | √ | √ | x&√ | - | x | x | x | x | | // | Dispose() | √ | √ | √ | √ | √ | √ | - | √ | √ | √ | √ | | // | RemoteAddr() | √ | x | √ | √ | x | x | - | x | x | x | x | | // | SingleConn | √ | x | √ | √ | √ | √ | - | √ | √ | √ | x | | // +// | Opt.PullTimeoutMS | - | - | - | - | - | - | - | - | x | √ | √ | | +// | Wait() | - | - | - | - | - | - | - | - | √ | √ | √ | | +// // Dispose由外部调用,表示主动关闭正常的session // 外部调用Dispose后,不应继续使用该session // Dispose后,RunLoop结束阻塞 diff --git a/pkg/base/url.go b/pkg/base/url.go index 1421db5..ecd7f33 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -19,6 +19,8 @@ import ( // 见单元测试 +// TODO chef: 考虑部分内容移入naza中 + var ErrURL = errors.New("lal.url: fxxk") const ( @@ -37,6 +39,8 @@ type URLPathContext struct { type URLContext struct { Scheme string + Username string + Password string StdHost string // host or host:port HostWithPort string Host string @@ -48,6 +52,8 @@ type URLContext struct { PathWithoutLastItem string // 注意,没有前面的'/',也没有后面的'/' LastItemOfPath string // 注意,没有前面的'/' RawQuery string + + RawURLWithoutUserInfo string } func ParseURLPath(path string) (ctx URLPathContext, err error) { @@ -69,6 +75,8 @@ func ParseURL(rawURL string, defaultPort int) (ctx URLContext, err error) { ctx.Scheme = stdURL.Scheme ctx.StdHost = stdURL.Host + ctx.Username = stdURL.User.Username() + ctx.Password, _ = stdURL.User.Password() h, p, err := net.SplitHostPort(stdURL.Host) if err != nil { @@ -102,6 +110,8 @@ func ParseURL(rawURL string, defaultPort int) (ctx URLContext, err error) { ctx.PathWithoutLastItem = pathCtx.PathWithoutLastItem ctx.LastItemOfPath = pathCtx.LastItemOfPath ctx.RawQuery = pathCtx.RawQuery + + ctx.RawURLWithoutUserInfo = fmt.Sprintf("%s://%s%s", ctx.Scheme, ctx.StdHost, ctx.PathWithRawQuery) return ctx, nil } diff --git a/pkg/base/url_test.go b/pkg/base/url_test.go index fca5fad..da1a38f 100644 --- a/pkg/base/url_test.go +++ b/pkg/base/url_test.go @@ -30,133 +30,143 @@ func TestParseURL(t *testing.T) { golden := map[in]base.URLContext{ // 常见url,url中无端口,另外设置默认端口 in{rawURL: "rtmp://127.0.0.1/live/test110", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1", - HostWithPort: "127.0.0.1:1935", - Host: "127.0.0.1", - Port: 1935, - PathWithRawQuery: "/live/test110", - Path: "/live/test110", - PathWithoutLastItem: "live", - LastItemOfPath: "test110", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1", + HostWithPort: "127.0.0.1:1935", + Host: "127.0.0.1", + Port: 1935, + PathWithRawQuery: "/live/test110", + Path: "/live/test110", + PathWithoutLastItem: "live", + LastItemOfPath: "test110", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110", }, // 域名url in{rawURL: "rtmp://localhost/live/test110", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "localhost", - HostWithPort: "localhost:1935", - Host: "localhost", - Port: 1935, - PathWithRawQuery: "/live/test110", - Path: "/live/test110", - PathWithoutLastItem: "live", - LastItemOfPath: "test110", - RawQuery: "", + Scheme: "rtmp", + StdHost: "localhost", + HostWithPort: "localhost:1935", + Host: "localhost", + Port: 1935, + PathWithRawQuery: "/live/test110", + Path: "/live/test110", + PathWithoutLastItem: "live", + LastItemOfPath: "test110", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://localhost/live/test110", }, // 带参数url in{rawURL: "rtmp://127.0.0.1/live/test110?a=1", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1", - HostWithPort: "127.0.0.1:1935", - Host: "127.0.0.1", - Port: 1935, - PathWithRawQuery: "/live/test110?a=1", - Path: "/live/test110", - PathWithoutLastItem: "live", - LastItemOfPath: "test110", - RawQuery: "a=1", + Scheme: "rtmp", + StdHost: "127.0.0.1", + HostWithPort: "127.0.0.1:1935", + Host: "127.0.0.1", + Port: 1935, + PathWithRawQuery: "/live/test110?a=1", + Path: "/live/test110", + PathWithoutLastItem: "live", + LastItemOfPath: "test110", + RawQuery: "a=1", + RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110?a=1", }, // path多级 in{rawURL: "rtmp://127.0.0.1:19350/a/b/test110", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "/a/b/test110", - Path: "/a/b/test110", - PathWithoutLastItem: "a/b", - LastItemOfPath: "test110", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "/a/b/test110", + Path: "/a/b/test110", + PathWithoutLastItem: "a/b", + LastItemOfPath: "test110", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/a/b/test110", }, // url中无端口,没有设置默认端口 in{rawURL: "rtmp://127.0.0.1/live/test110?a=1", defaultPort: -1}: { - Scheme: "rtmp", - StdHost: "127.0.0.1", - HostWithPort: "127.0.0.1", - Host: "127.0.0.1", - Port: 0, - PathWithRawQuery: "/live/test110?a=1", - Path: "/live/test110", - PathWithoutLastItem: "live", - LastItemOfPath: "test110", - RawQuery: "a=1", + Scheme: "rtmp", + StdHost: "127.0.0.1", + HostWithPort: "127.0.0.1", + Host: "127.0.0.1", + Port: 0, + PathWithRawQuery: "/live/test110?a=1", + Path: "/live/test110", + PathWithoutLastItem: "live", + LastItemOfPath: "test110", + RawQuery: "a=1", + RawURLWithoutUserInfo: "rtmp://127.0.0.1/live/test110?a=1", }, // url中有端口,设置默认端口 in{rawURL: "rtmp://127.0.0.1:19350/live/test110?a=1", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "/live/test110?a=1", - Path: "/live/test110", - PathWithoutLastItem: "live", - LastItemOfPath: "test110", - RawQuery: "a=1", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "/live/test110?a=1", + Path: "/live/test110", + PathWithoutLastItem: "live", + LastItemOfPath: "test110", + RawQuery: "a=1", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live/test110?a=1", }, // 无path in{rawURL: "rtmp://127.0.0.1:19350", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "", - Path: "", - PathWithoutLastItem: "", - LastItemOfPath: "", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "", + Path: "", + PathWithoutLastItem: "", + LastItemOfPath: "", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350", }, // 无path2 in{rawURL: "rtmp://127.0.0.1:19350/", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "/", - Path: "/", - PathWithoutLastItem: "", - LastItemOfPath: "", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "/", + Path: "/", + PathWithoutLastItem: "", + LastItemOfPath: "", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/", }, // path不完整 in{rawURL: "rtmp://127.0.0.1:19350/live", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "/live", - Path: "/live", - PathWithoutLastItem: "", - LastItemOfPath: "live", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "/live", + Path: "/live", + PathWithoutLastItem: "", + LastItemOfPath: "live", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live", }, // path不完整2 in{rawURL: "rtmp://127.0.0.1:19350/live/", defaultPort: 1935}: { - Scheme: "rtmp", - StdHost: "127.0.0.1:19350", - HostWithPort: "127.0.0.1:19350", - Host: "127.0.0.1", - Port: 19350, - PathWithRawQuery: "/live/", - Path: "/live/", - PathWithoutLastItem: "live", - LastItemOfPath: "", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1:19350", + HostWithPort: "127.0.0.1:19350", + Host: "127.0.0.1", + Port: 19350, + PathWithRawQuery: "/live/", + Path: "/live/", + PathWithoutLastItem: "live", + LastItemOfPath: "", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1:19350/live/", }, } @@ -171,16 +181,17 @@ func TestParseRTMPURL(t *testing.T) { golden := map[string]base.URLContext{ // 其他测试见ParseURL "rtmp://127.0.0.1/test110": { - Scheme: "rtmp", - StdHost: "127.0.0.1", - HostWithPort: "127.0.0.1:1935", - Host: "127.0.0.1", - Port: 1935, - PathWithRawQuery: "/test110", - Path: "/test110", - PathWithoutLastItem: "test110", - LastItemOfPath: "", - RawQuery: "", + Scheme: "rtmp", + StdHost: "127.0.0.1", + HostWithPort: "127.0.0.1:1935", + Host: "127.0.0.1", + Port: 1935, + PathWithRawQuery: "/test110", + Path: "/test110", + PathWithoutLastItem: "test110", + LastItemOfPath: "", + RawQuery: "", + RawURLWithoutUserInfo: "rtmp://127.0.0.1/test110", }, } for k, v := range golden { @@ -189,3 +200,29 @@ func TestParseRTMPURL(t *testing.T) { assert.Equal(t, v, ctx, k) } } + +func TestParseRTSPURL(t *testing.T) { + golden := map[string]base.URLContext{ + // 其他测试见ParseURL + "rtsp://admin:P!@1988@127.0.0.1:5554/h264/ch33/main/av_stream": { + Scheme: "rtsp", + Username: "admin", + Password: "P!@1988", + StdHost: "127.0.0.1:5554", + HostWithPort: "127.0.0.1:5554", + Host: "127.0.0.1", + Port: 5554, + PathWithRawQuery: "/h264/ch33/main/av_stream", + Path: "/h264/ch33/main/av_stream", + PathWithoutLastItem: "h264/ch33/main", + LastItemOfPath: "av_stream", + RawQuery: "", + RawURLWithoutUserInfo: "rtsp://127.0.0.1:5554/h264/ch33/main/av_stream", + }, + } + for k, v := range golden { + ctx, err := base.ParseRTSPURL(k) + assert.Equal(t, nil, err) + assert.Equal(t, v, ctx, k) + } +} diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 323cf3d..69e6099 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -9,6 +9,7 @@ package httpflv import ( + "context" "fmt" "net" "time" @@ -22,13 +23,16 @@ import ( ) type PullSessionOption struct { - ConnectTimeoutMS int // TCP连接时超时,单位毫秒,如果为0,则不设置超时 - ReadTimeoutMS int // 接收数据超时,单位毫秒,如果为0,则不设置超时 + // 从调用Pull函数,到接收音视频数据的前一步,也即发送完HTTP请求的超时时间 + // 如果为0,则没有超时时间 + PullTimeoutMS int + + ReadTimeoutMS int // 接收数据超时,单位毫秒,如果为0,则不设置超时 } var defaultPullSessionOption = PullSessionOption{ - ConnectTimeoutMS: 0, - ReadTimeoutMS: 0, + PullTimeoutMS: 10000, + ReadTimeoutMS: 0, } type PullSession struct { @@ -41,6 +45,8 @@ type PullSession struct { stat base.StatSession urlCtx base.URLContext + + waitErrChan chan error } type ModPullSessionOption func(option *PullSessionOption) @@ -53,8 +59,9 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { uk := base.GenUniqueKey(base.UKPFLVPullSession) s := &PullSession{ - option: option, - UniqueKey: uk, + UniqueKey: uk, + option: option, + waitErrChan: make(chan error, 1), } nazalog.Infof("[%s] lifecycle new httpflv PullSession. session=%p", uk, s) return s @@ -62,22 +69,59 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { type OnReadFLVTag func(tag Tag) -// 阻塞直到拉流失败 +// 如果没有错误发生,阻塞直到接收音视频数据的前一步,也即发送完HTTP请求 // // @param rawURL 支持如下两种格式。(当然,关键点是对端支持) -// http://{domain}/{app_name}/{stream_name}.flv -// http://{ip}/{domain}/{app_name}/{stream_name}.flv +// http://{domain}/{app_name}/{stream_name}.flv +// http://{ip}/{domain}/{app_name}/{stream_name}.flv // // @param onReadFLVTag 读取到 flv tag 数据时回调。回调结束后,PullSession 不会再使用这块 数据。 func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error { - if err := session.connect(rawURL); err != nil { - return err + var ( + ctx context.Context + cancel context.CancelFunc + ) + if session.option.PullTimeoutMS == 0 { + ctx, cancel = context.WithCancel(context.Background()) + } else { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PullTimeoutMS)*time.Millisecond) } - if err := session.writeHTTPRequest(); err != nil { - return err + defer cancel() + return session.pullContext(ctx, rawURL, onReadFLVTag) +} + +// Pull成功后,调用该函数,可阻塞直到拉流结束 +func (session *PullSession) Wait() <-chan error { + return session.waitErrChan +} + +func (session *PullSession) pullContext(ctx context.Context, rawURL string, onReadFLVTag OnReadFLVTag) error { + errChan := make(chan error, 1) + + go func() { + if err := session.connect(rawURL); err != nil { + errChan <- err + return + } + if err := session.writeHTTPRequest(); err != nil { + errChan <- err + return + } + + errChan <- nil + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errChan: + if err != nil { + return err + } } - return session.runReadLoop(onReadFLVTag) + go session.runReadLoop(onReadFLVTag) + return nil } func (session *PullSession) Dispose() { @@ -137,7 +181,7 @@ func (session *PullSession) connect(rawURL string) (err error) { nazalog.Debugf("[%s] > tcp connect.", session.UniqueKey) // # 建立连接 - conn, err := net.DialTimeout("tcp", session.urlCtx.HostWithPort, time.Duration(session.option.ConnectTimeoutMS)*time.Millisecond) + conn, err := net.Dial("tcp", session.urlCtx.HostWithPort) if err != nil { return err } @@ -188,19 +232,22 @@ func (session *PullSession) readTag() (Tag, error) { return readTag(session.conn) } -func (session *PullSession) runReadLoop(onReadFLVTag OnReadFLVTag) error { +func (session *PullSession) runReadLoop(onReadFLVTag OnReadFLVTag) { if _, _, err := session.readHTTPRespHeader(); err != nil { - return err + session.waitErrChan <- err + return } if _, err := session.readFLVHeader(); err != nil { - return err + session.waitErrChan <- err + return } for { tag, err := session.readTag() if err != nil { - return err + session.waitErrChan <- err + return } onReadFLVTag(tag) } diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index 29d8ff4..6ff8e1f 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -114,7 +114,7 @@ func InnerTestEntry(t *testing.T) { if err != nil { nazalog.Error(err) } - err = <-rtmpPullSession.Done() + err = <-rtmpPullSession.Wait() nazalog.Debug(err) }() diff --git a/pkg/logic/group.go b/pkg/logic/group.go index be3d943..8de9b3e 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -848,7 +848,6 @@ func (group *Group) pullIfNeeded() { go func() { pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) { - option.ConnectTimeoutMS = relayPullConnectTimeoutMS option.PullTimeoutMS = relayPullTimeoutMS option.ReadAVTimeoutMS = relayPullReadAVTimeoutMS }) @@ -860,7 +859,7 @@ func (group *Group) pullIfNeeded() { } res := group.AddRTMPPullSession(pullSession) if res { - err = <-pullSession.Done() + err = <-pullSession.Wait() nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err) group.DelRTMPPullSession(pullSession) } else { @@ -901,7 +900,6 @@ func (group *Group) pushIfNeeded() { go func(u, u2 string) { pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { - option.ConnectTimeoutMS = relayPushConnectTimeoutMS option.PushTimeoutMS = relayPushTimeoutMS option.WriteAVTimeoutMS = relayPushWriteAVTimeoutMS }) @@ -912,7 +910,7 @@ func (group *Group) pushIfNeeded() { return } group.AddRTMPPushSession(u, pushSession) - err = <-pushSession.Done() + err = <-pushSession.Wait() nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err) group.DelRTMPPushSession(u, pushSession) }(url, urlWithParam) diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 5ed8e8b..7fa7f3b 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -9,11 +9,9 @@ package logic //var relayPushCheckIntervalMS = 1000 -var relayPushConnectTimeoutMS = 5000 var relayPushTimeoutMS = 5000 var relayPushWriteAVTimeoutMS = 5000 -var relayPullConnectTimeoutMS = 5000 var relayPullTimeoutMS = 5000 var relayPullReadAVTimeoutMS = 5000 diff --git a/pkg/remux/avpacket2flv.go b/pkg/remux/avpacket2flv.go index 4ea6715..e6d7ebc 100644 --- a/pkg/remux/avpacket2flv.go +++ b/pkg/remux/avpacket2flv.go @@ -16,6 +16,7 @@ import ( "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/nazalog" ) // @param asc 如果为nil,则没有音频 @@ -46,11 +47,14 @@ func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag if isHEVC { videocodecid = int(base.RTMPCodecIDHEVC) var ctx hevc.Context - if err = hevc.ParseSPS(sps, &ctx); err != nil { - return + err = hevc.ParseSPS(sps, &ctx) + if err == nil { + width = int(ctx.PicWidthInLumaSamples) + height = int(ctx.PicHeightInLumaSamples) + } else { + // TODO chef: 如果解析错误,先忽略,将失败的case收集起来解决 + nazalog.Warnf("parse sps failed.") } - width = int(ctx.PicWidthInLumaSamples) - height = int(ctx.PicHeightInLumaSamples) bVsh, err = hevc.BuildSeqHeaderFromVPSSPSPPS(vps, sps, pps) if err != nil { return @@ -59,11 +63,13 @@ func AVConfig2FLVTag(asc, vps, sps, pps []byte) (metadata, ash, vsh *httpflv.Tag videocodecid = int(base.RTMPCodecIDAVC) var ctx avc.Context ctx, err = avc.ParseSPS(sps) - if err != nil { - return + if err == nil { + width = int(ctx.Width) + height = int(ctx.Height) + } else { + // TODO chef: 如果解析错误,先忽略,将失败的case收集起来解决 + nazalog.Warnf("parse sps failed.") } - width = int(ctx.Width) - height = int(ctx.Height) bVsh, err = avc.BuildSeqHeaderFromSPSPPS(sps, pps) if err != nil { return diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 176e1dd..8c74ec8 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -19,15 +19,16 @@ type PullSession struct { } type PullSessionOption struct { - ConnectTimeoutMS int - PullTimeoutMS int - ReadAVTimeoutMS int + // 从调用Pull函数,到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令的超时时间 + // 如果为0,则没有超时时间 + PullTimeoutMS int + + ReadAVTimeoutMS int } var defaultPullSessionOption = PullSessionOption{ - ConnectTimeoutMS: 0, - PullTimeoutMS: 0, - ReadAVTimeoutMS: 0, + PullTimeoutMS: 0, + ReadAVTimeoutMS: 0, } type ModPullSessionOption func(option *PullSessionOption) @@ -40,24 +41,23 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { return &PullSession{ core: NewClientSession(CSTPullSession, func(option *ClientSessionOption) { - option.ConnectTimeoutMS = opt.ConnectTimeoutMS option.DoTimeoutMS = opt.PullTimeoutMS option.ReadAVTimeoutMS = opt.ReadAVTimeoutMS }), } } -// 建立rtmp play连接 -// 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误 +// 如果没有发生错误,阻塞直到到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令 // // @param onReadRTMPAVMsg: 注意,回调结束后,内存块会被PullSession重复使用 func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error { s.core.onReadRTMPAVMsg = onReadRTMPAVMsg - return s.core.DoWithTimeout(rawURL) + return s.core.Do(rawURL) } -func (s *PullSession) Done() <-chan error { - return s.core.Done() +// Pull成功后,调用该函数,可阻塞直到拉流结束 +func (s *PullSession) Wait() <-chan error { + return s.core.Wait() } func (s *PullSession) Dispose() { diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index 47d2a94..a204eab 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -8,7 +8,9 @@ package rtmp -import "github.com/q191201771/lal/pkg/base" +import ( + "github.com/q191201771/lal/pkg/base" +) type PushSession struct { IsFresh bool @@ -17,13 +19,14 @@ type PushSession struct { } type PushSessionOption struct { - ConnectTimeoutMS int - PushTimeoutMS int + // 从调用Push函数,到可以发送音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令的超时时间 + // 如果为0,则没有超时时间 + PushTimeoutMS int + WriteAVTimeoutMS int } var defaultPushSessionOption = PushSessionOption{ - ConnectTimeoutMS: 0, PushTimeoutMS: 0, WriteAVTimeoutMS: 0, } @@ -38,17 +41,15 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { return &PushSession{ IsFresh: true, core: NewClientSession(CSTPushSession, func(option *ClientSessionOption) { - option.ConnectTimeoutMS = opt.ConnectTimeoutMS option.DoTimeoutMS = opt.PushTimeoutMS option.WriteAVTimeoutMS = opt.WriteAVTimeoutMS }), } } -// 建立rtmp publish连接 -// 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误 +// 如果没有错误发生,阻塞到接收音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令 func (s *PushSession) Push(rawURL string) error { - return s.core.DoWithTimeout(rawURL) + return s.core.Do(rawURL) } func (s *PushSession) AsyncWrite(msg []byte) error { @@ -87,8 +88,8 @@ func (s *PushSession) RawQuery() string { return s.core.RawQuery() } -func (s *PushSession) Done() <-chan error { - return s.core.Done() +func (s *PushSession) Wait() <-chan error { + return s.core.Wait() } func (s *PushSession) UniqueKey() string { diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 0b071f6..0b03bed 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -9,6 +9,7 @@ package rtmp import ( + "context" "errors" "fmt" "net" @@ -56,14 +57,12 @@ const ( type ClientSessionOption struct { // 单位毫秒,如果为0,则没有超时 - ConnectTimeoutMS int // 建立连接超时 DoTimeoutMS int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时 ReadAVTimeoutMS int // 读取音视频数据的超时 WriteAVTimeoutMS int // 发送音视频数据的超时 } var defaultClientSessOption = ClientSessionOption{ - ConnectTimeoutMS: 0, DoTimeoutMS: 0, ReadAVTimeoutMS: 0, WriteAVTimeoutMS: 0, @@ -103,7 +102,23 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) return s } -func (s *ClientSession) Done() <-chan error { +// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 +func (s *ClientSession) Do(rawURL string) error { + var ( + ctx context.Context + cancel context.CancelFunc + ) + if s.option.DoTimeoutMS == 0 { + ctx, cancel = context.WithCancel(context.Background()) + } else { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(s.option.DoTimeoutMS)*time.Millisecond) + } + defer cancel() + return s.doContext(ctx, rawURL) +} + +// Do成功后,调用该函数,可阻塞直到推流或拉流结束 +func (s *ClientSession) Wait() <-chan error { return s.conn.Done() } @@ -169,62 +184,45 @@ func (s *ClientSession) IsAlive() (readAlive, writeAlive bool) { return } -// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 -func (s *ClientSession) DoWithTimeout(rawURL string) error { - if s.option.DoTimeoutMS == 0 { - err := <-s.do(rawURL) - return err - } - t := time.NewTimer(time.Duration(s.option.DoTimeoutMS) * time.Millisecond) - defer t.Stop() - select { - // TODO chef: 这种写法执行不到超时 - case err := <-s.do(rawURL): - return err - case <-t.C: - return ErrClientSessionTimeout - } -} +func (s *ClientSession) doContext(ctx context.Context, rawURL string) error { + errChan := make(chan error, 1) -func (s *ClientSession) do(rawURL string) <-chan error { - ch := make(chan error, 1) - if err := s.parseURL(rawURL); err != nil { - ch <- err - return ch - } - if err := s.tcpConnect(); err != nil { - ch <- err - return ch - } + go func() { + if err := s.parseURL(rawURL); err != nil { + errChan <- err + return + } + if err := s.tcpConnect(); err != nil { + errChan <- err + return + } - if err := s.handshake(); err != nil { - ch <- err - return ch - } + if err := s.handshake(); err != nil { + errChan <- err + return + } - log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize) - if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil { - ch <- err - return ch - } + log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey, LocalChunkSize) + if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil { + errChan <- err + return + } - log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) - if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil { - ch <- err - return ch - } + log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName()) + if err := s.packer.writeConnect(s.conn, s.appName(), s.tcURL(), s.t == CSTPushSession); err != nil { + errChan <- err + return + } - go s.runReadLoop() + s.runReadLoop() + }() select { + case <-ctx.Done(): + return ctx.Err() case <-s.doResultChan: - ch <- nil - break - case err := <-s.conn.Done(): - ch <- err - break + return nil } - return ch } func (s *ClientSession) parseURL(rawURL string) (err error) { @@ -256,7 +254,7 @@ func (s *ClientSession) tcpConnect() error { s.stat.RemoteAddr = s.urlCtx.HostWithPort var conn net.Conn - if conn, err = net.DialTimeout("tcp", s.urlCtx.HostWithPort, time.Duration(s.option.ConnectTimeoutMS)*time.Millisecond); err != nil { + if conn, err = net.Dial("tcp", s.urlCtx.HostWithPort); err != nil { return err } @@ -286,6 +284,7 @@ func (s *ClientSession) handshake() error { } func (s *ClientSession) runReadLoop() { + // TODO chef: 这里是否应该主动关闭conn,考虑对端发送非法协议数据,增加一个对应的测试看看 _ = s.chunkComposer.RunLoop(s.conn, s.doMsg) } diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index c5dc7e3..c5916a9 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -100,6 +100,14 @@ func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error { c.c0c1 = make([]byte, c0c1Len) c.c0c1[0] = version bele.BEPutUint32(c.c0c1[1:5], uint32(time.Now().UnixNano())) + //c.c0c1[1] = 0 + //c.c0c1[2] = 0 + //c.c0c1[3] = 0 + //c.c0c1[4] = 0 + //c.c0c1[5] = 9 + //c.c0c1[6] = 0 + //c.c0c1[7] = 124 + //c.c0c1[8] = 2 random1528(c.c0c1[9:]) _, err := writer.Write(c.c0c1) diff --git a/pkg/rtprtcp/rtp_unpacker.go b/pkg/rtprtcp/rtp_unpacker.go index c67908f..77bdbf9 100644 --- a/pkg/rtprtcp/rtp_unpacker.go +++ b/pkg/rtprtcp/rtp_unpacker.go @@ -111,18 +111,14 @@ func (r *RTPUnpacker) isStale(seq uint16) bool { // 计算rtp包处于帧中的位置 func (r *RTPUnpacker) calcPositionIfNeeded(pkt *RTPPacket) { - switch pkt.Header.PacketType { - case base.RTPPacketTypeAVCOrHEVC: - switch r.payloadType { - case base.AVPacketPTAVC: - calcPositionIfNeededAVC(pkt) - case base.AVPacketPTHEVC: - calcPositionIfNeededHEVC(pkt) - default: - // can't reach here - } - case base.RTPPacketTypeAAC: + switch r.payloadType { + case base.AVPacketPTAVC: + calcPositionIfNeededAVC(pkt) + case base.AVPacketPTHEVC: + calcPositionIfNeededHEVC(pkt) + case base.AVPacketPTAAC: // noop + break default: // can't reach here } diff --git a/pkg/rtprtcp/rtp_unpacker_hevc.go b/pkg/rtprtcp/rtp_unpacker_hevc.go index e3aa374..d150dc2 100644 --- a/pkg/rtprtcp/rtp_unpacker_hevc.go +++ b/pkg/rtprtcp/rtp_unpacker_hevc.go @@ -25,9 +25,17 @@ func calcPositionIfNeededHEVC(pkt *RTPPacket) { outerNALUType := hevc.ParseNALUType(b[0]) switch outerNALUType { - case hevc.NALUTypeSliceIDRNLP: + case hevc.NALUTypeVPS: + fallthrough + case hevc.NALUTypeSPS: + fallthrough + case hevc.NALUTypePPS: + fallthrough + case hevc.NALUTypeSEI: fallthrough case hevc.NALUTypeSliceTrailR: + fallthrough + case hevc.NALUTypeSliceIDRNLP: pkt.positionType = PositionTypeSingle return case NALUTypeHEVCFUA: diff --git a/pkg/rtsp/auth.go b/pkg/rtsp/auth.go new file mode 100644 index 0000000..823561b --- /dev/null +++ b/pkg/rtsp/auth.go @@ -0,0 +1,85 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp + +import ( + "fmt" + "strings" + + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazamd5" +) + +// TODO chef: 考虑部分内容移入naza中 +// TODO chef: 只支持Digest方式,不支持Basic方式 + +const ( + AuthTypeDigest = "Digest" + AuthAlgorithm = "MD5" +) + +type Auth struct { + Username string + Password string + + Typ string + Realm string + Nonce string + Algorithm string +} + +func (a *Auth) FeedWWWAuthenticate(s, username, password string) { + a.Username = username + a.Password = password + + s = strings.TrimPrefix(s, HeaderWWWAuthenticate) + s = strings.TrimSpace(s) + if strings.HasPrefix(s, AuthTypeDigest) { + a.Typ = AuthTypeDigest + } + a.Realm = a.getV(s, `realm="`) + a.Nonce = a.getV(s, `nonce="`) + a.Algorithm = a.getV(s, `algorithm="`) + + if a.Typ != AuthTypeDigest { + nazalog.Warnf("FeedWWWAuthenticate type invalid, only support Digest. v=%s", s) + } + if a.Realm == "" { + nazalog.Warnf("FeedWWWAuthenticate realm invalid. v=%s", s) + } + if a.Nonce == "" { + nazalog.Warnf("FeedWWWAuthenticate realm invalid. v=%s", s) + } + if a.Algorithm != AuthAlgorithm { + nazalog.Warnf("FeedWWWAuthenticate algorithm invalid, only support MD5. v=%s", s) + } +} + +func (a *Auth) MakeAuthorization(method, uri string) string { + if a.Username == "" || a.Nonce == "" { + return "" + } + + ha1 := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s:%s", a.Username, a.Realm, a.Password))) + ha2 := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s", method, uri))) + response := nazamd5.MD5([]byte(fmt.Sprintf("%s:%s:%s", ha1, a.Nonce, ha2))) + return fmt.Sprintf(`%s username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, a.Typ, a.Username, a.Realm, a.Nonce, uri, response, a.Algorithm) +} + +func (a *Auth) getV(s string, pre string) string { + b := strings.Index(s, pre) + if b == -1 { + return "" + } + e := strings.Index(s[b+len(pre):], `"`) + if e == -1 { + return "" + } + return s[b+len(pre) : b+len(pre)+e] +} diff --git a/pkg/rtsp/auth_test.go b/pkg/rtsp/auth_test.go new file mode 100644 index 0000000..00c0507 --- /dev/null +++ b/pkg/rtsp/auth_test.go @@ -0,0 +1,16 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp_test + +import ( + "testing" +) + +func TestAuth(t *testing.T) { +} diff --git a/pkg/rtsp/avpacket_queue.go b/pkg/rtsp/avpacket_queue.go index 9bd1bcc..06d16a5 100644 --- a/pkg/rtsp/avpacket_queue.go +++ b/pkg/rtsp/avpacket_queue.go @@ -11,7 +11,6 @@ package rtsp import ( "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/circularqueue" - "github.com/q191201771/naza/pkg/nazalog" ) // 处理音频和视频的时间戳: @@ -53,12 +52,15 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) { a.videoBaseTS = int64(pkt.Timestamp) } pkt.Timestamp -= uint32(a.videoBaseTS) + _ = a.videoQueue.PushBack(pkt) if a.videoQueue.Full() { + pkt, _ := a.videoQueue.Front() _, _ = a.videoQueue.PopFront() - nazalog.Warnf("video queue full, drop front packet.") + ppkt := pkt.(base.AVPacket) + a.onAVPacket(ppkt) + return } - _ = a.videoQueue.PushBack(pkt) //nazalog.Debugf("AVQ v push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) case base.AVPacketPTAAC: if a.audioBaseTS == -1 { @@ -66,11 +68,14 @@ func (a *AVPacketQueue) Feed(pkt base.AVPacket) { } pkt.Timestamp -= uint32(a.audioBaseTS) + _ = a.audioQueue.PushBack(pkt) if a.audioQueue.Full() { + pkt, _ := a.audioQueue.Front() _, _ = a.audioQueue.PopFront() - nazalog.Warnf("audio queue full, drop front packet. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) + ppkt := pkt.(base.AVPacket) + a.onAVPacket(ppkt) + return } - _ = a.audioQueue.PushBack(pkt) //nazalog.Debugf("AVQ a push. a=%d, v=%d", a.audioQueue.Size(), a.videoQueue.Size()) } //switch loop diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index dccfb04..b52c5f4 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -82,13 +82,21 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) s.sdpLogicCtx = sdpLogicCtx s.m.Unlock() - s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) - s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) + if isSupportType(s.sdpLogicCtx.AudioPayloadType) { + s.audioUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.AudioPayloadType, s.sdpLogicCtx.AudioClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) + } else { + nazalog.Warnf("[%s] audio unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin) + } + if isSupportType(s.sdpLogicCtx.VideoPayloadType) { + s.videoUnpacker = rtprtcp.NewRTPUnpacker(s.sdpLogicCtx.VideoPayloadType, s.sdpLogicCtx.VideoClockRate, unpackerItemMaxSize, s.onAVPacketUnpacked) + } else { + nazalog.Warnf("[%s] video unpacker not support yet. origin type=%d", s.UniqueKey, s.sdpLogicCtx.AudioPayloadTypeOrigin) + } s.audioRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.AudioClockRate) s.videoRRProducer = rtprtcp.NewRRProducer(s.sdpLogicCtx.VideoClockRate) - if s.sdpLogicCtx.AudioPayloadType != 0 && s.sdpLogicCtx.VideoPayloadType != 0 { + if s.sdpLogicCtx.HasAudio && s.sdpLogicCtx.HasVideo { s.avPacketQueue = NewAVPacketQueue(s.onAVPacket) } @@ -97,9 +105,11 @@ func (s *BaseInSession) InitWithSDP(rawSDP []byte, sdpLogicCtx sdp.LogicContext) } } +// 如果没有设置回调监听对象,可以通过该函数设置,调用方保证调用该函数发生在调用InitWithSDP之后 func (s *BaseInSession) SetObserver(observer BaseInSessionObserver) { s.observer = observer + // TODO chef: 这里的判断应该去掉 if s.sdpLogicCtx.ASC != nil && s.sdpLogicCtx.SPS != nil { s.observer.OnAVConfig(s.sdpLogicCtx.ASC, s.sdpLogicCtx.VPS, s.sdpLogicCtx.SPS, s.sdpLogicCtx.PPS) } @@ -122,7 +132,7 @@ func (s *BaseInSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDP return nil } -func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error { +func (s *BaseInSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int) error { if strings.HasSuffix(uri, s.sdpLogicCtx.AudioAControl) { s.audioRTPChannel = rtpChannel s.audioRTCPChannel = rtcpChannel @@ -213,6 +223,22 @@ func (s *BaseInSession) IsAlive() (readAlive, writeAlive bool) { return } +// 发现pull时,需要先给对端发送数据,才能收到数据 +func (s *BaseInSession) WriteRTPRTCPDummy() { + if s.videoRTPConn != nil { + _ = s.videoRTPConn.Write(dummyRTPPacket) + } + if s.videoRTCPConn != nil { + _ = s.videoRTCPConn.Write(dummyRTCPPacket) + } + if s.audioRTPConn != nil { + _ = s.audioRTPConn.Write(dummyRTPPacket) + } + if s.audioRTCPConn != nil { + _ = s.audioRTCPConn.Write(dummyRTCPPacket) + } +} + // callback by RTPUnpacker func (s *BaseInSession) onAVPacketUnpacked(pkt base.AVPacket) { if s.avPacketQueue != nil { @@ -311,8 +337,8 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error { return ErrRTSP } - packetType := b[1] & 0x7F - if packetType != base.RTPPacketTypeAVCOrHEVC && packetType != base.RTPPacketTypeAAC { + packetType := int(b[1] & 0x7F) + if packetType != s.sdpLogicCtx.AudioPayloadTypeOrigin && packetType != s.sdpLogicCtx.VideoPayloadTypeOrigin { return ErrRTSP } @@ -328,16 +354,22 @@ func (s *BaseInSession) handleRTPPacket(b []byte) error { pkt.Raw = b switch packetType { - case base.RTPPacketTypeAVCOrHEVC: + case s.sdpLogicCtx.VideoPayloadTypeOrigin: s.videoSSRC = h.SSRC s.observer.OnRTPPacket(pkt) - s.videoUnpacker.Feed(pkt) s.videoRRProducer.FeedRTPPacket(h.Seq) - case base.RTPPacketTypeAAC: + + if s.videoUnpacker != nil { + s.videoUnpacker.Feed(pkt) + } + case s.sdpLogicCtx.AudioPayloadTypeOrigin: s.audioSSRC = h.SSRC s.observer.OnRTPPacket(pkt) - s.audioUnpacker.Feed(pkt) s.audioRRProducer.FeedRTPPacket(h.Seq) + + if s.audioUnpacker != nil { + s.audioUnpacker.Feed(pkt) + } default: // 因为前面已经判断过type了,所以永远不会走到这 } diff --git a/pkg/rtsp/client_pull_session.go b/pkg/rtsp/client_pull_session.go index 6d25ae4..facde4b 100644 --- a/pkg/rtsp/client_pull_session.go +++ b/pkg/rtsp/client_pull_session.go @@ -9,9 +9,11 @@ package rtsp import ( + "bufio" "context" "fmt" "net" + "strings" "time" "github.com/q191201771/lal/pkg/base" @@ -24,7 +26,8 @@ import ( ) const ( - pullReadBufSize = 256 + pullReadBufSize = 256 + writeGetParameterIntervalSec = 10 ) type PullSessionObserver interface { @@ -32,11 +35,16 @@ type PullSessionObserver interface { } type PullSessionOption struct { - PullTimeoutMS int // 从调用Pull函数,到收到rtsp play response(接收音视频数据的前一步)的超时时间 + // 从调用Pull函数,到接收音视频数据的前一步,也即收到rtsp play response的超时时间 + // 如果为0,则没有超时时间 + PullTimeoutMS int + + OverTCP bool // 是否使用interleaved模式,也即是否通过rtsp command tcp连接传输rtp/rtcp数据 } var defaultPullSessionOption = PullSessionOption{ PullTimeoutMS: 10000, + OverTCP: false, } type PullSession struct { @@ -48,9 +56,16 @@ type PullSession struct { cseq int sessionID string + channel int rawURL string urlCtx base.URLContext + + waitErrChan chan error + + methodGetParameterSupported bool + + auth Auth } type ModPullSessionOption func(option *PullSessionOption) @@ -63,8 +78,9 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp uk := base.GenUniqueKey(base.UKPRTSPPullSession) s := &PullSession{ - UniqueKey: uk, - option: option, + UniqueKey: uk, + option: option, + waitErrChan: make(chan error, 1), } baseInSession := &BaseInSession{ UniqueKey: uk, @@ -81,6 +97,7 @@ func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOp return s } +// 如果没有错误发生,阻塞直到接收音视频数据的前一步,也即收到rtsp play response func (session *PullSession) Pull(rawURL string) error { var ( ctx context.Context @@ -95,9 +112,15 @@ func (session *PullSession) Pull(rawURL string) error { return session.pullContext(ctx, rawURL) } +// Pull成功后,调用该函数,可阻塞直到拉流结束 +func (session *PullSession) Wait() <-chan error { + return session.waitErrChan +} + func (session *PullSession) Write(channel int, b []byte) error { return nil } + func (session *PullSession) Dispose() error { return nil } @@ -105,9 +128,11 @@ func (session *PullSession) Dispose() error { func (session *PullSession) AppName() string { return session.urlCtx.PathWithoutLastItem } + func (session *PullSession) StreamName() string { return session.urlCtx.LastItemOfPath } + func (session *PullSession) RawQuery() string { return session.urlCtx.RawQuery } @@ -166,9 +191,91 @@ func (session *PullSession) pullContext(ctx context.Context, rawURL string) erro } } - dummy := make([]byte, 1) // 用于接收TCP对端关闭FIN信号 - _, err := session.CmdConn.Read(dummy) - return err + go session.runReadLoop() + return nil +} + +func (session *PullSession) runReadLoop() { + if !session.methodGetParameterSupported { + // TCP模式,需要收取数据进行处理 + if session.option.OverTCP { + var r = bufio.NewReader(session.CmdConn) + for { + isInterleaved, packet, channel, err := readInterleaved(r) + if err != nil { + session.waitErrChan <- err + return + } + if isInterleaved { + session.baseInSession.HandleInterleavedPacket(packet, int(channel)) + } + } + } + + // not over tcp + // 接收TCP对端关闭FIN信号 + dummy := make([]byte, 1) + _, err := session.CmdConn.Read(dummy) + session.waitErrChan <- err + return + } + + // 对端支持get_parameter,需要定时向对端发送get_parameter进行保活 + + var r = bufio.NewReader(session.CmdConn) + t := time.NewTicker(writeGetParameterIntervalSec * time.Millisecond) + defer t.Stop() + + if session.option.OverTCP { + for { + select { + case <-t.C: + session.cseq++ + req := PackRequestGetParameter(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + session.waitErrChan <- err + return + } + default: + // noop + } + + isInterleaved, packet, channel, err := readInterleaved(r) + if err != nil { + session.waitErrChan <- err + return + } + if isInterleaved { + session.baseInSession.HandleInterleavedPacket(packet, int(channel)) + } else { + if _, err := nazahttp.ReadHTTPResponseMessage(r); err != nil { + session.waitErrChan <- err + return + } + } + } + } + + // not over tcp + for { + select { + case <-t.C: + session.cseq++ + req := PackRequestGetParameter(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + session.waitErrChan <- err + return + } + + if _, err := nazahttp.ReadHTTPResponseMessage(r); err != nil { + session.waitErrChan <- err + return + } + default: + // noop + } + + } } func (session *PullSession) connect(rawURL string) (err error) { @@ -195,7 +302,7 @@ func (session *PullSession) connect(rawURL string) (err error) { func (session *PullSession) writeOptions() error { session.cseq++ - req := PackRequestOptions(session.rawURL, session.cseq) + req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, "") nazalog.Debugf("[%s] > write options.", session.UniqueKey) if _, err := session.CmdConn.Write([]byte(req)); err != nil { return err @@ -205,12 +312,19 @@ func (session *PullSession) writeOptions() error { return err } nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode) + + session.handleOptionMethods(ctx) + if err := session.handleAuth(ctx); err != nil { + return err + } + return nil } func (session *PullSession) writeDescribe() error { session.cseq++ - req := PackRequestDescribe(session.rawURL, session.cseq) + auth := session.auth.MakeAuthorization(MethodDescribe, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestDescribe(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth) nazalog.Debugf("[%s] > write describe.", session.UniqueKey) if _, err := session.CmdConn.Write([]byte(req)); err != nil { return err @@ -221,39 +335,52 @@ func (session *PullSession) writeDescribe() error { } nazalog.Debugf("[%s] < read response. code=%s, body=%s", session.UniqueKey, ctx.StatusCode, string(ctx.Body)) - sdpCtx, err := sdp.ParseSDP2LogicContext(ctx.Body) + sdpLogicCtx, err := sdp.ParseSDP2LogicContext(ctx.Body) if err != nil { return err } - session.baseInSession.InitWithSDP(ctx.Body, sdpCtx) + session.baseInSession.InitWithSDP(ctx.Body, sdpLogicCtx) return nil } func (session *PullSession) writeSetup() error { if session.baseInSession.sdpLogicCtx.VideoAControl != "" { - if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil { - return err + if session.option.OverTCP { + if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil { + return err + } + } else { + if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.VideoAControl); err != nil { + return err + } } } if session.baseInSession.sdpLogicCtx.AudioAControl != "" { - if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil { - return err + if session.option.OverTCP { + if err := session.writeOneSetupTCP(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil { + return err + } + } else { + if err := session.writeOneSetup(session.baseInSession.sdpLogicCtx.AudioAControl); err != nil { + return err + } } } return nil } func (session *PullSession) writeOneSetup(aControl string) error { - setupURI := fmt.Sprintf("%s/%s", session.rawURL, aControl) + setupURI := makeSetupURI(session.urlCtx, aControl) rtpC, rtpPort, rtcpC, rtcpPort, err := availUDPConnPool.Acquire2() if err != nil { return err } session.cseq++ - req := PackRequestSetup(setupURI, session.cseq, session.sessionID, int(rtpPort), int(rtcpPort)) + auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestSetup(setupURI, session.cseq, int(rtpPort), int(rtcpPort), session.sessionID, auth) nazalog.Debugf("[%s] > write setup.", session.UniqueKey) if _, err := session.CmdConn.Write([]byte(req)); err != nil { return err @@ -264,7 +391,7 @@ func (session *PullSession) writeOneSetup(aControl string) error { } nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx) - session.sessionID = ctx.Headers[HeaderFieldSession] + session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0] srvRTPPort, srvRTCPPort, err := parseServerPort(ctx.Headers[HeaderFieldTransport]) if err != nil { @@ -296,9 +423,42 @@ func (session *PullSession) writeOneSetup(aControl string) error { return nil } +func (session *PullSession) writeOneSetupTCP(aControl string) error { + setupURI := makeSetupURI(session.urlCtx, aControl) + rtpChannel := session.channel + rtcpChannel := session.channel + 1 + session.channel += 2 + + session.cseq++ + auth := session.auth.MakeAuthorization(MethodSetup, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestSetupTCP(setupURI, session.cseq, rtpChannel, rtcpChannel, session.sessionID, auth) + nazalog.Debugf("[%s] > write setup.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. code=%s, ctx=%+v", session.UniqueKey, ctx.StatusCode, ctx) + + session.sessionID = strings.Split(ctx.Headers[HeaderFieldSession], ";")[0] + + // TODO chef: 这里没有解析回传的channel id了,因为我假定了它和request中的是一致的 + + if err := session.baseInSession.SetupWithChannel(setupURI, rtpChannel, rtcpChannel); err != nil { + return err + } + + return nil +} + func (session *PullSession) writePlay() error { + session.baseInSession.WriteRTPRTCPDummy() + session.cseq++ - req := PackRequestPlay(session.rawURL, session.cseq, session.sessionID) + auth := session.auth.MakeAuthorization(MethodPlay, session.urlCtx.RawURLWithoutUserInfo) + req := PackRequestPlay(session.urlCtx.RawURLWithoutUserInfo, session.cseq, session.sessionID, auth) nazalog.Debugf("[%s] > write play.", session.UniqueKey) if _, err := session.CmdConn.Write([]byte(req)); err != nil { return err @@ -310,3 +470,36 @@ func (session *PullSession) writePlay() error { nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode) return nil } + +func (session *PullSession) handleOptionMethods(ctx nazahttp.HTTPRespMsgCtx) { + methods := ctx.Headers["Public"] + if methods == "" { + return + } + + if strings.Contains(methods, MethodGetParameter) { + session.methodGetParameterSupported = true + } +} + +func (session *PullSession) handleAuth(ctx nazahttp.HTTPRespMsgCtx) error { + if ctx.Headers[HeaderWWWAuthenticate] == "" { + return nil + } + + session.auth.FeedWWWAuthenticate(ctx.Headers[HeaderWWWAuthenticate], session.urlCtx.Username, session.urlCtx.Password) + auth := session.auth.MakeAuthorization(MethodOptions, session.urlCtx.RawURLWithoutUserInfo) + + session.cseq++ + req := PackRequestOptions(session.urlCtx.RawURLWithoutUserInfo, session.cseq, auth) + nazalog.Debugf("[%s] > write options.", session.UniqueKey) + if _, err := session.CmdConn.Write([]byte(req)); err != nil { + return err + } + ctx, err := nazahttp.ReadHTTPResponseMessage(session.CmdConn) + if err != nil { + return err + } + nazalog.Debugf("[%s] < read response. %s", session.UniqueKey, ctx.StatusCode) + return nil +} diff --git a/pkg/rtsp/pack.go b/pkg/rtsp/pack.go index 414fd09..4b1d6fc 100644 --- a/pkg/rtsp/pack.go +++ b/pkg/rtsp/pack.go @@ -16,11 +16,6 @@ import ( ) // rfc2326 10.1 OPTIONS -// uri CSeq -var RequestOptionsTmpl = "OPTIONS %s RTSP/1.0\r\n" + - "CSeq: %d\r\n" + - "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + - "\r\n" // CSeq var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" + @@ -30,18 +25,14 @@ var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" + "\r\n" // rfc2326 10.3 ANNOUNCE +//var RequestAnnounceTmpl = "not impl" + // CSeq var ResponseAnnounceTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "\r\n" // rfc2326 10.2 DESCRIBE -// uri CSeq -var RequestDescribeTmpl = "DESCRIBE %s RTSP/1.0\r\n" + - "Accept: application/sdp\r\n" + - "CSeq: %d\r\n" + - "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + - "\r\n" // CSeq, Date, Content-Length, var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" + @@ -53,20 +44,7 @@ var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" + "%s" // rfc2326 10.4 SETUP -// uri CSeq RTPPort RTCPPort -var RequestSetupTmpl = "SETUP %s RTSP/1.0\r\n" + - "CSeq: %d\r\n" + - "Transport: RTP/AVP/UDP;unicast;client_port=%d-%d\r\n" + - "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + - "\r\n" - -// uri CSeq Session RTPPort RTCPPort -var RequestSetupWithSessionTmpl = "SETUP %s RTSP/1.0\r\n" + - "CSeq: %d\r\n" + - "Session: %s\r\n" + - "Transport: RTP/AVP/UDP;unicast;client_port=%d-%d\r\n" + - "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + - "\r\n" +// TODO chef: mode=record,这个是咋作用,是应该pub有sub没有吗,我的pack实现没有严格区分 // CSeq, Date, Session, Transport(client_port, server_rtp_port, server_rtcp_port) var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" + @@ -85,6 +63,8 @@ var ResponseSetupTCPTmpl = "RTSP/1.0 200 OK\r\n" + "\r\n" // rfc2326 10.11 RECORD +//var RequestRecordTmpl = "not impl" + // CSeq, Session var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + @@ -92,13 +72,6 @@ var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" + "\r\n" // rfc2326 10.5 PLAY -// uri CSeq Session -var RequestPlayTmpl = "PLAY %s RTSP/1.0\r\n" + - "CSeq: %d\r\n" + - "Range: npt=0.000-\r\n" + - "Session: %s\r\n" + - "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + - "\r\n" // CSeq Date var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" + @@ -106,30 +79,96 @@ var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" + "Date: %s\r\n" + "\r\n" +// rfc2326 10.8 GET_PARAMETER +// uri CSeq Session +var RequestGetParameterTmpl = "GET_PARAMETER %s RTSP/1.0\r\n" + + "CSeq: %d\r\n" + + "Session: %s\r\n" + + "User-Agent: " + base.LALRTSPPullSessionUA + "\r\n" + + "\r\n" + // rfc2326 10.7 TEARDOWN +//var RequestTeardownTmpl = "not impl" + // CSeq var ResponseTeardownTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "\r\n" -func PackRequestOptions(uri string, cseq int) string { - return fmt.Sprintf(RequestOptionsTmpl, uri, cseq) +// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 +func PackRequestOptions(uri string, cseq int, auth string) string { + headers := map[string]string{ + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodOptions, uri, headers) } -func PackRequestDescribe(uri string, cseq int) string { - return fmt.Sprintf(RequestDescribeTmpl, uri, cseq) +// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 +func PackRequestDescribe(uri string, cseq int, auth string) string { + headers := map[string]string{ + HeaderAccept: "application/sdp", + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodDescribe, uri, headers) } // @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段 -func PackRequestSetup(uri string, cseq int, sessionID string, rtpClientPort int, rtcpClientPort int) string { - if sessionID == "" { - return fmt.Sprintf(RequestSetupTmpl, uri, cseq, rtpClientPort, rtcpClientPort) +// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 +func PackRequestSetup(uri string, cseq int, rtpClientPort int, rtcpClientPort int, sessionID string, auth string) string { + headers := map[string]string{ + HeaderFieldTransport: fmt.Sprintf("RTP/AVP/UDP;unicast;client_port=%d-%d", rtpClientPort, rtcpClientPort), + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if sessionID != "" { + headers[HeaderFieldSession] = sessionID } - return fmt.Sprintf(RequestSetupWithSessionTmpl, uri, cseq, sessionID, rtpClientPort, rtcpClientPort) + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodSetup, uri, headers) } -func PackRequestPlay(uri string, cseq int, sessionID string) string { - return fmt.Sprintf(RequestPlayTmpl, uri, cseq, sessionID) +// @param sessionID 可以为空,如果为空,则请求中不包含`Session`字段 +// @param auth 可以为空,如果为空,则请求中不包含`Authorization`字段 +func PackRequestSetupTCP(uri string, cseq int, rtpChannel int, rtcpChannel int, sessionID string, auth string) string { + headers := map[string]string{ + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderFieldTransport: fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", rtpChannel, rtcpChannel), + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if sessionID != "" { + headers[HeaderFieldSession] = sessionID + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodSetup, uri, headers) +} + +func PackRequestPlay(uri string, cseq int, sessionID string, auth string) string { + headers := map[string]string{ + HeaderFieldCSeq: fmt.Sprintf("%d", cseq), + HeaderFieldRange: "npt=0.000-", + HeaderFieldSession: sessionID, + HeaderUserAgent: base.LALRTSPPullSessionUA, + } + if auth != "" { + headers[HeaderAuthorization] = auth + } + return packRequest(MethodPlay, uri, headers) +} + +func PackRequestGetParameter(uri string, cseq int, sessionID string) string { + return fmt.Sprintf(RequestGetParameterTmpl, uri, cseq, sessionID) } func PackResponseOptions(cseq string) string { @@ -175,3 +214,12 @@ func PackResponsePlay(cseq string) string { func PackResponseTeardown(cseq string) string { return fmt.Sprintf(ResponseTeardownTmpl, cseq) } + +func packRequest(method, uri string, headers map[string]string) (ret string) { + ret = method + " " + uri + " RTSP/1.0\r\n" + for k, v := range headers { + ret += k + ": " + v + "\r\n" + } + ret += "\r\n" + return ret +} diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index 7028a73..71218a1 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -15,6 +15,8 @@ import ( "strconv" "strings" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/rtprtcp" "github.com/q191201771/naza/pkg/nazalog" @@ -29,25 +31,30 @@ import ( // - stat // - pub和sub存在一些重复代码 // - sub缺少主动发送sr -// - queue的策略,当一条流没数据之后 -// - 用context重写其它pull session +// - pull session回调有observer interface和on func回调两种方式,是否需要统一 var ErrRTSP = errors.New("lal.rtsp: fxxk") const ( - MethodOptions = "OPTIONS" - MethodAnnounce = "ANNOUNCE" - MethodDescribe = "DESCRIBE" - MethodSetup = "SETUP" - MethodRecord = "RECORD" - MethodPlay = "PLAY" - MethodTeardown = "TEARDOWN" + MethodOptions = "OPTIONS" + MethodAnnounce = "ANNOUNCE" + MethodDescribe = "DESCRIBE" + MethodSetup = "SETUP" + MethodRecord = "RECORD" + MethodPlay = "PLAY" + MethodTeardown = "TEARDOWN" + MethodGetParameter = "GET_PARAMETER" ) const ( - HeaderFieldCSeq = "CSeq" - HeaderFieldTransport = "Transport" - HeaderFieldSession = "Session" + HeaderAccept = "Accept" + HeaderUserAgent = "User-Agent" + HeaderFieldCSeq = "CSeq" + HeaderFieldTransport = "Transport" + HeaderFieldSession = "Session" + HeaderFieldRange = "Range" + HeaderWWWAuthenticate = "WWW-Authenticate" + HeaderAuthorization = "Authorization" ) const ( @@ -64,12 +71,23 @@ var ( // TODO chef: 参考协议标准,不要使用固定值 sessionID = "191201771" - minServerPort = uint16(8000) - maxServerPort = uint16(16000) + minServerPort = uint16(30000) + maxServerPort = uint16(60000) unpackerItemMaxSize = 1024 serverCommandSessionReadBufSize = 256 + + dummyRTPPacket = []byte{ + 0x80, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + } + + dummyRTCPPacket = []byte{ + 0x80, 0xc9, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + } ) var availUDPConnPool *nazanet.AvailUDPConnPool @@ -148,6 +166,25 @@ func parseTransport(setupTransport string, key string) (first, second uint16, er return uint16(iFirst), uint16(iSecond), err } +func isSupportType(t base.AVPacketPT) bool { + switch t { + case base.AVPacketPTAAC: + fallthrough + case base.AVPacketPTAVC: + fallthrough + case base.AVPacketPTHEVC: + return true + } + return false +} + +func makeSetupURI(urlCtx base.URLContext, aControl string) string { + if strings.HasPrefix(aControl, "rtsp://") { + return aControl + } + return fmt.Sprintf("%s/%s", urlCtx.RawURLWithoutUserInfo, aControl) +} + func init() { availUDPConnPool = nazanet.NewAvailUDPConnPool(minServerPort, maxServerPort) } @@ -177,9 +214,6 @@ func init() { // read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/UDP;unicast;client_port=32184-32185;mode=record User-Agent:Lavf57.83.100], body= - server.go:95 // read http request. method=RECORD, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95 // read http request. method=TEARDOWN, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:6 Session:191201771 User-Agent:Lavf57.83.100], body= - server.go:95 - -// read udp packet failed. err=read udp [::]:8002: use of closed network connection - server_pub_session.go:199 -// read udp packet failed. err=read udp [::]:8003: use of closed network connection - server_pub_session.go:199 // --------------------------------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------------------------------- @@ -221,6 +255,11 @@ func init() { // --------------------------------------------------------------------------------------------------------------------- // SUB(rtp over tcp) // +// read http request. method=OPTIONS, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:1 User-Agent:Lavf57.83.100], body= - server_command_session.go:136 +// read http request. method=DESCRIBE, uri=rtsp://localhost:5544/live/test110, headers=map[Accept:application/sdp CSeq:2 User-Agent:Lavf57.83.100], body= - server_command_session.go:136 +// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=0, headers=map[CSeq:3 Transport:RTP/AVP/TCP;unicast;interleaved=0-1 User-Agent:Lavf57.83.100], body= - server_command_session.go:136 +// read http request. method=SETUP, uri=rtsp://localhost:5544/live/test110/streamid=1, headers=map[CSeq:4 Session:191201771 Transport:RTP/AVP/TCP;unicast;interleaved=2-3 User-Agent:Lavf57.83.100], body= - server_command_session.go:136 +// read http request. method=PLAY, uri=rtsp://localhost:5544/live/test110, headers=map[CSeq:5 Range:npt=0.000- Session:191201771 User-Agent:Lavf57.83.100], body= - server_command_session.go:136 // --------------------------------------------------------------------------------------------------------------------- // 8000 video rtp diff --git a/pkg/rtsp/server_command_session.go b/pkg/rtsp/server_command_session.go index 05bf3da..1c21af7 100644 --- a/pkg/rtsp/server_command_session.go +++ b/pkg/rtsp/server_command_session.go @@ -227,8 +227,8 @@ func (s *ServerCommandSession) handleDescribe(requestCtx nazahttp.HTTPReqMsgCtx) return ErrRTSP } - ctx, _ := sdp.ParseSDP2LogicContext(rawSDP) - s.subSession.InitWithSDP(rawSDP, ctx) + sdpLogicCtx, _ := sdp.ParseSDP2LogicContext(rawSDP) + s.subSession.InitWithSDP(rawSDP, sdpLogicCtx) resp := PackResponseDescribe(requestCtx.Headers[HeaderFieldCSeq], string(rawSDP)) _, err = s.conn.Write([]byte(resp)) diff --git a/pkg/rtsp/server_pub_session.go b/pkg/rtsp/server_pub_session.go index e9818b9..cc423c8 100644 --- a/pkg/rtsp/server_pub_session.go +++ b/pkg/rtsp/server_pub_session.go @@ -65,7 +65,7 @@ func (s *PubSession) SetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UDPCon } func (s *PubSession) SetupWithChannel(uri string, rtpChannel, rtcpChannel int, remoteAddr string) error { - return s.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel, remoteAddr) + return s.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel) } func (s *PubSession) Dispose() { diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index 2f1dc40..3b72ffe 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -178,14 +178,14 @@ func (s *SubSession) WriteRTPPacket(packet rtprtcp.RTPPacket) { atomic.AddUint64(&s.currConnStat.WroteBytesSum, uint64(len(packet.Raw))) switch packet.Header.PacketType { - case base.RTPPacketTypeAVCOrHEVC: + case uint8(s.sdpLogicCtx.VideoPayloadTypeOrigin): if s.videoRTPConn != nil { _ = s.videoRTPConn.Write(packet.Raw) } if s.videoRTPChannel != -1 { _ = s.cmdSession.Write(s.videoRTPChannel, packet.Raw) } - case base.RTPPacketTypeAAC: + case uint8(s.sdpLogicCtx.AudioPayloadTypeOrigin): if s.audioRTPConn != nil { _ = s.audioRTPConn.Write(packet.Raw) } diff --git a/pkg/sdp/sdp.go b/pkg/sdp/sdp.go index 90dedaf..b57faaa 100644 --- a/pkg/sdp/sdp.go +++ b/pkg/sdp/sdp.go @@ -22,26 +22,41 @@ import ( var ErrSDP = errors.New("lal.sdp: fxxk") const ( - ARTPMapEncodingName = "H265" + ARTPMapEncodingNameH265 = "H265" + ARTPMapEncodingNameH264 = "H264" + ARTPMapEncodingNameAAC = "MPEG4-GENERIC" ) type LogicContext struct { - AudioClockRate int - VideoClockRate int - AudioPayloadType base.AVPacketPT - VideoPayloadType base.AVPacketPT - AudioAControl string - VideoAControl string - ASC []byte - VPS []byte - SPS []byte - PPS []byte + HasAudio bool + HasVideo bool + AudioClockRate int + VideoClockRate int + AudioPayloadTypeOrigin int + VideoPayloadTypeOrigin int + AudioPayloadType base.AVPacketPT + VideoPayloadType base.AVPacketPT + AudioAControl string + VideoAControl string + ASC []byte + VPS []byte + SPS []byte + PPS []byte +} + +type MediaDesc struct { + M M + ARTPMap ARTPMap + AFmtBase AFmtPBase + AControl AControl } type RawContext struct { - ARTPMapList []ARTPMap - AFmtPBaseList []AFmtPBase - AControlList []AControl + MediaDescList []MediaDesc +} + +type M struct { + Media string } type ARTPMap struct { @@ -68,47 +83,45 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) { return ret, err } - for i, item := range c.ARTPMapList { - switch item.PayloadType { - case base.RTPPacketTypeAVCOrHEVC: - ret.VideoClockRate = item.ClockRate - if item.EncodingName == ARTPMapEncodingName { - ret.VideoPayloadType = base.AVPacketPTHEVC + for _, md := range c.MediaDescList { + switch md.M.Media { + case "audio": + ret.HasAudio = true + ret.AudioClockRate = md.ARTPMap.ClockRate + ret.AudioAControl = md.AControl.Value + + ret.AudioPayloadTypeOrigin = md.ARTPMap.PayloadType + if md.ARTPMap.EncodingName == ARTPMapEncodingNameAAC { + ret.AudioPayloadType = base.AVPacketPTAAC + ret.ASC, err = ParseASC(md.AFmtBase) + if err != nil { + return ret, err + } } else { - ret.VideoPayloadType = base.AVPacketPTAVC - } - if i < len(c.AControlList) { - ret.VideoAControl = c.AControlList[i].Value - } - case base.RTPPacketTypeAAC: - ret.AudioClockRate = item.ClockRate - ret.AudioPayloadType = base.AVPacketPTAAC - if i < len(c.AControlList) { - ret.AudioAControl = c.AControlList[i].Value + ret.AudioPayloadType = base.AVPacketPTUnknown } - default: - return ret, ErrSDP - } - } - - for _, item := range c.AFmtPBaseList { - switch item.Format { - case base.RTPPacketTypeAVCOrHEVC: - if ret.VideoPayloadType == base.AVPacketPTHEVC { - ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(item) - } else { - ret.SPS, ret.PPS, err = ParseSPSPPS(item) - } - if err != nil { - return ret, err - } - case base.RTPPacketTypeAAC: - ret.ASC, err = ParseASC(item) - if err != nil { - return ret, err + case "video": + ret.HasVideo = true + ret.VideoClockRate = md.ARTPMap.ClockRate + ret.VideoAControl = md.AControl.Value + + ret.VideoPayloadTypeOrigin = md.ARTPMap.PayloadType + switch md.ARTPMap.EncodingName { + case ARTPMapEncodingNameH264: + ret.VideoPayloadType = base.AVPacketPTAVC + ret.SPS, ret.PPS, err = ParseSPSPPS(md.AFmtBase) + if err != nil { + return ret, err + } + case ARTPMapEncodingNameH265: + ret.VideoPayloadType = base.AVPacketPTHEVC + ret.VPS, ret.SPS, ret.PPS, err = ParseVPSSPSPPS(md.AFmtBase) + if err != nil { + return ret, err + } + default: + ret.VideoPayloadType = base.AVPacketPTUnknown } - default: - return ret, ErrSDP } } @@ -117,37 +130,74 @@ func ParseSDP2LogicContext(b []byte) (LogicContext, error) { // 例子见单元测试 func ParseSDP2RawContext(b []byte) (RawContext, error) { - var sdpCtx RawContext + var ( + sdpCtx RawContext + md *MediaDesc + ) s := string(b) lines := strings.Split(s, "\r\n") for _, line := range lines { + if strings.HasPrefix(line, "m=") { + m, err := ParseM(line) + if err != nil { + return sdpCtx, err + } + if md != nil { + sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) + } + md = &MediaDesc{ + M: m, + } + } if strings.HasPrefix(line, "a=rtpmap") { aRTPMap, err := ParseARTPMap(line) if err != nil { return sdpCtx, err } - sdpCtx.ARTPMapList = append(sdpCtx.ARTPMapList, aRTPMap) + if md == nil { + continue + } + md.ARTPMap = aRTPMap } if strings.HasPrefix(line, "a=fmtp") { aFmtPBase, err := ParseAFmtPBase(line) if err != nil { return sdpCtx, err } - sdpCtx.AFmtPBaseList = append(sdpCtx.AFmtPBaseList, aFmtPBase) + if md == nil { + continue + } + md.AFmtBase = aFmtPBase } if strings.HasPrefix(line, "a=control") { aControl, err := ParseAControl(line) if err != nil { return sdpCtx, err } - sdpCtx.AControlList = append(sdpCtx.AControlList, aControl) + if md == nil { + continue + } + md.AControl = aControl } } + if md != nil { + sdpCtx.MediaDescList = append(sdpCtx.MediaDescList, *md) + } return sdpCtx, nil } +func ParseM(s string) (ret M, err error) { + ss := strings.TrimPrefix(s, "m=") + items := strings.Split(ss, " ") + if len(items) < 1 { + return ret, ErrSDP + } + ret.Media = items[0] + return +} + // 例子见单元测试 func ParseARTPMap(s string) (ret ARTPMap, err error) { // rfc 3640 3.3.1. General @@ -262,10 +312,6 @@ func ParseASC(a AFmtPBase) ([]byte, error) { } func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) { - if a.Format != base.RTPPacketTypeAVCOrHEVC { - return nil, nil, nil, ErrSDP - } - v, ok := a.Parameters["sprop-vps"] if !ok { return nil, nil, nil, ErrSDP @@ -296,10 +342,6 @@ func ParseVPSSPSPPS(a AFmtPBase) (vps, sps, pps []byte, err error) { // 解析AVC/H264的sps,pps // 例子见单元测试 func ParseSPSPPS(a AFmtPBase) (sps, pps []byte, err error) { - if a.Format != base.RTPPacketTypeAVCOrHEVC { - return nil, nil, ErrSDP - } - v, ok := a.Parameters["sprop-parameter-sets"] if !ok { return nil, nil, ErrSDP diff --git a/pkg/sdp/sdp_test.go b/pkg/sdp/sdp_test.go index bf84150..6718355 100644 --- a/pkg/sdp/sdp_test.go +++ b/pkg/sdp/sdp_test.go @@ -10,6 +10,7 @@ package sdp_test import ( "encoding/hex" + "strings" "testing" "github.com/q191201771/lal/pkg/base" @@ -46,17 +47,6 @@ var goldenPPS = []byte{ 0x68, 0xEB, 0xEC, 0xB2, 0x2C, } -func TestParseSDP2LogicContext(t *testing.T) { - ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP)) - assert.Equal(t, nil, err) - assert.Equal(t, 44100, ctx.AudioClockRate) - assert.Equal(t, 90000, ctx.VideoClockRate) - assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) - assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) - assert.Equal(t, "streamid=1", ctx.AudioAControl) - assert.Equal(t, "streamid=0", ctx.VideoAControl) -} - func TestParseSDP2RawContext(t *testing.T) { sdpCtx, err := sdp.ParseSDP2RawContext([]byte(goldenSDP)) assert.Equal(t, nil, err) @@ -161,3 +151,170 @@ func TestParseVPSSPSPPS(t *testing.T) { nazalog.Debugf("%s", hex.Dump(sps)) nazalog.Debugf("%s", hex.Dump(pps)) } + +func TestParseSDP2LogicContext(t *testing.T) { + ctx, err := sdp.ParseSDP2LogicContext([]byte(goldenSDP)) + assert.Equal(t, nil, err) + assert.Equal(t, true, ctx.HasAudio) + assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, 44100, ctx.AudioClockRate) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin) + assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) + assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) + assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) + assert.Equal(t, "streamid=1", ctx.AudioAControl) + assert.Equal(t, "streamid=0", ctx.VideoAControl) + assert.IsNotNil(t, ctx.ASC) + assert.Equal(t, nil, ctx.VPS) + assert.IsNotNil(t, ctx.SPS) + assert.IsNotNil(t, ctx.PPS) +} + +func TestCase2(t *testing.T) { + golden := `v=0 +o=- 2252316233 2252316233 IN IP4 0.0.0.0 +s=Media Server +c=IN IP4 0.0.0.0 +t=0 0 +a=control:* +a=packetization-supported:DH +a=rtppayload-supported:DH +a=range:npt=now- +m=video 0 RTP/AVP 98 +a=control:trackID=0 +a=framerate:25.000000 +a=rtpmap:98 H265/90000 +a=fmtp:98 profile-id=1;sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBaoAWCAJBY2uSTL5A=;sprop-pps=RAHA8vA8kA==;sprop-vps=QAEMAf//AWAAAAMAsAAAAwAAAwBarAk= +a=recvonly` + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + assert.Equal(t, false, ctx.HasAudio) + assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, 98, ctx.VideoPayloadTypeOrigin) + assert.Equal(t, base.AVPacketPTHEVC, ctx.VideoPayloadType) + assert.Equal(t, "trackID=0", ctx.VideoAControl) + assert.Equal(t, nil, ctx.ASC) + assert.IsNotNil(t, ctx.VPS) + assert.IsNotNil(t, ctx.SPS) + assert.IsNotNil(t, ctx.PPS) + nazalog.Debugf("%+v", ctx) +} + +func TestCase3(t *testing.T) { + golden := `v=0 +o=- 2252310609 2252310609 IN IP4 0.0.0.0 +s=Media Server +c=IN IP4 0.0.0.0 +t=0 0 +a=control:* +a=packetization-supported:DH +a=rtppayload-supported:DH +a=range:npt=now- +m=video 0 RTP/AVP 96 +a=control:trackID=0 +a=framerate:25.000000 +a=rtpmap:96 H264/90000 +a=fmtp:96 packetization-mode=1;profile-level-id=4D002A;sprop-parameter-sets=Z00AKp2oHgCJ+WbgICAoAAAfQAAGGoQgAA==,aO48gAA= +a=recvonly +m=audio 0 RTP/AVP 97 +a=control:trackID=1 +a=rtpmap:97 MPEG4-GENERIC/48000 +a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1188 +a=recvonly` + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + assert.Equal(t, true, ctx.HasAudio) + assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, 48000, ctx.AudioClockRate) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, 97, ctx.AudioPayloadTypeOrigin) + assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) + assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) + assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) + assert.Equal(t, "trackID=1", ctx.AudioAControl) + assert.Equal(t, "trackID=0", ctx.VideoAControl) + assert.IsNotNil(t, ctx.ASC) + assert.Equal(t, nil, ctx.VPS) + assert.IsNotNil(t, ctx.SPS) + assert.IsNotNil(t, ctx.PPS) + nazalog.Debugf("%+v", ctx) +} + +func TestCase4(t *testing.T) { + golden := `v=0 +o=- 1109162014219182 0 IN IP4 0.0.0.0 +s=HIK Media Server V3.4.103 +i=HIK Media Server Session Description : standard +e=NONE +c=IN IP4 0.0.0.0 +t=0 0 +a=control:* +b=AS:1034 +a=range:npt=now- +m=video 0 RTP/AVP 96 +i=Video Media +a=rtpmap:96 H264/90000 +a=fmtp:96 profile-level-id=4D0014;packetization-mode=0;sprop-parameter-sets=Z2QAIK2EAQwgCGEAQwgCGEAQwgCEO1AoA803AQEBQAAAAwBAAAAMoQ==,aO48sA== +a=control:trackID=video +b=AS:1024 +m=audio 0 RTP/AVP 8 +i=Audio Media +a=rtpmap:8 PCMA/8000 +a=control:trackID=audio +b=AS:10 +a=Media_header:MEDIAINFO=494D4B48020100000400000111710110401F000000FA000000000000000000000000000000000000; +a=appversion:1.0` + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + assert.Equal(t, true, ctx.HasAudio) + assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, 8000, ctx.AudioClockRate) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, 8, ctx.AudioPayloadTypeOrigin) + assert.Equal(t, 96, ctx.VideoPayloadTypeOrigin) + //assert.Equal(t, base.AVPacketPTAAC, ctx.AudioPayloadType) + assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) + assert.Equal(t, "trackID=audio", ctx.AudioAControl) + assert.Equal(t, "trackID=video", ctx.VideoAControl) + assert.Equal(t, nil, ctx.ASC) + assert.Equal(t, nil, ctx.VPS) + assert.IsNotNil(t, ctx.SPS) + assert.IsNotNil(t, ctx.PPS) + nazalog.Debugf("%+v", ctx) +} + +func TestCase5(t *testing.T) { + golden := `v=0 +o=- 1001 1 IN IP4 192.168.0.221 +s=VCP IPC Realtime stream +m=video 0 RTP/AVP 105 +c=IN IP4 192.168.0.221 +a=control:rtsp://192.168.0.221/media/video1/video +a=rtpmap:105 H264/90000 +a=fmtp:105 profile-level-id=64002a; packetization-mode=1; sprop-parameter-sets=Z2QAKq2EAQwgCGEAQwgCGEAQwgCEO1A8ARPyzcBAQFAAAD6AAAnECEA=,aO4xshs= +a=recvonly +m=application 0 RTP/AVP 107 +c=IN IP4 192.168.0.221 +a=control:rtsp://192.168.0.221/media/video1/metadata +a=rtpmap:107 vnd.onvif.metadata/90000 +a=fmtp:107 DecoderTag=h3c-v3 RTCP=0 +a=recvonly` + golden = strings.ReplaceAll(golden, "\n", "\r\n") + ctx, err := sdp.ParseSDP2LogicContext([]byte(golden)) + assert.Equal(t, nil, err) + assert.Equal(t, false, ctx.HasAudio) + assert.Equal(t, true, ctx.HasVideo) + assert.Equal(t, 90000, ctx.VideoClockRate) + assert.Equal(t, 105, ctx.VideoPayloadTypeOrigin) + assert.Equal(t, base.AVPacketPTAVC, ctx.VideoPayloadType) + assert.Equal(t, "rtsp://192.168.0.221/media/video1/video", ctx.VideoAControl) + assert.Equal(t, nil, ctx.VPS) + assert.IsNotNil(t, ctx.SPS) + assert.IsNotNil(t, ctx.PPS) + nazalog.Debugf("%+v", ctx) +}