mirror of https://github.com/ossrs/srs.git
Test: Support blackbox test by FFmpeg. v5.0.128 (#3355)
1. Enable blackbox test for each PR and push. 2. Refine Makefile and README for srs-bench. 3. Add blackbox using FFmpeg and ffprobe. 4. Add blackbox basic test for RTMP stream. 5. Add blackbox basic test for HTTP-FLV stream. 6. Fix utest rand seed issue.pull/3358/head
parent
dd0f398296
commit
2141d220b4
@ -0,0 +1,21 @@
|
|||||||
|
// The MIT License (MIT)
|
||||||
|
//
|
||||||
|
// # Copyright (c) 2023 Winlin
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
// the Software without restriction, including without limitation the rights to
|
||||||
|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
// subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
package blackbox
|
@ -0,0 +1,50 @@
|
|||||||
|
// The MIT License (MIT)
|
||||||
|
//
|
||||||
|
// # Copyright (c) 2023 Winlin
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
// the Software without restriction, including without limitation the rights to
|
||||||
|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
// subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
package blackbox
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
if err := prepareTest(); err != nil {
|
||||||
|
logger.Ef(nil, "Prepare test fail, err %+v", err)
|
||||||
|
os.Exit(-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disable the logger during all tests.
|
||||||
|
if *srsLog == false {
|
||||||
|
olw := logger.Switch(ioutil.Discard)
|
||||||
|
defer func() {
|
||||||
|
logger.Switch(olw)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init rand seed.
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
@ -0,0 +1,186 @@
|
|||||||
|
// The MIT License (MIT)
|
||||||
|
//
|
||||||
|
// # Copyright (c) 2023 Winlin
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
// the Software without restriction, including without limitation the rights to
|
||||||
|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
// subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
package blackbox
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
|
||||||
|
// This case is run in parallel.
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Setup the max timeout for this case.
|
||||||
|
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Check a set of errors.
|
||||||
|
var r0, r1, r2, r3 error
|
||||||
|
defer func(ctx context.Context) {
|
||||||
|
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
|
||||||
|
t.Errorf("Fail for err %+v", err)
|
||||||
|
} else {
|
||||||
|
logger.Tf(ctx, "test done with err %+v", err)
|
||||||
|
}
|
||||||
|
}(ctx)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
// Start SRS server and wait for it to be ready.
|
||||||
|
svr := NewSRSServer(func(v *srsServer) {
|
||||||
|
v.envs = []string{
|
||||||
|
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
r0 = svr.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start FFmpeg to publish stream.
|
||||||
|
streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
|
||||||
|
ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
|
||||||
|
v.args = []string{
|
||||||
|
"-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-svr.ReadyCtx().Done()
|
||||||
|
r1 = ffmpeg.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start FFprobe to detect and verify stream.
|
||||||
|
duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
|
||||||
|
ffprobe := NewFFprobe(func(v *ffprobeClient) {
|
||||||
|
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
|
||||||
|
v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-svr.ReadyCtx().Done()
|
||||||
|
r2 = ffprobe.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Fast quit for probe done.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-ffprobe.ProbeDoneCtx().Done():
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
str, m := ffprobe.Result()
|
||||||
|
if len(m.Streams) != 2 {
|
||||||
|
r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
|
||||||
|
// This case is run in parallel.
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Setup the max timeout for this case.
|
||||||
|
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Check a set of errors.
|
||||||
|
var r0, r1, r2, r3 error
|
||||||
|
defer func(ctx context.Context) {
|
||||||
|
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
|
||||||
|
t.Errorf("Fail for err %+v", err)
|
||||||
|
} else {
|
||||||
|
logger.Tf(ctx, "test done with err %+v", err)
|
||||||
|
}
|
||||||
|
}(ctx)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
// Start SRS server and wait for it to be ready.
|
||||||
|
svr := NewSRSServer(func(v *srsServer) {
|
||||||
|
v.envs = []string{
|
||||||
|
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
|
||||||
|
"SRS_HTTP_SERVER_ENABLED=on",
|
||||||
|
fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
|
||||||
|
"SRS_VHOST_HTTP_REMUX_ENABLED=on",
|
||||||
|
}
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
r0 = svr.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start FFmpeg to publish stream.
|
||||||
|
streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
|
||||||
|
ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
|
||||||
|
v.args = []string{
|
||||||
|
"-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-svr.ReadyCtx().Done()
|
||||||
|
r1 = ffmpeg.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Start FFprobe to detect and verify stream.
|
||||||
|
duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
|
||||||
|
ffprobe := NewFFprobe(func(v *ffprobeClient) {
|
||||||
|
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
|
||||||
|
v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.flv", svr.HTTPPort(), streamID)
|
||||||
|
v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
|
||||||
|
})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
<-svr.ReadyCtx().Done()
|
||||||
|
r2 = ffprobe.Run(ctx, cancel)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Fast quit for probe done.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-ffprobe.ProbeDoneCtx().Done():
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
str, m := ffprobe.Result()
|
||||||
|
if len(m.Streams) != 2 {
|
||||||
|
r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,999 @@
|
|||||||
|
// The MIT License (MIT)
|
||||||
|
//
|
||||||
|
// # Copyright (c) 2023 Winlin
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
// the Software without restriction, including without limitation the rights to
|
||||||
|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
// subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in all
|
||||||
|
// copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
package blackbox
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var srsLog *bool
|
||||||
|
var srsStdout *bool
|
||||||
|
var srsFFmpegStderr *bool
|
||||||
|
var srsDVRStderr *bool
|
||||||
|
var srsFFprobeStdout *bool
|
||||||
|
|
||||||
|
var srsTimeout *int
|
||||||
|
var srsFFprobeDuration *int
|
||||||
|
var srsFFprobeTimeout *int
|
||||||
|
|
||||||
|
var srsBinary *string
|
||||||
|
var srsFFmpeg *string
|
||||||
|
var srsFFprobe *string
|
||||||
|
|
||||||
|
var srsPublishAvatar *string
|
||||||
|
|
||||||
|
func prepareTest() (err error) {
|
||||||
|
srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
|
||||||
|
srsStdout = flag.Bool("srs-stdout", false, "Whether enable the SRS stdout log")
|
||||||
|
srsFFmpegStderr = flag.Bool("srs-ffmpeg-stderr", false, "Whether enable the FFmpeg stderr log")
|
||||||
|
srsDVRStderr = flag.Bool("srs-dvr-stderr", false, "Whether enable the DVR stderr log")
|
||||||
|
srsFFprobeStdout = flag.Bool("srs-ffprobe-stdout", false, "Whether enable the FFprobe stdout log")
|
||||||
|
srsTimeout = flag.Int("srs-timeout", 64000, "For each case, the timeout in ms")
|
||||||
|
srsFFprobeDuration = flag.Int("srs-ffprobe-duration", 16000, "For each case, the duration for ffprobe in ms")
|
||||||
|
srsFFprobeTimeout = flag.Int("srs-ffprobe-timeout", 21000, "For each case, the timeout for ffprobe in ms")
|
||||||
|
srsBinary = flag.String("srs-binary", "../../objs/srs", "The binary to start SRS server")
|
||||||
|
srsFFmpeg = flag.String("srs-ffmpeg", "ffmpeg", "The FFmpeg tool")
|
||||||
|
srsFFprobe = flag.String("srs-ffprobe", "ffprobe", "The FFprobe tool")
|
||||||
|
srsPublishAvatar = flag.String("srs-publish-avatar", "avatar.flv", "The avatar file for publisher.")
|
||||||
|
|
||||||
|
// Parse user options.
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// Try to locate file.
|
||||||
|
tryOpenFile := func(filename string) (string, error) {
|
||||||
|
// Match if file exists.
|
||||||
|
if _, err := os.Stat(filename); err == nil {
|
||||||
|
return filename, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we run in GoLand, the current directory is in blackbox, so we use parent directory.
|
||||||
|
nFilename := path.Join("../", filename)
|
||||||
|
if _, err := os.Stat(nFilename); err == nil {
|
||||||
|
return nFilename, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to find file by which if it's a command like ffmpeg.
|
||||||
|
cmd := exec.Command("which", filename)
|
||||||
|
cmd.Env = []string{"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}
|
||||||
|
if v, err := cmd.Output(); err == nil {
|
||||||
|
return strings.TrimSpace(string(v)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return filename, errors.Errorf("file %v not found", filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check and relocate path of tools.
|
||||||
|
if *srsBinary, err = tryOpenFile(*srsBinary); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *srsFFmpeg, err = tryOpenFile(*srsFFmpeg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *srsFFprobe, err = tryOpenFile(*srsFFprobe); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *srsPublishAvatar, err = tryOpenFile(*srsPublishAvatar); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter the test error, ignore context.Canceled
|
||||||
|
func filterTestError(errs ...error) error {
|
||||||
|
var filteredErrors []error
|
||||||
|
|
||||||
|
for _, err := range errs {
|
||||||
|
if err == nil || errors.Cause(err) == context.Canceled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If url error, server maybe error, do not print the detail log.
|
||||||
|
if r0 := errors.Cause(err); r0 != nil {
|
||||||
|
if r1, ok := r0.(*url.Error); ok {
|
||||||
|
err = r1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
filteredErrors = append(filteredErrors, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(filteredErrors) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(filteredErrors) == 1 {
|
||||||
|
return filteredErrors[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
var descs []string
|
||||||
|
for i, err := range filteredErrors[1:] {
|
||||||
|
descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
|
||||||
|
}
|
||||||
|
return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
|
||||||
|
}
|
||||||
|
|
||||||
|
// The SRSPortAllocator is SRS port manager.
|
||||||
|
type SRSPortAllocator struct {
|
||||||
|
ports sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSRSPortAllocator() *SRSPortAllocator {
|
||||||
|
return &SRSPortAllocator{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *SRSPortAllocator) Allocate() int {
|
||||||
|
for i := 0; i < 1024; i++ {
|
||||||
|
port := 10000 + rand.Int()%50000
|
||||||
|
if _, ok := v.ports.LoadOrStore(port, true); !ok {
|
||||||
|
return port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic("Allocate port failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *SRSPortAllocator) Free(port int) {
|
||||||
|
v.ports.Delete(port)
|
||||||
|
}
|
||||||
|
|
||||||
|
var allocator *SRSPortAllocator
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
allocator = NewSRSPortAllocator()
|
||||||
|
}
|
||||||
|
|
||||||
|
type backendService struct {
|
||||||
|
// The context for case.
|
||||||
|
caseCtx context.Context
|
||||||
|
caseCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
// When SRS process started.
|
||||||
|
readyCtx context.Context
|
||||||
|
readyCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
// Whether already closed.
|
||||||
|
closedCtx context.Context
|
||||||
|
closedCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
// All goroutines
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
// The name, args and env for cmd.
|
||||||
|
name string
|
||||||
|
args []string
|
||||||
|
env []string
|
||||||
|
|
||||||
|
// The process stdout and stderr.
|
||||||
|
stdout bytes.Buffer
|
||||||
|
stderr bytes.Buffer
|
||||||
|
// The process error.
|
||||||
|
r0 error
|
||||||
|
// The process pid.
|
||||||
|
pid int
|
||||||
|
|
||||||
|
// Hooks for owner.
|
||||||
|
onBeforeStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
|
||||||
|
onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
|
||||||
|
onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error
|
||||||
|
onDispose func(ctx context.Context, bs *backendService) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBackendService(opts ...func(v *backendService)) *backendService {
|
||||||
|
v := &backendService{}
|
||||||
|
|
||||||
|
v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
|
||||||
|
v.closedCtx, v.closedCtxCancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *backendService) Close() error {
|
||||||
|
if v.closedCtx.Err() != nil {
|
||||||
|
return v.r0
|
||||||
|
}
|
||||||
|
v.closedCtxCancel()
|
||||||
|
|
||||||
|
if v.caseCtxCancel != nil {
|
||||||
|
v.caseCtxCancel()
|
||||||
|
}
|
||||||
|
if v.readyCtxCancel != nil {
|
||||||
|
v.readyCtxCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
v.wg.Wait()
|
||||||
|
|
||||||
|
if v.onDispose != nil {
|
||||||
|
v.onDispose(v.caseCtx, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(v.caseCtx, "Service is closed, pid=%v, r0=%v", v.pid, v.r0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *backendService) ReadyCtx() context.Context {
|
||||||
|
return v.readyCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
// Start SRS with -e, which only use environment variables.
|
||||||
|
cmd := exec.Command(v.name, v.args...)
|
||||||
|
|
||||||
|
// If not started, we also need to callback the onStop.
|
||||||
|
var processStarted bool
|
||||||
|
defer func() {
|
||||||
|
if v.onStop != nil && !processStarted {
|
||||||
|
v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Ignore if already error.
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the context of case.
|
||||||
|
v.caseCtx, v.caseCtxCancel = ctx, cancel
|
||||||
|
|
||||||
|
// Setup stdout and stderr.
|
||||||
|
cmd.Stdout = &v.stdout
|
||||||
|
cmd.Stderr = &v.stderr
|
||||||
|
cmd.Env = v.env
|
||||||
|
if v.onBeforeStart != nil {
|
||||||
|
if err := v.onBeforeStart(ctx, v, cmd); err != nil {
|
||||||
|
return errors.Wrapf(err, "onBeforeStart failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to start the SRS server.
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer v.Close()
|
||||||
|
|
||||||
|
// Now process started, query the pid.
|
||||||
|
v.pid = cmd.Process.Pid
|
||||||
|
v.readyCtxCancel()
|
||||||
|
processStarted = true
|
||||||
|
if v.onAfterStart != nil {
|
||||||
|
if err := v.onAfterStart(ctx, v, cmd); err != nil {
|
||||||
|
return errors.Wrapf(err, "onAfterStart failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The context for SRS process.
|
||||||
|
processDone, processDoneCancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// If SRS process terminated, notify case to stop.
|
||||||
|
v.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
// When SRS quit, also terminate the case.
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Notify other goroutine, SRS already done.
|
||||||
|
defer processDoneCancel()
|
||||||
|
|
||||||
|
v.r0 = cmd.Wait()
|
||||||
|
if v.onStop != nil {
|
||||||
|
if r1 := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); r1 != nil {
|
||||||
|
logger.Wf(ctx, "Process onStop err %v", r1)
|
||||||
|
if v.r0 == nil {
|
||||||
|
v.r0 = r1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// If case terminated, notify SRS process to stop.
|
||||||
|
v.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer v.wg.Done()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// When case terminated, also terminate the SRS process.
|
||||||
|
cmd.Process.Signal(syscall.SIGINT)
|
||||||
|
case <-processDone.Done():
|
||||||
|
// Ignore if already done.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a goroutine to ensure SRS killed.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
if processDone.Err() == nil { // Ignore if already done.
|
||||||
|
cmd.Process.Signal(syscall.SIGKILL)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for SRS or case done.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-processDone.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
return v.r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceRunner is an interface to run backend service.
|
||||||
|
type ServiceRunner interface {
|
||||||
|
Run(ctx context.Context, cancel context.CancelFunc) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceReadyQuerier is an interface to detect whether service is ready.
|
||||||
|
type ServiceReadyQuerier interface {
|
||||||
|
ReadyCtx() context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
// SRSServer is the interface for SRS server.
|
||||||
|
type SRSServer interface {
|
||||||
|
ServiceRunner
|
||||||
|
ServiceReadyQuerier
|
||||||
|
// WorkDir is the current working directory for SRS.
|
||||||
|
WorkDir() string
|
||||||
|
// RTMPPort is the RTMP stream port.
|
||||||
|
RTMPPort() int
|
||||||
|
// HTTPPort is the HTTP stream port.
|
||||||
|
HTTPPort() int
|
||||||
|
}
|
||||||
|
|
||||||
|
// srsServer is a SRS server instance.
|
||||||
|
type srsServer struct {
|
||||||
|
// The backend service process.
|
||||||
|
process *backendService
|
||||||
|
|
||||||
|
// When SRS process started.
|
||||||
|
readyCtx context.Context
|
||||||
|
readyCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
// SRS server ID.
|
||||||
|
srsID string
|
||||||
|
// SRS workdir.
|
||||||
|
workDir string
|
||||||
|
// SRS PID file, relative to the workdir.
|
||||||
|
srsRelativePidFile string
|
||||||
|
// SRS server ID cache file, relative to the workdir.
|
||||||
|
srsRelativeIDFile string
|
||||||
|
|
||||||
|
// SRS RTMP server listen port.
|
||||||
|
rtmpListen int
|
||||||
|
// HTTP API listen port.
|
||||||
|
apiListen int
|
||||||
|
// HTTP server listen port.
|
||||||
|
httpListen int
|
||||||
|
|
||||||
|
// The envs from user.
|
||||||
|
envs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
|
||||||
|
rid := fmt.Sprintf("%v-%v", os.Getpid(), rand.Int())
|
||||||
|
v := &srsServer{
|
||||||
|
workDir: "./",
|
||||||
|
srsID: fmt.Sprintf("srs-id-%v", rid),
|
||||||
|
process: newBackendService(),
|
||||||
|
}
|
||||||
|
v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// If we run in GoLand, the current directory is in blackbox, so we use parent directory.
|
||||||
|
if _, err := os.Stat("objs"); err != nil {
|
||||||
|
v.workDir = "../"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do allocate resource.
|
||||||
|
v.srsRelativePidFile = path.Join("objs", fmt.Sprintf("srs-%v.pid", rid))
|
||||||
|
v.srsRelativeIDFile = path.Join("objs", fmt.Sprintf("srs-%v.id", rid))
|
||||||
|
v.rtmpListen = allocator.Allocate()
|
||||||
|
v.apiListen = allocator.Allocate()
|
||||||
|
v.httpListen = allocator.Allocate()
|
||||||
|
|
||||||
|
// Do cleanup.
|
||||||
|
v.process.onDispose = func(ctx context.Context, bs *backendService) error {
|
||||||
|
allocator.Free(v.rtmpListen)
|
||||||
|
allocator.Free(v.apiListen)
|
||||||
|
allocator.Free(v.httpListen)
|
||||||
|
|
||||||
|
pidFile := path.Join(v.workDir, v.srsRelativePidFile)
|
||||||
|
if _, err := os.Stat(pidFile); !os.IsNotExist(err) {
|
||||||
|
os.Remove(pidFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
idFile := path.Join(v.workDir, v.srsRelativeIDFile)
|
||||||
|
if _, err := os.Stat(idFile); !os.IsNotExist(err) {
|
||||||
|
os.Remove(idFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, r0=%v", v.srsID, bs.pid, bs.r0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *srsServer) ReadyCtx() context.Context {
|
||||||
|
return v.readyCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *srsServer) RTMPPort() int {
|
||||||
|
return v.rtmpListen
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *srsServer) HTTPPort() int {
|
||||||
|
return v.httpListen
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *srsServer) WorkDir() string {
|
||||||
|
return v.workDir
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
logger.Tf(ctx, "Starting SRS server, dir=%v, binary=%v, id=%v, pid=%v, rtmp=%v",
|
||||||
|
v.workDir, *srsBinary, v.srsID, v.srsRelativePidFile, v.rtmpListen,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Setup the name and args of process.
|
||||||
|
v.process.name = *srsBinary
|
||||||
|
v.process.args = []string{"-e"}
|
||||||
|
|
||||||
|
// Setup the envrionment variables.
|
||||||
|
v.process.env = []string{
|
||||||
|
// SRS working directory.
|
||||||
|
fmt.Sprintf("SRS_WORK_DIR=%v", v.workDir),
|
||||||
|
// Run in frontend.
|
||||||
|
"SRS_DAEMON=off",
|
||||||
|
// Write logs to stdout and stderr.
|
||||||
|
"SRS_SRS_LOG_FILE=console",
|
||||||
|
// Disable warning for asan.
|
||||||
|
"MallocNanoZone=0",
|
||||||
|
// SRS PID file.
|
||||||
|
fmt.Sprintf("SRS_PID=%v", v.srsRelativePidFile),
|
||||||
|
// SRS ID file.
|
||||||
|
fmt.Sprintf("SRS_SERVER_ID=%v", v.srsID),
|
||||||
|
// HTTP API to detect the service.
|
||||||
|
fmt.Sprintf("SRS_HTTP_API_ENABLED=on"),
|
||||||
|
fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen),
|
||||||
|
// Avoid error for macOS, which ulimit to 256.
|
||||||
|
"SRS_MAX_CONNECTIONS=100",
|
||||||
|
}
|
||||||
|
// Rewrite envs by case.
|
||||||
|
if v.envs != nil {
|
||||||
|
v.process.env = append(v.process.env, v.envs...)
|
||||||
|
}
|
||||||
|
// Allow user to rewrite them.
|
||||||
|
for _, env := range os.Environ() {
|
||||||
|
if strings.HasPrefix(env, "SRS") || strings.HasPrefix(env, "PATH") {
|
||||||
|
v.process.env = append(v.process.env, env)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutine to done.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
// Start a task to detect the HTTP API.
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
res, err := http.Get(fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen))
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
b, err := ioutil.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(ctx, "SRS API is ready, %v", string(b))
|
||||||
|
v.readyCtxCancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Hooks for process.
|
||||||
|
v.process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
|
||||||
|
logger.Tf(ctx, "SRS id=%v, env=%v, cmd is %v %v",
|
||||||
|
v.srsID, cmd.Env, bs.name, strings.Join(bs.args, " "))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
v.process.onAfterStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
|
||||||
|
logger.Tf(ctx, "SRS id=%v, pid=%v", v.srsID, bs.pid)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
|
||||||
|
// Should be ready when process stop.
|
||||||
|
defer v.readyCtxCancel()
|
||||||
|
|
||||||
|
logger.Tf(ctx, "SRS process pid=%v exit, r0=%v", bs.pid, r0)
|
||||||
|
if *srsStdout == true {
|
||||||
|
logger.Tf(ctx, "SRS process pid=%v, stdout is \n%v", bs.pid, stdout.String())
|
||||||
|
}
|
||||||
|
if stderr.Len() > 0 {
|
||||||
|
logger.Tf(ctx, "SRS process pid=%v, stderr is \n%v", bs.pid, stderr.String())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the process util quit.
|
||||||
|
return v.process.Run(ctx, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FFmpegClient interface {
|
||||||
|
ServiceRunner
|
||||||
|
ServiceReadyQuerier
|
||||||
|
}
|
||||||
|
|
||||||
|
type ffmpegClient struct {
|
||||||
|
// The backend service process.
|
||||||
|
process *backendService
|
||||||
|
|
||||||
|
// FFmpeg cli args, without ffmpeg binary.
|
||||||
|
args []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient {
|
||||||
|
v := &ffmpegClient{
|
||||||
|
process: newBackendService(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do cleanup.
|
||||||
|
v.process.onDispose = func(ctx context.Context, bs *backendService) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffmpegClient) ReadyCtx() context.Context {
|
||||||
|
return v.process.ReadyCtx()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffmpegClient) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
logger.Tf(ctx, "Starting FFmpeg by %v", strings.Join(v.args, " "))
|
||||||
|
|
||||||
|
v.process.name = *srsFFmpeg
|
||||||
|
v.process.args = v.args
|
||||||
|
v.process.env = os.Environ()
|
||||||
|
|
||||||
|
v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
|
||||||
|
logger.Tf(ctx, "FFmpeg process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
|
||||||
|
if *srsFFmpegStderr && stderr.Len() > 0 {
|
||||||
|
logger.Tf(ctx, "FFmpeg process pid=%v, stderr is \n%v", bs.pid, stderr.String())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return v.process.Run(ctx, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
type FFprobeClient interface {
|
||||||
|
ServiceRunner
|
||||||
|
// ProbeDoneCtx indicates the probe is done.
|
||||||
|
ProbeDoneCtx() context.Context
|
||||||
|
// Result return the raw string and metadata.
|
||||||
|
Result() (string, *ffprobeObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ffprobeClient struct {
|
||||||
|
// The stream to probe.
|
||||||
|
streamURL string
|
||||||
|
|
||||||
|
// The DVR file for ffprobe. We DVR stream to file, then use ffprobe to detect it.
|
||||||
|
dvrFile string
|
||||||
|
// The duration of video file for DVR.
|
||||||
|
duration time.Duration
|
||||||
|
// The timeout to wait for task to done.
|
||||||
|
timeout time.Duration
|
||||||
|
|
||||||
|
// When probe stream metadata object.
|
||||||
|
doneCtx context.Context
|
||||||
|
doneCancel context.CancelFunc
|
||||||
|
// The metadata object.
|
||||||
|
metadata *ffprobeObject
|
||||||
|
// The raw string of ffprobe.
|
||||||
|
rawString string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFFprobe(opts ...func(v *ffprobeClient)) FFprobeClient {
|
||||||
|
v := &ffprobeClient{
|
||||||
|
metadata: &ffprobeObject{},
|
||||||
|
}
|
||||||
|
v.doneCtx, v.doneCancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeClient) ProbeDoneCtx() context.Context {
|
||||||
|
return v.doneCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeClient) Result() (string, *ffprobeObject) {
|
||||||
|
return v.rawString, v.metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFunc) error {
|
||||||
|
ctx, cancel := context.WithTimeout(ctxCase, v.timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v",
|
||||||
|
v.streamURL, v.dvrFile, v.duration, v.timeout)
|
||||||
|
|
||||||
|
// Try to start a DVR process.
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
// If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
|
||||||
|
// might need to wait for a duration of segment, 10s as such.
|
||||||
|
_ = v.doDVR(ctx)
|
||||||
|
|
||||||
|
// Check whether DVR file is ok.
|
||||||
|
if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
|
||||||
|
logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ignore if case terminated.
|
||||||
|
if ctxCase.Err() != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a probe process for the DVR file.
|
||||||
|
return v.doProbe(ctx, cancelCase)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeClient) doDVR(ctx context.Context) error {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
process := newBackendService()
|
||||||
|
process.name = *srsFFmpeg
|
||||||
|
process.args = []string{
|
||||||
|
"-t", fmt.Sprintf("%v", int64(v.duration/time.Second)),
|
||||||
|
"-i", v.streamURL, "-c", "copy", "-y", v.dvrFile,
|
||||||
|
}
|
||||||
|
process.env = os.Environ()
|
||||||
|
|
||||||
|
process.onDispose = func(ctx context.Context, bs *backendService) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
|
||||||
|
logger.Tf(ctx, "DVR start %v %v", bs.name, strings.Join(bs.args, " "))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
|
||||||
|
logger.Tf(ctx, "DVR process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
|
||||||
|
if *srsDVRStderr && stderr.Len() > 0 {
|
||||||
|
logger.Tf(ctx, "DVR process pid=%v, stderr is \n%v", bs.pid, stderr.String())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return process.Run(ctx, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeClient) doProbe(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
process := newBackendService()
|
||||||
|
process.name = *srsFFprobe
|
||||||
|
process.args = []string{
|
||||||
|
"-show_error", "-show_private_data", "-v", "quiet", "-find_stream_info",
|
||||||
|
"-analyzeduration", fmt.Sprintf("%v", int64(v.duration/time.Microsecond)),
|
||||||
|
"-print_format", "json", "-show_format", "-show_streams", v.dvrFile,
|
||||||
|
}
|
||||||
|
process.env = os.Environ()
|
||||||
|
|
||||||
|
process.onDispose = func(ctx context.Context, bs *backendService) error {
|
||||||
|
if _, err := os.Stat(v.dvrFile); !os.IsNotExist(err) {
|
||||||
|
os.Remove(v.dvrFile)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
|
||||||
|
logger.Tf(ctx, "FFprobe start %v %v", bs.name, strings.Join(bs.args, " "))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
|
||||||
|
logger.Tf(ctx, "FFprobe process pid=%v exit, r0=%v, stderr=%v", bs.pid, r0, stderr.String())
|
||||||
|
if *srsFFprobeStdout && stdout.Len() > 0 {
|
||||||
|
logger.Tf(ctx, "FFprobe process pid=%v, stdout is \n%v", bs.pid, stdout.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
str := stdout.String()
|
||||||
|
v.rawString = str
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(str), v.metadata); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := v.metadata
|
||||||
|
logger.Tf(ctx, "FFprobe done pid=%v, %v", bs.pid, m.String())
|
||||||
|
|
||||||
|
if ts := 90; m.Format.ProbeScore < ts {
|
||||||
|
return errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
|
||||||
|
}
|
||||||
|
if dv := m.Duration(); dv < v.duration {
|
||||||
|
return errors.Errorf("short duration=%v < %v, %v, %v", dv, v.duration, m.String(), str)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.doneCancel()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return process.Run(ctx, cancel)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
"index": 0,
|
||||||
|
"codec_name": "h264",
|
||||||
|
"codec_long_name": "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10",
|
||||||
|
"profile": "High",
|
||||||
|
"codec_type": "video",
|
||||||
|
"codec_tag_string": "avc1",
|
||||||
|
"codec_tag": "0x31637661",
|
||||||
|
"width": 768,
|
||||||
|
"height": 320,
|
||||||
|
"coded_width": 768,
|
||||||
|
"coded_height": 320,
|
||||||
|
"closed_captions": 0,
|
||||||
|
"film_grain": 0,
|
||||||
|
"has_b_frames": 2,
|
||||||
|
"sample_aspect_ratio": "1:1",
|
||||||
|
"display_aspect_ratio": "12:5",
|
||||||
|
"pix_fmt": "yuv420p",
|
||||||
|
"level": 32,
|
||||||
|
"chroma_location": "left",
|
||||||
|
"field_order": "progressive",
|
||||||
|
"refs": 1,
|
||||||
|
"is_avc": "true",
|
||||||
|
"nal_length_size": "4",
|
||||||
|
"id": "0x1",
|
||||||
|
"r_frame_rate": "25/1",
|
||||||
|
"avg_frame_rate": "25/1",
|
||||||
|
"time_base": "1/16000",
|
||||||
|
"start_pts": 1280,
|
||||||
|
"start_time": "0.080000",
|
||||||
|
"duration_ts": 160000,
|
||||||
|
"duration": "10.000000",
|
||||||
|
"bit_rate": "196916",
|
||||||
|
"bits_per_raw_sample": "8",
|
||||||
|
"nb_frames": "250",
|
||||||
|
"extradata_size": 41,
|
||||||
|
"disposition": {
|
||||||
|
"default": 1,
|
||||||
|
"dub": 0,
|
||||||
|
"original": 0,
|
||||||
|
"comment": 0,
|
||||||
|
"lyrics": 0,
|
||||||
|
"karaoke": 0,
|
||||||
|
"forced": 0,
|
||||||
|
"hearing_impaired": 0,
|
||||||
|
"visual_impaired": 0,
|
||||||
|
"clean_effects": 0,
|
||||||
|
"attached_pic": 0,
|
||||||
|
"timed_thumbnails": 0,
|
||||||
|
"captions": 0,
|
||||||
|
"descriptions": 0,
|
||||||
|
"metadata": 0,
|
||||||
|
"dependent": 0,
|
||||||
|
"still_image": 0
|
||||||
|
},
|
||||||
|
"tags": {
|
||||||
|
"language": "und",
|
||||||
|
"handler_name": "VideoHandler",
|
||||||
|
"vendor_id": "[0][0][0][0]"
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
"index": 1,
|
||||||
|
"codec_name": "aac",
|
||||||
|
"codec_long_name": "AAC (Advanced Audio Coding)",
|
||||||
|
"profile": "LC",
|
||||||
|
"codec_type": "audio",
|
||||||
|
"codec_tag_string": "mp4a",
|
||||||
|
"codec_tag": "0x6134706d",
|
||||||
|
"sample_fmt": "fltp",
|
||||||
|
"sample_rate": "44100",
|
||||||
|
"channels": 2,
|
||||||
|
"channel_layout": "stereo",
|
||||||
|
"bits_per_sample": 0,
|
||||||
|
"id": "0x2",
|
||||||
|
"r_frame_rate": "0/0",
|
||||||
|
"avg_frame_rate": "0/0",
|
||||||
|
"time_base": "1/44100",
|
||||||
|
"start_pts": 132,
|
||||||
|
"start_time": "0.002993",
|
||||||
|
"duration_ts": 441314,
|
||||||
|
"duration": "10.007120",
|
||||||
|
"bit_rate": "29827",
|
||||||
|
"nb_frames": "431",
|
||||||
|
"extradata_size": 2,
|
||||||
|
"disposition": {
|
||||||
|
"default": 1,
|
||||||
|
"dub": 0,
|
||||||
|
"original": 0,
|
||||||
|
"comment": 0,
|
||||||
|
"lyrics": 0,
|
||||||
|
"karaoke": 0,
|
||||||
|
"forced": 0,
|
||||||
|
"hearing_impaired": 0,
|
||||||
|
"visual_impaired": 0,
|
||||||
|
"clean_effects": 0,
|
||||||
|
"attached_pic": 0,
|
||||||
|
"timed_thumbnails": 0,
|
||||||
|
"captions": 0,
|
||||||
|
"descriptions": 0,
|
||||||
|
"metadata": 0,
|
||||||
|
"dependent": 0,
|
||||||
|
"still_image": 0
|
||||||
|
},
|
||||||
|
"tags": {
|
||||||
|
"language": "und",
|
||||||
|
"handler_name": "SoundHandler",
|
||||||
|
"vendor_id": "[0][0][0][0]"
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
type ffprobeObjectMedia struct {
|
||||||
|
Index int `json:"index"`
|
||||||
|
CodecName string `json:"codec_name"`
|
||||||
|
CodecType string `json:"codec_type"`
|
||||||
|
Timebase string `json:"time_base"`
|
||||||
|
Bitrate string `json:"bit_rate"`
|
||||||
|
Profile string `json:"profile"`
|
||||||
|
Duration string `json:"duration"`
|
||||||
|
CodecTagString string `json:"codec_tag_string"`
|
||||||
|
|
||||||
|
// For video codec.
|
||||||
|
Width int `json:"width"`
|
||||||
|
Height int `json:"height"`
|
||||||
|
CodedWidth int `json:"coded_width"`
|
||||||
|
CodedHeight int `json:"coded_height"`
|
||||||
|
RFramerate string `json:"r_frame_rate"`
|
||||||
|
AvgFramerate string `json:"avg_frame_rate"`
|
||||||
|
PixFmt string `json:"pix_fmt"`
|
||||||
|
Level int `json:"level"`
|
||||||
|
|
||||||
|
// For audio codec.
|
||||||
|
Channels int `json:"channels"`
|
||||||
|
ChannelLayout string `json:"channel_layout"`
|
||||||
|
SampleFmt string `json:"sample_fmt"`
|
||||||
|
SampleRate string `json:"sample_rate"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeObjectMedia) String() string {
|
||||||
|
sb := strings.Builder{}
|
||||||
|
|
||||||
|
sb.WriteString(fmt.Sprintf("index=%v, codec=%v, type=%v, tb=%v, bitrate=%v, profile=%v, duration=%v",
|
||||||
|
v.Index, v.CodecName, v.CodecType, v.Timebase, v.Bitrate, v.Profile, v.Duration))
|
||||||
|
sb.WriteString(fmt.Sprintf(", codects=%v", v.CodecTagString))
|
||||||
|
|
||||||
|
if v.CodecType == "video" {
|
||||||
|
sb.WriteString(fmt.Sprintf(", size=%vx%v, csize=%vx%v, rfr=%v, afr=%v, pix=%v, level=%v",
|
||||||
|
v.Width, v.Height, v.CodedWidth, v.CodedHeight, v.RFramerate, v.AvgFramerate, v.PixFmt, v.Level))
|
||||||
|
} else if v.CodecType == "audio" {
|
||||||
|
sb.WriteString(fmt.Sprintf(", channels=%v, layout=%v, fmt=%v, srate=%v",
|
||||||
|
v.Channels, v.ChannelLayout, v.SampleFmt, v.SampleRate))
|
||||||
|
}
|
||||||
|
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
"filename": "../objs/srs-ffprobe-stream-84487-8369019999559815097.mp4",
|
||||||
|
"nb_streams": 2,
|
||||||
|
"nb_programs": 0,
|
||||||
|
"format_name": "mov,mp4,m4a,3gp,3g2,mj2",
|
||||||
|
"format_long_name": "QuickTime / MOV",
|
||||||
|
"start_time": "0.002993",
|
||||||
|
"duration": "10.080000",
|
||||||
|
"size": "292725",
|
||||||
|
"bit_rate": "232321",
|
||||||
|
"probe_score": 100,
|
||||||
|
|
||||||
|
"tags": {
|
||||||
|
"major_brand": "isom",
|
||||||
|
"minor_version": "512",
|
||||||
|
"compatible_brands": "isomiso2avc1mp41",
|
||||||
|
"encoder": "Lavf59.27.100"
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
type ffprobeObjectFormat struct {
|
||||||
|
Filename string `json:"filename"`
|
||||||
|
Duration string `json:"duration"`
|
||||||
|
NBStream int16 `json:"nb_streams"`
|
||||||
|
Size string `json:"size"`
|
||||||
|
Bitrate string `json:"bit_rate"`
|
||||||
|
ProbeScore int `json:"probe_score"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeObjectFormat) String() string {
|
||||||
|
return fmt.Sprintf("file=%v, duration=%v, score=%v, size=%v, bitrate=%v, streams=%v",
|
||||||
|
v.Filename, v.Duration, v.ProbeScore, v.Size, v.Bitrate, v.NBStream)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
"streams": [{ffprobeObjectMedia}, {ffprobeObjectMedia}],
|
||||||
|
"format": {ffprobeObjectFormat}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
type ffprobeObject struct {
|
||||||
|
Format ffprobeObjectFormat `json:"format"`
|
||||||
|
Streams []ffprobeObjectMedia `json:"streams"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeObject) String() string {
|
||||||
|
sb := strings.Builder{}
|
||||||
|
sb.WriteString(v.Format.String())
|
||||||
|
sb.WriteString(", [")
|
||||||
|
for _, stream := range v.Streams {
|
||||||
|
sb.WriteString("{")
|
||||||
|
sb.WriteString(stream.String())
|
||||||
|
sb.WriteString("}")
|
||||||
|
}
|
||||||
|
sb.WriteString("]")
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *ffprobeObject) Duration() time.Duration {
|
||||||
|
dv, err := strconv.ParseFloat(v.Format.Duration, 10)
|
||||||
|
if err != nil {
|
||||||
|
return time.Duration(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(dv*1000) * time.Millisecond
|
||||||
|
}
|
Loading…
Reference in New Issue