@ -43,7 +43,8 @@ type ProxyServer struct {
listeners [ ] Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
workConnChan chan * conn . Conn // get new work conns from control goroutine
mutex sync . Mutex
mutex sync . RWMutex
closeChan chan struct { } // for notify other goroutines that the proxy is closed by close this channel
}
func NewProxyServer ( ) ( p * ProxyServer ) {
@ -71,9 +72,10 @@ func (p *ProxyServer) Init() {
p . Lock ( )
p . Status = consts . Idle
metric . SetStatus ( p . Name , p . Status )
p . workConnChan = make ( chan * conn . Conn , 1 0 0)
p . ctlMsgChan = make ( chan int64 )
p . workConnChan = make ( chan * conn . Conn , p . PoolCount + 1 0)
p . ctlMsgChan = make ( chan int64 , p . PoolCount + 10 )
p . listeners = make ( [ ] Listener , 0 )
p . closeChan = make ( chan struct { } )
p . Unlock ( )
}
@ -134,6 +136,11 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) {
p . Unlock ( )
metric . SetStatus ( p . Name , p . Status )
// create connection pool if needed
if p . PoolCount > 0 {
go p . connectionPoolManager ( p . closeChan )
}
// start a goroutine for every listener to accept user connection
for _ , listener := range p . listeners {
go func ( l Listener ) {
@ -153,7 +160,7 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) {
return
}
// start another goroutine for join two conn s from frpc and user
// start another goroutine for join two conn ections between frpc and user
go func ( ) {
workConn , err := p . getWorkConn ( )
if err != nil {
@ -161,14 +168,14 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) {
}
userConn := c
// m sg will transfer to another without modifying
// m e ssa ge will be transferred to another without modifying
// l means local, r means remote
log . Debug ( "Join two connections, (l[%s] r[%s]) (l[%s] r[%s])" , workConn . GetLocalAddr ( ) , workConn . GetRemoteAddr ( ) ,
userConn . GetLocalAddr ( ) , userConn . GetRemoteAddr ( ) )
metric . OpenConnection ( p . Name )
needRecord := true
go msg . JoinMore ( userConn , workConn , p . BaseConf , needRecord )
metric . OpenConnection ( p . Name )
} ( )
}
} ( listener )
@ -187,6 +194,7 @@ func (p *ProxyServer) Close() {
}
close ( p . ctlMsgChan )
close ( p . workConnChan )
close ( p . closeChan )
if p . CtlConn != nil {
p . CtlConn . Close ( )
}
@ -210,7 +218,11 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
}
func ( p * ProxyServer ) RegisterNewWorkConn ( c * conn . Conn ) {
p . workConnChan <- c
select {
case p . workConnChan <- c :
default :
log . Debug ( "ProxyName [%s], workConnChan is full, so close this work connection" , p . Name )
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
@ -219,30 +231,72 @@ func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) {
// return an error if wait timeout
func ( p * ProxyServer ) getWorkConn ( ) ( workConn * conn . Conn , err error ) {
var ok bool
// get a work connection from the pool
select {
case workConn , ok = <- p . workConnChan :
if ! ok {
err = fmt . Errorf ( "ProxyName [%s], no work connections available, control is closing" , p . Name )
return
}
default :
// no work connections available in the poll, send message to frpc to get one
p . ctlMsgChan <- 1
for {
select {
case workConn , ok = <- p . workConnChan :
if ! ok {
err = fmt . Errorf ( "ProxyName [%s], no work connections available, control is closing" , p . Name )
return
}
default :
// no work connections available in the poll, send message to frpc to get more
p . ctlMsgChan <- 1
case <- time . After ( time . Duration ( UserConnTimeout ) * time . Second ) :
log . Warn ( "ProxyName [%s], timeout trying to get work connection" , p . Name )
err = fmt . Errorf ( "ProxyName [%s], timeout trying to get work connection" , p . Name )
return
select {
case workConn , ok = <- p . workConnChan :
if ! ok {
err = fmt . Errorf ( "ProxyName [%s], no work connections available, control is closing" , p . Name )
return
}
case <- time . After ( time . Duration ( UserConnTimeout ) * time . Second ) :
log . Warn ( "ProxyName [%s], timeout trying to get work connection" , p . Name )
err = fmt . Errorf ( "ProxyName [%s], timeout trying to get work connection" , p . Name )
return
}
}
// if connection pool is not used, we don't check the status
// function CheckClosed will consume at least 1 millisecond if the connection isn't closed
if p . PoolCount == 0 || ! workConn . CheckClosed ( ) {
break
} else {
log . Warn ( "ProxyName [%s], connection got from pool, but it's already closed" , p . Name )
}
}
return
}
func ( p * ProxyServer ) connectionPoolManager ( closeCh <- chan struct { } ) {
for {
// check if we need more work connections and send messages to frpc to get more
time . Sleep ( time . Duration ( 2 ) * time . Second )
select {
// if the channel closed, it means the proxy is closed, so just return
case <- closeCh :
log . Info ( "ProxyName [%s], connectionPoolManager exit" , p . Name )
return
default :
curWorkConnNum := int64 ( len ( p . workConnChan ) )
diff := p . PoolCount - curWorkConnNum
if diff > 0 {
if diff < p . PoolCount / 5 {
diff = p . PoolCount * 4 / 5 + 1
} else if diff < p . PoolCount / 2 {
diff = p . PoolCount / 4 + 1
} else if diff < p . PoolCount * 4 / 5 {
diff = p . PoolCount / 5 + 1
} else {
diff = p . PoolCount / 10 + 1
}
if diff + curWorkConnNum > p . PoolCount {
diff = p . PoolCount - curWorkConnNum
}
for i := 0 ; i < int ( diff ) ; i ++ {
p . ctlMsgChan <- 1
}
}
}
}
}