refactoring

pull/4428/head
Nikita Koksharov 3 years ago
parent 298abd331a
commit 64805cf366

@ -616,16 +616,24 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return new RedissonExecutorBatchFuture(future, result); return new RedissonExecutorBatchFuture(future, result);
} }
private String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
protected TaskParameters createTaskParameters(Callable<?> task) { protected TaskParameters createTaskParameters(Callable<?> task) {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); String id = "00" + generateId();
return new TaskParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
} }
protected TaskParameters createTaskParameters(Runnable task) { protected TaskParameters createTaskParameters(Runnable task) {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); String id = "00" + generateId();
return new TaskParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
} }
@Override @Override
@ -901,10 +909,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); ScheduledParameters params = createScheduledParameters(timeToLive, ttlUnit, classBody, state, startTime);
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(params).toCompletableFuture(); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(params).toCompletableFuture();
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
@ -924,15 +929,21 @@ public class RedissonExecutorService implements RScheduledExecutorService {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); ScheduledParameters params = createScheduledParameters(timeToLive, ttlUnit, classBody, state, startTime);
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(params).toCompletableFuture(); RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(params).toCompletableFuture();
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
} }
private ScheduledParameters createScheduledParameters(long timeToLive, TimeUnit ttlUnit, ClassBody classBody, byte[] state, long startTime) {
String id = generateScheduledTaskId();
ScheduledParameters params = new ScheduledParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime);
if (timeToLive > 0) {
params.setTtl(ttlUnit.toMillis(timeToLive));
}
return params;
}
@Override @Override
public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
@ -947,7 +958,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters();
String id = generateScheduledTaskId();
ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters(id);
params.setClassName(classBody.getClazzName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz()); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda()); params.setLambdaBody(classBody.getLambda());
@ -960,6 +973,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return createFuture(result, startTime); return createFuture(result, startTime);
} }
private String generateScheduledTaskId() {
return "01" + generateId();
}
@Override @Override
public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) { public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule); RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule);
@ -979,8 +996,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date"); throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date");
} }
long startTime = startDate.toInstant().toEpochMilli(); long startTime = startDate.toInstant().toEpochMilli();
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(); String id = generateScheduledTaskId();
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(id);
params.setClassName(classBody.getClazzName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz()); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda()); params.setLambdaBody(classBody.getLambda());
@ -1014,8 +1032,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
ClassBody classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(); String id = generateScheduledTaskId();
ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(id);
params.setClassName(classBody.getClazzName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz()); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda()); params.setLambdaBody(classBody.getLambda());

@ -15,7 +15,6 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import io.netty.buffer.ByteBufUtil;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -30,7 +29,6 @@ import org.redisson.remote.ResponseEntry;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
/** /**
* *
@ -52,7 +50,6 @@ public class ScheduledTasksService extends TasksService {
@Override @Override
protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) { protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
ScheduledParameters params = (ScheduledParameters) request.getArgs()[0]; ScheduledParameters params = (ScheduledParameters) request.getArgs()[0];
params.setRequestId(request.getId());
long expireTime = 0; long expireTime = 0;
if (params.getTtl() > 0) { if (params.getTtl() > 0) {
@ -143,12 +140,9 @@ public class ScheduledTasksService extends TasksService {
@Override @Override
protected String generateRequestId(Object[] args) { protected String generateRequestId(Object[] args) {
if (requestId == null) { if (requestId == null) {
byte[] id = new byte[17]; return super.generateRequestId(args);
ThreadLocalRandom.current().nextBytes(id);
id[0] = 01;
return ByteBufUtil.hexDump(id);
} }
return requestId; return requestId;
} }
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import io.netty.buffer.ByteBufUtil;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
@ -29,7 +28,10 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.*; import org.redisson.remote.*;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.*; import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -109,7 +111,6 @@ public class TasksService extends BaseRemoteService {
protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) { protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
TaskParameters params = (TaskParameters) request.getArgs()[0]; TaskParameters params = (TaskParameters) request.getArgs()[0];
params.setRequestId(request.getId());
long retryStartTime = 0; long retryStartTime = 0;
if (tasksRetryInterval > 0) { if (tasksRetryInterval > 0) {
@ -181,10 +182,8 @@ public class TasksService extends BaseRemoteService {
@Override @Override
protected String generateRequestId(Object[] args) { protected String generateRequestId(Object[] args) {
byte[] id = new byte[17]; TaskParameters params = (TaskParameters) args[0];
ThreadLocalRandom.current().nextBytes(id); return params.getRequestId();
id[0] = 00;
return ByteBufUtil.hexDump(id);
} }
public RFuture<Boolean> cancelExecutionAsync(String requestId) { public RFuture<Boolean> cancelExecutionAsync(String requestId) {

@ -26,6 +26,13 @@ public class ScheduledAtFixedRateParameters extends ScheduledParameters {
private String executorId; private String executorId;
private long spentTime; private long spentTime;
public ScheduledAtFixedRateParameters() {
}
public ScheduledAtFixedRateParameters(String requestId) {
super(requestId);
}
public long getSpentTime() { public long getSpentTime() {
return spentTime; return spentTime;
} }

@ -25,7 +25,14 @@ public class ScheduledCronExpressionParameters extends ScheduledParameters {
private String cronExpression; private String cronExpression;
private String timezone; private String timezone;
private String executorId; private String executorId;
public ScheduledCronExpressionParameters() {
}
public ScheduledCronExpressionParameters(String requestId) {
super(requestId);
}
public String getCronExpression() { public String getCronExpression() {
return cronExpression; return cronExpression;
} }

@ -26,9 +26,13 @@ public class ScheduledParameters extends TaskParameters {
public ScheduledParameters() { public ScheduledParameters() {
} }
public ScheduledParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) { public ScheduledParameters(String requestId) {
super(className, classBody, lambdaBody, state); super(requestId);
}
public ScheduledParameters(String id, String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) {
super(id, className, classBody, lambdaBody, state);
this.startTime = startTime; this.startTime = startTime;
} }

@ -24,7 +24,14 @@ public class ScheduledWithFixedDelayParameters extends ScheduledParameters {
private long delay; private long delay;
private String executorId; private String executorId;
public ScheduledWithFixedDelayParameters() {
}
public ScheduledWithFixedDelayParameters(String requestId) {
super(requestId);
}
public long getDelay() { public long getDelay() {
return delay; return delay;
} }

@ -35,9 +35,14 @@ public class TaskParameters implements Serializable {
public TaskParameters() { public TaskParameters() {
} }
public TaskParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state) { public TaskParameters(String requestId) {
this.requestId = requestId;
}
public TaskParameters(String requestId, String className, byte[] classBody, byte[] lambdaBody, byte[] state) {
super(); super();
this.requestId = requestId;
this.className = className; this.className = className;
this.classBody = classBody; this.classBody = classBody;
this.state = state; this.state = state;

Loading…
Cancel
Save