mirror of https://github.com/ossrs/srs.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1793 lines
51 KiB
Go
1793 lines
51 KiB
Go
// Copyright (c) 2025 Winlin
|
|
//
|
|
// SPDX-License-Identifier: MIT
|
|
package rtmp
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"encoding"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"sync"
|
|
|
|
"srs-proxy/errors"
|
|
)
|
|
|
|
// The handshake implements the RTMP handshake protocol.
|
|
type Handshake struct {
|
|
// The random number generator.
|
|
r *rand.Rand
|
|
// The c1s1 cache.
|
|
c1s1 []byte
|
|
}
|
|
|
|
func NewHandshake(r *rand.Rand) *Handshake {
|
|
return &Handshake{r: r}
|
|
}
|
|
|
|
func (v *Handshake) C1S1() []byte {
|
|
return v.c1s1
|
|
}
|
|
|
|
func (v *Handshake) WriteC0S0(w io.Writer) (err error) {
|
|
r := bytes.NewReader([]byte{0x03})
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
return errors.Wrap(err, "write c0s0")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Handshake) ReadC0S0(r io.Reader) (c0 []byte, err error) {
|
|
b := &bytes.Buffer{}
|
|
if _, err = io.CopyN(b, r, 1); err != nil {
|
|
return nil, errors.Wrap(err, "read c0s0")
|
|
}
|
|
|
|
c0 = b.Bytes()
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Handshake) WriteC1S1(w io.Writer) (err error) {
|
|
p := make([]byte, 1536)
|
|
|
|
for i := 8; i < len(p); i++ {
|
|
p[i] = byte(v.r.Int())
|
|
}
|
|
|
|
r := bytes.NewReader(p)
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
return errors.Wrap(err, "write c0s1")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Handshake) ReadC1S1(r io.Reader) (c1s1 []byte, err error) {
|
|
b := &bytes.Buffer{}
|
|
if _, err = io.CopyN(b, r, 1536); err != nil {
|
|
return nil, errors.Wrap(err, "read c1s1")
|
|
}
|
|
|
|
c1s1 = b.Bytes()
|
|
v.c1s1 = c1s1
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Handshake) WriteC2S2(w io.Writer, s1c1 []byte) (err error) {
|
|
r := bytes.NewReader(s1c1[:])
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
return errors.Wrap(err, "write c2s2")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Handshake) ReadC2S2(r io.Reader) (c2 []byte, err error) {
|
|
b := &bytes.Buffer{}
|
|
if _, err = io.CopyN(b, r, 1536); err != nil {
|
|
return nil, errors.Wrap(err, "read c2s2")
|
|
}
|
|
|
|
c2 = b.Bytes()
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 16, @section 6.1. Chunk Format
|
|
// Extended timestamp: 0 or 4 bytes
|
|
// This field MUST be sent when the normal timsestamp is set to
|
|
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
|
|
// anything else. So for values less than 0xffffff the normal
|
|
// timestamp field SHOULD be used in which case the extended timestamp
|
|
// MUST NOT be present. For values greater than or equal to 0xffffff
|
|
// the normal timestamp field MUST NOT be used and MUST be set to
|
|
// 0xffffff and the extended timestamp MUST be sent.
|
|
const extendedTimestamp = uint64(0xffffff)
|
|
|
|
// The default chunk size of RTMP is 128 bytes.
|
|
const defaultChunkSize = 128
|
|
|
|
// The intput or output settings for RTMP protocol.
|
|
type settings struct {
|
|
chunkSize uint32
|
|
}
|
|
|
|
func newSettings() *settings {
|
|
return &settings{
|
|
chunkSize: defaultChunkSize,
|
|
}
|
|
}
|
|
|
|
// The chunk stream which transport a message once.
|
|
type chunkStream struct {
|
|
format formatType
|
|
cid chunkID
|
|
header messageHeader
|
|
message *Message
|
|
count uint64
|
|
extendedTimestamp bool
|
|
}
|
|
|
|
func newChunkStream() *chunkStream {
|
|
return &chunkStream{}
|
|
}
|
|
|
|
// The protocol implements the RTMP command and chunk stack.
|
|
type Protocol struct {
|
|
r *bufio.Reader
|
|
w *bufio.Writer
|
|
input struct {
|
|
opt *settings
|
|
chunks map[chunkID]*chunkStream
|
|
|
|
transactions map[amf0Number]amf0String
|
|
ltransactions sync.Mutex
|
|
}
|
|
output struct {
|
|
opt *settings
|
|
}
|
|
}
|
|
|
|
func NewProtocol(rw io.ReadWriter) *Protocol {
|
|
v := &Protocol{
|
|
r: bufio.NewReader(rw),
|
|
w: bufio.NewWriter(rw),
|
|
}
|
|
|
|
v.input.opt = newSettings()
|
|
v.input.chunks = map[chunkID]*chunkStream{}
|
|
v.input.transactions = map[amf0Number]amf0String{}
|
|
|
|
v.output.opt = newSettings()
|
|
|
|
return v
|
|
}
|
|
|
|
func ExpectPacket[T Packet](ctx context.Context, v *Protocol, ppkt *T) (m *Message, err error) {
|
|
for {
|
|
if m, err = v.ReadMessage(ctx); err != nil {
|
|
return nil, errors.WithMessage(err, "read message")
|
|
}
|
|
|
|
var pkt Packet
|
|
if pkt, err = v.DecodeMessage(m); err != nil {
|
|
return nil, errors.WithMessage(err, "decode message")
|
|
}
|
|
|
|
if p, ok := pkt.(T); ok {
|
|
*ppkt = p
|
|
break
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Deprecated: Please use rtmp.ExpectPacket instead.
|
|
func (v *Protocol) ExpectPacket(ctx context.Context, ppkt any) (m *Message, err error) {
|
|
panic("Please use rtmp.ExpectPacket instead")
|
|
}
|
|
|
|
func (v *Protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m *Message, err error) {
|
|
for {
|
|
if m, err = v.ReadMessage(ctx); err != nil {
|
|
return nil, errors.WithMessage(err, "read message")
|
|
}
|
|
|
|
if len(types) == 0 {
|
|
return
|
|
}
|
|
|
|
for _, t := range types {
|
|
if m.MessageType == t {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) parseAMFObject(p []byte) (pkt Packet, err error) {
|
|
var commandName amf0String
|
|
if err = commandName.UnmarshalBinary(p); err != nil {
|
|
return nil, errors.WithMessage(err, "unmarshal command name")
|
|
}
|
|
|
|
switch commandName {
|
|
case commandResult, commandError:
|
|
var transactionID amf0Number
|
|
if err = transactionID.UnmarshalBinary(p[commandName.Size():]); err != nil {
|
|
return nil, errors.WithMessage(err, "unmarshal tid")
|
|
}
|
|
|
|
var requestName amf0String
|
|
if err = func() error {
|
|
v.input.ltransactions.Lock()
|
|
defer v.input.ltransactions.Unlock()
|
|
|
|
var ok bool
|
|
if requestName, ok = v.input.transactions[transactionID]; !ok {
|
|
return errors.Errorf("No matched request for tid=%v", transactionID)
|
|
}
|
|
delete(v.input.transactions, transactionID)
|
|
|
|
return nil
|
|
}(); err != nil {
|
|
return nil, errors.WithMessage(err, "discovery request name")
|
|
}
|
|
|
|
switch requestName {
|
|
case commandConnect:
|
|
return NewConnectAppResPacket(transactionID), nil
|
|
case commandCreateStream:
|
|
return NewCreateStreamResPacket(transactionID), nil
|
|
case commandReleaseStream, commandFCPublish, commandFCUnpublish:
|
|
call := NewCallPacket()
|
|
call.TransactionID = transactionID
|
|
return call, nil
|
|
default:
|
|
return nil, errors.Errorf("No request for %v", string(requestName))
|
|
}
|
|
case commandConnect:
|
|
return NewConnectAppPacket(), nil
|
|
case commandPublish:
|
|
return NewPublishPacket(), nil
|
|
case commandPlay:
|
|
return NewPlayPacket(), nil
|
|
default:
|
|
return NewCallPacket(), nil
|
|
}
|
|
}
|
|
|
|
func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error) {
|
|
p := m.Payload[:]
|
|
if len(p) == 0 {
|
|
return nil, errors.New("Empty packet")
|
|
}
|
|
|
|
switch m.MessageType {
|
|
case MessageTypeAMF3Command, MessageTypeAMF3Data:
|
|
p = p[1:]
|
|
}
|
|
|
|
switch m.MessageType {
|
|
case MessageTypeSetChunkSize:
|
|
pkt = NewSetChunkSize()
|
|
case MessageTypeWindowAcknowledgementSize:
|
|
pkt = NewWindowAcknowledgementSize()
|
|
case MessageTypeSetPeerBandwidth:
|
|
pkt = NewSetPeerBandwidth()
|
|
case MessageTypeAMF0Command, MessageTypeAMF3Command, MessageTypeAMF0Data, MessageTypeAMF3Data:
|
|
if pkt, err = v.parseAMFObject(p); err != nil {
|
|
return nil, errors.WithMessage(err, fmt.Sprintf("Parse AMF %v", m.MessageType))
|
|
}
|
|
case MessageTypeUserControl:
|
|
pkt = NewUserControl()
|
|
default:
|
|
return nil, errors.Errorf("Unknown message %v", m.MessageType)
|
|
}
|
|
|
|
if err = pkt.UnmarshalBinary(p); err != nil {
|
|
return nil, errors.WithMessage(err, fmt.Sprintf("Unmarshal %v", m.MessageType))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) ReadMessage(ctx context.Context) (m *Message, err error) {
|
|
for m == nil {
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
var cid chunkID
|
|
var format formatType
|
|
if format, cid, err = v.readBasicHeader(ctx); err != nil {
|
|
return nil, errors.WithMessage(err, "read basic header")
|
|
}
|
|
|
|
var ok bool
|
|
var chunk *chunkStream
|
|
if chunk, ok = v.input.chunks[cid]; !ok {
|
|
chunk = newChunkStream()
|
|
v.input.chunks[cid] = chunk
|
|
chunk.header.betterCid = cid
|
|
}
|
|
|
|
if err = v.readMessageHeader(ctx, chunk, format); err != nil {
|
|
return nil, errors.WithMessage(err, "read message header")
|
|
}
|
|
|
|
if m, err = v.readMessagePayload(ctx, chunk); err != nil {
|
|
return nil, errors.WithMessage(err, "read message payload")
|
|
}
|
|
|
|
if err = v.onMessageArrivated(m); err != nil {
|
|
return nil, errors.WithMessage(err, "on message")
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) readMessagePayload(ctx context.Context, chunk *chunkStream) (m *Message, err error) {
|
|
// Empty payload message.
|
|
if chunk.message.payloadLength == 0 {
|
|
m = chunk.message
|
|
chunk.message = nil
|
|
return
|
|
}
|
|
|
|
// Calculate the chunk payload size.
|
|
chunkedPayloadSize := int(chunk.message.payloadLength) - len(chunk.message.Payload)
|
|
if chunkedPayloadSize > int(v.input.opt.chunkSize) {
|
|
chunkedPayloadSize = int(v.input.opt.chunkSize)
|
|
}
|
|
|
|
b := make([]byte, chunkedPayloadSize)
|
|
if _, err = io.ReadFull(v.r, b); err != nil {
|
|
return nil, errors.Wrapf(err, "read chunk %vB", chunkedPayloadSize)
|
|
}
|
|
chunk.message.Payload = append(chunk.message.Payload, b...)
|
|
|
|
// Got entire RTMP message?
|
|
if int(chunk.message.payloadLength) == len(chunk.message.Payload) {
|
|
m = chunk.message
|
|
chunk.message = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 18, @section 6.1.2. Chunk Message Header
|
|
// There are four different formats for the chunk message header,
|
|
// selected by the "fmt" field in the chunk basic header.
|
|
type formatType uint8
|
|
|
|
const (
|
|
// 6.1.2.1. Type 0
|
|
// Chunks of Type 0 are 11 bytes long. This type MUST be used at the
|
|
// start of a chunk stream, and whenever the stream timestamp goes
|
|
// backward (e.g., because of a backward seek).
|
|
formatType0 formatType = iota
|
|
// 6.1.2.2. Type 1
|
|
// Chunks of Type 1 are 7 bytes long. The message stream ID is not
|
|
// included; this chunk takes the same stream ID as the preceding chunk.
|
|
// Streams with variable-sized messages (for example, many video
|
|
// formats) SHOULD use this format for the first chunk of each new
|
|
// message after the first.
|
|
formatType1
|
|
// 6.1.2.3. Type 2
|
|
// Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the
|
|
// message length is included; this chunk has the same stream ID and
|
|
// message length as the preceding chunk. Streams with constant-sized
|
|
// messages (for example, some audio and data formats) SHOULD use this
|
|
// format for the first chunk of each message after the first.
|
|
formatType2
|
|
// 6.1.2.4. Type 3
|
|
// Chunks of Type 3 have no header. Stream ID, message length and
|
|
// timestamp delta are not present; chunks of this type take values from
|
|
// the preceding chunk. When a single message is split into chunks, all
|
|
// chunks of a message except the first one, SHOULD use this type. Refer
|
|
// to example 2 in section 6.2.2. Stream consisting of messages of
|
|
// exactly the same size, stream ID and spacing in time SHOULD use this
|
|
// type for all chunks after chunk of Type 2. Refer to example 1 in
|
|
// section 6.2.1. If the delta between the first message and the second
|
|
// message is same as the time stamp of first message, then chunk of
|
|
// type 3 would immediately follow the chunk of type 0 as there is no
|
|
// need for a chunk of type 2 to register the delta. If Type 3 chunk
|
|
// follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is
|
|
// the same as the timestamp of Type 0 chunk.
|
|
formatType3
|
|
)
|
|
|
|
// The message header size, index is format.
|
|
var messageHeaderSizes = []int{11, 7, 3, 0}
|
|
|
|
// Parse the chunk message header.
|
|
// 3bytes: timestamp delta, fmt=0,1,2
|
|
// 3bytes: payload length, fmt=0,1
|
|
// 1bytes: message type, fmt=0,1
|
|
// 4bytes: stream id, fmt=0
|
|
// where:
|
|
// fmt=0, 0x0X
|
|
// fmt=1, 0x4X
|
|
// fmt=2, 0x8X
|
|
// fmt=3, 0xCX
|
|
func (v *Protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, format formatType) (err error) {
|
|
// We should not assert anything about fmt, for the first packet.
|
|
// (when first packet, the chunk.message is nil).
|
|
// the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
|
|
// the previous packet is:
|
|
// 04 // fmt=0, cid=4
|
|
// 00 00 1a // timestamp=26
|
|
// 00 00 9d // payload_length=157
|
|
// 08 // message_type=8(audio)
|
|
// 01 00 00 00 // stream_id=1
|
|
// the current packet maybe:
|
|
// c4 // fmt=3, cid=4
|
|
// it's ok, for the packet is audio, and timestamp delta is 26.
|
|
// the current packet must be parsed as:
|
|
// fmt=0, cid=4
|
|
// timestamp=26+26=52
|
|
// payload_length=157
|
|
// message_type=8(audio)
|
|
// stream_id=1
|
|
// so we must update the timestamp even fmt=3 for first packet.
|
|
//
|
|
// The fresh packet used to update the timestamp even fmt=3 for first packet.
|
|
// fresh packet always means the chunk is the first one of message.
|
|
var isFirstChunkOfMsg bool
|
|
if chunk.message == nil {
|
|
isFirstChunkOfMsg = true
|
|
}
|
|
|
|
// But, we can ensure that when a chunk stream is fresh,
|
|
// the fmt must be 0, a new stream.
|
|
if chunk.count == 0 && format != formatType0 {
|
|
// For librtmp, if ping, it will send a fresh stream with fmt=1,
|
|
// 0x42 where: fmt=1, cid=2, protocol contorl user-control message
|
|
// 0x00 0x00 0x00 where: timestamp=0
|
|
// 0x00 0x00 0x06 where: payload_length=6
|
|
// 0x04 where: message_type=4(protocol control user-control message)
|
|
// 0x00 0x06 where: event Ping(0x06)
|
|
// 0x00 0x00 0x0d 0x0f where: event data 4bytes ping timestamp.
|
|
// @see: https://github.com/ossrs/srs/issues/98
|
|
if chunk.cid == chunkIDProtocolControl && format == formatType1 {
|
|
// We accept cid=2, fmt=1 to make librtmp happy.
|
|
} else {
|
|
return errors.Errorf("For fresh chunk, fmt %v != %v(required), cid is %v", format, formatType0, chunk.cid)
|
|
}
|
|
}
|
|
|
|
// When exists cache msg, means got an partial message,
|
|
// the fmt must not be type0 which means new message.
|
|
if chunk.message != nil && format == formatType0 {
|
|
return errors.Errorf("For exists chunk, fmt is %v, cid is %v", format, chunk.cid)
|
|
}
|
|
|
|
// Create msg when new chunk stream start
|
|
if chunk.message == nil {
|
|
chunk.message = NewMessage()
|
|
}
|
|
|
|
// Read the message header.
|
|
p := make([]byte, messageHeaderSizes[format])
|
|
if _, err = io.ReadFull(v.r, p); err != nil {
|
|
return errors.Wrapf(err, "read %vB message header", len(p))
|
|
}
|
|
|
|
// Prse the message header.
|
|
// 3bytes: timestamp delta, fmt=0,1,2
|
|
// 3bytes: payload length, fmt=0,1
|
|
// 1bytes: message type, fmt=0,1
|
|
// 4bytes: stream id, fmt=0
|
|
// where:
|
|
// fmt=0, 0x0X
|
|
// fmt=1, 0x4X
|
|
// fmt=2, 0x8X
|
|
// fmt=3, 0xCX
|
|
if format <= formatType2 {
|
|
chunk.header.timestampDelta = uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
|
|
p = p[3:]
|
|
|
|
// fmt: 0
|
|
// timestamp: 3 bytes
|
|
// If the timestamp is greater than or equal to 16777215
|
|
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
|
|
// 'extended timestamp header' MUST be present. Otherwise, this value
|
|
// SHOULD be the entire timestamp.
|
|
//
|
|
// fmt: 1 or 2
|
|
// timestamp delta: 3 bytes
|
|
// If the delta is greater than or equal to 16777215 (hexadecimal
|
|
// 0x00ffffff), this value MUST be 16777215, and the 'extended
|
|
// timestamp header' MUST be present. Otherwise, this value SHOULD be
|
|
// the entire delta.
|
|
chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
|
|
if !chunk.extendedTimestamp {
|
|
// Extended timestamp: 0 or 4 bytes
|
|
// This field MUST be sent when the normal timsestamp is set to
|
|
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
|
|
// anything else. So for values less than 0xffffff the normal
|
|
// timestamp field SHOULD be used in which case the extended timestamp
|
|
// MUST NOT be present. For values greater than or equal to 0xffffff
|
|
// the normal timestamp field MUST NOT be used and MUST be set to
|
|
// 0xffffff and the extended timestamp MUST be sent.
|
|
if format == formatType0 {
|
|
// 6.1.2.1. Type 0
|
|
// For a type-0 chunk, the absolute timestamp of the message is sent
|
|
// here.
|
|
chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
|
|
} else {
|
|
// 6.1.2.2. Type 1
|
|
// 6.1.2.3. Type 2
|
|
// For a type-1 or type-2 chunk, the difference between the previous
|
|
// chunk's timestamp and the current chunk's timestamp is sent here.
|
|
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
|
}
|
|
}
|
|
|
|
if format <= formatType1 {
|
|
payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
|
|
p = p[3:]
|
|
|
|
// For a message, if msg exists in cache, the size must not changed.
|
|
// always use the actual msg size to compare, for the cache payload length can changed,
|
|
// for the fmt type1(stream_id not changed), user can change the payload
|
|
// length(it's not allowed in the continue chunks).
|
|
if !isFirstChunkOfMsg && chunk.header.payloadLength != payloadLength {
|
|
return errors.Errorf("Chunk message size %v != %v(required)", payloadLength, chunk.header.payloadLength)
|
|
}
|
|
chunk.header.payloadLength = payloadLength
|
|
|
|
chunk.header.MessageType = MessageType(p[0])
|
|
p = p[1:]
|
|
|
|
if format == formatType0 {
|
|
chunk.header.streamID = uint32(p[0]) | uint32(p[1])<<8 | uint32(p[2])<<16 | uint32(p[3])<<24
|
|
p = p[4:]
|
|
}
|
|
}
|
|
} else {
|
|
// Update the timestamp even fmt=3 for first chunk packet
|
|
if isFirstChunkOfMsg && !chunk.extendedTimestamp {
|
|
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
|
}
|
|
}
|
|
|
|
// Read extended-timestamp
|
|
if chunk.extendedTimestamp {
|
|
var timestamp uint32
|
|
if err = binary.Read(v.r, binary.BigEndian, ×tamp); err != nil {
|
|
return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
|
|
}
|
|
|
|
// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
|
|
// @see https://github.com/ossrs/srs/issues/111
|
|
timestamp &= 0x7fffffff
|
|
|
|
// TODO: FIXME: Support detect the extended timestamp.
|
|
// @see http://blog.csdn.net/win_lin/article/details/13363699
|
|
chunk.header.Timestamp = uint64(timestamp)
|
|
}
|
|
|
|
// The extended-timestamp must be unsigned-int,
|
|
// 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
|
|
// 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
|
|
// because the rtmp protocol says the 32bits timestamp is about "50 days":
|
|
// 3. Byte Order, Alignment, and Time Format
|
|
// Because timestamps are generally only 32 bits long, they will roll
|
|
// over after fewer than 50 days.
|
|
//
|
|
// but, its sample says the timestamp is 31bits:
|
|
// An application could assume, for example, that all
|
|
// adjacent timestamps are within 2^31 milliseconds of each other, so
|
|
// 10000 comes after 4000000000, while 3000000000 comes before
|
|
// 4000000000.
|
|
// and flv specification says timestamp is 31bits:
|
|
// Extension of the Timestamp field to form a SI32 value. This
|
|
// field represents the upper 8 bits, while the previous
|
|
// Timestamp field represents the lower 24 bits of the time in
|
|
// milliseconds.
|
|
// in a word, 31bits timestamp is ok.
|
|
// convert extended timestamp to 31bits.
|
|
chunk.header.Timestamp &= 0x7fffffff
|
|
|
|
// Copy header to msg
|
|
chunk.message.messageHeader = chunk.header
|
|
|
|
// Increase the msg count, the chunk stream can accept fmt=1/2/3 message now.
|
|
chunk.count++
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header
|
|
// The Chunk Basic Header encodes the chunk stream ID and the chunk
|
|
// type(represented by fmt field in the figure below). Chunk type
|
|
// determines the format of the encoded message header. Chunk Basic
|
|
// Header field may be 1, 2, or 3 bytes, depending on the chunk stream
|
|
// ID.
|
|
//
|
|
// The bits 0-5 (least significant) in the chunk basic header represent
|
|
// the chunk stream ID.
|
|
//
|
|
// Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
|
|
// field.
|
|
// 0 1 2 3 4 5 6 7
|
|
// +-+-+-+-+-+-+-+-+
|
|
// |fmt| cs id |
|
|
// +-+-+-+-+-+-+-+-+
|
|
// Figure 6 Chunk basic header 1
|
|
//
|
|
// Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
|
|
// field. ID is computed as (the second byte + 64).
|
|
// 0 1
|
|
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
// |fmt| 0 | cs id - 64 |
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
// Figure 7 Chunk basic header 2
|
|
//
|
|
// Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
|
|
// this field. ID is computed as ((the third byte)*256 + the second byte
|
|
// + 64).
|
|
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
// |fmt| 1 | cs id - 64 |
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
// Figure 8 Chunk basic header 3
|
|
//
|
|
// cs id: 6 bits
|
|
// fmt: 2 bits
|
|
// cs id - 64: 8 or 16 bits
|
|
//
|
|
// Chunk stream IDs with values 64-319 could be represented by both 2-
|
|
// byte version and 3-byte version of this field.
|
|
func (v *Protocol) readBasicHeader(ctx context.Context) (format formatType, cid chunkID, err error) {
|
|
// 2-63, 1B chunk header
|
|
var t uint8
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
return format, cid, errors.Wrap(err, "read basic header")
|
|
}
|
|
cid = chunkID(t & 0x3f)
|
|
format = formatType((t >> 6) & 0x03)
|
|
|
|
if cid > 1 {
|
|
return
|
|
}
|
|
|
|
// 64-319, 2B chunk header
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
|
|
}
|
|
cid = chunkID(64 + uint32(t))
|
|
|
|
// 64-65599, 3B chunk header
|
|
if cid == 1 {
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
|
|
}
|
|
cid += chunkID(uint32(t) * 256)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) WritePacket(ctx context.Context, pkt Packet, streamID int) (err error) {
|
|
m := NewMessage()
|
|
|
|
if m.Payload, err = pkt.MarshalBinary(); err != nil {
|
|
return errors.WithMessage(err, "marshal payload")
|
|
}
|
|
|
|
m.MessageType = pkt.Type()
|
|
m.streamID = uint32(streamID)
|
|
m.betterCid = pkt.BetterCid()
|
|
|
|
if err = v.WriteMessage(ctx, m); err != nil {
|
|
return errors.WithMessage(err, "write message")
|
|
}
|
|
|
|
if err = v.onPacketWriten(m, pkt); err != nil {
|
|
return errors.WithMessage(err, "on write packet")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) onPacketWriten(m *Message, pkt Packet) (err error) {
|
|
var tid amf0Number
|
|
var name amf0String
|
|
|
|
switch pkt := pkt.(type) {
|
|
case *ConnectAppPacket:
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
case *CreateStreamPacket:
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
case *CallPacket:
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
}
|
|
|
|
if tid > 0 && len(name) > 0 {
|
|
v.input.ltransactions.Lock()
|
|
defer v.input.ltransactions.Unlock()
|
|
|
|
v.input.transactions[tid] = name
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) onMessageArrivated(m *Message) (err error) {
|
|
if m == nil {
|
|
return
|
|
}
|
|
|
|
var pkt Packet
|
|
switch m.MessageType {
|
|
case MessageTypeSetChunkSize, MessageTypeUserControl, MessageTypeWindowAcknowledgementSize:
|
|
if pkt, err = v.DecodeMessage(m); err != nil {
|
|
return errors.Errorf("decode message %v", m.MessageType)
|
|
}
|
|
}
|
|
|
|
switch pkt := pkt.(type) {
|
|
case *SetChunkSize:
|
|
v.input.opt.chunkSize = pkt.ChunkSize
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *Protocol) WriteMessage(ctx context.Context, m *Message) (err error) {
|
|
m.payloadLength = uint32(len(m.Payload))
|
|
|
|
var c0h, c3h []byte
|
|
if c0h, err = m.generateC0Header(); err != nil {
|
|
return errors.WithMessage(err, "generate c0 header")
|
|
}
|
|
if c3h, err = m.generateC3Header(); err != nil {
|
|
return errors.WithMessage(err, "generate c3 header")
|
|
}
|
|
|
|
var h []byte
|
|
p := m.Payload
|
|
for len(p) > 0 {
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
if h == nil {
|
|
h = c0h
|
|
} else {
|
|
h = c3h
|
|
}
|
|
|
|
if _, err = io.Copy(v.w, bytes.NewReader(h)); err != nil {
|
|
return errors.Wrapf(err, "write c0c3 header %x", h)
|
|
}
|
|
|
|
size := len(p)
|
|
if size > int(v.output.opt.chunkSize) {
|
|
size = int(v.output.opt.chunkSize)
|
|
}
|
|
|
|
if _, err = io.Copy(v.w, bytes.NewReader(p[:size])); err != nil {
|
|
return errors.Wrapf(err, "write chunk payload %vB", size)
|
|
}
|
|
p = p[size:]
|
|
}
|
|
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
// TODO: FIXME: Use writev to write for high performance.
|
|
if err = v.w.Flush(); err != nil {
|
|
return errors.Wrapf(err, "flush writer")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header
|
|
// 1byte. One byte field to represent the message type. A range of type IDs
|
|
// (1-7) are reserved for protocol control messages.
|
|
type MessageType uint8
|
|
|
|
const (
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 5. Protocol Control Messages
|
|
// RTMP reserves message type IDs 1-7 for protocol control messages.
|
|
// These messages contain information needed by the RTM Chunk Stream
|
|
// protocol or RTMP itself. Protocol messages with IDs 1 & 2 are
|
|
// reserved for usage with RTM Chunk Stream protocol. Protocol messages
|
|
// with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
|
|
// 7 is used between edge server and origin server.
|
|
MessageTypeSetChunkSize MessageType = 0x01
|
|
MessageTypeAbort MessageType = 0x02 // 0x02
|
|
MessageTypeAcknowledgement MessageType = 0x03 // 0x03
|
|
MessageTypeUserControl MessageType = 0x04 // 0x04
|
|
MessageTypeWindowAcknowledgementSize MessageType = 0x05 // 0x05
|
|
MessageTypeSetPeerBandwidth MessageType = 0x06 // 0x06
|
|
MessageTypeEdgeAndOriginServerCommand MessageType = 0x07 // 0x07
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3. Types of messages
|
|
// The server and the client send messages over the network to
|
|
// communicate with each other. The messages can be of any type which
|
|
// includes audio messages, video messages, command messages, shared
|
|
// object messages, data messages, and user control messages.
|
|
//
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.4. Audio message
|
|
// The client or the server sends this message to send audio data to the
|
|
// peer. The message type value of 8 is reserved for audio messages.
|
|
MessageTypeAudio MessageType = 0x08
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.5. Video message
|
|
// The client or the server sends this message to send video data to the
|
|
// peer. The message type value of 9 is reserved for video messages.
|
|
// These messages are large and can delay the sending of other type of
|
|
// messages. To avoid such a situation, the video message is assigned
|
|
// the lowest priority.
|
|
MessageTypeVideo MessageType = 0x09 // 0x09
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.1. Command message
|
|
// Command messages carry the AMF-encoded commands between the client
|
|
// and the server. These messages have been assigned message type value
|
|
// of 20 for AMF0 encoding and message type value of 17 for AMF3
|
|
// encoding. These messages are sent to perform some operations like
|
|
// connect, createStream, publish, play, pause on the peer. Command
|
|
// messages like onstatus, result etc. are used to inform the sender
|
|
// about the status of the requested commands. A command message
|
|
// consists of command name, transaction ID, and command object that
|
|
// contains related parameters. A client or a server can request Remote
|
|
// Procedure Calls (RPC) over streams that are communicated using the
|
|
// command messages to the peer.
|
|
MessageTypeAMF3Command MessageType = 17 // 0x11
|
|
MessageTypeAMF0Command MessageType = 20 // 0x14
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.2. Data message
|
|
// The client or the server sends this message to send Metadata or any
|
|
// user data to the peer. Metadata includes details about the
|
|
// data(audio, video etc.) like creation time, duration, theme and so
|
|
// on. These messages have been assigned message type value of 18 for
|
|
// AMF0 and message type value of 15 for AMF3.
|
|
MessageTypeAMF0Data MessageType = 18 // 0x12
|
|
MessageTypeAMF3Data MessageType = 15 // 0x0f
|
|
)
|
|
|
|
// The header of message.
|
|
type messageHeader struct {
|
|
// 3bytes.
|
|
// Three-byte field that contains a timestamp delta of the message.
|
|
// @remark, only used for decoding message from chunk stream.
|
|
timestampDelta uint32
|
|
// 3bytes.
|
|
// Three-byte field that represents the size of the payload in bytes.
|
|
// It is set in big-endian format.
|
|
payloadLength uint32
|
|
// 1byte.
|
|
// One byte field to represent the message type. A range of type IDs
|
|
// (1-7) are reserved for protocol control messages.
|
|
MessageType MessageType
|
|
// 4bytes.
|
|
// Four-byte field that identifies the stream of the message. These
|
|
// bytes are set in little-endian format.
|
|
streamID uint32
|
|
|
|
// The chunk stream id over which transport.
|
|
betterCid chunkID
|
|
|
|
// Four-byte field that contains a timestamp of the message.
|
|
// The 4 bytes are packed in the big-endian order.
|
|
// @remark, we use 64bits for large time for jitter detect and for large tbn like HLS.
|
|
Timestamp uint64
|
|
}
|
|
|
|
// The RTMP message, transport over chunk stream in RTMP.
|
|
// Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header
|
|
type Message struct {
|
|
messageHeader
|
|
|
|
// The payload which carries the RTMP packet.
|
|
Payload []byte
|
|
}
|
|
|
|
func NewMessage() *Message {
|
|
return &Message{}
|
|
}
|
|
|
|
func NewStreamMessage(streamID int) *Message {
|
|
v := NewMessage()
|
|
v.streamID = uint32(streamID)
|
|
v.betterCid = chunkIDOverStream
|
|
return v
|
|
}
|
|
|
|
func (v *Message) generateC3Header() ([]byte, error) {
|
|
var c3h []byte
|
|
if v.Timestamp < extendedTimestamp {
|
|
c3h = make([]byte, 1)
|
|
} else {
|
|
c3h = make([]byte, 1+4)
|
|
}
|
|
|
|
p := c3h
|
|
p[0] = 0xc0 | byte(v.betterCid&0x3f)
|
|
p = p[1:]
|
|
|
|
// In RTMP protocol, there must not any timestamp in C3 header,
|
|
// but actually all products from adobe, such as FMS/AMS and Flash player and FMLE,
|
|
// always carry a extended timestamp in C3 header.
|
|
// @see: http://blog.csdn.net/win_lin/article/details/13363699
|
|
if v.Timestamp >= extendedTimestamp {
|
|
p[0] = byte(v.Timestamp >> 24)
|
|
p[1] = byte(v.Timestamp >> 16)
|
|
p[2] = byte(v.Timestamp >> 8)
|
|
p[3] = byte(v.Timestamp)
|
|
}
|
|
|
|
return c3h, nil
|
|
}
|
|
|
|
func (v *Message) generateC0Header() ([]byte, error) {
|
|
var c0h []byte
|
|
if v.Timestamp < extendedTimestamp {
|
|
c0h = make([]byte, 1+3+3+1+4)
|
|
} else {
|
|
c0h = make([]byte, 1+3+3+1+4+4)
|
|
}
|
|
|
|
p := c0h
|
|
p[0] = byte(v.betterCid) & 0x3f
|
|
p = p[1:]
|
|
|
|
if v.Timestamp < extendedTimestamp {
|
|
p[0] = byte(v.Timestamp >> 16)
|
|
p[1] = byte(v.Timestamp >> 8)
|
|
p[2] = byte(v.Timestamp)
|
|
} else {
|
|
p[0] = 0xff
|
|
p[1] = 0xff
|
|
p[2] = 0xff
|
|
}
|
|
p = p[3:]
|
|
|
|
p[0] = byte(v.payloadLength >> 16)
|
|
p[1] = byte(v.payloadLength >> 8)
|
|
p[2] = byte(v.payloadLength)
|
|
p = p[3:]
|
|
|
|
p[0] = byte(v.MessageType)
|
|
p = p[1:]
|
|
|
|
p[0] = byte(v.streamID)
|
|
p[1] = byte(v.streamID >> 8)
|
|
p[2] = byte(v.streamID >> 16)
|
|
p[3] = byte(v.streamID >> 24)
|
|
p = p[4:]
|
|
|
|
if v.Timestamp >= extendedTimestamp {
|
|
p[0] = byte(v.Timestamp >> 24)
|
|
p[1] = byte(v.Timestamp >> 16)
|
|
p[2] = byte(v.Timestamp >> 8)
|
|
p[3] = byte(v.Timestamp)
|
|
}
|
|
|
|
return c0h, nil
|
|
}
|
|
|
|
// Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header
|
|
type chunkID uint32
|
|
|
|
const (
|
|
chunkIDProtocolControl chunkID = 0x02
|
|
chunkIDOverConnection chunkID = 0x03
|
|
chunkIDOverConnection2 chunkID = 0x04
|
|
chunkIDOverStream chunkID = 0x05
|
|
chunkIDOverStream2 chunkID = 0x06
|
|
chunkIDVideo chunkID = 0x07
|
|
chunkIDAudio chunkID = 0x08
|
|
)
|
|
|
|
// The Command Name of message.
|
|
const (
|
|
commandConnect amf0String = amf0String("connect")
|
|
commandCreateStream amf0String = amf0String("createStream")
|
|
commandCloseStream amf0String = amf0String("closeStream")
|
|
commandPlay amf0String = amf0String("play")
|
|
commandPause amf0String = amf0String("pause")
|
|
commandOnBWDone amf0String = amf0String("onBWDone")
|
|
commandOnStatus amf0String = amf0String("onStatus")
|
|
commandResult amf0String = amf0String("_result")
|
|
commandError amf0String = amf0String("_error")
|
|
commandReleaseStream amf0String = amf0String("releaseStream")
|
|
commandFCPublish amf0String = amf0String("FCPublish")
|
|
commandFCUnpublish amf0String = amf0String("FCUnpublish")
|
|
commandPublish amf0String = amf0String("publish")
|
|
commandRtmpSampleAccess amf0String = amf0String("|RtmpSampleAccess")
|
|
)
|
|
|
|
// The RTMP packet, transport as payload of RTMP message.
|
|
type Packet interface {
|
|
// Marshaler and unmarshaler
|
|
Size() int
|
|
encoding.BinaryUnmarshaler
|
|
encoding.BinaryMarshaler
|
|
|
|
// RTMP protocol fields for each packet.
|
|
BetterCid() chunkID
|
|
Type() MessageType
|
|
}
|
|
|
|
// A Call packet, both object and args are AMF0 objects.
|
|
type objectCallPacket struct {
|
|
CommandName amf0String
|
|
TransactionID amf0Number
|
|
CommandObject *amf0Object
|
|
Args *amf0Object
|
|
}
|
|
|
|
func (v *objectCallPacket) BetterCid() chunkID {
|
|
return chunkIDOverConnection
|
|
}
|
|
|
|
func (v *objectCallPacket) Type() MessageType {
|
|
return MessageTypeAMF0Command
|
|
}
|
|
|
|
func (v *objectCallPacket) Size() int {
|
|
size := v.CommandName.Size() + v.TransactionID.Size() + v.CommandObject.Size()
|
|
if v.Args != nil {
|
|
size += v.Args.Size()
|
|
}
|
|
return size
|
|
}
|
|
|
|
func (v *objectCallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.CommandName.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal command name")
|
|
}
|
|
p = p[v.CommandName.Size():]
|
|
|
|
if err = v.TransactionID.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal tid")
|
|
}
|
|
p = p[v.TransactionID.Size():]
|
|
|
|
if err = v.CommandObject.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal command")
|
|
}
|
|
p = p[v.CommandObject.Size():]
|
|
|
|
if len(p) == 0 {
|
|
return
|
|
}
|
|
|
|
v.Args = NewAmf0Object()
|
|
if err = v.Args.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal args")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *objectCallPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.CommandName.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal command name")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.TransactionID.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal tid")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.CommandObject.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal command object")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if v.Args != nil {
|
|
if pb, err = v.Args.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal args")
|
|
}
|
|
data = append(data, pb...)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 45, @section 4.1.1. connect
|
|
// The client sends the connect command to the server to request
|
|
// connection to a server application instance.
|
|
type ConnectAppPacket struct {
|
|
objectCallPacket
|
|
}
|
|
|
|
func NewConnectAppPacket() *ConnectAppPacket {
|
|
v := &ConnectAppPacket{}
|
|
v.CommandName = commandConnect
|
|
v.CommandObject = NewAmf0Object()
|
|
v.TransactionID = amf0Number(1.0)
|
|
return v
|
|
}
|
|
|
|
func (v *ConnectAppPacket) UnmarshalBinary(data []byte) (err error) {
|
|
if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
|
|
if v.CommandName != commandConnect {
|
|
return errors.Errorf("Invalid command name %v", string(v.CommandName))
|
|
}
|
|
|
|
if v.TransactionID != 1.0 {
|
|
return errors.Errorf("Invalid transaction ID %v", float64(v.TransactionID))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *ConnectAppPacket) TcUrl() string {
|
|
if v.CommandObject != nil {
|
|
if v, ok := v.CommandObject.Get("tcUrl").(*amf0String); ok {
|
|
return string(*v)
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// The response for ConnectAppPacket.
|
|
type ConnectAppResPacket struct {
|
|
objectCallPacket
|
|
}
|
|
|
|
func NewConnectAppResPacket(tid amf0Number) *ConnectAppResPacket {
|
|
v := &ConnectAppResPacket{}
|
|
v.CommandName = commandResult
|
|
v.CommandObject = NewAmf0Object()
|
|
v.Args = NewAmf0Object()
|
|
v.TransactionID = tid
|
|
return v
|
|
}
|
|
|
|
func (v *ConnectAppResPacket) SrsID() string {
|
|
if v.Args != nil {
|
|
if v, ok := v.Args.Get("data").(*amf0EcmaArray); ok {
|
|
if v, ok := v.Get("srs_id").(*amf0String); ok {
|
|
return string(*v)
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (v *ConnectAppResPacket) UnmarshalBinary(data []byte) (err error) {
|
|
if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
|
|
if v.CommandName != commandResult {
|
|
return errors.Errorf("Invalid command name %v", string(v.CommandName))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// A Call object, command object is variant.
|
|
type variantCallPacket struct {
|
|
CommandName amf0String
|
|
TransactionID amf0Number
|
|
CommandObject amf0Any // object or null
|
|
}
|
|
|
|
func (v *variantCallPacket) BetterCid() chunkID {
|
|
return chunkIDOverConnection
|
|
}
|
|
|
|
func (v *variantCallPacket) Type() MessageType {
|
|
return MessageTypeAMF0Command
|
|
}
|
|
|
|
func (v *variantCallPacket) Size() int {
|
|
size := v.CommandName.Size() + v.TransactionID.Size()
|
|
|
|
if v.CommandObject != nil {
|
|
size += v.CommandObject.Size()
|
|
}
|
|
|
|
return size
|
|
}
|
|
|
|
func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.CommandName.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal command name")
|
|
}
|
|
p = p[v.CommandName.Size():]
|
|
|
|
if err = v.TransactionID.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal tid")
|
|
}
|
|
p = p[v.TransactionID.Size():]
|
|
|
|
if len(p) > 0 {
|
|
if v.CommandObject, err = Amf0Discovery(p); err != nil {
|
|
return errors.WithMessage(err, "discovery command object")
|
|
}
|
|
if err = v.CommandObject.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal command object")
|
|
}
|
|
p = p[v.CommandObject.Size():]
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *variantCallPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.CommandName.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal command name")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.TransactionID.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal tid")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if v.CommandObject != nil {
|
|
if pb, err = v.CommandObject.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal command object")
|
|
}
|
|
data = append(data, pb...)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 51, @section 4.1.2. Call
|
|
// The call method of the NetConnection object runs remote procedure
|
|
// calls (RPC) at the receiving end. The called RPC name is passed as a
|
|
// parameter to the call command.
|
|
// @remark onStatus packet is a call packet.
|
|
type CallPacket struct {
|
|
variantCallPacket
|
|
Args amf0Any // optional or object or null
|
|
}
|
|
|
|
func NewCallPacket() *CallPacket {
|
|
return &CallPacket{}
|
|
}
|
|
|
|
func (v *CallPacket) ArgsCode() string {
|
|
if v.Args != nil {
|
|
if v, ok := v.Args.(*amf0Object); ok {
|
|
if code, ok := v.Get("code").(*amf0String); ok {
|
|
return string(*code)
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (v *CallPacket) Size() int {
|
|
size := v.variantCallPacket.Size()
|
|
|
|
if v.Args != nil {
|
|
size += v.Args.Size()
|
|
}
|
|
|
|
return size
|
|
}
|
|
|
|
func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
if len(p) > 0 {
|
|
if v.Args, err = Amf0Discovery(p); err != nil {
|
|
return errors.WithMessage(err, "discovery args")
|
|
}
|
|
if err = v.Args.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal args")
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *CallPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if v.Args != nil {
|
|
if pb, err = v.Args.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal args")
|
|
}
|
|
data = append(data, pb...)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 52, @section 4.1.3. createStream
|
|
// The client sends this command to the server to create a logical
|
|
// channel for message communication The publishing of audio, video, and
|
|
// metadata is carried out over stream channel created using the
|
|
// createStream command.
|
|
type CreateStreamPacket struct {
|
|
variantCallPacket
|
|
}
|
|
|
|
func NewCreateStreamPacket() *CreateStreamPacket {
|
|
v := &CreateStreamPacket{}
|
|
v.CommandName = commandCreateStream
|
|
v.TransactionID = amf0Number(2)
|
|
v.CommandObject = NewAmf0Null()
|
|
return v
|
|
}
|
|
|
|
// The response for create stream
|
|
type CreateStreamResPacket struct {
|
|
variantCallPacket
|
|
StreamID amf0Number
|
|
}
|
|
|
|
func NewCreateStreamResPacket(tid amf0Number) *CreateStreamResPacket {
|
|
v := &CreateStreamResPacket{}
|
|
v.CommandName = commandResult
|
|
v.TransactionID = tid
|
|
v.CommandObject = NewAmf0Null()
|
|
v.StreamID = 0
|
|
return v
|
|
}
|
|
|
|
func (v *CreateStreamResPacket) Size() int {
|
|
return v.variantCallPacket.Size() + v.StreamID.Size()
|
|
}
|
|
|
|
func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
if err = v.StreamID.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal sid")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *CreateStreamResPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.StreamID.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal sid")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 64, @section 4.2.6. Publish
|
|
type PublishPacket struct {
|
|
variantCallPacket
|
|
StreamName amf0String
|
|
StreamType amf0String
|
|
}
|
|
|
|
func NewPublishPacket() *PublishPacket {
|
|
v := &PublishPacket{}
|
|
v.CommandName = commandPublish
|
|
v.CommandObject = NewAmf0Null()
|
|
v.StreamType = "live"
|
|
return v
|
|
}
|
|
|
|
func (v *PublishPacket) Size() int {
|
|
return v.variantCallPacket.Size() + v.StreamName.Size() + v.StreamType.Size()
|
|
}
|
|
|
|
func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
if err = v.StreamName.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal stream name")
|
|
}
|
|
p = p[v.StreamName.Size():]
|
|
|
|
if err = v.StreamType.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal stream type")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *PublishPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.StreamName.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal stream name")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.StreamType.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal stream type")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 54, @section 4.2.1. play
|
|
type PlayPacket struct {
|
|
variantCallPacket
|
|
StreamName amf0String
|
|
}
|
|
|
|
func NewPlayPacket() *PlayPacket {
|
|
v := &PlayPacket{}
|
|
v.CommandName = commandPlay
|
|
v.CommandObject = NewAmf0Null()
|
|
return v
|
|
}
|
|
|
|
func (v *PlayPacket) Size() int {
|
|
return v.variantCallPacket.Size() + v.StreamName.Size()
|
|
}
|
|
|
|
func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
|
|
p := data
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
}
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
if err = v.StreamName.UnmarshalBinary(p); err != nil {
|
|
return errors.WithMessage(err, "unmarshal stream name")
|
|
}
|
|
p = p[v.StreamName.Size():]
|
|
|
|
return
|
|
}
|
|
|
|
func (v *PlayPacket) MarshalBinary() (data []byte, err error) {
|
|
var pb []byte
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
if pb, err = v.StreamName.MarshalBinary(); err != nil {
|
|
return nil, errors.WithMessage(err, "marshal stream name")
|
|
}
|
|
data = append(data, pb...)
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 31, @section 5.1. Set Chunk Size
|
|
// Protocol control message 1, Set Chunk Size, is used to notify the
|
|
// peer about the new maximum chunk size.
|
|
type SetChunkSize struct {
|
|
ChunkSize uint32
|
|
}
|
|
|
|
func NewSetChunkSize() *SetChunkSize {
|
|
return &SetChunkSize{
|
|
ChunkSize: defaultChunkSize,
|
|
}
|
|
}
|
|
|
|
func (v *SetChunkSize) BetterCid() chunkID {
|
|
return chunkIDProtocolControl
|
|
}
|
|
|
|
func (v *SetChunkSize) Type() MessageType {
|
|
return MessageTypeSetChunkSize
|
|
}
|
|
|
|
func (v *SetChunkSize) Size() int {
|
|
return 4
|
|
}
|
|
|
|
func (v *SetChunkSize) UnmarshalBinary(data []byte) (err error) {
|
|
if len(data) < 4 {
|
|
return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
|
|
}
|
|
v.ChunkSize = binary.BigEndian.Uint32(data)
|
|
|
|
return
|
|
}
|
|
|
|
func (v *SetChunkSize) MarshalBinary() (data []byte, err error) {
|
|
data = make([]byte, 4)
|
|
binary.BigEndian.PutUint32(data, v.ChunkSize)
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.5. Window Acknowledgement Size (5)
|
|
// The client or the server sends this message to inform the peer which
|
|
// window size to use when sending acknowledgment.
|
|
type WindowAcknowledgementSize struct {
|
|
AckSize uint32
|
|
}
|
|
|
|
func NewWindowAcknowledgementSize() *WindowAcknowledgementSize {
|
|
return &WindowAcknowledgementSize{}
|
|
}
|
|
|
|
func (v *WindowAcknowledgementSize) BetterCid() chunkID {
|
|
return chunkIDProtocolControl
|
|
}
|
|
|
|
func (v *WindowAcknowledgementSize) Type() MessageType {
|
|
return MessageTypeWindowAcknowledgementSize
|
|
}
|
|
|
|
func (v *WindowAcknowledgementSize) Size() int {
|
|
return 4
|
|
}
|
|
|
|
func (v *WindowAcknowledgementSize) UnmarshalBinary(data []byte) (err error) {
|
|
if len(data) < 4 {
|
|
return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
|
|
}
|
|
v.AckSize = binary.BigEndian.Uint32(data)
|
|
|
|
return
|
|
}
|
|
|
|
func (v *WindowAcknowledgementSize) MarshalBinary() (data []byte, err error) {
|
|
data = make([]byte, 4)
|
|
binary.BigEndian.PutUint32(data, v.AckSize)
|
|
|
|
return
|
|
}
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6)
|
|
// The sender can mark this message hard (0), soft (1), or dynamic (2)
|
|
// using the Limit type field.
|
|
type LimitType uint8
|
|
|
|
const (
|
|
LimitTypeHard LimitType = iota
|
|
LimitTypeSoft
|
|
LimitTypeDynamic
|
|
)
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6)
|
|
// The client or the server sends this message to update the output
|
|
// bandwidth of the peer.
|
|
type SetPeerBandwidth struct {
|
|
Bandwidth uint32
|
|
LimitType LimitType
|
|
}
|
|
|
|
func NewSetPeerBandwidth() *SetPeerBandwidth {
|
|
return &SetPeerBandwidth{}
|
|
}
|
|
|
|
func (v *SetPeerBandwidth) BetterCid() chunkID {
|
|
return chunkIDProtocolControl
|
|
}
|
|
|
|
func (v *SetPeerBandwidth) Type() MessageType {
|
|
return MessageTypeSetPeerBandwidth
|
|
}
|
|
|
|
func (v *SetPeerBandwidth) Size() int {
|
|
return 4 + 1
|
|
}
|
|
|
|
func (v *SetPeerBandwidth) UnmarshalBinary(data []byte) (err error) {
|
|
if len(data) < 5 {
|
|
return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
|
|
}
|
|
v.Bandwidth = binary.BigEndian.Uint32(data)
|
|
v.LimitType = LimitType(data[4])
|
|
|
|
return
|
|
}
|
|
|
|
func (v *SetPeerBandwidth) MarshalBinary() (data []byte, err error) {
|
|
data = make([]byte, 5)
|
|
binary.BigEndian.PutUint32(data, v.Bandwidth)
|
|
data[4] = byte(v.LimitType)
|
|
|
|
return
|
|
}
|
|
|
|
type EventType uint16
|
|
|
|
const (
|
|
// Generally, 4bytes event-data
|
|
|
|
// The server sends this event to notify the client
|
|
// that a stream has become functional and can be
|
|
// used for communication. By default, this event
|
|
// is sent on ID 0 after the application connect
|
|
// command is successfully received from the
|
|
// client. The event data is 4-byte and represents
|
|
// The stream ID of the stream that became
|
|
// Functional.
|
|
EventTypeStreamBegin = 0x00
|
|
|
|
// The server sends this event to notify the client
|
|
// that the playback of data is over as requested
|
|
// on this stream. No more data is sent without
|
|
// issuing additional commands. The client discards
|
|
// The messages received for the stream. The
|
|
// 4 bytes of event data represent the ID of the
|
|
// stream on which playback has ended.
|
|
EventTypeStreamEOF = 0x01
|
|
|
|
// The server sends this event to notify the client
|
|
// that there is no more data on the stream. If the
|
|
// server does not detect any message for a time
|
|
// period, it can notify the subscribed clients
|
|
// that the stream is dry. The 4 bytes of event
|
|
// data represent the stream ID of the dry stream.
|
|
EventTypeStreamDry = 0x02
|
|
|
|
// The client sends this event to inform the server
|
|
// of the buffer size (in milliseconds) that is
|
|
// used to buffer any data coming over a stream.
|
|
// This event is sent before the server starts
|
|
// processing the stream. The first 4 bytes of the
|
|
// event data represent the stream ID and the next
|
|
// 4 bytes represent the buffer length, in
|
|
// milliseconds.
|
|
EventTypeSetBufferLength = 0x03 // 8bytes event-data
|
|
|
|
// The server sends this event to notify the client
|
|
// that the stream is a recorded stream. The
|
|
// 4 bytes event data represent the stream ID of
|
|
// The recorded stream.
|
|
EventTypeStreamIsRecorded = 0x04
|
|
|
|
// The server sends this event to test whether the
|
|
// client is reachable. Event data is a 4-byte
|
|
// timestamp, representing the local server time
|
|
// When the server dispatched the command. The
|
|
// client responds with kMsgPingResponse on
|
|
// receiving kMsgPingRequest.
|
|
EventTypePingRequest = 0x06
|
|
|
|
// The client sends this event to the server in
|
|
// Response to the ping request. The event data is
|
|
// a 4-byte timestamp, which was received with the
|
|
// kMsgPingRequest request.
|
|
EventTypePingResponse = 0x07
|
|
|
|
// For PCUC size=3, for example the payload is "00 1A 01",
|
|
// it's a FMS control event, where the event type is 0x001a and event data is 0x01,
|
|
// please notice that the event data is only 1 byte for this event.
|
|
EventTypeFmsEvent0 = 0x1a
|
|
)
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 32, @5.4. User Control Message (4)
|
|
// The client or the server sends this message to notify the peer about the user control events.
|
|
// This message carries Event type and Event data.
|
|
type UserControl struct {
|
|
// Event type is followed by Event data.
|
|
// @see: SrcPCUCEventType
|
|
EventType EventType
|
|
// The event data generally in 4bytes.
|
|
// @remark for event type is 0x001a, only 1bytes.
|
|
// @see SrsPCUCFmsEvent0
|
|
EventData int32
|
|
// 4bytes if event_type is SetBufferLength; otherwise 0.
|
|
ExtraData int32
|
|
}
|
|
|
|
func NewUserControl() *UserControl {
|
|
return &UserControl{}
|
|
}
|
|
|
|
func (v *UserControl) BetterCid() chunkID {
|
|
return chunkIDProtocolControl
|
|
}
|
|
|
|
func (v *UserControl) Type() MessageType {
|
|
return MessageTypeUserControl
|
|
}
|
|
|
|
func (v *UserControl) Size() int {
|
|
size := 2
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
size += 1
|
|
} else {
|
|
size += 4
|
|
}
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
size += 4
|
|
}
|
|
|
|
return size
|
|
}
|
|
|
|
func (v *UserControl) UnmarshalBinary(data []byte) (err error) {
|
|
if len(data) < 3 {
|
|
return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
|
|
}
|
|
|
|
v.EventType = EventType(binary.BigEndian.Uint16(data))
|
|
if len(data) < v.Size() {
|
|
return errors.Errorf("requires %v only %v bytes, %x", v.Size(), len(data), data)
|
|
}
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
v.EventData = int32(uint8(data[2]))
|
|
} else {
|
|
v.EventData = int32(binary.BigEndian.Uint32(data[2:]))
|
|
}
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
v.ExtraData = int32(binary.BigEndian.Uint32(data[6:]))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (v *UserControl) MarshalBinary() (data []byte, err error) {
|
|
data = make([]byte, v.Size())
|
|
binary.BigEndian.PutUint16(data, uint16(v.EventType))
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
data[2] = uint8(v.EventData)
|
|
} else {
|
|
binary.BigEndian.PutUint32(data[2:], uint32(v.EventData))
|
|
}
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
binary.BigEndian.PutUint32(data[6:], uint32(v.ExtraData))
|
|
}
|
|
|
|
return
|
|
}
|