From 03ebf8f96c8487646806dc68c5b0cf7ae8f16a7f Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 22 Mar 2018 12:27:28 +0300 Subject: [PATCH] Fixed - race condition with load balancer node selection. #1337 --- .../connection/pool/ConnectionPool.java | 21 ++++++------------- .../connection/pool/MasterConnectionPool.java | 10 +++++---- .../pool/MasterPubSubConnectionPool.java | 7 +++++-- .../java/org/redisson/RedissonTopicTest.java | 4 +++- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 4966f760c..d6b771bb6 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -164,18 +164,16 @@ abstract class ConnectionPool { protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); - protected ClientConnectionsEntry getEntry() { - return config.getLoadBalancer().getEntry(entries); - } - public RFuture get(RedisCommand command) { - for (int j = entries.size() - 1; j >= 0; j--) { - final ClientConnectionsEntry entry = getEntry(); + List entriesCopy = new LinkedList(entries); + while (!entriesCopy.isEmpty()) { + ClientConnectionsEntry entry = config.getLoadBalancer().getEntry(entriesCopy); if ((!entry.isFreezed() || (entry.getFreezeReason() == FreezeReason.SYSTEM && config.getReadMode() == ReadMode.MASTER_SLAVE)) && tryAcquireConnection(entry)) { return acquireConnection(command, entry); } + entriesCopy.remove(entry); } List failed = new LinkedList(); @@ -201,21 +199,14 @@ abstract class ConnectionPool { } public RFuture get(RedisCommand command, ClientConnectionsEntry entry) { - if ((!entry.isFreezed() || entry.getFreezeReason() == FreezeReason.SYSTEM) && - tryAcquireConnection(entry)) { - return acquireConnection(command, entry); - } - - RedisConnectionException exception = new RedisConnectionException( - "Can't aquire connection to " + entry); - return RedissonPromise.newFailedFuture(exception); + return acquireConnection(command, entry); } public static abstract class AcquireCallback implements Runnable, FutureListener { } - private RFuture acquireConnection(RedisCommand command, final ClientConnectionsEntry entry) { + protected final RFuture acquireConnection(RedisCommand command, final ClientConnectionsEntry entry) { final RPromise result = new RedissonPromise(); AcquireCallback callback = new AcquireCallback() { diff --git a/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java index 49efd906f..1b48489ee 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java @@ -15,11 +15,13 @@ */ package org.redisson.connection.pool; +import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.RedisCommand; import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import org.redisson.connection.ClientConnectionsEntry; /** @@ -35,10 +37,10 @@ public class MasterConnectionPool extends ConnectionPool { } @Override - protected ClientConnectionsEntry getEntry() { - return entries.get(0); + public RFuture get(RedisCommand command) { + return acquireConnection(command, entries.get(0)); } - + public void remove(ClientConnectionsEntry entry) { entries.remove(entry); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java index 7a7bd47b6..8329b64de 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java @@ -15,6 +15,9 @@ */ package org.redisson.connection.pool; +import org.redisson.api.RFuture; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; @@ -34,8 +37,8 @@ public class MasterPubSubConnectionPool extends PubSubConnectionPool { } @Override - protected ClientConnectionsEntry getEntry() { - return entries.get(0); + public RFuture get(RedisCommand command) { + return acquireConnection(command, entries.get(0)); } public void remove(ClientConnectionsEntry entry) { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 77acc07d4..501067601 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -37,6 +37,7 @@ import org.redisson.api.listener.StatusListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; +import org.redisson.config.SubscriptionMode; import org.redisson.connection.balancer.RandomLoadBalancer; public class RedissonTopicTest { @@ -684,7 +685,7 @@ public class RedissonTopicTest { for (int i = 0; i < 100; i++) { RFuture f1 = redisson.getBucket("i" + i).getAsync(); RFuture f2 = redisson.getBucket("i" + i).setAsync(""); - RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + RFuture f3 = redisson.getTopic("topic").publishAsync(1); futures.add(f1); futures.add(f2); futures.add(f3); @@ -716,6 +717,7 @@ public class RedissonTopicTest { Config config = new Config(); config.useClusterServers() + .setSubscriptionMode(SubscriptionMode.SLAVE) .setLoadBalancer(new RandomLoadBalancer()) .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config);