增加HTTP API接口,获取服务信息

pull/33/head
q191201771 4 years ago
parent d7df4d9623
commit f2fc9a741a

@ -10,7 +10,6 @@
<a title="codeline" target="_blank" href="https://github.com/q191201771/lal"><img src="https://sloc.xyz/github/q191201771/lal/?category=code"></a>
<a title="license" target="_blank" href="https://github.com/q191201771/lal/blob/master/LICENSE"><img src="https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square"></a>
<a title="lastcommit" target="_blank" href="https://github.com/q191201771/lal/commits/master"><img src="https://img.shields.io/github/commit-activity/m/q191201771/lal.svg?style=flat-square"></a>
<a title="commitactivity" target="_blank" href="https://github.com/q191201771/lal/graphs/commit-activity"><img src="https://img.shields.io/github/last-commit/q191201771/lal.svg?style=flat-square"></a>
<br>
<a title="pr" target="_blank" href="https://github.com/q191201771/lal/pulls"><img src="https://img.shields.io/github/issues-pr-closed/q191201771/lal.svg?style=flat-square&color=FF9966"></a>
<a title="hits" target="_blank" href="https://github.com/q191201771/lal"><img src="https://hits.b3log.org/q191201771/lal.svg?style=flat-square"></a>

@ -7,7 +7,7 @@
"httpflv": {
"enable": true,
"sub_listen_addr": ":8090",
"enable_https": true,
"enable_https": false,
"https_addr": ":4443",
"https_cert_file": "./conf/cert.pem",
"https_key_file": "./conf/key.pem",
@ -38,9 +38,13 @@
"enable": true,
"addr": "127.0.0.1:19350"
},
"http_api": {
"enable": true,
"addr": ":8093"
},
"pprof": {
"enable": false,
"addr": ":10002"
"addr": ":10011"
},
"log": {
"level": 1,

@ -37,6 +37,10 @@
"enable": false,
"addr": ""
},
"http_api": {
"enable": true,
"addr": ":8083"
},
"pprof": {
"enable": true,
"addr": ":10001"

@ -37,6 +37,10 @@
"enable": false, // 是否开启回源拉流功能,开启后,当自身接收到拉流请求,而流不存在时,会从其他服务器拉取这个流到本地
"addr": "" // 回源拉流的地址。格式举例 "127.0.0.1:19351"
},
"http_api": {
"enable": true, // 是否开启HTTP API接口
"addr": ":8083" // 监听地址
},
"pprof": {
"enable": true, // 是否开启Go pprof web服务的监听
"addr": ":10001" // Go pprof web地址

@ -37,6 +37,10 @@
"enable": false,
"addr": ""
},
"http_api": {
"enable": true,
"addr": ":8083"
},
"pprof": {
"enable": true,
"addr": ":10001"

@ -2,4 +2,4 @@ module github.com/q191201771/lal
go 1.12
require github.com/q191201771/naza v0.15.0
require github.com/q191201771/naza v0.15.1

@ -1,2 +1,2 @@
github.com/q191201771/naza v0.15.0 h1:HFyRrluqZhpnBu6YQ1soIk6cR9P8G/9sDMFLBhTTBRc=
github.com/q191201771/naza v0.15.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.15.1 h1:y9D7jbzHeD883PqBZTln+O47E40dFoRlQUrWYOA5GoM=
github.com/q191201771/naza v0.15.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=

@ -333,6 +333,7 @@ func CaptureAVCC2AnnexB(w io.Writer, payload []byte) error {
return nil
}
// TODO chef: hevc中ctx作为参数传入这里考虑统一一下
// 尝试解析SPS所有字段实验中请勿直接使用该函数
func ParseSPS(payload []byte) (Context, error) {
var sps SPS

@ -0,0 +1,71 @@
// Copyright 2020, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
const (
// StatGroup.AudioCodec
AudioCodecAAC = "AAC"
// StatGroup.VideoCodec
VideoCodecAVC = "H264"
VideoCodecHEVC = "H265"
// StatSession.Protocol
ProtocolRTMP = "RTMP"
ProtocolRTSP = "RTSP"
ProtocolHTTPFLV = "HTTP-FLV"
ProtocolHTTPTS = "HTTP-TS"
)
type StatGroup struct {
StreamName string `json:"stream_name"`
AudioCodec string `json:"audio_codec"`
VideoCodec string `json:"video_codec"`
VideoWidth int `json:"video_width"`
VideoHeight int `json:"video_height"`
StatPub StatPub `json:"pub"`
StatSubs []StatSub `json:"subs"`
}
type StatPub struct {
StatSession
}
type StatSub struct {
StatSession
}
type StatSession struct {
Protocol string `json:"protocol"`
StartTime string `json:"start_time"`
RemoteAddr string `json:"remote_addr"`
ReadBytesSum uint64 `json:"read_bytes_sum"`
WroteBytesSum uint64 `json:"wrote_bytes_sum"`
Bitrate int `json:"bitrate"`
}
func StatSession2Pub(ss StatSession) (ret StatPub) {
ret.Protocol = ss.Protocol
ret.StartTime = ss.StartTime
ret.RemoteAddr = ss.RemoteAddr
ret.ReadBytesSum = ss.ReadBytesSum
ret.WroteBytesSum = ss.WroteBytesSum
ret.Bitrate = ss.Bitrate
return
}
func StatSession2Sub(ss StatSession) (ret StatSub) {
ret.Protocol = ss.Protocol
ret.StartTime = ss.StartTime
ret.RemoteAddr = ss.RemoteAddr
ret.ReadBytesSum = ss.ReadBytesSum
ret.WroteBytesSum = ss.WroteBytesSum
ret.Bitrate = ss.Bitrate
return
}

@ -34,18 +34,18 @@ type ServerConfig struct {
}
type Server struct {
obs ServerObserver
config ServerConfig
ln net.Listener
httpsLn net.Listener
observer ServerObserver
config ServerConfig
ln net.Listener
httpsLn net.Listener
}
// TODO chef: 监听太难看了考虑直接传入Listener对象或直接路由进来使得不同server可以共用端口
func NewServer(obs ServerObserver, config ServerConfig) *Server {
func NewServer(observer ServerObserver, config ServerConfig) *Server {
return &Server{
obs: obs,
config: config,
observer: observer,
config: config,
}
}
@ -133,11 +133,11 @@ func (server *Server) handleConnect(conn net.Conn) {
}
nazalog.Debugf("[%s] < read http request. uri=%s", session.UniqueKey, session.URI)
if !server.obs.OnNewHTTPFLVSubSession(session) {
if !server.observer.OnNewHTTPFLVSubSession(session) {
session.Dispose()
}
err := session.RunLoop()
nazalog.Debugf("[%s] httpflv sub session loop done. err=%v", session.UniqueKey, err)
server.obs.OnDelHTTPFLVSubSession(session)
server.observer.OnDelHTTPFLVSubSession(session)
}

@ -37,7 +37,9 @@ type SubSession struct {
IsFresh bool
conn connection.Connection
conn connection.Connection
prevConnStat connection.Stat
stat base.StatSub
}
func NewSubSession(conn net.Conn) *SubSession {
@ -50,6 +52,13 @@ func NewSubSession(conn net.Conn) *SubSession {
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
}),
stat: base.StatSub{
StatSession: base.StatSession{
Protocol: base.ProtocolHTTPFLV,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
},
}
nazalog.Infof("[%s] lifecycle new httpflv SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
@ -134,6 +143,21 @@ func (session *SubSession) Dispose() {
_ = session.conn.Close()
}
func (session *SubSession) GetStat() base.StatSub {
currStat := session.conn.GetStat()
session.stat.ReadBytesSum = currStat.ReadBytesSum
session.stat.WroteBytesSum = currStat.WroteBytesSum
return session.stat
}
func (session *SubSession) UpdateStat(tickCount uint32) {
currStat := session.conn.GetStat()
var diffStat connection.Stat
diffStat.WroteBytesSum = currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5)
session.prevConnStat = currStat
}
func init() {
flvHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" +
"Server: " + base.LALHTTPFLVSubSessionServer + "\r\n" +

@ -23,15 +23,15 @@ type ServerObserver interface {
}
type Server struct {
obs ServerObserver
addr string
ln net.Listener
observer ServerObserver
addr string
ln net.Listener
}
func NewServer(obs ServerObserver, addr string) *Server {
func NewServer(observer ServerObserver, addr string) *Server {
return &Server{
obs: obs,
addr: addr,
observer: observer,
addr: addr,
}
}
@ -71,11 +71,11 @@ func (server *Server) handleConnect(conn net.Conn) {
}
log.Debugf("[%s] < read http request. uri=%s", session.UniqueKey, session.URI)
if !server.obs.OnNewHTTPTSSubSession(session) {
if !server.observer.OnNewHTTPTSSubSession(session) {
session.Dispose()
}
err := session.RunLoop()
log.Debugf("[%s] httpts sub session loop done. err=%v", session.UniqueKey, err)
server.obs.OnDelHTTPTSSubSession(session)
server.observer.OnDelHTTPTSSubSession(session)
}

@ -36,7 +36,9 @@ type SubSession struct {
IsFresh bool
conn connection.Connection
conn connection.Connection
prevConnStat connection.Stat
stat base.StatSub
}
func NewSubSession(conn net.Conn) *SubSession {
@ -49,6 +51,13 @@ func NewSubSession(conn net.Conn) *SubSession {
option.WriteChanSize = wChanSize
option.WriteTimeoutMS = subSessionWriteTimeoutMS
}),
stat: base.StatSub{
StatSession: base.StatSession{
Protocol: base.ProtocolHTTPTS,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
},
}
nazalog.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
@ -129,6 +138,21 @@ func (session *SubSession) Dispose() {
_ = session.conn.Close()
}
func (session *SubSession) GetStat() base.StatSub {
currStat := session.conn.GetStat()
session.stat.ReadBytesSum = currStat.ReadBytesSum
session.stat.WroteBytesSum = currStat.WroteBytesSum
return session.stat
}
func (session *SubSession) UpdateStat(tickCount uint32) {
currStat := session.conn.GetStat()
var diffStat connection.Stat
diffStat.WroteBytesSum = currStat.WroteBytesSum - session.prevConnStat.WroteBytesSum
session.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5)
session.prevConnStat = currStat
}
func init() {
tsHTTPResponseHeaderStr := "HTTP/1.1 200 OK\r\n" +
"Server: " + base.LALHTTPTSSubSessionServer + "\r\n" +

@ -29,8 +29,9 @@ type Config struct {
RelayPushConfig RelayPushConfig `json:"relay_push"`
RelayPullConfig RelayPullConfig `json:"relay_pull"`
PProfConfig PProfConfig `json:"pprof"`
LogConfig nazalog.Option `json:"log"`
HTTPAPIConfig HTTPAPIConfig `json:"http_api"`
PProfConfig PProfConfig `json:"pprof"`
LogConfig nazalog.Option `json:"log"`
}
type RTMPConfig struct {
@ -69,6 +70,11 @@ type RelayPullConfig struct {
Addr string `json:"addr"`
}
type HTTPAPIConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
}
type PProfConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
@ -97,6 +103,7 @@ func LoadConf(confFile string) (*Config, error) {
!j.Exist("rtsp") ||
!j.Exist("relay_push") ||
!j.Exist("relay_pull") ||
!j.Exist("http_api") ||
!j.Exist("pprof") ||
!j.Exist("log") {
return &config, errors.New("missing key field in config file")

@ -14,8 +14,6 @@ import (
"github.com/q191201771/lal/pkg/hevc"
"github.com/q191201771/naza/pkg/nazastring"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/naza/pkg/bele"
@ -44,7 +42,8 @@ type Group struct {
UniqueKey string
appName string
streamName string
streamName string // TODO chef: 和stat里的字段重复可以删除掉
stat base.StatGroup
exitChan chan struct{}
@ -66,6 +65,8 @@ type Group struct {
vps []byte
sps []byte
pps []byte
tickCount uint32
}
type pushProxy struct {
@ -89,9 +90,12 @@ func NewGroup(appName string, streamName string) *Group {
}
return &Group{
UniqueKey: uk,
appName: appName,
streamName: streamName,
UniqueKey: uk,
appName: appName,
streamName: streamName,
stat: base.StatGroup{
StreamName: streamName,
},
exitChan: make(chan struct{}, 1),
rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}),
httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}),
@ -107,12 +111,33 @@ func (group *Group) RunLoop() {
}
// TODO chef: 传入时间
// 目前每秒触发一次
func (group *Group) Tick() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pullIfNeeded()
group.pushIfNeeded()
// 每5秒计算session bitrate
if group.tickCount%5 == 0 {
if group.rtmpPubSession != nil {
group.rtmpPubSession.UpdateStat(group.tickCount)
}
if group.rtspPubSession != nil {
group.rtspPubSession.UpdateStat(group.tickCount)
}
for session := range group.rtmpSubSessionSet {
session.UpdateStat(group.tickCount)
}
for session := range group.httpflvSubSessionSet {
session.UpdateStat(group.tickCount)
}
for session := range group.httptsSubSessionSet {
session.UpdateStat(group.tickCount)
}
}
group.tickCount++
}
// 主动释放所有资源。保证所有资源的生命周期逻辑上都在我们的控制中。降低出bug的几率降低心智负担。
@ -452,6 +477,7 @@ func (group *Group) OnAVPacket(pkt base.AVPacket) {
group.broadcastRTMP(msg)
}
// TODO chef 用Stat取代这个序列号函数
func (group *Group) StringifyStats() string {
group.mutex.Lock()
defer group.mutex.Unlock()
@ -476,8 +502,34 @@ func (group *Group) StringifyStats() string {
}
}
return fmt.Sprintf("[%s] stream name=%s, rtmp pub=%s, relay rtmp pull=%s, rtmp sub=%d, httpflv sub=%d, httpts sub=%d, relay rtmp push=%d",
group.UniqueKey, group.streamName, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), len(group.httptsSubSessionSet), pushSize)
return fmt.Sprintf("[%s] stat=%+v, rtmp pub=%s, relay rtmp pull=%s, rtmp sub=%d, httpflv sub=%d, httpts sub=%d, relay rtmp push=%d",
group.UniqueKey, group.stat, pub, pull, len(group.rtmpSubSessionSet), len(group.httpflvSubSessionSet), len(group.httptsSubSessionSet), pushSize)
}
func (group *Group) GetStat() base.StatGroup {
group.mutex.Lock()
defer group.mutex.Unlock()
if group.rtmpPubSession != nil {
group.stat.StatPub = base.StatSession2Pub(group.rtmpPubSession.GetStat())
} else if group.rtspPubSession != nil {
group.stat.StatPub = group.rtspPubSession.GetStat()
} else {
group.stat.StatPub = base.StatPub{}
}
group.stat.StatSubs = nil
for s := range group.rtmpSubSessionSet {
group.stat.StatSubs = append(group.stat.StatSubs, base.StatSession2Sub(s.GetStat()))
}
for s := range group.httpflvSubSessionSet {
group.stat.StatSubs = append(group.stat.StatSubs, s.GetStat())
}
for s := range group.httptsSubSessionSet {
group.stat.StatSubs = append(group.stat.StatSubs, s.GetStat())
}
return group.stat
}
func (group *Group) broadcastMetadataAndSeqHeader() {
@ -565,9 +617,6 @@ func (group *Group) broadcastMetadataAndSeqHeader() {
// TODO chef: 目前相当于其他类型往rtmp.AVMsg转了考虑统一往一个通用类型转
// @param msg 调用结束后内部不持有msg.Payload内存块
func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
if msg.IsHEVCKeySeqHeader() {
nazalog.Debugf("%s", nazastring.DumpSliceByte(msg.Payload))
}
var (
lcd LazyChunkDivider
lrm2ft LazyRTMPMsg2FLVTag
@ -672,10 +721,47 @@ func (group *Group) broadcastRTMP(msg base.RTMPMsg) {
if config.RTMPConfig.Enable {
group.gopCache.Feed(msg, lcd.Get)
}
if config.HTTPFLVConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.Get)
}
// # 6. 记录stat
if group.stat.AudioCodec == "" {
if msg.IsAACSeqHeader() {
group.stat.AudioCodec = base.AudioCodecAAC
}
}
if group.stat.AudioCodec == "" {
if msg.IsAVCKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecAVC
}
if msg.IsHEVCKeySeqHeader() {
group.stat.VideoCodec = base.VideoCodecHEVC
}
}
if group.stat.VideoHeight == 0 || group.stat.VideoWidth == 0 {
if msg.IsAVCKeySeqHeader() {
sps, _, err := avc.ParseSPSPPSFromSeqHeader(msg.Payload)
if err == nil {
ctx, err := avc.ParseSPS(sps)
if err == nil {
group.stat.VideoHeight = int(ctx.Height)
group.stat.VideoWidth = int(ctx.Width)
}
}
}
if msg.IsHEVCKeySeqHeader() {
_, sps, _, err := hevc.ParseVPSSPSPPSFromSeqHeader(msg.Payload)
if err == nil {
var ctx hevc.Context
err = hevc.ParseSPS(sps, &ctx)
if err == nil {
group.stat.VideoHeight = int(ctx.PicHeightInLumaSamples)
group.stat.VideoWidth = int(ctx.PicWidthInLumaSamples)
}
}
}
}
}
func (group *Group) pullIfNeeded() {

@ -20,60 +20,58 @@ import (
)
const httpAPIVersion = "v0.0.1"
const CodeSucc = 0
const DespSucc = "succ"
const (
ErrorCodeSucc = 0
DespSucc = "succ"
ErrorCodeGroupNotFound = 1001
DespGroupNotFound = "group not found"
)
var startTime string
type HTTPAPIServerObserver interface {
OnStatAllGroup() []base.StatGroup
OnStatGroup(streamName string) *base.StatGroup
}
type HTTPAPIServer struct {
addr string
ln net.Listener
addr string
observer HTTPAPIServerObserver
ln net.Listener
}
type HTTPResponseBasic struct {
Code int `json:"code"`
Desp string `json:"desp"`
ErrorCode int `json:"error_code"`
Desp string `json:"desp"`
}
type APILalInfo struct {
type APIStatLALInfo struct {
HTTPResponseBasic
BinInfo string `json:"bin_info"`
LalVersion string `json:"lal_version"`
APIVersion string `json:"api_version"`
StartTime string `json:"start_time"`
Data struct {
BinInfo string `json:"bin_info"`
LalVersion string `json:"lal_version"`
APIVersion string `json:"api_version"`
StartTime string `json:"start_time"`
} `json:"data"`
}
type APIStatAllGroup struct {
HTTPResponseBasic
Groups []StatGroupItem `json:"groups"`
}
type StatGroupItem struct {
StreamName string `json:"stream_name"`
AudioCodec string `json:"audio_codec"`
VideoCodec string `json:"video_codec"`
VideoWidth string `json:"video_width"`
VideoHeight string `json:"video_height"`
Data struct {
Groups []base.StatGroup `json:"groups"`
} `json:"data"`
}
type StatPub struct {
StatSession
}
type StatSub struct {
StatSession
}
type StatSession struct {
Protocol string `json:"protocol"`
StartTime string `json:"start_time"`
RemoteAddr string `json:"remote_addr"`
Bitrate string `json:"bitrate"`
type APIStatGroup struct {
HTTPResponseBasic
Data *base.StatGroup `json:"data"`
}
func NewHTTPAPIServer(addr string) *HTTPAPIServer {
func NewHTTPAPIServer(addr string, observer HTTPAPIServerObserver) *HTTPAPIServer {
return &HTTPAPIServer{
addr: addr,
addr: addr,
observer: observer,
}
}
@ -88,36 +86,85 @@ func (h *HTTPAPIServer) Listen() (err error) {
func (h *HTTPAPIServer) Runloop() error {
mux := http.NewServeMux()
mux.HandleFunc("/api/lal_info", h.lalInfo)
mux.HandleFunc("/api/stat/group", h.statGroup)
mux.HandleFunc("/api/stat/all_group", h.statAllGroup)
//mux.HandleFunc("/api/list", h.apiListHandler)
mux.HandleFunc("/api/stat/lal_info", h.statLALInfoHandler)
mux.HandleFunc("/api/stat/group", h.statGroupHandler)
mux.HandleFunc("/api/stat/all_group", h.statAllGroupHandler)
var srv http.Server
srv.Handler = mux
return srv.Serve(h.ln)
}
func (h *HTTPAPIServer) lalInfo(w http.ResponseWriter, req *http.Request) {
var v APILalInfo
v.Code = CodeSucc
// TODO chef: dispose
func (h *HTTPAPIServer) apiListHandler(w http.ResponseWriter, req *http.Request) {
// TODO chef: 写完api list页面
b := []byte(`
<html>
<head><title>lal http api list</title></head>
<body>
<br>
<br>
<ul>
<li><a href="https://pengrl.com">lal http api</li>
</ul>
</body>
</html>
`)
w.Header().Add("Server", base.LALHTTPAPIServer)
_, _ = w.Write(b)
}
func (h *HTTPAPIServer) statLALInfoHandler(w http.ResponseWriter, req *http.Request) {
var v APIStatLALInfo
v.ErrorCode = ErrorCodeSucc
v.Desp = DespSucc
v.BinInfo = bininfo.StringifySingleLine()
v.LalVersion = base.LALVersion
v.APIVersion = httpAPIVersion
v.StartTime = startTime
v.Data.BinInfo = bininfo.StringifySingleLine()
v.Data.LalVersion = base.LALVersion
v.Data.APIVersion = httpAPIVersion
v.Data.StartTime = startTime
resp, _ := json.Marshal(v)
w.Header().Add("Server", base.LALHTTPAPIServer)
_, _ = w.Write(resp)
}
func (h *HTTPAPIServer) statGroup(w http.ResponseWriter, req *http.Request) {
func (h *HTTPAPIServer) statAllGroupHandler(w http.ResponseWriter, req *http.Request) {
gs := h.observer.OnStatAllGroup()
var v APIStatAllGroup
v.ErrorCode = ErrorCodeSucc
v.Desp = DespSucc
v.Data.Groups = gs
resp, _ := json.Marshal(v)
w.Header().Add("Server", base.LALHTTPAPIServer)
_, _ = w.Write(resp)
}
func (h *HTTPAPIServer) statAllGroup(w http.ResponseWriter, req *http.Request) {
func (h *HTTPAPIServer) statGroupHandler(w http.ResponseWriter, req *http.Request) {
var v APIStatGroup
q := req.URL.Query()
streamName := q.Get("stream_name")
if streamName == "" {
v.ErrorCode = ErrorCodeGroupNotFound
v.Desp = DespGroupNotFound
} else {
v.Data = h.observer.OnStatGroup(streamName)
if v.Data == nil {
v.ErrorCode = ErrorCodeGroupNotFound
v.Desp = DespGroupNotFound
} else {
v.ErrorCode = ErrorCodeSucc
v.Desp = DespSucc
}
}
resp, _ := json.Marshal(v)
w.Header().Add("Server", base.LALHTTPAPIServer)
_, _ = w.Write(resp)
}
func init() {
startTime = time.Now().String()
startTime = time.Now().Format("2006-01-02 15:04:05.999")
}

@ -26,6 +26,7 @@ var _ rtmp.ServerObserver = &ServerManager{}
var _ httpflv.ServerObserver = &ServerManager{}
var _ httpts.ServerObserver = &ServerManager{}
var _ rtsp.ServerObserver = &ServerManager{}
var _ HTTPAPIServerObserver = &ServerManager{}
var _ rtmp.PubSessionObserver = &Group{}
var _ rtsp.PubSessionObserver = &Group{}

@ -13,6 +13,8 @@ import (
"sync"
"time"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpts"
"github.com/q191201771/lal/pkg/rtsp"
@ -30,6 +32,7 @@ type ServerManager struct {
hlsServer *hls.Server
httptsServer *httpts.Server
rtspServer *rtsp.Server
httpAPIServer *HTTPAPIServer
exitChan chan struct{}
mutex sync.Mutex
@ -56,6 +59,9 @@ func NewServerManager() *ServerManager {
if config.RTSPConfig.Enable {
m.rtspServer = rtsp.NewServer(config.RTSPConfig.Addr, m)
}
if config.HTTPAPIConfig.Enable {
m.httpAPIServer = NewHTTPAPIServer(config.HTTPAPIConfig.Addr, m)
}
return m
}
@ -120,6 +126,18 @@ func (sm *ServerManager) RunLoop() {
}()
}
if sm.httpAPIServer != nil {
if err := sm.httpAPIServer.Listen(); err != nil {
nazalog.Error(err)
os.Exit(1)
}
go func() {
if err := sm.httpAPIServer.Runloop(); err != nil {
nazalog.Error(err)
}
}()
}
t := time.NewTicker(1 * time.Second)
defer t.Stop()
var count uint32
@ -129,6 +147,8 @@ func (sm *ServerManager) RunLoop() {
return
case <-t.C:
sm.iterateGroup()
// log
count++
if (count % 10) == 0 {
sm.mutex.Lock()
@ -260,6 +280,28 @@ func (sm *ServerManager) OnDelRTSPPubSession(session *rtsp.PubSession) {
}
}
func (sm *ServerManager) OnStatAllGroup() (sgs []base.StatGroup) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
for _, g := range sm.groupMap {
sgs = append(sgs, g.GetStat())
}
return
}
func (sm *ServerManager) OnStatGroup(streamName string) *base.StatGroup {
sm.mutex.Lock()
defer sm.mutex.Unlock()
g := sm.getGroup("fakeAppName", streamName)
if g == nil {
return nil
}
// copy
var ret base.StatGroup
ret = g.GetStat()
return &ret
}
func (sm *ServerManager) iterateGroup() {
sm.mutex.Lock()
defer sm.mutex.Unlock()

@ -22,15 +22,15 @@ type ServerObserver interface {
}
type Server struct {
obs ServerObserver
addr string
ln net.Listener
observer ServerObserver
addr string
ln net.Listener
}
func NewServer(obs ServerObserver, addr string) *Server {
func NewServer(observer ServerObserver, addr string) *Server {
return &Server{
obs: obs,
addr: addr,
observer: observer,
addr: addr,
}
}
@ -70,15 +70,15 @@ func (server *Server) handleTCPConnect(conn net.Conn) {
case ServerSessionTypeUnknown:
// noop
case ServerSessionTypePub:
server.obs.OnDelRTMPPubSession(session)
server.observer.OnDelRTMPPubSession(session)
case ServerSessionTypeSub:
server.obs.OnDelRTMPSubSession(session)
server.observer.OnDelRTMPSubSession(session)
}
}
// ServerSessionObserver
func (server *Server) OnNewRTMPPubSession(session *ServerSession) {
if !server.obs.OnNewRTMPPubSession(session) {
if !server.observer.OnNewRTMPPubSession(session) {
log.Warnf("dispose PubSession since pub exist.")
session.Dispose()
return
@ -87,7 +87,7 @@ func (server *Server) OnNewRTMPPubSession(session *ServerSession) {
// ServerSessionObserver
func (server *Server) OnNewRTMPSubSession(session *ServerSession) {
if !server.obs.OnNewRTMPSubSession(session) {
if !server.observer.OnNewRTMPSubSession(session) {
session.Dispose()
return
}

@ -11,6 +11,7 @@ package rtmp
import (
"net"
"strings"
"time"
"github.com/q191201771/lal/pkg/base"
@ -34,8 +35,8 @@ type PubSessionObserver interface {
OnReadRTMPAVMsg(msg base.RTMPMsg)
}
func (s *ServerSession) SetPubSessionObserver(obs PubSessionObserver) {
s.avObs = obs
func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver) {
s.avObserver = observer
}
type ServerSessionType int
@ -52,29 +53,36 @@ type ServerSession struct {
StreamName string
StreamNameWithRawQuery string
obs ServerSessionObserver
observer ServerSessionObserver
t ServerSessionType
hs HandshakeServer
chunkComposer *ChunkComposer
packer *MessagePacker
conn connection.Connection
conn connection.Connection
prevConnStat connection.Stat
stat base.StatSession
// only for PubSession
avObs PubSessionObserver
avObserver PubSessionObserver
// only for SubSession
IsFresh bool
}
func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession {
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession {
uk := unique.GenUniqueKey("RTMPPUBSUB")
s := &ServerSession{
conn: connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize
}),
stat: base.StatSession{
Protocol: base.ProtocolRTMP,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
RemoteAddr: conn.RemoteAddr().String(),
},
UniqueKey: uk,
obs: obs,
observer: observer,
t: ServerSessionTypeUnknown,
chunkComposer: NewChunkComposer(),
packer: NewMessagePacker(),
@ -106,6 +114,28 @@ func (s *ServerSession) Dispose() {
_ = s.conn.Close()
}
func (s *ServerSession) GetStat() base.StatSession {
connStat := s.conn.GetStat()
s.stat.ReadBytesSum = connStat.ReadBytesSum
s.stat.WroteBytesSum = connStat.WroteBytesSum
return s.stat
}
// TODO chef: 默认每5秒调用一次
func (s *ServerSession) UpdateStat(tickCount uint32) {
currStat := s.conn.GetStat()
var diffStat connection.Stat
diffStat.ReadBytesSum = currStat.ReadBytesSum - s.prevConnStat.ReadBytesSum
diffStat.WroteBytesSum = currStat.WroteBytesSum - s.prevConnStat.WroteBytesSum
switch s.t {
case ServerSessionTypePub:
s.stat.Bitrate = int(diffStat.ReadBytesSum * 8 / 1024 / 5)
case ServerSessionTypeSub:
s.stat.Bitrate = int(diffStat.WroteBytesSum * 8 / 1024 / 5)
}
s.prevConnStat = currStat
}
func (s *ServerSession) runReadLoop() error {
return s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
@ -149,7 +179,7 @@ func (s *ServerSession) doMsg(stream *Stream) error {
nazalog.Errorf("[%s] read audio/video message but server session not pub type.", s.UniqueKey)
return ErrRTMP
}
s.avObs.OnReadRTMPAVMsg(stream.toAVMsg())
s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg())
default:
nazalog.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey, stream.header.MsgTypeID, stream.toDebugString())
@ -197,7 +227,7 @@ func (s *ServerSession) doDataMessageAMF0(stream *Stream) error {
return nil
}
s.avObs.OnReadRTMPAVMsg(stream.toAVMsg())
s.avObserver.OnReadRTMPAVMsg(stream.toAVMsg())
return nil
}
@ -315,7 +345,7 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
s.ModConnProps()
s.t = ServerSessionTypePub
s.obs.OnNewRTMPPubSession(s)
s.observer.OnNewRTMPPubSession(s)
return nil
}
@ -350,14 +380,17 @@ func (s *ServerSession) doPlay(tid int, stream *Stream) (err error) {
s.ModConnProps()
s.t = ServerSessionTypeSub
s.obs.OnNewRTMPSubSession(s)
s.observer.OnNewRTMPSubSession(s)
return nil
}
func (s *ServerSession) ModConnProps() {
s.conn.ModWriteChanSize(wChanSize)
// TODO chef: naza.connection 这种方式会导致最后一点数据发送不出去,我们应该使用更好的方式
// TODO chef:
// 使用合并发送
// naza.connection 这种方式会导致最后一点数据发送不出去我们应该使用更好的方式比如合并发送模式下Dispose时发送剩余数据
//
//s.conn.ModWriteBufSize(writeBufSize)
switch s.t {

@ -33,8 +33,8 @@ type ServerObserver interface {
}
type Server struct {
addr string
obs ServerObserver
addr string
observer ServerObserver
ln net.Listener
availUDPConnPool *nazanet.AvailUDPConnPool
@ -43,10 +43,10 @@ type Server struct {
presentation2PubSession map[string]*PubSession
}
func NewServer(addr string, obs ServerObserver) *Server {
func NewServer(addr string, observer ServerObserver) *Server {
return &Server{
addr: addr,
obs: obs,
observer: observer,
availUDPConnPool: nazanet.NewAvailUDPConnPool(minServerPort, maxServerPort),
presentation2PubSession: make(map[string]*PubSession),
}
@ -123,7 +123,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) {
// TODO chef: 缺少统一释放pubsession的逻辑
// TODO chef: 我用ffmpeg向lal推rtsp流发现lal直接关闭rtsp的连接ffmpeg并不会退出是否应先发送什么命令
if ok := s.obs.OnNewRTSPPubSession(pubSession); !ok {
if ok := s.observer.OnNewRTSPPubSession(pubSession); !ok {
nazalog.Warnf("[%s] force close pubsession.", pubSession.UniqueKey)
break
}
@ -187,7 +187,7 @@ func (s *Server) handleTCPConnect(conn net.Conn) {
s.m.Unlock()
if ok {
session.Dispose()
s.obs.OnDelRTSPPubSession(session)
s.observer.OnDelRTSPPubSession(session)
}
resp := PackResponseTeardown(headers[HeaderFieldCSeq])

@ -10,6 +10,10 @@ package rtsp
import (
"net"
"sync/atomic"
"time"
"github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazanet"
@ -37,8 +41,12 @@ type PubSession struct {
observer PubSessionObserver
avPacketQueue *AVPacketQueue
rtpConn *nazanet.UDPConnection
rtcpConn *nazanet.UDPConnection
rtpConn *nazanet.UDPConnection
rtcpConn *nazanet.UDPConnection
currConnStat connection.Stat
prevConnStat connection.Stat
stat base.StatPub
audioUnpacker *rtprtcp.RTPUnpacker
videoUnpacker *rtprtcp.RTPUnpacker
audioRRProducer *rtprtcp.RRProducer
@ -59,14 +67,20 @@ func NewPubSession(streamName string) *PubSession {
ps := &PubSession{
UniqueKey: uk,
StreamName: streamName,
stat: base.StatPub{
StatSession: base.StatSession{
Protocol: base.ProtocolRTSP,
StartTime: time.Now().Format("2006-01-02 15:04:05.999"),
},
},
}
ps.avPacketQueue = NewAVPacketQueue(ps.onAVPacket)
nazalog.Infof("[%s] lifecycle new rtsp PubSession. session=%p, streamName=%s", uk, ps, streamName)
return ps
}
func (p *PubSession) SetObserver(obs PubSessionObserver) {
p.observer = obs
func (p *PubSession) SetObserver(observer PubSessionObserver) {
p.observer = observer
if p.sps != nil && p.pps != nil {
if p.vps != nil {
@ -135,16 +149,18 @@ func (p *PubSession) SetRTPConn(conn *net.UDPConn) {
server, _ := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) {
option.Conn = conn
})
go server.RunLoop(p.onReadUDPPacket)
p.rtpConn = server
go server.RunLoop(p.onReadUDPPacket)
}
func (p *PubSession) SetRTCPConn(conn *net.UDPConn) {
server, _ := nazanet.NewUDPConnection(func(option *nazanet.UDPConnectionOption) {
option.Conn = conn
})
go server.RunLoop(p.onReadUDPPacket)
p.rtcpConn = server
go server.RunLoop(p.onReadUDPPacket)
}
func (p *PubSession) Dispose() {
@ -156,6 +172,18 @@ func (p *PubSession) Dispose() {
}
}
func (p *PubSession) GetStat() base.StatPub {
p.stat.ReadBytesSum = atomic.LoadUint64(&p.currConnStat.ReadBytesSum)
p.stat.WroteBytesSum = atomic.LoadUint64(&p.currConnStat.WroteBytesSum)
return p.stat
}
func (p *PubSession) UpdateStat(tickCount uint32) {
diff := p.currConnStat.ReadBytesSum - p.prevConnStat.ReadBytesSum
p.stat.Bitrate = int(diff * 8 / 1024 / 5)
p.prevConnStat = p.currConnStat
}
// callback by UDPConnection
// TODO yoko: 因为rtp和rtcp使用了两个连接所以分成两个回调也行
func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bool {
@ -164,6 +192,8 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo
return true
}
atomic.AddUint64(&p.currConnStat.ReadBytesSum, uint64(len(b)))
if len(b) < 2 {
nazalog.Errorf("read udp packet length invalid. len=%d", len(b))
return true
@ -172,7 +202,6 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo
// try RTCP
switch b[1] {
case rtprtcp.RTCPPacketTypeSR:
//h := rtprtcp.ParseRTCPHeader(b)
sr := rtprtcp.ParseSR(b)
var rrBuf []byte
switch sr.SenderSSRC {
@ -181,7 +210,10 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo
case p.videoSsrc:
rrBuf = p.videoRRProducer.Produce(sr.GetMiddleNTP())
}
_ = p.rtcpConn.Write(rrBuf)
atomic.AddUint64(&p.currConnStat.WroteBytesSum, uint64(len(b)))
return true
}
@ -207,6 +239,10 @@ func (p *PubSession) onReadUDPPacket(b []byte, rAddr *net.UDPAddr, err error) bo
p.audioRRProducer.FeedRTPPacket(h.Seq)
}
if p.stat.RemoteAddr == "" {
p.stat.RemoteAddr = rAddr.String()
}
return true
}

Loading…
Cancel
Save