Fixed possible race-condition during write operation cancellation. #1061

pull/1068/head
Nikita 7 years ago
parent 9e4767690c
commit f30351f94b

@ -19,8 +19,9 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.MessageToByteEncoder;
/**
@ -32,6 +33,17 @@ import io.netty.handler.codec.MessageToByteEncoder;
public class CommandBatchEncoder extends MessageToByteEncoder<CommandsData> {
public static final CommandBatchEncoder INSTANCE = new CommandBatchEncoder();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (acceptOutboundMessage(msg)) {
if (!promise.setUncancellable()) {
return;
}
}
super.write(ctx, msg, promise);
}
@Override
protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception {

@ -59,6 +59,12 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<?, ?>> {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (acceptOutboundMessage(msg)) {
if (!promise.setUncancellable()) {
return;
}
}
try {
super.write(ctx, msg, promise);
} catch (Exception e) {

@ -1,5 +1,7 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -9,13 +11,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.*;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode;
import org.redisson.client.RedisException;
@ -41,6 +43,16 @@ public class RedissonBatchTest extends BaseTest {
System.out.println(t);
}
@Test
public void testWriteTimeout() {
RBatch batch = redisson.createBatch();
for (int i = 0; i < 200000; i++) {
RMapCacheAsync<String, String> map = batch.getMapCache("test");
map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS);
}
batch.execute();
}
@Test
public void testSkipResult() {
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0);

@ -26,6 +26,8 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import net.bytebuddy.utility.RandomString;
public class RedissonMapTest extends BaseMapTest {
public static class SimpleKey implements Serializable {
@ -420,6 +422,16 @@ public class RedissonMapTest extends BaseMapTest {
assertThat(rmap.readAllEntrySet()).containsExactlyElementsOf(map.entrySet());
}
@Test
public void testWriteTimeout() {
Map<String, String> map = redisson.getMap("simple");
Map<String, String> joinMap = new HashMap<>();
for (int i = 0; i < 200000; i++) {
joinMap.put(RandomString.make(1024), RandomString.make(1024));
}
map.putAll(joinMap);
}
@Test
public void testPutAll() {
Map<Integer, String> map = redisson.getMap("simple");

Loading…
Cancel
Save