Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
pull/1821/head
Nikita 6 years ago
commit 74fde8ae90

@ -15,10 +15,15 @@
*/ */
package org.redisson; package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.ReferenceQueue; import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -131,7 +136,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ScheduledTasksService scheduledRemoteService; private final ScheduledTasksService scheduledRemoteService;
private final TasksService executorRemoteService; private final TasksService executorRemoteService;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); private final Map<Class<?>, ClassBody> class2body = PlatformDependent.newConcurrentHashMap();
private final String name; private final String name;
private final String requestQueueName; private final String requestQueueName;
@ -316,9 +321,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
syncExecute(promise); syncExecute(promise);
} }
@ -332,9 +337,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
} }
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd(); List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
@ -374,25 +379,84 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
} }
private byte[] getClassBody(Object task) { public static class ClassBody {
private byte[] lambda;
private byte[] clazz;
private String clazzName;
public ClassBody(byte[] lambda, byte[] clazz, String clazzName) {
super();
this.lambda = lambda;
this.clazz = clazz;
this.clazzName = clazzName;
}
public String getClazzName() {
return clazzName;
}
public byte[] getClazz() {
return clazz;
}
public byte[] getLambda() {
return lambda;
}
}
private ClassBody getClassBody(Object task) {
Class<?> c = task.getClass(); Class<?> c = task.getClass();
byte[] classBody = class2bytes.get(c); ClassBody result = class2body.get(c);
if (classBody == null) { if (result == null) {
String className = c.getName(); String className = c.getName();
String classAsPath = className.replace('.', '/') + ".class"; String classAsPath = className.replace('.', '/') + ".class";
InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath); InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath);
DataInputStream s = new DataInputStream(classStream); byte[] lambdaBody = null;
if (classStream == null) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
ObjectOutput oo = new ObjectOutputStream(os);
oo.writeObject(task);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to serialize lambda", e);
}
lambdaBody = os.toByteArray();
SerializedLambda lambda;
try {
Method writeReplace = task.getClass().getDeclaredMethod("writeReplace");
writeReplace.setAccessible(true);
lambda = (SerializedLambda) writeReplace.invoke(task);
} catch (Exception ex) {
throw new IllegalArgumentException("Lambda should implement java.io.Serializable interface", ex);
}
className = lambda.getCapturingClass().replace('/', '.');
classStream = task.getClass().getClassLoader().getResourceAsStream(lambda.getCapturingClass() + ".class");
}
byte[] classBody;
try { try {
DataInputStream s = new DataInputStream(classStream);
classBody = new byte[s.available()]; classBody = new byte[s.available()];
s.readFully(classBody); s.readFully(classBody);
} catch (IOException e) { } catch (IOException e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} finally {
try {
classStream.close();
} catch (IOException e) {
// skip
}
} }
class2bytes.put(c, classBody); result = new ClassBody(lambdaBody, classBody, className);
class2body.put(c, result);
} }
return classBody; return result;
} }
@Override @Override
@ -505,9 +569,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) { public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result); addListener(result);
return createFuture(result); return createFuture(result);
} }
@ -523,9 +587,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -549,9 +613,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -651,9 +715,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -677,9 +741,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -721,9 +785,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RExecutorFuture<?> submitAsync(Runnable task) { public RExecutorFuture<?> submitAsync(Runnable task) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result); addListener(result);
return createFuture(result); return createFuture(result);
} }
@ -789,10 +853,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime)); RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime));
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
@ -810,10 +874,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) { public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeUnit unit) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay); long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime)); RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime));
addListener(result); addListener(result);
return createFuture(result, startTime); return createFuture(result, startTime);
} }
@ -830,12 +894,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) { public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters(); ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters();
params.setClassName(task.getClass().getName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state); params.setState(state);
params.setStartTime(startTime); params.setStartTime(startTime);
params.setPeriod(unit.toMillis(period)); params.setPeriod(unit.toMillis(period));
@ -857,7 +922,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedule) { public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedule) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date()); final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
if (startDate == null) { if (startDate == null) {
@ -866,8 +931,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = startDate.getTime(); long startTime = startDate.getTime();
ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(); ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters();
params.setClassName(task.getClass().getName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state); params.setState(state);
params.setStartTime(startTime); params.setStartTime(startTime);
params.setCronExpression(cronSchedule.getExpression().getCronExpression()); params.setCronExpression(cronSchedule.getExpression().getCronExpression());
@ -896,13 +962,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) { public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) {
check(task); check(task);
byte[] classBody = getClassBody(task); ClassBody classBody = getClassBody(task);
byte[] state = encode(task); byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(); ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters();
params.setClassName(task.getClass().getName()); params.setClassName(classBody.getClazzName());
params.setClassBody(classBody); params.setClassBody(classBody.getClazz());
params.setLambdaBody(classBody.getLambda());
params.setState(state); params.setState(state);
params.setStartTime(startTime); params.setStartTime(startTime);
params.setDelay(unit.toMillis(delay)); params.setDelay(unit.toMillis(delay));
@ -975,12 +1042,17 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException(); throw new NullPointerException();
} }
RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); List<RExecutorFuture<?>> futures = new ArrayList<RExecutorFuture<?>>();
io.netty.util.concurrent.Future<T> result = poll(future.getTaskFutures(), timeout, unit); for (Callable<T> callable : tasks) {
RExecutorFuture<T> future = submit(callable);
futures.add(future);
}
io.netty.util.concurrent.Future<T> result = poll(futures, timeout, unit);
if (result == null) { if (result == null) {
throw new TimeoutException(); throw new TimeoutException();
} }
for (RExecutorFuture<?> f : future.getTaskFutures()) { for (RExecutorFuture<?> f : futures) {
f.cancel(true); f.cancel(true);
} }
return result.getNow(); return result.getNow();

