From 645b1240dce40e4757eb20a85b2da165dd710fda Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 1 Jun 2016 14:34:19 +0300 Subject: [PATCH 01/22] Infinity loop with iterator fixed. #515 --- .../org/redisson/RedissonBaseIterator.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBaseIterator.java b/src/main/java/org/redisson/RedissonBaseIterator.java index 3f6c62c7a..efb169cc0 100644 --- a/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/src/main/java/org/redisson/RedissonBaseIterator.java @@ -73,12 +73,18 @@ abstract class RedissonBaseIterator implements Iterator { if (firstValues.isEmpty()) { firstValues = lastValues; lastValues = null; - if (firstValues.isEmpty() && tryAgain()) { - client = null; - firstValues = null; - nextIterPos = 0; - prevIterPos = -1; - continue; + if (firstValues.isEmpty()) { + if (tryAgain()) { + client = null; + firstValues = null; + nextIterPos = 0; + prevIterPos = -1; + continue; + } + if (res.getPos() == 0) { + finished = true; + return false; + } } } else if (lastValues.removeAll(firstValues)) { currentElementRemoved = false; From a3c495f980821e68414462400496e249647d03d5 Mon Sep 17 00:00:00 2001 From: Fransiskus Xaverius Date: Fri, 3 Jun 2016 13:44:44 +0700 Subject: [PATCH 02/22] improve auth reconnect --- .../connection/ClientConnectionsEntry.java | 5 ++++ .../connection/pool/ConnectionPool.java | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 1d9c3d6da..6faf953b8 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -19,6 +19,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.ReconnectListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; @@ -186,6 +187,10 @@ public class ClientConnectionsEntry { }); } + public MasterSlaveServersConfig getConfig() { + return connectionManager.getConfig(); + } + public Future connectPubSub() { final Promise connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); Future future = client.connectPubSubAsync(); diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 3c63d632b..6d46625cb 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -326,13 +326,12 @@ abstract class ConnectionPool { return; } - Future f = c.asyncWithTimeout(null, RedisCommands.PING); - f.addListener(new FutureListener() { + final FutureListener pingListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { try { if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed()) { + || !entry.isFreezed()) { return; } @@ -342,7 +341,7 @@ abstract class ConnectionPool { promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) - throws Exception { + throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); log.info("slave {} successfully reconnected", entry.getClient().getAddr()); @@ -365,13 +364,32 @@ abstract class ConnectionPool { c.closeAsync(); } } - }); + }; + + if (entry.getConfig().getPassword() != null) { + Future temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword()); + + FutureListener listener = new FutureListener () { + @Override public void operationComplete (Future < Void > future)throws Exception { + ping(c, pingListener); + } + }; + + temp.addListener(listener); + } else { + ping(c, pingListener); + } } }); } }, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS); } + private void ping(RedisConnection c, final FutureListener pingListener) { + Future f = c.asyncWithTimeout(null, RedisCommands.PING); + f.addListener(pingListener); + } + public void returnConnection(ClientConnectionsEntry entry, T connection) { if (entry.isFreezed()) { connection.closeAsync(); From eeba238f42ed707979afe85b095d9c4e3cd0531a Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Jun 2016 13:40:04 +0300 Subject: [PATCH 03/22] Map iterator fixed. #515 --- .../org/redisson/RedissonBaseMapIterator.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBaseMapIterator.java b/src/main/java/org/redisson/RedissonBaseMapIterator.java index b31246795..536887ff9 100644 --- a/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -82,12 +82,18 @@ abstract class RedissonBaseMapIterator implements Iterator { if (firstValues.isEmpty()) { firstValues = lastValues; lastValues = null; - if (firstValues.isEmpty() && tryAgain()) { - client = null; - firstValues = null; - nextIterPos = 0; - prevIterPos = -1; - continue; + if (firstValues.isEmpty()) { + if (tryAgain()) { + client = null; + firstValues = null; + nextIterPos = 0; + prevIterPos = -1; + continue; + } + if (res.getPos() == 0) { + finished = true; + return false; + } } } else if (lastValues.keySet().removeAll(firstValues.keySet())) { free(firstValues); From a4f0473514c654a597017cd4c10aec07c4f467ca Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Jun 2016 13:49:56 +0300 Subject: [PATCH 04/22] Iterator infinite scan test added. #515 --- src/test/java/org/redisson/RedissonTest.java | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/test/java/org/redisson/RedissonTest.java b/src/test/java/org/redisson/RedissonTest.java index 65792df58..a33d4a4b7 100644 --- a/src/test/java/org/redisson/RedissonTest.java +++ b/src/test/java/org/redisson/RedissonTest.java @@ -2,6 +2,7 @@ package org.redisson; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -19,6 +20,7 @@ import org.redisson.RedisRunner.RedisProcess; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.WriteRedisConnectionException; +import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.codec.SerializationCodec; import org.redisson.connection.ConnectionListener; import org.redisson.core.ClusterNode; @@ -33,6 +35,32 @@ public class RedissonTest { protected RedissonClient redisson; protected static RedissonClient defaultRedisson; + @Test + public void testIterator() { + RedissonBaseIterator iter = new RedissonBaseIterator() { + int i; + @Override + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + i++; + if (i == 1) { + return new ListScanResult(13L, Collections.emptyList()); + } + if (i == 2) { + return new ListScanResult(0L, Collections.emptyList()); + } + Assert.fail(); + return null; + } + + @Override + void remove(Object value) { + } + + }; + + Assert.assertFalse(iter.hasNext()); + } + @BeforeClass public static void beforeClass() throws IOException, InterruptedException { if (!RedissonRuntimeEnvironment.isTravis) { From 31bc0135f1059d383ee9a774db20d1a9688658eb Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Jun 2016 14:10:22 +0300 Subject: [PATCH 05/22] [maven-release-plugin] prepare release redisson-2.2.15 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 1e09ffc38..3c32a9f8e 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.15-SNAPSHOT + 2.2.15 bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - HEAD + redisson-2.2.15 From ad0fc6c4af03931a5b662386aa71ecfae4f85b54 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Jun 2016 14:10:29 +0300 Subject: [PATCH 06/22] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3c32a9f8e..d18462cf1 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.15 + 2.2.16-SNAPSHOT bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - redisson-2.2.15 + HEAD From dd30c661373f2d19b4e1f5cc8dc27e63b09c4929 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 8 Jun 2016 14:22:03 +0300 Subject: [PATCH 07/22] Update CHANGELOG.md --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbab79055..c21506cbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ Redisson Releases History ================================ ####Please Note: trunk is current development branch. +####08-Jun-2016 - version 2.2.15 released +Improvement - Performance boost up to 30% for `RSortedSet.add` method +Fixed - auth during reconnection (thanks to fransiskusx) +Fixed - Infinity loop with iterator +Fixed - NPE in `RSortedSet` +Fixed - `RSortedSet.remove` and `iterator.remove` methods can break elements ordering + ####27-May-2016 - version 2.2.14 released Redisson Team is pleased to announce [Redisson PRO](http://redisson.pro) edition. This version is based on open-source edition and has 24x7 support and some features. From 5ecea4d719eb80d9cf9b3d205f9c73ba739452fd Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 8 Jun 2016 17:33:30 +0300 Subject: [PATCH 08/22] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c21506cbe..f262ce150 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ Redisson Releases History ####08-Jun-2016 - version 2.2.15 released Improvement - Performance boost up to 30% for `RSortedSet.add` method -Fixed - auth during reconnection (thanks to fransiskusx) +Fixed - auth during reconnection (thanks to fransiskusx) Fixed - Infinity loop with iterator Fixed - NPE in `RSortedSet` Fixed - `RSortedSet.remove` and `iterator.remove` methods can break elements ordering From 93574d4f6b883ca24a2cb5b4c4a79ef08611f789 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 8 Jun 2016 17:33:51 +0300 Subject: [PATCH 09/22] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f1087744c..75efb0cac 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,12 @@ Include the following to your dependency list: org.redisson redisson - 2.2.14 + 2.2.15 ### Gradle - compile 'org.redisson:redisson:2.2.14' + compile 'org.redisson:redisson:2.2.15' ### Supported by From 0227a0695d0c7ad75831e36f3c3beeeeb627cf6f Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 13:09:57 +0300 Subject: [PATCH 10/22] Fixed dead lock during RLock, RSemaphore, RReadWriteLock, RCountDownLatch usage under heavy load. #491, #486, #455 --- .../connection/ConnectionManager.java | 2 +- .../MasterSlaveConnectionManager.java | 6 +- .../redisson/connection/MasterSlaveEntry.java | 46 +++++++----- .../org/redisson/pubsub/PublishSubscribe.java | 2 +- .../org/redisson/RedissonLockHeavyTest.java | 73 +++++++++++++++++++ 5 files changed, 106 insertions(+), 23 deletions(-) create mode 100644 src/test/java/org/redisson/RedissonLockHeavyTest.java diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index d15d630be..2b3393c8d 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -93,7 +93,7 @@ public interface ConnectionManager { Future psubscribe(String pattern, Codec codec); - Codec unsubscribe(String channelName); + Future unsubscribe(String channelName); Codec punsubscribe(String channelName); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index f67e8c34b..60c0bff85 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -464,12 +464,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public Codec unsubscribe(final String channelName) { + public Future unsubscribe(final String channelName) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { return null; } + Promise result = newPromise(); Codec entryCodec = entry.getConnection().getChannels().get(channelName); entry.unsubscribe(channelName, new BaseRedisPubSubListener() { @@ -481,13 +482,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager { releaseSubscribeConnection(0, entry); } } + result.setSuccess(entryCodec); return true; } return false; } }); - return entryCodec; + return result; } @Override diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index ff26955be..0fe9871a0 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -180,26 +180,34 @@ public class MasterSlaveEntry { } private void reattachPubSubListeners(final String channelName, final Collection listeners) { - Codec subscribeCodec = connectionManager.unsubscribe(channelName); - if (!listeners.isEmpty()) { - Future future = connectionManager.subscribe(subscribeCodec, channelName, null); - future.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) - throws Exception { - if (!future.isSuccess()) { - log.error("Can't resubscribe topic channel: " + channelName); - return; - } - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel", channelName); + Future unsubscribeFuture = connectionManager.unsubscribe(channelName); + unsubscribeFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (listeners.isEmpty()) { + return; } - }); - } + + Codec subscribeCodec = future.getNow(); + Future subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null); + subscribeFuture.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) + throws Exception { + if (!future.isSuccess()) { + log.error("Can't resubscribe topic channel: " + channelName); + return; + } + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel", channelName); + } + }); + } + }); } private void reattachPatternPubSubListeners(final String channelName, diff --git a/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/src/main/java/org/redisson/pubsub/PublishSubscribe.java index fb9f81a22..a02b3f26d 100644 --- a/src/main/java/org/redisson/pubsub/PublishSubscribe.java +++ b/src/main/java/org/redisson/pubsub/PublishSubscribe.java @@ -38,7 +38,7 @@ abstract class PublishSubscribe> { // just an assertion boolean removed = entries.remove(entryName) == entry; if (removed) { - connectionManager.unsubscribe(channelName); + connectionManager.unsubscribe(channelName).syncUninterruptibly(); } } } diff --git a/src/test/java/org/redisson/RedissonLockHeavyTest.java b/src/test/java/org/redisson/RedissonLockHeavyTest.java new file mode 100644 index 000000000..2268b423f --- /dev/null +++ b/src/test/java/org/redisson/RedissonLockHeavyTest.java @@ -0,0 +1,73 @@ +package org.redisson; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.redisson.core.RBucket; +import org.redisson.core.RLock; +import org.redisson.core.RSemaphore; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@RunWith(Parameterized.class) +public class RedissonLockHeavyTest extends BaseTest { + @Parameters + public static Collection data() { + + return Arrays.asList(new Object[][] { { 2, 5000 }, { 2, 50000 }, { 5, 50000 }, { 10, 50000 }, { 20, 50000 }, }); + } + + private ExecutorService executor; + private int threads; + private int loops; + + public RedissonLockHeavyTest(int threads, int loops) { + this.threads = threads; + executor = Executors.newFixedThreadPool(threads); + this.loops = loops; + } + + @Test + public void lockUnlockRLock() throws Exception { + for (int i = 0; i < threads; i++) { + + Runnable worker = new Runnable() { + + @Override + public void run() { + for (int j = 0; j < loops; j++) { + RLock lock = redisson.getLock("RLOCK_" + j); + lock.lock(); + try { + RBucket bucket = redisson.getBucket("RBUCKET_" + j); + bucket.set("TEST", 30, TimeUnit.SECONDS); + RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j); + semaphore.release(); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + semaphore.expire(30, TimeUnit.SECONDS); + } finally { + lock.unlock(); + } + } + } + }; + executor.execute(worker); + } + executor.shutdown(); + executor.awaitTermination(threads * loops, TimeUnit.SECONDS); + + } + +} \ No newline at end of file From f9c1dda0e1d660023c6ed73b3a0606fa85d461f3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 13:42:23 +0300 Subject: [PATCH 11/22] RGeo added to RBatch object. #522 --- src/main/java/org/redisson/RedissonBatch.java | 23 +++++++++++----- src/main/java/org/redisson/core/RBatch.java | 26 ++++++++++++++++--- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java index bd6910939..11a956eb8 100644 --- a/src/main/java/org/redisson/RedissonBatch.java +++ b/src/main/java/org/redisson/RedissonBatch.java @@ -27,20 +27,20 @@ import org.redisson.core.RBitSetAsync; import org.redisson.core.RBlockingDequeAsync; import org.redisson.core.RBlockingQueueAsync; import org.redisson.core.RBucketAsync; -import org.redisson.core.RMapCacheAsync; import org.redisson.core.RDequeAsync; +import org.redisson.core.RGeoAsync; import org.redisson.core.RHyperLogLogAsync; import org.redisson.core.RKeysAsync; import org.redisson.core.RLexSortedSetAsync; import org.redisson.core.RListAsync; -import org.redisson.core.RListMultimap; import org.redisson.core.RMapAsync; +import org.redisson.core.RMapCacheAsync; +import org.redisson.core.RMultimapAsync; import org.redisson.core.RQueueAsync; import org.redisson.core.RScoredSortedSetAsync; import org.redisson.core.RScriptAsync; import org.redisson.core.RSetAsync; import org.redisson.core.RSetCacheAsync; -import org.redisson.core.RSetMultimap; import org.redisson.core.RTopicAsync; import io.netty.util.concurrent.Future; @@ -232,24 +232,33 @@ public class RedissonBatch implements RBatch { } @Override - public RSetMultimap getSetMultimap(String name) { + public RMultimapAsync getSetMultimap(String name) { return new RedissonSetMultimap(executorService, name); } @Override - public RSetMultimap getSetMultimap(String name, Codec codec) { + public RMultimapAsync getSetMultimap(String name, Codec codec) { return new RedissonSetMultimap(codec, executorService, name); } @Override - public RListMultimap getListMultimap(String name) { + public RMultimapAsync getListMultimap(String name) { return new RedissonListMultimap(executorService, name); } @Override - public RListMultimap getListMultimap(String name, Codec codec) { + public RMultimapAsync getListMultimap(String name, Codec codec) { return new RedissonListMultimap(codec, executorService, name); } + @Override + public RGeoAsync getGeo(String name) { + return new RedissonGeo(executorService, name); + } + + @Override + public RGeoAsync getGeo(String name, Codec codec) { + return new RedissonGeo(codec, executorService, name); + } } diff --git a/src/main/java/org/redisson/core/RBatch.java b/src/main/java/org/redisson/core/RBatch.java index cdcae7a6f..b573811cf 100644 --- a/src/main/java/org/redisson/core/RBatch.java +++ b/src/main/java/org/redisson/core/RBatch.java @@ -37,13 +37,31 @@ import io.netty.util.concurrent.Future; */ public interface RBatch { + /** + * Returns geospatial items holder instance by name. + * + * @param name + * @return + */ + RGeoAsync getGeo(String name); + + /** + * Returns geospatial items holder instance by name + * using provided codec for geospatial members. + * + * @param name + * @param geospatial member codec + * @return + */ + RGeoAsync getGeo(String name, Codec codec); + /** * Returns Set based MultiMap instance by name. * * @param name * @return */ - RSetMultimap getSetMultimap(String name); + RMultimapAsync getSetMultimap(String name); /** * Returns Set based MultiMap instance by name @@ -53,7 +71,7 @@ public interface RBatch { * @param codec * @return */ - RSetMultimap getSetMultimap(String name, Codec codec); + RMultimapAsync getSetMultimap(String name, Codec codec); /** * Returns set-based cache instance by name. @@ -142,7 +160,7 @@ public interface RBatch { * @param name * @return */ - RListMultimap getListMultimap(String name); + RMultimapAsync getListMultimap(String name); /** * Returns List based MultiMap instance by name @@ -152,7 +170,7 @@ public interface RBatch { * @param codec * @return */ - RListMultimap getListMultimap(String name, Codec codec); + RMultimapAsync getListMultimap(String name, Codec codec); /** * Returns map instance by name. From d4d954effb818546ee3a7f88ca927b2819da1012 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 13:46:26 +0300 Subject: [PATCH 12/22] MultimapCache added to RBatch object. #525 --- src/main/java/org/redisson/Redisson.java | 2 +- src/main/java/org/redisson/RedissonBatch.java | 22 +++++++++ src/main/java/org/redisson/core/RBatch.java | 46 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 4d6f929ab..87f2e99f2 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; @@ -75,7 +76,6 @@ import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; import io.netty.util.concurrent.Future; -import java.util.concurrent.TimeUnit; /** * Main infrastructure class allows to get access diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java index 11a956eb8..84ff7b75f 100644 --- a/src/main/java/org/redisson/RedissonBatch.java +++ b/src/main/java/org/redisson/RedissonBatch.java @@ -36,6 +36,7 @@ import org.redisson.core.RListAsync; import org.redisson.core.RMapAsync; import org.redisson.core.RMapCacheAsync; import org.redisson.core.RMultimapAsync; +import org.redisson.core.RMultimapCacheAsync; import org.redisson.core.RQueueAsync; import org.redisson.core.RScoredSortedSetAsync; import org.redisson.core.RScriptAsync; @@ -260,5 +261,26 @@ public class RedissonBatch implements RBatch { public RGeoAsync getGeo(String name, Codec codec) { return new RedissonGeo(codec, executorService, name); } + + @Override + public RMultimapCacheAsync getSetMultimapCache(String name) { + return new RedissonSetMultimapCache(evictionScheduler, executorService, name); + } + + @Override + public RMultimapCacheAsync getSetMultimapCache(String name, Codec codec) { + return new RedissonSetMultimapCache(evictionScheduler, codec, executorService, name); + } + + @Override + public RMultimapCacheAsync getListMultimapCache(String name) { + return new RedissonListMultimapCache(evictionScheduler, executorService, name); + } + + @Override + public RMultimapCacheAsync getListMultimapCache(String name, Codec codec) { + return new RedissonListMultimapCache(evictionScheduler, codec, executorService, name); + } + } diff --git a/src/main/java/org/redisson/core/RBatch.java b/src/main/java/org/redisson/core/RBatch.java index b573811cf..5790b18a0 100644 --- a/src/main/java/org/redisson/core/RBatch.java +++ b/src/main/java/org/redisson/core/RBatch.java @@ -73,6 +73,29 @@ public interface RBatch { */ RMultimapAsync getSetMultimap(String name, Codec codec); + /** + * Returns Set based Multimap instance by name. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.

