From b23643a3400c7c48aaaf7f896a267de9f2871bc3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 1 Oct 2018 18:44:09 +0300 Subject: [PATCH 1/2] refactoring --- .../java/org/redisson/RedissonReactive.java | 9 +- .../reactive/RedissonBatchReactive.java | 10 +- .../reactive/RedissonSetCacheReactive.java | 129 ++---------------- .../reactive/RedissonTransactionReactive.java | 9 +- .../RedissonSetCacheReactiveTest.java | 1 - 5 files changed, 36 insertions(+), 122 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 6f26c1074..45aac291a 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -50,6 +50,7 @@ import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; import org.redisson.api.RSemaphoreReactive; +import org.redisson.api.RSetCache; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; @@ -352,12 +353,16 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSetCacheReactive getSetCache(String name) { - return new RedissonSetCacheReactive(evictionScheduler, commandExecutor, name); + RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, set, + new RedissonSetCacheReactive(commandExecutor, set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { - return new RedissonSetCacheReactive(codec, evictionScheduler, commandExecutor, name); + RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, set, + new RedissonSetCacheReactive(commandExecutor, set), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index c2bca334a..6def46e3a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -37,6 +37,7 @@ import org.redisson.RedissonQueue; import org.redisson.RedissonScoredSortedSet; import org.redisson.RedissonScript; import org.redisson.RedissonSet; +import org.redisson.RedissonSetCache; import org.redisson.RedissonSetMultimap; import org.redisson.RedissonStream; import org.redisson.api.BatchOptions; @@ -62,6 +63,7 @@ import org.redisson.api.RMapReactive; import org.redisson.api.RQueueReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; +import org.redisson.api.RSetCache; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; @@ -230,12 +232,16 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RSetCacheReactive getSetCache(String name) { - return new RedissonSetCacheReactive(evictionScheduler, executorService, name); + RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); + return ReactiveProxyBuilder.create(executorService, set, + new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { - return new RedissonSetCacheReactive(codec, evictionScheduler, executorService, name); + RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); + return ReactiveProxyBuilder.create(executorService, set, + new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index d33091a69..8e193b224 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -19,24 +19,20 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; -import org.redisson.api.RSetCacheAsync; -import org.redisson.api.RSetCacheReactive; +import org.redisson.api.RSetCache; import org.redisson.client.RedisClient; -import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; -import org.redisson.eviction.EvictionScheduler; import io.netty.buffer.ByteBuf; -import reactor.fn.Supplier; +import reactor.rx.Streams; /** *

Set-based cache with ability to set TTL for each entry via @@ -58,151 +54,54 @@ import reactor.fn.Supplier; * * @param value */ -public class RedissonSetCacheReactive extends RedissonExpirableReactive implements RSetCacheReactive { +public class RedissonSetCacheReactive { - private final RSetCacheAsync instance; + private final RSetCache instance; + private final CommandReactiveExecutor commandExecutor; - public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor, name, new RedissonSetCache(evictionScheduler, commandExecutor, name, null)); - } - - public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { - super(commandExecutor, name, instance); - this.instance = instance; - } - - public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { - this(codec, commandExecutor, name, new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null)); - } - - public RedissonSetCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RSetCacheAsync instance) { - super(codec, commandExecutor, name, instance); + public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, RSetCache instance) { + this.commandExecutor = commandExecutor; this.instance = instance; } - - @Override - public Publisher size() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.sizeAsync(); - } - }); - } - - @Override - public Publisher contains(final Object o) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.containsAsync(o); - } - }); - } - - @Override public Publisher iterator() { return new SetReactiveIterator() { @Override protected RFuture> scanIterator(RedisClient client, long nextIterPos) { - return ((ScanIterator)instance).scanIteratorAsync(getName(), client, nextIterPos, null, 10); + return ((ScanIterator)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10); } }; } - @Override - public Publisher add(final V value, final long ttl, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addAsync(value, ttl, unit); - } - }); - } - - @Override public Publisher add(V value) { long timeoutDate = 92233720368547758L; - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_INTEGER, + return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER, "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " + "return 0;" + "end; " + "redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " + "return 1; ", - Arrays.asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value)); + Arrays.asList(instance.getName()), System.currentTimeMillis(), timeoutDate, ((RedissonSetCache)instance).encode(value)); } - @Override - public Publisher> readAll() { - return reactive(new Supplier>>() { - @Override - public RFuture> get() { - return instance.readAllAsync(); - } - }); - } - - @Override - public Publisher remove(final Object o) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.removeAsync(o); - } - }); - } - - @Override - public Publisher containsAll(final Collection c) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.containsAllAsync(c); - } - }); - } - - @Override public Publisher addAll(Collection c) { if (c.isEmpty()) { - return newSucceeded(0); + return Streams.just(0); } long score = 92233720368547758L - System.currentTimeMillis(); List params = new ArrayList(c.size()*2 + 1); - params.add(getName()); + params.add(instance.getName()); for (V value : c) { - ByteBuf objectState = encode(value); + ByteBuf objectState = ((RedissonSetCache)instance).encode(value); params.add(score); params.add(objectState); } - return commandExecutor.writeReactive(getName(), codec, RedisCommands.ZADD_RAW, params.toArray()); - } - - @Override - public Publisher retainAll(final Collection c) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.retainAllAsync(c); - } - }); - } - - @Override - public Publisher removeAll(final Collection c) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.removeAllAsync(c); - } - }); + return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.ZADD_RAW, params.toArray()); } - @Override public Publisher addAll(Publisher c) { return new PublisherAdder() { @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index ab68ec051..1fd984412 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -23,6 +23,7 @@ import org.redisson.api.RMapCache; import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; import org.redisson.api.RSet; +import org.redisson.api.RSetCache; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTransaction; @@ -103,12 +104,16 @@ public class RedissonTransactionReactive implements RTransactionReactive { @Override public RSetCacheReactive getSetCache(String name) { - return new RedissonSetCacheReactive(executorService, name, transaction.getSetCache(name)); + RSetCache set = transaction.getSetCache(name); + return ReactiveProxyBuilder.create(executorService, set, + new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { - return new RedissonSetCacheReactive(codec, executorService, name, transaction.getSetCache(name, codec)); + RSetCache set = transaction.getSetCache(name, codec); + return ReactiveProxyBuilder.create(executorService, set, + new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java index 9af9a65ac..9f47cc72b 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java @@ -14,7 +14,6 @@ import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RSetCacheReactive; -import org.redisson.codec.MsgPackJacksonCodec; public class RedissonSetCacheReactiveTest extends BaseReactiveTest { From 03c541f8f7e17061ccca924d25ff8857a050bfd6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 3 Oct 2018 19:00:14 +0300 Subject: [PATCH 2/2] Fixed - closed connections handling --- .../main/java/org/redisson/client/handler/CommandsQueue.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index 9e1b0fa94..10846f36b 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -21,6 +21,7 @@ import java.util.Queue; import java.util.regex.Pattern; import org.redisson.client.ChannelName; +import org.redisson.client.RedisConnectionException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.QueueCommand; @@ -79,6 +80,10 @@ public class CommandsQueue extends ChannelDuplexHandler { command.getChannelPromise().tryFailure( new WriteRedisConnectionException("Channel has been closed! Can't write command: " + command.getCommand() + " to channel: " + ctx.channel())); + + if (command.getChannelPromise().isSuccess()) { + command.getCommand().tryFailure(new RedisConnectionException("Command succesfully sent, but channel " + ctx.channel() + " has been closed!")); + } } super.channelInactive(ctx);