diff --git a/redisson/src/main/java/org/redisson/api/RFuture.java b/redisson/src/main/java/org/redisson/api/RFuture.java index 613ef004e..bc0a1c0f6 100644 --- a/redisson/src/main/java/org/redisson/api/RFuture.java +++ b/redisson/src/main/java/org/redisson/api/RFuture.java @@ -15,7 +15,6 @@ */ package org.redisson.api; -import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import io.netty.util.concurrent.FutureListener; @@ -27,7 +26,7 @@ import io.netty.util.concurrent.FutureListener; * * @param */ -public interface RFuture extends java.util.concurrent.Future, CompletionStage { +public interface RFuture extends java.util.concurrent.Future { /** * Returns {@code true} if and only if the I/O operation was completed @@ -53,22 +52,6 @@ public interface RFuture extends java.util.concurrent.Future, CompletionSt */ V getNow(); - /** - * Returns the result value when complete, or throws an - * (unchecked) exception if completed exceptionally. To better - * conform with the use of common functional forms, if a - * computation involved in the completion of this - * CompletableFuture threw an exception, this method throws an - * (unchecked) {@link CompletionException} with the underlying - * exception as its cause. - * - * @return the result value - * @throws CancellationException if the computation was cancelled - * @throws CompletionException if this future completed - * exceptionally or a completion computation threw an exception - */ - V join(); - /** * Waits for this future to be completed within the * specified time limit. diff --git a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java index b18e6a256..92efb057f 100644 --- a/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java +++ b/redisson/src/main/java/org/redisson/misc/PromiseDelegator.java @@ -15,16 +15,9 @@ */ package org.redisson.misc; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; import io.netty.util.concurrent.FutureListener; @@ -41,261 +34,120 @@ public class PromiseDelegator implements RPromise { return promise; } - public T join() { - return promise.join(); - } - + @Override public boolean isSuccess() { return promise.isSuccess(); } + @Override public boolean trySuccess(T result) { return promise.trySuccess(result); } + @Override public Throwable cause() { return promise.cause(); } - public T getNow() { - return promise.getNow(); - } - + @Override public boolean tryFailure(Throwable cause) { return promise.tryFailure(cause); } - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return promise.await(timeout, unit); - } - + @Override public boolean setUncancellable() { return promise.setUncancellable(); } - public boolean await(long timeoutMillis) throws InterruptedException { - return promise.await(timeoutMillis); - } - + @Override public RPromise addListener(FutureListener listener) { return promise.addListener(listener); } + @Override public RPromise addListeners(FutureListener... listeners) { return promise.addListeners(listeners); } + @Override public RPromise removeListener(FutureListener listener) { return promise.removeListener(listener); } + @Override public RPromise removeListeners(FutureListener... listeners) { return promise.removeListeners(listeners); } + @Override public RPromise await() throws InterruptedException { return promise.await(); } - public boolean cancel(boolean mayInterruptIfRunning) { - return promise.cancel(mayInterruptIfRunning); - } - + @Override public RPromise awaitUninterruptibly() { return promise.awaitUninterruptibly(); } + @Override public RPromise sync() throws InterruptedException { return promise.sync(); } + @Override public RPromise syncUninterruptibly() { return promise.syncUninterruptibly(); } + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return promise.await(timeout, unit); + } + + @Override public boolean isCancelled() { return promise.isCancelled(); } + @Override public boolean isDone() { return promise.isDone(); } + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return promise.await(timeoutMillis); + } + + @Override public T get() throws InterruptedException, ExecutionException { return promise.get(); } + @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { return promise.awaitUninterruptibly(timeout, unit); } + @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return promise.get(timeout, unit); } - public CompletionStage thenApply(Function fn) { - return promise.thenApply(fn); - } - + @Override public boolean awaitUninterruptibly(long timeoutMillis) { return promise.awaitUninterruptibly(timeoutMillis); } - public CompletionStage thenApplyAsync(Function fn) { - return promise.thenApplyAsync(fn); - } - - public CompletionStage thenApplyAsync(Function fn, Executor executor) { - return promise.thenApplyAsync(fn, executor); - } - - public CompletionStage thenAccept(Consumer action) { - return promise.thenAccept(action); - } - - public CompletionStage thenAcceptAsync(Consumer action) { - return promise.thenAcceptAsync(action); - } - - public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { - return promise.thenAcceptAsync(action, executor); - } - - public CompletionStage thenRun(Runnable action) { - return promise.thenRun(action); - } - - public CompletionStage thenRunAsync(Runnable action) { - return promise.thenRunAsync(action); - } - - public CompletionStage thenRunAsync(Runnable action, Executor executor) { - return promise.thenRunAsync(action, executor); - } - - public CompletionStage thenCombine(CompletionStage other, - BiFunction fn) { - return promise.thenCombine(other, fn); - } - - public CompletionStage thenCombineAsync(CompletionStage other, - BiFunction fn) { - return promise.thenCombineAsync(other, fn); - } - - public CompletionStage thenCombineAsync(CompletionStage other, - BiFunction fn, Executor executor) { - return promise.thenCombineAsync(other, fn, executor); - } - - public CompletionStage thenAcceptBoth(CompletionStage other, - BiConsumer action) { - return promise.thenAcceptBoth(other, action); - } - - public CompletionStage thenAcceptBothAsync(CompletionStage other, - BiConsumer action) { - return promise.thenAcceptBothAsync(other, action); - } - - public CompletionStage thenAcceptBothAsync(CompletionStage other, - BiConsumer action, Executor executor) { - return promise.thenAcceptBothAsync(other, action, executor); - } - - public CompletionStage runAfterBoth(CompletionStage other, Runnable action) { - return promise.runAfterBoth(other, action); - } - - public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { - return promise.runAfterBothAsync(other, action); - } - - public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { - return promise.runAfterBothAsync(other, action, executor); - } - - public CompletionStage applyToEither(CompletionStage other, Function fn) { - return promise.applyToEither(other, fn); - } - - public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { - return promise.applyToEitherAsync(other, fn); - } - - public CompletionStage applyToEitherAsync(CompletionStage other, Function fn, - Executor executor) { - return promise.applyToEitherAsync(other, fn, executor); - } - - public CompletionStage acceptEither(CompletionStage other, Consumer action) { - return promise.acceptEither(other, action); - } - - public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { - return promise.acceptEitherAsync(other, action); - } - - public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, - Executor executor) { - return promise.acceptEitherAsync(other, action, executor); - } - - public CompletionStage runAfterEither(CompletionStage other, Runnable action) { - return promise.runAfterEither(other, action); - } - - public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { - return promise.runAfterEitherAsync(other, action); - } - - public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { - return promise.runAfterEitherAsync(other, action, executor); - } - - public CompletionStage thenCompose(Function> fn) { - return promise.thenCompose(fn); - } - - public CompletionStage thenComposeAsync(Function> fn) { - return promise.thenComposeAsync(fn); - } - - public CompletionStage thenComposeAsync(Function> fn, - Executor executor) { - return promise.thenComposeAsync(fn, executor); - } - - public CompletionStage exceptionally(Function fn) { - return promise.exceptionally(fn); - } - - public CompletionStage whenComplete(BiConsumer action) { - return promise.whenComplete(action); - } - - public CompletionStage whenCompleteAsync(BiConsumer action) { - return promise.whenCompleteAsync(action); - } - - public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { - return promise.whenCompleteAsync(action, executor); - } - - public CompletionStage handle(BiFunction fn) { - return promise.handle(fn); - } - - public CompletionStage handleAsync(BiFunction fn) { - return promise.handleAsync(fn); - } - - public CompletionStage handleAsync(BiFunction fn, Executor executor) { - return promise.handleAsync(fn, executor); + @Override + public T getNow() { + return promise.getNow(); } - public CompletableFuture toCompletableFuture() { - return promise.toCompletableFuture(); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return promise.cancel(mayInterruptIfRunning); } - + + } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java index 3ecbb4f48..ecc76eee3 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonPromise.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonPromise.java @@ -15,20 +15,15 @@ */ package org.redisson.misc; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RFuture; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.PlatformDependent; /** * @@ -36,16 +31,9 @@ import io.netty.util.internal.PlatformDependent; * * @param */ -public class RedissonPromise extends CompletableFuture implements RPromise { +public class RedissonPromise implements RPromise { - private volatile boolean uncancellable; - - private final int SUCCESS = 1; - private final int FAILED = 2; - private final int CANCELED = 3; - private final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - private final AtomicInteger status = new AtomicInteger(); public RedissonPromise() { } @@ -61,6 +49,7 @@ public class RedissonPromise extends CompletableFuture implements RPromise future.trySuccess(result); return future; } + public Promise getInnerPromise() { return promise; @@ -68,45 +57,27 @@ public class RedissonPromise extends CompletableFuture implements RPromise @Override public boolean isSuccess() { - return isDone() && !isCompletedExceptionally(); + return promise.isSuccess(); } @Override - public synchronized boolean trySuccess(T result) { - if (status.compareAndSet(0, SUCCESS)) { - complete(result); - promise.trySuccess(result); - return true; - } - return false; + public boolean trySuccess(T result) { + return promise.trySuccess(result); } @Override public Throwable cause() { - try { - getNow(null); - } catch (CompletionException e) { - return e.getCause(); - } - return null; + return promise.cause(); } @Override - public synchronized boolean tryFailure(Throwable cause) { - if (status.compareAndSet(0, FAILED)) { - completeExceptionally(cause); - promise.tryFailure(cause); - return true; - } - return false; + public boolean tryFailure(Throwable cause) { + return promise.tryFailure(cause); } @Override public boolean setUncancellable() { - if (!isDone()) { - uncancellable = true; - } - return uncancellable; + return promise.setUncancellable(); } @Override @@ -135,97 +106,76 @@ public class RedissonPromise extends CompletableFuture implements RPromise @Override public RPromise await() throws InterruptedException { - try { - get(); - } catch (ExecutionException | CancellationException e) { - // skip - } + promise.await(); return this; } @Override public RPromise awaitUninterruptibly() { - try { - return await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + promise.awaitUninterruptibly(); return this; } @Override public RPromise sync() throws InterruptedException { - try { - get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof CancellationException) { - throw (CancellationException)e.getCause(); - } - PlatformDependent.throwException(e.getCause()); - } + promise.sync(); return this; } @Override public RPromise syncUninterruptibly() { - try { - join(); - } catch (CompletionException e) { - PlatformDependent.throwException(e.getCause()); - } + promise.syncUninterruptibly(); return this; } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - try { - get(timeout, unit); - } catch (ExecutionException e) { - if (e.getCause() instanceof CancellationException) { - throw (CancellationException)e.getCause(); - } - throw new CompletionException(e.getCause()); - } catch (TimeoutException e) { - return false; - } - return isDone(); + return promise.await(timeout, unit); + } + + @Override + public boolean isCancelled() { + return promise.isCancelled(); + } + + @Override + public boolean isDone() { + return promise.isDone(); } @Override public boolean await(long timeoutMillis) throws InterruptedException { - return await(timeoutMillis, TimeUnit.MILLISECONDS); + return promise.await(timeoutMillis); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return promise.get(); } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { - try { - return await(timeout, unit); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } + return promise.awaitUninterruptibly(timeout, unit); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return promise.get(timeout, unit); } @Override public boolean awaitUninterruptibly(long timeoutMillis) { - return awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS); + return promise.awaitUninterruptibly(timeoutMillis); } @Override public T getNow() { - return getNow(null); + return promise.getNow(); } @Override - public synchronized boolean cancel(boolean mayInterruptIfRunning) { - if (uncancellable) { - return false; - } - if (status.compareAndSet(0, CANCELED)) { - promise.cancel(mayInterruptIfRunning); - return super.cancel(mayInterruptIfRunning); - } - return false; + public boolean cancel(boolean mayInterruptIfRunning) { + return promise.cancel(mayInterruptIfRunning); } } diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index aad94f6d0..98da18393 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -33,6 +33,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; public class RedisClientTest { @@ -68,17 +69,12 @@ public class RedisClientTest { public void testConnectAsync() throws InterruptedException { RedisClient c = new RedisClient("localhost", 6379); RFuture f = c.connectAsync(); - final CountDownLatch l = new CountDownLatch(2); + final CountDownLatch l = new CountDownLatch(1); f.addListener((FutureListener) future -> { RedisConnection conn = future.get(); assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG"); l.countDown(); }); - f.handle((conn, ex) -> { - assertThat(conn.sync(RedisCommands.PING)).isEqualTo("PONG"); - l.countDown(); - return null; - }); assertThat(l.await(10, TimeUnit.SECONDS)).isTrue(); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index 4d78ba571..c57434c73 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -12,7 +12,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import org.junit.Assert; @@ -628,14 +627,8 @@ public class RedissonMapTest extends BaseTest { map.put(4, 6); map.put(7, 8); - CountDownLatch l = new CountDownLatch(1); - RFuture future = map.fastRemoveAsync(1, 3, 7); - future.handle((r, ex) -> { - assertThat(r).isEqualTo(3); - l.countDown(); - return this; - }); - assertThat(future.get()).isEqualTo(3); + assertThat(map.fastRemoveAsync(1, 3, 7).get()).isEqualTo(3); + Thread.sleep(1); assertThat(map.size()).isEqualTo(1); }