|
|
|
@ -637,155 +637,6 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// When republish a stream, the player stream SHOULD be continuous.
|
|
|
|
|
func TestRtcBasic_Republish(t *testing.T) {
|
|
|
|
|
ctx := logger.WithContext(context.Background())
|
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
|
|
|
|
|
|
|
|
var r0, r1, r2, r3, r4 error
|
|
|
|
|
defer func(ctx context.Context) {
|
|
|
|
|
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil {
|
|
|
|
|
t.Errorf("Fail for err %+v", err)
|
|
|
|
|
} else {
|
|
|
|
|
logger.Tf(ctx, "test done with err %+v", err)
|
|
|
|
|
}
|
|
|
|
|
}(ctx)
|
|
|
|
|
|
|
|
|
|
var resources []io.Closer
|
|
|
|
|
defer func() {
|
|
|
|
|
for _, resource := range resources {
|
|
|
|
|
_ = resource.Close()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
defer wg.Wait()
|
|
|
|
|
|
|
|
|
|
// The event notify.
|
|
|
|
|
var thePublisher, theRepublisher *testPublisher
|
|
|
|
|
var thePlayer *testPlayer
|
|
|
|
|
|
|
|
|
|
mainReady, mainReadyCancel := context.WithCancel(context.Background())
|
|
|
|
|
publishReady, publishReadyCancel := context.WithCancel(context.Background())
|
|
|
|
|
republishReady, republishReadyCancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
// Objects init.
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
doInit := func() (err error) {
|
|
|
|
|
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
|
|
|
|
|
|
|
|
|
|
// Initialize player with private api.
|
|
|
|
|
if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error {
|
|
|
|
|
play.streamSuffix = streamSuffix
|
|
|
|
|
resources = append(resources, play)
|
|
|
|
|
|
|
|
|
|
var nnPlayReadRTP uint64
|
|
|
|
|
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
|
|
|
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
|
|
|
|
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
|
|
|
|
select {
|
|
|
|
|
case <-republishReady.Done():
|
|
|
|
|
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
|
|
|
|
|
cancel() // Completed.
|
|
|
|
|
}
|
|
|
|
|
logger.Tf(ctx, "Play recv rtp %v packets", nnPlayReadRTP)
|
|
|
|
|
default:
|
|
|
|
|
logger.Tf(ctx, "Play recv rtp packet before republish")
|
|
|
|
|
}
|
|
|
|
|
return i.nextRTPReader.Read(payload, attributes)
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
})
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initialize publisher with private api.
|
|
|
|
|
if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
|
|
|
|
|
pub.streamSuffix = streamSuffix
|
|
|
|
|
pub.iceReadyCancel = publishReadyCancel
|
|
|
|
|
resources = append(resources, pub)
|
|
|
|
|
|
|
|
|
|
var nnPubReadRTCP uint64
|
|
|
|
|
return pub.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
|
|
|
|
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
|
|
|
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
|
|
|
|
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
|
|
|
|
if nnPubReadRTCP++; nnPubReadRTCP > 0 && pub.cancel != nil {
|
|
|
|
|
pub.cancel() // We only cancel the publisher itself.
|
|
|
|
|
}
|
|
|
|
|
logger.Tf(ctx, "Publish recv rtcp %v packets", nnPubReadRTCP)
|
|
|
|
|
return nn, attr, err
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
})
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initialize re-publisher with private api.
|
|
|
|
|
if theRepublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
|
|
|
|
|
pub.streamSuffix = streamSuffix
|
|
|
|
|
pub.iceReadyCancel = republishReadyCancel
|
|
|
|
|
resources = append(resources, pub)
|
|
|
|
|
|
|
|
|
|
return pub.Setup(*srsVnetClientIP)
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Init done.
|
|
|
|
|
mainReadyCancel()
|
|
|
|
|
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := doInit(); err != nil {
|
|
|
|
|
r1 = err
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Run publisher.
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
case <-mainReady.Done():
|
|
|
|
|
pubCtx, pubCancel := context.WithCancel(ctx)
|
|
|
|
|
r2 = thePublisher.Run(logger.WithContext(pubCtx), pubCancel)
|
|
|
|
|
logger.Tf(ctx, "pub done, re-publish again")
|
|
|
|
|
|
|
|
|
|
// Dispose the stream.
|
|
|
|
|
_ = thePublisher.Close()
|
|
|
|
|
|
|
|
|
|
r4 = theRepublisher.Run(logger.WithContext(ctx), cancel)
|
|
|
|
|
logger.Tf(ctx, "re-pub done")
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Run player.
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
case <-publishReady.Done():
|
|
|
|
|
r3 = thePlayer.Run(logger.WithContext(ctx), cancel)
|
|
|
|
|
logger.Tf(ctx, "play done")
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The srs-server is DTLS server(passive), srs-bench is DTLS client which is active mode.
|
|
|
|
|
// No.1 srs-bench: ClientHello
|
|
|
|
|
// No.2 srs-server: ServerHello, Certificate, ServerKeyExchange, CertificateRequest, ServerHelloDone
|
|
|
|
|