- [refactor] 规范所有导出符号的注释 - [refactor] base.ReadableNowTime()

pull/134/head
q191201771 3 years ago
parent 6084827c00
commit abf50b107d

@ -45,7 +45,7 @@ func ParseSps(payload []byte, ctx *Context) error {
return nil
}
// 尝试解析PPS所有字段实验中请勿直接使用该函数
// TryParsePps 尝试解析PPS所有字段实验中请勿直接使用该函数
func TryParsePps(payload []byte) error {
// ISO-14496-10.pdf
// 7.3.2.2 Picture parameter set RBSP syntax
@ -54,7 +54,7 @@ func TryParsePps(payload []byte) error {
return nil
}
// 尝试解析SeqHeader所有字段实验中请勿直接使用该函数
// TryParseSeqHeader 尝试解析SeqHeader所有字段实验中请勿直接使用该函数
//
// @param <payload> rtmp message的payload部分或者flv tag的payload部分
// 注意包含了头部2字节类型以及3字节的cts

@ -22,9 +22,7 @@ import (
var startTime string
// ReadableNowTime
//
// TODO(chef): refactor 使用ReadableNowTime
// ReadableNowTime 当前时间,可读字符串形式
//
func ReadableNowTime() string {
return time.Now().Format("2006-01-02 15:04:05.999")

@ -11,7 +11,6 @@ package base
import (
"net"
"strings"
"time"
"github.com/q191201771/naza/pkg/connection"
)
@ -43,7 +42,7 @@ func NewHttpSubSession(option HttpSubSessionOption) *HttpSubSession {
stat: StatSession{
Protocol: option.Protocol,
SessionId: option.Uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
StartTime: ReadableNowTime(),
RemoteAddr: option.Conn.RemoteAddr().String(),
},
}

@ -9,7 +9,7 @@
package base
const (
// spec-rtmp_specification_1.0.pdf
// RtmpTypeIdAudio spec-rtmp_specification_1.0.pdf
// 7.1. Types of Messages
RtmpTypeIdAudio uint8 = 8
RtmpTypeIdVideo uint8 = 9
@ -32,7 +32,7 @@ const (
RtmpUserControlPingRequest uint8 = 6
RtmpUserControlPingResponse uint8 = 7
// spec-video_file_format_spec_v10.pdf
// RtmpFrameTypeKey spec-video_file_format_spec_v10.pdf
// Video tags
// VIDEODATA
// FrameType UB[4]
@ -63,7 +63,7 @@ const (
RtmpAvcInterFrame = RtmpFrameTypeInter<<4 | RtmpCodecIdAvc
RtmpHevcInterFrame = RtmpFrameTypeInter<<4 | RtmpCodecIdHevc
// spec-video_file_format_spec_v10.pdf
// RtmpSoundFormatAac spec-video_file_format_spec_v10.pdf
// Audio tags
// AUDIODATA
// SoundFormat UB[4]

@ -9,7 +9,7 @@
package base
const (
// 注意一般情况下AVC使用96AAC使用97HEVC使用98
// RtpPacketTypeAvcOrHevc 注意一般情况下AVC使用96AAC使用97HEVC使用98
// 但是我还遇到过:
// HEVC使用96
// AVC使用105

@ -9,14 +9,14 @@
package base
const (
// StatGroup.AudioCodec
// AudioCodecAac StatGroup.AudioCodec
AudioCodecAac = "AAC"
// StatGroup.VideoCodec
// VideoCodecAvc StatGroup.VideoCodec
VideoCodecAvc = "H264"
VideoCodecHevc = "H265"
// StatSession.Protocol
// ProtocolRtmp StatSession.Protocol
ProtocolRtmp = "RTMP"
ProtocolRtsp = "RTSP"
ProtocolHttpflv = "HTTP-FLV"

@ -15,7 +15,7 @@ import "strings"
// 另外,我们也在本文件提供另外一些信息
// 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中
// 版本,该变量由外部脚本修改维护
// LalVersion 版本,该变量由外部脚本修改维护
const LalVersion = "v0.27.1"
var (
@ -24,57 +24,57 @@ var (
LalGithubSite = "https://github.com/q191201771/lal"
LalDocSite = "https://pengrl.com/lal"
// e.g. lal v0.12.3 (github.com/q191201771/lal)
// LalFullInfo e.g. lal v0.12.3 (github.com/q191201771/lal)
LalFullInfo = LalLibraryName + " " + LalVersion + " (" + LalGithubRepo + ")"
// e.g. 0.12.3
// LalVersionDot e.g. 0.12.3
LalVersionDot string
// e.g. 0,12,3
// LalVersionComma e.g. 0,12,3
LalVersionComma string
)
var (
// 植入rtmp握手随机字符串中
// LalRtmpHandshakeWaterMark 植入rtmp握手随机字符串中
// e.g. lal v0.12.3 (github.com/q191201771/lal)
LalRtmpHandshakeWaterMark string
// 植入rtmp server中的connect result信令中
// LalRtmpConnectResultVersion 植入rtmp server中的connect result信令中
// 注意有两个object第一个object中的fmsVer我们保持通用公认的值在第二个object中植入
// e.g. 0,12,3
LalRtmpConnectResultVersion string
// e.g. lal0.12.3
// LalRtmpPushSessionConnectVersion e.g. lal0.12.3
LalRtmpPushSessionConnectVersion string
// e.g. lal0.12.3
// LalRtmpBuildMetadataEncoder e.g. lal0.12.3
LalRtmpBuildMetadataEncoder string
// e.g. lal/0.12.3
// LalHttpflvPullSessionUa e.g. lal/0.12.3
LalHttpflvPullSessionUa string
// e.g. lal0.12.3
// LalHttpflvSubSessionServer e.g. lal0.12.3
LalHttpflvSubSessionServer string
// e.g. lal0.12.3
// LalHlsM3u8Server e.g. lal0.12.3
LalHlsM3u8Server string
// e.g. lal0.12.3
// LalHlsTsServer e.g. lal0.12.3
LalHlsTsServer string
// e.g. lal0.12.3
// LalRtspOptionsResponseServer e.g. lal0.12.3
LalRtspOptionsResponseServer string
// e.g. lal0.12.3
// LalHttptsSubSessionServer e.g. lal0.12.3
LalHttptsSubSessionServer string
// e.g. lal0.12.3
// LalHttpApiServer e.g. lal0.12.3
LalHttpApiServer string
// e.g. lal/0.12.3
// LalRtspPullSessionUa e.g. lal/0.12.3
LalRtspPullSessionUa string
// e.g. lal 0.12.3
// LalPackSdp e.g. lal 0.12.3
LalPackSdp string
)

@ -16,7 +16,7 @@ import (
"github.com/q191201771/naza/pkg/bele"
)
// The WebSocket Protocol
// WsOpcode The WebSocket Protocol
// https://tools.ietf.org/html/rfc6455
//
// 0 1 2 3
@ -59,7 +59,8 @@ const (
Wso_Continuous WsOpcode = iota //连续消息片断
Wso_Text //文本消息片断,
Wso_Binary //二进制消息片断,
//非控制消息片断保留的操作码,
// Wso_Rsv3 非控制消息片断保留的操作码,
Wso_Rsv3
Wso_Rsv4
Wso_Rsv5
@ -68,7 +69,8 @@ const (
Wso_Close //连接关闭,
Wso_Ping //心跳检查的ping,
Wso_Pong //心跳检查的pong,
//为将来的控制消息片断的保留操作码
// Wso_RsvB 为将来的控制消息片断的保留操作码
Wso_RsvB
Wso_RsvC
Wso_RsvD

@ -59,7 +59,7 @@ func updateTargetDurationInM3u8(content []byte, currDuration int) ([]byte, error
return content, nil
}
// @param content 传入m3u8文件内容
// CalcM3u8Duration @param content 传入m3u8文件内容
//
// @return durationSec m3u8中所有ts的时间总和。注意使用的是m3u8文件中描述的ts时间而不是读取ts文件中实际音视频数据的时间。
//

@ -25,7 +25,7 @@ import (
type MuxerObserver interface {
OnPatPmt(b []byte)
// @param rawFrame TS流回调结束后内部不再使用该内存块
// OnTsPackets @param rawFrame TS流回调结束后内部不再使用该内存块
// @param boundary 新的TS流接收者应该从该标志为true时开始发送数据
//
OnTsPackets(rawFrame []byte, boundary bool)
@ -135,7 +135,7 @@ func (m *Muxer) Dispose() {
}
}
// @param msg 函数调用结束后内部不持有msg中的内存块
// FeedRtmpMessage @param msg 函数调用结束后内部不持有msg中的内存块
//
func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg) {
m.streamer.FeedRtmpMessage(msg)

@ -43,17 +43,17 @@ type IPathRequestStrategy interface {
GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) RequestInfo
}
// 落盘策略
// IPathWriteStrategy 落盘策略
type IPathWriteStrategy interface {
// 获取单个流对应的文件根路径
// GetMuxerOutPath 获取单个流对应的文件根路径
GetMuxerOutPath(rootOutPath string, streamName string) string
// 获取单个流对应的m3u8文件路径
// GetLiveM3u8FileName 获取单个流对应的m3u8文件路径
//
// @param outPath: func GetMuxerOutPath的结果
GetLiveM3u8FileName(outPath string, streamName string) string
// 获取单个流对应的record类型的m3u8文件路径
// GetRecordM3u8FileName 获取单个流对应的record类型的m3u8文件路径
//
// live m3u8和record m3u8的区别
// live记录的是当前最近的可播放内容record记录的是从流开始时的可播放内容
@ -61,12 +61,12 @@ type IPathWriteStrategy interface {
// @param outPath: func GetMuxerOutPath的结果
GetRecordM3u8FileName(outPath string, streamName string) string
// 获取单个流对应的ts文件路径
// GetTsFileNameWithPath 获取单个流对应的ts文件路径
//
// @param outPath: func GetMuxerOutPath的结果
GetTsFileNameWithPath(outPath string, fileName string) string
// ts文件名的生成策略
// GetTsFileName ts文件名的生成策略
GetTsFileName(streamName string, index int, timestamp int) string
}
@ -77,7 +77,7 @@ const (
recordM3u8FileName = "record.m3u8"
)
// 默认的路由,落盘策略
// DefaultPathStrategy 默认的路由,落盘策略
//
// 每个流在<rootPath>下以流名称生成一个子目录,目录下包含:
//
@ -136,7 +136,7 @@ func (dps *DefaultPathStrategy) GetRequestInfo(urlCtx base.UrlContext, rootOutPa
return
}
// <rootOutPath>/<streamName>
// GetMuxerOutPath <rootOutPath>/<streamName>
func (*DefaultPathStrategy) GetMuxerOutPath(rootOutPath string, streamName string) string {
return filepath.Join(rootOutPath, streamName)
}

@ -13,7 +13,7 @@ import (
"github.com/q191201771/lal/pkg/mpegts"
)
// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式
// Queue 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式
// 一旦判断结束,该队列变成直进直出,不再有实际缓存
type Queue struct {
maxMsgSize int
@ -26,14 +26,14 @@ type Queue struct {
}
type IQueueObserver interface {
// 该回调一定发生在数据回调之前
// OnPatPmt 该回调一定发生在数据回调之前
// TODO(chef) 这里可以考虑换成只通知drain由上层完成FragmentHeader的组装逻辑
OnPatPmt(b []byte)
OnPop(msg base.RtmpMsg)
}
// @param maxMsgSize 最大缓存多少个包
// NewQueue @param maxMsgSize 最大缓存多少个包
func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue {
return &Queue{
maxMsgSize: maxMsgSize,
@ -45,7 +45,7 @@ func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue {
}
}
// @param msg 函数调用结束后,内部不持有该内存块
// Push @param msg 函数调用结束后,内部不持有该内存块
func (q *Queue) Push(msg base.RtmpMsg) {
if q.done {
q.observer.OnPop(msg)

@ -21,10 +21,10 @@ import (
)
type StreamerObserver interface {
// @param b const只读内存块上层可以持有但是不允许修改
// OnPatPmt @param b const只读内存块上层可以持有但是不允许修改
OnPatPmt(b []byte)
// @param streamer: 供上层获取streamer内部的一些状态比如spspps是否已缓存音频缓存队列是否有数据等
// OnFrame @param streamer: 供上层获取streamer内部的一些状态比如spspps是否已缓存音频缓存队列是否有数据等
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意回调结束后Streamer会保存frame.CC上层在TS打包完成后可通过frame.CC将cc值传递给Streamer
@ -33,7 +33,7 @@ type StreamerObserver interface {
OnFrame(streamer *Streamer, frame *mpegts.Frame)
}
// 输入rtmp流回调转封装成Annexb格式的流
// Streamer 输入rtmp流回调转封装成Annexb格式的流
type Streamer struct {
UniqueKey string
@ -61,7 +61,7 @@ func NewStreamer(observer StreamerObserver) *Streamer {
return streamer
}
// @param msg msg.Payload 调用结束后,函数内部不会持有这块内存
// FeedRtmpMessage @param msg msg.Payload 调用结束后,函数内部不会持有这块内存
//
// TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (s *Streamer) FeedRtmpMessage(msg base.RtmpMsg) {
@ -268,7 +268,7 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) {
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...)
}
// 吐出音频数据的三种情况:
// FlushAudio 吐出音频数据的三种情况:
// 1. 收到音频或视频时,音频缓存队列已达到一定长度
// 2. 打开一个新的TS文件切片时
// 3. 输入流关闭时

@ -68,7 +68,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
return s
}
// @param tag: 底层保证回调上来的Raw数据长度是完整的但是不会分析Raw内部的编码数据
// OnReadFlvTag @param tag: 底层保证回调上来的Raw数据长度是完整的但是不会分析Raw内部的编码数据
type OnReadFlvTag func(tag Tag)
// Pull 阻塞直到和对端完成拉流前,握手部分的工作,或者发生错误
@ -115,32 +115,32 @@ func (session *PullSession) WaitChan() <-chan error {
// ---------------------------------------------------------------------------------------------------------------------
// 文档请参考: interface ISessionUrlContext
// Url 文档请参考: interface ISessionUrlContext
func (session *PullSession) Url() string {
return session.urlCtx.Url
}
// 文档请参考: interface ISessionUrlContext
// AppName 文档请参考: interface ISessionUrlContext
func (session *PullSession) AppName() string {
return session.urlCtx.PathWithoutLastItem
}
// 文档请参考: interface ISessionUrlContext
// StreamName 文档请参考: interface ISessionUrlContext
func (session *PullSession) StreamName() string {
return session.urlCtx.LastItemOfPath
}
// 文档请参考: interface ISessionUrlContext
// RawQuery 文档请参考: interface ISessionUrlContext
func (session *PullSession) RawQuery() string {
return session.urlCtx.RawQuery
}
// 文档请参考: interface IObject
// UniqueKey 文档请参考: interface IObject
func (session *PullSession) UniqueKey() string {
return session.uniqueKey
}
// 文档请参考: interface ISessionStat
// UpdateStat 文档请参考: interface ISessionStat
func (session *PullSession) UpdateStat(intervalSec uint32) {
currStat := session.conn.GetStat()
rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum
@ -151,7 +151,7 @@ func (session *PullSession) UpdateStat(intervalSec uint32) {
session.prevConnStat = currStat
}
// 文档请参考: interface ISessionStat
// GetStat 文档请参考: interface ISessionStat
func (session *PullSession) GetStat() base.StatSession {
connStat := session.conn.GetStat()
session.stat.ReadBytesSum = connStat.ReadBytesSum
@ -159,7 +159,7 @@ func (session *PullSession) GetStat() base.StatSession {
return session.stat
}
// 文档请参考: interface ISessionStat
// IsAlive 文档请参考: interface ISessionStat
func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
currStat := session.conn.GetStat()
if session.staleStat == nil {

@ -59,7 +59,7 @@ func (f *FlvFilePump) Pump(filename string, onFlvTag OnPumpFlvTag) error {
return f.PumpWithTags(tags, onFlvTag)
}
// @return error 暂时只做预留目前只会返回nil
// PumpWithTags @return error 暂时只做预留目前只会返回nil
//
func (f *FlvFilePump) PumpWithTags(tags []Tag, onFlvTag OnPumpFlvTag) error {
var totalBaseTs uint32 // 整体的基础时间戳。每轮最后更新

@ -26,7 +26,7 @@ type Tag struct {
Raw []byte // 结构为 (11字节的 tag header) + (body) + (4字节的 prev tag size)
}
// 只包含数据部分去除了前面11字节的tag header和后面4字节的prev tag size
// Payload 只包含数据部分去除了前面11字节的tag header和后面4字节的prev tag size
//
func (tag *Tag) Payload() []byte {
return tag.Raw[TagHeaderSize : len(tag.Raw)-PrevTagSizeFieldSize]
@ -52,7 +52,7 @@ func (tag *Tag) IsHevcKeySeqHeader() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HevcKeyFrame && tag.Raw[TagHeaderSize+1] == HevcPacketTypeSeqHeader
}
// AVC或HEVC的seq header
// IsVideoKeySeqHeader AVC或HEVC的seq header
func (tag *Tag) IsVideoKeySeqHeader() bool {
return tag.IsAvcKeySeqHeader() || tag.IsHevcKeySeqHeader()
}
@ -65,7 +65,7 @@ func (tag *Tag) IsHevcKeyNalu() bool {
return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HevcKeyFrame && tag.Raw[TagHeaderSize+1] == HevcPacketTypeNalu
}
// AVC或HEVC的关键帧
// IsVideoKeyNalu AVC或HEVC的关键帧
func (tag *Tag) IsVideoKeyNalu() bool {
return tag.IsAvcKeyNalu() || tag.IsHevcKeyNalu()
}
@ -87,7 +87,7 @@ func (tag *Tag) ModTagTimestamp(timestamp uint32) {
tag.Raw[7] = byte(timestamp >> 24)
}
// 打包一个序列化后的 tag 二进制buffer包含 tag headerbodyprev tag size
// PackHttpflvTag 打包一个序列化后的 tag 二进制buffer包含 tag headerbodyprev tag size
func PackHttpflvTag(t uint8, timestamp uint32, in []byte) []byte {
out := make([]byte, TagHeaderSize+len(in)+PrevTagSizeFieldSize)
out[0] = t

@ -157,7 +157,7 @@ func (group *Group) RunLoop() {
<-group.exitChan
}
// TODO chef: 传入时间
// Tick TODO chef: 传入时间
// 目前每秒触发一次
func (group *Group) Tick() {
group.mutex.Lock()
@ -253,7 +253,7 @@ func (group *Group) Tick() {
group.tickCount++
}
// 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率降低心智负担。
// Dispose 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率降低心智负担。
// 注意Dispose后不应再使用这个对象。
// 值得一提如果是从其他协程回调回来的消息在使用Group中的资源前要判断资源是否存在以及可用。
//
@ -340,7 +340,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error {
return nil
}
// TODO chef: rtsp package中增加回调返回值判断如果是false将连接关掉
// AddRtspPubSession TODO chef: rtsp package中增加回调返回值判断如果是false将连接关掉
func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error {
Log.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey())
@ -435,7 +435,7 @@ func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) {
group.pullIfNeeded()
}
// TODO chef:
// AddHttptsSubSession TODO chef:
// 这里应该也要考虑触发hls muxer开启
// 也即HTTPTS sub需要使用hls muxerhls muxer开启和关闭都要考虑HTTPTS sub
func (group *Group) AddHttptsSubSession(session *httpts.SubSession) {
@ -644,7 +644,7 @@ func (group *Group) KickOutSession(sessionId string) bool {
return false
}
// 外部命令主动触发pull拉流
// StartPull 外部命令主动触发pull拉流
//
// 当前调用时机:
// 1. 比如http api
@ -922,14 +922,14 @@ func (group *Group) disposeHlsMuxer() {
// 音视频数据转发、转封装的逻辑
// ---------------------------------------------------------------------------------------------------------------------
// rtmp.PubSession or rtmp.PullSession
// OnReadRtmpAvMsg rtmp.PubSession or rtmp.PullSession
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
}
// rtsp.PubSession
// OnRtpPacket rtsp.PubSession
func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -937,7 +937,7 @@ func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.onRtpPacket(pkt)
}
// rtsp.PubSession
// OnSdp rtsp.PubSession
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -946,7 +946,7 @@ func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}
// rtsp.PubSession
// OnAvPacket rtsp.PubSession
func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -955,7 +955,7 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.rtsp2RtmpRemuxer.OnAvPacket(pkt)
}
// hls.Muxer
// OnPatPmt hls.Muxer
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
@ -966,7 +966,7 @@ func (group *Group) OnPatPmt(b []byte) {
}
}
// hls.Muxer
// OnTsPackets hls.Muxer
func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) {
// 因为最前面Feed时已经加锁了所以这里回调上来就不用加锁了

@ -66,23 +66,3 @@ var defaultOption = Option{
}
type ModOption func(option *Option)
// ---------------------------------------------------------------------------------------------------------------------
// 一些没有放入配置文件中,包级别的配置,暂时没有对外暴露
//
var (
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000
calcSessionStatIntervalSec uint32 = 5
// checkSessionAliveIntervalSec
//
// - 对于输入型session检查一定时间内是否没有收到数据
// - 对于输出型session检查一定时间内是否没有发送数据
// 注意这里既检查socket发送阻塞又检查上层没有给session喂数据
//
checkSessionAliveIntervalSec uint32 = 10
)

@ -56,7 +56,7 @@ type ServerManager struct {
func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
sm := &ServerManager{
serverStartTime: time.Now().Format("2006-01-02 15:04:05.999"),
serverStartTime: base.ReadableNowTime(),
exitChan: make(chan struct{}, 1),
}
sm.groupManager = NewSimpleGroupManager(sm)

@ -11,3 +11,19 @@ package logic
import "github.com/q191201771/naza/pkg/nazalog"
var Log = nazalog.GetGlobalLogger()
var (
relayPushTimeoutMs = 5000
relayPushWriteAvTimeoutMs = 5000
relayPullTimeoutMs = 5000
relayPullReadAvTimeoutMs = 5000
calcSessionStatIntervalSec uint32 = 5
// checkSessionAliveIntervalSec
//
// - 对于输入型session检查一定时间内是否没有收到数据
// - 对于输出型session检查一定时间内是否没有发送数据
// 注意这里既检查socket发送阻塞又检查上层没有给session喂数据
//
checkSessionAliveIntervalSec uint32 = 10
)

@ -10,7 +10,7 @@ package mpegts
// MPEG: Moving Picture Experts Group
// 每个TS文件都以固定的PATPMT开始
// FixedFragmentHeader 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeader = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
@ -70,7 +70,7 @@ var FixedFragmentHeader = []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// 每个TS文件都以固定的PATPMT开始
// FixedFragmentHeaderHevc 每个TS文件都以固定的PATPMT开始
var FixedFragmentHeaderHevc = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
@ -140,7 +140,7 @@ const (
PidVideo uint16 = 0x100
PidAudio uint16 = 0x101
// ------------------------------------------
// AdaptationFieldControlReserved ------------------------------------------
// <iso13818-1.pdf> <Table 2-5> <page 38/174>
// ------------------------------------------
AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC
@ -164,13 +164,13 @@ const (
// PES
const (
// -----------------------------------------------------------------
// StreamIdAudio -----------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174>
// -----------------------------------------------------------------
StreamIdAudio uint8 = 192 // 110x xxxx 0xC0
StreamIdVideo uint8 = 224 // 1110 xxxx
// ------------------------------
// PtsDtsFlags0 ------------------------------
// <iso13818-1.pdf> <page 53/174>
// ------------------------------
PtsDtsFlags0 uint8 = 0 // no PTS no DTS

@ -8,6 +8,8 @@
package mpegts
// Frame 帧数据
//
type Frame struct {
Pts uint64 // =(毫秒 * 90)
Dts uint64
@ -32,11 +34,11 @@ type Frame struct {
Raw []byte
}
// @param packet: 188字节大小的TS包注意一次Pack对应的多个TSPacket复用的是一块内存
// OnTsPacket @param packet: 188字节大小的TS包注意一次Pack对应的多个TSPacket复用的是一块内存
//
type OnTsPacket func(packet []byte)
// Annexb格式的流转换为mpegts packet
// PackTsPacket Annexb格式的流转换为mpegts packet
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意内部会修改frame.CC的值外部在调用结束后可保存CC的值供下次调用时使用

@ -12,7 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// ---------------------------------------------------------------------------------------------------
// Pat ---------------------------------------------------------------------------------------------------
// Program association section
// <iso13818-1.pdf> <2.4.4.3> <page 61/174>
// table_id [8b] *

@ -12,7 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// -----------------------------------------------------------
// Pes -----------------------------------------------------------
// <iso13818-1.pdf>
// <2.4.3.6 PES packet> <page 49/174>
// <Table E.1 - PES packet header example> <page 142/174>

@ -12,7 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// ----------------------------------------
// Pmt ----------------------------------------
// Program Map Table
// <iso13818-1.pdf> <2.4.4.8> <page 64/174>
// table_id [8b] *

@ -12,7 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)
// ------------------------------------------------
// TsPacketHeader ------------------------------------------------
// <iso13818-1.pdf> <2.4.3.2> <page 36/174>
// sync_byte [8b] * always 0x47
// transport_error_indicator [1b]
@ -34,7 +34,7 @@ type TsPacketHeader struct {
Cc uint8
}
// ----------------------------------------------------------
// TsPacketAdaptation ----------------------------------------------------------
// <iso13818-1.pdf> <Table 2-6> <page 40/174>
// adaptation_field_length [8b] * 不包括自己这1字节
// discontinuity_indicator [1b]
@ -54,7 +54,7 @@ type TsPacketAdaptation struct {
Length uint8
}
// 解析4字节TS Packet header
// ParseTsPacketHeader 解析4字节TS Packet header
func ParseTsPacketHeader(b []byte) (h TsPacketHeader) {
// TODO chef: 检查长度
br := nazabits.NewBitReader(b)
@ -69,7 +69,7 @@ func ParseTsPacketHeader(b []byte) (h TsPacketHeader) {
return
}
// TODO chef
// ParseTsPacketAdaptation TODO chef
func ParseTsPacketAdaptation(b []byte) (f TsPacketAdaptation) {
br := nazabits.NewBitReader(b)
f.Length, _ = br.ReadBits8(8)

@ -19,7 +19,7 @@ import (
"github.com/q191201771/naza/pkg/bele"
)
// AvPacket转换为RTMP
// AvPacket2RtmpRemuxer AvPacket转换为RTMP
// 目前AvPacket来自RTSP的sdp以及rtp的合帧包。理论上也支持webrtc后续接入webrtc时再验证
//
type AvPacket2RtmpRemuxer struct {
@ -42,7 +42,7 @@ func NewAvPacket2RtmpRemuxer(onRtmpAvMsg rtmp.OnReadRtmpAvMsg) *AvPacket2RtmpRem
}
}
// 实现RTSP回调数据的三个接口使得接入时方便些
// OnRtpPacket 实现RTSP回调数据的三个接口使得接入时方便些
func (r *AvPacket2RtmpRemuxer) OnRtpPacket(pkt rtprtcp.RtpPacket) {
// noop
}
@ -53,7 +53,7 @@ func (r *AvPacket2RtmpRemuxer) OnAvPacket(pkt base.AvPacket) {
r.FeedAvPacket(pkt)
}
// rtsp场景下有时sps、pps等信息只包含在sdp中有时包含在rtp包中
// InitWithAvConfig rtsp场景下有时sps、pps等信息只包含在sdp中有时包含在rtp包中
// 这里提供输入sdp的sps、pps等信息的机会如果没有可以不调用
//
// 内部不持有输入参数的内存块
@ -112,7 +112,7 @@ func (r *AvPacket2RtmpRemuxer) InitWithAvConfig(asc, vps, sps, pps []byte) {
}
}
// @param pkt: 内部不持有该内存块
// FeedAvPacket @param pkt: 内部不持有该内存块
//
func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
switch pkt.PayloadType {

@ -30,7 +30,7 @@ func FlvTagHeader2RtmpHeader(in httpflv.TagHeader) (out base.RtmpHeader) {
return
}
// @return msg: 返回的内存块引用参数`tag`的内存块
// FlvTag2RtmpMsg @return msg: 返回的内存块引用参数`tag`的内存块
//
func FlvTag2RtmpMsg(tag httpflv.Tag) (msg base.RtmpMsg) {
msg.Header = FlvTagHeader2RtmpHeader(tag.Header)
@ -38,7 +38,7 @@ func FlvTag2RtmpMsg(tag httpflv.Tag) (msg base.RtmpMsg) {
return
}
// @return 返回的内存块为内部新申请
// FlvTag2RtmpChunks @return 返回的内存块为内部新申请
//
func FlvTag2RtmpChunks(tag httpflv.Tag) []byte {
rtmpHeader := FlvTagHeader2RtmpHeader(tag.Header)

@ -65,7 +65,7 @@ type GopCache struct {
gopSize int
}
// @param gopNum: gop缓存大小
// NewGopCache @param gopNum: gop缓存大小
// 如果为0则不缓存音频数据也即GOP缓存功能不生效
// 如果>0则缓存<gopNum>个完整GOP另外还可能有半个最近不完整的GOP
//
@ -111,7 +111,7 @@ func (gc *GopCache) Feed(msg base.RtmpMsg, lg LazyGet) {
}
}
// 获取GOP数量注意最后一个可能是不完整的
// GetGopCount 获取GOP数量注意最后一个可能是不完整的
func (gc *GopCache) GetGopCount() int {
return (gc.gopRingLast + gc.gopSize - gc.gopRingFirst) % gc.gopSize
}

@ -13,7 +13,7 @@ import (
"github.com/q191201771/lal/pkg/httpflv"
)
// @return 返回的内存块为新申请的独立内存块
// RtmpMsg2FlvTag @return 返回的内存块为新申请的独立内存块
func RtmpMsg2FlvTag(msg base.RtmpMsg) *httpflv.Tag {
var tag httpflv.Tag
tag.Header.Type = msg.Header.MsgTypeId

@ -29,7 +29,7 @@ var (
maxAnalyzeAvMsgSize = 16
)
// 提供rtmp数据向sdp+rtp数据的转换
// Rtmp2RtspRemuxer 提供rtmp数据向sdp+rtp数据的转换
type Rtmp2RtspRemuxer struct {
onSdp OnSdp
onRtpPacket OnRtpPacket
@ -49,7 +49,7 @@ type Rtmp2RtspRemuxer struct {
type OnSdp func(sdpCtx sdp.LogicContext)
type OnRtpPacket func(pkt rtprtcp.RtpPacket)
// @param onSdp: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块
// NewRtmp2RtspRemuxer @param onSdp: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块
// @param onRtpPacket: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块
//
func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer {
@ -61,7 +61,7 @@ func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer
}
}
// @param msg: 函数调用结束后,内部不持有`msg`内存块
// FeedRtmpMsg @param msg: 函数调用结束后,内部不持有`msg`内存块
//
func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
var err error

@ -23,12 +23,12 @@ var defaultChunkDivider = ChunkDivider{
localChunkSize: LocalChunkSize,
}
// @return 返回的内存块由内部申请,不依赖参数<message>内存块
// Message2Chunks @return 返回的内存块由内部申请,不依赖参数<message>内存块
func Message2Chunks(message []byte, header *base.RtmpHeader) []byte {
return defaultChunkDivider.Message2Chunks(message, header)
}
// TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message
// Message2Chunks TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message
func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte {
return message2Chunks(message, header, nil, d.localChunkSize)
}

@ -53,7 +53,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
}
}
// 阻塞直到和对端完成拉流前的所有准备工作也即收到RTMP Play response或者发生错误
// Pull 阻塞直到和对端完成拉流前的所有准备工作也即收到RTMP Play response或者发生错误
//
// @param onReadRtmpAvMsg: msg: 注意,回调结束后,`msg`的内存块会被`PullSession`重复使用
// 也即多次回调的`msg`是复用的同一块内存块
@ -83,42 +83,42 @@ func (s *PullSession) WaitChan() <-chan error {
// ---------------------------------------------------------------------------------------------------------------------
// 文档请参考: interface ISessionUrlContext
// Url 文档请参考: interface ISessionUrlContext
func (s *PullSession) Url() string {
return s.core.Url()
}
// 文档请参考: interface ISessionUrlContext
// AppName 文档请参考: interface ISessionUrlContext
func (s *PullSession) AppName() string {
return s.core.AppName()
}
// 文档请参考: interface ISessionUrlContext
// StreamName 文档请参考: interface ISessionUrlContext
func (s *PullSession) StreamName() string {
return s.core.StreamName()
}
// 文档请参考: interface ISessionUrlContext
// RawQuery 文档请参考: interface ISessionUrlContext
func (s *PullSession) RawQuery() string {
return s.core.RawQuery()
}
// 文档请参考: interface IObject
// UniqueKey 文档请参考: interface IObject
func (s *PullSession) UniqueKey() string {
return s.core.uniqueKey
}
// 文档请参考: interface ISessionStat
// GetStat 文档请参考: interface ISessionStat
func (s *PullSession) GetStat() base.StatSession {
return s.core.GetStat()
}
// 文档请参考: interface ISessionStat
// UpdateStat 文档请参考: interface ISessionStat
func (s *PullSession) UpdateStat(intervalSec uint32) {
s.core.UpdateStat(intervalSec)
}
// 文档请参考: interface ISessionStat
// IsAlive 文档请参考: interface ISessionStat
func (s *PullSession) IsAlive() (readAlive, writeAlive bool) {
return s.core.IsAlive()
}

@ -54,7 +54,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
}
}
// 阻塞直到和对端完成推流前握手部分的工作也即收到RTMP Publish response或者发生错误
// Push 阻塞直到和对端完成推流前握手部分的工作也即收到RTMP Publish response或者发生错误
func (s *PushSession) Push(rawUrl string) error {
return s.core.Do(rawUrl)
}
@ -65,7 +65,7 @@ func (s *PushSession) Write(msg []byte) error {
return s.core.Write(msg)
}
// 将缓存的数据立即刷新发送
// Flush 将缓存的数据立即刷新发送
// 是否有缓存策略,请参见配置及内部实现
func (s *PushSession) Flush() error {
return s.core.Flush()
@ -89,42 +89,42 @@ func (s *PushSession) WaitChan() <-chan error {
// ---------------------------------------------------------------------------------------------------------------------
// 文档请参考: interface ISessionUrlContext
// Url 文档请参考: interface ISessionUrlContext
func (s *PushSession) Url() string {
return s.core.Url()
}
// 文档请参考: interface ISessionUrlContext
// AppName 文档请参考: interface ISessionUrlContext
func (s *PushSession) AppName() string {
return s.core.AppName()
}
// 文档请参考: interface ISessionUrlContext
// StreamName 文档请参考: interface ISessionUrlContext
func (s *PushSession) StreamName() string {
return s.core.StreamName()
}
// 文档请参考: interface ISessionUrlContext
// RawQuery 文档请参考: interface ISessionUrlContext
func (s *PushSession) RawQuery() string {
return s.core.RawQuery()
}
// 文档请参考: interface IObject
// UniqueKey 文档请参考: interface IObject
func (s *PushSession) UniqueKey() string {
return s.core.uniqueKey
}
// 文档请参考: interface ISessionStat
// GetStat 文档请参考: interface ISessionStat
func (s *PushSession) GetStat() base.StatSession {
return s.core.GetStat()
}
// 文档请参考: interface ISessionStat
// UpdateStat 文档请参考: interface ISessionStat
func (s *PushSession) UpdateStat(intervalSec uint32) {
s.core.UpdateStat(intervalSec)
}
// 文档请参考: interface ISessionStat
// IsAlive 文档请参考: interface ISessionStat
func (s *PushSession) IsAlive() (readAlive, writeAlive bool) {
return s.core.IsAlive()
}

@ -22,7 +22,7 @@ import (
"github.com/q191201771/naza/pkg/connection"
)
// rtmp 客户端类型连接的底层实现
// ClientSession rtmp 客户端类型连接的底层实现
// package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession
type ClientSession struct {
uniqueKey string
@ -84,7 +84,7 @@ var defaultClientSessOption = ClientSessionOption{
type ModClientSessionOption func(option *ClientSessionOption)
// @param t: session的类型只能是推或者拉
// NewClientSession @param t: session的类型只能是推或者拉
func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession {
var uk string
switch t {
@ -116,7 +116,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
stat: base.StatSession{
Protocol: base.ProtocolRtmp,
SessionId: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
StartTime: base.ReadableNowTime(),
},
debugLogReadUserCtrlMsgMax: 5,
hc: hc,
@ -125,7 +125,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption)
return s
}
// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
// Do 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) Do(rawUrl string) error {
Log.Debugf("[%s] Do. url=%s", s.uniqueKey, rawUrl)

@ -23,7 +23,7 @@ const (
peerBandwidthLimitTypeDynamic = uint8(2)
)
// 打包并发送 rtmp 信令
// MessagePacker 打包并发送 rtmp 信令
//
type MessagePacker struct {
b *Buffer

@ -32,7 +32,7 @@ func ParseMetadata(b []byte) (ObjectPairArray, error) {
return opa, err
}
// spec-video_file_format_spec_v10.pdf
// BuildMetadata spec-video_file_format_spec_v10.pdf
// onMetaData
// - duration DOUBLE, seconds
// - width DOUBLE

@ -38,6 +38,7 @@ const defaultChunkSize = 128 // 未收到对端设置chunk size时的默认值
const (
//MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令
Msid1 = 1 // publish、play、onStatus 以及 音视频数据
)

@ -13,7 +13,6 @@ import (
"net"
"strings"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazaerrors"
@ -40,7 +39,7 @@ type ServerSessionObserver interface {
}
type PubSessionObserver interface {
// 注意回调结束后内部会复用Payload内存块
// OnReadRtmpAvMsg 注意回调结束后内部会复用Payload内存块
OnReadRtmpAvMsg(msg base.RtmpMsg)
}
@ -97,7 +96,7 @@ func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSess
stat: base.StatSession{
Protocol: base.ProtocolRtmp,
SessionId: uk,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
StartTime: base.ReadableNowTime(),
RemoteAddr: conn.RemoteAddr().String(),
},
uniqueKey: uk,

@ -57,7 +57,7 @@ type StreamMsg struct {
buff *nazabytes.Buffer
}
// 确保可写空间,如果不够会扩容
// Grow 确保可写空间,如果不够会扩容
func (msg *StreamMsg) Grow(n uint32) {
msg.buff.Grow(int(n))
}

@ -11,19 +11,19 @@ package rtprtcp
// (70 * 365 + 17) * 24 * 60 * 60
const offset uint64 = 2208988800
// 将ntp时间戳转换为Unix时间戳Unix时间戳单位是纳秒
// Ntp2UnixNano 将ntp时间戳转换为Unix时间戳Unix时间戳单位是纳秒
func Ntp2UnixNano(v uint64) uint64 {
msw := v >> 32
lsw := v & 0xFFFFFFFF
return (msw-offset)*1e9 + (lsw*1e9)>>32
}
// 将ntp时间戳高32位低32位分开的形式转换为Unix时间戳
// MswLsw2UnixNano 将ntp时间戳高32位低32位分开的形式转换为Unix时间戳
func MswLsw2UnixNano(msw, lsw uint64) uint64 {
return Ntp2UnixNano(MswLsw2Ntp(msw, lsw))
}
// msw是ntp的高32位lsw是ntp的低32位
// MswLsw2Ntp msw是ntp的高32位lsw是ntp的低32位
func MswLsw2Ntp(msw, lsw uint64) uint64 {
return (msw << 32) | lsw
}

@ -119,7 +119,7 @@ func ParseRtcpHeader(b []byte) RtcpHeader {
return h
}
// rfc3550 6.4.1
// ParseSr rfc3550 6.4.1
//
// @param b rtcp包包含包头
func ParseSr(b []byte) Sr {
@ -133,7 +133,7 @@ func ParseSr(b []byte) Sr {
return s
}
// @param out 传出参数,注意,调用方保证长度>=4
// PackTo @param out 传出参数,注意,调用方保证长度>=4
func (r *RtcpHeader) PackTo(out []byte) {
out[0] = r.Version<<6 | r.Padding<<5 | r.CountOrFormat
out[1] = r.PacketType

@ -40,7 +40,7 @@ func NewRrProducer(clockRate int) *RrProducer {
}
}
// 每次收到rtp包都将seq序号传入这个函数
// FeedRtpPacket 每次收到rtp包都将seq序号传入这个函数
func (r *RrProducer) FeedRtpPacket(seq uint16) {
r.received++
@ -62,7 +62,7 @@ func (r *RrProducer) FeedRtpPacket(seq uint16) {
r.extendedSeq = (r.cycles << 16) | uint32(r.maxSeq)
}
// 收到sr包时产生rr包
// Produce 收到sr包时产生rr包
//
// @param lsr: 从sr包中获取见func SR.GetMiddleNtp
// @return: rr包的二进制数据

@ -28,11 +28,11 @@ const (
NaluTypeAvcStapa = 24 // one packet, multiple nals
NaluTypeAvcFua = 28
// TODO(chef): hevc有stapa格式吗
// NaluTypeHevcFua TODO(chef): hevc有stapa格式吗
NaluTypeHevcFua = 49
)
// 比较序号的值,内部处理序号翻转问题,见单元测试中的例子
// CompareSeq 比较序号的值,内部处理序号翻转问题,见单元测试中的例子
// @return 0 a和b相等
// 1 a大于b
// -1 a小于b
@ -56,7 +56,7 @@ func CompareSeq(a, b uint16) int {
return 1
}
// a减b的值内部处理序号翻转问题如果a小于b则返回负值见单元测试中的例子
// SubSeq a减b的值内部处理序号翻转问题如果a小于b则返回负值见单元测试中的例子
func SubSeq(a, b uint16) int {
if a == b {
return 0

@ -52,7 +52,7 @@ func NewRtpPacker(payloadPacker IRtpPackerPayload, clockRate int, ssrc uint32, m
}
}
// @param pkt: pkt.Timestamp 绝对时间戳,单位毫秒
// Pack @param pkt: pkt.Timestamp 绝对时间戳,单位毫秒
// pkt.PayloadType rtp包头中的packet type
//
func (r *RtpPacker) Pack(pkt base.AvPacket) (out []RtpPacket) {

@ -9,7 +9,7 @@
package rtprtcp
type IRtpPackerPayload interface {
// @param maxSize: rtp payload包体部分不含包头的最大大小
// Pack @param maxSize: rtp payload包体部分不含包头的最大大小
//
Pack(in []byte, maxSize int) (out [][]byte)
}

@ -56,7 +56,7 @@ func NewRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRt
}
}
// @param in: AVCC格式
// Pack @param in: AVCC格式
//
// @return out: 内存块为独立新申请;函数返回后,内部不再持有该内存块
//

@ -113,7 +113,7 @@ func ParseRtpHeader(b []byte) (h RtpHeader, err error) {
return
}
// 函数调用结束后,不持有参数<b>的内存块
// ParseRtpPacket 函数调用结束后,不持有参数<b>的内存块
func ParseRtpPacket(b []byte) (pkt RtpPacket, err error) {
pkt.Header, err = ParseRtpHeader(b)
if err != nil {
@ -132,7 +132,7 @@ func (p *RtpPacket) Body() []byte {
return p.Raw[p.Header.payloadOffset:]
}
// @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc否则直接返回false
// IsAvcHevcBoundary @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc否则直接返回false
//
func IsAvcHevcBoundary(pkt RtpPacket, pt base.AvPacketPt) bool {
switch pt {

@ -35,7 +35,7 @@ func NewRtpUnpackContainer(maxSize int, unpackerProtocol IRtpUnpackerProtocol) *
}
}
// 输入收到的rtp包
// Feed 输入收到的rtp包
func (r *RtpUnpackContainer) Feed(pkt RtpPacket) {
// 过期的包
if r.isStale(pkt.Header.Seq) {

@ -31,10 +31,10 @@ type IRtpUnpackContainer interface {
}
type IRtpUnpackerProtocol interface {
// 计算rtp包处于帧中的位置
// CalcPositionIfNeeded 计算rtp包处于帧中的位置
CalcPositionIfNeeded(pkt *RtpPacket)
// 尝试合成一个完整帧
// TryUnpackOne 尝试合成一个完整帧
//
// 从当前队列的第一个包开始合成
// 如果一个rtp包对应一个完整帧则合成一帧
@ -46,7 +46,7 @@ type IRtpUnpackerProtocol interface {
TryUnpackOne(list *RtpPacketList) (unpackedFlag bool, unpackedSeq uint16)
}
// @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳单位毫秒
// OnAvPacket @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳单位毫秒
// 注意不支持带B帧的视频流pts和dts永远相同
// pkt.PayloadType base.AvPacketPTXXX
// pkt.Payload AAC:
@ -59,7 +59,7 @@ type IRtpUnpackerProtocol interface {
// 假如sps和pps是一个stapA包则合并结果为一个AvPacket
type OnAvPacket func(pkt base.AvPacket)
// 目前支持AVCHEVC和AAC MPEG4-GENERIC业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
// DefaultRtpUnpackerFactory 目前支持AVCHEVC和AAC MPEG4-GENERIC业务方也可以自己实现IRtpUnpackerProtocol甚至是IRtpUnpackContainer
func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSize int, onAvPacket OnAvPacket) IRtpUnpacker {
var protocol IRtpUnpackerProtocol
switch payloadType {

@ -74,7 +74,7 @@ func (a *Auth) FeedWwwAuthenticate(auths []string, username, password string) {
}
}
// 如果没有调用`FeedWwwAuthenticate`初始化过,则直接返回空字符串
// MakeAuthorization 如果没有调用`FeedWwwAuthenticate`初始化过,则直接返回空字符串
func (a *Auth) MakeAuthorization(method, uri string) string {
if a.Username == "" {
return ""

@ -41,7 +41,7 @@ func NewAvPacketQueue(onAvPacket OnAvPacket) *AvPacketQueue {
}
}
// 注意,调用方保证,音频相较于音频,视频相较于视频,时间戳是线性递增的。
// Feed 注意,调用方保证,音频相较于音频,视频相较于视频,时间戳是线性递增的。
func (a *AvPacketQueue) Feed(pkt base.AvPacket) {
//Log.Debugf("AVQ feed. t=%d, ts=%d", pkt.PayloadType, pkt.Timestamp)
switch pkt.PayloadType {

@ -12,7 +12,6 @@ import (
"encoding/hex"
"net"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazaatomic"
@ -95,7 +94,7 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba
stat: base.StatSession{
Protocol: base.ProtocolRtsp,
SessionId: uniqueKey,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
StartTime: base.ReadableNowTime(),
},
cmdSession: cmdSession,
waitChan: make(chan error, 1),
@ -143,7 +142,8 @@ func (session *BaseInSession) InitWithSdp(sdpCtx sdp.LogicContext) {
}
}
// 如果没有设置回调监听对象可以通过该函数设置调用方保证调用该函数发生在调用InitWithSdp之后
// SetObserver 如果没有设置回调监听对象可以通过该函数设置调用方保证调用该函数发生在调用InitWithSdp之后
//
func (session *BaseInSession) SetObserver(observer BaseInSessionObserver) {
session.observer = observer
@ -224,7 +224,7 @@ func (session *BaseInSession) HandleInterleavedPacket(b []byte, channel int) {
}
}
// 发现pull时需要先给对端发送数据才能收到数据
// WriteRtpRtcpDummy 发现pull时需要先给对端发送数据才能收到数据
func (session *BaseInSession) WriteRtpRtcpDummy() {
if session.videoRtpConn != nil {
_ = session.videoRtpConn.Write(dummyRtpPacket)

@ -12,7 +12,6 @@ import (
"encoding/hex"
"net"
"sync"
"time"
"github.com/q191201771/naza/pkg/nazaatomic"
@ -26,7 +25,7 @@ import (
"github.com/q191201771/naza/pkg/nazanet"
)
// out的含义是音视频由本端发送至对端
// BaseOutSession out的含义是音视频由本端发送至对端
//
type BaseOutSession struct {
uniqueKey string
@ -66,7 +65,7 @@ func NewBaseOutSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *B
stat: base.StatSession{
Protocol: base.ProtocolRtsp,
SessionId: uniqueKey,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
StartTime: base.ReadableNowTime(),
},
audioRtpChannel: -1,
videoRtpChannel: -1,

@ -52,7 +52,7 @@ var defaultClientCommandSessionOption = ClientCommandSessionOption{
type ClientCommandSessionObserver interface {
OnConnectResult()
// only for PullSession
// OnDescribeResponse only for PullSession
OnDescribeResponse(sdpCtx sdp.LogicContext)
OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection)
@ -62,7 +62,7 @@ type ClientCommandSessionObserver interface {
OnInterleavedPacket(packet []byte, channel int)
}
// Push和Pull共用封装了客户端底层信令信令部分。
// ClientCommandSession Push和Pull共用封装了客户端底层信令信令部分。
// 业务方应该使用PushSession和PullSession而不是直接使用ClientCommandSession除非你确定要这么做。
type ClientCommandSession struct {
uniqueKey string
@ -103,7 +103,7 @@ func NewClientCommandSession(t ClientCommandSessionType, uniqueKey string, obser
return s
}
// only for PushSession
// InitWithSdp only for PushSession
func (session *ClientCommandSession) InitWithSdp(sdpCtx sdp.LogicContext) {
session.sdpCtx = sdpCtx
}

@ -143,79 +143,79 @@ func (session *PullSession) WaitChan() <-chan error {
// ---------------------------------------------------------------------------------------------------------------------
// 文档请参考: interface ISessionUrlContext
// Url 文档请参考: interface ISessionUrlContext
func (session *PullSession) Url() string {
return session.cmdSession.Url()
}
// 文档请参考: interface ISessionUrlContext
// AppName 文档请参考: interface ISessionUrlContext
func (session *PullSession) AppName() string {
return session.cmdSession.AppName()
}
// 文档请参考: interface ISessionUrlContext
// StreamName 文档请参考: interface ISessionUrlContext
func (session *PullSession) StreamName() string {
return session.cmdSession.StreamName()
}
// 文档请参考: interface ISessionUrlContext
// RawQuery 文档请参考: interface ISessionUrlContext
func (session *PullSession) RawQuery() string {
return session.cmdSession.RawQuery()
}
// 文档请参考: interface IObject
// UniqueKey 文档请参考: interface IObject
func (session *PullSession) UniqueKey() string {
return session.uniqueKey
}
// 文档请参考: interface ISessionStat
// GetStat 文档请参考: interface ISessionStat
func (session *PullSession) GetStat() base.StatSession {
stat := session.baseInSession.GetStat()
stat.RemoteAddr = session.cmdSession.RemoteAddr()
return stat
}
// 文档请参考: interface ISessionStat
// UpdateStat 文档请参考: interface ISessionStat
func (session *PullSession) UpdateStat(intervalSec uint32) {
session.baseInSession.UpdateStat(intervalSec)
}
// 文档请参考: interface ISessionStat
// IsAlive 文档请参考: interface ISessionStat
func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnConnectResult() {
// noop
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnDescribeResponse(sdpCtx sdp.LogicContext) {
session.baseInSession.InitWithSdp(sdpCtx)
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) {
_ = session.baseInSession.SetupWithConn(uri, rtpConn, rtcpConn)
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) {
_ = session.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel)
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupResult() {
session.baseInSession.WriteRtpRtcpDummy()
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnInterleavedPacket(packet []byte, channel int) {
session.baseInSession.HandleInterleavedPacket(packet, channel)
}
// IInterleavedPacketWriter, callback by BaseInSession
// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession
func (session *PullSession) WriteInterleavedPacket(packet []byte, channel int) error {
return session.cmdSession.WriteInterleavedPacket(packet, channel)
}

@ -136,79 +136,79 @@ func (session *PushSession) WaitChan() <-chan error {
// ---------------------------------------------------------------------------------------------------------------------
// 文档请参考: interface ISessionUrlContext
// Url 文档请参考: interface ISessionUrlContext
func (session *PushSession) Url() string {
return session.cmdSession.Url()
}
// 文档请参考: interface ISessionUrlContext
// AppName 文档请参考: interface ISessionUrlContext
func (session *PushSession) AppName() string {
return session.cmdSession.AppName()
}
// 文档请参考: interface ISessionUrlContext
// StreamName 文档请参考: interface ISessionUrlContext
func (session *PushSession) StreamName() string {
return session.cmdSession.StreamName()
}
// 文档请参考: interface ISessionUrlContext
// RawQuery 文档请参考: interface ISessionUrlContext
func (session *PushSession) RawQuery() string {
return session.cmdSession.RawQuery()
}
// 文档请参考: interface IObject
// UniqueKey 文档请参考: interface IObject
func (session *PushSession) UniqueKey() string {
return session.uniqueKey
}
// 文档请参考: interface ISessionStat
// GetStat 文档请参考: interface ISessionStat
func (session *PushSession) GetStat() base.StatSession {
stat := session.baseOutSession.GetStat()
stat.RemoteAddr = session.cmdSession.RemoteAddr()
return stat
}
// 文档请参考: interface ISessionStat
// UpdateStat 文档请参考: interface ISessionStat
func (session *PushSession) UpdateStat(intervalSec uint32) {
session.baseOutSession.UpdateStat(intervalSec)
}
// 文档请参考: interface ISessionStat
// IsAlive 文档请参考: interface ISessionStat
func (session *PushSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseOutSession.IsAlive()
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnConnectResult() {
// noop
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnDescribeResponse(sdpCtx sdp.LogicContext) {
// noop
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) {
_ = session.baseOutSession.SetupWithConn(uri, rtpConn, rtcpConn)
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) {
_ = session.baseOutSession.SetupWithChannel(uri, rtpChannel, rtcpChannel)
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupResult() {
// noop
}
// ClientCommandSessionObserver, callback by ClientCommandSession
// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnInterleavedPacket(packet []byte, channel int) {
session.baseOutSession.HandleInterleavedPacket(packet, channel)
}
// IInterleavedPacketWriter, callback by BaseOutSession
// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseOutSession
func (session *PushSession) WriteInterleavedPacket(packet []byte, channel int) error {
return session.cmdSession.WriteInterleavedPacket(packet, channel)
}

@ -17,7 +17,7 @@ import (
// rfc2326 10.1 OPTIONS
// CSeq
// ResponseOptionsTmpl CSeq
var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" +
"Server: " + base.LalRtspOptionsResponseServer + "\r\n" +
"CSeq: %s\r\n" +
@ -27,14 +27,14 @@ var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" +
// rfc2326 10.3 ANNOUNCE
//var RequestAnnounceTmpl = "not impl"
// CSeq
// ResponseAnnounceTmpl CSeq
var ResponseAnnounceTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"\r\n"
// rfc2326 10.2 DESCRIBE
// CSeq, Date, Content-Length,
// ResponseDescribeTmpl CSeq, Date, Content-Length,
var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"Date: %s\r\n" +
@ -43,7 +43,7 @@ var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" +
"\r\n" +
"%s"
// rfc2326 10.4 SETUP
// ResponseSetupTmpl rfc2326 10.4 SETUP
// CSeq, Date, Session, Transport
var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
@ -55,7 +55,7 @@ var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" +
// rfc2326 10.11 RECORD
//var RequestRecordTmpl = "not impl"
// CSeq, Session
// ResponseRecordTmpl CSeq, Session
var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"Session: %s\r\n" +
@ -63,7 +63,7 @@ var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" +
// rfc2326 10.5 PLAY
// CSeq Date
// ResponsePlayTmpl CSeq Date
var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"Date: %s\r\n" +
@ -72,7 +72,7 @@ var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" +
// rfc2326 10.7 TEARDOWN
//var RequestTeardownTmpl = "not impl"
// CSeq
// ResponseTeardownTmpl CSeq
var ResponseTeardownTmpl = "RTSP/1.0 200 OK\r\n" +
"CSeq: %s\r\n" +
"\r\n"
@ -109,7 +109,7 @@ func PackResponseTeardown(cseq string) string {
return fmt.Sprintf(ResponseTeardownTmpl, cseq)
}
// @param body 可以为空
// PackRequest @param body 可以为空
func PackRequest(method, uri string, headers map[string]string, body string) (ret string) {
ret = method + " " + uri + " RTSP/1.0\r\n"
for k, v := range headers {

@ -41,7 +41,7 @@ const (
)
const (
// header key
// HeaderAccept header key
HeaderAccept = "Accept"
HeaderUserAgent = "User-Agent"
HeaderCSeq = "CSeq"
@ -53,7 +53,7 @@ const (
HeaderAuthorization = "Authorization"
HeaderPublic = "Public"
// header value
// HeaderAcceptApplicationSdp header value
HeaderAcceptApplicationSdp = "application/sdp"
HeaderRangeDefault = "npt=0.000-"
HeaderTransportClientPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d" // localRtpPort, localRtcpPort
@ -61,8 +61,11 @@ const (
HeaderTransportClientRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;mode=record"
HeaderTransportClientRecordTcpTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record"
HeaderTransportServerPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d"
//HeaderTransportServerPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d"
HeaderTransportServerRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d;mode=record"
//HeaderTransportServerRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record"
)

@ -13,10 +13,10 @@ import (
)
type ServerObserver interface {
// @brief 使得上层有能力管理未进化到Pub、Sub阶段的Session
// OnNewRtspSessionConnect @brief 使得上层有能力管理未进化到Pub、Sub阶段的Session
OnNewRtspSessionConnect(session *ServerCommandSession)
// @brief 注意对于已经进化到了Pub、Sub阶段的Session该回调依然会被调用
// OnDelRtspSession @brief 注意对于已经进化到了Pub、Sub阶段的Session该回调依然会被调用
OnDelRtspSession(session *ServerCommandSession)
///////////////////////////////////////////////////////////////////////////

@ -107,7 +107,7 @@ func (session *PubSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
// IInterleavedPacketWriter, callback by BaseInSession
// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession
func (session *PubSession) WriteInterleavedPacket(packet []byte, channel int) error {
return session.cmdSession.WriteInterleavedPacket(packet, channel)
}

@ -102,7 +102,7 @@ func (session *SubSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseOutSession.IsAlive()
}
// IInterleavedPacketWriter, callback by BaseOutSession
// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseOutSession
func (session *SubSession) WriteInterleavedPacket(packet []byte, channel int) error {
return session.cmdSession.WriteInterleavedPacket(packet, channel)
}

@ -92,7 +92,7 @@ func ParseM(s string) (ret M, err error) {
return
}
// 例子见单元测试
// ParseARtpMap 例子见单元测试
func ParseARtpMap(s string) (ret ARtpMap, err error) {
// rfc 3640 3.3.1. General
// rfc 3640 3.3.6. High Bit-rate AAC
@ -131,7 +131,7 @@ func ParseARtpMap(s string) (ret ARtpMap, err error) {
return
}
// 例子见单元测试
// ParseAFmtPBase 例子见单元测试
func ParseAFmtPBase(s string) (ret AFmtPBase, err error) {
// rfc 3640 4.4.1. The a=fmtp Keyword
//

Loading…
Cancel
Save