From 1b89afbd6f88a9c5179e23f8cb345ca32cc76452 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 20 Jul 2017 10:53:26 +0300 Subject: [PATCH 1/8] RSemaphoreReactive added #977 --- .../java/org/redisson/RedissonReactive.java | 10 + .../java/org/redisson/RedissonSemaphore.java | 12 +- .../org/redisson/api/RSemaphoreReactive.java | 179 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 8 + .../reactive/RedissonSemaphoreReactive.java | 152 +++++++++++++++ 5 files changed, 356 insertions(+), 5 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 8f777a8a8..19596e3f9 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -43,6 +43,7 @@ import org.redisson.api.RQueueReactive; import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; @@ -55,6 +56,7 @@ import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; +import org.redisson.pubsub.SemaphorePubSub; import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBitSetReactive; @@ -73,6 +75,7 @@ import org.redisson.reactive.RedissonQueueReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; +import org.redisson.reactive.RedissonSemaphoreReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonTopicReactive; @@ -91,7 +94,9 @@ public class RedissonReactive implements RedissonReactiveClient { protected final ConnectionManager connectionManager; protected final Config config; protected final CodecProvider codecProvider; + protected final UUID id = UUID.randomUUID(); + protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); protected RedissonReactive(Config config) { this.config = config; @@ -103,6 +108,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getCodecProvider(); } + @Override + public RSemaphoreReactive getSemaphore(String name) { + return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); + } + @Override public RReadWriteLockReactive getReadWriteLock(String name) { return new RedissonReadWriteLockReactive(commandExecutor, name, id); diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 14cbd956c..bea274380 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -27,7 +27,7 @@ import org.redisson.api.RSemaphore; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; import org.redisson.pubsub.SemaphorePubSub; @@ -48,9 +48,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { private final SemaphorePubSub semaphorePubSub; - final CommandExecutor commandExecutor; + final CommandAsyncExecutor commandExecutor; - protected RedissonSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { + public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.semaphorePubSub = semaphorePubSub; @@ -478,7 +478,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public int drainPermits() { - Long res = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local value = redis.call('get', KEYS[1]); " + "if (value == false or value == 0) then " + "return 0; " + @@ -486,12 +486,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { "redis.call('set', KEYS[1], 0); " + "return value;", Collections.singletonList(getName())); + Long res = get(future); return res.intValue(); } @Override public int availablePermits() { - Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); + Long res = get(future); return res.intValue(); } diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java new file mode 100644 index 000000000..f9deb34e6 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java @@ -0,0 +1,179 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RSemaphoreReactive extends RExpirableReactive { + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(int permits); + + /** + * Acquires a permit from this semaphore. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + * @return void + * + */ + Publisher acquire(); + + /** + * Acquires the given number of permits, if they are available, + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits the number of permits to acquire + * @throws IllegalArgumentException if {@code permits} is negative + * @return void + */ + Publisher acquire(int permits); + + /** + * Releases a permit, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that was just released. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @return void + */ + Publisher release(); + + /** + * Releases the given number of permits, returning them to the semaphore. + * + *

Releases the given number of permits, increasing the number of available permits by + * the given number of permits. If any threads of Redisson client are trying to + * acquire a permits, then next threads is selected and tries to acquire the permits that was just released. + * + *

