Fixed - BlockingQueue.take method doesn't work properly after failover. #1622

pull/1639/head
Nikita 7 years ago
parent 4fef15078c
commit 9ca5977758

@ -46,6 +46,7 @@ public class ClientConnectionsEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AsyncSemaphore freeSubscribeConnectionsCounter;
private final Queue<RedisConnection> allConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AsyncSemaphore freeConnectionsCounter;
@ -167,6 +168,8 @@ public class ClientConnectionsEntry {
RedisConnection conn = future.getNow();
onConnect(conn);
log.debug("new connection created: {}", conn);
allConnections.add(conn);
}
});
return future;
@ -215,6 +218,10 @@ public class ClientConnectionsEntry {
});
return future;
}
public Queue<RedisConnection> getAllConnections() {
return allConnections;
}
public Queue<RedisPubSubConnection> getAllSubscribeConnections() {
return allSubscribeConnections;

@ -196,9 +196,13 @@ public class MasterSlaveEntry {
entry.reset();
closeConnections(entry);
for (RedisConnection connection : entry.getAllConnections()) {
connection.closeAsync();
reattachBlockingQueue(connection);
}
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
connection.closeAsync();
connectionManager.getSubscribeService().reattachPubSub(connection);
}
entry.getAllSubscribeConnections().clear();
@ -206,32 +210,6 @@ public class MasterSlaveEntry {
return true;
}
private void closeConnections(ClientConnectionsEntry entry) {
// close all connections
while (true) {
final RedisConnection connection = entry.pollConnection();
if (connection == null) {
break;
}
connection.closeAsync().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
reattachBlockingQueue(connection);
}
});
}
// close all pub/sub connections
while (true) {
RedisPubSubConnection connection = entry.pollSubscribeConnection();
if (connection == null) {
break;
}
connection.closeAsync();
}
}
private void reattachBlockingQueue(RedisConnection connection) {
final CommandData<?, ?> commandData = connection.getCurrentCommand();
@ -251,7 +229,7 @@ public class MasterSlaveEntry {
}
final RedisConnection newConnection = future.getNow();
final FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {

@ -22,6 +22,7 @@ import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonBlockingQueueTest extends RedissonQueueTest {
@ -162,6 +163,84 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
runner.stop();
}
@Test
public void testFailoverInSentinel() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync();
f.await(1, TimeUnit.SECONDS);
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
master = new RedisRunner()
.port(master.getRedisServerPort())
.nosave()
.randomDir()
.run();
System.out.println("master " + master.getRedisServerAddressAndPort() + " started!");
Thread.sleep(25000);
queue1.put(1);
assertThat(f.get()).isEqualTo(1);
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
}
@Test
public void testTakeReattach() throws Exception {

Loading…
Cancel
Save