From a11723bde792092b4235145de460387844c970e4 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 11 Jul 2020 15:15:20 +0800 Subject: [PATCH] =?UTF-8?q?1.=20[feat]=20package=20rtsp:=20=E5=BC=80?= =?UTF-8?q?=E5=A7=8B=E5=A4=84=E7=90=86=E6=94=B6=E5=88=B0=E7=9A=84rtp?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=202.=20[style]=20Nalu=E6=9B=B4=E6=94=B9?= =?UTF-8?q?=E4=B8=BANALU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/demo/analyseflv/analyseflv.go | 6 +- app/demo/flvfile2rtmppush/flvfile2rtmppush.go | 2 +- pkg/avc/avc.go | 40 ++--- pkg/hevc/hevc.go | 37 ++-- pkg/hls/fragment.go | 11 +- pkg/hls/muxer.go | 7 +- pkg/httpflv/httpflv.go | 4 +- pkg/httpflv/tag.go | 12 +- pkg/logic/gop_cache.go | 2 +- pkg/logic/group.go | 5 - pkg/logic/server_manager.go | 2 - pkg/rtmp/chunk_composer.go | 6 +- pkg/rtmp/rtmp.go | 16 +- pkg/rtsp/rtcp.go | 79 ++++++++- pkg/rtsp/rtcp_packer.go | 14 ++ pkg/rtsp/rtcp_server.go | 15 +- pkg/rtsp/rtp.go | 71 +++++++- pkg/rtsp/rtp_server.go | 41 ++++- pkg/rtsp/rtp_session.go | 159 ++++++++++++++++++ pkg/rtsp/rtsp.go | 4 +- pkg/rtsp/{packer.go => rtsp_packer.go} | 0 pkg/rtsp/{server.go => rtsp_server.go} | 3 +- pkg/rtsp/sdp.go | 108 ++++++++++++ pkg/rtsp/udp_server.go | 1 - 24 files changed, 541 insertions(+), 104 deletions(-) create mode 100644 pkg/rtsp/rtcp_packer.go create mode 100644 pkg/rtsp/rtp_session.go rename pkg/rtsp/{packer.go => rtsp_packer.go} (100%) rename pkg/rtsp/{server.go => rtsp_server.go} (98%) create mode 100644 pkg/rtsp/sdp.go diff --git a/app/demo/analyseflv/analyseflv.go b/app/demo/analyseflv/analyseflv.go index cf2ad3f..bc90186 100644 --- a/app/demo/analyseflv/analyseflv.go +++ b/app/demo/analyseflv/analyseflv.go @@ -183,15 +183,15 @@ func analysisVideoTag(tag httpflv.Tag) { } switch t { case typeAVC: - if avc.CalcNaluType(body[i+4:]) == avc.NaluUnitTypeIDRSlice { + if avc.CalcNALUType(body[i+4:]) == avc.NALUTypeIDRSlice { if prevIDRTS != int64(-1) { diffIDRTS = int64(tag.Header.Timestamp) - prevIDRTS } prevIDRTS = int64(tag.Header.Timestamp) } - buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNaluTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:]))) + buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNALUTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:]))) case typeHEVC: - buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNaluTypeReadable(body[i+4:]))) + buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNALUTypeReadable(body[i+4:]))) } i = i + 4 + int(naluLen) } diff --git a/app/demo/flvfile2rtmppush/flvfile2rtmppush.go b/app/demo/flvfile2rtmppush/flvfile2rtmppush.go index 03e911a..da81f9a 100644 --- a/app/demo/flvfile2rtmppush/flvfile2rtmppush.go +++ b/app/demo/flvfile2rtmppush/flvfile2rtmppush.go @@ -78,7 +78,7 @@ func readAllTag(filename string) (ret []httpflv.Tag) { log.Debugf("M %d", tag.Header.Timestamp) } else if tag.IsVideoKeySeqHeader() { log.Debugf("V SH %d", tag.Header.Timestamp) - } else if tag.IsVideoKeyNalu() { + } else if tag.IsVideoKeyNALU() { log.Debugf("V K %d", tag.Header.Timestamp) } else if tag.IsAACSeqHeader() { log.Debugf("A SH %d", tag.Header.Timestamp) diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index 35506cc..122636d 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -18,9 +18,9 @@ import ( var ErrAVC = errors.New("lal.avc: fxxk") -var NaluStartCode = []byte{0x0, 0x0, 0x0, 0x1} +var NALUStartCode = []byte{0x0, 0x0, 0x0, 0x1} -var NaluUintTypeMapping = map[uint8]string{ +var NALUTypeMapping = map[uint8]string{ 1: "SLICE", 5: "IDR", 6: "SEI", @@ -43,12 +43,12 @@ var SliceTypeMapping = map[uint8]string{ } const ( - NaluUnitTypeSlice uint8 = 1 - NaluUnitTypeIDRSlice uint8 = 5 - NaluUnitTypeSEI uint8 = 6 - NaluUintTypeSPS uint8 = 7 - NaluUintTypePPS uint8 = 8 - NaluUintTypeAUD uint8 = 9 + NALUTypeSlice uint8 = 1 + NALUTypeIDRSlice uint8 = 5 + NALUTypeSEI uint8 = 6 + NALUTypeSPS uint8 = 7 + NALUTypePPS uint8 = 8 + NALUTypeAUD uint8 = 9 ) const ( @@ -59,7 +59,7 @@ const ( SliceTypeSI uint8 = 4 // TODO chef ) -func CalcNaluType(nalu []byte) uint8 { +func CalcNALUType(nalu []byte) uint8 { return nalu[0] & 0x1f } @@ -82,9 +82,9 @@ func CalcSliceType(nalu []byte) uint8 { return uint8(sliceType) } -func CalcNaluTypeReadable(nalu []byte) string { +func CalcNALUTypeReadable(nalu []byte) string { t := nalu[0] & 0x1f - ret, ok := NaluUintTypeMapping[t] + ret, ok := NALUTypeMapping[t] if !ok { return "unknown" } @@ -92,13 +92,13 @@ func CalcNaluTypeReadable(nalu []byte) string { } func CalcSliceTypeReadable(nalu []byte) string { - naluType := CalcNaluType(nalu) + naluType := CalcNALUType(nalu) switch naluType { - case NaluUnitTypeSEI: + case NALUTypeSEI: fallthrough - case NaluUintTypeSPS: + case NALUTypeSPS: fallthrough - case NaluUintTypePPS: + case NALUTypePPS: return "" } @@ -167,9 +167,9 @@ func CaptureAVC(w io.Writer, payload []byte) error { return err } //utilErrors.PanicIfErrorOccur(err) - _, _ = w.Write(NaluStartCode) + _, _ = w.Write(NALUStartCode) _, _ = w.Write(sps) - _, _ = w.Write(NaluStartCode) + _, _ = w.Write(NALUStartCode) _, _ = w.Write(pps) return nil } @@ -179,9 +179,9 @@ func CaptureAVC(w io.Writer, payload []byte) error { for i := 5; i != len(payload); { naluLen := int(bele.BEUint32(payload[i:])) i += 4 - //naluUintType := payload[i] & 0x1f - //log.Debugf("naluLen:%d t:%d %s\n", naluLen, naluUintType, avc.NaluUintTypeMapping[naluUintType]) - _, _ = w.Write(NaluStartCode) + //naluType := payload[i] & 0x1f + //log.Debugf("naluLen:%d t:%d %s\n", naluLen, naluType, avc.NALUTypeMapping[naluUintType]) + _, _ = w.Write(NALUStartCode) _, _ = w.Write(payload[i : i+naluLen]) i += naluLen break diff --git a/pkg/hevc/hevc.go b/pkg/hevc/hevc.go index df04491..2f355c3 100644 --- a/pkg/hevc/hevc.go +++ b/pkg/hevc/hevc.go @@ -8,32 +8,35 @@ package hevc -var NaluUintTypeMapping = map[uint8]string{ - NaluUnitTypeSliceTrailR: "SLICE", - NaluUnitTypeSliceIDR: "I", - NaluUintTypeSliceIDRNLP: "IDR", - NaluUnitTypeSEI: "SEI", - NaluUnitTypeSEISuffix: "SEI", +var NALUTypeMapping = map[uint8]string{ + NALUTypeSliceTrailR: "SLICE", + NALUTypeSliceIDR: "I", + NALUTypeSliceIDRNLP: "IDR", + NALUTypeSEI: "SEI", + NALUTypeSEISuffix: "SEI", } var ( - NaluUnitTypeSliceTrailR uint8 = 1 // 0x01 - NaluUnitTypeSliceIDR uint8 = 19 // 0x13 - NaluUintTypeSliceIDRNLP uint8 = 20 // 0x14 - NaluUnitTypeVPS uint8 = 32 // 0x20 - NaluUnitTypeSPS uint8 = 33 // 0x21 - NaluUnitTypePPS uint8 = 34 // 0x22 - NaluUnitTypeSEI uint8 = 39 // 0x27 - NaluUnitTypeSEISuffix uint8 = 40 // 0x28 + NALUTypeSliceTrailR uint8 = 1 // 0x01 + NALUTypeSliceIDR uint8 = 19 // 0x13 + NALUTypeSliceIDRNLP uint8 = 20 // 0x14 + NALUTypeVPS uint8 = 32 // 0x20 + NALUTypeSPS uint8 = 33 // 0x21 + NALUTypePPS uint8 = 34 // 0x22 + NALUTypeSEI uint8 = 39 // 0x27 + NALUTypeSEISuffix uint8 = 40 // 0x28 ) -func CalcNaluTypeReadable(nalu []byte) string { - b, ok := NaluUintTypeMapping[CalcNaluType(nalu)] +func CalcNALUTypeReadable(nalu []byte) string { + b, ok := NALUTypeMapping[CalcNALUType(nalu)] if !ok { return "unknown" } return b } -func CalcNaluType(nalu []byte) uint8 { +func CalcNALUType(nalu []byte) uint8 { + // 6 bit in middle + // 0*** ***0 + // or return (nalu[0] >> 1) & 0x3F return (nalu[0] & 0x7E) >> 1 } diff --git a/pkg/hls/fragment.go b/pkg/hls/fragment.go index 6f4bbbb..43b48d0 100644 --- a/pkg/hls/fragment.go +++ b/pkg/hls/fragment.go @@ -13,8 +13,8 @@ import ( ) type FragmentOP struct { - fp *os.File - packet []byte //WriteFrame中缓存每个TS包数据 + fp *os.File + packet []byte //WriteFrame中缓存每个TS包数据 } type mpegTSFrame struct { @@ -62,12 +62,12 @@ func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) { // continuity_counter // ------------------------------ f.packet[0] = syncByte // sync_byte - f.packet[1] = 0x0; + f.packet[1] = 0x0 if first { f.packet[1] = 0x40 // payload_unit_start_indicator } - f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位 - f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位 + f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位 + f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位 // adaptation_field_control 先设置成无Adaptation // continuity_counter @@ -195,7 +195,6 @@ func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) { f.packet[3] |= 0x20 - base := 4 if wpos > base { copy(f.packet[base+stuffSize:], f.packet[base:wpos]) diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index be2583b..fe36d4e 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -97,6 +97,7 @@ func (m *Muxer) Dispose() { m.closeFragment() } +// 函数调用结束后,内部不持有msg中的内存块 func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) { switch msg.Header.MsgTypeID { case rtmp.TypeidAudio: @@ -241,10 +242,8 @@ func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) { } func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) { - // 分配新内存来缓存SPS_PPS的msg - // 这样就可以不依赖func (group *Group) OnReadRTMPAVMsg中的msg变量 - m.spspps = make([]byte,len(msg.Payload)) - copy(m.spspps,msg.Payload) + m.spspps = make([]byte, len(msg.Payload)) + copy(m.spspps, msg.Payload) } func (m *Muxer) appendSPSPPS(out []byte) []byte { diff --git a/pkg/httpflv/httpflv.go b/pkg/httpflv/httpflv.go index 6325f4a..8a527b7 100644 --- a/pkg/httpflv/httpflv.go +++ b/pkg/httpflv/httpflv.go @@ -47,10 +47,10 @@ const ( const ( AVCPacketTypeSeqHeader uint8 = 0 - AVCPacketTypeNalu uint8 = 1 + AVCPacketTypeNALU uint8 = 1 HEVCPacketTypeSeqHeader uint8 = 0 - HEVCPacketTypeNalu uint8 = 1 + HEVCPacketTypeNALU uint8 = 1 AACPacketTypeSeqHeader uint8 = 0 AACPacketTypeRaw uint8 = 1 diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index 6d3746e..00bcc71 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -55,17 +55,17 @@ func (tag *Tag) IsVideoKeySeqHeader() bool { return tag.IsAVCKeySeqHeader() || tag.IsHEVCKeySeqHeader() } -func (tag *Tag) IsAVCKeyNalu() bool { - return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKeyFrame && tag.Raw[TagHeaderSize+1] == AVCPacketTypeNalu +func (tag *Tag) IsAVCKeyNALU() bool { + return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKeyFrame && tag.Raw[TagHeaderSize+1] == AVCPacketTypeNALU } -func (tag *Tag) IsHEVCKeyNalu() bool { - return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HEVCKeyFrame && tag.Raw[TagHeaderSize+1] == HEVCPacketTypeNalu +func (tag *Tag) IsHEVCKeyNALU() bool { + return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HEVCKeyFrame && tag.Raw[TagHeaderSize+1] == HEVCPacketTypeNALU } // AVC或HEVC的关键帧 -func (tag *Tag) IsVideoKeyNalu() bool { - return tag.IsAVCKeyNalu() || tag.IsHEVCKeyNalu() +func (tag *Tag) IsVideoKeyNALU() bool { + return tag.IsAVCKeyNALU() || tag.IsHEVCKeyNALU() } func (tag *Tag) IsAACSeqHeader() bool { diff --git a/pkg/logic/gop_cache.go b/pkg/logic/gop_cache.go index 69ce220..0eb21b2 100644 --- a/pkg/logic/gop_cache.go +++ b/pkg/logic/gop_cache.go @@ -99,7 +99,7 @@ func (gc *GOPCache) Feed(msg rtmp.AVMsg, lg LazyGet) { // 这个size的判断去掉也行 if gc.gopSize > 1 { - if msg.IsVideoKeyNalu() { + if msg.IsVideoKeyNALU() { gc.feedNewGOP(msg, lg()) } else { gc.feedLastGOP(msg, lg()) diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 4674678..62e27e3 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -275,11 +275,6 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) { group.mutex.Lock() defer group.mutex.Unlock() - //因为group.broadcastRTMP和group.hlsMuxer.FeedRTMPMessage都不引用msg了,所以不需要复制数据了 - //p := make([]byte, len(msg.Payload)) - //copy(p, msg.Payload) - //msg.Payload = p - //nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1]) group.broadcastRTMP(msg) diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 71e17b3..ce61377 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -204,8 +204,6 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr group = NewGroup(appName, streamName) sm.groupMap[streamName] = group - // 只在第1个ServerSession产生时启动这个group协程 - // 注: 创建的group协程暂时做结构设计预留,现在并没有实际动作,以后可以用协程执行OnReadRTMPAVMsg中数据转发 go group.RunLoop() } return group diff --git a/pkg/rtmp/chunk_composer.go b/pkg/rtmp/chunk_composer.go index 40e8b11..e3cd631 100644 --- a/pkg/rtmp/chunk_composer.go +++ b/pkg/rtmp/chunk_composer.go @@ -108,6 +108,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { case 3: // noop } + //nazalog.Debugf("RTMP_CHUNK_COMPOSER chunk.fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header) // 5.3.1.3 Extended Timestamp // 使用ffmpeg推流时,发现时间戳超过3字节最大值后,即使是fmt3(即包头大小为0),依然存在ext ts字段 @@ -121,6 +122,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { return err } stream.header.Timestamp = bele.BEUint32(bootstrap) + //nazalog.Debugf("RTMP_CHUNK_COMPOSER ext. extTs=%d", stream.header.Timestamp) switch fmt { case 0: stream.header.TimestampAbs = stream.header.Timestamp @@ -132,8 +134,6 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { // noop } } - //stream.header.CSID = csid - //nazalog.Debugf("ChunkComposer chunk fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs) var neededSize uint32 if stream.header.MsgLen <= c.peerChunkSize { @@ -164,7 +164,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error { stream.header.TimestampAbs += stream.header.Timestamp } absTsFlag = false - //nazalog.Debugf("ChunkComposer message fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs) + //nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header) if err := cb(stream); err != nil { return err diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index 8e3d462..e16e49a 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -81,10 +81,10 @@ const ( HEVCInterFrame = frameTypeInter<<4 | codecIDHEVC AVCPacketTypeSeqHeader uint8 = 0 - AVCPacketTypeNalu uint8 = 1 + AVCPacketTypeNALU uint8 = 1 HEVCPacketTypeSeqHeader uint8 = 0 - HEVCPacketTypeNalu uint8 = 1 + HEVCPacketTypeNALU uint8 = 1 AACPacketTypeSeqHeader uint8 = 0 AACPacketTypeRaw uint8 = 1 @@ -104,16 +104,16 @@ func (msg AVMsg) IsVideoKeySeqHeader() bool { return msg.IsAVCKeySeqHeader() || msg.IsHEVCKeySeqHeader() } -func (msg AVMsg) IsAVCKeyNalu() bool { - return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == AVCKeyFrame && msg.Payload[1] == AVCPacketTypeNalu +func (msg AVMsg) IsAVCKeyNALU() bool { + return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == AVCKeyFrame && msg.Payload[1] == AVCPacketTypeNALU } -func (msg AVMsg) IsHEVCKeyNalu() bool { - return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == HEVCKeyFrame && msg.Payload[1] == HEVCPacketTypeNalu +func (msg AVMsg) IsHEVCKeyNALU() bool { + return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == HEVCKeyFrame && msg.Payload[1] == HEVCPacketTypeNALU } -func (msg AVMsg) IsVideoKeyNalu() bool { - return msg.IsAVCKeyNalu() || msg.IsHEVCKeyNalu() +func (msg AVMsg) IsVideoKeyNALU() bool { + return msg.IsAVCKeyNALU() || msg.IsHEVCKeyNALU() } func (msg AVMsg) IsAACSeqHeader() bool { diff --git a/pkg/rtsp/rtcp.go b/pkg/rtsp/rtcp.go index a140c63..1b93948 100644 --- a/pkg/rtsp/rtcp.go +++ b/pkg/rtsp/rtcp.go @@ -13,10 +13,79 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// rfc3550 +// ------------------------------------------- +// rfc3550 6.4.1 SR: Sender Report RTCP Packet +// ------------------------------------------- +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// header |V=2|P| RC | PT=SR=200 | length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | SSRC of sender | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// sender | NTP timestamp, most significant word | +// info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | NTP timestamp, least significant word | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | RTP timestamp | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | sender's packet count | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | sender's octet count | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// report | SSRC_1 (SSRC of first source) | +// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 1 | fraction lost | cumulative number of packets lost | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | extended highest sequence number received | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | interarrival jitter | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | last SR (LSR) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | delay since last SR (DLSR) | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// report | SSRC_2 (SSRC of second source) | +// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 2 : ... : +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// | profile-specific extensions | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// --------------------------------------------- +// rfc3550 6.4.2 RR: Receiver Report RTCP Packet +// --------------------------------------------- +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// header |V=2|P| RC | PT=RR=201 | length | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | SSRC of packet sender | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// report | SSRC_1 (SSRC of first source) | +// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 1 | fraction lost | cumulative number of packets lost | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | extended highest sequence number received | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | interarrival jitter | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | last SR (LSR) | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | delay since last SR (DLSR) | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// report | SSRC_2 (SSRC of second source) | +// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// 2 : ... : +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// | profile-specific extensions | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ const ( - PacketTypeSR = 200 // Sender Report + RTCPPacketTypeSR = 200 // Sender Report + RTCPPacketTypeRR = 201 // Receiver Report ) type RTCPHeader struct { @@ -43,10 +112,10 @@ func parseRTCPPacket(b []byte) { h.countOrFormat = b[0] & 0x1F h.packetType = b[1] h.length = bele.BEUint16(b[2:]) - nazalog.Debugf("%+v", h) + //nazalog.Debugf("%+v", h) switch h.packetType { - case PacketTypeSR: + case RTCPPacketTypeSR: parseSR(b) default: nazalog.Warnf("unknown packet type. type=%d", h.packetType) @@ -62,5 +131,5 @@ func parseSR(b []byte) { s.ts = bele.BEUint32(b[16:]) s.pktCnt = bele.BEUint32(b[20:]) s.octetCnt = bele.BEUint32(b[24:]) - nazalog.Debugf("%+v", s) + //nazalog.Debugf("%+v", s) } diff --git a/pkg/rtsp/rtcp_packer.go b/pkg/rtsp/rtcp_packer.go new file mode 100644 index 0000000..76c3e56 --- /dev/null +++ b/pkg/rtsp/rtcp_packer.go @@ -0,0 +1,14 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp + +func PackRR() []byte { + // TODO chef: impl me + return nil +} diff --git a/pkg/rtsp/rtcp_server.go b/pkg/rtsp/rtcp_server.go index ccce1d4..b6e780d 100644 --- a/pkg/rtsp/rtcp_server.go +++ b/pkg/rtsp/rtcp_server.go @@ -8,9 +8,7 @@ package rtsp -import ( - "github.com/q191201771/naza/pkg/nazalog" -) +import "github.com/q191201771/naza/pkg/nazalog" type RTCPServer struct { udpServer *UDPServer @@ -23,14 +21,15 @@ func NewRTCPServer(addr string) *RTCPServer { } func (r *RTCPServer) OnReadUDPPacket(b []byte, addr string, err error) { - nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err) + //nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err) parseRTCPPacket(b) } -func (s *RTCPServer) Listen() (err error) { - return s.udpServer.Listen() +func (r *RTCPServer) Listen() (err error) { + nazalog.Infof("start rtcp server listen. addr=%s", r.udpServer.addr) + return r.udpServer.Listen() } -func (s *RTCPServer) RunLoop() error { - return s.udpServer.RunLoop() +func (r *RTCPServer) RunLoop() error { + return r.udpServer.RunLoop() } diff --git a/pkg/rtsp/rtp.go b/pkg/rtsp/rtp.go index 9b3e202..9456d4e 100644 --- a/pkg/rtsp/rtp.go +++ b/pkg/rtsp/rtp.go @@ -9,11 +9,59 @@ package rtsp import ( + "errors" + "github.com/q191201771/naza/pkg/bele" - "github.com/q191201771/naza/pkg/nazalog" ) +// ----------------------------------- // rfc3550 5.1 RTP Fixed Header Fields +// ----------------------------------- +// +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |V=2|P|X| CC |M| PT | sequence number | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | timestamp | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | synchronization source (SSRC) identifier | +// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +// | contributing source (CSRC) identifiers | +// | .... | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +var ErrRTP = errors.New("lal.rtp: fxxk") + +const ( + RTPFixedHeaderLength = 12 +) + +const ( + RTPPacketTypeAVC = 96 + RTPPacketTypeAAC = 97 +) + +// rfc3984 5.2. Common Structure of the RTP Payload Format +// Table 1. Summary of NAL unit types and their payload structures +// +// Type Packet Type name Section +// --------------------------------------------------------- +// 0 undefined - +// 1-23 NAL unit Single NAL unit packet per H.264 5.6 +// 24 STAP-A Single-time aggregation packet 5.7.1 +// 25 STAP-B Single-time aggregation packet 5.7.1 +// 26 MTAP16 Multi-time aggregation packet 5.7.2 +// 27 MTAP24 Multi-time aggregation packet 5.7.2 +// 28 FU-A Fragmentation unit 5.8 +// 29 FU-B Fragmentation unit 5.8 +// 30-31 undefined - + +const ( + NALUTypeSingleMax = 23 + NALUTypeFUA = 28 +) + type RTPHeader struct { version uint8 // 2b padding uint8 // 1b @@ -24,10 +72,23 @@ type RTPHeader struct { seq uint16 // 16b timestamp uint32 // 32b ssrc uint32 // 32b + + payloadOffset uint32 } -func parseRTPPacket(b []byte) { - var h RTPHeader +func isAudio(packetType uint8) bool { + if packetType == RTPPacketTypeAAC { + return true + } + return false +} + +func parseRTPPacket(b []byte) (h RTPHeader, err error) { + if len(b) < RTPFixedHeaderLength { + err = ErrRTP + return + } + h.version = b[0] >> 6 h.padding = (b[0] >> 5) & 0x1 h.extension = (b[0] >> 4) & 0x1 @@ -37,5 +98,7 @@ func parseRTPPacket(b []byte) { h.seq = bele.BEUint16(b[2:]) h.timestamp = bele.BEUint32(b[4:]) h.ssrc = bele.BEUint32(b[8:]) - nazalog.Debugf("%+v", h) + + h.payloadOffset = RTPFixedHeaderLength + return } diff --git a/pkg/rtsp/rtp_server.go b/pkg/rtsp/rtp_server.go index 7106eac..bc3be34 100644 --- a/pkg/rtsp/rtp_server.go +++ b/pkg/rtsp/rtp_server.go @@ -9,28 +9,57 @@ package rtsp import ( + "sync" + "github.com/q191201771/naza/pkg/nazalog" ) type RTPServer struct { udpServer *UDPServer + + m sync.Mutex + ssrc2Session map[uint32]*Session } func NewRTPServer(addr string) *RTPServer { var s RTPServer s.udpServer = NewUDPServer(addr, s.OnReadUDPPacket) + s.ssrc2Session = make(map[uint32]*Session) return &s } func (r *RTPServer) OnReadUDPPacket(b []byte, addr string, err error) { - nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err) - parseRTPPacket(b) + //nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err) + h, err := parseRTPPacket(b) + if err != nil { + nazalog.Errorf("read invalid rtp packet. err=%+v", err) + } + switch h.packetType { + case RTPPacketTypeAAC: + s := r.getOrCreateSession(h) + s.FeedAACPacket(b, h) + case RTPPacketTypeAVC: + nazalog.Debugf("header=%+v, length=%d", h, len(b)) + s := r.getOrCreateSession(h) + s.FeedAVCPacket(b, h) + } +} + +func (r *RTPServer) Listen() (err error) { + nazalog.Infof("start rtp server listen. addr=%s", r.udpServer.addr) + return r.udpServer.Listen() } -func (s *RTPServer) Listen() (err error) { - return s.udpServer.Listen() +func (r *RTPServer) RunLoop() error { + return r.udpServer.RunLoop() } -func (s *RTPServer) RunLoop() error { - return s.udpServer.RunLoop() +func (r *RTPServer) getOrCreateSession(h RTPHeader) *Session { + r.m.Lock() + defer r.m.Unlock() + s, ok := r.ssrc2Session[h.ssrc] + if ok { + return s + } + return NewSession(h.ssrc, isAudio(h.packetType)) } diff --git a/pkg/rtsp/rtp_session.go b/pkg/rtsp/rtp_session.go new file mode 100644 index 0000000..8c9f2d2 --- /dev/null +++ b/pkg/rtsp/rtp_session.go @@ -0,0 +1,159 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp + +import ( + "encoding/hex" + + "github.com/q191201771/naza/pkg/nazalog" +) + +type Session struct { + ssrc uint32 + isAudio bool +} + +func NewSession(ssrc uint32, isAudio bool) *Session { + return &Session{ + ssrc: ssrc, + isAudio: isAudio, + } +} + +func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) { + // h264 + { + // rfc3984 5.3. NAL Unit Octet Usage + // + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |F|NRI| Type | + // +---------------+ + + outerNALUType := b[h.payloadOffset] & 0x1F + if outerNALUType <= NALUTypeSingleMax { + nazalog.Debugf("SINGLE. naluType=%d %s", outerNALUType, hex.Dump(b[12:32])) + } else if outerNALUType == NALUTypeFUA { + + // rfc3984 5.8. Fragmentation Units (FUs) + // + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | FU indicator | FU header | | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | + // | | + // | FU payload | + // | | + // | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // | :...OPTIONAL RTP padding | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + // + // FU indicator: + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |F|NRI| Type | + // +---------------+ + // + // Fu header: + // +---------------+ + // |0|1|2|3|4|5|6|7| + // +-+-+-+-+-+-+-+-+ + // |S|E|R| Type | + // +---------------+ + + //fuIndicator := b[h.payloadOffset] + fuHeader := b[h.payloadOffset+1] + + startCode := (fuHeader & 0x80) != 0 + endCode := (fuHeader & 0x40) != 0 + + //naluType := (fuIndicator & 0xE0) | (fuHeader & 0x1F) + naluType := fuHeader & 0x1F + + nazalog.Debugf("FUA. outerNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", outerNALUType, naluType, startCode, endCode, hex.Dump(b[12:32])) + } else { + nazalog.Errorf("error. type=%d", outerNALUType) + } + + // TODO chef: to be continued + // 从SDP中获取SPS,PPS等信息 + // 将RTP包合并出视频帧 + } + + // h265 + //{ + // originNALUType := (b[h.payloadOffset] >> 1) & 0x3F + // if originNALUType == 49 { + // header2 := b[h.payloadOffset+2] + // + // startCode := (header2 & 0x80) != 0 + // endCode := (header2 & 0x40) != 0 + // + // naluType := header2 & 0x3F + // + // nazalog.Debugf("FUA. originNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", originNALUType, naluType, startCode, endCode, hex.Dump(b[12:32])) + // + // } else { + // nazalog.Debugf("SINGLE. naluType=%d %s", originNALUType, hex.Dump(b[12:32])) + // } + //} +} + +func (s *Session) FeedAACPacket(b []byte, h RTPHeader) { + return + // TODO chef: 目前只实现了AAC MPEG4-GENERIC/44100/2 + + // rfc3640 2.11. Global Structure of Payload Format + // + // +---------+-----------+-----------+---------------+ + // | RTP | AU Header | Auxiliary | Access Unit | + // | Header | Section | Section | Data Section | + // +---------+-----------+-----------+---------------+ + // + // <----------RTP Packet Payload-----------> + // + // rfc3640 3.2.1. The AU Header Section + // + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ + // |AU-headers-length|AU-header|AU-header| |AU-header|padding| + // | | (1) | (2) | | (n) | bits | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ + // + // rfc3640 3.3.6. High Bit-rate AAC + // + + nazalog.Debugf("%s", hex.Dump(b[12:])) + + // au header section + auHeaderLength := b[h.payloadOffset]<<8 + b[h.payloadOffset+1] + auHeaderLength = (auHeaderLength + 7) / 8 + nazalog.Debugf("auHeaderLength=%d", auHeaderLength) + + // no auxiliary section + + pauh := h.payloadOffset + uint32(2) // au header pos + pau := h.payloadOffset + uint32(2) + uint32(auHeaderLength) // au pos + auNum := uint32(auHeaderLength) / 2 + for i := uint32(0); i < auNum; i++ { + auSize := uint32(b[pauh]<<8 | b[pauh+1]&0xF8) // 13bit + auSize /= 8 + + auIndex := b[pauh+1] & 0x7 + + // data + // pau, auSize + nazalog.Debugf("%d %d %s", auSize, auIndex, hex.Dump(b[pau:pau+auSize])) + + pauh += 2 + pau += uint32(auSize) + } +} diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index ca3eed8..75011c5 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -10,6 +10,9 @@ package rtsp // 注意,正在学习以及实现rtsp,请不要使用这个package +// TODO chef +// - rtp和rtcp作为独立package + // rfc2326 const ( @@ -32,7 +35,6 @@ var ( serverName = "lalserver" sessionID = "191201771" - // TODO chef: to be continued // read RTP/RTCP data from port serverPort = "8000-8001" diff --git a/pkg/rtsp/packer.go b/pkg/rtsp/rtsp_packer.go similarity index 100% rename from pkg/rtsp/packer.go rename to pkg/rtsp/rtsp_packer.go diff --git a/pkg/rtsp/server.go b/pkg/rtsp/rtsp_server.go similarity index 98% rename from pkg/rtsp/server.go rename to pkg/rtsp/rtsp_server.go index a3a1ad0..2930939 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/rtsp_server.go @@ -70,7 +70,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) { var body []byte if contentLength, ok := headers["Content-Length"]; ok { if cl, err := strconv.Atoi(contentLength); err == nil { - body := make([]byte, cl) + body = make([]byte, cl) l, err := io.ReadAtLeast(r, body, cl) if l != cl || err != nil { nazalog.Errorf("read rtsp cmd fail. content-length=%d, read length=%d, err=%+v", cl, l, err) @@ -89,6 +89,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) { _, _ = conn.Write([]byte(resp)) case MethodAnnounce: nazalog.Info("< R ANNOUNCE") + parseSDP(body) resp := PackResponseAnnounce(headers[HeaderFieldCSeq]) _, _ = conn.Write([]byte(resp)) case MethodSetup: diff --git a/pkg/rtsp/sdp.go b/pkg/rtsp/sdp.go new file mode 100644 index 0000000..f78bdf7 --- /dev/null +++ b/pkg/rtsp/sdp.go @@ -0,0 +1,108 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package rtsp + +import ( + "errors" + "strconv" + "strings" + + "github.com/q191201771/naza/pkg/nazalog" +) + +var ErrSDP = errors.New("lal.sdp: fxxk") + +type SDP struct { +} + +// rfc 4566 5.14. Media Descriptions ("m=") +// m= .. +// +// example: +// m=audio 0 RTP/AVP 97 +//type MediaDesc struct { +// Media string +// Port string +// Proto string +// Fmt string +//} + +type ARTPMap struct { + PayloadType int + EncodingName string + ClockRate string + EncodingParameters string +} + +type FmtP struct { + Mode string +} + +func parseSDP(b []byte) SDP { + s := string(b) + lines := strings.Split(s, "\r\n") + for _, line := range lines { + if strings.HasPrefix(line, "a=rtpmap") { + aRTPMap, err := parseARTPMap(line) + nazalog.Debugf("%+v, %v", aRTPMap, err) + } + } + + return SDP{} +} + +func parseARTPMap(s string) (ret ARTPMap, err error) { + // rfc 3640 3.3.1. General + // rfc 3640 3.3.6. High Bit-rate AAC + // + // a=rtpmap: /[/] + // + // example: + // a=rtpmap:96 H264/90000 + // a=rtpmap:97 MPEG4-GENERIC/44100/2 + + items := strings.Split(s, ":") + if len(items) != 2 { + err = ErrSDP + return + } + items = strings.Split(items[1], " ") + if len(items) != 2 { + err = ErrSDP + return + } + ret.PayloadType, err = strconv.Atoi(items[0]) + if err != nil { + return + } + items = strings.Split(items[1], "/") + switch len(items) { + case 3: + ret.EncodingParameters = items[2] + fallthrough + case 2: + ret.EncodingName = items[0] + ret.ClockRate = items[1] + default: + err = ErrSDP + } + return +} + +func parseFmtP(s string) (ret ARTPMap, err error) { + // rfc 3640 4.4.1. The a=fmtp Keyword + // + // a=fmtp: =[; =] + // + // example: + // a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020 + // a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210 + + return +} diff --git a/pkg/rtsp/udp_server.go b/pkg/rtsp/udp_server.go index ffebc72..1af965c 100644 --- a/pkg/rtsp/udp_server.go +++ b/pkg/rtsp/udp_server.go @@ -40,7 +40,6 @@ func (s *UDPServer) Listen() (err error) { if err != nil { return } - nazalog.Infof("start rtcp server listen. addr=%s", s.addr) return }