From 2eee782be804d2b185bc5030ead5d8701db57d11 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Oct 2018 13:41:46 +0300 Subject: [PATCH 1/8] Fixed - DecoderException when using connection ping #1497 --- .../java/org/redisson/client/handler/CommandDecoder.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 582288ea0..71728db88 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -155,7 +155,7 @@ public class CommandDecoder extends ReplayingDecoder { CommandData cmd) throws IOException { if (state().getLevels().size() == 2) { StateLevel secondLevel = state().getLevels().get(1); - + if (secondLevel.getParts().isEmpty()) { state().getLevels().remove(1); } @@ -189,13 +189,14 @@ public class CommandDecoder extends ReplayingDecoder { } firstLevel.setLastList(null); firstLevel.setLastListSize(0); - + while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) { decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); } decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); } else { decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); + decode(in, cmd, null, ctx.channel(), false); } } } From fda6495b4eb77df3ce78799f67e8d39029af9ee7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Oct 2018 13:42:30 +0300 Subject: [PATCH 2/8] testPing improvements --- .../src/test/java/org/redisson/RedissonTopicTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index acb679c8f..a164a2e79 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -109,10 +109,16 @@ public class RedissonTopicTest { @Test public void testPing() throws InterruptedException { Config config = BaseTest.createConfig(); - config.useSingleServer().setPingConnectionInterval(50); + config.useSingleServer() + .setPingConnectionInterval(50) + .setConnectTimeout(20_000) + .setTimeout(25_000_000) + .setRetryInterval(750) + .setConnectionMinimumIdleSize(4) + .setConnectionPoolSize(16); RedissonClient redisson = Redisson.create(config); - int count = 1000; + int count = 6000; CountDownLatch latch = new CountDownLatch(count); RTopic eventsTopic = redisson.getTopic("eventsTopic"); From d2ac9ab8d0969edbf07db836dda738eef9ea29e4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 12:23:59 +0300 Subject: [PATCH 3/8] refactoring --- redisson/src/main/java/org/redisson/RedissonReactive.java | 2 +- .../{command => reactive}/CommandReactiveBatchService.java | 5 +++-- .../{command => reactive}/CommandReactiveExecutor.java | 3 ++- .../{command => reactive}/CommandReactiveService.java | 4 ++-- .../java/org/redisson/reactive/ReactiveProxyBuilder.java | 1 - .../java/org/redisson/reactive/RedissonBatchReactive.java | 1 - .../java/org/redisson/reactive/RedissonKeysReactive.java | 1 - .../org/redisson/reactive/RedissonListMultimapReactive.java | 1 - .../java/org/redisson/reactive/RedissonListReactive.java | 1 - .../org/redisson/reactive/RedissonReadWriteLockReactive.java | 1 - .../redisson/reactive/RedissonScoredSortedSetReactive.java | 1 - .../org/redisson/reactive/RedissonSetMultimapReactive.java | 1 - .../org/redisson/reactive/RedissonTransactionReactive.java | 1 - 13 files changed, 8 insertions(+), 15 deletions(-) rename redisson/src/main/java/org/redisson/{command => reactive}/CommandReactiveBatchService.java (96%) rename redisson/src/main/java/org/redisson/{command => reactive}/CommandReactiveExecutor.java (91%) rename redisson/src/main/java/org/redisson/{command => reactive}/CommandReactiveService.java (93%) diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 2964284d1..94fb15b1f 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -61,12 +61,12 @@ import org.redisson.api.RedissonReactiveClient; import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; import org.redisson.codec.ReferenceCodecProvider; -import org.redisson.command.CommandReactiveService; import org.redisson.config.Config; import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.pubsub.SemaphorePubSub; +import org.redisson.reactive.CommandReactiveService; import org.redisson.reactive.ReactiveProxyBuilder; import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonKeysReactive; diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java similarity index 96% rename from redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java rename to redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java index d3d01eba5..c40b324cc 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.command; +package org.redisson.reactive; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -27,10 +27,11 @@ import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; import org.redisson.misc.RPromise; -import org.redisson.reactive.NettyFuturePublisher; import reactor.fn.Supplier; import reactor.rx.action.support.DefaultSubscriber; diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveExecutor.java similarity index 91% rename from redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java rename to redisson/src/main/java/org/redisson/reactive/CommandReactiveExecutor.java index 2180e989c..b0677a2f1 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveExecutor.java @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.command; +package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.api.RFuture; +import org.redisson.command.CommandAsyncExecutor; import reactor.fn.Supplier; diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java similarity index 93% rename from redisson/src/main/java/org/redisson/command/CommandReactiveService.java rename to redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java index 84dc2422b..cf99fe476 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java @@ -13,12 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.command; +package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.api.RFuture; +import org.redisson.command.CommandAsyncService; import org.redisson.connection.ConnectionManager; -import org.redisson.reactive.NettyFuturePublisher; import reactor.fn.Supplier; diff --git a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java index 6934657d2..c177fdeed 100644 --- a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java +++ b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.redisson.api.RFuture; -import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 4b91bd990..be3c669d1 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -72,7 +72,6 @@ import org.redisson.api.RStreamReactive; import org.redisson.api.RTopicReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 43f15960f..fd104c204 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -23,7 +23,6 @@ import org.reactivestreams.Subscriber; import org.redisson.RedissonKeys; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.command.CommandReactiveService; import org.redisson.connection.MasterSlaveEntry; import io.netty.util.concurrent.Future; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java index 1631c09e6..3d6930350 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListMultimapReactive.java @@ -20,7 +20,6 @@ import org.redisson.api.RList; import org.redisson.api.RListMultimap; import org.redisson.api.RListReactive; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; /** * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index 8e16cd11e..f99cc7950 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -20,7 +20,6 @@ import org.reactivestreams.Subscriber; import org.redisson.RedissonList; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java index 2eaeecb47..312b3706d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -19,7 +19,6 @@ import org.redisson.RedissonReadWriteLock; import org.redisson.api.RLockReactive; import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLockReactive; -import org.redisson.command.CommandReactiveExecutor; /** * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 254c591dd..22e0d3b9a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -22,7 +22,6 @@ import org.redisson.api.RScoredSortedSetAsync; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.command.CommandReactiveExecutor; /** * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java index 4ee892a5a..5a9a90d3b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java @@ -20,7 +20,6 @@ import org.redisson.api.RSet; import org.redisson.api.RSetMultimap; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; /** * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index 896d2ae6b..d1a4ce91e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -30,7 +30,6 @@ import org.redisson.api.RTransaction; import org.redisson.api.RTransactionReactive; import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; import org.redisson.transaction.RedissonTransaction; import reactor.fn.Supplier; From 6d20c1356cd72e7f13962bc42dde8e3a0beb11ae Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 13:38:31 +0300 Subject: [PATCH 4/8] RExecutorService.invokeAny execution optimization --- .../java/org/redisson/RedissonExecutorService.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index dffd5dc09..cdd25145c 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -974,13 +974,18 @@ public class RedissonExecutorService implements RScheduledExecutorService { if (tasks == null) { throw new NullPointerException(); } - - RExecutorBatchFuture future = submit(tasks.toArray(new Callable[tasks.size()])); - io.netty.util.concurrent.Future result = poll(future.getTaskFutures(), timeout, unit); + + List> futures = new ArrayList>(); + for (Callable callable : tasks) { + RExecutorFuture future = submit(callable); + futures.add(future); + } + + io.netty.util.concurrent.Future result = poll(futures, timeout, unit); if (result == null) { throw new TimeoutException(); } - for (RExecutorFuture f : future.getTaskFutures()) { + for (RExecutorFuture f : futures) { f.cancel(true); } return result.getNow(); From db19103b7947878fec5609a5c4b7dc4c2b63d25b Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 14:44:49 +0300 Subject: [PATCH 5/8] Fixed - DecoderException when using connection ping #1497 --- .../main/java/org/redisson/client/handler/CommandDecoder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 71728db88..c7cf4e667 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -195,8 +195,10 @@ public class CommandDecoder extends ReplayingDecoder { } decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); } else { + if (in.isReadable()) { + decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); + } decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); - decode(in, cmd, null, ctx.channel(), false); } } } From 9c4c57f72d3330750b6558aecacc7afb997b03b0 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 14:46:03 +0300 Subject: [PATCH 6/8] Improvement - memory allocation optimization during ExecutorService task execution. --- .../redisson/executor/TasksRunnerService.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 8588aaa29..cd478a412 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -49,7 +49,7 @@ import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -195,11 +195,8 @@ public class TasksRunnerService implements RemoteExecutorService { public Object executeCallable(TaskParameters params) { renewRetryTime(params.getRequestId()); - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length); try { - buf.writeBytes(params.getState()); - - Callable callable = decode(params.getClassName(), params.getClassBody(), buf); + Callable callable = decode(params); return callable.call(); } catch (RedissonShutdownException e) { return null; @@ -209,7 +206,6 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - buf.release(); finish(params.getRequestId()); } } @@ -260,26 +256,28 @@ public class TasksRunnerService implements RemoteExecutorService { } @SuppressWarnings("unchecked") - private T decode(String className, byte[] classBody, ByteBuf buf) throws IOException { - ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length); + private T decode(TaskParameters params) throws IOException { + ByteBuf classBodyBuf = Unpooled.wrappedBuffer(params.getClassBody()); + ByteBuf stateBuf = Unpooled.wrappedBuffer(params.getState()); try { HashValue hash = new HashValue(Hash.hash128(classBodyBuf)); Codec classLoaderCodec = codecs.get(hash); if (classLoaderCodec == null) { RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); - cl.loadClass(className, classBody); + cl.loadClass(params.getClassName(), params.getClassBody()); classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl); codecs.put(hash, classLoaderCodec); } - T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null); + T task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null); Injector.inject(task, redisson); return task; } catch (Exception e) { throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); } finally { classBodyBuf.release(); + stateBuf.release(); } } @@ -289,11 +287,8 @@ public class TasksRunnerService implements RemoteExecutorService { renewRetryTime(params.getRequestId()); } - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(params.getState().length); try { - buf.writeBytes(params.getState()); - - Runnable runnable = decode(params.getClassName(), params.getClassBody(), buf); + Runnable runnable = decode(params); runnable.run(); } catch (RedissonShutdownException e) { // skip @@ -302,7 +297,6 @@ public class TasksRunnerService implements RemoteExecutorService { } catch (Exception e) { throw new IllegalArgumentException(e); } finally { - buf.release(); finish(params.getRequestId()); } } From 230233086e6d851c8a0fb253d6c35c0954fc1767 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 14:46:30 +0300 Subject: [PATCH 7/8] Improvement - memory allocation optimization during command encoding --- .../org/redisson/client/handler/CommandEncoder.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java index 305abe142..01b7e66c8 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -113,19 +114,13 @@ public class CommandEncoder extends MessageToByteEncoder> { private ByteBuf encode(Object in) { if (in instanceof byte[]) { - byte[] payload = (byte[])in; - ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length); - out.writeBytes(payload); - return out; + return Unpooled.wrappedBuffer((byte[])in); } if (in instanceof ByteBuf) { return (ByteBuf) in; } if (in instanceof ChannelName) { - byte[] payload = ((ChannelName)in).getName(); - ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length); - out.writeBytes(payload); - return out; + return Unpooled.wrappedBuffer(((ChannelName)in).getName()); } String payload = in.toString(); From c4762a34a4fd6d0d27bf7f962c840510c4e568e7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Oct 2018 15:39:23 +0300 Subject: [PATCH 8/8] Feature - Lambda support for RExecutorService #1183 #1656 --- .../org/redisson/RedissonExecutorService.java | 139 +++++++++++++----- .../redisson/executor/TasksRunnerService.java | 14 +- .../executor/params/ScheduledParameters.java | 4 +- .../executor/params/TaskParameters.java | 11 +- 4 files changed, 128 insertions(+), 40 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index cdd25145c..42f064a4b 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -15,10 +15,15 @@ */ package org.redisson; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.lang.invoke.SerializedLambda; import java.lang.ref.ReferenceQueue; +import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; @@ -131,7 +136,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final ScheduledTasksService scheduledRemoteService; private final TasksService executorRemoteService; - private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); + private final Map, ClassBody> class2body = PlatformDependent.newConcurrentHashMap(); private final String name; private final String requestQueueName; @@ -316,9 +321,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public void execute(Runnable task) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); syncExecute(promise); } @@ -332,9 +337,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); for (Runnable task : tasks) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - asyncServiceWithoutResult.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); + asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); } List result = (List) executorRemoteService.executeAdd(); @@ -373,26 +378,85 @@ public class RedissonExecutorService implements RScheduledExecutorService { } } } + + public static class ClassBody { + + private byte[] lambda; + private byte[] clazz; + private String clazzName; + + public ClassBody(byte[] lambda, byte[] clazz, String clazzName) { + super(); + this.lambda = lambda; + this.clazz = clazz; + this.clazzName = clazzName; + } + + public String getClazzName() { + return clazzName; + } + + public byte[] getClazz() { + return clazz; + } + + public byte[] getLambda() { + return lambda; + } + + } - private byte[] getClassBody(Object task) { + private ClassBody getClassBody(Object task) { Class c = task.getClass(); - byte[] classBody = class2bytes.get(c); - if (classBody == null) { + ClassBody result = class2body.get(c); + if (result == null) { String className = c.getName(); String classAsPath = className.replace('.', '/') + ".class"; InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath); - DataInputStream s = new DataInputStream(classStream); + byte[] lambdaBody = null; + if (classStream == null) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try { + ObjectOutput oo = new ObjectOutputStream(os); + oo.writeObject(task); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to serialize lambda", e); + } + lambdaBody = os.toByteArray(); + + SerializedLambda lambda; + try { + Method writeReplace = task.getClass().getDeclaredMethod("writeReplace"); + writeReplace.setAccessible(true); + lambda = (SerializedLambda) writeReplace.invoke(task); + } catch (Exception ex) { + throw new IllegalArgumentException("Lambda should implement java.io.Serializable interface", ex); + } + + className = lambda.getCapturingClass().replace('/', '.'); + classStream = task.getClass().getClassLoader().getResourceAsStream(lambda.getCapturingClass() + ".class"); + } + + byte[] classBody; try { + DataInputStream s = new DataInputStream(classStream); classBody = new byte[s.available()]; s.readFully(classBody); } catch (IOException e) { throw new IllegalArgumentException(e); + } finally { + try { + classStream.close(); + } catch (IOException e) { + // skip + } } - class2bytes.put(c, classBody); + result = new ClassBody(lambdaBody, classBody, className); + class2body.put(c, result); } - return classBody; + return result; } @Override @@ -505,9 +569,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submitAsync(Callable task) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise result = (RemotePromise) asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise result = (RemotePromise) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); addListener(result); return createFuture(result); } @@ -523,9 +587,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Callable task : tasks) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise promise = (RemotePromise)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise promise = (RemotePromise)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -549,9 +613,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { final List> result = new ArrayList>(); for (Callable task : tasks) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise promise = (RemotePromise)asyncService.executeCallable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise promise = (RemotePromise)asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -651,9 +715,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Runnable task : tasks) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise promise = (RemotePromise)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise promise = (RemotePromise)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -677,9 +741,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { final List> result = new ArrayList>(); for (Runnable task : tasks) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise promise = (RemotePromise)asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise promise = (RemotePromise)asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -721,9 +785,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submitAsync(Runnable task) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); - RemotePromise result = (RemotePromise) asyncService.executeRunnable(new TaskParameters(task.getClass().getName(), classBody, state)); + RemotePromise result = (RemotePromise) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); addListener(result); return createFuture(result); } @@ -789,10 +853,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime)); + RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); addListener(result); return createFuture(result, startTime); @@ -810,10 +874,10 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleAsync(Callable task, long delay, TimeUnit unit) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(delay); - RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(new ScheduledParameters(task.getClass().getName(), classBody, state, startTime)); + RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(new ScheduledParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state, startTime)); addListener(result); return createFuture(result, startTime); } @@ -830,12 +894,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleAtFixedRateAsync(Runnable task, long initialDelay, long period, TimeUnit unit) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); ScheduledAtFixedRateParameters params = new ScheduledAtFixedRateParameters(); - params.setClassName(task.getClass().getName()); - params.setClassBody(classBody); + params.setClassName(classBody.getClazzName()); + params.setClassBody(classBody.getClazz()); + params.setLambdaBody(classBody.getLambda()); params.setState(state); params.setStartTime(startTime); params.setPeriod(unit.toMillis(period)); @@ -857,7 +922,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleAsync(Runnable task, CronSchedule cronSchedule) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date()); if (startDate == null) { @@ -866,8 +931,9 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = startDate.getTime(); ScheduledCronExpressionParameters params = new ScheduledCronExpressionParameters(); - params.setClassName(task.getClass().getName()); - params.setClassBody(classBody); + params.setClassName(classBody.getClazzName()); + params.setClassBody(classBody.getClazz()); + params.setLambdaBody(classBody.getLambda()); params.setState(state); params.setStartTime(startTime); params.setCronExpression(cronSchedule.getExpression().getCronExpression()); @@ -896,13 +962,14 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RScheduledFuture scheduleWithFixedDelayAsync(Runnable task, long initialDelay, long delay, TimeUnit unit) { check(task); - byte[] classBody = getClassBody(task); + ClassBody classBody = getClassBody(task); byte[] state = encode(task); long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); ScheduledWithFixedDelayParameters params = new ScheduledWithFixedDelayParameters(); - params.setClassName(task.getClass().getName()); - params.setClassBody(classBody); + params.setClassName(classBody.getClazzName()); + params.setClassBody(classBody.getClazz()); + params.setLambdaBody(classBody.getLambda()); params.setState(state); params.setStartTime(startTime); params.setDelay(unit.toMillis(delay)); diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index cd478a412..a5e5c575f 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -15,7 +15,9 @@ */ package org.redisson.executor; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.ObjectInput; import java.util.Arrays; import java.util.Date; import java.util.Map; @@ -36,6 +38,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CustomObjectInputStream; import org.redisson.command.CommandExecutor; import org.redisson.executor.params.ScheduledAtFixedRateParameters; import org.redisson.executor.params.ScheduledCronExpressionParameters; @@ -270,7 +273,16 @@ public class TasksRunnerService implements RemoteExecutorService { codecs.put(hash, classLoaderCodec); } - T task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null); + T task; + if (params.getLambdaBody() != null) { + ByteArrayInputStream is = new ByteArrayInputStream(params.getLambdaBody()); + ObjectInput oo = new CustomObjectInputStream(classLoaderCodec.getClassLoader(), is); + task = (T) oo.readObject(); + oo.close(); + } else { + task = (T) classLoaderCodec.getValueDecoder().decode(stateBuf, null); + } + Injector.inject(task, redisson); return task; } catch (Exception e) { diff --git a/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java b/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java index b538ab39b..f3c902cfa 100644 --- a/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/ScheduledParameters.java @@ -27,8 +27,8 @@ public class ScheduledParameters extends TaskParameters { public ScheduledParameters() { } - public ScheduledParameters(String className, byte[] classBody, byte[] state, long startTime) { - super(className, classBody, state); + public ScheduledParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state, long startTime) { + super(className, classBody, lambdaBody, state); this.startTime = startTime; } diff --git a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java index e1673dbcd..1ececf8a9 100644 --- a/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java +++ b/redisson/src/main/java/org/redisson/executor/params/TaskParameters.java @@ -28,17 +28,26 @@ public class TaskParameters implements Serializable { private String className; private byte[] classBody; + private byte[] lambdaBody; private byte[] state; private String requestId; public TaskParameters() { } - public TaskParameters(String className, byte[] classBody, byte[] state) { + public TaskParameters(String className, byte[] classBody, byte[] lambdaBody, byte[] state) { super(); this.className = className; this.classBody = classBody; this.state = state; + this.lambdaBody = lambdaBody; + } + + public byte[] getLambdaBody() { + return lambdaBody; + } + public void setLambdaBody(byte[] lambdaBody) { + this.lambdaBody = lambdaBody; } public String getClassName() {