diff --git a/pkg/base/websocket.go b/pkg/base/websocket.go new file mode 100644 index 0000000..c3f9fde --- /dev/null +++ b/pkg/base/websocket.go @@ -0,0 +1,148 @@ +package base + +import ( + "crypto/sha1" + "encoding/base64" + "math" + + "github.com/q191201771/naza/pkg/bele" +) + +/* +The WebSocket Protocol +https://tools.ietf.org/html/rfc6455 + +0 1 2 3 +0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-------+-+-------------+-------------------------------+ +|F|R|R|R| opcode|M| Payload len | Extended payload length | +|I|S|S|S| (4) |A| (7) | (16/64) | +|N|V|V|V| |S| | (if payload len==126/127) | +| |1|2|3| |K| | | ++-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + +| Extended payload length continued, if payload len == 127 | ++ - - - - - - - - - - - - - - - +-------------------------------+ +| |Masking-key, if MASK set to 1 | ++-------------------------------+-------------------------------+ +| Masking-key (continued) | Payload Data | ++-------------------------------- - - - - - - - - - - - - - - - + +: Payload Data continued ... : ++ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +| Payload Data continued ... | ++---------------------------------------------------------------+ +opcode: +* %x0 denotes a continuation frame +* %x1 denotes a text frame +* %x2 denotes a binary frame +* %x3-7 are reserved for further non-control frames +* %x8 denotes a connection close +* %x9 denotes a ping +* %xA denotes a pong +* %xB-F are reserved for further control frames +Payload length: 7 bits, 7+16 bits, or 7+64 bits +Masking-key: 0 or 4 bytes +mark 加密 +for i := 0; i < datalen; i { + m := markingkeys[i%4] + data[i] = msg[i] ^ m +} +*/ +type WsOpcode = uint8 + +const ( + WSO_Continuous WsOpcode = iota //连续消息片断 + WSO_Text //文本消息片断, + WSO_Binary //二进制消息片断, + //非控制消息片断保留的操作码, + WSO_Rsv3 + WSO_Rsv4 + WSO_Rsv5 + WSO_Rsv6 + WSO_Rsv7 + WSO_Close //连接关闭, + WSO_Ping //心跳检查的ping, + WSO_Pong //心跳检查的pong, + //为将来的控制消息片断的保留操作码 + WSO_RsvB + WSO_RsvC + WSO_RsvD + WSO_RsvE + WSO_RsvF +) + +type WSHeader struct { + Fin bool + Rsv1 bool + Rsv2 bool + Rsv3 bool + Opcode WsOpcode + + PayloadLength uint64 + + Masked bool + MaskKey uint32 +} + +const WS_MAGIC_STR = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + +func MakeWSFrameHeader(wsHeader WSHeader) (buf []byte) { + headerSize := 2 + payload := uint64(0) + switch { + case wsHeader.PayloadLength < 126: + payload = wsHeader.PayloadLength + case wsHeader.PayloadLength <= math.MaxUint16: + payload = 126 + headerSize += 2 + case wsHeader.PayloadLength > math.MaxUint16: + payload = 127 + headerSize += 8 + } + if wsHeader.Masked { + headerSize += 4 + } + buf = make([]byte, headerSize, headerSize) + if wsHeader.Fin { + buf[0] |= 1 << 7 + } + if wsHeader.Rsv1 { + buf[0] |= 1 << 6 + } + if wsHeader.Rsv2 { + buf[0] |= 1 << 5 + } + if wsHeader.Rsv3 { + buf[0] |= 1 << 4 + } + buf[0] |= wsHeader.Opcode + + if wsHeader.Masked { + buf[1] |= 1 << 7 + } + buf[1] |= (uint8(payload) & 0x7F) + if payload == 126 { + bele.BEPutUint16(buf[2:], uint16(wsHeader.PayloadLength)) + } else if payload == 127 { + bele.BEPutUint64(buf[2:], wsHeader.PayloadLength) + } + + if wsHeader.Masked { + bele.LEPutUint32(buf[headerSize-4:], wsHeader.MaskKey) + } + return buf +} +func UpdateWebSocketHeader(secWebSocketKey string) []byte { + firstLine := "HTTP/1.1 101 Switching Protocol\r\n" + sha1Sum := sha1.Sum([]byte(secWebSocketKey + WS_MAGIC_STR)) + secWebSocketAccept := base64.StdEncoding.EncodeToString(sha1Sum[:]) + webSocketResponseHeaderStr := firstLine + + "Server: " + LALHTTPFLVSubSessionServer + "\r\n" + + "Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" + + "Keep-Alive: timeout=15, max=100\r\n" + + "Connection: Upgrade\r\n" + + "Upgrade: websocket\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + + "Access-Control-Allow-Origin: *\r\n" + + "\r\n" + return []byte(webSocketResponseHeaderStr) +} diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index 479b15b..432ff64 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -39,6 +39,7 @@ type SubSession struct { prevConnStat connection.Stat staleStat *connection.Stat stat base.StatSession + isWebSocket bool } func NewSubSession(conn net.Conn, scheme string) *SubSession { @@ -83,6 +84,11 @@ func (session *SubSession) ReadRequest() (err error) { _ = rawURL session.urlCtx, err = base.ParseHTTPFLVURL(rawURL, session.scheme == "https") + if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { + session.isWebSocket = true + //回复升级为websocket + session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) + } return } @@ -94,19 +100,40 @@ func (session *SubSession) RunLoop() error { func (session *SubSession) WriteHTTPResponseHeader() { nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(flvHTTPResponseHeader) + if session.isWebSocket { + + } else { + session.WriteRawPacket(flvHTTPResponseHeader) + } } func (session *SubSession) WriteFLVHeader() { nazalog.Debugf("[%s] > W http flv header.", session.uniqueKey) session.WriteRawPacket(FLVHeader) + } func (session *SubSession) WriteTag(tag *Tag) { session.WriteRawPacket(tag.Raw) + } func (session *SubSession) WriteRawPacket(pkt []byte) { + if session.isWebSocket { + wsHeader := base.WSHeader{ + Fin: true, + Rsv1: false, + Rsv2: false, + Rsv3: false, + Opcode: base.WSO_Binary, + PayloadLength: uint64(len(pkt)), + Masked: false, + } + session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) + } + session.writeRawPacket(pkt) +} +func (session *SubSession) writeRawPacket(pkt []byte) { _, _ = session.conn.Write(pkt) } @@ -174,6 +201,7 @@ func init() { "Connection: close\r\n" + "Expires: -1\r\n" + "Pragma: no-cache\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n" diff --git a/pkg/httpts/server_sub_session.go b/pkg/httpts/server_sub_session.go index 81271f3..05094d7 100644 --- a/pkg/httpts/server_sub_session.go +++ b/pkg/httpts/server_sub_session.go @@ -38,6 +38,7 @@ type SubSession struct { prevConnStat connection.Stat staleStat *connection.Stat stat base.StatSession + isWebSocket bool } func NewSubSession(conn net.Conn, scheme string) *SubSession { @@ -82,6 +83,11 @@ func (session *SubSession) ReadRequest() (err error) { _ = rawURL session.urlCtx, err = base.ParseHTTPTSURL(rawURL, session.scheme == "https") + if session.headers["Connection"] == "Upgrade" && session.headers["Upgrade"] == "websocket" { + session.isWebSocket = true + //回复升级为websocket + session.writeRawPacket(base.UpdateWebSocketHeader(session.headers["Sec-WebSocket-Key"])) + } return } @@ -93,7 +99,11 @@ func (session *SubSession) RunLoop() error { func (session *SubSession) WriteHTTPResponseHeader() { nazalog.Debugf("[%s] > W http response header.", session.uniqueKey) - session.WriteRawPacket(tsHTTPResponseHeader) + if session.isWebSocket { + + } else { + session.WriteRawPacket(tsHTTPResponseHeader) + } } func (session *SubSession) WriteFragmentHeader() { @@ -102,9 +112,23 @@ func (session *SubSession) WriteFragmentHeader() { } func (session *SubSession) WriteRawPacket(pkt []byte) { + if session.isWebSocket { + wsHeader := base.WSHeader{ + Fin: true, + Rsv1: false, + Rsv2: false, + Rsv3: false, + Opcode: base.WSO_Binary, + PayloadLength: uint64(len(pkt)), + Masked: false, + } + session.writeRawPacket(base.MakeWSFrameHeader(wsHeader)) + } + session.writeRawPacket(pkt) +} +func (session *SubSession) writeRawPacket(pkt []byte) { _, _ = session.conn.Write(pkt) } - func (session *SubSession) Dispose() error { nazalog.Infof("[%s] lifecycle dispose httpts SubSession.", session.uniqueKey) return session.conn.Close() @@ -169,6 +193,7 @@ func init() { "Connection: close\r\n" + "Expires: -1\r\n" + "Pragma: no-cache\r\n" + + "Access-Control-Allow-Credentials: true\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n"