refactoring

pull/1303/head
Nikita 7 years ago
parent d9b7a2730d
commit 849da63bb9

@ -29,6 +29,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -259,7 +260,7 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override @Override
public RFuture<Void> setAsync(byte[] value) { public RFuture<Void> setAsync(byte[] value) {
if (value.length > 512*1024*1024) { if (value.length > 512*1024*1024) {
RPromise<Void> result = newPromise(); RPromise<Void> result = new RedissonPromise<Void>();
int chunkSize = 10*1024*1024; int chunkSize = 10*1024*1024;
write(value, result, chunkSize, 0); write(value, result, chunkSize, 0);
return result; return result;

@ -69,7 +69,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public RFuture<Boolean> addAsync(V e) { public RFuture<Boolean> addAsync(V e) {
final RPromise<Boolean> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Boolean> result = new RedissonPromise<Boolean>();
RFuture<Boolean> future = offerAsync(e); RFuture<Boolean> future = offerAsync(e);
future.addListener(new FutureListener<Boolean>() { future.addListener(new FutureListener<Boolean>() {
@ -171,7 +171,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
@ -394,7 +394,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public RFuture<Boolean> addAllAsync(final Collection<? extends V> c) { public RFuture<Boolean> addAllAsync(final Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), semaphorePubSub); RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), semaphorePubSub);

@ -30,6 +30,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
@ -299,7 +300,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override @Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) { public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -331,7 +332,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,

@ -43,6 +43,7 @@ import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompositeIterable; import org.redisson.misc.CompositeIterable;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -203,7 +204,7 @@ public class RedissonKeys implements RKeys {
@Override @Override
public RFuture<Long> deleteByPatternAsync(final String pattern) { public RFuture<Long> deleteByPatternAsync(final String pattern) {
final int batchSize = 100; final int batchSize = 100;
final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet(); Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet();
@ -309,7 +310,7 @@ public class RedissonKeys implements RKeys {
list.add(key); list.add(key);
} }
final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = new RedissonPromise<Long>();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());

@ -27,6 +27,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
/** /**
* Sorted set contained values of String type * Sorted set contained values of String type
@ -281,7 +282,7 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implem
@Override @Override
public RFuture<Boolean> addAllAsync(Collection<? extends String> c) { public RFuture<Boolean> addAllAsync(Collection<? extends String> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> params = new ArrayList<Object>(2*c.size()); List<Object> params = new ArrayList<Object>(2*c.size());
params.add(getName()); params.add(getName());

@ -159,7 +159,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) { public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -188,7 +188,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public RFuture<Boolean> addAllAsync(final Collection<? extends V> c) { public RFuture<Boolean> addAllAsync(final Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> args = new ArrayList<Object>(c.size() + 1); List<Object> args = new ArrayList<Object>(c.size() + 1);
@ -204,7 +204,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
} }
if (coll.isEmpty()) { if (coll.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
if (index == 0) { // prepend elements to list if (index == 0) { // prepend elements to list
@ -246,7 +246,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,

@ -358,7 +358,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (!cache.containsKey(cacheKey)) { if (!cache.containsKey(cacheKey)) {
return super.containsKeyAsync(key); return super.containsKeyAsync(key);
} }
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
@Override @Override
@ -369,7 +369,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (!cache.containsValue(cacheValue)) { if (!cache.containsValue(cacheValue)) {
return super.containsValueAsync(value); return super.containsValueAsync(value);
} }
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
@Override @Override
@ -379,7 +379,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final CacheKey cacheKey = toCacheKey(key); final CacheKey cacheKey = toCacheKey(key);
CacheValue cacheValue = cache.get(cacheKey); CacheValue cacheValue = cache.get(cacheKey);
if (cacheValue != null && cacheValue.getValue() != null) { if (cacheValue != null && cacheValue.getValue() != null) {
return newSucceededFuture((V)cacheValue.getValue()); return RedissonPromise.newSucceededFuture((V)cacheValue.getValue());
} }
RFuture<V> future = super.getAsync((K)key); RFuture<V> future = super.getAsync((K)key);
@ -681,7 +681,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override @Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) { public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
if (keys.isEmpty()) { if (keys.isEmpty()) {
return newSucceededFuture(Collections.<K, V>emptyMap()); return RedissonPromise.newSucceededFuture(Collections.<K, V>emptyMap());
} }
final Map<K, V> result = new HashMap<K, V>(); final Map<K, V> result = new HashMap<K, V>();

@ -32,6 +32,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.LockPubSub;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -264,7 +265,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
protected RFuture<Void> acquireFailedAsync(long threadId) { protected RFuture<Void> acquireFailedAsync(long threadId) {
return newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
@Override @Override
@ -467,7 +468,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public RFuture<Void> unlockAsync(final long threadId) { public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = newPromise(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId); RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() { future.addListener(new FutureListener<Boolean>() {
@ -513,7 +514,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public RFuture<Void> lockAsync(final long leaseTime, final TimeUnit unit, final long currentThreadId) { public RFuture<Void> lockAsync(final long leaseTime, final TimeUnit unit, final long currentThreadId) {
final RPromise<Void> result = newPromise(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId); RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.addListener(new FutureListener<Long>() { ttlFuture.addListener(new FutureListener<Long>() {
@Override @Override
@ -636,7 +637,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public RFuture<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit, public RFuture<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit,
final long currentThreadId) { final long currentThreadId) {
final RPromise<Boolean> result = newPromise(); final RPromise<Boolean> result = new RedissonPromise<Boolean>();
final AtomicLong time = new AtomicLong(unit.toMillis(waitTime)); final AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();

@ -198,7 +198,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override @Override
public RFuture<Map<K, V>> getAllAsync(final Set<K> keys) { public RFuture<Map<K, V>> getAllAsync(final Set<K> keys) {
if (keys.isEmpty()) { if (keys.isEmpty()) {
return newSucceededFuture(Collections.<K, V>emptyMap()); return RedissonPromise.newSucceededFuture(Collections.<K, V>emptyMap());
} }
RFuture<Map<K, V>> future = getAllOperationAsync(keys); RFuture<Map<K, V>> future = getAllOperationAsync(keys);
@ -269,7 +269,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override @Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) { public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) { if (map.isEmpty()) {
return newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
RFuture<Void> future = putAllOperationAsync(map); RFuture<Void> future = putAllOperationAsync(map);
@ -892,7 +892,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
if (keys.length == 0) { if (keys.length == 0) {
return newSucceededFuture(0L); return RedissonPromise.newSucceededFuture(0L);
} }
if (hasNoWriter()) { if (hasNoWriter()) {

@ -45,6 +45,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -204,7 +205,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
@Override @Override
public RFuture<Long> fastRemoveAsync(K ... keys) { public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) { if (keys == null || keys.length == 0) {
return newSucceededFuture(0L); return RedissonPromise.newSucceededFuture(0L);
} }
List<Object> mapKeys = new ArrayList<Object>(keys.length); List<Object> mapKeys = new ArrayList<Object>(keys.length);

@ -26,7 +26,6 @@ import org.redisson.api.RObject;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonObjectFactory;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -75,14 +74,6 @@ public abstract class RedissonObject implements RObject {
return commandExecutor.get(future); return commandExecutor.get(future);
} }
protected <V> RPromise<V> newPromise() {
return commandExecutor.getConnectionManager().newPromise();
}
protected <V> RFuture<V> newSucceededFuture(V result) {
return commandExecutor.getConnectionManager().newSucceededFuture(result);
}
@Override @Override
public String getName() { public String getName() {
return name; return name;

@ -26,6 +26,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
@ -34,7 +35,6 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/** /**
* *
@ -124,7 +124,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
private RFuture<String> acquireAsync(final int permits, final long ttl, final TimeUnit timeUnit) { private RFuture<String> acquireAsync(final int permits, final long ttl, final TimeUnit timeUnit) {
final RPromise<String> result = newPromise(); final RPromise<String> result = new RedissonPromise<String>();
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate); RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.addListener(new FutureListener<String>() { tryAcquireFuture.addListener(new FutureListener<String>() {
@ -373,7 +373,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
public RFuture<String> tryAcquireAsync() { public RFuture<String> tryAcquireAsync() {
final RPromise<String> result = newPromise(); final RPromise<String> result = new RedissonPromise<String>();
RFuture<String> res = tryAcquireAsync(1, nonExpirableTimeout); RFuture<String> res = tryAcquireAsync(1, nonExpirableTimeout);
res.addListener(new FutureListener<String>() { res.addListener(new FutureListener<String>() {
@Override @Override
@ -513,7 +513,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
private RFuture<String> tryAcquireAsync(final int permits, long waitTime, final long ttl, final TimeUnit timeUnit) { private RFuture<String> tryAcquireAsync(final int permits, long waitTime, final long ttl, final TimeUnit timeUnit) {
final RPromise<String> result = newPromise(); final RPromise<String> result = new RedissonPromise<String>();
final AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime)); final AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
final long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(ttl, timeUnit);
@ -637,7 +637,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override @Override
public RFuture<Void> releaseAsync(final String permitId) { public RFuture<Void> releaseAsync(final String permitId) {
final RPromise<Void> result = newPromise(); final RPromise<Void> result = new RedissonPromise<Void>();
tryReleaseAsync(permitId).addListener(new FutureListener<Boolean>() { tryReleaseAsync(permitId).addListener(new FutureListener<Boolean>() {
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(Future<Boolean> future) throws Exception {

@ -42,6 +42,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
/** /**
* *
@ -199,7 +200,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override @Override
public RFuture<Long> addAllAsync(Map<V, Double> objects) { public RFuture<Long> addAllAsync(Map<V, Double> objects) {
if (objects.isEmpty()) { if (objects.isEmpty()) {
return newSucceededFuture(0L); return RedissonPromise.newSucceededFuture(0L);
} }
List<Object> params = new ArrayList<Object>(objects.size()*2+1); List<Object> params = new ArrayList<Object>(objects.size()*2+1);
params.add(getName()); params.add(getName());
@ -353,7 +354,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override @Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) { public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -370,7 +371,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> params = new ArrayList<Object>(c.size()+1); List<Object> params = new ArrayList<Object>(c.size()+1);

@ -29,6 +29,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.Timeout; import io.netty.util.Timeout;
@ -101,7 +102,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override @Override
public RFuture<Void> acquireAsync(final int permits) { public RFuture<Void> acquireAsync(final int permits) {
final RPromise<Void> result = newPromise(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits); RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.addListener(new FutureListener<Boolean>() { tryAcquireFuture.addListener(new FutureListener<Boolean>() {
@Override @Override
@ -358,7 +359,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override @Override
public RFuture<Boolean> tryAcquireAsync(final int permits, long waitTime, TimeUnit unit) { public RFuture<Boolean> tryAcquireAsync(final int permits, long waitTime, TimeUnit unit) {
final RPromise<Boolean> result = newPromise(); final RPromise<Boolean> result = new RedissonPromise<Boolean>();
final AtomicLong time = new AtomicLong(unit.toMillis(waitTime)); final AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
final long current = System.currentTimeMillis(); final long current = System.currentTimeMillis();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits); RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);

@ -37,6 +37,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
/** /**
* Distributed and concurrent implementation of {@link java.util.Set} * Distributed and concurrent implementation of {@link java.util.Set}
@ -212,7 +213,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override @Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) { public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -231,7 +232,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override @Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) { public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> args = new ArrayList<Object>(c.size() + 1); List<Object> args = new ArrayList<Object>(c.size() + 1);
@ -262,7 +263,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> args = new ArrayList<Object>(c.size() + 1); List<Object> args = new ArrayList<Object>(c.size() + 1);

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -32,16 +31,13 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -260,7 +256,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override @Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) { public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(true); return RedissonPromise.newSucceededFuture(true);
} }
List<Object> params = new ArrayList<Object>(c.size() + 1); List<Object> params = new ArrayList<Object>(c.size() + 1);
@ -290,7 +286,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override @Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) { public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
long score = 92233720368547758L - System.currentTimeMillis(); long score = 92233720368547758L - System.currentTimeMillis();
@ -335,7 +331,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override @Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) { public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
List<Object> params = new ArrayList<Object>(c.size()+1); List<Object> params = new ArrayList<Object>(c.size()+1);

@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
@ -40,6 +39,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -254,7 +254,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
} }
public RFuture<Boolean> addAsync(final V value) { public RFuture<Boolean> addAsync(final V value) {
final RPromise<Boolean> promise = newPromise(); final RPromise<Boolean> promise = new RedissonPromise<Boolean>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
public void run() { public void run() {
try { try {
@ -270,7 +270,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override @Override
public RFuture<Boolean> removeAsync(final Object value) { public RFuture<Boolean> removeAsync(final Object value) {
final RPromise<Boolean> promise = newPromise(); final RPromise<Boolean> promise = new RedissonPromise<Boolean>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {

@ -39,6 +39,7 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
/** /**
* Distributed and concurrent implementation of {@link java.util.List} * Distributed and concurrent implementation of {@link java.util.List}
@ -61,7 +62,7 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
public RFuture<Integer> sizeAsync() { public RFuture<Integer> sizeAsync() {
if (size != -1) { if (size != -1) {
return newSucceededFuture(size); return RedissonPromise.newSucceededFuture(size);
} }
return commandExecutor.readAsync(getName(), codec, new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor() { return commandExecutor.readAsync(getName(), codec, new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor() {
@Override @Override
@ -112,7 +113,7 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
@Override @Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) { public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
return addAllAsync(toIndex.get() - fromIndex, c); return addAllAsync(toIndex.get() - fromIndex, c);
@ -123,7 +124,7 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
checkIndex(index); checkIndex(index);
if (coll.isEmpty()) { if (coll.isEmpty()) {
return newSucceededFuture(false); return RedissonPromise.newSucceededFuture(false);
} }
if (index == 0) { // prepend elements to list if (index == 0) { // prepend elements to list

@ -195,7 +195,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) { private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
RedisConnection connection = nodeConnections.get(addr); RedisConnection connection = nodeConnections.get(addr);
if (connection != null) { if (connection != null) {
return newSucceededFuture(connection); return RedissonPromise.newSucceededFuture(connection);
} }
RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
@ -233,7 +233,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
e = new RedisException("Failed to add master: " + e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + ". Reason - server has FAIL flag"); partition.getMasterAddress() + ". Reason - server has FAIL flag");
} }
return newFailedFuture(e); return RedissonPromise.newFailedFuture(e);
} }
final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>(); final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>();
@ -572,7 +572,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
if (newMasters.isEmpty()) { if (newMasters.isEmpty()) {
return newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
final RPromise<Void> result = new RedissonPromise<Void>(); final RPromise<Void> result = new RedissonPromise<Void>();

@ -32,7 +32,6 @@ import org.redisson.command.CommandSyncService;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.AsyncSemaphore;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -58,8 +57,6 @@ public interface ConnectionManager {
AsyncSemaphore getSemaphore(String channelName); AsyncSemaphore getSemaphore(String channelName);
<R> RFuture<R> newSucceededFuture(R value);
ConnectionEventsHub getConnectionEventsHub(); ConnectionEventsHub getConnectionEventsHub();
boolean isShutdown(); boolean isShutdown();
@ -72,8 +69,6 @@ public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher(); IdleConnectionWatcher getConnectionWatcher();
<R> RFuture<R> newFailedFuture(Throwable cause);
Collection<RedisClientEntry> getClients(); Collection<RedisClientEntry> getClients();
void shutdownAsync(RedisClient client); void shutdownAsync(RedisClient client);
@ -90,8 +85,6 @@ public interface ConnectionManager {
MasterSlaveEntry getEntry(InetSocketAddress address); MasterSlaveEntry getEntry(InetSocketAddress address);
<R> RPromise<R> newPromise();
void releaseRead(NodeSource source, RedisConnection connection); void releaseRead(NodeSource source, RedisConnection connection);
void releaseWrite(NodeSource source, RedisConnection connection); void releaseWrite(NodeSource source, RedisConnection connection);

@ -220,7 +220,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg; this.cfg = cfg;
this.codec = cfg.getCodec(); this.codec = cfg.getCodec();
this.shutdownPromise = newPromise(); this.shutdownPromise = new RedissonPromise<Boolean>();
this.commandExecutor = new CommandSyncService(this); this.commandExecutor = new CommandSyncService(this);
} }
@ -434,7 +434,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise(); final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() { lock.acquire(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -446,14 +446,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise(); RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
return promise; return promise;
} }
public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = newPromise(); final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
lock.acquire(new Runnable() { lock.acquire(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -465,7 +465,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = newPromise(); RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners); subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise; return promise;
} }
@ -626,7 +626,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
final Codec entryCodec = entry.getConnection().getChannels().get(channelName); final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) { if (temporaryDown) {
final RPromise<Codec> result = newPromise(); final RPromise<Codec> result = new RedissonPromise<Codec>();
entry.unsubscribe(channelName, new BaseRedisPubSubListener() { entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override @Override
@ -642,7 +642,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return result; return result;
} }
entry.unsubscribe(channelName, null); entry.unsubscribe(channelName, null);
return newSucceededFuture(entryCodec); return RedissonPromise.newSucceededFuture(entryCodec);
} }
public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) { public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) {
@ -685,7 +685,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
final Codec entryCodec = entry.getConnection().getChannels().get(channelName); final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) { if (temporaryDown) {
final RPromise<Codec> result = newPromise(); final RPromise<Codec> result = new RedissonPromise<Codec>();
entry.punsubscribe(channelName, new BaseRedisPubSubListener() { entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override @Override
@ -701,7 +701,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return result; return result;
} }
entry.punsubscribe(channelName, null); entry.punsubscribe(channelName, null);
return newSucceededFuture(entryCodec); return RedissonPromise.newSucceededFuture(entryCodec);
} }
public MasterSlaveEntry getEntry(InetSocketAddress address) { public MasterSlaveEntry getEntry(InetSocketAddress address) {
@ -898,21 +898,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return Collections.unmodifiableCollection(clientEntries.values()); return Collections.unmodifiableCollection(clientEntries.values());
} }
@Override
public <R> RPromise<R> newPromise() {
return new RedissonPromise<R>();
}
@Override
public <R> RFuture<R> newSucceededFuture(R value) {
return RedissonPromise.newSucceededFuture(value);
}
@Override
public <R> RFuture<R> newFailedFuture(Throwable cause) {
return RedissonPromise.newFailedFuture(cause);
}
@Override @Override
public EventLoopGroup getGroup() { public EventLoopGroup getGroup() {
return group; return group;

@ -36,6 +36,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReplicatedServersConfig; import org.redisson.config.ReplicatedServersConfig;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -117,11 +118,11 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URI addr) { private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URI addr) {
RedisConnection connection = nodeConnections.get(addr); RedisConnection connection = nodeConnections.get(addr);
if (connection != null) { if (connection != null) {
return newSucceededFuture(connection); return RedissonPromise.newSucceededFuture(connection);
} }
RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
final RPromise<RedisConnection> result = newPromise(); final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync(); RFuture<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() { future.addListener(new FutureListener<RedisConnection>() {
@Override @Override

@ -40,6 +40,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.SentinelServersConfig; import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -160,7 +161,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) { if (oldClient != null) {
return newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
RFuture<RedisPubSubConnection> pubsubFuture = client.connectPubSubAsync(); RFuture<RedisPubSubConnection> pubsubFuture = client.connectPubSubAsync();

@ -239,7 +239,7 @@ public class LoadBalancerManager {
return slaveConnectionPool.get(command, entry); return slaveConnectionPool.get(command, entry);
} }
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr); RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception); return RedissonPromise.newFailedFuture(exception);
} }
public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) { public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) {

@ -35,6 +35,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,7 +70,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
public RFuture<Void> add(final ClientConnectionsEntry entry) { public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> promise = connectionManager.newPromise(); final RPromise<Void> promise = new RedissonPromise<Void>();
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
@ -112,7 +113,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void run() { public void run() {
RPromise<T> promise = connectionManager.newPromise(); RPromise<T> promise = new RedissonPromise<T>();
createConnection(entry, promise); createConnection(entry, promise);
promise.addListener(new FutureListener<T>() { promise.addListener(new FutureListener<T>() {
@Override @Override
@ -196,7 +197,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
return connectionManager.newFailedFuture(exception); return RedissonPromise.newFailedFuture(exception);
} }
public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) { public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {
@ -207,7 +208,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
RedisConnectionException exception = new RedisConnectionException( RedisConnectionException exception = new RedisConnectionException(
"Can't aquire connection to " + entry); "Can't aquire connection to " + entry);
return connectionManager.newFailedFuture(exception); return RedissonPromise.newFailedFuture(exception);
} }
public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> { public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> {
@ -215,7 +216,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) { private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
final RPromise<T> result = connectionManager.newPromise(); final RPromise<T> result = new RedissonPromise<T>();
AcquireCallback<T> callback = new AcquireCallback<T>() { AcquireCallback<T> callback = new AcquireCallback<T>() {
@Override @Override
@ -386,7 +387,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
if (future.isSuccess() && "PONG".equals(future.getNow())) { if (future.isSuccess() && "PONG".equals(future.getNow())) {
entry.resetFailedAttempts(); entry.resetFailedAttempts();
RPromise<Void> promise = connectionManager.newPromise(); RPromise<Void> promise = new RedissonPromise<Void>();
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)

@ -32,6 +32,7 @@ import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener; import org.redisson.misc.TransferListener;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -94,7 +95,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
@Override @Override
public RFuture<Map<KOut, VOut>> executeAsync() { public RFuture<Map<KOut, VOut>> executeAsync() {
final RPromise<Map<KOut, VOut>> promise = connectionManager.newPromise(); final RPromise<Map<KOut, VOut>> promise = new RedissonPromise<Map<KOut, VOut>>();
final RFuture<Void> future = executeMapperAsync(resultMapName, null); final RFuture<Void> future = executeMapperAsync(resultMapName, null);
addCancelHandling(promise, future); addCancelHandling(promise, future);
future.addListener(new FutureListener<Void>() { future.addListener(new FutureListener<Void>() {

@ -30,6 +30,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -64,7 +65,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() { return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
@Override @Override
public RFuture<Integer> get() { public RFuture<Integer> get() {
RPromise<Integer> promise = commandExecutor.getConnectionManager().newPromise(); RPromise<Integer> promise = new RedissonPromise<Integer>();
addListener(new PubSubPatternStatusListener(listener, name), promise); addListener(new PubSubPatternStatusListener(listener, name), promise);
return promise; return promise;
} }
@ -76,7 +77,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() { return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
@Override @Override
public RFuture<Integer> get() { public RFuture<Integer> get() {
RPromise<Integer> promise = commandExecutor.getConnectionManager().newPromise(); RPromise<Integer> promise = new RedissonPromise<Integer>();
PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name); PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
addListener(pubSubListener, promise); addListener(pubSubListener, promise);
return promise; return promise;

Loading…
Cancel
Save