refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent f52a5ffe51
commit 2acb278f5f

@ -24,8 +24,6 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -228,25 +226,19 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override @Override
public Future<Integer> read(ByteBuffer dst) { public Future<Integer> read(ByteBuffer dst) {
RPromise<Integer> result = new RedissonPromise<>();
RFuture<byte[]> res = commandExecutor.readAsync(getRawName(), codec, RedisCommands.GETRANGE, RFuture<byte[]> res = commandExecutor.readAsync(getRawName(), codec, RedisCommands.GETRANGE,
getRawName(), position, position + dst.remaining() - 1); getRawName(), position, position + dst.remaining() - 1);
res.onComplete((data, e) -> { CompletionStage<Integer> f = res.thenApply(data -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (data.length == 0) { if (data.length == 0) {
result.trySuccess(-1); return -1;
return;
} }
position += data.length; position += data.length;
dst.put(data); dst.put(data);
result.trySuccess(data.length); return data.length;
}); });
return result; return new CompletableFutureWrapper<>(f);
} }
@Override @Override

@ -27,7 +27,10 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.executor.*; import org.redisson.executor.*;
import org.redisson.executor.params.*; import org.redisson.executor.params.*;
import org.redisson.misc.*; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Injector;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId; import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result; import org.redisson.remote.ResponseEntry.Result;
@ -619,8 +622,11 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (!addResult.get(0)) { if (!addResult.get(0)) {
throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
} }
return new RedissonExecutorBatchFuture(result); CompletableFuture<Void> future = CompletableFuture.allOf(result.stream()
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new));
return new RedissonExecutorBatchFuture(future, result);
} }
protected TaskParameters createTaskParameters(Callable<?> task) { protected TaskParameters createTaskParameters(Callable<?> task) {
@ -670,7 +676,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
}); });
return new RedissonExecutorBatchFuture(result); CompletableFuture<Void> future = CompletableFuture.allOf(result.stream()
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new));
return new RedissonExecutorBatchFuture(future, result);
} }
@ -735,8 +744,11 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (!addResult.get(0)) { if (!addResult.get(0)) {
throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state"); throw new RejectedExecutionException("Tasks have been rejected. ExecutorService is in shutdown state");
} }
return new RedissonExecutorBatchFuture(result); CompletableFuture<Void> future = CompletableFuture.allOf(result.stream()
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new));
return new RedissonExecutorBatchFuture(future, result);
} }
@Override @Override
@ -774,7 +786,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
}); });
return new RedissonExecutorBatchFuture(result); CompletableFuture<Void> future = CompletableFuture.allOf(result.stream()
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new));
return new RedissonExecutorBatchFuture(future, result);
} }

@ -25,11 +25,11 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time; import org.redisson.client.protocol.Time;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -71,21 +71,20 @@ public class RedisClientEntry implements ClusterNode {
@Override @Override
public RFuture<Boolean> pingAsync(long timeout, TimeUnit timeUnit) { public RFuture<Boolean> pingAsync(long timeout, TimeUnit timeUnit) {
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); RFuture<Boolean> f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL);
f.whenComplete((res, e) -> { CompletableFuture<Boolean> s = f.toCompletableFuture().handle((res, e) -> {
if (e != null) { if (e != null) {
result.trySuccess(false); return false;
return;
} }
result.trySuccess(res); return res;
}); });
commandExecutor.getConnectionManager().newTimeout(t -> { commandExecutor.getConnectionManager().newTimeout(t -> {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: PING, Redis client: " + client); RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: PING, Redis client: " + client);
result.tryFailure(ex); s.completeExceptionally(ex);
}, timeout, timeUnit); }, timeout, timeUnit);
return result; return new CompletableFutureWrapper<>(s);
} }
@Override @Override

@ -15,13 +15,7 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledExecutorService;
@ -67,7 +61,7 @@ public class RedissonCompletionService<V> implements CompletionService<V> {
} }
RFuture<V> f = executorService.submit(task); RFuture<V> f = executorService.submit(task);
f.onComplete((res, e) -> { f.whenComplete((res, e) -> {
completionQueue.add(f); completionQueue.add(f);
}); });
return f; return f;
@ -80,7 +74,7 @@ public class RedissonCompletionService<V> implements CompletionService<V> {
} }
RFuture<V> f = executorService.submit(task, result); RFuture<V> f = executorService.submit(task, result);
f.onComplete((res, e) -> { f.whenComplete((res, e) -> {
completionQueue.add(f); completionQueue.add(f);
}); });
return f; return f;

