|
|
|
// Copyright (c) 2025 Winlin
|
Proxy: Support proxy server for SRS. v7.0.16 (#4158)
Please note that the proxy server is a new architecture or the next
version of the Origin Cluster, which allows the publication of multiple
streams. The SRS origin cluster consists of a group of origin servers
designed to handle a large number of streams.
```text
+-----------------------+
+---+ SRS Proxy(Deployment) +------+---------------------+
+-----------------+ | +-----------+-----------+ + +
| LB(K8s Service) +--+ +(Redis/MESH) + SRS Origin Servers +
+-----------------+ | +-----------+-----------+ + (Deployment) +
+---+ SRS Proxy(Deployment) +------+---------------------+
+-----------------------+
```
The new origin cluster is designed as a collection of proxy servers. For
more information, see [Discussion
#3634](https://github.com/ossrs/srs/discussions/3634). If you prefer to
use the old origin cluster, please switch to a version before SRS 6.0.
A proxy server can be used for a set of origin servers, which are
isolated and dedicated origin servers. The main improvement in the new
architecture is to store the state for origin servers in the proxy
server, rather than using MESH to communicate between origin servers.
With a proxy server, you can deploy origin servers as stateless servers,
such as in a Kubernetes (K8s) deployment.
Now that the proxy server is a stateful server, it uses Redis to store
the states. For faster development, we use Go to develop the proxy
server, instead of C/C++. Therefore, the proxy server itself is also
stateless, with all states stored in the Redis server or cluster. This
makes the new origin cluster architecture very powerful and robust.
The proxy server is also an architecture designed to solve multiple
process bottlenecks. You can run hundreds of SRS origin servers with one
proxy server on the same machine. This solution can utilize multi-core
machines, such as servers with 128 CPUs. Thus, we can keep SRS
single-threaded and very simple. See
https://github.com/ossrs/srs/discussions/3665#discussioncomment-6474441
for details.
```text
+--------------------+
+-------+ SRS Origin Server +
+ +--------------------+
+
+-----------------------+ + +--------------------+
+ SRS Proxy(Deployment) +------+-------+ SRS Origin Server +
+-----------------------+ + +--------------------+
+
+ +--------------------+
+-------+ SRS Origin Server +
+--------------------+
```
Keep in mind that the proxy server for the Origin Cluster is designed to
handle many streams. To address the issue of many viewers, we will
enhance the Edge Cluster to support more protocols.
```text
+------------------+ +--------------------+
+ SRS Edge Server +--+ +-------+ SRS Origin Server +
+------------------+ + + +--------------------+
+ +
+------------------+ + +-----------------------+ + +--------------------+
+ SRS Edge Server +--+-----+ SRS Proxy(Deployment) +------+-------+ SRS Origin Server +
+------------------+ + +-----------------------+ + +--------------------+
+ +
+------------------+ + + +--------------------+
+ SRS Edge Server +--+ +-------+ SRS Origin Server +
+------------------+ +--------------------+
```
With the new Origin Cluster and Edge Cluster, you have a media system
capable of supporting a large number of streams and viewers. For
example, you can publish 10,000 streams, each with 100,000 viewers.
---------
Co-authored-by: Jacob Su <suzp1984@gmail.com>
5 months ago
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"strings"
|
|
|
|
stdSync "sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"srs-proxy/errors"
|
|
|
|
"srs-proxy/logger"
|
|
|
|
"srs-proxy/sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
// srsSRTServer is the proxy for SRS server via SRT. It will figure out which backend server to
|
|
|
|
// proxy to. It only parses the SRT handshake messages, parses the stream id, and proxy to the
|
|
|
|
// backend server.
|
|
|
|
type srsSRTServer struct {
|
|
|
|
// The UDP listener for SRT server.
|
|
|
|
listener *net.UDPConn
|
|
|
|
|
|
|
|
// The SRT connections, identify by the socket ID.
|
|
|
|
sockets sync.Map[uint32, *SRTConnection]
|
|
|
|
// The system start time.
|
|
|
|
start time.Time
|
|
|
|
|
|
|
|
// The wait group for server.
|
|
|
|
wg stdSync.WaitGroup
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSRSSRTServer(opts ...func(*srsSRTServer)) *srsSRTServer {
|
|
|
|
v := &srsSRTServer{
|
|
|
|
start: time.Now(),
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(v)
|
|
|
|
}
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *srsSRTServer) Close() error {
|
|
|
|
if v.listener != nil {
|
|
|
|
v.listener.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
v.wg.Wait()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *srsSRTServer) Run(ctx context.Context) error {
|
|
|
|
// Parse address to listen.
|
|
|
|
endpoint := envSRTServer()
|
|
|
|
if !strings.Contains(endpoint, ":") {
|
|
|
|
endpoint = ":" + endpoint
|
|
|
|
}
|
|
|
|
|
|
|
|
saddr, err := net.ResolveUDPAddr("udp", endpoint)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "resolve udp addr %v", endpoint)
|
|
|
|
}
|
|
|
|
|
|
|
|
listener, err := net.ListenUDP("udp", saddr)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "listen udp %v", saddr)
|
|
|
|
}
|
|
|
|
v.listener = listener
|
|
|
|
logger.Df(ctx, "SRT server listen at %v", saddr)
|
|
|
|
|
|
|
|
// Consume all messages from UDP media transport.
|
|
|
|
v.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer v.wg.Done()
|
|
|
|
|
|
|
|
for ctx.Err() == nil {
|
|
|
|
buf := make([]byte, 4096)
|
|
|
|
n, caddr, err := v.listener.ReadFromUDP(buf)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: If SRT server closed unexpectedly, we should notice the main loop to quit.
|
|
|
|
logger.Wf(ctx, "read from udp failed, err=%+v", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := v.handleClientUDP(ctx, caddr, buf[:n]); err != nil {
|
|
|
|
logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, caddr, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
|
|
|
socketID := srtParseSocketID(data)
|
|
|
|
|
|
|
|
var pkt *SRTHandshakePacket
|
|
|
|
if srtIsHandshake(data) {
|
|
|
|
pkt = &SRTHandshakePacket{}
|
|
|
|
if err := pkt.UnmarshalBinary(data); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if socketID == 0 {
|
|
|
|
socketID = pkt.SRTSocketID
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
conn, ok := v.sockets.LoadOrStore(socketID, NewSRTConnection(func(c *SRTConnection) {
|
|
|
|
c.ctx = logger.WithContext(ctx)
|
|
|
|
c.listenerUDP, c.socketID = v.listener, socketID
|
|
|
|
c.start = v.start
|
|
|
|
}))
|
|
|
|
|
|
|
|
ctx = conn.ctx
|
|
|
|
if !ok {
|
|
|
|
logger.Df(ctx, "Create new SRT connection skt=%v", socketID)
|
|
|
|
}
|
|
|
|
|
|
|
|
if newSocketID, err := conn.HandlePacket(pkt, addr, data); err != nil {
|
|
|
|
return errors.Wrapf(err, "handle packet")
|
|
|
|
} else if newSocketID != 0 && newSocketID != socketID {
|
|
|
|
// The connection may use a new socket ID.
|
|
|
|
// TODO: FIXME: Should cleanup the dead SRT connection.
|
|
|
|
v.sockets.Store(newSocketID, conn)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SRTConnection is an SRT connection proxy, for both caller and listener. It represents an SRT
|
|
|
|
// connection, identify by the socket ID.
|
|
|
|
//
|
|
|
|
// It's similar to RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is in
|
|
|
|
// the client request. The SRTConnection is stateless, and no need to sync between proxy servers.
|
|
|
|
//
|
|
|
|
// Unlike the WebRTC connection, SRTConnection does not support address changes. This means the
|
|
|
|
// client should never switch to another network or port. If this occurs, the client may be served
|
|
|
|
// by a different proxy server and fail because the other proxy server cannot identify the client.
|
|
|
|
type SRTConnection struct {
|
|
|
|
// The stream context for SRT connection.
|
|
|
|
ctx context.Context
|
|
|
|
|
|
|
|
// The current socket ID.
|
|
|
|
socketID uint32
|
|
|
|
|
|
|
|
// The UDP connection proxy to backend.
|
|
|
|
backendUDP *net.UDPConn
|
|
|
|
// The listener UDP connection, used to send messages to client.
|
|
|
|
listenerUDP *net.UDPConn
|
|
|
|
|
|
|
|
// Listener start time.
|
|
|
|
start time.Time
|
|
|
|
|
|
|
|
// Handshake packets with client.
|
|
|
|
handshake0 *SRTHandshakePacket
|
|
|
|
handshake1 *SRTHandshakePacket
|
|
|
|
handshake2 *SRTHandshakePacket
|
|
|
|
handshake3 *SRTHandshakePacket
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSRTConnection(opts ...func(*SRTConnection)) *SRTConnection {
|
|
|
|
v := &SRTConnection{}
|
|
|
|
for _, opt := range opts {
|
|
|
|
opt(v)
|
|
|
|
}
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTConnection) HandlePacket(pkt *SRTHandshakePacket, addr *net.UDPAddr, data []byte) (uint32, error) {
|
|
|
|
ctx := v.ctx
|
|
|
|
|
|
|
|
// If not handshake, try to proxy to backend directly.
|
|
|
|
if pkt == nil {
|
|
|
|
// Proxy client message to backend.
|
|
|
|
if v.backendUDP != nil {
|
|
|
|
if _, err := v.backendUDP.Write(data); err != nil {
|
|
|
|
return v.socketID, errors.Wrapf(err, "write to backend")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return v.socketID, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Handle handshake messages.
|
|
|
|
if err := v.handleHandshake(ctx, pkt, addr, data); err != nil {
|
|
|
|
return v.socketID, errors.Wrapf(err, "handle handshake %v", pkt)
|
|
|
|
}
|
|
|
|
|
|
|
|
return v.socketID, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTConnection) handleHandshake(ctx context.Context, pkt *SRTHandshakePacket, addr *net.UDPAddr, data []byte) error {
|
|
|
|
// Handle handshake 0 and 1 messages.
|
|
|
|
if pkt.SynCookie == 0 {
|
|
|
|
// Save handshake 0 packet.
|
|
|
|
v.handshake0 = pkt
|
|
|
|
logger.Df(ctx, "SRT Handshake 0: %v", v.handshake0)
|
|
|
|
|
|
|
|
// Response handshake 1.
|
|
|
|
v.handshake1 = &SRTHandshakePacket{
|
|
|
|
ControlFlag: pkt.ControlFlag,
|
|
|
|
ControlType: 0,
|
|
|
|
SubType: 0,
|
|
|
|
AdditionalInfo: 0,
|
|
|
|
Timestamp: uint32(time.Since(v.start).Microseconds()),
|
|
|
|
SocketID: pkt.SRTSocketID,
|
|
|
|
Version: 5,
|
|
|
|
EncryptionField: 0,
|
|
|
|
ExtensionField: 0x4A17,
|
|
|
|
InitSequence: pkt.InitSequence,
|
|
|
|
MTU: pkt.MTU,
|
|
|
|
FlowWindow: pkt.FlowWindow,
|
|
|
|
HandshakeType: 1,
|
|
|
|
SRTSocketID: pkt.SRTSocketID,
|
|
|
|
SynCookie: 0x418d5e4e,
|
|
|
|
PeerIP: net.ParseIP("127.0.0.1"),
|
|
|
|
}
|
|
|
|
logger.Df(ctx, "SRT Handshake 1: %v", v.handshake1)
|
|
|
|
|
|
|
|
if b, err := v.handshake1.MarshalBinary(); err != nil {
|
|
|
|
return errors.Wrapf(err, "marshal handshake 1")
|
|
|
|
} else if _, err = v.listenerUDP.WriteToUDP(b, addr); err != nil {
|
|
|
|
return errors.Wrapf(err, "write handshake 1")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Handle handshake 2 and 3 messages.
|
|
|
|
// Parse stream id from packet.
|
|
|
|
streamID, err := pkt.StreamID()
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "parse stream id")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Save handshake packet.
|
|
|
|
v.handshake2 = pkt
|
|
|
|
logger.Df(ctx, "SRT Handshake 2: %v, sid=%v", v.handshake2, streamID)
|
|
|
|
|
|
|
|
// Start the UDP proxy to backend.
|
|
|
|
if err := v.connectBackend(ctx, streamID); err != nil {
|
|
|
|
return errors.Wrapf(err, "connect backend for %v", streamID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Proxy client message to backend.
|
|
|
|
if v.backendUDP == nil {
|
|
|
|
return errors.Errorf("no backend for %v", streamID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Proxy handshake 0 to backend server.
|
|
|
|
if b, err := v.handshake0.MarshalBinary(); err != nil {
|
|
|
|
return errors.Wrapf(err, "marshal handshake 0")
|
|
|
|
} else if _, err = v.backendUDP.Write(b); err != nil {
|
|
|
|
return errors.Wrapf(err, "write handshake 0")
|
|
|
|
}
|
|
|
|
logger.Df(ctx, "Proxy send handshake 0: %v", v.handshake0)
|
|
|
|
|
|
|
|
// Read handshake 1 from backend server.
|
|
|
|
b := make([]byte, 4096)
|
|
|
|
handshake1p := &SRTHandshakePacket{}
|
|
|
|
if nn, err := v.backendUDP.Read(b); err != nil {
|
|
|
|
return errors.Wrapf(err, "read handshake 1")
|
|
|
|
} else if err := handshake1p.UnmarshalBinary(b[:nn]); err != nil {
|
|
|
|
return errors.Wrapf(err, "unmarshal handshake 1")
|
|
|
|
}
|
|
|
|
logger.Df(ctx, "Proxy got handshake 1: %v", handshake1p)
|
|
|
|
|
|
|
|
// Proxy handshake 2 to backend server.
|
|
|
|
handshake2p := *v.handshake2
|
|
|
|
handshake2p.SynCookie = handshake1p.SynCookie
|
|
|
|
if b, err := handshake2p.MarshalBinary(); err != nil {
|
|
|
|
return errors.Wrapf(err, "marshal handshake 2")
|
|
|
|
} else if _, err = v.backendUDP.Write(b); err != nil {
|
|
|
|
return errors.Wrapf(err, "write handshake 2")
|
|
|
|
}
|
|
|
|
logger.Df(ctx, "Proxy send handshake 2: %v", handshake2p)
|
|
|
|
|
|
|
|
// Read handshake 3 from backend server.
|
|
|
|
handshake3p := &SRTHandshakePacket{}
|
|
|
|
if nn, err := v.backendUDP.Read(b); err != nil {
|
|
|
|
return errors.Wrapf(err, "read handshake 3")
|
|
|
|
} else if err := handshake3p.UnmarshalBinary(b[:nn]); err != nil {
|
|
|
|
return errors.Wrapf(err, "unmarshal handshake 3")
|
|
|
|
}
|
|
|
|
logger.Df(ctx, "Proxy got handshake 3: %v", handshake3p)
|
|
|
|
|
|
|
|
// Response handshake 3 to client.
|
|
|
|
v.handshake3 = &*handshake3p
|
|
|
|
v.handshake3.SynCookie = v.handshake1.SynCookie
|
|
|
|
v.socketID = handshake3p.SRTSocketID
|
|
|
|
logger.Df(ctx, "Handshake 3: %v", v.handshake3)
|
|
|
|
|
|
|
|
if b, err := v.handshake3.MarshalBinary(); err != nil {
|
|
|
|
return errors.Wrapf(err, "marshal handshake 3")
|
|
|
|
} else if _, err = v.listenerUDP.WriteToUDP(b, addr); err != nil {
|
|
|
|
return errors.Wrapf(err, "write handshake 3")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start a goroutine to proxy message from backend to client.
|
|
|
|
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
|
|
|
go func() {
|
|
|
|
for ctx.Err() == nil {
|
|
|
|
nn, err := v.backendUDP.Read(b)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
|
|
|
logger.Wf(ctx, "read from backend failed, err=%v", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if _, err = v.listenerUDP.WriteToUDP(b[:nn], addr); err != nil {
|
|
|
|
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
|
|
|
logger.Wf(ctx, "write to client failed, err=%v", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTConnection) connectBackend(ctx context.Context, streamID string) error {
|
|
|
|
if v.backendUDP != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Parse stream id to host and resource.
|
|
|
|
host, resource, err := parseSRTStreamID(streamID)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "parse stream id %v", streamID)
|
|
|
|
}
|
|
|
|
|
|
|
|
if host == "" {
|
|
|
|
host = "localhost"
|
|
|
|
}
|
|
|
|
|
|
|
|
streamURL, err := buildStreamURL(fmt.Sprintf("srt://%v/%v", host, resource))
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "build stream url %v", streamID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pick a backend SRS server to proxy the SRT stream.
|
|
|
|
backend, err := srsLoadBalancer.Pick(ctx, streamURL)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "pick backend for %v", streamURL)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Parse UDP port from backend.
|
|
|
|
if len(backend.SRT) == 0 {
|
|
|
|
return errors.Errorf("no udp server %v for %v", backend, streamURL)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, _, udpPort, err := parseListenEndpoint(backend.SRT[0])
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrapf(err, "parse udp port %v of %v for %v", backend.SRT[0], backend, streamURL)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Connect to backend SRS server via UDP client.
|
|
|
|
// TODO: FIXME: Support close the connection when timeout or client disconnected.
|
|
|
|
backendAddr := net.UDPAddr{IP: net.ParseIP(backend.IP), Port: int(udpPort)}
|
|
|
|
if backendUDP, err := net.DialUDP("udp", nil, &backendAddr); err != nil {
|
|
|
|
return errors.Wrapf(err, "dial udp to %v of %v for %v", backendAddr, backend, streamURL)
|
|
|
|
} else {
|
|
|
|
v.backendUDP = backendUDP
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SRTHandshakePacket is an SRT handshake control packet: the common control packet header
// followed by the handshake body and the raw handshake extensions.
//
// See https://datatracker.ietf.org/doc/html/draft-sharabayko-srt-01#section-3.2
// See https://datatracker.ietf.org/doc/html/draft-sharabayko-srt-01#section-3.2.1
type SRTHandshakePacket struct {
	// F: 1 bit. Packet Type Flag. The control packet has this flag set to
	// "1". The data packet has this flag set to "0".
	// Note that the field keeps the bit in place, as the first byte of the packet masked
	// with 0x80, so the value is 0x80 for a control packet and 0x00 for a data packet.
	ControlFlag uint8
	// Control Type: 15 bits. Control Packet Type. The use of these bits
	// is determined by the control packet type definition.
	// Handshake control packets (Control Type = 0x0000) are used to
	// exchange peer configurations, to agree on connection parameters, and
	// to establish a connection.
	ControlType uint16
	// Subtype: 16 bits. This field specifies an additional subtype for
	// specific packets.
	SubType uint16
	// Type-specific Information: 32 bits. The use of this field depends on
	// the particular control packet type. Handshake packets do not use
	// this field.
	AdditionalInfo uint32
	// Timestamp: 32 bits.
	Timestamp uint32
	// Destination Socket ID: 32 bits.
	SocketID uint32

	// Version: 32 bits. A base protocol version number. Currently used
	// values are 4 and 5. Values greater than 5 are reserved for future
	// use.
	Version uint32
	// Encryption Field: 16 bits. Block cipher family and key size. The
	// values of this field are described in Table 2. The default value
	// is AES-128.
	//     0 | No Encryption Advertised
	//     2 | AES-128
	//     3 | AES-192
	//     4 | AES-256
	EncryptionField uint16
	// Extension Field: 16 bits. This field is message specific extension
	// related to Handshake Type field. The value MUST be set to 0
	// except for the following cases. (1) If the handshake control
	// packet is the INDUCTION message, this field is sent back by the
	// Listener. (2) In the case of a CONCLUSION message, this field
	// value should contain a combination of Extension Type values.
	//     0x00000001 | HSREQ
	//     0x00000002 | KMREQ
	//     0x00000004 | CONFIG
	// 0x4A17 if HandshakeType is INDUCTION, see https://datatracker.ietf.org/doc/html/draft-sharabayko-srt-01#section-4.3.1.1
	ExtensionField uint16
	// Initial Packet Sequence Number: 32 bits. The sequence number of the
	// very first data packet to be sent.
	InitSequence uint32
	// Maximum Transmission Unit Size: 32 bits. This value is typically set
	// to 1500, which is the default Maximum Transmission Unit (MTU) size
	// for Ethernet, but can be less.
	MTU uint32
	// Maximum Flow Window Size: 32 bits. The value of this field is the
	// maximum number of data packets allowed to be "in flight" (i.e. the
	// number of sent packets for which an ACK control packet has not yet
	// been received).
	FlowWindow uint32
	// Handshake Type: 32 bits. This field indicates the handshake packet
	// type.
	//     0xFFFFFFFD | DONE
	//     0xFFFFFFFE | AGREEMENT
	//     0xFFFFFFFF | CONCLUSION
	//     0x00000000 | WAVEHAND
	//     0x00000001 | INDUCTION
	HandshakeType uint32
	// SRT Socket ID: 32 bits. This field holds the ID of the source SRT
	// socket from which a handshake packet is issued.
	SRTSocketID uint32
	// SYN Cookie: 32 bits. Randomized value for processing a handshake.
	// The value of this field is specified by the handshake message
	// type.
	SynCookie uint32
	// Peer IP Address: 128 bits. IPv4 or IPv6 address of the packet's
	// sender. The value consists of four 32-bit fields.
	// Note that only IPv4 is supported when marshaling and unmarshaling.
	PeerIP net.IP
	// Extensions, the raw bytes after the fixed 64 bytes of the packet. Each extension is:
	// Extension Type: 16 bits. The value of this field is used to process
	// an integrated handshake. Each extension can have a pair of
	// request and response types.
	// Extension Length: 16 bits. The length of the Extension Contents
	// field in four-byte blocks.
	// Extension Contents: variable length. The payload of the extension.
	ExtraData []byte
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) IsData() bool {
|
|
|
|
return v.ControlFlag == 0x00
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) IsControl() bool {
|
|
|
|
return v.ControlFlag == 0x80
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) IsHandshake() bool {
|
|
|
|
return v.IsControl() && v.ControlType == 0x00 && v.SubType == 0x00
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) StreamID() (string, error) {
|
|
|
|
p := v.ExtraData
|
|
|
|
for {
|
|
|
|
if len(p) < 2 {
|
|
|
|
return "", errors.Errorf("Require 2 bytes, actual=%v, extra=%v", len(p), len(v.ExtraData))
|
|
|
|
}
|
|
|
|
|
|
|
|
extType := binary.BigEndian.Uint16(p)
|
|
|
|
extSize := binary.BigEndian.Uint16(p[2:])
|
|
|
|
p = p[4:]
|
|
|
|
|
|
|
|
if len(p) < int(extSize*4) {
|
|
|
|
return "", errors.Errorf("Require %v bytes, actual=%v, extra=%v", extSize*4, len(p), len(v.ExtraData))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ignore other packets except stream id.
|
|
|
|
if extType != 0x05 {
|
|
|
|
p = p[extSize*4:]
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// We must copy it, because we will decode the stream id.
|
|
|
|
data := append([]byte{}, p[:extSize*4]...)
|
|
|
|
|
|
|
|
// Reverse the stream id encoded in little-endian to big-endian.
|
|
|
|
for i := 0; i < len(data); i += 4 {
|
|
|
|
value := binary.LittleEndian.Uint32(data[i:])
|
|
|
|
binary.BigEndian.PutUint32(data[i:], value)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Trim the trailing zero bytes.
|
|
|
|
data = bytes.TrimRight(data, "\x00")
|
|
|
|
return string(data), nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) String() string {
|
|
|
|
return fmt.Sprintf("Control=%v, CType=%v, SType=%v, Timestamp=%v, SocketID=%v, Version=%v, Encrypt=%v, Extension=%v, InitSequence=%v, MTU=%v, FlowWnd=%v, HSType=%v, SRTSocketID=%v, Cookie=%v, Peer=%vB, Extra=%vB",
|
|
|
|
v.IsControl(), v.ControlType, v.SubType, v.Timestamp, v.SocketID, v.Version, v.EncryptionField, v.ExtensionField, v.InitSequence, v.MTU, v.FlowWindow, v.HandshakeType, v.SRTSocketID, v.SynCookie, len(v.PeerIP), len(v.ExtraData))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) UnmarshalBinary(b []byte) error {
|
|
|
|
if len(b) < 4 {
|
|
|
|
return errors.Errorf("Invalid packet length %v", len(b))
|
|
|
|
}
|
|
|
|
v.ControlFlag = b[0] & 0x80
|
|
|
|
v.ControlType = binary.BigEndian.Uint16(b[0:2]) & 0x7fff
|
|
|
|
v.SubType = binary.BigEndian.Uint16(b[2:4])
|
|
|
|
|
|
|
|
if len(b) < 64 {
|
|
|
|
return errors.Errorf("Invalid packet length %v", len(b))
|
|
|
|
}
|
|
|
|
v.AdditionalInfo = binary.BigEndian.Uint32(b[4:])
|
|
|
|
v.Timestamp = binary.BigEndian.Uint32(b[8:])
|
|
|
|
v.SocketID = binary.BigEndian.Uint32(b[12:])
|
|
|
|
v.Version = binary.BigEndian.Uint32(b[16:])
|
|
|
|
v.EncryptionField = binary.BigEndian.Uint16(b[20:])
|
|
|
|
v.ExtensionField = binary.BigEndian.Uint16(b[22:])
|
|
|
|
v.InitSequence = binary.BigEndian.Uint32(b[24:])
|
|
|
|
v.MTU = binary.BigEndian.Uint32(b[28:])
|
|
|
|
v.FlowWindow = binary.BigEndian.Uint32(b[32:])
|
|
|
|
v.HandshakeType = binary.BigEndian.Uint32(b[36:])
|
|
|
|
v.SRTSocketID = binary.BigEndian.Uint32(b[40:])
|
|
|
|
v.SynCookie = binary.BigEndian.Uint32(b[44:])
|
|
|
|
|
|
|
|
// Only support IPv4.
|
|
|
|
v.PeerIP = net.IPv4(b[51], b[50], b[49], b[48])
|
|
|
|
|
|
|
|
v.ExtraData = b[64:]
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *SRTHandshakePacket) MarshalBinary() ([]byte, error) {
|
|
|
|
b := make([]byte, 64+len(v.ExtraData))
|
|
|
|
binary.BigEndian.PutUint16(b, uint16(v.ControlFlag)<<8|v.ControlType)
|
|
|
|
binary.BigEndian.PutUint16(b[2:], v.SubType)
|
|
|
|
binary.BigEndian.PutUint32(b[4:], v.AdditionalInfo)
|
|
|
|
binary.BigEndian.PutUint32(b[8:], v.Timestamp)
|
|
|
|
binary.BigEndian.PutUint32(b[12:], v.SocketID)
|
|
|
|
binary.BigEndian.PutUint32(b[16:], v.Version)
|
|
|
|
binary.BigEndian.PutUint16(b[20:], v.EncryptionField)
|
|
|
|
binary.BigEndian.PutUint16(b[22:], v.ExtensionField)
|
|
|
|
binary.BigEndian.PutUint32(b[24:], v.InitSequence)
|
|
|
|
binary.BigEndian.PutUint32(b[28:], v.MTU)
|
|
|
|
binary.BigEndian.PutUint32(b[32:], v.FlowWindow)
|
|
|
|
binary.BigEndian.PutUint32(b[36:], v.HandshakeType)
|
|
|
|
binary.BigEndian.PutUint32(b[40:], v.SRTSocketID)
|
|
|
|
binary.BigEndian.PutUint32(b[44:], v.SynCookie)
|
|
|
|
|
|
|
|
// Only support IPv4.
|
|
|
|
ip := v.PeerIP.To4()
|
|
|
|
b[48] = ip[3]
|
|
|
|
b[49] = ip[2]
|
|
|
|
b[50] = ip[1]
|
|
|
|
b[51] = ip[0]
|
|
|
|
|
|
|
|
if len(v.ExtraData) > 0 {
|
|
|
|
copy(b[64:], v.ExtraData)
|
|
|
|
}
|
|
|
|
|
|
|
|
return b, nil
|
|
|
|
}
|