From 2b67df6d12cced6ca7dabc44a72646e60051eac4 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 23 Aug 2020 09:39:42 +0800 Subject: [PATCH] messages: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [refactor] 新增package mpegts,将部分package hls中代码抽离出来 - [feat] 在各协议的标准字段中写入lal版本信息 - [log] 整理所有session的日志 --- app/demo/analyseflv/analyseflv.go | 1 + app/demo/learnts/learnts.go | 24 +- app/demo/tscmp/tscmp.go | 22 +- pkg/avc/avc.go | 2 +- pkg/base/version.go | 95 ++++++- pkg/hls/fragment.go | 234 +---------------- pkg/hls/hls.go | 116 +-------- pkg/hls/hls_test.go | 17 +- pkg/hls/muxer.go | 157 +++++++----- pkg/hls/server.go | 4 + pkg/httpflv/client_pull_session.go | 13 +- pkg/httpflv/server_sub_session.go | 31 ++- pkg/mpegts/mpegts.go | 118 +++++++++ pkg/mpegts/mpegts_test.go | 28 ++ pkg/mpegts/pack.go | 241 ++++++++++++++++++ pkg/{hls/ts_pat.go => mpegts/pat.go} | 2 +- pkg/{hls/ts_pes.go => mpegts/pes.go} | 2 +- pkg/{hls/ts_pmt.go => mpegts/pmt.go} | 2 +- .../ts_packet_header.go} | 2 +- pkg/rtmp/client_session.go | 14 +- pkg/rtmp/handshake.go | 19 +- pkg/rtmp/message_packer.go | 24 +- pkg/rtmp/message_packer_test.go | 21 +- pkg/rtmp/metadata.go | 2 +- pkg/rtmp/metadata_test.go | 2 +- pkg/rtmp/server_session.go | 7 +- pkg/rtsp/pack.go | 3 +- pkg/rtsp/pub_session.go | 1 + showdeps.sh | 12 +- 29 files changed, 701 insertions(+), 515 deletions(-) create mode 100644 pkg/mpegts/mpegts.go create mode 100644 pkg/mpegts/mpegts_test.go create mode 100644 pkg/mpegts/pack.go rename pkg/{hls/ts_pat.go => mpegts/pat.go} (99%) rename pkg/{hls/ts_pes.go => mpegts/pes.go} (99%) rename pkg/{hls/ts_pmt.go => mpegts/pmt.go} (99%) rename pkg/{hls/ts_header.go => mpegts/ts_packet_header.go} (99%) diff --git a/app/demo/analyseflv/analyseflv.go b/app/demo/analyseflv/analyseflv.go index f00b3fa..309512b 100644 --- a/app/demo/analyseflv/analyseflv.go +++ b/app/demo/analyseflv/analyseflv.go @@ -219,6 +219,7 @@ func analysisVideoTag(tag httpflv.Tag) { // 注意,SEI的内容是自定义格式,解析的代码不具有通用性 func SEIDelayMS(seiNALU []byte) int { + nazalog.Debugf("sei: %s", hex.Dump(seiNALU)) items := strings.Split(string(seiNALU), ":") if len(items) != 3 { return -1 diff --git a/app/demo/learnts/learnts.go b/app/demo/learnts/learnts.go index 511cae2..97926a2 100644 --- a/app/demo/learnts/learnts.go +++ b/app/demo/learnts/learnts.go @@ -11,6 +11,8 @@ package main import ( "io/ioutil" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/naza/pkg/nazalog" ) @@ -18,8 +20,8 @@ import ( // 学习如何解析TS文件。注意,该程序还没有写完。 var ( - pat hls.PAT - pmt hls.PMT + pat mpegts.PAT + pmt mpegts.PMT pid2stream map[uint16]*Stream ) @@ -29,27 +31,27 @@ type Stream struct { var filename = "/Volumes/Data/nrm-0.ts" func handlePacket(packet []byte) { - h := hls.ParseTSPacketHeader(packet) + h := mpegts.ParseTSPacketHeader(packet) index := 4 nazalog.Debugf("%+v", h) - var adaptation hls.TSPacketAdaptation + var adaptation mpegts.TSPacketAdaptation switch h.Adaptation { - case hls.AdaptationFieldControlNo: + case mpegts.AdaptationFieldControlNo: // noop - case hls.AdaptationFieldControlFollowed: - adaptation = hls.ParseTSPacketAdaptation(packet[4:]) + case mpegts.AdaptationFieldControlFollowed: + adaptation = mpegts.ParseTSPacketAdaptation(packet[4:]) index++ default: nazalog.Warn(h.Adaptation) } index += int(adaptation.Length) - if h.Pid == hls.PidPAT { + if h.Pid == mpegts.PidPAT { if h.PayloadUnitStart == 1 { index++ } - pat = hls.ParsePAT(packet[index:]) + pat = mpegts.ParsePAT(packet[index:]) nazalog.Debugf("%+v", pat) return } @@ -58,7 +60,7 @@ func handlePacket(packet []byte) { if h.PayloadUnitStart == 1 { index++ } - pmt = hls.ParsePMT(packet[index:]) + pmt = mpegts.ParsePMT(packet[index:]) nazalog.Debugf("%+v", pmt) for _, ele := range pmt.ProgramElements { @@ -74,7 +76,7 @@ func handlePacket(packet []byte) { // 判断是否有PES if h.PayloadUnitStart == 1 { - pes, length := hls.ParsePES(packet[index:]) + pes, length := mpegts.ParsePES(packet[index:]) nazalog.Debugf("%+v, %d", pes, length) } } diff --git a/app/demo/tscmp/tscmp.go b/app/demo/tscmp/tscmp.go index 1eadcac..da1bcf3 100644 --- a/app/demo/tscmp/tscmp.go +++ b/app/demo/tscmp/tscmp.go @@ -13,19 +13,21 @@ import ( "encoding/hex" "io/ioutil" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/naza/pkg/nazalog" ) // 临时小工具,比较两个TS文件。注意,该程序还没有写完。 -var filename1 = "/Volumes/Data/tmp/lal-4.ts" -var filename2 = "/Volumes/Data/tmp/nrm-4.ts" +var filename1 = "/tmp/lal/hls/innertest/innertest-0.ts" +var filename2 = "/tmp/lal/hls/new/innertest-0.ts" func skipPacketFilter(tss [][]byte) (ret [][]byte) { for _, ts := range tss { - h := hls.ParseTSPacketHeader(ts) - if h.Pid == hls.PidAudio { + h := mpegts.ParseTSPacketHeader(ts) + if h.Pid == mpegts.PidAudio { continue } ret = append(ret, ts) @@ -34,16 +36,16 @@ func skipPacketFilter(tss [][]byte) (ret [][]byte) { } func parsePacket(packet []byte) { - h := hls.ParseTSPacketHeader(packet) + h := mpegts.ParseTSPacketHeader(packet) nazalog.Debugf("%+v", h) index := 4 - var adaptation hls.TSPacketAdaptation + var adaptation mpegts.TSPacketAdaptation switch h.Adaptation { - case hls.AdaptationFieldControlNo: + case mpegts.AdaptationFieldControlNo: // noop - case hls.AdaptationFieldControlFollowed: - adaptation = hls.ParseTSPacketAdaptation(packet[4:]) + case mpegts.AdaptationFieldControlFollowed: + adaptation = mpegts.ParseTSPacketAdaptation(packet[4:]) index++ default: nazalog.Warn(h.Adaptation) @@ -51,7 +53,7 @@ func parsePacket(packet []byte) { index += int(adaptation.Length) if h.PayloadUnitStart == 1 && h.Pid == 256 { - pes, length := hls.ParsePES(packet[index:]) + pes, length := mpegts.ParsePES(packet[index:]) nazalog.Debugf("%+v, %d", pes, length) } } diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index ea56544..9c03964 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -17,7 +17,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// Annex B: +// AnnexB: // keywords: MPEG-2 transport stream, ElementaryStream(ES), // nalu with start code. // e.g. ts diff --git a/pkg/base/version.go b/pkg/base/version.go index 4ea062e..991e901 100644 --- a/pkg/base/version.go +++ b/pkg/base/version.go @@ -8,24 +8,99 @@ package base +import "strings" + // 版本信息相关 -// lal的一部分版本信息使用了naza.bininfo,手段是获取git信息。 -// 另外,我们也在本文件提供另外一些信息: +// lal的一部分版本信息使用了naza.bininfo +// 另外,我们也在本文件提供另外一些信息 +// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中 // 版本,该变量由build脚本修改维护 var LALVersion = "v0.13.0" -// 以下字段固定不变 var ( LALLibraryName = "lal" LALGitHubRepo = "github.com/q191201771/lal" - LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGitHubRepo + ")" - //LALServerName = "lalserver" - //LALGitHubRepoURL = "https://github.com/q191201771/lal" + + // e.g. lal v0.12.3 (github.com/q191201771/lal) + LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGitHubRepo + ")" + + // e.g. 0.12.3 + LALVersionDot string + + // e.g. 0,12,3 + LALVersionComma string +) + +var ( + // 植入rtmp握手随机字符串中 + // e.g. lal v0.12.3 (github.com/q191201771/lal) + LALRTMPHandshakeWaterMark string + + // 植入rtmp server中的connect result信令中 + // 注意,有两个object,第一个object中的fmsVer我们保持通用公认的值,在第二个object中植入 + // e.g. 0,12,3 + LALRTMPConnectResultVersion string + + // e.g. lal0.12.3 + LALRTMPPushSessionConnectVersion string + + // e.g. lal0.12.3 + LALRTMPBuildMetadataEncoder string + + // e.g. lal/0.12.3 + LALHTTPFLVPullSessionUA string + + // e.g. lal0.12.3 + LALHTTPFLVSubSessionServer string + + // e.g. lal0.12.3 + LALHLSM3U8Server string + + // e.g. lal0.12.3 + LALHLSTSServer string + + // e.g. lal0.12.3 + LALRTSPOptionsResponseServer string ) -// 作为HTTP客户端时,考虑User-Agent +// - rtmp handshake random buf +// - rtmp server +// - rtmp message connect result +// - cdnws: 第一个obj: `fmsVer: FMS/3,0,1,123` 第二个obj: `version: x,x,x,xxx` +// - rtmp client +// - rtmp message connect +// - ffmpeg push: `flashVer: FMLE/3.0 (compatible; Lavf57.83.100)` +// - ffmpeg pull: `flashVer: LNX 9,0,124,2` -- emulated Flash client version - 9.0.124.2 on Linux +// - rtmp/flv build metadata +// - encoder: Lavf57.83.100 +// - httpflv pull +// - wget: User-Agent: Wget/1.19.1 (darwin15.6.0) +// - httpflv sub +// - `server:` +// - hls +// - m3u8 +// - `Server:` +// - ts +// - `Server:` +// - rtsp +// - Options response `Server:` // -// RTMP metadata -// description : Bilibili VXCode Swarm Transcoder v0.2.30(gap_fixed:False) -// encoder : Lavf57.83.100 + +func init() { + LALVersionDot = strings.TrimPrefix(LALVersion, "v") + LALVersionComma = strings.ReplaceAll(LALVersionDot, ".", ",") + + LALRTMPConnectResultVersion = LALVersionComma + + LALRTMPPushSessionConnectVersion = LALLibraryName + LALVersionDot + LALRTMPBuildMetadataEncoder = LALLibraryName + LALVersionDot + LALHTTPFLVSubSessionServer = LALLibraryName + LALVersionDot + LALHLSM3U8Server = LALLibraryName + LALVersionDot + LALHLSTSServer = LALLibraryName + LALVersionDot + LALRTSPOptionsResponseServer = LALLibraryName + LALVersionDot + + LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot + + LALRTMPHandshakeWaterMark = LALFullInfo +} diff --git a/pkg/hls/fragment.go b/pkg/hls/fragment.go index 43b48d0..2bedff5 100644 --- a/pkg/hls/fragment.go +++ b/pkg/hls/fragment.go @@ -10,243 +10,33 @@ package hls import ( "os" -) -type FragmentOP struct { - fp *os.File - packet []byte //WriteFrame中缓存每个TS包数据 -} + "github.com/q191201771/lal/pkg/mpegts" +) -type mpegTSFrame struct { - pts uint64 - dts uint64 - pid uint16 - sid uint8 - cc uint8 - key bool // 关键帧 +type Fragment struct { + fp *os.File } -func (f *FragmentOP) OpenFile(filename string) (err error) { +func (f *Fragment) OpenFile(filename string) (err error) { f.fp, err = os.Create(filename) if err != nil { return } - f.writeFile(FixedFragmentHeader) - //TS包固定188-byte - f.packet = make([]byte, 188) + f.writeFile(mpegts.FixedFragmentHeader) return nil } -func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) { - //nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b)) - - wpos := 0 // 当前packet的写入位置 - lpos := 0 // 当前帧的处理位置 - rpos := len(b) // 当前帧大小 - - first := true // 是否为帧的首个packet的标准 - - for lpos != rpos { - wpos = 0 - frame.cc++ - - // 每个packet都需要添加TS Header - // -----TS Header---------------- - // sync_byte - // transport_error_indicator 0 - // payload_unit_start_indicator - // transport_priority 0 - // PID - // transport_scrambling_control 0 - // adaptation_field_control - // continuity_counter - // ------------------------------ - f.packet[0] = syncByte // sync_byte - f.packet[1] = 0x0 - if first { - f.packet[1] = 0x40 // payload_unit_start_indicator - } - f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位 - f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位 - - // adaptation_field_control 先设置成无Adaptation - // continuity_counter - f.packet[3] = 0x10 | (frame.cc & 0x0f) - wpos += 4 - - if first { - if frame.key { - // 关键帧的首个packet需要添加Adaptation - // -----Adaptation----------------------- - // adaptation_field_length - // discontinuity_indicator 0 - // random_access_indicator 1 - // elementary_stream_priority_indicator 0 - // PCR_flag 1 - // OPCR_flag 0 - // splicing_point_flag 0 - // transport_private_data_flag 0 - // adaptation_field_extension_flag 0 - // program_clock_reference_base - // reserved - // program_clock_reference_extension - // -------------------------------------- - f.packet[3] |= 0x20 // adaptation_field_control 设置Adaptation - f.packet[4] = 7 // adaptation_field_length - f.packet[5] = 0x50 // random_access_indicator + PCR_flag - mpegtsdWritePCR(f.packet[6:], frame.dts-delay) // using 6 byte - wpos += 8 - } - - // 帧的首个packet需要添加PES Header - // -----PES Header------------ - // packet_start_code_prefix - // stream_id - // PES_packet_length - // '10' - // PES_scrambling_control 0 - // PES_priority 0 - // data_alignment_indicator 0 - // copyright 0 - // original_or_copy 0 - // PTS_DTS_flags - // ESCR_flag 0 - // ES_rate_flag 0 - // DSM_trick_mode_flag 0 - // additional_copy_info_flag 0 - // PES_CRC_flag 0 - // PES_extension_flag 0 - // PES_header_data_length - // --------------------------- - f.packet[wpos] = 0x00 // packet_start_code_prefix 24-bits - f.packet[wpos+1] = 0x00 // - f.packet[wpos+2] = 0x01 // - f.packet[wpos+3] = frame.sid // stream_id - wpos += 4 - - // 计算PES Header中一些字段的值 - // PTS相关 - headerSize := uint8(5) - flags := uint8(0x80) - // DTS相关 - if frame.dts != frame.pts { - headerSize += 5 - flags |= 0x40 - } - - pesSize := rpos + int(headerSize) + 3 // PES Header剩余3字节 + PTS/PTS长度 + 整个帧的长度 - if pesSize > 0xFFFF { - pesSize = 0 - } - - f.packet[wpos] = uint8(pesSize >> 8) // PES_packet_length - f.packet[wpos+1] = uint8(pesSize & 0xFF) // - f.packet[wpos+2] = 0x80 // 除了reserve的'10',其他字段都是0 - f.packet[wpos+3] = flags // PTS/DTS flag - f.packet[wpos+4] = headerSize // PES_header_data_length: PTS+DTS数据长度 - wpos += 5 - - // 写入PTS的值 - mpegtsWritePTS(f.packet[wpos:], flags>>6, frame.pts+delay) - wpos += 5 - // 写入DTS的值 - if frame.pts != frame.dts { - mpegtsWritePTS(f.packet[wpos:], 1, frame.dts+delay) - wpos += 5 - } - - first = false - } - - // 把帧的内容切割放入packet中 - bodySize := 188 - wpos // 当前TS packet,可写入大小 - inSize := rpos - lpos // 整个帧剩余待打包大小 - - if bodySize <= inSize { - // 当前packet写不完这个帧,或者刚好够写完 - copy(f.packet[wpos:], b[lpos:lpos+inSize]) - lpos += bodySize - } else { - // 当前packet可以写完这个帧,并且还有空闲空间 - // 此时,真实数据挪最后,中间用0xFF填充到Adaptation中 - // 注意,此时有两种情况 - // 1. 原本有Adaptation - // 2. 原本没有Adaptation - - stuffSize := bodySize - inSize // 当前TS packet的剩余空闲空间 - - if f.packet[3]&0x20 != 0 { - // has Adaptation - - base := int(4 + f.packet[4]) // TS Header + Adaptation - if wpos > base { - // 比如有PES Header - - copy(f.packet[base+stuffSize:], f.packet[base:wpos]) - } - wpos = base + stuffSize - - f.packet[4] += uint8(stuffSize) // adaptation_field_length - for i := 0; i < stuffSize; i++ { - f.packet[base+i] = 0xFF - } - } else { - // no Adaptation - - f.packet[3] |= 0x20 - - base := 4 - if wpos > base { - copy(f.packet[base+stuffSize:], f.packet[base:wpos]) - } - wpos += stuffSize - - f.packet[4] = uint8(stuffSize - 1) // adaptation_field_length - if stuffSize >= 2 { - // TODO chef 这里是参考nginx rtmp module的实现,为什么这个字节写0而不是0xFF - f.packet[5] = 0 - for i := 0; i < stuffSize-2; i++ { - f.packet[6+i] = 0xFF - } - } - } - - // 真实数据放在packet尾部 - copy(f.packet[wpos:], b[lpos:lpos+inSize]) - lpos = rpos - } - - f.writeFile(f.packet) - } +func (f *Fragment) WriteFrame(frame *mpegts.Frame) { + mpegts.PackTSPacket(frame, func(packet []byte, cc uint8) { + f.writeFile(packet) + }) } -func (f *FragmentOP) CloseFile() { +func (f *Fragment) CloseFile() { _ = f.fp.Close() } -func (f *FragmentOP) writeFile(b []byte) { +func (f *Fragment) writeFile(b []byte) { _, _ = f.fp.Write(b) } - -func mpegtsdWritePCR(out []byte, pcr uint64) { - out[0] = uint8(pcr >> 25) - out[1] = uint8(pcr >> 17) - out[2] = uint8(pcr >> 9) - out[3] = uint8(pcr >> 1) - out[4] = uint8(pcr<<7) | 0x7e - out[5] = 0 -} - -// write PTS or DTS -func mpegtsWritePTS(out []byte, fb uint8, pts uint64) { - var val uint64 - out[0] = (fb << 4) | (uint8(pts>>30) & 0x07) | 1 - - val = (((pts >> 15) & 0x7FFF) << 1) | 1 - out[1] = uint8(val >> 8) - out[2] = uint8(val) - - val = ((pts & 0x7FFF) << 1) | 1 - out[3] = uint8(val >> 8) - out[4] = uint8(val) -} diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index 79b31c1..59fec51 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -8,8 +8,6 @@ package hls -// 声明,本package参考了c语言实现的开源项目nginx-rtmp-module - // TODO chef: // - 支持HEVC // - 检查所有的容错处理,是否会出现 @@ -31,123 +29,15 @@ package hls // 进来的数据称为Frame帧,188字节的封装称为TSPacket包,TS文件称为Fragment -// 每个TS文件都以固定的PAT,PMT开始 -var FixedFragmentHeader = []byte{ - /* TS */ - 0x47, 0x40, 0x00, 0x10, 0x00, - /* PSI */ - 0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, - /* PAT */ - 0x00, 0x01, 0xf0, 0x01, - /* CRC */ - 0x2e, 0x70, 0x19, 0x05, - - /* stuffing 167 bytes */ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - - /* TS */ - 0x47, 0x50, 0x01, 0x10, 0x00, - /* PSI */ - 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, - /* PMT */ - 0xe1, 0x00, - 0xf0, 0x00, - 0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */ - 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ - /* CRC */ - 0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */ - /* stuffing 157 bytes */ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, -} - var audNal = []byte{ 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, } -// TS Packet Header -const ( - syncByte uint8 = 0x47 - - PidPAT uint16 = 0 - - // ------------------------------------------ - // <iso13818-1.pdf> <Table 2-5> <page 38/174> - // ------------------------------------------ - AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC - AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only - AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload - AdaptationFieldControlFollowed uint8 = 3 // Adaptation_field followed by payload -) - -// PMT const ( - // ----------------------------------------------------------------------------- - // <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174> - // 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax - // 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video - // ----------------------------------------------------------------------------- - streamTypeAAC uint8 = 0x0F - streamTypeAVC uint8 = 0x1B -) - -// PES -const ( - // ----------------------------------------------------------------- - // <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174> - // ----------------------------------------------------------------- - streamIDAudio uint8 = 192 // 110x xxxx 0xC0 - streamIDVideo uint8 = 224 // 1110 xxxx - - // ------------------------------ - // <iso13818-1.pdf> <page 53/174> - // ------------------------------ - PTSDTSFlags0 uint8 = 0 // no PTS no DTS - PTSDTSFlags1 uint8 = 1 // forbidden - PTSDTSFlags2 uint8 = 2 // only PTS - PTSDTSFlags3 uint8 = 3 // both PTS and DTS -) - -const ( - PidVideo uint16 = 0x100 - PidAudio uint16 = 0x101 - delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用? - // TODO chef 这些在配置项中提供 - negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90 - maxAudioDelay uint64 = 300 // 单位毫秒 - - appName = "hls" + negMaxfraglen uint64 = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位(毫秒*90) + maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90) + maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90) ) func SplitFragment2TSPackets(content []byte) (ret [][]byte) { diff --git a/pkg/hls/hls_test.go b/pkg/hls/hls_test.go index 1aece2b..8bb5d43 100644 --- a/pkg/hls/hls_test.go +++ b/pkg/hls/hls_test.go @@ -12,23 +12,8 @@ import ( "testing" "github.com/q191201771/lal/pkg/innertest" - - "github.com/q191201771/lal/pkg/hls" - "github.com/q191201771/naza/pkg/nazalog" ) -func TestParseFixedTSPacket(t *testing.T) { - h := hls.ParseTSPacketHeader(hls.FixedFragmentHeader) - nazalog.Debugf("%+v", h) - pat := hls.ParsePAT(hls.FixedFragmentHeader[5:]) - nazalog.Debugf("%+v", pat) - - h = hls.ParseTSPacketHeader(hls.FixedFragmentHeader[188:]) - nazalog.Debugf("%+v", h) - pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:]) - nazalog.Debugf("%+v", pmt) -} - -func TestHls(t *testing.T) { +func TestHLS(t *testing.T) { innertest.InnerTestEntry(t) } diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index a8d309d..e74e65a 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -13,6 +13,8 @@ import ( "fmt" "os" + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/lal/pkg/avc" @@ -48,13 +50,13 @@ type Muxer struct { config *MuxerConfig - fragmentOP FragmentOP - opened bool - adts aac.ADTS - spspps []byte // AnnexB - videoCC uint8 - audioCC uint8 - videoOut []byte // 帧 + fragment Fragment + opened bool + adts aac.ADTS + spspps []byte // AnnexB + videoCC uint8 + audioCC uint8 + videoOut []byte // 帧 fragTS uint64 // 新建立fragment时的时间戳,毫秒 * 90 @@ -62,21 +64,19 @@ type Muxer struct { frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 frags []fragmentInfo // TS文件的环形队列,记录TS的信息,比如写M3U8文件时要用 2 * winfrags + 1 - aaframe []byte - aframePTS uint64 // 最新音频帧的时间戳 + audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 + audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳 } func NewMuxer(streamName string, config *MuxerConfig) *Muxer { uk := unique.GenUniqueKey("HLSMUXER") - nazalog.Infof("[%s] lifecycle new hls muxer. streamName=%s", uk, streamName) - op := getMuxerOutPath(config.OutPath, streamName) playlistFilename := getM3U8Filename(op, streamName) playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) videoOut := make([]byte, 1024*1024) videoOut = videoOut[0:0] frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1 - return &Muxer{ + m := &Muxer{ UniqueKey: uk, streamName: streamName, outPath: op, @@ -84,9 +84,11 @@ func NewMuxer(streamName string, config *MuxerConfig) *Muxer { playlistFilenameBak: playlistFilenameBak, config: config, videoOut: videoOut, - aaframe: nil, + audioCacheFrames: nil, frags: frags, } + nazalog.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName) + return m } func (m *Muxer) Start() { @@ -100,7 +102,8 @@ func (m *Muxer) Dispose() { m.closeFragment(true) } -// 函数调用结束后,内部不持有msg中的内存块 +// @param msg 函数调用结束后,内部不持有msg中的内存块 +// func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) { switch msg.Header.MsgTypeID { case base.RTMPTypeIDAudio: @@ -124,6 +127,9 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { ftype := msg.Payload[0] & 0xF0 >> 4 htype := msg.Payload[1] + // 将数据转换成AnnexB + + // 如果是sps pps,缓存住,然后直接返回 if ftype == 1 && htype == 0 { if err := m.cacheSPSPPS(msg); err != nil { nazalog.Errorf("[%s] cache spspps failed. err=%+v", m.UniqueKey, err) @@ -137,6 +143,8 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { spsppsSent := false // 优化这块buffer out := m.videoOut[0:0] + + // tag中可能有多个NALU,逐个获取 for i := 5; i != len(msg.Payload); { if i+4 > len(msg.Payload) { nazalog.Errorf("[%s] slice len not enough. i=%d, len=%d", m.UniqueKey, i, len(msg.Payload)) @@ -151,8 +159,10 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { nalType := avc.ParseNALUType(msg.Payload[i]) - //nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) + nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", m.UniqueKey, nalType, nalBytes, len(msg.Payload), cts) + // sps pps前面已经缓存过了,这里就不用处理了 + // aud有自己的生产逻辑,原流中的aud直接过滤掉 if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD { i += nalBytes continue @@ -161,10 +171,12 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { if !audSent { switch nalType { case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI: + // 在前面写入aud out = append(out, audNal...) audSent = true - case avc.NALUTypeAUD: - audSent = true + //case avc.NALUTypeAUD: + // // 上面aud已经continue跳过了,应该进不到这个分支,可以考虑删除这个分支代码 + // audSent = true } } @@ -172,6 +184,7 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { case avc.NALUTypeSlice: spsppsSent = false case avc.NALUTypeIDRSlice: + // 如果是关键帧,在前面写入sps pps if !spsppsSent { out = m.appendSPSPPS(out) } @@ -179,35 +192,41 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) { } + // 这里不知为什么要区分写入两种类型的start code if len(out) == 0 { out = append(out, avc.NALUStartCode4...) } else { out = append(out, avc.NALUStartCode3...) } + out = append(out, msg.Payload[i:i+nalBytes]...) i += nalBytes } - var frame mpegTSFrame - frame.cc = m.videoCC - frame.dts = uint64(msg.Header.TimestampAbs) * 90 - frame.pts = frame.dts + uint64(cts)*90 - frame.pid = PidVideo - frame.sid = streamIDVideo - frame.key = ftype == 1 + key := ftype == 1 + dts := uint64(msg.Header.TimestampAbs) * 90 - boundary := frame.key && (!m.opened || !m.adts.HasInited() || m.aaframe != nil) + boundary := key && (!m.opened || !m.adts.HasInited() || m.audioCacheFrames != nil) - m.updateFragment(frame.dts, boundary, 1) + m.updateFragment(dts, boundary, false) if !m.opened { nazalog.Warnf("[%s] not opened.", m.UniqueKey) return } - m.fragmentOP.WriteFrame(&frame, out) - m.videoCC = frame.cc + var frame mpegts.Frame + frame.CC = m.videoCC + frame.DTS = dts + frame.PTS = frame.DTS + uint64(cts)*90 + frame.Key = key + frame.Raw = out + frame.Pid = mpegts.PidVideo + frame.Sid = mpegts.StreamIDVideo + nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) + m.fragment.WriteFrame(&frame) + m.videoCC = frame.CC } func (m *Muxer) feedAudio(msg base.RTMPMsg) { @@ -218,6 +237,8 @@ func (m *Muxer) feedAudio(msg base.RTMPMsg) { return } + nazalog.Debugf("[%s] hls: feedAudio. dts=%d len=%d", m.UniqueKey, msg.Header.TimestampAbs, len(msg.Payload)) + if msg.Payload[1] == 0 { m.cacheAACSeqHeader(msg) return @@ -230,15 +251,15 @@ func (m *Muxer) feedAudio(msg base.RTMPMsg) { pts := uint64(msg.Header.TimestampAbs) * 90 - m.updateFragment(pts, m.spspps == nil, 2) + m.updateFragment(pts, m.spspps == nil, true) - if m.aaframe == nil { - m.aframePTS = pts + if m.audioCacheFrames == nil { + m.audioCacheFirstFramePTS = pts } adtsHeader, _ := m.adts.CalcADTSHeader(uint16(msg.Header.MsgLen - 2)) - m.aaframe = append(m.aaframe, adtsHeader...) - m.aaframe = append(m.aaframe, msg.Payload[2:]...) + m.audioCacheFrames = append(m.audioCacheFrames, adtsHeader...) + m.audioCacheFrames = append(m.audioCacheFrames, msg.Payload[2:]...) } func (m *Muxer) cacheAACSeqHeader(msg base.RTMPMsg) { @@ -261,14 +282,20 @@ func (m *Muxer) appendSPSPPS(out []byte) []byte { return out } -func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) { +// 决定是否开启新的TS切片文件(注意,可能已经有TS切片,也可能没有,这是第一个切片),以及落盘音频数据 +// +// @param boundary 调用方认为可能是开启新TS切片的时间点 +// @param isByAudio 触发该函数调用,是因为收到音频数据,还是视频数据 +// +func (m *Muxer) updateFragment(ts uint64, boundary bool, isByAudio bool) { force := false discont := true var f *fragmentInfo + // 如果已经有TS切片,检查是否需要强制开启新的切片,以及切片是否发生跳跃 // 注意,音频和视频是在一起检查的 if m.opened { - f = m.getFrag(m.nfrags) + f = m.getCurrFrag() // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10) @@ -280,11 +307,11 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) { f.duration = float64(ts-m.fragTS) / 90000 discont = false } - } - // 时长超过设置的ts文件切片阈值才行 - if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 { - boundary = false + // 已经有TS切片,那么只有当前fragment的时长超过设置的TS切片阈值才开启新的切片 + if f.duration < float64(m.config.FragmentDurationMS)/1000 { + boundary = false + } } // 开启新的fragment @@ -294,11 +321,18 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) { } // 音频已经缓存了一定时长的数据了,需要落盘了 - if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) { + var maxAudioDelay uint64 + if isByAudio { + maxAudioDelay = maxAudioCacheDelayByAudio + } else { + maxAudioDelay = maxAudioCacheDelayByVideo + } + if m.opened && m.audioCacheFrames != nil && ((m.audioCacheFirstFramePTS + maxAudioDelay) < ts) { m.flushAudio() } } +// @param discont 不连续标志,会在m3u8文件的fragment前增加`#EXT-X-DISCONTINUITY` func (m *Muxer) openFragment(ts uint64, discont bool) { if m.opened { return @@ -307,10 +341,10 @@ func (m *Muxer) openFragment(ts uint64, discont bool) { id := m.getFragmentID() filename := getTSFilename(m.outPath, m.streamName, id) - _ = m.fragmentOP.OpenFile(filename) + _ = m.fragment.OpenFile(filename) m.opened = true - frag := m.getFrag(m.nfrags) + frag := m.getCurrFrag() frag.discont = discont frag.id = id @@ -324,11 +358,11 @@ func (m *Muxer) closeFragment(isLast bool) { return } - m.fragmentOP.CloseFile() + m.fragment.CloseFile() m.opened = false //更新序号,为下个分片准备好 - m.nextFrag() + m.incrFrag() m.writePlaylist(isLast) } @@ -391,8 +425,11 @@ func (m *Muxer) getFrag(n int) *fragmentInfo { return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)] } -// TODO chef: 这个函数重命名为incr更好些 -func (m *Muxer) nextFrag() { +func (m *Muxer) getCurrFrag() *fragmentInfo { + return m.getFrag(m.nfrags) +} + +func (m *Muxer) incrFrag() { if m.nfrags == m.config.FragmentNum { m.frag++ } else { @@ -411,21 +448,21 @@ func (m *Muxer) flushAudio() { return } - if m.aaframe == nil { + if m.audioCacheFrames == nil { return } - frame := &mpegTSFrame{ - pts: m.aframePTS, - dts: m.aframePTS, - pid: PidAudio, - sid: streamIDAudio, - cc: m.audioCC, - key: false, - } - - m.fragmentOP.WriteFrame(frame, m.aaframe) - - m.audioCC = frame.cc - m.aaframe = nil + var frame mpegts.Frame + frame.CC = m.audioCC + frame.DTS = m.audioCacheFirstFramePTS + frame.PTS = m.audioCacheFirstFramePTS + frame.Key = false + frame.Raw = m.audioCacheFrames + frame.Pid = mpegts.PidAudio + frame.Sid = mpegts.StreamIDAudio + nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw)) + m.fragment.WriteFrame(&frame) + m.audioCC = frame.CC + + m.audioCacheFrames = nil } diff --git a/pkg/hls/server.go b/pkg/hls/server.go index 2e7cddd..c7bcd9d 100644 --- a/pkg/hls/server.go +++ b/pkg/hls/server.go @@ -12,6 +12,8 @@ import ( "net" "net/http" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazalog" ) @@ -74,8 +76,10 @@ func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) { switch ri.fileType { case "m3u8": resp.Header().Add("Content-Type", "application/x-mpegurl") + resp.Header().Add("Server", base.LALHLSM3U8Server) case "ts": resp.Header().Add("Content-Type", "video/mp2t") + resp.Header().Add("Server", base.LALHLSTSServer) } resp.Header().Add("Cache-Control", "no-cache") diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 31ee271..4a7e1e0 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -15,6 +15,8 @@ import ( "strings" "time" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/connection" @@ -53,11 +55,12 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { } uk := unique.GenUniqueKey("FLVPULL") - nazalog.Infof("[%s] lifecycle new PullSession.", uk) - return &PullSession{ + s := &PullSession{ option: option, UniqueKey: uk, } + nazalog.Infof("[%s] lifecycle new httpflv PullSession. session=%p", uk, s) + return s } type OnReadFLVTag func(tag Tag) @@ -81,7 +84,7 @@ func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error } func (session *PullSession) Dispose() { - nazalog.Infof("[%s] lifecycle dispose PullSession.", session.UniqueKey) + nazalog.Infof("[%s] lifecycle dispose httpflv PullSession.", session.UniqueKey) _ = session.Conn.Close() } @@ -126,8 +129,8 @@ func (session *PullSession) Connect(rawURL string) error { func (session *PullSession) WriteHTTPRequest() error { // # 发送 http GET 请求 nazalog.Debugf("[%s] > W http request. GET %s", session.UniqueKey, session.pathWithQuery) - req := fmt.Sprintf("GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n", - session.pathWithQuery, session.host) + req := fmt.Sprintf("GET %s HTTP/1.0\r\nUser-Agent: %s\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n", + session.pathWithQuery, base.LALHTTPFLVPullSessionUA, session.host) _, err := session.Conn.Write([]byte(req)) return err } diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 80b03fc..f0c07c7 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -14,6 +14,8 @@ import ( "strings" "time" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/nazahttp" "github.com/q191201771/naza/pkg/connection" @@ -22,15 +24,7 @@ import ( "github.com/q191201771/naza/pkg/unique" ) -var flvHTTPResponseHeaderStr = "HTTP/1.1 200 OK\r\n" + - "Cache-Control: no-cache\r\n" + - "Content-Type: video/x-flv\r\n" + - "Connection: close\r\n" + - "Expires: -1\r\n" + - "Pragma: no-cache\r\n" + - "\r\n" - -var flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr) +var flvHTTPResponseHeader []byte type SubSession struct { UniqueKey string @@ -48,8 +42,7 @@ type SubSession struct { func NewSubSession(conn net.Conn) *SubSession { uk := unique.GenUniqueKey("FLVSUB") - nazalog.Infof("[%s] lifecycle new SubSession. addr=%s", uk, conn.RemoteAddr().String()) - return &SubSession{ + s := &SubSession{ UniqueKey: uk, IsFresh: true, conn: connection.New(conn, func(option *connection.Option) { @@ -58,6 +51,8 @@ func NewSubSession(conn net.Conn) *SubSession { option.WriteTimeoutMS = subSessionWriteTimeoutMS }), } + nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) + return s } // TODO chef: read request timeout @@ -135,5 +130,19 @@ func (session *SubSession) WriteRawPacket(pkt []byte) { } func (session *SubSession) Dispose() { + nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.UniqueKey) _ = session.conn.Close() } + +func init() { + flvHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" + + "Server: " + base.LALHTTPFLVSubSessionServer + "\r\n" + + "Cache-Control: no-cache\r\n" + + "Content-Type: video/x-flv\r\n" + + "Connection: close\r\n" + + "Expires: -1\r\n" + + "Pragma: no-cache\r\n" + + "\r\n" + + flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr) +} diff --git a/pkg/mpegts/mpegts.go b/pkg/mpegts/mpegts.go new file mode 100644 index 0000000..e7c3b2e --- /dev/null +++ b/pkg/mpegts/mpegts.go @@ -0,0 +1,118 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package mpegts + +// 每个TS文件都以固定的PAT,PMT开始 +var FixedFragmentHeader = []byte{ + /* TS */ + 0x47, 0x40, 0x00, 0x10, 0x00, + /* PSI */ + 0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PAT */ + 0x00, 0x01, 0xf0, 0x01, + /* CRC */ + 0x2e, 0x70, 0x19, 0x05, + + /* stuffing 167 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + + /* TS */ + 0x47, 0x50, 0x01, 0x10, 0x00, + /* PSI */ + 0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00, + /* PMT */ + 0xe1, 0x00, + 0xf0, 0x00, + 0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */ + 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ + /* CRC */ + 0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */ + /* stuffing 157 bytes */ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, +} + +// TS Packet Header +const ( + syncByte uint8 = 0x47 + + PidPAT uint16 = 0 + PidVideo uint16 = 0x100 + PidAudio uint16 = 0x101 + + // ------------------------------------------ + // <iso13818-1.pdf> <Table 2-5> <page 38/174> + // ------------------------------------------ + AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC + AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only + AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload + AdaptationFieldControlFollowed uint8 = 3 // Adaptation_field followed by payload +) + +// PMT +const ( + // ----------------------------------------------------------------------------- + // <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174> + // 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax + // 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + // ----------------------------------------------------------------------------- + streamTypeAAC uint8 = 0x0F + streamTypeAVC uint8 = 0x1B +) + +// PES +const ( + // ----------------------------------------------------------------- + // <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174> + // ----------------------------------------------------------------- + StreamIDAudio uint8 = 192 // 110x xxxx 0xC0 + StreamIDVideo uint8 = 224 // 1110 xxxx + + // ------------------------------ + // <iso13818-1.pdf> <page 53/174> + // ------------------------------ + PTSDTSFlags0 uint8 = 0 // no PTS no DTS + PTSDTSFlags1 uint8 = 1 // forbidden + PTSDTSFlags2 uint8 = 2 // only PTS + PTSDTSFlags3 uint8 = 3 // both PTS and DTS +) + +const ( + delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用? +) diff --git a/pkg/mpegts/mpegts_test.go b/pkg/mpegts/mpegts_test.go new file mode 100644 index 0000000..94caee7 --- /dev/null +++ b/pkg/mpegts/mpegts_test.go @@ -0,0 +1,28 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package mpegts_test + +import ( + "testing" + + "github.com/q191201771/lal/pkg/mpegts" + "github.com/q191201771/naza/pkg/nazalog" +) + +func TestParseFixedTSPacket(t *testing.T) { + h := mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader) + nazalog.Debugf("%+v", h) + pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:]) + nazalog.Debugf("%+v", pat) + + h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:]) + nazalog.Debugf("%+v", h) + pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:]) + nazalog.Debugf("%+v", pmt) +} diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go new file mode 100644 index 0000000..7b6bb25 --- /dev/null +++ b/pkg/mpegts/pack.go @@ -0,0 +1,241 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package mpegts + +// MPEG: Moving Picture Experts Group + +type Frame struct { + PTS uint64 + DTS uint64 + Pid uint16 // PID of PES Header + Sid uint8 // stream_id of PES Header + CC uint8 // continuity_counter of TS Header + Key bool + Raw []byte +} + +// @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存 +// @param cc: 当前TS包的continuity_counter +// +type OnTSPacket func(packet []byte, cc uint8) + +// @param frame: frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保持CC的值,供下次调用时使用 +// frame.Pid 音频设置为mpegts.PidAudio,视频设置为mpegts.PidVideo +// frame.Sid 音频设置为mpegts.StreamIDAudio,视频设置为mpegts.StreamIDVideo +// frame.Key 是否是关键帧,一般用于视频格式,音频全部设置为false +// +// frame.Raw 如果是AVC类型,格式为AnnexB +// 如果是AAC类型,格式为2字节ADTS头加raw frame +// 函数内部不会持有该内存块 +// +// @param onTSPacket: 注意,一次函数调用,可能对应多次回调 +// +func PackTSPacket(frame *Frame, onTSPacket OnTSPacket) { + wpos := 0 // 当前packet的写入位置 + lpos := 0 // 当前帧的处理位置 + rpos := len(frame.Raw) // 当前帧大小 + first := true // 是否为帧的首个packet的标准 + packet := make([]byte, 188) + + for lpos != rpos { + wpos = 0 + frame.CC++ + + // 每个packet都需要添加TS Header + // -----TS Header---------------- + // sync_byte + // transport_error_indicator 0 + // payload_unit_start_indicator + // transport_priority 0 + // PID + // transport_scrambling_control 0 + // adaptation_field_control + // continuity_counter + // ------------------------------ + packet[0] = syncByte // sync_byte + packet[1] = 0x0 + if first { + packet[1] = 0x40 // payload_unit_start_indicator + } + packet[1] |= uint8((frame.Pid >> 8) & 0x1F) //PID高5位 + packet[2] = uint8(frame.Pid & 0xFF) //PID低8位 + + // adaptation_field_control 先设置成无Adaptation + // continuity_counter + packet[3] = 0x10 | (frame.CC & 0x0f) + wpos += 4 + + if first { + if frame.Key { + // 关键帧的首个packet需要添加Adaptation + // -----Adaptation----------------------- + // adaptation_field_length + // discontinuity_indicator 0 + // random_access_indicator 1 + // elementary_stream_priority_indicator 0 + // PCR_flag 1 + // OPCR_flag 0 + // splicing_point_flag 0 + // transport_private_data_flag 0 + // adaptation_field_extension_flag 0 + // program_clock_reference_base + // reserved + // program_clock_reference_extension + // -------------------------------------- + packet[3] |= 0x20 // adaptation_field_control 设置Adaptation + packet[4] = 7 // adaptation_field_length + packet[5] = 0x50 // random_access_indicator + PCR_flag + packPCR(packet[6:], frame.DTS-delay) // using 6 byte + wpos += 8 + } + + // 帧的首个packet需要添加PES Header + // -----PES Header------------ + // packet_start_code_prefix + // stream_id + // PES_packet_length + // '10' + // PES_scrambling_control 0 + // PES_priority 0 + // data_alignment_indicator 0 + // copyright 0 + // original_or_copy 0 + // PTS_DTS_flags + // ESCR_flag 0 + // ES_rate_flag 0 + // DSM_trick_mode_flag 0 + // additional_copy_info_flag 0 + // PES_CRC_flag 0 + // PES_extension_flag 0 + // PES_header_data_length + // --------------------------- + packet[wpos] = 0x00 // packet_start_code_prefix 24-bits + packet[wpos+1] = 0x00 // + packet[wpos+2] = 0x01 // + packet[wpos+3] = frame.Sid // stream_id + wpos += 4 + + // 计算PES Header中一些字段的值 + // PTS相关 + headerSize := uint8(5) + flags := uint8(0x80) + // DTS相关 + if frame.DTS != frame.PTS { + headerSize += 5 + flags |= 0x40 + } + + pesSize := rpos + int(headerSize) + 3 // PES Header剩余3字节 + PTS/PTS长度 + 整个帧的长度 + if pesSize > 0xFFFF { + pesSize = 0 + } + + packet[wpos] = uint8(pesSize >> 8) // PES_packet_length + packet[wpos+1] = uint8(pesSize & 0xFF) // + packet[wpos+2] = 0x80 // 除了reserve的'10',其他字段都是0 + packet[wpos+3] = flags // PTS/DTS flag + packet[wpos+4] = headerSize // PES_header_data_length: PTS+DTS数据长度 + wpos += 5 + + // 写入PTS的值 + packPTS(packet[wpos:], flags>>6, frame.PTS+delay) + wpos += 5 + // 写入DTS的值 + if frame.PTS != frame.DTS { + packPTS(packet[wpos:], 1, frame.DTS+delay) + wpos += 5 + } + + first = false + } + + // 把帧的内容切割放入packet中 + bodySize := 188 - wpos // 当前TS packet,可写入大小 + inSize := rpos - lpos // 整个帧剩余待打包大小 + + if bodySize <= inSize { + // 当前packet写不完这个帧,或者刚好够写完 + copy(packet[wpos:], frame.Raw[lpos:lpos+inSize]) + lpos += bodySize + } else { + // 当前packet可以写完这个帧,并且还有空闲空间 + // 此时,真实数据挪最后,中间用0xFF填充到Adaptation中 + // 注意,此时有两种情况 + // 1. 原本有Adaptation + // 2. 原本没有Adaptation + + stuffSize := bodySize - inSize // 当前TS packet的剩余空闲空间 + + if packet[3]&0x20 != 0 { + // has Adaptation + + base := int(4 + packet[4]) // TS Header + Adaptation + if wpos > base { + // 比如有PES Header + + copy(packet[base+stuffSize:], packet[base:wpos]) + } + wpos = base + stuffSize + + packet[4] += uint8(stuffSize) // adaptation_field_length + for i := 0; i < stuffSize; i++ { + packet[base+i] = 0xFF + } + } else { + // no Adaptation + + packet[3] |= 0x20 + + base := 4 + if wpos > base { + copy(packet[base+stuffSize:], packet[base:wpos]) + } + wpos += stuffSize + + packet[4] = uint8(stuffSize - 1) // adaptation_field_length + if stuffSize >= 2 { + // TODO chef 这里是参考nginx rtmp module的实现,为什么这个字节写0而不是0xFF + packet[5] = 0 + for i := 0; i < stuffSize-2; i++ { + packet[6+i] = 0xFF + } + } + } + + // 真实数据放在packet尾部 + copy(packet[wpos:], frame.Raw[lpos:lpos+inSize]) + lpos = rpos + } + + onTSPacket(packet, frame.CC) + } +} + +func packPCR(out []byte, pcr uint64) { + out[0] = uint8(pcr >> 25) + out[1] = uint8(pcr >> 17) + out[2] = uint8(pcr >> 9) + out[3] = uint8(pcr >> 1) + out[4] = uint8(pcr<<7) | 0x7e + out[5] = 0 +} + +// 注意,除PTS外,DTS也使用这个函数打包 +func packPTS(out []byte, fb uint8, pts uint64) { + var val uint64 + out[0] = (fb << 4) | (uint8(pts>>30) & 0x07) | 1 + + val = (((pts >> 15) & 0x7FFF) << 1) | 1 + out[1] = uint8(val >> 8) + out[2] = uint8(val) + + val = ((pts & 0x7FFF) << 1) | 1 + out[3] = uint8(val >> 8) + out[4] = uint8(val) +} diff --git a/pkg/hls/ts_pat.go b/pkg/mpegts/pat.go similarity index 99% rename from pkg/hls/ts_pat.go rename to pkg/mpegts/pat.go index 961edc7..3dbba44 100644 --- a/pkg/hls/ts_pat.go +++ b/pkg/mpegts/pat.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package mpegts import ( "github.com/q191201771/naza/pkg/nazabits" diff --git a/pkg/hls/ts_pes.go b/pkg/mpegts/pes.go similarity index 99% rename from pkg/hls/ts_pes.go rename to pkg/mpegts/pes.go index de88d17..21c8a6e 100644 --- a/pkg/hls/ts_pes.go +++ b/pkg/mpegts/pes.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package mpegts import ( "github.com/q191201771/naza/pkg/nazabits" diff --git a/pkg/hls/ts_pmt.go b/pkg/mpegts/pmt.go similarity index 99% rename from pkg/hls/ts_pmt.go rename to pkg/mpegts/pmt.go index 1110ffb..87bda42 100644 --- a/pkg/hls/ts_pmt.go +++ b/pkg/mpegts/pmt.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package mpegts import ( "github.com/q191201771/naza/pkg/nazabits" diff --git a/pkg/hls/ts_header.go b/pkg/mpegts/ts_packet_header.go similarity index 99% rename from pkg/hls/ts_header.go rename to pkg/mpegts/ts_packet_header.go index 26e8065..631b7e1 100644 --- a/pkg/hls/ts_header.go +++ b/pkg/mpegts/ts_packet_header.go @@ -6,7 +6,7 @@ // // Author: Chef (191201771@qq.com) -package hls +package mpegts import ( "github.com/q191201771/naza/pkg/nazabits" diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 1358ac2..25dc989 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -9,7 +9,6 @@ package rtmp import ( - "encoding/hex" "errors" "net" "net/url" @@ -84,14 +83,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) case CSTPushSession: uk = unique.GenUniqueKey("RTMPPUSH") } - log.Infof("[%s] lifecycle new rtmp client session.", uk) option := defaultClientSessOption for _, fn := range modOptions { fn(&option) } - return &ClientSession{ + s := &ClientSession{ UniqueKey: uk, t: t, option: option, @@ -99,6 +97,8 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) packer: NewMessagePacker(), chunkComposer: NewChunkComposer(), } + log.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s) + return s } // 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 @@ -141,7 +141,7 @@ func (s *ClientSession) do(rawURL string) <-chan error { } log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName) - if err := s.packer.writeConnect(s.conn, s.appName, s.tcURL); err != nil { + if err := s.packer.writeConnect(s.conn, s.appName, s.tcURL, s.t == CSTPushSession); err != nil { ch <- err return ch } @@ -173,7 +173,7 @@ func (s *ClientSession) Flush() error { } func (s *ClientSession) Dispose() { - log.Infof("[%s] lifecycle dispose rtmp client session.", s.UniqueKey) + log.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey) _ = s.conn.Close() } @@ -222,9 +222,7 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error { switch val { case "|RtmpSampleAccess": - // TODO chef: handle this? - log.Error(val) - log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e])) + log.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey) return nil default: } diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index 86e1fcd..4be7d82 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -12,11 +12,14 @@ import ( "bytes" "crypto/hmac" "crypto/sha256" + "fmt" "io" "time" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/naza/pkg/bele" - log "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazalog" ) // https://pengrl.com/p/20027 @@ -222,7 +225,7 @@ func parseChallenge(c0c1 []byte) []byte { //} ver := bele.BEUint32(c0c1[5:]) if ver == 0 { - log.Debug("handshake simple mode.") + nazalog.Debug("handshake simple mode.") return nil } @@ -231,10 +234,10 @@ func parseChallenge(c0c1 []byte) []byte { offs = findDigest(c0c1[1:], 8, clientKey[:clientPartKeyLen]) } if offs == -1 { - log.Warn("get digest offs failed. roll back to try simple handshake.") + nazalog.Warn("get digest offs failed. roll back to try simple handshake.") return nil } - log.Debug("handshake complex mode.") + nazalog.Debug("handshake complex mode.") // use c0c1 digest to make a new digest digest := makeDigest(c0c1[1+offs:1+offs+keyLen], serverKey[:serverFullKeyLen]) @@ -282,10 +285,8 @@ func random1528(out []byte) { } func init() { - bs := []byte{'l', 'a', 'l'} - bsl := len(bs) random1528Buf = make([]byte, 1528) - for i := 0; i < 1528; i++ { - random1528Buf[i] = bs[i%bsl] - } + hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark) + copy(random1528Buf, []byte(hack)) + nazalog.Debug(len(random1528Buf)) } diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index 6a857ad..3e764a0 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -14,6 +14,7 @@ package rtmp import ( "bytes" + "fmt" "io" "github.com/q191201771/lal/pkg/base" @@ -76,17 +77,23 @@ func (packer *MessagePacker) writePeerBandwidth(writer io.Writer, val int, limit return err } -func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL string) error { +// @param isPush: 推流为true,拉流为false +func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL string, isPush bool) error { packer.writeMessageHeader(csidOverConnection, 0, base.RTMPTypeIDCommandMessageAMF0, 0) _ = AMF0.WriteString(packer.b, "connect") _ = AMF0.WriteNumber(packer.b, float64(tidClientConnect)) - objs := []ObjectPair{ - {Key: "app", Value: appName}, - {Key: "type", Value: "nonprivate"}, - {Key: "flashVer", Value: "FMLE/3.0 (compatible; Lal0.0.1)"}, - {Key: "tcUrl", Value: tcURL}, + var objs []ObjectPair + objs = append(objs, ObjectPair{Key: "app", Value: appName}) + objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"}) + var flashVer string + if isPush { + flashVer = fmt.Sprintf("FMLE/3.0 (compatible; %s)", base.LALRTMPPushSessionConnectVersion) + } else { + flashVer = "LNX 9,0,124,2" } + objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer}) + objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL}) _ = AMF0.WriteObject(packer.b, objs) raw := packer.b.Bytes() bele.BEPutUint24(raw[4:], uint32(len(raw)-12)) @@ -95,7 +102,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin } func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int) error { - packer.writeMessageHeader(csidOverConnection, 190, base.RTMPTypeIDCommandMessageAMF0, 0) + packer.writeMessageHeader(csidOverConnection, 0, base.RTMPTypeIDCommandMessageAMF0, 0) _ = AMF0.WriteString(packer.b, "_result") _ = AMF0.WriteNumber(packer.b, float64(tid)) objs := []ObjectPair{ @@ -108,8 +115,11 @@ func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int) error {Key: "code", Value: "NetConnection.Connect.Success"}, {Key: "description", Value: "Connection succeeded."}, {Key: "objectEncoding", Value: 0}, + {Key: "version", Value: base.LALRTMPConnectResultVersion}, } _ = AMF0.WriteObject(packer.b, objs) + raw := packer.b.Bytes() + bele.BEPutUint24(raw[4:], uint32(len(raw)-12)) _, err := packer.b.WriteTo(writer) return err } diff --git a/pkg/rtmp/message_packer_test.go b/pkg/rtmp/message_packer_test.go index d747313..5fe08d2 100644 --- a/pkg/rtmp/message_packer_test.go +++ b/pkg/rtmp/message_packer_test.go @@ -51,25 +51,16 @@ func TestWrite(t *testing.T) { assert.Equal(t, []byte{2, 0, 0, 0, 0, 0, 5, 6, 0, 0, 0, 0, 0, 0, 0, 1, 2}, buf.Bytes()) buf.Reset() - err = packer.writeConnect(buf, "live", "rtmp://127.0.0.1/live") + err = packer.writeConnect(buf, "live", "rtmp://127.0.0.1/live", true) assert.Equal(t, nil, err) - result = []byte{ - 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x81, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x63, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, - 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 0x74, 0x79, - 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, - 0x08, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45, - 0x2f, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, - 0x3b, 0x20, 0x4c, 0x61, 0x6c, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x29, 0x00, 0x05, 0x74, 0x63, 0x55, - 0x72, 0x6c, 0x02, 0x00, 0x15, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x31, 0x32, 0x37, 0x2e, - 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x00, 0x09, - } + result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x82, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x20, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x3b, 0x20, 0x6c, 0x61, 0x6c, 0x30, 0x2e, 0x31, 0x33, 0x2e, 0x30, 0x29, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x15, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x00, 0x09} + assert.Equal(t, result, buf.Bytes()) buf.Reset() err = packer.writeConnectResult(buf, 1) assert.Equal(t, nil, err) - result = []byte{0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0xbe, 0x14, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x7, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x0, 0x3f, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x6, 0x66, 0x6d, 0x73, 0x56, 0x65, 0x72, 0x2, 0x0, 0xd, 0x46, 0x4d, 0x53, 0x2f, 0x33, 0x2c, 0x30, 0x2c, 0x31, 0x2c, 0x31, 0x32, 0x33, 0x0, 0xc, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x0, 0x40, 0x3f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x9, 0x3, 0x0, 0x5, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x2, 0x0, 0x6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0, 0x4, 0x63, 0x6f, 0x64, 0x65, 0x2, 0x0, 0x1d, 0x4e, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x0, 0xb, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x2, 0x0, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x2e, 0x0, 0xe, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x9} + result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0xd0, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x06, 0x66, 0x6d, 0x73, 0x56, 0x65, 0x72, 0x02, 0x00, 0x0d, 0x46, 0x4d, 0x53, 0x2f, 0x33, 0x2c, 0x30, 0x2c, 0x31, 0x2c, 0x31, 0x32, 0x33, 0x00, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x00, 0x40, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x03, 0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x00, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x02, 0x00, 0x1d, 0x4e, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x2e, 0x00, 0x0e, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x06, 0x30, 0x2c, 0x31, 0x33, 0x2c, 0x30, 0x00, 0x00, 0x09} assert.Equal(t, result, buf.Bytes()) buf.Reset() @@ -139,7 +130,7 @@ func TestPackCorner(t *testing.T) { assert.IsNotNil(t, err) err = packer.writePeerBandwidth(mw, 1, 2) assert.IsNotNil(t, err) - err = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live") + err = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live", true) assert.IsNotNil(t, err) err = packer.writeConnectResult(mw, 1) assert.IsNotNil(t, err) @@ -161,6 +152,6 @@ func BenchmarkMessagePacker(b *testing.B) { mw := fake.NewWriter(fake.WriterTypeDoNothing) packer := NewMessagePacker() for i := 0; i < b.N; i++ { - _ = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live") + _ = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live", true) } } diff --git a/pkg/rtmp/metadata.go b/pkg/rtmp/metadata.go index c968f7b..ad9c932 100644 --- a/pkg/rtmp/metadata.go +++ b/pkg/rtmp/metadata.go @@ -72,7 +72,7 @@ func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([ opa = append(opa, ObjectPair{ Key: "version", - Value: base.LALFullInfo, + Value: base.LALRTMPBuildMetadataEncoder, }) if err := AMF0.WriteObject(buf, opa); err != nil { diff --git a/pkg/rtmp/metadata_test.go b/pkg/rtmp/metadata_test.go index 4d4b1cc..44cf7eb 100644 --- a/pkg/rtmp/metadata_test.go +++ b/pkg/rtmp/metadata_test.go @@ -36,5 +36,5 @@ func TestMetadata(t *testing.T) { v = opa.Find("videocodecid") assert.Equal(t, float64(7), v.(float64)) v = opa.Find("version") - assert.Equal(t, base.LALFullInfo, v.(string)) + assert.Equal(t, base.LALRTMPBuildMetadataEncoder, v.(string)) } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index 5196624..d370bb5 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -69,8 +69,7 @@ type ServerSession struct { func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { uk := unique.GenUniqueKey("RTMPPUBSUB") - nazalog.Infof("[%s] lifecycle new rtmp server session. addr=%s", uk, conn.RemoteAddr().String()) - return &ServerSession{ + s := &ServerSession{ conn: connection.New(conn, func(option *connection.Option) { option.ReadBufSize = readBufSize }), @@ -81,6 +80,8 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { packer: NewMessagePacker(), IsFresh: true, } + nazalog.Infof("[%s] lifecycle new rtmp ServerSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) + return s } func (s *ServerSession) RunLoop() (err error) { @@ -101,7 +102,7 @@ func (s *ServerSession) Flush() error { } func (s *ServerSession) Dispose() { - nazalog.Infof("[%s] lifecycle dispose rtmp server session.", s.UniqueKey) + nazalog.Infof("[%s] lifecycle dispose rtmp ServerSession.", s.UniqueKey) _ = s.conn.Close() } diff --git a/pkg/rtsp/pack.go b/pkg/rtsp/pack.go index d287d8d..e8fedb4 100644 --- a/pkg/rtsp/pack.go +++ b/pkg/rtsp/pack.go @@ -22,9 +22,8 @@ import ( // rfc2326 10.1 OPTIONS // CSeq var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" + - "Server: " + base.LALFullInfo + "\r\n" + + "Server: " + base.LALRTSPOptionsResponseServer + "\r\n" + "CSeq: %s\r\n" + - //"Public:DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n\r\n" "Public:DESCRIBE, ANNOUNCE, SETUP, PLAY, PAUSE, RECORD, TEARDOWN\r\n" + "\r\n" diff --git a/pkg/rtsp/pub_session.go b/pkg/rtsp/pub_session.go index 3114aab..d046e15 100644 --- a/pkg/rtsp/pub_session.go +++ b/pkg/rtsp/pub_session.go @@ -57,6 +57,7 @@ func NewPubSession(streamName string) *PubSession { StreamName: streamName, } ps.avPacketQueue = NewAVPacketQueue(ps.onAVPacket) + nazalog.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, ps, streamName) return ps } diff --git a/showdeps.sh b/showdeps.sh index c2e4a9d..d5da85d 100755 --- a/showdeps.sh +++ b/showdeps.sh @@ -11,13 +11,13 @@ for d in $(go list ./pkg/...); do # 只看依赖lal自身的哪些package # package依赖自身这个package的过滤掉 # 依赖pkg/base这个基础package的过滤掉 - #go list -deps $d | grep 'q191201771/lal' | grep -v $d | grep -v 'q191201771/lal/pkg/base' + go list -deps $d | grep 'q191201771/lal' | grep -v $d | grep -v 'q191201771/lal/pkg/base' #go list -deps $d | grep 'q191201771/lal' | grep -v $d - go list -deps $d | grep 'q191201771/naza' | grep -v $d + #go list -deps $d | grep 'q191201771/naza' | grep -v $d done -for d in $(go list ./app/...); do - echo "-----"$d"-----" +#for d in $(go list ./app/...); do + #echo "-----"$d"-----" #go list -deps $d | grep 'q191201771/lal' | grep -v $d - go list -deps $d | grep 'q191201771/naza' | grep -v $d -done + #go list -deps $d | grep 'q191201771/naza' | grep -v $d +#done