diff --git a/redisson/src/main/java/org/redisson/ElementsSubscribeService.java b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java index b728ce53a..6ec334763 100644 --- a/redisson/src/main/java/org/redisson/ElementsSubscribeService.java +++ b/redisson/src/main/java/org/redisson/ElementsSubscribeService.java @@ -17,10 +17,10 @@ package org.redisson; import org.redisson.api.RFuture; import org.redisson.connection.ConnectionManager; -import org.redisson.misc.RedissonPromise; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -30,7 +30,7 @@ import java.util.function.Supplier; */ public class ElementsSubscribeService { - private final Map> subscribeListeners = new HashMap<>(); + private final Map> subscribeListeners = new HashMap<>(); private final ConnectionManager connectionManager; public ElementsSubscribeService(ConnectionManager connectionManager) { @@ -40,7 +40,7 @@ public class ElementsSubscribeService { public int subscribeOnElements(Supplier> func, Consumer consumer) { int id = System.identityHashCode(consumer); synchronized (subscribeListeners) { - RFuture currentFuture = subscribeListeners.putIfAbsent(id, RedissonPromise.newSucceededFuture(null)); + CompletableFuture currentFuture = subscribeListeners.putIfAbsent(id, CompletableFuture.completedFuture(null)); if (currentFuture != null) { throw new IllegalArgumentException("Consumer object with listener id " + id + " already registered"); } @@ -50,7 +50,7 @@ public class ElementsSubscribeService { } public void unsubscribe(int listenerId) { - RFuture f; + CompletableFuture f; synchronized (subscribeListeners) { f = subscribeListeners.remove(listenerId); } @@ -65,13 +65,13 @@ public class ElementsSubscribeService { return; } - RFuture f; + CompletableFuture f; synchronized (subscribeListeners) { if (!subscribeListeners.containsKey(listenerId)) { return; } - f = func.get(); + f = func.get().toCompletableFuture(); subscribeListeners.put(listenerId, f); } diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 51571f25d..f4ac808ab 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -25,7 +25,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RedissonPromise; import java.time.Duration; import java.util.*; @@ -140,7 +139,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public RFuture removeAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); @@ -388,7 +387,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public RFuture addAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName()); diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index 8b0c6ac5d..dc06979e8 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -31,7 +31,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; /** * @@ -72,7 +72,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return v[2]; " + "end " + "return nil;", - Arrays.asList(getRawName(), timeoutSetName, queueName), + Arrays.asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @@ -112,7 +112,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", - Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), + Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); } @@ -320,7 +320,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public RFuture containsAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, @@ -352,7 +352,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public RFuture removeAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, @@ -376,7 +376,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "i = i + 1;" + "end; " + "return result;", - Arrays.asList(queueName, timeoutSetName), encode(c).toArray()); + Arrays.asList(queueName, timeoutSetName), encode(c).toArray()); } @Override @@ -415,7 +415,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "i = i + 1; " + "end; " + "return changed; ", - Collections.singletonList(queueName), encode(c).toArray()); + Collections.singletonList(queueName), encode(c).toArray()); } @Override @@ -430,7 +430,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public RFuture sizeInMemoryAsync() { - List keys = Arrays.asList(queueName, timeoutSetName); + List keys = Arrays.asList(queueName, timeoutSetName); return super.sizeInMemoryAsync(keys); } @@ -458,7 +458,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(queueName)); + Arrays.asList(queueName)); } @Override @@ -471,7 +471,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(queueName, timeoutSetName)); + Arrays.asList(queueName, timeoutSetName)); } @Override @@ -490,7 +490,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(this.queueName, timeoutSetName, queueName)); + Arrays.asList(this.queueName, timeoutSetName, queueName)); } @Override @@ -505,7 +505,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "end; " + "end;" + "return 0;", - Collections.singletonList(queueName), encode(o)); + Collections.singletonList(queueName), encode(o)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonIdGenerator.java b/redisson/src/main/java/org/redisson/RedissonIdGenerator.java index fbe31f6c3..f74155dd0 100644 --- a/redisson/src/main/java/org/redisson/RedissonIdGenerator.java +++ b/redisson/src/main/java/org/redisson/RedissonIdGenerator.java @@ -21,14 +21,14 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +71,7 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat private final AtomicLong start = new AtomicLong(); private final AtomicLong counter = new AtomicLong(); - private final Queue> queue = new ConcurrentLinkedQueue<>(); + private final Queue> queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean isWorkerActive = new AtomicBoolean(); private void send() { @@ -92,8 +92,8 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat long v = counter.decrementAndGet(); if (v >= 0) { - RPromise pp = queue.poll(); - pp.trySuccess(start.incrementAndGet()); + CompletableFuture pp = queue.poll(); + pp.complete(start.incrementAndGet()); } else { try { RFuture> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, @@ -117,9 +117,9 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat start.set(value); counter.set(allocationSize); - RPromise pp = queue.poll(); + CompletableFuture pp = queue.poll(); counter.decrementAndGet(); - pp.trySuccess(start.get()); + pp.complete(start.get()); } catch (Exception e) { if (e instanceof RedissonShutdownException) { break; @@ -138,10 +138,10 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat @Override public RFuture nextIdAsync() { - RPromise promise = new RedissonPromise<>(); + CompletableFuture promise = new CompletableFuture<>(); queue.add(promise); send(); - return promise; + return new CompletableFutureWrapper<>(promise); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java b/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java index 313e4a45c..bf7251e9f 100644 --- a/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java @@ -28,7 +28,7 @@ import org.redisson.api.RedissonClient; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; /** * Sorted set contained values of String type @@ -283,7 +283,7 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet implem @Override public RFuture addAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } List params = new ArrayList(2*c.size()); params.add(getRawName()); diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimap.java b/redisson/src/main/java/org/redisson/RedissonListMultimap.java index eaa8940c6..87503b45e 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimap.java @@ -24,7 +24,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.util.*; import java.util.Map.Entry; @@ -197,7 +197,7 @@ public class RedissonListMultimap extends RedissonMultimap implement @Override public RFuture removeAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } List args = new ArrayList(c.size() + 1); diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index ef0ec4265..dfa665226 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -32,7 +32,6 @@ import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.iterator.RedissonMapIterator; import org.redisson.mapreduce.RedissonMapReduce; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -544,7 +543,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public RFuture> getAllAsync(Set keys) { if (keys.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return new CompletableFutureWrapper<>(Collections.emptyMap()); } RFuture> future = getAllOperationAsync(keys); @@ -640,7 +639,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public final RFuture putAllAsync(Map map) { if (map.isEmpty()) { - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>((Void) null); } RFuture future = putAllOperationAsync(map); @@ -1106,7 +1105,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { keys = options.getLoader().loadAllKeys(); } catch (Exception e) { log.error("Unable to load keys for map " + getRawName(), e); - return RedissonPromise.newFailedFuture(e); + return new CompletableFutureWrapper<>(e); } return loadAllAsync(keys, replaceExistingValues, parallelism, null); } @@ -1153,7 +1152,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } } catch (Exception e) { log.error("Unable to load keys for map " + getRawName(), e); - return RedissonPromise.newFailedFuture(e); + return new CompletableFutureWrapper<>(e); } CompletableFuture f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); @@ -1292,7 +1291,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } if (keys.length == 0) { - return RedissonPromise.newSucceededFuture(0L); + return new CompletableFutureWrapper<>(0L); } if (hasNoWriter()) { diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 906f83c91..6e9ffb9af 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -33,7 +33,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RedissonPromise; import java.math.BigDecimal; import java.util.*; @@ -773,7 +772,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public RFuture putAllAsync(Map map, long ttl, TimeUnit ttlUnit) { if (map.isEmpty()) { - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>((Void) null); } RFuture future = putAllOperationAsync(map, ttl, ttlUnit); diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index 82def6469..54b326c9f 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -29,7 +29,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.iterator.RedissonBaseMapIterator; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; -import org.redisson.misc.RedissonPromise; import java.util.*; import java.util.Map.Entry; @@ -221,7 +220,7 @@ public abstract class RedissonMultimap extends RedissonExpirable implement @Override public RFuture fastRemoveAsync(K... keys) { if (keys == null || keys.length == 0) { - return RedissonPromise.newSucceededFuture(0L); + return new CompletableFutureWrapper<>(0L); } List mapKeys = new ArrayList(keys.length); diff --git a/redisson/src/main/java/org/redisson/RedissonRingBuffer.java b/redisson/src/main/java/org/redisson/RedissonRingBuffer.java index 4cb23c0f4..fd797d66e 100644 --- a/redisson/src/main/java/org/redisson/RedissonRingBuffer.java +++ b/redisson/src/main/java/org/redisson/RedissonRingBuffer.java @@ -29,7 +29,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; /** * @@ -94,7 +94,7 @@ public class RedissonRingBuffer extends RedissonQueue implements RRingBuff @Override public RFuture addAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } List args = new ArrayList<>(c.size()); diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimap.java b/redisson/src/main/java/org/redisson/RedissonSetMultimap.java index 5b1188778..710ddd244 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimap.java @@ -26,7 +26,7 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.util.*; import java.util.Map.Entry; @@ -175,7 +175,7 @@ public class RedissonSetMultimap extends RedissonMultimap implements @Override public RFuture removeAllAsync(Collection c) { if (c.isEmpty()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } List args = new ArrayList(c.size() + 1); @@ -289,7 +289,7 @@ public class RedissonSetMultimap extends RedissonMultimap implements @Override RedissonSetMultimapIterator> entryIterator() { - return new RedissonSetMultimapIterator>(RedissonSetMultimap.this, commandExecutor, codec); + return new RedissonSetMultimapIterator<>(RedissonSetMultimap.this, commandExecutor, codec); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index 568533daf..f956b5ddd 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -28,7 +28,7 @@ import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.iterator.RedissonBaseIterator; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.util.*; import java.util.concurrent.ThreadLocalRandom; @@ -504,7 +504,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture> pollFirstAsync(int count) { if (count <= 0) { - return RedissonPromise.newSucceededFuture(Collections.emptyList()); + return new CompletableFutureWrapper<>(Collections.emptyList()); } return pollAsync(0, count, RedisCommands.EVAL_LIST); @@ -513,7 +513,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture> pollLastAsync(int count) { if (count <= 0) { - return RedissonPromise.newSucceededFuture(Collections.emptyList()); + return new CompletableFutureWrapper<>(Collections.emptyList()); } return pollAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE); } @@ -652,7 +652,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture sizeInMemoryAsync() { - List keys = Arrays.asList(getRawName(), getTimeoutSetName()); + List keys = Arrays.asList(getRawName(), getTimeoutSetName()); return super.sizeInMemoryAsync(keys); } diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index dd211e96d..ed9a6156c 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -28,7 +28,6 @@ import org.redisson.client.handler.CommandsQueuePubSub; import org.redisson.client.protocol.*; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.LogHelper; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,7 +236,7 @@ public class RedisConnection implements RedisCommands { if (redisClient.getEventLoopGroup().isShuttingDown()) { RedissonShutdownException cause = new RedissonShutdownException("Redisson is shutdown"); - return RedissonPromise.newFailedFuture(cause); + return new CompletableFutureWrapper<>(cause); } Timeout scheduledFuture = redisClient.getTimer().newTimeout(t -> { diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 906c555dc..6172c2db8 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -36,8 +36,6 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,14 +289,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private RFuture allAsync(boolean readOnlyMode, Codec codec, RedisCommand command, SlotCallback callback, Object... params) { - RPromise mainPromise = new RedissonPromise(); + CompletableFuture mainPromise = new CompletableFuture(); Collection nodes = connectionManager.getEntrySet(); AtomicInteger counter = new AtomicInteger(nodes.size()); BiConsumer listener = new BiConsumer() { @Override public void accept(T result, Throwable u) { if (u != null && !(u instanceof RedisRedirectException)) { - mainPromise.tryFailure(u); + mainPromise.completeExceptionally(u); return; } @@ -311,9 +309,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { } if (counter.decrementAndGet() == 0) { if (callback != null) { - mainPromise.trySuccess(callback.onFinish()); + mainPromise.complete(callback.onFinish()); } else { - mainPromise.trySuccess(null); + mainPromise.complete(null); } } } @@ -323,7 +321,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { RFuture promise = async(readOnlyMode, new NodeSource(entry), codec, command, params, true, false); promise.whenComplete(listener); } - return mainPromise; + return new CompletableFutureWrapper(mainPromise); } public RedisException convertException(ExecutionException e) { @@ -420,7 +418,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { return getConnectionManager().getCfg().isUseScriptCache(); } - private static final Map SHA_CACHE = new LRUCacheMap(500, 0, 0); + private static final Map SHA_CACHE = new LRUCacheMap<>(500, 0, 0); private String calcSHA(String script) { String digest = SHA_CACHE.get(script); @@ -438,7 +436,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private Object[] copy(Object[] params) { - List result = new ArrayList(); + List result = new ArrayList<>(); for (Object object : params) { if (object instanceof ByteBuf) { ByteBuf b = (ByteBuf) object; diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 2dea3ee99..dc20c3ce1 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -34,8 +34,6 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import java.util.*; import java.util.concurrent.*; @@ -193,7 +191,7 @@ public class CommandBatchService extends CommandAsyncService { .flatMap(e -> e.getCommands().stream()) .flatMap(c -> Arrays.stream(c.getParams())) .forEach(obj -> ReferenceCountUtil.safeRelease(obj)); - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>((Void) null); } public BatchResult execute() { @@ -219,7 +217,7 @@ public class CommandBatchService extends CommandAsyncService { if (commands.isEmpty()) { executed.set(true); BatchResult result = new BatchResult<>(Collections.emptyList(), 0); - return RedissonPromise.newSucceededFuture(result); + return new CompletableFutureWrapper<>(result); } if (isRedisBasedQueue()) { @@ -252,7 +250,7 @@ public class CommandBatchService extends CommandAsyncService { } } - RPromise> promise = new RedissonPromise<>(); + CompletableFuture> promise = new CompletableFuture<>(); CompletableFuture voidPromise = new CompletableFuture<>(); if (this.options.isSkipResult() && this.options.getSyncSlaves() == 0) { @@ -264,7 +262,7 @@ public class CommandBatchService extends CommandAsyncService { e.getCommands().forEach(t -> t.tryFailure(ex)); } - promise.tryFailure(ex); + promise.completeExceptionally(ex); commands.clear(); nestedServices.clear(); @@ -273,7 +271,7 @@ public class CommandBatchService extends CommandAsyncService { commands.clear(); nestedServices.clear(); - promise.trySuccess(new BatchResult<>(Collections.emptyList(), 0)); + promise.complete(new BatchResult<>(Collections.emptyList(), 0)); }); } else { voidPromise.whenComplete((res, ex) -> { @@ -283,7 +281,7 @@ public class CommandBatchService extends CommandAsyncService { e.getCommands().forEach(t -> t.tryFailure(ex)); } - promise.tryFailure(ex); + promise.completeExceptionally(ex); commands.clear(); nestedServices.clear(); @@ -321,7 +319,7 @@ public class CommandBatchService extends CommandAsyncService { } BatchResult result = new BatchResult(responses, syncedSlaves); - promise.trySuccess(result); + promise.complete(result); commands.clear(); nestedServices.clear(); @@ -346,7 +344,7 @@ public class CommandBatchService extends CommandAsyncService { connectionManager, this.options, e.getValue(), slots, referenceType, false); executor.execute(); } - return promise; + return new CompletableFutureWrapper<>(promise); } protected Throwable cause(CompletableFuture future) { @@ -361,7 +359,7 @@ public class CommandBatchService extends CommandAsyncService { } private RFuture executeRedisBasedQueue() { - RPromise resultPromise = new RedissonPromise(); + CompletableFuture resultPromise = new CompletableFuture(); long responseTimeout; if (options.getResponseTimeout() > 0) { responseTimeout = options.getResponseTimeout(); @@ -376,7 +374,7 @@ public class CommandBatchService extends CommandAsyncService { c.getCancelCallback().run(); }); - resultPromise.tryFailure(new RedisTimeoutException("Response timeout for queued commands " + responseTimeout + ": " + + resultPromise.completeExceptionally(new RedisTimeoutException("Response timeout for queued commands " + responseTimeout + ": " + commands.values().stream() .flatMap(e -> e.getCommands().stream().map(d -> d.getCommand())) .collect(Collectors.toList()))); @@ -397,7 +395,7 @@ public class CommandBatchService extends CommandAsyncService { for (BatchCommandData command : entry.getCommands()) { if (command.getPromise().isDone() && command.getPromise().isCompletedExceptionally()) { - resultPromise.tryFailure(cause(command.getPromise())); + resultPromise.completeExceptionally(cause(command.getPromise())); break; } } @@ -429,7 +427,7 @@ public class CommandBatchService extends CommandAsyncService { future.whenComplete((res, ex) -> { executed.set(true); if (ex != null) { - resultPromise.tryFailure(ex); + resultPromise.completeExceptionally(ex); return; } @@ -472,13 +470,13 @@ public class CommandBatchService extends CommandAsyncService { } } BatchResult r = new BatchResult<>(responses, syncedSlaves); - resultPromise.trySuccess((R) r); + resultPromise.complete((R) r); } catch (Exception e) { - resultPromise.tryFailure(e); + resultPromise.completeExceptionally(e); } }); }); - return resultPromise; + return new CompletableFutureWrapper<>(resultPromise); } protected boolean isRedisBasedQueue() { diff --git a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java index ddcfe587c..2d5e691dc 100644 --- a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java +++ b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java @@ -38,6 +38,11 @@ public class CompletableFutureWrapper implements RFuture { this(CompletableFuture.completedFuture(value)); } + public CompletableFutureWrapper(Throwable ex) { + this(new CompletableFuture<>()); + this.future.completeExceptionally(ex); + } + public CompletableFutureWrapper(CompletionStage stage) { this.future = stage.toCompletableFuture(); this.lastFuture = future; diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index faaa50bf8..ec84eb5f7 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -114,7 +114,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { checkState(); if (state != null) { if (state == NULL) { - return new CompletableFutureWrapper<>(null); + return new CompletableFutureWrapper<>((Boolean) null); } else { return new CompletableFutureWrapper<>(true); }