From 3b5c1f848b4c62a993d18f4ca19a3a04a69ed0a0 Mon Sep 17 00:00:00 2001 From: Shailender Bathula Date: Thu, 18 Aug 2016 11:13:14 +1200 Subject: [PATCH 1/3] #582: Always find a connection to read --- .../balancer/WeightedRoundRobinBalancer.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/balancer/WeightedRoundRobinBalancer.java b/redisson/src/main/java/org/redisson/connection/balancer/WeightedRoundRobinBalancer.java index b7f43d0c0..c5227783c 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/WeightedRoundRobinBalancer.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/WeightedRoundRobinBalancer.java @@ -117,7 +117,6 @@ public class WeightedRoundRobinBalancer implements LoadBalancer { Map weightsCopy = new HashMap(weights); - List clientsCopy = new ArrayList(); synchronized (this) { for (Iterator iterator = weightsCopy.values().iterator(); iterator.hasNext();) { @@ -136,15 +135,18 @@ public class WeightedRoundRobinBalancer implements LoadBalancer { weightsCopy = weights; } + List clientsCopy = findClients(clients, weightsCopy); - for (InetSocketAddress addr : weightsCopy.keySet()) { - for (ClientConnectionsEntry clientConnectionsEntry : clients) { - if (clientConnectionsEntry.getClient().getAddr().equals(addr) - && !clientConnectionsEntry.isFreezed()) { - clientsCopy.add(clientConnectionsEntry); - break; - } + // If there are no connections available to servers that have a weight counter + // remaining, then reset the weight counters and find a connection again. In the worst + // case, there should always be a connection to the master. + if (clientsCopy.isEmpty()) { + for (WeightEntry entry : weights.values()) { + entry.resetWeightCounter(); } + + weightsCopy = weights; + clientsCopy = findClients(clients, weightsCopy); } int ind = Math.abs(index.incrementAndGet() % clientsCopy.size()); @@ -155,4 +157,18 @@ public class WeightedRoundRobinBalancer implements LoadBalancer { } } + private List findClients(List clients, Map weightsCopy) { + List clientsCopy = new ArrayList(); + for (InetSocketAddress addr : weightsCopy.keySet()) { + for (ClientConnectionsEntry clientConnectionsEntry : clients) { + if (clientConnectionsEntry.getClient().getAddr().equals(addr) + && !clientConnectionsEntry.isFreezed()) { + clientsCopy.add(clientConnectionsEntry); + break; + } + } + } + return clientsCopy; + } + } From 1a87a5eddac30adf79c31667f80be19839ba19d9 Mon Sep 17 00:00:00 2001 From: "Shailender R. Bathula" Date: Sun, 21 Aug 2016 09:36:19 +1200 Subject: [PATCH 2/3] #582: Unit test to reproduce the issue and test the fix --- .../WeightedRoundRobinBalancerTest.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java diff --git a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java new file mode 100644 index 000000000..685b91651 --- /dev/null +++ b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java @@ -0,0 +1,70 @@ +package org.redisson.connection.balancer; + +import static com.jayway.awaitility.Awaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.redisson.RedisRunner; +import org.redisson.RedisRunner.RedisProcess; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.ReadMode; + +public class WeightedRoundRobinBalancerTest { + + @Test + public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException { + RedisProcess master = null; + RedisProcess slave = null; + RedissonClient client = null; + try { + master = redisTestInstance(6379); + slave = redisTestInstance(6380); + + Map weights = new HashMap<>(); + weights.put("127.0.0.1:6379", 1); + weights.put("127.0.0.1:6380", 2); + + Config config = new Config(); + config.useMasterSlaveServers() + .setReadMode(ReadMode.SLAVE) + .setMasterAddress("127.0.0.1:6379") + .addSlaveAddress("127.0.0.1:6380") + .setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1)); + + client = Redisson.create(config); + + // To simulate network connection issues to slave, stop the slave + // after creating the client. Cannot create the client without the + // slave running. See https://github.com/mrniko/redisson/issues/580 + slave.stop(); + + RedissonClient clientCopy = client; + await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(clientCopy.getBucket("key").get()).isNull()); + } finally { + if (master != null) { + master.stop(); + } + if (slave != null) { + slave.stop(); + } + if (client != null) { + client.shutdown(); + } + } + } + + private RedisProcess redisTestInstance(int port) throws IOException, InterruptedException { + return new RedisRunner() + .nosave() + .randomDir() + .port(port) + .run(); + } +} From 12d72f712435feb86da02f15af95df22732cf3a3 Mon Sep 17 00:00:00 2001 From: "Shailender R. Bathula" Date: Sun, 21 Aug 2016 09:42:31 +1200 Subject: [PATCH 3/3] #582: Fix up the indentation --- .../WeightedRoundRobinBalancerTest.java | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java index 685b91651..0257efd6a 100644 --- a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java +++ b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java @@ -17,54 +17,53 @@ import org.redisson.config.Config; import org.redisson.config.ReadMode; public class WeightedRoundRobinBalancerTest { - + @Test - public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException { + public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException { RedisProcess master = null; RedisProcess slave = null; RedissonClient client = null; try { - master = redisTestInstance(6379); - slave = redisTestInstance(6380); - - Map weights = new HashMap<>(); - weights.put("127.0.0.1:6379", 1); - weights.put("127.0.0.1:6380", 2); - - Config config = new Config(); - config.useMasterSlaveServers() - .setReadMode(ReadMode.SLAVE) - .setMasterAddress("127.0.0.1:6379") - .addSlaveAddress("127.0.0.1:6380") - .setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1)); - - client = Redisson.create(config); - - // To simulate network connection issues to slave, stop the slave - // after creating the client. Cannot create the client without the - // slave running. See https://github.com/mrniko/redisson/issues/580 - slave.stop(); - - RedissonClient clientCopy = client; - await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(clientCopy.getBucket("key").get()).isNull()); - } finally { - if (master != null) { - master.stop(); - } - if (slave != null) { - slave.stop(); - } - if (client != null) { - client.shutdown(); - } - } + master = redisTestInstance(6379); + slave = redisTestInstance(6380); + + Map weights = new HashMap<>(); + weights.put("127.0.0.1:6379", 1); + weights.put("127.0.0.1:6380", 2); + + Config config = new Config(); + config.useMasterSlaveServers() + .setReadMode(ReadMode.SLAVE) + .setMasterAddress("127.0.0.1:6379") + .addSlaveAddress("127.0.0.1:6380") + .setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1)); + + client = Redisson.create(config); + + // To simulate network connection issues to slave, stop the slave + // after creating the client. Cannot create the client without the + // slave running. See https://github.com/mrniko/redisson/issues/580 + slave.stop(); + + RedissonClient clientCopy = client; + await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(clientCopy.getBucket("key").get()).isNull()); + } finally { + if (master != null) { + master.stop(); + } + if (slave != null) { + slave.stop(); + } + if (client != null) { + client.shutdown(); + } + } } - - private RedisProcess redisTestInstance(int port) throws IOException, InterruptedException { + + private RedisProcess redisTestInstance(int port) throws IOException, InterruptedException { return new RedisRunner() .nosave() .randomDir() - .port(port) - .run(); + .port(port).run(); } }