From 64805cf36606556aa2f9ddeb57b76d38db31978c Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 1 Jul 2022 11:52:59 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonExecutorService.java | 49 +++++++++++++------ .../executor/ScheduledTasksService.java | 10 +--- .../org/redisson/executor/TasksService.java | 13 +++-- .../ScheduledAtFixedRateParameters.java | 7 +++ .../ScheduledCronExpressionParameters.java | 9 +++- .../executor/params/ScheduledParameters.java | 10 ++-- .../ScheduledWithFixedDelayParameters.java | 9 +++- .../executor/params/TaskParameters.java | 9 +++- 8 files changed, 79 insertions(+), 37 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index c6123e996..707eede34 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -616,16 +616,24 @@ public class RedissonExecutorService implements RScheduledExecutorService { return new RedissonExecutorBatchFuture(future, result); } + private String generateId() { + byte[] id = new byte[16]; + ThreadLocalRandom.current().nextBytes(id); + return ByteBufUtil.hexDump(id); + } + protected TaskParameters createTaskParameters(Callable task) { ClassBody classBody = getClassBody(task); byte[] state = encode(task); - return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); + String id = "00" + generateId(); + return new TaskParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); } protected TaskParameters createTaskParameters(Runnable task) { ClassBody classBody = getClassBody(task); byte[] state = encode(task); - return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); + String id = "00" + generateId(); + return new TaskParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); } @Override @@ -901,10 +909,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); - if (timeToLive > 0) { - params.setTtl(ttlUnit.toMillis(timeToLive)); - } + ScheduledParameters params = createScheduledParameters(timeToLive, ttlUnit, classBody, state, startTime); RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(params).toCompletableFuture(); addListener(result); return createFuture(result, startTime); @@ -924,15 +929,21 @@ public class RedissonExecutorService implements RScheduledExecutorService { ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - ScheduledParameters params = new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); - if (timeToLive > 0) { - params.setTtl(ttlUnit.toMillis(timeToLive)); - } + ScheduledParameters params = createScheduledParameters(timeToLive, ttlUnit, classBody, state, startTime); RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(params).toCompletableFuture(); addListener(result); return createFuture(result, startTime); } + private ScheduledParameters createScheduledParameters(long timeToLive, TimeUnit ttlUnit, ClassBody classBody, byte[] state, long startTime) { + String id = generateScheduledTaskId(); + ScheduledParameters params = new ScheduledParameters(id, classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime); + if (timeToLive > 0) { + params.setTtl(ttlUnit.toMillis(timeToLive)); + } + return params; + } + @Override public RScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAtFixedRateAsync(task, initialDelay, period, unit); @@ -947,7 +958,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); - ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters(); + + String id = generateScheduledTaskId(); + ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters(id); params.setClassName(classBody.getClazzName()); params.setClassBody(classBody.getClazz()); params.setLambdaBody(classBody.getLambda()); @@ -960,6 +973,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { return createFuture(result, startTime); } + private String generateScheduledTaskId() { + return "01" + generateId(); + } + @Override public RScheduledFuture schedule(Runnable task, CronSchedule cronSchedule) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, cronSchedule); @@ -979,8 +996,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date"); } long startTime = startDate.toInstant().toEpochMilli(); - - ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(); + + String id = generateScheduledTaskId(); + ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(id); params.setClassName(classBody.getClazzName()); params.setClassBody(classBody.getClazz()); params.setLambdaBody(classBody.getLambda()); @@ -1014,8 +1032,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); - - ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(); + + String id = generateScheduledTaskId(); + ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(id); params.setClassName(classBody.getClazzName()); params.setClassBody(classBody.getClazz()); params.setLambdaBody(classBody.getLambda()); diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 326b7aa96..52e5222a0 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -15,7 +15,6 @@ */ package org.redisson.executor; -import io.netty.buffer.ByteBufUtil; import org.redisson.RedissonExecutorService; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; @@ -30,7 +29,6 @@ import org.redisson.remote.ResponseEntry; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; /** * @@ -52,7 +50,6 @@ public class ScheduledTasksService extends TasksService { @Override protected CompletableFuture addAsync(String requestQueueName, RemoteServiceRequest request) { ScheduledParameters params = (ScheduledParameters) request.getArgs()[0]; - params.setRequestId(request.getId()); long expireTime = 0; if (params.getTtl() > 0) { @@ -143,12 +140,9 @@ public class ScheduledTasksService extends TasksService { @Override protected String generateRequestId(Object[] args) { if (requestId == null) { - byte[] id = new byte[17]; - ThreadLocalRandom.current().nextBytes(id); - id[0] = 01; - return ByteBufUtil.hexDump(id); + return super.generateRequestId(args); } return requestId; - } + } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 5787aee59..6b7c7e1cc 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -15,7 +15,6 @@ */ package org.redisson.executor; -import io.netty.buffer.ByteBufUtil; import org.redisson.RedissonExecutorService; import org.redisson.api.RFuture; import org.redisson.api.RMap; @@ -29,7 +28,10 @@ import org.redisson.misc.CompletableFutureWrapper; import org.redisson.remote.*; import java.util.Arrays; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; /** * @@ -109,7 +111,6 @@ public class TasksService extends BaseRemoteService { protected CompletableFuture addAsync(String requestQueueName, RemoteServiceRequest request) { TaskParameters params = (TaskParameters) request.getArgs()[0]; - params.setRequestId(request.getId()); long retryStartTime = 0; if (tasksRetryInterval > 0) { @@ -181,10 +182,8 @@ public class TasksService extends BaseRemoteService { @Override protected String generateRequestId(Object[] args) { - byte[] id = new byte[17]; - ThreadLocalRandom.current().nextBytes(id); - id[0] = 00; - return ByteBufUtil.hexDump(id); + TaskParameters params = (TaskParameters) args[0]; + return params.getRequestId(); } public RFuture cancelExecutionAsync(String requestId) { diff --git a/redisson/src/main/java/org/redisson/executor/params/ScheduledAtFixedRateParameters.java b/redisson/src/main/java/org/redisson/executor/params/ScheduledAtFixedRateParameters.java index c9a119ad5..86ec0f8ab 100644 --- a/redisson/src/main/java/org/redisson/executor/params/ScheduledAtFixedRateParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/ScheduledAtFixedRateParameters.java @@ -26,6 +26,13 @@ public class ScheduledAtFixedRateParameters extends ScheduledParameters { private String executorId; private long spentTime; + public ScheduledAtFixedRateParameters() { + } + + public ScheduledAtFixedRateParameters(String requestId) { + super(requestId); + } + public long getSpentTime() { return spentTime; } diff --git a/redisson/src/main/java/org/redisson/executor/params/ScheduledCronExpressionParameters.java b/redisson/src/main/java/org/redisson/executor/params/ScheduledCronExpressionParameters.java index 973af3baa..7dc28ebc3 100644 --- a/redisson/src/main/java/org/redisson/executor/params/ScheduledCronExpressionParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/ScheduledCronExpressionParameters.java @@ -25,7 +25,14 @@ public class ScheduledCronExpressionParameters extends ScheduledParameters { private String cronExpression; private String timezone; private String executorId; - + + public ScheduledCronExpressionParameters() { + } + + public ScheduledCronExpressionParameters(String requestId) { + super(requestId); + } + public String getCronExpression() { return cronExpression; } diff --git a/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java b/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java index ec2019593..e3ec268d5 100644 --- a/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java @@ -26,9 +26,13 @@ public class ScheduledParameters extends TaskParameters { public ScheduledParameters() { } - - public ScheduledParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) { - super(className, classBody, lambdaBody, state); + + public ScheduledParameters(String requestId) { + super(requestId); + } + + public ScheduledParameters(String id, String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) { + super(id, className, classBody, lambdaBody, state); this.startTime = startTime; } diff --git a/redisson/src/main/java/org/redisson/executor/params/ScheduledWithFixedDelayParameters.java b/redisson/src/main/java/org/redisson/executor/params/ScheduledWithFixedDelayParameters.java index 2b3320849..a6e160482 100644 --- a/redisson/src/main/java/org/redisson/executor/params/ScheduledWithFixedDelayParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/ScheduledWithFixedDelayParameters.java @@ -24,7 +24,14 @@ public class ScheduledWithFixedDelayParameters extends ScheduledParameters { private long delay; private String executorId; - + + public ScheduledWithFixedDelayParameters() { + } + + public ScheduledWithFixedDelayParameters(String requestId) { + super(requestId); + } + public long getDelay() { return delay; } diff --git a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java index 7981b4a1c..d1c7a09ba 100644 --- a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java @@ -35,9 +35,14 @@ public class TaskParameters implements Serializable { public TaskParameters() { } - - public TaskParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state) { + + public TaskParameters(String requestId) { + this.requestId = requestId; + } + + public TaskParameters(String requestId, String className, byte[] classBody, byte[] lambdaBody, byte[] state) { super(); + this.requestId = requestId; this.className = className; this.classBody = classBody; this.state = state;