diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 8588aaa29..cd478a412 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -49,7 +49,7 @@ import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -195,11 +195,8 @@ public class TasksRunnerService implements RemoteExecutorService { public Object executeCallable(TaskParameters params) { renewRetryTime(params.getRequestId()); - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length); try { - buf.writeBytes(params.getState()); - - Callable callable = decode(params.getClassName(), params.getClassBody(), buf); + Callable callable = decode(params); return callable.call(); } catch (RedissonShutdownException e) { return null; @@ -209,7 +206,6 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - buf.release(); finish(params.getRequestId()); } } @@ -260,26 +256,28 @@ public class TasksRunnerService implements RemoteExecutorService { } @SuppressWarnings("unchecked") - private T decode(String className, byte[] classBody, ByteBuf buf) throws IOException { - ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length); + private T decode(TaskParameters params) throws IOException { + ByteBuf classBodyBuf = Unpooled.wrappedBuffer(params.getClassBody()); + ByteBuf stateBuf = Unpooled.wrappedBuffer(params.getState()); try { HashValue hash = new HashValue(Hash.hash128(classBodyBuf)); Codec classLoaderCodec = codecs.get(hash); if (classLoaderCodec == null) { RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); - cl.loadClass(className, classBody); + cl.loadClass(params.getClassName(), params.getClassBody()); classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl); codecs.put(hash, classLoaderCodec); } - T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null); + T task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null); Injector.inject(task, redisson); return task; } catch (Exception e) { throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); } finally { classBodyBuf.release(); + stateBuf.release(); } } @@ -289,11 +287,8 @@ public class TasksRunnerService implements RemoteExecutorService { renewRetryTime(params.getRequestId()); } - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length); try { - buf.writeBytes(params.getState()); - - Runnable runnable = decode(params.getClassName(), params.getClassBody(), buf); + Runnable runnable = decode(params); runnable.run(); } catch (RedissonShutdownException e) { // skip @@ -302,7 +297,6 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - buf.release(); finish(params.getRequestId()); } }