diff --git a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java index 77b86a1f1..e8eee69ac 100644 --- a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java +++ b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java @@ -24,8 +24,6 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import java.io.IOException; import java.io.InputStream; @@ -228,25 +226,19 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin @Override public Future read(ByteBuffer dst) { - RPromise result = new RedissonPromise<>(); RFuture res = commandExecutor.readAsync(getRawName(), codec, RedisCommands.GETRANGE, getRawName(), position, position + dst.remaining() - 1); - res.onComplete((data, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } + CompletionStage f = res.thenApply(data -> { if (data.length == 0) { - result.trySuccess(-1); - return; + return -1; } position += data.length; dst.put(data); - result.trySuccess(data.length); + return data.length; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 4ed3c15f5..a16a07806 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -27,7 +27,10 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.ConnectionManager; import org.redisson.executor.*; import org.redisson.executor.params.*; -import org.redisson.misc.*; +import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.misc.Injector; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry.Result; @@ -619,8 +622,11 @@ public class RedissonExecutorService implements RScheduledExecutorService { if (!addResult.get(0)) { throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); } - - return new RedissonExecutorBatchFuture(result); + + CompletableFuture future = CompletableFuture.allOf(result.stream() + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + return new RedissonExecutorBatchFuture(future, result); } protected TaskParameters createTaskParameters(Callable task) { @@ -670,7 +676,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { } }); - return new RedissonExecutorBatchFuture(result); + CompletableFuture future = CompletableFuture.allOf(result.stream() + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + return new RedissonExecutorBatchFuture(future, result); } @@ -735,8 +744,11 @@ public class RedissonExecutorService implements RScheduledExecutorService { if (!addResult.get(0)) { throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); } - - return new RedissonExecutorBatchFuture(result); + + CompletableFuture future = CompletableFuture.allOf(result.stream() + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + return new RedissonExecutorBatchFuture(future, result); } @Override @@ -774,7 +786,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { } }); - return new RedissonExecutorBatchFuture(result); + CompletableFuture future = CompletableFuture.allOf(result.stream() + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + return new RedissonExecutorBatchFuture(future, result); } diff --git a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java index f8e0b4044..6979b023d 100644 --- a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java +++ b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java @@ -25,11 +25,11 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -71,21 +71,20 @@ public class RedisClientEntry implements ClusterNode { @Override public RFuture pingAsync(long timeout, TimeUnit timeUnit) { - RPromise result = new RedissonPromise<>(); RFuture f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); - f.whenComplete((res, e) -> { + CompletableFuture s = f.toCompletableFuture().handle((res, e) -> { if (e != null) { - result.trySuccess(false); - return; + return false; } - - result.trySuccess(res); + + return res; }); + commandExecutor.getConnectionManager().newTimeout(t -> { RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: PING, Redis client: " + client); - result.tryFailure(ex); + s.completeExceptionally(ex); }, timeout, timeUnit); - return result; + return new CompletableFutureWrapper<>(s); } @Override diff --git a/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java b/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java index 349a6548d..43c2afeff 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonCompletionService.java @@ -15,13 +15,7 @@ */ package org.redisson.executor; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.redisson.api.RFuture; import org.redisson.api.RScheduledExecutorService; @@ -67,7 +61,7 @@ public class RedissonCompletionService implements CompletionService { } RFuture f = executorService.submit(task); - f.onComplete((res, e) -> { + f.whenComplete((res, e) -> { completionQueue.add(f); }); return f; @@ -80,7 +74,7 @@ public class RedissonCompletionService implements CompletionService { } RFuture f = executorService.submit(task, result); - f.onComplete((res, e) -> { + f.whenComplete((res, e) -> { completionQueue.add(f); }); return f; diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java index b6ff75445..188a95167 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorBatchFuture.java @@ -15,38 +15,25 @@ */ package org.redisson.executor; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; + +import java.util.List; +import java.util.concurrent.CompletableFuture; /** * * @author Nikita Koksharov * */ -public class RedissonExecutorBatchFuture extends RedissonPromise implements RExecutorBatchFuture { +public class RedissonExecutorBatchFuture extends CompletableFutureWrapper implements RExecutorBatchFuture { - private List> futures; + private final List> futures; - public RedissonExecutorBatchFuture(List> futures) { + public RedissonExecutorBatchFuture(CompletableFuture future, List> futures) { + super(future); this.futures = futures; - - final AtomicInteger counter = new AtomicInteger(futures.size()); - for (RExecutorFuture future : futures) { - future.onComplete((res, e) -> { - if (e != null) { - RedissonExecutorBatchFuture.this.tryFailure(e); - return; - } - - if (counter.decrementAndGet() == 0) { - RedissonExecutorBatchFuture.this.trySuccess(null); - } - }); - } } @Override diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 5bd9fc25e..37da50fbb 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -17,8 +17,6 @@ package org.redisson.executor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import org.redisson.RedissonExecutorService; import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; @@ -237,12 +235,8 @@ public class TasksRunnerService implements RemoteExecutorService { return; } - commandExecutor.getConnectionManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - renewRetryTime(requestId); - } - }, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS); + commandExecutor.getConnectionManager().newTimeout(timeout -> renewRetryTime(requestId), + Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS); } protected RFuture renewRetryTime(String requestId) { @@ -269,9 +263,9 @@ public class TasksRunnerService implements RemoteExecutorService { + "return retryInterval; " + "end;" + "return nil;", - Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), + Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), System.currentTimeMillis(), requestId); - future.onComplete((res, e) -> { + future.whenComplete((res, e) -> { if (e != null) { scheduleRetryTimeRenewal(requestId, 10000L); return; @@ -339,7 +333,7 @@ public class TasksRunnerService implements RemoteExecutorService { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { RFuture future = renewRetryTime(params.getRequestId()); try { - future.get(); + future.toCompletableFuture().get(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index ef4bc9150..19a51122a 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -24,8 +24,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.params.TaskParameters; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; +import org.redisson.misc.CompletableFutureWrapper; import org.redisson.remote.*; import java.util.Arrays; @@ -144,7 +143,7 @@ public class TasksService extends BaseRemoteService { + "return 1;" + "end;" + "return 0;", - Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, + Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName), retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime); return f.toCompletableFuture(); @@ -173,7 +172,7 @@ public class TasksService extends BaseRemoteService { + "return 1; " + "end;" + "return 0;", - Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, + Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, tasksName, tasksRetryIntervalName, tasksExpirationTimeName), taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); return f.toCompletableFuture(); @@ -188,78 +187,67 @@ public class TasksService extends BaseRemoteService { } public RFuture cancelExecutionAsync(RequestId requestId) { - RPromise result = new RedissonPromise<>(); - String requestQueueName = getRequestQueueName(RemoteExecutorService.class); CompletableFuture removeFuture = removeAsync(requestQueueName, requestId); - removeFuture.whenComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletableFuture f = removeFuture.thenCompose(res -> { if (res) { - result.trySuccess(true); - return; + return CompletableFuture.completedFuture(true); } - + RMap canceledRequests = getMap(cancelRequestMapName); canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); - commandExecutor.getConnectionManager().newTimeout(timeout -> { - result.trySuccess(false); - }, 60, TimeUnit.SECONDS); - - RPromise response = new RedissonPromise<>(); - scheduleCancelResponseCheck(cancelResponseMapName, requestId, response); - response.onComplete((r, ex) -> { - if (ex != null) { - result.tryFailure(ex); - return; - } - + CompletableFuture response = scheduleCancelResponseCheck(cancelResponseMapName, requestId); + return response.thenApply(r -> { if (r == null) { - result.trySuccess(false); - return; + return false; } - result.trySuccess(r.isCanceled()); + return r.isCanceled(); }); }); - return result; + removeFuture.thenAccept(r -> { + commandExecutor.getConnectionManager().newTimeout(timeout -> { + f.complete(false); + }, 60, TimeUnit.SECONDS); + }); + + return new CompletableFutureWrapper<>(f); } - private void scheduleCancelResponseCheck(String mapName, RequestId requestId, RPromise cancelResponse) { + private CompletableFuture scheduleCancelResponseCheck(String mapName, RequestId requestId) { + CompletableFuture cancelResponse = new CompletableFuture<>(); + commandExecutor.getConnectionManager().newTimeout(timeout -> { if (cancelResponse.isDone()) { return; } RMap canceledResponses = getMap(mapName); - RFuture future = canceledResponses.removeAsync(requestId.toString()); - future.onComplete((response, ex) -> { - if (ex != null) { - scheduleCancelResponseCheck(mapName, requestId, cancelResponse); - return; - } - + RFuture removeFuture = canceledResponses.removeAsync(requestId.toString()); + CompletableFuture future = removeFuture.thenCompose(response -> { if (response == null) { RFuture f = hasTaskAsync(requestId.toString()); - f.onComplete((r, e) -> { - if (e != null || r) { - scheduleCancelResponseCheck(mapName, requestId, cancelResponse); - return; + return f.thenCompose(r -> { + if (r) { + return scheduleCancelResponseCheck(mapName, requestId); } RemoteServiceCancelResponse resp = new RemoteServiceCancelResponse(requestId.toString(), false); - cancelResponse.trySuccess(resp); + return CompletableFuture.completedFuture(resp); }); - } else { - cancelResponse.trySuccess(response); } - }); + return CompletableFuture.completedFuture(response); + }).whenComplete((r, ex) -> { + if (ex != null) { + scheduleCancelResponseCheck(mapName, requestId); + } + }).toCompletableFuture(); + + commandExecutor.transfer(future, cancelResponse); }, 3000, TimeUnit.MILLISECONDS); + return cancelResponse; } public RFuture hasTaskAsync(String taskId) {