For #1641, Support RTMP publish and play regression test. v4.0.173

pull/2683/head
winlin 3 years ago
parent 0f9b9505a8
commit 9c315c94fc

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

@ -24,6 +24,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/ossrs/go-oryx-lib/rtmp"
"io"
"io/ioutil"
"math/rand"
@ -2097,3 +2098,73 @@ func TestRtcPublishFlvPlay(t *testing.T) {
}
}()
}
func TestRtmpPublishPlay(t *testing.T) {
var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()
player := NewRTMPPlayer()
defer player.Close()
// Connect to RTMP URL.
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
if err := player.Play(ctx, rtmpUrl); err != nil {
return err
}
// Check packets.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
publisher.Close()
player.Close()
}()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
player.onRecvPacket = func(m *rtmp.Message) error {
logger.Tf(ctx, "got %v packet, %v %vms %vB",
nnPackets, m.MessageType, m.Timestamp, len(m.Payload))
if nnPackets += 1; nnPackets > 50 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
}()
return nil
}()
if err := filterTestError(err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}

@ -25,7 +25,11 @@ import (
"context"
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/amf0"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/rtmp"
"io"
"math/rand"
"net"
"net/url"
"os"
@ -62,6 +66,8 @@ var srsStream *string
var srsLiveStream *string
var srsPublishAudio *string
var srsPublishVideo *string
var srsPublishAvatar *string
var srsPublishBBB *string
var srsVnetClientIP *string
func prepareTest() error {
@ -78,6 +84,8 @@ func prepareTest() error {
srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail")
srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
srsPublishAvatar = flag.String("srs-publish-avatar", "avatar.flv", "The avatar file for publisher.")
srsPublishBBB = flag.String("srs-publish-bbb", "bbb.flv", "The bbb file for publisher.")
srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
@ -122,6 +130,14 @@ func prepareTest() error {
return err
}
if *srsPublishAvatar, err = tryOpenFile(*srsPublishAvatar); err != nil {
return err
}
if *srsPublishBBB, err = tryOpenFile(*srsPublishBBB); err != nil {
return err
}
if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
return err
}
@ -1240,3 +1256,301 @@ func (v *testPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
}
return ctx.Err()
}
type RTMPClient struct {
rtmpUrl string
rtmpTcUrl string
rtmpStream string
rtmpUrlObject *url.URL
streamID int
conn *net.TCPConn
proto *rtmp.Protocol
}
func (v *RTMPClient) Close() error {
if v.conn != nil {
v.conn.Close()
}
return nil
}
func (v *RTMPClient) connect(rtmpUrl string) error {
v.rtmpUrl = rtmpUrl
if index := strings.LastIndex(rtmpUrl, "/"); index <= 0 {
return fmt.Errorf("invalid url %v, index=%v", rtmpUrl, index)
} else {
v.rtmpTcUrl = rtmpUrl[0:index]
v.rtmpStream = rtmpUrl[index+1:]
}
// Parse RTMP url.
rtmpUrlObject, err := url.Parse(rtmpUrl)
if err != nil {
return err
}
v.rtmpUrlObject = rtmpUrlObject
port := rtmpUrlObject.Port()
if port == "" {
port = "1935"
}
// Connect to TCP server.
rtmpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", rtmpUrlObject.Hostname(), port))
if err != nil {
return err
}
c, err := net.DialTCP("tcp4", nil, rtmpAddr)
if err != nil {
return err
}
v.conn = c
// RTMP Handshake with server.
hs := rtmp.NewHandshake(rand.New(rand.NewSource(time.Now().UnixNano())))
if err := hs.WriteC0S0(c); err != nil {
return err
}
if err := hs.WriteC1S1(c); err != nil {
return err
}
if _, err := hs.ReadC0S0(c); err != nil {
return err
}
s1, err := hs.ReadC1S1(c)
if err != nil {
return err
}
if _, err := hs.ReadC2S2(c); err != nil {
return err
}
if err := hs.WriteC2S2(c, s1); err != nil {
return err
}
// Connect to RTMP tcUrl.
p := rtmp.NewProtocol(v.conn)
pkt := rtmp.NewConnectAppPacket()
pkt.CommandObject.Set("tcUrl", amf0.NewString(v.rtmpTcUrl))
if err = p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewConnectAppResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.proto = p
return nil
}
func (v *RTMPClient) Publish(ctx context.Context, rtmpUrl string) error {
if err := v.connect(rtmpUrl); err != nil {
return err
}
p := v.proto
// Create RTMP stream.
if true {
pkt := rtmp.NewCreateStreamPacket()
if err := p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.streamID = int(res.StreamID)
}
// Publish RTMP stream.
if true {
pkt := rtmp.NewPublishPacket()
pkt.StreamName = *amf0.NewString(v.rtmpStream)
if err := p.WritePacket(pkt, v.streamID); err != nil {
return err
}
res := rtmp.NewCallPacket()
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
}
return nil
}
func (v *RTMPClient) Play(ctx context.Context, rtmpUrl string) error {
if err := v.connect(rtmpUrl); err != nil {
return err
}
p := v.proto
// Create RTMP stream.
if true {
pkt := rtmp.NewCreateStreamPacket()
if err := p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.streamID = int(res.StreamID)
}
// Play RTMP stream.
if true {
pkt := rtmp.NewPlayPacket()
pkt.StreamName = *amf0.NewString(v.rtmpStream)
if err := p.WritePacket(pkt, v.streamID); err != nil {
return err
}
res := rtmp.NewCallPacket()
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
}
return nil
}
type RTMPPublisher struct {
client *RTMPClient
onSendPacket func(m *rtmp.Message) error
}
func NewRTMPPublisher() *RTMPPublisher {
return &RTMPPublisher{
client: &RTMPClient{},
}
}
func (v *RTMPPublisher) Close() error {
return v.client.Close()
}
func (v *RTMPPublisher) Publish(ctx context.Context, rtmpUrl string) error {
return v.client.Publish(ctx, rtmpUrl)
}
func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error {
err := v.ingest(flvInput)
if err == io.EOF {
return nil
}
if ctx.Err() == context.Canceled {
return nil
}
return err
}
func (v *RTMPPublisher) ingest(flvInput string) error {
p := v.client
fs, err := os.Open(flvInput)
if err != nil {
return err
}
defer fs.Close()
demuxer, err := flv.NewDemuxer(fs)
if err != nil {
return err
}
if _, _, _, err = demuxer.ReadHeader(); err != nil {
return err
}
for {
tagType, tagSize, timestamp, err := demuxer.ReadTagHeader()
if err != nil {
return err
}
tag, err := demuxer.ReadTag(tagSize)
if err != nil {
return err
}
if tagType != flv.TagTypeVideo && tagType != flv.TagTypeAudio {
continue
}
m := rtmp.NewStreamMessage(p.streamID)
m.MessageType = rtmp.MessageType(tagType)
m.Timestamp = uint64(timestamp)
m.Payload = tag
if err = p.proto.WriteMessage(m); err != nil {
return err
}
if v.onSendPacket != nil {
if err = v.onSendPacket(m); err != nil {
return err
}
}
}
return nil
}
type RTMPPlayer struct {
client *RTMPClient
onRecvPacket func(m *rtmp.Message) error
}
func NewRTMPPlayer() *RTMPPlayer {
return &RTMPPlayer{
client: &RTMPClient{},
}
}
func (v *RTMPPlayer) Close() error {
return v.client.Close()
}
func (v *RTMPPlayer) Play(ctx context.Context, rtmpUrl string) error {
return v.client.Play(ctx, rtmpUrl)
}
func (v *RTMPPlayer) Consume(ctx context.Context) error {
err := v.consume()
if err == io.EOF {
return nil
}
if ctx.Err() == context.Canceled {
return nil
}
return err
}
func (v *RTMPPlayer) consume() error {
for {
res, err := v.client.proto.ExpectMessage(rtmp.MessageTypeVideo, rtmp.MessageTypeAudio)
if err != nil {
return err
}
if v.onRecvPacket != nil {
if err := v.onRecvPacket(res); err != nil {
return err
}
}
}
}

@ -0,0 +1,738 @@
// The MIT License (MIT)
//
// Copyright (c) 2013-2017 Oryx(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// The oryx amf0 package support AMF0 codec.
package amf0
import (
"bytes"
"encoding"
"encoding/binary"
"fmt"
oe "github.com/ossrs/go-oryx-lib/errors"
"math"
"sync"
)
// Please read @doc amf0_spec_121207.pdf, @page 4, @section 2.1 Types Overview
type marker uint8
const (
markerNumber marker = iota // 0
markerBoolean // 1
markerString // 2
markerObject // 3
markerMovieClip // 4
markerNull // 5
markerUndefined // 6
markerReference // 7
markerEcmaArray // 8
markerObjectEnd // 9
markerStrictArray // 10
markerDate // 11
markerLongString // 12
markerUnsupported // 13
markerRecordSet // 14
markerXmlDocument // 15
markerTypedObject // 16
markerAvmPlusObject // 17
markerForbidden marker = 0xff
)
func (v marker) String() string {
switch v {
case markerNumber:
return "Number"
case markerBoolean:
return "Boolean"
case markerString:
return "String"
case markerObject:
return "Object"
case markerNull:
return "Null"
case markerUndefined:
return "Undefined"
case markerReference:
return "Reference"
case markerEcmaArray:
return "EcmaArray"
case markerObjectEnd:
return "ObjectEnd"
case markerStrictArray:
return "StrictArray"
case markerDate:
return "Date"
case markerLongString:
return "LongString"
case markerUnsupported:
return "Unsupported"
case markerXmlDocument:
return "XmlDocument"
case markerTypedObject:
return "TypedObject"
case markerAvmPlusObject:
return "AvmPlusObject"
case markerMovieClip:
return "MovieClip"
case markerRecordSet:
return "RecordSet"
default:
return "Forbidden"
}
}
// For utest to mock it.
type buffer interface {
Bytes() []byte
WriteByte(c byte) error
Write(p []byte) (n int, err error)
}
var createBuffer = func() buffer {
return &bytes.Buffer{}
}
// All AMF0 things.
type Amf0 interface {
// Binary marshaler and unmarshaler.
encoding.BinaryUnmarshaler
encoding.BinaryMarshaler
// Get the size of bytes to marshal this object.
Size() int
// Get the Marker of any AMF0 stuff.
amf0Marker() marker
}
// Discovery the amf0 object from the bytes b.
func Discovery(p []byte) (a Amf0, err error) {
if len(p) < 1 {
return nil, oe.Errorf("require 1 bytes only %v", len(p))
}
m := marker(p[0])
switch m {
case markerNumber:
return NewNumber(0), nil
case markerBoolean:
return NewBoolean(false), nil
case markerString:
return NewString(""), nil
case markerObject:
return NewObject(), nil
case markerNull:
return NewNull(), nil
case markerUndefined:
return NewUndefined(), nil
case markerReference:
case markerEcmaArray:
return NewEcmaArray(), nil
case markerObjectEnd:
return &objectEOF{}, nil
case markerStrictArray:
return NewStrictArray(), nil
case markerDate, markerLongString, markerUnsupported, markerXmlDocument,
markerTypedObject, markerAvmPlusObject, markerForbidden, markerMovieClip,
markerRecordSet:
return nil, oe.Errorf("Marker %v is not supported", m)
}
return nil, oe.Errorf("Marker %v is invalid", m)
}
// The UTF8 string, please read @doc amf0_spec_121207.pdf, @page 3, @section 1.3.1 Strings and UTF-8
type amf0UTF8 string
func (v *amf0UTF8) Size() int {
return 2 + len(string(*v))
}
func (v *amf0UTF8) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 2 {
return oe.Errorf("require 2 bytes only %v", len(p))
}
size := uint16(p[0])<<8 | uint16(p[1])
if p = data[2:]; len(p) < int(size) {
return oe.Errorf("require %v bytes only %v", int(size), len(p))
}
*v = amf0UTF8(string(p[:size]))
return
}
func (v *amf0UTF8) MarshalBinary() (data []byte, err error) {
data = make([]byte, v.Size())
size := uint16(len(string(*v)))
data[0] = byte(size >> 8)
data[1] = byte(size)
if size > 0 {
copy(data[2:], []byte(*v))
}
return
}
// The number object, please read @doc amf0_spec_121207.pdf, @page 5, @section 2.2 Number Type
type Number float64
func NewNumber(f float64) *Number {
v := Number(f)
return &v
}
func (v *Number) amf0Marker() marker {
return markerNumber
}
func (v *Number) Size() int {
return 1 + 8
}
func (v *Number) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 9 {
return oe.Errorf("require 9 bytes only %v", len(p))
}
if m := marker(p[0]); m != markerNumber {
return oe.Errorf("Number marker %v is illegal", m)
}
f := binary.BigEndian.Uint64(p[1:])
*v = Number(math.Float64frombits(f))
return
}
func (v *Number) MarshalBinary() (data []byte, err error) {
data = make([]byte, 9)
data[0] = byte(markerNumber)
f := math.Float64bits(float64(*v))
binary.BigEndian.PutUint64(data[1:], f)
return
}
// The string objet, please read @doc amf0_spec_121207.pdf, @page 5, @section 2.4 String Type
type String string
func NewString(s string) *String {
v := String(s)
return &v
}
func (v *String) amf0Marker() marker {
return markerString
}
func (v *String) Size() int {
u := amf0UTF8(*v)
return 1 + u.Size()
}
func (v *String) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 1 {
return oe.Errorf("require 1 bytes only %v", len(p))
}
if m := marker(p[0]); m != markerString {
return oe.Errorf("String marker %v is illegal", m)
}
var sv amf0UTF8
if err = sv.UnmarshalBinary(p[1:]); err != nil {
return oe.WithMessage(err, "utf8")
}
*v = String(string(sv))
return
}
func (v *String) MarshalBinary() (data []byte, err error) {
u := amf0UTF8(*v)
var pb []byte
if pb, err = u.MarshalBinary(); err != nil {
return nil, oe.WithMessage(err, "utf8")
}
data = append([]byte{byte(markerString)}, pb...)
return
}
// The AMF0 object end type, please read @doc amf0_spec_121207.pdf, @page 5, @section 2.11 Object End Type
type objectEOF struct {
}
func (v *objectEOF) amf0Marker() marker {
return markerObjectEnd
}
func (v *objectEOF) Size() int {
return 3
}
func (v *objectEOF) UnmarshalBinary(data []byte) (err error) {
p := data
if len(p) < 3 {
return oe.Errorf("require 3 bytes only %v", len(p))
}
if p[0] != 0 || p[1] != 0 || p[2] != 9 {
return oe.Errorf("EOF marker %v is illegal", p[0:3])
}
return
}
func (v *objectEOF) MarshalBinary() (data []byte, err error) {
return []byte{0, 0, 9}, nil
}
// Use array for object and ecma array, to keep the original order.
type property struct {
key amf0UTF8
value Amf0
}
// The object-like AMF0 structure, like object and ecma array and strict array.
type objectBase struct {
properties []*property
lock sync.Mutex
}
func (v *objectBase) Size() int {
v.lock.Lock()
defer v.lock.Unlock()
var size int
for _, p := range v.properties {
key, value := p.key, p.value
size += key.Size() + value.Size()
}
return size
}
func (v *objectBase) Get(key string) Amf0 {
v.lock.Lock()
defer v.lock.Unlock()
for _, p := range v.properties {
if string(p.key) == key {
return p.value
}
}
return nil
}
func (v *objectBase) Set(key string, value Amf0) *objectBase {
v.lock.Lock()
defer v.lock.Unlock()
prop := &property{key: amf0UTF8(key), value: value}
var ok bool
for i, p := range v.properties {
if string(p.key) == key {
v.properties[i] = prop
ok = true
}
}
if !ok {
v.properties = append(v.properties, prop)
}
return v
}
func (v *objectBase) unmarshal(p []byte, eof bool, maxElems int) (err error) {
// if no eof, elems specified by maxElems.
if !eof && maxElems < 0 {
return oe.Errorf("maxElems=%v without eof", maxElems)
}
// if eof, maxElems must be -1.
if eof && maxElems != -1 {
return oe.Errorf("maxElems=%v with eof", maxElems)
}
readOne := func() (amf0UTF8, Amf0, error) {
var u amf0UTF8
if err = u.UnmarshalBinary(p); err != nil {
return "", nil, oe.WithMessage(err, "prop name")
}
p = p[u.Size():]
var a Amf0
if a, err = Discovery(p); err != nil {
return "", nil, oe.WithMessage(err, fmt.Sprintf("discover prop %v", string(u)))
}
return u, a, nil
}
pushOne := func(u amf0UTF8, a Amf0) error {
// For object property, consume the whole bytes.
if err = a.UnmarshalBinary(p); err != nil {
return oe.WithMessage(err, fmt.Sprintf("unmarshal prop %v", string(u)))
}
v.Set(string(u), a)
p = p[a.Size():]
return nil
}
for eof {
u, a, err := readOne()
if err != nil {
return oe.WithMessage(err, "read")
}
// For object EOF, we should only consume total 3bytes.
if u.Size() == 2 && a.amf0Marker() == markerObjectEnd {
// 2 bytes is consumed by u(name), the a(eof) should only consume 1 byte.
p = p[1:]
return nil
}
if err := pushOne(u, a); err != nil {
return oe.WithMessage(err, "push")
}
}
for len(v.properties) < maxElems {
u, a, err := readOne()
if err != nil {
return oe.WithMessage(err, "read")
}
if err := pushOne(u, a); err != nil {
return oe.WithMessage(err, "push")
}
}
return
}
func (v *objectBase) marshal(b buffer) (err error) {
v.lock.Lock()
defer v.lock.Unlock()
var pb []byte
for _, p := range v.properties {
key, value := p.key, p.value
if pb, err = key.MarshalBinary(); err != nil {
return oe.WithMessage(err, fmt.Sprintf("marshal %v", string(key)))
}
if _, err = b.Write(pb); err != nil {
return oe.Wrapf(err, "write %v", string(key))
}
if pb, err = value.MarshalBinary(); err != nil {
return oe.WithMessage(err, fmt.Sprintf("marshal value for %v", string(key)))
}
if _, err = b.Write(pb); err != nil {
return oe.Wrapf(err, "marshal value for %v", string(key))
}
}
return
}
// The AMF0 object, please read @doc amf0_spec_121207.pdf, @page 5, @section 2.5 Object Type
type Object struct {
objectBase
eof objectEOF
}
func NewObject() *Object {
v := &Object{}
v.properties = []*property{}
return v
}
func (v *Object) amf0Marker() marker {
return markerObject
}
func (v *Object) Size() int {
return int(1) + v.eof.Size() + v.objectBase.Size()
}
func (v *Object) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 1 {
return oe.Errorf("require 1 byte only %v", len(p))
}
if m := marker(p[0]); m != markerObject {
return oe.Errorf("Object marker %v is illegal", m)
}
p = p[1:]
if err = v.unmarshal(p, true, -1); err != nil {
return oe.WithMessage(err, "unmarshal")
}
return
}
func (v *Object) MarshalBinary() (data []byte, err error) {
b := createBuffer()
if err = b.WriteByte(byte(markerObject)); err != nil {
return nil, oe.Wrap(err, "marshal")
}
if err = v.marshal(b); err != nil {
return nil, oe.WithMessage(err, "marshal")
}
var pb []byte
if pb, err = v.eof.MarshalBinary(); err != nil {
return nil, oe.WithMessage(err, "marshal")
}
if _, err = b.Write(pb); err != nil {
return nil, oe.Wrap(err, "marshal")
}
return b.Bytes(), nil
}
// The AMF0 ecma array, please read @doc amf0_spec_121207.pdf, @page 6, @section 2.10 ECMA Array Type
type EcmaArray struct {
objectBase
count uint32
eof objectEOF
}
func NewEcmaArray() *EcmaArray {
v := &EcmaArray{}
v.properties = []*property{}
return v
}
func (v *EcmaArray) amf0Marker() marker {
return markerEcmaArray
}
func (v *EcmaArray) Size() int {
return int(1) + 4 + v.eof.Size() + v.objectBase.Size()
}
func (v *EcmaArray) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 5 {
return oe.Errorf("require 5 bytes only %v", len(p))
}
if m := marker(p[0]); m != markerEcmaArray {
return oe.Errorf("EcmaArray marker %v is illegal", m)
}
v.count = binary.BigEndian.Uint32(p[1:])
p = p[5:]
if err = v.unmarshal(p, true, -1); err != nil {
return oe.WithMessage(err, "unmarshal")
}
return
}
func (v *EcmaArray) MarshalBinary() (data []byte, err error) {
b := createBuffer()
if err = b.WriteByte(byte(markerEcmaArray)); err != nil {
return nil, oe.Wrap(err, "marshal")
}
if err = binary.Write(b, binary.BigEndian, v.count); err != nil {
return nil, oe.Wrap(err, "marshal")
}
if err = v.marshal(b); err != nil {
return nil, oe.WithMessage(err, "marshal")
}
var pb []byte
if pb, err = v.eof.MarshalBinary(); err != nil {
return nil, oe.WithMessage(err, "marshal")
}
if _, err = b.Write(pb); err != nil {
return nil, oe.Wrap(err, "marshal")
}
return b.Bytes(), nil
}
// The AMF0 strict array, please read @doc amf0_spec_121207.pdf, @page 7, @section 2.12 Strict Array Type
type StrictArray struct {
objectBase
count uint32
}
func NewStrictArray() *StrictArray {
v := &StrictArray{}
v.properties = []*property{}
return v
}
func (v *StrictArray) amf0Marker() marker {
return markerStrictArray
}
func (v *StrictArray) Size() int {
return int(1) + 4 + v.objectBase.Size()
}
func (v *StrictArray) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 5 {
return oe.Errorf("require 5 bytes only %v", len(p))
}
if m := marker(p[0]); m != markerStrictArray {
return oe.Errorf("StrictArray marker %v is illegal", m)
}
v.count = binary.BigEndian.Uint32(p[1:])
p = p[5:]
if int(v.count) <= 0 {
return
}
if err = v.unmarshal(p, false, int(v.count)); err != nil {
return oe.WithMessage(err, "unmarshal")
}
return
}
func (v *StrictArray) MarshalBinary() (data []byte, err error) {
b := createBuffer()
if err = b.WriteByte(byte(markerStrictArray)); err != nil {
return nil, oe.Wrap(err, "marshal")
}
if err = binary.Write(b, binary.BigEndian, v.count); err != nil {
return nil, oe.Wrap(err, "marshal")
}
if err = v.marshal(b); err != nil {
return nil, oe.WithMessage(err, "marshal")
}
return b.Bytes(), nil
}
// The single marker object, for all AMF0 which only has the marker, like null and undefined.
type singleMarkerObject struct {
target marker
}
func newSingleMarkerObject(m marker) singleMarkerObject {
return singleMarkerObject{target: m}
}
func (v *singleMarkerObject) amf0Marker() marker {
return v.target
}
func (v *singleMarkerObject) Size() int {
return int(1)
}
func (v *singleMarkerObject) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 1 {
return oe.Errorf("require 1 byte only %v", len(p))
}
if m := marker(p[0]); m != v.target {
return oe.Errorf("%v marker %v is illegal", v.target, m)
}
return
}
func (v *singleMarkerObject) MarshalBinary() (data []byte, err error) {
return []byte{byte(v.target)}, nil
}
// The AMF0 null, please read @doc amf0_spec_121207.pdf, @page 6, @section 2.7 null Type
type null struct {
singleMarkerObject
}
func NewNull() *null {
v := null{}
v.singleMarkerObject = newSingleMarkerObject(markerNull)
return &v
}
// The AMF0 undefined, please read @doc amf0_spec_121207.pdf, @page 6, @section 2.8 undefined Type
type undefined struct {
singleMarkerObject
}
func NewUndefined() Amf0 {
v := undefined{}
v.singleMarkerObject = newSingleMarkerObject(markerUndefined)
return &v
}
// The AMF0 boolean, please read @doc amf0_spec_121207.pdf, @page 5, @section 2.3 Boolean Type
type Boolean bool
func NewBoolean(b bool) Amf0 {
v := Boolean(b)
return &v
}
func (v *Boolean) amf0Marker() marker {
return markerBoolean
}
func (v *Boolean) Size() int {
return int(2)
}
func (v *Boolean) UnmarshalBinary(data []byte) (err error) {
var p []byte
if p = data; len(p) < 2 {
return oe.Errorf("require 2 bytes only %v", len(p))
}
if m := marker(p[0]); m != markerBoolean {
return oe.Errorf("BOOL marker %v is illegal", m)
}
if p[1] == 0 {
*v = false
} else {
*v = true
}
return
}
func (v *Boolean) MarshalBinary() (data []byte, err error) {
var b byte
if *v {
b = 1
}
return []byte{byte(markerBoolean), b}, nil
}