@ -61,12 +61,12 @@ import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions; import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider; import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.ConfigSupport; import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.CommandReactiveService;
import org.redisson.reactive.ReactiveProxyBuilder; import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonKeysReactive;

@ -195,6 +195,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false);
} else { } else {
if (in.isReadable()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false);
} }
} }

@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
@ -113,19 +114,13 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
private ByteBuf encode(Object in) { private ByteBuf encode(Object in) {
if (in instanceof byte[]) { if (in instanceof byte[]) {
byte[] payload = (byte[])in; return Unpooled.wrappedBuffer((byte[])in);
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length);
out.writeBytes(payload);
return out;
} }
if (in instanceof ByteBuf) { if (in instanceof ByteBuf) {
return (ByteBuf) in; return (ByteBuf) in;
} }
if (in instanceof ChannelName) { if (in instanceof ChannelName) {
byte[] payload = ((ChannelName)in).getName(); return Unpooled.wrappedBuffer(((ChannelName)in).getName());
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length);
out.writeBytes(payload);
return out;
} }
String payload = in.toString(); String payload = in.toString();

@ -15,7 +15,9 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInput;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
@ -36,6 +38,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CustomObjectInputStream;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.executor.params.ScheduledAtFixedRateParameters; import org.redisson.executor.params.ScheduledAtFixedRateParameters;
import org.redisson.executor.params.ScheduledCronExpressionParameters; import org.redisson.executor.params.ScheduledCronExpressionParameters;
@ -49,7 +52,7 @@ import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -195,11 +198,8 @@ public class TasksRunnerService implements RemoteExecutorService {
public Object executeCallable(TaskParameters params) { public Object executeCallable(TaskParameters params) {
renewRetryTime(params.getRequestId()); renewRetryTime(params.getRequestId());
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try { try {
buf.writeBytes(params.getState()); Callable<?> callable = decode(params);
Callable<?> callable = decode(params.getClassName(), params.getClassBody(), buf);
return callable.call(); return callable.call();
} catch (RedissonShutdownException e) { } catch (RedissonShutdownException e) {
return null; return null;
@ -209,7 +209,6 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} finally { } finally {
buf.release();
finish(params.getRequestId()); finish(params.getRequestId());
} }
} }
@ -260,26 +259,37 @@ public class TasksRunnerService implements RemoteExecutorService {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException { private <T> T decode(TaskParameters params) throws IOException {
ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length); ByteBuf classBodyBuf = Unpooled.wrappedBuffer(params.getClassBody());
ByteBuf stateBuf = Unpooled.wrappedBuffer(params.getState());
try { try {
HashValue hash = new HashValue(Hash.hash128(classBodyBuf)); HashValue hash = new HashValue(Hash.hash128(classBodyBuf));
Codec classLoaderCodec = codecs.get(hash); Codec classLoaderCodec = codecs.get(hash);
if (classLoaderCodec == null) { if (classLoaderCodec == null) {
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody); cl.loadClass(params.getClassName(), params.getClassBody());
classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl); classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
codecs.put(hash, classLoaderCodec); codecs.put(hash, classLoaderCodec);
} }
T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null); T task;
if (params.getLambdaBody() != null) {
ByteArrayInputStream is = new ByteArrayInputStream(params.getLambdaBody());
ObjectInput oo = new CustomObjectInputStream(classLoaderCodec.getClassLoader(), is);
task = (T) oo.readObject();
oo.close();
} else {
task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null);
}
Injector.inject(task, redisson); Injector.inject(task, redisson);
return task; return task;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
} finally { } finally {
classBodyBuf.release(); classBodyBuf.release();
stateBuf.release();
} }
} }
@ -289,11 +299,8 @@ public class TasksRunnerService implements RemoteExecutorService {
renewRetryTime(params.getRequestId()); renewRetryTime(params.getRequestId());
} }
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try { try {
buf.writeBytes(params.getState()); Runnable runnable = decode(params);
Runnable runnable = decode(params.getClassName(), params.getClassBody(), buf);
runnable.run(); runnable.run();
} catch (RedissonShutdownException e) { } catch (RedissonShutdownException e) {
// skip // skip
@ -302,7 +309,6 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} finally { } finally {
buf.release();
finish(params.getRequestId()); finish(params.getRequestId());
} }
} }

