Test: Refine RTMP regression test.

pull/2683/head
winlin 3 years ago
parent 9c315c94fc
commit 29b98c49cc

@ -24,7 +24,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ossrs/go-oryx-lib/rtmp"
"io"
"io/ioutil"
"math/rand"
@ -46,23 +45,6 @@ import (
"github.com/pion/rtp"
)
func TestMain(m *testing.M) {
if err := prepareTest(); err != nil {
logger.Ef(nil, "Prepare test fail, err %+v", err)
os.Exit(-1)
}
// Disable the logger during all tests.
if *srsLog == false {
olw := logger.Switch(ioutil.Discard)
defer func() {
logger.Switch(olw)
}()
}
os.Exit(m.Run())
}
// Test for https://github.com/ossrs/srs/pull/2483
func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) {
if err := filterTestError(func() error {
@ -2098,73 +2080,3 @@ func TestRtcPublishFlvPlay(t *testing.T) {
}
}()
}
func TestRtmpPublishPlay(t *testing.T) {
var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()
player := NewRTMPPlayer()
defer player.Close()
// Connect to RTMP URL.
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
if err := player.Play(ctx, rtmpUrl); err != nil {
return err
}
// Check packets.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
publisher.Close()
player.Close()
}()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
player.onRecvPacket = func(m *rtmp.Message) error {
logger.Tf(ctx, "got %v packet, %v %vms %vB",
nnPackets, m.MessageType, m.Timestamp, len(m.Payload))
if nnPackets += 1; nnPackets > 50 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
}()
return nil
}()
if err := filterTestError(err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}

@ -0,0 +1,96 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs
import (
"context"
"fmt"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/go-oryx-lib/rtmp"
)
func TestRtmpPublishPlay(t *testing.T) {
var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()
player := NewRTMPPlayer()
defer player.Close()
// Connect to RTMP URL.
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
if err := player.Play(ctx, rtmpUrl); err != nil {
return err
}
// Check packets.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
player.onRecvPacket = func(m *rtmp.Message) error {
logger.Tf(ctx, "got %v packet, %v %vms %vB",
nnPackets, m.MessageType, m.Timestamp, len(m.Payload))
if nnPackets += 1; nnPackets > 50 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
}()
return nil
}()
if err := filterTestError(err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}

@ -0,0 +1,45 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs
import (
"github.com/ossrs/go-oryx-lib/logger"
"io/ioutil"
"os"
"testing"
)
func TestMain(m *testing.M) {
if err := prepareTest(); err != nil {
logger.Ef(nil, "Prepare test fail, err %+v", err)
os.Exit(-1)
}
// Disable the logger during all tests.
if *srsLog == false {
olw := logger.Switch(ioutil.Discard)
defer func() {
logger.Switch(olw)
}()
}
os.Exit(m.Run())
}

@ -1448,6 +1448,18 @@ func (v *RTMPPublisher) Publish(ctx context.Context, rtmpUrl string) error {
}
func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error {
// If ctx is cancelled, close the RTMP transport.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
v.Close()
}()
// Consume all packets.
err := v.ingest(flvInput)
if err == io.EOF {
return nil
@ -1530,6 +1542,18 @@ func (v *RTMPPlayer) Play(ctx context.Context, rtmpUrl string) error {
}
func (v *RTMPPlayer) Consume(ctx context.Context) error {
// If ctx is cancelled, close the RTMP transport.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
v.Close()
}()
// Consume all packets.
err := v.consume()
if err == io.EOF {
return nil

Loading…
Cancel
Save