1. [feat] package rtsp: 开始处理收到的rtp数据 2. [style] Nalu更改为NALU

pull/8/head^2
q191201771 5 years ago
parent 5576b6659d
commit a11723bde7

@ -183,15 +183,15 @@ func analysisVideoTag(tag httpflv.Tag) {
}
switch t {
case typeAVC:
if avc.CalcNaluType(body[i+4:]) == avc.NaluUnitTypeIDRSlice {
if avc.CalcNALUType(body[i+4:]) == avc.NALUTypeIDRSlice {
if prevIDRTS != int64(-1) {
diffIDRTS = int64(tag.Header.Timestamp) - prevIDRTS
}
prevIDRTS = int64(tag.Header.Timestamp)
}
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNaluTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:])))
buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNALUTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:])))
case typeHEVC:
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNaluTypeReadable(body[i+4:])))
buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNALUTypeReadable(body[i+4:])))
}
i = i + 4 + int(naluLen)
}

@ -78,7 +78,7 @@ func readAllTag(filename string) (ret []httpflv.Tag) {
log.Debugf("M %d", tag.Header.Timestamp)
} else if tag.IsVideoKeySeqHeader() {
log.Debugf("V SH %d", tag.Header.Timestamp)
} else if tag.IsVideoKeyNalu() {
} else if tag.IsVideoKeyNALU() {
log.Debugf("V K %d", tag.Header.Timestamp)
} else if tag.IsAACSeqHeader() {
log.Debugf("A SH %d", tag.Header.Timestamp)

@ -18,9 +18,9 @@ import (
var ErrAVC = errors.New("lal.avc: fxxk")
var NaluStartCode = []byte{0x0, 0x0, 0x0, 0x1}
var NALUStartCode = []byte{0x0, 0x0, 0x0, 0x1}
var NaluUintTypeMapping = map[uint8]string{
var NALUTypeMapping = map[uint8]string{
1: "SLICE",
5: "IDR",
6: "SEI",
@ -43,12 +43,12 @@ var SliceTypeMapping = map[uint8]string{
}
const (
NaluUnitTypeSlice uint8 = 1
NaluUnitTypeIDRSlice uint8 = 5
NaluUnitTypeSEI uint8 = 6
NaluUintTypeSPS uint8 = 7
NaluUintTypePPS uint8 = 8
NaluUintTypeAUD uint8 = 9
NALUTypeSlice uint8 = 1
NALUTypeIDRSlice uint8 = 5
NALUTypeSEI uint8 = 6
NALUTypeSPS uint8 = 7
NALUTypePPS uint8 = 8
NALUTypeAUD uint8 = 9
)
const (
@ -59,7 +59,7 @@ const (
SliceTypeSI uint8 = 4 // TODO chef
)
func CalcNaluType(nalu []byte) uint8 {
func CalcNALUType(nalu []byte) uint8 {
return nalu[0] & 0x1f
}
@ -82,9 +82,9 @@ func CalcSliceType(nalu []byte) uint8 {
return uint8(sliceType)
}
func CalcNaluTypeReadable(nalu []byte) string {
func CalcNALUTypeReadable(nalu []byte) string {
t := nalu[0] & 0x1f
ret, ok := NaluUintTypeMapping[t]
ret, ok := NALUTypeMapping[t]
if !ok {
return "unknown"
}
@ -92,13 +92,13 @@ func CalcNaluTypeReadable(nalu []byte) string {
}
func CalcSliceTypeReadable(nalu []byte) string {
naluType := CalcNaluType(nalu)
naluType := CalcNALUType(nalu)
switch naluType {
case NaluUnitTypeSEI:
case NALUTypeSEI:
fallthrough
case NaluUintTypeSPS:
case NALUTypeSPS:
fallthrough
case NaluUintTypePPS:
case NALUTypePPS:
return ""
}
@ -167,9 +167,9 @@ func CaptureAVC(w io.Writer, payload []byte) error {
return err
}
//utilErrors.PanicIfErrorOccur(err)
_, _ = w.Write(NaluStartCode)
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(sps)
_, _ = w.Write(NaluStartCode)
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(pps)
return nil
}
@ -179,9 +179,9 @@ func CaptureAVC(w io.Writer, payload []byte) error {
for i := 5; i != len(payload); {
naluLen := int(bele.BEUint32(payload[i:]))
i += 4
//naluUintType := payload[i] & 0x1f
//log.Debugf("naluLen:%d t:%d %s\n", naluLen, naluUintType, avc.NaluUintTypeMapping[naluUintType])
_, _ = w.Write(NaluStartCode)
//naluType := payload[i] & 0x1f
//log.Debugf("naluLen:%d t:%d %s\n", naluLen, naluType, avc.NALUTypeMapping[naluUintType])
_, _ = w.Write(NALUStartCode)
_, _ = w.Write(payload[i : i+naluLen])
i += naluLen
break

@ -8,32 +8,35 @@
package hevc
var NaluUintTypeMapping = map[uint8]string{
NaluUnitTypeSliceTrailR: "SLICE",
NaluUnitTypeSliceIDR: "I",
NaluUintTypeSliceIDRNLP: "IDR",
NaluUnitTypeSEI: "SEI",
NaluUnitTypeSEISuffix: "SEI",
var NALUTypeMapping = map[uint8]string{
NALUTypeSliceTrailR: "SLICE",
NALUTypeSliceIDR: "I",
NALUTypeSliceIDRNLP: "IDR",
NALUTypeSEI: "SEI",
NALUTypeSEISuffix: "SEI",
}
var (
NaluUnitTypeSliceTrailR uint8 = 1 // 0x01
NaluUnitTypeSliceIDR uint8 = 19 // 0x13
NaluUintTypeSliceIDRNLP uint8 = 20 // 0x14
NaluUnitTypeVPS uint8 = 32 // 0x20
NaluUnitTypeSPS uint8 = 33 // 0x21
NaluUnitTypePPS uint8 = 34 // 0x22
NaluUnitTypeSEI uint8 = 39 // 0x27
NaluUnitTypeSEISuffix uint8 = 40 // 0x28
NALUTypeSliceTrailR uint8 = 1 // 0x01
NALUTypeSliceIDR uint8 = 19 // 0x13
NALUTypeSliceIDRNLP uint8 = 20 // 0x14
NALUTypeVPS uint8 = 32 // 0x20
NALUTypeSPS uint8 = 33 // 0x21
NALUTypePPS uint8 = 34 // 0x22
NALUTypeSEI uint8 = 39 // 0x27
NALUTypeSEISuffix uint8 = 40 // 0x28
)
func CalcNaluTypeReadable(nalu []byte) string {
b, ok := NaluUintTypeMapping[CalcNaluType(nalu)]
func CalcNALUTypeReadable(nalu []byte) string {
b, ok := NALUTypeMapping[CalcNALUType(nalu)]
if !ok {
return "unknown"
}
return b
}
func CalcNaluType(nalu []byte) uint8 {
func CalcNALUType(nalu []byte) uint8 {
// 6 bit in middle
// 0*** ***0
// or return (nalu[0] >> 1) & 0x3F
return (nalu[0] & 0x7E) >> 1
}

@ -13,8 +13,8 @@ import (
)
type FragmentOP struct {
fp *os.File
packet []byte //WriteFrame中缓存每个TS包数据
fp *os.File
packet []byte //WriteFrame中缓存每个TS包数据
}
type mpegTSFrame struct {
@ -62,12 +62,12 @@ func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) {
// continuity_counter
// ------------------------------
f.packet[0] = syncByte // sync_byte
f.packet[1] = 0x0;
f.packet[1] = 0x0
if first {
f.packet[1] = 0x40 // payload_unit_start_indicator
}
f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位
f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位
f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位
f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位
// adaptation_field_control 先设置成无Adaptation
// continuity_counter
@ -195,7 +195,6 @@ func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) {
f.packet[3] |= 0x20
base := 4
if wpos > base {
copy(f.packet[base+stuffSize:], f.packet[base:wpos])

@ -97,6 +97,7 @@ func (m *Muxer) Dispose() {
m.closeFragment()
}
// 函数调用结束后内部不持有msg中的内存块
func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) {
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
@ -241,10 +242,8 @@ func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) {
}
func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) {
// 分配新内存来缓存SPS_PPS的msg
// 这样就可以不依赖func (group *Group) OnReadRTMPAVMsg中的msg变量
m.spspps = make([]byte,len(msg.Payload))
copy(m.spspps,msg.Payload)
m.spspps = make([]byte, len(msg.Payload))
copy(m.spspps, msg.Payload)
}
func (m *Muxer) appendSPSPPS(out []byte) []byte {

@ -47,10 +47,10 @@ const (
const (
AVCPacketTypeSeqHeader uint8 = 0
AVCPacketTypeNalu uint8 = 1
AVCPacketTypeNALU uint8 = 1
HEVCPacketTypeSeqHeader uint8 = 0
HEVCPacketTypeNalu uint8 = 1
HEVCPacketTypeNALU uint8 = 1
AACPacketTypeSeqHeader uint8 = 0
AACPacketTypeRaw uint8 = 1

@ -55,17 +55,17 @@ func (tag *Tag) IsVideoKeySeqHeader() bool {
return tag.IsAVCKeySeqHeader() || tag.IsHEVCKeySeqHeader()
}
func (tag *Tag) IsAVCKeyNalu() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKeyFrame && tag.Raw[TagHeaderSize+1] == AVCPacketTypeNalu
func (tag *Tag) IsAVCKeyNALU() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == AVCKeyFrame && tag.Raw[TagHeaderSize+1] == AVCPacketTypeNALU
}
func (tag *Tag) IsHEVCKeyNalu() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HEVCKeyFrame && tag.Raw[TagHeaderSize+1] == HEVCPacketTypeNalu
func (tag *Tag) IsHEVCKeyNALU() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HEVCKeyFrame && tag.Raw[TagHeaderSize+1] == HEVCPacketTypeNALU
}
// AVC或HEVC的关键帧
func (tag *Tag) IsVideoKeyNalu() bool {
return tag.IsAVCKeyNalu() || tag.IsHEVCKeyNalu()
func (tag *Tag) IsVideoKeyNALU() bool {
return tag.IsAVCKeyNALU() || tag.IsHEVCKeyNALU()
}
func (tag *Tag) IsAACSeqHeader() bool {

@ -99,7 +99,7 @@ func (gc *GOPCache) Feed(msg rtmp.AVMsg, lg LazyGet) {
// 这个size的判断去掉也行
if gc.gopSize > 1 {
if msg.IsVideoKeyNalu() {
if msg.IsVideoKeyNALU() {
gc.feedNewGOP(msg, lg())
} else {
gc.feedLastGOP(msg, lg())

@ -275,11 +275,6 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
//因为group.broadcastRTMP和group.hlsMuxer.FeedRTMPMessage都不引用msg了,所以不需要复制数据了
//p := make([]byte, len(msg.Payload))
//copy(p, msg.Payload)
//msg.Payload = p
//nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1])
group.broadcastRTMP(msg)

@ -204,8 +204,6 @@ func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Gr
group = NewGroup(appName, streamName)
sm.groupMap[streamName] = group
// 只在第1个ServerSession产生时启动这个group协程
// 注: 创建的group协程暂时做结构设计预留现在并没有实际动作以后可以用协程执行OnReadRTMPAVMsg中数据转发
go group.RunLoop()
}
return group

@ -108,6 +108,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
case 3:
// noop
}
//nazalog.Debugf("RTMP_CHUNK_COMPOSER chunk.fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header)
// 5.3.1.3 Extended Timestamp
// 使用ffmpeg推流时发现时间戳超过3字节最大值后即使是fmt3(即包头大小为0)依然存在ext ts字段
@ -121,6 +122,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
return err
}
stream.header.Timestamp = bele.BEUint32(bootstrap)
//nazalog.Debugf("RTMP_CHUNK_COMPOSER ext. extTs=%d", stream.header.Timestamp)
switch fmt {
case 0:
stream.header.TimestampAbs = stream.header.Timestamp
@ -132,8 +134,6 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
// noop
}
}
//stream.header.CSID = csid
//nazalog.Debugf("ChunkComposer chunk fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
var neededSize uint32
if stream.header.MsgLen <= c.peerChunkSize {
@ -164,7 +164,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
stream.header.TimestampAbs += stream.header.Timestamp
}
absTsFlag = false
//nazalog.Debugf("ChunkComposer message fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
//nazalog.Debugf("RTMP_CHUNK_COMPOSER cb. fmt=%d, csid=%d, header=%+v", fmt, csid, stream.header)
if err := cb(stream); err != nil {
return err

@ -81,10 +81,10 @@ const (
HEVCInterFrame = frameTypeInter<<4 | codecIDHEVC
AVCPacketTypeSeqHeader uint8 = 0
AVCPacketTypeNalu uint8 = 1
AVCPacketTypeNALU uint8 = 1
HEVCPacketTypeSeqHeader uint8 = 0
HEVCPacketTypeNalu uint8 = 1
HEVCPacketTypeNALU uint8 = 1
AACPacketTypeSeqHeader uint8 = 0
AACPacketTypeRaw uint8 = 1
@ -104,16 +104,16 @@ func (msg AVMsg) IsVideoKeySeqHeader() bool {
return msg.IsAVCKeySeqHeader() || msg.IsHEVCKeySeqHeader()
}
func (msg AVMsg) IsAVCKeyNalu() bool {
return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == AVCKeyFrame && msg.Payload[1] == AVCPacketTypeNalu
func (msg AVMsg) IsAVCKeyNALU() bool {
return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == AVCKeyFrame && msg.Payload[1] == AVCPacketTypeNALU
}
func (msg AVMsg) IsHEVCKeyNalu() bool {
return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == HEVCKeyFrame && msg.Payload[1] == HEVCPacketTypeNalu
func (msg AVMsg) IsHEVCKeyNALU() bool {
return msg.Header.MsgTypeID == TypeidVideo && msg.Payload[0] == HEVCKeyFrame && msg.Payload[1] == HEVCPacketTypeNALU
}
func (msg AVMsg) IsVideoKeyNalu() bool {
return msg.IsAVCKeyNalu() || msg.IsHEVCKeyNalu()
func (msg AVMsg) IsVideoKeyNALU() bool {
return msg.IsAVCKeyNALU() || msg.IsHEVCKeyNALU()
}
func (msg AVMsg) IsAACSeqHeader() bool {

@ -13,10 +13,79 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// rfc3550
// -------------------------------------------
// rfc3550 6.4.1 SR: Sender Report RTCP Packet
// -------------------------------------------
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// header |V=2|P| RC | PT=SR=200 | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of sender |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// sender | NTP timestamp, most significant word |
// info +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | NTP timestamp, least significant word |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | RTP timestamp |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | sender's packet count |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | sender's octet count |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// report | SSRC_1 (SSRC of first source) |
// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 1 | fraction lost | cumulative number of packets lost |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | extended highest sequence number received |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | interarrival jitter |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | last SR (LSR) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | delay since last SR (DLSR) |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// report | SSRC_2 (SSRC of second source) |
// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 2 : ... :
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | profile-specific extensions |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// ---------------------------------------------
// rfc3550 6.4.2 RR: Receiver Report RTCP Packet
// ---------------------------------------------
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// header |V=2|P| RC | PT=RR=201 | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of packet sender |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// report | SSRC_1 (SSRC of first source) |
// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 1 | fraction lost | cumulative number of packets lost |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | extended highest sequence number received |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | interarrival jitter |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | last SR (LSR) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | delay since last SR (DLSR) |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// report | SSRC_2 (SSRC of second source) |
// block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 2 : ... :
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | profile-specific extensions |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
const (
PacketTypeSR = 200 // Sender Report
RTCPPacketTypeSR = 200 // Sender Report
RTCPPacketTypeRR = 201 // Receiver Report
)
type RTCPHeader struct {
@ -43,10 +112,10 @@ func parseRTCPPacket(b []byte) {
h.countOrFormat = b[0] & 0x1F
h.packetType = b[1]
h.length = bele.BEUint16(b[2:])
nazalog.Debugf("%+v", h)
//nazalog.Debugf("%+v", h)
switch h.packetType {
case PacketTypeSR:
case RTCPPacketTypeSR:
parseSR(b)
default:
nazalog.Warnf("unknown packet type. type=%d", h.packetType)
@ -62,5 +131,5 @@ func parseSR(b []byte) {
s.ts = bele.BEUint32(b[16:])
s.pktCnt = bele.BEUint32(b[20:])
s.octetCnt = bele.BEUint32(b[24:])
nazalog.Debugf("%+v", s)
//nazalog.Debugf("%+v", s)
}

@ -0,0 +1,14 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
func PackRR() []byte {
// TODO chef: impl me
return nil
}

@ -8,9 +8,7 @@
package rtsp
import (
"github.com/q191201771/naza/pkg/nazalog"
)
import "github.com/q191201771/naza/pkg/nazalog"
type RTCPServer struct {
udpServer *UDPServer
@ -23,14 +21,15 @@ func NewRTCPServer(addr string) *RTCPServer {
}
func (r *RTCPServer) OnReadUDPPacket(b []byte, addr string, err error) {
nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err)
//nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err)
parseRTCPPacket(b)
}
func (s *RTCPServer) Listen() (err error) {
return s.udpServer.Listen()
func (r *RTCPServer) Listen() (err error) {
nazalog.Infof("start rtcp server listen. addr=%s", r.udpServer.addr)
return r.udpServer.Listen()
}
func (s *RTCPServer) RunLoop() error {
return s.udpServer.RunLoop()
func (r *RTCPServer) RunLoop() error {
return r.udpServer.RunLoop()
}

@ -9,11 +9,59 @@
package rtsp
import (
"errors"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazalog"
)
// -----------------------------------
// rfc3550 5.1 RTP Fixed Header Fields
// -----------------------------------
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |V=2|P|X| CC |M| PT | sequence number |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | synchronization source (SSRC) identifier |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | contributing source (CSRC) identifiers |
// | .... |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
var ErrRTP = errors.New("lal.rtp: fxxk")
const (
RTPFixedHeaderLength = 12
)
const (
RTPPacketTypeAVC = 96
RTPPacketTypeAAC = 97
)
// rfc3984 5.2. Common Structure of the RTP Payload Format
// Table 1. Summary of NAL unit types and their payload structures
//
// Type Packet Type name Section
// ---------------------------------------------------------
// 0 undefined -
// 1-23 NAL unit Single NAL unit packet per H.264 5.6
// 24 STAP-A Single-time aggregation packet 5.7.1
// 25 STAP-B Single-time aggregation packet 5.7.1
// 26 MTAP16 Multi-time aggregation packet 5.7.2
// 27 MTAP24 Multi-time aggregation packet 5.7.2
// 28 FU-A Fragmentation unit 5.8
// 29 FU-B Fragmentation unit 5.8
// 30-31 undefined -
const (
NALUTypeSingleMax = 23
NALUTypeFUA = 28
)
type RTPHeader struct {
version uint8 // 2b
padding uint8 // 1b
@ -24,10 +72,23 @@ type RTPHeader struct {
seq uint16 // 16b
timestamp uint32 // 32b
ssrc uint32 // 32b
payloadOffset uint32
}
func parseRTPPacket(b []byte) {
var h RTPHeader
func isAudio(packetType uint8) bool {
if packetType == RTPPacketTypeAAC {
return true
}
return false
}
func parseRTPPacket(b []byte) (h RTPHeader, err error) {
if len(b) < RTPFixedHeaderLength {
err = ErrRTP
return
}
h.version = b[0] >> 6
h.padding = (b[0] >> 5) & 0x1
h.extension = (b[0] >> 4) & 0x1
@ -37,5 +98,7 @@ func parseRTPPacket(b []byte) {
h.seq = bele.BEUint16(b[2:])
h.timestamp = bele.BEUint32(b[4:])
h.ssrc = bele.BEUint32(b[8:])
nazalog.Debugf("%+v", h)
h.payloadOffset = RTPFixedHeaderLength
return
}

@ -9,28 +9,57 @@
package rtsp
import (
"sync"
"github.com/q191201771/naza/pkg/nazalog"
)
type RTPServer struct {
udpServer *UDPServer
m sync.Mutex
ssrc2Session map[uint32]*Session
}
func NewRTPServer(addr string) *RTPServer {
var s RTPServer
s.udpServer = NewUDPServer(addr, s.OnReadUDPPacket)
s.ssrc2Session = make(map[uint32]*Session)
return &s
}
func (r *RTPServer) OnReadUDPPacket(b []byte, addr string, err error) {
nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err)
parseRTPPacket(b)
//nazalog.Debugf("< R length=%d, remote=%s, err=%v", len(b), addr, err)
h, err := parseRTPPacket(b)
if err != nil {
nazalog.Errorf("read invalid rtp packet. err=%+v", err)
}
switch h.packetType {
case RTPPacketTypeAAC:
s := r.getOrCreateSession(h)
s.FeedAACPacket(b, h)
case RTPPacketTypeAVC:
nazalog.Debugf("header=%+v, length=%d", h, len(b))
s := r.getOrCreateSession(h)
s.FeedAVCPacket(b, h)
}
}
func (r *RTPServer) Listen() (err error) {
nazalog.Infof("start rtp server listen. addr=%s", r.udpServer.addr)
return r.udpServer.Listen()
}
func (s *RTPServer) Listen() (err error) {
return s.udpServer.Listen()
func (r *RTPServer) RunLoop() error {
return r.udpServer.RunLoop()
}
func (s *RTPServer) RunLoop() error {
return s.udpServer.RunLoop()
func (r *RTPServer) getOrCreateSession(h RTPHeader) *Session {
r.m.Lock()
defer r.m.Unlock()
s, ok := r.ssrc2Session[h.ssrc]
if ok {
return s
}
return NewSession(h.ssrc, isAudio(h.packetType))
}

@ -0,0 +1,159 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"encoding/hex"
"github.com/q191201771/naza/pkg/nazalog"
)
type Session struct {
ssrc uint32
isAudio bool
}
func NewSession(ssrc uint32, isAudio bool) *Session {
return &Session{
ssrc: ssrc,
isAudio: isAudio,
}
}
func (s *Session) FeedAVCPacket(b []byte, h RTPHeader) {
// h264
{
// rfc3984 5.3. NAL Unit Octet Usage
//
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
outerNALUType := b[h.payloadOffset] & 0x1F
if outerNALUType <= NALUTypeSingleMax {
nazalog.Debugf("SINGLE. naluType=%d %s", outerNALUType, hex.Dump(b[12:32]))
} else if outerNALUType == NALUTypeFUA {
// rfc3984 5.8. Fragmentation Units (FUs)
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | FU indicator | FU header | |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
// | |
// | FU payload |
// | |
// | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | :...OPTIONAL RTP padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// FU indicator:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |F|NRI| Type |
// +---------------+
//
// Fu header:
// +---------------+
// |0|1|2|3|4|5|6|7|
// +-+-+-+-+-+-+-+-+
// |S|E|R| Type |
// +---------------+
//fuIndicator := b[h.payloadOffset]
fuHeader := b[h.payloadOffset+1]
startCode := (fuHeader & 0x80) != 0
endCode := (fuHeader & 0x40) != 0
//naluType := (fuIndicator & 0xE0) | (fuHeader & 0x1F)
naluType := fuHeader & 0x1F
nazalog.Debugf("FUA. outerNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", outerNALUType, naluType, startCode, endCode, hex.Dump(b[12:32]))
} else {
nazalog.Errorf("error. type=%d", outerNALUType)
}
// TODO chef: to be continued
// 从SDP中获取SPSPPS等信息
// 将RTP包合并出视频帧
}
// h265
//{
// originNALUType := (b[h.payloadOffset] >> 1) & 0x3F
// if originNALUType == 49 {
// header2 := b[h.payloadOffset+2]
//
// startCode := (header2 & 0x80) != 0
// endCode := (header2 & 0x40) != 0
//
// naluType := header2 & 0x3F
//
// nazalog.Debugf("FUA. originNALUType=%d, naluType=%d, startCode=%t, endCode=%t %s", originNALUType, naluType, startCode, endCode, hex.Dump(b[12:32]))
//
// } else {
// nazalog.Debugf("SINGLE. naluType=%d %s", originNALUType, hex.Dump(b[12:32]))
// }
//}
}
func (s *Session) FeedAACPacket(b []byte, h RTPHeader) {
return
// TODO chef: 目前只实现了AAC MPEG4-GENERIC/44100/2
// rfc3640 2.11. Global Structure of Payload Format
//
// +---------+-----------+-----------+---------------+
// | RTP | AU Header | Auxiliary | Access Unit |
// | Header | Section | Section | Data Section |
// +---------+-----------+-----------+---------------+
//
// <----------RTP Packet Payload----------->
//
// rfc3640 3.2.1. The AU Header Section
//
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+
// |AU-headers-length|AU-header|AU-header| |AU-header|padding|
// | | (1) | (2) | | (n) | bits |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+
//
// rfc3640 3.3.6. High Bit-rate AAC
//
nazalog.Debugf("%s", hex.Dump(b[12:]))
// au header section
auHeaderLength := b[h.payloadOffset]<<8 + b[h.payloadOffset+1]
auHeaderLength = (auHeaderLength + 7) / 8
nazalog.Debugf("auHeaderLength=%d", auHeaderLength)
// no auxiliary section
pauh := h.payloadOffset + uint32(2) // au header pos
pau := h.payloadOffset + uint32(2) + uint32(auHeaderLength) // au pos
auNum := uint32(auHeaderLength) / 2
for i := uint32(0); i < auNum; i++ {
auSize := uint32(b[pauh]<<8 | b[pauh+1]&0xF8) // 13bit
auSize /= 8
auIndex := b[pauh+1] & 0x7
// data
// pau, auSize
nazalog.Debugf("%d %d %s", auSize, auIndex, hex.Dump(b[pau:pau+auSize]))
pauh += 2
pau += uint32(auSize)
}
}

@ -10,6 +10,9 @@ package rtsp
// 注意正在学习以及实现rtsp请不要使用这个package
// TODO chef
// - rtp和rtcp作为独立package
// rfc2326
const (
@ -32,7 +35,6 @@ var (
serverName = "lalserver"
sessionID = "191201771"
// TODO chef: to be continued
// read RTP/RTCP data from port
serverPort = "8000-8001"

@ -70,7 +70,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) {
var body []byte
if contentLength, ok := headers["Content-Length"]; ok {
if cl, err := strconv.Atoi(contentLength); err == nil {
body := make([]byte, cl)
body = make([]byte, cl)
l, err := io.ReadAtLeast(r, body, cl)
if l != cl || err != nil {
nazalog.Errorf("read rtsp cmd fail. content-length=%d, read length=%d, err=%+v", cl, l, err)
@ -89,6 +89,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) {
_, _ = conn.Write([]byte(resp))
case MethodAnnounce:
nazalog.Info("< R ANNOUNCE")
parseSDP(body)
resp := PackResponseAnnounce(headers[HeaderFieldCSeq])
_, _ = conn.Write([]byte(resp))
case MethodSetup:

@ -0,0 +1,108 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtsp
import (
"errors"
"strconv"
"strings"
"github.com/q191201771/naza/pkg/nazalog"
)
var ErrSDP = errors.New("lal.sdp: fxxk")
type SDP struct {
}
// rfc 4566 5.14. Media Descriptions ("m=")
// m=<media> <port> <proto> <fmt> ..
//
// example:
// m=audio 0 RTP/AVP 97
//type MediaDesc struct {
// Media string
// Port string
// Proto string
// Fmt string
//}
type ARTPMap struct {
PayloadType int
EncodingName string
ClockRate string
EncodingParameters string
}
type FmtP struct {
Mode string
}
func parseSDP(b []byte) SDP {
s := string(b)
lines := strings.Split(s, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "a=rtpmap") {
aRTPMap, err := parseARTPMap(line)
nazalog.Debugf("%+v, %v", aRTPMap, err)
}
}
return SDP{}
}
func parseARTPMap(s string) (ret ARTPMap, err error) {
// rfc 3640 3.3.1. General
// rfc 3640 3.3.6. High Bit-rate AAC
//
// a=rtpmap:<payload type> <encoding name>/<clock rate>[/<encoding parameters>]
//
// example
// a=rtpmap:96 H264/90000
// a=rtpmap:97 MPEG4-GENERIC/44100/2
items := strings.Split(s, ":")
if len(items) != 2 {
err = ErrSDP
return
}
items = strings.Split(items[1], " ")
if len(items) != 2 {
err = ErrSDP
return
}
ret.PayloadType, err = strconv.Atoi(items[0])
if err != nil {
return
}
items = strings.Split(items[1], "/")
switch len(items) {
case 3:
ret.EncodingParameters = items[2]
fallthrough
case 2:
ret.EncodingName = items[0]
ret.ClockRate = items[1]
default:
err = ErrSDP
}
return
}
func parseFmtP(s string) (ret ARTPMap, err error) {
// rfc 3640 4.4.1. The a=fmtp Keyword
//
// a=fmtp:<format> <parameter name>=<value>[; <parameter name>=<value>]
//
// example:
// a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z2QAIKzZQMApsBEAAAMAAQAAAwAyDxgxlg==,aOvssiw=; profile-level-id=640020
// a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=1210
return
}

@ -40,7 +40,6 @@ func (s *UDPServer) Listen() (err error) {
if err != nil {
return
}
nazalog.Infof("start rtcp server listen. addr=%s", s.addr)
return
}

Loading…
Cancel
Save