@ -15,38 +15,25 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorFuture;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.CompletableFutureWrapper;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RedissonExecutorBatchFuture extends RedissonPromise<Void> implements RExecutorBatchFuture { public class RedissonExecutorBatchFuture extends CompletableFutureWrapper<Void> implements RExecutorBatchFuture {
private List<RExecutorFuture<?>> futures; private final List<RExecutorFuture<?>> futures;
public RedissonExecutorBatchFuture(List<RExecutorFuture<?>> futures) { public RedissonExecutorBatchFuture(CompletableFuture<Void> future, List<RExecutorFuture<?>> futures) {
super(future);
this.futures = futures; this.futures = futures;
final AtomicInteger counter = new AtomicInteger(futures.size());
for (RExecutorFuture<?> future : futures) {
future.onComplete((res, e) -> {
if (e != null) {
RedissonExecutorBatchFuture.this.tryFailure(e);
return;
}
if (counter.decrementAndGet() == 0) {
RedissonExecutorBatchFuture.this.trySuccess(null);
}
});
}
} }
@Override @Override

@ -17,8 +17,6 @@ package org.redisson.executor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -237,12 +235,8 @@ public class TasksRunnerService implements RemoteExecutorService {
return; return;
} }
commandExecutor.getConnectionManager().newTimeout(new TimerTask() { commandExecutor.getConnectionManager().newTimeout(timeout -> renewRetryTime(requestId),
@Override Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS);
public void run(Timeout timeout) throws Exception {
renewRetryTime(requestId);
}
}, Math.max(1000, retryInterval / 2), TimeUnit.MILLISECONDS);
} }
protected RFuture<Long> renewRetryTime(String requestId) { protected RFuture<Long> renewRetryTime(String requestId) {
@ -269,9 +263,9 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "return retryInterval; " + "return retryInterval; "
+ "end;" + "end;"
+ "return nil;", + "return nil;",
Arrays.<Object>asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName), Arrays.asList(statusName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName, tasksName),
System.currentTimeMillis(), requestId); System.currentTimeMillis(), requestId);
future.onComplete((res, e) -> { future.whenComplete((res, e) -> {
if (e != null) { if (e != null) {
scheduleRetryTimeRenewal(requestId, 10000L); scheduleRetryTimeRenewal(requestId, 10000L);
return; return;
@ -339,7 +333,7 @@ public class TasksRunnerService implements RemoteExecutorService {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) { if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
RFuture<Long> future = renewRetryTime(params.getRequestId()); RFuture<Long> future = renewRetryTime(params.getRequestId());
try { try {
future.get(); future.toCompletableFuture().get();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }

@ -24,8 +24,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.TaskParameters; import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.RPromise; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.*; import org.redisson.remote.*;
import java.util.Arrays; import java.util.Arrays;
@ -144,7 +143,7 @@ public class TasksService extends BaseRemoteService {
+ "return 1;" + "return 1;"
+ "end;" + "end;"
+ "return 0;", + "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, Arrays.asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName,
tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName), tasksName, requestQueueName, tasksRetryIntervalName, tasksExpirationTimeName),
retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime); retryStartTime, request.getId(), encode(request), tasksRetryInterval, expireTime);
return f.toCompletableFuture(); return f.toCompletableFuture();
@ -173,7 +172,7 @@ public class TasksService extends BaseRemoteService {
+ "return 1; " + "return 1; "
+ "end;" + "end;"
+ "return 0;", + "return 0;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName, Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName,
tasksName, tasksRetryIntervalName, tasksExpirationTimeName), tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
return f.toCompletableFuture(); return f.toCompletableFuture();
@ -188,78 +187,67 @@ public class TasksService extends BaseRemoteService {
} }
public RFuture<Boolean> cancelExecutionAsync(RequestId requestId) { public RFuture<Boolean> cancelExecutionAsync(RequestId requestId) {
RPromise<Boolean> result = new RedissonPromise<>();
String requestQueueName = getRequestQueueName(RemoteExecutorService.class); String requestQueueName = getRequestQueueName(RemoteExecutorService.class);
CompletableFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId); CompletableFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId);
removeFuture.whenComplete((res, e) -> { CompletableFuture<Boolean> f = removeFuture.thenCompose(res -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (res) { if (res) {
result.trySuccess(true); return CompletableFuture.completedFuture(true);
return;
} }
RMap<String, RemoteServiceCancelRequest> canceledRequests = getMap(cancelRequestMapName); RMap<String, RemoteServiceCancelRequest> canceledRequests = getMap(cancelRequestMapName);
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true)); canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS); canceledRequests.expireAsync(60, TimeUnit.SECONDS);
commandExecutor.getConnectionManager().newTimeout(timeout -> { CompletableFuture<RemoteServiceCancelResponse> response = scheduleCancelResponseCheck(cancelResponseMapName, requestId);
result.trySuccess(false); return response.thenApply(r -> {
}, 60, TimeUnit.SECONDS);
RPromise<RemoteServiceCancelResponse> response = new RedissonPromise<>();
scheduleCancelResponseCheck(cancelResponseMapName, requestId, response);
response.onComplete((r, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
}
if (r == null) { if (r == null) {
result.trySuccess(false); return false;
return;
} }
result.trySuccess(r.isCanceled()); return r.isCanceled();
}); });
}); });
return result; removeFuture.thenAccept(r -> {
commandExecutor.getConnectionManager().newTimeout(timeout -> {
f.complete(false);
}, 60, TimeUnit.SECONDS);
});
return new CompletableFutureWrapper<>(f);
} }
private void scheduleCancelResponseCheck(String mapName, RequestId requestId, RPromise<RemoteServiceCancelResponse> cancelResponse) { private CompletableFuture<RemoteServiceCancelResponse> scheduleCancelResponseCheck(String mapName, RequestId requestId) {
CompletableFuture<RemoteServiceCancelResponse> cancelResponse = new CompletableFuture<>();
commandExecutor.getConnectionManager().newTimeout(timeout -> { commandExecutor.getConnectionManager().newTimeout(timeout -> {
if (cancelResponse.isDone()) { if (cancelResponse.isDone()) {
return; return;
} }
RMap<String, RemoteServiceCancelResponse> canceledResponses = getMap(mapName); RMap<String, RemoteServiceCancelResponse> canceledResponses = getMap(mapName);
RFuture<RemoteServiceCancelResponse> future = canceledResponses.removeAsync(requestId.toString()); RFuture<RemoteServiceCancelResponse> removeFuture = canceledResponses.removeAsync(requestId.toString());
future.onComplete((response, ex) -> { CompletableFuture<RemoteServiceCancelResponse> future = removeFuture.thenCompose(response -> {
if (ex != null) {
scheduleCancelResponseCheck(mapName, requestId, cancelResponse);
return;
}
if (response == null) { if (response == null) {
RFuture<Boolean> f = hasTaskAsync(requestId.toString()); RFuture<Boolean> f = hasTaskAsync(requestId.toString());
f.onComplete((r, e) -> { return f.thenCompose(r -> {
if (e != null || r) { if (r) {
scheduleCancelResponseCheck(mapName, requestId, cancelResponse); return scheduleCancelResponseCheck(mapName, requestId);
return;
} }
RemoteServiceCancelResponse resp = new RemoteServiceCancelResponse(requestId.toString(), false); RemoteServiceCancelResponse resp = new RemoteServiceCancelResponse(requestId.toString(), false);
cancelResponse.trySuccess(resp); return CompletableFuture.completedFuture(resp);
}); });
} else {
cancelResponse.trySuccess(response);
} }
}); return CompletableFuture.completedFuture(response);
}).whenComplete((r, ex) -> {
if (ex != null) {
scheduleCancelResponseCheck(mapName, requestId);
}
}).toCompletableFuture();
commandExecutor.transfer(future, cancelResponse);
}, 3000, TimeUnit.MILLISECONDS); }, 3000, TimeUnit.MILLISECONDS);
return cancelResponse;
} }
public RFuture<Boolean> hasTaskAsync(String taskId) { public RFuture<Boolean> hasTaskAsync(String taskId) {

Loading…
Cancel
Save