From f2fc9a741a5fdd57f6186cb5ace30d99a84a253f Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sun, 11 Oct 2020 19:15:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0HTTP=20API=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=EF=BC=8C=E8=8E=B7=E5=8F=96=E6=9C=8D=E5=8A=A1=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 - conf/edge.conf.json | 8 +- conf/lalserver.conf.json | 4 + conf/lalserver.conf.json.brief | 4 + conf/lalserver.conf.json.tmpl | 4 + go.mod | 2 +- go.sum | 4 +- pkg/avc/avc.go | 1 + pkg/base/stat.go | 71 +++++++++++++++ pkg/httpflv/server.go | 18 ++-- pkg/httpflv/server_sub_session.go | 26 +++++- pkg/httpts/server.go | 16 ++-- pkg/httpts/server_sub_session.go | 26 +++++- pkg/logic/config.go | 11 ++- pkg/logic/group.go | 110 ++++++++++++++++++++--- pkg/logic/http_api.go | 143 ++++++++++++++++++++---------- pkg/logic/logic.go | 1 + pkg/logic/server_manager.go | 42 +++++++++ pkg/rtmp/server.go | 20 ++--- pkg/rtmp/server_session.go | 57 +++++++++--- pkg/rtsp/server.go | 12 +-- pkg/rtsp/server_pub_session.go | 50 +++++++++-- 22 files changed, 509 insertions(+), 122 deletions(-) create mode 100644 pkg/base/stat.go diff --git a/README.md b/README.md index 6c3c6be..106f455 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,6 @@ -
diff --git a/conf/edge.conf.json b/conf/edge.conf.json index 6aebe61..57b8023 100644 --- a/conf/edge.conf.json +++ b/conf/edge.conf.json @@ -7,7 +7,7 @@ "httpflv": { "enable": true, "sub_listen_addr": ":8090", - "enable_https": true, + "enable_https": false, "https_addr": ":4443", "https_cert_file": "./conf/cert.pem", "https_key_file": "./conf/key.pem", @@ -38,9 +38,13 @@ "enable": true, "addr": "127.0.0.1:19350" }, + "http_api": { + "enable": true, + "addr": ":8093" + }, "pprof": { "enable": false, - "addr": ":10002" + "addr": ":10011" }, "log": { "level": 1, diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 95c0294..8b46978 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -37,6 +37,10 @@ "enable": false, "addr": "" }, + "http_api": { + "enable": true, + "addr": ":8083" + }, "pprof": { "enable": true, "addr": ":10001" diff --git a/conf/lalserver.conf.json.brief b/conf/lalserver.conf.json.brief index 9fdc674..e55c0cc 100644 --- a/conf/lalserver.conf.json.brief +++ b/conf/lalserver.conf.json.brief @@ -37,6 +37,10 @@ "enable": false, // 是否开启回源拉流功能,开启后,当自身接收到拉流请求,而流不存在时,会从其他服务器拉取这个流到本地 "addr": "" // 回源拉流的地址。格式举例 "127.0.0.1:19351" }, + "http_api": { + "enable": true, // 是否开启HTTP API接口 + "addr": ":8083" // 监听地址 + }, "pprof": { "enable": true, // 是否开启Go pprof web服务的监听 "addr": ":10001" // Go pprof web地址 diff --git a/conf/lalserver.conf.json.tmpl b/conf/lalserver.conf.json.tmpl index ea1ea6f..f31e75f 100644 --- a/conf/lalserver.conf.json.tmpl +++ b/conf/lalserver.conf.json.tmpl @@ -37,6 +37,10 @@ "enable": false, "addr": "" }, + "http_api": { + "enable": true, + "addr": ":8083" + }, "pprof": { "enable": true, "addr": ":10001" diff --git a/go.mod b/go.mod index 20fed7c..5d8eb83 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.15.0 +require github.com/q191201771/naza v0.15.1 diff --git a/go.sum b/go.sum index c108dbf..4b48aef 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.15.0 h1:HFyRrluqZhpnBu6YQ1soIk6cR9P8G/9sDMFLBhTTBRc= -github.com/q191201771/naza v0.15.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.15.1 h1:y9D7jbzHeD883PqBZTln+O47E40dFoRlQUrWYOA5GoM= +github.com/q191201771/naza v0.15.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index 99552a5..22ac706 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -333,6 +333,7 @@ func CaptureAVCC2AnnexB(w io.Writer, payload []byte) error { return nil } +// TODO chef: hevc中,ctx作为参数传入,这里考虑统一一下 // 尝试解析SPS所有字段,实验中,请勿直接使用该函数 func ParseSPS(payload []byte) (Context, error) { var sps SPS diff --git a/pkg/base/stat.go b/pkg/base/stat.go new file mode 100644 index 0000000..1fb50b9 --- /dev/null +++ b/pkg/base/stat.go @@ -0,0 +1,71 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package base + +const ( + // StatGroup.AudioCodec + AudioCodecAAC = "AAC" + + // StatGroup.VideoCodec + VideoCodecAVC = "H264" + VideoCodecHEVC = "H265" + + // StatSession.Protocol + ProtocolRTMP = "RTMP" + ProtocolRTSP = "RTSP" + ProtocolHTTPFLV = "HTTP-FLV" + ProtocolHTTPTS = "HTTP-TS" +) + +type StatGroup struct { + StreamName string `json:"stream_name"` + AudioCodec string `json:"audio_codec"` + VideoCodec string `json:"video_codec"` + VideoWidth int `json:"video_width"` + VideoHeight int `json:"video_height"` + StatPub StatPub `json:"pub"` + StatSubs []StatSub `json:"subs"` +} + +type StatPub struct { + StatSession +} + +type StatSub struct { + StatSession +} + +type StatSession struct { + Protocol string `json:"protocol"` + StartTime string `json:"start_time"` + RemoteAddr string `json:"remote_addr"` + ReadBytesSum uint64 `json:"read_bytes_sum"` + WroteBytesSum uint64 `json:"wrote_bytes_sum"` + Bitrate int `json:"bitrate"` +} + +func StatSession2Pub(ss StatSession) (ret StatPub) { + ret.Protocol = ss.Protocol + ret.StartTime = ss.StartTime + ret.RemoteAddr = ss.RemoteAddr + ret.ReadBytesSum = ss.ReadBytesSum + ret.WroteBytesSum = ss.WroteBytesSum + ret.Bitrate = ss.Bitrate + return +} + +func StatSession2Sub(ss StatSession) (ret StatSub) { + ret.Protocol = ss.Protocol + ret.StartTime = ss.StartTime + ret.RemoteAddr = ss.RemoteAddr + ret.ReadBytesSum = ss.ReadBytesSum + ret.WroteBytesSum = ss.WroteBytesSum + ret.Bitrate = ss.Bitrate + return +} diff --git a/pkg/httpflv/server.go b/pkg/httpflv/server.go index 17d9142..c496c48 100644 --- a/pkg/httpflv/server.go +++ b/pkg/httpflv/server.go @@ -34,18 +34,18 @@ type ServerConfig struct { } type Server struct { - obs ServerObserver - config ServerConfig - ln net.Listener - httpsLn net.Listener + observer ServerObserver + config ServerConfig + ln net.Listener + httpsLn net.Listener } // TODO chef: 监听太难看了,考虑直接传入Listener对象,或直接路由进来,使得不同server可以共用端口 -func NewServer(obs ServerObserver, config ServerConfig) *Server { +func NewServer(observer ServerObserver, config ServerConfig) *Server { return &Server{ - obs: obs, - config: config, + observer: observer, + config: config, } } @@ -133,11 +133,11 @@ func (server *Server) handleConnect(conn net.Conn) { } nazalog.Debugf("[%s] < read http request. uri=%s", session.UniqueKey, session.URI) - if !server.obs.OnNewHTTPFLVSubSession(session) { + if !server.observer.OnNewHTTPFLVSubSession(session) { session.Dispose() } err := session.RunLoop() nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.UniqueKey, err) - server.obs.OnDelHTTPFLVSubSession(session) + server.observer.OnDelHTTPFLVSubSession(session) } diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 20168f8..0dddcd3 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -37,7 +37,9 @@ type SubSession struct { IsFresh bool - conn connection.Connection + conn connection.Connection + prevConnStat connection.Stat + stat base.StatSub } func NewSubSession(conn net.Conn) *SubSession { @@ -50,6 +52,13 @@ func NewSubSession(conn net.Conn) *SubSession { option.WriteChanSize = wChanSize option.WriteTimeoutMS = subSessionWriteTimeoutMS }), + stat: base.StatSub{ + StatSession: base.StatSession{ + Protocol: base.ProtocolHTTPFLV, + StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + RemoteAddr: conn.RemoteAddr().String(), + }, + }, } nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s @@ -134,6 +143,21 @@ func (session *SubSession) Dispose() { _ = session.conn.Close() } +func (session *SubSession) GetStat() base.StatSub { + currStat := session.conn.GetStat() + session.stat.ReadBytesSum = currStat.ReadBytesSum + session.stat.WroteBytesSum = currStat.WroteBytesSum + return session.stat +} + +func (session *SubSession) UpdateStat(tickCount uint32) { + currStat := session.conn.GetStat() + var diffStat connection.Stat + diffStat.WroteBytesSum = currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum + session.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5) + session.prevConnStat = currStat +} + func init() { flvHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" + "Server: " + base.LALHTTPFLVSubSessionServer + "\r\n" + diff --git a/pkg/httpts/server.go b/pkg/httpts/server.go index 3d14c6d..d7cf2d0 100644 --- a/pkg/httpts/server.go +++ b/pkg/httpts/server.go @@ -23,15 +23,15 @@ type ServerObserver interface { } type Server struct { - obs ServerObserver - addr string - ln net.Listener + observer ServerObserver + addr string + ln net.Listener } -func NewServer(obs ServerObserver, addr string) *Server { +func NewServer(observer ServerObserver, addr string) *Server { return &Server{ - obs: obs, - addr: addr, + observer: observer, + addr: addr, } } @@ -71,11 +71,11 @@ func (server *Server) handleConnect(conn net.Conn) { } log.Debugf("[%s] < read http request. uri=%s", session.UniqueKey, session.URI) - if !server.obs.OnNewHTTPTSSubSession(session) { + if !server.observer.OnNewHTTPTSSubSession(session) { session.Dispose() } err := session.RunLoop() log.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey, err) - server.obs.OnDelHTTPTSSubSession(session) + server.observer.OnDelHTTPTSSubSession(session) } diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 7129462..4205bcf 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -36,7 +36,9 @@ type SubSession struct { IsFresh bool - conn connection.Connection + conn connection.Connection + prevConnStat connection.Stat + stat base.StatSub } func NewSubSession(conn net.Conn) *SubSession { @@ -49,6 +51,13 @@ func NewSubSession(conn net.Conn) *SubSession { option.WriteChanSize = wChanSize option.WriteTimeoutMS = subSessionWriteTimeoutMS }), + stat: base.StatSub{ + StatSession: base.StatSession{ + Protocol: base.ProtocolHTTPTS, + StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + RemoteAddr: conn.RemoteAddr().String(), + }, + }, } nazalog.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String()) return s @@ -129,6 +138,21 @@ func (session *SubSession) Dispose() { _ = session.conn.Close() } +func (session *SubSession) GetStat() base.StatSub { + currStat := session.conn.GetStat() + session.stat.ReadBytesSum = currStat.ReadBytesSum + session.stat.WroteBytesSum = currStat.WroteBytesSum + return session.stat +} + +func (session *SubSession) UpdateStat(tickCount uint32) { + currStat := session.conn.GetStat() + var diffStat connection.Stat + diffStat.WroteBytesSum = currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum + session.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5) + session.prevConnStat = currStat +} + func init() { tsHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" + "Server: " + base.LALHTTPTSSubSessionServer + "\r\n" + diff --git a/pkg/logic/config.go b/pkg/logic/config.go index e37b5c4..f8d1b41 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -29,8 +29,9 @@ type Config struct { RelayPushConfig RelayPushConfig `json:"relay_push"` RelayPullConfig RelayPullConfig `json:"relay_pull"` - PProfConfig PProfConfig `json:"pprof"` - LogConfig nazalog.Option `json:"log"` + HTTPAPIConfig HTTPAPIConfig `json:"http_api"` + PProfConfig PProfConfig `json:"pprof"` + LogConfig nazalog.Option `json:"log"` } type RTMPConfig struct { @@ -69,6 +70,11 @@ type RelayPullConfig struct { Addr string `json:"addr"` } +type HTTPAPIConfig struct { + Enable bool `json:"enable"` + Addr string `json:"addr"` +} + type PProfConfig struct { Enable bool `json:"enable"` Addr string `json:"addr"` @@ -97,6 +103,7 @@ func LoadConf(confFile string) (*Config, error) { !j.Exist("rtsp") || !j.Exist("relay_push") || !j.Exist("relay_pull") || + !j.Exist("http_api") || !j.Exist("pprof") || !j.Exist("log") { return &config, errors.New("missing key field in config file") diff --git a/pkg/logic/group.go b/pkg/logic/group.go index f169e4a..2ee27fb 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -14,8 +14,6 @@ import ( "github.com/q191201771/lal/pkg/hevc" - "github.com/q191201771/naza/pkg/nazastring" - "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/naza/pkg/bele" @@ -44,7 +42,8 @@ type Group struct { UniqueKey string appName string - streamName string + streamName string // TODO chef: 和stat里的字段重复,可以删除掉 + stat base.StatGroup exitChan chan struct{} @@ -66,6 +65,8 @@ type Group struct { vps []byte sps []byte pps []byte + + tickCount uint32 } type pushProxy struct { @@ -89,9 +90,12 @@ func NewGroup(appName string, streamName string) *Group { } return &Group{ - UniqueKey: uk, - appName: appName, - streamName: streamName, + UniqueKey: uk, + appName: appName, + streamName: streamName, + stat: base.StatGroup{ + StreamName: streamName, + }, exitChan: make(chan struct{}, 1), rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}), httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), @@ -107,12 +111,33 @@ func (group *Group) RunLoop() { } // TODO chef: 传入时间 +// 目前每秒触发一次 func (group *Group) Tick() { group.mutex.Lock() defer group.mutex.Unlock() group.pullIfNeeded() group.pushIfNeeded() + + // 每5秒计算session bitrate + if group.tickCount%5 == 0 { + if group.rtmpPubSession != nil { + group.rtmpPubSession.UpdateStat(group.tickCount) + } + if group.rtspPubSession != nil { + group.rtspPubSession.UpdateStat(group.tickCount) + } + for session := range group.rtmpSubSessionSet { + session.UpdateStat(group.tickCount) + } + for session := range group.httpflvSubSessionSet { + session.UpdateStat(group.tickCount) + } + for session := range group.httptsSubSessionSet { + session.UpdateStat(group.tickCount) + } + } + group.tickCount++ } // 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率,降低心智负担。 @@ -452,6 +477,7 @@ func (group *Group) OnAVPacket(pkt base.AVPacket) { group.broadcastRTMP(msg) } +// TODO chef 用Stat取代这个序列号函数 func (group *Group) StringifyStats() string { group.mutex.Lock() defer group.mutex.Unlock() @@ -476,8 +502,34 @@ func (group *Group) StringifyStats() string { } } - return fmt.Sprintf("[%s] stream name=%s, rtmp pub=%s, relay rtmp pull=%s, rtmp sub=%d, httpflv sub=%d, httpts sub=%d, relay rtmp push=%d", - group.UniqueKey, group.streamName, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), len(group.httptsSubSessionSet), pushSize) + return fmt.Sprintf("[%s] stat=%+v, rtmp pub=%s, relay rtmp pull=%s, rtmp sub=%d, httpflv sub=%d, httpts sub=%d, relay rtmp push=%d", + group.UniqueKey, group.stat, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), len(group.httptsSubSessionSet), pushSize) +} + +func (group *Group) GetStat() base.StatGroup { + group.mutex.Lock() + defer group.mutex.Unlock() + + if group.rtmpPubSession != nil { + group.stat.StatPub = base.StatSession2Pub(group.rtmpPubSession.GetStat()) + } else if group.rtspPubSession != nil { + group.stat.StatPub = group.rtspPubSession.GetStat() + } else { + group.stat.StatPub = base.StatPub{} + } + + group.stat.StatSubs = nil + for s := range group.rtmpSubSessionSet { + group.stat.StatSubs = append(group.stat.StatSubs, base.StatSession2Sub(s.GetStat())) + } + for s := range group.httpflvSubSessionSet { + group.stat.StatSubs = append(group.stat.StatSubs, s.GetStat()) + } + for s := range group.httptsSubSessionSet { + group.stat.StatSubs = append(group.stat.StatSubs, s.GetStat()) + } + + return group.stat } func (group *Group) broadcastMetadataAndSeqHeader() { @@ -565,9 +617,6 @@ func (group *Group) broadcastMetadataAndSeqHeader() { // TODO chef: 目前相当于其他类型往rtmp.AVMsg转了,考虑统一往一个通用类型转 // @param msg 调用结束后,内部不持有msg.Payload内存块 func (group *Group) broadcastRTMP(msg base.RTMPMsg) { - if msg.IsHEVCKeySeqHeader() { - nazalog.Debugf("%s", nazastring.DumpSliceByte(msg.Payload)) - } var ( lcd LazyChunkDivider lrm2ft LazyRTMPMsg2FLVTag @@ -672,10 +721,47 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) { if config.RTMPConfig.Enable { group.gopCache.Feed(msg, lcd.Get) } - if config.HTTPFLVConfig.Enable { group.httpflvGopCache.Feed(msg, lrm2ft.Get) } + + // # 6. 记录stat + if group.stat.AudioCodec == "" { + if msg.IsAACSeqHeader() { + group.stat.AudioCodec = base.AudioCodecAAC + } + } + if group.stat.AudioCodec == "" { + if msg.IsAVCKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecAVC + } + if msg.IsHEVCKeySeqHeader() { + group.stat.VideoCodec = base.VideoCodecHEVC + } + } + if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 { + if msg.IsAVCKeySeqHeader() { + sps, _, err := avc.ParseSPSPPSFromSeqHeader(msg.Payload) + if err == nil { + ctx, err := avc.ParseSPS(sps) + if err == nil { + group.stat.VideoHeight = int(ctx.Height) + group.stat.VideoWidth = int(ctx.Width) + } + } + } + if msg.IsHEVCKeySeqHeader() { + _, sps, _, err := hevc.ParseVPSSPSPPSFromSeqHeader(msg.Payload) + if err == nil { + var ctx hevc.Context + err = hevc.ParseSPS(sps, &ctx) + if err == nil { + group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples) + group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples) + } + } + } + } } func (group *Group) pullIfNeeded() { diff --git a/pkg/logic/http_api.go b/pkg/logic/http_api.go index fd69234..7a8a728 100644 --- a/pkg/logic/http_api.go +++ b/pkg/logic/http_api.go @@ -20,60 +20,58 @@ import ( ) const httpAPIVersion = "v0.0.1" -const CodeSucc = 0 -const DespSucc = "succ" + +const ( + ErrorCodeSucc = 0 + DespSucc = "succ" + ErrorCodeGroupNotFound = 1001 + DespGroupNotFound = "group not found" +) var startTime string +type HTTPAPIServerObserver interface { + OnStatAllGroup() []base.StatGroup + OnStatGroup(streamName string) *base.StatGroup +} + type HTTPAPIServer struct { - addr string - ln net.Listener + addr string + observer HTTPAPIServerObserver + ln net.Listener } type HTTPResponseBasic struct { - Code int `json:"code"` - Desp string `json:"desp"` + ErrorCode int `json:"error_code"` + Desp string `json:"desp"` } -type APILalInfo struct { +type APIStatLALInfo struct { HTTPResponseBasic - BinInfo string `json:"bin_info"` - LalVersion string `json:"lal_version"` - APIVersion string `json:"api_version"` - StartTime string `json:"start_time"` + Data struct { + BinInfo string `json:"bin_info"` + LalVersion string `json:"lal_version"` + APIVersion string `json:"api_version"` + StartTime string `json:"start_time"` + } `json:"data"` } type APIStatAllGroup struct { HTTPResponseBasic - Groups []StatGroupItem `json:"groups"` -} - -type StatGroupItem struct { - StreamName string `json:"stream_name"` - AudioCodec string `json:"audio_codec"` - VideoCodec string `json:"video_codec"` - VideoWidth string `json:"video_width"` - VideoHeight string `json:"video_height"` + Data struct { + Groups []base.StatGroup `json:"groups"` + } `json:"data"` } -type StatPub struct { - StatSession -} - -type StatSub struct { - StatSession -} - -type StatSession struct { - Protocol string `json:"protocol"` - StartTime string `json:"start_time"` - RemoteAddr string `json:"remote_addr"` - Bitrate string `json:"bitrate"` +type APIStatGroup struct { + HTTPResponseBasic + Data *base.StatGroup `json:"data"` } -func NewHTTPAPIServer(addr string) *HTTPAPIServer { +func NewHTTPAPIServer(addr string, observer HTTPAPIServerObserver) *HTTPAPIServer { return &HTTPAPIServer{ - addr: addr, + addr: addr, + observer: observer, } } @@ -88,36 +86,85 @@ func (h *HTTPAPIServer) Listen() (err error) { func (h *HTTPAPIServer) Runloop() error { mux := http.NewServeMux() - mux.HandleFunc("/api/lal_info", h.lalInfo) - mux.HandleFunc("/api/stat/group", h.statGroup) - mux.HandleFunc("/api/stat/all_group", h.statAllGroup) + //mux.HandleFunc("/api/list", h.apiListHandler) + mux.HandleFunc("/api/stat/lal_info", h.statLALInfoHandler) + mux.HandleFunc("/api/stat/group", h.statGroupHandler) + mux.HandleFunc("/api/stat/all_group", h.statAllGroupHandler) var srv http.Server srv.Handler = mux return srv.Serve(h.ln) } -func (h *HTTPAPIServer) lalInfo(w http.ResponseWriter, req *http.Request) { - var v APILalInfo - v.Code = CodeSucc +// TODO chef: dispose + +func (h *HTTPAPIServer) apiListHandler(w http.ResponseWriter, req *http.Request) { + // TODO chef: 写完api list页面 + b := []byte(` + +lal http api list + +
+
+ + + +`) + w.Header().Add("Server", base.LALHTTPAPIServer) + _, _ = w.Write(b) +} + +func (h *HTTPAPIServer) statLALInfoHandler(w http.ResponseWriter, req *http.Request) { + var v APIStatLALInfo + v.ErrorCode = ErrorCodeSucc v.Desp = DespSucc - v.BinInfo = bininfo.StringifySingleLine() - v.LalVersion = base.LALVersion - v.APIVersion = httpAPIVersion - v.StartTime = startTime + v.Data.BinInfo = bininfo.StringifySingleLine() + v.Data.LalVersion = base.LALVersion + v.Data.APIVersion = httpAPIVersion + v.Data.StartTime = startTime resp, _ := json.Marshal(v) w.Header().Add("Server", base.LALHTTPAPIServer) _, _ = w.Write(resp) } -func (h *HTTPAPIServer) statGroup(w http.ResponseWriter, req *http.Request) { +func (h *HTTPAPIServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) { + gs := h.observer.OnStatAllGroup() + var v APIStatAllGroup + v.ErrorCode = ErrorCodeSucc + v.Desp = DespSucc + v.Data.Groups = gs + resp, _ := json.Marshal(v) + w.Header().Add("Server", base.LALHTTPAPIServer) + _, _ = w.Write(resp) } -func (h *HTTPAPIServer) statAllGroup(w http.ResponseWriter, req *http.Request) { +func (h *HTTPAPIServer) statGroupHandler(w http.ResponseWriter, req *http.Request) { + var v APIStatGroup + + q := req.URL.Query() + streamName := q.Get("stream_name") + if streamName == "" { + v.ErrorCode = ErrorCodeGroupNotFound + v.Desp = DespGroupNotFound + } else { + v.Data = h.observer.OnStatGroup(streamName) + if v.Data == nil { + v.ErrorCode = ErrorCodeGroupNotFound + v.Desp = DespGroupNotFound + } else { + v.ErrorCode = ErrorCodeSucc + v.Desp = DespSucc + } + } + resp, _ := json.Marshal(v) + w.Header().Add("Server", base.LALHTTPAPIServer) + _, _ = w.Write(resp) } func init() { - startTime = time.Now().String() + startTime = time.Now().Format("2006-01-02 15:04:05.999") } diff --git a/pkg/logic/logic.go b/pkg/logic/logic.go index fded075..2e140aa 100644 --- a/pkg/logic/logic.go +++ b/pkg/logic/logic.go @@ -26,6 +26,7 @@ var _ rtmp.ServerObserver = &ServerManager{} var _ httpflv.ServerObserver = &ServerManager{} var _ httpts.ServerObserver = &ServerManager{} var _ rtsp.ServerObserver = &ServerManager{} +var _ HTTPAPIServerObserver = &ServerManager{} var _ rtmp.PubSessionObserver = &Group{} var _ rtsp.PubSessionObserver = &Group{} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index bbdd001..8ba74d1 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/q191201771/lal/pkg/base" + "github.com/q191201771/lal/pkg/httpts" "github.com/q191201771/lal/pkg/rtsp" @@ -30,6 +32,7 @@ type ServerManager struct { hlsServer *hls.Server httptsServer *httpts.Server rtspServer *rtsp.Server + httpAPIServer *HTTPAPIServer exitChan chan struct{} mutex sync.Mutex @@ -56,6 +59,9 @@ func NewServerManager() *ServerManager { if config.RTSPConfig.Enable { m.rtspServer = rtsp.NewServer(config.RTSPConfig.Addr, m) } + if config.HTTPAPIConfig.Enable { + m.httpAPIServer = NewHTTPAPIServer(config.HTTPAPIConfig.Addr, m) + } return m } @@ -120,6 +126,18 @@ func (sm *ServerManager) RunLoop() { }() } + if sm.httpAPIServer != nil { + if err := sm.httpAPIServer.Listen(); err != nil { + nazalog.Error(err) + os.Exit(1) + } + go func() { + if err := sm.httpAPIServer.Runloop(); err != nil { + nazalog.Error(err) + } + }() + } + t := time.NewTicker(1 * time.Second) defer t.Stop() var count uint32 @@ -129,6 +147,8 @@ func (sm *ServerManager) RunLoop() { return case <-t.C: sm.iterateGroup() + + // log count++ if (count % 10) == 0 { sm.mutex.Lock() @@ -260,6 +280,28 @@ func (sm *ServerManager) OnDelRTSPPubSession(session *rtsp.PubSession) { } } +func (sm *ServerManager) OnStatAllGroup() (sgs []base.StatGroup) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + for _, g := range sm.groupMap { + sgs = append(sgs, g.GetStat()) + } + return +} + +func (sm *ServerManager) OnStatGroup(streamName string) *base.StatGroup { + sm.mutex.Lock() + defer sm.mutex.Unlock() + g := sm.getGroup("fakeAppName", streamName) + if g == nil { + return nil + } + // copy + var ret base.StatGroup + ret = g.GetStat() + return &ret +} + func (sm *ServerManager) iterateGroup() { sm.mutex.Lock() defer sm.mutex.Unlock() diff --git a/pkg/rtmp/server.go b/pkg/rtmp/server.go index ea18a8a..f54b17e 100644 --- a/pkg/rtmp/server.go +++ b/pkg/rtmp/server.go @@ -22,15 +22,15 @@ type ServerObserver interface { } type Server struct { - obs ServerObserver - addr string - ln net.Listener + observer ServerObserver + addr string + ln net.Listener } -func NewServer(obs ServerObserver, addr string) *Server { +func NewServer(observer ServerObserver, addr string) *Server { return &Server{ - obs: obs, - addr: addr, + observer: observer, + addr: addr, } } @@ -70,15 +70,15 @@ func (server *Server) handleTCPConnect(conn net.Conn) { case ServerSessionTypeUnknown: // noop case ServerSessionTypePub: - server.obs.OnDelRTMPPubSession(session) + server.observer.OnDelRTMPPubSession(session) case ServerSessionTypeSub: - server.obs.OnDelRTMPSubSession(session) + server.observer.OnDelRTMPSubSession(session) } } // ServerSessionObserver func (server *Server) OnNewRTMPPubSession(session *ServerSession) { - if !server.obs.OnNewRTMPPubSession(session) { + if !server.observer.OnNewRTMPPubSession(session) { log.Warnf("dispose PubSession since pub exist.") session.Dispose() return @@ -87,7 +87,7 @@ func (server *Server) OnNewRTMPPubSession(session *ServerSession) { // ServerSessionObserver func (server *Server) OnNewRTMPSubSession(session *ServerSession) { - if !server.obs.OnNewRTMPSubSession(session) { + if !server.observer.OnNewRTMPSubSession(session) { session.Dispose() return } diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index db45e35..54a2b10 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -11,6 +11,7 @@ package rtmp import ( "net" "strings" + "time" "github.com/q191201771/lal/pkg/base" @@ -34,8 +35,8 @@ type PubSessionObserver interface { OnReadRTMPAVMsg(msg base.RTMPMsg) } -func (s *ServerSession) SetPubSessionObserver(obs PubSessionObserver) { - s.avObs = obs +func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver) { + s.avObserver = observer } type ServerSessionType int @@ -52,29 +53,36 @@ type ServerSession struct { StreamName string StreamNameWithRawQuery string - obs ServerSessionObserver + observer ServerSessionObserver t ServerSessionType hs HandshakeServer chunkComposer *ChunkComposer packer *MessagePacker - conn connection.Connection + conn connection.Connection + prevConnStat connection.Stat + stat base.StatSession // only for PubSession - avObs PubSessionObserver + avObserver PubSessionObserver // only for SubSession IsFresh bool } -func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession { +func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession { uk := unique.GenUniqueKey("RTMPPUBSUB") s := &ServerSession{ conn: connection.New(conn, func(option *connection.Option) { option.ReadBufSize = readBufSize }), + stat: base.StatSession{ + Protocol: base.ProtocolRTMP, + StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + RemoteAddr: conn.RemoteAddr().String(), + }, UniqueKey: uk, - obs: obs, + observer: observer, t: ServerSessionTypeUnknown, chunkComposer: NewChunkComposer(), packer: NewMessagePacker(), @@ -106,6 +114,28 @@ func (s *ServerSession) Dispose() { _ = s.conn.Close() } +func (s *ServerSession) GetStat() base.StatSession { + connStat := s.conn.GetStat() + s.stat.ReadBytesSum = connStat.ReadBytesSum + s.stat.WroteBytesSum = connStat.WroteBytesSum + return s.stat +} + +// TODO chef: 默认每5秒调用一次 +func (s *ServerSession) UpdateStat(tickCount uint32) { + currStat := s.conn.GetStat() + var diffStat connection.Stat + diffStat.ReadBytesSum = currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum + diffStat.WroteBytesSum = currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum + switch s.t { + case ServerSessionTypePub: + s.stat.Bitrate = int(diffStat.ReadBytesSum * 8 / 1024 / 5) + case ServerSessionTypeSub: + s.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5) + } + s.prevConnStat = currStat +} + func (s *ServerSession) runReadLoop() error { return s.chunkComposer.RunLoop(s.conn, s.doMsg) } @@ -149,7 +179,7 @@ func (s *ServerSession) doMsg(stream *Stream) error { nazalog.Errorf("[%s] read audio/video message but server session not pub type.", s.UniqueKey) return ErrRTMP } - s.avObs.OnReadRTMPAVMsg(stream.toAVMsg()) + s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg()) default: nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString()) @@ -197,7 +227,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error { return nil } - s.avObs.OnReadRTMPAVMsg(stream.toAVMsg()) + s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg()) return nil } @@ -315,7 +345,7 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) { s.ModConnProps() s.t = ServerSessionTypePub - s.obs.OnNewRTMPPubSession(s) + s.observer.OnNewRTMPPubSession(s) return nil } @@ -350,14 +380,17 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) { s.ModConnProps() s.t = ServerSessionTypeSub - s.obs.OnNewRTMPSubSession(s) + s.observer.OnNewRTMPSubSession(s) return nil } func (s *ServerSession) ModConnProps() { s.conn.ModWriteChanSize(wChanSize) - // TODO chef: naza.connection 这种方式会导致最后一点数据发送不出去,我们应该使用更好的方式 + // TODO chef: + // 使用合并发送 + // naza.connection 这种方式会导致最后一点数据发送不出去,我们应该使用更好的方式,比如合并发送模式下,Dispose时发送剩余数据 + // //s.conn.ModWriteBufSize(writeBufSize) switch s.t { diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 9ffef79..6b9311d 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -33,8 +33,8 @@ type ServerObserver interface { } type Server struct { - addr string - obs ServerObserver + addr string + observer ServerObserver ln net.Listener availUDPConnPool *nazanet.AvailUDPConnPool @@ -43,10 +43,10 @@ type Server struct { presentation2PubSession map[string]*PubSession } -func NewServer(addr string, obs ServerObserver) *Server { +func NewServer(addr string, observer ServerObserver) *Server { return &Server{ addr: addr, - obs: obs, + observer: observer, availUDPConnPool: nazanet.NewAvailUDPConnPool(minServerPort, maxServerPort), presentation2PubSession: make(map[string]*PubSession), } @@ -123,7 +123,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) { // TODO chef: 缺少统一释放pubsession的逻辑 // TODO chef: 我用ffmpeg向lal推rtsp流,发现lal直接关闭rtsp的连接,ffmpeg并不会退出,是否应先发送什么命令? - if ok := s.obs.OnNewRTSPPubSession(pubSession); !ok { + if ok := s.observer.OnNewRTSPPubSession(pubSession); !ok { nazalog.Warnf("[%s] force close pubsession.", pubSession.UniqueKey) break } @@ -187,7 +187,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) { s.m.Unlock() if ok { session.Dispose() - s.obs.OnDelRTSPPubSession(session) + s.observer.OnDelRTSPPubSession(session) } resp := PackResponseTeardown(headers[HeaderFieldCSeq]) diff --git a/pkg/rtsp/server_pub_session.go b/pkg/rtsp/server_pub_session.go index e603801..50a30c0 100644 --- a/pkg/rtsp/server_pub_session.go +++ b/pkg/rtsp/server_pub_session.go @@ -10,6 +10,10 @@ package rtsp import ( "net" + "sync/atomic" + "time" + + "github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/nazanet" @@ -37,8 +41,12 @@ type PubSession struct { observer PubSessionObserver avPacketQueue *AVPacketQueue - rtpConn *nazanet.UDPConnection - rtcpConn *nazanet.UDPConnection + rtpConn *nazanet.UDPConnection + rtcpConn *nazanet.UDPConnection + currConnStat connection.Stat + prevConnStat connection.Stat + stat base.StatPub + audioUnpacker *rtprtcp.RTPUnpacker videoUnpacker *rtprtcp.RTPUnpacker audioRRProducer *rtprtcp.RRProducer @@ -59,14 +67,20 @@ func NewPubSession(streamName string) *PubSession { ps := &PubSession{ UniqueKey: uk, StreamName: streamName, + stat: base.StatPub{ + StatSession: base.StatSession{ + Protocol: base.ProtocolRTSP, + StartTime: time.Now().Format("2006-01-02 15:04:05.999"), + }, + }, } ps.avPacketQueue = NewAVPacketQueue(ps.onAVPacket) nazalog.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, ps, streamName) return ps } -func (p *PubSession) SetObserver(obs PubSessionObserver) { - p.observer = obs +func (p *PubSession) SetObserver(observer PubSessionObserver) { + p.observer = observer if p.sps != nil && p.pps != nil { if p.vps != nil { @@ -135,16 +149,18 @@ func (p *PubSession) SetRTPConn(conn *net.UDPConn) { server, _ := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) { option.Conn = conn }) - go server.RunLoop(p.onReadUDPPacket) p.rtpConn = server + + go server.RunLoop(p.onReadUDPPacket) } func (p *PubSession) SetRTCPConn(conn *net.UDPConn) { server, _ := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) { option.Conn = conn }) - go server.RunLoop(p.onReadUDPPacket) p.rtcpConn = server + + go server.RunLoop(p.onReadUDPPacket) } func (p *PubSession) Dispose() { @@ -156,6 +172,18 @@ func (p *PubSession) Dispose() { } } +func (p *PubSession) GetStat() base.StatPub { + p.stat.ReadBytesSum = atomic.LoadUint64(&p.currConnStat.ReadBytesSum) + p.stat.WroteBytesSum = atomic.LoadUint64(&p.currConnStat.WroteBytesSum) + return p.stat +} + +func (p *PubSession) UpdateStat(tickCount uint32) { + diff := p.currConnStat.ReadBytesSum - p.prevConnStat.ReadBytesSum + p.stat.Bitrate = int(diff * 8 / 1024 / 5) + p.prevConnStat = p.currConnStat +} + // callback by UDPConnection // TODO yoko: 因为rtp和rtcp使用了两个连接,所以分成两个回调也行 func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool { @@ -164,6 +192,8 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo return true } + atomic.AddUint64(&p.currConnStat.ReadBytesSum, uint64(len(b))) + if len(b) < 2 { nazalog.Errorf("read udp packet length invalid. len=%d", len(b)) return true @@ -172,7 +202,6 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo // try RTCP switch b[1] { case rtprtcp.RTCPPacketTypeSR: - //h := rtprtcp.ParseRTCPHeader(b) sr := rtprtcp.ParseSR(b) var rrBuf []byte switch sr.SenderSSRC { @@ -181,7 +210,10 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo case p.videoSsrc: rrBuf = p.videoRRProducer.Produce(sr.GetMiddleNTP()) } + _ = p.rtcpConn.Write(rrBuf) + + atomic.AddUint64(&p.currConnStat.WroteBytesSum, uint64(len(b))) return true } @@ -207,6 +239,10 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo p.audioRRProducer.FeedRTPPacket(h.Seq) } + if p.stat.RemoteAddr == "" { + p.stat.RemoteAddr = rAddr.String() + } + return true }