[feat] HLS: rtmp音视频数据转换成m3u8+ts格式并落盘部分基本完成了

pull/2/head
q191201771 5 years ago
parent 99ab8df79a
commit d7e77299b2

2
.gitignore vendored

@ -1,5 +1,7 @@
profile.out
coverage.html
*.aac
*.h264
/pre-commit.sh
/coverage.txt

@ -19,13 +19,13 @@ import (
// 比较两个TS文件注意该程序还没有写完
var filename1 = "/Volumes/Data/lal-0.ts"
var filename2 = "/Volumes/Data/nrm-0.ts"
var filename1 = "/Volumes/Data/tmp/lal-4.ts"
var filename2 = "/Volumes/Data/tmp/nrm-4.ts"
func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) {
func skipPacketFilter(tss [][]byte) (ret [][]byte) {
for _, ts := range tss {
h := hls.ParseTSPacketHeader(ts)
if h.Pid == uint16(257) {
if h.Pid == hls.PidAudio {
continue
}
ret = append(ret, ts)
@ -36,6 +36,24 @@ func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) {
func parsePacket(packet []byte) {
h := hls.ParseTSPacketHeader(packet)
nazalog.Debugf("%+v", h)
index := 4
var adaptation hls.TSPacketAdaptation
switch h.Adaptation {
case hls.AdaptationFieldControlNo:
// noop
case hls.AdaptationFieldControlFollowed:
adaptation = hls.ParseTSPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
}
index += int(adaptation.Length)
if h.PayloadUnitStart == 1 && h.Pid == 256 {
pes, length := hls.ParsePES(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}
func main() {
@ -50,10 +68,10 @@ func main() {
nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
tss1 = skipAudioPacketFilter(tss1)
tss2 = skipAudioPacketFilter(tss2)
//tss1 = skipPacketFilter(tss1)
//tss2 = skipPacketFilter(tss2)
nazalog.Debugf("after skip audio. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
nazalog.Debugf("after skip. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
m := len(tss1)
if m > len(tss2) {

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.12.1
require github.com/q191201771/naza v0.12.2

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.12.1 h1:uBJ9mrucssU4i10DrPaA07pnomOk2X7PnBZvP7Z1yXw=
github.com/q191201771/naza v0.12.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.12.2 h1:El5OSCPFrGGrZiyZ0aOvdystC15Ap7lC4MipVKdfVMY=
github.com/q191201771/naza v0.12.2/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -56,12 +56,14 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
// <1.6.3.4 channelConfiguration>
// --------------------------------------------------------
// audio object type [5b] 2=AAC LC
// samplingFrequencyIndex [4b] 3=48000
// samplingFrequencyIndex [4b] 3=48000 4=44100
// channelConfiguration [4b] 2=left, right front speakers
obj.audioObjectType = br.ReadBits8(5)
obj.samplingFrequencyIndex = br.ReadBits8(4)
obj.channelConfiguration = br.ReadBits8(4)
log.Debugf("%+v", obj)
obj.adtsHeader = make([]byte, 7)
}
// 获取 ADTS 头注意由于ADTS头依赖包的长度而每个包的长度不同所以生成的每个包的 ADTS 头也不同
@ -90,9 +92,6 @@ func (obj *ADTS) GetADTS(length uint16) []byte {
// adts_buffer_fullness [11b]
// no_raw_data_blocks_in_frame [2b]
if obj.adtsHeader == nil {
obj.adtsHeader = make([]byte, 7)
}
// 减去头两字节再加上自身adts头的7个字节
length += 5
@ -111,6 +110,10 @@ func (obj *ADTS) GetADTS(length uint16) []byte {
return obj.adtsHeader
}
func (obj *ADTS) IsNil() bool {
return obj.adtsHeader == nil
}
// 将 rtmp AAC 传入,输出带 ADTS 头的 AAC ES流
// @param <payload> rtmp message payload部分
func CaptureAAC(w io.Writer, payload []byte) {

@ -13,6 +13,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// ------------------------------------------------
// <iso13818-1.pdf> <2.4.3.2> <page 36/174>
// sync_byte [8b] * always 0x47
// transport_error_indicator [1b]
@ -22,6 +23,7 @@ import (
// transport_scrambling_control [2b]
// adaptation_field_control [2b]
// continuity_counter [4b] *
// ------------------------------------------------
type TSPacketHeader struct {
Sync uint8
Err uint8
@ -33,6 +35,7 @@ type TSPacketHeader struct {
CC uint8
}
// ----------------------------------------------------------
// <iso13818-1.pdf> <Table 2-6> <page 40/174>
// adaptation_field_length [8b] * 不包括自己这1字节
// discontinuity_indicator [1b]
@ -47,6 +50,7 @@ type TSPacketHeader struct {
// program_clock_reference_base [33b]
// reserved [6b]
// program_clock_reference_extension [9b] ******
// ----------------------------------------------------------
type TSPacketAdaptation struct {
Length uint8
}

@ -13,8 +13,14 @@ package hls
// - 不提供各种配置项
// - 只支持H264和AAC
// - 先参照nginx rtmp module把功能实现再做重构
//
// - 检查所有的容错处理,是否会出现
// - 配置项
// - web服务
// - 清理文件
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction
// #EXTM3U // 固定串
// #EXT-X-VERSION:3 // 固定串
// #EXT-X-MEDIA-SEQUENCE // m3u8文件中第一个TS文件的序号
@ -25,7 +31,8 @@ package hls
// 重构时需要统一项目中数据的命名比如进来的数据称为Frame帧188字节的封装称为TSPacket包TS文件称为Fragment
var FixedTSHeader = []byte{
// 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
@ -61,8 +68,8 @@ var FixedTSHeader = []byte{
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* h264 epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
/* stuffing 157 bytes */
@ -104,7 +111,9 @@ const (
PidPAT uint16 = 0
// ------------------------------------------
// <iso13818-1.pdf> <Table 2-5> <page 38/174>
// ------------------------------------------
AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC
AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only
AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload
@ -113,20 +122,26 @@ const (
// PMT
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
)
// PES
const (
// -----------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174>
streamIDAudio uint8 = 192 // 110x xxxx
// -----------------------------------------------------------------
streamIDAudio uint8 = 192 // 110x xxxx 0xC0
streamIDVideo uint8 = 224 // 1110 xxxx
// ------------------------------
// <iso13818-1.pdf> <page 53/174>
// ------------------------------
PTSDTSFlags0 uint8 = 0 // no PTS no DTS
PTSDTSFlags1 uint8 = 1 // forbidden
PTSDTSFlags2 uint8 = 2 // only PTS
@ -134,17 +149,18 @@ const (
)
const (
pidVideo uint16 = 0x100
delay uint64 = 63000 // 700 ms PCR delay
PidVideo uint16 = 0x100
PidAudio uint16 = 0x101
delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用?
// TODO chef 这些在配置项中提供
outPath = "/tmp/lal/hls/" // 切片文件输出目录
fraglen = 5000 // 单个TS时长单位毫秒
maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长强制切割新的fragment单位毫秒 * 90
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒 * 90
playlen = 30000 // m3u8列表时长
winfrags = playlen / fraglen // 多少个TS文件
maxAudioDelay = 300
audioBufSize = 1024 * 1024
Sync = 2
outPath = "/tmp/lal/hls/" // 切片文件输出目录
fraglen = 5000 // 单个TS时长单位毫秒
playlen = 30000 // m3u8列表时长
maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长强制切割新的fragment单位毫秒 * 90
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了比当前fragment的首个时间戳还小强制切割新的fragment单位毫秒 * 90
winfrags = playlen / fraglen // 多少个TS文件
maxAudioDelay uint64 = 300 // 单位毫秒
audioBufSize = 1024 * 1024
Sync = 2
)

@ -16,13 +16,13 @@ import (
)
func TestParseFixedTSPacket(t *testing.T) {
h := hls.ParseTSPacketHeader(hls.FixedTSHeader)
h := hls.ParseTSPacketHeader(hls.FixedFragmentHeader)
nazalog.Debugf("%+v", h)
pat := hls.ParsePAT(hls.FixedTSHeader[5:])
pat := hls.ParsePAT(hls.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)
h = hls.ParseTSPacketHeader(hls.FixedTSHeader[188:])
h = hls.ParseTSPacketHeader(hls.FixedFragmentHeader[188:])
nazalog.Debugf("%+v", h)
pmt := hls.ParsePMT(hls.FixedTSHeader[188+5:])
pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)
}

@ -28,12 +28,12 @@ type MPEGTSFrame struct {
func mpegtsOpenFile(filename string) *os.File {
fp, err := os.Create(filename)
nazalog.Assert(nil, err)
mpegtsWriteFile(fp, FixedTSHeader)
mpegtsWriteFile(fp, FixedFragmentHeader)
return fp
}
func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
nazalog.Debugf("mpegts: write frame. %+v, size=%d", frame, len(b))
//nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b))
wpos := 0 // 当前packet的写入位置
lpos := 0 // 当前帧的处理位置
@ -111,7 +111,6 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
// PES_CRC_flag 0
// PES_extension_flag 0
// PES_header_data_length
nazalog.Debug("write PES.")
packet[wpos] = 0x00 // packet_start_code_prefix
packet[wpos+1] = 0x00 //
packet[wpos+2] = 0x01 //
@ -148,6 +147,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
if frame.pts != frame.dts {
mpegtsWritePTS(packet[wpos:], 1, frame.dts+delay)
wpos += 5
//nazalog.Debugf("%d %d", (frame.pts)/90, (frame.dts)/90)
}
first = false
@ -239,16 +239,8 @@ func mpegtsWritePTS(out []byte, fb uint8, pts uint64) {
out[4] = uint8(val)
}
//var debugCount int
func mpegtsWriteFile(fp *os.File, b []byte) {
//nazalog.Debugf("(%d) mpegts: write %d bytes.", debugCount, len(b))
//debugCount++
//if debugCount == 60 {
// nazalog.Debugf("%s", hex.Dump(b))
//}
_, _ = fp.Write(b)
_ = fp.Sync()
}
func mpegtsCloseFile(fp *os.File) {

@ -12,6 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// ---------------------------------------------------------------------------------------------------
// Program association section
// <iso13818-1.pdf> <2.4.4.3> <page 61/174>
// table_id [8b] *
@ -31,6 +32,7 @@ import (
// program_map_PID [13b] ** if program_number == 0 then network_PID else then program_map_PID
// --------------
// CRC_32 [32b] ****
// ---------------------------------------------------------------------------------------------------
type PAT struct {
tid uint8
ssi uint8
@ -62,9 +64,9 @@ func ParsePAT(b []byte) (pat PAT) {
pat.sn = br.ReadBits8(8)
pat.lsn = br.ReadBits8(8)
len := pat.sl - 9
length := pat.sl - 9
for i := uint16(0); i < len; i += 4 {
for i := uint16(0); i < length; i += 4 {
var ppe PATProgramElement
ppe.pn = br.ReadBits16(16)
br.ReadBits8(3)

@ -12,6 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// -----------------------------------------------------------
// <iso13818-1.pdf>
// <2.4.3.6 PES packet> <page 49/174>
// <Table E.1 - PES packet header example> <page 142/174>
@ -33,14 +34,17 @@ import (
// PES_CRC_flag [1b]
// PES_extension_flag [1b] *
// PES_header_data_length [8b] *
// -----------------------------------------------------------
type PES struct {
pscp uint32
sid uint8
ppl uint16
pad1 uint8
pdf uint8
pad2 uint8
phdl uint8
pscp uint32
sid uint8
ppl uint16
pad1 uint8
ptsDtsFlag uint8
pad2 uint8
phdl uint8
pts uint64
dts uint64
}
func ParsePES(b []byte) (pes PES, length int) {
@ -50,12 +54,33 @@ func ParsePES(b []byte) (pes PES, length int) {
pes.ppl = br.ReadBits16(16)
pes.pad1 = br.ReadBits8(8)
pes.pdf = br.ReadBits8(2)
pes.ptsDtsFlag = br.ReadBits8(2)
pes.pad2 = br.ReadBits8(6)
pes.phdl = br.ReadBits8(8)
br.ReadBytes(uint(pes.phdl))
length = 9 + int(pes.phdl)
// 处理得不是特别标准
if pes.ptsDtsFlag&0x2 != 0 {
_, pes.pts = readPTS(b[9:])
}
if pes.ptsDtsFlag&0x1 != 0 {
_, pes.dts = readPTS(b[14:])
} else {
pes.dts = pes.pts
}
//pes.pts = (pes.pts - delay) / 90
//pes.dts = (pes.dts - delay) / 90
return
}
// read pts or dts
func readPTS(b []byte) (fb uint8, pts uint64) {
fb = b[0] >> 4
pts |= uint64((b[0]>>1)&0x07) << 30
pts |= (uint64(b[1])<<8 | uint64(b[2])) >> 1 << 15
pts |= (uint64(b[3])<<8 | uint64(b[4])) >> 1
return
}

@ -13,6 +13,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)
// ----------------------------------------
// Program Map Table
// <iso13818-1.pdf> <2.4.4.8> <page 64/174>
// table_id [8b] *
@ -38,6 +39,7 @@ import (
// ES_info_length_length [12b] **
// --------------
// CRC32 [32b] ****
// ----------------------------------------
type PMT struct {
tid uint8
ssi uint8

@ -9,6 +9,7 @@
package hls
import (
"bytes"
"encoding/hex"
"fmt"
"os"
@ -25,37 +26,48 @@ type Frag struct {
keyID uint64
duration float64 // 当前fragment中数据的时长单位秒
active bool
discont bool
discont bool // #EXT-X-DISCONTINUITY
}
type Session struct {
streamName string
playlistFilename string
playlistFilenameBak string
adts aac.ADTS
//aacSeqHeader []byte
spspps []byte
videoCC uint8
audioCC uint8
opened bool
videoOut []byte // 帧
fp *os.File
fragTS uint64 // 新建立fragment时的时间戳毫秒 * 90
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
nfrags int // 大序号增长到winfrags后就增长frag
frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []Frag // TS文件的环形队列记录TS的信息比如写M3U8文件时要用 2 * winfrags + 1
aframe []byte
aframeBase uint64 // 上一个音频帧的时间戳
aaframe []byte
//aframeBase uint64 // 上一个音频帧的时间戳
//aframeNum uint64
aframePTS uint64
aframePTS uint64 // 最新音频帧的时间戳
}
func NewSession() *Session {
func NewSession(streamName string) *Session {
playlistFilename := fmt.Sprintf("%s%s.m3u8", outPath, streamName)
playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename)
videoOut := make([]byte, 1024*1024)
aframe := make([]byte, 1024*1024)
videoOut = videoOut[0:0]
frags := make([]Frag, 2*winfrags+1) // TODO chef: 为什么是 * 2 + 1
return &Session{
videoOut: videoOut,
aframe: aframe,
frags: frags,
videoOut: videoOut,
aaframe: nil,
frags: frags,
streamName: streamName,
playlistFilename: playlistFilename,
playlistFilenameBak: playlistFilenameBak,
}
}
@ -63,7 +75,14 @@ func (s *Session) Start() {
}
func (s *Session) Stop() {
s.flushAudio()
s.closeFragment()
}
func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) {
// TODO chef: to be continued
// HLS还没有开发完
return
switch msg.Header.MsgTypeID {
case rtmp.TypeidAudio:
@ -73,18 +92,7 @@ func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) {
}
}
func (s *Session) Stop() {
}
//var debugCount int
func (s *Session) feedVideo(msg rtmp.AVMsg) {
//if debugCount == 3 {
// //os.Exit(0)
//}
//debugCount++
if msg.Payload[0]&0xF != 7 {
// TODO chef: HLS视频现在只做了h264的支持
return
@ -110,7 +118,7 @@ func (s *Session) feedVideo(msg rtmp.AVMsg) {
srcNalType := msg.Payload[i]
nalType := srcNalType & 0x1F
nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
//nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts)
if nalType >= 7 && nalType <= 9 {
nazalog.Warn("should not reach here.")
@ -153,14 +161,19 @@ func (s *Session) feedVideo(msg rtmp.AVMsg) {
frame.cc = s.videoCC
frame.dts = uint64(msg.Header.TimestampAbs) * 90
frame.pts = frame.dts + uint64(cts)*90
frame.pid = pidVideo
frame.pid = PidVideo
frame.sid = streamIDVideo
frame.key = ftype == 1
//boundary := frame.key && (true || !s.opened)
boundary := frame.key
boundary := frame.key && (!s.opened || s.adts.IsNil() || s.aaframe != nil)
s.updateFragment(frame.dts, boundary, 1)
if !s.opened {
nazalog.Warn("not opened.")
return
}
mpegtsWriteFrame(s.fp, &frame, out)
s.videoCC = frame.cc
}
@ -180,11 +193,14 @@ func (s *Session) feedAudio(msg rtmp.AVMsg) {
s.updateFragment(pts, s.spspps == nil, 2)
if s.aaframe == nil {
s.aframePTS = pts
}
adtsHeader := s.adts.GetADTS(uint16(msg.Header.MsgLen))
s.aframe = append(s.aframe, adtsHeader...)
s.aframe = append(s.aframe, msg.Payload...)
s.aaframe = append(s.aaframe, adtsHeader...)
s.aaframe = append(s.aaframe, msg.Payload[2:]...)
s.aframePTS = pts
}
func (s *Session) cacheAACSeqHeader(msg rtmp.AVMsg) {
@ -230,6 +246,7 @@ func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) {
if s.opened {
f = s.getFrag(s.nfrags)
// 当前时间戳跳跃很大或者是往回跳跃超过了阈值强制开启新的fragment
if (ts > s.fragTS && ts-s.fragTS > maxfraglen) || (s.fragTS > ts && s.fragTS-ts > negMaxfraglen) {
nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", s.fragTS, ts)
force = true
@ -240,13 +257,24 @@ func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) {
}
}
// 时长超过设置的ts文件切片阈值才行
if f != nil && f.duration < fraglen/float64(1000) {
boundary = false
}
// 开启新的fragment
if boundary || force {
s.closeFragment()
s.openFragment(ts, discont)
}
// 音频已经缓存了一定时长的数据了,需要落盘了
//nazalog.Debugf("CHEFERASEME 05191839, flush_rate=%d, size=%d, aframe_pts=%d, ts=%d",
// flushRate, len(s.aaframe), s.aframePTS, ts)
if s.opened && s.aaframe != nil && ((s.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) {
//nazalog.Debugf("CHEFERASEME 05191839.")
s.flushAudio()
}
}
func (s *Session) openFragment(ts uint64, discont bool) {
@ -257,29 +285,70 @@ func (s *Session) openFragment(ts uint64, discont bool) {
s.ensureDir()
id := s.getFragmentID()
filename := fmt.Sprintf("%s%d.ts", outPath, id)
filename := fmt.Sprintf("%s%s-%d.ts", outPath, s.streamName, id)
s.fp = mpegtsOpenFile(filename)
s.opened = true
frag := s.getFrag(s.nfrags)
frag.active = true
frag.discont = discont
frag.id = uint64(id)
s.fragTS = ts
s.flushAudio()
}
func (s *Session) closeFragment() {
if !s.opened {
// TODO chef: 关注下是否有这种情况
nazalog.Assert(true, s.opened)
return
}
mpegtsCloseFile(s.fp)
s.opened = false
s.nextFrag()
s.writePlaylist()
s.opened = false
}
func (s *Session) writePlaylist() {
// to be continued
fp, err := os.Create(s.playlistFilenameBak)
nazalog.Assert(nil, err)
// 找出时长最长的fragment
maxFrag := float64(fraglen / 1000)
for i := 0; i < s.nfrags; i++ {
frag := s.getFrag(i)
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
}
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n", s.frag))
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETRATION:%d\n", int(maxFrag)))
for i := 0; i < s.nfrags; i++ {
frag := s.getFrag(i)
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s-%d.ts\n", frag.duration, s.streamName, frag.id))
}
_, err = fp.Write(buf.Bytes())
nazalog.Assert(nil, err)
_ = fp.Close()
err = os.Rename(s.playlistFilenameBak, s.playlistFilename)
nazalog.Assert(nil, err)
}
func (s *Session) ensureDir() {
@ -288,7 +357,7 @@ func (s *Session) ensureDir() {
}
func (s *Session) getFragmentID() int {
return s.frag
return s.frag + s.nfrags
}
func (s *Session) getFrag(n int) *Frag {
@ -304,6 +373,33 @@ func (s *Session) nextFrag() {
}
}
//
// ngx_rtmp_hls_next_frag() 如果nfrags达到了winfrags则递增frag否则递增nfrags
// 关闭fragment时调用
// 将音频数据落盘的几种情况:
// 1. open fragment时如果aframe中还有数据
// 2. update fragment时判断音频的时间戳
// 3. 音频队列长度过长时
// 4. 流关闭时
func (s *Session) flushAudio() {
if !s.opened {
nazalog.Warn("flushAudio by not opened.")
return
}
if s.aaframe == nil {
nazalog.Warn("flushAudio by aframe is nil.")
return
}
frame := &MPEGTSFrame{
pts: s.aframePTS,
dts: s.aframePTS,
pid: PidAudio,
sid: streamIDAudio,
cc: s.audioCC,
key: false,
}
mpegtsWriteFrame(s.fp, frame, s.aaframe)
s.audioCC = frame.cc
s.aaframe = nil
}

@ -84,7 +84,7 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
}
group.pubSession = session
group.hlsSession = hls.NewSession()
group.hlsSession = hls.NewSession(group.streamName)
group.hlsSession.Start()
group.mutex.Unlock()

Loading…
Cancel
Save