Feature - Lambda support for RExecutorService #1183 #1656

pull/1705/head
Nikita 6 years ago
parent 230233086e
commit c4762a34a4

@ -15,10 +15,15 @@
*/
package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
@ -131,7 +136,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ScheduledTasksService scheduledRemoteService;
private final TasksService executorRemoteService;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, ClassBody> class2body = PlatformDependent.newConcurrentHashMap();
private final String name;
private final String requestQueueName;
@ -316,9 +321,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public void execute(Runnable task) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
syncExecute(promise);
}
@ -332,9 +337,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
}
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
@ -373,26 +378,85 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
}
}
public static class ClassBody {
private byte[] lambda;
private byte[] clazz;
private String clazzName;
public ClassBody(byte[] lambda, byte[] clazz, String clazzName) {
super();
this.lambda = lambda;
this.clazz = clazz;
this.clazzName = clazzName;
}
public String getClazzName() {
return clazzName;
}
public byte[] getClazz() {
return clazz;
}
public byte[] getLambda() {
return lambda;
}
}
private byte[] getClassBody(Object task) {
private ClassBody getClassBody(Object task) {
Class<?> c = task.getClass();
byte[] classBody = class2bytes.get(c);
if (classBody == null) {
ClassBody result = class2body.get(c);
if (result == null) {
String className = c.getName();
String classAsPath = className.replace('.', '/') + ".class";
InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath);
DataInputStream s = new DataInputStream(classStream);
byte[] lambdaBody = null;
if (classStream == null) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
ObjectOutput oo = new ObjectOutputStream(os);
oo.writeObject(task);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to serialize lambda", e);
}
lambdaBody = os.toByteArray();
SerializedLambda lambda;
try {
Method writeReplace = task.getClass().getDeclaredMethod("writeReplace");
writeReplace.setAccessible(true);
lambda = (SerializedLambda) writeReplace.invoke(task);
} catch (Exception ex) {
throw new IllegalArgumentException("Lambda should implement java.io.Serializable interface", ex);
}
className = lambda.getCapturingClass().replace('/', '.');
classStream = task.getClass().getClassLoader().getResourceAsStream(lambda.getCapturingClass() + ".class");
}
byte[] classBody;
try {
DataInputStream s = new DataInputStream(classStream);
classBody = new byte[s.available()];
s.readFully(classBody);
} catch (IOException e) {
throw new IllegalArgumentException(e);
} finally {
try {
classStream.close();
} catch (IOException e) {
// skip
}
}
class2bytes.put(c, classBody);
result = new ClassBody(lambdaBody, classBody, className);
class2body.put(c, result);
}
return classBody;
return result;
}
@Override
@ -505,9 +569,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result);
return createFuture(result);
}
@ -523,9 +587,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -549,9 +613,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Callable<?> task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -651,9 +715,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -677,9 +741,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Runnable task : tasks) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -721,9 +785,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RExecutorFuture<?> submitAsync(Runnable task) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state));
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result);
return createFuture(result);
}
@ -789,10 +853,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime));
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime));
addListener(result);
return createFuture(result, startTime);
@ -810,10 +874,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime));
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime));
addListener(result);
return createFuture(result, startTime);
}
@ -830,12 +894,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state);
params.setStartTime(startTime);
params.setPeriod(unit.toMillis(period));
@ -857,7 +922,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedule) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
if (startDate == null) {
@ -866,8 +931,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = startDate.getTime();
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state);
params.setStartTime(startTime);
params.setCronExpression(cronSchedule.getExpression().getCronExpression());
@ -896,13 +962,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) {
check(task);
byte[] classBody = getClassBody(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters();
params.setClassName(task.getClass().getName());
params.setClassBody(classBody);
params.setClassName(classBody.getClazzName());
params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state);
params.setStartTime(startTime);
params.setDelay(unit.toMillis(delay));

@ -15,7 +15,9 @@
*/
package org.redisson.executor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
@ -36,6 +38,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CustomObjectInputStream;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters;
@ -270,7 +273,16 @@ public class TasksRunnerService implements RemoteExecutorService {
codecs.put(hash, classLoaderCodec);
}
T task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null);
T task;
if (params.getLambdaBody() != null) {
ByteArrayInputStream is = new ByteArrayInputStream(params.getLambdaBody());
ObjectInput oo = new CustomObjectInputStream(classLoaderCodec.getClassLoader(), is);
task = (T) oo.readObject();
oo.close();
} else {
task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null);
}
Injector.inject(task, redisson);
return task;
} catch (Exception e) {

@ -27,8 +27,8 @@ public class ScheduledParameters extends TaskParameters {
public ScheduledParameters() {
}
public ScheduledParameters(String className, byte[] classBody, byte[] state, long startTime) {
super(className, classBody, state);
public ScheduledParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) {
super(className, classBody, lambdaBody, state);
this.startTime = startTime;
}

@ -28,17 +28,26 @@ public class TaskParameters implements Serializable {
private String className;
private byte[] classBody;
private byte[] lambdaBody;
private byte[] state;
private String requestId;
public TaskParameters() {
}
public TaskParameters(String className, byte[] classBody, byte[] state) {
public TaskParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state) {
super();
this.className = className;
this.classBody = classBody;
this.state = state;
this.lambdaBody = lambdaBody;
}
public byte[] getLambdaBody() {
return lambdaBody;
}
public void setLambdaBody(byte[] lambdaBody) {
this.lambdaBody = lambdaBody;
}
public String getClassName() {

Loading…
Cancel
Save