pull/1603/head
Nikita 7 years ago
parent 70b4e1856a
commit bb19fd7997

@ -15,19 +15,66 @@
*/
package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import org.redisson.api.*;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.ExecutorOptions;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RExecutorBatchFuture;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.ChannelName;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.*;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonExecutorRemoteService;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemotePromise;
import org.redisson.executor.ScheduledTasksService;
import org.redisson.executor.TasksBatchService;
import org.redisson.executor.TasksRunnerService;
import org.redisson.executor.TasksService;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.Injector;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
@ -38,14 +85,10 @@ import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -277,7 +320,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
syncExecute(promise);
}
@ -293,7 +336,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
}
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
@ -466,7 +509,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
addListener(result);
return createFuture(result);
}
@ -484,7 +527,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -510,7 +553,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -612,7 +655,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -638,7 +681,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -682,7 +725,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
addListener(result);
return createFuture(result);
}
@ -751,7 +794,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime));
addListener(result);
return createFuture(result, startTime);
@ -772,7 +815,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime));
addListener(result);
return createFuture(result, startTime);
}
@ -792,7 +835,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null);
ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setState(state);
params.setStartTime(startTime);
params.setPeriod(unit.toMillis(period));
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(params);
addListener(result);
return createFuture(result, startTime);
}
@ -816,7 +866,16 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new IllegalArgumentException("Wrong cron expression! Unable to calculate start date");
}
long startTime = startDate.getTime();
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(),cronSchedule.getExpression().getTimeZone().getID(), executorId, null);
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setState(state);
params.setStartTime(startTime);
params.setCronExpression(cronSchedule.getExpression().getCronExpression());
params.setTimezone(cronSchedule.getExpression().getTimeZone().getID());
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(params);
addListener(result);
RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) {
@ -842,7 +901,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null);
ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setState(state);
params.setStartTime(startTime);
params.setDelay(unit.toMillis(delay));
params.setExecutorId(executorId);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(params);
addListener(result);
return createFuture(result, startTime);
}

@ -15,6 +15,12 @@
*/
package org.redisson.executor;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
/**
*
* @author Nikita Koksharov
@ -22,18 +28,18 @@ package org.redisson.executor;
*/
public interface RemoteExecutorService {
Object executeCallable(String className, byte[] classBody, byte[] state, String requestId);
Object executeCallable(TaskParameters params);
void executeRunnable(String className, byte[] classBody, byte[] state, String requestId);
void executeRunnable(TaskParameters params);
Object scheduleCallable(String className, byte[] classBody, byte[] state, long startTime, String requestId);
Object scheduleCallable(ScheduledParameters params);
void scheduleRunnable(String className, byte[] classBody, byte[] state, long startTime, String requestId);
void scheduleRunnable(ScheduledParameters params);
void scheduleAtFixedRate(String className, byte[] classBody, byte[] state, long startTime, long period, String executorId, String requestId);
void scheduleAtFixedRate(ScheduledAtFixedRateParameters params);
void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId);
void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params);
void schedule(String className, byte[] classBody, byte[] state, long startTime, String cronExpression, String timezone, String executorId, String requestId);
void schedule(ScheduledCronExpressionParameters params);
}

