refactoring

pull/4428/head
Nikita Koksharov
parent fd595f5c83
commit c959fc6f75

@ -29,7 +29,6 @@ import org.redisson.executor.*;
import org.redisson.executor.params.*;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
@ -813,7 +812,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return createFuture(result);
}
private void cancelResponseHandling(RequestId requestId) {
private void cancelResponseHandling(String requestId) {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
@ -852,7 +851,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return f;
}
private void storeReference(RExecutorFuture<?> future, RequestId requestId) {
private void storeReference(RExecutorFuture<?> future, String requestId) {
while (true) {
RedissonExecutorFutureReference r = (RedissonExecutorFutureReference) referenceDueue.poll();
if (r == null) {
@ -1037,9 +1036,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RFuture<Boolean> cancelTaskAsync(String taskId) {
if (taskId.startsWith("01")) {
return scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
return scheduledRemoteService.cancelExecutionAsync(taskId);
}
return executorRemoteService.cancelExecutionAsync(new RequestId(taskId));
return executorRemoteService.cancelExecutionAsync(taskId);
}
private <T> T poll(List<CompletableFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {

@ -91,7 +91,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
@Override
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) {
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
+ "redis.call('hdel', KEYS[2], ARGV[1]);" +
@ -99,7 +99,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
"end;"
+ "return 0;",
Arrays.asList(requestQueueName, requestQueueName + ":tasks"),
taskId.toString());
taskId);
return f.toCompletableFuture();
}
@ -384,7 +384,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
CompletableFuture<RRemoteServiceResponse> responsePromise = new CompletableFuture<>();
CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture = new CompletableFuture<>();
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);
scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture);
responsePromise.whenComplete((result, e) -> {
if (request.getOptions().isResultExpected()

@ -17,7 +17,6 @@ package org.redisson.executor;
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.RequestId;
import java.util.concurrent.CompletableFuture;
@ -29,20 +28,20 @@ import java.util.concurrent.CompletableFuture;
*/
public class RedissonExecutorFuture<V> extends CompletableFutureWrapper<V> implements RExecutorFuture<V> {
private final RequestId taskId;
private final String taskId;
public RedissonExecutorFuture(RemotePromise<V> promise) {
this(promise, promise.getRequestId());
}
public RedissonExecutorFuture(CompletableFuture<V> promise, RequestId taskId) {
public RedissonExecutorFuture(CompletableFuture<V> promise, String taskId) {
super(promise);
this.taskId = taskId;
}
@Override
public String getTaskId() {
return taskId.toString();
return taskId;
}
}

@ -16,7 +16,6 @@
package org.redisson.executor;
import org.redisson.api.RExecutorFuture;
import org.redisson.remote.RequestId;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
@ -30,9 +29,9 @@ import java.util.concurrent.CompletableFuture;
public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> {
private final CompletableFuture<?> promise;
private final RequestId requestId;
private final String requestId;
public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q, CompletableFuture<?> promise) {
public RedissonExecutorFutureReference(String requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q, CompletableFuture<?> promise) {
super(referent, q);
this.requestId = requestId;
this.promise = promise;
@ -42,7 +41,7 @@ public class RedissonExecutorFutureReference extends WeakReference<RExecutorFutu
return promise;
}
public RequestId getRequestId() {
public String getRequestId() {
return requestId;
}

@ -17,7 +17,6 @@ package org.redisson.executor;
import org.redisson.api.RScheduledFuture;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.RequestId;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit;
public class RedissonScheduledFuture<V> extends CompletableFutureWrapper<V> implements RScheduledFuture<V> {
private final long scheduledExecutionTime;
private final RequestId taskId;
private final String taskId;
private final RemotePromise<V> promise;
public RedissonScheduledFuture(RemotePromise<V> promise, long scheduledExecutionTime) {
@ -69,7 +68,7 @@ public class RedissonScheduledFuture<V> extends CompletableFutureWrapper<V> impl
@Override
public String getTaskId() {
return taskId.toString();
return taskId;
}
}

@ -15,8 +15,6 @@
*/
package org.redisson.executor;
import org.redisson.remote.RequestId;
import java.util.concurrent.CompletableFuture;
/**
@ -26,15 +24,15 @@ import java.util.concurrent.CompletableFuture;
*/
public class RemotePromise<T> extends CompletableFuture<T> {
private final RequestId requestId;
private final String requestId;
private CompletableFuture<Boolean> addFuture;
public RemotePromise(RequestId requestId) {
public RemotePromise(String requestId) {
super();
this.requestId = requestId;
}
public RequestId getRequestId() {
public String getRequestId() {
return requestId;
}

@ -15,6 +15,7 @@
*/
package org.redisson.executor;
import io.netty.buffer.ByteBufUtil;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
@ -24,7 +25,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import java.util.Arrays;
@ -39,13 +39,13 @@ import java.util.concurrent.ThreadLocalRandom;
*/
public class ScheduledTasksService extends TasksService {
private RequestId requestId;
private String requestId;
public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, redissonId, responses);
}
public void setRequestId(RequestId requestId) {
public void setRequestId(String requestId) {
this.requestId = requestId;
}
@ -95,7 +95,7 @@ public class ScheduledTasksService extends TasksService {
}
@Override
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) {
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove from scheduler queue
"if redis.call('exists', KEYS[3]) == 0 then "
@ -141,12 +141,12 @@ public class ScheduledTasksService extends TasksService {
}
@Override
protected RequestId generateRequestId() {
protected String generateRequestId() {
if (requestId == null) {
byte[] id = new byte[17];
ThreadLocalRandom.current().nextBytes(id);
id[0] = 01;
return new RequestId(id);
return ByteBufUtil.hexDump(id);
}
return requestId;
}

@ -34,7 +34,6 @@ import org.redisson.executor.params.*;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import java.io.ByteArrayInputStream;
@ -172,7 +171,7 @@ public class TasksRunnerService implements RemoteExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName);
scheduledRemoteService.setRequestId(new RequestId(requestId));
scheduledRemoteService.setRequestId(requestId);
scheduledRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
scheduledRemoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
RemoteExecutorServiceAsync asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());

@ -15,6 +15,7 @@
*/
package org.redisson.executor;
import io.netty.buffer.ByteBufUtil;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
@ -150,7 +151,7 @@ public class TasksService extends BaseRemoteService {
}
@Override
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) {
RFuture<Boolean> f = commandExecutor.evalWriteNoRetryAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'ff' .. ARGV[1]); "
+ "redis.call('zrem', KEYS[8], ARGV[1]); "
@ -174,19 +175,19 @@ public class TasksService extends BaseRemoteService {
+ "return 0;",
Arrays.asList(requestQueueName, schedulerQueueName, tasksCounterName, statusName, terminationTopicName,
tasksName, tasksRetryIntervalName, tasksExpirationTimeName),
taskId.toString(), RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
taskId, RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
return f.toCompletableFuture();
}
@Override
protected RequestId generateRequestId() {
protected String generateRequestId() {
byte[] id = new byte[17];
ThreadLocalRandom.current().nextBytes(id);
id[0] = 00;
return new RequestId(id);
return ByteBufUtil.hexDump(id);
}
public RFuture<Boolean> cancelExecutionAsync(RequestId requestId) {
public RFuture<Boolean> cancelExecutionAsync(String requestId) {
String requestQueueName = getRequestQueueName(RemoteExecutorService.class);
CompletableFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId);
CompletableFuture<Boolean> f = removeFuture.thenCompose(res -> {
@ -195,7 +196,7 @@ public class TasksService extends BaseRemoteService {
}
RMap<String, RemoteServiceCancelRequest> canceledRequests = getMap(cancelRequestMapName);
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.putAsync(requestId, new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
CompletableFuture<RemoteServiceCancelResponse> response = scheduleCancelResponseCheck(cancelResponseMapName, requestId);
@ -216,7 +217,7 @@ public class TasksService extends BaseRemoteService {
return new CompletableFutureWrapper<>(f);
}
private CompletableFuture<RemoteServiceCancelResponse> scheduleCancelResponseCheck(String mapName, RequestId requestId) {
private CompletableFuture<RemoteServiceCancelResponse> scheduleCancelResponseCheck(String mapName, String requestId) {
CompletableFuture<RemoteServiceCancelResponse> cancelResponse = new CompletableFuture<>();
commandExecutor.getConnectionManager().newTimeout(timeout -> {
@ -225,16 +226,16 @@ public class TasksService extends BaseRemoteService {
}
RMap<String, RemoteServiceCancelResponse> canceledResponses = getMap(mapName);
RFuture<RemoteServiceCancelResponse> removeFuture = canceledResponses.removeAsync(requestId.toString());
RFuture<RemoteServiceCancelResponse> removeFuture = canceledResponses.removeAsync(requestId);
CompletableFuture<RemoteServiceCancelResponse> future = removeFuture.thenCompose(response -> {
if (response == null) {
RFuture<Boolean> f = hasTaskAsync(requestId.toString());
RFuture<Boolean> f = hasTaskAsync(requestId);
return f.thenCompose(r -> {
if (r) {
return scheduleCancelResponseCheck(mapName, requestId);
}
RemoteServiceCancelResponse resp = new RemoteServiceCancelResponse(requestId.toString(), false);
RemoteServiceCancelResponse resp = new RemoteServiceCancelResponse(requestId, false);
return CompletableFuture.completedFuture(resp);
});
}

@ -91,7 +91,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RequestId requestId = remoteService.generateRequestId();
String requestId = remoteService.generateRequestId();
if (method.getName().equals("toString")) {
return getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId;
@ -111,7 +111,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
Long ackTimeout = optionsCopy.getAckTimeoutInMillis();
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(),
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(),
remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis());
CompletableFuture<RemoteServiceAck> ackFuture;
@ -256,7 +256,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
private RemotePromise<Object> createResultPromise(RemoteInvocationOptions optionsCopy,
RequestId requestId, String requestQueueName, Long ackTimeout) {
String requestId, String requestQueueName, Long ackTimeout) {
RemotePromise<Object> result = new RemotePromise<Object>(requestId) {
@Override
@ -288,7 +288,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
RList<Object> list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null);
list.remove(requestId.toString());
list.remove(requestId);
super.cancel(mayInterruptIfRunning);
return true;
}
@ -333,7 +333,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
private CompletableFuture<Boolean> cancelAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> promise,
RequestId requestId, String requestQueueName, Long ackTimeout, boolean mayInterruptIfRunning) {
String requestId, String requestQueueName, Long ackTimeout, boolean mayInterruptIfRunning) {
if (promise.isCancelled()) {
return CompletableFuture.completedFuture(true);
}
@ -361,7 +361,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
return future.thenCompose(ackNotSent -> {
if (ackNotSent) {
RList<Object> list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null);
CompletableFuture<Boolean> removeFuture = list.removeAsync(requestId.toString()).toCompletableFuture();
CompletableFuture<Boolean> removeFuture = list.removeAsync(requestId).toCompletableFuture();
return removeFuture.thenApply(res -> {
promise.doCancel(mayInterruptIfRunning);
return true;

@ -72,7 +72,7 @@ public abstract class BaseRemoteProxy {
}
protected CompletionStage<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
String ackName, RequestId requestId) {
String ackName, String requestId) {
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteNoRetryAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
@ -90,7 +90,7 @@ public abstract class BaseRemoteProxy {
}
protected final <T extends RRemoteServiceResponse> CompletableFuture<T> pollResponse(long timeout,
RequestId requestId, boolean insertFirst) {
String requestId, boolean insertFirst) {
CompletableFuture<T> responseFuture = new CompletableFuture<T>();
ResponseEntry entry;
@ -101,7 +101,7 @@ public abstract class BaseRemoteProxy {
ScheduledFuture<?> responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture);
Map<RequestId, List<Result>> entryResponses = entry.getResponses();
Map<String, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3));
Result res = new Result(responseFuture, responseTimeoutFuture);
@ -120,7 +120,7 @@ public abstract class BaseRemoteProxy {
return responseFuture;
}
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, RequestId requestId, CompletableFuture<T> responseFuture) {
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, String requestId, CompletableFuture<T> responseFuture) {
return commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
@ -148,7 +148,7 @@ public abstract class BaseRemoteProxy {
}, timeout, TimeUnit.MILLISECONDS);
}
private <T extends RRemoteServiceResponse> void addCancelHandling(RequestId requestId, CompletableFuture<T> responseFuture) {
private <T extends RRemoteServiceResponse> void addCancelHandling(String requestId, CompletableFuture<T> responseFuture) {
responseFuture.whenComplete((res, ex) -> {
if (!responseFuture.isCancelled()) {
return;
@ -208,7 +208,7 @@ public abstract class BaseRemoteProxy {
return;
}
RequestId key = new RequestId(response.getId());
String key = response.getId();
List<Result> list = entry.getResponses().get(key);
if (list == null) {
pollResponse();

@ -77,15 +77,10 @@ public abstract class BaseRemoteService {
return "{remote_response}:" + executorId;
}
protected String getAckName(RequestId requestId) {
return "{" + name + ":remote" + "}:" + requestId + ":ack";
}
protected String getAckName(String requestId) {
return "{" + name + ":remote" + "}:" + requestId + ":ack";
}
public String getRequestQueueName(Class<?> remoteInterface) {
return requestQueueNameCache.computeIfAbsent(remoteInterface, k -> "{" + name + ":" + k.getName() + "}");
}
@ -146,7 +141,7 @@ public abstract class BaseRemoteService {
return new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, name);
}
protected <T> void scheduleCheck(String mapName, RequestId requestId, CompletableFuture<T> cancelRequest) {
protected <T> void scheduleCheck(String mapName, String requestId, CompletableFuture<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -155,7 +150,7 @@ public abstract class BaseRemoteService {
}
RMap<String, T> canceledRequests = getMap(mapName);
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
RFuture<T> future = canceledRequests.removeAsync(requestId);
future.whenComplete((request, ex) -> {
if (cancelRequest.isDone()) {
return;
@ -175,16 +170,16 @@ public abstract class BaseRemoteService {
}, 3000, TimeUnit.MILLISECONDS);
}
protected RequestId generateRequestId() {
protected String generateRequestId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return new RequestId(id);
return new String(id);
}
protected abstract CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result);
protected abstract CompletableFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId);
protected abstract CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId);
protected long[] getMethodSignature(Method method) {
long[] result = methodSignaturesCache.get(method);

@ -1,66 +0,0 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.util.Arrays;
import io.netty.buffer.ByteBufUtil;
/**
*
* @author Nikita Koksharov
*
*/
public class RequestId {
private final byte[] id;
public RequestId(String id) {
this(ByteBufUtil.decodeHexDump(id));
}
public RequestId(byte[] buf) {
id = buf;
}
@Override
public String toString() {
return ByteBufUtil.hexDump(id);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(id);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RequestId other = (RequestId) obj;
if (!Arrays.equals(id, other.id))
return false;
return true;
}
}

@ -50,10 +50,10 @@ public class ResponseEntry {
}
private final Map<RequestId, List<Result>> responses = new HashMap<RequestId, List<Result>>();
private final Map<String, List<Result>> responses = new HashMap<String, List<Result>>();
private final AtomicBoolean started = new AtomicBoolean();
public Map<RequestId, List<Result>> getResponses() {
public Map<String, List<Result>> getResponses() {
return responses;
}

@ -60,10 +60,10 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
&& !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
throw new IllegalArgumentException("The noResult option only supports void return value");
RequestId requestId = remoteService.generateRequestId();
String requestId = remoteService.generateRequestId();
String requestQueueName = getRequestQueueName(remoteInterface);
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(),
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(),
remoteService.getMethodSignature(method), args, optionsCopy, System.currentTimeMillis());
CompletableFuture<RemoteServiceAck> ackFuture;

Loading…
Cancel
Save