@ -34,6 +34,7 @@ import (
"io"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"path"
@ -65,6 +66,7 @@ var srsDTLSDropPackets *int
var srsSchema string
var srsServer * string
var srsHttpServer * string
var srsStream * string
var srsLiveStream * string
var srsPublishAudio * string
@ -75,7 +77,8 @@ var srsVnetClientIP *string
func prepareTest ( ) ( err error ) {
srsHttps = flag . Bool ( "srs-https" , false , "Whther connect to HTTPS-API" )
srsServer = flag . String ( "srs-server" , "127.0.0.1" , "The RTC server to connect to" )
srsServer = flag . String ( "srs-server" , "127.0.0.1" , "The RTMP/RTC server to connect to" )
srsHttpServer = flag . String ( "srs-http-server" , "127.0.0.1:8080" , "The HTTP server to connect to" )
srsStream = flag . String ( "srs-stream" , "/rtc/regression" , "The RTC app/stream to play" )
srsLiveStream = flag . String ( "srs-live-stream" , "/live/livestream" , "The LIVE app/stream to play" )
srsLog = flag . Bool ( "srs-log" , false , "Whether enable the detail log" )
@ -1445,6 +1448,10 @@ type RTMPPublisher struct {
client * RTMPClient
// Whether auto close transport when ingest done.
closeTransportWhenIngestDone bool
// Whether drop audio, set the hasAudio to false.
hasAudio bool
// Whether drop video, set the hasVideo to false.
hasVideo bool
onSendPacket func ( m * rtmp . Message ) error
}
@ -1456,6 +1463,7 @@ func NewRTMPPublisher() *RTMPPublisher {
// By default, set to on.
v . closeTransportWhenIngestDone = true
v . hasAudio , v . hasVideo = true , true
return v
}
@ -1465,6 +1473,7 @@ func (v *RTMPPublisher) Close() error {
}
func ( v * RTMPPublisher ) Publish ( ctx context . Context , rtmpUrl string ) error {
logger . Tf ( ctx , "Publish %v" , rtmpUrl )
return v . client . Publish ( ctx , rtmpUrl )
}
@ -1483,7 +1492,8 @@ func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error {
} ( )
// Consume all packets.
err := v . ingest ( flvInput )
logger . Tf ( ctx , "Start to ingest %v" , flvInput )
err := v . ingest ( ctx , flvInput )
if err == io . EOF {
return nil
}
@ -1493,7 +1503,7 @@ func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error {
return err
}
func ( v * RTMPPublisher ) ingest ( flvInput string ) error {
func ( v * RTMPPublisher ) ingest ( ctx context . Context , flvInput string ) error {
p := v . client
fs , err := os . Open ( flvInput )
@ -1501,6 +1511,7 @@ func (v *RTMPPublisher) ingest(flvInput string) error {
return err
}
defer fs . Close ( )
logger . Tf ( ctx , "Open input %v" , flvInput )
demuxer , err := flv . NewDemuxer ( fs )
if err != nil {
@ -1525,6 +1536,12 @@ func (v *RTMPPublisher) ingest(flvInput string) error {
if tagType != flv . TagTypeVideo && tagType != flv . TagTypeAudio {
continue
}
if ! v . hasAudio && tagType == flv . TagTypeAudio {
continue
}
if ! v . hasVideo && tagType == flv . TagTypeVideo {
continue
}
m := rtmp . NewStreamMessage ( p . streamID )
m . MessageType = rtmp . MessageType ( tagType )
@ -1577,6 +1594,9 @@ func (v *RTMPPlayer) Consume(ctx context.Context) error {
var wg sync . WaitGroup
defer wg . Wait ( )
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
@ -1618,6 +1638,133 @@ func (v *RTMPPlayer) consume() error {
}
}
type FLVPlayer struct {
flvUrl string
client * http . Client
resp * http . Response
f flv . Demuxer
onRecvHeader func ( hasAudio , hasVideo bool ) error
onRecvTag func ( tp flv . TagType , size , ts uint32 , tag [ ] byte ) error
}
func NewFLVPlayer ( ) * FLVPlayer {
return & FLVPlayer {
client : & http . Client { } , resp : nil , f : nil , onRecvHeader : nil , onRecvTag : nil ,
}
}
func ( v * FLVPlayer ) Close ( ) error {
if v . f != nil {
v . f . Close ( )
}
if v . resp != nil {
v . resp . Body . Close ( )
}
return nil
}
func ( v * FLVPlayer ) Play ( ctx context . Context , flvUrl string ) error {
v . flvUrl = flvUrl
return nil
}
func ( v * FLVPlayer ) Consume ( ctx context . Context ) error {
// If ctx is cancelled, close the RTMP transport.
var wg sync . WaitGroup
defer wg . Wait ( )
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
<- ctx . Done ( )
v . Close ( )
} ( )
// Start to play.
if err := v . play ( ctx , v . flvUrl ) ; err != nil {
return err
}
// Consume all packets.
err := v . consume ( ctx )
if err == io . EOF {
return nil
}
if ctx . Err ( ) == context . Canceled {
return nil
}
return err
}
func ( v * FLVPlayer ) play ( ctx context . Context , flvUrl string ) error {
logger . Tf ( ctx , "Run play flv url=%v" , flvUrl )
req , err := http . NewRequestWithContext ( ctx , "GET" , flvUrl , nil )
if err != nil {
return errors . Wrapf ( err , "New request for flv %v failed, err=%v" , flvUrl , err )
}
resp , err := v . client . Do ( req )
if err != nil {
return errors . Wrapf ( err , "Http get flv %v failed, err=%v" , flvUrl , err )
}
logger . Tf ( ctx , "Connected to %v" , flvUrl )
if v . resp != nil {
v . resp . Body . Close ( )
}
v . resp = resp
f , err := flv . NewDemuxer ( resp . Body )
if err != nil {
return errors . Wrapf ( err , "Create flv demuxer for %v failed, err=%v" , flvUrl , err )
}
if v . f != nil {
v . f . Close ( )
}
v . f = f
return nil
}
func ( v * FLVPlayer ) consume ( ctx context . Context ) ( err error ) {
var hasVideo , hasAudio bool
if _ , hasVideo , hasAudio , err = v . f . ReadHeader ( ) ; err != nil {
return errors . Wrapf ( err , "Flv demuxer read header failed, err=%v" , err )
}
logger . Tf ( ctx , "Got audio=%v, video=%v" , hasAudio , hasVideo )
if v . onRecvHeader != nil {
if err := v . onRecvHeader ( hasAudio , hasVideo ) ; err != nil {
return errors . Wrapf ( err , "Callback FLV header audio=%v, video=%v" , hasAudio , hasVideo )
}
}
for {
var tagType flv . TagType
var tagSize , timestamp uint32
if tagType , tagSize , timestamp , err = v . f . ReadTagHeader ( ) ; err != nil {
return errors . Wrapf ( err , "Flv demuxer read tag header failed, err=%v" , err )
}
var tag [ ] byte
if tag , err = v . f . ReadTag ( tagSize ) ; err != nil {
return errors . Wrapf ( err , "Flv demuxer read tag failed, err=%v" , err )
}
if v . onRecvTag != nil {
if err := v . onRecvTag ( tagType , tagSize , timestamp , tag ) ; err != nil {
return errors . Wrapf ( err , "Callback tag type=%v, size=%v, ts=%v, tag=%vB" , tagType , tagSize , timestamp , len ( tag ) )
}
}
}
}
func IsAvccrEquals ( a , b * avc . AVCDecoderConfigurationRecord ) bool {
if a == nil || b == nil {
return false