From 707b419d97c20e0d86df8712ba05ed19edf628b3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 12 Feb 2018 07:25:15 +0300 Subject: [PATCH 1/6] Fixed - org.redisson.RedissonTopic.removeAllListeners got blocked on invocation. #1268 --- .../org/redisson/RedissonPatternTopic.java | 16 ++++++++++++--- .../main/java/org/redisson/RedissonTopic.java | 16 ++++++++++++--- .../org/redisson/pubsub/AsyncSemaphore.java | 20 +++++++++++++++---- .../RedissonPatternTopicReactive.java | 12 ++++++++++- 4 files changed, 53 insertions(+), 11 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index cc965006b..1c016a22f 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -23,8 +23,10 @@ import org.redisson.api.RPatternTopic; import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.command.CommandExecutor; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.pubsub.AsyncSemaphore; @@ -68,10 +70,18 @@ public class RedissonPatternTopic implements RPatternTopic { return System.identityHashCode(pubSubListener); } + protected void acquire(AsyncSemaphore semaphore) { + MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); + int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); + if (!semaphore.tryAcquire(timeout)) { + throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); + } + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { @@ -90,7 +100,7 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeAllListeners() { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { @@ -109,7 +119,7 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(PatternMessageListener listener) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 6206bfc54..7feea2a69 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -24,9 +24,11 @@ import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.StatusListener; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; @@ -126,7 +128,7 @@ public class RedissonTopic implements RTopic { @Override public void removeAllListeners() { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { @@ -141,11 +143,19 @@ public class RedissonTopic implements RTopic { semaphore.release(); } } + + protected void acquire(AsyncSemaphore semaphore) { + MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); + int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); + if (!semaphore.tryAcquire(timeout)) { + throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); + } + } @Override public void removeListener(MessageListener listener) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { @@ -165,7 +175,7 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 3772d8b60..264309027 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @@ -34,19 +35,30 @@ public class AsyncSemaphore { counter = permits; } - public void acquireUninterruptibly() { + public boolean tryAcquire(long timeoutMillis) { final CountDownLatch latch = new CountDownLatch(1); - acquire(new Runnable() { + final Runnable listener = new Runnable() { @Override public void run() { latch.countDown(); } - }); + }; + acquire(listener); try { - latch.await(); + boolean res = latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + if (!res) { + if (!remove(listener)) { + release(); + } + } + return res; } catch (InterruptedException e) { Thread.currentThread().interrupt(); + if (!remove(listener)) { + release(); + } + return false; } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java index c5d64b929..d349f2853 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java @@ -26,8 +26,10 @@ import org.redisson.api.RPatternTopicReactive; import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; import org.redisson.client.RedisPubSubListener; +import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.command.CommandReactiveExecutor; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -99,11 +101,19 @@ public class RedissonPatternTopicReactive implements RPatternTopicReactive } }); } + + protected void acquire(AsyncSemaphore semaphore) { + MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); + int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); + if (!semaphore.tryAcquire(timeout)) { + throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); + } + } @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); - semaphore.acquireUninterruptibly(); + acquire(semaphore); PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); if (entry == null) { From a481259aa5c83d37e3e54c3e6d2fee1b99f27bb1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 12 Feb 2018 08:32:29 +0300 Subject: [PATCH 2/6] Fixed - possible pubsub listeners leak. #1268 --- .../MasterSlaveConnectionManager.java | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9f80bbc2e..e8d661155 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -54,7 +54,6 @@ import org.redisson.config.TransportMode; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.redisson.misc.TransferListener; import org.redisson.misc.URIBuilder; import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; @@ -432,33 +431,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture psubscribe(final String channelName, final Codec codec, final RedisPubSubListener... listeners) { - final AsyncSemaphore lock = getSemaphore(channelName); - final RPromise result = new RedissonPromise(); - lock.acquire(new Runnable() { - @Override - public void run() { - RFuture future = psubscribe(channelName, codec, lock, listeners); - future.addListener(new TransferListener(result)); - } - }); - return result; + public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener... listeners) { + return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, listeners); } + @Override public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = new RedissonPromise(); subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); return promise; } - public RFuture subscribe(final Codec codec, final String channelName, final RedisPubSubListener... listeners) { + @Override + public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners) { + return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); + } + + protected RFuture subscribe(PubSubType type, final Codec codec, final String channelName, + final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = new RedissonPromise(); lock.acquire(new Runnable() { @Override public void run() { - RFuture future = subscribe(codec, channelName, lock, listeners); - future.addListener(new TransferListener(result)); + if (result.isDone()) { + lock.release(); + return; + } + + subscribe(codec, channelName, result, type, lock, listeners); } }); return result; @@ -487,6 +488,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void run() { if (promise.isDone()) { + lock.release(); + freePubSubLock.release(); return; } @@ -536,8 +539,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager { connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(connEntry); + if (!promise.trySuccess(connEntry)) { + for (RedisPubSubListener listener : listeners) { + connEntry.removeListener(channelName, listener); + } + if (!connEntry.hasListeners(channelName)) { + unsubscribe(channelName, lock); + } else { + lock.release(); + } + } else { + lock.release(); + } } }); } From 4e82a6eae67ce334c6169b0dd93b27aaf2434b5a Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 12 Feb 2018 10:41:56 +0300 Subject: [PATCH 3/6] refactoring --- .../connection/MasterSlaveConnectionManager.java | 2 +- .../org/redisson/connection/MasterSlaveEntry.java | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index e8d661155..c0f9222ae 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -447,7 +447,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); } - protected RFuture subscribe(PubSubType type, final Codec codec, final String channelName, + private RFuture subscribe(final PubSubType type, final Codec codec, final String channelName, final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index bf37aebe1..ab3db3cbf 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -268,9 +268,12 @@ public class MasterSlaveEntry { @Override public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient()); + if (!future.isSuccess()) { + subscribe(channelName, listeners, subscribeCodec); + return; } + + log.debug("resubscribed listeners of '{}' channel to '{}'", channelName, future.getNow().getConnection().getRedisClient()); } }); } @@ -292,7 +295,7 @@ public class MasterSlaveEntry { private void psubscribe(final String channelName, final Collection> listeners, final Codec subscribeCodec) { - RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null); + RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()])); subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) @@ -302,11 +305,7 @@ public class MasterSlaveEntry { return; } - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient()); } }); } From a3e8cc95a6aee1e340b5c23edc191766358cc3fc Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 13 Feb 2018 06:16:51 +0300 Subject: [PATCH 4/6] Fixed - RBatch throws NPE with big pipeline in atomic mode. #1294 --- .../client/protocol/BatchCommandData.java | 3 ++- .../java/org/redisson/RedissonBatchTest.java | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java index 2e59fbaad..e1e23a0bf 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisRedirectException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -35,7 +36,7 @@ public class BatchCommandData extends CommandData implements Compara private final AtomicReference redirectError = new AtomicReference(); public BatchCommandData(RedisCommand command, Object[] params, int index) { - this(new RedissonPromise(), null, command, params, index); + this(new RedissonPromise(), StringCodec.INSTANCE, command, params, index); } public BatchCommandData(RPromise promise, Codec codec, RedisCommand command, Object[] params, int index) { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index aaa3a20be..c94b563e8 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -49,6 +49,22 @@ public class RedissonBatchTest extends BaseTest { List t = batch.execute(); System.out.println(t); } + + @Test + public void testBigRequestAtomic() { + RBatch batch = redisson.createBatch(); + batch.atomic(); + batch.timeout(15, TimeUnit.SECONDS); + batch.retryInterval(1, TimeUnit.SECONDS); + batch.retryAttempts(5); + for (int i = 0; i < 100; i++) { + batch.getBucket("" + i).setAsync(i); + batch.getBucket("" + i).getAsync(); + } + + BatchResult s = batch.execute(); + assertThat(s.getResponses().size()).isEqualTo(200); + } @Test public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException { From 3db2deef3af0e3132adb4503ea57eb8da4a75684 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 13 Feb 2018 06:19:11 +0300 Subject: [PATCH 5/6] Fixed - Warning about CommandDecoder.decode() method #1295 --- .../java/org/redisson/client/handler/CommandDecoder.java | 7 +++++-- .../main/java/org/redisson/client/protocol/Decoder.java | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 1aa24fdfe..169018023 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -93,7 +93,7 @@ public class CommandDecoder extends ReplayingDecoder { if (data == null) { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel()); + decode(in, null, null, ctx.channel()); } } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; @@ -105,6 +105,7 @@ public class CommandDecoder extends ReplayingDecoder { } } catch (Exception e) { cmd.tryFailure(e); + throw e; } } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; @@ -112,6 +113,7 @@ public class CommandDecoder extends ReplayingDecoder { decodeCommandBatch(ctx, in, data, commands); } catch (Exception e) { commands.getPromise().tryFailure(e); + throw e; } return; } @@ -172,7 +174,7 @@ public class CommandDecoder extends ReplayingDecoder { } private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, - CommandsData commandBatch) { + CommandsData commandBatch) throws Exception { int i = state().getBatchIndex(); Throwable error = null; @@ -211,6 +213,7 @@ public class CommandDecoder extends ReplayingDecoder { } } catch (Exception e) { commandData.tryFailure(e); + throw e; } i++; if (commandData != null && !commandData.isSuccess()) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/Decoder.java b/redisson/src/main/java/org/redisson/client/protocol/Decoder.java index 18a8aaf78..7cda2f9a5 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/Decoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/Decoder.java @@ -21,6 +21,12 @@ import org.redisson.client.handler.State; import io.netty.buffer.ByteBuf; +/** + * + * @author Nikita Koksharov + * + * @param result type + */ public interface Decoder { R decode(ByteBuf buf, State state) throws IOException; From 387ed744b8bb49a49f9cd41adfd500c4592d2027 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 13 Feb 2018 11:40:15 +0300 Subject: [PATCH 6/6] Periodical sentinel cluster scanning added. #1280 --- .../client/protocol/RedisCommands.java | 2 + .../cluster/ClusterConnectionManager.java | 57 +--- .../config/SentinelServersConfig.java | 20 ++ .../connection/CountableListener.java | 20 +- .../MasterSlaveConnectionManager.java | 61 ++++ .../ReplicatedConnectionManager.java | 57 +--- .../connection/SentinelConnectionManager.java | 304 +++++++++++++++--- .../balancer/LoadBalancerManager.java | 10 +- .../test/java/org/redisson/RedisRunner.java | 11 +- .../java/org/redisson/RedissonTopicTest.java | 132 ++++++++ 10 files changed, 512 insertions(+), 162 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index ffffa9aab..1e8e8c5e2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -324,6 +324,8 @@ public interface RedisCommands { RedisStrictCommand> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisCommand>> SENTINEL_SLAVES = new RedisCommand>>("SENTINEL", "SLAVES", new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ListResultReplayDecoder())); + RedisCommand>> SENTINEL_SENTINELS = new RedisCommand>>("SENTINEL", "SENTINELS", + new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ListResultReplayDecoder())); RedisStrictCommand CLUSTER_ADDSLOTS = new RedisStrictCommand("CLUSTER", "ADDSLOTS"); RedisStrictCommand CLUSTER_REPLICATE = new RedisStrictCommand("CLUSTER", "REPLICATE"); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 8e90bdb29..677ac83d5 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -74,8 +74,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map nodeConnections = PlatformDependent.newConcurrentHashMap(); - private final ConcurrentMap lastPartitions = PlatformDependent.newConcurrentHashMap(); private ScheduledFuture monitorFuture; @@ -97,7 +95,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Throwable lastException = null; List failedMasters = new ArrayList(); for (URI addr : cfg.getNodeAddresses()) { - RFuture connectionFuture = connect(cfg, addr); + RFuture connectionFuture = connectToNode(cfg, addr, null); try { RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); @@ -186,43 +184,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private void close(RedisConnection conn) { - if (nodeConnections.values().remove(conn)) { - conn.closeAsync(); - } - } - - private RFuture connect(ClusterServersConfig cfg, final URI addr) { - RedisConnection connection = nodeConnections.get(addr); - if (connection != null) { - return RedissonPromise.newSucceededFuture(connection); - } - - RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); - final RPromise result = new RedissonPromise(); - RFuture future = client.connectAsync(); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - RedisConnection connection = future.getNow(); - if (connection.isActive()) { - nodeConnections.put(addr, connection); - result.trySuccess(connection); - } else { - connection.closeAsync(); - result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); - } - } - }); - - return result; - } - private RFuture>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { if (partition.isMasterFail()) { RedisException e = new RedisException("Failed to add master: " + @@ -237,7 +198,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } final RPromise>> result = new RedissonPromise>>(); - RFuture connectionFuture = connect(cfg, partition.getMasterAddress()); + RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -390,7 +351,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } final URI uri = iterator.next(); - RFuture connectionFuture = connect(cfg, uri); + RFuture connectionFuture = connectToNode(cfg, uri, null); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -414,7 +375,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void operationComplete(Future> future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause()); - close(connection); + closeNodeConnection(connection); getShutdownLatch().release(); scheduleClusterChangeCheck(cfg, iterator); return; @@ -782,15 +743,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void shutdown() { monitorFuture.cancel(true); - List> futures = new ArrayList>(); - for (RedisConnection connection : nodeConnections.values()) { - RFuture future = connection.getRedisClient().shutdownAsync(); - futures.add(future); - } - - for (RFuture future : futures) { - future.syncUninterruptibly(); - } + closeNodeConnections(); super.shutdown(); } diff --git a/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java b/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java index 340098078..8d36003fd 100644 --- a/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java @@ -35,6 +35,11 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig implements FutureListener { protected final RPromise result; protected final T value; + public CountableListener() { + this(null, null); + } + public CountableListener(RPromise result, T value) { super(); this.result = result; this.value = value; } + public void setCounter(int newValue) { + counter.set(newValue); + } + public void incCounter() { counter.incrementAndGet(); } @@ -46,13 +54,21 @@ public class CountableListener implements FutureListener { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.tryFailure(future.cause()); + if (result != null) { + result.tryFailure(future.cause()); + } return; } if (counter.decrementAndGet() == 0) { - result.trySuccess(value); + onSuccess(value); + if (result != null) { + result.trySuccess(value); + } } } + protected void onSuccess(T value) { + } + } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index c0f9222ae..1a9e8afc6 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -18,6 +18,7 @@ package org.redisson.connection; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -39,6 +40,7 @@ import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisException; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; @@ -162,6 +164,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final DnsAddressResolverGroup resolverGroup; + private final Map nodeConnections = PlatformDependent.newConcurrentHashMap(); + { for (int i = 0; i < locks.length; i++) { locks[i] = new AsyncSemaphore(1); @@ -223,6 +227,63 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.commandExecutor = new CommandSyncService(this); } + protected void closeNodeConnections() { + List> futures = new ArrayList>(); + for (RedisConnection connection : nodeConnections.values()) { + RFuture future = connection.getRedisClient().shutdownAsync(); + futures.add(future); + } + + for (RFuture future : futures) { + future.syncUninterruptibly(); + } + } + + protected void closeNodeConnection(RedisConnection conn) { + if (nodeConnections.values().remove(conn)) { + conn.closeAsync(); + } + } + + protected RFuture connectToNode(BaseMasterSlaveServersConfig cfg, final URI addr, RedisClient client) { + final Object key; + if (client != null) { + key = client; + } else { + key = addr; + } + RedisConnection connection = nodeConnections.get(key); + if (connection != null) { + return RedissonPromise.newSucceededFuture(connection); + } + + if (addr != null) { + client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); + } + final RPromise result = new RedissonPromise(); + RFuture future = client.connectAsync(); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + RedisConnection connection = future.getNow(); + if (connection.isActive()) { + nodeConnections.put(key, connection); + result.trySuccess(connection); + } else { + connection.closeAsync(); + result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); + } + } + }); + + return result; + } + public boolean isClusterMode() { return false; } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index ae0e9aa6e..b9cc5e76f 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -16,17 +16,12 @@ package org.redisson.connection; import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.api.NodeType; import org.redisson.api.RFuture; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisException; @@ -35,8 +30,6 @@ import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReplicatedServersConfig; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +53,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { private AtomicReference currentMaster = new AtomicReference(); - private final Map nodeConnections = new HashMap(); - private ScheduledFuture monitorFuture; private enum Role { @@ -76,7 +67,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { initTimer(this.config); for (URI addr : cfg.getNodeAddresses()) { - RFuture connectionFuture = connect(cfg, addr); + RFuture connectionFuture = connectToNode(cfg, addr, null); connectionFuture.awaitUninterruptibly(); RedisConnection connection = connectionFuture.getNow(); if (connection == null) { @@ -115,37 +106,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { return res; } - private RFuture connect(BaseMasterSlaveServersConfig cfg, final URI addr) { - RedisConnection connection = nodeConnections.get(addr); - if (connection != null) { - return RedissonPromise.newSucceededFuture(connection); - } - - RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts()); - final RPromise result = new RedissonPromise(); - RFuture future = client.connectAsync(); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - RedisConnection connection = future.getNow(); - if (connection.isActive()) { - nodeConnections.put(addr, connection); - result.trySuccess(connection); - } else { - connection.closeAsync(); - result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); - } - } - }); - - return result; - } - private void scheduleMasterChangeCheck(final ReplicatedServersConfig cfg) { monitorFuture = group.schedule(new Runnable() { @Override @@ -159,7 +119,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { return; } - RFuture connectionFuture = connect(cfg, addr); + RFuture connectionFuture = connectToNode(cfg, addr, null); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -175,7 +135,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { return; } - RedisConnection connection = future.getNow(); + final RedisConnection connection = future.getNow(); RFuture> result = connection.async(RedisCommands.INFO_REPLICATION); result.addListener(new FutureListener>() { @Override @@ -183,6 +143,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { throws Exception { if (!future.isSuccess()) { log.error(future.cause().getMessage(), future.cause()); + closeNodeConnection(connection); if (count.decrementAndGet() == 0) { scheduleMasterChangeCheck(cfg); } @@ -215,15 +176,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { public void shutdown() { monitorFuture.cancel(true); - List> futures = new ArrayList>(); - for (RedisConnection connection : nodeConnections.values()) { - RFuture future = connection.getRedisClient().shutdownAsync(); - futures.add(future); - } - - for (RFuture future : futures) { - future.syncUninterruptibly(); - } + closeNodeConnections(); super.shutdown(); } } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index f952b73a0..115236d4a 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -17,11 +17,16 @@ package org.redisson.connection; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.NodeType; @@ -40,6 +45,7 @@ import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.SentinelServersConfig; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.URIBuilder; import org.slf4j.Logger; @@ -47,6 +53,7 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.PlatformDependent; /** @@ -60,9 +67,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final ConcurrentMap sentinels = PlatformDependent.newConcurrentHashMap(); private final AtomicReference currentMaster = new AtomicReference(); - private final ConcurrentMap slaves = PlatformDependent.newConcurrentHashMap(); + private final Set slaves = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); private final Set disconnectedSlaves = new HashSet(); + private ScheduledFuture monitorFuture; public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { super(config); @@ -82,13 +90,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { continue; } - // TODO async List master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); String masterHost = createAddress(master.get(0), master.get(1)); this.config.setMasterAddress(masterHost); currentMaster.set(masterHost); log.info("master: {} added", masterHost); - slaves.put(masterHost, true); + slaves.add(masterHost); List> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); for (Map map : sentinelSlaves) { @@ -103,7 +110,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String host = createAddress(ip, port); this.config.addSlaveAddress(host); - slaves.put(host, true); + slaves.add(host); log.debug("slave {} state: {}", host, map); log.info("slave: {} added", host); @@ -113,6 +120,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.warn("slave: {} is down", host); } } + + List> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); + List> connectionFutures = new ArrayList>(sentinelSentinels.size()); + for (Map map : sentinelSentinels) { + if (map.isEmpty()) { + continue; + } + + String ip = map.get("ip"); + String port = map.get("port"); + + String host = createAddress(ip, port); + URI sentinelAddr = URIBuilder.create(host); + RFuture future = registerSentinel(cfg, sentinelAddr, this.config); + connectionFutures.add(future); + } + + for (RFuture future : connectionFutures) { + future.awaitUninterruptibly(this.config.getConnectTimeout()); + } + break; } catch (RedisConnectionException e) { log.warn("Can't connect to sentinel server. {}", e.getMessage()); @@ -128,17 +156,182 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { initSingleEntry(); - List> connectionFutures = new ArrayList>(cfg.getSentinelAddresses().size()); - for (URI addr : cfg.getSentinelAddresses()) { - RFuture future = registerSentinel(cfg, addr, this.config); - connectionFutures.add(future); + scheduleChangeCheck(cfg, null); + } + + private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator iterator) { + monitorFuture = group.schedule(new Runnable() { + @Override + public void run() { + AtomicReference lastException = new AtomicReference(); + Iterator iter = iterator; + if (iter == null) { + iter = sentinels.values().iterator(); + } + checkState(cfg, iter, lastException); + } + }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); + } + + protected void checkState(final SentinelServersConfig cfg, final Iterator iterator, final AtomicReference lastException) { + if (!iterator.hasNext()) { + log.error("Can't update cluster state", lastException.get()); + scheduleChangeCheck(cfg, null); + return; + } + if (!getShutdownLatch().acquire()) { + return; } - for (RFuture future : connectionFutures) { - future.awaitUninterruptibly(); + RedisClient client = iterator.next(); + RFuture connectionFuture = connectToNode(null, null, client); + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + lastException.set(future.cause()); + getShutdownLatch().release(); + checkState(cfg, iterator, lastException); + return; + } + + RedisConnection connection = future.getNow(); + updateState(cfg, connection, iterator); + } + }); + + } + + protected void updateState(final SentinelServersConfig cfg, final RedisConnection connection, final Iterator iterator) { + final AtomicInteger commands = new AtomicInteger(2); + FutureListener commonListener = new FutureListener() { + + private final AtomicBoolean failed = new AtomicBoolean(); + + @Override + public void operationComplete(Future future) throws Exception { + if (commands.decrementAndGet() == 0) { + getShutdownLatch().release(); + if (failed.get()) { + scheduleChangeCheck(cfg, iterator); + } else { + scheduleChangeCheck(cfg, null); + } + } + if (!future.isSuccess() && failed.compareAndSet(false, true)) { + log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), future.cause()); + closeNodeConnection(connection); + } + } + }; + + RFuture> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); + masterFuture.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + return; + } + + List master = future.getNow(); + + String current = currentMaster.get(); + String newMaster = createAddress(master.get(0), master.get(1)); + if (!newMaster.equals(current) + && currentMaster.compareAndSet(current, newMaster)) { + changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); + } + } + }); + masterFuture.addListener(commonListener); + + if (!config.checkSkipSlavesInit()) { + RFuture>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); + commands.incrementAndGet(); + slavesFuture.addListener(new FutureListener>>() { + @Override + public void operationComplete(Future>> future) throws Exception { + if (!future.isSuccess()) { + return; + } + + List> slavesMap = future.getNow(); + final Set currentSlaves = new HashSet(slavesMap.size()); + List> futures = new ArrayList>(); + for (Map map : slavesMap) { + if (map.isEmpty()) { + continue; + } + + String ip = map.get("ip"); + String port = map.get("port"); + String flags = map.get("flags"); + String masterHost = map.get("master-host"); + String masterPort = map.get("master-port"); + + if (!isUseSameMaster(ip, port, masterHost, masterPort)) { + continue; + } + if (flags.contains("s_down") || flags.contains("disconnected")) { + slaveDown(ip, port); + continue; + } + + String slaveAddr = createAddress(ip, port); + currentSlaves.add(slaveAddr); + RFuture slaveFuture = addSlave(ip, port, slaveAddr); + futures.add(slaveFuture); + } + + CountableListener listener = new CountableListener() { + @Override + protected void onSuccess(Void value) { + Set removedSlaves = new HashSet(slaves); + removedSlaves.removeAll(currentSlaves); + for (String slave : removedSlaves) { + slaves.remove(slave); + String[] parts = slave.replace("redis://", "").split(":"); + slaveDown(parts[0], parts[1]); + } + }; + }; + + listener.setCounter(futures.size()); + for (RFuture f : futures) { + f.addListener(listener); + } + + } + }); + slavesFuture.addListener(commonListener); } + + RFuture>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); + sentinelsFuture.addListener(new FutureListener>>() { + @Override + public void operationComplete(Future>> future) throws Exception { + if (!future.isSuccess()) { + return; + } + + List> list = future.getNow(); + for (Map map : list) { + if (map.isEmpty()) { + continue; + } + + String ip = map.get("ip"); + String port = map.get("port"); + + String host = createAddress(ip, port); + URI sentinelAddr = URIBuilder.create(host); + registerSentinel(cfg, sentinelAddr, getConfig()); + } + } + }); + sentinelsFuture.addListener(commonListener); } - + private String createAddress(String host, Object port) { if (host.contains(":")) { host = "[" + host + "]"; @@ -157,9 +350,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return entry; } - private RFuture registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { - RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); - RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); + private RFuture registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { + String key = addr.getHost() + ":" + addr.getPort(); + RedisClient client = sentinels.get(key); + if (client != null) { + return RedissonPromise.newSucceededFuture(null); + } + + client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts()); + RedisClient oldClient = sentinels.putIfAbsent(key, client); if (oldClient != null) { return RedissonPromise.newSucceededFuture(null); } @@ -178,6 +377,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public void onMessage(String channel, Object msg) { + log.debug("message {} from {}", msg, channel); + if ("+sentinel".equals(channel)) { onSentinelAdded(cfg, (String) msg, c); } @@ -209,7 +410,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }); - return pubsubFuture; + return RedissonPromise.newSucceededFuture(null); } protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) { @@ -228,42 +429,50 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (parts.length > 4 && "slave".equals(parts[0])) { - final String ip = parts[2]; - final String port = parts[3]; - - final String slaveAddr = createAddress(ip, port); + String ip = parts[2]; + String port = parts[3]; if (!isUseSameMaster(parts)) { return; } - // to avoid addition twice - if (slaves.putIfAbsent(slaveAddr, true) == null && !config.checkSkipSlavesInit()) { - final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - RFuture future = entry.addSlave(URIBuilder.create(slaveAddr)); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - slaves.remove(slaveAddr); - log.error("Can't add slave: " + slaveAddr, future.cause()); - return; - } + String slaveAddr = createAddress(ip, port); + addSlave(ip, port, slaveAddr); + } else { + log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + } + } - URI uri = convert(ip, port); - if (entry.slaveUp(uri, FreezeReason.MANAGER)) { - String slaveAddr = ip + ":" + port; - log.info("slave: {} added", slaveAddr); - } + protected RFuture addSlave(final String ip, final String port, final String slaveAddr) { + final RPromise result = new RedissonPromise(); + // to avoid addition twice + if (slaves.add(slaveAddr) && !config.checkSkipSlavesInit()) { + final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + RFuture future = entry.addSlave(URIBuilder.create(slaveAddr)); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + slaves.remove(slaveAddr); + result.tryFailure(future.cause()); + log.error("Can't add slave: " + slaveAddr, future.cause()); + return; } - }); - } else { - slaveUp(ip, port); - } + URI uri = convert(ip, port); + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { + String slaveAddr = ip + ":" + port; + log.info("slave: {} added", slaveAddr); + result.trySuccess(null); + } + } + + }); } else { - log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + slaveUp(ip, port); + result.trySuccess(null); } + return result; } protected URI convert(String ip, String port) { @@ -322,15 +531,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private boolean isUseSameMaster(String[] parts) { - String ip = parts[2]; - String port = parts[3]; - - String slaveAddr = ip + ":" + port; + return isUseSameMaster(parts[2], parts[3], parts[6], parts[7]); + } + protected boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) { String master = currentMaster.get(); - String slaveMaster = createAddress(parts[6], parts[7]); + String slaveMaster = createAddress(slaveMasterHost, slaveMasterPort); if (!master.equals(slaveMaster)) { - log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master); + log.warn("Skipped slave up {} for master {} differs from current {}", slaveIp + ":" + slavePort, slaveMaster, master); return false; } return true; @@ -410,6 +618,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public void shutdown() { + monitorFuture.cancel(true); + List> futures = new ArrayList>(); for (RedisClient sentinel : sentinels.values()) { RFuture future = sentinel.shutdownAsync(); diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index c8b0361ee..bd91fa2eb 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -87,12 +87,10 @@ public class LoadBalancerManager { RPromise result = new RedissonPromise(); CountableListener listener = new CountableListener(result, null) { - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - super.operationComplete(future); - if (this.result.isSuccess()) { - client2Entry.put(entry.getClient(), entry); - } - }; + @Override + protected void onSuccess(Void value) { + client2Entry.put(entry.getClient(), entry); + } }; RFuture slaveFuture = slaveConnectionPool.add(entry); diff --git a/redisson/src/test/java/org/redisson/RedisRunner.java b/redisson/src/test/java/org/redisson/RedisRunner.java index 75bb974e8..bfa28f182 100644 --- a/redisson/src/test/java/org/redisson/RedisRunner.java +++ b/redisson/src/test/java/org/redisson/RedisRunner.java @@ -277,7 +277,7 @@ public class RedisRunner { try (PrintWriter printer = new PrintWriter(new FileWriter(confFile))) { args.stream().forEach((arg) -> { if (arg.contains("--")) { - printer.println(arg.replace("--", "\n\r")); + printer.println(arg.replace("--", "")); } }); } @@ -431,7 +431,7 @@ public class RedisRunner { public RedisRunner nosave() { this.nosave = true; options.remove(REDIS_OPTIONS.SAVE); - addConfigOption(REDIS_OPTIONS.SAVE, "''"); +// addConfigOption(REDIS_OPTIONS.SAVE, "''"); return this; } @@ -472,7 +472,9 @@ public class RedisRunner { this.randomDir = true; options.remove(REDIS_OPTIONS.DIR); makeRandomDefaultDir(); - addConfigOption(REDIS_OPTIONS.DIR, defaultDir); + + + addConfigOption(REDIS_OPTIONS.DIR, "\"" + defaultDir + "\""); return this; } @@ -868,6 +870,9 @@ public class RedisRunner { System.out.println("REDIS RUNNER: Making directory " + f.getAbsolutePath()); f.mkdirs(); this.defaultDir = f.getAbsolutePath(); + if (RedissonRuntimeEnvironment.isWindows) { + defaultDir = defaultDir.replace("\\", "\\\\"); + } } } diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 535abb755..4b644d152 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -514,6 +514,138 @@ public class RedissonTopicTest { runner.stop(); } + @Test + public void testReattachInSentinel() throws Exception { + RedisRunner.RedisProcess master = new RedisRunner() + .nosave() + .randomDir() + .run(); + RedisRunner.RedisProcess slave1 = new RedisRunner() + .port(6380) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess slave2 = new RedisRunner() + .port(6381) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6379) + .run(); + RedisRunner.RedisProcess sentinel1 = new RedisRunner() + .nosave() + .randomDir() + .port(26379) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel2 = new RedisRunner() + .nosave() + .randomDir() + .port(26380) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + RedisRunner.RedisProcess sentinel3 = new RedisRunner() + .nosave() + .randomDir() + .port(26381) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6379, 2) + .run(); + + Thread.sleep(5000); + + Config config = new Config(); + config.useSentinelServers().addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster"); + RedissonClient redisson = Redisson.create(config); + + final AtomicBoolean executed = new AtomicBoolean(); + final AtomicInteger subscriptions = new AtomicInteger(); + + RTopic topic = redisson.getTopic("topic"); + topic.addListener(new StatusListener() { + + @Override + public void onUnsubscribe(String channel) { + } + + @Override + public void onSubscribe(String channel) { + subscriptions.incrementAndGet(); + } + }); + topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, Integer msg) { + executed.set(true); + } + }); + + sentinel1.stop(); + sentinel2.stop(); + sentinel3.stop(); + master.stop(); + slave1.stop(); + slave2.stop(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(20)); + + master = new RedisRunner() + .port(6390) + .nosave() + .randomDir() + .run(); + slave1 = new RedisRunner() + .port(6391) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6390) + .run(); + slave2 = new RedisRunner() + .port(6392) + .nosave() + .randomDir() + .slaveof("127.0.0.1", 6390) + .run(); + sentinel1 = new RedisRunner() + .nosave() + .randomDir() + .port(26379) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) + .run(); + sentinel2 = new RedisRunner() + .nosave() + .randomDir() + .port(26380) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) + .run(); + sentinel3 = new RedisRunner() + .nosave() + .randomDir() + .port(26381) + .sentinel() + .sentinelMonitor("myMaster", "127.0.0.1", 6390, 2) + .run(); + + redisson.getTopic("topic").publish(1); + + await().atMost(10, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2); + Assert.assertTrue(executed.get()); + + Thread.sleep(1000000); + + redisson.shutdown(); + sentinel1.stop(); + sentinel2.stop(); + sentinel3.stop(); + master.stop(); + slave1.stop(); + slave2.stop(); + } + @Test public void testReattachInCluster() throws Exception { RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();