Fixed - RExecutorService task execution performance regression. #1596

pull/1547/head
Nikita 7 years ago
parent 84c00617f9
commit f78b0b68f5

@ -18,6 +18,7 @@ package org.redisson.executor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -28,6 +29,7 @@ import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@ -53,6 +55,8 @@ import io.netty.util.concurrent.FutureListener;
*/
public class TasksRunnerService implements RemoteExecutorService {
private final Map<String, Codec> codecs = new LRUCacheMap<String, Codec>(500, 0, 0);
private final Codec codec;
private final String name;
private final CommandExecutor commandExecutor;
@ -182,10 +186,7 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
Callable<?> callable = decode(cl, buf);
Callable<?> callable = decode(className, classBody, buf);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
@ -244,12 +245,20 @@ public class TasksRunnerService implements RemoteExecutorService {
}
});
}
@SuppressWarnings("unchecked")
private <T> T decode(RedissonClassLoader cl, ByteBuf buf) throws IOException {
private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException {
try {
Codec codec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
T task = (T) codec.getValueDecoder().decode(buf, null);
Codec classLoaderCodec = codecs.get(className);
if (classLoaderCodec == null) {
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
codecs.put(className, classLoaderCodec);
}
T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null);
Injector.inject(task, redisson);
return task;
} catch (Exception e) {
@ -268,10 +277,7 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
Runnable runnable = decode(cl, buf);
Runnable runnable = decode(className, classBody, buf);
runnable.run();
} catch (RedissonShutdownException e) {
// skip

@ -290,6 +290,15 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(redisson.getKeys().count()).isZero();
}
@Test
public void testPerformance() throws InterruptedException {
RExecutorService e = redisson.getExecutorService("test");
for (int i = 0; i < 5000; i++) {
e.execute(new RunnableTask());
}
e.shutdown();
assertThat(e.awaitTermination(900, TimeUnit.MILLISECONDS)).isTrue();
}
@Test
public void testShutdown() throws InterruptedException {

Loading…
Cancel
Save