@ -27,8 +27,8 @@ public class ScheduledParameters extends TaskParameters {
public ScheduledParameters() { public ScheduledParameters() {
} }
public ScheduledParameters(String className, byte[] classBody, byte[] state, long startTime) { public ScheduledParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) {
super(className, classBody, state); super(className, classBody, lambdaBody, state);
this.startTime = startTime; this.startTime = startTime;
} }

@ -28,17 +28,26 @@ public class TaskParameters implements Serializable {
private String className; private String className;
private byte[] classBody; private byte[] classBody;
private byte[] lambdaBody;
private byte[] state; private byte[] state;
private String requestId; private String requestId;
public TaskParameters() { public TaskParameters() {
} }
public TaskParameters(String className, byte[] classBody, byte[] state) { public TaskParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state) {
super(); super();
this.className = className; this.className = className;
this.classBody = classBody; this.classBody = classBody;
this.state = state; this.state = state;
this.lambdaBody = lambdaBody;
}
public byte[] getLambdaBody() {
return lambdaBody;
}
public void setLambdaBody(byte[] lambdaBody) {
this.lambdaBody = lambdaBody;
} }
public String getClassName() { public String getClassName() {

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.command; package org.redisson.reactive;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -27,6 +27,8 @@ import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;

@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.command; package org.redisson.reactive;
import java.util.List; import java.util.List;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
/** /**
* *

@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.command; package org.redisson.reactive;
import java.util.List; import java.util.List;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
/** /**
* *

@ -73,8 +73,6 @@ import org.redisson.api.RStreamReactive;
import org.redisson.api.RTopicReactive; import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveBatchService;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;

@ -29,7 +29,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;

@ -24,7 +24,6 @@ import org.redisson.api.RList;
import org.redisson.api.RListMultimap; import org.redisson.api.RListMultimap;
import org.redisson.api.RListReactive; import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
/** /**
* *

@ -22,7 +22,6 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonList; import org.redisson.RedissonList;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;

@ -19,7 +19,6 @@ import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockReactive; import org.redisson.api.RLockReactive;
import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLock;
import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RReadWriteLockReactive;
import org.redisson.command.CommandReactiveExecutor;
/** /**
* *

@ -22,7 +22,6 @@ import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

@ -20,7 +20,6 @@ import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap; import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
/** /**
* *

@ -32,7 +32,6 @@ import org.redisson.api.RTransaction;
import org.redisson.api.RTransactionReactive; import org.redisson.api.RTransactionReactive;
import org.redisson.api.TransactionOptions; import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.transaction.RedissonTransaction; import org.redisson.transaction.RedissonTransaction;
/** /**

@ -109,10 +109,16 @@ public class RedissonTopicTest {
@Test @Test
public void testPing() throws InterruptedException { public void testPing() throws InterruptedException {
Config config = BaseTest.createConfig(); Config config = BaseTest.createConfig();
config.useSingleServer().setPingConnectionInterval(50); config.useSingleServer()
.setPingConnectionInterval(50)
.setConnectTimeout(20_000)
.setTimeout(25_000_000)
.setRetryInterval(750)
.setConnectionMinimumIdleSize(4)
.setConnectionPoolSize(16);
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
int count = 1000; int count = 6000;
CountDownLatch latch = new CountDownLatch(count); CountDownLatch latch = new CountDownLatch(count);
RTopic eventsTopic = redisson.getTopic("eventsTopic"); RTopic eventsTopic = redisson.getTopic("eventsTopic");

Loading…
Cancel
Save