diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dec73ac7..1f97e01fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,27 @@ Redisson Releases History Try __[Redisson PRO](https://redisson.pro)__ version. +### 05-Mar-2018 - versions 2.11.2 and 3.6.2 released + +[Redisson PRO](https://redisson.pro) performance improvements for follow `performanceMode` values: + +`HIGHER_THROUGHPUT` - up to **25%** performance growth +`LOWER_LATENCY_AUTO` - up to **100%** performance growth +`LOWER_LATENCY_MODE_2` - up to **100%** performance growth +`LOWER_LATENCY_MODE_1` - up to **100%** performance growth + +Feature - new values added to `performanceMode` setting +Feature - `lockAsync` and `unlockAsync` methods added to `RedissonMultiLock` +Feature - `RMapCache.remainTimeToLive` method added +Feature - `Config.addressResolverGroupFactory` setting added (thanks @Hai Saadon) +Improvement - `UpdateMode.AFTER_REQUEST` update mode optimization in tomcat session manager +Improvement - removed ByteBuf object usage during collection iteration +Fixed - `Unable to send command` error under heavy load using Redisson PRO +Fixed - `expire`, `expireAt` and `clearExpire` commands aren't implemented properly for `RBloomFilter`, `RDelayedQueue`, `RFairLock`, `RLocalCachedMap` and `RPermitExpirableSemaphore` object +Fixed - Redis clients duplication during discovering ip change of hostname +Fixed - tomcat session renewal in tomcat session manager +Fixed - `failedAttempts` setting should be applied to Slave nodes only + ### 15-Feb-2018 - versions 2.11.1 and 3.6.1 released Feature - `RedissonClusteredSpringLocalCachedCacheManager` added. Please read [documentation](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks/#1421-spring-cache-local-cache-and-data-partitioning) for more details diff --git a/README.md b/README.md index 182b9b6e9..a7aeb03ba 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,8 @@ Based on high-performance async and lock-free Java Redis client and [Netty](http | Stable
Release Version | Release Date | JDK Version
compatibility | `CompletionStage`
support | `ProjectReactor` version
compatibility | | ------------- | ------------- | ------------| -----------| -----------| -| 3.6.1 | 15.02.2018 | 1.8, 1.9+ | Yes | 3.1.x | -| 2.11.1 | 15.02.2018 | 1.6, 1.7, 1.8, 1.9 and Android | No | 2.0.8 | +| 3.6.2 | 05.03.2018 | 1.8, 1.9+ | Yes | 3.1.x | +| 2.11.2 | 05.03.2018 | 1.6, 1.7, 1.8, 1.9 and Android | No | 2.0.8 | Features @@ -37,11 +37,12 @@ Features Lock, FairLock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore, CountDownLatch * [Distributed services](https://github.com/redisson/redisson/wiki/9.-distributed-services) Remote service, Live Object service, Executor service, Scheduler service, MapReduce service -* [Spring Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) implementation   -* [Hibernate Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate-cache) implementation -* [JCache API (JSR-107)](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-jcache-api-jsr-107-implementation) implementation -* [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#144-tomcat-redis-session-manager) implementation -* [Spring Session](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#145-spring-session) implementation +* [Spring Framework](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#141-spring-framework) +* [Spring Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-spring-cache) implementation +* [Hibernate Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-hibernate-cache) implementation +* [JCache API (JSR-107)](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#144-jcache-api-jsr-107-implementation) implementation +* [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#145-tomcat-redis-session-manager) implementation +* [Spring Session](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#146-spring-session) implementation * [Redis pipelining](https://github.com/redisson/redisson/wiki/10.-additional-features#102-execution-batches-of-commands) (command batches) * Supports Android platform * Supports auto-reconnection @@ -94,23 +95,23 @@ Quick start org.redisson redisson - 3.6.1 + 3.6.2 org.redisson redisson - 2.11.1 + 2.11.2 #### Gradle // JDK 1.8+ compatible - compile 'org.redisson:redisson:3.6.1' + compile 'org.redisson:redisson:3.6.2' // JDK 1.6+ compatible - compile 'org.redisson:redisson:2.11.1' + compile 'org.redisson:redisson:2.11.2' #### Java @@ -135,11 +136,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService"); Downloads =============================== -[Redisson 3.6.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.6.1&e=jar), -[Redisson node 3.6.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.6.1&e=jar) +[Redisson 3.6.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.6.2&e=jar), +[Redisson node 3.6.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.6.2&e=jar) -[Redisson 2.11.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.11.1&e=jar), -[Redisson node 2.11.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.11.1&e=jar) +[Redisson 2.11.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.11.2&e=jar), +[Redisson node 2.11.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.11.2&e=jar) ### Supported by diff --git a/pom.xml b/pom.xml index 508235609..a3bba4b5c 100644 --- a/pom.xml +++ b/pom.xml @@ -134,7 +134,7 @@ io.netty netty-bom - 4.1.21.Final + 4.1.22.Final pom import diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 2dd3f4b62..585df5ce7 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -54,6 +54,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { private String configPath; private ReadMode readMode = ReadMode.MEMORY; private UpdateMode updateMode = UpdateMode.DEFAULT; + private String keyPrefix = ""; public String getUpdateMode() { return updateMode.toString(); @@ -78,7 +79,15 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { public String getConfigPath() { return configPath; } - + + public String getKeyPrefix() { + return keyPrefix; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + @Override public int getRejectedSessions() { return 0; @@ -131,7 +140,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } public RMap getMap(String sessionId) { - return redisson.getMap("redisson_tomcat_session:" + sessionId); + return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 4df80d6ae..66dd91d79 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -52,6 +52,8 @@ public class RedissonSessionManager extends ManagerBase { private ReadMode readMode = ReadMode.MEMORY; private UpdateMode updateMode = UpdateMode.DEFAULT; + + private String keyPrefix = ""; public String getUpdateMode() { return updateMode.toString(); @@ -76,7 +78,15 @@ public class RedissonSessionManager extends ManagerBase { public String getConfigPath() { return configPath; } - + + public String getKeyPrefix() { + return keyPrefix; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + @Override public String getName() { return RedissonSessionManager.class.getSimpleName(); @@ -110,7 +120,7 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap("redisson_tomcat_session:" + sessionId); + return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 88ddda4a3..35d4d84f9 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -51,6 +51,8 @@ public class RedissonSessionManager extends ManagerBase { private ReadMode readMode = ReadMode.MEMORY; private UpdateMode updateMode = UpdateMode.DEFAULT; + + private String keyPrefix = ""; public String getUpdateMode() { return updateMode.toString(); @@ -75,7 +77,15 @@ public class RedissonSessionManager extends ManagerBase { public String getConfigPath() { return configPath; } - + + public String getKeyPrefix() { + return keyPrefix; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + @Override public String getName() { return RedissonSessionManager.class.getSimpleName(); @@ -109,7 +119,7 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap("redisson_tomcat_session:" + sessionId); + return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index eadd455de..b370552f5 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -51,6 +51,8 @@ public class RedissonSessionManager extends ManagerBase { private ReadMode readMode = ReadMode.MEMORY; private UpdateMode updateMode = UpdateMode.DEFAULT; + + private String keyPrefix = ""; public String getUpdateMode() { return updateMode.toString(); @@ -75,7 +77,15 @@ public class RedissonSessionManager extends ManagerBase { public String getConfigPath() { return configPath; } - + + public String getKeyPrefix() { + return keyPrefix; + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + @Override public String getName() { return RedissonSessionManager.class.getSimpleName(); @@ -109,7 +119,7 @@ public class RedissonSessionManager extends ManagerBase { } public RMap getMap(String sessionId) { - return redisson.getMap("redisson_tomcat_session:" + sessionId); + return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRedLock.java b/redisson/src/main/java/org/redisson/RedissonRedLock.java index 37c135e31..379140765 100644 --- a/redisson/src/main/java/org/redisson/RedissonRedLock.java +++ b/redisson/src/main/java/org/redisson/RedissonRedLock.java @@ -51,7 +51,7 @@ public class RedissonRedLock extends RedissonMultiLock { @Override protected long calcLockWaitTime(long remainTime) { - return Math.max(remainTime / locks.size(), 1000); + return Math.max(remainTime / locks.size(), 1); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilter.java b/redisson/src/main/java/org/redisson/api/RBloomFilter.java index 92dd1ae01..c7220cac8 100644 --- a/redisson/src/main/java/org/redisson/api/RBloomFilter.java +++ b/redisson/src/main/java/org/redisson/api/RBloomFilter.java @@ -44,6 +44,11 @@ public interface RBloomFilter extends RExpirable { double getFalseProbability(); + /** + * Returns number of bits in Redis memory required by this instance + * + * @return number of bits + */ long getSize(); int getHashIterations(); diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 6cb6df574..522a52b10 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -928,12 +928,12 @@ public interface RedissonClient { /** * Shutdown Redisson instance but NOT Redis server * - * This equates to invoke shutdown(2, 15, TimeUnit.SECONDS); + * This equates to invoke shutdown(0, 2, TimeUnit.SECONDS); */ void shutdown(); /** - * Shuts down Redisson instance NOT Redis server + * Shuts down Redisson instance but NOT Redis server * * Shutdown ensures that no tasks are submitted for 'the quiet period' * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java index d164d94ec..ec69993c4 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java @@ -150,4 +150,9 @@ public class LocalCachedMessageCodec implements Codec { return encoder; } + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/BaseRedisPubSubListener.java b/redisson/src/main/java/org/redisson/client/BaseRedisPubSubListener.java index b618cae1a..ffba58572 100644 --- a/redisson/src/main/java/org/redisson/client/BaseRedisPubSubListener.java +++ b/redisson/src/main/java/org/redisson/client/BaseRedisPubSubListener.java @@ -17,6 +17,11 @@ package org.redisson.client; import org.redisson.client.protocol.pubsub.PubSubType; +/** + * + * @author Nikita Koksharov + * + */ public class BaseRedisPubSubListener implements RedisPubSubListener { @Override diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubListener.java b/redisson/src/main/java/org/redisson/client/RedisPubSubListener.java index 5324bee81..ea13d3867 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubListener.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubListener.java @@ -18,6 +18,12 @@ package org.redisson.client; import org.redisson.api.listener.MessageListener; import org.redisson.client.protocol.pubsub.PubSubType; +/** + * + * @author Nikita Koksharov + * + * @param value type + */ public interface RedisPubSubListener extends MessageListener { boolean onStatus(PubSubType type, String channel); diff --git a/redisson/src/main/java/org/redisson/client/SubscribeListener.java b/redisson/src/main/java/org/redisson/client/SubscribeListener.java index 2dc91d5ae..5eeecfeac 100644 --- a/redisson/src/main/java/org/redisson/client/SubscribeListener.java +++ b/redisson/src/main/java/org/redisson/client/SubscribeListener.java @@ -21,6 +21,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +/** + * + * @author Nikita Koksharov + * + */ public class SubscribeListener extends BaseRedisPubSubListener { Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); diff --git a/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java b/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java new file mode 100644 index 000000000..9aa92e81f --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/codec/BaseCodec.java @@ -0,0 +1,53 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + +/** + * + * @author Nikita Koksharov + * + */ +public abstract class BaseCodec implements Codec { + + @Override + public Decoder getMapValueDecoder() { + return getValueDecoder(); + } + + @Override + public Encoder getMapValueEncoder() { + return getValueEncoder(); + } + + @Override + public Decoder getMapKeyDecoder() { + return getValueDecoder(); + } + + @Override + public Encoder getMapKeyEncoder() { + return getValueEncoder(); + } + + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/codec/BitSetCodec.java b/redisson/src/main/java/org/redisson/client/codec/BitSetCodec.java index e14b87a0a..27d4a8dfd 100644 --- a/redisson/src/main/java/org/redisson/client/codec/BitSetCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/BitSetCodec.java @@ -81,4 +81,9 @@ public class BitSetCodec implements Codec { throw new UnsupportedOperationException(); } + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java b/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java index 81afd2cbd..9c9c0c774 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java @@ -29,7 +29,7 @@ import io.netty.buffer.ByteBufAllocator; * @author Nikita Koksharov * */ -public class ByteArrayCodec implements Codec { +public class ByteArrayCodec extends BaseCodec { public static final ByteArrayCodec INSTANCE = new ByteArrayCodec(); @@ -62,24 +62,4 @@ public class ByteArrayCodec implements Codec { return encoder; } - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - } diff --git a/redisson/src/main/java/org/redisson/client/codec/Codec.java b/redisson/src/main/java/org/redisson/client/codec/Codec.java index 74c6a6fe2..68b2f1696 100644 --- a/redisson/src/main/java/org/redisson/client/codec/Codec.java +++ b/redisson/src/main/java/org/redisson/client/codec/Codec.java @@ -70,5 +70,12 @@ public interface Codec { * @return encoder */ Encoder getValueEncoder(); + + /** + * Returns class loader object used to load classes used in decoding process + * + * @return class loader + */ + ClassLoader getClassLoader(); } diff --git a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java index 25b1bbbe9..8ff933a05 100644 --- a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java @@ -102,4 +102,9 @@ public class MapScanCodec implements Codec { return delegate.getMapKeyEncoder(); } + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java index a50855ca2..f66f67eba 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java @@ -78,4 +78,9 @@ public class ScanCodec implements Codec { return delegate.getMapKeyEncoder(); } + @Override + public ClassLoader getClassLoader() { + return delegate.getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/codec/StringCodec.java b/redisson/src/main/java/org/redisson/client/codec/StringCodec.java index aecf0cbc4..18ab9e22d 100644 --- a/redisson/src/main/java/org/redisson/client/codec/StringCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/StringCodec.java @@ -31,7 +31,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class StringCodec implements Codec { +public class StringCodec extends BaseCodec { public static final StringCodec INSTANCE = new StringCodec(); @@ -81,24 +81,4 @@ public class StringCodec implements Codec { return encoder; } - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 92271c598..262db3827 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -68,8 +68,9 @@ public class CommandPubSubDecoder extends CommandDecoder { if (result instanceof Message) { checkpoint(); + final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); + String channelName = ((Message) result).getChannel(); if (result instanceof PubSubStatusMessage) { - String channelName = ((PubSubStatusMessage) result).getChannel(); String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); PubSubKey key = new PubSubKey(channelName, operation); CommandData d = commands.get(key); @@ -77,21 +78,28 @@ public class CommandPubSubDecoder extends CommandDecoder { commands.remove(key); entries.put(channelName, new PubSubEntry(d.getMessageDecoder())); } + if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) { commands.remove(key); - entries.remove(key); + if (result instanceof PubSubPatternMessage) { + channelName = ((PubSubPatternMessage)result).getPattern(); + } + PubSubEntry entry = entries.remove(channelName); + if (keepOrder) { + enqueueMessage(result, pubSubConnection, entry); + } } } - final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); if (keepOrder) { - String ch = ((Message) result).getChannel(); if (result instanceof PubSubPatternMessage) { - ch = ((PubSubPatternMessage)result).getPattern(); + channelName = ((PubSubPatternMessage)result).getPattern(); + } + PubSubEntry entry = entries.get(channelName); + if (entry != null) { + enqueueMessage(result, pubSubConnection, entry); } - PubSubEntry item = entries.get(ch); - enqueueMessage(result, pubSubConnection, item); } else { executor.execute(new Runnable() { @Override diff --git a/redisson/src/main/java/org/redisson/codec/CompositeCodec.java b/redisson/src/main/java/org/redisson/codec/CompositeCodec.java index aa1ce7380..0f5cba4c1 100644 --- a/redisson/src/main/java/org/redisson/codec/CompositeCodec.java +++ b/redisson/src/main/java/org/redisson/codec/CompositeCodec.java @@ -71,4 +71,9 @@ public class CompositeCodec implements Codec { return valueCodec.getValueEncoder(); } + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/codec/FstCodec.java b/redisson/src/main/java/org/redisson/codec/FstCodec.java index e20ee10d6..c3b448c90 100644 --- a/redisson/src/main/java/org/redisson/codec/FstCodec.java +++ b/redisson/src/main/java/org/redisson/codec/FstCodec.java @@ -20,7 +20,7 @@ import java.io.IOException; import org.nustaq.serialization.FSTConfiguration; import org.nustaq.serialization.FSTObjectInput; import org.nustaq.serialization.FSTObjectOutput; -import org.redisson.client.codec.Codec; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -39,7 +39,7 @@ import io.netty.buffer.ByteBufOutputStream; * @author Nikita Koksharov * */ -public class FstCodec implements Codec { +public class FstCodec extends BaseCodec { private final FSTConfiguration config; @@ -94,26 +94,6 @@ public class FstCodec implements Codec { } }; - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - @Override public Decoder getValueDecoder() { return decoder; @@ -123,5 +103,14 @@ public class FstCodec implements Codec { public Encoder getValueEncoder() { return encoder; } + + @Override + public ClassLoader getClassLoader() { + if (config.getClassLoader() != null) { + return config.getClassLoader(); + } + + return super.getClassLoader(); + } } diff --git a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 2ad1fb04d..cb00e6472 100755 --- a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -21,7 +21,7 @@ import java.io.OutputStream; import javax.xml.datatype.XMLGregorianCalendar; -import org.redisson.client.codec.Codec; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -32,6 +32,7 @@ import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.ObjectIdGenerators; +import com.fasterxml.jackson.core.JsonGenerator.Feature; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.MapperFeature; @@ -55,7 +56,7 @@ import io.netty.buffer.ByteBufOutputStream; * @author Nikita Koksharov * */ -public class JsonJacksonCodec implements Codec { +public class JsonJacksonCodec extends BaseCodec { public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); @@ -155,45 +156,36 @@ public class JsonJacksonCodec implements Codec { objectMapper.registerModule(new DefenceModule()); objectMapper.setSerializationInclusion(Include.NON_NULL); - objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() - .withFieldVisibility(JsonAutoDetect.Visibility.ANY).withGetterVisibility(JsonAutoDetect.Visibility.NONE) - .withSetterVisibility(JsonAutoDetect.Visibility.NONE) - .withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); - objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); - objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + objectMapper.setVisibility(objectMapper.getSerializationConfig() + .getDefaultVisibilityChecker() + .withFieldVisibility(JsonAutoDetect.Visibility.ANY) + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE) + .withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + objectMapper.enable(Feature.WRITE_BIGDECIMAL_AS_PLAIN); + objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + objectMapper.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY); objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class); } @Override - public Decoder getMapValueDecoder() { - return decoder; - } - - @Override - public Encoder getMapValueEncoder() { - return encoder; - } - - @Override - public Decoder getMapKeyDecoder() { + public Decoder getValueDecoder() { return decoder; } @Override - public Encoder getMapKeyEncoder() { + public Encoder getValueEncoder() { return encoder; } - + @Override - public Decoder getValueDecoder() { - return decoder; - } + public ClassLoader getClassLoader() { + if (mapObjectMapper.getTypeFactory().getClassLoader() != null) { + return mapObjectMapper.getTypeFactory().getClassLoader(); + } - @Override - public Encoder getValueEncoder() { - return encoder; + return super.getClassLoader(); } public ObjectMapper getObjectMapper() { diff --git a/redisson/src/main/java/org/redisson/codec/KryoCodec.java b/redisson/src/main/java/org/redisson/codec/KryoCodec.java index 639644285..8ee88fc50 100755 --- a/redisson/src/main/java/org/redisson/codec/KryoCodec.java +++ b/redisson/src/main/java/org/redisson/codec/KryoCodec.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import org.redisson.client.codec.Codec; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -40,13 +40,15 @@ import io.netty.buffer.ByteBufOutputStream; * @author Nikita Koksharov * */ -public class KryoCodec implements Codec { +public class KryoCodec extends BaseCodec { public interface KryoPool { Kryo get(); void yield(Kryo kryo); + + ClassLoader getClassLoader(); } @@ -90,6 +92,11 @@ public class KryoCodec implements Codec { return kryo; } + @Override + public ClassLoader getClassLoader() { + return classLoader; + } + } public class RedissonKryoCodecException extends RuntimeException { @@ -171,26 +178,6 @@ public class KryoCodec implements Codec { this.kryoPool = kryoPool; } - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - @Override public Decoder getValueDecoder() { return decoder; @@ -200,5 +187,13 @@ public class KryoCodec implements Codec { public Encoder getValueEncoder() { return encoder; } + + @Override + public ClassLoader getClassLoader() { + if (kryoPool.getClassLoader() != null) { + return kryoPool.getClassLoader(); + } + return super.getClassLoader(); + } } diff --git a/redisson/src/main/java/org/redisson/codec/LZ4Codec.java b/redisson/src/main/java/org/redisson/codec/LZ4Codec.java index ce203355c..0a0a174f4 100644 --- a/redisson/src/main/java/org/redisson/codec/LZ4Codec.java +++ b/redisson/src/main/java/org/redisson/codec/LZ4Codec.java @@ -18,6 +18,7 @@ package org.redisson.codec; import java.io.IOException; import java.nio.ByteBuffer; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -41,7 +42,7 @@ import net.jpountz.lz4.LZ4SafeDecompressor; * @author Nikita Koksharov * */ -public class LZ4Codec implements Codec { +public class LZ4Codec extends BaseCodec { private static final int DECOMPRESSION_HEADER_SIZE = Integer.SIZE / 8; private final LZ4Factory factory = LZ4Factory.fastestInstance(); @@ -108,26 +109,6 @@ public class LZ4Codec implements Codec { } }; - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - @Override public Decoder getValueDecoder() { return decoder; diff --git a/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java index 0e5e5e6de..bc0019208 100644 --- a/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java @@ -104,4 +104,9 @@ public class MapCacheEventCodec implements Codec { return key; } + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/codec/SerializationCodec.java b/redisson/src/main/java/org/redisson/codec/SerializationCodec.java index 9bdabf26c..724881e19 100644 --- a/redisson/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/redisson/src/main/java/org/redisson/codec/SerializationCodec.java @@ -113,5 +113,13 @@ public class SerializationCodec implements Codec { public Encoder getValueEncoder() { return encoder; } + + @Override + public ClassLoader getClassLoader() { + if (classLoader != null) { + return classLoader; + } + return getClass().getClassLoader(); + } } diff --git a/redisson/src/main/java/org/redisson/codec/SnappyCodec.java b/redisson/src/main/java/org/redisson/codec/SnappyCodec.java index fa237999d..01364266e 100644 --- a/redisson/src/main/java/org/redisson/codec/SnappyCodec.java +++ b/redisson/src/main/java/org/redisson/codec/SnappyCodec.java @@ -17,6 +17,7 @@ package org.redisson.codec; import java.io.IOException; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -36,7 +37,7 @@ import io.netty.handler.codec.compression.Snappy; * @author Nikita Koksharov * */ -public class SnappyCodec implements Codec { +public class SnappyCodec extends BaseCodec { private static final ThreadLocal snappyDecoder = new ThreadLocal() { protected Snappy initialValue() { @@ -110,26 +111,6 @@ public class SnappyCodec implements Codec { } }; - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - @Override public Decoder getValueDecoder() { return decoder; @@ -140,4 +121,9 @@ public class SnappyCodec implements Codec { return encoder; } + @Override + public ClassLoader getClassLoader() { + return innerCodec.getClassLoader(); + } + } diff --git a/redisson/src/main/java/org/redisson/config/BaseConfig.java b/redisson/src/main/java/org/redisson/config/BaseConfig.java index b8bf838b6..1aa0a502a 100644 --- a/redisson/src/main/java/org/redisson/config/BaseConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseConfig.java @@ -17,6 +17,9 @@ package org.redisson.config; import java.net.URI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * * @author Nikita Koksharov @@ -24,6 +27,8 @@ import java.net.URI; * @param config type */ class BaseConfig> { + + private static final Logger log = LoggerFactory.getLogger("config"); /** * If pooled connection not used for a timeout time @@ -59,25 +64,6 @@ class BaseConfig> { private int retryInterval = 1500; - /** - * Reconnection attempt timeout to Redis server then - * it has been excluded from internal list of available servers. - * - * On every such timeout event Redisson tries - * to connect to disconnected Redis server. - * - * @see #failedAttempts - * - */ - private int reconnectionTimeout = 3000; - - /** - * Redis server will be excluded from the list of available nodes - * when sequential unsuccessful execution attempts of any Redis command - * reaches failedAttempts. - */ - private int failedAttempts = 3; - /** * Password for Redis authentication. Should be null if not needed */ @@ -125,8 +111,6 @@ class BaseConfig> { setPingTimeout(config.getPingTimeout()); setConnectTimeout(config.getConnectTimeout()); setIdleConnectionTimeout(config.getIdleConnectionTimeout()); - setFailedAttempts(config.getFailedAttempts()); - setReconnectionTimeout(config.getReconnectionTimeout()); setSslEnableEndpointIdentification(config.isSslEnableEndpointIdentification()); setSslProvider(config.getSslProvider()); setSslTruststore(config.getSslTruststore()); @@ -291,49 +275,24 @@ class BaseConfig> { return idleConnectionTimeout; } - /** - * Reconnection attempt timeout to Redis server when - * it has been excluded from internal list of available servers. - *

- * On every such timeout event Redisson tries - * to connect to disconnected Redis server. - *

- * Default is 3000 - * - * @see #failedAttempts - * - * @param slaveRetryTimeout - retry timeout in milliseconds - * @return config + /* + * Use setFailedSlaveReconnectionInterval instead */ - + @Deprecated public T setReconnectionTimeout(int slaveRetryTimeout) { - this.reconnectionTimeout = slaveRetryTimeout; + log.warn("'reconnectionTimeout' setting in unavailable. Please use 'failedSlaveReconnectionInterval' setting instead!"); return (T) this; } - public int getReconnectionTimeout() { - return reconnectionTimeout; - } - - /** - * Redis server will be excluded from the internal list of available nodes - * when sequential unsuccessful execution attempts of any Redis command - * on this server reaches failedAttempts. - *

- * Default is 3 - * - * @param slaveFailedAttempts - attempts - * @return config + /* + * Use setFailedSlaveCheckInterval instead */ + @Deprecated public T setFailedAttempts(int slaveFailedAttempts) { - this.failedAttempts = slaveFailedAttempts; + log.warn("'failedAttempts' setting in unavailable. Please use 'failedSlaveCheckInterval' setting instead!"); return (T) this; } - - public int getFailedAttempts() { - return failedAttempts; - } - + public boolean isSslEnableEndpointIdentification() { return sslEnableEndpointIdentification; } diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index 406f10e97..ff606d632 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -41,6 +41,10 @@ public class BaseMasterSlaveServersConfigeach slave node */ @@ -82,6 +86,8 @@ public class BaseMasterSlaveServersConfig + * On every such timeout event Redisson tries + * to connect to disconnected Redis server. + *

+ * Default is 3000 + * + * @param failedSlavesReconnectionTimeout - retry timeout in milliseconds + * @return config + */ + + public T setFailedSlaveReconnectionInterval(int failedSlavesReconnectionTimeout) { + this.failedSlaveReconnectionInterval = failedSlavesReconnectionTimeout; + return (T) this; + } + + public int getFailedSlaveReconnectionInterval() { + return failedSlaveReconnectionInterval; + } + + + /** + * Redis Slave node is excluded from the internal list of available nodes + * when the time interval from the moment of first Redis command execution failure + * on this server reaches slaveFailsInterval value. + *

+ * Default is 60000 + * + * @param slaveFailsInterval - time interval in milliseconds + * @return config + */ + public T setFailedSlaveCheckInterval(int slaveFailsInterval) { + this.failedSlaveCheckInterval = slaveFailsInterval; + return (T) this; + } + public int getFailedSlaveCheckInterval() { + return failedSlaveCheckInterval; + } /** * Redis 'master' server connection pool size. diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 0d1dd6d5a..b622a457d 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -17,7 +17,7 @@ package org.redisson.connection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.redisson.api.NodeType; import org.redisson.api.RFuture; @@ -57,7 +57,7 @@ public class ClientConnectionsEntry { private volatile NodeType nodeType; private ConnectionManager connectionManager; - private final AtomicInteger failedAttempts = new AtomicInteger(); + private final AtomicLong firstFailTime = new AtomicLong(0); public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, ConnectionManager connectionManager, NodeType nodeType) { @@ -80,16 +80,19 @@ public class ClientConnectionsEntry { return nodeType; } - public void resetFailedAttempts() { - failedAttempts.set(0); + public void resetFirstFail() { + firstFailTime.set(0); } - public int getFailedAttempts() { - return failedAttempts.get(); + public boolean isFailed() { + if (firstFailTime.get() != 0) { + return System.currentTimeMillis() - firstFailTime.get() > connectionManager.getConfig().getFailedSlaveCheckInterval(); + } + return false; } - - public int incFailedAttempts() { - return failedAttempts.incrementAndGet(); + + public void trySetupFistFail() { + firstFailTime.compareAndSet(0, System.currentTimeMillis()); } public RedisClient getClient() { @@ -243,7 +246,7 @@ public class ClientConnectionsEntry { + ", freeSubscribeConnectionsCounter=" + freeSubscribeConnectionsCounter + ", freeConnectionsAmount=" + freeConnections.size() + ", freeConnectionsCounter=" + freeConnectionsCounter + ", freezed=" + freezed + ", freezeReason=" + freezeReason - + ", client=" + client + ", nodeType=" + nodeType + ", failedAttempts=" + failedAttempts + + ", client=" + client + ", nodeType=" + nodeType + ", firstFail=" + firstFailTime + "]"; } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index cfab7553a..d6427cb24 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -29,6 +29,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandSyncService; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; @@ -112,9 +113,7 @@ public interface ConnectionManager { void unsubscribe(String channelName, AsyncSemaphore lock); - RFuture unsubscribe(String channelName, boolean temporaryDown); - - RFuture punsubscribe(String channelName, boolean temporaryDown); + RFuture unsubscribe(String channelName, PubSubType topicType); void punsubscribe(String channelName, AsyncSemaphore lock); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1bfdffadb..10bfdebce 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -45,6 +45,8 @@ import org.redisson.client.RedisException; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.RedisTimeoutException; +import org.redisson.client.SubscribeListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.pubsub.PubSubType; @@ -323,7 +325,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected void initTimer(MasterSlaveServersConfig config) { - int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()}; + int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()}; Arrays.sort(timeouts); int minTimeout = timeouts[0]; if (minTimeout % 100 != 0) { @@ -366,16 +368,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { addEntry(slot, entry); } - if (config.getDnsMonitoringInterval() != -1) { - dnsMonitor = new DNSMonitor(this, f.getNow(), - config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); - dnsMonitor.start(); - } + startDNSMonitoring(f.getNow()); } catch (RuntimeException e) { stopThreads(); throw e; } } + + protected void startDNSMonitoring(RedisClient masterHost) { + if (config.getDnsMonitoringInterval() != -1) { + dnsMonitor = new DNSMonitor(this, masterHost, + config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); + dnsMonitor.start(); + } + } protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet slots) { @@ -411,8 +417,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { c.setConnectTimeout(cfg.getConnectTimeout()); c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); - c.setFailedAttempts(cfg.getFailedAttempts()); - c.setReconnectionTimeout(cfg.getReconnectionTimeout()); + c.setFailedSlaveCheckInterval(cfg.getFailedSlaveCheckInterval()); + c.setFailedSlaveReconnectionInterval(cfg.getFailedSlaveReconnectionInterval()); c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); c.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); @@ -500,7 +506,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener... listeners) { - return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, listeners); + return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); } @Override @@ -512,25 +518,36 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners) { - return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); + return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise(), listeners); } private RFuture subscribe(final PubSubType type, final Codec codec, final String channelName, - final RedisPubSubListener... listeners) { + final RPromise promise, final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); - final RPromise result = new RedissonPromise(); lock.acquire(new Runnable() { @Override public void run() { - if (result.isDone()) { + if (promise.isDone()) { lock.release(); return; } + final RPromise result = new RedissonPromise(); + result.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + subscribe(type, codec, channelName, promise, listeners); + return; + } + + promise.trySuccess(result.getNow()); + } + }); subscribe(codec, channelName, result, type, lock, listeners); } }); - return result; + return promise; } public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { @@ -604,7 +621,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { for (RedisPubSubListener listener : listeners) { connEntry.addListener(channelName, listener); } - connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type); + final Future subscribeFuture = listener.getSuccessFuture(); + + subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!promise.trySuccess(connEntry)) { @@ -621,6 +641,15 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } }); + + newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (promise.tryFailure(new RedisTimeoutException())) { + subscribeFuture.cancel(false); + } + } + }, config.getRetryInterval(), TimeUnit.MILLISECONDS); } private void connect(final Codec codec, final String channelName, @@ -695,34 +724,54 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture unsubscribe(final String channelName, boolean temporaryDown) { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - return null; - } - freePubSubConnections.remove(entry); - - final Codec entryCodec = entry.getConnection().getChannels().get(channelName); - if (temporaryDown) { - final RPromise result = new RedissonPromise(); - entry.unsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) { - result.trySuccess(entryCodec); - return true; - } - return false; + public RFuture unsubscribe(final String channelName, final PubSubType topicType) { + final RPromise result = new RedissonPromise(); + final AsyncSemaphore lock = getSemaphore(channelName); + lock.acquire(new Runnable() { + @Override + public void run() { + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); + if (entry == null) { + lock.release(); + result.trySuccess(null); + return; } - - }); - return result; - } - entry.unsubscribe(channelName, null); - return RedissonPromise.newSucceededFuture(entryCodec); + + freePubSubLock.acquire(new Runnable() { + @Override + public void run() { + freePubSubConnections.remove(entry); + freePubSubLock.release(); + + final Codec entryCodec = entry.getConnection().getChannels().get(channelName); + RedisPubSubListener listener = new BaseRedisPubSubListener() { + + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == topicType && channel.equals(channelName)) { + lock.release(); + result.trySuccess(entryCodec); + return true; + } + return false; + } + + }; + + if (topicType == PubSubType.PUNSUBSCRIBE) { + entry.punsubscribe(channelName, listener); + } else { + entry.unsubscribe(channelName, listener); + } + } + }); + } + }); + + return result; } + @Override public void punsubscribe(final String channelName, final AsyncSemaphore lock) { final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { @@ -749,36 +798,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } - @Override - public RFuture punsubscribe(final String channelName, boolean temporaryDown) { - final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); - if (entry == null) { - return null; - } - freePubSubConnections.remove(entry); - - final Codec entryCodec = entry.getConnection().getChannels().get(channelName); - if (temporaryDown) { - final RPromise result = new RedissonPromise(); - entry.punsubscribe(channelName, new BaseRedisPubSubListener() { - - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) { - result.trySuccess(entryCodec); - return true; - } - return false; - } - - }); - return result; - } - entry.punsubscribe(channelName, null); - return RedissonPromise.newSucceededFuture(entryCodec); - } - public MasterSlaveEntry getEntry(InetSocketAddress address) { for (MasterSlaveEntry entry : client2entry.values()) { InetSocketAddress addr = entry.getClient().getAddr(); @@ -801,6 +821,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return null; } + @Override public MasterSlaveEntry getEntry(RedisClient redisClient) { MasterSlaveEntry entry = client2entry.get(redisClient); if (entry != null) { @@ -923,7 +944,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { - shutdown(2, 15, TimeUnit.SECONDS);//default netty value + shutdown(0, 2, TimeUnit.SECONDS);//default netty value } @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 6318f8ffa..c2d49c4e1 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -34,6 +34,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; @@ -234,18 +235,18 @@ public class MasterSlaveEntry { for (String channelName : redisPubSubConnection.getChannels().keySet()) { PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPubSubListeners(channelName, listeners, temporaryDown); + reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE); } for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) { PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName); Collection> listeners = pubSubEntry.getListeners(channelName); - reattachPatternPubSubListeners(channelName, listeners, temporaryDown); + reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE); } } - private void reattachPubSubListeners(final String channelName, final Collection> listeners, boolean temporaryDown) { - RFuture subscribeCodec = connectionManager.unsubscribe(channelName, temporaryDown); + private void reattachPubSubListeners(final String channelName, final Collection> listeners, final PubSubType topicType) { + RFuture subscribeCodec = connectionManager.unsubscribe(channelName, topicType); if (listeners.isEmpty()) { return; } @@ -253,8 +254,16 @@ public class MasterSlaveEntry { subscribeCodec.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + if (future.get() == null) { + return; + } + Codec subscribeCodec = future.get(); - subscribe(channelName, listeners, subscribeCodec); + if (topicType == PubSubType.PUNSUBSCRIBE) { + psubscribe(channelName, listeners, subscribeCodec); + } else { + subscribe(channelName, listeners, subscribeCodec); + } } }); @@ -273,26 +282,11 @@ public class MasterSlaveEntry { return; } - log.debug("resubscribed listeners of '{}' channel to '{}'", channelName, future.getNow().getConnection().getRedisClient()); + log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient()); } }); } - private void reattachPatternPubSubListeners(final String channelName, final Collection> listeners, boolean temporaryDown) { - RFuture subscribeCodec = connectionManager.punsubscribe(channelName, temporaryDown); - if (listeners.isEmpty()) { - return; - } - - subscribeCodec.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - Codec subscribeCodec = future.get(); - psubscribe(channelName, listeners, subscribeCodec); - } - }); - } - private void psubscribe(final String channelName, final Collection> listeners, final Codec subscribeCodec) { RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()])); @@ -368,6 +362,10 @@ public class MasterSlaveEntry { return slaveBalancer.contains(addr); } + public int getAvailableClients() { + return slaveBalancer.getAvailableClients(); + } + public RFuture addSlave(URI address) { return addSlave(address, false, NodeType.SLAVE); } @@ -434,6 +432,10 @@ public class MasterSlaveEntry { return true; } + public boolean isSlaveUnfreezed(URI address) { + return slaveBalancer.isUnfreezed(address); + } + public boolean slaveUp(URI address, FreezeReason freezeReason) { if (!slaveBalancer.unfreeze(address, freezeReason)) { return false; @@ -530,7 +532,7 @@ public class MasterSlaveEntry { } public void unfreeze() { - masterEntry.resetFailedAttempts(); + masterEntry.resetFirstFail(); synchronized (masterEntry) { masterEntry.setFreezed(false); masterEntry.setFreezeReason(null); diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 67130dd22..71c1a2d8b 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -165,23 +165,18 @@ public class PubSubConnectionEntry { conn.psubscribe(codec, pattern); } - private SubscribeListener addSubscribeListener(String channel, PubSubType type) { - SubscribeListener subscribeListener = new SubscribeListener(channel, type); - SubscribeListener oldSubscribeListener = subscribeChannelListeners.putIfAbsent(channel, subscribeListener); - if (oldSubscribeListener != null) { - return oldSubscribeListener; - } else { - conn.addListener(subscribeListener); - return subscribeListener; - } - } - - public Future getSubscribeFuture(String channel, PubSubType type) { + public SubscribeListener getSubscribeFuture(String channel, PubSubType type) { SubscribeListener listener = subscribeChannelListeners.get(channel); if (listener == null) { - listener = addSubscribeListener(channel, type); + listener = new SubscribeListener(channel, type); + SubscribeListener oldSubscribeListener = subscribeChannelListeners.putIfAbsent(channel, listener); + if (oldSubscribeListener != null) { + listener = oldSubscribeListener; + } else { + conn.addListener(listener); + } } - return listener.getSuccessFuture(); + return listener; } public void unsubscribe(final String channel, final RedisPubSubListener listener) { diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 9a1d4845d..e9ca00f51 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -51,6 +52,7 @@ import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.resolver.AddressResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; @@ -70,7 +72,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final Set slaves = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); private final Set disconnectedSlaves = new HashSet(); + private String masterName; private ScheduledFuture monitorFuture; + private AddressResolver sentinelResolver; public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { super(config); @@ -79,9 +83,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { throw new IllegalArgumentException("masterName parameter is not defined!"); } + this.masterName = cfg.getMasterName(); this.config = create(cfg); initTimer(this.config); + this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); + for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); try { @@ -133,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String host = createAddress(ip, port); URI sentinelAddr = URIBuilder.create(host); - RFuture future = registerSentinel(cfg, sentinelAddr, this.config); + RFuture future = registerSentinel(sentinelAddr, this.config); connectionFutures.add(future); } @@ -159,6 +166,76 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { scheduleChangeCheck(cfg, null); } + @Override + protected void startDNSMonitoring(RedisClient masterHost) { + if (config.getDnsMonitoringInterval() == -1) { + return; + } + + scheduleSentinelDNSCheck(); + } + + protected void scheduleSentinelDNSCheck() { + monitorFuture = group.schedule(new Runnable() { + @Override + public void run() { + List sentinels = new ArrayList(SentinelConnectionManager.this.sentinels.values()); + + final AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size()); + FutureListener> commonListener = new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (sentinelsCounter.decrementAndGet() == 0) { + scheduleSentinelDNSCheck(); + } + } + }; + + for (final RedisClient client : sentinels) { + Future> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort())); + allNodes.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Unable to resolve " + client.getAddr().getHostName(), future.cause()); + return; + } + + boolean clientFound = false; + for (InetSocketAddress addr : future.getNow()) { + boolean found = false; + for (RedisClient currentSentinel : SentinelConnectionManager.this.sentinels.values()) { + if (currentSentinel.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) + && currentSentinel.getAddr().getPort() == addr.getPort()) { + found = true; + break; + } + } + if (!found) { + URI uri = convert(addr.getAddress().getHostAddress(), "" + addr.getPort()); + registerSentinel(uri, getConfig()); + } + if (client.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) + && client.getAddr().getPort() == addr.getPort()) { + clientFound = true; + } + } + if (!clientFound) { + String addr = client.getAddr().getAddress().getHostAddress() + ":" + client.getAddr().getPort(); + RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(addr); + if (sentinel != null) { + sentinel.shutdownAsync(); + log.warn("sentinel: {} has down", addr); + } + } + } + }); + allNodes.addListener(commonListener); + } + } + }, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS); + } + private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator iterator) { monitorFuture = group.schedule(new Runnable() { @Override @@ -269,13 +346,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String masterHost = map.get("master-host"); String masterPort = map.get("master-port"); - if (!isUseSameMaster(ip, port, masterHost, masterPort)) { - continue; - } if (flags.contains("s_down") || flags.contains("disconnected")) { slaveDown(ip, port); continue; } + if (!isUseSameMaster(ip, port, masterHost, masterPort)) { + continue; + } String slaveAddr = createAddress(ip, port); currentSlaves.add(slaveAddr); @@ -305,7 +382,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); slavesFuture.addListener(commonListener); } - + RFuture>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); sentinelsFuture.addListener(new FutureListener>>() { @Override @@ -323,9 +400,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - String host = createAddress(ip, port); - URI sentinelAddr = URIBuilder.create(host); - registerSentinel(cfg, sentinelAddr, getConfig()); + URI sentinelAddr = convert(ip, port); + registerSentinel(sentinelAddr, getConfig()); } } }); @@ -350,7 +426,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return entry; } - private RFuture registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { + private RFuture registerSentinel(final URI addr, final MasterSlaveServersConfig c) { String key = addr.getHost() + ":" + addr.getPort(); RedisClient client = sentinels.get(key); if (client != null) { @@ -380,7 +456,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.debug("message {} from {}", msg, channel); if ("+sentinel".equals(channel)) { - onSentinelAdded(cfg, (String) msg, c); + onSentinelAdded((String) msg, c); } if ("+slave".equals(channel)) { onSlaveAdded(addr, (String) msg); @@ -392,7 +468,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { onNodeUp(addr, (String) msg); } if ("+switch-master".equals(channel)) { - onMasterChange(cfg, addr, (String) msg); + onMasterChange(addr, (String) msg); } } @@ -413,14 +489,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return RedissonPromise.newSucceededFuture(null); } - protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) { + protected void onSentinelAdded(String msg, MasterSlaveServersConfig c) { String[] parts = msg.split(" "); if ("sentinel".equals(parts[0])) { String ip = parts[2]; String port = parts[3]; URI uri = convert(ip, port); - registerSentinel(cfg, uri, c); + registerSentinel(uri, c); } } @@ -446,8 +522,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { protected RFuture addSlave(final String ip, final String port, final String slaveAddr) { final RPromise result = new RedissonPromise(); // to avoid addition twice + final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + final URI uri = convert(ip, port); if (slaves.add(slaveAddr) && !config.checkSkipSlavesInit()) { - final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); RFuture future = entry.addSlave(URIBuilder.create(slaveAddr)); future.addListener(new FutureListener() { @Override @@ -459,8 +536,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - URI uri = convert(ip, port); - if (entry.slaveUp(uri, FreezeReason.MANAGER)) { + if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} added", slaveAddr); result.trySuccess(null); @@ -469,7 +545,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); } else { - slaveUp(ip, port); + if (entry.hasSlave(uri)) { + slaveUp(ip, port); + } result.trySuccess(null); } return result; @@ -589,11 +667,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) { + private void onMasterChange(URI addr, String msg) { String[] parts = msg.split(" "); if (parts.length > 3) { - if (cfg.getMasterName().equals(parts[0])) { + if (masterName.equals(parts[0])) { String ip = parts[3]; String port = parts[4]; diff --git a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java index 66e48ab4c..647b7152e 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -55,8 +55,6 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); newconfig.setConnectTimeout(cfg.getConnectTimeout()); newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); - newconfig.setFailedAttempts(cfg.getFailedAttempts()); - newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); if (cfg.isDnsMonitoring()) { newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval()); } else { diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancer.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancer.java index 333ab679e..3e712f94e 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancer.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancer.java @@ -19,6 +19,11 @@ import java.util.List; import org.redisson.connection.ClientConnectionsEntry; +/** + * + * @author Nikita Koksharov + * + */ public interface LoadBalancer { ClientConnectionsEntry getEntry(List clientsCopy); diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 12423b080..c36197e94 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -135,7 +135,7 @@ public class LoadBalancerManager { if ((freezeReason == FreezeReason.RECONNECT && entry.getFreezeReason() == FreezeReason.RECONNECT) || freezeReason != FreezeReason.RECONNECT) { - entry.resetFailedAttempts(); + entry.resetFirstFail(); entry.setFreezed(false); entry.setFreezeReason(null); return true; @@ -188,6 +188,11 @@ public class LoadBalancerManager { public boolean contains(InetSocketAddress addr) { return getEntry(addr) != null; } + + public boolean isUnfreezed(URI addr) { + ClientConnectionsEntry entry = getEntry(addr); + return !entry.isFreezed(); + } public boolean contains(URI addr) { return getEntry(addr) != null; @@ -222,10 +227,6 @@ public class LoadBalancerManager { return client2Entry.get(redisClient); } - protected String convert(InetSocketAddress addr) { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - public RFuture getConnection(RedisCommand command, URI addr) { ClientConnectionsEntry entry = getEntry(addr); if (entry != null) { diff --git a/redisson/src/main/java/org/redisson/connection/balancer/RandomLoadBalancer.java b/redisson/src/main/java/org/redisson/connection/balancer/RandomLoadBalancer.java index 03706c7d1..9c5336b30 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/RandomLoadBalancer.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/RandomLoadBalancer.java @@ -28,6 +28,7 @@ import io.netty.util.internal.PlatformDependent; */ public class RandomLoadBalancer implements LoadBalancer { + @Override public ClientConnectionsEntry getEntry(List clientsCopy) { int ind = PlatformDependent.threadLocalRandom().nextInt(clientsCopy.size()); return clientsCopy.get(ind); diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 710fc6d08..3735e4fd5 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -239,7 +239,7 @@ abstract class ConnectionPool { protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { if (entry.getNodeType() == NodeType.SLAVE) { - return entry.getFailedAttempts() < config.getFailedAttempts(); + return !entry.isFailed(); } return true; } @@ -289,7 +289,7 @@ abstract class ConnectionPool { private void connectedSuccessful(ClientConnectionsEntry entry, RPromise promise, T conn) { if (entry.getNodeType() == NodeType.SLAVE) { - entry.resetFailedAttempts(); + entry.resetFirstFail(); } if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); @@ -298,9 +298,11 @@ abstract class ConnectionPool { } private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, Throwable cause) { - if (entry.getNodeType() == NodeType.SLAVE - && entry.incFailedAttempts() == config.getFailedAttempts()) { - checkForReconnect(entry, cause); + if (entry.getNodeType() == NodeType.SLAVE) { + entry.trySetupFistFail(); + if (entry.isFailed()) { + checkForReconnect(entry, cause); + } } releaseConnection(entry); @@ -310,14 +312,12 @@ abstract class ConnectionPool { private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, T conn) { if (entry.getNodeType() == NodeType.SLAVE) { - int attempts = entry.incFailedAttempts(); - if (attempts == config.getFailedAttempts()) { + entry.trySetupFistFail(); + if (entry.isFailed()) { conn.closeAsync(); checkForReconnect(entry, null); - } else if (attempts < config.getFailedAttempts()) { - releaseConnection(entry, conn); } else { - conn.closeAsync(); + releaseConnection(entry, conn); } } else { releaseConnection(entry, conn); @@ -331,7 +331,8 @@ abstract class ConnectionPool { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { if (masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT)) { - log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); + log.error("slave " + entry.getClient().getAddr() + " has been disconnected after " + + config.getFailedSlaveCheckInterval() + " time interval since moment of first failed connection", cause); scheduleCheck(entry); } } @@ -385,24 +386,13 @@ abstract class ConnectionPool { } if (future.isSuccess() && "PONG".equals(future.getNow())) { - entry.resetFailedAttempts(); + entry.resetFirstFail(); RPromise promise = new RedissonPromise(); promise.addListener(new FutureListener() { @Override - public void operationComplete(Future future) - throws Exception { - if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT); - log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); - } else { - synchronized (entry) { - if (entry.getFreezeReason() == FreezeReason.RECONNECT) { - entry.setFreezed(false); - entry.setFreezeReason(null); - log.info("host {} has been successfully reconnected", entry.getClient().getAddr()); - } - } - } + public void operationComplete(Future future) throws Exception { + masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT); + log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); } }); initConnections(entry, promise, false); @@ -431,7 +421,7 @@ abstract class ConnectionPool { } }); } - }, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS); + }, config.getFailedSlaveReconnectionInterval(), TimeUnit.MILLISECONDS); } private void ping(RedisConnection c, final FutureListener pingListener) { diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 3f33a4966..0d3427918 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -48,6 +48,7 @@ public class TasksRunnerService implements RemoteExecutorService { private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator(); private final Codec codec; + private final ClassLoader codecClassLoader; private final String name; private final CommandExecutor commandExecutor; @@ -68,6 +69,7 @@ public class TasksRunnerService implements RemoteExecutorService { this.responses = responses; try { + this.codecClassLoader = codec.getClassLoader(); this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); } catch (Exception e) { throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); @@ -166,7 +168,7 @@ public class TasksRunnerService implements RemoteExecutorService { try { buf.writeBytes(state); - RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); + RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader); cl.loadClass(className, classBody); classLoader.setCurrentClassLoader(cl); @@ -199,7 +201,7 @@ public class TasksRunnerService implements RemoteExecutorService { try { buf.writeBytes(state); - RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); + RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader); cl.loadClass(className, classBody); classLoader.setCurrentClassLoader(cl); diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 38e1fdeeb..bfb8dec63 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -1442,7 +1442,7 @@ public class JCache extends RedissonObject implements Cache { private V getAndRemoveValue(K key) { double syncId = PlatformDependent.threadLocalRandom().nextDouble(); - List result = evalWrite(getName(), codec, RedisCommands.EVAL_MAP_VALUE, + List result = evalWrite(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " + "return {nil}; " diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java index d398089a0..0ceaee4bf 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java @@ -106,5 +106,10 @@ public class JCacheEventCodec implements Codec { public Encoder getValueEncoder() { throw new UnsupportedOperationException(); } + + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } } diff --git a/redisson/src/test/java/org/redisson/ErrorsCodec.java b/redisson/src/test/java/org/redisson/ErrorsCodec.java index c9fd44ca9..af719f5d8 100644 --- a/redisson/src/test/java/org/redisson/ErrorsCodec.java +++ b/redisson/src/test/java/org/redisson/ErrorsCodec.java @@ -17,14 +17,14 @@ package org.redisson; import java.io.IOException; -import org.redisson.client.codec.Codec; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import io.netty.buffer.ByteBuf; -public class ErrorsCodec implements Codec { +public class ErrorsCodec extends BaseCodec { public static final ErrorsCodec INSTANCE = new ErrorsCodec(); @@ -55,24 +55,4 @@ public class ErrorsCodec implements Codec { return encoder; } - @Override - public Decoder getMapValueDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapValueEncoder() { - return getValueEncoder(); - } - - @Override - public Decoder getMapKeyDecoder() { - return getValueDecoder(); - } - - @Override - public Encoder getMapKeyEncoder() { - return getValueEncoder(); - } - } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index aac375305..aeba7cefc 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -38,7 +38,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { return redisson.getBlockingQueue("queue"); } - @Test +// @Test public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException { RedisProcess runner = new RedisRunner() .nosave() @@ -285,7 +285,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { long s = System.currentTimeMillis(); Assert.assertNull(queue1.poll(5, TimeUnit.SECONDS)); - Assert.assertTrue(System.currentTimeMillis() - s > 5000); + Assert.assertTrue(System.currentTimeMillis() - s > 4900); } @Test public void testAwait() throws InterruptedException { diff --git a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java index 4edc65c77..71f8f4c22 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedLockTest.java @@ -157,6 +157,39 @@ public class RedissonRedLockTest { assertThat(redis2.stop()).isEqualTo(0); } + @Test + public void testLockSuccess2() throws IOException, InterruptedException { + RedisProcess redis1 = redisTestMultilockInstance(); + RedisProcess redis2 = redisTestMultilockInstance(); + + RedissonClient client1 = createClient(redis1.getRedisServerAddressAndPort()); + RedissonClient client2 = createClient(redis2.getRedisServerAddressAndPort()); + + RLock lock1 = client1.getLock("lock1"); + RLock lock2 = client1.getLock("lock2"); + RLock lock3 = client2.getLock("lock3"); + + Thread t1 = new Thread() { + public void run() { + lock2.lock(); + }; + }; + t1.start(); + t1.join(); + + RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); + + assertThat(lock.tryLock(500, 5000, TimeUnit.MILLISECONDS)).isTrue(); + Thread.sleep(3000); + + lock.unlock(); + + client1.shutdown(); + client2.shutdown(); + + assertThat(redis1.stop()).isEqualTo(0); + assertThat(redis2.stop()).isEqualTo(0); + } @Test public void testLockSuccess() throws IOException, InterruptedException { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 5a04527e4..77acc07d4 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -25,15 +25,19 @@ import org.junit.BeforeClass; import org.junit.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; +import org.redisson.api.RFuture; +import org.redisson.api.RPatternTopic; import org.redisson.api.RSet; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.BaseStatusListener; import org.redisson.api.listener.MessageListener; +import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.StatusListener; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; +import org.redisson.connection.balancer.RandomLoadBalancer; public class RedissonTopicTest { @@ -166,19 +170,29 @@ public class RedissonTopicTest { RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); for (int i = 0; i < 3; i++) { - AtomicBoolean stringMessageReceived = new AtomicBoolean(); + AtomicInteger stringMessageReceived = new AtomicInteger(); int listenerId = stringTopic.addListener(new MessageListener() { @Override public void onMessage(String channel, String msg) { assertThat(msg).isEqualTo("testmsg"); - stringMessageReceived.set(true); + stringMessageReceived.incrementAndGet(); } }); + RPatternTopic patternTopic = redisson.getPatternTopic("test*", StringCodec.INSTANCE); + int patternListenerId = patternTopic.addListener(new PatternMessageListener() { + @Override + public void onMessage(String pattern, String channel, String msg) { + assertThat(msg).isEqualTo("testmsg"); + stringMessageReceived.incrementAndGet(); + } + }); + stringTopic.publish("testmsg"); - await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived); + await().atMost(Duration.ONE_SECOND).until(() -> stringMessageReceived.get() == 2); stringTopic.removeListener(listenerId); + patternTopic.removeListener(patternListenerId); } redisson.shutdown(); @@ -514,6 +528,20 @@ public class RedissonTopicTest { runner.stop(); } +// @Test + public void testReattachInSentinelLong() throws Exception { + for (int i = 0; i < 25; i++) { + testReattachInSentinel(); + } + } + +// @Test + public void testReattachInClusterLong() throws Exception { + for (int i = 0; i < 25; i++) { + testReattachInCluster(); + } + } + @Test public void testReattachInSentinel() throws Exception { RedisRunner.RedisProcess master = new RedisRunner() @@ -557,7 +585,9 @@ public class RedissonTopicTest { Thread.sleep(5000); Config config = new Config(); - config.useSentinelServers().addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + config.useSentinelServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); RedissonClient redisson = Redisson.create(config); final AtomicBoolean executed = new AtomicBoolean(); @@ -582,6 +612,8 @@ public class RedissonTopicTest { } }); + sendCommands(redisson); + sentinel1.stop(); sentinel2.stop(); sentinel3.stop(); @@ -590,7 +622,7 @@ public class RedissonTopicTest { slave2.stop(); Thread.sleep(TimeUnit.SECONDS.toMillis(20)); - + master = new RedisRunner() .port(6390) .nosave() @@ -632,7 +664,7 @@ public class RedissonTopicTest { redisson.getTopic("topic").publish(1); - await().atMost(10, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); + await().atMost(20, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); Assert.assertTrue(executed.get()); redisson.shutdown(); @@ -643,6 +675,28 @@ public class RedissonTopicTest { slave1.stop(); slave2.stop(); } + + protected void sendCommands(RedissonClient redisson) { + Thread t = new Thread() { + public void run() { + List> futures = new ArrayList>(); + + for (int i = 0; i < 100; i++) { + RFuture f1 = redisson.getBucket("i" + i).getAsync(); + RFuture f2 = redisson.getBucket("i" + i).setAsync(""); + RFuture f3 = redisson.getTopic("topic").publishAsync("testmsg"); + futures.add(f1); + futures.add(f2); + futures.add(f3); + } + + for (RFuture rFuture : futures) { + rFuture.awaitUninterruptibly(); + } + }; + }; + t.start(); + } @Test public void testReattachInCluster() throws Exception { @@ -662,6 +716,7 @@ public class RedissonTopicTest { Config config = new Config(); config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config); @@ -687,6 +742,8 @@ public class RedissonTopicTest { } }); + sendCommands(redisson); + process.getNodes().stream().filter(x -> Arrays.asList(slave1.getPort(), slave2.getPort(), slave3.getPort()).contains(x.getRedisServerPort())) .forEach(x -> { try { diff --git a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java index 8a069ba2a..4592d0067 100644 --- a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java +++ b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java @@ -31,6 +31,7 @@ import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.RedisRunner.RedisProcess; import org.redisson.client.codec.JsonJacksonMapCodec; import org.redisson.codec.JsonJacksonCodec; +import org.redisson.codec.SnappyCodec; import org.redisson.config.Config; import org.redisson.jcache.configuration.RedissonConfiguration; @@ -65,7 +66,7 @@ public class JCacheTest extends BaseTest { runner.stop(); } - + @Test public void testRedissonConfig() throws InterruptedException, IllegalArgumentException, URISyntaxException, IOException { RedisProcess runner = new RedisRunner() @@ -84,6 +85,16 @@ public class JCacheTest extends BaseTest { cache.put("1", "2"); Assert.assertEquals("2", cache.get("1")); + cache.put("key", "value"); + String result = cache.getAndRemove("key"); + + Assert.assertEquals("value", result); + Assert.assertNull(cache.get("key")); + + cache.put("key", "value"); + cache.remove("key"); + Assert.assertNull(cache.get("key")); + cache.close(); runner.stop(); } diff --git a/redisson/src/test/java/org/redisson/spring/support/SpringNamespaceWikiTest.java b/redisson/src/test/java/org/redisson/spring/support/SpringNamespaceWikiTest.java index 5728f5649..508dddc32 100644 --- a/redisson/src/test/java/org/redisson/spring/support/SpringNamespaceWikiTest.java +++ b/redisson/src/test/java/org/redisson/spring/support/SpringNamespaceWikiTest.java @@ -135,8 +135,6 @@ public class SpringNamespaceWikiTest { assertEquals(40000, single.getTimeout()); assertEquals(5, single.getRetryAttempts()); assertEquals(60000, single.getRetryInterval()); - assertEquals(70000, single.getReconnectionTimeout()); - assertEquals(8, single.getFailedAttempts()); assertEquals("do_not_use_if_it_is_not_set", single.getPassword()); assertEquals(10, single.getSubscriptionsPerConnection()); assertEquals("client_name", single.getClientName());