refactoring

pull/4239/head
Nikita Koksharov 3 years ago
parent b97631b732
commit f13b2adf82

@ -17,10 +17,10 @@ package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.RedissonPromise;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -30,7 +30,7 @@ import java.util.function.Supplier;
*/
public class ElementsSubscribeService {
private final Map<Integer, RFuture<?>> subscribeListeners = new HashMap<>();
private final Map<Integer, CompletableFuture<?>> subscribeListeners = new HashMap<>();
private final ConnectionManager connectionManager;
public ElementsSubscribeService(ConnectionManager connectionManager) {
@ -40,7 +40,7 @@ public class ElementsSubscribeService {
public <V> int subscribeOnElements(Supplier<RFuture<V>> func, Consumer<V> consumer) {
int id = System.identityHashCode(consumer);
synchronized (subscribeListeners) {
RFuture<?> currentFuture = subscribeListeners.putIfAbsent(id, RedissonPromise.newSucceededFuture(null));
CompletableFuture<?> currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null));
if (currentFuture != null) {
throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered");
}
@ -50,7 +50,7 @@ public class ElementsSubscribeService {
}
public void unsubscribe(int listenerId) {
RFuture<?> f;
CompletableFuture<?> f;
synchronized (subscribeListeners) {
f = subscribeListeners.remove(listenerId);
}
@ -65,13 +65,13 @@ public class ElementsSubscribeService {
return;
}
RFuture<V> f;
CompletableFuture<V> f;
synchronized (subscribeListeners) {
if (!subscribeListeners.containsKey(listenerId)) {
return;
}
f = func.get();
f = func.get().toCompletableFuture();
subscribeListeners.put(listenerId, f);
}

@ -25,7 +25,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.time.Duration;
import java.util.*;
@ -140,7 +139,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
@ -388,7 +387,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName());

@ -31,7 +31,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
/**
*
@ -72,7 +72,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
Arrays.asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
@ -112,7 +112,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
@ -320,7 +320,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<Boolean> containsAllAsync(Collection<?> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
}
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -352,7 +352,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -376,7 +376,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1;"
+ "end; "
+ "return result;",
Arrays.<Object>asList(queueName, timeoutSetName), encode(c).toArray());
Arrays.asList(queueName, timeoutSetName), encode(c).toArray());
}
@Override
@ -415,7 +415,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1; "
+ "end; "
+ "return changed; ",
Collections.<Object>singletonList(queueName), encode(c).toArray());
Collections.singletonList(queueName), encode(c).toArray());
}
@Override
@ -430,7 +430,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(queueName, timeoutSetName);
List<Object> keys = Arrays.asList(queueName, timeoutSetName);
return super.sizeInMemoryAsync(keys);
}
@ -458,7 +458,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(queueName));
Arrays.asList(queueName));
}
@Override
@ -471,7 +471,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(queueName, timeoutSetName));
Arrays.asList(queueName, timeoutSetName));
}
@Override
@ -490,7 +490,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(this.queueName, timeoutSetName, queueName));
Arrays.asList(this.queueName, timeoutSetName, queueName));
}
@Override
@ -505,7 +505,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return 0;",
Collections.<Object>singletonList(queueName), encode(o));
Collections.singletonList(queueName), encode(o));
}
@Override

@ -21,14 +21,14 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -71,7 +71,7 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
private final AtomicLong start = new AtomicLong();
private final AtomicLong counter = new AtomicLong();
private final Queue<RPromise<Long>> queue = new ConcurrentLinkedQueue<>();
private final Queue<CompletableFuture<Long>> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean isWorkerActive = new AtomicBoolean();
private void send() {
@ -92,8 +92,8 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
long v = counter.decrementAndGet();
if (v >= 0) {
RPromise<Long> pp = queue.poll();
pp.trySuccess(start.incrementAndGet());
CompletableFuture<Long> pp = queue.poll();
pp.complete(start.incrementAndGet());
} else {
try {
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
@ -117,9 +117,9 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
start.set(value);
counter.set(allocationSize);
RPromise<Long> pp = queue.poll();
CompletableFuture<Long> pp = queue.poll();
counter.decrementAndGet();
pp.trySuccess(start.get());
pp.complete(start.get());
} catch (Exception e) {
if (e instanceof RedissonShutdownException) {
break;
@ -138,10 +138,10 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
@Override
public RFuture<Long> nextIdAsync() {
RPromise<Long> promise = new RedissonPromise<>();
CompletableFuture<Long> promise = new CompletableFuture<>();
queue.add(promise);
send();
return promise;
return new CompletableFutureWrapper<>(promise);
}
@Override

@ -28,7 +28,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
/**
* Sorted set contained values of String type
@ -283,7 +283,7 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implem
@Override
public RFuture<Boolean> addAllAsync(Collection<? extends String> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
List<Object> params = new ArrayList<Object>(2*c.size());
params.add(getRawName());

@ -24,7 +24,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.Map.Entry;
@ -197,7 +197,7 @@ public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implement
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
List<Object> args = new ArrayList<Object>(c.size() + 1);

@ -32,7 +32,6 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonMapIterator;
import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -544,7 +543,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
if (keys.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
return new CompletableFutureWrapper<>(Collections.emptyMap());
}
RFuture<Map<K, V>> future = getAllOperationAsync(keys);
@ -640,7 +639,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public final RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>((Void) null);
}
RFuture<Void> future = putAllOperationAsync(map);
@ -1106,7 +1105,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
keys = options.getLoader().loadAllKeys();
} catch (Exception e) {
log.error("Unable to load keys for map " + getRawName(), e);
return RedissonPromise.newFailedFuture(e);
return new CompletableFutureWrapper<>(e);
}
return loadAllAsync(keys, replaceExistingValues, parallelism, null);
}
@ -1153,7 +1152,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
} catch (Exception e) {
log.error("Unable to load keys for map " + getRawName(), e);
return RedissonPromise.newFailedFuture(e);
return new CompletableFutureWrapper<>(e);
}
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
@ -1292,7 +1291,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
if (keys.length == 0) {
return RedissonPromise.newSucceededFuture(0L);
return new CompletableFutureWrapper<>(0L);
}
if (hasNoWriter()) {

@ -33,7 +33,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
import java.util.*;
@ -773,7 +772,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
if (map.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>((Void) null);
}
RFuture<Void> future = putAllOperationAsync(map, ttl, ttlUnit);

@ -29,7 +29,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.Map.Entry;
@ -221,7 +220,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
@Override
public RFuture<Long> fastRemoveAsync(K... keys) {
if (keys == null || keys.length == 0) {
return RedissonPromise.newSucceededFuture(0L);
return new CompletableFutureWrapper<>(0L);
}
List<Object> mapKeys = new ArrayList<Object>(keys.length);

@ -29,7 +29,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
/**
*
@ -94,7 +94,7 @@ public class RedissonRingBuffer<V> extends RedissonQueue<V> implements RRingBuff
@Override
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
List<Object> args = new ArrayList<>(c.size());

@ -26,7 +26,7 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.Map.Entry;
@ -175,7 +175,7 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
List<Object> args = new ArrayList<Object>(c.size() + 1);
@ -289,7 +289,7 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
@Override
RedissonSetMultimapIterator<K, V, Entry<K, V>> entryIterator() {
return new RedissonSetMultimapIterator<K, V, Map.Entry<K, V>>(RedissonSetMultimap.this, commandExecutor, codec);
return new RedissonSetMultimapIterator<>(RedissonSetMultimap.this, commandExecutor, codec);
}
@Override

@ -28,7 +28,7 @@ import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
@ -504,7 +504,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Collection<V>> pollFirstAsync(int count) {
if (count <= 0) {
return RedissonPromise.newSucceededFuture(Collections.emptyList());
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return pollAsync(0, count, RedisCommands.EVAL_LIST);
@ -513,7 +513,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Collection<V>> pollLastAsync(int count) {
if (count <= 0) {
return RedissonPromise.newSucceededFuture(Collections.emptyList());
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return pollAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE);
}
@ -652,7 +652,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), getTimeoutSetName());
List<Object> keys = Arrays.asList(getRawName(), getTimeoutSetName());
return super.sizeInMemoryAsync(keys);
}

@ -28,7 +28,6 @@ import org.redisson.client.handler.CommandsQueuePubSub;
import org.redisson.client.protocol.*;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -237,7 +236,7 @@ public class RedisConnection implements RedisCommands {
if (redisClient.getEventLoopGroup().isShuttingDown()) {
RedissonShutdownException cause = new RedissonShutdownException("Redisson is shutdown");
return RedissonPromise.newFailedFuture(cause);
return new CompletableFutureWrapper<>(cause);
}
Timeout scheduledFuture = redisClient.getTimer().newTimeout(t -> {

@ -36,8 +36,6 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -291,14 +289,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
CompletableFuture<R> mainPromise = new CompletableFuture<R>();
Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
AtomicInteger counter = new AtomicInteger(nodes.size());
BiConsumer<T, Throwable> listener = new BiConsumer<T, Throwable>() {
@Override
public void accept(T result, Throwable u) {
if (u != null && !(u instanceof RedisRedirectException)) {
mainPromise.tryFailure(u);
mainPromise.completeExceptionally(u);
return;
}
@ -311,9 +309,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
if (counter.decrementAndGet() == 0) {
if (callback != null) {
mainPromise.trySuccess(callback.onFinish());
mainPromise.complete(callback.onFinish());
} else {
mainPromise.trySuccess(null);
mainPromise.complete(null);
}
}
}
@ -323,7 +321,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
RFuture<T> promise = async(readOnlyMode, new NodeSource(entry), codec, command, params, true, false);
promise.whenComplete(listener);
}
return mainPromise;
return new CompletableFutureWrapper<R>(mainPromise);
}
public RedisException convertException(ExecutionException e) {
@ -420,7 +418,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return getConnectionManager().getCfg().isUseScriptCache();
}
private static final Map<String, String> SHA_CACHE = new LRUCacheMap<String, String>(500, 0, 0);
private static final Map<String, String> SHA_CACHE = new LRUCacheMap<>(500, 0, 0);
private String calcSHA(String script) {
String digest = SHA_CACHE.get(script);
@ -438,7 +436,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private Object[] copy(Object[] params) {
List<Object> result = new ArrayList<Object>();
List<Object> result = new ArrayList<>();
for (Object object : params) {
if (object instanceof ByteBuf) {
ByteBuf b = (ByteBuf) object;

@ -34,8 +34,6 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.*;
@ -193,7 +191,7 @@ public class CommandBatchService extends CommandAsyncService {
.flatMap(e -> e.getCommands().stream())
.flatMap(c -> Arrays.stream(c.getParams()))
.forEach(obj -> ReferenceCountUtil.safeRelease(obj));
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>((Void) null);
}
public BatchResult<?> execute() {
@ -219,7 +217,7 @@ public class CommandBatchService extends CommandAsyncService {
if (commands.isEmpty()) {
executed.set(true);
BatchResult<Object> result = new BatchResult<>(Collections.emptyList(), 0);
return RedissonPromise.newSucceededFuture(result);
return new CompletableFutureWrapper<>(result);
}
if (isRedisBasedQueue()) {
@ -252,7 +250,7 @@ public class CommandBatchService extends CommandAsyncService {
}
}
RPromise<BatchResult<?>> promise = new RedissonPromise<>();
CompletableFuture<BatchResult<?>> promise = new CompletableFuture<>();
CompletableFuture<Void> voidPromise = new CompletableFuture<>();
if (this.options.isSkipResult()
&& this.options.getSyncSlaves() == 0) {
@ -264,7 +262,7 @@ public class CommandBatchService extends CommandAsyncService {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.tryFailure(ex);
promise.completeExceptionally(ex);
commands.clear();
nestedServices.clear();
@ -273,7 +271,7 @@ public class CommandBatchService extends CommandAsyncService {
commands.clear();
nestedServices.clear();
promise.trySuccess(new BatchResult<>(Collections.emptyList(), 0));
promise.complete(new BatchResult<>(Collections.emptyList(), 0));
});
} else {
voidPromise.whenComplete((res, ex) -> {
@ -283,7 +281,7 @@ public class CommandBatchService extends CommandAsyncService {
e.getCommands().forEach(t -> t.tryFailure(ex));
}
promise.tryFailure(ex);
promise.completeExceptionally(ex);
commands.clear();
nestedServices.clear();
@ -321,7 +319,7 @@ public class CommandBatchService extends CommandAsyncService {
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
promise.trySuccess(result);
promise.complete(result);
commands.clear();
nestedServices.clear();
@ -346,7 +344,7 @@ public class CommandBatchService extends CommandAsyncService {
connectionManager, this.options, e.getValue(), slots, referenceType, false);
executor.execute();
}
return promise;
return new CompletableFutureWrapper<>(promise);
}
protected Throwable cause(CompletableFuture<?> future) {
@ -361,7 +359,7 @@ public class CommandBatchService extends CommandAsyncService {
}
private <R> RFuture<R> executeRedisBasedQueue() {
RPromise<R> resultPromise = new RedissonPromise<R>();
CompletableFuture<R> resultPromise = new CompletableFuture<R>();
long responseTimeout;
if (options.getResponseTimeout() > 0) {
responseTimeout = options.getResponseTimeout();
@ -376,7 +374,7 @@ public class CommandBatchService extends CommandAsyncService {
c.getCancelCallback().run();
});
resultPromise.tryFailure(new RedisTimeoutException("Response timeout for queued commands " + responseTimeout + ": " +
resultPromise.completeExceptionally(new RedisTimeoutException("Response timeout for queued commands " + responseTimeout + ": " +
commands.values().stream()
.flatMap(e -> e.getCommands().stream().map(d -> d.getCommand()))
.collect(Collectors.toList())));
@ -397,7 +395,7 @@ public class CommandBatchService extends CommandAsyncService {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
if (command.getPromise().isDone() && command.getPromise().isCompletedExceptionally()) {
resultPromise.tryFailure(cause(command.getPromise()));
resultPromise.completeExceptionally(cause(command.getPromise()));
break;
}
}
@ -429,7 +427,7 @@ public class CommandBatchService extends CommandAsyncService {
future.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.tryFailure(ex);
resultPromise.completeExceptionally(ex);
return;
}
@ -472,13 +470,13 @@ public class CommandBatchService extends CommandAsyncService {
}
}
BatchResult<Object> r = new BatchResult<>(responses, syncedSlaves);
resultPromise.trySuccess((R) r);
resultPromise.complete((R) r);
} catch (Exception e) {
resultPromise.tryFailure(e);
resultPromise.completeExceptionally(e);
}
});
});
return resultPromise;
return new CompletableFutureWrapper<>(resultPromise);
}
protected boolean isRedisBasedQueue() {

@ -38,6 +38,11 @@ public class CompletableFutureWrapper<V> implements RFuture<V> {
this(CompletableFuture.completedFuture(value));
}
public CompletableFutureWrapper(Throwable ex) {
this(new CompletableFuture<>());
this.future.completeExceptionally(ex);
}
public CompletableFutureWrapper(CompletionStage<V> stage) {
this.future = stage.toCompletableFuture();
this.lastFuture = future;

@ -114,7 +114,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
checkState();
if (state != null) {
if (state == NULL) {
return new CompletableFutureWrapper<>(null);
return new CompletableFutureWrapper<>((Boolean) null);
} else {
return new CompletableFutureWrapper<>(true);
}

Loading…
Cancel
Save