From 022144dc0eedd56d613a0640a9959056d0c614d2 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 11 Jan 2019 08:53:56 +0300 Subject: [PATCH 1/5] refactoring --- .../main/java/org/redisson/command/CommandAsyncService.java | 2 +- .../org/redisson/liveobject/core/AccessorInterceptor.java | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index d4d334b99..63bf367a8 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -161,7 +161,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { try { int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); if (!future.await(timeout)) { - ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)")); + ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java index e09d25580..971ab9e4a 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java @@ -31,20 +31,16 @@ import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity.TransformationMode; import org.redisson.api.annotation.RId; import org.redisson.api.annotation.RIndex; -import org.redisson.client.codec.Codec; import org.redisson.liveobject.misc.ClassUtils; import org.redisson.liveobject.misc.Introspectior; import org.redisson.liveobject.resolver.NamingScheme; -import net.bytebuddy.description.field.FieldDescription.InDefinedShape; -import net.bytebuddy.description.field.FieldList; import net.bytebuddy.implementation.bind.annotation.AllArguments; import net.bytebuddy.implementation.bind.annotation.FieldValue; import net.bytebuddy.implementation.bind.annotation.Origin; import net.bytebuddy.implementation.bind.annotation.RuntimeType; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.implementation.bind.annotation.This; -import net.bytebuddy.matcher.ElementMatchers; /** * This class is going to be instantiated and becomes a static field of From eb4200dfea42e64bf1afeb9c1b786bf118f1957b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 11 Jan 2019 17:09:41 +0300 Subject: [PATCH 2/5] Show warning in replicated if slave nodes not found --- .../org/redisson/connection/ReplicatedConnectionManager.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 176630f14..b580eac15 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -94,6 +94,9 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { stopThreads(); throw new RedisConnectionException("Can't connect to servers!"); } + if (this.config.getSlaveAddresses().isEmpty()) { + log.warn("Slave nodes not found! Please specify all nodes in replicated mode."); + } initSingleEntry(); From 55f6a77c29d1c041f38b62199cf70384b8f88003 Mon Sep 17 00:00:00 2001 From: Dhruva Krishnamurthy Date: Fri, 11 Jan 2019 07:33:47 -0800 Subject: [PATCH 3/5] Allow checking if lock is held by a thread - helps when using thread pools * When using executors/thread pools to acquire and release locks, it is helpful to be able to check if the lock is held by a specified thread and not just the 'current thread'. --- redisson/src/main/java/org/redisson/RedissonLock.java | 9 +++++++-- redisson/src/main/java/org/redisson/api/RLock.java | 9 +++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 5a56b4a41..ab18e3f4f 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -453,7 +453,12 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isHeldByCurrentThread() { - RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); + return isHeldByThread(Thread.currentThread().getId()); + } + + @Override + public boolean isHeldByThread(long threadId) { + final RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId)); return get(future); } @@ -847,4 +852,4 @@ public class RedissonLock extends RedissonExpirable implements RLock { } -; \ No newline at end of file +; diff --git a/redisson/src/main/java/org/redisson/api/RLock.java b/redisson/src/main/java/org/redisson/api/RLock.java index ede752687..c47accaba 100644 --- a/redisson/src/main/java/org/redisson/api/RLock.java +++ b/redisson/src/main/java/org/redisson/api/RLock.java @@ -98,6 +98,15 @@ public interface RLock extends Lock, RExpirable, RLockAsync { */ boolean isLocked(); + /** + * Checks if this lock is held by the current thread + * + * @param threadId Thread ID of locking thread + * @return true if held by given thread + * otherwise false + */ + boolean isHeldByThread(long threadId); + /** * Checks if this lock is held by the current thread * From 44ea59f9f60ee0d2f586de7b0a5750c74783a33a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 14 Jan 2019 08:59:52 +0300 Subject: [PATCH 4/5] Fixed - connection is not reconnected #1811 --- .../java/org/redisson/client/handler/ConnectionWatchdog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 8f1c36025..54339ac0d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -133,9 +133,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { refresh(connection, channel); log.debug("{} connected to {}, command: {}", connection, connection.getRedisClient().getAddr(), connection.getCurrentCommand()); } else { - log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), future.cause()); + channel.close(); + reconnect(connection, nextAttempt); } - } }); return; From bfc2d25ecb3e72560e9be557a447c4e49263618f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 14 Jan 2019 13:37:05 +0300 Subject: [PATCH 5/5] Fixed - RedissonTopic.removeListener throws RejectedExecutionException --- .../java/org/redisson/pubsub/PublishSubscribeService.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index de9f38b41..9742e61ff 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -304,6 +304,10 @@ public class PublishSubscribeService { } public RFuture unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) { + if (connectionManager.isShuttingDown()) { + return RedissonPromise.newSucceededFuture(null); + } + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { lock.release(); @@ -333,6 +337,10 @@ public class PublishSubscribeService { } public RFuture unsubscribe(final ChannelName channelName, final PubSubType topicType) { + if (connectionManager.isShuttingDown()) { + return RedissonPromise.newSucceededFuture(null); + } + final RPromise result = new RedissonPromise(); final AsyncSemaphore lock = getSemaphore(channelName); lock.acquire(new Runnable() {