@ -17,6 +17,11 @@ package org.redisson.executor;
import org.redisson.api.RFuture;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
/**
*
@ -26,18 +31,18 @@ import org.redisson.api.annotation.RRemoteAsync;
@RRemoteAsync(RemoteExecutorService.class)
public interface RemoteExecutorServiceAsync {
<T> RFuture<T> executeCallable(String className, byte[] classBody, byte[] state, String requestId);
<T> RFuture<T> executeCallable(TaskParameters params);
RFuture<Void> executeRunnable(String className, byte[] classBody, byte[] state, String requestId);
RFuture<Void> executeRunnable(TaskParameters params);
<T> RFuture<T> scheduleCallable(String className, byte[] classBody, byte[] state, long startTime, String requestId);
<T> RFuture<T> scheduleCallable(ScheduledParameters params);
RFuture<Void> scheduleRunnable(String className, byte[] classBody, byte[] state, long startTime, String requestId);
RFuture<Void> scheduleRunnable(ScheduledParameters params);
RFuture<Void> scheduleAtFixedRate(String className, byte[] classBody, byte[] state, long startTime, long period, String executorId, String requestId);
RFuture<Void> scheduleAtFixedRate(ScheduledAtFixedRateParameters params);
RFuture<Void> scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId);
RFuture<Void> scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params);
RFuture<Void> schedule(String className, byte[] classBody, byte[] state, long startTime, String cronExpression, String timezone, String executorId, String requestId);
RFuture<Void> schedule(ScheduledCronExpressionParameters params);
}

@ -26,6 +26,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
@ -51,19 +52,8 @@ public class ScheduledTasksService extends TasksService {
@Override
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
int requestIndex = 0;
if ("scheduleCallable".equals(request.getMethodName())
|| "scheduleRunnable".equals(request.getMethodName())) {
requestIndex = 4;
}
if ("scheduleAtFixedRate".equals(request.getMethodName())
|| "scheduleWithFixedDelay".equals(request.getMethodName())
|| "schedule".equals(request.getMethodName())) {
requestIndex = 6;
}
request.getArgs()[requestIndex] = request.getId();
Long startTime = (Long)request.getArgs()[3];
ScheduledParameters params = (ScheduledParameters) request.getArgs()[0];
params.setRequestId(request.getId());
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
@ -91,7 +81,7 @@ public class ScheduledTasksService extends TasksService {
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName, tasksRetryIntervalName),
startTime, request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval);
params.getStartTime(), request.getId(), encode(request), System.currentTimeMillis(), tasksRetryInterval);
}
@Override

@ -15,16 +15,11 @@
*/
package org.redisson.executor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -42,19 +37,23 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
import org.redisson.executor.params.ScheduledParameters;
import org.redisson.executor.params.ScheduledWithFixedDelayParameters;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Executor service runs Callable and Runnable tasks.
@ -119,11 +118,12 @@ public class TasksRunnerService implements RemoteExecutorService {
}
@Override
public void scheduleAtFixedRate(String className, byte[] classBody, byte[] state, long startTime, long period, String executorId, String requestId) {
long newStartTime = System.currentTimeMillis() + period;
RFuture<Void> future = asyncScheduledServiceAtFixed(executorId, requestId).scheduleAtFixedRate(className, classBody, state, newStartTime, period, executorId, requestId);
public void scheduleAtFixedRate(ScheduledAtFixedRateParameters params) {
long newStartTime = System.currentTimeMillis() + params.getPeriod();
params.setStartTime(newStartTime);
RFuture<Void> future = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleAtFixedRate(params);
try {
executeRunnable(className, classBody, state, requestId);
executeRunnable(params);
} catch (RuntimeException e) {
// cancel task if it throws an exception
future.cancel(true);
@ -132,17 +132,18 @@ public class TasksRunnerService implements RemoteExecutorService {
}
@Override
public void schedule(String className, byte[] classBody, byte[] state, long startTime, String cronExpression, String timezone, String executorId, String requestId) {
CronExpression expression = new CronExpression(cronExpression);
expression.setTimeZone(TimeZone.getTimeZone(timezone));
public void schedule(ScheduledCronExpressionParameters params) {
CronExpression expression = new CronExpression(params.getCronExpression());
expression.setTimeZone(TimeZone.getTimeZone(params.getTimezone()));
Date nextStartDate = expression.getNextValidTimeAfter(new Date());
RFuture<Void> future = null;
if (nextStartDate != null) {
RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(executorId, requestId);
future = service.schedule(className, classBody, state, nextStartDate.getTime(), cronExpression, timezone, executorId, requestId);
RemoteExecutorServiceAsync service = asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId());
params.setStartTime(nextStartDate.getTime());
future = service.schedule(params);
}
try {
executeRunnable(className, classBody, state, requestId);
executeRunnable(params);
} catch (RuntimeException e) {
// cancel task if it throws an exception
if (future != null) {
@ -173,31 +174,32 @@ public class TasksRunnerService implements RemoteExecutorService {
}
@Override
public void scheduleWithFixedDelay(String className, byte[] classBody, byte[] state, long startTime, long delay, String executorId, String requestId) {
executeRunnable(className, classBody, state, requestId);
long newStartTime = System.currentTimeMillis() + delay;
asyncScheduledServiceAtFixed(executorId, requestId).scheduleWithFixedDelay(className, classBody, state, newStartTime, delay, executorId, requestId);
public void scheduleWithFixedDelay(ScheduledWithFixedDelayParameters params) {
executeRunnable(params);
long newStartTime = System.currentTimeMillis() + params.getDelay();
params.setStartTime(newStartTime);
asyncScheduledServiceAtFixed(params.getExecutorId(), params.getRequestId()).scheduleWithFixedDelay(params);
}
@Override
public Object scheduleCallable(String className, byte[] classBody, byte[] state, long startTime, String requestId) {
return executeCallable(className, classBody, state, requestId);
public Object scheduleCallable(ScheduledParameters params) {
return executeCallable(params);
}
@Override
public void scheduleRunnable(String className, byte[] classBody, byte[] state, long startTime, String requestId) {
executeRunnable(className, classBody, state, requestId);
public void scheduleRunnable(ScheduledParameters params) {
executeRunnable(params);
}
@Override
public Object executeCallable(String className, byte[] classBody, byte[] state, String requestId) {
renewRetryTime(requestId);
public Object executeCallable(TaskParameters params) {
renewRetryTime(params.getRequestId());
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try {
buf.writeBytes(state);
buf.writeBytes(params.getState());
Callable<?> callable = decode(className, classBody, buf);
Callable<?> callable = decode(params.getClassName(), params.getClassBody(), buf);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
@ -208,7 +210,7 @@ public class TasksRunnerService implements RemoteExecutorService {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish(requestId);
finish(params.getRequestId());
}
}
@ -282,16 +284,16 @@ public class TasksRunnerService implements RemoteExecutorService {
}
@Override
public void executeRunnable(String className, byte[] classBody, byte[] state, String requestId) {
if (requestId != null && requestId.startsWith("00")) {
renewRetryTime(requestId);
public void executeRunnable(TaskParameters params) {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
renewRetryTime(params.getRequestId());
}
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try {
buf.writeBytes(state);
buf.writeBytes(params.getState());
Runnable runnable = decode(className, classBody, buf);
Runnable runnable = decode(params.getClassName(), params.getClassBody(), buf);
runnable.run();
} catch (RedissonShutdownException e) {
// skip
@ -301,7 +303,7 @@ public class TasksRunnerService implements RemoteExecutorService {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish(requestId);
finish(params.getRequestId());
}
}

@ -30,6 +30,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceCancelRequest;
@ -125,7 +126,9 @@ public class TasksService extends BaseRemoteService {
}
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request) {
request.getArgs()[3] = request.getId();
TaskParameters params = (TaskParameters) request.getArgs()[0];
params.setRequestId(request.getId());
long retryStartTime = 0;
if (tasksRetryInterval > 0) {
retryStartTime = System.currentTimeMillis() + tasksRetryInterval;

@ -0,0 +1,42 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor.params;
/**
*
* @author Nikita Koksharov
*
*/
public class ScheduledAtFixedRateParameters extends ScheduledParameters {
private long period;
private String executorId;
public long getPeriod() {
return period;
}
public void setPeriod(long period) {
this.period = period;
}
public String getExecutorId() {
return executorId;
}
public void setExecutorId(String executorId) {
this.executorId = executorId;
}
}

@ -0,0 +1,50 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor.params;
/**
*
* @author Nikita Koksharov
*
*/
public class ScheduledCronExpressionParameters extends ScheduledParameters {
private String cronExpression;
private String timezone;
private String executorId;
public String getCronExpression() {
return cronExpression;
}
public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}
public String getTimezone() {
return timezone;
}
public void setTimezone(String timezone) {
this.timezone = timezone;
}
public String getExecutorId() {
return executorId;
}
public void setExecutorId(String executorId) {
this.executorId = executorId;
}
}

@ -0,0 +1,43 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor.params;
/**
*
* @author Nikita Koksharov
*
*/
public class ScheduledParameters extends TaskParameters {
private long startTime;
public ScheduledParameters() {
}
public ScheduledParameters(String className, byte[] classBody, byte[] state, long startTime) {
super(className, classBody, state);
this.startTime = startTime;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
}

@ -0,0 +1,42 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor.params;
/**
*
* @author Nikita Koksharov
*
*/
public class ScheduledWithFixedDelayParameters extends ScheduledParameters {
private long delay;
private String executorId;
public long getDelay() {
return delay;
}
public void setDelay(long delay) {
this.delay = delay;
}
public String getExecutorId() {
return executorId;
}
public void setExecutorId(String executorId) {
this.executorId = executorId;
}
}

@ -0,0 +1,72 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor.params;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class TaskParameters implements Serializable {
private static final long serialVersionUID = -5662511632962297898L;
private String className;
private byte[] classBody;
private byte[] state;
private String requestId;
public TaskParameters() {
}
public TaskParameters(String className, byte[] classBody, byte[] state) {
super();
this.className = className;
this.classBody = classBody;
this.state = state;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public byte[] getClassBody() {
return classBody;
}
public void setClassBody(byte[] classBody) {
this.classBody = classBody;
}
public byte[] getState() {
return state;
}
public void setState(byte[] state) {
this.state = state;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
Loading…
Cancel
Save