refactoring

pull/1253/merge
Nikita 7 years ago
parent eb86e27fab
commit 2b559475f0

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -107,7 +106,6 @@ public class Redisson implements RedissonClient {
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
protected final UUID id = UUID.randomUUID();
protected final ConcurrentMap<String, ResponseEntry> responses = PlatformDependent.newConcurrentHashMap();
protected Redisson(Config config) {
@ -243,12 +241,12 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(id, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(id, codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
@ -273,32 +271,32 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(id, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(id, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(id, evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(id, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(id, evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(id, codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
@ -343,17 +341,17 @@ public class Redisson implements RedissonClient {
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name, id);
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
@Override
public RLock getFairLock(String name) {
return new RedissonFairLock(connectionManager.getCommandExecutor(), name, id);
return new RedissonFairLock(connectionManager.getCommandExecutor(), name);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name, id);
return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name);
}
@Override
@ -373,7 +371,7 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name) {
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, id.toString());
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
}
@Override
@ -384,7 +382,7 @@ public class Redisson implements RedissonClient {
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, id.toString());
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses);
}
@Override
@ -406,9 +404,9 @@ public class Redisson implements RedissonClient {
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = id.toString();
executorId = connectionManager.getId().toString();
} else {
executorId = id + ":" + name;
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor(), executorId, responses);
}
@ -538,7 +536,7 @@ public class Redisson implements RedissonClient {
@Override
public RCountDownLatch getCountDownLatch(String name) {
return new RedissonCountDownLatch(connectionManager.getCommandExecutor(), name, id);
return new RedissonCountDownLatch(connectionManager.getCommandExecutor(), name);
}
@Override
@ -573,7 +571,7 @@ public class Redisson implements RedissonClient {
@Override
public RBatch createBatch() {
RedissonBatch batch = new RedissonBatch(id, evictionScheduler, connectionManager);
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchResult;
@ -58,7 +57,6 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
private final UUID id;
private long timeout;
private int retryAttempts;
@ -69,10 +67,9 @@ public class RedissonBatch implements RBatch {
private boolean skipResult;
private boolean atomic;
public RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
this.id = id;
}
@Override
@ -294,22 +291,22 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(id, executorService, name);
return new RedissonSetMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(id, codec, executorService, name);
return new RedissonSetMultimap<K, V>(codec, executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(id, executorService, name);
return new RedissonListMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(id, codec, executorService, name);
return new RedissonListMultimap<K, V>(codec, executorService, name);
}
@Override
@ -324,22 +321,22 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(id, evictionScheduler, executorService, name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(id, evictionScheduler, codec, executorService, name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(id, evictionScheduler, executorService, name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(id, evictionScheduler, codec, executorService, name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}
protected void enableRedissonReferenceSupport(Redisson redisson) {

@ -44,9 +44,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private final UUID id;
protected RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String name, UUID id) {
protected RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.id = id;
this.id = commandExecutor.getConnectionManager().getId();
}
public void await() throws InterruptedException {

@ -82,7 +82,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -135,7 +134,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String redissonId) {
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
@ -146,9 +145,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.responses = responses;
if (codec == connectionManager.getCodec()) {
this.executorId = redissonId;
this.executorId = connectionManager.getId().toString();
} else {
this.executorId = redissonId + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = redisson.getRemoteService(name, codec);

@ -16,7 +16,6 @@
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -45,8 +44,8 @@ public class RedissonFairLock extends RedissonLock implements RLock {
private final String threadsQueueName;
private final String timeoutSetName;
protected RedissonFairLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
protected RedissonFairLock(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);

@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
@ -47,12 +46,12 @@ public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implement
private static final RedisStrictCommand<Boolean> LLEN_VALUE = new RedisStrictCommand<Boolean>("LLEN", new BooleanAmountReplayConvertor());
public RedissonListMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
public RedissonListMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
public RedissonListMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
public RedissonListMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@Override

@ -40,14 +40,14 @@ public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V>
private final RedissonMultimapCache<K> baseCache;
RedissonListMultimapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, this, getTimeoutSetName(), prefix);
}
RedissonListMultimapCache(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, this, getTimeoutSetName(), prefix);
}

@ -66,10 +66,10 @@ public class RedissonLock extends RedissonExpirable implements RLock {
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}

@ -107,13 +107,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RLock getLock(K key) {
String lockName = getLockName(key);
return redisson.getLock(lockName);
return new RedissonLock(commandExecutor, lockName);
}
@Override
public RReadWriteLock getReadWriteLock(K key) {
String lockName = getLockName(key);
return redisson.getReadWriteLock(lockName);
return new RedissonReadWriteLock(commandExecutor, lockName);
}
private String getLockName(Object key) {

@ -99,7 +99,7 @@ public class RedissonMultiLock implements Lock {
return result;
}
protected void tryLockAsync(long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result) {
protected void tryLockAsync(final long leaseTime, final TimeUnit unit, final long waitTime, final RPromise<Void> result) {
tryLockAsync(waitTime, leaseTime, unit).addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -174,7 +174,7 @@ public class RedissonMultiLock implements Lock {
return RedissonPromise.newSucceededFuture(null);
}
RPromise<Void> result = new RedissonPromise<Void>();
final RPromise<Void> result = new RedissonPromise<Void>();
final AtomicInteger counter = new AtomicInteger(locks.size());
for (RLock lock : locks) {
lock.unlockAsync(threadId).addListener(new FutureListener<Void>() {
@ -286,15 +286,15 @@ public class RedissonMultiLock implements Lock {
return true;
}
private void tryAcquireLockAsync(ListIterator<RLock> iterator, List<RLock> acquiredLocks, RPromise<Boolean> result,
long lockWaitTime, long waitTime, long leaseTime, long newLeaseTime,
AtomicLong remainTime, AtomicLong time, AtomicInteger failedLocksLimit, TimeUnit unit, long threadId) {
private void tryAcquireLockAsync(final ListIterator<RLock> iterator, final List<RLock> acquiredLocks, final RPromise<Boolean> result,
final long lockWaitTime, final long waitTime, final long leaseTime, final long newLeaseTime,
final AtomicLong remainTime, final AtomicLong time, final AtomicInteger failedLocksLimit, final TimeUnit unit, final long threadId) {
if (!iterator.hasNext()) {
checkLeaseTimeAsync(acquiredLocks, result, leaseTime, unit);
return;
}
RLock lock = iterator.next();
final RLock lock = iterator.next();
RPromise<Boolean> lockAcquired = new RedissonPromise<Boolean>();
if (waitTime == -1 && leaseTime == -1) {
lock.tryLockAsync(threadId)
@ -385,7 +385,7 @@ public class RedissonMultiLock implements Lock {
result.trySuccess(true);
}
protected void checkRemainTimeAsync(ListIterator<RLock> iterator, List<RLock> acquiredLocks, RPromise<Boolean> result,
protected void checkRemainTimeAsync(ListIterator<RLock> iterator, List<RLock> acquiredLocks, final RPromise<Boolean> result,
long lockWaitTime, long waitTime, long leaseTime, long newLeaseTime,
AtomicLong remainTime, AtomicLong time, AtomicInteger failedLocksLimit, TimeUnit unit, long threadId) {
if (remainTime.get() != -1) {

@ -59,28 +59,28 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
private final UUID id;
final String prefix;
RedissonMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
this.id = id;
RedissonMultimap(CommandAsyncExecutor commandAsyncExecutor, String name) {
super(commandAsyncExecutor, name);
this.id = commandAsyncExecutor.getConnectionManager().getId();
prefix = suffixName(getName(), "");
}
RedissonMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
this.id = id;
RedissonMultimap(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String name) {
super(codec, commandAsyncExecutor, name);
this.id = commandAsyncExecutor.getConnectionManager().getId();
prefix = suffixName(getName(), "");
}
@Override
public RLock getLock(K key) {
String lockName = getLockName(key);
return new RedissonLock((CommandExecutor)commandExecutor, lockName, id);
return new RedissonLock((CommandExecutor)commandExecutor, lockName);
}
@Override
public RReadWriteLock getReadWriteLock(K key) {
String lockName = getLockName(key);
return new RedissonReadWriteLock((CommandExecutor)commandExecutor, lockName, id);
return new RedissonReadWriteLock((CommandExecutor)commandExecutor, lockName);
}
private String getLockName(Object key) {

@ -121,12 +121,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name, id);
return new RedissonReadWriteLockReactive(commandExecutor, name);
}
@Override
public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name, id);
return new RedissonLockReactive(commandExecutor, name);
}
@Override

@ -16,7 +16,6 @@
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -40,8 +39,8 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonReadLock extends RedissonLock implements RLock {
public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import org.redisson.api.RLock;
@ -37,21 +36,18 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
private final UUID id;
public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.id = id;
}
@Override
public RLock readLock() {
return new RedissonReadLock(commandExecutor, getName(), id);
return new RedissonReadLock(commandExecutor, getName());
}
@Override
public RLock writeLock() {
return new RedissonWriteLock(commandExecutor, getName(), id);
return new RedissonWriteLock(commandExecutor, getName());
}
}

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
@ -50,12 +49,12 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
private static final RedisStrictCommand<Boolean> SCARD_VALUE = new RedisStrictCommand<Boolean>("SCARD", new BooleanAmountReplayConvertor());
private static final RedisCommand<Boolean> SISMEMBER_VALUE = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
public RedissonSetMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
public RedissonSetMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
public RedissonSetMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
public RedissonSetMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@Override

@ -17,7 +17,6 @@ package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
@ -40,14 +39,14 @@ public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> im
private final RedissonMultimapCache<K> baseCache;
RedissonSetMultimapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, this, getTimeoutSetName(), prefix);
}
RedissonSetMultimapCache(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, this, getTimeoutSetName(), prefix);
}

@ -16,7 +16,6 @@
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -40,8 +39,8 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonWriteLock extends RedissonLock implements RLock {
protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override

@ -19,7 +19,6 @@ import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.transaction.RTransaction;
/**
* Main Redisson interface for access
@ -910,8 +909,6 @@ public interface RedissonClient {
*/
RBatch createBatch();
RTransaction createTransaction();
/**
* Returns interface with methods for Redis keys.
* Each of Redis/Redisson object associated with own key

@ -18,6 +18,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -45,6 +46,8 @@ import io.netty.util.TimerTask;
*/
public interface ConnectionManager {
UUID getId();
CommandSyncService getCommandExecutor();
ExecutorService getExecutor();

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -117,6 +118,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
};
protected final UUID id = UUID.randomUUID();
public static final int MAX_SLOT = 16384;
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT-1);
@ -284,6 +287,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return result;
}
public UUID getId() {
return id;
}
public boolean isClusterMode() {
return false;
}

@ -41,11 +41,11 @@ import io.netty.buffer.ByteBuf;
public class RedissonListMultimapReactive<K, V> extends RedissonBaseMultimapReactive<K, V> implements RListMultimapReactive<K, V> {
public RedissonListMultimapReactive(UUID id, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonListMultimap<K, V>(id, commandExecutor, name), commandExecutor, name);
super(new RedissonListMultimap<K, V>(commandExecutor, name), commandExecutor, name);
}
public RedissonListMultimapReactive(UUID id, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonListMultimap<K, V>(id, codec, commandExecutor, name), codec, commandExecutor, name);
super(new RedissonListMultimap<K, V>(codec, commandExecutor, name), codec, commandExecutor, name);
}
@Override

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
@ -37,13 +36,13 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R
private final RLockAsync instance;
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
instance = createLock(connectionManager, name, id);
instance = createLock(connectionManager, name);
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return new RedissonLock(commandExecutor, name, id);
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return new RedissonLock(commandExecutor, name);
}
@Override

@ -15,8 +15,6 @@
*/
package org.redisson.reactive;
import java.util.UUID;
import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
@ -33,19 +31,17 @@ import org.redisson.command.CommandReactiveExecutor;
public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive {
private final RReadWriteLock instance;
private final UUID id;
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name, UUID id) {
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.id = id;
this.instance = new RedissonReadWriteLock(commandExecutor, name, id);
this.instance = new RedissonReadWriteLock(commandExecutor, name);
}
@Override
public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
return new RedissonLockReactive(commandExecutor, getName()) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.readLock();
}
};
@ -53,9 +49,9 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp
@Override
public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
return new RedissonLockReactive(commandExecutor, getName()) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return instance.writeLock();
}
};

@ -45,8 +45,8 @@ public class RedissonSemaphoreReactive extends RedissonExpirableReactive impleme
instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return new RedissonLock(commandExecutor, name, id);
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return new RedissonLock(commandExecutor, name);
}
@Override

@ -42,11 +42,11 @@ import io.netty.buffer.ByteBuf;
public class RedissonSetMultimapReactive<K, V> extends RedissonBaseMultimapReactive<K, V> implements RSetMultimapReactive<K, V> {
public RedissonSetMultimapReactive(UUID id, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonSetMultimap<K, V>(id, commandExecutor, name), commandExecutor, name);
super(new RedissonSetMultimap<K, V>(commandExecutor, name), commandExecutor, name);
}
public RedissonSetMultimapReactive(UUID id, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonSetMultimap<K, V>(id, codec, commandExecutor, name), codec, commandExecutor, name);
super(new RedissonSetMultimap<K, V>(codec, commandExecutor, name), codec, commandExecutor, name);
}
@Override

@ -635,8 +635,6 @@ public class RedissonTopicTest {
await().atMost(10, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
Assert.assertTrue(executed.get());
Thread.sleep(1000000);
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();

Loading…
Cancel
Save