You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
srs/trunk/3rdparty/srs-bench/gb28181/util.go

362 lines
9.0 KiB
Go

// The MIT License (MIT)
//
// # Copyright (c) 2022-2025 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package gb28181
import (
"bufio"
"context"
"flag"
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/yapingcat/gomedia/mpeg2"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"time"
)
var srsLog *bool
var srsTimeout *int
var srsPublishVideoFps *int
var srsSipAddr *string
var srsSipUser *string
var srsSipRandomID *int
var srsSipDomain *string
var srsSipSvrID *string
var srsMediaTimeout *int
var srsReinviteTimeout *int
var srsPublishAudio *string
var srsPublishVideo *string
func prepareTest() (err error) {
srsSipAddr = flag.String("srs-sip", "tcp://127.0.0.1:5060", "The SRS GB server to connect to")
srsSipUser = flag.String("srs-stream", "3402000000", "The GB user/stream to publish")
srsSipRandomID = flag.Int("srs-random", 10, "The GB user/stream random suffix to publish")
srsSipDomain = flag.String("srs-domain", "3402000000", "The GB SIP domain")
srsSipSvrID = flag.String("srs-server", "34020000002000000001", "The GB server ID for SIP")
srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
srsTimeout = flag.Int("srs-timeout", 11000, "For each case, the timeout in ms")
srsMediaTimeout = flag.Int("srs-media-timeout", 2100, "PS media disconnect timeout in ms")
srsReinviteTimeout = flag.Int("srs-reinvite-timeout", 1200, "When disconnect, SIP re-invite timeout in ms")
srsPublishAudio = flag.String("srs-publish-audio", "avatar.aac", "The audio file for publisher.")
srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher. Note that *.h264 is for AVC, *.h265 is for HEVC.")
srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
// Should parse it first.
flag.Parse()
// Check file.
tryOpenFile := func(filename string) (string, error) {
if filename == "" {
return filename, nil
}
f, err := os.Open(filename)
if err != nil {
nfilename := path.Join("../", filename)
f2, err := os.Open(nfilename)
if err != nil {
return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
}
defer f2.Close()
return nfilename, nil
}
defer f.Close()
return filename, nil
}
if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
return err
}
if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
return err
}
return nil
}
type GBTestSession struct {
session *GBSession
}
func NewGBTestSession() *GBTestSession {
sipConfig := SIPConfig{
addr: *srsSipAddr,
domain: *srsSipDomain,
user: *srsSipUser,
random: *srsSipRandomID,
server: *srsSipSvrID,
}
return &GBTestSession{
session: NewGBSession(&GBSessionConfig{
regTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
inviteTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
}, &sipConfig),
}
}
func (v *GBTestSession) Close() error {
v.session.Close()
return nil
}
func (v *GBTestSession) Run(ctx context.Context) (err error) {
if err = v.session.Connect(ctx); err != nil {
return errors.Wrap(err, "connect")
}
if err = v.session.Register(ctx); err != nil {
return errors.Wrap(err, "register")
}
if err = v.session.Invite(ctx); err != nil {
return errors.Wrap(err, "invite")
}
return nil
}
type GBTestPublisher struct {
session *GBSession
ingester *PSIngester
}
func NewGBTestPublisher() *GBTestPublisher {
sipConfig := SIPConfig{
addr: *srsSipAddr,
domain: *srsSipDomain,
user: *srsSipUser,
random: *srsSipRandomID,
server: *srsSipSvrID,
}
psConfig := PSConfig{
video: *srsPublishVideo,
fps: *srsPublishVideoFps,
audio: *srsPublishAudio,
}
return &GBTestPublisher{
session: NewGBSession(&GBSessionConfig{
regTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
inviteTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
}, &sipConfig),
ingester: NewPSIngester(&IngesterConfig{
psConfig: psConfig,
}),
}
}
func (v *GBTestPublisher) Close() error {
v.ingester.Close()
v.session.Close()
return nil
}
func (v *GBTestPublisher) Run(ctx context.Context) (err error) {
if err = v.session.Connect(ctx); err != nil {
return errors.Wrap(err, "connect")
}
if err = v.session.Register(ctx); err != nil {
return errors.Wrap(err, "register")
}
if err = v.session.Invite(ctx); err != nil {
return errors.Wrap(err, "invite")
}
serverAddr, err := utilBuildMediaAddr(v.session.sip.conf.addr, v.session.out.mediaPort)
if err != nil {
return errors.Wrap(err, "parse")
}
v.ingester.conf.serverAddr = serverAddr
v.ingester.conf.ssrc = uint32(v.session.out.ssrc)
v.ingester.conf.clockRate = v.session.out.clockRate
v.ingester.conf.payloadType = uint8(v.session.out.payloadType)
if err := v.ingester.Ingest(ctx); err != nil {
return errors.Wrap(err, "ingest")
}
return nil
}
// Filter the test error, ignore context.Canceled
func filterTestError(errs ...error) error {
var filteredErrors []error
for _, err := range errs {
if err == nil || errors.Cause(err) == context.Canceled {
continue
}
// If url error, server maybe error, do not print the detail log.
if r0 := errors.Cause(err); r0 != nil {
if r1, ok := r0.(*url.Error); ok {
err = r1
}
}
filteredErrors = append(filteredErrors, err)
}
if len(filteredErrors) == 0 {
return nil
}
if len(filteredErrors) == 1 {
return filteredErrors[0]
}
var descs []string
for i, err := range filteredErrors[1:] {
descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
}
return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
}
type wallClock struct {
start time.Time
duration time.Duration
}
func newWallClock() *wallClock {
return &wallClock{start: time.Now()}
}
func (v *wallClock) Tick(d time.Duration) time.Duration {
v.duration += d
wc := time.Now().Sub(v.start)
re := v.duration - wc
if re > 30*time.Millisecond {
return re
}
return 0
}
func sipGetCallID(m sip.Message) string {
if v, ok := m.CallID(); !ok {
return ""
} else {
return v.Value()
}
}
func utilBuildMediaAddr(addr string, mediaPort int64) (string, error) {
if u, err := url.Parse(addr); err != nil {
return "", errors.Wrapf(err, "parse %v", addr)
} else if addr, err := net.ResolveTCPAddr(u.Scheme, u.Host); err != nil {
return "", errors.Wrapf(err, "parse %v scheme=%v, host=%v", addr, u.Scheme, u.Host)
} else {
return fmt.Sprintf("%v://%v:%v",
u.Scheme, addr.IP.String(), mediaPort,
), nil
}
}
// See SrsMpegPES::decode
func utilUpdatePesPacketLength(pes *mpeg2.PesPacket) {
var nb_required int
if pes.PTS_DTS_flags == 0x2 {
nb_required += 5
}
if pes.PTS_DTS_flags == 0x3 {
nb_required += 10
}
if pes.ESCR_flag > 0 {
nb_required += 6
}
if pes.ES_rate_flag > 0 {
nb_required += 3
}
if pes.DSM_trick_mode_flag > 0 {
nb_required += 1
}
if pes.Additional_copy_info_flag > 0 {
nb_required += 1
}
if pes.PES_CRC_flag > 0 {
nb_required += 2
}
if pes.PES_extension_flag > 0 {
nb_required += 1
}
// Size before PES_header_data_length.
const fixed = uint16(3)
// Size after PES_header_data_length.
pes.PES_header_data_length = uint8(nb_required)
// Size after PES_packet_length
pes.PES_packet_length = uint16(len(pes.Pes_payload)) + fixed + uint16(pes.PES_header_data_length)
}
type AACReader struct {
codec aac.ADTS
r *bufio.Reader
}
func NewAACReader(f io.Reader) (*AACReader, error) {
v := &AACReader{}
var err error
if v.codec, err = aac.NewADTS(); err != nil {
return nil, err
}
v.r = bufio.NewReaderSize(f, 4096)
b, err := v.r.Peek(7 + 1024)
if err != nil {
return nil, err
}
if _, _, err = v.codec.Decode(b); err != nil {
return nil, err
}
return v, nil
}
func (v *AACReader) NextADTSFrame() ([]byte, error) {
b, err := v.r.Peek(7 + 1024)
if err != nil {
return nil, err
}
_, left, err := v.codec.Decode(b)
if err != nil {
return nil, err
}
adts := b[:len(b)-len(left)]
if _, err = v.r.Discard(len(adts)); err != nil {
return nil, err
}
return adts, nil
}