There is no requirement that a thread that releases a permits must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permits amount + * @return void + */ + Publisher release(int permits); + + /** + * Sets number of permits. + * + * @param permits - number of permits + * @return true if permits has been set successfully, otherwise false. + */ + Publisher trySetPermits(int permits); + + /** + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param waitTime the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long waitTime, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param waitTime the maximum time to wait for a available permits + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(int permits, long waitTime, TimeUnit unit); + + /** + * Shrinks the number of available permits by the indicated + * reduction. This method can be useful in subclasses that use + * semaphores to track resources that become unavailable. This + * method differs from {@link #acquire()} in that it does not block + * waiting for permits to become available. + * + * @param permits - reduction the number of permits to remove + * @return void + * @throws IllegalArgumentException if {@code reduction} is negative + */ + Publisher reducePermits(int permits); + + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 1105eb7a4..31689bf14 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,14 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns semaphore instance by name + * + * @param name - name of object + * @return Semaphore object + */ + RSemaphoreReactive getSemaphore(String name); + /** * Returns readWriteLock instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java new file mode 100644 index 000000000..de60f6861 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java @@ -0,0 +1,152 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonLock; +import org.redisson.RedissonSemaphore; +import org.redisson.api.RFuture; +import org.redisson.api.RLockAsync; +import org.redisson.api.RSemaphoreAsync; +import org.redisson.api.RSemaphoreReactive; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.pubsub.SemaphorePubSub; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive { + + private final RSemaphoreAsync instance; + + public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { + super(connectionManager, name); + instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub); + } + + protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) { + return new RedissonLock(commandExecutor, name, id); + } + + @Override + public Publisher tryAcquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(); + } + }); + } + + @Override + public Publisher tryAcquire(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits); + } + }); + } + + @Override + public Publisher acquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(); + } + }); + } + + @Override + public Publisher acquire(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(permits); + } + }); + } + + @Override + public Publisher release() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.releaseAsync(); + } + }); + } + + @Override + public Publisher release(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.releaseAsync(permits); + } + }); + } + + @Override + public Publisher trySetPermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trySetPermitsAsync(permits); + } + }); + } + + @Override + public Publisher tryAcquire(final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(waitTime, unit); + } + }); + } + + @Override + public Publisher tryAcquire(final int permits, final long waitTime, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits, waitTime, unit); + } + }); + } + + @Override + public Publisher reducePermits(final int permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.reducePermitsAsync(permits); + } + }); + } + +} From 69d7aff15ef89668952539bc107b2f13fa8590ce Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 20 Jul 2017 10:54:29 +0300 Subject: [PATCH 2/8] RLocalCachedMap doesn't work with non-json and non-binary codecs #976 --- .../org/redisson/RedissonLocalCachedMap.java | 44 ++++++++++++++++++- .../redisson/RedissonLocalCachedMapTest.java | 28 +++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 2ca4e9a86..88f645330 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; import java.util.AbstractCollection; @@ -60,13 +61,17 @@ import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.codec.JsonJacksonCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.ThreadLocalRandom; @@ -178,6 +183,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private int invalidationListenerId; private int invalidationStatusListenerId; private volatile long lastInvalidate; + private Codec topicCodec; protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(commandExecutor, name, redisson, options); @@ -207,7 +213,28 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } private void addListeners(String name, final LocalCachedMapOptions options, final RedissonClient redisson) { - invalidationTopic = new RedissonTopic(commandExecutor, suffixName(name, "topic")); + topicCodec = codec; + + LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6}); + ByteBuf buf = null; + try { + byte[] s = topicCodec.getValueEncoder().encode(msg); + buf = Unpooled.wrappedBuffer(s); + msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null); + if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3}) + || !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) { + throw new IllegalArgumentException(); + } + } catch (Exception e) { + log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass()); + topicCodec = new JsonJacksonCodec(); + } finally { + if (buf != null) { + buf.release(); + } + } + + invalidationTopic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, "topic")); if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) { return; @@ -1345,4 +1372,19 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } + protected byte[] encode(Object value) { + if (commandExecutor.isRedissonReferenceSupportEnabled()) { + RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); + if (reference != null) { + value = reference; + } + } + + try { + return topicCodec.getValueEncoder().encode(value); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index b1d57e481..d4f011772 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -8,7 +8,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; import org.junit.Assert; import org.junit.Test; @@ -22,6 +21,7 @@ import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; import org.redisson.cache.Cache; +import org.redisson.client.codec.StringCodec; import mockit.Deencapsulation; @@ -148,6 +148,32 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { } }.execute(); } + + @Test + public void testInvalidationOnUpdateNonBinaryCodec() throws InterruptedException { + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", new StringCodec(), options); + Cache cache1 = Deencapsulation.getField(map1, "cache"); + + RLocalCachedMap map2 = redisson.getLocalCachedMap("test", new StringCodec(), options); + Cache cache2 = Deencapsulation.getField(map2, "cache"); + + map1.put("1", "1"); + map1.put("2", "2"); + + assertThat(map2.get("1")).isEqualTo("1"); + assertThat(map2.get("2")).isEqualTo("2"); + + assertThat(cache1.size()).isEqualTo(2); + assertThat(cache2.size()).isEqualTo(2); + + map1.put("1", "3"); + map2.put("2", "4"); + Thread.sleep(50); + + assertThat(cache1.size()).isEqualTo(1); + assertThat(cache2.size()).isEqualTo(1); + } @Test public void testInvalidationOnUpdate() throws InterruptedException { From 8eda8c052d84379d211b935cf55e2df86152156c Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 21 Jul 2017 12:17:25 +0300 Subject: [PATCH 3/8] Fixed - NullValue does not implement Serializable #981 --- .../src/main/java/org/redisson/spring/cache/NullValue.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/spring/cache/NullValue.java b/redisson/src/main/java/org/redisson/spring/cache/NullValue.java index 563c26066..193bcf2cf 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/NullValue.java +++ b/redisson/src/main/java/org/redisson/spring/cache/NullValue.java @@ -15,6 +15,8 @@ */ package org.redisson.spring.cache; +import java.io.Serializable; + import org.springframework.cache.Cache.ValueWrapper; /** @@ -22,8 +24,10 @@ import org.springframework.cache.Cache.ValueWrapper; * @author Nikita Koksharov * */ -public class NullValue implements ValueWrapper { +public class NullValue implements ValueWrapper, Serializable { + private static final long serialVersionUID = -8310337775544536701L; + public static final NullValue INSTANCE = new NullValue(); @Override From 90f1d528ca907f91b21bb1f3d595666151ce3987 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 25 Jul 2017 12:22:52 +0300 Subject: [PATCH 4/8] Fixed - ContextClassLoader should be used by Redisson Codec for Tomcat session's object serialization. #974 --- .../org/redisson/tomcat/RedissonSessionManager.java | 10 ++++++++++ .../org/redisson/tomcat/RedissonSessionManager.java | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 21c7cf3b3..bbc6f5e24 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -28,6 +28,7 @@ import org.apache.juli.logging.LogFactory; import org.redisson.Redisson; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; import org.redisson.config.Config; /** @@ -135,6 +136,15 @@ public class RedissonSessionManager extends ManagerBase { } try { + try { + Config c = new Config(config); + Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class) + .newInstance(Thread.currentThread().getContextClassLoader()); + config.setCodec(codec); + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); + } + redisson = Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index df588284b..ec97729ea 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -27,6 +27,7 @@ import org.apache.juli.logging.LogFactory; import org.redisson.Redisson; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; import org.redisson.config.Config; /** @@ -134,6 +135,11 @@ public class RedissonSessionManager extends ManagerBase { } try { + Config c = new Config(config); + Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class) + .newInstance(Thread.currentThread().getContextClassLoader()); + config.setCodec(codec); + redisson = Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); From 365601871ebf50b3ad1d16cfa6f72cf50fcf5c14 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 26 Jul 2017 16:59:40 +0300 Subject: [PATCH 5/8] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a5af5193..7b5201860 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Feature - ability to submit few tasks atomically (in batch) through `RExecutorSe Feature - [Config.keepPubSubOrder](https://github.com/redisson/redisson/wiki/2.-Configuration#keeppubsuborder) setting added Improvement - make `RMapReactive` and `RMapCacheReactive` interfaces match with `RMap` and `RMapCache` Improvement - `RLexSortedSet` should extend `RSortedSet` +Fixed - connection listener is not invoked in some cases Fixed - `RMapCache` `remove`, `put`, `putIfAbsent` and `replace` methods aren't respect entry expiration Fixed - `SCAN` command should be used in `RKeys.deleteByPattern` method Fixed - `RBinaryStream` doesn't work in Redis cluster environment From 42d6df57525d589c763746d4ae431b49953755ab Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 27 Jul 2017 14:52:09 +0300 Subject: [PATCH 6/8] ConnectionWatchdog should handle timer's IllegalStateException properly --- .../client/handler/ConnectionWatchdog.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 17dd7bbdb..1098f8b0a 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -82,12 +82,16 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { return; } - timer.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)); - } - }, timeout, TimeUnit.MILLISECONDS); + try { + timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)); + } + }, timeout, TimeUnit.MILLISECONDS); + } catch (IllegalStateException e) { + // skip + } } private void tryReconnect(final RedisConnection connection, final int nextAttempt) { From f0ab27bf5ca00a7d491c97213f30ded641cc43bf Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 27 Jul 2017 14:52:18 +0300 Subject: [PATCH 7/8] Error message fixed --- .../src/main/java/org/redisson/executor/TasksRunnerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 71bc94cd6..d865ae515 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -67,7 +67,7 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams { try { this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); } catch (Exception e) { - throw new IllegalStateException(e); + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); } } From 3ab061094c57e4986b273c68e5da727b63396d0b Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 27 Jul 2017 15:17:33 +0300 Subject: [PATCH 8/8] refactoring --- .../org/redisson/spring/cache/RedissonSpringCacheManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java b/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java index 696feb815..8d1ce777f 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java +++ b/redisson/src/main/java/org/redisson/spring/cache/RedissonSpringCacheManager.java @@ -55,7 +55,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA RedissonClient redisson; Map configMap = new ConcurrentHashMap(); - private ConcurrentMap instanceMap = new ConcurrentHashMap(); + ConcurrentMap instanceMap = new ConcurrentHashMap(); String configLocation;