From ff4c2f71f2bfd6284aed7b8bc185c24b8aa57716 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 22 Feb 2019 12:44:08 +0300 Subject: [PATCH] Feature - ping and pingAll methods with timeout added to Node object. #1921 --- .../main/java/org/redisson/RedisNodes.java | 24 +++++++++++++------ .../src/main/java/org/redisson/api/Node.java | 15 ++++++++++-- .../main/java/org/redisson/api/NodeAsync.java | 10 ++++++++ .../java/org/redisson/api/NodesGroup.java | 13 ++++++++-- .../org/redisson/client/RedisConnection.java | 12 +++++----- .../redisson/connection/RedisClientEntry.java | 19 ++++++++++++++- 6 files changed, 75 insertions(+), 18 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index feb0e718d..c223f7bdc 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.redisson.api.Node; import org.redisson.api.NodeType; @@ -57,12 +58,14 @@ public class RedisNodes implements NodesGroup { Collection entries = connectionManager.getEntrySet(); URI addr = URIBuilder.create(address); for (MasterSlaveEntry masterSlaveEntry : entries) { - if (masterSlaveEntry.getAllEntries().isEmpty() && URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { + if (masterSlaveEntry.getAllEntries().isEmpty() + && URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); } for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) { - if (URIBuilder.compare(entry.getClient().getAddr(), addr) && entry.getFreezeReason() != FreezeReason.MANAGER) { + if (URIBuilder.compare(entry.getClient().getAddr(), addr) + && entry.getFreezeReason() != FreezeReason.MANAGER) { return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType()); } } @@ -75,13 +78,15 @@ public class RedisNodes implements NodesGroup { Collection entries = connectionManager.getEntrySet(); List result = new ArrayList(); for (MasterSlaveEntry masterSlaveEntry : entries) { - if (masterSlaveEntry.getAllEntries().isEmpty() && type == NodeType.MASTER) { + if (masterSlaveEntry.getAllEntries().isEmpty() + && type == NodeType.MASTER) { RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); result.add((N) entry); } for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) { - if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER && slaveEntry.getNodeType() == type) { + if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER + && slaveEntry.getNodeType() == type) { RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); result.add((N) entry); } @@ -112,7 +117,7 @@ public class RedisNodes implements NodesGroup { } @Override - public boolean pingAll() { + public boolean pingAll(long timeout, TimeUnit timeUnit) { List clients = new ArrayList<>((Collection) getNodes()); Map> result = new ConcurrentHashMap<>(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size()); @@ -120,7 +125,7 @@ public class RedisNodes implements NodesGroup { RFuture f = entry.getClient().connectAsync(); f.onComplete((c, e) -> { if (c != null) { - RFuture r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING); + RFuture r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING); result.put(c, r); latch.countDown(); } else { @@ -154,9 +159,14 @@ public class RedisNodes implements NodesGroup { entry.getKey().closeAsync(); } - // true and no futures missed during client connection + // true and no futures were missed during client connection return res && result.size() == clients.size(); } + + @Override + public boolean pingAll() { + return pingAll(1, TimeUnit.SECONDS); + } @Override public int addConnectionListener(ConnectionListener connectionListener) { diff --git a/redisson/src/main/java/org/redisson/api/Node.java b/redisson/src/main/java/org/redisson/api/Node.java index 0df90be38..676ceec45 100644 --- a/redisson/src/main/java/org/redisson/api/Node.java +++ b/redisson/src/main/java/org/redisson/api/Node.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.redisson.client.protocol.Time; @@ -54,10 +55,20 @@ public interface Node extends NodeAsync { InetSocketAddress getAddr(); /** - * Ping Redis node by PING command. + * Ping Redis node. + * Default timeout is 1000 milliseconds * - * @return true if PONG received, false otherwise + * @return true if "PONG" reply received, false otherwise */ boolean ping(); + + /** + * Ping Redis node with specified timeout. + * + * @param timeout - ping timeout + * @param timeUnit - timeout unit + * @return true if "PONG" reply received, false otherwise + */ + boolean ping(long timeout, TimeUnit timeUnit); } diff --git a/redisson/src/main/java/org/redisson/api/NodeAsync.java b/redisson/src/main/java/org/redisson/api/NodeAsync.java index b1561438b..41542436d 100644 --- a/redisson/src/main/java/org/redisson/api/NodeAsync.java +++ b/redisson/src/main/java/org/redisson/api/NodeAsync.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.redisson.api.Node.InfoSection; import org.redisson.client.protocol.Time; @@ -34,6 +35,15 @@ public interface NodeAsync { RFuture pingAsync(); + /** + * Ping Redis node with specified timeout. + * + * @param timeout - ping timeout + * @param timeUnit - timeout unit + * @return true if "PONG" reply received, false otherwise + */ + RFuture pingAsync(long timeout, TimeUnit timeUnit); + RFuture> clusterInfoAsync(); } diff --git a/redisson/src/main/java/org/redisson/api/NodesGroup.java b/redisson/src/main/java/org/redisson/api/NodesGroup.java index 1139f9870..7553c27ea 100644 --- a/redisson/src/main/java/org/redisson/api/NodesGroup.java +++ b/redisson/src/main/java/org/redisson/api/NodesGroup.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.redisson.connection.ConnectionListener; @@ -67,10 +68,18 @@ public interface NodesGroup { Collection getNodes(); /** - * Ping all Redis nodes + * Ping all Redis nodes. + * Default timeout per Redis node is 1000 milliseconds * - * @return true if all nodes have replied "PONG", false in other case. + * @return true if all nodes replied "PONG", false in other case. */ boolean pingAll(); + /** + * Ping all Redis nodes with specified timeout per node + * + * @return true if all nodes replied "PONG", false in other case. + */ + boolean pingAll(long timeout, TimeUnit timeUnit); + } diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 4d85bbd0f..2944ec97d 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -136,7 +136,7 @@ public class RedisConnection implements RedisCommands { } public R await(RFuture future) { - final CountDownLatch l = new CountDownLatch(1); + CountDownLatch l = new CountDownLatch(1); future.onComplete((res, e) -> { l.countDown(); }); @@ -184,15 +184,15 @@ public class RedisConnection implements RedisCommands { } public RFuture async(long timeout, RedisCommand command, Object... params) { - return async(null, command, params); + return async(timeout, null, command, params); } public RFuture async(Codec encoder, RedisCommand command, Object... params) { return async(-1, encoder, command, params); } - public RFuture async(long timeout, Codec encoder, final RedisCommand command, final Object... params) { - final RPromise promise = new RedissonPromise(); + public RFuture async(long timeout, Codec encoder, RedisCommand command, Object... params) { + RPromise promise = new RedissonPromise(); if (timeout == -1) { timeout = redisClient.getCommandTimeout(); } @@ -202,7 +202,7 @@ public class RedisConnection implements RedisCommands { return RedissonPromise.newFailedFuture(cause); } - final ScheduledFuture scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() { + ScheduledFuture scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() { @Override public void run() { RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: " @@ -251,7 +251,7 @@ public class RedisConnection implements RedisCommands { } private void close() { - CommandData command = getCurrentCommand(); + CommandData command = getCurrentCommand(); if (command != null && command.isBlockingCommand()) { channel.close(); } else { diff --git a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java index 607b43bb6..923a44e33 100644 --- a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java +++ b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java @@ -17,11 +17,13 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.redisson.api.ClusterNode; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; +import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -62,8 +64,14 @@ public class RedisClientEntry implements ClusterNode { return client.getAddr(); } + @Override public RFuture pingAsync() { - final RPromise result = new RedissonPromise(); + return pingAsync(1, TimeUnit.SECONDS); + } + + @Override + public RFuture pingAsync(long timeout, TimeUnit timeUnit) { + RPromise result = new RedissonPromise<>(); RFuture f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); f.onComplete((res, e) -> { if (e != null) { @@ -73,6 +81,10 @@ public class RedisClientEntry implements ClusterNode { result.trySuccess(res); }); + commandExecutor.getConnectionManager().newTimeout(t -> { + RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: PING, Redis client: " + client); + result.tryFailure(ex); + }, timeout, timeUnit); return result; } @@ -80,6 +92,11 @@ public class RedisClientEntry implements ClusterNode { public boolean ping() { return commandExecutor.get(pingAsync()); } + + @Override + public boolean ping(long timeout, TimeUnit timeUnit) { + return commandExecutor.get(pingAsync(timeout, timeUnit)); + } @Override @SuppressWarnings("AvoidInlineConditionals")