|
|
|
// Copyright (c) 2025 Winlin
|
Proxy: Support proxy server for SRS. v7.0.16 (#4158)
Please note that the proxy server is a new architecture or the next
version of the Origin Cluster, which allows the publication of multiple
streams. The SRS origin cluster consists of a group of origin servers
designed to handle a large number of streams.
```text
+-----------------------+
+---+ SRS Proxy(Deployment) +------+---------------------+
+-----------------+ | +-----------+-----------+ + +
| LB(K8s Service) +--+ +(Redis/MESH) + SRS Origin Servers +
+-----------------+ | +-----------+-----------+ + (Deployment) +
+---+ SRS Proxy(Deployment) +------+---------------------+
+-----------------------+
```
The new origin cluster is designed as a collection of proxy servers. For
more information, see [Discussion
#3634](https://github.com/ossrs/srs/discussions/3634). If you prefer to
use the old origin cluster, please switch to a version before SRS 6.0.
A proxy server can be used for a set of origin servers, which are
isolated and dedicated origin servers. The main improvement in the new
architecture is to store the state for origin servers in the proxy
server, rather than using MESH to communicate between origin servers.
With a proxy server, you can deploy origin servers as stateless servers,
such as in a Kubernetes (K8s) deployment.
Now that the proxy server is a stateful server, it uses Redis to store
the states. For faster development, we use Go to develop the proxy
server, instead of C/C++. Therefore, the proxy server itself is also
stateless, with all states stored in the Redis server or cluster. This
makes the new origin cluster architecture very powerful and robust.
The proxy server is also an architecture designed to solve multiple
process bottlenecks. You can run hundreds of SRS origin servers with one
proxy server on the same machine. This solution can utilize multi-core
machines, such as servers with 128 CPUs. Thus, we can keep SRS
single-threaded and very simple. See
https://github.com/ossrs/srs/discussions/3665#discussioncomment-6474441
for details.
```text
+--------------------+
+-------+ SRS Origin Server +
+ +--------------------+
+
+-----------------------+ + +--------------------+
+ SRS Proxy(Deployment) +------+-------+ SRS Origin Server +
+-----------------------+ + +--------------------+
+
+ +--------------------+
+-------+ SRS Origin Server +
+--------------------+
```
Keep in mind that the proxy server for the Origin Cluster is designed to
handle many streams. To address the issue of many viewers, we will
enhance the Edge Cluster to support more protocols.
```text
+------------------+ +--------------------+
+ SRS Edge Server +--+ +-------+ SRS Origin Server +
+------------------+ + + +--------------------+
+ +
+------------------+ + +-----------------------+ + +--------------------+
+ SRS Edge Server +--+-----+ SRS Proxy(Deployment) +------+-------+ SRS Origin Server +
+------------------+ + +-----------------------+ + +--------------------+
+ +
+------------------+ + + +--------------------+
+ SRS Edge Server +--+ +-------+ SRS Origin Server +
+------------------+ +--------------------+
```
With the new Origin Cluster and Edge Cluster, you have a media system
capable of supporting a large number of streams and viewers. For
example, you can publish 10,000 streams, each with 100,000 viewers.
---------
Co-authored-by: Jacob Su <suzp1984@gmail.com>
5 months ago
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package rtmp
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"math/rand"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"srs-proxy/errors"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Handshake holds the state used while performing the RTMP handshake.
type Handshake struct {
	// r is the random source used to fill the C1/S1 packet written by WriteC1S1.
	r *rand.Rand

	// c1s1 caches the C1/S1 packet most recently read by ReadC1S1.
	c1s1 []byte
}
|
|
|
|
|
|
|
|
func NewHandshake(r *rand.Rand) *Handshake {
|
|
|
|
return &Handshake{r: r}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) C1S1() []byte {
|
|
|
|
return v.c1s1
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) WriteC0S0(w io.Writer) (err error) {
|
|
|
|
r := bytes.NewReader([]byte{0x03})
|
|
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
|
|
return errors.Wrap(err, "write c0s0")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) ReadC0S0(r io.Reader) (c0 []byte, err error) {
|
|
|
|
b := &bytes.Buffer{}
|
|
|
|
if _, err = io.CopyN(b, r, 1); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "read c0s0")
|
|
|
|
}
|
|
|
|
|
|
|
|
c0 = b.Bytes()
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) WriteC1S1(w io.Writer) (err error) {
|
|
|
|
p := make([]byte, 1536)
|
|
|
|
|
|
|
|
for i := 8; i < len(p); i++ {
|
|
|
|
p[i] = byte(v.r.Int())
|
|
|
|
}
|
|
|
|
|
|
|
|
r := bytes.NewReader(p)
|
|
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
|
|
return errors.Wrap(err, "write c0s1")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) ReadC1S1(r io.Reader) (c1s1 []byte, err error) {
|
|
|
|
b := &bytes.Buffer{}
|
|
|
|
if _, err = io.CopyN(b, r, 1536); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "read c1s1")
|
|
|
|
}
|
|
|
|
|
|
|
|
c1s1 = b.Bytes()
|
|
|
|
v.c1s1 = c1s1
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) WriteC2S2(w io.Writer, s1c1 []byte) (err error) {
|
|
|
|
r := bytes.NewReader(s1c1[:])
|
|
|
|
if _, err = io.Copy(w, r); err != nil {
|
|
|
|
return errors.Wrap(err, "write c2s2")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Handshake) ReadC2S2(r io.Reader) (c2 []byte, err error) {
|
|
|
|
b := &bytes.Buffer{}
|
|
|
|
if _, err = io.CopyN(b, r, 1536); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "read c2s2")
|
|
|
|
}
|
|
|
|
|
|
|
|
c2 = b.Bytes()
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 16, @section 6.1. Chunk Format
|
|
|
|
// Extended timestamp: 0 or 4 bytes
|
|
|
|
// This field MUST be sent when the normal timestamp is set to
|
|
|
|
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
|
|
|
|
// anything else. So for values less than 0xffffff the normal
|
|
|
|
// timestamp field SHOULD be used in which case the extended timestamp
|
|
|
|
// MUST NOT be present. For values greater than or equal to 0xffffff
|
|
|
|
// the normal timestamp field MUST NOT be used and MUST be set to
|
|
|
|
// 0xffffff and the extended timestamp MUST be sent.
|
|
|
|
// extendedTimestamp is the 24-bit value (0xffffff) that, when found in the
// timestamp field of a chunk message header, marks the extended timestamp as present.
const extendedTimestamp uint64 = 0xffffff
|
|
|
|
|
|
|
|
// The default chunk size of RTMP is 128 bytes.
|
|
|
|
// defaultChunkSize is the chunk size, in bytes, that newSettings installs.
const defaultChunkSize = 128
|
|
|
|
|
|
|
|
// settings holds the input or output settings for the RTMP protocol.
type settings struct {
	// chunkSize is the maximum payload size of a single chunk, in bytes.
	chunkSize uint32
}
|
|
|
|
|
|
|
|
func newSettings() *settings {
|
|
|
|
return &settings{
|
|
|
|
chunkSize: defaultChunkSize,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The chunk stream which transport a message once.
|
|
|
|
type chunkStream struct {
|
|
|
|
format formatType
|
|
|
|
cid chunkID
|
|
|
|
header messageHeader
|
|
|
|
message *Message
|
|
|
|
count uint64
|
|
|
|
extendedTimestamp bool
|
|
|
|
}
|
|
|
|
|
|
|
|
func newChunkStream() *chunkStream {
|
|
|
|
return &chunkStream{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The protocol implements the RTMP command and chunk stack.
|
|
|
|
type Protocol struct {
|
|
|
|
r *bufio.Reader
|
|
|
|
w *bufio.Writer
|
|
|
|
input struct {
|
|
|
|
opt *settings
|
|
|
|
chunks map[chunkID]*chunkStream
|
|
|
|
|
|
|
|
transactions map[amf0Number]amf0String
|
|
|
|
ltransactions sync.Mutex
|
|
|
|
}
|
|
|
|
output struct {
|
|
|
|
opt *settings
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewProtocol(rw io.ReadWriter) *Protocol {
|
|
|
|
v := &Protocol{
|
|
|
|
r: bufio.NewReader(rw),
|
|
|
|
w: bufio.NewWriter(rw),
|
|
|
|
}
|
|
|
|
|
|
|
|
v.input.opt = newSettings()
|
|
|
|
v.input.chunks = map[chunkID]*chunkStream{}
|
|
|
|
v.input.transactions = map[amf0Number]amf0String{}
|
|
|
|
|
|
|
|
v.output.opt = newSettings()
|
|
|
|
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func ExpectPacket[T Packet](ctx context.Context, v *Protocol, ppkt *T) (m *Message, err error) {
|
|
|
|
for {
|
|
|
|
if m, err = v.ReadMessage(ctx); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "read message")
|
|
|
|
}
|
|
|
|
|
|
|
|
var pkt Packet
|
|
|
|
if pkt, err = v.DecodeMessage(m); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "decode message")
|
|
|
|
}
|
|
|
|
|
|
|
|
if p, ok := pkt.(T); ok {
|
|
|
|
*ppkt = p
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Deprecated: Please use rtmp.ExpectPacket instead.
|
|
|
|
func (v *Protocol) ExpectPacket(ctx context.Context, ppkt any) (m *Message, err error) {
|
|
|
|
panic("Please use rtmp.ExpectPacket instead")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m *Message, err error) {
|
|
|
|
for {
|
|
|
|
if m, err = v.ReadMessage(ctx); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "read message")
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(types) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, t := range types {
|
|
|
|
if m.MessageType == t {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) parseAMFObject(p []byte) (pkt Packet, err error) {
|
|
|
|
var commandName amf0String
|
|
|
|
if err = commandName.UnmarshalBinary(p); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "unmarshal command name")
|
|
|
|
}
|
|
|
|
|
|
|
|
switch commandName {
|
|
|
|
case commandResult, commandError:
|
|
|
|
var transactionID amf0Number
|
|
|
|
if err = transactionID.UnmarshalBinary(p[commandName.Size():]); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "unmarshal tid")
|
|
|
|
}
|
|
|
|
|
|
|
|
var requestName amf0String
|
|
|
|
if err = func() error {
|
|
|
|
v.input.ltransactions.Lock()
|
|
|
|
defer v.input.ltransactions.Unlock()
|
|
|
|
|
|
|
|
var ok bool
|
|
|
|
if requestName, ok = v.input.transactions[transactionID]; !ok {
|
|
|
|
return errors.Errorf("No matched request for tid=%v", transactionID)
|
|
|
|
}
|
|
|
|
delete(v.input.transactions, transactionID)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "discovery request name")
|
|
|
|
}
|
|
|
|
|
|
|
|
switch requestName {
|
|
|
|
case commandConnect:
|
|
|
|
return NewConnectAppResPacket(transactionID), nil
|
|
|
|
case commandCreateStream:
|
|
|
|
return NewCreateStreamResPacket(transactionID), nil
|
|
|
|
case commandReleaseStream, commandFCPublish, commandFCUnpublish:
|
|
|
|
call := NewCallPacket()
|
|
|
|
call.TransactionID = transactionID
|
|
|
|
return call, nil
|
|
|
|
default:
|
|
|
|
return nil, errors.Errorf("No request for %v", string(requestName))
|
|
|
|
}
|
|
|
|
case commandConnect:
|
|
|
|
return NewConnectAppPacket(), nil
|
|
|
|
case commandPublish:
|
|
|
|
return NewPublishPacket(), nil
|
|
|
|
case commandPlay:
|
|
|
|
return NewPlayPacket(), nil
|
|
|
|
default:
|
|
|
|
return NewCallPacket(), nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error) {
|
|
|
|
p := m.Payload[:]
|
|
|
|
if len(p) == 0 {
|
|
|
|
return nil, errors.New("Empty packet")
|
|
|
|
}
|
|
|
|
|
|
|
|
switch m.MessageType {
|
|
|
|
case MessageTypeAMF3Command, MessageTypeAMF3Data:
|
|
|
|
p = p[1:]
|
|
|
|
}
|
|
|
|
|
|
|
|
switch m.MessageType {
|
|
|
|
case MessageTypeSetChunkSize:
|
|
|
|
pkt = NewSetChunkSize()
|
|
|
|
case MessageTypeWindowAcknowledgementSize:
|
|
|
|
pkt = NewWindowAcknowledgementSize()
|
|
|
|
case MessageTypeSetPeerBandwidth:
|
|
|
|
pkt = NewSetPeerBandwidth()
|
|
|
|
case MessageTypeAMF0Command, MessageTypeAMF3Command, MessageTypeAMF0Data, MessageTypeAMF3Data:
|
|
|
|
if pkt, err = v.parseAMFObject(p); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, fmt.Sprintf("Parse AMF %v", m.MessageType))
|
|
|
|
}
|
|
|
|
case MessageTypeUserControl:
|
|
|
|
pkt = NewUserControl()
|
|
|
|
default:
|
|
|
|
return nil, errors.Errorf("Unknown message %v", m.MessageType)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = pkt.UnmarshalBinary(p); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, fmt.Sprintf("Unmarshal %v", m.MessageType))
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) ReadMessage(ctx context.Context) (m *Message, err error) {
|
|
|
|
for m == nil {
|
|
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
return nil, ctx.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
var cid chunkID
|
|
|
|
var format formatType
|
|
|
|
if format, cid, err = v.readBasicHeader(ctx); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "read basic header")
|
|
|
|
}
|
|
|
|
|
|
|
|
var ok bool
|
|
|
|
var chunk *chunkStream
|
|
|
|
if chunk, ok = v.input.chunks[cid]; !ok {
|
|
|
|
chunk = newChunkStream()
|
|
|
|
v.input.chunks[cid] = chunk
|
|
|
|
chunk.header.betterCid = cid
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = v.readMessageHeader(ctx, chunk, format); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "read message header")
|
|
|
|
}
|
|
|
|
|
|
|
|
if m, err = v.readMessagePayload(ctx, chunk); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "read message payload")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = v.onMessageArrivated(m); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "on message")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) readMessagePayload(ctx context.Context, chunk *chunkStream) (m *Message, err error) {
|
|
|
|
// Empty payload message.
|
|
|
|
if chunk.message.payloadLength == 0 {
|
|
|
|
m = chunk.message
|
|
|
|
chunk.message = nil
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the chunk payload size.
|
|
|
|
chunkedPayloadSize := int(chunk.message.payloadLength) - len(chunk.message.Payload)
|
|
|
|
if chunkedPayloadSize > int(v.input.opt.chunkSize) {
|
|
|
|
chunkedPayloadSize = int(v.input.opt.chunkSize)
|
|
|
|
}
|
|
|
|
|
|
|
|
b := make([]byte, chunkedPayloadSize)
|
|
|
|
if _, err = io.ReadFull(v.r, b); err != nil {
|
|
|
|
return nil, errors.Wrapf(err, "read chunk %vB", chunkedPayloadSize)
|
|
|
|
}
|
|
|
|
chunk.message.Payload = append(chunk.message.Payload, b...)
|
|
|
|
|
|
|
|
// Got entire RTMP message?
|
|
|
|
if int(chunk.message.payloadLength) == len(chunk.message.Payload) {
|
|
|
|
m = chunk.message
|
|
|
|
chunk.message = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// formatType is the "fmt" field of the chunk basic header. It selects one of
// the four chunk message header formats.
// Please read @doc rtmp_specification_1.0.pdf, @page 18, @section 6.1.2. Chunk Message Header
type formatType uint8

const (
	// formatType0 (6.1.2.1): the message header is 11 bytes long. This type
	// MUST be used at the start of a chunk stream, and whenever the stream
	// timestamp goes backward (e.g., because of a backward seek).
	formatType0 formatType = 0

	// formatType1 (6.1.2.2): the message header is 7 bytes long. The message
	// stream ID is not included; the chunk takes the same stream ID as the
	// preceding chunk. Streams with variable-sized messages (for example,
	// many video formats) SHOULD use this format for the first chunk of each
	// new message after the first.
	formatType1 formatType = 1

	// formatType2 (6.1.2.3): the message header is 3 bytes long. Neither the
	// stream ID nor the message length is included; the chunk has the same
	// stream ID and message length as the preceding chunk. Streams with
	// constant-sized messages (for example, some audio and data formats)
	// SHOULD use this format for the first chunk of each message after the
	// first.
	formatType2 formatType = 2

	// formatType3 (6.1.2.4): there is no message header. Stream ID, message
	// length and timestamp delta are not present; the chunk takes these
	// values from the preceding chunk. When a single message is split into
	// chunks, all chunks except the first one SHOULD use this type (example 2
	// in section 6.2.2). A stream consisting of messages of exactly the same
	// size, stream ID and spacing in time SHOULD use this type for all chunks
	// after a chunk of Type 2 (example 1 in section 6.2.1). If the delta
	// between the first and the second message equals the timestamp of the
	// first message, a Type 3 chunk may immediately follow the Type 0 chunk,
	// as no Type 2 chunk is needed to register the delta. If a Type 3 chunk
	// follows a Type 0 chunk, its timestamp delta is the same as the
	// timestamp of the Type 0 chunk.
	formatType3 formatType = 3
)
|
|
|
|
|
|
|
|
// messageHeaderSizes gives the chunk message header size in bytes,
// indexed by the fmt value (0..3).
var messageHeaderSizes = []int{
	0: 11,
	1: 7,
	2: 3,
	3: 0,
}
|
|
|
|
|
|
|
|
// Parse the chunk message header.
|
|
|
|
// 3bytes: timestamp delta, fmt=0,1,2
|
|
|
|
// 3bytes: payload length, fmt=0,1
|
|
|
|
// 1bytes: message type, fmt=0,1
|
|
|
|
// 4bytes: stream id, fmt=0
|
|
|
|
// where:
|
|
|
|
// fmt=0, 0x0X
|
|
|
|
// fmt=1, 0x4X
|
|
|
|
// fmt=2, 0x8X
|
|
|
|
// fmt=3, 0xCX
|
|
|
|
func (v *Protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, format formatType) (err error) {
|
|
|
|
// We should not assert anything about fmt, for the first packet.
|
|
|
|
// (when first packet, the chunk.message is nil).
|
|
|
|
// the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
|
|
|
|
// the previous packet is:
|
|
|
|
// 04 // fmt=0, cid=4
|
|
|
|
// 00 00 1a // timestamp=26
|
|
|
|
// 00 00 9d // payload_length=157
|
|
|
|
// 08 // message_type=8(audio)
|
|
|
|
// 01 00 00 00 // stream_id=1
|
|
|
|
// the current packet maybe:
|
|
|
|
// c4 // fmt=3, cid=4
|
|
|
|
// it's ok, for the packet is audio, and timestamp delta is 26.
|
|
|
|
// the current packet must be parsed as:
|
|
|
|
// fmt=0, cid=4
|
|
|
|
// timestamp=26+26=52
|
|
|
|
// payload_length=157
|
|
|
|
// message_type=8(audio)
|
|
|
|
// stream_id=1
|
|
|
|
// so we must update the timestamp even fmt=3 for first packet.
|
|
|
|
//
|
|
|
|
// The fresh packet used to update the timestamp even fmt=3 for first packet.
|
|
|
|
// fresh packet always means the chunk is the first one of message.
|
|
|
|
var isFirstChunkOfMsg bool
|
|
|
|
if chunk.message == nil {
|
|
|
|
isFirstChunkOfMsg = true
|
|
|
|
}
|
|
|
|
|
|
|
|
// But, we can ensure that when a chunk stream is fresh,
|
|
|
|
// the fmt must be 0, a new stream.
|
|
|
|
if chunk.count == 0 && format != formatType0 {
|
|
|
|
// For librtmp, if ping, it will send a fresh stream with fmt=1,
|
|
|
|
// 0x42 where: fmt=1, cid=2, protocol contorl user-control message
|
|
|
|
// 0x00 0x00 0x00 where: timestamp=0
|
|
|
|
// 0x00 0x00 0x06 where: payload_length=6
|
|
|
|
// 0x04 where: message_type=4(protocol control user-control message)
|
|
|
|
// 0x00 0x06 where: event Ping(0x06)
|
|
|
|
// 0x00 0x00 0x0d 0x0f where: event data 4bytes ping timestamp.
|
|
|
|
// @see: https://github.com/ossrs/srs/issues/98
|
|
|
|
if chunk.cid == chunkIDProtocolControl && format == formatType1 {
|
|
|
|
// We accept cid=2, fmt=1 to make librtmp happy.
|
|
|
|
} else {
|
|
|
|
return errors.Errorf("For fresh chunk, fmt %v != %v(required), cid is %v", format, formatType0, chunk.cid)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// When exists cache msg, means got an partial message,
|
|
|
|
// the fmt must not be type0 which means new message.
|
|
|
|
if chunk.message != nil && format == formatType0 {
|
|
|
|
return errors.Errorf("For exists chunk, fmt is %v, cid is %v", format, chunk.cid)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create msg when new chunk stream start
|
|
|
|
if chunk.message == nil {
|
|
|
|
chunk.message = NewMessage()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read the message header.
|
|
|
|
p := make([]byte, messageHeaderSizes[format])
|
|
|
|
if _, err = io.ReadFull(v.r, p); err != nil {
|
|
|
|
return errors.Wrapf(err, "read %vB message header", len(p))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Prse the message header.
|
|
|
|
// 3bytes: timestamp delta, fmt=0,1,2
|
|
|
|
// 3bytes: payload length, fmt=0,1
|
|
|
|
// 1bytes: message type, fmt=0,1
|
|
|
|
// 4bytes: stream id, fmt=0
|
|
|
|
// where:
|
|
|
|
// fmt=0, 0x0X
|
|
|
|
// fmt=1, 0x4X
|
|
|
|
// fmt=2, 0x8X
|
|
|
|
// fmt=3, 0xCX
|
|
|
|
if format <= formatType2 {
|
|
|
|
chunk.header.timestampDelta = uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
|
|
|
|
p = p[3:]
|
|
|
|
|
|
|
|
// fmt: 0
|
|
|
|
// timestamp: 3 bytes
|
|
|
|
// If the timestamp is greater than or equal to 16777215
|
|
|
|
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
|
|
|
|
// 'extended timestamp header' MUST be present. Otherwise, this value
|
|
|
|
// SHOULD be the entire timestamp.
|
|
|
|
//
|
|
|
|
// fmt: 1 or 2
|
|
|
|
// timestamp delta: 3 bytes
|
|
|
|
// If the delta is greater than or equal to 16777215 (hexadecimal
|
|
|
|
// 0x00ffffff), this value MUST be 16777215, and the 'extended
|
|
|
|
// timestamp header' MUST be present. Otherwise, this value SHOULD be
|
|
|
|
// the entire delta.
|
|
|
|
chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp
|
|
|
|
if !chunk.extendedTimestamp {
|
|
|
|
// Extended timestamp: 0 or 4 bytes
|
|
|
|
// This field MUST be sent when the normal timsestamp is set to
|
|
|
|
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
|
|
|
|
// anything else. So for values less than 0xffffff the normal
|
|
|
|
// timestamp field SHOULD be used in which case the extended timestamp
|
|
|
|
// MUST NOT be present. For values greater than or equal to 0xffffff
|
|
|
|
// the normal timestamp field MUST NOT be used and MUST be set to
|
|
|
|
// 0xffffff and the extended timestamp MUST be sent.
|
|
|
|
if format == formatType0 {
|
|
|
|
// 6.1.2.1. Type 0
|
|
|
|
// For a type-0 chunk, the absolute timestamp of the message is sent
|
|
|
|
// here.
|
|
|
|
chunk.header.Timestamp = uint64(chunk.header.timestampDelta)
|
|
|
|
} else {
|
|
|
|
// 6.1.2.2. Type 1
|
|
|
|
// 6.1.2.3. Type 2
|
|
|
|
// For a type-1 or type-2 chunk, the difference between the previous
|
|
|
|
// chunk's timestamp and the current chunk's timestamp is sent here.
|
|
|
|
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if format <= formatType1 {
|
|
|
|
payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2])
|
|
|
|
p = p[3:]
|
|
|
|
|
|
|
|
// For a message, if msg exists in cache, the size must not changed.
|
|
|
|
// always use the actual msg size to compare, for the cache payload length can changed,
|
|
|
|
// for the fmt type1(stream_id not changed), user can change the payload
|
|
|
|
// length(it's not allowed in the continue chunks).
|
|
|
|
if !isFirstChunkOfMsg && chunk.header.payloadLength != payloadLength {
|
|
|
|
return errors.Errorf("Chunk message size %v != %v(required)", payloadLength, chunk.header.payloadLength)
|
|
|
|
}
|
|
|
|
chunk.header.payloadLength = payloadLength
|
|
|
|
|
|
|
|
chunk.header.MessageType = MessageType(p[0])
|
|
|
|
p = p[1:]
|
|
|
|
|
|
|
|
if format == formatType0 {
|
|
|
|
chunk.header.streamID = uint32(p[0]) | uint32(p[1])<<8 | uint32(p[2])<<16 | uint32(p[3])<<24
|
|
|
|
p = p[4:]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Update the timestamp even fmt=3 for first chunk packet
|
|
|
|
if isFirstChunkOfMsg && !chunk.extendedTimestamp {
|
|
|
|
chunk.header.Timestamp += uint64(chunk.header.timestampDelta)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read extended-timestamp
|
|
|
|
if chunk.extendedTimestamp {
|
|
|
|
var timestamp uint32
|
|
|
|
if err = binary.Read(v.r, binary.BigEndian, ×tamp); err != nil {
|
|
|
|
return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp)
|
|
|
|
}
|
|
|
|
|
|
|
|
// We always use 31bits timestamp, for some server may use 32bits extended timestamp.
|
|
|
|
// @see https://github.com/ossrs/srs/issues/111
|
|
|
|
timestamp &= 0x7fffffff
|
|
|
|
|
|
|
|
// TODO: FIXME: Support detect the extended timestamp.
|
|
|
|
// @see http://blog.csdn.net/win_lin/article/details/13363699
|
|
|
|
chunk.header.Timestamp = uint64(timestamp)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The extended-timestamp must be unsigned-int,
|
|
|
|
// 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
|
|
|
|
// 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
|
|
|
|
// because the rtmp protocol says the 32bits timestamp is about "50 days":
|
|
|
|
// 3. Byte Order, Alignment, and Time Format
|
|
|
|
// Because timestamps are generally only 32 bits long, they will roll
|
|
|
|
// over after fewer than 50 days.
|
|
|
|
//
|
|
|
|
// but, its sample says the timestamp is 31bits:
|
|
|
|
// An application could assume, for example, that all
|
|
|
|
// adjacent timestamps are within 2^31 milliseconds of each other, so
|
|
|
|
// 10000 comes after 4000000000, while 3000000000 comes before
|
|
|
|
// 4000000000.
|
|
|
|
// and flv specification says timestamp is 31bits:
|
|
|
|
// Extension of the Timestamp field to form a SI32 value. This
|
|
|
|
// field represents the upper 8 bits, while the previous
|
|
|
|
// Timestamp field represents the lower 24 bits of the time in
|
|
|
|
// milliseconds.
|
|
|
|
// in a word, 31bits timestamp is ok.
|
|
|
|
// convert extended timestamp to 31bits.
|
|
|
|
chunk.header.Timestamp &= 0x7fffffff
|
|
|
|
|
|
|
|
// Copy header to msg
|
|
|
|
chunk.message.messageHeader = chunk.header
|
|
|
|
|
|
|
|
// Increase the msg count, the chunk stream can accept fmt=1/2/3 message now.
|
|
|
|
chunk.count++
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header
|
|
|
|
// The Chunk Basic Header encodes the chunk stream ID and the chunk
|
|
|
|
// type(represented by fmt field in the figure below). Chunk type
|
|
|
|
// determines the format of the encoded message header. Chunk Basic
|
|
|
|
// Header field may be 1, 2, or 3 bytes, depending on the chunk stream
|
|
|
|
// ID.
|
|
|
|
//
|
|
|
|
// The bits 0-5 (least significant) in the chunk basic header represent
|
|
|
|
// the chunk stream ID.
|
|
|
|
//
|
|
|
|
// Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
|
|
|
|
// field.
|
|
|
|
// 0 1 2 3 4 5 6 7
|
|
|
|
// +-+-+-+-+-+-+-+-+
|
|
|
|
// |fmt| cs id |
|
|
|
|
// +-+-+-+-+-+-+-+-+
|
|
|
|
// Figure 6 Chunk basic header 1
|
|
|
|
//
|
|
|
|
// Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
|
|
|
|
// field. ID is computed as (the second byte + 64).
|
|
|
|
// 0 1
|
|
|
|
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
|
|
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
|
|
// |fmt| 0 | cs id - 64 |
|
|
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
|
|
// Figure 7 Chunk basic header 2
|
|
|
|
//
|
|
|
|
// Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
|
|
|
|
// this field. ID is computed as ((the third byte)*256 + the second byte
|
|
|
|
// + 64).
|
|
|
|
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
|
|
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
|
|
// |fmt| 1 | cs id - 64 |
|
|
|
|
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
|
|
// Figure 8 Chunk basic header 3
|
|
|
|
//
|
|
|
|
// cs id: 6 bits
|
|
|
|
// fmt: 2 bits
|
|
|
|
// cs id - 64: 8 or 16 bits
|
|
|
|
//
|
|
|
|
// Chunk stream IDs with values 64-319 could be represented by both 2-
|
|
|
|
// byte version and 3-byte version of this field.
|
|
|
|
func (v *Protocol) readBasicHeader(ctx context.Context) (format formatType, cid chunkID, err error) {
|
|
|
|
// 2-63, 1B chunk header
|
|
|
|
var t uint8
|
|
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
|
|
return format, cid, errors.Wrap(err, "read basic header")
|
|
|
|
}
|
|
|
|
cid = chunkID(t & 0x3f)
|
|
|
|
format = formatType((t >> 6) & 0x03)
|
|
|
|
|
|
|
|
if cid > 1 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// 64-319, 2B chunk header
|
|
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
|
|
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
|
|
|
|
}
|
|
|
|
cid = chunkID(64 + uint32(t))
|
|
|
|
|
|
|
|
// 64-65599, 3B chunk header
|
|
|
|
if cid == 1 {
|
|
|
|
if err = binary.Read(v.r, binary.BigEndian, &t); err != nil {
|
|
|
|
return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid)
|
|
|
|
}
|
|
|
|
cid += chunkID(uint32(t) * 256)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) WritePacket(ctx context.Context, pkt Packet, streamID int) (err error) {
|
|
|
|
m := NewMessage()
|
|
|
|
|
|
|
|
if m.Payload, err = pkt.MarshalBinary(); err != nil {
|
|
|
|
return errors.WithMessage(err, "marshal payload")
|
|
|
|
}
|
|
|
|
|
|
|
|
m.MessageType = pkt.Type()
|
|
|
|
m.streamID = uint32(streamID)
|
|
|
|
m.betterCid = pkt.BetterCid()
|
|
|
|
|
|
|
|
if err = v.WriteMessage(ctx, m); err != nil {
|
|
|
|
return errors.WithMessage(err, "write message")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = v.onPacketWriten(m, pkt); err != nil {
|
|
|
|
return errors.WithMessage(err, "on write packet")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) onPacketWriten(m *Message, pkt Packet) (err error) {
|
|
|
|
var tid amf0Number
|
|
|
|
var name amf0String
|
|
|
|
|
|
|
|
switch pkt := pkt.(type) {
|
|
|
|
case *ConnectAppPacket:
|
|
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
|
|
case *CreateStreamPacket:
|
|
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
|
|
case *CallPacket:
|
|
|
|
tid, name = pkt.TransactionID, pkt.CommandName
|
|
|
|
}
|
|
|
|
|
|
|
|
if tid > 0 && len(name) > 0 {
|
|
|
|
v.input.ltransactions.Lock()
|
|
|
|
defer v.input.ltransactions.Unlock()
|
|
|
|
|
|
|
|
v.input.transactions[tid] = name
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) onMessageArrivated(m *Message) (err error) {
|
|
|
|
if m == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
var pkt Packet
|
|
|
|
switch m.MessageType {
|
|
|
|
case MessageTypeSetChunkSize, MessageTypeUserControl, MessageTypeWindowAcknowledgementSize:
|
|
|
|
if pkt, err = v.DecodeMessage(m); err != nil {
|
|
|
|
return errors.Errorf("decode message %v", m.MessageType)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
switch pkt := pkt.(type) {
|
|
|
|
case *SetChunkSize:
|
|
|
|
v.input.opt.chunkSize = pkt.ChunkSize
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Protocol) WriteMessage(ctx context.Context, m *Message) (err error) {
|
|
|
|
m.payloadLength = uint32(len(m.Payload))
|
|
|
|
|
|
|
|
var c0h, c3h []byte
|
|
|
|
if c0h, err = m.generateC0Header(); err != nil {
|
|
|
|
return errors.WithMessage(err, "generate c0 header")
|
|
|
|
}
|
|
|
|
if c3h, err = m.generateC3Header(); err != nil {
|
|
|
|
return errors.WithMessage(err, "generate c3 header")
|
|
|
|
}
|
|
|
|
|
|
|
|
var h []byte
|
|
|
|
p := m.Payload
|
|
|
|
for len(p) > 0 {
|
|
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
if h == nil {
|
|
|
|
h = c0h
|
|
|
|
} else {
|
|
|
|
h = c3h
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err = io.Copy(v.w, bytes.NewReader(h)); err != nil {
|
|
|
|
return errors.Wrapf(err, "write c0c3 header %x", h)
|
|
|
|
}
|
|
|
|
|
|
|
|
size := len(p)
|
|
|
|
if size > int(v.output.opt.chunkSize) {
|
|
|
|
size = int(v.output.opt.chunkSize)
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err = io.Copy(v.w, bytes.NewReader(p[:size])); err != nil {
|
|
|
|
return errors.Wrapf(err, "write chunk payload %vB", size)
|
|
|
|
}
|
|
|
|
p = p[size:]
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: We should convert buffered io to async io, because we will be stuck in block io here,
|
|
|
|
// TODO: but the risk is acceptable because we literally will set the underlay io timeout.
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: FIXME: Use writev to write for high performance.
|
|
|
|
if err = v.w.Flush(); err != nil {
|
|
|
|
return errors.Wrapf(err, "flush writer")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// MessageType is the one-byte message type ID of the RTMP message header.
// IDs 1-7 are reserved for protocol control messages.
// See rtmp_specification_1.0.pdf, page 30, section 4.1. Message Header.
type MessageType uint8

const (
	// Protocol control messages, see rtmp_specification_1.0.pdf, page 30,
	// section 5. Protocol Control Messages.
	// IDs 1 and 2 are reserved for the RTM Chunk Stream protocol, IDs 3-6 for
	// RTMP itself, and ID 7 is used between edge server and origin server.
	MessageTypeSetChunkSize               MessageType = 0x01
	MessageTypeAbort                      MessageType = 0x02
	MessageTypeAcknowledgement            MessageType = 0x03
	MessageTypeUserControl                MessageType = 0x04
	MessageTypeWindowAcknowledgementSize  MessageType = 0x05
	MessageTypeSetPeerBandwidth           MessageType = 0x06
	MessageTypeEdgeAndOriginServerCommand MessageType = 0x07

	// Messages exchanged between client and server, see
	// rtmp_specification_1.0.pdf, page 38, section 3. Types of messages:
	// audio, video, command, shared object, data and user control messages.

	// MessageTypeAudio carries audio data to the peer; value 8 is reserved
	// for audio messages (page 41, section 3.4. Audio message).
	MessageTypeAudio MessageType = 0x08

	// MessageTypeVideo carries video data to the peer; value 9 is reserved
	// for video messages (page 41, section 3.5. Video message). Video
	// messages are large and may delay other messages, so they are given the
	// lowest priority.
	MessageTypeVideo MessageType = 0x09

	// Command messages carry AMF-encoded commands such as connect,
	// createStream, publish, play and pause, as well as replies such as
	// onstatus and result (page 38, section 3.1. Command message). A command
	// consists of a command name, a transaction ID and a command object.
	// Value 20 is used for AMF0 encoding and value 17 for AMF3 encoding.
	MessageTypeAMF3Command MessageType = 0x11 // 17
	MessageTypeAMF0Command MessageType = 0x14 // 20

	// Data messages carry metadata or other user data, for example creation
	// time, duration and theme (page 38, section 3.2. Data message).
	// Value 18 is used for AMF0 and value 15 for AMF3.
	MessageTypeAMF0Data MessageType = 0x12 // 18
	MessageTypeAMF3Data MessageType = 0x0f // 15
)
|
|
|
|
|
|
|
|
// The header of message.
|
|
|
|
type messageHeader struct {
|
|
|
|
// 3bytes.
|
|
|
|
// Three-byte field that contains a timestamp delta of the message.
|
|
|
|
// @remark, only used for decoding message from chunk stream.
|
|
|
|
timestampDelta uint32
|
|
|
|
// 3bytes.
|
|
|
|
// Three-byte field that represents the size of the payload in bytes.
|
|
|
|
// It is set in big-endian format.
|
|
|
|
payloadLength uint32
|
|
|
|
// 1byte.
|
|
|
|
// One byte field to represent the message type. A range of type IDs
|
|
|
|
// (1-7) are reserved for protocol control messages.
|
|
|
|
MessageType MessageType
|
|
|
|
// 4bytes.
|
|
|
|
// Four-byte field that identifies the stream of the message. These
|
|
|
|
// bytes are set in little-endian format.
|
|
|
|
streamID uint32
|
|
|
|
|
|
|
|
// The chunk stream id over which transport.
|
|
|
|
betterCid chunkID
|
|
|
|
|
|
|
|
// Four-byte field that contains a timestamp of the message.
|
|
|
|
// The 4 bytes are packed in the big-endian order.
|
|
|
|
// @remark, we use 64bits for large time for jitter detect and for large tbn like HLS.
|
|
|
|
Timestamp uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
// The RTMP message, transport over chunk stream in RTMP.
|
|
|
|
// Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header
|
|
|
|
type Message struct {
|
|
|
|
messageHeader
|
|
|
|
|
|
|
|
// The payload which carries the RTMP packet.
|
|
|
|
Payload []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewMessage() *Message {
|
|
|
|
return &Message{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewStreamMessage(streamID int) *Message {
|
|
|
|
v := NewMessage()
|
|
|
|
v.streamID = uint32(streamID)
|
|
|
|
v.betterCid = chunkIDOverStream
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Message) generateC3Header() ([]byte, error) {
|
|
|
|
var c3h []byte
|
|
|
|
if v.Timestamp < extendedTimestamp {
|
|
|
|
c3h = make([]byte, 1)
|
|
|
|
} else {
|
|
|
|
c3h = make([]byte, 1+4)
|
|
|
|
}
|
|
|
|
|
|
|
|
p := c3h
|
|
|
|
p[0] = 0xc0 | byte(v.betterCid&0x3f)
|
|
|
|
p = p[1:]
|
|
|
|
|
|
|
|
// In RTMP protocol, there must not any timestamp in C3 header,
|
|
|
|
// but actually all products from adobe, such as FMS/AMS and Flash player and FMLE,
|
|
|
|
// always carry a extended timestamp in C3 header.
|
|
|
|
// @see: http://blog.csdn.net/win_lin/article/details/13363699
|
|
|
|
if v.Timestamp >= extendedTimestamp {
|
|
|
|
p[0] = byte(v.Timestamp >> 24)
|
|
|
|
p[1] = byte(v.Timestamp >> 16)
|
|
|
|
p[2] = byte(v.Timestamp >> 8)
|
|
|
|
p[3] = byte(v.Timestamp)
|
|
|
|
}
|
|
|
|
|
|
|
|
return c3h, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *Message) generateC0Header() ([]byte, error) {
|
|
|
|
var c0h []byte
|
|
|
|
if v.Timestamp < extendedTimestamp {
|
|
|
|
c0h = make([]byte, 1+3+3+1+4)
|
|
|
|
} else {
|
|
|
|
c0h = make([]byte, 1+3+3+1+4+4)
|
|
|
|
}
|
|
|
|
|
|
|
|
p := c0h
|
|
|
|
p[0] = byte(v.betterCid) & 0x3f
|
|
|
|
p = p[1:]
|
|
|
|
|
|
|
|
if v.Timestamp < extendedTimestamp {
|
|
|
|
p[0] = byte(v.Timestamp >> 16)
|
|
|
|
p[1] = byte(v.Timestamp >> 8)
|
|
|
|
p[2] = byte(v.Timestamp)
|
|
|
|
} else {
|
|
|
|
p[0] = 0xff
|
|
|
|
p[1] = 0xff
|
|
|
|
p[2] = 0xff
|
|
|
|
}
|
|
|
|
p = p[3:]
|
|
|
|
|
|
|
|
p[0] = byte(v.payloadLength >> 16)
|
|
|
|
p[1] = byte(v.payloadLength >> 8)
|
|
|
|
p[2] = byte(v.payloadLength)
|
|
|
|
p = p[3:]
|
|
|
|
|
|
|
|
p[0] = byte(v.MessageType)
|
|
|
|
p = p[1:]
|
|
|
|
|
|
|
|
p[0] = byte(v.streamID)
|
|
|
|
p[1] = byte(v.streamID >> 8)
|
|
|
|
p[2] = byte(v.streamID >> 16)
|
|
|
|
p[3] = byte(v.streamID >> 24)
|
|
|
|
p = p[4:]
|
|
|
|
|
|
|
|
if v.Timestamp >= extendedTimestamp {
|
|
|
|
p[0] = byte(v.Timestamp >> 24)
|
|
|
|
p[1] = byte(v.Timestamp >> 16)
|
|
|
|
p[2] = byte(v.Timestamp >> 8)
|
|
|
|
p[3] = byte(v.Timestamp)
|
|
|
|
}
|
|
|
|
|
|
|
|
return c0h, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// chunkID is the chunk stream id.
// See rtmp_specification_1.0.pdf, page 17, section 6.1.1. Chunk Basic Header.
type chunkID uint32

// Chunk stream ids used by this package, numbered consecutively from 2.
const (
	chunkIDProtocolControl chunkID = iota + 2 // 0x02
	chunkIDOverConnection                     // 0x03
	chunkIDOverConnection2                    // 0x04
	chunkIDOverStream                         // 0x05
	chunkIDOverStream2                        // 0x06
	chunkIDVideo                              // 0x07
	chunkIDAudio                              // 0x08
)
|
|
|
|
|
|
|
|
// The Command Name of message.
|
|
|
|
const (
|
|
|
|
commandConnect amf0String = amf0String("connect")
|
|
|
|
commandCreateStream amf0String = amf0String("createStream")
|
|
|
|
commandCloseStream amf0String = amf0String("closeStream")
|
|
|
|
commandPlay amf0String = amf0String("play")
|
|
|
|
commandPause amf0String = amf0String("pause")
|
|
|
|
commandOnBWDone amf0String = amf0String("onBWDone")
|
|
|
|
commandOnStatus amf0String = amf0String("onStatus")
|
|
|
|
commandResult amf0String = amf0String("_result")
|
|
|
|
commandError amf0String = amf0String("_error")
|
|
|
|
commandReleaseStream amf0String = amf0String("releaseStream")
|
|
|
|
commandFCPublish amf0String = amf0String("FCPublish")
|
|
|
|
commandFCUnpublish amf0String = amf0String("FCUnpublish")
|
|
|
|
commandPublish amf0String = amf0String("publish")
|
|
|
|
commandRtmpSampleAccess amf0String = amf0String("|RtmpSampleAccess")
|
|
|
|
)
|
|
|
|
|
|
|
|
// The RTMP packet, transport as payload of RTMP message.
|
|
|
|
type Packet interface {
|
|
|
|
// Marshaler and unmarshaler
|
|
|
|
Size() int
|
|
|
|
encoding.BinaryUnmarshaler
|
|
|
|
encoding.BinaryMarshaler
|
|
|
|
|
|
|
|
// RTMP protocol fields for each packet.
|
|
|
|
BetterCid() chunkID
|
|
|
|
Type() MessageType
|
|
|
|
}
|
|
|
|
|
|
|
|
// A Call packet, both object and args are AMF0 objects.
|
|
|
|
type objectCallPacket struct {
|
|
|
|
CommandName amf0String
|
|
|
|
TransactionID amf0Number
|
|
|
|
CommandObject *amf0Object
|
|
|
|
Args *amf0Object
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *objectCallPacket) BetterCid() chunkID {
|
|
|
|
return chunkIDOverConnection
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *objectCallPacket) Type() MessageType {
|
|
|
|
return MessageTypeAMF0Command
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *objectCallPacket) Size() int {
|
|
|
|
size := v.CommandName.Size() + v.TransactionID.Size() + v.CommandObject.Size()
|
|
|
|
if v.Args != nil {
|
|
|
|
size += v.Args.Size()
|
|
|
|
}
|
|
|
|
return size
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *objectCallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.CommandName.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal command name")
|
|
|
|
}
|
|
|
|
p = p[v.CommandName.Size():]
|
|
|
|
|
|
|
|
if err = v.TransactionID.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal tid")
|
|
|
|
}
|
|
|
|
p = p[v.TransactionID.Size():]
|
|
|
|
|
|
|
|
if err = v.CommandObject.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal command")
|
|
|
|
}
|
|
|
|
p = p[v.CommandObject.Size():]
|
|
|
|
|
|
|
|
if len(p) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
v.Args = NewAmf0Object()
|
|
|
|
if err = v.Args.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal args")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *objectCallPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.CommandName.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal command name")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.TransactionID.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal tid")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.CommandObject.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal command object")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if v.Args != nil {
|
|
|
|
if pb, err = v.Args.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal args")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 45, @section 4.1.1. connect
|
|
|
|
// The client sends the connect command to the server to request
|
|
|
|
// connection to a server application instance.
|
|
|
|
type ConnectAppPacket struct {
|
|
|
|
objectCallPacket
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewConnectAppPacket() *ConnectAppPacket {
|
|
|
|
v := &ConnectAppPacket{}
|
|
|
|
v.CommandName = commandConnect
|
|
|
|
v.CommandObject = NewAmf0Object()
|
|
|
|
v.TransactionID = amf0Number(1.0)
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *ConnectAppPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.CommandName != commandConnect {
|
|
|
|
return errors.Errorf("Invalid command name %v", string(v.CommandName))
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.TransactionID != 1.0 {
|
|
|
|
return errors.Errorf("Invalid transaction ID %v", float64(v.TransactionID))
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *ConnectAppPacket) TcUrl() string {
|
|
|
|
if v.CommandObject != nil {
|
|
|
|
if v, ok := v.CommandObject.Get("tcUrl").(*amf0String); ok {
|
|
|
|
return string(*v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
|
|
|
// The response for ConnectAppPacket.
|
|
|
|
type ConnectAppResPacket struct {
|
|
|
|
objectCallPacket
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewConnectAppResPacket(tid amf0Number) *ConnectAppResPacket {
|
|
|
|
v := &ConnectAppResPacket{}
|
|
|
|
v.CommandName = commandResult
|
|
|
|
v.CommandObject = NewAmf0Object()
|
|
|
|
v.Args = NewAmf0Object()
|
|
|
|
v.TransactionID = tid
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *ConnectAppResPacket) SrsID() string {
|
|
|
|
if v.Args != nil {
|
|
|
|
if v, ok := v.Args.Get("data").(*amf0EcmaArray); ok {
|
|
|
|
if v, ok := v.Get("srs_id").(*amf0String); ok {
|
|
|
|
return string(*v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *ConnectAppResPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if err = v.objectCallPacket.UnmarshalBinary(data); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.CommandName != commandResult {
|
|
|
|
return errors.Errorf("Invalid command name %v", string(v.CommandName))
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// A Call object, command object is variant.
|
|
|
|
type variantCallPacket struct {
|
|
|
|
CommandName amf0String
|
|
|
|
TransactionID amf0Number
|
|
|
|
CommandObject amf0Any // object or null
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *variantCallPacket) BetterCid() chunkID {
|
|
|
|
return chunkIDOverConnection
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *variantCallPacket) Type() MessageType {
|
|
|
|
return MessageTypeAMF0Command
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *variantCallPacket) Size() int {
|
|
|
|
size := v.CommandName.Size() + v.TransactionID.Size()
|
|
|
|
|
|
|
|
if v.CommandObject != nil {
|
|
|
|
size += v.CommandObject.Size()
|
|
|
|
}
|
|
|
|
|
|
|
|
return size
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.CommandName.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal command name")
|
|
|
|
}
|
|
|
|
p = p[v.CommandName.Size():]
|
|
|
|
|
|
|
|
if err = v.TransactionID.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal tid")
|
|
|
|
}
|
|
|
|
p = p[v.TransactionID.Size():]
|
|
|
|
|
|
|
|
if len(p) > 0 {
|
|
|
|
if v.CommandObject, err = Amf0Discovery(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "discovery command object")
|
|
|
|
}
|
|
|
|
if err = v.CommandObject.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal command object")
|
|
|
|
}
|
|
|
|
p = p[v.CommandObject.Size():]
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *variantCallPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.CommandName.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal command name")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.TransactionID.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal tid")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if v.CommandObject != nil {
|
|
|
|
if pb, err = v.CommandObject.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal command object")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 51, @section 4.1.2. Call
|
|
|
|
// The call method of the NetConnection object runs remote procedure
|
|
|
|
// calls (RPC) at the receiving end. The called RPC name is passed as a
|
|
|
|
// parameter to the call command.
|
|
|
|
// @remark onStatus packet is a call packet.
|
|
|
|
type CallPacket struct {
|
|
|
|
variantCallPacket
|
|
|
|
Args amf0Any // optional or object or null
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCallPacket() *CallPacket {
|
|
|
|
return &CallPacket{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CallPacket) ArgsCode() string {
|
|
|
|
if v.Args != nil {
|
|
|
|
if v, ok := v.Args.(*amf0Object); ok {
|
|
|
|
if code, ok := v.Get("code").(*amf0String); ok {
|
|
|
|
return string(*code)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CallPacket) Size() int {
|
|
|
|
size := v.variantCallPacket.Size()
|
|
|
|
|
|
|
|
if v.Args != nil {
|
|
|
|
size += v.Args.Size()
|
|
|
|
}
|
|
|
|
|
|
|
|
return size
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CallPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
|
|
|
|
if len(p) > 0 {
|
|
|
|
if v.Args, err = Amf0Discovery(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "discovery args")
|
|
|
|
}
|
|
|
|
if err = v.Args.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal args")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CallPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if v.Args != nil {
|
|
|
|
if pb, err = v.Args.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal args")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 52, @section 4.1.3. createStream
|
|
|
|
// The client sends this command to the server to create a logical
|
|
|
|
// channel for message communication The publishing of audio, video, and
|
|
|
|
// metadata is carried out over stream channel created using the
|
|
|
|
// createStream command.
|
|
|
|
type CreateStreamPacket struct {
|
|
|
|
variantCallPacket
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCreateStreamPacket() *CreateStreamPacket {
|
|
|
|
v := &CreateStreamPacket{}
|
|
|
|
v.CommandName = commandCreateStream
|
|
|
|
v.TransactionID = amf0Number(2)
|
|
|
|
v.CommandObject = NewAmf0Null()
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
// The response for create stream
|
|
|
|
type CreateStreamResPacket struct {
|
|
|
|
variantCallPacket
|
|
|
|
StreamID amf0Number
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCreateStreamResPacket(tid amf0Number) *CreateStreamResPacket {
|
|
|
|
v := &CreateStreamResPacket{}
|
|
|
|
v.CommandName = commandResult
|
|
|
|
v.TransactionID = tid
|
|
|
|
v.CommandObject = NewAmf0Null()
|
|
|
|
v.StreamID = 0
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CreateStreamResPacket) Size() int {
|
|
|
|
return v.variantCallPacket.Size() + v.StreamID.Size()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
|
|
|
|
if err = v.StreamID.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal sid")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *CreateStreamResPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.StreamID.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal sid")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 64, @section 4.2.6. Publish
|
|
|
|
type PublishPacket struct {
|
|
|
|
variantCallPacket
|
|
|
|
StreamName amf0String
|
|
|
|
StreamType amf0String
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewPublishPacket() *PublishPacket {
|
|
|
|
v := &PublishPacket{}
|
|
|
|
v.CommandName = commandPublish
|
|
|
|
v.CommandObject = NewAmf0Null()
|
|
|
|
v.StreamType = "live"
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PublishPacket) Size() int {
|
|
|
|
return v.variantCallPacket.Size() + v.StreamName.Size() + v.StreamType.Size()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
|
|
|
|
if err = v.StreamName.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal stream name")
|
|
|
|
}
|
|
|
|
p = p[v.StreamName.Size():]
|
|
|
|
|
|
|
|
if err = v.StreamType.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal stream type")
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PublishPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.StreamName.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal stream name")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.StreamType.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal stream type")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 54, @section 4.2.1. play
|
|
|
|
type PlayPacket struct {
|
|
|
|
variantCallPacket
|
|
|
|
StreamName amf0String
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewPlayPacket() *PlayPacket {
|
|
|
|
v := &PlayPacket{}
|
|
|
|
v.CommandName = commandPlay
|
|
|
|
v.CommandObject = NewAmf0Null()
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PlayPacket) Size() int {
|
|
|
|
return v.variantCallPacket.Size() + v.StreamName.Size()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
p := data
|
|
|
|
|
|
|
|
if err = v.variantCallPacket.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal call")
|
|
|
|
}
|
|
|
|
p = p[v.variantCallPacket.Size():]
|
|
|
|
|
|
|
|
if err = v.StreamName.UnmarshalBinary(p); err != nil {
|
|
|
|
return errors.WithMessage(err, "unmarshal stream name")
|
|
|
|
}
|
|
|
|
p = p[v.StreamName.Size():]
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *PlayPacket) MarshalBinary() (data []byte, err error) {
|
|
|
|
var pb []byte
|
|
|
|
if pb, err = v.variantCallPacket.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal call")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
if pb, err = v.StreamName.MarshalBinary(); err != nil {
|
|
|
|
return nil, errors.WithMessage(err, "marshal stream name")
|
|
|
|
}
|
|
|
|
data = append(data, pb...)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetChunkSize is protocol control message 1, which notifies the peer about
// the new maximum chunk size.
// See rtmp_specification_1.0.pdf, page 31, section 5.1. Set Chunk Size.
type SetChunkSize struct {
	// ChunkSize is the announced chunk size, a 4-byte big-endian value on the
	// wire.
	ChunkSize uint32
}
|
|
|
|
|
|
|
|
func NewSetChunkSize() *SetChunkSize {
|
|
|
|
return &SetChunkSize{
|
|
|
|
ChunkSize: defaultChunkSize,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetChunkSize) BetterCid() chunkID {
|
|
|
|
return chunkIDProtocolControl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetChunkSize) Type() MessageType {
|
|
|
|
return MessageTypeSetChunkSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetChunkSize) Size() int {
|
|
|
|
return 4
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetChunkSize) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if len(data) < 4 {
|
|
|
|
return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
|
|
|
|
}
|
|
|
|
v.ChunkSize = binary.BigEndian.Uint32(data)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetChunkSize) MarshalBinary() (data []byte, err error) {
|
|
|
|
data = make([]byte, 4)
|
|
|
|
binary.BigEndian.PutUint32(data, v.ChunkSize)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// WindowAcknowledgementSize is protocol control message 5, which the client
// or the server sends to inform the peer which window size to use when
// sending acknowledgment.
// See rtmp_specification_1.0.pdf, page 33, section 5.5. Window
// Acknowledgement Size (5).
type WindowAcknowledgementSize struct {
	// AckSize is the window size, a 4-byte big-endian value on the wire.
	AckSize uint32
}
|
|
|
|
|
|
|
|
func NewWindowAcknowledgementSize() *WindowAcknowledgementSize {
|
|
|
|
return &WindowAcknowledgementSize{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *WindowAcknowledgementSize) BetterCid() chunkID {
|
|
|
|
return chunkIDProtocolControl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *WindowAcknowledgementSize) Type() MessageType {
|
|
|
|
return MessageTypeWindowAcknowledgementSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *WindowAcknowledgementSize) Size() int {
|
|
|
|
return 4
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *WindowAcknowledgementSize) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if len(data) < 4 {
|
|
|
|
return errors.Errorf("requires 4 only %v bytes, %x", len(data), data)
|
|
|
|
}
|
|
|
|
v.AckSize = binary.BigEndian.Uint32(data)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *WindowAcknowledgementSize) MarshalBinary() (data []byte, err error) {
|
|
|
|
data = make([]byte, 4)
|
|
|
|
binary.BigEndian.PutUint32(data, v.AckSize)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// LimitType is the limit type field of the Set Peer Bandwidth message. The
// sender can mark the message hard (0), soft (1) or dynamic (2).
// See rtmp_specification_1.0.pdf, page 33, section 5.6. Set Peer Bandwidth (6).
type LimitType uint8

const (
	LimitTypeHard    LimitType = 0
	LimitTypeSoft    LimitType = 1
	LimitTypeDynamic LimitType = 2
)
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6)
|
|
|
|
// The client or the server sends this message to update the output
|
|
|
|
// bandwidth of the peer.
|
|
|
|
type SetPeerBandwidth struct {
|
|
|
|
Bandwidth uint32
|
|
|
|
LimitType LimitType
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSetPeerBandwidth() *SetPeerBandwidth {
|
|
|
|
return &SetPeerBandwidth{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetPeerBandwidth) BetterCid() chunkID {
|
|
|
|
return chunkIDProtocolControl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetPeerBandwidth) Type() MessageType {
|
|
|
|
return MessageTypeSetPeerBandwidth
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetPeerBandwidth) Size() int {
|
|
|
|
return 4 + 1
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetPeerBandwidth) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if len(data) < 5 {
|
|
|
|
return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
|
|
|
|
}
|
|
|
|
v.Bandwidth = binary.BigEndian.Uint32(data)
|
|
|
|
v.LimitType = LimitType(data[4])
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SetPeerBandwidth) MarshalBinary() (data []byte, err error) {
|
|
|
|
data = make([]byte, 5)
|
|
|
|
binary.BigEndian.PutUint32(data, v.Bandwidth)
|
|
|
|
data[4] = byte(v.LimitType)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
type EventType uint16
|
|
|
|
|
|
|
|
const (
|
|
|
|
// Generally, 4bytes event-data
|
|
|
|
|
|
|
|
// The server sends this event to notify the client
|
|
|
|
// that a stream has become functional and can be
|
|
|
|
// used for communication. By default, this event
|
|
|
|
// is sent on ID 0 after the application connect
|
|
|
|
// command is successfully received from the
|
|
|
|
// client. The event data is 4-byte and represents
|
|
|
|
// The stream ID of the stream that became
|
|
|
|
// Functional.
|
|
|
|
EventTypeStreamBegin = 0x00
|
|
|
|
|
|
|
|
// The server sends this event to notify the client
|
|
|
|
// that the playback of data is over as requested
|
|
|
|
// on this stream. No more data is sent without
|
|
|
|
// issuing additional commands. The client discards
|
|
|
|
// The messages received for the stream. The
|
|
|
|
// 4 bytes of event data represent the ID of the
|
|
|
|
// stream on which playback has ended.
|
|
|
|
EventTypeStreamEOF = 0x01
|
|
|
|
|
|
|
|
// The server sends this event to notify the client
|
|
|
|
// that there is no more data on the stream. If the
|
|
|
|
// server does not detect any message for a time
|
|
|
|
// period, it can notify the subscribed clients
|
|
|
|
// that the stream is dry. The 4 bytes of event
|
|
|
|
// data represent the stream ID of the dry stream.
|
|
|
|
EventTypeStreamDry = 0x02
|
|
|
|
|
|
|
|
// The client sends this event to inform the server
|
|
|
|
// of the buffer size (in milliseconds) that is
|
|
|
|
// used to buffer any data coming over a stream.
|
|
|
|
// This event is sent before the server starts
|
|
|
|
// processing the stream. The first 4 bytes of the
|
|
|
|
// event data represent the stream ID and the next
|
|
|
|
// 4 bytes represent the buffer length, in
|
|
|
|
// milliseconds.
|
|
|
|
EventTypeSetBufferLength = 0x03 // 8bytes event-data
|
|
|
|
|
|
|
|
// The server sends this event to notify the client
|
|
|
|
// that the stream is a recorded stream. The
|
|
|
|
// 4 bytes event data represent the stream ID of
|
|
|
|
// The recorded stream.
|
|
|
|
EventTypeStreamIsRecorded = 0x04
|
|
|
|
|
|
|
|
// The server sends this event to test whether the
|
|
|
|
// client is reachable. Event data is a 4-byte
|
|
|
|
// timestamp, representing the local server time
|
|
|
|
// When the server dispatched the command. The
|
|
|
|
// client responds with kMsgPingResponse on
|
|
|
|
// receiving kMsgPingRequest.
|
|
|
|
EventTypePingRequest = 0x06
|
|
|
|
|
|
|
|
// The client sends this event to the server in
|
|
|
|
// response to the ping request. The event data is
|
|
|
|
// a 4-byte timestamp, which was received with the
|
|
|
|
// kMsgPingRequest request.
|
|
|
|
EventTypePingResponse = 0x07
|
|
|
|
|
|
|
|
// For PCUC size=3, for example the payload is "00 1A 01",
|
|
|
|
// it's a FMS control event, where the event type is 0x001a and event data is 0x01,
|
|
|
|
// please notice that the event data is only 1 byte for this event.
|
|
|
|
EventTypeFmsEvent0 = 0x1a
|
|
|
|
)
|
|
|
|
|
|
|
|
// Please read @doc rtmp_specification_1.0.pdf, @page 32, @5.4. User Control Message (4)
|
|
|
|
// The client or the server sends this message to notify the peer about the user control events.
|
|
|
|
// This message carries Event type and Event data.
|
|
|
|
type UserControl struct {
|
|
|
|
// Event type is followed by Event data.
|
|
|
|
// @see: SrcPCUCEventType
|
|
|
|
EventType EventType
|
|
|
|
// The event data generally in 4bytes.
|
|
|
|
// @remark for event type is 0x001a, only 1bytes.
|
|
|
|
// @see SrsPCUCFmsEvent0
|
|
|
|
EventData int32
|
|
|
|
// 4bytes if event_type is SetBufferLength; otherwise 0.
|
|
|
|
ExtraData int32
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewUserControl() *UserControl {
|
|
|
|
return &UserControl{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *UserControl) BetterCid() chunkID {
|
|
|
|
return chunkIDProtocolControl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *UserControl) Type() MessageType {
|
|
|
|
return MessageTypeUserControl
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *UserControl) Size() int {
|
|
|
|
size := 2
|
|
|
|
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
|
|
size += 1
|
|
|
|
} else {
|
|
|
|
size += 4
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
|
|
size += 4
|
|
|
|
}
|
|
|
|
|
|
|
|
return size
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *UserControl) UnmarshalBinary(data []byte) (err error) {
|
|
|
|
if len(data) < 3 {
|
|
|
|
return errors.Errorf("requires 5 only %v bytes, %x", len(data), data)
|
|
|
|
}
|
|
|
|
|
|
|
|
v.EventType = EventType(binary.BigEndian.Uint16(data))
|
|
|
|
if len(data) < v.Size() {
|
|
|
|
return errors.Errorf("requires %v only %v bytes, %x", v.Size(), len(data), data)
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
|
|
v.EventData = int32(uint8(data[2]))
|
|
|
|
} else {
|
|
|
|
v.EventData = int32(binary.BigEndian.Uint32(data[2:]))
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
|
|
v.ExtraData = int32(binary.BigEndian.Uint32(data[6:]))
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *UserControl) MarshalBinary() (data []byte, err error) {
|
|
|
|
data = make([]byte, v.Size())
|
|
|
|
binary.BigEndian.PutUint16(data, uint16(v.EventType))
|
|
|
|
|
|
|
|
if v.EventType == EventTypeFmsEvent0 {
|
|
|
|
data[2] = uint8(v.EventData)
|
|
|
|
} else {
|
|
|
|
binary.BigEndian.PutUint32(data[2:], uint32(v.EventData))
|
|
|
|
}
|
|
|
|
|
|
|
|
if v.EventType == EventTypeSetBufferLength {
|
|
|
|
binary.BigEndian.PutUint32(data[6:], uint32(v.ExtraData))
|
|
|
|
}
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|