For #2371: Add regression test for SDP nack (#2487)

* explicitly enable nack, for firefox (#2373)

* For #2371: Add regression test for SDP nack

1. Refine API to change defaule decodes.
2. Add test for publish SDP nack.
3. Add test for play SDP nack.

Co-authored-by: Haibo Chen <495810242@qq.com>
pull/2502/head
Winlin 4 years ago committed by GitHub
parent 43bb9660ca
commit 8c7ba05f9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,7 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="srs-ingest" type="CMakeRunConfiguration" factoryName="Application" PROGRAM_PARAMS="-c conf/clion-ingest.conf" REDIRECT_INPUT="false" ELEVATE="false" USE_EXTERNAL_CONSOLE="false" WORKING_DIR="file://$CMakeCurrentBuildDir$/../../../" PASS_PARENT_ENVS_2="true" PROJECT_NAME="srs" TARGET_NAME="srs" CONFIG_NAME="Debug" RUN_TARGET_PROJECT_NAME="srs" RUN_TARGET_NAME="srs">
<method v="2">
<option name="com.jetbrains.cidr.execution.CidrBuildBeforeRunTaskProvider$BuildBeforeRunTask" enabled="true" />
</method>
</configuration>
</component>

@ -25,11 +25,13 @@ import (
"encoding/json"
"fmt"
"github.com/pion/transport/vnet"
"github.com/pion/webrtc/v3"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
@ -62,7 +64,7 @@ func TestMain(m *testing.M) {
func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
return nil
})
@ -96,11 +98,346 @@ func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) {
}
}
// Veirfy https://github.com/ossrs/srs/issues/2371
func TestBugfix2371_PublishWithNack(t *testing.T) {
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("bugfix-2371-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(registerMiniCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
return nil
}
p.onAnswer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
cancel()
return nil
}
return nil
})
if err != nil {
return err
}
defer p.Close()
if err := p.Setup(*srsVnetClientIP); err != nil {
return err
}
return p.Run(ctx, cancel)
}()); err != nil {
t.Errorf("err %+v", err)
}
}
// Veirfy https://github.com/ossrs/srs/issues/2371
func TestBugfix2371_PublishWithoutNack(t *testing.T) {
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("bugfix-2371-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(registerMiniCodecsWithoutNack, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 0 {
return errors.Errorf("invalid %v nack", n)
}
return nil
}
p.onAnswer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 0 {
return errors.Errorf("invalid %v nack", n)
}
cancel()
return nil
}
return nil
})
if err != nil {
return err
}
defer p.Close()
if err := p.Setup(*srsVnetClientIP); err != nil {
return err
}
return p.Run(ctx, cancel)
}()); err != nil {
t.Errorf("err %+v", err)
}
}
// Veirfy https://github.com/ossrs/srs/issues/2371
func TestBugfix2371_PlayWithNack(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2, r3 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
}
}(ctx)
var resources []io.Closer
defer func() {
for _, resource := range resources {
_ = resource.Close()
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// The event notify.
var thePublisher *testPublisher
var thePlayer *testPlayer
mainReady, mainReadyCancel := context.WithCancel(context.Background())
publishReady, publishReadyCancel := context.WithCancel(context.Background())
// Objects init.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
doInit := func() (err error) {
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
// Initialize player with private api.
if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
play.onOffer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
return nil
}
play.onAnswer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
cancel()
return nil
}
resources = append(resources, play)
return play.Setup(*srsVnetClientIP)
}); err != nil {
return err
}
// Initialize publisher with private api.
if thePublisher, err = newTestPublisher(registerMiniCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = publishReadyCancel
resources = append(resources, pub)
return pub.Setup(*srsVnetClientIP)
}); err != nil {
return err
}
// Init done.
mainReadyCancel()
<-ctx.Done()
return nil
}
if err := doInit(); err != nil {
r1 = err
}
}()
// Run publisher.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-mainReady.Done():
r2 = thePublisher.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "pub done")
}
}()
// Run player.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-publishReady.Done():
r3 = thePlayer.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "play done")
}
}()
}
// Veirfy https://github.com/ossrs/srs/issues/2371
func TestBugfix2371_PlayWithoutNack(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2, r3 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
}
}(ctx)
var resources []io.Closer
defer func() {
for _, resource := range resources {
_ = resource.Close()
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// The event notify.
var thePublisher *testPublisher
var thePlayer *testPlayer
mainReady, mainReadyCancel := context.WithCancel(context.Background())
publishReady, publishReadyCancel := context.WithCancel(context.Background())
// Objects init.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
doInit := func() (err error) {
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
// Initialize player with private api.
if thePlayer, err = newTestPlayer(registerMiniCodecsWithoutNack, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
play.onOffer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 0 {
return errors.Errorf("invalid %v nack", n)
}
return nil
}
play.onAnswer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 0 {
return errors.Errorf("invalid %v nack", n)
}
cancel()
return nil
}
resources = append(resources, play)
return play.Setup(*srsVnetClientIP)
}); err != nil {
return err
}
// Initialize publisher with private api.
if thePublisher, err = newTestPublisher(registerMiniCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = publishReadyCancel
resources = append(resources, pub)
return pub.Setup(*srsVnetClientIP)
}); err != nil {
return err
}
// Init done.
mainReadyCancel()
<-ctx.Done()
return nil
}
if err := doInit(); err != nil {
r1 = err
}
}()
// Run publisher.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-mainReady.Done():
r2 = thePublisher.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "pub done")
}
}()
// Run player.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-publishReady.Done():
r3 = thePlayer.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "play done")
}
}()
}
// Veirfy https://github.com/ossrs/srs/issues/2371
func TestBugfix2371_RTMP2RTC_PlayWithNack(t *testing.T) {
if err := filterTestError(func() error {
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
p, err := newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
play.onOffer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
return nil
}
play.onAnswer = func(s *webrtc.SessionDescription) error {
if n := strings.Count(s.SDP, "nack"); n != 2 {
return errors.Errorf("invalid %v nack", n)
}
cancel()
return nil
}
return nil
})
if err != nil {
return err
}
defer p.Close()
if err := p.Setup(*srsVnetClientIP); err != nil {
return err
}
return p.Run(ctx, cancel)
}()); err != nil {
t.Errorf("err %+v", err)
}
}
// Basic use scenario, publish a stream.
func TestRtcBasic_PublishOnly(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
return nil
})
@ -187,7 +524,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
// Initialize player with private api.
if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error {
if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
resources = append(resources, play)
@ -221,7 +558,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
}
// Initialize publisher with private api.
if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = publishReadyCancel
resources = append(resources, pub)
@ -342,7 +679,7 @@ func TestRtcBasic_Republish(t *testing.T) {
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
// Initialize player with private api.
if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error {
if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
resources = append(resources, play)
@ -368,7 +705,7 @@ func TestRtcBasic_Republish(t *testing.T) {
}
// Initialize publisher with private api.
if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = publishReadyCancel
resources = append(resources, pub)
@ -391,7 +728,7 @@ func TestRtcBasic_Republish(t *testing.T) {
}
// Initialize re-publisher with private api.
if theRepublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
if theRepublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = republishReadyCancel
resources = append(resources, pub)
@ -457,7 +794,7 @@ func TestRtcBasic_Republish(t *testing.T) {
func TestRtcDTLS_ClientActive_Default(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -512,7 +849,7 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) {
func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -564,7 +901,7 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -623,7 +960,7 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -689,7 +1026,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -767,7 +1104,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -844,7 +1181,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -932,7 +1269,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1017,7 +1354,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -1094,7 +1431,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1171,7 +1508,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupActive
return nil
@ -1257,7 +1594,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1334,7 +1671,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1387,7 +1724,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1440,7 +1777,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1493,7 +1830,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1547,7 +1884,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
if err := filterTestError(func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil
@ -1625,7 +1962,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error {
p.streamSuffix = streamSuffix
p.onOffer = testUtilSetupPassive
return nil

@ -59,6 +59,7 @@ var srsDTLSDropPackets *int
var srsSchema string
var srsServer *string
var srsStream *string
var srsLiveStream *string
var srsPublishAudio *string
var srsPublishVideo *string
var srsVnetClientIP *string
@ -68,7 +69,8 @@ func prepareTest() error {
srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC app/stream to play")
srsLiveStream = flag.String("srs-live-stream", "/live/livestream", "The LIVE app/stream to play")
srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms")
srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
@ -234,6 +236,11 @@ func (v *wallClock) Tick(d time.Duration) time.Duration {
return 0
}
// Do nothing for SDP.
func testUtilPassBy(s *webrtc.SessionDescription) error {
return nil
}
// Set to active, as DTLS client, to start ClientHello.
func testUtilSetupActive(s *webrtc.SessionDescription) error {
if strings.Contains(s.SDP, "setup:passive") {
@ -548,6 +555,7 @@ func (v *dtlsRecord) Unmarshal(b []byte) error {
return nil
}
// The func to setup testWebRTCAPI
type testWebRTCAPIOptionFunc func(api *testWebRTCAPI)
type testWebRTCAPI struct {
@ -566,25 +574,88 @@ type testWebRTCAPI struct {
proxy *vnet_proxy.UDPProxy
}
func newTestWebRTCAPI(options ...testWebRTCAPIOptionFunc) (*testWebRTCAPI, error) {
v := &testWebRTCAPI{}
// The func to initialize testWebRTCAPI
type testWebRTCAPIInitFunc func(api *testWebRTCAPI) error
// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI
func registerDefaultCodecs(api *testWebRTCAPI) error {
v := api
v.mediaEngine = &webrtc.MediaEngine{}
if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
return nil, err
return err
}
v.registry = &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil {
return nil, err
return err
}
for _, setup := range options {
setup(v)
return nil
}
// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI
func registerMiniCodecs(api *testWebRTCAPI) error {
v := api
if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil},
PayloadType: 111,
}, webrtc.RTPCodecTypeAudio); err != nil {
return err
}
videoRTCPFeedback := []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}}
if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", videoRTCPFeedback},
PayloadType: 108,
}, webrtc.RTPCodecTypeVideo); err != nil {
return err
}
// Interceptors for NACK??? @see webrtc.ConfigureNack(v.mediaEngine, v.registry)
return nil
}
// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI
func registerMiniCodecsWithoutNack(api *testWebRTCAPI) error {
v := api
if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil},
PayloadType: 111,
}, webrtc.RTPCodecTypeAudio); err != nil {
return err
}
videoRTCPFeedback := []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}}
if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", videoRTCPFeedback},
PayloadType: 108,
}, webrtc.RTPCodecTypeVideo); err != nil {
return err
}
// Interceptors for NACK??? @see webrtc.ConfigureNack(v.mediaEngine, v.registry)
return nil
}
func newTestWebRTCAPI(inits ...testWebRTCAPIInitFunc) (*testWebRTCAPI, error) {
v := &testWebRTCAPI{}
v.mediaEngine = &webrtc.MediaEngine{}
v.registry = &interceptor.Registry{}
v.settingEngine = &webrtc.SettingEngine{}
// Apply initialize filter, for example, register default codecs when create publisher/player.
for _, setup := range inits {
if setup == nil {
continue
}
if err := setup(v); err != nil {
return nil, err
}
}
return v, nil
}
@ -635,10 +706,12 @@ func (v *testWebRTCAPI) Setup(vnetClientIP string, options ...testWebRTCAPIOptio
return err
}
// Apply options from params, for example, tester to register vnet filter.
for _, setup := range options {
setup(v)
}
// Apply options in api, for example, publisher register audio-level interceptor.
for _, setup := range v.options {
setup(v)
}
@ -659,26 +732,28 @@ func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*
type testPlayerOptionFunc func(p *testPlayer) error
type testPlayer struct {
onOffer func(s *webrtc.SessionDescription) error
onAnswer func(s *webrtc.SessionDescription) error
pc *webrtc.PeerConnection
receivers []*webrtc.RTPReceiver
// We should dispose it.
api *testWebRTCAPI
// Optional suffix for stream url.
streamSuffix string
// Optional app/stream to play, use srsStream by default.
defaultStream string
}
func createApiForPlayer(play *testPlayer) error {
api, err := newTestWebRTCAPI()
// Create test player, the init is used to initialize api which maybe nil,
// and the options is used to setup the player itself.
func newTestPlayer(init testWebRTCAPIInitFunc, options ...testPlayerOptionFunc) (*testPlayer, error) {
v := &testPlayer{}
api, err := newTestWebRTCAPI(init)
if err != nil {
return err
return nil, err
}
play.api = api
return nil
}
func newTestPlayer(options ...testPlayerOptionFunc) (*testPlayer, error) {
v := &testPlayer{}
v.api = api
for _, opt := range options {
if err := opt(v); err != nil {
@ -711,6 +786,9 @@ func (v *testPlayer) Close() error {
func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
if v.defaultStream != "" {
r = fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, v.defaultStream)
}
if v.streamSuffix != "" {
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
}
@ -743,22 +821,35 @@ func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
return errors.Wrapf(err, "Set offer %v", offer)
}
answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
if v.onOffer != nil {
if err := v.onOffer(&offer); err != nil {
return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP)
}
}
answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
if err != nil {
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
}
// Run a proxy for real server and vnet.
if address, err := parseAddressOfCandidate(answer); err != nil {
return errors.Wrapf(err, "parse address of %v", answer)
if address, err := parseAddressOfCandidate(answerSDP); err != nil {
return errors.Wrapf(err, "parse address of %v", answerSDP)
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return errors.Wrapf(err, "Set answer %v", answer)
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
}
if v.onAnswer != nil {
if err := v.onAnswer(answer); err != nil {
return errors.Wrapf(err, "on answerSDP")
}
}
if err := pc.SetRemoteDescription(*answer); err != nil {
return errors.Wrapf(err, "Set answerSDP %v", answerSDP)
}
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
@ -830,21 +921,19 @@ type testPublisher struct {
cancel context.CancelFunc
}
func createApiForPublisher(pub *testPublisher) error {
api, err := newTestWebRTCAPI()
if err != nil {
return err
}
pub.api = api
return nil
}
func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error) {
// Create test publisher, the init is used to initialize api which maybe nil,
// and the options is used to setup the publisher itself.
func newTestPublisher(init testWebRTCAPIInitFunc, options ...testPublisherOptionFunc) (*testPublisher, error) {
sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
v := &testPublisher{}
api, err := newTestWebRTCAPI(init)
if err != nil {
return nil, err
}
v.api = api
for _, opt := range options {
if err := opt(v); err != nil {
return nil, err
@ -860,7 +949,6 @@ func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error
}
// Setup the interceptors for packets.
api := v.api
api.options = append(api.options, func(api *testWebRTCAPI) {
// Filter for RTCP packets.
rtcpInterceptor := &rtcpInterceptor{}

@ -0,0 +1,53 @@
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
http_server {
enabled on;
listen 8080;
dir ./objs/nginx/html;
}
http_api {
enabled on;
listen 1985;
}
stats {
network 0;
}
rtc_server {
enabled on;
# Listen at udp://8000
listen 8000;
#
# The $CANDIDATE means fetch from env, if not configed, use * as default.
#
# The * means retrieving server IP automatically, from all network interfaces,
# @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate
candidate $CANDIDATE;
}
vhost __defaultVhost__ {
rtc {
enabled on;
}
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
}
ingest livestream {
enabled on;
input {
type file;
url ./doc/source.200kbps.768x320.flv;
}
ffmpeg ./objs/ffmpeg/bin/ffmpeg;
engine {
enabled off;
output rtmp://127.0.0.1:[port]/live/livestream;
}
}
}

@ -2909,7 +2909,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig* ruc
//local_media_desc.payload_types_.back().rtcp_fb_.push_back("rrtr");
}
// TODO: FIXME: use one parse paylod from sdp.
// TODO: FIXME: use one parse payload from sdp.
track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("red"));
track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("rtx"));

Loading…
Cancel
Save