Merge branch 'master' into 3.0.0

pull/1933/head
Nikita Koksharov 6 years ago
commit 07cb4f2f70

@ -453,7 +453,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean isHeldByCurrentThread() {
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
return isHeldByThread(Thread.currentThread().getId());
}
@Override
public boolean isHeldByThread(long threadId) {
final RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
return get(future);
}

@ -98,6 +98,15 @@ public interface RLock extends Lock, RExpirable, RLockAsync {
*/
boolean isLocked();
/**
* Checks if this lock is held by the current thread
*
* @param threadId Thread ID of locking thread
* @return <code>true</code> if held by given thread
* otherwise <code>false</code>
*/
boolean isHeldByThread(long threadId);
/**
* Checks if this lock is held by the current thread
*

@ -133,9 +133,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
refresh(connection, channel);
log.debug("{} connected to {}, command: {}", connection, connection.getRedisClient().getAddr(), connection.getCurrentCommand());
} else {
log.warn("Can't connect " + connection + " to " + connection.getRedisClient().getAddr(), future.cause());
channel.close();
reconnect(connection, nextAttempt);
}
}
});
return;

@ -161,7 +161,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
try {
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.await(timeout)) {
((RPromise)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)"));
((RPromise<?>)future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

@ -94,6 +94,9 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}
if (this.config.getSlaveAddresses().isEmpty()) {
log.warn("Slave nodes not found! Please specify all nodes in replicated mode.");
}
initSingleEntry();

@ -31,20 +31,16 @@ import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.api.annotation.RId;
import org.redisson.api.annotation.RIndex;
import org.redisson.client.codec.Codec;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import net.bytebuddy.description.field.FieldDescription.InDefinedShape;
import net.bytebuddy.description.field.FieldList;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.FieldValue;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.implementation.bind.annotation.This;
import net.bytebuddy.matcher.ElementMatchers;
/**
* This class is going to be instantiated and becomes a <b>static</b> field of

@ -304,6 +304,10 @@ public class PublishSubscribeService {
}
public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
if (connectionManager.isShuttingDown()) {
return RedissonPromise.newSucceededFuture(null);
}
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
lock.release();
@ -333,6 +337,10 @@ public class PublishSubscribeService {
}
public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
if (connectionManager.isShuttingDown()) {
return RedissonPromise.newSucceededFuture(null);
}
final RPromise<Codec> result = new RedissonPromise<Codec>();
final AsyncSemaphore lock = getSemaphore(channelName);
lock.acquire(new Runnable() {

Loading…
Cancel
Save