From a23572a5521d945b220aa9a44894738ae49b98aa Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 21 Dec 2021 09:42:44 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedisNodes.java | 8 +- .../RedissonBoundedBlockingQueue.java | 4 +- .../org/redisson/RedissonCountDownLatch.java | 9 ++- .../org/redisson/RedissonExecutorService.java | 16 +++- .../main/java/org/redisson/RedissonLock.java | 10 ++- .../java/org/redisson/RedissonMultiLock.java | 8 +- .../main/java/org/redisson/RedissonNode.java | 52 ++++++++----- .../RedissonPermitExpirableSemaphore.java | 6 +- .../org/redisson/RedissonRemoteService.java | 4 +- .../java/org/redisson/RedissonSemaphore.java | 7 +- .../org/redisson/RedissonTransferQueue.java | 17 +++- .../main/java/org/redisson/api/RFuture.java | 78 +++++++++++++++---- .../command/CommandAsyncExecutor.java | 3 +- .../redisson/command/CommandAsyncService.java | 54 ++++++------- .../balancer/LoadBalancerManager.java | 5 +- .../org/redisson/executor/TasksService.java | 6 +- .../main/java/org/redisson/jcache/JCache.java | 20 ++--- .../redisson/mapreduce/CoordinatorTask.java | 22 +++--- .../misc/CompletableFutureWrapper.java | 6 +- .../reactive/CommandReactiveService.java | 4 + .../redisson/redisnode/RedissonBaseNodes.java | 9 ++- .../org/redisson/remote/AsyncRemoteProxy.java | 5 +- .../org/redisson/remote/SyncRemoteProxy.java | 38 ++++++--- .../org/redisson/rx/CommandRxService.java | 4 + .../transaction/BaseTransactionalMap.java | 9 +-- .../transaction/BaseTransactionalSet.java | 7 +- .../transaction/RedissonTransaction.java | 23 +++--- .../test/java/org/redisson/RedisRunner.java | 10 ++- .../java/org/redisson/RedissonBatchTest.java | 33 +++++--- .../redisson/RedissonBlockingQueueTest.java | 41 ++++++++-- .../RedissonBoundedBlockingQueueTest.java | 21 ++++- .../org/redisson/RedissonFairLockTest.java | 70 ++++++++--------- .../RedissonPriorityBlockingQueueTest.java | 12 ++- .../redisson/RedissonRemoteServiceTest.java | 5 +- .../java/org/redisson/RedissonScriptTest.java | 6 +- .../test/java/org/redisson/RedissonTest.java | 24 +++++- .../java/org/redisson/RedissonTopicTest.java | 6 +- .../RedissonScheduledExecutorServiceTest.java | 21 ++--- 38 files changed, 443 insertions(+), 240 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index 85d3edc63..1b58c0297 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -147,8 +147,12 @@ public class RedisNodes implements NodesGroup { boolean res = true; for (Entry> entry : result.entrySet()) { RFuture f = entry.getValue(); - f.awaitUninterruptibly(); - String pong = f.getNow(); + String pong = null; + try { + pong = f.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } entry.getKey().closeAsync(); if (!"PONG".equals(pong)) { res = false; diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 1db62290e..5416faefc 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -136,11 +136,11 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements } if (res == null) { - result.trySuccess(takeFuture.getNow()); + result.trySuccess(res); return; } createSemaphore(null).releaseAsync().onComplete((r, ex) -> { - result.trySuccess(takeFuture.getNow()); + result.trySuccess(res); }); }); return result; diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java index 4906adeef..f1fabdfa3 100644 --- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -16,7 +16,10 @@ package org.redisson; import java.util.Arrays; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -129,7 +132,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return true; } RFuture promise = subscribe(); - if (!promise.await(time, unit)) { + try { + promise.toCompletableFuture().get(time, unit); + } catch (ExecutionException | CancellationException e) { + // skip + } catch (TimeoutException e) { return false; } diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 245b507cb..02c1200c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -1107,8 +1107,12 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new NullPointerException(); } - RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); - future.await(); + RExecutorBatchFuture future = submit(tasks.toArray(new Callable[0])); + try { + future.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } List futures = future.getTaskFutures(); return (List>) futures; } @@ -1120,8 +1124,12 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new NullPointerException(); } - RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); - future.await(timeout, unit); + RExecutorBatchFuture future = submit(tasks.toArray(new Callable[0])); + try { + future.toCompletableFuture().get(timeout, unit); + } catch (ExecutionException | TimeoutException | CancellationException e) { + // skip + } List futures = future.getTaskFutures(); return (List>) futures; } diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 5adf38a12..03ed9f496 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -29,7 +29,9 @@ import org.redisson.pubsub.LockPubSub; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -234,10 +236,12 @@ public class RedissonLock extends RedissonBaseLock { current = System.currentTimeMillis(); RFuture subscribeFuture = subscribe(threadId); - if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { + try { + subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { if (!subscribeFuture.cancel(false)) { - subscribeFuture.onComplete((res, e) -> { - if (e == null) { + subscribeFuture.onComplete((res, ex) -> { + if (ex == null) { unsubscribe(subscribeFuture, threadId); } }); diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 9c927af3a..84014164b 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -311,7 +311,13 @@ public class RedissonMultiLock implements RLock { protected void unlockInner(Collection locks) { locks.stream() .map(RLockAsync::unlockAsync) - .forEach(RFuture::awaitUninterruptibly); + .forEach(f -> { + try { + f.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } + }); } protected RFuture unlockInnerAsync(Collection locks, long threadId) { diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index bc53a8833..cd47f91c3 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -15,13 +15,7 @@ */ package org.redisson; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map.Entry; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - +import io.netty.buffer.ByteBufUtil; import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -34,7 +28,12 @@ import org.redisson.connection.MasterSlaveEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBufUtil; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map.Entry; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** * @@ -169,21 +168,34 @@ public final class RedissonNode { ConnectionManager connectionManager = ((Redisson) redisson).getConnectionManager(); for (MasterSlaveEntry entry : connectionManager.getEntrySet()) { RFuture readFuture = entry.connectionReadOp(null); - if (readFuture.awaitUninterruptibly((long) connectionManager.getConfig().getConnectTimeout()) - && readFuture.isSuccess()) { - RedisConnection connection = readFuture.getNow(); - entry.releaseRead(connection); - remoteAddress = (InetSocketAddress) connection.getChannel().remoteAddress(); - localAddress = (InetSocketAddress) connection.getChannel().localAddress(); + RedisConnection readConnection = null; + try { + readConnection = readFuture.toCompletableFuture().get(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + // skip + } + if (readConnection != null) { + entry.releaseRead(readConnection); + remoteAddress = (InetSocketAddress) readConnection.getChannel().remoteAddress(); + localAddress = (InetSocketAddress) readConnection.getChannel().localAddress(); return; } + RFuture writeFuture = entry.connectionWriteOp(null); - if (writeFuture.awaitUninterruptibly((long) connectionManager.getConfig().getConnectTimeout()) - && writeFuture.isSuccess()) { - RedisConnection connection = writeFuture.getNow(); - entry.releaseWrite(connection); - remoteAddress = (InetSocketAddress) connection.getChannel().remoteAddress(); - localAddress = (InetSocketAddress) connection.getChannel().localAddress(); + RedisConnection writeConnection = null; + try { + writeConnection = writeFuture.toCompletableFuture().get(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + // skip + } + if (writeConnection != null) { + entry.releaseWrite(writeConnection); + remoteAddress = (InetSocketAddress) writeConnection.getChannel().remoteAddress(); + localAddress = (InetSocketAddress) writeConnection.getChannel().localAddress(); return; } } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index ced077be5..a6ff38d4b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -17,8 +17,10 @@ package org.redisson; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -433,7 +435,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen current = System.currentTimeMillis(); RFuture future = subscribe(); - if (!future.await(time, TimeUnit.MILLISECONDS)) { + try { + future.toCompletableFuture().get(time, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { return null; } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 1d06fac69..71cd429f5 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -183,9 +183,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RMap tasks = getMap(((RedissonObject) requestQueue).getRawName() + ":tasks"); RFuture taskFuture = getTask(requestId, tasks); - commandExecutor.getInterrupted(taskFuture); - - RemoteServiceRequest request = taskFuture.getNow(); + RemoteServiceRequest request = commandExecutor.getInterrupted(taskFuture); if (request == null) { throw new IllegalStateException("Task can't be found for request: " + requestId); } diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index a489e6879..03f7b8d4a 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -31,7 +31,10 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -301,7 +304,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { current = System.currentTimeMillis(); RFuture future = subscribe(); - if (!future.await(time, TimeUnit.MILLISECONDS)) { + try { + future.toCompletableFuture().get(time, TimeUnit.MILLISECONDS); + } catch (ExecutionException | CancellationException | TimeoutException e) { log.debug("unable to subscribe for permits acquisition, permits: {}, name: {}", permits, getName()); return false; } diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index daf5eb363..c2fdd3f06 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -37,9 +37,15 @@ import org.redisson.misc.RedissonPromise; import org.redisson.remote.RemoteServiceRequest; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; + + + /** * * @author Nikita Koksharov @@ -176,8 +182,11 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran RemotePromise future = (RemotePromise) service.invoke(v); long remainTime = unit.toMillis(timeout); long startTime = System.currentTimeMillis(); - if (!future.getAddFuture().await(remainTime, TimeUnit.MILLISECONDS)) { - if (!future.getAddFuture().cancel(false)) { + CompletableFuture addFuture = future.getAddFuture().toCompletableFuture(); + try { + addFuture.get(remainTime, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + if (!addFuture.cancel(false)) { if (!future.cancel(false)) { commandExecutor.getInterrupted(future); return true; @@ -187,7 +196,9 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran } remainTime -= System.currentTimeMillis() - startTime; - if (!future.await(remainTime)) { + try { + future.toCompletableFuture().get(remainTime, TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { if (!future.cancel(false)) { commandExecutor.getInterrupted(future); return true; diff --git a/redisson/src/main/java/org/redisson/api/RFuture.java b/redisson/src/main/java/org/redisson/api/RFuture.java index f169c757d..e522a77e8 100644 --- a/redisson/src/main/java/org/redisson/api/RFuture.java +++ b/redisson/src/main/java/org/redisson/api/RFuture.java @@ -65,8 +65,15 @@ public interface RFuture extends java.util.concurrent.Future, CompletionSt V join(); /** - * Waits for this future to be completed within the - * specified time limit. + * Use snippet below instead. + * + *
+     *                 try {
+     *                     toCompletableFuture().get();
+     *                 } catch (Exception e) {
+     *                     // skip
+     *                 }
+     * 
* * @param timeout - wait timeout * @param unit - time unit @@ -76,11 +83,19 @@ public interface RFuture extends java.util.concurrent.Future, CompletionSt * @throws InterruptedException * if the current thread was interrupted */ + @Deprecated boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** - * Waits for this future to be completed within the - * specified time limit. + * Use snippet below instead. + * + *
+     *                 try {
+     *                     toCompletableFuture().get();
+     *                 } catch (Exception e) {
+     *                     // skip
+     *                 }
+     * 
* * @param timeoutMillis - timeout value * @return {@code true} if and only if the future was completed within @@ -89,6 +104,7 @@ public interface RFuture extends java.util.concurrent.Future, CompletionSt * @throws InterruptedException * if the current thread was interrupted */ + @Deprecated boolean await(long timeoutMillis) throws InterruptedException; /** @@ -110,44 +126,74 @@ public interface RFuture extends java.util.concurrent.Future, CompletionSt RFuture syncUninterruptibly(); /** - * Waits for this future to be completed. + * Use snippet below instead. + * + *
+     *                 try {
+     *                     toCompletableFuture().get();
+     *                 } catch (Exception e) {
+     *                     // skip
+     *                 }
+     * 
* * @throws InterruptedException * if the current thread was interrupted * @return Future object */ + @Deprecated RFuture await() throws InterruptedException; /** - * Waits for this future to be completed without - * interruption. This method catches an {@link InterruptedException} and - * discards it silently. - * + * Use snippet below instead. + * + *
+     *             try {
+     *                 rFuture.toCompletableFuture().join();
+     *             } catch (Exception e) {
+     *                 // skip
+     *             }
+     * 
+ * * @return Future object */ + @Deprecated RFuture awaitUninterruptibly(); /** - * Waits for this future to be completed within the - * specified time limit without interruption. This method catches an - * {@link InterruptedException} and discards it silently. + * Use snippet below instead. + * + *
+     *                 try {
+     *                     toCompletableFuture().get();
+     *                 } catch (Exception e) {
+     *                     // skip
+     *                 }
+     * 
* * @param timeout - timeout value * @param unit - timeout unit value * @return {@code true} if and only if the future was completed within * the specified time limit */ + @Deprecated boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** - * Waits for this future to be completed within the - * specified time limit without interruption. This method catches an - * {@link InterruptedException} and discards it silently. - * + * Use snippet below instead. + * + *
+     *                 try {
+     *                     toCompletableFuture().get();
+     *                 } catch (Exception e) {
+     *                     // skip
+     *                 }
+     * 
+ * * @param timeoutMillis - timeout value * @return {@code true} if and only if the future was completed within * the specified time limit */ + @Deprecated boolean awaitUninterruptibly(long timeoutMillis); void onComplete(BiConsumer action); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 9da61d446..e5af40ccf 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -28,6 +28,7 @@ import org.redisson.liveobject.core.RedissonObjectBuilder; import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutionException; /** * @@ -40,7 +41,7 @@ public interface CommandAsyncExecutor { ConnectionManager getConnectionManager(); - RedisException convertException(RFuture future); + RedisException convertException(ExecutionException e); void syncSubscription(RFuture future); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 01f3c2b73..14667ea74 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -45,8 +45,7 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -84,18 +83,17 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public void syncSubscription(RFuture future) { MasterSlaveServersConfig config = connectionManager.getConfig(); + int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); try { - int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); - if (!future.await(timeout)) { - ((RPromise) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); - } + future.toCompletableFuture().get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } - try { - future.toCompletableFuture().join(); - } catch (CompletionException e) { + } catch (CancellationException e) { + // skip + } catch (ExecutionException e) { throw (RuntimeException) e.getCause(); + } catch (TimeoutException e) { + ((RPromise) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); } } @@ -103,13 +101,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { public void syncSubscriptionInterrupted(RFuture future) throws InterruptedException { MasterSlaveServersConfig config = connectionManager.getConfig(); int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); - if (!future.await(timeout)) { - ((RPromise) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); - } try { - future.toCompletableFuture().get(); + future.toCompletableFuture().get(timeout, TimeUnit.MILLISECONDS); + } catch (CancellationException e) { + // skip } catch (ExecutionException e) { throw (RuntimeException) e.getCause(); + } catch (TimeoutException e) { + ((RPromise) future).tryFailure(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); } } @@ -120,33 +119,26 @@ public class CommandAsyncService implements CommandAsyncExecutor { } try { - future.await(); + return future.toCompletableFuture().get(); } catch (InterruptedException e) { future.cancel(true); Thread.currentThread().interrupt(); throw new RedisException(e); + } catch (ExecutionException e) { + throw convertException(e); } - if (future.isSuccess()) { - return future.getNow(); - } - - throw convertException(future); } @Override public V getInterrupted(RFuture future) throws InterruptedException { try { - future.await(); + return future.toCompletableFuture().get(); } catch (InterruptedException e) { ((RPromise) future).tryFailure(e); throw e; + } catch (ExecutionException e) { + throw convertException(e); } - - if (future.isSuccess()) { - return future.getNow(); - } - - throw convertException(future); } protected RPromise createPromise() { @@ -329,11 +321,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } - public RedisException convertException(RFuture future) { - if (future.cause() instanceof RedisException) { - return (RedisException) future.cause(); + public RedisException convertException(ExecutionException e) { + if (e.getCause() instanceof RedisException) { + return (RedisException) e.getCause(); } - return new RedisException("Unexpected exception while processing command", future.cause()); + return new RedisException("Unexpected exception while processing command", e.getCause()); } private NodeSource getNodeSource(String key) { diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 318467478..6ceac54c9 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -72,13 +72,10 @@ public class LoadBalancerManager { } public CompletableFuture add(ClientConnectionsEntry entry) { - List> futures = new ArrayList<>(2); CompletableFuture slaveFuture = slaveConnectionPool.add(entry); - futures.add(slaveFuture); CompletableFuture pubSubFuture = pubSubConnectionPool.add(entry); - futures.add(pubSubFuture); - CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture future = CompletableFuture.allOf(slaveFuture, pubSubFuture); return future.thenAccept(r -> { client2Entry.put(entry.getClient(), entry); }); diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 0dd691446..e63eb6b66 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -227,12 +227,12 @@ public class TasksService extends BaseRemoteService { result.tryFailure(ex); return; } - - if (response.getNow() == null) { + + if (r == null) { result.trySuccess(false); return; } - result.trySuccess(response.getNow().isCanceled()); + result.trySuccess(r.isCanceled()); }); }); diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 3810b9c47..9c96c3d8a 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -572,9 +572,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }); } else { - res.toCompletableFuture().join(); + List r = res.toCompletableFuture().join(); - List r = res.getNow(); Long added = (Long) r.get(0); Long syncs = (Long) r.get(1); if (syncs > 0) { @@ -803,9 +802,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }); } else { - res.toCompletableFuture().join(); + List r = res.toCompletableFuture().join(); - List r = res.getNow(); r.add(syncId); waitSync(r); result.trySuccess((Long) r.get(0) >= 1); @@ -1861,8 +1859,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); }); } else { - future.toCompletableFuture().join(); - V oldValue = future.getNow(); + V oldValue = future.toCompletableFuture().join(); try { cacheWriter.delete(key); if (oldValue != null) { @@ -2126,9 +2123,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }); } else { - future.toCompletableFuture().join(); - - List r = future.getNow(); + List r = future.toCompletableFuture().join(); long nullsAmount = (long) r.get(1); if (nullsAmount == keys.size()) { @@ -2288,9 +2283,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }); } else { - future.toCompletableFuture().join(); - - List r = future.getNow(); + List r = future.toCompletableFuture().join(); if (r.size() < 2) { result.trySuccess(null); @@ -3239,8 +3232,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs private void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration, boolean addToConfig) { if (osType == null) { RFuture> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); - serverFuture.toCompletableFuture().join(); - String os = serverFuture.getNow().get("os"); + String os = serverFuture.toCompletableFuture().join().get("os"); if (os.contains("Windows")) { osType = BaseEventCodec.OSType.WINDOWS; } else if (os.contains("NONSTOP")) { diff --git a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java index 72bb65917..91563be39 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java @@ -17,11 +17,7 @@ package org.redisson.mapreduce; import java.io.Serializable; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import org.redisson.Redisson; import org.redisson.api.RExecutorService; @@ -105,12 +101,20 @@ public class CoordinatorTask implements Callable, Serializab mapperTask.addObjectName(objectName); RFuture mapperFuture = executor.submitAsync(mapperTask); try { - if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) { - mapperFuture.cancel(true); - throw new MapReduceTimeoutException(); + if (timeout > 0) { + try { + mapperFuture.toCompletableFuture().get(timeout - timeSpent, TimeUnit.MILLISECONDS); + } catch (ExecutionException | CancellationException | TimeoutException e) { + mapperFuture.cancel(true); + throw new MapReduceTimeoutException(); + } } if (timeout == 0) { - mapperFuture.await(); + try { + mapperFuture.toCompletableFuture().join(); + } catch (CompletionException | CancellationException e) { + // skip + } } } catch (InterruptedException e) { mapperFuture.cancel(true); diff --git a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java index 2aeaade77..d843ef87b 100644 --- a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java +++ b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java @@ -294,11 +294,13 @@ public class CompletableFutureWrapper implements RFuture { @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { try { - future.get(); + future.get(timeout, unit); } catch (ExecutionException e) { // skip + } catch (TimeoutException e) { + return false; } - return future.isDone(); + return true; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java index dfa8616a8..65e1b115a 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java @@ -16,6 +16,7 @@ package org.redisson.reactive; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionException; import org.redisson.api.RFuture; import org.redisson.command.CommandAsyncService; @@ -54,6 +55,9 @@ public class CommandReactiveService extends CommandAsyncService implements Comma future.onComplete((v, e) -> { if (e != null) { + if (e instanceof CompletionException) { + e = e.getCause(); + } emitter.error(e); return; } diff --git a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java index 3e8b7db5a..76493c2c1 100644 --- a/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java +++ b/redisson/src/main/java/org/redisson/redisnode/RedissonBaseNodes.java @@ -141,8 +141,13 @@ public class RedissonBaseNodes implements BaseRedisNodes { boolean res = true; for (Map.Entry> entry : result.entrySet()) { RFuture f = entry.getValue(); - f.awaitUninterruptibly(); - if (!"PONG".equals(f.getNow())) { + String pong = null; + try { + pong = f.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } + if (!"PONG".equals(pong)) { res = false; } entry.getKey().closeAsync(); diff --git a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java index 2ba36a2be..2de336cd9 100644 --- a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java @@ -38,7 +38,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -317,8 +316,8 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { cancelExecution(optionsCopy, mayInterruptIfRunning, this, cancelRequestMapName); try { - awaitUninterruptibly(60, TimeUnit.SECONDS); - } catch (CancellationException e) { + toCompletableFuture().get(60, TimeUnit.SECONDS); + } catch (Exception e) { // skip } return isCancelled(); diff --git a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java index 9d2c892b9..3bbc70b9c 100644 --- a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java @@ -20,6 +20,9 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Optional; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.redisson.RedissonBucket; import org.redisson.api.RFuture; @@ -85,18 +88,20 @@ public class SyncRemoteProxy extends BaseRemoteProxy { RemotePromise addPromise = new RemotePromise(requestId); RFuture futureAdd = remoteService.addAsync(requestQueueName, request, addPromise); - futureAdd.await(); - if (!futureAdd.isSuccess()) { + Boolean res; + try { + res = futureAdd.toCompletableFuture().join(); + } catch (Exception e) { if (responseFuture != null) { responseFuture.cancel(false); } if (ackFuture != null) { ackFuture.cancel(false); } - throw futureAdd.cause(); + throw e.getCause(); } - - if (!futureAdd.get()) { + + if (!res) { if (responseFuture != null) { responseFuture.cancel(false); } @@ -109,13 +114,20 @@ public class SyncRemoteProxy extends BaseRemoteProxy { // poll for the ack only if expected if (ackFuture != null) { String ackName = remoteService.getAckName(requestId); - ackFuture.await(optionsCopy.getAckTimeoutInMillis()); - RemoteServiceAck ack = ackFuture.getNow(); + RemoteServiceAck ack = null; + try { + ack = ackFuture.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } if (ack == null) { RFuture ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, ackName, requestId); - ackFutureAttempt.await(optionsCopy.getAckTimeoutInMillis()); - ack = ackFutureAttempt.getNow(); + try { + ack = ackFutureAttempt.toCompletableFuture().get(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } if (ack == null) { throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); @@ -126,8 +138,12 @@ public class SyncRemoteProxy extends BaseRemoteProxy { // poll for the response only if expected if (responseFuture != null) { - responseFuture.awaitUninterruptibly(); - RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow(); + RemoteServiceResponse response = null; + try { + response = (RemoteServiceResponse) responseFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } if (response == null) { throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxService.java b/redisson/src/main/java/org/redisson/rx/CommandRxService.java index f252685f0..edd9cee6d 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxService.java @@ -16,6 +16,7 @@ package org.redisson.rx; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionException; import org.redisson.api.RFuture; import org.redisson.command.CommandAsyncService; @@ -60,6 +61,9 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx future.onComplete((res, e) -> { if (e != null) { + if (e instanceof CompletionException) { + e = e.getCause(); + } p.onError(e); return; } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index f7ce2ec0a..7500c3045 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -663,9 +663,8 @@ public class BaseTransactionalMap { return; } - Set set = future.getNow(); Map newstate = new HashMap(state); - for (Iterator iterator = set.iterator(); iterator.hasNext();) { + for (Iterator iterator = res.iterator(); iterator.hasNext();) { K key = iterator.next(); MapEntry value = newstate.remove(toKeyHash(key)); if (value == MapEntry.NULL) { @@ -677,10 +676,10 @@ public class BaseTransactionalMap { if (entry == MapEntry.NULL) { continue; } - set.add((K) entry.getKey()); + res.add((K) entry.getKey()); } - result.trySuccess(set); + result.trySuccess(res); }); return result; @@ -777,7 +776,7 @@ public class BaseTransactionalMap { return; } - map.putAll(future.getNow()); + map.putAll(res); result.trySuccess(map); }); diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index fbc22ecae..765996b03 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -190,9 +190,8 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return; } - Set set = future.getNow(); Map newstate = new HashMap<>(state); - for (Iterator iterator = set.iterator(); iterator.hasNext();) { + for (Iterator iterator = res.iterator(); iterator.hasNext();) { V key = iterator.next(); Object value = newstate.remove(toHash(key)); if (value == NULL) { @@ -204,10 +203,10 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { if (value == NULL) { continue; } - set.add((V) value); + res.add((V) value); } - result.trySuccess(set); + result.trySuccess(res); }); return result; diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 2cd62307e..8b0b84b70 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -183,15 +183,14 @@ public class RedissonTransaction implements RTransaction { } String id = generateId(); - RPromise result = new RedissonPromise(); - RFuture> future = disableLocalCacheAsync(id, localCaches, operations); - future.onComplete((res, ex) -> { + RPromise result = new RedissonPromise<>(); + CompletableFuture> future = disableLocalCacheAsync(id, localCaches, operations); + future.whenComplete((hashes, ex) -> { if (ex != null) { result.tryFailure(new TransactionException("Unable to execute transaction", ex)); return; } - Map hashes = future.getNow(); try { checkTimeout(); } catch (TransactionTimeoutException e) { @@ -404,12 +403,12 @@ public class RedissonTransaction implements RTransaction { return hashes; } - private RFuture> disableLocalCacheAsync(String requestId, Set localCaches, List operations) { + private CompletableFuture> disableLocalCacheAsync(String requestId, Set localCaches, List operations) { if (localCaches.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return CompletableFuture.completedFuture(Collections.emptyMap()); } - RPromise> result = new RedissonPromise<>(); + CompletableFuture> result = new CompletableFuture<>(); Map hashes = new HashMap<>(localCaches.size()); RedissonBatch batch = createBatch(); for (TransactionalOperation transactionalOperation : operations) { @@ -437,13 +436,13 @@ public class RedissonTransaction implements RTransaction { RFuture> batchListener = batch.executeAsync(); batchListener.onComplete((res, e) -> { if (e != null) { - result.tryFailure(e); + result.completeExceptionally(e); return; } AsyncCountDownLatch latch = new AsyncCountDownLatch(); latch.latch(() -> { - result.trySuccess(hashes); + result.complete(hashes); }, hashes.size()); List> subscriptionFutures = new ArrayList<>(); @@ -489,21 +488,21 @@ public class RedissonTransaction implements RTransaction { RFuture> publishFuture = publishBatch.executeAsync(); publishFuture.onComplete((res2, ex2) -> { - result.onComplete((res3, ex3) -> { + result.whenComplete((res3, ex3) -> { for (RTopic topic : topics) { topic.removeAllListeners(); } }); if (ex2 != null) { - result.tryFailure(ex2); + result.completeExceptionally(ex2); return; } commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms")); + result.completeExceptionally(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms")); } }, options.getResponseTimeout(), TimeUnit.MILLISECONDS); }); diff --git a/redisson/src/test/java/org/redisson/RedisRunner.java b/redisson/src/test/java/org/redisson/RedisRunner.java index 3b358d8a8..f1efc3313 100644 --- a/redisson/src/test/java/org/redisson/RedisRunner.java +++ b/redisson/src/test/java/org/redisson/RedisRunner.java @@ -16,8 +16,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; @@ -894,9 +896,11 @@ public class RedisRunner { RedisConnection connection = c.connect(); try { connection.async(new RedisStrictCommand("SHUTDOWN", "NOSAVE", new VoidReplayConvertor())) - .await(3, TimeUnit.SECONDS); + .toCompletableFuture().get(3, TimeUnit.SECONDS); } catch (InterruptedException interruptedException) { //shutdown via command failed, lets wait and kill it later. + } catch (ExecutionException | TimeoutException e) { + // skip } c.shutdown(); connection.closeAsync().syncUninterruptibly(); @@ -928,9 +932,11 @@ public class RedisRunner { RedisConnection connection = c.connect(); try { connection.async(new RedisStrictCommand("SHUTDOWN", "NOSAVE", new VoidReplayConvertor())) - .await(3, TimeUnit.SECONDS); + .get(3, TimeUnit.SECONDS); } catch (InterruptedException interruptedException) { //shutdown via command failed, lets wait and kill it later. + } catch (ExecutionException | TimeoutException e) { + // skip } c.shutdown(); connection.closeAsync().syncUninterruptibly(); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 997ac6cb7..a46a999ef 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -16,6 +16,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.codec.JsonJacksonCodec; +import org.redisson.command.BatchPromise; import org.redisson.config.Config; import org.redisson.config.SubscriptionMode; @@ -119,7 +120,17 @@ public class RedissonBatchTest extends BaseTest { batch.execute(); - futures.forEach(f -> assertThat(f.awaitUninterruptibly(1)).isTrue()); + futures.forEach(f -> { + try { + f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + org.junit.jupiter.api.Assertions.fail(e); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); sourceClient.shutdown(); destinationClient.shutdown(); @@ -411,7 +422,7 @@ public class RedissonBatchTest extends BaseTest { for (int i = 0; i < total; i++) { RFuture f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES); if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { - f.toCompletableFuture().join(); + ((BatchPromise)f).getSentPromise().toCompletableFuture().join(); } } @@ -474,10 +485,10 @@ public class RedissonBatchTest extends BaseTest { List list = (List) f.getResponses(); assertThat(list).containsExactly(1L, 2L, 3L, 2L); - assertThat(f1.getNow()).isEqualTo(1); - assertThat(f2.getNow()).isEqualTo(2); - assertThat(f3.getNow()).isEqualTo(3); - assertThat(d1.getNow()).isEqualTo(2); + assertThat(f1.toCompletableFuture().getNow(null)).isEqualTo(1); + assertThat(f2.toCompletableFuture().getNow(null)).isEqualTo(2); + assertThat(f3.toCompletableFuture().getNow(null)).isEqualTo(3); + assertThat(d1.toCompletableFuture().getNow(null)).isEqualTo(2); } @ParameterizedTest @@ -535,8 +546,8 @@ public class RedissonBatchTest extends BaseTest { RFuture val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21"); b.execute(); - org.junit.jupiter.api.Assertions.assertEquals("2", val1.getNow()); - org.junit.jupiter.api.Assertions.assertEquals("3", val2.getNow()); + org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null)); + org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null)); } @ParameterizedTest @@ -549,8 +560,8 @@ public class RedissonBatchTest extends BaseTest { RFuture val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21"); b.execute(); - org.junit.jupiter.api.Assertions.assertEquals("2", val1.getNow()); - org.junit.jupiter.api.Assertions.assertEquals("3", val2.getNow()); + org.junit.jupiter.api.Assertions.assertEquals("2", val1.toCompletableFuture().getNow(null)); + org.junit.jupiter.api.Assertions.assertEquals("3", val2.toCompletableFuture().getNow(null)); } @ParameterizedTest @@ -706,7 +717,7 @@ public class RedissonBatchTest extends BaseTest { int i = 0; for (Object element : s.getResponses()) { RFuture a = futures.get(i); - org.junit.jupiter.api.Assertions.assertEquals(a.getNow(), element); + org.junit.jupiter.api.Assertions.assertEquals(a.toCompletableFuture().getNow(null), element); i++; } } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 335e94c92..df2d8c828 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -54,8 +54,13 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { RedissonClient redisson = Redisson.create(config); final RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.pollAsync(5, TimeUnit.SECONDS); - - Assertions.assertFalse(f.await(1, TimeUnit.SECONDS)); + + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + Assertions.fail(); + } catch (TimeoutException e) { + // skip + } runner.stop(); long start = System.currentTimeMillis(); @@ -144,7 +149,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.pollAsync(10, TimeUnit.SECONDS); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } runner.stop(); runner = new RedisRunner() @@ -196,7 +205,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { for (int i = 0; i < 10; i++) { RBlockingQueue queue = redisson.getBlockingQueue("queue" + i); RFuture f = queue.takeAsync(); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } futures.add(f); } @@ -211,11 +224,15 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { for (int i = 0; i < 10; i++) { RFuture f = futures.get(i); - f.await(20, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(20, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } if (f.cause() != null) { f.cause().printStackTrace(); } - Integer result = f.getNow(); + Integer result = f.toCompletableFuture().getNow(null); assertThat(result).isEqualTo(i*100); } @@ -273,7 +290,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } master.stop(); @@ -314,7 +335,11 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + e.printStackTrace(); + } runner.stop(); runner = new RedisRunner() diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index 226803112..8dcc9e78a 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -227,8 +227,13 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { final RBoundedBlockingQueue queue1 = redisson.getBoundedBlockingQueue("bounded-queue:pollTimeout"); assertThat(queue1.trySetCapacity(5)).isTrue(); RFuture f = queue1.pollAsync(5, TimeUnit.SECONDS); - - Assertions.assertFalse(f.await(1, TimeUnit.SECONDS)); + + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + Assertions.fail(); + } catch (TimeoutException e) { + // skip + } runner.stop(); long start = System.currentTimeMillis(); @@ -308,7 +313,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { RBoundedBlockingQueue queue1 = redisson.getBoundedBlockingQueue("queue:pollany"); RFuture f = queue1.pollAsync(10, TimeUnit.SECONDS); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } runner.stop(); runner = new RedisRunner() @@ -349,7 +358,11 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { RBoundedBlockingQueue queue1 = redisson.getBoundedBlockingQueue("testTakeReattach"); assertThat(queue1.trySetCapacity(15)).isTrue(); RFuture f = queue1.takeAsync(); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } runner.stop(); runner = new RedisRunner() diff --git a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java index ad823bb2c..0a373202d 100644 --- a/redisson/src/test/java/org/redisson/RedissonFairLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonFairLockTest.java @@ -267,62 +267,62 @@ public class RedissonFairLockTest extends BaseConcurrentTest { long threadFourthWaiter = 105; // take the lock successfully - Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout - Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(firstTTL); Assertions.assertTrue(firstTTL >= 29900 && firstTTL <= 30100, "Expected 30000 +/- 100 but was " + firstTTL); // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout - Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(secondTTL); Assertions.assertTrue(secondTTL >= 34900 && secondTTL <= 35100, "Expected 35000 +/- 100 but was " + secondTTL); // try the third, and check the TTL - Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); Assertions.assertTrue(thirdTTL >= 39900 && thirdTTL <= 40100, "Expected 40000 +/- 100 but was " + thirdTTL); // try the fourth, and check the TTL - Long fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + Long fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(fourthTTL); Assertions.assertTrue(fourthTTL >= 44900 && fourthTTL <= 45100, "Expected 45000 +/- 100 but was " + fourthTTL); // wait timeout the second waiter - lock.acquireFailedAsync(5000, TimeUnit.MILLISECONDS, threadSecondWaiter).await().get(); + lock.acquireFailedAsync(5000, TimeUnit.MILLISECONDS, threadSecondWaiter).toCompletableFuture().join();; // try the first, and check the TTL - firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(firstTTL); Assertions.assertTrue(firstTTL >= 29900 && firstTTL <= 30100, "Expected 30000 +/- 100 but was " + firstTTL); // try the third, and check the TTL - thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); Assertions.assertTrue(thirdTTL >= 34700 && thirdTTL <= 35300, "Expected 35000 +/- 300 but was " + thirdTTL); // try the fourth, and check the TTL - fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(fourthTTL); Assertions.assertTrue(fourthTTL >= 39900 && fourthTTL <= 40100, "Expected 40000 +/- 100 but was " + fourthTTL); // unlock the original lock holder - Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();; Assertions.assertNotNull(unlocked); Assertions.assertTrue(unlocked); // acquire the lock immediately with the 1nd - ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); // try the third, and check the TTL - thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); Assertions.assertTrue(thirdTTL >= 29700 && thirdTTL <= 30300, "Expected 30000 +/- 300 but was " + thirdTTL); - fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).await().get(); + fourthTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFourthWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(fourthTTL); Assertions.assertTrue(fourthTTL >= 34900 && fourthTTL <= 35100, "Expected 35000 +/- 100 but was " + fourthTTL); } @@ -348,40 +348,40 @@ public class RedissonFairLockTest extends BaseConcurrentTest { long threadThirdWaiter = 104; // take the lock successfully - Boolean locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + Boolean locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertTrue(locked); // fail to get the lock, but end up in the thread queue w/ ttl + 100ms timeout - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertFalse(locked); // fail to get the lock again, but end up in the thread queue w/ ttl + 200ms timeout - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertFalse(locked); // unlock the original lock holder - Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();; Assertions.assertTrue(unlocked); // get the lock - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertTrue(locked); // fail to get the lock, keeping ttl of lock ttl + 200ms - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertFalse(locked); // fail to get the lock, keeping ttl of lock ttl + 100ms - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertFalse(locked); // fail to get the lock, keeping ttl of lock ttl + 200ms - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertFalse(locked); Thread.sleep(500); - locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).await().get(); + locked = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_NULL_BOOLEAN).toCompletableFuture().join();; Assertions.assertTrue(locked); } @@ -405,36 +405,36 @@ public class RedissonFairLockTest extends BaseConcurrentTest { long threadThirdWaiter = 104; // take the lock successfully - Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Long ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout - Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Long firstTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(firstTTL); // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout - Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Long secondTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(secondTTL); // unlock the original lock holder - Boolean unlocked = lock.unlockInnerAsync(threadInit).await().getNow(); + Boolean unlocked = lock.unlockInnerAsync(threadInit).toCompletableFuture().join();; Assertions.assertNotNull(unlocked); Assertions.assertTrue(unlocked); - ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + ttl = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); - Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Long thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); - Long secondTTLAgain = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Long secondTTLAgain = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(secondTTLAgain); long diff = secondTTL - secondTTLAgain; Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff); diff = thirdTTL - secondTTLAgain; Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff); - thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + thirdTTL = lock.tryLockInnerAsync(5000, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); diff = thirdTTL - secondTTLAgain; Assertions.assertTrue(diff > 4900 && diff < 5100, "Expected 5000 +/- 100 but was " + diff); @@ -462,18 +462,18 @@ public class RedissonFairLockTest extends BaseConcurrentTest { long threadThirdWaiter = 104; // take the lock successfully - Long ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).await().get(); + Long ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadInit, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); // fail to get the lock, but end up in the thread queue w/ ttl + 5s timeout - Long firstTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).await().get(); + Long firstTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadFirstWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(firstTTL); // fail to get the lock again, but end up in the thread queue w/ ttl + 10s timeout - Long secondTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).await().get(); + Long secondTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadSecondWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(secondTTL); - Long thirdTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + Long thirdTTL = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNotNull(thirdTTL); long diff = thirdTTL - firstTTL; @@ -481,7 +481,7 @@ public class RedissonFairLockTest extends BaseConcurrentTest { Thread.sleep(thirdTTL + threadWaitTime); - ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).await().get(); + ttl = lock.tryLockInnerAsync(-1, leaseTime, TimeUnit.MILLISECONDS, threadThirdWaiter, RedisCommands.EVAL_LONG).toCompletableFuture().join();; Assertions.assertNull(ttl); } diff --git a/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java index c9ce1ba52..254face84 100644 --- a/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingQueueTest.java @@ -47,7 +47,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.pollAsync(10, TimeUnit.SECONDS); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } runner.stop(); runner = new RedisRunner() @@ -83,7 +87,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest RedissonClient redisson = Redisson.create(config); RBlockingQueue queue1 = getQueue(redisson); RFuture f = queue1.takeAsync(); - f.await(1, TimeUnit.SECONDS); + try { + f.toCompletableFuture().get(1, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + // skip + } runner.stop(); runner = new RedisRunner() diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 3c397bd38..dd1d36add 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -440,7 +440,7 @@ public class RedissonRemoteServiceTest extends BaseTest { f.toCompletableFuture().join(); RFuture resFuture = ri.resultMethod(100L); resFuture.toCompletableFuture().join(); - assertThat(resFuture.getNow()).isEqualTo(200); + assertThat(resFuture.toCompletableFuture().join()).isEqualTo(200); r1.shutdown(); r2.shutdown(); @@ -492,8 +492,7 @@ public class RedissonRemoteServiceTest extends BaseTest { RFuture f = ri.voidMethod("someName", 100L); f.toCompletableFuture().join(); RFuture resFuture = ri.resultMethod(100L); - resFuture.toCompletableFuture().join(); - assertThat(resFuture.getNow()).isEqualTo(200); + assertThat(resFuture.toCompletableFuture().join()).isEqualTo(200); r1.shutdown(); r2.shutdown(); diff --git a/redisson/src/test/java/org/redisson/RedissonScriptTest.java b/redisson/src/test/java/org/redisson/RedissonScriptTest.java index c89870889..45dab82d8 100644 --- a/redisson/src/test/java/org/redisson/RedissonScriptTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScriptTest.java @@ -58,7 +58,7 @@ public class RedissonScriptTest extends BaseTest { public void testEvalAsync() { RScript script = redisson.getScript(StringCodec.INSTANCE); RFuture> res = script.evalAsync(RScript.Mode.READ_ONLY, "return {'1','2','3.3333','foo',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList()); - assertThat(res.awaitUninterruptibly().getNow()).containsExactly("1", "2", "3.3333", "foo"); + assertThat(res.toCompletableFuture().join()).containsExactly("1", "2", "3.3333", "foo"); } @Test @@ -117,7 +117,7 @@ public class RedissonScriptTest extends BaseTest { public void testScriptLoadAsync() { redisson.getBucket("foo").set("bar"); RFuture r = redisson.getScript().scriptLoadAsync("return redis.call('get', 'foo')"); - Assertions.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r.awaitUninterruptibly().getNow()); + Assertions.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r.toCompletableFuture().join()); String r1 = redisson.getScript().evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()); Assertions.assertEquals("bar", r1); } @@ -143,7 +143,7 @@ public class RedissonScriptTest extends BaseTest { String r = redisson.getScript().eval(Mode.READ_ONLY, "return redis.call('get', 'foo')", RScript.ReturnType.VALUE); Assertions.assertEquals("bar", r); RFuture r1 = redisson.getScript().evalShaAsync(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()); - Assertions.assertEquals("bar", r1.awaitUninterruptibly().getNow()); + Assertions.assertEquals("bar", r1.toCompletableFuture().join()); } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index a469406d6..9e0fbb9ee 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -420,7 +420,11 @@ public class RedissonTest extends BaseTest { int readonlyErrors = 0; for (RFuture rFuture : futures) { - rFuture.awaitUninterruptibly(); + try { + rFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } if (!rFuture.isSuccess()) { if (rFuture.cause().getMessage().contains("READONLY You can't write against")) { readonlyErrors++; @@ -549,7 +553,11 @@ public class RedissonTest extends BaseTest { assertThat(newMaster).isNotNull(); for (RFuture rFuture : futures) { - rFuture.awaitUninterruptibly(); + try { + rFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } if (!rFuture.isSuccess()) { Assertions.fail(); } @@ -649,7 +657,11 @@ public class RedissonTest extends BaseTest { int readonlyErrors = 0; for (RFuture rFuture : futures) { - rFuture.awaitUninterruptibly(); + try { + rFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } if (!rFuture.isSuccess()) { errors++; } else { @@ -723,7 +735,11 @@ public class RedissonTest extends BaseTest { int readonlyErrors = 0; for (RFuture rFuture : futures) { - rFuture.awaitUninterruptibly(); + try { + rFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } if (!rFuture.isSuccess()) { rFuture.cause().printStackTrace(); errors++; diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 000a91768..03e64feaa 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -1205,7 +1205,11 @@ public class RedissonTopicTest { } for (RFuture rFuture : futures) { - rFuture.awaitUninterruptibly(); + try { + rFuture.toCompletableFuture().join(); + } catch (Exception e) { + // skip + } } }; }; diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index f7f796a35..ae5ecd733 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -161,12 +161,11 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { @Test - public void testDelay() { + public void testDelay() throws ExecutionException, InterruptedException, TimeoutException { RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS)); long start = System.currentTimeMillis(); RScheduledFuture f = executor.schedule(new ScheduledCallableTask(), 11, TimeUnit.SECONDS); - assertThat(f.awaitUninterruptibly(12000)).isTrue(); - assertThat(f.isSuccess()).isTrue(); + f.toCompletableFuture().get(12, TimeUnit.SECONDS); assertThat(System.currentTimeMillis() - start).isBetween(11000L, 11500L); Reflect.onClass(RedissonExecutorService.class).set("RESULT_OPTIONS", RemoteInvocationOptions.defaults().noAck().expectResultWithin(3, TimeUnit.SECONDS)); @@ -174,14 +173,12 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS)); start = System.currentTimeMillis(); RScheduledFuture f1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS); - assertThat(f1.awaitUninterruptibly(6000)).isTrue(); - assertThat(f1.isSuccess()).isTrue(); + f1.toCompletableFuture().get(6, TimeUnit.SECONDS); assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L); start = System.currentTimeMillis(); RScheduledFuture f2 = executor.schedule(new RunnableTask(), 5, TimeUnit.SECONDS); - assertThat(f2.awaitUninterruptibly(6000)).isTrue(); - assertThat(f2.isSuccess()).isTrue(); + f2.toCompletableFuture().get(6, TimeUnit.SECONDS); assertThat(System.currentTimeMillis() - start).isBetween(5000L, 5500L); } @@ -260,7 +257,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { } @Test - public void testLoad() { + public void testLoad() throws InterruptedException { Config config = createConfig(); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", Runtime.getRuntime().availableProcessors()*2)); @@ -274,7 +271,13 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { } for (RScheduledFuture future : futures) { - assertThat(future.awaitUninterruptibly(5100)).isTrue(); + try { + future.toCompletableFuture().get(5100, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assertions.fail(e); + } catch (ExecutionException e) { + // skip + } } node.shutdown();