From 60352b8490220cfeecc99ca51d73b867f563838c Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 19 Dec 2018 17:16:12 +0300 Subject: [PATCH 1/8] Fixed - error during channel initialization is not logged. #1566 --- .../org/redisson/client/handler/CommandDecoder.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 8744c37cb..b73dba41d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -143,6 +143,17 @@ public class CommandDecoder extends ReplayingDecoder { sendNext(ctx); throw e; } + } else { + try { + while (in.writerIndex() > in.readerIndex()) { + decode(in, null, null, ctx.channel(), false); + } + sendNext(ctx); + } catch (Exception e) { + log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); + sendNext(ctx); + throw e; + } } } @@ -335,7 +346,7 @@ public class CommandDecoder extends ReplayingDecoder { if (data != null) { data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + LogHelper.toString(data))); } else { - log.error("Error: {} channel: {} data: {}", error, channel, LogHelper.toString(data)); + log.error("Error message from Redis: {} channel: {}", error, channel); } } } finally { From e2690921a38ccfc3206ef4a4d03523474f41af6b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Dec 2018 08:43:00 +0300 Subject: [PATCH 2/8] Fixed - RBitSet object couldn't be used as nested object. #1751 --- .../java/org/redisson/RedissonBitSet.java | 2 +- .../java/org/redisson/RedissonObject.java | 20 +++++++++---------- .../redisson/command/CommandAsyncService.java | 9 ++------- .../core/RedissonObjectBuilder.java | 4 +++- .../redisson/misc/RedissonObjectFactory.java | 8 ++++++-- .../org/redisson/RedissonReferenceTest.java | 17 ++++++++++++++++ 6 files changed, 39 insertions(+), 21 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBitSet.java b/redisson/src/main/java/org/redisson/RedissonBitSet.java index a58fcce46..87ac51e70 100644 --- a/redisson/src/main/java/org/redisson/RedissonBitSet.java +++ b/redisson/src/main/java/org/redisson/RedissonBitSet.java @@ -38,7 +38,7 @@ import org.redisson.command.CommandBatchService; public class RedissonBitSet extends RedissonExpirable implements RBitSet { public RedissonBitSet(CommandAsyncExecutor connectionManager, String name) { - super(connectionManager, name); + super(null, connectionManager, name); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index b65a639e3..2592756fe 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -101,7 +101,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture sizeInMemoryAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.MEMORY_USAGE, getName()); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MEMORY_USAGE, getName()); } public final RFuture sizeInMemoryAsync(List keys) { @@ -124,7 +124,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture renameAsync(String newName) { - return commandExecutor.writeAsync(getName(), RedisCommands.RENAME, getName(), newName); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RENAME, getName(), newName); } @Override @@ -134,7 +134,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture migrateAsync(String host, int port, int database, long timeout) { - return commandExecutor.writeAsync(getName(), RedisCommands.MIGRATE, host, port, getName(), database, timeout); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MIGRATE, host, port, getName(), database, timeout); } @Override @@ -144,7 +144,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture copyAsync(String host, int port, int database, long timeout) { - return commandExecutor.writeAsync(getName(), RedisCommands.MIGRATE, host, port, getName(), database, timeout, "COPY"); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MIGRATE, host, port, getName(), database, timeout, "COPY"); } @Override @@ -154,7 +154,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture moveAsync(int database) { - return commandExecutor.writeAsync(getName(), RedisCommands.MOVE, getName(), database); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.MOVE, getName(), database); } @Override @@ -164,7 +164,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture renamenxAsync(String newName) { - return commandExecutor.writeAsync(getName(), RedisCommands.RENAMENX, getName(), newName); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RENAMENX, getName(), newName); } @Override @@ -174,7 +174,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName()); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DEL_BOOL, getName()); } @Override @@ -184,7 +184,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture unlinkAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.UNLINK_BOOL, getName()); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.UNLINK_BOOL, getName()); } @Override @@ -194,7 +194,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture touchAsync() { - return commandExecutor.writeAsync(getName(), codec, RedisCommands.TOUCH, getName()); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.TOUCH, getName()); } @Override @@ -204,7 +204,7 @@ public abstract class RedissonObject implements RObject { @Override public RFuture isExistsAsync() { - return commandExecutor.readAsync(getName(), codec, RedisCommands.EXISTS, getName()); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.EXISTS, getName()); } @Override diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index cabfd1ec7..63a382938 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -1070,12 +1070,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void handleReference(RPromise mainPromise, R res) { - try { - mainPromise.trySuccess((R) tryHandleReference(res)); - } catch (Exception e) { - //fall back and let other part of the code handle the type conversion. - mainPromise.trySuccess(res); - } + mainPromise.trySuccess((R) tryHandleReference(res)); } protected Object tryHandleReference(Object o) { @@ -1198,7 +1193,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } return RedissonObjectFactory.fromReference(redissonRx, (RedissonReference) res); } catch (Exception exception) { - return res; + throw new IllegalStateException(exception); } } diff --git a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java index c55587938..4bd973dfd 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java @@ -84,7 +84,9 @@ public class RedissonObjectBuilder { public void store(RObject ar, String fieldName, RMap liveMap) { Codec codec = ar.getCodec(); - codecProvider.registerCodec((Class) codec.getClass(), codec); + if (codec != null) { + codecProvider.registerCodec((Class) codec.getClass(), codec); + } liveMap.fastPut(fieldName, new RedissonReference(ar.getClass(), ar.getName(), codec)); } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java index a70083824..e7219369f 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java @@ -171,14 +171,18 @@ public class RedissonObjectFactory { Class clazz = object.getClass().getInterfaces()[0]; RObject rObject = ((RObject) object); - config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + if (rObject.getCodec() != null) { + config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + } return new RedissonReference(clazz, rObject.getName(), rObject.getCodec()); } if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) { Class clazz = object.getClass().getInterfaces()[0]; RObjectReactive rObject = ((RObjectReactive) object); - config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + if (rObject.getCodec() != null) { + config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + } return new RedissonReference(clazz, rObject.getName(), rObject.getCodec()); } diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java index 38628334e..9ebf95f99 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java @@ -2,8 +2,10 @@ package org.redisson; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; +import static org.assertj.core.api.Assertions.*; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -19,6 +21,21 @@ import org.redisson.config.Config; */ public class RedissonReferenceTest extends BaseTest { + @Test + public void testBitSet() { + RMap data = redisson.getMap("data-00"); + RBitSet bs = redisson.getBitSet("data-01"); + bs.set(5); + bs.set(7); + data.put("a", bs); + + assertThat(data.entrySet()).hasSize(1); + for (Map.Entry entry : data.entrySet()) { + assertThat(entry.getValue().get(5)).isTrue(); + assertThat(entry.getValue().get(7)).isTrue(); + } + } + @Test public void testBasic() { RBucket b1 = redisson.getBucket("b1"); From cd4326d6d2eeed78336df9bd40fcca01c0d6bcb6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Dec 2018 11:13:30 +0300 Subject: [PATCH 3/8] Feature - RTopic.countSubscribers method added. #1472 --- .../src/main/java/org/redisson/RedissonTopic.java | 14 +++++++++++++- .../src/main/java/org/redisson/api/RTopic.java | 10 +++++++++- .../main/java/org/redisson/api/RTopicAsync.java | 8 ++++++++ .../main/java/org/redisson/api/RTopicReactive.java | 9 +++++++++ .../src/main/java/org/redisson/api/RTopicRx.java | 8 ++++++++ .../redisson/client/protocol/RedisCommands.java | 1 + .../test/java/org/redisson/RedissonTopicTest.java | 14 ++++++++++++++ 7 files changed, 62 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 7c846f5cd..6ea6d403f 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -27,6 +27,8 @@ import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; @@ -80,7 +82,7 @@ public class RedissonTopic implements RTopic { @Override public RFuture publishAsync(Object message) { - return commandExecutor.writeAsync(name, codec, RedisCommands.PUBLISH, name, encode(message)); + return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, encode(message)); } protected ByteBuf encode(Object value) { @@ -273,4 +275,14 @@ public class RedissonTopic implements RTopic { return 0; } + @Override + public RFuture countSubscribersAsync() { + return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.PUBSUB_NUMSUB, name); + } + + @Override + public long countSubscribers() { + return commandExecutor.get(countSubscribersAsync()); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 63a0e67d5..d75c9f537 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -85,9 +85,17 @@ public interface RTopic extends RTopicAsync { void removeAllListeners(); /** - * Returns amount of registered listeners + * Returns amount of registered listeners to this topic * * @return amount of listeners */ int countListeners(); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + long countSubscribers(); } diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 380d8851c..1b69136bb 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -72,4 +72,12 @@ public interface RTopicAsync { */ RFuture removeListenerAsync(MessageListener listener); + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + RFuture countSubscribersAsync(); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopicReactive.java b/redisson/src/main/java/org/redisson/api/RTopicReactive.java index 27385f46a..9457a00f1 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTopicReactive.java @@ -72,4 +72,13 @@ public interface RTopicReactive { * @param listenerId - listener id */ void removeListener(int listenerId); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + Publisher countSubscribers(); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopicRx.java b/redisson/src/main/java/org/redisson/api/RTopicRx.java index eedfbf723..ffdc929a5 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RTopicRx.java @@ -82,5 +82,13 @@ public interface RTopicRx { * @return stream of messages */ Flowable getMessages(Class type); + + /** + * Returns amount of subscribers to this topic across all Redisson instances. + * Each subscriber may have multiple listeners. + * + * @return amount of subscribers + */ + Flowable countSubscribers(); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 825c67a10..75ce207ef 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -426,6 +426,7 @@ public interface RedisCommands { RedisStrictCommand QUIT = new RedisStrictCommand("QUIT", new VoidReplayConvertor()); RedisStrictCommand PUBLISH = new RedisStrictCommand("PUBLISH"); + RedisCommand PUBSUB_NUMSUB = new RedisCommand("PUBSUB", "NUMSUB", new ListObjectDecoder(1)); RedisCommand SUBSCRIBE = new RedisCommand("SUBSCRIBE", new PubSubStatusDecoder()); RedisCommand UNSUBSCRIBE = new RedisCommand("UNSUBSCRIBE", new PubSubStatusDecoder()); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 19da8d3c2..a7e36f4d8 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -107,6 +107,20 @@ public class RedissonTopicTest { } + @Test + public void testCountSubscribers() { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE); + assertThat(topic1.countSubscribers()).isZero(); + int id = topic1.addListener(Long.class, (channel, msg) -> { + }); + assertThat(topic1.countSubscribers()).isOne(); + topic1.removeListener(id); + assertThat(topic1.countSubscribers()).isZero(); + + redisson.shutdown(); + } + @Test public void testCountListeners() { RedissonClient redisson = BaseTest.createInstance(); From 97e582aea45684d9a00755a32fa3fd124b8900ff Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Dec 2018 16:19:16 +0300 Subject: [PATCH 4/8] refactoring --- .../org/redisson/RedissonAtomicDouble.java | 4 +-- .../java/org/redisson/RedissonAtomicLong.java | 4 +-- .../convertor/BitsSizeReplayConvertor.java | 7 ++++- .../BooleanAmountReplayConvertor.java | 7 ++++- .../BooleanNotNullReplayConvertor.java | 7 ++++- .../convertor/BooleanNullReplayConvertor.java | 7 ++++- .../BooleanNullSafeReplayConvertor.java | 7 ++++- .../BooleanNumberReplayConvertor.java | 7 ++++- .../convertor/BooleanReplayConvertor.java | 2 +- .../client/protocol/convertor/Convertor.java | 2 -- .../convertor/DoubleReplayConvertor.java | 7 ++++- .../protocol/convertor/EmptyConvertor.java | 8 ++++- .../convertor/IntegerReplayConvertor.java | 2 +- .../convertor/LongReplayConvertor.java | 7 ++++- .../protocol/convertor/NumberConvertor.java | 7 ++++- .../protocol/convertor/SingleConvertor.java | 31 ------------------- .../protocol/convertor/StreamIdConvertor.java | 2 +- .../convertor/StringToListConvertor.java | 4 +-- .../convertor/TrueReplayConvertor.java | 7 ++++- .../protocol/convertor/TypeConvertor.java | 7 ++++- .../convertor/VoidReplayConvertor.java | 7 ++++- 21 files changed, 87 insertions(+), 56 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/protocol/convertor/SingleConvertor.java diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java index aa18211ee..6031c02a2 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java @@ -24,7 +24,7 @@ import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.redisson.command.CommandAsyncExecutor; /** @@ -110,7 +110,7 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo @Override public RFuture getAndAddAsync(final double delta) { - return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand("INCRBYFLOAT", new SingleConvertor() { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand("INCRBYFLOAT", new Convertor() { @Override public Double convert(Object obj) { return Double.valueOf(obj.toString()) - delta; diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java index bea90ab65..7c96e0dc4 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java @@ -23,7 +23,7 @@ import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.redisson.command.CommandAsyncExecutor; /** @@ -108,7 +108,7 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public RFuture getAndAddAsync(final long delta) { - return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand("INCRBY", new SingleConvertor() { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand("INCRBY", new Convertor() { @Override public Long convert(Object obj) { return ((Long) obj) - delta; diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java index 23b1a368f..9841b1dcb 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BitsSizeReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BitsSizeReplayConvertor implements Convertor { @Override public Long convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanAmountReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanAmountReplayConvertor.java index ec031a4bf..cff93124a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanAmountReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanAmountReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BooleanAmountReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BooleanAmountReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNotNullReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNotNullReplayConvertor.java index c48bbe01e..0a4af12ee 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNotNullReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNotNullReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BooleanNotNullReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BooleanNotNullReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullReplayConvertor.java index d98597d87..f6c68e0de 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BooleanNullReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BooleanNullReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullSafeReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullSafeReplayConvertor.java index 57349e1ce..f8cacaaac 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullSafeReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNullSafeReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BooleanNullSafeReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BooleanNullSafeReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java index 3d39cd3c4..42f236739 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanNumberReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class BooleanNumberReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class BooleanNumberReplayConvertor implements Convertor { private long number; diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanReplayConvertor.java index d502e0a39..9aa2e9415 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/BooleanReplayConvertor.java @@ -20,7 +20,7 @@ package org.redisson.client.protocol.convertor; * @author Nikita Koksharov * */ -public class BooleanReplayConvertor extends SingleConvertor { +public class BooleanReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/Convertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/Convertor.java index 0408f1cc8..eb0e39b67 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/Convertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/Convertor.java @@ -23,8 +23,6 @@ package org.redisson.client.protocol.convertor; */ public interface Convertor { - Object convertMulti(Object obj); - R convert(Object obj); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java index ee069073a..57aa541e0 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class DoubleReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class DoubleReplayConvertor implements Convertor { @Override public Double convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/EmptyConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/EmptyConvertor.java index b2c10a530..cf152a2e7 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/EmptyConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/EmptyConvertor.java @@ -15,7 +15,13 @@ */ package org.redisson.client.protocol.convertor; -public class EmptyConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + * @param + */ +public class EmptyConvertor implements Convertor { @Override public R convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java index aba02f0de..526b08a73 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java @@ -20,7 +20,7 @@ package org.redisson.client.protocol.convertor; * @author Nikita Koksharov * */ -public class IntegerReplayConvertor extends SingleConvertor { +public class IntegerReplayConvertor implements Convertor { private Integer nullValue; diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java index c0f8ba6a0..e469771e3 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/LongReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class LongReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class LongReplayConvertor implements Convertor { @Override public Long convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/NumberConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/NumberConvertor.java index 6a52f1dba..7a27ef7ee 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/NumberConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/NumberConvertor.java @@ -17,7 +17,12 @@ package org.redisson.client.protocol.convertor; import java.math.BigDecimal; -public class NumberConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class NumberConvertor implements Convertor { private Class resultClass; diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/SingleConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/SingleConvertor.java deleted file mode 100644 index 3d175a043..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/SingleConvertor.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.convertor; - -/** - * - * @author Nikita Koksharov - * - * @param type - */ -public abstract class SingleConvertor implements Convertor { - - @Override - public Object convertMulti(Object obj) { - return obj; - } - -} diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java index e37a1485a..c6f764c57 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java @@ -22,7 +22,7 @@ import org.redisson.api.StreamMessageId; * @author Nikita Koksharov * */ -public class StreamIdConvertor extends SingleConvertor { +public class StreamIdConvertor implements Convertor { @Override public StreamMessageId convert(Object id) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/StringToListConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/StringToListConvertor.java index d6e23db91..fe2eda497 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/StringToListConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/StringToListConvertor.java @@ -18,14 +18,12 @@ package org.redisson.client.protocol.convertor; import java.util.ArrayList; import java.util.List; -import org.redisson.client.protocol.convertor.SingleConvertor; - /** * * @author Nikita Koksharov * */ -public class StringToListConvertor extends SingleConvertor> { +public class StringToListConvertor implements Convertor> { @Override public List convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/TrueReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/TrueReplayConvertor.java index 761a5026d..c638c319c 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/TrueReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/TrueReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class TrueReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class TrueReplayConvertor implements Convertor { @Override public Boolean convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/TypeConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/TypeConvertor.java index 4a7bb7018..b94565b26 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/TypeConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/TypeConvertor.java @@ -17,7 +17,12 @@ package org.redisson.client.protocol.convertor; import org.redisson.api.RType; -public class TypeConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class TypeConvertor implements Convertor { @Override public RType convert(Object obj) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/VoidReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/VoidReplayConvertor.java index 646f3d08d..46d14832f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/VoidReplayConvertor.java +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/VoidReplayConvertor.java @@ -15,7 +15,12 @@ */ package org.redisson.client.protocol.convertor; -public class VoidReplayConvertor extends SingleConvertor { +/** + * + * @author Nikita Koksharov + * + */ +public class VoidReplayConvertor implements Convertor { @Override public Void convert(Object obj) { From 1a08db75b72f8863f9e5dc2fa4e3de5780d521d7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Dec 2018 17:20:38 +0300 Subject: [PATCH 5/8] Improvement - RedisConnectionClosedException removed. #1695 #1748 --- .../java/org/redisson/RedissonMultiLock.java | 7 +- .../RedisConnectionClosedException.java | 31 ------- .../client/handler/CommandsQueue.java | 6 -- .../redisson/command/CommandAsyncService.java | 80 ++++++++++--------- .../redisson/command/CommandBatchService.java | 39 ++++++--- .../reactive/CommandReactiveBatchService.java | 5 +- .../redisson/rx/CommandRxBatchService.java | 5 +- .../java/org/redisson/RedissonBatchTest.java | 11 ++- 8 files changed, 86 insertions(+), 98 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 65de5907a..116f0a984 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock; import org.redisson.api.RFuture; import org.redisson.api.RLock; -import org.redisson.client.RedisConnectionClosedException; import org.redisson.client.RedisResponseTimeoutException; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -236,9 +235,6 @@ public class RedissonMultiLock implements Lock { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } - } catch (RedisConnectionClosedException e) { - unlockInner(Arrays.asList(lock)); - lockAcquired = false; } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; @@ -321,8 +317,7 @@ public class RedissonMultiLock implements Lock { lockAcquired = future.getNow(); } - if (future.cause() instanceof RedisConnectionClosedException - || future.cause() instanceof RedisResponseTimeoutException) { + if (future.cause() instanceof RedisResponseTimeoutException) { unlockInnerAsync(Arrays.asList(lock), threadId); } diff --git a/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java b/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java deleted file mode 100644 index 0c5fae97a..000000000 --- a/redisson/src/main/java/org/redisson/client/RedisConnectionClosedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedisConnectionClosedException extends RedisConnectionException { - - private static final long serialVersionUID = -5162298227713965182L; - - public RedisConnectionClosedException(String msg) { - super(msg); - } - -} diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index eb0dc0743..8d7f35e8e 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -21,7 +21,6 @@ import java.util.Queue; import java.util.regex.Pattern; import org.redisson.client.ChannelName; -import org.redisson.client.RedisConnectionClosedException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.QueueCommand; @@ -82,11 +81,6 @@ public class CommandsQueue extends ChannelDuplexHandler { command.getChannelPromise().tryFailure( new WriteRedisConnectionException("Channel has been closed! Can't write command: " + LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel())); - - if (command.getChannelPromise().isSuccess() && !command.getCommand().isBlockingCommand()) { - command.getCommand().tryFailure(new RedisConnectionClosedException("Command " - + LogHelper.toString(command.getCommand()) + " succesfully sent, but channel " + ctx.channel() + " has been closed!")); - } } super.channelInactive(ctx); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 63a382938..1934e7f8c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -219,7 +219,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -227,21 +227,21 @@ public class CommandAsyncService implements CommandAsyncExecutor { public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); int slot = connectionManager.calcSlot(name); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); int slot = connectionManager.calcSlot(key); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -291,7 +291,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null); + async(true, new NodeSource(entry), codec, command, params, promise, 0, true); } return mainPromise; } @@ -336,7 +336,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); MasterSlaveEntry entry = nodes.remove(0); - async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false, null); + async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false); } @Override @@ -392,7 +392,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null); + async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true); } return mainPromise; } @@ -419,7 +419,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0, false, null); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -427,20 +427,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { public RFuture readAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0, false, null); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); - async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -510,7 +510,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : entries) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true); } return mainPromise; } @@ -590,7 +590,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(false, nodeSource, codec, command, args.toArray(), promise, 0, false, null); + async(false, nodeSource, codec, command, args.toArray(), promise, 0, false); promise.addListener(new FutureListener() { @Override @@ -613,8 +613,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(pps)); - async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false, - null); + async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false); } }); } else { @@ -636,7 +635,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false); return mainPromise; } @@ -649,20 +648,20 @@ public class CommandAsyncService implements CommandAsyncExecutor { public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0, false, null); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0, false, null); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, - final boolean ignoreRedirect, final RFuture connFuture) { + final boolean ignoreRedirect) { if (mainPromise.isCancelled()) { free(params); return; @@ -764,7 +763,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { count, details.getCommand(), LogHelper.toString(details.getParams())); } details.removeMainPromiseListener(); - async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); AsyncDetails.release(details); } @@ -798,7 +797,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(details, connection); + checkWriteFuture(details, ignoreRedirect, connection); } }); @@ -831,7 +830,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { + private void checkWriteFuture(final AsyncDetails details, final boolean ignoreRedirect, final RedisConnection connection) { ChannelFuture future = details.getWriteFuture(); if (future.isCancelled() || details.getAttemptPromise().isDone()) { return; @@ -884,9 +883,26 @@ public class CommandAsyncService implements CommandAsyncExecutor { TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) { + if (!details.getAttemptPromise().cancel(false)) { + return; + } + + int count = details.getAttempt() + 1; + if (log.isDebugEnabled()) { + log.debug("attempt {} for command {} and params {}", + count, details.getCommand(), LogHelper.toString(details.getParams())); + } + details.removeMainPromiseListener(); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); + AsyncDetails.release(details); + return; + } + details.getAttemptPromise().tryFailure( - new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand() - + " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel())); + new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured" + + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts. Command: " + details.getCommand() + + ", params: " + LogHelper.toString(details.getParams()) + ", channel: " + connection.getChannel())); } }; @@ -905,17 +921,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { final Timeout scheduledFuture; if (popTimeout != 0) { // handling cases when connection has been lost - final Channel orignalChannel = connection.getChannel(); scheduledFuture = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - // re-connection hasn't been made - // and connection is still active -// if (orignalChannel == connection.getChannel() -// && connection.isActive()) { -// return; -// } - if (details.getAttemptPromise().trySuccess(null)) { connection.forceFastReconnectAsync(); } @@ -1004,7 +1012,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -1012,14 +1020,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (future.cause() instanceof RedisAskException && !ignoreRedirect) { RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } if (future.cause() instanceof RedisLoadingException) { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -1029,7 +1037,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); } }, 1, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 02bfa4d05..c5f02724e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -153,7 +153,7 @@ public class CommandBatchService extends CommandAsyncService { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { if (executed.get()) { throw new IllegalStateException("Batch already has been executed!"); } @@ -187,7 +187,7 @@ public class CommandBatchService extends CommandAsyncService { throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC"); } - super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true, connFuture); + super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true); } AsyncSemaphore semaphore = new AsyncSemaphore(0); @@ -439,10 +439,9 @@ public class CommandBatchService extends CommandAsyncService { final CountableListener>> listener = new CountableListener>>(mainPromise, result); listener.setCounter(connections.size()); for (final Map.Entry entry : commands.entrySet()) { - ConnectionEntry connection = connections.get(entry.getKey()); final RPromise> execPromise = new RedissonPromise>(); async(false, new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, - new Object[] {}, execPromise, 0, false, connection.getConnectionFuture()); + new Object[] {}, execPromise, 0, false); execPromise.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -647,6 +646,8 @@ public class CommandBatchService extends CommandAsyncService { final RPromise attemptPromise = new RedissonPromise(); final AsyncDetails details = new AsyncDetails(); + details.init(null, attemptPromise, + entry.isReadOnlyMode(), source, null, null, null, mainPromise, attempt); final RFuture connectionFuture; if (entry.isReadOnlyMode()) { @@ -751,7 +752,7 @@ public class CommandBatchService extends CommandAsyncService { @Override public void operationComplete(Future connFuture) throws Exception { checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), - options.getResponseTimeout(), attempts, options.getExecutionMode()); + options.getResponseTimeout(), attempts, options.getExecutionMode(), slots); } }); @@ -808,8 +809,8 @@ public class CommandBatchService extends CommandAsyncService { } } - private void checkWriteFuture(Entry entry, final RPromise attemptPromise, AsyncDetails details, - final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts) { + private void checkWriteFuture(final Entry entry, final RPromise attemptPromise, final AsyncDetails details, + final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts, final AtomicInteger slots, final RPromise mainPromise) { if (future.isCancelled() || attemptPromise.isDone()) { return; } @@ -817,7 +818,9 @@ public class CommandBatchService extends CommandAsyncService { if (!future.isSuccess()) { details.setException(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); if (details.getAttempt() == attempts) { - attemptPromise.tryFailure(details.getException()); + if (!attemptPromise.tryFailure(details.getException())) { + log.error(details.getException().getMessage()); + } } return; } @@ -827,6 +830,21 @@ public class CommandBatchService extends CommandAsyncService { TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) { + if (!details.getAttemptPromise().cancel(false)) { + return; + } + + int count = details.getAttempt() + 1; + if (log.isDebugEnabled()) { + log.debug("attempt {} for command {} and params {}", + count, details.getCommand(), LogHelper.toString(details.getParams())); + } + details.removeMainPromiseListener(); + execute(entry, details.getSource(), mainPromise, slots, count, options); + return; + } + attemptPromise.tryFailure( new RedisResponseTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); } @@ -842,7 +860,8 @@ public class CommandBatchService extends CommandAsyncService { private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, ExecutionMode executionMode) { + RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, + ExecutionMode executionMode, final AtomicInteger slots) { if (connFuture.isCancelled()) { return; } @@ -881,7 +900,7 @@ public class CommandBatchService extends CommandAsyncService { details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts); + checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts, slots, mainPromise); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java index c40b324cc..51ea0b61b 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveBatchService.java @@ -24,7 +24,6 @@ import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonReactiveClient; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.command.CommandAsyncExecutor; @@ -64,8 +63,8 @@ public class CommandReactiveBatchService extends CommandReactiveService { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(BatchOptions options) { diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java index ebe3f0845..e15583d29 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxBatchService.java @@ -25,7 +25,6 @@ import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonRxClient; -import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.command.CommandAsyncExecutor; @@ -65,8 +64,8 @@ public class CommandRxBatchService extends CommandRxService { @Override public void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(BatchOptions options) { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index f6fe65a0a..06f3a3bc7 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -150,7 +150,7 @@ public class RedissonBatchTest extends BaseTest { BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); RBatch batch = redisson.createBatch(batchOptions); - for (int i = 0; i < 300000; i++) { + for (int i = 0; i < 400000; i++) { batch.getBucket("test").setAsync(123); } @@ -171,6 +171,8 @@ public class RedissonBatchTest extends BaseTest { assertThat(redisson.getBucket("test1").get()).isEqualTo(1); assertThat(redisson.getBucket("test2").get()).isEqualTo(2); + + redisson.shutdown(); } @Test @@ -228,6 +230,7 @@ public class RedissonBatchTest extends BaseTest { assertThat(result.getSyncedSlaves()).isEqualTo(1); process.shutdown(); + redisson.shutdown(); } @Test @@ -238,7 +241,8 @@ public class RedissonBatchTest extends BaseTest { RBatch batch = redisson.createBatch(batchOptions); RMapCacheAsync map = batch.getMapCache("test"); - for (int i = 0; i < 200000; i++) { + int total = 200000; + for (int i = 0; i < total; i++) { RFuture f = map.putAsync("" + i, "" + i, 5, TimeUnit.MINUTES); if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { f.syncUninterruptibly(); @@ -246,7 +250,7 @@ public class RedissonBatchTest extends BaseTest { } batch.execute(); - assertThat(redisson.getMapCache("test").size()).isEqualTo(200000); + assertThat(redisson.getMapCache("test").size()).isEqualTo(total); redisson.shutdown(); } @@ -339,6 +343,7 @@ public class RedissonBatchTest extends BaseTest { } process.shutdown(); + redisson.shutdown(); } From bdb5e1cf43ca6532096dee98611af2417dc00b0a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 20 Dec 2018 17:31:02 +0300 Subject: [PATCH 6/8] Fixed - NPE in CommandDecoder #1764 --- .../main/java/org/redisson/client/handler/CommandDecoder.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index b73dba41d..d825dc532 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -487,6 +487,9 @@ public class CommandDecoder extends ReplayingDecoder { Decoder decoder = data.getCommand().getReplayDecoder(); if (decoder == null) { + if (data.getCodec() == null) { + return StringCodec.INSTANCE.getValueDecoder(); + } if (data.getCommand().getOutParamType() == ValueType.MAP) { if (parts != null && parts.size() % 2 != 0) { return data.getCodec().getMapValueDecoder(); From 13abe16511bedfdbce4ca36e1df931c3a230dbba Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 24 Dec 2018 10:10:20 +0300 Subject: [PATCH 7/8] Fixed - CommandDecoder throws IndexOutOfBoundsException if pingConnectionInterval param is used #1497 Fixed - Unable to send command! error if pingConnectionInterval param is used #1632 --- .../client/RedisPubSubConnection.java | 4 +- .../client/handler/CommandDecoder.java | 144 ++++++------------ .../client/handler/CommandPubSubDecoder.java | 13 +- .../client/handler/CommandsQueue.java | 15 +- .../org/redisson/client/handler/State.java | 26 ++-- .../redisson/client/handler/StateLevel.java | 27 ++-- 6 files changed, 97 insertions(+), 132 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java index f5d5c1494..ff1a78de9 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -34,6 +34,7 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -166,7 +167,8 @@ public class RedisPubSubConnection extends RedisConnection { } private ChannelFuture async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { - return channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); + RPromise promise = new RedissonPromise(); + return channel.writeAndFlush(new CommandData(promise, messageDecoder, null, command, params)); } public Map getChannels() { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index d825dc532..6286448e3 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -51,9 +51,7 @@ import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; -import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.slf4j.Logger; @@ -87,19 +85,23 @@ public class CommandDecoder extends ReplayingDecoder { log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); } if (state() == null) { - boolean makeCheckpoint = data != null; - if (data != null) { - if (data instanceof CommandsData) { - makeCheckpoint = false; - } else { - CommandData cmd = (CommandData)data; - if (cmd.getCommand().getReplayMultiDecoder() != null - && (SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) - || ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) { - makeCheckpoint = false; - } - } - } + boolean makeCheckpoint = false; +// commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest +// +// boolean makeCheckpoint = data != null; +// if (data != null) { +// if (data instanceof CommandsData) { +// makeCheckpoint = false; +// } else { +// CommandData cmd = (CommandData)data; +// MultiDecoder decoder = cmd.getCommand().getReplayMultiDecoder(); +// if (decoder != null +// && (decoder instanceof SlotsDecoder +// || decoder instanceof ListMultiDecoder)) { +// makeCheckpoint = false; +// } +// } +// } state(new State(makeCheckpoint)); } @@ -122,10 +124,10 @@ public class CommandDecoder extends ReplayingDecoder { if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().getLevels().size() > 0) { + if (state().isMakeCheckpoint()) { decodeFromCheckpoint(ctx, in, data, cmd); } else { - decode(in, cmd, null, ctx.channel(), false); + decode(in, cmd, null, ctx, false); } sendNext(ctx, data); } catch (Exception e) { @@ -146,7 +148,7 @@ public class CommandDecoder extends ReplayingDecoder { } else { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel(), false); + decode(in, null, null, ctx, false); } sendNext(ctx); } catch (Exception e) { @@ -164,54 +166,18 @@ public class CommandDecoder extends ReplayingDecoder { protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, CommandData cmd) throws IOException { - if (state().getLevels().size() == 2) { - StateLevel secondLevel = state().getLevels().get(1); - - if (secondLevel.getParts().isEmpty()) { - state().getLevels().remove(1); - } - } + StateLevel level = state().getLastLevel(); - if (state().getLevels().size() == 2) { - StateLevel firstLevel = state().getLevels().get(0); - StateLevel secondLevel = state().getLevels().get(1); - - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false); - - MultiDecoder decoder = messageDecoder(cmd, firstLevel.getParts()); - if (decoder != null) { - Object result = decoder.decode(firstLevel.getParts(), state()); - if (data != null) { - handleResult(cmd, null, result, true, ctx.channel()); - } - } + List prevParts = null; + if (state().getLevels().size() > 1) { + StateLevel prevLevel = state().getLevels().get(state().getLevel() - 1); + prevParts = prevLevel.getParts(); } - if (state().getLevels().size() == 1) { - StateLevel firstLevel = state().getLevels().get(0); - if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) { - state().resetLevel(); - decode(in, cmd, null, ctx.channel(), false); - } else { - if (firstLevel.getLastList() != null) { - if (firstLevel.getLastList().isEmpty()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } else { - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false); - } - firstLevel.setLastList(null); - firstLevel.setLastListSize(0); - - while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } - decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); - } else { - while (firstLevel.getSize() == firstLevel.getParts().size() && in.isReadable()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } - decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); - } - } + + decodeList(in, cmd, prevParts, ctx, level.getSize(), level.getParts(), false); + + if (state().getLastLevel() == level) { + state().removeLastLevel(); } } @@ -244,7 +210,7 @@ public class CommandDecoder extends ReplayingDecoder { } try { - decode(in, commandData, null, ctx.channel(), skipConvertor); + decode(in, commandData, null, ctx, skipConvertor); } finally { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { commandsData.remove(); @@ -302,8 +268,9 @@ public class CommandDecoder extends ReplayingDecoder { } } - protected void decode(ByteBuf in, CommandData data, List parts, Channel channel, boolean skipConvertor) throws IOException { + protected void decode(ByteBuf in, CommandData data, List parts, ChannelHandlerContext ctx, boolean skipConvertor) throws IOException { int code = in.readByte(); + Channel channel = ctx.channel(); if (code == '+') { ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r')); try { @@ -365,33 +332,22 @@ public class CommandDecoder extends ReplayingDecoder { handleResult(data, parts, result, false, channel); } else if (code == '*') { long size = readLong(in); - List respParts; + final List respParts = new ArrayList(); - StateLevel lastLevel = state().getLastLevel(); - if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) { - respParts = new ArrayList(); - lastLevel.setLastListSize(size); - lastLevel.setLastList(respParts); - } else { - int level = state().incLevel(); - if (state().getLevels().size()-1 >= level) { - StateLevel stateLevel = state().getLevels().get(level); - respParts = stateLevel.getParts(); - size = stateLevel.getSize(); - } else { - respParts = new ArrayList(); - if (state().isMakeCheckpoint()) { - state().addLevel(new StateLevel(size, respParts)); - } - } + StateLevel lastLevel = null; + if (state().isMakeCheckpoint()) { + lastLevel = new StateLevel(size, respParts); + state().addLevel(lastLevel); } - decodeList(in, data, parts, channel, size, respParts, skipConvertor); + decodeList(in, data, parts, ctx, size, respParts, skipConvertor); - if (lastLevel != null && lastLevel.getLastList() != null) { - lastLevel.setLastList(null); - lastLevel.setLastListSize(0); + if (state().isMakeCheckpoint()) { + if (lastLevel == state().getLastLevel() && lastLevel.isFull()) { + state().removeLastLevel(); + } } + } else { String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); throw new IllegalStateException("Can't decode replay: " + dataStr); @@ -400,7 +356,7 @@ public class CommandDecoder extends ReplayingDecoder { @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, - Channel channel, long size, List respParts, boolean skipConvertor) + ChannelHandlerContext ctx, long size, List respParts, boolean skipConvertor) throws IOException { if (parts == null && commandsData.get() != null) { List> commands = commandsData.get(); @@ -410,7 +366,7 @@ public class CommandDecoder extends ReplayingDecoder { suffix = 1; } CommandData commandData = (CommandData) commands.get(i+suffix); - decode(in, commandData, respParts, channel, skipConvertor); + decode(in, commandData, respParts, ctx, skipConvertor); if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { data.tryFailure(commandData.cause()); } @@ -421,7 +377,7 @@ public class CommandDecoder extends ReplayingDecoder { } } else { for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, channel, skipConvertor); + decode(in, data, respParts, ctx, skipConvertor); if (state().isMakeCheckpoint()) { checkpoint(); } @@ -434,13 +390,13 @@ public class CommandDecoder extends ReplayingDecoder { } Object result = decoder.decode(respParts, state()); - decodeResult(data, parts, channel, result); + decodeResult(data, parts, ctx, result); } - protected void decodeResult(CommandData data, List parts, Channel channel, + protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, Object result) throws IOException { if (data != null) { - handleResult(data, parts, result, true, channel); + handleResult(data, parts, result, true, ctx.channel()); } } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index b3ecfb034..df7d9a7f6 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -39,7 +39,6 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.internal.PlatformDependent; @@ -75,7 +74,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if (data == null) { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel(), false); + decode(in, null, null, ctx, false); } sendNext(ctx); } catch (Exception e) { @@ -86,11 +85,11 @@ public class CommandPubSubDecoder extends CommandDecoder { } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().getLevels().size() > 0) { + if (state().isMakeCheckpoint()) { decodeFromCheckpoint(ctx, in, data, cmd); } else { while (in.writerIndex() > in.readerIndex()) { - decode(in, cmd, null, ctx.channel(), false); + decode(in, cmd, null, ctx, false); } } sendNext(ctx, data); @@ -104,7 +103,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } @Override - protected void decodeResult(CommandData data, List parts, Channel channel, + protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, final Object result) throws IOException { if (executor.isShutdown()) { return; @@ -113,7 +112,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if (result instanceof Message) { checkpoint(); - final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); + final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(ctx.channel()); ChannelName channelName = ((Message) result).getChannel(); if (result instanceof PubSubStatusMessage) { String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); @@ -161,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } } else { if (data != null && data.getCommand().getName().equals("PING")) { - super.decodeResult(data, parts, channel, result); + super.decodeResult(data, parts, ctx, result); } } } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index 8d7f35e8e..756d3b38b 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -65,8 +65,19 @@ public class CommandsQueue extends ChannelDuplexHandler { }; public void sendNextCommand(Channel channel) { - channel.attr(CommandsQueue.CURRENT_COMMAND).set(null); - queue.poll(); + QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null); + if (command != null) { + queue.poll(); + } else { + QueueCommandHolder c = queue.peek(); + if (c != null) { + QueueCommand data = c.getCommand(); + List> pubSubOps = data.getPubSubOperations(); + if (!pubSubOps.isEmpty()) { + queue.poll(); + } + } + } sendData(channel); } diff --git a/redisson/src/main/java/org/redisson/client/handler/State.java b/redisson/src/main/java/org/redisson/client/handler/State.java index 18706dc0b..57c794666 100644 --- a/redisson/src/main/java/org/redisson/client/handler/State.java +++ b/redisson/src/main/java/org/redisson/client/handler/State.java @@ -21,6 +21,11 @@ import java.util.List; import org.redisson.client.protocol.decoder.DecoderState; +/** + * + * @author Nikita Koksharov + * + */ public class State { private int batchIndex; @@ -37,18 +42,11 @@ public class State { public boolean isMakeCheckpoint() { return makeCheckpoint; } - - public void resetLevel() { - level = -1; - levels.clear(); - } - public int decLevel() { - return --level; - } - public int incLevel() { - return ++level; - } + public int getLevel() { + return level; + } + public StateLevel getLastLevel() { if (levels == null || levels.isEmpty()) { return null; @@ -61,7 +59,13 @@ public class State { levels = new ArrayList(2); } levels.add(stateLevel); + level++; } + public void removeLastLevel() { + levels.remove(level); + level--; + } + public List getLevels() { if (levels == null) { return Collections.emptyList(); diff --git a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java index 210ecc19f..5c821ac6c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java +++ b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java @@ -17,12 +17,15 @@ package org.redisson.client.handler; import java.util.List; +/** + * + * @author Nikita Koksharov + * + */ public class StateLevel { - private long size; - private List parts; - private long lastListSize; - private List lastList; + private final long size; + private final List parts; public StateLevel(long size, List parts) { super(); @@ -30,20 +33,10 @@ public class StateLevel { this.parts = parts; } - public long getLastListSize() { - return lastListSize; + public boolean isFull() { + return size == parts.size(); } - public void setLastListSize(long lastListSize) { - this.lastListSize = lastListSize; - } - - public List getLastList() { - return lastList; - } - public void setLastList(List lastList) { - this.lastList = lastList; - } - + public long getSize() { return size; } From 8ad76f2c30c7fff17c87c40f342edff913c4aff7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 24 Dec 2018 10:33:23 +0300 Subject: [PATCH 8/8] refactoring --- .../org/redisson/spring/data/connection/BinaryConvertor.java | 4 ++-- .../redisson/spring/data/connection/DataTypeConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/SecondsConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/BinaryConvertor.java | 4 ++-- .../redisson/spring/data/connection/DataTypeConvertor.java | 4 ++-- .../redisson/spring/data/connection/DistanceConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/SecondsConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/BinaryConvertor.java | 4 ++-- .../redisson/spring/data/connection/DataTypeConvertor.java | 4 ++-- .../redisson/spring/data/connection/DistanceConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/SecondsConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/BinaryConvertor.java | 4 ++-- .../redisson/spring/data/connection/DataTypeConvertor.java | 4 ++-- .../redisson/spring/data/connection/DistanceConvertor.java | 4 ++-- .../org/redisson/spring/data/connection/SecondsConvertor.java | 4 ++-- 15 files changed, 30 insertions(+), 30 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java index ba5e69662..0b19295b4 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import io.netty.util.CharsetUtil; @@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class BinaryConvertor extends SingleConvertor { +public class BinaryConvertor implements Convertor { @Override public Object convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java index 536c43006..de500a69f 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.redis.connection.DataType; /** @@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType; * @author Nikita Koksharov * */ -public class DataTypeConvertor extends SingleConvertor { +public class DataTypeConvertor implements Convertor { @Override public DataType convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java index d7e9d814e..42f9def12 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java @@ -17,14 +17,14 @@ package org.redisson.spring.data.connection; import java.util.concurrent.TimeUnit; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; /** * * @author Nikita Koksharov * */ -public class SecondsConvertor extends SingleConvertor { +public class SecondsConvertor implements Convertor { private final TimeUnit unit; private final TimeUnit source; diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java index ba5e69662..0b19295b4 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import io.netty.util.CharsetUtil; @@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class BinaryConvertor extends SingleConvertor { +public class BinaryConvertor implements Convertor { @Override public Object convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java index 536c43006..de500a69f 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.redis.connection.DataType; /** @@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType; * @author Nikita Koksharov * */ -public class DataTypeConvertor extends SingleConvertor { +public class DataTypeConvertor implements Convertor { @Override public DataType convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java index c44ce44d8..f14717b90 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.geo.Distance; import org.springframework.data.geo.Metric; @@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric; * @author Nikita Koksharov * */ -public class DistanceConvertor extends SingleConvertor { +public class DistanceConvertor implements Convertor { private final Metric metric; diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java index d7e9d814e..42f9def12 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java @@ -17,14 +17,14 @@ package org.redisson.spring.data.connection; import java.util.concurrent.TimeUnit; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; /** * * @author Nikita Koksharov * */ -public class SecondsConvertor extends SingleConvertor { +public class SecondsConvertor implements Convertor { private final TimeUnit unit; private final TimeUnit source; diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java index ba5e69662..0b19295b4 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import io.netty.util.CharsetUtil; @@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class BinaryConvertor extends SingleConvertor { +public class BinaryConvertor implements Convertor { @Override public Object convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java index 536c43006..de500a69f 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.redis.connection.DataType; /** @@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType; * @author Nikita Koksharov * */ -public class DataTypeConvertor extends SingleConvertor { +public class DataTypeConvertor implements Convertor { @Override public DataType convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java index c44ce44d8..f14717b90 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.geo.Distance; import org.springframework.data.geo.Metric; @@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric; * @author Nikita Koksharov * */ -public class DistanceConvertor extends SingleConvertor { +public class DistanceConvertor implements Convertor { private final Metric metric; diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java index d7e9d814e..42f9def12 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java @@ -17,14 +17,14 @@ package org.redisson.spring.data.connection; import java.util.concurrent.TimeUnit; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; /** * * @author Nikita Koksharov * */ -public class SecondsConvertor extends SingleConvertor { +public class SecondsConvertor implements Convertor { private final TimeUnit unit; private final TimeUnit source; diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java index ba5e69662..0b19295b4 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/BinaryConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import io.netty.util.CharsetUtil; @@ -24,7 +24,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class BinaryConvertor extends SingleConvertor { +public class BinaryConvertor implements Convertor { @Override public Object convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java index 536c43006..de500a69f 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DataTypeConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.redis.connection.DataType; /** @@ -23,7 +23,7 @@ import org.springframework.data.redis.connection.DataType; * @author Nikita Koksharov * */ -public class DataTypeConvertor extends SingleConvertor { +public class DataTypeConvertor implements Convertor { @Override public DataType convert(Object obj) { diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java index c44ce44d8..f14717b90 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/DistanceConvertor.java @@ -15,7 +15,7 @@ */ package org.redisson.spring.data.connection; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.springframework.data.geo.Distance; import org.springframework.data.geo.Metric; @@ -24,7 +24,7 @@ import org.springframework.data.geo.Metric; * @author Nikita Koksharov * */ -public class DistanceConvertor extends SingleConvertor { +public class DistanceConvertor implements Convertor { private final Metric metric; diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java index d7e9d814e..42f9def12 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/SecondsConvertor.java @@ -17,14 +17,14 @@ package org.redisson.spring.data.connection; import java.util.concurrent.TimeUnit; -import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.Convertor; /** * * @author Nikita Koksharov * */ -public class SecondsConvertor extends SingleConvertor { +public class SecondsConvertor implements Convertor { private final TimeUnit unit; private final TimeUnit source;