From 9c4ec56491efd534b9eac98155c76455cc9b329e Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 29 Mar 2019 19:01:18 +0800 Subject: [PATCH] support proxy protocol --- client/control.go | 2 +- client/proxy/proxy.go | 55 ++++++++++++++++++++++++++--------- client/proxy/proxy_manager.go | 4 +-- client/proxy/proxy_wrapper.go | 4 +-- conf/frpc_full.ini | 3 ++ go.mod | 1 + go.sum | 2 ++ models/config/proxy.go | 14 +++++++-- models/msg/msg.go | 4 +++ server/proxy/http.go | 2 +- server/proxy/proxy.go | 29 ++++++++++++++++-- server/proxy/udp.go | 2 +- server/proxy/xtcp.go | 2 +- 13 files changed, 98 insertions(+), 26 deletions(-) diff --git a/client/control.go b/client/control.go index bbcece6b..31f18f10 100644 --- a/client/control.go +++ b/client/control.go @@ -131,7 +131,7 @@ func (ctl *Control) HandleReqWorkConn(inMsg *msg.ReqWorkConn) { workConn.AddLogPrefix(startMsg.ProxyName) // dispatch this work connection to related proxy - ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn) + ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg) } func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) { diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 55e87a7c..3b7d1831 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -37,6 +37,7 @@ import ( frpIo "github.com/fatedier/golib/io" "github.com/fatedier/golib/pool" fmux "github.com/hashicorp/yamux" + pp "github.com/pires/go-proxyproto" ) // Proxy defines how to handle work connections for different proxy type. @@ -44,7 +45,7 @@ type Proxy interface { Run() error // InWorkConn accept work connections registered to server. - InWorkConn(conn frpNet.Conn) + InWorkConn(frpNet.Conn, *msg.StartWorkConn) Close() log.Logger @@ -119,9 +120,9 @@ func (pxy *TcpProxy) Close() { } } -func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(g.GlbClientCfg.Token)) + []byte(g.GlbClientCfg.Token), m) } // HTTP @@ -148,9 +149,9 @@ func (pxy *HttpProxy) Close() { } } -func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(g.GlbClientCfg.Token)) + []byte(g.GlbClientCfg.Token), m) } // HTTPS @@ -177,9 +178,9 @@ func (pxy *HttpsProxy) Close() { } } -func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(g.GlbClientCfg.Token)) + []byte(g.GlbClientCfg.Token), m) } // STCP @@ -206,9 +207,9 @@ func (pxy *StcpProxy) Close() { } } -func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(g.GlbClientCfg.Token)) + []byte(g.GlbClientCfg.Token), m) } // XTCP @@ -235,7 +236,7 @@ func (pxy *XtcpProxy) Close() { } } -func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { defer conn.Close() var natHoleSidMsg msg.NatHoleSid err := msg.ReadMsgInto(conn, &natHoleSidMsg) @@ -353,7 +354,7 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) { } HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, - frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk)) + frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk), m) } func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) { @@ -415,7 +416,7 @@ func (pxy *UdpProxy) Close() { } } -func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { +func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) { pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String()) // close resources releated with old workConn pxy.Close() @@ -482,7 +483,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { // Common handler for tcp work connections. func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, - baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte) { + baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte, m *msg.StartWorkConn) { var ( remote io.ReadWriteCloser @@ -518,6 +519,34 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(), localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String()) + + // check if we need to send proxy protocol info + if baseInfo.ProxyProtocolVersion != "" { + if m.SrcAddr != "" && m.SrcPort != 0 { + h := &pp.Header{ + Command: pp.PROXY, + SourceAddress: net.ParseIP(m.SrcAddr), + SourcePort: m.SrcPort, + DestinationAddress: net.ParseIP(m.DstAddr), + DestinationPort: m.DstPort, + } + + if h.SourceAddress.To16() == nil { + h.TransportProtocol = pp.TCPv4 + } else { + h.TransportProtocol = pp.TCPv6 + } + + if baseInfo.ProxyProtocolVersion == "v1" { + h.Version = 1 + } else if baseInfo.ProxyProtocolVersion == "v2" { + h.Version = 2 + } + + h.WriteTo(localConn) + } + } + frpIo.Join(localConn, remote) workConn.Debug("join connections closed") } diff --git a/client/proxy/proxy_manager.go b/client/proxy/proxy_manager.go index 6f0bb411..fb230b3e 100644 --- a/client/proxy/proxy_manager.go +++ b/client/proxy/proxy_manager.go @@ -58,12 +58,12 @@ func (pm *ProxyManager) Close() { pm.proxies = make(map[string]*ProxyWrapper) } -func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) { +func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn, m *msg.StartWorkConn) { pm.mu.RLock() pw, ok := pm.proxies[name] pm.mu.RUnlock() if ok { - pw.InWorkConn(workConn) + pw.InWorkConn(workConn, m) } else { workConn.Close() } diff --git a/client/proxy/proxy_wrapper.go b/client/proxy/proxy_wrapper.go index f95144c6..0b29e481 100644 --- a/client/proxy/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -217,13 +217,13 @@ func (pw *ProxyWrapper) statusFailedCallback() { pw.Info("health check failed") } -func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) { +func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn, m *msg.StartWorkConn) { pw.mu.RLock() pxy := pw.pxy pw.mu.RUnlock() if pxy != nil { workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String()) - go pxy.InWorkConn(workConn) + go pxy.InWorkConn(workConn, m) } else { workConn.Close() } diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 60bf859d..378612b5 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -154,6 +154,9 @@ use_encryption = false use_compression = false subdomain = web01 custom_domains = web02.yourdomain.com +# if not empty, frpc will use proxy protocol to transfer connection info to your local service +# v1 or v2 or empty +proxy_protocol_version = v2 [plugin_unix_domain_socket] type = tcp diff --git a/go.mod b/go.mod index 9bd172b8..b9a6493b 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/reedsolomon v1.9.1 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect + github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rakyll/statik v0.1.1 diff --git a/go.sum b/go.sum index d8d783da..64df3822 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/klauspost/reedsolomon v1.9.1 h1:kYrT1MlR4JH6PqOpC+okdb9CDTcwEC/BqpzK4 github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHXTdpuGw+RpnJtzUcCb/oRKZP65pBy9pr8= +github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rakyll/statik v0.1.1/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= diff --git a/models/config/proxy.go b/models/config/proxy.go index cdb06b8e..a27416f9 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -107,8 +107,10 @@ type BaseProxyConf struct { Group string `json:"group"` GroupKey string `json:"group_key"` + // only used for client + ProxyProtocolVersion string `json:"proxy_protocol_version"` LocalSvrConf - HealthCheckConf // only used for client + HealthCheckConf } func (cfg *BaseProxyConf) GetBaseInfo() *BaseProxyConf { @@ -121,7 +123,8 @@ func (cfg *BaseProxyConf) compare(cmp *BaseProxyConf) bool { cfg.UseEncryption != cmp.UseEncryption || cfg.UseCompression != cmp.UseCompression || cfg.Group != cmp.Group || - cfg.GroupKey != cmp.GroupKey { + cfg.GroupKey != cmp.GroupKey || + cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion { return false } if !cfg.LocalSvrConf.compare(&cmp.LocalSvrConf) { @@ -162,6 +165,7 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i cfg.Group = section["group"] cfg.GroupKey = section["group_key"] + cfg.ProxyProtocolVersion = section["proxy_protocol_version"] if err := cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { return err @@ -194,6 +198,12 @@ func (cfg *BaseProxyConf) MarshalToMsg(pMsg *msg.NewProxy) { } func (cfg *BaseProxyConf) checkForCli() (err error) { + if cfg.ProxyProtocolVersion != "" { + if cfg.ProxyProtocolVersion != "v1" && cfg.ProxyProtocolVersion != "v2" { + return fmt.Errorf("no support proxy protocol version: %s", cfg.ProxyProtocolVersion) + } + } + if err = cfg.LocalSvrConf.checkForCli(); err != nil { return } diff --git a/models/msg/msg.go b/models/msg/msg.go index 2d5985c4..11d2542f 100644 --- a/models/msg/msg.go +++ b/models/msg/msg.go @@ -126,6 +126,10 @@ type ReqWorkConn struct { type StartWorkConn struct { ProxyName string `json:"proxy_name"` + SrcAddr string `json:"src_addr"` + DstAddr string `json:"dst_addr"` + SrcPort uint16 `json:"src_port"` + DstPort uint16 `json:"dst_port"` } type NewVisitorConn struct { diff --git a/server/proxy/http.go b/server/proxy/http.go index 1dde20ec..e6092226 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -98,7 +98,7 @@ func (pxy *HttpProxy) GetConf() config.ProxyConf { } func (pxy *HttpProxy) GetRealConn() (workConn frpNet.Conn, err error) { - tmpConn, errRet := pxy.GetWorkConnFromPool() + tmpConn, errRet := pxy.GetWorkConnFromPool(nil, nil) if errRet != nil { err = errRet return diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index c07453a1..dd7bb79d 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -17,6 +17,8 @@ package proxy import ( "fmt" "io" + "net" + "strconv" "sync" "github.com/fatedier/frp/g" @@ -36,7 +38,7 @@ type Proxy interface { Run() (remoteAddr string, err error) GetName() string GetConf() config.ProxyConf - GetWorkConnFromPool() (workConn frpNet.Conn, err error) + GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) GetUsedPortsNum() int Close() log.Logger @@ -70,7 +72,7 @@ func (pxy *BaseProxy) Close() { } } -func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { +func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) { // try all connections from the pool for i := 0; i < pxy.poolCount+1; i++ { if workConn, err = pxy.getWorkConnFn(); err != nil { @@ -80,8 +82,29 @@ func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) { pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) workConn.AddLogPrefix(pxy.GetName()) + var ( + srcAddr string + dstAddr string + srcPortStr string + dstPortStr string + srcPort int + dstPort int + ) + + if src != nil { + srcAddr, srcPortStr, _ = net.SplitHostPort(src.String()) + srcPort, _ = strconv.Atoi(srcPortStr) + } + if dst != nil { + dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String()) + dstPort, _ = strconv.Atoi(dstPortStr) + } err := msg.WriteMsg(workConn, &msg.StartWorkConn{ ProxyName: pxy.GetName(), + SrcAddr: srcAddr, + SrcPort: uint16(srcPort), + DstAddr: dstAddr, + DstPort: uint16(dstPort), }) if err != nil { workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) @@ -177,7 +200,7 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector sta defer userConn.Close() // try all connections from the pool - workConn, err := pxy.GetWorkConnFromPool() + workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) if err != nil { return } diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 104bff86..b5dd5fbb 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -160,7 +160,7 @@ func (pxy *UdpProxy) Run() (remoteAddr string, err error) { // Sleep a while for waiting control send the NewProxyResp to client. time.Sleep(500 * time.Millisecond) for { - workConn, err := pxy.GetWorkConnFromPool() + workConn, err := pxy.GetWorkConnFromPool(nil, nil) if err != nil { time.Sleep(1 * time.Second) // check if proxy is closed diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go index 87266669..925485c1 100644 --- a/server/proxy/xtcp.go +++ b/server/proxy/xtcp.go @@ -44,7 +44,7 @@ func (pxy *XtcpProxy) Run() (remoteAddr string, err error) { break case sidRequest := <-sidCh: sr := sidRequest - workConn, errRet := pxy.GetWorkConnFromPool() + workConn, errRet := pxy.GetWorkConnFromPool(nil, nil) if errRet != nil { continue }