From 0c7ea7f46ba5b6809556c4c257389164d6a6f380 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 8 Nov 2019 15:07:50 +0300 Subject: [PATCH] refactoring --- .../redisson/command/CommandAsyncService.java | 2 +- .../redisson/connection/MasterSlaveEntry.java | 52 ++++++------------- .../RedissonBoundedBlockingQueueTest.java | 2 +- .../RedissonLiveObjectServiceTest.java | 6 --- 4 files changed, 19 insertions(+), 43 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index e8b20d5ce..708e97d2b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -145,7 +145,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { try { future.await(); } catch (InterruptedException e) { - ((RPromise)future).tryFailure(e); + ((RPromise) future).tryFailure(e); throw e; } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 070e36534..0171e6d74 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -15,17 +15,12 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; - +import io.netty.channel.ChannelFuture; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; @@ -37,17 +32,16 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool; -import org.redisson.misc.CountableListener; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedisURI; -import org.redisson.misc.RedissonPromise; -import org.redisson.misc.TransferListener; +import org.redisson.misc.*; import org.redisson.pubsub.PubSubConnectionEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -238,33 +232,21 @@ public class MasterSlaveEntry { log.error("Can't resubscribe blocking queue " + commandData, e); return; } - - AtomicBoolean skip = new AtomicBoolean(); - BiConsumer listener = new BiConsumer() { - @Override - public void accept(Object t, Throwable u) { - if (skip.get()) { - return; - } - releaseWrite(newConnection); - } - }; - commandData.getPromise().onComplete(listener); + if (commandData.getPromise().isDone()) { + releaseWrite(newConnection); return; } + ChannelFuture channelFuture = newConnection.send(commandData); - channelFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - listener.accept(null, null); - skip.set(true); - releaseWrite(newConnection); - log.error("Can't resubscribe blocking queue {}", commandData); - } + channelFuture.addListener(future -> { + if (!future.isSuccess()) { + commandData.getPromise().tryFailure(new RedisException("Can't resubscribe blocking queue " + commandData + " to " + newConnection)); } }); + commandData.getPromise().onComplete((r, ex) -> { + releaseWrite(newConnection); + }); }); } diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index db4dfb7b7..e2e8cf938 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -542,7 +542,7 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { long startTime = System.currentTimeMillis(); Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); - assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L); + assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L); assertThat(value).isEqualTo(3); assertThat(queue2).containsExactly(3, 4, 5, 6); } diff --git a/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java b/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java index d17c7dfb0..48a5bc2b2 100644 --- a/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java @@ -1340,12 +1340,6 @@ public class RedissonLiveObjectServiceTest extends BaseTest { assertTrue(ConcurrentHashMap.class.isAssignableFrom(ts.getContent().getClass())); assertFalse(RMap.class.isAssignableFrom(ts.getContent().getClass())); - ArrayBlockingQueue abq = new ArrayBlockingQueue<>(10); - abq.add("111"); - ts.setContent(abq); - assertTrue(ArrayBlockingQueue.class.isAssignableFrom(ts.getContent().getClass())); - assertFalse(RBlockingQueue.class.isAssignableFrom(ts.getContent().getClass())); - ConcurrentLinkedQueue clq = new ConcurrentLinkedQueue<>(); ts.setContent(clq); assertTrue(ConcurrentLinkedQueue.class.isAssignableFrom(ts.getContent().getClass()));