Fixed - RExecutorService task execution performance regression.

pull/1603/head
Nikita
parent f78b0b68f5
commit 8d8ff88a10

@ -36,6 +36,8 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.misc.Injector; import org.redisson.misc.Injector;
import org.redisson.remote.RequestId; import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
@ -55,7 +57,7 @@ import io.netty.util.concurrent.FutureListener;
*/ */
public class TasksRunnerService implements RemoteExecutorService { public class TasksRunnerService implements RemoteExecutorService {
private final Map<String, Codec> codecs = new LRUCacheMap<String, Codec>(500, 0, 0); private final Map<HashValue, Codec> codecs = new LRUCacheMap<HashValue, Codec>(500, 0, 0);
private final Codec codec; private final Codec codec;
private final String name; private final String name;
@ -248,14 +250,16 @@ public class TasksRunnerService implements RemoteExecutorService {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException { private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException {
ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length);
try { try {
Codec classLoaderCodec = codecs.get(className); HashValue hash = new HashValue(Hash.hash128(classBodyBuf));
Codec classLoaderCodec = codecs.get(hash);
if (classLoaderCodec == null) { if (classLoaderCodec == null) {
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody); cl.loadClass(className, classBody);
classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl); classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
codecs.put(className, classLoaderCodec); codecs.put(hash, classLoaderCodec);
} }
T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null); T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null);
@ -263,8 +267,9 @@ public class TasksRunnerService implements RemoteExecutorService {
return task; return task;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
} finally {
classBodyBuf.release();
} }
} }
@Override @Override

@ -297,7 +297,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
e.execute(new RunnableTask()); e.execute(new RunnableTask());
} }
e.shutdown(); e.shutdown();
assertThat(e.awaitTermination(900, TimeUnit.MILLISECONDS)).isTrue(); assertThat(e.awaitTermination(1000, TimeUnit.MILLISECONDS)).isTrue();
} }
@Test @Test

Loading…
Cancel
Save