File diff suppressed because it is too large Load Diff

@ -3,9 +3,11 @@ github.com/google/uuid
# github.com/ossrs/go-oryx-lib v0.0.8
## explicit
github.com/ossrs/go-oryx-lib/aac
github.com/ossrs/go-oryx-lib/amf0
github.com/ossrs/go-oryx-lib/errors
github.com/ossrs/go-oryx-lib/flv
github.com/ossrs/go-oryx-lib/logger
github.com/ossrs/go-oryx-lib/rtmp
# github.com/pion/datachannel v1.4.21
github.com/pion/datachannel
# github.com/pion/dtls/v2 v2.0.8

@ -8,6 +8,7 @@ The changelog for SRS.
## SRS 4.0 Changelog
* v4.0, 2021-10-10, For [#1641](https://github.com/ossrs/srs/issues/1641), Support RTMP publish and play regression test. v4.0.173
* v4.0, 2021-10-10, RTC: Change rtc.aac to discard by default. v4.0.172
* v4.0, 2021-10-10, Fix [#2304](https://github.com/ossrs/srs/issues/2304) Remove Push RTSP feature. v4.0.171
* v4.0, 2021-10-10, Fix [#2653](https://github.com/ossrs/srs/issues/2653) Remove HTTP RAW API. v4.0.170

@ -863,6 +863,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
SrsAutoFree(SrsPithyPrint, pprint);
// start isolate recv thread.
// TODO: FIXME: Pass the callback here.
if ((err = rtrd->start()) != srs_success) {
return srs_error_wrap(err, "rtmp: receive thread");
}

@ -9,6 +9,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 172
#define VERSION_REVISION 173
#endif

Loading…
Cancel
Save