mirror of https://github.com/ossrs/srs.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
273 lines
7.2 KiB
Go
273 lines
7.2 KiB
Go
// Copyright (c) 2025 Winlin
|
|
//
|
|
// SPDX-License-Identifier: MIT
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"srs-proxy/errors"
|
|
"srs-proxy/logger"
|
|
)
|
|
|
|
// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
|
|
// to proxy other HTTP API of SRS like the streams and clients, etc.
|
|
type srsHTTPAPIServer struct {
|
|
// The underlayer HTTP server.
|
|
server *http.Server
|
|
// The WebRTC server.
|
|
rtc *srsWebRTCServer
|
|
// The gracefully quit timeout, wait server to quit.
|
|
gracefulQuitTimeout time.Duration
|
|
// The wait group for all goroutines.
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewSRSHTTPAPIServer(opts ...func(*srsHTTPAPIServer)) *srsHTTPAPIServer {
|
|
v := &srsHTTPAPIServer{}
|
|
for _, opt := range opts {
|
|
opt(v)
|
|
}
|
|
return v
|
|
}
|
|
|
|
func (v *srsHTTPAPIServer) Close() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
|
defer cancel()
|
|
v.server.Shutdown(ctx)
|
|
|
|
v.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
|
|
// Parse address to listen.
|
|
addr := envHttpAPI()
|
|
if !strings.Contains(addr, ":") {
|
|
addr = ":" + addr
|
|
}
|
|
|
|
// Create server and handler.
|
|
mux := http.NewServeMux()
|
|
v.server = &http.Server{Addr: addr, Handler: mux}
|
|
logger.Df(ctx, "HTTP API server listen at %v", addr)
|
|
|
|
// Shutdown the server gracefully when quiting.
|
|
go func() {
|
|
ctxParent := ctx
|
|
<-ctxParent.Done()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
|
defer cancel()
|
|
|
|
v.server.Shutdown(ctx)
|
|
}()
|
|
|
|
// The basic version handler, also can be used as health check API.
|
|
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
|
|
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
|
|
apiResponse(ctx, w, r, map[string]string{
|
|
"signature": Signature(),
|
|
"version": Version(),
|
|
})
|
|
})
|
|
|
|
// The WebRTC WHIP API handler.
|
|
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
|
|
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
|
|
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
|
|
apiError(ctx, w, r, err)
|
|
}
|
|
})
|
|
|
|
// The WebRTC WHEP API handler.
|
|
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
|
|
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
|
|
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
|
|
apiError(ctx, w, r, err)
|
|
}
|
|
})
|
|
|
|
// Run HTTP API server.
|
|
v.wg.Add(1)
|
|
go func() {
|
|
defer v.wg.Done()
|
|
|
|
err := v.server.ListenAndServe()
|
|
if err != nil {
|
|
if ctx.Err() != context.Canceled {
|
|
// TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit.
|
|
logger.Wf(ctx, "HTTP API accept err %+v", err)
|
|
} else {
|
|
logger.Df(ctx, "HTTP API server done")
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// systemAPI is the system HTTP API of the proxy server, for SRS media server to register the service
|
|
// to proxy server. It also provides some other system APIs like the status of proxy server, like exporter
|
|
// for Prometheus metrics.
|
|
type systemAPI struct {
|
|
// The underlayer HTTP server.
|
|
server *http.Server
|
|
// The gracefully quit timeout, wait server to quit.
|
|
gracefulQuitTimeout time.Duration
|
|
// The wait group for all goroutines.
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewSystemAPI(opts ...func(*systemAPI)) *systemAPI {
|
|
v := &systemAPI{}
|
|
for _, opt := range opts {
|
|
opt(v)
|
|
}
|
|
return v
|
|
}
|
|
|
|
func (v *systemAPI) Close() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
|
defer cancel()
|
|
v.server.Shutdown(ctx)
|
|
|
|
v.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (v *systemAPI) Run(ctx context.Context) error {
|
|
// Parse address to listen.
|
|
addr := envSystemAPI()
|
|
if !strings.Contains(addr, ":") {
|
|
addr = ":" + addr
|
|
}
|
|
|
|
// Create server and handler.
|
|
mux := http.NewServeMux()
|
|
v.server = &http.Server{Addr: addr, Handler: mux}
|
|
logger.Df(ctx, "System API server listen at %v", addr)
|
|
|
|
// Shutdown the server gracefully when quiting.
|
|
go func() {
|
|
ctxParent := ctx
|
|
<-ctxParent.Done()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
|
|
defer cancel()
|
|
|
|
v.server.Shutdown(ctx)
|
|
}()
|
|
|
|
// The basic version handler, also can be used as health check API.
|
|
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
|
|
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
|
|
apiResponse(ctx, w, r, map[string]string{
|
|
"signature": Signature(),
|
|
"version": Version(),
|
|
})
|
|
})
|
|
|
|
// The register service for SRS media servers.
|
|
logger.Df(ctx, "Handle /api/v1/srs/register by %v", addr)
|
|
mux.HandleFunc("/api/v1/srs/register", func(w http.ResponseWriter, r *http.Request) {
|
|
if err := func() error {
|
|
var deviceID, ip, serverID, serviceID, pid string
|
|
var rtmp, stream, api, srt, rtc []string
|
|
if err := ParseBody(r.Body, &struct {
|
|
// The IP of SRS, mandatory.
|
|
IP *string `json:"ip"`
|
|
// The server id of SRS, store in file, may not change, mandatory.
|
|
ServerID *string `json:"server"`
|
|
// The service id of SRS, always change when restarted, mandatory.
|
|
ServiceID *string `json:"service"`
|
|
// The process id of SRS, always change when restarted, mandatory.
|
|
PID *string `json:"pid"`
|
|
// The RTMP listen endpoints, mandatory.
|
|
RTMP *[]string `json:"rtmp"`
|
|
// The HTTP Stream listen endpoints, optional.
|
|
HTTP *[]string `json:"http"`
|
|
// The API listen endpoints, optional.
|
|
API *[]string `json:"api"`
|
|
// The SRT listen endpoints, optional.
|
|
SRT *[]string `json:"srt"`
|
|
// The RTC listen endpoints, optional.
|
|
RTC *[]string `json:"rtc"`
|
|
// The device id of SRS, optional.
|
|
DeviceID *string `json:"device_id"`
|
|
}{
|
|
IP: &ip, DeviceID: &deviceID,
|
|
ServerID: &serverID, ServiceID: &serviceID, PID: &pid,
|
|
RTMP: &rtmp, HTTP: &stream, API: &api, SRT: &srt, RTC: &rtc,
|
|
}); err != nil {
|
|
return errors.Wrapf(err, "parse body")
|
|
}
|
|
|
|
if ip == "" {
|
|
return errors.Errorf("empty ip")
|
|
}
|
|
if serverID == "" {
|
|
return errors.Errorf("empty server")
|
|
}
|
|
if serviceID == "" {
|
|
return errors.Errorf("empty service")
|
|
}
|
|
if pid == "" {
|
|
return errors.Errorf("empty pid")
|
|
}
|
|
if len(rtmp) == 0 {
|
|
return errors.Errorf("empty rtmp")
|
|
}
|
|
|
|
server := NewSRSServer(func(srs *SRSServer) {
|
|
srs.IP, srs.DeviceID = ip, deviceID
|
|
srs.ServerID, srs.ServiceID, srs.PID = serverID, serviceID, pid
|
|
srs.RTMP, srs.HTTP, srs.API = rtmp, stream, api
|
|
srs.SRT, srs.RTC = srt, rtc
|
|
srs.UpdatedAt = time.Now()
|
|
})
|
|
if err := srsLoadBalancer.Update(ctx, server); err != nil {
|
|
return errors.Wrapf(err, "update SRS server %+v", server)
|
|
}
|
|
|
|
logger.Df(ctx, "Register SRS media server, %+v", server)
|
|
return nil
|
|
}(); err != nil {
|
|
apiError(ctx, w, r, err)
|
|
}
|
|
|
|
type Response struct {
|
|
Code int `json:"code"`
|
|
PID string `json:"pid"`
|
|
}
|
|
|
|
apiResponse(ctx, w, r, &Response{
|
|
Code: 0, PID: fmt.Sprintf("%v", os.Getpid()),
|
|
})
|
|
})
|
|
|
|
// Run System API server.
|
|
v.wg.Add(1)
|
|
go func() {
|
|
defer v.wg.Done()
|
|
|
|
err := v.server.ListenAndServe()
|
|
if err != nil {
|
|
if ctx.Err() != context.Canceled {
|
|
// TODO: If System API server closed unexpectedly, we should notice the main loop to quit.
|
|
logger.Wf(ctx, "System API accept err %+v", err)
|
|
} else {
|
|
logger.Df(ctx, "System API server done")
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|