|
|
|
@ -86,7 +86,7 @@ type pushProxy struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewGroup(appName string, streamName string, pullEnable bool, pullURL string) *Group {
|
|
|
|
|
uk := base.GenUniqueKey(base.UKPGroup)
|
|
|
|
|
uk := base.GenUKGroup()
|
|
|
|
|
nazalog.Infof("[%s] lifecycle new group. appName=%s, streamName=%s", uk, appName, streamName)
|
|
|
|
|
|
|
|
|
|
url2PushProxy := make(map[string]*pushProxy)
|
|
|
|
@ -145,13 +145,13 @@ func (group *Group) Tick() {
|
|
|
|
|
if group.tickCount%checkSessionAliveIntervalSec == 0 {
|
|
|
|
|
if group.rtmpPubSession != nil {
|
|
|
|
|
if readAlive, _ := group.rtmpPubSession.IsAlive(); !readAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtmpPubSession.UniqueKey())
|
|
|
|
|
group.rtmpPubSession.Dispose()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if group.rtspPubSession != nil {
|
|
|
|
|
if readAlive, _ := group.rtspPubSession.IsAlive(); !readAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, group.rtspPubSession.UniqueKey())
|
|
|
|
|
group.rtspPubSession.Dispose()
|
|
|
|
|
group.rtspPubSession = nil
|
|
|
|
|
}
|
|
|
|
@ -165,28 +165,28 @@ func (group *Group) Tick() {
|
|
|
|
|
}
|
|
|
|
|
for session := range group.rtmpSubSessionSet {
|
|
|
|
|
if _, writeAlive := session.IsAlive(); !writeAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.Dispose()
|
|
|
|
|
group.delRTMPSubSession(session)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for session := range group.httpflvSubSessionSet {
|
|
|
|
|
if _, writeAlive := session.IsAlive(); !writeAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.Dispose()
|
|
|
|
|
group.delHTTPFLVSubSession(session)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for session := range group.httptsSubSessionSet {
|
|
|
|
|
if _, writeAlive := session.IsAlive(); !writeAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.Dispose()
|
|
|
|
|
group.delHTTPTSSubSession(session)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for session := range group.rtspSubSessionSet {
|
|
|
|
|
if _, writeAlive := session.IsAlive(); !writeAlive {
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] session timeout. session=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.Dispose()
|
|
|
|
|
group.DelRTSPSubSession(session)
|
|
|
|
|
}
|
|
|
|
@ -271,13 +271,13 @@ func (group *Group) Dispose() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add PubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add PubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
|
|
|
|
|
|
if group.hasInSession() {
|
|
|
|
|
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -296,13 +296,13 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) {
|
|
|
|
|
|
|
|
|
|
// TODO chef: rtsp package中,增加回调返回值判断,如果是false,将连接关掉
|
|
|
|
|
func (group *Group) AddRTSPPubSession(session *rtsp.PubSession) bool {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add RTSP PubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
|
|
|
|
|
|
if group.hasInSession() {
|
|
|
|
|
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Errorf("[%s] in stream already exist. wanna add=%s", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -342,7 +342,7 @@ func (group *Group) DelRTMPPullSession(session *rtmp.PullSession) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add SubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
|
group.rtmpSubSessionSet[session] = struct{}{}
|
|
|
|
@ -357,7 +357,7 @@ func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.WriteHTTPResponseHeader()
|
|
|
|
|
session.WriteFLVHeader()
|
|
|
|
|
|
|
|
|
@ -378,7 +378,7 @@ func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) {
|
|
|
|
|
// 这里应该也要考虑触发hls muxer开启
|
|
|
|
|
// 也即HTTPTS sub需要使用hls muxer,hls muxer开启和关闭都要考虑HTTPTS sub
|
|
|
|
|
func (group *Group) AddHTTPTSSubSession(session *httpts.SubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add httpflv SubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
session.WriteHTTPResponseHeader()
|
|
|
|
|
session.WriteFragmentHeader()
|
|
|
|
|
|
|
|
|
@ -399,7 +399,8 @@ func (group *Group) HandleNewRTSPSubSessionDescribe(session *rtsp.SubSession) (o
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
|
if group.rtspPubSession == nil {
|
|
|
|
|
nazalog.Warnf("[%s] close rtsp subSession while describe but pubSession not exist. [%s]", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Warnf("[%s] close rtsp subSession while describe but pubSession not exist. [%s]",
|
|
|
|
|
group.UniqueKey, session.UniqueKey())
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -408,7 +409,7 @@ func (group *Group) HandleNewRTSPSubSessionDescribe(session *rtsp.SubSession) (o
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) HandleNewRTSPSubSessionPlay(session *rtsp.SubSession) bool {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] add rtsp SubSession into group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
@ -417,7 +418,7 @@ func (group *Group) HandleNewRTSPSubSessionPlay(session *rtsp.SubSession) bool {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) DelRTSPSubSession(session *rtsp.SubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtsp SubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
group.mutex.Lock()
|
|
|
|
|
defer group.mutex.Unlock()
|
|
|
|
|
delete(group.rtspSubSessionSet, session)
|
|
|
|
@ -597,32 +598,32 @@ func (group *Group) KickOutSession(sessionID string) bool {
|
|
|
|
|
|
|
|
|
|
nazalog.Infof("[%s] kick out session. session id=%s", group.UniqueKey, sessionID)
|
|
|
|
|
|
|
|
|
|
if strings.HasPrefix(sessionID, base.UKPRTMPServerSession) {
|
|
|
|
|
if strings.HasPrefix(sessionID, base.UKPreRTMPServerSession) {
|
|
|
|
|
if group.rtmpPubSession != nil {
|
|
|
|
|
group.rtmpPubSession.Dispose()
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPRTSPPubSession) {
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPreRTSPPubSession) {
|
|
|
|
|
if group.rtspPubSession != nil {
|
|
|
|
|
group.rtspPubSession.Dispose()
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPFLVSubSession) {
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPreFLVSubSession) {
|
|
|
|
|
// TODO chef: 考虑数据结构改成sessionIDzuokey的map
|
|
|
|
|
for s := range group.httpflvSubSessionSet {
|
|
|
|
|
if s.UniqueKey == sessionID {
|
|
|
|
|
if s.UniqueKey() == sessionID {
|
|
|
|
|
s.Dispose()
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPTSSubSession) {
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPreTSSubSession) {
|
|
|
|
|
for s := range group.httptsSubSessionSet {
|
|
|
|
|
if s.UniqueKey == sessionID {
|
|
|
|
|
if s.UniqueKey() == sessionID {
|
|
|
|
|
s.Dispose()
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPRTSPSubSession) {
|
|
|
|
|
} else if strings.HasPrefix(sessionID, base.UKPreRTSPSubSession) {
|
|
|
|
|
// TODO chef: impl me
|
|
|
|
|
} else {
|
|
|
|
|
nazalog.Errorf("[%s] kick out session while session id format invalid. %s", group.UniqueKey, sessionID)
|
|
|
|
@ -632,10 +633,10 @@ func (group *Group) KickOutSession(sessionID string) bool {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) delRTMPPubSession(session *rtmp.ServerSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtmp PubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
|
|
|
|
|
if session != group.rtmpPubSession {
|
|
|
|
|
nazalog.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey, group.rtmpPubSession)
|
|
|
|
|
nazalog.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey(), group.rtmpPubSession)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -644,10 +645,10 @@ func (group *Group) delRTMPPubSession(session *rtmp.ServerSession) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) delRTSPPubSession(session *rtsp.PubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtsp PubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtsp PubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
|
|
|
|
|
if session != group.rtspPubSession {
|
|
|
|
|
nazalog.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey, group.rtmpPubSession)
|
|
|
|
|
nazalog.Warnf("[%s] del rtmp pub session but not match. del session=%s, group session=%p", group.UniqueKey, session.UniqueKey(), group.rtmpPubSession)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -665,17 +666,17 @@ func (group *Group) delRTMPPullSession(session *rtmp.PullSession) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) delRTMPSubSession(session *rtmp.ServerSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del rtmp SubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
delete(group.rtmpSubSessionSet, session)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) delHTTPFLVSubSession(session *httpflv.SubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del httpflv SubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
delete(group.httpflvSubSessionSet, session)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (group *Group) delHTTPTSSubSession(session *httpts.SubSession) {
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
nazalog.Debugf("[%s] [%s] del httpts SubSession from group.", group.UniqueKey, session.UniqueKey())
|
|
|
|
|
delete(group.httptsSubSessionSet, session)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -711,19 +712,19 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
|
|
|
|
|
// TODO chef: 头信息和full gop也可以在SubSession刚加入时发送
|
|
|
|
|
if group.gopCache.Metadata != nil {
|
|
|
|
|
//nazalog.Debugf("[%s] [%s] write metadata", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
_ = session.AsyncWrite(group.gopCache.Metadata)
|
|
|
|
|
_ = session.Write(group.gopCache.Metadata)
|
|
|
|
|
}
|
|
|
|
|
if group.gopCache.VideoSeqHeader != nil {
|
|
|
|
|
//nazalog.Debugf("[%s] [%s] write vsh", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
_ = session.AsyncWrite(group.gopCache.VideoSeqHeader)
|
|
|
|
|
_ = session.Write(group.gopCache.VideoSeqHeader)
|
|
|
|
|
}
|
|
|
|
|
if group.gopCache.AACSeqHeader != nil {
|
|
|
|
|
//nazalog.Debugf("[%s] [%s] write ash", group.UniqueKey, session.UniqueKey)
|
|
|
|
|
_ = session.AsyncWrite(group.gopCache.AACSeqHeader)
|
|
|
|
|
_ = session.Write(group.gopCache.AACSeqHeader)
|
|
|
|
|
}
|
|
|
|
|
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
|
|
|
|
|
for _, item := range group.gopCache.GetGOPDataAt(i) {
|
|
|
|
|
_ = session.AsyncWrite(item)
|
|
|
|
|
_ = session.Write(item)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -731,7 +732,7 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ## 3.2. 转发本次数据
|
|
|
|
|
_ = session.AsyncWrite(lcd.Get())
|
|
|
|
|
_ = session.Write(lcd.Get())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下
|
|
|
|
@ -743,24 +744,24 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
|
|
|
|
|
|
|
|
|
|
if v.pushSession.IsFresh {
|
|
|
|
|
if group.gopCache.Metadata != nil {
|
|
|
|
|
_ = v.pushSession.AsyncWrite(group.gopCache.Metadata)
|
|
|
|
|
_ = v.pushSession.Write(group.gopCache.Metadata)
|
|
|
|
|
}
|
|
|
|
|
if group.gopCache.VideoSeqHeader != nil {
|
|
|
|
|
_ = v.pushSession.AsyncWrite(group.gopCache.VideoSeqHeader)
|
|
|
|
|
_ = v.pushSession.Write(group.gopCache.VideoSeqHeader)
|
|
|
|
|
}
|
|
|
|
|
if group.gopCache.AACSeqHeader != nil {
|
|
|
|
|
_ = v.pushSession.AsyncWrite(group.gopCache.AACSeqHeader)
|
|
|
|
|
_ = v.pushSession.Write(group.gopCache.AACSeqHeader)
|
|
|
|
|
}
|
|
|
|
|
for i := 0; i < group.gopCache.GetGOPCount(); i++ {
|
|
|
|
|
for _, item := range group.gopCache.GetGOPDataAt(i) {
|
|
|
|
|
_ = v.pushSession.AsyncWrite(item)
|
|
|
|
|
_ = v.pushSession.Write(item)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
v.pushSession.IsFresh = false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = v.pushSession.AsyncWrite(lcd.Get())
|
|
|
|
|
_ = v.pushSession.Write(lcd.Get())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -874,7 +875,7 @@ func (group *Group) pullIfNeeded() {
|
|
|
|
|
}
|
|
|
|
|
res := group.AddRTMPPullSession(pullSession)
|
|
|
|
|
if res {
|
|
|
|
|
err = <-pullSession.Wait()
|
|
|
|
|
err = <-pullSession.WaitChan()
|
|
|
|
|
nazalog.Infof("[%s] relay pull done. err=%v", pullSession.UniqueKey(), err)
|
|
|
|
|
group.DelRTMPPullSession(pullSession)
|
|
|
|
|
} else {
|
|
|
|
@ -925,7 +926,7 @@ func (group *Group) pushIfNeeded() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
group.AddRTMPPushSession(u, pushSession)
|
|
|
|
|
err = <-pushSession.Wait()
|
|
|
|
|
err = <-pushSession.WaitChan()
|
|
|
|
|
nazalog.Infof("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
|
|
|
|
|
group.DelRTMPPushSession(u, pushSession)
|
|
|
|
|
}(url, urlWithParam)
|
|
|
|
|