Fixed - failover handling may cause temporary connections spike. #4920

pull/4944/head
Nikita Koksharov 2 years ago
parent 2aab362974
commit 318f11fea6

@ -15,29 +15,23 @@
*/
package org.redisson.client.handler;
import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timer;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.misc.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
*
@ -53,6 +47,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Bootstrap bootstrap;
private final ChannelGroup channels;
private static final int BACKOFF_CAP = 12;
private final AsyncSemaphore semaphore = new AsyncSemaphore(2);
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels, Timer timer) {
this.bootstrap = bootstrap;
@ -78,35 +73,40 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
if (connection.isFastReconnect()) {
tryReconnect(connection, 1);
} else {
reconnect(connection, 1);
semaphore.acquire().thenAccept(r -> {
reconnect(connection, 1);
});
}
}
}
ctx.fireChannelInactive();
}
private void reconnect(final RedisConnection connection, final int attempts){
int timeout = 2 << attempts;
if (bootstrap.config().group().isShuttingDown()) {
private void reconnect(RedisConnection connection, int attempts){
if (connection.isClosed()) {
semaphore.release();
return;
}
if (connection.getRedisClient().isShutdown()
|| bootstrap.config().group().isShuttingDown()) {
return;
}
int timeout = 2 << attempts;
try {
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
timer.newTimeout(t -> tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)), timeout, TimeUnit.MILLISECONDS);
} catch (IllegalStateException e) {
// skip
}
}
private void tryReconnect(RedisConnection connection, int nextAttempt) {
if (connection.isClosed()) {
semaphore.release();
return;
}
if (connection.getRedisClient().isShutdown()
|| connection.isClosed()
|| bootstrap.config().group().isShuttingDown()) {
|| bootstrap.config().group().isShuttingDown()) {
return;
}
@ -116,7 +116,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
public void operationComplete(ChannelFuture future) throws Exception {
if (connection.getRedisClient().isShutdown()
|| connection.isClosed()
|| bootstrap.config().group().isShuttingDown()) {
@ -127,11 +127,14 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
con.closeAsync();
}
}
if (connection.isClosed()) {
semaphore.release();
}
return;
}
if (future.isSuccess()) {
final Channel channel = future.channel();
Channel channel = future.channel();
if (channel.localAddress().equals(channel.remoteAddress())) {
channel.close();
log.error("local address and remote address are the same! connected to: {}, localAddress: {} remoteAddress: {}",
@ -140,6 +143,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
RedisConnection c = RedisConnection.getFrom(channel);
c.getConnectionPromise().whenComplete((res, e) -> {
if (e == null) {
semaphore.release();
if (connection.getRedisClient().isShutdown()
|| connection.isClosed()) {
channel.close();
@ -201,12 +205,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
}
ChannelFuture future = connection.send(currentCommand);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't reconnect blocking queue by command: {} using connection: {}", currentCommand, connection);
}
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
log.error("Can't reconnect blocking queue by command: {} using connection: {}", currentCommand, connection);
}
});
}

@ -813,7 +813,12 @@ public class RedissonTest extends BaseTest {
.run();
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
config.useSingleServer()
.setConnectionMinimumIdleSize(20)
.setConnectionPoolSize(20)
.setSubscriptionConnectionMinimumIdleSize(20)
.setSubscriptionConnectionPoolSize(20)
.setAddress(runner.getRedisServerAddressAndPort());
RedissonClient r = Redisson.create(config);

Loading…
Cancel
Save