diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 5a56b4a41..ab18e3f4f 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -453,7 +453,12 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public boolean isHeldByCurrentThread() { - RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId())); + return isHeldByThread(Thread.currentThread().getId()); + } + + @Override + public boolean isHeldByThread(long threadId) { + final RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId)); return get(future); } @@ -847,4 +852,4 @@ public class RedissonLock extends RedissonExpirable implements RLock { } -; \ No newline at end of file +; diff --git a/redisson/src/main/java/org/redisson/api/RLock.java b/redisson/src/main/java/org/redisson/api/RLock.java index ede752687..c47accaba 100644 --- a/redisson/src/main/java/org/redisson/api/RLock.java +++ b/redisson/src/main/java/org/redisson/api/RLock.java @@ -98,6 +98,15 @@ public interface RLock extends Lock, RExpirable, RLockAsync { */ boolean isLocked(); + /** + * Checks if this lock is held by the current thread + * + * @param threadId Thread ID of locking thread + * @return true if held by given thread + * otherwise false + */ + boolean isHeldByThread(long threadId); + /** * Checks if this lock is held by the current thread * diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 8f1c36025..54339ac0d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -133,9 +133,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { refresh(connection, channel); log.debug("{} connected to {}, command: {}", connection, connection.getRedisClient().getAddr(), connection.getCurrentCommand()); } else { - log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), future.cause()); + channel.close(); + reconnect(connection, nextAttempt); } - } }); return; diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index d4d334b99..63bf367a8 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -161,7 +161,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { try { int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); if (!future.await(timeout)) { - ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)")); + ((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 176630f14..b580eac15 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -94,6 +94,9 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { stopThreads(); throw new RedisConnectionException("Can't connect to servers!"); } + if (this.config.getSlaveAddresses().isEmpty()) { + log.warn("Slave nodes not found! Please specify all nodes in replicated mode."); + } initSingleEntry(); diff --git a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java index e09d25580..971ab9e4a 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java @@ -31,20 +31,16 @@ import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity.TransformationMode; import org.redisson.api.annotation.RId; import org.redisson.api.annotation.RIndex; -import org.redisson.client.codec.Codec; import org.redisson.liveobject.misc.ClassUtils; import org.redisson.liveobject.misc.Introspectior; import org.redisson.liveobject.resolver.NamingScheme; -import net.bytebuddy.description.field.FieldDescription.InDefinedShape; -import net.bytebuddy.description.field.FieldList; import net.bytebuddy.implementation.bind.annotation.AllArguments; import net.bytebuddy.implementation.bind.annotation.FieldValue; import net.bytebuddy.implementation.bind.annotation.Origin; import net.bytebuddy.implementation.bind.annotation.RuntimeType; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.implementation.bind.annotation.This; -import net.bytebuddy.matcher.ElementMatchers; /** * This class is going to be instantiated and becomes a static field of diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index de9f38b41..9742e61ff 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -304,6 +304,10 @@ public class PublishSubscribeService { } public RFuture unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) { + if (connectionManager.isShuttingDown()) { + return RedissonPromise.newSucceededFuture(null); + } + final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName); if (entry == null) { lock.release(); @@ -333,6 +337,10 @@ public class PublishSubscribeService { } public RFuture unsubscribe(final ChannelName channelName, final PubSubType topicType) { + if (connectionManager.isShuttingDown()) { + return RedissonPromise.newSucceededFuture(null); + } + final RPromise result = new RedissonPromise(); final AsyncSemaphore lock = getSemaphore(channelName); lock.acquire(new Runnable() {