diff --git a/pkg/avc/beta.go b/pkg/avc/beta.go index 3747969..4016561 100644 --- a/pkg/avc/beta.go +++ b/pkg/avc/beta.go @@ -45,7 +45,7 @@ func ParseSps(payload []byte, ctx *Context) error { return nil } -// 尝试解析PPS所有字段,实验中,请勿直接使用该函数 +// TryParsePps 尝试解析PPS所有字段,实验中,请勿直接使用该函数 func TryParsePps(payload []byte) error { // ISO-14496-10.pdf // 7.3.2.2 Picture parameter set RBSP syntax @@ -54,7 +54,7 @@ func TryParsePps(payload []byte) error { return nil } -// 尝试解析SeqHeader所有字段,实验中,请勿直接使用该函数 +// TryParseSeqHeader 尝试解析SeqHeader所有字段,实验中,请勿直接使用该函数 // // @param rtmp message的payload部分或者flv tag的payload部分 // 注意,包含了头部2字节类型以及3字节的cts diff --git a/pkg/base/base.go b/pkg/base/base.go index c9e6a4e..0750bcb 100644 --- a/pkg/base/base.go +++ b/pkg/base/base.go @@ -22,9 +22,7 @@ import ( var startTime string -// ReadableNowTime -// -// TODO(chef): refactor 使用ReadableNowTime +// ReadableNowTime 当前时间,可读字符串形式 // func ReadableNowTime() string { return time.Now().Format("2006-01-02 15:04:05.999") diff --git a/pkg/base/http_sub_session.go b/pkg/base/http_sub_session.go index 9f6a513..62c12dd 100644 --- a/pkg/base/http_sub_session.go +++ b/pkg/base/http_sub_session.go @@ -11,7 +11,6 @@ package base import ( "net" "strings" - "time" "github.com/q191201771/naza/pkg/connection" ) @@ -43,7 +42,7 @@ func NewHttpSubSession(option HttpSubSessionOption) *HttpSubSession { stat: StatSession{ Protocol: option.Protocol, SessionId: option.Uk, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + StartTime: ReadableNowTime(), RemoteAddr: option.Conn.RemoteAddr().String(), }, } diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index 53056e4..92b5136 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -9,7 +9,7 @@ package base const ( - // spec-rtmp_specification_1.0.pdf + // RtmpTypeIdAudio spec-rtmp_specification_1.0.pdf // 7.1. Types of Messages RtmpTypeIdAudio uint8 = 8 RtmpTypeIdVideo uint8 = 9 @@ -32,7 +32,7 @@ const ( RtmpUserControlPingRequest uint8 = 6 RtmpUserControlPingResponse uint8 = 7 - // spec-video_file_format_spec_v10.pdf + // RtmpFrameTypeKey spec-video_file_format_spec_v10.pdf // Video tags // VIDEODATA // FrameType UB[4] @@ -63,7 +63,7 @@ const ( RtmpAvcInterFrame = RtmpFrameTypeInter<<4 | RtmpCodecIdAvc RtmpHevcInterFrame = RtmpFrameTypeInter<<4 | RtmpCodecIdHevc - // spec-video_file_format_spec_v10.pdf + // RtmpSoundFormatAac spec-video_file_format_spec_v10.pdf // Audio tags // AUDIODATA // SoundFormat UB[4] diff --git a/pkg/base/rtprtcp_t.go b/pkg/base/rtprtcp_t.go index 7e7710e..6735944 100644 --- a/pkg/base/rtprtcp_t.go +++ b/pkg/base/rtprtcp_t.go @@ -9,7 +9,7 @@ package base const ( - // 注意,一般情况下,AVC使用96,AAC使用97,HEVC使用98 + // RtpPacketTypeAvcOrHevc 注意,一般情况下,AVC使用96,AAC使用97,HEVC使用98 // 但是我还遇到过: // HEVC使用96 // AVC使用105 diff --git a/pkg/base/stat.go b/pkg/base/stat.go index 3bc43ed..bb03f79 100644 --- a/pkg/base/stat.go +++ b/pkg/base/stat.go @@ -9,14 +9,14 @@ package base const ( - // StatGroup.AudioCodec + // AudioCodecAac StatGroup.AudioCodec AudioCodecAac = "AAC" - // StatGroup.VideoCodec + // VideoCodecAvc StatGroup.VideoCodec VideoCodecAvc = "H264" VideoCodecHevc = "H265" - // StatSession.Protocol + // ProtocolRtmp StatSession.Protocol ProtocolRtmp = "RTMP" ProtocolRtsp = "RTSP" ProtocolHttpflv = "HTTP-FLV" diff --git a/pkg/base/version.go b/pkg/base/version.go index 7ea6741..ac72ce5 100644 --- a/pkg/base/version.go +++ b/pkg/base/version.go @@ -15,7 +15,7 @@ import "strings" // 另外,我们也在本文件提供另外一些信息 // 并且将这些信息打入可执行文件、日志、各协议中的标准版本字段中 -// 版本,该变量由外部脚本修改维护 +// LalVersion 版本,该变量由外部脚本修改维护 const LalVersion = "v0.27.1" var ( @@ -24,57 +24,57 @@ var ( LalGithubSite = "https://github.com/q191201771/lal" LalDocSite = "https://pengrl.com/lal" - // e.g. lal v0.12.3 (github.com/q191201771/lal) + // LalFullInfo e.g. lal v0.12.3 (github.com/q191201771/lal) LalFullInfo = LalLibraryName + " " + LalVersion + " (" + LalGithubRepo + ")" - // e.g. 0.12.3 + // LalVersionDot e.g. 0.12.3 LalVersionDot string - // e.g. 0,12,3 + // LalVersionComma e.g. 0,12,3 LalVersionComma string ) var ( - // 植入rtmp握手随机字符串中 + // LalRtmpHandshakeWaterMark 植入rtmp握手随机字符串中 // e.g. lal v0.12.3 (github.com/q191201771/lal) LalRtmpHandshakeWaterMark string - // 植入rtmp server中的connect result信令中 + // LalRtmpConnectResultVersion 植入rtmp server中的connect result信令中 // 注意,有两个object,第一个object中的fmsVer我们保持通用公认的值,在第二个object中植入 // e.g. 0,12,3 LalRtmpConnectResultVersion string - // e.g. lal0.12.3 + // LalRtmpPushSessionConnectVersion e.g. lal0.12.3 LalRtmpPushSessionConnectVersion string - // e.g. lal0.12.3 + // LalRtmpBuildMetadataEncoder e.g. lal0.12.3 LalRtmpBuildMetadataEncoder string - // e.g. lal/0.12.3 + // LalHttpflvPullSessionUa e.g. lal/0.12.3 LalHttpflvPullSessionUa string - // e.g. lal0.12.3 + // LalHttpflvSubSessionServer e.g. lal0.12.3 LalHttpflvSubSessionServer string - // e.g. lal0.12.3 + // LalHlsM3u8Server e.g. lal0.12.3 LalHlsM3u8Server string - // e.g. lal0.12.3 + // LalHlsTsServer e.g. lal0.12.3 LalHlsTsServer string - // e.g. lal0.12.3 + // LalRtspOptionsResponseServer e.g. lal0.12.3 LalRtspOptionsResponseServer string - // e.g. lal0.12.3 + // LalHttptsSubSessionServer e.g. lal0.12.3 LalHttptsSubSessionServer string - // e.g. lal0.12.3 + // LalHttpApiServer e.g. lal0.12.3 LalHttpApiServer string - // e.g. lal/0.12.3 + // LalRtspPullSessionUa e.g. lal/0.12.3 LalRtspPullSessionUa string - // e.g. lal 0.12.3 + // LalPackSdp e.g. lal 0.12.3 LalPackSdp string ) diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go index 046824e..5ba564e 100644 --- a/pkg/base/websocket.go +++ b/pkg/base/websocket.go @@ -16,7 +16,7 @@ import ( "github.com/q191201771/naza/pkg/bele" ) -// The WebSocket Protocol +// WsOpcode The WebSocket Protocol // https://tools.ietf.org/html/rfc6455 // // 0 1 2 3 @@ -59,7 +59,8 @@ const ( Wso_Continuous WsOpcode = iota //连续消息片断 Wso_Text //文本消息片断, Wso_Binary //二进制消息片断, - //非控制消息片断保留的操作码, + + // Wso_Rsv3 非控制消息片断保留的操作码, Wso_Rsv3 Wso_Rsv4 Wso_Rsv5 @@ -68,7 +69,8 @@ const ( Wso_Close //连接关闭, Wso_Ping //心跳检查的ping, Wso_Pong //心跳检查的pong, - //为将来的控制消息片断的保留操作码 + + // Wso_RsvB 为将来的控制消息片断的保留操作码 Wso_RsvB Wso_RsvC Wso_RsvD diff --git a/pkg/hls/m3u8.go b/pkg/hls/m3u8.go index 8ea5159..568f6a4 100644 --- a/pkg/hls/m3u8.go +++ b/pkg/hls/m3u8.go @@ -59,7 +59,7 @@ func updateTargetDurationInM3u8(content []byte, currDuration int) ([]byte, error return content, nil } -// @param content 传入m3u8文件内容 +// CalcM3u8Duration @param content 传入m3u8文件内容 // // @return durationSec m3u8中所有ts的时间总和。注意,使用的是m3u8文件中描述的ts时间,而不是读取ts文件中实际音视频数据的时间。 // diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index 8624357..e3f93e9 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -25,7 +25,7 @@ import ( type MuxerObserver interface { OnPatPmt(b []byte) - // @param rawFrame TS流,回调结束后,内部不再使用该内存块 + // OnTsPackets @param rawFrame TS流,回调结束后,内部不再使用该内存块 // @param boundary 新的TS流接收者,应该从该标志为true时开始发送数据 // OnTsPackets(rawFrame []byte, boundary bool) @@ -135,7 +135,7 @@ func (m *Muxer) Dispose() { } } -// @param msg 函数调用结束后,内部不持有msg中的内存块 +// FeedRtmpMessage @param msg 函数调用结束后,内部不持有msg中的内存块 // func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg) { m.streamer.FeedRtmpMessage(msg) diff --git a/pkg/hls/path_strategy.go b/pkg/hls/path_strategy.go index 69643fd..a629290 100644 --- a/pkg/hls/path_strategy.go +++ b/pkg/hls/path_strategy.go @@ -43,17 +43,17 @@ type IPathRequestStrategy interface { GetRequestInfo(urlCtx base.UrlContext, rootOutPath string) RequestInfo } -// 落盘策略 +// IPathWriteStrategy 落盘策略 type IPathWriteStrategy interface { - // 获取单个流对应的文件根路径 + // GetMuxerOutPath 获取单个流对应的文件根路径 GetMuxerOutPath(rootOutPath string, streamName string) string - // 获取单个流对应的m3u8文件路径 + // GetLiveM3u8FileName 获取单个流对应的m3u8文件路径 // // @param outPath: func GetMuxerOutPath的结果 GetLiveM3u8FileName(outPath string, streamName string) string - // 获取单个流对应的record类型的m3u8文件路径 + // GetRecordM3u8FileName 获取单个流对应的record类型的m3u8文件路径 // // live m3u8和record m3u8的区别: // live记录的是当前最近的可播放内容,record记录的是从流开始时的可播放内容 @@ -61,12 +61,12 @@ type IPathWriteStrategy interface { // @param outPath: func GetMuxerOutPath的结果 GetRecordM3u8FileName(outPath string, streamName string) string - // 获取单个流对应的ts文件路径 + // GetTsFileNameWithPath 获取单个流对应的ts文件路径 // // @param outPath: func GetMuxerOutPath的结果 GetTsFileNameWithPath(outPath string, fileName string) string - // ts文件名的生成策略 + // GetTsFileName ts文件名的生成策略 GetTsFileName(streamName string, index int, timestamp int) string } @@ -77,7 +77,7 @@ const ( recordM3u8FileName = "record.m3u8" ) -// 默认的路由,落盘策略 +// DefaultPathStrategy 默认的路由,落盘策略 // // 每个流在下以流名称生成一个子目录,目录下包含: // @@ -136,7 +136,7 @@ func (dps *DefaultPathStrategy) GetRequestInfo(urlCtx base.UrlContext, rootOutPa return } -// / +// GetMuxerOutPath / func (*DefaultPathStrategy) GetMuxerOutPath(rootOutPath string, streamName string) string { return filepath.Join(rootOutPath, streamName) } diff --git a/pkg/hls/queue.go b/pkg/hls/queue.go index f3dbede..e77d9f4 100644 --- a/pkg/hls/queue.go +++ b/pkg/hls/queue.go @@ -13,7 +13,7 @@ import ( "github.com/q191201771/lal/pkg/mpegts" ) -// 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 +// Queue 缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 // 一旦判断结束,该队列变成直进直出,不再有实际缓存 type Queue struct { maxMsgSize int @@ -26,14 +26,14 @@ type Queue struct { } type IQueueObserver interface { - // 该回调一定发生在数据回调之前 + // OnPatPmt 该回调一定发生在数据回调之前 // TODO(chef) 这里可以考虑换成只通知drain,由上层完成FragmentHeader的组装逻辑 OnPatPmt(b []byte) OnPop(msg base.RtmpMsg) } -// @param maxMsgSize 最大缓存多少个包 +// NewQueue @param maxMsgSize 最大缓存多少个包 func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { return &Queue{ maxMsgSize: maxMsgSize, @@ -45,7 +45,7 @@ func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue { } } -// @param msg 函数调用结束后,内部不持有该内存块 +// Push @param msg 函数调用结束后,内部不持有该内存块 func (q *Queue) Push(msg base.RtmpMsg) { if q.done { q.observer.OnPop(msg) diff --git a/pkg/hls/streamer.go b/pkg/hls/streamer.go index f20881a..a0c9c2e 100644 --- a/pkg/hls/streamer.go +++ b/pkg/hls/streamer.go @@ -21,10 +21,10 @@ import ( ) type StreamerObserver interface { - // @param b const只读内存块,上层可以持有,但是不允许修改 + // OnPatPmt @param b const只读内存块,上层可以持有,但是不允许修改 OnPatPmt(b []byte) - // @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 + // OnFrame @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 // // @param frame: 各字段含义见mpegts.Frame结构体定义 // frame.CC 注意,回调结束后,Streamer会保存frame.CC,上层在TS打包完成后,可通过frame.CC将cc值传递给Streamer @@ -33,7 +33,7 @@ type StreamerObserver interface { OnFrame(streamer *Streamer, frame *mpegts.Frame) } -// 输入rtmp流,回调转封装成Annexb格式的流 +// Streamer 输入rtmp流,回调转封装成Annexb格式的流 type Streamer struct { UniqueKey string @@ -61,7 +61,7 @@ func NewStreamer(observer StreamerObserver) *Streamer { return streamer } -// @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 +// FeedRtmpMessage @param msg msg.Payload 调用结束后,函数内部不会持有这块内存 // // TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 func (s *Streamer) FeedRtmpMessage(msg base.RtmpMsg) { @@ -268,7 +268,7 @@ func (s *Streamer) feedAudio(msg base.RtmpMsg) { s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...) } -// 吐出音频数据的三种情况: +// FlushAudio 吐出音频数据的三种情况: // 1. 收到音频或视频时,音频缓存队列已达到一定长度 // 2. 打开一个新的TS文件切片时 // 3. 输入流关闭时 diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 4f43a34..4469844 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -68,7 +68,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { return s } -// @param tag: 底层保证回调上来的Raw数据长度是完整的(但是不会分析Raw内部的编码数据) +// OnReadFlvTag @param tag: 底层保证回调上来的Raw数据长度是完整的(但是不会分析Raw内部的编码数据) type OnReadFlvTag func(tag Tag) // Pull 阻塞直到和对端完成拉流前,握手部分的工作,或者发生错误 @@ -115,32 +115,32 @@ func (session *PullSession) WaitChan() <-chan error { // --------------------------------------------------------------------------------------------------------------------- -// 文档请参考: interface ISessionUrlContext +// Url 文档请参考: interface ISessionUrlContext func (session *PullSession) Url() string { return session.urlCtx.Url } -// 文档请参考: interface ISessionUrlContext +// AppName 文档请参考: interface ISessionUrlContext func (session *PullSession) AppName() string { return session.urlCtx.PathWithoutLastItem } -// 文档请参考: interface ISessionUrlContext +// StreamName 文档请参考: interface ISessionUrlContext func (session *PullSession) StreamName() string { return session.urlCtx.LastItemOfPath } -// 文档请参考: interface ISessionUrlContext +// RawQuery 文档请参考: interface ISessionUrlContext func (session *PullSession) RawQuery() string { return session.urlCtx.RawQuery } -// 文档请参考: interface IObject +// UniqueKey 文档请参考: interface IObject func (session *PullSession) UniqueKey() string { return session.uniqueKey } -// 文档请参考: interface ISessionStat +// UpdateStat 文档请参考: interface ISessionStat func (session *PullSession) UpdateStat(intervalSec uint32) { currStat := session.conn.GetStat() rDiff := currStat.ReadBytesSum - session.prevConnStat.ReadBytesSum @@ -151,7 +151,7 @@ func (session *PullSession) UpdateStat(intervalSec uint32) { session.prevConnStat = currStat } -// 文档请参考: interface ISessionStat +// GetStat 文档请参考: interface ISessionStat func (session *PullSession) GetStat() base.StatSession { connStat := session.conn.GetStat() session.stat.ReadBytesSum = connStat.ReadBytesSum @@ -159,7 +159,7 @@ func (session *PullSession) GetStat() base.StatSession { return session.stat } -// 文档请参考: interface ISessionStat +// IsAlive 文档请参考: interface ISessionStat func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { currStat := session.conn.GetStat() if session.staleStat == nil { diff --git a/pkg/httpflv/flv_file_pump.go b/pkg/httpflv/flv_file_pump.go index fc2da59..35b2aea 100644 --- a/pkg/httpflv/flv_file_pump.go +++ b/pkg/httpflv/flv_file_pump.go @@ -59,7 +59,7 @@ func (f *FlvFilePump) Pump(filename string, onFlvTag OnPumpFlvTag) error { return f.PumpWithTags(tags, onFlvTag) } -// @return error 暂时只做预留,目前只会返回nil +// PumpWithTags @return error 暂时只做预留,目前只会返回nil // func (f *FlvFilePump) PumpWithTags(tags []Tag, onFlvTag OnPumpFlvTag) error { var totalBaseTs uint32 // 整体的基础时间戳。每轮最后更新 diff --git a/pkg/httpflv/tag.go b/pkg/httpflv/tag.go index b7c79e5..799c7cd 100644 --- a/pkg/httpflv/tag.go +++ b/pkg/httpflv/tag.go @@ -26,7 +26,7 @@ type Tag struct { Raw []byte // 结构为 (11字节的 tag header) + (body) + (4字节的 prev tag size) } -// 只包含数据部分,去除了前面11字节的tag header和后面4字节的prev tag size +// Payload 只包含数据部分,去除了前面11字节的tag header和后面4字节的prev tag size // func (tag *Tag) Payload() []byte { return tag.Raw[TagHeaderSize : len(tag.Raw)-PrevTagSizeFieldSize] @@ -52,7 +52,7 @@ func (tag *Tag) IsHevcKeySeqHeader() bool { return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HevcKeyFrame && tag.Raw[TagHeaderSize+1] == HevcPacketTypeSeqHeader } -// AVC或HEVC的seq header +// IsVideoKeySeqHeader AVC或HEVC的seq header func (tag *Tag) IsVideoKeySeqHeader() bool { return tag.IsAvcKeySeqHeader() || tag.IsHevcKeySeqHeader() } @@ -65,7 +65,7 @@ func (tag *Tag) IsHevcKeyNalu() bool { return tag.Header.Type == TagTypeVideo && tag.Raw[TagHeaderSize] == HevcKeyFrame && tag.Raw[TagHeaderSize+1] == HevcPacketTypeNalu } -// AVC或HEVC的关键帧 +// IsVideoKeyNalu AVC或HEVC的关键帧 func (tag *Tag) IsVideoKeyNalu() bool { return tag.IsAvcKeyNalu() || tag.IsHevcKeyNalu() } @@ -87,7 +87,7 @@ func (tag *Tag) ModTagTimestamp(timestamp uint32) { tag.Raw[7] = byte(timestamp >> 24) } -// 打包一个序列化后的 tag 二进制buffer,包含 tag header,body,prev tag size +// PackHttpflvTag 打包一个序列化后的 tag 二进制buffer,包含 tag header,body,prev tag size func PackHttpflvTag(t uint8, timestamp uint32, in []byte) []byte { out := make([]byte, TagHeaderSize+len(in)+PrevTagSizeFieldSize) out[0] = t diff --git a/pkg/logic/group.go b/pkg/logic/group.go index c2d13cb..8108f8e 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -157,7 +157,7 @@ func (group *Group) RunLoop() { <-group.exitChan } -// TODO chef: 传入时间 +// Tick TODO chef: 传入时间 // 目前每秒触发一次 func (group *Group) Tick() { group.mutex.Lock() @@ -253,7 +253,7 @@ func (group *Group) Tick() { group.tickCount++ } -// 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率,降低心智负担。 +// Dispose 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率,降低心智负担。 // 注意,Dispose后,不应再使用这个对象。 // 值得一提,如果是从其他协程回调回来的消息,在使用Group中的资源前,要判断资源是否存在以及可用。 // @@ -340,7 +340,7 @@ func (group *Group) AddRtmpPubSession(session *rtmp.ServerSession) error { return nil } -// TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉 +// AddRtspPubSession TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉 func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error { Log.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey()) @@ -435,7 +435,7 @@ func (group *Group) AddHttpflvSubSession(session *httpflv.SubSession) { group.pullIfNeeded() } -// TODO chef: +// AddHttptsSubSession TODO chef: // 这里应该也要考虑触发hls muxer开启 // 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub func (group *Group) AddHttptsSubSession(session *httpts.SubSession) { @@ -644,7 +644,7 @@ func (group *Group) KickOutSession(sessionId string) bool { return false } -// 外部命令主动触发pull拉流 +// StartPull 外部命令主动触发pull拉流 // // 当前调用时机: // 1. 比如http api @@ -922,14 +922,14 @@ func (group *Group) disposeHlsMuxer() { // 音视频数据转发、转封装的逻辑 // --------------------------------------------------------------------------------------------------------------------- -// rtmp.PubSession or rtmp.PullSession +// OnReadRtmpAvMsg rtmp.PubSession or rtmp.PullSession func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { group.mutex.Lock() defer group.mutex.Unlock() group.broadcastByRtmpMsg(msg) } -// rtsp.PubSession +// OnRtpPacket rtsp.PubSession func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { group.mutex.Lock() defer group.mutex.Unlock() @@ -937,7 +937,7 @@ func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { group.onRtpPacket(pkt) } -// rtsp.PubSession +// OnSdp rtsp.PubSession func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { group.mutex.Lock() defer group.mutex.Unlock() @@ -946,7 +946,7 @@ func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) } -// rtsp.PubSession +// OnAvPacket rtsp.PubSession func (group *Group) OnAvPacket(pkt base.AvPacket) { group.mutex.Lock() defer group.mutex.Unlock() @@ -955,7 +955,7 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) { group.rtsp2RtmpRemuxer.OnAvPacket(pkt) } -// hls.Muxer +// OnPatPmt hls.Muxer func (group *Group) OnPatPmt(b []byte) { group.patpmt = b @@ -966,7 +966,7 @@ func (group *Group) OnPatPmt(b []byte) { } } -// hls.Muxer +// OnTsPackets hls.Muxer func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) { // 因为最前面Feed时已经加锁了,所以这里回调上来就不用加锁了 diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go index ebda95c..3804908 100644 --- a/pkg/logic/logic.go +++ b/pkg/logic/logic.go @@ -66,23 +66,3 @@ var defaultOption = Option{ } type ModOption func(option *Option) - -// --------------------------------------------------------------------------------------------------------------------- - -// 一些没有放入配置文件中,包级别的配置,暂时没有对外暴露 -// -var ( - relayPushTimeoutMs = 5000 - relayPushWriteAvTimeoutMs = 5000 - relayPullTimeoutMs = 5000 - relayPullReadAvTimeoutMs = 5000 - calcSessionStatIntervalSec uint32 = 5 - - // checkSessionAliveIntervalSec - // - // - 对于输入型session,检查一定时间内,是否没有收到数据 - // - 对于输出型session,检查一定时间内,是否没有发送数据 - // 注意,这里既检查socket发送阻塞,又检查上层没有给session喂数据 - // - checkSessionAliveIntervalSec uint32 = 10 -) diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index db8a281..f45a3e3 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -56,7 +56,7 @@ type ServerManager struct { func NewServerManager(confFile string, modOption ...ModOption) *ServerManager { sm := &ServerManager{ - serverStartTime: time.Now().Format("2006-01-02 15:04:05.999"), + serverStartTime: base.ReadableNowTime(), exitChan: make(chan struct{}, 1), } sm.groupManager = NewSimpleGroupManager(sm) diff --git a/pkg/logic/var.go b/pkg/logic/var.go index 20318ff..811dfae 100644 --- a/pkg/logic/var.go +++ b/pkg/logic/var.go @@ -11,3 +11,19 @@ package logic import "github.com/q191201771/naza/pkg/nazalog" var Log = nazalog.GetGlobalLogger() + +var ( + relayPushTimeoutMs = 5000 + relayPushWriteAvTimeoutMs = 5000 + relayPullTimeoutMs = 5000 + relayPullReadAvTimeoutMs = 5000 + calcSessionStatIntervalSec uint32 = 5 + + // checkSessionAliveIntervalSec + // + // - 对于输入型session,检查一定时间内,是否没有收到数据 + // - 对于输出型session,检查一定时间内,是否没有发送数据 + // 注意,这里既检查socket发送阻塞,又检查上层没有给session喂数据 + // + checkSessionAliveIntervalSec uint32 = 10 +) diff --git a/pkg/mpegts/mpegts.go b/pkg/mpegts/mpegts.go index 13bfad8..9115fdf 100644 --- a/pkg/mpegts/mpegts.go +++ b/pkg/mpegts/mpegts.go @@ -10,7 +10,7 @@ package mpegts // MPEG: Moving Picture Experts Group -// 每个TS文件都以固定的PAT,PMT开始 +// FixedFragmentHeader 每个TS文件都以固定的PAT,PMT开始 var FixedFragmentHeader = []byte{ /* TS */ 0x47, 0x40, 0x00, 0x10, 0x00, @@ -70,7 +70,7 @@ var FixedFragmentHeader = []byte{ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, } -// 每个TS文件都以固定的PAT,PMT开始 +// FixedFragmentHeaderHevc 每个TS文件都以固定的PAT,PMT开始 var FixedFragmentHeaderHevc = []byte{ /* TS */ 0x47, 0x40, 0x00, 0x10, 0x00, @@ -140,7 +140,7 @@ const ( PidVideo uint16 = 0x100 PidAudio uint16 = 0x101 - // ------------------------------------------ + // AdaptationFieldControlReserved ------------------------------------------ // // ------------------------------------------ AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC @@ -164,13 +164,13 @@ const ( // PES const ( - // ----------------------------------------------------------------- + // StreamIdAudio ----------------------------------------------------------------- //
// ----------------------------------------------------------------- StreamIdAudio uint8 = 192 // 110x xxxx 0xC0 StreamIdVideo uint8 = 224 // 1110 xxxx - // ------------------------------ + // PtsDtsFlags0 ------------------------------ // // ------------------------------ PtsDtsFlags0 uint8 = 0 // no PTS no DTS diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go index fe89f7d..b482566 100644 --- a/pkg/mpegts/pack.go +++ b/pkg/mpegts/pack.go @@ -8,6 +8,8 @@ package mpegts +// Frame 帧数据 +// type Frame struct { Pts uint64 // =(毫秒 * 90) Dts uint64 @@ -32,11 +34,11 @@ type Frame struct { Raw []byte } -// @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存 +// OnTsPacket @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存 // type OnTsPacket func(packet []byte) -// Annexb格式的流转换为mpegts packet +// PackTsPacket Annexb格式的流转换为mpegts packet // // @param frame: 各字段含义见mpegts.Frame结构体定义 // frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用 diff --git a/pkg/mpegts/pat.go b/pkg/mpegts/pat.go index d136910..466501a 100644 --- a/pkg/mpegts/pat.go +++ b/pkg/mpegts/pat.go @@ -12,7 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) -// --------------------------------------------------------------------------------------------------- +// Pat --------------------------------------------------------------------------------------------------- // Program association section // <2.4.4.3> // table_id [8b] * diff --git a/pkg/mpegts/pes.go b/pkg/mpegts/pes.go index 75c9cfb..6bf6570 100644 --- a/pkg/mpegts/pes.go +++ b/pkg/mpegts/pes.go @@ -12,7 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) -// ----------------------------------------------------------- +// Pes ----------------------------------------------------------- // // <2.4.3.6 PES packet> //
diff --git a/pkg/mpegts/pmt.go b/pkg/mpegts/pmt.go index d9f39c1..004618d 100644 --- a/pkg/mpegts/pmt.go +++ b/pkg/mpegts/pmt.go @@ -12,7 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) -// ---------------------------------------- +// Pmt ---------------------------------------- // Program Map Table // <2.4.4.8> // table_id [8b] * diff --git a/pkg/mpegts/ts_packet_header.go b/pkg/mpegts/ts_packet_header.go index 9e1162c..fc788f6 100644 --- a/pkg/mpegts/ts_packet_header.go +++ b/pkg/mpegts/ts_packet_header.go @@ -12,7 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) -// ------------------------------------------------ +// TsPacketHeader ------------------------------------------------ // <2.4.3.2> // sync_byte [8b] * always 0x47 // transport_error_indicator [1b] @@ -34,7 +34,7 @@ type TsPacketHeader struct { Cc uint8 } -// ---------------------------------------------------------- +// TsPacketAdaptation ---------------------------------------------------------- //
// adaptation_field_length [8b] * 不包括自己这1字节 // discontinuity_indicator [1b] @@ -54,7 +54,7 @@ type TsPacketAdaptation struct { Length uint8 } -// 解析4字节TS Packet header +// ParseTsPacketHeader 解析4字节TS Packet header func ParseTsPacketHeader(b []byte) (h TsPacketHeader) { // TODO chef: 检查长度 br := nazabits.NewBitReader(b) @@ -69,7 +69,7 @@ func ParseTsPacketHeader(b []byte) (h TsPacketHeader) { return } -// TODO chef +// ParseTsPacketAdaptation TODO chef func ParseTsPacketAdaptation(b []byte) (f TsPacketAdaptation) { br := nazabits.NewBitReader(b) f.Length, _ = br.ReadBits8(8) diff --git a/pkg/remux/avpacket2rtmp.go b/pkg/remux/avpacket2rtmp.go index f36059d..93603e2 100644 --- a/pkg/remux/avpacket2rtmp.go +++ b/pkg/remux/avpacket2rtmp.go @@ -19,7 +19,7 @@ import ( "github.com/q191201771/naza/pkg/bele" ) -// AvPacket转换为RTMP +// AvPacket2RtmpRemuxer AvPacket转换为RTMP // 目前AvPacket来自RTSP的sdp以及rtp的合帧包。理论上也支持webrtc,后续接入webrtc时再验证 // type AvPacket2RtmpRemuxer struct { @@ -42,7 +42,7 @@ func NewAvPacket2RtmpRemuxer(onRtmpAvMsg rtmp.OnReadRtmpAvMsg) *AvPacket2RtmpRem } } -// 实现RTSP回调数据的三个接口,使得接入时方便些 +// OnRtpPacket 实现RTSP回调数据的三个接口,使得接入时方便些 func (r *AvPacket2RtmpRemuxer) OnRtpPacket(pkt rtprtcp.RtpPacket) { // noop } @@ -53,7 +53,7 @@ func (r *AvPacket2RtmpRemuxer) OnAvPacket(pkt base.AvPacket) { r.FeedAvPacket(pkt) } -// rtsp场景下,有时sps、pps等信息只包含在sdp中,有时包含在rtp包中, +// InitWithAvConfig rtsp场景下,有时sps、pps等信息只包含在sdp中,有时包含在rtp包中, // 这里提供输入sdp的sps、pps等信息的机会,如果没有,可以不调用 // // 内部不持有输入参数的内存块 @@ -112,7 +112,7 @@ func (r *AvPacket2RtmpRemuxer) InitWithAvConfig(asc, vps, sps, pps []byte) { } } -// @param pkt: 内部不持有该内存块 +// FeedAvPacket @param pkt: 内部不持有该内存块 // func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) { switch pkt.PayloadType { diff --git a/pkg/remux/flv2rtmp.go b/pkg/remux/flv2rtmp.go index 730c388..8407511 100644 --- a/pkg/remux/flv2rtmp.go +++ b/pkg/remux/flv2rtmp.go @@ -30,7 +30,7 @@ func FlvTagHeader2RtmpHeader(in httpflv.TagHeader) (out base.RtmpHeader) { return } -// @return msg: 返回的内存块引用参数`tag`的内存块 +// FlvTag2RtmpMsg @return msg: 返回的内存块引用参数`tag`的内存块 // func FlvTag2RtmpMsg(tag httpflv.Tag) (msg base.RtmpMsg) { msg.Header = FlvTagHeader2RtmpHeader(tag.Header) @@ -38,7 +38,7 @@ func FlvTag2RtmpMsg(tag httpflv.Tag) (msg base.RtmpMsg) { return } -// @return 返回的内存块为内部新申请 +// FlvTag2RtmpChunks @return 返回的内存块为内部新申请 // func FlvTag2RtmpChunks(tag httpflv.Tag) []byte { rtmpHeader := FlvTagHeader2RtmpHeader(tag.Header) diff --git a/pkg/remux/gop_cache.go b/pkg/remux/gop_cache.go index c609c3d..3460790 100644 --- a/pkg/remux/gop_cache.go +++ b/pkg/remux/gop_cache.go @@ -65,7 +65,7 @@ type GopCache struct { gopSize int } -// @param gopNum: gop缓存大小 +// NewGopCache @param gopNum: gop缓存大小 // 如果为0,则不缓存音频数据,也即GOP缓存功能不生效 // 如果>0,则缓存个完整GOP,另外还可能有半个最近不完整的GOP // @@ -111,7 +111,7 @@ func (gc *GopCache) Feed(msg base.RtmpMsg, lg LazyGet) { } } -// 获取GOP数量,注意,最后一个可能是不完整的 +// GetGopCount 获取GOP数量,注意,最后一个可能是不完整的 func (gc *GopCache) GetGopCount() int { return (gc.gopRingLast + gc.gopSize - gc.gopRingFirst) % gc.gopSize } diff --git a/pkg/remux/rtmp2flv.go b/pkg/remux/rtmp2flv.go index ac25d02..0aeab89 100644 --- a/pkg/remux/rtmp2flv.go +++ b/pkg/remux/rtmp2flv.go @@ -13,7 +13,7 @@ import ( "github.com/q191201771/lal/pkg/httpflv" ) -// @return 返回的内存块为新申请的独立内存块 +// RtmpMsg2FlvTag @return 返回的内存块为新申请的独立内存块 func RtmpMsg2FlvTag(msg base.RtmpMsg) *httpflv.Tag { var tag httpflv.Tag tag.Header.Type = msg.Header.MsgTypeId diff --git a/pkg/remux/rtmp2rtsp.go b/pkg/remux/rtmp2rtsp.go index 8c20307..77ab9e7 100644 --- a/pkg/remux/rtmp2rtsp.go +++ b/pkg/remux/rtmp2rtsp.go @@ -29,7 +29,7 @@ var ( maxAnalyzeAvMsgSize = 16 ) -// 提供rtmp数据向sdp+rtp数据的转换 +// Rtmp2RtspRemuxer 提供rtmp数据向sdp+rtp数据的转换 type Rtmp2RtspRemuxer struct { onSdp OnSdp onRtpPacket OnRtpPacket @@ -49,7 +49,7 @@ type Rtmp2RtspRemuxer struct { type OnSdp func(sdpCtx sdp.LogicContext) type OnRtpPacket func(pkt rtprtcp.RtpPacket) -// @param onSdp: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块 +// NewRtmp2RtspRemuxer @param onSdp: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块 // @param onRtpPacket: 每次回调为独立的内存块,回调结束后,内部不再使用该内存块 // func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer { @@ -61,7 +61,7 @@ func NewRtmp2RtspRemuxer(onSdp OnSdp, onRtpPacket OnRtpPacket) *Rtmp2RtspRemuxer } } -// @param msg: 函数调用结束后,内部不持有`msg`内存块 +// FeedRtmpMsg @param msg: 函数调用结束后,内部不持有`msg`内存块 // func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) { var err error diff --git a/pkg/rtmp/chunk_divider.go b/pkg/rtmp/chunk_divider.go index 352b5eb..84e1ad1 100644 --- a/pkg/rtmp/chunk_divider.go +++ b/pkg/rtmp/chunk_divider.go @@ -23,12 +23,12 @@ var defaultChunkDivider = ChunkDivider{ localChunkSize: LocalChunkSize, } -// @return 返回的内存块由内部申请,不依赖参数内存块 +// Message2Chunks @return 返回的内存块由内部申请,不依赖参数内存块 func Message2Chunks(message []byte, header *base.RtmpHeader) []byte { return defaultChunkDivider.Message2Chunks(message, header) } -// TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message +// Message2Chunks TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte { return message2Chunks(message, header, nil, d.localChunkSize) } diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index d52698c..61ab1e7 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -53,7 +53,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession { } } -// 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误 +// Pull 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误 // // @param onReadRtmpAvMsg: msg: 注意,回调结束后,`msg`的内存块会被`PullSession`重复使用 // 也即多次回调的`msg`是复用的同一块内存块 @@ -83,42 +83,42 @@ func (s *PullSession) WaitChan() <-chan error { // --------------------------------------------------------------------------------------------------------------------- -// 文档请参考: interface ISessionUrlContext +// Url 文档请参考: interface ISessionUrlContext func (s *PullSession) Url() string { return s.core.Url() } -// 文档请参考: interface ISessionUrlContext +// AppName 文档请参考: interface ISessionUrlContext func (s *PullSession) AppName() string { return s.core.AppName() } -// 文档请参考: interface ISessionUrlContext +// StreamName 文档请参考: interface ISessionUrlContext func (s *PullSession) StreamName() string { return s.core.StreamName() } -// 文档请参考: interface ISessionUrlContext +// RawQuery 文档请参考: interface ISessionUrlContext func (s *PullSession) RawQuery() string { return s.core.RawQuery() } -// 文档请参考: interface IObject +// UniqueKey 文档请参考: interface IObject func (s *PullSession) UniqueKey() string { return s.core.uniqueKey } -// 文档请参考: interface ISessionStat +// GetStat 文档请参考: interface ISessionStat func (s *PullSession) GetStat() base.StatSession { return s.core.GetStat() } -// 文档请参考: interface ISessionStat +// UpdateStat 文档请参考: interface ISessionStat func (s *PullSession) UpdateStat(intervalSec uint32) { s.core.UpdateStat(intervalSec) } -// 文档请参考: interface ISessionStat +// IsAlive 文档请参考: interface ISessionStat func (s *PullSession) IsAlive() (readAlive, writeAlive bool) { return s.core.IsAlive() } diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index c360a5d..ffa35ce 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -54,7 +54,7 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { } } -// 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误 +// Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误 func (s *PushSession) Push(rawUrl string) error { return s.core.Do(rawUrl) } @@ -65,7 +65,7 @@ func (s *PushSession) Write(msg []byte) error { return s.core.Write(msg) } -// 将缓存的数据立即刷新发送 +// Flush 将缓存的数据立即刷新发送 // 是否有缓存策略,请参见配置及内部实现 func (s *PushSession) Flush() error { return s.core.Flush() @@ -89,42 +89,42 @@ func (s *PushSession) WaitChan() <-chan error { // --------------------------------------------------------------------------------------------------------------------- -// 文档请参考: interface ISessionUrlContext +// Url 文档请参考: interface ISessionUrlContext func (s *PushSession) Url() string { return s.core.Url() } -// 文档请参考: interface ISessionUrlContext +// AppName 文档请参考: interface ISessionUrlContext func (s *PushSession) AppName() string { return s.core.AppName() } -// 文档请参考: interface ISessionUrlContext +// StreamName 文档请参考: interface ISessionUrlContext func (s *PushSession) StreamName() string { return s.core.StreamName() } -// 文档请参考: interface ISessionUrlContext +// RawQuery 文档请参考: interface ISessionUrlContext func (s *PushSession) RawQuery() string { return s.core.RawQuery() } -// 文档请参考: interface IObject +// UniqueKey 文档请参考: interface IObject func (s *PushSession) UniqueKey() string { return s.core.uniqueKey } -// 文档请参考: interface ISessionStat +// GetStat 文档请参考: interface ISessionStat func (s *PushSession) GetStat() base.StatSession { return s.core.GetStat() } -// 文档请参考: interface ISessionStat +// UpdateStat 文档请参考: interface ISessionStat func (s *PushSession) UpdateStat(intervalSec uint32) { s.core.UpdateStat(intervalSec) } -// 文档请参考: interface ISessionStat +// IsAlive 文档请参考: interface ISessionStat func (s *PushSession) IsAlive() (readAlive, writeAlive bool) { return s.core.IsAlive() } diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 51ffe1f..3c709bb 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -22,7 +22,7 @@ import ( "github.com/q191201771/naza/pkg/connection" ) -// rtmp 客户端类型连接的底层实现 +// ClientSession rtmp 客户端类型连接的底层实现 // package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession type ClientSession struct { uniqueKey string @@ -84,7 +84,7 @@ var defaultClientSessOption = ClientSessionOption{ type ModClientSessionOption func(option *ClientSessionOption) -// @param t: session的类型,只能是推或者拉 +// NewClientSession @param t: session的类型,只能是推或者拉 func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession { var uk string switch t { @@ -116,7 +116,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) stat: base.StatSession{ Protocol: base.ProtocolRtmp, SessionId: uk, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + StartTime: base.ReadableNowTime(), }, debugLogReadUserCtrlMsgMax: 5, hc: hc, @@ -125,7 +125,7 @@ func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) return s } -// 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 +// Do 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误 func (s *ClientSession) Do(rawUrl string) error { Log.Debugf("[%s] Do. url=%s", s.uniqueKey, rawUrl) diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index 3a76e2e..b1c5b5d 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -23,7 +23,7 @@ const ( peerBandwidthLimitTypeDynamic = uint8(2) ) -// 打包并发送 rtmp 信令 +// MessagePacker 打包并发送 rtmp 信令 // type MessagePacker struct { b *Buffer diff --git a/pkg/rtmp/metadata.go b/pkg/rtmp/metadata.go index d5a8fb5..a721536 100644 --- a/pkg/rtmp/metadata.go +++ b/pkg/rtmp/metadata.go @@ -32,7 +32,7 @@ func ParseMetadata(b []byte) (ObjectPairArray, error) { return opa, err } -// spec-video_file_format_spec_v10.pdf +// BuildMetadata spec-video_file_format_spec_v10.pdf // onMetaData // - duration DOUBLE, seconds // - width DOUBLE diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index 67398f7..6b7baaa 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -38,6 +38,7 @@ const defaultChunkSize = 128 // 未收到对端设置chunk size时的默认值 const ( //MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令 + Msid1 = 1 // publish、play、onStatus 以及 音视频数据 ) diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index b1992b0..0e5618c 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -13,7 +13,6 @@ import ( "net" "strings" "sync" - "time" "github.com/q191201771/naza/pkg/nazaerrors" @@ -40,7 +39,7 @@ type ServerSessionObserver interface { } type PubSessionObserver interface { - // 注意,回调结束后,内部会复用Payload内存块 + // OnReadRtmpAvMsg 注意,回调结束后,内部会复用Payload内存块 OnReadRtmpAvMsg(msg base.RtmpMsg) } @@ -97,7 +96,7 @@ func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSess stat: base.StatSession{ Protocol: base.ProtocolRtmp, SessionId: uk, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + StartTime: base.ReadableNowTime(), RemoteAddr: conn.RemoteAddr().String(), }, uniqueKey: uk, diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index e8a97b3..6faea61 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -57,7 +57,7 @@ type StreamMsg struct { buff *nazabytes.Buffer } -// 确保可写空间,如果不够会扩容 +// Grow 确保可写空间,如果不够会扩容 func (msg *StreamMsg) Grow(n uint32) { msg.buff.Grow(int(n)) } diff --git a/pkg/rtprtcp/ntp.go b/pkg/rtprtcp/ntp.go index a960c31..9226446 100644 --- a/pkg/rtprtcp/ntp.go +++ b/pkg/rtprtcp/ntp.go @@ -11,19 +11,19 @@ package rtprtcp // (70 * 365 + 17) * 24 * 60 * 60 const offset uint64 = 2208988800 -// 将ntp时间戳转换为Unix时间戳,Unix时间戳单位是纳秒 +// Ntp2UnixNano 将ntp时间戳转换为Unix时间戳,Unix时间戳单位是纳秒 func Ntp2UnixNano(v uint64) uint64 { msw := v >> 32 lsw := v & 0xFFFFFFFF return (msw-offset)*1e9 + (lsw*1e9)>>32 } -// 将ntp时间戳(高32位低32位分开的形式)转换为Unix时间戳 +// MswLsw2UnixNano 将ntp时间戳(高32位低32位分开的形式)转换为Unix时间戳 func MswLsw2UnixNano(msw, lsw uint64) uint64 { return Ntp2UnixNano(MswLsw2Ntp(msw, lsw)) } -// msw是ntp的高32位,lsw是ntp的低32位 +// MswLsw2Ntp msw是ntp的高32位,lsw是ntp的低32位 func MswLsw2Ntp(msw, lsw uint64) uint64 { return (msw << 32) | lsw } diff --git a/pkg/rtprtcp/rtcp.go b/pkg/rtprtcp/rtcp.go index ad84d46..204a023 100644 --- a/pkg/rtprtcp/rtcp.go +++ b/pkg/rtprtcp/rtcp.go @@ -119,7 +119,7 @@ func ParseRtcpHeader(b []byte) RtcpHeader { return h } -// rfc3550 6.4.1 +// ParseSr rfc3550 6.4.1 // // @param b rtcp包,包含包头 func ParseSr(b []byte) Sr { @@ -133,7 +133,7 @@ func ParseSr(b []byte) Sr { return s } -// @param out 传出参数,注意,调用方保证长度>=4 +// PackTo @param out 传出参数,注意,调用方保证长度>=4 func (r *RtcpHeader) PackTo(out []byte) { out[0] = r.Version<<6 | r.Padding<<5 | r.CountOrFormat out[1] = r.PacketType diff --git a/pkg/rtprtcp/rtcp_rr_producer.go b/pkg/rtprtcp/rtcp_rr_producer.go index 063a6b2..3764ef7 100644 --- a/pkg/rtprtcp/rtcp_rr_producer.go +++ b/pkg/rtprtcp/rtcp_rr_producer.go @@ -40,7 +40,7 @@ func NewRrProducer(clockRate int) *RrProducer { } } -// 每次收到rtp包,都将seq序号传入这个函数 +// FeedRtpPacket 每次收到rtp包,都将seq序号传入这个函数 func (r *RrProducer) FeedRtpPacket(seq uint16) { r.received++ @@ -62,7 +62,7 @@ func (r *RrProducer) FeedRtpPacket(seq uint16) { r.extendedSeq = (r.cycles << 16) | uint32(r.maxSeq) } -// 收到sr包时,产生rr包 +// Produce 收到sr包时,产生rr包 // // @param lsr: 从sr包中获取,见func SR.GetMiddleNtp // @return: rr包的二进制数据 diff --git a/pkg/rtprtcp/rtp.go b/pkg/rtprtcp/rtp.go index 82b0f2b..cfc5141 100644 --- a/pkg/rtprtcp/rtp.go +++ b/pkg/rtprtcp/rtp.go @@ -28,11 +28,11 @@ const ( NaluTypeAvcStapa = 24 // one packet, multiple nals NaluTypeAvcFua = 28 - // TODO(chef): hevc有stapa格式吗 + // NaluTypeHevcFua TODO(chef): hevc有stapa格式吗 NaluTypeHevcFua = 49 ) -// 比较序号的值,内部处理序号翻转问题,见单元测试中的例子 +// CompareSeq 比较序号的值,内部处理序号翻转问题,见单元测试中的例子 // @return 0 a和b相等 // 1 a大于b // -1 a小于b @@ -56,7 +56,7 @@ func CompareSeq(a, b uint16) int { return 1 } -// a减b的值,内部处理序号翻转问题,如果a小于b,则返回负值,见单元测试中的例子 +// SubSeq a减b的值,内部处理序号翻转问题,如果a小于b,则返回负值,见单元测试中的例子 func SubSeq(a, b uint16) int { if a == b { return 0 diff --git a/pkg/rtprtcp/rtp_packer.go b/pkg/rtprtcp/rtp_packer.go index dc5d60f..d857888 100644 --- a/pkg/rtprtcp/rtp_packer.go +++ b/pkg/rtprtcp/rtp_packer.go @@ -52,7 +52,7 @@ func NewRtpPacker(payloadPacker IRtpPackerPayload, clockRate int, ssrc uint32, m } } -// @param pkt: pkt.Timestamp 绝对时间戳,单位毫秒 +// Pack @param pkt: pkt.Timestamp 绝对时间戳,单位毫秒 // pkt.PayloadType rtp包头中的packet type // func (r *RtpPacker) Pack(pkt base.AvPacket) (out []RtpPacket) { diff --git a/pkg/rtprtcp/rtp_packer_payload.go b/pkg/rtprtcp/rtp_packer_payload.go index 0a52c4c..edab4a6 100644 --- a/pkg/rtprtcp/rtp_packer_payload.go +++ b/pkg/rtprtcp/rtp_packer_payload.go @@ -9,7 +9,7 @@ package rtprtcp type IRtpPackerPayload interface { - // @param maxSize: rtp payload包体部分(不含包头)的最大大小 + // Pack @param maxSize: rtp payload包体部分(不含包头)的最大大小 // Pack(in []byte, maxSize int) (out [][]byte) } diff --git a/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go b/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go index 1625830..e9fa13a 100644 --- a/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go +++ b/pkg/rtprtcp/rtp_packer_payload_avc_hevc.go @@ -56,7 +56,7 @@ func NewRtpPackerPayloadAvcHevc(payloadType base.AvPacketPt, modOptions ...ModRt } } -// @param in: AVCC格式 +// Pack @param in: AVCC格式 // // @return out: 内存块为独立新申请;函数返回后,内部不再持有该内存块 // diff --git a/pkg/rtprtcp/rtp_packet.go b/pkg/rtprtcp/rtp_packet.go index 103f6e2..65770cd 100644 --- a/pkg/rtprtcp/rtp_packet.go +++ b/pkg/rtprtcp/rtp_packet.go @@ -113,7 +113,7 @@ func ParseRtpHeader(b []byte) (h RtpHeader, err error) { return } -// 函数调用结束后,不持有参数的内存块 +// ParseRtpPacket 函数调用结束后,不持有参数的内存块 func ParseRtpPacket(b []byte) (pkt RtpPacket, err error) { pkt.Header, err = ParseRtpHeader(b) if err != nil { @@ -132,7 +132,7 @@ func (p *RtpPacket) Body() []byte { return p.Raw[p.Header.payloadOffset:] } -// @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc,否则直接返回false +// IsAvcHevcBoundary @param pt: 取值范围为AvPacketPtAvc或AvPacketPtHevc,否则直接返回false // func IsAvcHevcBoundary(pkt RtpPacket, pt base.AvPacketPt) bool { switch pt { diff --git a/pkg/rtprtcp/rtp_unpack_container.go b/pkg/rtprtcp/rtp_unpack_container.go index f38d25f..ca1609b 100644 --- a/pkg/rtprtcp/rtp_unpack_container.go +++ b/pkg/rtprtcp/rtp_unpack_container.go @@ -35,7 +35,7 @@ func NewRtpUnpackContainer(maxSize int, unpackerProtocol IRtpUnpackerProtocol) * } } -// 输入收到的rtp包 +// Feed 输入收到的rtp包 func (r *RtpUnpackContainer) Feed(pkt RtpPacket) { // 过期的包 if r.isStale(pkt.Header.Seq) { diff --git a/pkg/rtprtcp/rtp_unpacker.go b/pkg/rtprtcp/rtp_unpacker.go index 5902dcf..3a1aca2 100644 --- a/pkg/rtprtcp/rtp_unpacker.go +++ b/pkg/rtprtcp/rtp_unpacker.go @@ -31,10 +31,10 @@ type IRtpUnpackContainer interface { } type IRtpUnpackerProtocol interface { - // 计算rtp包处于帧中的位置 + // CalcPositionIfNeeded 计算rtp包处于帧中的位置 CalcPositionIfNeeded(pkt *RtpPacket) - // 尝试合成一个完整帧 + // TryUnpackOne 尝试合成一个完整帧 // // 从当前队列的第一个包开始合成 // 如果一个rtp包对应一个完整帧,则合成一帧 @@ -46,7 +46,7 @@ type IRtpUnpackerProtocol interface { TryUnpackOne(list *RtpPacketList) (unpackedFlag bool, unpackedSeq uint16) } -// @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳,单位毫秒 +// OnAvPacket @param pkt: pkt.Timestamp RTP包头中的时间戳(pts)经过clockrate换算后的时间戳,单位毫秒 // 注意,不支持带B帧的视频流,pts和dts永远相同 // pkt.PayloadType base.AvPacketPTXXX // pkt.Payload AAC: @@ -59,7 +59,7 @@ type IRtpUnpackerProtocol interface { // 假如sps和pps是一个stapA包,则合并结果为一个AvPacket type OnAvPacket func(pkt base.AvPacket) -// 目前支持AVC,HEVC和AAC MPEG4-GENERIC,业务方也可以自己实现IRtpUnpackerProtocol,甚至是IRtpUnpackContainer +// DefaultRtpUnpackerFactory 目前支持AVC,HEVC和AAC MPEG4-GENERIC,业务方也可以自己实现IRtpUnpackerProtocol,甚至是IRtpUnpackContainer func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSize int, onAvPacket OnAvPacket) IRtpUnpacker { var protocol IRtpUnpackerProtocol switch payloadType { diff --git a/pkg/rtsp/auth.go b/pkg/rtsp/auth.go index 02ae1f8..e53637a 100644 --- a/pkg/rtsp/auth.go +++ b/pkg/rtsp/auth.go @@ -74,7 +74,7 @@ func (a *Auth) FeedWwwAuthenticate(auths []string, username, password string) { } } -// 如果没有调用`FeedWwwAuthenticate`初始化过,则直接返回空字符串 +// MakeAuthorization 如果没有调用`FeedWwwAuthenticate`初始化过,则直接返回空字符串 func (a *Auth) MakeAuthorization(method, uri string) string { if a.Username == "" { return "" diff --git a/pkg/rtsp/avpacket_queue.go b/pkg/rtsp/avpacket_queue.go index abe93cd..66771b5 100644 --- a/pkg/rtsp/avpacket_queue.go +++ b/pkg/rtsp/avpacket_queue.go @@ -41,7 +41,7 @@ func NewAvPacketQueue(onAvPacket OnAvPacket) *AvPacketQueue { } } -// 注意,调用方保证,音频相较于音频,视频相较于视频,时间戳是线性递增的。 +// Feed 注意,调用方保证,音频相较于音频,视频相较于视频,时间戳是线性递增的。 func (a *AvPacketQueue) Feed(pkt base.AvPacket) { //Log.Debugf("AVQ feed. t=%d, ts=%d", pkt.PayloadType, pkt.Timestamp) switch pkt.PayloadType { diff --git a/pkg/rtsp/base_in_session.go b/pkg/rtsp/base_in_session.go index b223abf..b61e441 100644 --- a/pkg/rtsp/base_in_session.go +++ b/pkg/rtsp/base_in_session.go @@ -12,7 +12,6 @@ import ( "encoding/hex" "net" "sync" - "time" "github.com/q191201771/naza/pkg/nazaatomic" @@ -95,7 +94,7 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba stat: base.StatSession{ Protocol: base.ProtocolRtsp, SessionId: uniqueKey, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + StartTime: base.ReadableNowTime(), }, cmdSession: cmdSession, waitChan: make(chan error, 1), @@ -143,7 +142,8 @@ func (session *BaseInSession) InitWithSdp(sdpCtx sdp.LogicContext) { } } -// 如果没有设置回调监听对象,可以通过该函数设置,调用方保证调用该函数发生在调用InitWithSdp之后 +// SetObserver 如果没有设置回调监听对象,可以通过该函数设置,调用方保证调用该函数发生在调用InitWithSdp之后 +// func (session *BaseInSession) SetObserver(observer BaseInSessionObserver) { session.observer = observer @@ -224,7 +224,7 @@ func (session *BaseInSession) HandleInterleavedPacket(b []byte, channel int) { } } -// 发现pull时,需要先给对端发送数据,才能收到数据 +// WriteRtpRtcpDummy 发现pull时,需要先给对端发送数据,才能收到数据 func (session *BaseInSession) WriteRtpRtcpDummy() { if session.videoRtpConn != nil { _ = session.videoRtpConn.Write(dummyRtpPacket) diff --git a/pkg/rtsp/base_out_session.go b/pkg/rtsp/base_out_session.go index 9a9cb15..f03b6df 100644 --- a/pkg/rtsp/base_out_session.go +++ b/pkg/rtsp/base_out_session.go @@ -12,7 +12,6 @@ import ( "encoding/hex" "net" "sync" - "time" "github.com/q191201771/naza/pkg/nazaatomic" @@ -26,7 +25,7 @@ import ( "github.com/q191201771/naza/pkg/nazanet" ) -// out的含义是音视频由本端发送至对端 +// BaseOutSession out的含义是音视频由本端发送至对端 // type BaseOutSession struct { uniqueKey string @@ -66,7 +65,7 @@ func NewBaseOutSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *B stat: base.StatSession{ Protocol: base.ProtocolRtsp, SessionId: uniqueKey, - StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + StartTime: base.ReadableNowTime(), }, audioRtpChannel: -1, videoRtpChannel: -1, diff --git a/pkg/rtsp/client_command_session.go b/pkg/rtsp/client_command_session.go index 2e86fba..f062736 100644 --- a/pkg/rtsp/client_command_session.go +++ b/pkg/rtsp/client_command_session.go @@ -52,7 +52,7 @@ var defaultClientCommandSessionOption = ClientCommandSessionOption{ type ClientCommandSessionObserver interface { OnConnectResult() - // only for PullSession + // OnDescribeResponse only for PullSession OnDescribeResponse(sdpCtx sdp.LogicContext) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) @@ -62,7 +62,7 @@ type ClientCommandSessionObserver interface { OnInterleavedPacket(packet []byte, channel int) } -// Push和Pull共用,封装了客户端底层信令信令部分。 +// ClientCommandSession Push和Pull共用,封装了客户端底层信令信令部分。 // 业务方应该使用PushSession和PullSession,而不是直接使用ClientCommandSession,除非你确定要这么做。 type ClientCommandSession struct { uniqueKey string @@ -103,7 +103,7 @@ func NewClientCommandSession(t ClientCommandSessionType, uniqueKey string, obser return s } -// only for PushSession +// InitWithSdp only for PushSession func (session *ClientCommandSession) InitWithSdp(sdpCtx sdp.LogicContext) { session.sdpCtx = sdpCtx } diff --git a/pkg/rtsp/client_pull_session.go b/pkg/rtsp/client_pull_session.go index 50657f9..d6ddbd8 100644 --- a/pkg/rtsp/client_pull_session.go +++ b/pkg/rtsp/client_pull_session.go @@ -143,79 +143,79 @@ func (session *PullSession) WaitChan() <-chan error { // --------------------------------------------------------------------------------------------------------------------- -// 文档请参考: interface ISessionUrlContext +// Url 文档请参考: interface ISessionUrlContext func (session *PullSession) Url() string { return session.cmdSession.Url() } -// 文档请参考: interface ISessionUrlContext +// AppName 文档请参考: interface ISessionUrlContext func (session *PullSession) AppName() string { return session.cmdSession.AppName() } -// 文档请参考: interface ISessionUrlContext +// StreamName 文档请参考: interface ISessionUrlContext func (session *PullSession) StreamName() string { return session.cmdSession.StreamName() } -// 文档请参考: interface ISessionUrlContext +// RawQuery 文档请参考: interface ISessionUrlContext func (session *PullSession) RawQuery() string { return session.cmdSession.RawQuery() } -// 文档请参考: interface IObject +// UniqueKey 文档请参考: interface IObject func (session *PullSession) UniqueKey() string { return session.uniqueKey } -// 文档请参考: interface ISessionStat +// GetStat 文档请参考: interface ISessionStat func (session *PullSession) GetStat() base.StatSession { stat := session.baseInSession.GetStat() stat.RemoteAddr = session.cmdSession.RemoteAddr() return stat } -// 文档请参考: interface ISessionStat +// UpdateStat 文档请参考: interface ISessionStat func (session *PullSession) UpdateStat(intervalSec uint32) { session.baseInSession.UpdateStat(intervalSec) } -// 文档请参考: interface ISessionStat +// IsAlive 文档请参考: interface ISessionStat func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { return session.baseInSession.IsAlive() } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnConnectResult() { // noop } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnDescribeResponse(sdpCtx sdp.LogicContext) { session.baseInSession.InitWithSdp(sdpCtx) } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) { _ = session.baseInSession.SetupWithConn(uri, rtpConn, rtcpConn) } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) { _ = session.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel) } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnSetupResult() { session.baseInSession.WriteRtpRtcpDummy() } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession func (session *PullSession) OnInterleavedPacket(packet []byte, channel int) { session.baseInSession.HandleInterleavedPacket(packet, channel) } -// IInterleavedPacketWriter, callback by BaseInSession +// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession func (session *PullSession) WriteInterleavedPacket(packet []byte, channel int) error { return session.cmdSession.WriteInterleavedPacket(packet, channel) } diff --git a/pkg/rtsp/client_push_session.go b/pkg/rtsp/client_push_session.go index 65ecf87..ada9474 100644 --- a/pkg/rtsp/client_push_session.go +++ b/pkg/rtsp/client_push_session.go @@ -136,79 +136,79 @@ func (session *PushSession) WaitChan() <-chan error { // --------------------------------------------------------------------------------------------------------------------- -// 文档请参考: interface ISessionUrlContext +// Url 文档请参考: interface ISessionUrlContext func (session *PushSession) Url() string { return session.cmdSession.Url() } -// 文档请参考: interface ISessionUrlContext +// AppName 文档请参考: interface ISessionUrlContext func (session *PushSession) AppName() string { return session.cmdSession.AppName() } -// 文档请参考: interface ISessionUrlContext +// StreamName 文档请参考: interface ISessionUrlContext func (session *PushSession) StreamName() string { return session.cmdSession.StreamName() } -// 文档请参考: interface ISessionUrlContext +// RawQuery 文档请参考: interface ISessionUrlContext func (session *PushSession) RawQuery() string { return session.cmdSession.RawQuery() } -// 文档请参考: interface IObject +// UniqueKey 文档请参考: interface IObject func (session *PushSession) UniqueKey() string { return session.uniqueKey } -// 文档请参考: interface ISessionStat +// GetStat 文档请参考: interface ISessionStat func (session *PushSession) GetStat() base.StatSession { stat := session.baseOutSession.GetStat() stat.RemoteAddr = session.cmdSession.RemoteAddr() return stat } -// 文档请参考: interface ISessionStat +// UpdateStat 文档请参考: interface ISessionStat func (session *PushSession) UpdateStat(intervalSec uint32) { session.baseOutSession.UpdateStat(intervalSec) } -// 文档请参考: interface ISessionStat +// IsAlive 文档请参考: interface ISessionStat func (session *PushSession) IsAlive() (readAlive, writeAlive bool) { return session.baseOutSession.IsAlive() } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnConnectResult() { // noop } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnDescribeResponse(sdpCtx sdp.LogicContext) { // noop } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) { _ = session.baseOutSession.SetupWithConn(uri, rtpConn, rtcpConn) } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) { _ = session.baseOutSession.SetupWithChannel(uri, rtpChannel, rtcpChannel) } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnSetupResult() { // noop } -// ClientCommandSessionObserver, callback by ClientCommandSession +// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession func (session *PushSession) OnInterleavedPacket(packet []byte, channel int) { session.baseOutSession.HandleInterleavedPacket(packet, channel) } -// IInterleavedPacketWriter, callback by BaseOutSession +// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseOutSession func (session *PushSession) WriteInterleavedPacket(packet []byte, channel int) error { return session.cmdSession.WriteInterleavedPacket(packet, channel) } diff --git a/pkg/rtsp/pack.go b/pkg/rtsp/pack.go index 84e2ebc..294111d 100644 --- a/pkg/rtsp/pack.go +++ b/pkg/rtsp/pack.go @@ -17,7 +17,7 @@ import ( // rfc2326 10.1 OPTIONS -// CSeq +// ResponseOptionsTmpl CSeq var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" + "Server: " + base.LalRtspOptionsResponseServer + "\r\n" + "CSeq: %s\r\n" + @@ -27,14 +27,14 @@ var ResponseOptionsTmpl = "RTSP/1.0 200 OK\r\n" + // rfc2326 10.3 ANNOUNCE //var RequestAnnounceTmpl = "not impl" -// CSeq +// ResponseAnnounceTmpl CSeq var ResponseAnnounceTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "\r\n" // rfc2326 10.2 DESCRIBE -// CSeq, Date, Content-Length, +// ResponseDescribeTmpl CSeq, Date, Content-Length, var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "Date: %s\r\n" + @@ -43,7 +43,7 @@ var ResponseDescribeTmpl = "RTSP/1.0 200 OK\r\n" + "\r\n" + "%s" -// rfc2326 10.4 SETUP +// ResponseSetupTmpl rfc2326 10.4 SETUP // CSeq, Date, Session, Transport var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + @@ -55,7 +55,7 @@ var ResponseSetupTmpl = "RTSP/1.0 200 OK\r\n" + // rfc2326 10.11 RECORD //var RequestRecordTmpl = "not impl" -// CSeq, Session +// ResponseRecordTmpl CSeq, Session var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "Session: %s\r\n" + @@ -63,7 +63,7 @@ var ResponseRecordTmpl = "RTSP/1.0 200 OK\r\n" + // rfc2326 10.5 PLAY -// CSeq Date +// ResponsePlayTmpl CSeq Date var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "Date: %s\r\n" + @@ -72,7 +72,7 @@ var ResponsePlayTmpl = "RTSP/1.0 200 OK\r\n" + // rfc2326 10.7 TEARDOWN //var RequestTeardownTmpl = "not impl" -// CSeq +// ResponseTeardownTmpl CSeq var ResponseTeardownTmpl = "RTSP/1.0 200 OK\r\n" + "CSeq: %s\r\n" + "\r\n" @@ -109,7 +109,7 @@ func PackResponseTeardown(cseq string) string { return fmt.Sprintf(ResponseTeardownTmpl, cseq) } -// @param body 可以为空 +// PackRequest @param body 可以为空 func PackRequest(method, uri string, headers map[string]string, body string) (ret string) { ret = method + " " + uri + " RTSP/1.0\r\n" for k, v := range headers { diff --git a/pkg/rtsp/rtsp.go b/pkg/rtsp/rtsp.go index a4c1e8a..424ee40 100644 --- a/pkg/rtsp/rtsp.go +++ b/pkg/rtsp/rtsp.go @@ -41,7 +41,7 @@ const ( ) const ( - // header key + // HeaderAccept header key HeaderAccept = "Accept" HeaderUserAgent = "User-Agent" HeaderCSeq = "CSeq" @@ -53,7 +53,7 @@ const ( HeaderAuthorization = "Authorization" HeaderPublic = "Public" - // header value + // HeaderAcceptApplicationSdp header value HeaderAcceptApplicationSdp = "application/sdp" HeaderRangeDefault = "npt=0.000-" HeaderTransportClientPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d" // localRtpPort, localRtcpPort @@ -61,8 +61,11 @@ const ( HeaderTransportClientRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;mode=record" HeaderTransportClientRecordTcpTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record" HeaderTransportServerPlayTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d" + //HeaderTransportServerPlayTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d" + HeaderTransportServerRecordTmpl = "RTP/AVP/UDP;unicast;client_port=%d-%d;server_port=%d-%d;mode=record" + //HeaderTransportServerRecordTCPTmpl = "RTP/AVP/TCP;unicast;interleaved=%d-%d;mode=record" ) diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 0d07fe6..7f98f38 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -13,10 +13,10 @@ import ( ) type ServerObserver interface { - // @brief 使得上层有能力管理未进化到Pub、Sub阶段的Session + // OnNewRtspSessionConnect @brief 使得上层有能力管理未进化到Pub、Sub阶段的Session OnNewRtspSessionConnect(session *ServerCommandSession) - // @brief 注意,对于已经进化到了Pub、Sub阶段的Session,该回调依然会被调用 + // OnDelRtspSession @brief 注意,对于已经进化到了Pub、Sub阶段的Session,该回调依然会被调用 OnDelRtspSession(session *ServerCommandSession) /////////////////////////////////////////////////////////////////////////// diff --git a/pkg/rtsp/server_pub_session.go b/pkg/rtsp/server_pub_session.go index 4fd03e9..cdfd169 100644 --- a/pkg/rtsp/server_pub_session.go +++ b/pkg/rtsp/server_pub_session.go @@ -107,7 +107,7 @@ func (session *PubSession) IsAlive() (readAlive, writeAlive bool) { return session.baseInSession.IsAlive() } -// IInterleavedPacketWriter, callback by BaseInSession +// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseInSession func (session *PubSession) WriteInterleavedPacket(packet []byte, channel int) error { return session.cmdSession.WriteInterleavedPacket(packet, channel) } diff --git a/pkg/rtsp/server_sub_session.go b/pkg/rtsp/server_sub_session.go index 47761c3..e6471dd 100644 --- a/pkg/rtsp/server_sub_session.go +++ b/pkg/rtsp/server_sub_session.go @@ -102,7 +102,7 @@ func (session *SubSession) IsAlive() (readAlive, writeAlive bool) { return session.baseOutSession.IsAlive() } -// IInterleavedPacketWriter, callback by BaseOutSession +// WriteInterleavedPacket IInterleavedPacketWriter, callback by BaseOutSession func (session *SubSession) WriteInterleavedPacket(packet []byte, channel int) error { return session.cmdSession.WriteInterleavedPacket(packet, channel) } diff --git a/pkg/sdp/parse_raw.go b/pkg/sdp/parse_raw.go index 65d0311..7777d4c 100644 --- a/pkg/sdp/parse_raw.go +++ b/pkg/sdp/parse_raw.go @@ -92,7 +92,7 @@ func ParseM(s string) (ret M, err error) { return } -// 例子见单元测试 +// ParseARtpMap 例子见单元测试 func ParseARtpMap(s string) (ret ARtpMap, err error) { // rfc 3640 3.3.1. General // rfc 3640 3.3.6. High Bit-rate AAC @@ -131,7 +131,7 @@ func ParseARtpMap(s string) (ret ARtpMap, err error) { return } -// 例子见单元测试 +// ParseAFmtPBase 例子见单元测试 func ParseAFmtPBase(s string) (ret AFmtPBase, err error) { // rfc 3640 4.4.1. The a=fmtp Keyword //