+ * + * @param name + * @return + */ + RMultimapCacheAsync getSetMultimapCache(String name); + + /** + * Returns Set based Multimap instance by name + * using provided codec for both map keys and values. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.

+ * + * @param name + * @return + */ + RMultimapCacheAsync getSetMultimapCache(String name, Codec codec); + /** * Returns set-based cache instance by name. * Uses map (value_hash, value) under the hood for minimal memory consumption. @@ -172,6 +195,29 @@ public interface RBatch { */ RMultimapAsync getListMultimap(String name, Codec codec); + /** + * Returns List based Multimap instance by name. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.

+ * + * @param name + * @return + */ + RMultimapAsync getListMultimapCache(String name); + + /** + * Returns List based Multimap instance by name + * using provided codec for both map keys and values. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.

+ * + * @param name + * @return + */ + RMultimapAsync getListMultimapCache(String name, Codec codec); + /** * Returns map instance by name. * From 42b3c1eaaf9d733690cbd2bb2d122eeaad2b300e Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 13:59:07 +0300 Subject: [PATCH 13/22] Added ability to define Codec for RemoteService. #523 --- src/main/java/org/redisson/Redisson.java | 10 +++++++ .../java/org/redisson/RedissonClient.java | 17 +++++++++++ .../org/redisson/RedissonRemoteService.java | 30 +++++++++++++++---- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 87f2e99f2..db480403b 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -400,6 +400,16 @@ public class Redisson implements RedissonClient { public RRemoteService getRemoteSerivce(String name) { return new RedissonRemoteService(this, name); } + + @Override + public RRemoteService getRemoteSerivce(Codec codec) { + return new RedissonRemoteService(codec, this); + } + + @Override + public RRemoteService getRemoteSerivce(String name, Codec codec) { + return new RedissonRemoteService(codec, this, name); + } @Override public RSortedSet getSortedSet(String name) { diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index f2aa29c16..dfa4bab5d 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -611,6 +611,14 @@ public interface RedissonClient { * @return */ RRemoteService getRemoteSerivce(); + + /** + * Returns object for remote operations prefixed with the default name (redisson_remote_service) + * and uses provided codec for method arguments and result. + * + * @return + */ + RRemoteService getRemoteSerivce(Codec codec); /** * Returns object for remote operations prefixed with the specified name @@ -619,6 +627,15 @@ public interface RedissonClient { * @return */ RRemoteService getRemoteSerivce(String name); + + /** + * Returns object for remote operations prefixed with the specified name + * and uses provided codec for method arguments and result. + * + * @param name The name used as the Redis key prefix for the services + * @return + */ + RRemoteService getRemoteSerivce(String name, Codec codec); /** * Return batch object which executes group of diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 30c24500d..337b03dca 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.core.RBatch; import org.redisson.core.RBlockingQueue; @@ -64,6 +65,7 @@ public class RedissonRemoteService implements RRemoteService { private final Map beans = PlatformDependent.newConcurrentHashMap(); + private final Codec codec; private final Redisson redisson; private final String name; @@ -72,6 +74,15 @@ public class RedissonRemoteService implements RRemoteService { } public RedissonRemoteService(Redisson redisson, String name) { + this(null, redisson, name); + } + + public RedissonRemoteService(Codec codec, Redisson redisson) { + this(codec, redisson, "redisson_remote_service"); + } + + public RedissonRemoteService(Codec codec, Redisson redisson, String name) { + this.codec = codec; this.redisson = redisson; this.name = name; } @@ -96,10 +107,17 @@ public class RedissonRemoteService implements RRemoteService { for (int i = 0; i < executorsAmount; i++) { String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); subscribe(remoteInterface, requestQueue); } } + + private Codec getCodec() { + if (codec != null) { + return codec; + } + return redisson.getConfig().getCodec(); + } private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { Future take = requestQueue.takeAsync(); @@ -141,7 +159,7 @@ public class RedissonRemoteService implements RRemoteService { + "return 1;" + "end;" + "return 0;", RScript.ReturnType.BOOLEAN, Arrays.asList(ackName, responseName), - redisson.getConfig().getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis()); + getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis()); // Future> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck()); // ackClientsFuture.addListener(new FutureListener>() { ackClientsFuture.addListener(new FutureListener() { @@ -271,7 +289,7 @@ public class RedissonRemoteService implements RRemoteService { final Promise result = ImmediateEventExecutor.INSTANCE.newPromise(); String requestQueueName = name + ":{" + interfaceName + "}"; - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); final RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy, System.currentTimeMillis()); Future addFuture = requestQueue.addAsync(request); @@ -287,7 +305,7 @@ public class RedissonRemoteService implements RRemoteService { final RBlockingQueue responseQueue; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { String responseName = name + ":{" + interfaceName + "}:" + requestId; - responseQueue = redisson.getBlockingQueue(responseName); + responseQueue = redisson.getBlockingQueue(responseName, getCodec()); } else { responseQueue = null; } @@ -408,7 +426,7 @@ public class RedissonRemoteService implements RRemoteService { String requestId = generateRequestId(); String requestQueueName = name + ":{" + interfaceName + "}"; - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy, System.currentTimeMillis()); requestQueue.add(request); @@ -416,7 +434,7 @@ public class RedissonRemoteService implements RRemoteService { RBlockingQueue responseQueue = null; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { String responseName = name + ":{" + interfaceName + "}:" + requestId; - responseQueue = redisson.getBlockingQueue(responseName); + responseQueue = redisson.getBlockingQueue(responseName, getCodec()); } // poll for the ack only if expected From f5542bd6b9f8a892a989d3bd58e9eb47feb61d6e Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 15:53:40 +0300 Subject: [PATCH 14/22] RList.remove optimization --- src/main/java/org/redisson/RedissonList.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index e8e46c490..550f30169 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -328,8 +328,6 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public V remove(int index) { - checkIndex(index); - if (index == 0) { Future f = commandExecutor.writeAsync(getName(), codec, LPOP, getName()); return get(f); @@ -337,18 +335,13 @@ public class RedissonList extends RedissonExpirable implements RList { Future f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" + - "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + - "if #tail > 0 then " + - "for i=1, #tail, 5000 do " - + "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); " - + "end " - + "end;" + - "return v", + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + + "return v", Collections.singletonList(getName()), index); return get(f); } - + @Override public int indexOf(Object o) { return get(indexOfAsync(o)); From 6cdee25f43e80cc2f272468d1efc48b4aa54d760 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 16:16:13 +0300 Subject: [PATCH 15/22] RList.fastRemove and fastRemoveAsync methods were added --- src/main/java/org/redisson/RedissonList.java | 13 +++++++++++++ src/main/java/org/redisson/core/RList.java | 2 ++ src/main/java/org/redisson/core/RListAsync.java | 2 ++ 3 files changed, 17 insertions(+) diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index 550f30169..78083113e 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -342,6 +342,19 @@ public class RedissonList extends RedissonExpirable implements RList { return get(f); } + @Override + public void fastRemove(int index) { + get(fastRemoveAsync(index)); + } + + @Override + public Future fastRemoveAsync(int index) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", + Collections.singletonList(getName()), index); + } + @Override public int indexOf(Object o) { return get(indexOfAsync(o)); diff --git a/src/main/java/org/redisson/core/RList.java b/src/main/java/org/redisson/core/RList.java index 02506780b..fe865c142 100644 --- a/src/main/java/org/redisson/core/RList.java +++ b/src/main/java/org/redisson/core/RList.java @@ -74,4 +74,6 @@ public interface RList extends List, RExpirable, RListAsync, RandomAcce */ void trim(int fromIndex, int toIndex); + void fastRemove(int index); + } diff --git a/src/main/java/org/redisson/core/RListAsync.java b/src/main/java/org/redisson/core/RListAsync.java index d4032587c..49722b029 100644 --- a/src/main/java/org/redisson/core/RListAsync.java +++ b/src/main/java/org/redisson/core/RListAsync.java @@ -85,4 +85,6 @@ public interface RListAsync extends RCollectionAsync, RandomAccess { */ Future trimAsync(int fromIndex, int toIndex); + Future fastRemoveAsync(int index); + } From fb8cae77057c291604e04cf35178182673f8061e Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 16:25:05 +0300 Subject: [PATCH 16/22] Compilation fixed --- .../org/redisson/connection/MasterSlaveConnectionManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 60c0bff85..e04d25777 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -470,8 +470,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } - Promise result = newPromise(); - Codec entryCodec = entry.getConnection().getChannels().get(channelName); + final Promise result = newPromise(); + final Codec entryCodec = entry.getConnection().getChannels().get(channelName); entry.unsubscribe(channelName, new BaseRedisPubSubListener() { @Override From 4fcd54ec8cc7457434d0eb50de52274658625f3c Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 9 Jun 2016 16:25:47 +0300 Subject: [PATCH 17/22] RSortedSet performance boost up to 4 times! --- src/main/java/org/redisson/Redisson.java | 4 +- .../redisson/RedissonListMultimapValues.java | 26 ++- .../java/org/redisson/RedissonSortedSet.java | 214 +++++++----------- src/main/java/org/redisson/SyncOperation.java | 25 -- .../redisson/command/CommandSyncExecutor.java | 5 - .../redisson/command/CommandSyncService.java | 74 ------ .../ConcurrentRedissonSortedSetTest.java | 1 - 7 files changed, 94 insertions(+), 255 deletions(-) delete mode 100644 src/main/java/org/redisson/SyncOperation.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index db480403b..ccde73895 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -413,12 +413,12 @@ public class Redisson implements RedissonClient { @Override public RSortedSet getSortedSet(String name) { - return new RedissonSortedSet(commandExecutor, name); + return new RedissonSortedSet(commandExecutor, name, this); } @Override public RSortedSet getSortedSet(String name, Codec codec) { - return new RedissonSortedSet(codec, commandExecutor, name); + return new RedissonSortedSet(codec, commandExecutor, name, this); } @Override diff --git a/src/main/java/org/redisson/RedissonListMultimapValues.java b/src/main/java/org/redisson/RedissonListMultimapValues.java index 3cf698a8e..3e40b3308 100644 --- a/src/main/java/org/redisson/RedissonListMultimapValues.java +++ b/src/main/java/org/redisson/RedissonListMultimapValues.java @@ -433,8 +433,6 @@ public class RedissonListMultimapValues extends RedissonExpirable implements @Override public V remove(int index) { - checkIndex(index); - if (index == 0) { Future f = commandExecutor.writeAsync(getName(), codec, LPOP, getName()); return get(f); @@ -442,18 +440,26 @@ public class RedissonListMultimapValues extends RedissonExpirable implements Future f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" + - "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" + - "if #tail > 0 then " + - "for i=1, #tail, 5000 do " - + "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); " - + "end " - + "end;" + - "return v", + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" + + "return v", Collections.singletonList(getName()), index); return get(f); } + @Override + public void fastRemove(int index) { + get(fastRemoveAsync(index)); + } + + @Override + public Future fastRemoveAsync(int index) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", + Collections.singletonList(getName()), index); + } + @Override public int indexOf(Object o) { return get(indexOfAsync(o)); diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index e0dd93bd7..8f1b3edb3 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -19,28 +19,25 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.math.BigDecimal; import java.math.BigInteger; import java.security.MessageDigest; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; import java.util.SortedSet; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; +import org.redisson.core.RBucket; +import org.redisson.core.RLock; import org.redisson.core.RSortedSet; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; /** @@ -95,34 +92,36 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet comparator = NaturalComparator.NATURAL_ORDER; CommandExecutor commandExecutor; + + private RLock lock; + private RedissonList list; + private RBucket comparatorHolder; - protected RedissonSortedSet(CommandExecutor commandExecutor, String name) { + protected RedissonSortedSet(CommandExecutor commandExecutor, String name, Redisson redisson) { super(commandExecutor, name); this.commandExecutor = commandExecutor; + comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE); + lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}"); + list = (RedissonList) redisson.getList(getName()); + loadComparator(); } - public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name) { + public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) { super(codec, commandExecutor, name); this.commandExecutor = commandExecutor; + comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE); + lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}"); + list = (RedissonList) redisson.getList(getName()); + loadComparator(); } private void loadComparator() { - commandExecutor.read(getName(), codec, new SyncOperation() { - @Override - public Void execute(Codec codec, RedisConnection conn) { - loadComparator(conn); - return null; - } - }); - } - - private void loadComparator(RedisConnection connection) { try { - String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); + String comparatorSign = comparatorHolder.get(); if (comparatorSign != null) { String[] parts = comparatorSign.split(":"); String className = parts[0]; @@ -136,6 +135,8 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet clazz = Class.forName(className); comparator = (Comparator) clazz.newInstance(); } + } catch (IllegalStateException e) { + throw e; } catch (Exception e) { throw new IllegalStateException(e); } @@ -163,26 +164,17 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return binarySearch((V)o, codec, conn).getIndex() >= 0; - } - }); + return binarySearch((V)o, codec).getIndex() >= 0; } public Iterator iterator() { @@ -206,7 +198,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); - return res.toArray(); + return list.toArray(); } @Override public T[] toArray(T[] a) { - List res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1); - return res.toArray(a); + return list.toArray(a); } @Override - public boolean add(final V value) { - return commandExecutor.write(getName(), codec, new SyncOperation() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return add(value, codec, conn); - } - }); - } - - public Future addAsync(final V value) { - final Promise promise = commandExecutor.getConnectionManager().newPromise(); - commandExecutor.getConnectionManager().getGroup().execute(new Runnable() { - public void run() { - try { - boolean res = add(value); - promise.setSuccess(res); - } catch (Exception e) { - promise.setFailure(e); - } - } - }); - return promise; - } - - boolean add(V value, Codec codec, RedisConnection connection) { - while (true) { - connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName()); - - checkComparator(connection); - - BinarySearchResult res = binarySearch(value, codec, connection); - if (res.getIndex() == null) { - continue; - } + public boolean add(V value) { + lock.lock(); + + try { + checkComparator(); + + BinarySearchResult res = binarySearch(value, codec); if (res.getIndex() < 0) { int index = -(res.getIndex() + 1); @@ -284,45 +243,47 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet re = connection.sync(codec, RedisCommands.EXEC); - if (re.size() == 1) { - return true; - } + commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID, + "local len = redis.call('llen', KEYS[1]);" + + "if tonumber(ARGV[1]) < len then " + + "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);" + + "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);" + + "return;" + + "end;" + + "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.asList(getName()), index, encodedValue); + return true; } else { - connection.sync(RedisCommands.UNWATCH); return false; } + } finally { + lock.unlock(); } } - private void checkComparator(RedisConnection connection) { - String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName()); + private void checkComparator() { + String comparatorSign = comparatorHolder.get(); if (comparatorSign != null) { String[] vals = comparatorSign.split(":"); String className = vals[0]; if (!comparator.getClass().getName().equals(className)) { - loadComparator(connection); + loadComparator(); } } } - public static double calcIncrement(double value) { - BigDecimal b = BigDecimal.valueOf(value); - BigDecimal r = b.remainder(BigDecimal.ONE); - if (r.compareTo(BigDecimal.ZERO) == 0) { - return 1; - } - double res = 1/Math.pow(10, r.scale()); - return res; + public Future addAsync(final V value) { + final Promise promise = newPromise(); + commandExecutor.getConnectionManager().getGroup().execute(new Runnable() { + public void run() { + try { + boolean res = add(value); + promise.setSuccess(res); + } catch (Exception e) { + promise.setFailure(e); + } + } + }); + return promise; } @Override @@ -346,44 +307,21 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet() { - @Override - public Boolean execute(Codec codec, RedisConnection conn) { - return remove(value, codec, conn); - } - }); - } + public boolean remove(Object value) { + lock.lock(); - boolean remove(Object value, Codec codec, RedisConnection conn) { - while (true) { - conn.sync(RedisCommands.WATCH, getName()); - BinarySearchResult res = binarySearch((V) value, codec, conn); - if (res.getIndex() == null) { - conn.sync(RedisCommands.UNWATCH); - continue; - } + try { + checkComparator(); + + BinarySearchResult res = binarySearch((V) value, codec); if (res.getIndex() < 0) { - conn.sync(RedisCommands.UNWATCH); return false; } - conn.sync(RedisCommands.MULTI); - if (res.getIndex() == 0) { - conn.sync(codec, RedisCommands.LPOP, getName()); - } else { - conn.sync(RedisCommands.EVAL_VOID, - "local len = redis.call('llen', KEYS[1]);" - + "local tail = redis.call('lrange', KEYS[1], tonumber(ARGV[1]) + 1, len);" - + "redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[1]) - 1);" - + "if #tail > 0 then " - + "redis.call('rpush', KEYS[1], unpack(tail)); " - + "end;", 1, getName(), res.getIndex()); - } - - if (((List)conn.sync(codec, RedisCommands.EXEC)).size() == 1) { - return true; - } + list.remove((int)res.getIndex()); + return true; + } finally { + lock.unlock(); } } @@ -460,7 +398,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet extends RedissonObject implements RSortedSet binarySearch(V value, Codec codec, RedisConnection connection) { - int size = size(connection); + + public BinarySearchResult binarySearch(V value, Codec codec) { + int size = list.size(); int upperIndex = size - 1; int lowerIndex = 0; while (lowerIndex <= upperIndex) { int index = lowerIndex + (upperIndex - lowerIndex) / 2; - V res = connection.sync(codec, RedisCommands.LINDEX, getName(), index); + V res = list.getValue(index); if (res == null) { return new BinarySearchResult(); } diff --git a/src/main/java/org/redisson/SyncOperation.java b/src/main/java/org/redisson/SyncOperation.java deleted file mode 100644 index d6fc5449d..000000000 --- a/src/main/java/org/redisson/SyncOperation.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright 2014 Nikita Koksharov, Nickolay Borbit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson; - -import org.redisson.client.RedisConnection; -import org.redisson.client.codec.Codec; - -public interface SyncOperation { - - R execute(Codec codec, RedisConnection conn); - -} diff --git a/src/main/java/org/redisson/command/CommandSyncExecutor.java b/src/main/java/org/redisson/command/CommandSyncExecutor.java index a022cd4c8..4e60b8def 100644 --- a/src/main/java/org/redisson/command/CommandSyncExecutor.java +++ b/src/main/java/org/redisson/command/CommandSyncExecutor.java @@ -18,7 +18,6 @@ package org.redisson.command; import java.net.InetSocketAddress; import java.util.List; -import org.redisson.SyncOperation; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -42,10 +41,6 @@ public interface CommandSyncExecutor { R read(String key, RedisCommand command, Object ... params); - R read(String key, Codec codec, SyncOperation operation); - - R write(String key, Codec codec, SyncOperation operation); - R read(String key, Codec codec, RedisCommand command, Object ... params); R evalRead(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); diff --git a/src/main/java/org/redisson/command/CommandSyncService.java b/src/main/java/org/redisson/command/CommandSyncService.java index 738ac94ae..007ea6ab1 100644 --- a/src/main/java/org/redisson/command/CommandSyncService.java +++ b/src/main/java/org/redisson/command/CommandSyncService.java @@ -18,23 +18,13 @@ package org.redisson.command; import java.net.InetSocketAddress; import java.util.List; -import org.redisson.SyncOperation; -import org.redisson.client.RedisAskException; -import org.redisson.client.RedisConnection; -import org.redisson.client.RedisException; -import org.redisson.client.RedisLoadingException; -import org.redisson.client.RedisMovedException; -import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.NodeSource; -import org.redisson.connection.NodeSource.Redirect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; /** * @@ -100,70 +90,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx return get(res); } - @Override - public R write(String key, Codec codec, SyncOperation operation) { - int slot = connectionManager.calcSlot(key); - return sync(false, codec, new NodeSource(slot), operation, 0); - } - - @Override - public R read(String key, Codec codec, SyncOperation operation) { - int slot = connectionManager.calcSlot(key); - return sync(true, codec, new NodeSource(slot), operation, 0); - } - - R sync(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation operation, int attempt) { - if (!connectionManager.getShutdownLatch().acquire()) { - throw new IllegalStateException("Redisson is shutdown"); - } - - try { - Future connectionFuture; - if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(source, null); - } else { - connectionFuture = connectionManager.connectionWriteOp(source, null); - } - connectionFuture.syncUninterruptibly(); - - RedisConnection connection = connectionFuture.getNow(); - - try { - return operation.execute(codec, connection); - } catch (RedisMovedException e) { - return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt); - } catch (RedisAskException e) { - return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt); - } catch (RedisLoadingException e) { - return sync(readOnlyMode, codec, source, operation, attempt); - } catch (RedisTimeoutException e) { - if (attempt == connectionManager.getConfig().getRetryAttempts()) { - throw e; - } - attempt++; - return sync(readOnlyMode, codec, source, operation, attempt); - } finally { - connectionManager.getShutdownLatch().release(); - if (readOnlyMode) { - connectionManager.releaseRead(source, connection); - } else { - connectionManager.releaseWrite(source, connection); - } - } - } catch (RedisException e) { - if (attempt == connectionManager.getConfig().getRetryAttempts()) { - throw e; - } - try { - Thread.sleep(connectionManager.getConfig().getRetryInterval()); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - attempt++; - return sync(readOnlyMode, codec, source, operation, attempt); - } - } - @Override public R write(String key, Codec codec, RedisCommand command, Object ... params) { Future res = writeAsync(key, codec, command, params); diff --git a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java index 3dff1f5ab..18db0a135 100644 --- a/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java +++ b/src/test/java/org/redisson/ConcurrentRedissonSortedSetTest.java @@ -3,7 +3,6 @@ package org.redisson; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; From 3d88888f8dfc690933eac08726ba41f8a938a816 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jun 2016 12:52:28 +0300 Subject: [PATCH 18/22] BlockingQueue test added --- .../redisson/RedissonBlockingQueueTest.java | 61 ++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index bf0ed2b0c..d047e9d7f 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -20,6 +21,8 @@ import org.redisson.RedisRunner.RedisProcess; import org.redisson.core.RBlockingQueue; import io.netty.util.concurrent.Future; + +import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; public class RedissonBlockingQueueTest extends BaseTest { @@ -38,10 +41,12 @@ public class RedissonBlockingQueueTest extends BaseTest { final RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollTimeout"); Future f = queue1.pollAsync(5, TimeUnit.SECONDS); - f.await(1, TimeUnit.SECONDS); + Assert.assertFalse(f.await(1, TimeUnit.SECONDS)); runner.stop(); - + + long start = System.currentTimeMillis(); assertThat(f.get()).isNull(); + assertThat(System.currentTimeMillis() - start).isGreaterThan(3800); } @Test @@ -56,6 +61,58 @@ public class RedissonBlockingQueueTest extends BaseTest { config.useSingleServer().setAddress("127.0.0.1:6319"); RedissonClient redisson = Redisson.create(config); + final AtomicBoolean executed = new AtomicBoolean(); + + Thread t = new Thread() { + public void run() { + try { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + long start = System.currentTimeMillis(); + Integer res = queue1.poll(10, TimeUnit.SECONDS); + assertThat(System.currentTimeMillis() - start).isGreaterThan(2000); + assertThat(res).isEqualTo(123); + executed.set(true); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + }; + + t.start(); + t.join(1000); + runner.stop(); + + runner = new RedisRunner() + .port(6319) + .nosave() + .randomDir() + .run(); + + Thread.sleep(1000); + + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + queue1.put(123); + + t.join(); + + await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue()); + + runner.stop(); + } + + @Test + public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { + RedisProcess runner = new RedisRunner() + .port(6319) + .nosave() + .randomDir() + .run(); + + Config config = new Config(); + config.useSingleServer().setAddress("127.0.0.1:6319"); + RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); Future f = queue1.pollAsync(10, TimeUnit.SECONDS); f.await(1, TimeUnit.SECONDS); From 69f90ff8f3a5af0252898a8436c9602d59a5b961 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jun 2016 15:35:07 +0300 Subject: [PATCH 19/22] Add Spring 4.3.0 support to RedissonSpringCacheManager --- .../java/org/redisson/RedissonMultimap.java | 17 +--- src/main/java/org/redisson/misc/Hash.java | 42 ++++++++++ .../redisson/spring/cache/RedissonCache.java | 79 ++++++++++++++++++- .../cache/RedissonSpringCacheManager.java | 6 +- 4 files changed, 124 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/redisson/misc/Hash.java diff --git a/src/main/java/org/redisson/RedissonMultimap.java b/src/main/java/org/redisson/RedissonMultimap.java index 1075b2e33..602fa3cec 100644 --- a/src/main/java/org/redisson/RedissonMultimap.java +++ b/src/main/java/org/redisson/RedissonMultimap.java @@ -38,13 +38,9 @@ import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RMultimap; +import org.redisson.misc.Hash; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.base64.Base64; -import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; -import net.openhft.hashing.LongHashFunction; /** * @author Nikita Koksharov @@ -63,16 +59,7 @@ public abstract class RedissonMultimap extends RedissonExpirable implement } protected String hash(byte[] objectState) { - long h1 = LongHashFunction.farmUo().hashBytes(objectState); - long h2 = LongHashFunction.xx_r39().hashBytes(objectState); - - ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2); - - ByteBuf b = Base64.encode(buf); - String s = b.toString(CharsetUtil.UTF_8); - b.release(); - buf.release(); - return s.substring(0, s.length() - 2); + return Hash.hashToBase64(objectState); } @Override diff --git a/src/main/java/org/redisson/misc/Hash.java b/src/main/java/org/redisson/misc/Hash.java new file mode 100644 index 000000000..e259bdf77 --- /dev/null +++ b/src/main/java/org/redisson/misc/Hash.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.base64.Base64; +import io.netty.util.CharsetUtil; +import net.openhft.hashing.LongHashFunction; + +public class Hash { + + private Hash() { + } + + public static String hashToBase64(byte[] objectState) { + long h1 = LongHashFunction.farmUo().hashBytes(objectState); + long h2 = LongHashFunction.xx_r39().hashBytes(objectState); + + ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2); + + ByteBuf b = Base64.encode(buf); + String s = b.toString(CharsetUtil.UTF_8); + b.release(); + buf.release(); + return s.substring(0, s.length() - 2); + } + +} diff --git a/src/main/java/org/redisson/spring/cache/RedissonCache.java b/src/main/java/org/redisson/spring/cache/RedissonCache.java index 2cbb70c48..6108be0de 100644 --- a/src/main/java/org/redisson/spring/cache/RedissonCache.java +++ b/src/main/java/org/redisson/spring/cache/RedissonCache.java @@ -15,10 +15,19 @@ */ package org.redisson.spring.cache; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.redisson.RedissonClient; +import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RMapCache; +import org.redisson.misc.Hash; import org.springframework.cache.Cache; import org.springframework.cache.support.SimpleValueWrapper; @@ -34,15 +43,19 @@ public class RedissonCache implements Cache { private final RMap map; private CacheConfig config; + + private final RedissonClient redisson; - public RedissonCache(RMapCache mapCache, CacheConfig config) { + public RedissonCache(RedissonClient redisson, RMapCache mapCache, CacheConfig config) { this.mapCache = mapCache; this.map = mapCache; this.config = config; + this.redisson = redisson; } - public RedissonCache(RMap map) { + public RedissonCache(RedissonClient redisson, RMap map) { this.map = map; + this.redisson = redisson; } @Override @@ -113,4 +126,66 @@ public class RedissonCache implements Cache { return new SimpleValueWrapper(value); } + final Map valueLoaderLocks = new ConcurrentHashMap(); + + public Lock getLock(Object key) { + Lock lock = valueLoaderLocks.get(key); + if (lock == null) { + Lock newlock = new ReentrantLock(); + lock = valueLoaderLocks.putIfAbsent(key, newlock); + if (lock == null) { + lock = newlock; + } + } + return lock; + } + + public T get(Object key, Callable valueLoader) { + Object value = map.get(key); + if (value == null) { + String lockName = getLockName(key); + RLock lock = redisson.getLock(lockName); + lock.lock(); + try { + value = map.get(key); + if (value == null) { + try { + value = toStoreValue(valueLoader.call()); + } catch (Exception ex) { + throw new ValueRetrievalException(key, valueLoader, ex.getCause()); + } + map.put(key, value); + } + } finally { + lock.unlock(); + } + } + + return (T) fromStoreValue(value); + } + + private String getLockName(Object key) { + try { + byte[] keyState = redisson.getConfig().getCodec().getMapKeyEncoder().encode(key); + Hash.hashToBase64(keyState); + return "{" + map.getName() + "}:" + Hash.hashToBase64(keyState) + ":key"; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + protected Object fromStoreValue(Object storeValue) { + if (storeValue == NullValue.INSTANCE) { + return null; + } + return storeValue; + } + + protected Object toStoreValue(Object userValue) { + if (userValue == null) { + return NullValue.INSTANCE; + } + return userValue; + } + } diff --git a/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java b/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java index 2c4f84830..9f9166e2d 100644 --- a/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java +++ b/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java @@ -158,14 +158,14 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA configMap.put(name, config); RMap map = createMap(name); - return new RedissonCache(map); + return new RedissonCache(redisson, map); } if (config.getMaxIdleTime() == 0 && config.getTTL() == 0) { RMap map = createMap(name); - return new RedissonCache(map); + return new RedissonCache(redisson, map); } RMapCache map = createMapCache(name); - return new RedissonCache(map, config); + return new RedissonCache(redisson, map, config); } private RMap createMap(String name) { From e6ccd7f3ebb884798e6a4e04647482ef27625457 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jun 2016 15:36:39 +0300 Subject: [PATCH 20/22] RedissonCache.getLockName minor optimization --- src/main/java/org/redisson/spring/cache/RedissonCache.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/redisson/spring/cache/RedissonCache.java b/src/main/java/org/redisson/spring/cache/RedissonCache.java index 6108be0de..ec366a56f 100644 --- a/src/main/java/org/redisson/spring/cache/RedissonCache.java +++ b/src/main/java/org/redisson/spring/cache/RedissonCache.java @@ -167,7 +167,6 @@ public class RedissonCache implements Cache { private String getLockName(Object key) { try { byte[] keyState = redisson.getConfig().getCodec().getMapKeyEncoder().encode(key); - Hash.hashToBase64(keyState); return "{" + map.getName() + "}:" + Hash.hashToBase64(keyState) + ":key"; } catch (IOException e) { throw new IllegalStateException(e); From c507014c262fc12157e43bc5c00d7fb522b660ad Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jun 2016 18:21:11 +0300 Subject: [PATCH 21/22] Fixed cluster state managing with redis masters only. #491 --- .../java/org/redisson/cluster/ClusterConnectionManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 0fd5e56b8..9741e3f41 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -226,9 +226,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); + List> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses()); + futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { - List> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses()); - futures.addAll(fs); log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); if (!partition.getFailedSlaveAddresses().isEmpty()) { log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges()); @@ -403,7 +403,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { currentPart.addSlaveAddress(uri); entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); - log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); + log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); } }); } From 08105c03820ff6aa1a64bc41a29093b57a4cfb64 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 10 Jun 2016 18:46:53 +0300 Subject: [PATCH 22/22] libs updated --- pom.xml | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index d18462cf1..0b7802ba4 100644 --- a/pom.xml +++ b/pom.xml @@ -100,33 +100,33 @@ io.netty netty-transport-native-epoll - 4.0.36.Final + 4.0.37.Final provided io.netty netty-common - 4.0.36.Final + 4.0.37.Final io.netty netty-codec - 4.0.36.Final + 4.0.37.Final io.netty netty-buffer - 4.0.36.Final + 4.0.37.Final io.netty netty-transport - 4.0.36.Final + 4.0.37.Final io.netty netty-handler - 4.0.36.Final + 4.0.37.Final @@ -138,7 +138,7 @@ org.assertj assertj-core - 3.3.0 + 3.4.1 test @@ -150,7 +150,7 @@ junit junit - 4.11 + 4.12 test @@ -175,19 +175,19 @@ org.msgpack jackson-dataformat-msgpack - 0.8.2 + 0.8.7 provided org.xerial.snappy snappy-java - 1.1.2.1 + 1.1.2.6 provided de.ruedigermoeller fst - 2.43 + 2.47 provided @@ -199,7 +199,7 @@ org.slf4j slf4j-api - 1.7.14 + 1.7.21