Improvement - memory allocation optimization during ExecutorService task execution.

pull/1705/head
Nikita 6 years ago
parent db19103b79
commit 9c4c57f72d

@ -49,7 +49,7 @@ import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
@ -195,11 +195,8 @@ public class TasksRunnerService implements RemoteExecutorService {
public Object executeCallable(TaskParameters params) {
renewRetryTime(params.getRequestId());
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try {
buf.writeBytes(params.getState());
Callable<?> callable = decode(params.getClassName(), params.getClassBody(), buf);
Callable<?> callable = decode(params);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
@ -209,7 +206,6 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish(params.getRequestId());
}
}
@ -260,26 +256,28 @@ public class TasksRunnerService implements RemoteExecutorService {
}
@SuppressWarnings("unchecked")
private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException {
ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length);
private <T> T decode(TaskParameters params) throws IOException {
ByteBuf classBodyBuf = Unpooled.wrappedBuffer(params.getClassBody());
ByteBuf stateBuf = Unpooled.wrappedBuffer(params.getState());
try {
HashValue hash = new HashValue(Hash.hash128(classBodyBuf));
Codec classLoaderCodec = codecs.get(hash);
if (classLoaderCodec == null) {
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
cl.loadClass(params.getClassName(), params.getClassBody());
classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
codecs.put(hash, classLoaderCodec);
}
T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null);
T task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null);
Injector.inject(task, redisson);
return task;
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
} finally {
classBodyBuf.release();
stateBuf.release();
}
}
@ -289,11 +287,8 @@ public class TasksRunnerService implements RemoteExecutorService {
renewRetryTime(params.getRequestId());
}
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length);
try {
buf.writeBytes(params.getState());
Runnable runnable = decode(params.getClassName(), params.getClassBody(), buf);
Runnable runnable = decode(params);
runnable.run();
} catch (RedissonShutdownException e) {
// skip
@ -302,7 +297,6 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish(params.getRequestId());
}
}

Loading…
Cancel
Save