messages:

- [refactor] 新增package mpegts,将部分package hls中代码抽离出来
- [feat] 在各协议的标准字段中写入lal版本信息
- [log] 整理所有session的日志
pull/29/head
q191201771 5 years ago
parent f6869f2a72
commit 2b67df6d12

@ -219,6 +219,7 @@ func analysisVideoTag(tag httpflv.Tag) {
// 注意SEI的内容是自定义格式解析的代码不具有通用性
func SEIDelayMS(seiNALU []byte) int {
nazalog.Debugf("sei: %s", hex.Dump(seiNALU))
items := strings.Split(string(seiNALU), ":")
if len(items) != 3 {
return -1

@ -11,6 +11,8 @@ package main
import (
"io/ioutil"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -18,8 +20,8 @@ import (
// 学习如何解析TS文件。注意该程序还没有写完。
var (
pat hls.PAT
pmt hls.PMT
pat mpegts.PAT
pmt mpegts.PMT
pid2stream map[uint16]*Stream
)
@ -29,27 +31,27 @@ type Stream struct {
var filename = "/Volumes/Data/nrm-0.ts"
func handlePacket(packet []byte) {
h := hls.ParseTSPacketHeader(packet)
h := mpegts.ParseTSPacketHeader(packet)
index := 4
nazalog.Debugf("%+v", h)
var adaptation hls.TSPacketAdaptation
var adaptation mpegts.TSPacketAdaptation
switch h.Adaptation {
case hls.AdaptationFieldControlNo:
case mpegts.AdaptationFieldControlNo:
// noop
case hls.AdaptationFieldControlFollowed:
adaptation = hls.ParseTSPacketAdaptation(packet[4:])
case mpegts.AdaptationFieldControlFollowed:
adaptation = mpegts.ParseTSPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
}
index += int(adaptation.Length)
if h.Pid == hls.PidPAT {
if h.Pid == mpegts.PidPAT {
if h.PayloadUnitStart == 1 {
index++
}
pat = hls.ParsePAT(packet[index:])
pat = mpegts.ParsePAT(packet[index:])
nazalog.Debugf("%+v", pat)
return
}
@ -58,7 +60,7 @@ func handlePacket(packet []byte) {
if h.PayloadUnitStart == 1 {
index++
}
pmt = hls.ParsePMT(packet[index:])
pmt = mpegts.ParsePMT(packet[index:])
nazalog.Debugf("%+v", pmt)
for _, ele := range pmt.ProgramElements {
@ -74,7 +76,7 @@ func handlePacket(packet []byte) {
// 判断是否有PES
if h.PayloadUnitStart == 1 {
pes, length := hls.ParsePES(packet[index:])
pes, length := mpegts.ParsePES(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}

@ -13,19 +13,21 @@ import (
"encoding/hex"
"io/ioutil"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
// 临时小工具比较两个TS文件。注意该程序还没有写完。
var filename1 = "/Volumes/Data/tmp/lal-4.ts"
var filename2 = "/Volumes/Data/tmp/nrm-4.ts"
var filename1 = "/tmp/lal/hls/innertest/innertest-0.ts"
var filename2 = "/tmp/lal/hls/new/innertest-0.ts"
func skipPacketFilter(tss [][]byte) (ret [][]byte) {
for _, ts := range tss {
h := hls.ParseTSPacketHeader(ts)
if h.Pid == hls.PidAudio {
h := mpegts.ParseTSPacketHeader(ts)
if h.Pid == mpegts.PidAudio {
continue
}
ret = append(ret, ts)
@ -34,16 +36,16 @@ func skipPacketFilter(tss [][]byte) (ret [][]byte) {
}
func parsePacket(packet []byte) {
h := hls.ParseTSPacketHeader(packet)
h := mpegts.ParseTSPacketHeader(packet)
nazalog.Debugf("%+v", h)
index := 4
var adaptation hls.TSPacketAdaptation
var adaptation mpegts.TSPacketAdaptation
switch h.Adaptation {
case hls.AdaptationFieldControlNo:
case mpegts.AdaptationFieldControlNo:
// noop
case hls.AdaptationFieldControlFollowed:
adaptation = hls.ParseTSPacketAdaptation(packet[4:])
case mpegts.AdaptationFieldControlFollowed:
adaptation = mpegts.ParseTSPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
@ -51,7 +53,7 @@ func parsePacket(packet []byte) {
index += int(adaptation.Length)
if h.PayloadUnitStart == 1 && h.Pid == 256 {
pes, length := hls.ParsePES(packet[index:])
pes, length := mpegts.ParsePES(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}

@ -17,7 +17,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// Annex B:
// AnnexB:
// keywords: MPEG-2 transport stream, ElementaryStream(ES),
// nalu with start code.
// e.g. ts

@ -8,24 +8,99 @@
package base
import "strings"
// 版本信息相关
// lal的一部分版本信息使用了naza.bininfo手段是获取git信息。
// 另外,我们也在本文件提供另外一些信息:
// lal的一部分版本信息使用了naza.bininfo
// 另外,我们也在本文件提供另外一些信息
// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中
// 版本该变量由build脚本修改维护
var LALVersion = "v0.13.0"
// 以下字段固定不变
var (
LALLibraryName = "lal"
LALGitHubRepo = "github.com/q191201771/lal"
LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGitHubRepo + ")"
//LALServerName = "lalserver"
//LALGitHubRepoURL = "https://github.com/q191201771/lal"
// e.g. lal v0.12.3 (github.com/q191201771/lal)
LALFullInfo = LALLibraryName + " " + LALVersion + " (" + LALGitHubRepo + ")"
// e.g. 0.12.3
LALVersionDot string
// e.g. 0,12,3
LALVersionComma string
)
var (
// 植入rtmp握手随机字符串中
// e.g. lal v0.12.3 (github.com/q191201771/lal)
LALRTMPHandshakeWaterMark string
// 植入rtmp server中的connect result信令中
// 注意有两个object第一个object中的fmsVer我们保持通用公认的值在第二个object中植入
// e.g. 0,12,3
LALRTMPConnectResultVersion string
// e.g. lal0.12.3
LALRTMPPushSessionConnectVersion string
// e.g. lal0.12.3
LALRTMPBuildMetadataEncoder string
// e.g. lal/0.12.3
LALHTTPFLVPullSessionUA string
// e.g. lal0.12.3
LALHTTPFLVSubSessionServer string
// e.g. lal0.12.3
LALHLSM3U8Server string
// e.g. lal0.12.3
LALHLSTSServer string
// e.g. lal0.12.3
LALRTSPOptionsResponseServer string
)
// 作为HTTP客户端时考虑User-Agent
// - rtmp handshake random buf
// - rtmp server
// - rtmp message connect result
// - cdnws: 第一个obj: `fmsVer: FMS/3,0,1,123` 第二个obj: `version: x,x,x,xxx`
// - rtmp client
// - rtmp message connect
// - ffmpeg push: `flashVer: FMLE/3.0 (compatible; Lavf57.83.100)`
// - ffmpeg pull: `flashVer: LNX 9,0,124,2` -- emulated Flash client version - 9.0.124.2 on Linux
// - rtmp/flv build metadata
// - encoder: Lavf57.83.100
// - httpflv pull
// - wget: User-Agent: Wget/1.19.1 (darwin15.6.0)
// - httpflv sub
// - `server:`
// - hls
// - m3u8
// - `Server:`
// - ts
// - `Server:`
// - rtsp
// - Options response `Server:`
//
// RTMP metadata
// description : Bilibili VXCode Swarm Transcoder v0.2.30(gap_fixed:False)
// encoder : Lavf57.83.100
func init() {
LALVersionDot = strings.TrimPrefix(LALVersion, "v")
LALVersionComma = strings.ReplaceAll(LALVersionDot, ".", ",")
LALRTMPConnectResultVersion = LALVersionComma
LALRTMPPushSessionConnectVersion = LALLibraryName + LALVersionDot
LALRTMPBuildMetadataEncoder = LALLibraryName + LALVersionDot
LALHTTPFLVSubSessionServer = LALLibraryName + LALVersionDot
LALHLSM3U8Server = LALLibraryName + LALVersionDot
LALHLSTSServer = LALLibraryName + LALVersionDot
LALRTSPOptionsResponseServer = LALLibraryName + LALVersionDot
LALHTTPFLVPullSessionUA = LALLibraryName + "/" + LALVersionDot
LALRTMPHandshakeWaterMark = LALFullInfo
}

@ -10,243 +10,33 @@ package hls
import (
"os"
)
type FragmentOP struct {
fp *os.File
packet []byte //WriteFrame中缓存每个TS包数据
}
"github.com/q191201771/lal/pkg/mpegts"
)
type mpegTSFrame struct {
pts uint64
dts uint64
pid uint16
sid uint8
cc uint8
key bool // 关键帧
type Fragment struct {
fp *os.File
}
func (f *FragmentOP) OpenFile(filename string) (err error) {
func (f *Fragment) OpenFile(filename string) (err error) {
f.fp, err = os.Create(filename)
if err != nil {
return
}
f.writeFile(FixedFragmentHeader)
//TS包固定188-byte
f.packet = make([]byte, 188)
f.writeFile(mpegts.FixedFragmentHeader)
return nil
}
func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) {
//nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b))
wpos := 0 // 当前packet的写入位置
lpos := 0 // 当前帧的处理位置
rpos := len(b) // 当前帧大小
first := true // 是否为帧的首个packet的标准
for lpos != rpos {
wpos = 0
frame.cc++
// 每个packet都需要添加TS Header
// -----TS Header----------------
// sync_byte
// transport_error_indicator 0
// payload_unit_start_indicator
// transport_priority 0
// PID
// transport_scrambling_control 0
// adaptation_field_control
// continuity_counter
// ------------------------------
f.packet[0] = syncByte // sync_byte
f.packet[1] = 0x0
if first {
f.packet[1] = 0x40 // payload_unit_start_indicator
}
f.packet[1] |= uint8((frame.pid >> 8) & 0x1F) //PID高5位
f.packet[2] = uint8(frame.pid & 0xFF) //PID低8位
// adaptation_field_control 先设置成无Adaptation
// continuity_counter
f.packet[3] = 0x10 | (frame.cc & 0x0f)
wpos += 4
if first {
if frame.key {
// 关键帧的首个packet需要添加Adaptation
// -----Adaptation-----------------------
// adaptation_field_length
// discontinuity_indicator 0
// random_access_indicator 1
// elementary_stream_priority_indicator 0
// PCR_flag 1
// OPCR_flag 0
// splicing_point_flag 0
// transport_private_data_flag 0
// adaptation_field_extension_flag 0
// program_clock_reference_base
// reserved
// program_clock_reference_extension
// --------------------------------------
f.packet[3] |= 0x20 // adaptation_field_control 设置Adaptation
f.packet[4] = 7 // adaptation_field_length
f.packet[5] = 0x50 // random_access_indicator + PCR_flag
mpegtsdWritePCR(f.packet[6:], frame.dts-delay) // using 6 byte
wpos += 8
}
// 帧的首个packet需要添加PES Header
// -----PES Header------------
// packet_start_code_prefix
// stream_id
// PES_packet_length
// '10'
// PES_scrambling_control 0
// PES_priority 0
// data_alignment_indicator 0
// copyright 0
// original_or_copy 0
// PTS_DTS_flags
// ESCR_flag 0
// ES_rate_flag 0
// DSM_trick_mode_flag 0
// additional_copy_info_flag 0
// PES_CRC_flag 0
// PES_extension_flag 0
// PES_header_data_length
// ---------------------------
f.packet[wpos] = 0x00 // packet_start_code_prefix 24-bits
f.packet[wpos+1] = 0x00 //
f.packet[wpos+2] = 0x01 //
f.packet[wpos+3] = frame.sid // stream_id
wpos += 4
// 计算PES Header中一些字段的值
// PTS相关
headerSize := uint8(5)
flags := uint8(0x80)
// DTS相关
if frame.dts != frame.pts {
headerSize += 5
flags |= 0x40
}
pesSize := rpos + int(headerSize) + 3 // PES Header剩余3字节 + PTS/PTS长度 + 整个帧的长度
if pesSize > 0xFFFF {
pesSize = 0
}
f.packet[wpos] = uint8(pesSize >> 8) // PES_packet_length
f.packet[wpos+1] = uint8(pesSize & 0xFF) //
f.packet[wpos+2] = 0x80 // 除了reserve的'10'其他字段都是0
f.packet[wpos+3] = flags // PTS/DTS flag
f.packet[wpos+4] = headerSize // PES_header_data_length: PTS+DTS数据长度
wpos += 5
// 写入PTS的值
mpegtsWritePTS(f.packet[wpos:], flags>>6, frame.pts+delay)
wpos += 5
// 写入DTS的值
if frame.pts != frame.dts {
mpegtsWritePTS(f.packet[wpos:], 1, frame.dts+delay)
wpos += 5
}
first = false
}
// 把帧的内容切割放入packet中
bodySize := 188 - wpos // 当前TS packet可写入大小
inSize := rpos - lpos // 整个帧剩余待打包大小
if bodySize <= inSize {
// 当前packet写不完这个帧或者刚好够写完
copy(f.packet[wpos:], b[lpos:lpos+inSize])
lpos += bodySize
} else {
// 当前packet可以写完这个帧并且还有空闲空间
// 此时真实数据挪最后中间用0xFF填充到Adaptation中
// 注意,此时有两种情况
// 1. 原本有Adaptation
// 2. 原本没有Adaptation
stuffSize := bodySize - inSize // 当前TS packet的剩余空闲空间
if f.packet[3]&0x20 != 0 {
// has Adaptation
base := int(4 + f.packet[4]) // TS Header + Adaptation
if wpos > base {
// 比如有PES Header
copy(f.packet[base+stuffSize:], f.packet[base:wpos])
}
wpos = base + stuffSize
f.packet[4] += uint8(stuffSize) // adaptation_field_length
for i := 0; i < stuffSize; i++ {
f.packet[base+i] = 0xFF
}
} else {
// no Adaptation
f.packet[3] |= 0x20
base := 4
if wpos > base {
copy(f.packet[base+stuffSize:], f.packet[base:wpos])
}
wpos += stuffSize
f.packet[4] = uint8(stuffSize - 1) // adaptation_field_length
if stuffSize >= 2 {
// TODO chef 这里是参考nginx rtmp module的实现为什么这个字节写0而不是0xFF
f.packet[5] = 0
for i := 0; i < stuffSize-2; i++ {
f.packet[6+i] = 0xFF
}
}
}
// 真实数据放在packet尾部
copy(f.packet[wpos:], b[lpos:lpos+inSize])
lpos = rpos
}
f.writeFile(f.packet)
}
func (f *Fragment) WriteFrame(frame *mpegts.Frame) {
mpegts.PackTSPacket(frame, func(packet []byte, cc uint8) {
f.writeFile(packet)
})
}
func (f *FragmentOP) CloseFile() {
func (f *Fragment) CloseFile() {
_ = f.fp.Close()
}
func (f *FragmentOP) writeFile(b []byte) {
func (f *Fragment) writeFile(b []byte) {
_, _ = f.fp.Write(b)
}
func mpegtsdWritePCR(out []byte, pcr uint64) {
out[0] = uint8(pcr >> 25)
out[1] = uint8(pcr >> 17)
out[2] = uint8(pcr >> 9)
out[3] = uint8(pcr >> 1)
out[4] = uint8(pcr<<7) | 0x7e
out[5] = 0
}
// write PTS or DTS
func mpegtsWritePTS(out []byte, fb uint8, pts uint64) {
var val uint64
out[0] = (fb << 4) | (uint8(pts>>30) & 0x07) | 1
val = (((pts >> 15) & 0x7FFF) << 1) | 1
out[1] = uint8(val >> 8)
out[2] = uint8(val)
val = ((pts & 0x7FFF) << 1) | 1
out[3] = uint8(val >> 8)
out[4] = uint8(val)
}

@ -8,8 +8,6 @@
package hls
// 声明本package参考了c语言实现的开源项目nginx-rtmp-module
// TODO chef:
// - 支持HEVC
// - 检查所有的容错处理,是否会出现
@ -31,123 +29,15 @@ package hls
// 进来的数据称为Frame帧188字节的封装称为TSPacket包TS文件称为Fragment
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PAT */
0x00, 0x01, 0xf0, 0x01,
/* CRC */
0x2e, 0x70, 0x19, 0x05,
/* stuffing 167 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
/* TS */
0x47, 0x50, 0x01, 0x10, 0x00,
/* PSI */
0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
/* stuffing 157 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
var audNal = []byte{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0,
}
// TS Packet Header
const (
syncByte uint8 = 0x47
PidPAT uint16 = 0
// ------------------------------------------
// <iso13818-1.pdf> <Table 2-5> <page 38/174>
// ------------------------------------------
AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC
AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only
AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload
AdaptationFieldControlFollowed uint8 = 3 // Adaptation_field followed by payload
)
// PMT
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
)
// PES
const (
// -----------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174>
// -----------------------------------------------------------------
streamIDAudio uint8 = 192 // 110x xxxx 0xC0
streamIDVideo uint8 = 224 // 1110 xxxx
// ------------------------------
// <iso13818-1.pdf> <page 53/174>
// ------------------------------
PTSDTSFlags0 uint8 = 0 // no PTS no DTS
PTSDTSFlags1 uint8 = 1 // forbidden
PTSDTSFlags2 uint8 = 2 // only PTS
PTSDTSFlags3 uint8 = 3 // both PTS and DTS
)
const (
PidVideo uint16 = 0x100
PidAudio uint16 = 0x101
delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用?
// TODO chef 这些在配置项中提供
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒 * 90
maxAudioDelay uint64 = 300 // 单位毫秒
appName = "hls"
negMaxfraglen uint64 = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒*90
maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90
maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90
)
func SplitFragment2TSPackets(content []byte) (ret [][]byte) {

@ -12,23 +12,8 @@ import (
"testing"
"github.com/q191201771/lal/pkg/innertest"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/naza/pkg/nazalog"
)
func TestParseFixedTSPacket(t *testing.T) {
h := hls.ParseTSPacketHeader(hls.FixedFragmentHeader)
nazalog.Debugf("%+v", h)
pat := hls.ParsePAT(hls.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)
h = hls.ParseTSPacketHeader(hls.FixedFragmentHeader[188:])
nazalog.Debugf("%+v", h)
pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)
}
func TestHls(t *testing.T) {
func TestHLS(t *testing.T) {
innertest.InnerTestEntry(t)
}

@ -13,6 +13,8 @@ import (
"fmt"
"os"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/avc"
@ -48,13 +50,13 @@ type Muxer struct {
config *MuxerConfig
fragmentOP FragmentOP
opened bool
adts aac.ADTS
spspps []byte // AnnexB
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragment Fragment
opened bool
adts aac.ADTS
spspps []byte // AnnexB
videoCC uint8
audioCC uint8
videoOut []byte // 帧
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
@ -62,21 +64,19 @@ type Muxer struct {
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aaframe []byte
aframePTS uint64 // 最新音频帧的时间戳
audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧
audioCacheFirstFramePTS uint64 // audioCacheFrames中第一个音频帧的时间戳
}
func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
uk := unique.GenUniqueKey("HLSMUXER")
nazalog.Infof("[%s] lifecycle new hls muxer. streamName=%s", uk, streamName)
op := getMuxerOutPath(config.OutPath, streamName)
playlistFilename := getM3U8Filename(op, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1
return &Muxer{
m := &Muxer{
UniqueKey: uk,
streamName: streamName,
outPath: op,
@ -84,9 +84,11 @@ func NewMuxer(streamName string, config *MuxerConfig) *Muxer {
playlistFilenameBak: playlistFilenameBak,
config: config,
videoOut: videoOut,
aaframe: nil,
audioCacheFrames: nil,
frags: frags,
}
nazalog.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName)
return m
}
func (m *Muxer) Start() {
@ -100,7 +102,8 @@ func (m *Muxer) Dispose() {
m.closeFragment(true)
}
// 函数调用结束后内部不持有msg中的内存块
// @param msg 函数调用结束后内部不持有msg中的内存块
//
func (m *Muxer) FeedRTMPMessage(msg base.RTMPMsg) {
switch msg.Header.MsgTypeID {
case base.RTMPTypeIDAudio:
@ -124,6 +127,9 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
ftype := msg.Payload[0] & 0xF0 >> 4
htype := msg.Payload[1]
// 将数据转换成AnnexB
// 如果是sps pps缓存住然后直接返回
if ftype == 1 && htype == 0 {
if err := m.cacheSPSPPS(msg); err != nil {
nazalog.Errorf("[%s] cache spspps failed. err=%+v", m.UniqueKey, err)
@ -137,6 +143,8 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
spsppsSent := false
// 优化这块buffer
out := m.videoOut[0:0]
// tag中可能有多个NALU逐个获取
for i := 5; i != len(msg.Payload); {
if i+4 > len(msg.Payload) {
nazalog.Errorf("[%s] slice len not enough. i=%d, len=%d", m.UniqueKey, i, len(msg.Payload))
@ -151,8 +159,10 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
nalType := avc.ParseNALUType(msg.Payload[i])
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
nazalog.Debugf("[%s] hls: h264 NAL type=%d, len=%d(%d) cts=%d.", m.UniqueKey, nalType, nalBytes, len(msg.Payload), cts)
// sps pps前面已经缓存过了这里就不用处理了
// aud有自己的生产逻辑原流中的aud直接过滤掉
if nalType == avc.NALUTypeSPS || nalType == avc.NALUTypePPS || nalType == avc.NALUTypeAUD {
i += nalBytes
continue
@ -161,10 +171,12 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
if !audSent {
switch nalType {
case avc.NALUTypeSlice, avc.NALUTypeIDRSlice, avc.NALUTypeSEI:
// 在前面写入aud
out = append(out, audNal...)
audSent = true
case avc.NALUTypeAUD:
audSent = true
//case avc.NALUTypeAUD:
// // 上面aud已经continue跳过了应该进不到这个分支可以考虑删除这个分支代码
// audSent = true
}
}
@ -172,6 +184,7 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
case avc.NALUTypeSlice:
spsppsSent = false
case avc.NALUTypeIDRSlice:
// 如果是关键帧在前面写入sps pps
if !spsppsSent {
out = m.appendSPSPPS(out)
}
@ -179,35 +192,41 @@ func (m *Muxer) feedVideo(msg base.RTMPMsg) {
}
// 这里不知为什么要区分写入两种类型的start code
if len(out) == 0 {
out = append(out, avc.NALUStartCode4...)
} else {
out = append(out, avc.NALUStartCode3...)
}
out = append(out, msg.Payload[i:i+nalBytes]...)
i += nalBytes
}
var frame mpegTSFrame
frame.cc = m.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
key := ftype == 1
dts := uint64(msg.Header.TimestampAbs) * 90
boundary := frame.key && (!m.opened || !m.adts.HasInited() || m.aaframe != nil)
boundary := key && (!m.opened || !m.adts.HasInited() || m.audioCacheFrames != nil)
m.updateFragment(frame.dts, boundary, 1)
m.updateFragment(dts, boundary, false)
if !m.opened {
nazalog.Warnf("[%s] not opened.", m.UniqueKey)
return
}
m.fragmentOP.WriteFrame(&frame, out)
m.videoCC = frame.cc
var frame mpegts.Frame
frame.CC = m.videoCC
frame.DTS = dts
frame.PTS = frame.DTS + uint64(cts)*90
frame.Key = key
frame.Raw = out
frame.Pid = mpegts.PidVideo
frame.Sid = mpegts.StreamIDVideo
nazalog.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
m.fragment.WriteFrame(&frame)
m.videoCC = frame.CC
}
func (m *Muxer) feedAudio(msg base.RTMPMsg) {
@ -218,6 +237,8 @@ func (m *Muxer) feedAudio(msg base.RTMPMsg) {
return
}
nazalog.Debugf("[%s] hls: feedAudio. dts=%d len=%d", m.UniqueKey, msg.Header.TimestampAbs, len(msg.Payload))
if msg.Payload[1] == 0 {
m.cacheAACSeqHeader(msg)
return
@ -230,15 +251,15 @@ func (m *Muxer) feedAudio(msg base.RTMPMsg) {
pts := uint64(msg.Header.TimestampAbs) * 90
m.updateFragment(pts, m.spspps == nil, 2)
m.updateFragment(pts, m.spspps == nil, true)
if m.aaframe == nil {
m.aframePTS = pts
if m.audioCacheFrames == nil {
m.audioCacheFirstFramePTS = pts
}
adtsHeader, _ := m.adts.CalcADTSHeader(uint16(msg.Header.MsgLen - 2))
m.aaframe = append(m.aaframe, adtsHeader...)
m.aaframe = append(m.aaframe, msg.Payload[2:]...)
m.audioCacheFrames = append(m.audioCacheFrames, adtsHeader...)
m.audioCacheFrames = append(m.audioCacheFrames, msg.Payload[2:]...)
}
func (m *Muxer) cacheAACSeqHeader(msg base.RTMPMsg) {
@ -261,14 +282,20 @@ func (m *Muxer) appendSPSPPS(out []byte) []byte {
return out
}
func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
// 决定是否开启新的TS切片文件注意可能已经有TS切片也可能没有这是第一个切片以及落盘音频数据
//
// @param boundary 调用方认为可能是开启新TS切片的时间点
// @param isByAudio 触发该函数调用,是因为收到音频数据,还是视频数据
//
func (m *Muxer) updateFragment(ts uint64, boundary bool, isByAudio bool) {
force := false
discont := true
var f *fragmentInfo
// 如果已经有TS切片检查是否需要强制开启新的切片以及切片是否发生跳跃
// 注意,音频和视频是在一起检查的
if m.opened {
f = m.getFrag(m.nfrags)
f = m.getCurrFrag()
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10)
@ -280,11 +307,11 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
f.duration = float64(ts-m.fragTS) / 90000
discont = false
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
// 已经有TS切片那么只有当前fragment的时长超过设置的TS切片阈值才开启新的切片
if f.duration < float64(m.config.FragmentDurationMS)/1000 {
boundary = false
}
}
// 开启新的fragment
@ -294,11 +321,18 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) {
}
// 音频已经缓存了一定时长的数据了,需要落盘了
if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
var maxAudioDelay uint64
if isByAudio {
maxAudioDelay = maxAudioCacheDelayByAudio
} else {
maxAudioDelay = maxAudioCacheDelayByVideo
}
if m.opened && m.audioCacheFrames != nil && ((m.audioCacheFirstFramePTS + maxAudioDelay) < ts) {
m.flushAudio()
}
}
// @param discont 不连续标志会在m3u8文件的fragment前增加`#EXT-X-DISCONTINUITY`
func (m *Muxer) openFragment(ts uint64, discont bool) {
if m.opened {
return
@ -307,10 +341,10 @@ func (m *Muxer) openFragment(ts uint64, discont bool) {
id := m.getFragmentID()
filename := getTSFilename(m.outPath, m.streamName, id)
_ = m.fragmentOP.OpenFile(filename)
_ = m.fragment.OpenFile(filename)
m.opened = true
frag := m.getFrag(m.nfrags)
frag := m.getCurrFrag()
frag.discont = discont
frag.id = id
@ -324,11 +358,11 @@ func (m *Muxer) closeFragment(isLast bool) {
return
}
m.fragmentOP.CloseFile()
m.fragment.CloseFile()
m.opened = false
//更新序号,为下个分片准备好
m.nextFrag()
m.incrFrag()
m.writePlaylist(isLast)
}
@ -391,8 +425,11 @@ func (m *Muxer) getFrag(n int) *fragmentInfo {
return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)]
}
// TODO chef: 这个函数重命名为incr更好些
func (m *Muxer) nextFrag() {
func (m *Muxer) getCurrFrag() *fragmentInfo {
return m.getFrag(m.nfrags)
}
func (m *Muxer) incrFrag() {
if m.nfrags == m.config.FragmentNum {
m.frag++
} else {
@ -411,21 +448,21 @@ func (m *Muxer) flushAudio() {
return
}
if m.aaframe == nil {
if m.audioCacheFrames == nil {
return
}
frame := &mpegTSFrame{
pts: m.aframePTS,
dts: m.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: m.audioCC,
key: false,
}
m.fragmentOP.WriteFrame(frame, m.aaframe)
m.audioCC = frame.cc
m.aaframe = nil
var frame mpegts.Frame
frame.CC = m.audioCC
frame.DTS = m.audioCacheFirstFramePTS
frame.PTS = m.audioCacheFirstFramePTS
frame.Key = false
frame.Raw = m.audioCacheFrames
frame.Pid = mpegts.PidAudio
frame.Sid = mpegts.StreamIDAudio
nazalog.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
m.fragment.WriteFrame(&frame)
m.audioCC = frame.CC
m.audioCacheFrames = nil
}

@ -12,6 +12,8 @@ import (
"net"
"net/http"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -74,8 +76,10 @@ func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
switch ri.fileType {
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
resp.Header().Add("Server", base.LALHLSM3U8Server)
case "ts":
resp.Header().Add("Content-Type", "video/mp2t")
resp.Header().Add("Server", base.LALHLSTSServer)
}
resp.Header().Add("Cache-Control", "no-cache")

@ -15,6 +15,8 @@ import (
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/connection"
@ -53,11 +55,12 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
}
uk := unique.GenUniqueKey("FLVPULL")
nazalog.Infof("[%s] lifecycle new PullSession.", uk)
return &PullSession{
s := &PullSession{
option: option,
UniqueKey: uk,
}
nazalog.Infof("[%s] lifecycle new httpflv PullSession. session=%p", uk, s)
return s
}
type OnReadFLVTag func(tag Tag)
@ -81,7 +84,7 @@ func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error
}
func (session *PullSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose PullSession.", session.UniqueKey)
nazalog.Infof("[%s] lifecycle dispose httpflv PullSession.", session.UniqueKey)
_ = session.Conn.Close()
}
@ -126,8 +129,8 @@ func (session *PullSession) Connect(rawURL string) error {
func (session *PullSession) WriteHTTPRequest() error {
// # 发送 http GET 请求
nazalog.Debugf("[%s] > W http request. GET %s", session.UniqueKey, session.pathWithQuery)
req := fmt.Sprintf("GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n",
session.pathWithQuery, session.host)
req := fmt.Sprintf("GET %s HTTP/1.0\r\nUser-Agent: %s\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n",
session.pathWithQuery, base.LALHTTPFLVPullSessionUA, session.host)
_, err := session.Conn.Write([]byte(req))
return err
}

@ -14,6 +14,8 @@ import (
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/connection"
@ -22,15 +24,7 @@ import (
"github.com/q191201771/naza/pkg/unique"
)
var flvHTTPResponseHeaderStr = "HTTP/1.1 200 OK\r\n" +
"Cache-Control: no-cache\r\n" +
"Content-Type: video/x-flv\r\n" +
"Connection: close\r\n" +
"Expires: -1\r\n" +
"Pragma: no-cache\r\n" +
"\r\n"
var flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr)
var flvHTTPResponseHeader []byte
type SubSession struct {
UniqueKey string
@ -48,8 +42,7 @@ type SubSession struct {
func NewSubSession(conn net.Conn) *SubSession {
uk := unique.GenUniqueKey("FLVSUB")
nazalog.Infof("[%s] lifecycle new SubSession. addr=%s", uk, conn.RemoteAddr().String())
return &SubSession{
s := &SubSession{
UniqueKey: uk,
IsFresh: true,
conn: connection.New(conn, func(option *connection.Option) {
@ -58,6 +51,8 @@ func NewSubSession(conn net.Conn) *SubSession {
option.WriteTimeoutMS = subSessionWriteTimeoutMS
}),
}
nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
}
// TODO chef: read request timeout
@ -135,5 +130,19 @@ func (session *SubSession) WriteRawPacket(pkt []byte) {
}
func (session *SubSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose httpflv SubSession.", session.UniqueKey)
_ = session.conn.Close()
}
func init() {
flvHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" +
"Server: " + base.LALHTTPFLVSubSessionServer + "\r\n" +
"Cache-Control: no-cache\r\n" +
"Content-Type: video/x-flv\r\n" +
"Connection: close\r\n" +
"Expires: -1\r\n" +
"Pragma: no-cache\r\n" +
"\r\n"
flvHTTPResponseHeader = []byte(flvHTTPResponseHeaderStr)
}

@ -0,0 +1,118 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package mpegts
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PAT */
0x00, 0x01, 0xf0, 0x01,
/* CRC */
0x2e, 0x70, 0x19, 0x05,
/* stuffing 167 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
/* TS */
0x47, 0x50, 0x01, 0x10, 0x00,
/* PSI */
0x02, 0xb0, 0x17, 0x00, 0x01, 0xc1, 0x00, 0x00,
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
/* stuffing 157 bytes */
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// TS Packet Header
const (
syncByte uint8 = 0x47
PidPAT uint16 = 0
PidVideo uint16 = 0x100
PidAudio uint16 = 0x101
// ------------------------------------------
// <iso13818-1.pdf> <Table 2-5> <page 38/174>
// ------------------------------------------
AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC
AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only
AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload
AdaptationFieldControlFollowed uint8 = 3 // Adaptation_field followed by payload
)
// PMT
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
)
// PES
const (
// -----------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174>
// -----------------------------------------------------------------
StreamIDAudio uint8 = 192 // 110x xxxx 0xC0
StreamIDVideo uint8 = 224 // 1110 xxxx
// ------------------------------
// <iso13818-1.pdf> <page 53/174>
// ------------------------------
PTSDTSFlags0 uint8 = 0 // no PTS no DTS
PTSDTSFlags1 uint8 = 1 // forbidden
PTSDTSFlags2 uint8 = 2 // only PTS
PTSDTSFlags3 uint8 = 3 // both PTS and DTS
)
const (
delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用?
)

@ -0,0 +1,28 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package mpegts_test
import (
"testing"
"github.com/q191201771/lal/pkg/mpegts"
"github.com/q191201771/naza/pkg/nazalog"
)
func TestParseFixedTSPacket(t *testing.T) {
h := mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader)
nazalog.Debugf("%+v", h)
pat := mpegts.ParsePAT(mpegts.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)
h = mpegts.ParseTSPacketHeader(mpegts.FixedFragmentHeader[188:])
nazalog.Debugf("%+v", h)
pmt := mpegts.ParsePMT(mpegts.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)
}

@ -0,0 +1,241 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package mpegts
// MPEG: Moving Picture Experts Group
type Frame struct {
PTS uint64
DTS uint64
Pid uint16 // PID of PES Header
Sid uint8 // stream_id of PES Header
CC uint8 // continuity_counter of TS Header
Key bool
Raw []byte
}
// @param packet: 188字节大小的TS包注意一次Pack对应的多个TSPacket复用的是一块内存
// @param cc: 当前TS包的continuity_counter
//
type OnTSPacket func(packet []byte, cc uint8)
// @param frame: frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保持CC的值供下次调用时使用
// frame.Pid 音频设置为mpegts.PidAudio视频设置为mpegts.PidVideo
// frame.Sid 音频设置为mpegts.StreamIDAudio视频设置为mpegts.StreamIDVideo
// frame.Key 是否是关键帧一般用于视频格式音频全部设置为false
//
// frame.Raw 如果是AVC类型格式为AnnexB
// 如果是AAC类型格式为2字节ADTS头加raw frame
// 函数内部不会持有该内存块
//
// @param onTSPacket: 注意,一次函数调用,可能对应多次回调
//
func PackTSPacket(frame *Frame, onTSPacket OnTSPacket) {
wpos := 0 // 当前packet的写入位置
lpos := 0 // 当前帧的处理位置
rpos := len(frame.Raw) // 当前帧大小
first := true // 是否为帧的首个packet的标准
packet := make([]byte, 188)
for lpos != rpos {
wpos = 0
frame.CC++
// 每个packet都需要添加TS Header
// -----TS Header----------------
// sync_byte
// transport_error_indicator 0
// payload_unit_start_indicator
// transport_priority 0
// PID
// transport_scrambling_control 0
// adaptation_field_control
// continuity_counter
// ------------------------------
packet[0] = syncByte // sync_byte
packet[1] = 0x0
if first {
packet[1] = 0x40 // payload_unit_start_indicator
}
packet[1] |= uint8((frame.Pid >> 8) & 0x1F) //PID高5位
packet[2] = uint8(frame.Pid & 0xFF) //PID低8位
// adaptation_field_control 先设置成无Adaptation
// continuity_counter
packet[3] = 0x10 | (frame.CC & 0x0f)
wpos += 4
if first {
if frame.Key {
// 关键帧的首个packet需要添加Adaptation
// -----Adaptation-----------------------
// adaptation_field_length
// discontinuity_indicator 0
// random_access_indicator 1
// elementary_stream_priority_indicator 0
// PCR_flag 1
// OPCR_flag 0
// splicing_point_flag 0
// transport_private_data_flag 0
// adaptation_field_extension_flag 0
// program_clock_reference_base
// reserved
// program_clock_reference_extension
// --------------------------------------
packet[3] |= 0x20 // adaptation_field_control 设置Adaptation
packet[4] = 7 // adaptation_field_length
packet[5] = 0x50 // random_access_indicator + PCR_flag
packPCR(packet[6:], frame.DTS-delay) // using 6 byte
wpos += 8
}
// 帧的首个packet需要添加PES Header
// -----PES Header------------
// packet_start_code_prefix
// stream_id
// PES_packet_length
// '10'
// PES_scrambling_control 0
// PES_priority 0
// data_alignment_indicator 0
// copyright 0
// original_or_copy 0
// PTS_DTS_flags
// ESCR_flag 0
// ES_rate_flag 0
// DSM_trick_mode_flag 0
// additional_copy_info_flag 0
// PES_CRC_flag 0
// PES_extension_flag 0
// PES_header_data_length
// ---------------------------
packet[wpos] = 0x00 // packet_start_code_prefix 24-bits
packet[wpos+1] = 0x00 //
packet[wpos+2] = 0x01 //
packet[wpos+3] = frame.Sid // stream_id
wpos += 4
// 计算PES Header中一些字段的值
// PTS相关
headerSize := uint8(5)
flags := uint8(0x80)
// DTS相关
if frame.DTS != frame.PTS {
headerSize += 5
flags |= 0x40
}
pesSize := rpos + int(headerSize) + 3 // PES Header剩余3字节 + PTS/PTS长度 + 整个帧的长度
if pesSize > 0xFFFF {
pesSize = 0
}
packet[wpos] = uint8(pesSize >> 8) // PES_packet_length
packet[wpos+1] = uint8(pesSize & 0xFF) //
packet[wpos+2] = 0x80 // 除了reserve的'10'其他字段都是0
packet[wpos+3] = flags // PTS/DTS flag
packet[wpos+4] = headerSize // PES_header_data_length: PTS+DTS数据长度
wpos += 5
// 写入PTS的值
packPTS(packet[wpos:], flags>>6, frame.PTS+delay)
wpos += 5
// 写入DTS的值
if frame.PTS != frame.DTS {
packPTS(packet[wpos:], 1, frame.DTS+delay)
wpos += 5
}
first = false
}
// 把帧的内容切割放入packet中
bodySize := 188 - wpos // 当前TS packet可写入大小
inSize := rpos - lpos // 整个帧剩余待打包大小
if bodySize <= inSize {
// 当前packet写不完这个帧或者刚好够写完
copy(packet[wpos:], frame.Raw[lpos:lpos+inSize])
lpos += bodySize
} else {
// 当前packet可以写完这个帧并且还有空闲空间
// 此时真实数据挪最后中间用0xFF填充到Adaptation中
// 注意,此时有两种情况
// 1. 原本有Adaptation
// 2. 原本没有Adaptation
stuffSize := bodySize - inSize // 当前TS packet的剩余空闲空间
if packet[3]&0x20 != 0 {
// has Adaptation
base := int(4 + packet[4]) // TS Header + Adaptation
if wpos > base {
// 比如有PES Header
copy(packet[base+stuffSize:], packet[base:wpos])
}
wpos = base + stuffSize
packet[4] += uint8(stuffSize) // adaptation_field_length
for i := 0; i < stuffSize; i++ {
packet[base+i] = 0xFF
}
} else {
// no Adaptation
packet[3] |= 0x20
base := 4
if wpos > base {
copy(packet[base+stuffSize:], packet[base:wpos])
}
wpos += stuffSize
packet[4] = uint8(stuffSize - 1) // adaptation_field_length
if stuffSize >= 2 {
// TODO chef 这里是参考nginx rtmp module的实现为什么这个字节写0而不是0xFF
packet[5] = 0
for i := 0; i < stuffSize-2; i++ {
packet[6+i] = 0xFF
}
}
}
// 真实数据放在packet尾部
copy(packet[wpos:], frame.Raw[lpos:lpos+inSize])
lpos = rpos
}
onTSPacket(packet, frame.CC)
}
}
func packPCR(out []byte, pcr uint64) {
out[0] = uint8(pcr >> 25)
out[1] = uint8(pcr >> 17)
out[2] = uint8(pcr >> 9)
out[3] = uint8(pcr >> 1)
out[4] = uint8(pcr<<7) | 0x7e
out[5] = 0
}
// 注意除PTS外DTS也使用这个函数打包
func packPTS(out []byte, fb uint8, pts uint64) {
var val uint64
out[0] = (fb << 4) | (uint8(pts>>30) & 0x07) | 1
val = (((pts >> 15) & 0x7FFF) << 1) | 1
out[1] = uint8(val >> 8)
out[2] = uint8(val)
val = ((pts & 0x7FFF) << 1) | 1
out[3] = uint8(val >> 8)
out[4] = uint8(val)
}

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package hls
package mpegts
import (
"github.com/q191201771/naza/pkg/nazabits"

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package hls
package mpegts
import (
"github.com/q191201771/naza/pkg/nazabits"

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package hls
package mpegts
import (
"github.com/q191201771/naza/pkg/nazabits"

@ -6,7 +6,7 @@
//
// Author: Chef (191201771@qq.com)
package hls
package mpegts
import (
"github.com/q191201771/naza/pkg/nazabits"

@ -9,7 +9,6 @@
package rtmp
import (
"encoding/hex"
"errors"
"net"
"net/url"
@ -84,14 +83,13 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
case CSTPushSession:
uk = unique.GenUniqueKey("RTMPPUSH")
}
log.Infof("[%s] lifecycle new rtmp client session.", uk)
option := defaultClientSessOption
for _, fn := range modOptions {
fn(&option)
}
return &ClientSession{
s := &ClientSession{
UniqueKey: uk,
t: t,
option: option,
@ -99,6 +97,8 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
packer: NewMessagePacker(),
chunkComposer: NewChunkComposer(),
}
log.Infof("[%s] lifecycle new rtmp ClientSession. session=%p", uk, s)
return s
}
// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
@ -141,7 +141,7 @@ func (s *ClientSession) do(rawURL string) <-chan error {
}
log.Infof("[%s] > W connect('%s').", s.UniqueKey, s.appName)
if err := s.packer.writeConnect(s.conn, s.appName, s.tcURL); err != nil {
if err := s.packer.writeConnect(s.conn, s.appName, s.tcURL, s.t == CSTPushSession); err != nil {
ch <- err
return ch
}
@ -173,7 +173,7 @@ func (s *ClientSession) Flush() error {
}
func (s *ClientSession) Dispose() {
log.Infof("[%s] lifecycle dispose rtmp client session.", s.UniqueKey)
log.Infof("[%s] lifecycle dispose rtmp ClientSession.", s.UniqueKey)
_ = s.conn.Close()
}
@ -222,9 +222,7 @@ func (s *ClientSession) doDataMessageAMF0(stream *Stream) error {
switch val {
case "|RtmpSampleAccess":
// TODO chef: handle this?
log.Error(val)
log.Error(hex.Dump(stream.msg.buf[stream.msg.b:stream.msg.e]))
log.Debugf("[%s] < R |RtmpSampleAccess, ignore.", s.UniqueKey)
return nil
default:
}

@ -12,11 +12,14 @@ import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"fmt"
"io"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/bele"
log "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/nazalog"
)
// https://pengrl.com/p/20027
@ -222,7 +225,7 @@ func parseChallenge(c0c1 []byte) []byte {
//}
ver := bele.BEUint32(c0c1[5:])
if ver == 0 {
log.Debug("handshake simple mode.")
nazalog.Debug("handshake simple mode.")
return nil
}
@ -231,10 +234,10 @@ func parseChallenge(c0c1 []byte) []byte {
offs = findDigest(c0c1[1:], 8, clientKey[:clientPartKeyLen])
}
if offs == -1 {
log.Warn("get digest offs failed. roll back to try simple handshake.")
nazalog.Warn("get digest offs failed. roll back to try simple handshake.")
return nil
}
log.Debug("handshake complex mode.")
nazalog.Debug("handshake complex mode.")
// use c0c1 digest to make a new digest
digest := makeDigest(c0c1[1+offs:1+offs+keyLen], serverKey[:serverFullKeyLen])
@ -282,10 +285,8 @@ func random1528(out []byte) {
}
func init() {
bs := []byte{'l', 'a', 'l'}
bsl := len(bs)
random1528Buf = make([]byte, 1528)
for i := 0; i < 1528; i++ {
random1528Buf[i] = bs[i%bsl]
}
hack := fmt.Sprintf("random buf of rtmp handshake gen by %s", base.LALRTMPHandshakeWaterMark)
copy(random1528Buf, []byte(hack))
nazalog.Debug(len(random1528Buf))
}

@ -14,6 +14,7 @@ package rtmp
import (
"bytes"
"fmt"
"io"
"github.com/q191201771/lal/pkg/base"
@ -76,17 +77,23 @@ func (packer *MessagePacker) writePeerBandwidth(writer io.Writer, val int, limit
return err
}
func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL string) error {
// @param isPush: 推流为true拉流为false
func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL string, isPush bool) error {
packer.writeMessageHeader(csidOverConnection, 0, base.RTMPTypeIDCommandMessageAMF0, 0)
_ = AMF0.WriteString(packer.b, "connect")
_ = AMF0.WriteNumber(packer.b, float64(tidClientConnect))
objs := []ObjectPair{
{Key: "app", Value: appName},
{Key: "type", Value: "nonprivate"},
{Key: "flashVer", Value: "FMLE/3.0 (compatible; Lal0.0.1)"},
{Key: "tcUrl", Value: tcURL},
var objs []ObjectPair
objs = append(objs, ObjectPair{Key: "app", Value: appName})
objs = append(objs, ObjectPair{Key: "type", Value: "nonprivate"})
var flashVer string
if isPush {
flashVer = fmt.Sprintf("FMLE/3.0 (compatible; %s)", base.LALRTMPPushSessionConnectVersion)
} else {
flashVer = "LNX 9,0,124,2"
}
objs = append(objs, ObjectPair{Key: "flashVer", Value: flashVer})
objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcURL})
_ = AMF0.WriteObject(packer.b, objs)
raw := packer.b.Bytes()
bele.BEPutUint24(raw[4:], uint32(len(raw)-12))
@ -95,7 +102,7 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcURL strin
}
func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int) error {
packer.writeMessageHeader(csidOverConnection, 190, base.RTMPTypeIDCommandMessageAMF0, 0)
packer.writeMessageHeader(csidOverConnection, 0, base.RTMPTypeIDCommandMessageAMF0, 0)
_ = AMF0.WriteString(packer.b, "_result")
_ = AMF0.WriteNumber(packer.b, float64(tid))
objs := []ObjectPair{
@ -108,8 +115,11 @@ func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int) error
{Key: "code", Value: "NetConnection.Connect.Success"},
{Key: "description", Value: "Connection succeeded."},
{Key: "objectEncoding", Value: 0},
{Key: "version", Value: base.LALRTMPConnectResultVersion},
}
_ = AMF0.WriteObject(packer.b, objs)
raw := packer.b.Bytes()
bele.BEPutUint24(raw[4:], uint32(len(raw)-12))
_, err := packer.b.WriteTo(writer)
return err
}

@ -51,25 +51,16 @@ func TestWrite(t *testing.T) {
assert.Equal(t, []byte{2, 0, 0, 0, 0, 0, 5, 6, 0, 0, 0, 0, 0, 0, 0, 1, 2}, buf.Bytes())
buf.Reset()
err = packer.writeConnect(buf, "live", "rtmp://127.0.0.1/live")
err = packer.writeConnect(buf, "live", "rtmp://127.0.0.1/live", true)
assert.Equal(t, nil, err)
result = []byte{
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x81, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x63,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03,
0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 0x74, 0x79,
0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00,
0x08, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45,
0x2f, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65,
0x3b, 0x20, 0x4c, 0x61, 0x6c, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x29, 0x00, 0x05, 0x74, 0x63, 0x55,
0x72, 0x6c, 0x02, 0x00, 0x15, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x31, 0x32, 0x37, 0x2e,
0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x00, 0x09,
}
result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x82, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x04, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x20, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33, 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x3b, 0x20, 0x6c, 0x61, 0x6c, 0x30, 0x2e, 0x31, 0x33, 0x2e, 0x30, 0x29, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x15, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x00, 0x00, 0x09}
assert.Equal(t, result, buf.Bytes())
buf.Reset()
err = packer.writeConnectResult(buf, 1)
assert.Equal(t, nil, err)
result = []byte{0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0xbe, 0x14, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x7, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x0, 0x3f, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x6, 0x66, 0x6d, 0x73, 0x56, 0x65, 0x72, 0x2, 0x0, 0xd, 0x46, 0x4d, 0x53, 0x2f, 0x33, 0x2c, 0x30, 0x2c, 0x31, 0x2c, 0x31, 0x32, 0x33, 0x0, 0xc, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x0, 0x40, 0x3f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x9, 0x3, 0x0, 0x5, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x2, 0x0, 0x6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0, 0x4, 0x63, 0x6f, 0x64, 0x65, 0x2, 0x0, 0x1d, 0x4e, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x0, 0xb, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x2, 0x0, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x2e, 0x0, 0xe, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x9}
result = []byte{0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0xd0, 0x14, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x07, 0x5f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x06, 0x66, 0x6d, 0x73, 0x56, 0x65, 0x72, 0x02, 0x00, 0x0d, 0x46, 0x4d, 0x53, 0x2f, 0x33, 0x2c, 0x30, 0x2c, 0x31, 0x2c, 0x31, 0x32, 0x33, 0x00, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x00, 0x40, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x03, 0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x00, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x02, 0x00, 0x1d, 0x4e, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x2e, 0x00, 0x0e, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x06, 0x30, 0x2c, 0x31, 0x33, 0x2c, 0x30, 0x00, 0x00, 0x09}
assert.Equal(t, result, buf.Bytes())
buf.Reset()
@ -139,7 +130,7 @@ func TestPackCorner(t *testing.T) {
assert.IsNotNil(t, err)
err = packer.writePeerBandwidth(mw, 1, 2)
assert.IsNotNil(t, err)
err = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live")
err = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live", true)
assert.IsNotNil(t, err)
err = packer.writeConnectResult(mw, 1)
assert.IsNotNil(t, err)
@ -161,6 +152,6 @@ func BenchmarkMessagePacker(b *testing.B) {
mw := fake.NewWriter(fake.WriterTypeDoNothing)
packer := NewMessagePacker()
for i := 0; i < b.N; i++ {
_ = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live")
_ = packer.writeConnect(mw, "live", "rtmp://127.0.0.1/live", true)
}
}

@ -72,7 +72,7 @@ func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([
opa = append(opa, ObjectPair{
Key: "version",
Value: base.LALFullInfo,
Value: base.LALRTMPBuildMetadataEncoder,
})
if err := AMF0.WriteObject(buf, opa); err != nil {

@ -36,5 +36,5 @@ func TestMetadata(t *testing.T) {
v = opa.Find("videocodecid")
assert.Equal(t, float64(7), v.(float64))
v = opa.Find("version")
assert.Equal(t, base.LALFullInfo, v.(string))
assert.Equal(t, base.LALRTMPBuildMetadataEncoder, v.(string))
}

@ -69,8 +69,7 @@ type ServerSession struct {
func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
uk := unique.GenUniqueKey("RTMPPUBSUB")
nazalog.Infof("[%s] lifecycle new rtmp server session. addr=%s", uk, conn.RemoteAddr().String())
return &ServerSession{
s := &ServerSession{
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
}),
@ -81,6 +80,8 @@ func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
packer: NewMessagePacker(),
IsFresh: true,
}
nazalog.Infof("[%s] lifecycle new rtmp ServerSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
}
func (s *ServerSession) RunLoop() (err error) {
@ -101,7 +102,7 @@ func (s *ServerSession) Flush() error {
}
func (s *ServerSession) Dispose() {
nazalog.Infof("[%s] lifecycle dispose rtmp server session.", s.UniqueKey)
nazalog.Infof("[%s] lifecycle dispose rtmp ServerSession.", s.UniqueKey)
_ = s.conn.Close()
}

@ -22,9 +22,8 @@ import (
// rfc2326 10.1 OPTIONS
// CSeq
var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" +
"Server: " + base.LALFullInfo + "\r\n" +
"Server: " + base.LALRTSPOptionsResponseServer + "\r\n" +
"CSeq: %s\r\n" +
//"Public:DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n\r\n"
"Public:DESCRIBE, ANNOUNCE, SETUP, PLAY, PAUSE, RECORD, TEARDOWN\r\n" +
"\r\n"

@ -57,6 +57,7 @@ func NewPubSession(streamName string) *PubSession {
StreamName: streamName,
}
ps.avPacketQueue = NewAVPacketQueue(ps.onAVPacket)
nazalog.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, ps, streamName)
return ps
}

@ -11,13 +11,13 @@ for d in $(go list ./pkg/...); do
# 只看依赖lal自身的哪些package
# package依赖自身这个package的过滤掉
# 依赖pkg/base这个基础package的过滤掉
#go list -deps $d | grep 'q191201771/lal' | grep -v $d | grep -v 'q191201771/lal/pkg/base'
go list -deps $d | grep 'q191201771/lal' | grep -v $d | grep -v 'q191201771/lal/pkg/base'
#go list -deps $d | grep 'q191201771/lal' | grep -v $d
go list -deps $d | grep 'q191201771/naza' | grep -v $d
#go list -deps $d | grep 'q191201771/naza' | grep -v $d
done
for d in $(go list ./app/...); do
echo "-----"$d"-----"
#for d in $(go list ./app/...); do
#echo "-----"$d"-----"
#go list -deps $d | grep 'q191201771/lal' | grep -v $d
go list -deps $d | grep 'q191201771/naza' | grep -v $d
done
#go list -deps $d | grep 'q191201771/naza' | grep -v $d
#done

Loading…
Cancel
Save