From d144c7fc294877094b1358d14b51d4abfd0dbc2e Mon Sep 17 00:00:00 2001 From: Amin Cheloh Date: Wed, 2 Sep 2015 23:04:06 +0700 Subject: [PATCH 1/7] Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index e63a53c80..3b2315e41 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,10 @@ Include the following to your dependency list: 2.1.1 +### Gradle + + compile 'org.redisson:redisson:2.1.1' + ### Supported by YourKit is kindly supporting this open source project with its full-featured Java Profiler. From 5018a3c31cc2dd10b7bb5d812f75f72548e233ec Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 3 Sep 2015 14:25:51 +0300 Subject: [PATCH 2/7] BlockingQueue.peek race-condition fixed. #106 --- src/main/java/org/redisson/RedissonQueue.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/org/redisson/RedissonQueue.java b/src/main/java/org/redisson/RedissonQueue.java index 8e6130c74..e5da5bdcd 100644 --- a/src/main/java/org/redisson/RedissonQueue.java +++ b/src/main/java/org/redisson/RedissonQueue.java @@ -88,10 +88,7 @@ public class RedissonQueue extends RedissonList implements RQueue { @Override public V peek() { - if (isEmpty()) { - return null; - } - return get(0); + return getValue(0); } @Override From 0f663f0c3e47543305be4ff9cf9b66b8a9682458 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 3 Sep 2015 15:53:39 +0300 Subject: [PATCH 3/7] masterEntry change possible race-condition --- .../redisson/connection/MasterSlaveEntry.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index b5d7acc4e..1be6af126 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -125,17 +125,19 @@ public class MasterSlaveEntry { } public RedisConnection connectionWriteOp() { - acquireMasterConnection(); + // may changed during changeMaster call + ConnectionEntry entry = masterEntry; + acquireMasterConnection(entry); - RedisConnection conn = masterEntry.getConnections().poll(); + RedisConnection conn = entry.getConnections().poll(); if (conn != null) { return conn; } try { - return masterEntry.connect(config); + return entry.connect(config); } catch (RedisException e) { - masterEntry.getConnectionsSemaphore().release(); + entry.getConnectionsSemaphore().release(); throw e; } } @@ -148,11 +150,11 @@ public class MasterSlaveEntry { return slaveBalancer.nextPubSubConnection(); } - void acquireMasterConnection() { - if (!masterEntry.getConnectionsSemaphore().tryAcquire()) { + void acquireMasterConnection(ConnectionEntry entry) { + if (!entry.getConnectionsSemaphore().tryAcquire()) { log.warn("Master connection pool gets exhausted! Trying to acquire connection ..."); long time = System.currentTimeMillis(); - masterEntry.getConnectionsSemaphore().acquireUninterruptibly(); + entry.getConnectionsSemaphore().acquireUninterruptibly(); long endTime = System.currentTimeMillis() - time; log.warn("Master connection acquired, time spended: {} ms", endTime); } @@ -164,13 +166,14 @@ public class MasterSlaveEntry { public void releaseWrite(RedisConnection connection) { // may changed during changeMaster call - if (!masterEntry.getClient().equals(connection.getRedisClient())) { + ConnectionEntry entry = masterEntry; + if (!entry.getClient().equals(connection.getRedisClient())) { connection.closeAsync(); return; } - masterEntry.getConnections().add(connection); - masterEntry.getConnectionsSemaphore().release(); + entry.getConnections().add(connection); + entry.getConnectionsSemaphore().release(); } public void releaseRead(RedisConnection сonnection) { From 8bdd6cf67a453ca60c198cb932e5de67ad3fceea Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 4 Sep 2015 16:48:06 +0300 Subject: [PATCH 4/7] use same node for SCAN/SSCAN/HSCAN during iteration. #230 --- .../redisson/CommandBatchExecutorService.java | 6 +- .../java/org/redisson/CommandExecutor.java | 3 + .../org/redisson/CommandExecutorService.java | 54 ++++++--- .../java/org/redisson/RedisClientResult.java | 11 ++ src/main/java/org/redisson/RedissonMap.java | 11 +- src/main/java/org/redisson/RedissonSet.java | 11 +- .../protocol/decoder/ListScanResult.java | 15 ++- .../protocol/decoder/MapScanResult.java | 15 ++- .../redisson/connection/BaseLoadBalancer.java | 109 ++++++++++-------- .../connection/ConnectionManager.java | 2 + .../org/redisson/connection/LoadBalancer.java | 3 + .../MasterSlaveConnectionManager.java | 10 +- .../redisson/connection/MasterSlaveEntry.java | 5 + 13 files changed, 178 insertions(+), 77 deletions(-) create mode 100644 src/main/java/org/redisson/RedisClientResult.java diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 330b94736..8c4504996 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.client.WriteRedisConnectionException; -import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; @@ -102,7 +102,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override protected void async(boolean readOnlyMode, int slot, MultiDecoder messageDecoder, - Codec codec, RedisCommand command, Object[] params, Promise mainPromise, int attempt) { + Codec codec, RedisCommand command, Object[] params, Promise mainPromise, RedisClient client, int attempt) { if (executed) { throw new IllegalStateException("Batch already executed!"); } diff --git a/src/main/java/org/redisson/CommandExecutor.java b/src/main/java/org/redisson/CommandExecutor.java index 22fe5acea..67bf29af0 100644 --- a/src/main/java/org/redisson/CommandExecutor.java +++ b/src/main/java/org/redisson/CommandExecutor.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.Collection; import java.util.List; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -32,6 +33,8 @@ import io.netty.util.concurrent.Future; //TODO ping support public interface CommandExecutor { + R read(RedisClient client, String key, RedisCommand command, Object ... params); + Future evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); Future writeAllAsync(RedisCommand command, SlotCallback callback, Object ... params); diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 9f3170350..d0929561f 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.WriteRedisConnectionException; -import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; +import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; @@ -98,7 +98,7 @@ public class CommandExecutorService implements CommandExecutor { }; for (Integer slot : connectionManager.getEntries().keySet()) { - async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0); + async(true, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -135,7 +135,7 @@ public class CommandExecutorService implements CommandExecutor { }); Integer slot = slots.remove(0); - async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0); + async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, null, 0); } public Future writeAllAsync(RedisCommand command, Object ... params) { @@ -172,7 +172,7 @@ public class CommandExecutorService implements CommandExecutor { } }; for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0); + async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, null, 0); } return mainPromise; } @@ -196,10 +196,22 @@ public class CommandExecutorService implements CommandExecutor { return get(res); } + public R read(RedisClient client, String key, RedisCommand command, Object ... params) { + Future res = readAsync(client, key, connectionManager.getCodec(), command, params); + return get(res); + } + + public Future readAsync(RedisClient client, String key, Codec codec, RedisCommand command, Object ... params) { + Promise mainPromise = connectionManager.newPromise(); + int slot = connectionManager.calcSlot(key); + async(true, slot, null, codec, command, params, mainPromise, client, 0); + return mainPromise; + } + public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(true, slot, null, codec, command, params, mainPromise, 0); + async(true, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -210,7 +222,7 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); - async(false, slot, null, codec, command, params, mainPromise, 0); + async(false, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } @@ -328,7 +340,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (Integer slot : connectionManager.getEntries().keySet()) { - async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0); + async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0); } return mainPromise; } @@ -341,7 +353,7 @@ public class CommandExecutorService implements CommandExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); int slot = connectionManager.calcSlot(key); - async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0); + async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0); return mainPromise; } @@ -371,12 +383,12 @@ public class CommandExecutorService implements CommandExecutor { public Future writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); - async(false, slot, null, codec, command, params, mainPromise, 0); + async(false, slot, null, codec, command, params, mainPromise, null, 0); return mainPromise; } protected void async(final boolean readOnlyMode, final int slot, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, - final Object[] params, final Promise mainPromise, final int attempt) { + final Object[] params, final Promise mainPromise, final RedisClient client, final int attempt) { if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -400,18 +412,22 @@ public class CommandExecutorService implements CommandExecutor { } int count = attempt + 1; - async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count); + async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count); } }; try { - org.redisson.client.RedisConnection connection; + RedisConnection connection; if (readOnlyMode) { - connection = connectionManager.connectionReadOp(slot); + if (client != null) { + connection = connectionManager.connectionReadOp(slot, client); + } else { + connection = connectionManager.connectionReadOp(slot); + } } else { connection = connectionManager.connectionWriteOp(slot); } - log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr()); + log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); ex.set(new RedisTimeoutException()); @@ -449,12 +465,16 @@ public class CommandExecutorService implements CommandExecutor { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt); + async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt); return; } if (future.isSuccess()) { - mainPromise.setSuccess(future.getNow()); + R res = future.getNow(); + if (res instanceof RedisClientResult) { + ((RedisClientResult)res).setRedisClient(client); + } + mainPromise.setSuccess(res); } else { mainPromise.setFailure(future.cause()); } diff --git a/src/main/java/org/redisson/RedisClientResult.java b/src/main/java/org/redisson/RedisClientResult.java new file mode 100644 index 000000000..2fe09b3ab --- /dev/null +++ b/src/main/java/org/redisson/RedisClientResult.java @@ -0,0 +1,11 @@ +package org.redisson; + +import org.redisson.client.RedisClient; + +public interface RedisClientResult { + + void setRedisClient(RedisClient client); + + RedisClient getRedisClient(); + +} diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 664c4e61d..96d9cdde8 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import org.redisson.client.RedisClient; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -277,8 +278,8 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - private MapScanResult scanIterator(long startPos) { - return commandExecutor.read(getName(), RedisCommands.HSCAN, getName(), startPos); + private MapScanResult scanIterator(RedisClient client, long startPos) { + return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos); } private Iterator> iterator() { @@ -286,6 +287,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { private Iterator> iter; private long iterPos = 0; + private RedisClient client; private boolean removeExecuted; private Map.Entry value; @@ -294,7 +296,10 @@ public class RedissonMap extends RedissonExpirable implements RMap { public boolean hasNext() { if (iter == null || (!iter.hasNext() && iterPos != 0)) { - MapScanResult res = scanIterator(iterPos); + MapScanResult res = scanIterator(client, iterPos); + if (iter == null) { + client = res.getRedisClient(); + } iter = ((Map)res.getMap()).entrySet().iterator(); iterPos = res.getPos(); } diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index e37bae3cb..13343a60f 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; @@ -68,8 +69,8 @@ public class RedissonSet extends RedissonExpirable implements RSet { return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o); } - private ListScanResult scanIterator(long startPos) { - return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos); + private ListScanResult scanIterator(RedisClient client, long startPos) { + return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos); } @Override @@ -77,6 +78,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { return new Iterator() { private Iterator iter; + private RedisClient client; private Long iterPos; private boolean removeExecuted; @@ -85,11 +87,12 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public boolean hasNext() { if (iter == null) { - ListScanResult res = scanIterator(0); + ListScanResult res = scanIterator(null, 0); + client = res.getRedisClient(); iter = res.getValues().iterator(); iterPos = res.getPos(); } else if (!iter.hasNext() && iterPos != 0) { - ListScanResult res = scanIterator(iterPos); + ListScanResult res = scanIterator(client, iterPos); iter = res.getValues().iterator(); iterPos = res.getPos(); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java index 4e90ad130..3231cf931 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java @@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder; import java.util.List; -public class ListScanResult { +import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; + +public class ListScanResult implements RedisClientResult { private final Long pos; private final List values; + private RedisClient client; public ListScanResult(Long pos, List values) { this.pos = pos; @@ -35,4 +39,13 @@ public class ListScanResult { return values; } + @Override + public void setRedisClient(RedisClient client) { + this.client = client; + } + + public RedisClient getRedisClient() { + return client; + } + } diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java index 60028e211..e54591e4e 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java @@ -17,10 +17,14 @@ package org.redisson.client.protocol.decoder; import java.util.Map; -public class MapScanResult { +import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; + +public class MapScanResult implements RedisClientResult { private final Long pos; private final Map values; + private RedisClient client; public MapScanResult(Long pos, Map values) { super(); @@ -36,4 +40,13 @@ public class MapScanResult { return values; } + @Override + public void setRedisClient(RedisClient client) { + this.client = client; + } + + public RedisClient getRedisClient() { + return client; + } + } diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 9b6f5bccb..5a0d8330e 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -20,10 +20,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Map; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; @@ -33,6 +33,8 @@ import org.redisson.misc.ReclosableLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.internal.PlatformDependent; + abstract class BaseLoadBalancer implements LoadBalancer { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -41,7 +43,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { private ConnectionManager connectionManager; private final ReclosableLatch clientsEmpty = new ReclosableLatch(); - final Queue clients = new ConcurrentLinkedQueue(); + final Map clients = PlatformDependent.newConcurrentHashMap(); public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) { this.config = config; @@ -49,7 +51,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { } public synchronized void add(SubscribesConnectionEntry entry) { - clients.add(entry); + clients.put(entry.getClient(), entry); if (!entry.isFreezed()) { clientsEmpty.open(); } @@ -57,7 +59,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public int getAvailableClients() { int count = 0; - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -67,7 +69,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized void unfreeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (!connectionEntry.getClient().getAddr().equals(addr)) { continue; } @@ -80,7 +82,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized Collection freeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients) { + for (SubscribesConnectionEntry connectionEntry : clients.values()) { if (connectionEntry.isFreezed() || !connectionEntry.getClient().getAddr().equals(addr)) { continue; @@ -109,7 +111,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { boolean allFreezed = true; - for (SubscribesConnectionEntry entry : clients) { + for (SubscribesConnectionEntry entry : clients.values()) { if (!entry.isFreezed()) { allFreezed = false; break; @@ -129,7 +131,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public RedisPubSubConnection nextPubSubConnection() { clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients); + List clientsCopy = new ArrayList(clients.values()); while (true) { if (clientsCopy.isEmpty()) { throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!"); @@ -170,9 +172,21 @@ abstract class BaseLoadBalancer implements LoadBalancer { } } + public RedisConnection getConnection(RedisClient client) { + SubscribesConnectionEntry entry = clients.get(client); + if (entry != null) { + RedisConnection conn = retrieveConnection(entry); + if (conn == null) { + throw new RedisConnectionException("Slave connection pool gets exhausted for " + client); + } + return conn; + } + throw new RedisConnectionException("Can't find entry for " + client); + } + public RedisConnection nextConnection() { clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients); + List clientsCopy = new ArrayList(clients.values()); while (true) { if (clientsCopy.isEmpty()) { throw new RedisConnectionException("Slave connection pool gets exhausted!"); @@ -181,22 +195,31 @@ abstract class BaseLoadBalancer implements LoadBalancer { int index = getIndex(clientsCopy); SubscribesConnectionEntry entry = clientsCopy.get(index); - if (entry.isFreezed() - || !entry.getConnectionsSemaphore().tryAcquire()) { + RedisConnection conn = retrieveConnection(entry); + if (conn == null) { clientsCopy.remove(index); } else { - RedisConnection conn = entry.getConnections().poll(); - if (conn != null) { - return conn; - } - try { - return entry.connect(config); - } catch (RedisException e) { - entry.getConnectionsSemaphore().release(); - // TODO connection scoring - log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); - clientsCopy.remove(index); - } + return conn; + } + } + } + + private RedisConnection retrieveConnection(SubscribesConnectionEntry entry) { + if (entry.isFreezed() + || !entry.getConnectionsSemaphore().tryAcquire()) { + return null; + } else { + RedisConnection conn = entry.getConnections().poll(); + if (conn != null) { + return conn; + } + try { + return entry.connect(config); + } catch (RedisException e) { + entry.getConnectionsSemaphore().release(); + // TODO connection scoring + log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); + return null; } } } @@ -204,42 +227,34 @@ abstract class BaseLoadBalancer implements LoadBalancer { abstract int getIndex(List clientsCopy); public void returnSubscribeConnection(RedisPubSubConnection connection) { - for (SubscribesConnectionEntry entry : clients) { - if (entry.getClient().equals(connection.getRedisClient())) { - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - entry.offerFreeSubscribeConnection(connection); - } - entry.getSubscribeConnectionsSemaphore().release(); - break; - } + SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); + if (entry.isFreezed()) { + connection.closeAsync(); + } else { + entry.offerFreeSubscribeConnection(connection); } + entry.getSubscribeConnectionsSemaphore().release(); } public void returnConnection(RedisConnection connection) { - for (SubscribesConnectionEntry entry : clients) { - if (entry.getClient().equals(connection.getRedisClient())) { - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - entry.getConnections().add(connection); - } - entry.getConnectionsSemaphore().release(); - break; - } + SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); + if (entry.isFreezed()) { + connection.closeAsync(); + } else { + entry.getConnections().add(connection); } + entry.getConnectionsSemaphore().release(); } public void shutdown() { - for (SubscribesConnectionEntry entry : clients) { + for (SubscribesConnectionEntry entry : clients.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (SubscribesConnectionEntry entry : clients) { - connectionManager.shutdownAsync(entry.getClient()); + for (RedisClient client : clients.keySet()) { + connectionManager.shutdownAsync(client); } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index cbe33a1e4..f7bcf5d9e 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -63,6 +63,8 @@ public interface ConnectionManager { RedisConnection connectionReadOp(int slot); + RedisConnection connectionReadOp(int slot, RedisClient client); + RedisConnection connectionWriteOp(int slot); FutureListener createReleaseReadListener(int slot, diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 1f0f8c044..8c009ef8d 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -18,11 +18,14 @@ package org.redisson.connection; import java.util.Collection; import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; public interface LoadBalancer { + RedisConnection getConnection(RedisClient client); + int getAvailableClients(); void shutdownAsync(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index bc929b429..08f7928ee 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -18,7 +18,6 @@ package org.redisson.connection; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; @@ -466,6 +465,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return e.connectionReadOp(); } + @Override + public RedisConnection connectionReadOp(int slot, RedisClient client) { + MasterSlaveEntry e = getEntry(slot); + if (!e.isOwn(slot)) { + throw new RedisEmptySlotException("No node for slot: " + slot, slot); + } + return e.connectionReadOp(client); + } + RedisPubSubConnection nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 1be6af126..dde924220 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -146,6 +146,11 @@ public class MasterSlaveEntry { return slaveBalancer.nextConnection(); } + public RedisConnection connectionReadOp(RedisClient client) { + return slaveBalancer.getConnection(client); + } + + RedisPubSubConnection nextPubSubConnection() { return slaveBalancer.nextPubSubConnection(); } From 481b2e969cfc77ef0cf95cc4e099327e4c3009e9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 4 Sep 2015 16:52:44 +0300 Subject: [PATCH 5/7] executeAsync freeze fixed. #233 --- src/main/java/org/redisson/CommandBatchExecutorService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 8c4504996..89faa279a 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -158,8 +158,10 @@ public class CommandBatchExecutorService extends CommandExecutorService { commands = null; } }); + + AtomicInteger slots = new AtomicInteger(commands.size()); for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), e.getKey(), voidPromise, new AtomicInteger(commands.size()), 0); + execute(e.getValue(), e.getKey(), voidPromise, slots, 0); } return promise; } From f77034bb026aac11734ace7fbf75fb53a30f3eb9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 4 Sep 2015 17:08:28 +0300 Subject: [PATCH 6/7] possible NPE during watchdog reconnection fixed. #235 --- src/main/java/org/redisson/client/RedisConnection.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 7ec96863b..c521ff7dd 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -42,13 +42,13 @@ public class RedisConnection implements RedisCommands { public RedisConnection(RedisClient redisClient, Channel channel) { super(); this.redisClient = redisClient; - this.channel = channel; - channel.attr(CONNECTION).set(this); + updateChannel(channel); } public void updateChannel(Channel channel) { this.channel = channel; + channel.attr(CONNECTION).set(this); } public RedisClient getRedisClient() { From 2f99a27103db931b8c372b880afa3b4adcffbd8c Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 4 Sep 2015 17:13:31 +0300 Subject: [PATCH 7/7] license added --- src/main/java/org/redisson/RedisClientResult.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/org/redisson/RedisClientResult.java b/src/main/java/org/redisson/RedisClientResult.java index 2fe09b3ab..381c2f522 100644 --- a/src/main/java/org/redisson/RedisClientResult.java +++ b/src/main/java/org/redisson/RedisClientResult.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson; import org.redisson.client.RedisClient;