- [refactor] mpegts conversion now uses remux.Rtmp2MpegtsRemuxer and no longer depends on hls.Muxer
- [opt] lalserver: add timeout checks and bandwidth statistics for relay push
- [refactor] all interface types now start with the letter I

pull/138/head v0.28.0
q191201771 3 years ago
parent c2fa468162
commit 8824038415

@ -1,8 +1,8 @@
# LAL
[![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos%20%7C%20windows-green.svg)](https://github.com/q191201771/lal)
[![Release](https://img.shields.io/github/tag/q191201771/lal.svg?label=release)](https://github.com/q191201771/lal/releases)
[![CI](https://github.com/q191201771/lal/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/q191201771/lal/actions/workflows/ci.yml)
[![codecov](https://codecov.io/gh/q191201771/lal/branch/master/graph/badge.svg?token=YxPgzXAAmV)](https://codecov.io/gh/q191201771/lal)
[![goreportcard](https://goreportcard.com/badge/github.com/q191201771/lal)](https://goreportcard.com/report/github.com/q191201771/lal)
![wechat](https://img.shields.io/:微信-q191201771-blue.svg)
![qqgroup](https://img.shields.io/:QQ群-1090510973-blue.svg)
@ -86,9 +86,11 @@ Play multi protocol stream from lalserver via ffplay:
```shell
$ffplay rtmp://127.0.0.1/live/test110
$ffplay rtsp://127.0.0.1:5544/live/test110
$ffplay http://127.0.0.1:8080/live/test110.flv
$ffplay http://127.0.0.1:8080/hls/test110/playlist.m3u8
$ffplay http://127.0.0.1:8080/hls/test110/record.m3u8
$ffplay http://127.0.0.1:8080/hls/test110.m3u8
$ffplay http://127.0.0.1:8080/live/test110.ts
```

@ -45,7 +45,7 @@ func main() {
}
streamName := ctx.LastItemOfPath
hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil)
hlsMuexer := hls.NewMuxer(streamName, &hlsMuxerConfig, nil)
hlsMuexer.Start()
rtmp2Mpegts := remux.NewRtmp2MpegtsRemuxer(hlsMuexer)
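For context, a minimal sketch of how the new pieces wire together, assuming the demo's surrounding setup; the `FeedRtmpMessage` entry point is an assumption based on the remuxer's role and does not appear in this hunk:

```go
package demo

import (
	"github.com/q191201771/lal/pkg/base"
	"github.com/q191201771/lal/pkg/hls"
	"github.com/q191201771/lal/pkg/remux"
)

// setupTsPipeline places the RTMP->mpegts remuxer in front of the hls muxer.
// hls.Muxer implements remux.IRtmp2MpegtsRemuxerObserver, so the remuxer's
// PAT/PMT and TS-packet callbacks flow straight into it.
func setupTsPipeline(streamName string, cfg *hls.MuxerConfig) *remux.Rtmp2MpegtsRemuxer {
	hlsMuexer := hls.NewMuxer(streamName, cfg, nil)
	hlsMuexer.Start()
	return remux.NewRtmp2MpegtsRemuxer(hlsMuexer)
}

// feed is the assumed per-message entry point (FeedRtmpMessage is an
// assumption; the feed call is not shown in this hunk).
func feed(r *remux.Rtmp2MpegtsRemuxer, msg base.RtmpMsg) {
	r.FeedRtmpMessage(msg)
}
```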

@ -28,7 +28,36 @@ import (
// Case 1: the protocol completes normally; OnAdd is called back, and OnDel is called back after the server's own RunLoop ends
// Case 2: OnDel is called back manually in group, after the pull stops blocking
// Case 3: OnDel is called back manually in logic, after the sub RunLoop ends
// TODO(chef): sort out lifecycle management for all server-side session types
// -
// - rtmp has no separate Pub/Sub session struct types; it uses ServerSession directly
// - write failures need to be propagated back to the loop
// - should rtsp also have the upper layer use Command as a proxy, to avoid messy lifecycle management
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts, plus the somewhat special hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv
//
// other: rtmp.ClientSession, rtmp.ServerSession
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSession
// base.HttpSubSession
// ISessionUrlContext tested in practice
//
// | | actual url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub push | rtmp://127.0.0.1:1935/live/test110 | same as actual url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | same as actual url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub push | rtsp://localhost:5544/live/test110 | same as actual url | live, test110, |
// | rtsp pub push | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | same as actual url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub pull | http://127.0.0.1:8080/live/test110.flv | same as actual url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | same as actual url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub pull | same as rtmp pub | . | . |
// | rtsp sub pull | same as rtsp pub | . | . |
// | httpts sub pull | same as httpflv sub, with the trailing .flv replaced by .ts; omitted here | . | . |
// IsUseClosedConnectionError: once the connection is in these states, there is no need to Close it again
// TODO(chef): temporarily placed here
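A self-contained illustration of the rtsp/httpflv rows above (not lal's actual parser, and note the rtmp rows split app/stream differently): AppName is the path minus its last segment, StreamName is the last segment stripped of its extension, and RawQuery is the query string.

```go
package demo

import (
	"net/url"
	"path"
	"strings"
)

// splitAppStream reproduces the rtsp/httpflv rows of the table above.
func splitAppStream(rawUrl string) (appName, streamName, rawQuery string) {
	u, _ := url.Parse(rawUrl)
	p := strings.TrimPrefix(u.Path, "/")
	appName = path.Dir(p)                                      // e.g. "a/b/c/d"
	streamName = strings.TrimSuffix(path.Base(p), path.Ext(p)) // e.g. "test110"
	return appName, streamName, u.RawQuery                     // e.g. "p1=1&p2=2"
}
```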

@ -19,7 +19,11 @@ import (
"github.com/q191201771/lal/pkg/base"
)
type MuxerObserver interface {
type IMuxerObserver interface {
// OnFragmentOpen
//
// The muxer internally decides to open a new fragment and notifies the upper layer of this event
//
OnFragmentOpen()
}
@ -43,7 +47,7 @@ const (
// Muxer
//
// Takes an mpegts stream as input and writes hls (m3u8+ts) output to files
// Takes an mpegts stream as input and writes hls (m3u8+ts) output to files
//
type Muxer struct {
UniqueKey string
@ -56,8 +60,7 @@ type Muxer struct {
recordPlayListFilenameBak string // const after init
config *MuxerConfig
enable bool
observer MuxerObserver
observer IMuxerObserver
fragment Fragment
videoCc uint8
@ -88,10 +91,9 @@ type fragmentInfo struct {
// NewMuxer
//
// @param enable if false, the hls feature is off, i.e., no files are written, but MuxerObserver callbacks still fire
// @param observer may be nil; if not nil, the TS stream is called back to the upper layer
//
func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer MuxerObserver) *Muxer {
func NewMuxer(streamName string, config *MuxerConfig, observer IMuxerObserver) *Muxer {
uk := base.GenUkHlsMuxer()
op := PathStrategy.GetMuxerOutPath(config.OutPath, streamName)
playlistFilename := PathStrategy.GetLiveM3u8FileName(op, streamName)
@ -106,7 +108,6 @@ func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer Muxe
playlistFilenameBak: playlistFilenameBak,
recordPlayListFilename: recordPlaylistFilename,
recordPlayListFilenameBak: recordPlaylistFilenameBak,
enable: enable,
config: config,
observer: observer,
}
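With the enable flag gone, a non-nil observer is how the upper layer taps fragment events; a minimal IMuxerObserver implementation sketch (the type and log line are illustrative, not part of lal):

```go
package demo

import "fmt"

type fragmentLogger struct{}

// OnFragmentOpen fires each time the muxer decides to open a new fragment.
func (f *fragmentLogger) OnFragmentOpen() {
	fmt.Println("hls: new fragment opened")
}

// Usage sketch: muxer := hls.NewMuxer("test110", &cfg, &fragmentLogger{})
```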
@ -131,7 +132,7 @@ func (m *Muxer) Dispose() {
// OnPatPmt OnTsPackets
//
// Implements remux.Rtmp2MpegtsRemuxerObserver, making it convenient to feed remux.Rtmp2MpegtsRemuxer data directly into hls.Muxer
// Implements remux.IRtmp2MpegtsRemuxerObserver, making it convenient to feed remux.Rtmp2MpegtsRemuxer data directly into hls.Muxer
//
func (m *Muxer) OnPatPmt(b []byte) {
m.FeedPatPmt(b)
@ -273,14 +274,15 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
filename := PathStrategy.GetTsFileName(m.streamName, id, int(Clock.Now().UnixNano()/1e6))
filenameWithPath := PathStrategy.GetTsFileNameWithPath(m.outPath, filename)
if m.enable {
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}
m.opened = true
frag := m.getCurrFrag()
@ -307,10 +309,8 @@ func (m *Muxer) closeFragment(isLast bool) error {
return nil
}
if m.enable {
if err := m.fragment.CloseFile(); err != nil {
return err
}
if err := m.fragment.CloseFile(); err != nil {
return err
}
m.opened = false
@ -339,10 +339,6 @@ func (m *Muxer) closeFragment(isLast bool) error {
}
func (m *Muxer) writeRecordPlaylist() {
if !m.enable {
return
}
// Find the largest fragment duration over the whole live stream, from start to end
currFrag := m.getClosedFrag()
if currFrag.duration > m.recordMaxFragDuration {
@ -392,10 +388,6 @@ func (m *Muxer) writeRecordPlaylist() {
}
func (m *Muxer) writePlaylist(isLast bool) {
if !m.enable {
return
}
// Find the fragment with the longest duration
maxFrag := float64(m.config.FragmentDurationMs) / 1000
m.iterateFragsInPlaylist(func(frag *fragmentInfo) {
@ -430,9 +422,6 @@ func (m *Muxer) writePlaylist(isLast bool) {
}
func (m *Muxer) ensureDir() {
if !m.enable {
return
}
//err := fslCtx.RemoveAll(m.outPath)
//Log.Assert(nil, err)
err := fslCtx.MkdirAll(m.outPath, 0777)

@ -19,39 +19,9 @@ import (
"github.com/q191201771/lal/pkg/rtsp"
)
// TODO(chef): refactor some interfaces start with I and some do not
// TODO(chef): sort out lifecycle management for all server-side session types
// -
// - rtmp has no separate Pub/Sub session struct types; it uses ServerSession directly
// - write failures need to be propagated back to the loop
// - should rtsp also have the upper layer use Command as a proxy, to avoid messy lifecycle management
//
// server.pub: rtmp(), rtsp
// server.sub: rtmp(), rtsp, flv, ts, plus the somewhat special hls
//
// client.push: rtmp, rtsp
// client.pull: rtmp, rtsp, flv
//
// other: rtmp.ClientSession, rtmp.ServerSession
// rtsp.BaseInSession, rtsp.BaseOutSession, rtsp.ClientCommandSession, rtsp.ServerCommandSession
// base.HttpSubSession
// ISessionUrlContext tested in practice
//
// | | actual url | Url() | AppName, StreamName, RawQuery |
// | - | - | - | - |
// | rtmp pub push | rtmp://127.0.0.1:1935/live/test110 | same as actual url | live, test110, |
// | | rtmp://127.0.0.1:1935/a/b/c/d/test110?p1=1&p2=2 | same as actual url | a/b, c/d/test110, p1=1&p2=2 |
// | rtsp pub push | rtsp://localhost:5544/live/test110 | same as actual url | live, test110, |
// | rtsp pub push | rtsp://localhost:5544/a/b/c/d/test110?p1=1&p2=2 | same as actual url | a/b/c/d, test110, p1=1&p2=2 |
// | httpflv sub pull | http://127.0.0.1:8080/live/test110.flv | same as actual url | live, test110, |
// | | http://127.0.0.1:8080/a/b/c/d/test110.flv?p1=1&p2=2 | same as actual url | a/b/c/d, test110, p1=1&p2=2 |
// | rtmp sub pull | same as rtmp pub | . | . |
// | rtsp sub pull | same as rtsp pub | . | . |
// | httpts sub pull | same as httpflv sub, with the trailing .flv replaced by .ts; omitted here | . | . |
//
// TODO(chef):
// convention checks
// 1. all interfaces start with I
// IClientSession: satisfied by all client sessions
var (
@ -167,33 +137,33 @@ var (
// ---------------------------------------------------------------------------------------------------------------------
var _ logic.ILalServer = &logic.ServerManager{}
var _ rtmp.ServerObserver = &logic.ServerManager{}
var _ logic.HttpServerHandlerObserver = &logic.ServerManager{}
var _ rtsp.ServerObserver = &logic.ServerManager{}
var _ rtmp.IServerObserver = &logic.ServerManager{}
var _ logic.IHttpServerHandlerObserver = &logic.ServerManager{}
var _ rtsp.IServerObserver = &logic.ServerManager{}
var _ logic.IGroupCreator = &logic.ServerManager{}
var _ logic.GroupObserver = &logic.ServerManager{}
var _ logic.IGroupObserver = &logic.ServerManager{}
var _ logic.INotifyHandler = &logic.HttpNotify{}
var _ logic.IGroupManager = &logic.SimpleGroupManager{}
var _ logic.IGroupManager = &logic.ComplexGroupManager{}
var _ rtmp.PubSessionObserver = &logic.Group{} //
var _ rtsp.PullSessionObserver = &logic.Group{}
var _ rtsp.PullSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtsp.PubSessionObserver = &logic.Group{}
var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.MuxerObserver = &logic.Group{}
var _ rtsp.BaseInSessionObserver = &logic.Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.Rtmp2MpegtsRemuxerObserver = &hls.Muxer{}
var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.IPubSessionObserver = &logic.Group{} //
var _ rtsp.IPullSessionObserver = &logic.Group{}
var _ rtsp.IPullSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ rtsp.IPubSessionObserver = &logic.Group{}
var _ rtsp.IPubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.IMuxerObserver = &logic.Group{}
var _ rtsp.IBaseInSessionObserver = &logic.Group{} //
var _ rtsp.IBaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.IRtmp2MpegtsRemuxerObserver = &hls.Muxer{}
var _ rtmp.IServerSessionObserver = &rtmp.Server{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientComplex{}
var _ rtsp.ServerCommandSessionObserver = &rtsp.Server{}
var _ rtsp.ClientCommandSessionObserver = &rtsp.PushSession{}
var _ rtsp.ClientCommandSessionObserver = &rtsp.PullSession{}
var _ rtsp.IServerCommandSessionObserver = &rtsp.Server{}
var _ rtsp.IClientCommandSessionObserver = &rtsp.PushSession{}
var _ rtsp.IClientCommandSessionObserver = &rtsp.PullSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PushSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PullSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{}
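These `var _ I... = &T{}` lines are Go's standard compile-time interface-satisfaction assertions: they cost nothing at runtime, and a rename like this one breaks the build immediately if any implementation is left behind. A standalone illustration:

```go
package demo

type IMuxerObserver interface {
	OnFragmentOpen()
}

type group struct{}

func (g *group) OnFragmentOpen() {}

// Fails to compile if *group ever stops satisfying IMuxerObserver.
var _ IMuxerObserver = &group{}
```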

@ -24,7 +24,7 @@ import (
"github.com/q191201771/lal/pkg/sdp"
)
type GroupObserver interface {
type IGroupObserver interface {
CleanupHlsIfNeeded(appName string, streamName string, path string)
}
@ -33,7 +33,7 @@ type Group struct {
appName string // const after init
streamName string // const after init TODO chef: duplicates the field in stat and could be removed
config *Config
observer GroupObserver
observer IGroupObserver
exitChan chan struct{}
@ -79,7 +79,7 @@ type Group struct {
stat base.StatGroup
}
func NewGroup(appName string, streamName string, config *Config, observer GroupObserver) *Group {
func NewGroup(appName string, streamName string, config *Config, observer IGroupObserver) *Group {
uk := base.GenUkGroup()
g := &Group{
@ -304,7 +304,6 @@ func (group *Group) HasOutSession() bool {
// disposeInactiveSessions disposes sessions that are no longer active
//
// TODO(chef): [fix] should push sessions be checked too
// TODO chef: [refactor] sort out the overlap with naza.Connection timeout handling
//
func (group *Group) disposeInactiveSessions() {
@ -350,12 +349,19 @@ func (group *Group) disposeInactiveSessions() {
session.Dispose()
}
}
for _, item := range group.url2PushProxy {
session := item.pushSession
if item.isPushing && session != nil {
if _, writeAlive := session.IsAlive(); !writeAlive {
Log.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
session.Dispose()
}
}
}
}
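How these checks are presumably driven (a sketch; the actual scheduling loop is outside this diff, and reusing calcSessionStatIntervalSec as the tick interval is an assumption):

```go
// Hypothetical driver: tick once per interval, pruning dead relay-push
// sessions and refreshing per-session bandwidth stats.
ticker := time.NewTicker(calcSessionStatIntervalSec * time.Second)
defer ticker.Stop()
for range ticker.C {
	group.disposeInactiveSessions()
	group.updateAllSessionStat()
}
```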
// updateAllSessionStat updates the stats of all sessions
//
// TODO(chef): [fix] should push sessions be updated too
//
func (group *Group) updateAllSessionStat() {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(calcSessionStatIntervalSec)
@ -378,6 +384,12 @@ func (group *Group) updateAllSessionStat() {
for session := range group.rtspSubSessionSet {
session.UpdateStat(calcSessionStatIntervalSec)
}
for _, item := range group.url2PushProxy {
session := item.pushSession
if item.isPushing && session != nil {
session.UpdateStat(calcSessionStatIntervalSec)
}
}
}
func (group *Group) hasPubSession() bool {

@ -44,6 +44,7 @@ func (group *Group) stopRecordFlvIfNeeded() {
if !group.config.RecordConfig.EnableFlv {
return
}
if group.recordFlv != nil {
_ = group.recordFlv.Dispose()
group.recordFlv = nil

@ -19,20 +19,19 @@ func (group *Group) IsHlsMuxerAlive() bool {
// startHlsIfNeeded starts hls when needed
//
func (group *Group) startHlsIfNeeded() {
// TODO(chef): [refactor] ts depends on hls
if !group.config.HlsConfig.Enable {
if !group.config.HlsConfig.Enable && !group.config.HlsConfig.EnableHttps {
return
}
enable := group.config.HlsConfig.Enable || group.config.HlsConfig.EnableHttps
group.hlsMuxer = hls.NewMuxer(group.streamName, enable, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer = hls.NewMuxer(group.streamName, &group.config.HlsConfig.MuxerConfig, group)
group.hlsMuxer.Start()
}
func (group *Group) stopHlsIfNeeded() {
if !group.config.HlsConfig.Enable {
if !group.config.HlsConfig.Enable && !group.config.HlsConfig.EnableHttps {
return
}
if group.hlsMuxer != nil {
group.hlsMuxer.Dispose()
group.observer.CleanupHlsIfNeeded(group.appName, group.streamName, group.hlsMuxer.OutPath())

@ -38,6 +38,7 @@ func (group *Group) stopRecordMpegtsIfNeeded() {
if !group.config.RecordConfig.EnableMpegts {
return
}
if group.recordMpegts != nil {
_ = group.recordMpegts.Dispose()
group.recordMpegts = nil

@ -17,7 +17,7 @@ import (
"github.com/q191201771/lal/pkg/httpts"
)
type HttpServerHandlerObserver interface {
type IHttpServerHandlerObserver interface {
// OnNewHttpflvSubSession
//
// Notifies the upper layer that there is a new puller
@ -32,10 +32,10 @@ type HttpServerHandlerObserver interface {
}
type HttpServerHandler struct {
observer HttpServerHandlerObserver
observer IHttpServerHandlerObserver
}
func NewHttpServerHandler(observer HttpServerHandlerObserver) *HttpServerHandler {
func NewHttpServerHandler(observer IHttpServerHandlerObserver) *HttpServerHandler {
return &HttpServerHandler{
observer: observer,
}

@ -374,7 +374,7 @@ func (sm *ServerManager) CtrlKickOutSession(info base.ApiCtrlKickOutSession) bas
}
}
// ----- implement rtmp.ServerObserver interface -----------------------------------------------------------------------
// ----- implement rtmp.IServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.ObjectPairArray) {
sm.mutex.Lock()
@ -503,7 +503,7 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
sm.option.NotifyHandler.OnSubStop(info)
}
// ----- implement HttpServerHandlerObserver interface -----------------------------------------------------------------
// ----- implement IHttpServerHandlerObserver interface -----------------------------------------------------------------
func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) error {
sm.mutex.Lock()
@ -611,7 +611,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
sm.option.NotifyHandler.OnSubStop(info)
}
// ----- implement rtsp.ServerObserver interface -----------------------------------------------------------------------
// ----- implement rtsp.IServerObserver interface -----------------------------------------------------------------------
func (sm *ServerManager) OnNewRtspSessionConnect(session *rtsp.ServerCommandSession) {
// TODO chef: impl me
@ -740,7 +740,7 @@ func (sm *ServerManager) CreateGroup(appName string, streamName string) *Group {
return NewGroup(appName, streamName, sm.config, sm)
}
// ----- implement GroupObserver interface -----------------------------------------------------------------------------
// ----- implement IGroupObserver interface -----------------------------------------------------------------------------
func (sm *ServerManager) CleanupHlsIfNeeded(appName string, streamName string, path string) {
if sm.config.HlsConfig.Enable &&

@ -10,4 +10,4 @@ package remux
// TODO(chef): refactor rename this package to avop; it would contain remux_xxx2xxx.go, filter_xxx.go, protocol-specific files (e.g. rtmp.go), etc.
var _ rtmp2MpegtsFilterObserver = &Rtmp2MpegtsRemuxer{}
var _ iRtmp2MpegtsFilterObserver = &Rtmp2MpegtsRemuxer{}

@ -28,7 +28,7 @@ const (
maxAudioCacheDelayByVideo uint64 = 300 * 90 // unit: milliseconds * 90 (90 kHz clock ticks)
)
type Rtmp2MpegtsRemuxerObserver interface {
type IRtmp2MpegtsRemuxerObserver interface {
// OnPatPmt
//
// @param b: const read-only memory block; the upper layer may hold it but must not modify it
@ -55,7 +55,7 @@ type Rtmp2MpegtsRemuxerObserver interface {
type Rtmp2MpegtsRemuxer struct {
UniqueKey string
observer Rtmp2MpegtsRemuxerObserver
observer IRtmp2MpegtsRemuxerObserver
filter *rtmp2MpegtsFilter
videoOut []byte // Annexb
spspps []byte // Annexb; may also be vps+sps+pps
@ -68,7 +68,7 @@ type Rtmp2MpegtsRemuxer struct {
opened bool
}
func NewRtmp2MpegtsRemuxer(observer Rtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer {
func NewRtmp2MpegtsRemuxer(observer IRtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer {
uk := base.GenUkRtmp2MpegtsRemuxer()
r := &Rtmp2MpegtsRemuxer{
UniqueKey: uk,
@ -124,11 +124,11 @@ func (s *Rtmp2MpegtsRemuxer) FlushAudio() {
s.audioCc = frame.Cc
}
// ---------------------------------------------------------------------------------------------------------------------
// ----- implement iRtmp2MpegtsFilterObserver ----------------------------------------------------------------------------------------------------------------
// onPatPmt onPop
//
// Implements rtmp2MpegtsFilterObserver
// Implements iRtmp2MpegtsFilterObserver
//
func (s *Rtmp2MpegtsRemuxer) onPatPmt(b []byte) {
s.observer.OnPatPmt(b)

@ -22,14 +22,14 @@ import (
type rtmp2MpegtsFilter struct {
maxMsgSize int
data []base.RtmpMsg
observer rtmp2MpegtsFilterObserver
observer iRtmp2MpegtsFilterObserver
audioCodecId int
videoCodecId int
done bool
}
type rtmp2MpegtsFilterObserver interface {
type iRtmp2MpegtsFilterObserver interface {
// OnPatPmt
//
// This callback is guaranteed to fire before the data callbacks
@ -47,7 +47,7 @@ type rtmp2MpegtsFilterObserver interface {
//
// @param maxMsgSize: maximum number of packets to cache
//
func newRtmp2MpegtsFilter(maxMsgSize int, observer rtmp2MpegtsFilterObserver) *rtmp2MpegtsFilter {
func newRtmp2MpegtsFilter(maxMsgSize int, observer iRtmp2MpegtsFilterObserver) *rtmp2MpegtsFilter {
return &rtmp2MpegtsFilter{
maxMsgSize: maxMsgSize,
data: make([]base.RtmpMsg, maxMsgSize)[0:0],

@ -12,7 +12,7 @@ import (
"net"
)
type ServerObserver interface {
type IServerObserver interface {
OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
// OnNewRtmpPubSession
@ -35,11 +35,11 @@ type ServerObserver interface {
type Server struct {
addr string
observer ServerObserver
observer IServerObserver
ln net.Listener
}
func NewServer(addr string, observer ServerObserver) *Server {
func NewServer(addr string, observer IServerObserver) *Server {
return &Server{
addr: addr,
observer: observer,
@ -91,7 +91,7 @@ func (server *Server) handleTcpConnect(conn net.Conn) {
}
}
// ----- ServerSessionObserver ------------------------------------------------------------------------------------
// ----- IServerSessionObserver ------------------------------------------------------------------------------------
func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray) {
server.observer.OnRtmpConnect(session, opa)

@ -24,7 +24,7 @@ import (
// TODO chef: timeout cleanup for sessions that never evolve into Pub or Sub
type ServerSessionObserver interface {
type IServerSessionObserver interface {
OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
// OnNewRtmpPubSession
@ -38,12 +38,12 @@ type ServerSessionObserver interface {
OnNewRtmpSubSession(session *ServerSession) error
}
type PubSessionObserver interface {
type IPubSessionObserver interface {
// OnReadRtmpAvMsg note: the Payload memory block is reused internally after the callback returns
OnReadRtmpAvMsg(msg base.RtmpMsg)
}
func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver) {
func (s *ServerSession) SetPubSessionObserver(observer IPubSessionObserver) {
s.avObserver = observer
}
@ -64,7 +64,7 @@ type ServerSession struct {
streamName string // const after set
rawQuery string // const after set
observer ServerSessionObserver
observer IServerSessionObserver
t ServerSessionType
hs HandshakeServer
chunkComposer *ChunkComposer
@ -76,7 +76,7 @@ type ServerSession struct {
stat base.StatSession
// only for PubSession
avObserver PubSessionObserver
avObserver IPubSessionObserver
// IsFresh ShouldWaitVideoKeyFrame
//
@ -97,7 +97,7 @@ type ServerSession struct {
DisposeByObserverFlag bool
}
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession {
func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSession {
uk := base.GenUkRtmpServerSession()
s := &ServerSession{
conn: connection.New(conn, func(option *connection.Option) {

@ -27,13 +27,13 @@ import (
// Aggregates PubSession and PullSession, i.e., sessions whose stream data is inbound
// BaseInSessionObserver
// IBaseInSessionObserver
//
// BaseInSession calls back two data formats to the upper layer (essentially the same data; the business side is free to choose which to use):
// 1. raw rtp packets
// 2. av packets assembled from the rtp stream
//
type BaseInSessionObserver interface {
type IBaseInSessionObserver interface {
OnSdp(sdpCtx sdp.LogicContext)
// OnRtpPacket calls back each received RTP packet
@ -51,7 +51,7 @@ type BaseInSession struct {
uniqueKey string // uses the value from the upper-layer session
cmdSession IInterleavedPacketWriter
observer BaseInSessionObserver
observer IBaseInSessionObserver
audioRtpConn *nazanet.UdpConnection
videoRtpConn *nazanet.UdpConnection
@ -106,7 +106,7 @@ func NewBaseInSession(uniqueKey string, cmdSession IInterleavedPacketWriter) *Ba
return s
}
func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacketWriter, observer BaseInSessionObserver) *BaseInSession {
func NewBaseInSessionWithObserver(uniqueKey string, cmdSession IInterleavedPacketWriter, observer IBaseInSessionObserver) *BaseInSession {
s := NewBaseInSession(uniqueKey, cmdSession)
s.observer = observer
return s
@ -144,7 +144,7 @@ func (session *BaseInSession) InitWithSdp(sdpCtx sdp.LogicContext) {
// SetObserver sets the callback observer if one was not set earlier; the caller guarantees that this call happens after InitWithSdp
//
func (session *BaseInSession) SetObserver(observer BaseInSessionObserver) {
func (session *BaseInSession) SetObserver(observer IBaseInSessionObserver) {
session.observer = observer
// Avoid calling back on the current goroutine, to ease the burden on the business side: no need to worry about lock re-entry between setting the observer and the callbacks TODO(chef): find a better way
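A sketch of that hand-off (illustrative; the method name is hypothetical and not lal's actual implementation): dispatching the initial callback on a fresh goroutine lets the caller invoke SetObserver while holding its own locks, with no re-entry risk.

```go
// Illustrative only: notify the freshly-set observer from another goroutine.
func (session *BaseInSession) notifyObserverAsync(sdpCtx sdp.LogicContext) {
	go func() {
		session.observer.OnSdp(sdpCtx)
	}()
}
```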

@ -49,7 +49,7 @@ var defaultClientCommandSessionOption = ClientCommandSessionOption{
OverTcp: false,
}
type ClientCommandSessionObserver interface {
type IClientCommandSessionObserver interface {
OnConnectResult()
// OnDescribeResponse only for PullSession
@ -67,7 +67,7 @@ type ClientCommandSessionObserver interface {
type ClientCommandSession struct {
uniqueKey string
t ClientCommandSessionType
observer ClientCommandSessionObserver
observer IClientCommandSessionObserver
option ClientCommandSessionOption
rawUrl string
@ -88,7 +88,7 @@ type ClientCommandSession struct {
type ModClientCommandSessionOption func(option *ClientCommandSessionOption)
func NewClientCommandSession(t ClientCommandSessionType, uniqueKey string, observer ClientCommandSessionObserver, modOptions ...ModClientCommandSessionOption) *ClientCommandSession {
func NewClientCommandSession(t ClientCommandSessionType, uniqueKey string, observer IClientCommandSessionObserver, modOptions ...ModClientCommandSessionOption) *ClientCommandSession {
option := defaultClientCommandSessionOption
for _, fn := range modOptions {
fn(&option)

@ -17,8 +17,8 @@ import (
"github.com/q191201771/naza/pkg/nazanet"
)
type PullSessionObserver interface {
BaseInSessionObserver
type IPullSessionObserver interface {
IBaseInSessionObserver
}
type PullSessionOption struct {
@ -45,7 +45,7 @@ type PullSession struct {
type ModPullSessionOption func(option *PullSessionOption)
func NewPullSession(observer PullSessionObserver, modOptions ...ModPullSessionOption) *PullSession {
func NewPullSession(observer IPullSessionObserver, modOptions ...ModPullSessionOption) *PullSession {
option := defaultPullSessionOption
for _, fn := range modOptions {
fn(&option)
@ -185,32 +185,32 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseInSession.IsAlive()
}
// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession
// OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnConnectResult() {
// noop
}
// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession
// OnDescribeResponse IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnDescribeResponse(sdpCtx sdp.LogicContext) {
session.baseInSession.InitWithSdp(sdpCtx)
}
// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithConn IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) {
_ = session.baseInSession.SetupWithConn(uri, rtpConn, rtcpConn)
}
// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithChannel IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) {
_ = session.baseInSession.SetupWithChannel(uri, rtpChannel, rtcpChannel)
}
// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnSetupResult() {
session.baseInSession.WriteRtpRtcpDummy()
}
// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession
// OnInterleavedPacket IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PullSession) OnInterleavedPacket(packet []byte, channel int) {
session.baseInSession.HandleInterleavedPacket(packet, channel)
}

@ -178,32 +178,32 @@ func (session *PushSession) IsAlive() (readAlive, writeAlive bool) {
return session.baseOutSession.IsAlive()
}
// OnConnectResult ClientCommandSessionObserver, callback by ClientCommandSession
// OnConnectResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnConnectResult() {
// noop
}
// OnDescribeResponse ClientCommandSessionObserver, callback by ClientCommandSession
// OnDescribeResponse IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnDescribeResponse(sdpCtx sdp.LogicContext) {
// noop
}
// OnSetupWithConn ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithConn IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupWithConn(uri string, rtpConn, rtcpConn *nazanet.UdpConnection) {
_ = session.baseOutSession.SetupWithConn(uri, rtpConn, rtcpConn)
}
// OnSetupWithChannel ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupWithChannel IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupWithChannel(uri string, rtpChannel, rtcpChannel int) {
_ = session.baseOutSession.SetupWithChannel(uri, rtpChannel, rtcpChannel)
}
// OnSetupResult ClientCommandSessionObserver, callback by ClientCommandSession
// OnSetupResult IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnSetupResult() {
// noop
}
// OnInterleavedPacket ClientCommandSessionObserver, callback by ClientCommandSession
// OnInterleavedPacket IClientCommandSessionObserver, callback by ClientCommandSession
func (session *PushSession) OnInterleavedPacket(packet []byte, channel int) {
session.baseOutSession.HandleInterleavedPacket(packet, channel)
}

@ -12,7 +12,7 @@ import (
"net"
)
type ServerObserver interface {
type IServerObserver interface {
// OnNewRtspSessionConnect @brief gives the upper layer the ability to manage sessions that have not yet evolved into the Pub/Sub stage
OnNewRtspSessionConnect(session *ServerCommandSession)
@ -51,12 +51,12 @@ type ServerObserver interface {
type Server struct {
addr string
observer ServerObserver
observer IServerObserver
ln net.Listener
}
func NewServer(addr string, observer ServerObserver) *Server {
func NewServer(addr string, observer IServerObserver) *Server {
return &Server{
addr: addr,
observer: observer,

@ -23,7 +23,7 @@ import (
"github.com/q191201771/naza/pkg/nazahttp"
)
type ServerCommandSessionObserver interface {
type IServerCommandSessionObserver interface {
// OnNewRtspPubSession
//
// @brief callback in the Announce stage
@ -48,8 +48,8 @@ type ServerCommandSessionObserver interface {
}
type ServerCommandSession struct {
uniqueKey string // const after ctor
observer ServerCommandSessionObserver // const after ctor
uniqueKey string // const after ctor
observer IServerCommandSessionObserver // const after ctor
conn connection.Connection
prevConnStat connection.Stat
staleStat *connection.Stat
@ -59,7 +59,7 @@ type ServerCommandSession struct {
subSession *SubSession
}
func NewServerCommandSession(observer ServerCommandSessionObserver, conn net.Conn) *ServerCommandSession {
func NewServerCommandSession(observer IServerCommandSessionObserver, conn net.Conn) *ServerCommandSession {
uk := base.GenUkRtspServerCommandSession()
s := &ServerCommandSession{
uniqueKey: uk,

@ -16,8 +16,8 @@ import (
"github.com/q191201771/lal/pkg/sdp"
)
type PubSessionObserver interface {
BaseInSessionObserver
type IPubSessionObserver interface {
IBaseInSessionObserver
}
type PubSession struct {
@ -26,7 +26,7 @@ type PubSession struct {
cmdSession *ServerCommandSession
baseInSession *BaseInSession
observer PubSessionObserver
observer IPubSessionObserver
}
func NewPubSession(urlCtx base.UrlContext, cmdSession *ServerCommandSession) *PubSession {
@ -46,7 +46,7 @@ func (session *PubSession) InitWithSdp(sdpCtx sdp.LogicContext) {
session.baseInSession.InitWithSdp(sdpCtx)
}
func (session *PubSession) SetObserver(observer PubSessionObserver) {
func (session *PubSession) SetObserver(observer IPubSessionObserver) {
session.baseInSession.SetObserver(observer)
}
