From 3c1d94eb560763401d05077b8277147b838383b7 Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Fri, 23 Apr 2021 08:42:18 +0800 Subject: [PATCH 1/3] =?UTF-8?q?[feat]=E6=94=AF=E6=8C=81ws-flv,ws-ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/websocket.go | 97 +++++++++++++++++++++++++++++++ pkg/httpflv/server_sub_session.go | 21 ++++++- pkg/httpts/server_sub_session.go | 20 ++++++- 3 files changed, 135 insertions(+), 3 deletions(-) create mode 100644 pkg/base/websocket.go diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go new file mode 100644 index 0000000..af3ffd0 --- /dev/null +++ b/pkg/base/websocket.go @@ -0,0 +1,97 @@ +package base + +import ( + "crypto/sha1" + "encoding/base64" + + "github.com/q191201771/naza/pkg/bele" +) + +/* +The WebSocket Protocol +https://tools.ietf.org/html/rfc6455 + +0 1 2 3 +0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-------+-+-------------+-------------------------------+ +|F|R|R|R| opcode|M| Payload len | Extended payload length | +|I|S|S|S| (4) |A| (7) | (16/64) | +|N|V|V|V| |S| | (if payload len==126/127) | +| |1|2|3| |K| | | ++-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + +| Extended payload length continued, if payload len == 127 | ++ - - - - - - - - - - - - - - - +-------------------------------+ +| |Masking-key, if MASK set to 1 | ++-------------------------------+-------------------------------+ +| Masking-key (continued) | Payload Data | ++-------------------------------- - - - - - - - - - - - - - - - + +: Payload Data continued ... : ++ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +| Payload Data continued ... | ++---------------------------------------------------------------+ +opcode: +* %x0 denotes a continuation frame +* %x1 denotes a text frame +* %x2 denotes a binary frame +* %x3-7 are reserved for further non-control frames +* %x8 denotes a connection close +* %x9 denotes a ping +* %xA denotes a pong +* %xB-F are reserved for further control frames +Payload length: 7 bits, 7+16 bits, or 7+64 bits +Masking-key: 0 or 4 bytes +*/ + +func MakeWSFrameHeader(AOpCode uint8, AFin bool, AMaskKey uint32, ADataSize uint64) (HeaderBytes []byte) { + LHeaderSize := 2 + LPayload := uint64(0) + if ADataSize < 126 { + LPayload = ADataSize + } else if ADataSize <= 0xFFFF { + LPayload = 126 + LHeaderSize += 2 + } else { + LPayload = 127 + LHeaderSize += 8 + } + if AMaskKey != 0 { + LHeaderSize += 4 + } + HeaderBytes = make([]byte, LHeaderSize, LHeaderSize) + if AFin { + HeaderBytes[0] = HeaderBytes[0] | 0x80 + } + + HeaderBytes[0] = HeaderBytes[0] | (AOpCode & 0x0F) + + if AMaskKey != 0 { + HeaderBytes[1] = HeaderBytes[1] | 0x80 + } + HeaderBytes[1] = HeaderBytes[1] | (uint8(LPayload) & 0x7F) + if LPayload == 126 { + bele.BEPutUint16(HeaderBytes[2:4], uint16(ADataSize)) + } else if LPayload == 127 { + bele.BEPutUint64(HeaderBytes[2:10], ADataSize) + } + + if AMaskKey != 0 { + bele.LEPutUint32(HeaderBytes[LHeaderSize-4:], AMaskKey) + } + return HeaderBytes +} +func UpdateWebSocketHeader(secWebSocketKey string) []byte { + firstLine := "HTTP/1.1 101 Switching Protocol\r\n" + WS_MAGIC_STR := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + sha1Sum := sha1.Sum([]byte(secWebSocketKey + WS_MAGIC_STR)) + secWebSocketAccept := base64.StdEncoding.EncodeToString(sha1Sum[:]) + flvWebSocketResponseHeaderStr := firstLine + + "Server: " + LALHTTPFLVSubSessionServer + "\r\n" + + "Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" + + "Keep-Alive: timeout=15, max=100\r\n" + + "Connection: Upgrade\r\n" + + "Upgrade: websocket\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + + "Access-Control-Allow-Origin: *\r\n" + + "\r\n" + return []byte(flvWebSocketResponseHeaderStr) +} diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 479b15b..8718941 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -39,6 +39,7 @@ type SubSession struct { prevConnStat connection.Stat staleStat *connection.Stat stat base.StatSession + isWebSocket bool } func NewSubSession(conn net.Conn, scheme string) *SubSession { @@ -83,6 +84,11 @@ func (session *SubSession) ReadRequest() (err error) { _ = rawURL session.urlCtx, err = base.ParseHTTPFLVURL(rawURL, session.scheme == "https") + if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { + session.isWebSocket = true + //回复升级为websocket + session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) + } return } @@ -94,19 +100,31 @@ func (session *SubSession) RunLoop() error { func (session *SubSession) WriteHTTPResponseHeader() { nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(flvHTTPResponseHeader) + if session.isWebSocket { + + } else { + session.WriteRawPacket(flvHTTPResponseHeader) + } } func (session *SubSession) WriteFLVHeader() { nazalog.Debugf("[%s] > W http flv header.", session.uniqueKey) session.WriteRawPacket(FLVHeader) + } func (session *SubSession) WriteTag(tag *Tag) { session.WriteRawPacket(tag.Raw) + } func (session *SubSession) WriteRawPacket(pkt []byte) { + if session.isWebSocket { + session.writeRawPacket(base.MakeWSFrameHeader(2, true, 0, uint64(len(pkt)))) + } + session.writeRawPacket(pkt) +} +func (session *SubSession) writeRawPacket(pkt []byte) { _, _ = session.conn.Write(pkt) } @@ -174,6 +192,7 @@ func init() { "Connection: close\r\n" + "Expires: -1\r\n" + "Pragma: no-cache\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n" diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 81271f3..97be581 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -38,6 +38,7 @@ type SubSession struct { prevConnStat connection.Stat staleStat *connection.Stat stat base.StatSession + isWebSocket bool } func NewSubSession(conn net.Conn, scheme string) *SubSession { @@ -82,6 +83,11 @@ func (session *SubSession) ReadRequest() (err error) { _ = rawURL session.urlCtx, err = base.ParseHTTPTSURL(rawURL, session.scheme == "https") + if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { + session.isWebSocket = true + //回复升级为websocket + session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) + } return } @@ -93,7 +99,11 @@ func (session *SubSession) RunLoop() error { func (session *SubSession) WriteHTTPResponseHeader() { nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(tsHTTPResponseHeader) + if session.isWebSocket { + + } else { + session.WriteRawPacket(tsHTTPResponseHeader) + } } func (session *SubSession) WriteFragmentHeader() { @@ -102,9 +112,14 @@ func (session *SubSession) WriteFragmentHeader() { } func (session *SubSession) WriteRawPacket(pkt []byte) { + if session.isWebSocket { + session.writeRawPacket(base.MakeWSFrameHeader(2, true, 0, uint64(len(pkt)))) + } + session.writeRawPacket(pkt) +} +func (session *SubSession) writeRawPacket(pkt []byte) { _, _ = session.conn.Write(pkt) } - func (session *SubSession) Dispose() error { nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.uniqueKey) return session.conn.Close() @@ -169,6 +184,7 @@ func init() { "Connection: close\r\n" + "Expires: -1\r\n" + "Pragma: no-cache\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n" From ec225fd012616d87a7e109175b2a801e82a1252a Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Fri, 23 Apr 2021 08:51:38 +0800 Subject: [PATCH 2/3] =?UTF-8?q?[style]websocket=E5=8D=95=E5=85=83=E5=8F=98?= =?UTF-8?q?=E9=87=8F=E5=90=8D=E7=A7=B0=E4=BF=AE=E6=94=B9=E4=B8=80=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/websocket.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go index af3ffd0..49d2b31 100644 --- a/pkg/base/websocket.go +++ b/pkg/base/websocket.go @@ -41,6 +41,7 @@ opcode: Payload length: 7 bits, 7+16 bits, or 7+64 bits Masking-key: 0 or 4 bytes */ +const WS_MAGIC_STR = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" func MakeWSFrameHeader(AOpCode uint8, AFin bool, AMaskKey uint32, ADataSize uint64) (HeaderBytes []byte) { LHeaderSize := 2 @@ -81,10 +82,9 @@ func MakeWSFrameHeader(AOpCode uint8, AFin bool, AMaskKey uint32, ADataSize uint } func UpdateWebSocketHeader(secWebSocketKey string) []byte { firstLine := "HTTP/1.1 101 Switching Protocol\r\n" - WS_MAGIC_STR := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" sha1Sum := sha1.Sum([]byte(secWebSocketKey + WS_MAGIC_STR)) secWebSocketAccept := base64.StdEncoding.EncodeToString(sha1Sum[:]) - flvWebSocketResponseHeaderStr := firstLine + + webSocketResponseHeaderStr := firstLine + "Server: " + LALHTTPFLVSubSessionServer + "\r\n" + "Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" + "Keep-Alive: timeout=15, max=100\r\n" + @@ -93,5 +93,5 @@ func UpdateWebSocketHeader(secWebSocketKey string) []byte { "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n" - return []byte(flvWebSocketResponseHeaderStr) + return []byte(webSocketResponseHeaderStr) } From dece388f71d09db624330a5915d74ba5d00b1c57 Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Fri, 23 Apr 2021 15:17:29 +0800 Subject: [PATCH 3/3] =?UTF-8?q?[style]websocket=E7=9B=B8=E5=85=B3=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/websocket.go | 107 ++++++++++++++++++++++-------- pkg/httpflv/server_sub_session.go | 11 ++- pkg/httpts/server_sub_session.go | 11 ++- 3 files changed, 99 insertions(+), 30 deletions(-) diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go index 49d2b31..c3f9fde 100644 --- a/pkg/base/websocket.go +++ b/pkg/base/websocket.go @@ -3,6 +3,7 @@ package base import ( "crypto/sha1" "encoding/base64" + "math" "github.com/q191201771/naza/pkg/bele" ) @@ -40,45 +41,95 @@ opcode: * %xB-F are reserved for further control frames Payload length: 7 bits, 7+16 bits, or 7+64 bits Masking-key: 0 or 4 bytes +mark 加密 +for i := 0; i < datalen; i { + m := markingkeys[i%4] + data[i] = msg[i] ^ m +} */ +type WsOpcode = uint8 + +const ( + WSO_Continuous WsOpcode = iota //连续消息片断 + WSO_Text //文本消息片断, + WSO_Binary //二进制消息片断, + //非控制消息片断保留的操作码, + WSO_Rsv3 + WSO_Rsv4 + WSO_Rsv5 + WSO_Rsv6 + WSO_Rsv7 + WSO_Close //连接关闭, + WSO_Ping //心跳检查的ping, + WSO_Pong //心跳检查的pong, + //为将来的控制消息片断的保留操作码 + WSO_RsvB + WSO_RsvC + WSO_RsvD + WSO_RsvE + WSO_RsvF +) + +type WSHeader struct { + Fin bool + Rsv1 bool + Rsv2 bool + Rsv3 bool + Opcode WsOpcode + + PayloadLength uint64 + + Masked bool + MaskKey uint32 +} + const WS_MAGIC_STR = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" -func MakeWSFrameHeader(AOpCode uint8, AFin bool, AMaskKey uint32, ADataSize uint64) (HeaderBytes []byte) { - LHeaderSize := 2 - LPayload := uint64(0) - if ADataSize < 126 { - LPayload = ADataSize - } else if ADataSize <= 0xFFFF { - LPayload = 126 - LHeaderSize += 2 - } else { - LPayload = 127 - LHeaderSize += 8 +func MakeWSFrameHeader(wsHeader WSHeader) (buf []byte) { + headerSize := 2 + payload := uint64(0) + switch { + case wsHeader.PayloadLength < 126: + payload = wsHeader.PayloadLength + case wsHeader.PayloadLength <= math.MaxUint16: + payload = 126 + headerSize += 2 + case wsHeader.PayloadLength > math.MaxUint16: + payload = 127 + headerSize += 8 } - if AMaskKey != 0 { - LHeaderSize += 4 + if wsHeader.Masked { + headerSize += 4 } - HeaderBytes = make([]byte, LHeaderSize, LHeaderSize) - if AFin { - HeaderBytes[0] = HeaderBytes[0] | 0x80 + buf = make([]byte, headerSize, headerSize) + if wsHeader.Fin { + buf[0] |= 1 << 7 } + if wsHeader.Rsv1 { + buf[0] |= 1 << 6 + } + if wsHeader.Rsv2 { + buf[0] |= 1 << 5 + } + if wsHeader.Rsv3 { + buf[0] |= 1 << 4 + } + buf[0] |= wsHeader.Opcode - HeaderBytes[0] = HeaderBytes[0] | (AOpCode & 0x0F) - - if AMaskKey != 0 { - HeaderBytes[1] = HeaderBytes[1] | 0x80 + if wsHeader.Masked { + buf[1] |= 1 << 7 } - HeaderBytes[1] = HeaderBytes[1] | (uint8(LPayload) & 0x7F) - if LPayload == 126 { - bele.BEPutUint16(HeaderBytes[2:4], uint16(ADataSize)) - } else if LPayload == 127 { - bele.BEPutUint64(HeaderBytes[2:10], ADataSize) + buf[1] |= (uint8(payload) & 0x7F) + if payload == 126 { + bele.BEPutUint16(buf[2:], uint16(wsHeader.PayloadLength)) + } else if payload == 127 { + bele.BEPutUint64(buf[2:], wsHeader.PayloadLength) } - if AMaskKey != 0 { - bele.LEPutUint32(HeaderBytes[LHeaderSize-4:], AMaskKey) + if wsHeader.Masked { + bele.LEPutUint32(buf[headerSize-4:], wsHeader.MaskKey) } - return HeaderBytes + return buf } func UpdateWebSocketHeader(secWebSocketKey string) []byte { firstLine := "HTTP/1.1 101 Switching Protocol\r\n" diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 8718941..432ff64 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -120,7 +120,16 @@ func (session *SubSession) WriteTag(tag *Tag) { func (session *SubSession) WriteRawPacket(pkt []byte) { if session.isWebSocket { - session.writeRawPacket(base.MakeWSFrameHeader(2, true, 0, uint64(len(pkt)))) + wsHeader := base.WSHeader{ + Fin: true, + Rsv1: false, + Rsv2: false, + Rsv3: false, + Opcode: base.WSO_Binary, + PayloadLength: uint64(len(pkt)), + Masked: false, + } + session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) } session.writeRawPacket(pkt) } diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 97be581..05094d7 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -113,7 +113,16 @@ func (session *SubSession) WriteFragmentHeader() { func (session *SubSession) WriteRawPacket(pkt []byte) { if session.isWebSocket { - session.writeRawPacket(base.MakeWSFrameHeader(2, true, 0, uint64(len(pkt)))) + wsHeader := base.WSHeader{ + Fin: true, + Rsv1: false, + Rsv2: false, + Rsv3: false, + Opcode: base.WSO_Binary, + PayloadLength: uint64(len(pkt)), + Masked: false, + } + session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) } session.writeRawPacket(pkt) }