diff --git a/redisson/src/main/java/org/redisson/api/RFuture.java b/redisson/src/main/java/org/redisson/api/RFuture.java index bc0a1c0f6..613ef004e 100644 --- a/redisson/src/main/java/org/redisson/api/RFuture.java +++ b/redisson/src/main/java/org/redisson/api/RFuture.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import io.netty.util.concurrent.FutureListener; @@ -26,7 +27,7 @@ import io.netty.util.concurrent.FutureListener; * * @param */ -public interface RFuture extends java.util.concurrent.Future { +public interface RFuture extends java.util.concurrent.Future, CompletionStage { /** * Returns {@code true} if and only if the I/O operation was completed @@ -52,6 +53,22 @@ public interface RFuture extends java.util.concurrent.Future { */ V getNow(); + /** + * Returns the result value when complete, or throws an + * (unchecked) exception if completed exceptionally. To better + * conform with the use of common functional forms, if a + * computation involved in the completion of this + * CompletableFuture threw an exception, this method throws an + * (unchecked) {@link CompletionException} with the underlying + * exception as its cause. + * + * @return the result value + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed + * exceptionally or a completion computation threw an exception + */ + V join(); + /** * Waits for this future to be completed within the * specified time limit. diff --git a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java index 92efb057f..b18e6a256 100644 --- a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java +++ b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java @@ -15,9 +15,16 @@ */ package org.redisson.misc; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import io.netty.util.concurrent.FutureListener; @@ -34,120 +41,261 @@ public class PromiseDelegator implements RPromise { return promise; } - @Override + public T join() { + return promise.join(); + } + public boolean isSuccess() { return promise.isSuccess(); } - @Override public boolean trySuccess(T result) { return promise.trySuccess(result); } - @Override public Throwable cause() { return promise.cause(); } - @Override + public T getNow() { + return promise.getNow(); + } + public boolean tryFailure(Throwable cause) { return promise.tryFailure(cause); } - @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return promise.await(timeout, unit); + } + public boolean setUncancellable() { return promise.setUncancellable(); } - @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return promise.await(timeoutMillis); + } + public RPromise addListener(FutureListener listener) { return promise.addListener(listener); } - @Override public RPromise addListeners(FutureListener... listeners) { return promise.addListeners(listeners); } - @Override public RPromise removeListener(FutureListener listener) { return promise.removeListener(listener); } - @Override public RPromise removeListeners(FutureListener... listeners) { return promise.removeListeners(listeners); } - @Override public RPromise await() throws InterruptedException { return promise.await(); } - @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return promise.cancel(mayInterruptIfRunning); + } + public RPromise awaitUninterruptibly() { return promise.awaitUninterruptibly(); } - @Override public RPromise sync() throws InterruptedException { return promise.sync(); } - @Override public RPromise syncUninterruptibly() { return promise.syncUninterruptibly(); } - @Override - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return promise.await(timeout, unit); - } - - @Override public boolean isCancelled() { return promise.isCancelled(); } - @Override public boolean isDone() { return promise.isDone(); } - @Override - public boolean await(long timeoutMillis) throws InterruptedException { - return promise.await(timeoutMillis); - } - - @Override public T get() throws InterruptedException, ExecutionException { return promise.get(); } - @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { return promise.awaitUninterruptibly(timeout, unit); } - @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return promise.get(timeout, unit); } - @Override + public CompletionStage thenApply(Function fn) { + return promise.thenApply(fn); + } + public boolean awaitUninterruptibly(long timeoutMillis) { return promise.awaitUninterruptibly(timeoutMillis); } - @Override - public T getNow() { - return promise.getNow(); + public CompletionStage thenApplyAsync(Function fn) { + return promise.thenApplyAsync(fn); } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return promise.cancel(mayInterruptIfRunning); + public CompletionStage thenApplyAsync(Function fn, Executor executor) { + return promise.thenApplyAsync(fn, executor); } - - + + public CompletionStage thenAccept(Consumer action) { + return promise.thenAccept(action); + } + + public CompletionStage thenAcceptAsync(Consumer action) { + return promise.thenAcceptAsync(action); + } + + public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { + return promise.thenAcceptAsync(action, executor); + } + + public CompletionStage thenRun(Runnable action) { + return promise.thenRun(action); + } + + public CompletionStage thenRunAsync(Runnable action) { + return promise.thenRunAsync(action); + } + + public CompletionStage thenRunAsync(Runnable action, Executor executor) { + return promise.thenRunAsync(action, executor); + } + + public CompletionStage thenCombine(CompletionStage other, + BiFunction fn) { + return promise.thenCombine(other, fn); + } + + public CompletionStage thenCombineAsync(CompletionStage other, + BiFunction fn) { + return promise.thenCombineAsync(other, fn); + } + + public CompletionStage thenCombineAsync(CompletionStage other, + BiFunction fn, Executor executor) { + return promise.thenCombineAsync(other, fn, executor); + } + + public CompletionStage thenAcceptBoth(CompletionStage other, + BiConsumer action) { + return promise.thenAcceptBoth(other, action); + } + + public CompletionStage thenAcceptBothAsync(CompletionStage other, + BiConsumer action) { + return promise.thenAcceptBothAsync(other, action); + } + + public CompletionStage thenAcceptBothAsync(CompletionStage other, + BiConsumer action, Executor executor) { + return promise.thenAcceptBothAsync(other, action, executor); + } + + public CompletionStage runAfterBoth(CompletionStage other, Runnable action) { + return promise.runAfterBoth(other, action); + } + + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return promise.runAfterBothAsync(other, action); + } + + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return promise.runAfterBothAsync(other, action, executor); + } + + public CompletionStage applyToEither(CompletionStage other, Function fn) { + return promise.applyToEither(other, fn); + } + + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + return promise.applyToEitherAsync(other, fn); + } + + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn, + Executor executor) { + return promise.applyToEitherAsync(other, fn, executor); + } + + public CompletionStage acceptEither(CompletionStage other, Consumer action) { + return promise.acceptEither(other, action); + } + + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + return promise.acceptEitherAsync(other, action); + } + + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, + Executor executor) { + return promise.acceptEitherAsync(other, action, executor); + } + + public CompletionStage runAfterEither(CompletionStage other, Runnable action) { + return promise.runAfterEither(other, action); + } + + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return promise.runAfterEitherAsync(other, action); + } + + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return promise.runAfterEitherAsync(other, action, executor); + } + + public CompletionStage thenCompose(Function> fn) { + return promise.thenCompose(fn); + } + + public CompletionStage thenComposeAsync(Function> fn) { + return promise.thenComposeAsync(fn); + } + + public CompletionStage thenComposeAsync(Function> fn, + Executor executor) { + return promise.thenComposeAsync(fn, executor); + } + + public CompletionStage exceptionally(Function fn) { + return promise.exceptionally(fn); + } + + public CompletionStage whenComplete(BiConsumer action) { + return promise.whenComplete(action); + } + + public CompletionStage whenCompleteAsync(BiConsumer action) { + return promise.whenCompleteAsync(action); + } + + public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { + return promise.whenCompleteAsync(action, executor); + } + + public CompletionStage handle(BiFunction fn) { + return promise.handle(fn); + } + + public CompletionStage handleAsync(BiFunction fn) { + return promise.handleAsync(fn); + } + + public CompletionStage handleAsync(BiFunction fn, Executor executor) { + return promise.handleAsync(fn, executor); + } + + public CompletableFuture toCompletableFuture() { + return promise.toCompletableFuture(); + } + } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index ecc76eee3..3ecbb4f48 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -15,15 +15,20 @@ */ package org.redisson.misc; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RFuture; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; /** * @@ -31,9 +36,16 @@ import io.netty.util.concurrent.Promise; * * @param */ -public class RedissonPromise implements RPromise { +public class RedissonPromise extends CompletableFuture implements RPromise { + private volatile boolean uncancellable; + + private final int SUCCESS = 1; + private final int FAILED = 2; + private final int CANCELED = 3; + private final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + private final AtomicInteger status = new AtomicInteger(); public RedissonPromise() { } @@ -49,7 +61,6 @@ public class RedissonPromise implements RPromise { future.trySuccess(result); return future; } - public Promise getInnerPromise() { return promise; @@ -57,27 +68,45 @@ public class RedissonPromise implements RPromise { @Override public boolean isSuccess() { - return promise.isSuccess(); + return isDone() && !isCompletedExceptionally(); } @Override - public boolean trySuccess(T result) { - return promise.trySuccess(result); + public synchronized boolean trySuccess(T result) { + if (status.compareAndSet(0, SUCCESS)) { + complete(result); + promise.trySuccess(result); + return true; + } + return false; } @Override public Throwable cause() { - return promise.cause(); + try { + getNow(null); + } catch (CompletionException e) { + return e.getCause(); + } + return null; } @Override - public boolean tryFailure(Throwable cause) { - return promise.tryFailure(cause); + public synchronized boolean tryFailure(Throwable cause) { + if (status.compareAndSet(0, FAILED)) { + completeExceptionally(cause); + promise.tryFailure(cause); + return true; + } + return false; } @Override public boolean setUncancellable() { - return promise.setUncancellable(); + if (!isDone()) { + uncancellable = true; + } + return uncancellable; } @Override @@ -106,76 +135,97 @@ public class RedissonPromise implements RPromise { @Override public RPromise await() throws InterruptedException { - promise.await(); + try { + get(); + } catch (ExecutionException | CancellationException e) { + // skip + } return this; } @Override public RPromise awaitUninterruptibly() { - promise.awaitUninterruptibly(); + try { + return await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } return this; } @Override public RPromise sync() throws InterruptedException { - promise.sync(); + try { + get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof CancellationException) { + throw (CancellationException)e.getCause(); + } + PlatformDependent.throwException(e.getCause()); + } return this; } @Override public RPromise syncUninterruptibly() { - promise.syncUninterruptibly(); + try { + join(); + } catch (CompletionException e) { + PlatformDependent.throwException(e.getCause()); + } return this; } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return promise.await(timeout, unit); - } - - @Override - public boolean isCancelled() { - return promise.isCancelled(); - } - - @Override - public boolean isDone() { - return promise.isDone(); + try { + get(timeout, unit); + } catch (ExecutionException e) { + if (e.getCause() instanceof CancellationException) { + throw (CancellationException)e.getCause(); + } + throw new CompletionException(e.getCause()); + } catch (TimeoutException e) { + return false; + } + return isDone(); } @Override public boolean await(long timeoutMillis) throws InterruptedException { - return promise.await(timeoutMillis); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return promise.get(); + return await(timeoutMillis, TimeUnit.MILLISECONDS); } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { - return promise.awaitUninterruptibly(timeout, unit); - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return promise.get(timeout, unit); + try { + return await(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { - return promise.awaitUninterruptibly(timeoutMillis); + return awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS); } @Override public T getNow() { - return promise.getNow(); + return getNow(null); } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return promise.cancel(mayInterruptIfRunning); + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (uncancellable) { + return false; + } + if (status.compareAndSet(0, CANCELED)) { + promise.cancel(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + return false; } } diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index 98da18393..aad94f6d0 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -33,7 +33,6 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; public class RedisClientTest { @@ -69,12 +68,17 @@ public class RedisClientTest { public void testConnectAsync() throws InterruptedException { RedisClient c = new RedisClient("localhost", 6379); RFuture f = c.connectAsync(); - final CountDownLatch l = new CountDownLatch(1); + final CountDownLatch l = new CountDownLatch(2); f.addListener((FutureListener) future -> { RedisConnection conn = future.get(); assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG"); l.countDown(); }); + f.handle((conn, ex) -> { + assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG"); + l.countDown(); + return null; + }); assertThat(l.await(10, TimeUnit.SECONDS)).isTrue(); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index c57434c73..4d78ba571 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -12,6 +12,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import org.junit.Assert; @@ -627,8 +628,14 @@ public class RedissonMapTest extends BaseTest { map.put(4, 6); map.put(7, 8); - assertThat(map.fastRemoveAsync(1, 3, 7).get()).isEqualTo(3); - Thread.sleep(1); + CountDownLatch l = new CountDownLatch(1); + RFuture future = map.fastRemoveAsync(1, 3, 7); + future.handle((r, ex) -> { + assertThat(r).isEqualTo(3); + l.countDown(); + return this; + }); + assertThat(future.get()).isEqualTo(3); assertThat(map.size()).isEqualTo(1); }