diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandBatchEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandBatchEncoder.java index 6986545e2..fda60d7a9 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandBatchEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandBatchEncoder.java @@ -19,8 +19,9 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.MessageToByteEncoder; /** @@ -32,6 +33,17 @@ import io.netty.handler.codec.MessageToByteEncoder; public class CommandBatchEncoder extends MessageToByteEncoder { public static final CommandBatchEncoder INSTANCE = new CommandBatchEncoder(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (acceptOutboundMessage(msg)) { + if (!promise.setUncancellable()) { + return; + } + } + + super.write(ctx, msg, promise); + } @Override protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java index 6dba012f7..d809bd77c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -59,6 +59,12 @@ public class CommandEncoder extends MessageToByteEncoder> { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (acceptOutboundMessage(msg)) { + if (!promise.setUncancellable()) { + return; + } + } + try { super.write(ctx, msg, promise); } catch (Exception e) { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index c7651df1b..80f6950bf 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -1,5 +1,7 @@ package org.redisson; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -9,13 +11,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.assertj.core.api.Assertions.*; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import org.redisson.api.RBatch; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; +import org.redisson.api.RMapCacheAsync; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; import org.redisson.client.RedisException; @@ -41,6 +43,16 @@ public class RedissonBatchTest extends BaseTest { System.out.println(t); } + @Test + public void testWriteTimeout() { + RBatch batch = redisson.createBatch(); + for (int i = 0; i < 200000; i++) { + RMapCacheAsync map = batch.getMapCache("test"); + map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS); + } + batch.execute(); + } + @Test public void testSkipResult() { Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0); diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index 0c681a53b..8d6ed59ea 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -26,6 +26,8 @@ import org.redisson.client.codec.StringCodec; import org.redisson.codec.JsonJacksonCodec; import org.redisson.config.Config; +import net.bytebuddy.utility.RandomString; + public class RedissonMapTest extends BaseMapTest { public static class SimpleKey implements Serializable { @@ -420,6 +422,16 @@ public class RedissonMapTest extends BaseMapTest { assertThat(rmap.readAllEntrySet()).containsExactlyElementsOf(map.entrySet()); } + @Test + public void testWriteTimeout() { + Map map = redisson.getMap("simple"); + Map joinMap = new HashMap<>(); + for (int i = 0; i < 200000; i++) { + joinMap.put(RandomString.make(1024), RandomString.make(1024)); + } + map.putAll(joinMap); + } + @Test public void testPutAll() { Map map = redisson.getMap("simple");