Feature - ping and pingAll methods with timeout added to Node object. #1921

pull/1907/head
Nikita Koksharov 6 years ago
parent aa085248b0
commit ff4c2f71f2

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.api.Node; import org.redisson.api.Node;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
@ -57,12 +58,14 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
URI addr = URIBuilder.create(address); URI addr = URIBuilder.create(address);
for (MasterSlaveEntry masterSlaveEntry : entries) { for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty() && URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) { if (masterSlaveEntry.getAllEntries().isEmpty()
&& URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
} }
for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) { for (ClientConnectionsEntry entry : masterSlaveEntry.getAllEntries()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr) && entry.getFreezeReason() != FreezeReason.MANAGER) { if (URIBuilder.compare(entry.getClient().getAddr(), addr)
&& entry.getFreezeReason() != FreezeReason.MANAGER) {
return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType()); return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
} }
} }
@ -75,13 +78,15 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet(); Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>(); List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) { for (MasterSlaveEntry masterSlaveEntry : entries) {
if (masterSlaveEntry.getAllEntries().isEmpty() && type == NodeType.MASTER) { if (masterSlaveEntry.getAllEntries().isEmpty()
&& type == NodeType.MASTER) {
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER); RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) entry); result.add((N) entry);
} }
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) { for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getAllEntries()) {
if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER && slaveEntry.getNodeType() == type) { if (slaveEntry.getFreezeReason() != FreezeReason.MANAGER
&& slaveEntry.getNodeType() == type) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType()); RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry); result.add((N) entry);
} }
@ -112,7 +117,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
} }
@Override @Override
public boolean pingAll() { public boolean pingAll(long timeout, TimeUnit timeUnit) {
List<RedisClientEntry> clients = new ArrayList<>((Collection<RedisClientEntry>) getNodes()); List<RedisClientEntry> clients = new ArrayList<>((Collection<RedisClientEntry>) getNodes());
Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size()); Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<>(clients.size());
CountDownLatch latch = new CountDownLatch(clients.size()); CountDownLatch latch = new CountDownLatch(clients.size());
@ -120,7 +125,7 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
RFuture<RedisConnection> f = entry.getClient().connectAsync(); RFuture<RedisConnection> f = entry.getClient().connectAsync();
f.onComplete((c, e) -> { f.onComplete((c, e) -> {
if (c != null) { if (c != null) {
RFuture<String> r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING); RFuture<String> r = c.async(timeUnit.toMillis(timeout), RedisCommands.PING);
result.put(c, r); result.put(c, r);
latch.countDown(); latch.countDown();
} else { } else {
@ -154,9 +159,14 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
entry.getKey().closeAsync(); entry.getKey().closeAsync();
} }
// true and no futures missed during client connection // true and no futures were missed during client connection
return res && result.size() == clients.size(); return res && result.size() == clients.size();
} }
@Override
public boolean pingAll() {
return pingAll(1, TimeUnit.SECONDS);
}
@Override @Override
public int addConnectionListener(ConnectionListener connectionListener) { public int addConnectionListener(ConnectionListener connectionListener) {

@ -17,6 +17,7 @@ package org.redisson.api;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.Time; import org.redisson.client.protocol.Time;
@ -54,10 +55,20 @@ public interface Node extends NodeAsync {
InetSocketAddress getAddr(); InetSocketAddress getAddr();
/** /**
* Ping Redis node by PING command. * Ping Redis node.
* Default timeout is 1000 milliseconds
* *
* @return <code>true</code> if PONG received, <code>false</code> otherwise * @return <code>true</code> if "PONG" reply received, <code>false</code> otherwise
*/ */
boolean ping(); boolean ping();
/**
* Ping Redis node with specified timeout.
*
* @param timeout - ping timeout
* @param timeUnit - timeout unit
* @return <code>true</code> if "PONG" reply received, <code>false</code> otherwise
*/
boolean ping(long timeout, TimeUnit timeUnit);
} }

@ -16,6 +16,7 @@
package org.redisson.api; package org.redisson.api;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.Node.InfoSection; import org.redisson.api.Node.InfoSection;
import org.redisson.client.protocol.Time; import org.redisson.client.protocol.Time;
@ -34,6 +35,15 @@ public interface NodeAsync {
RFuture<Boolean> pingAsync(); RFuture<Boolean> pingAsync();
/**
* Ping Redis node with specified timeout.
*
* @param timeout - ping timeout
* @param timeUnit - timeout unit
* @return <code>true</code> if "PONG" reply received, <code>false</code> otherwise
*/
RFuture<Boolean> pingAsync(long timeout, TimeUnit timeUnit);
RFuture<Map<String, String>> clusterInfoAsync(); RFuture<Map<String, String>> clusterInfoAsync();
} }

@ -16,6 +16,7 @@
package org.redisson.api; package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionListener;
@ -67,10 +68,18 @@ public interface NodesGroup<N extends Node> {
Collection<N> getNodes(); Collection<N> getNodes();
/** /**
* Ping all Redis nodes * Ping all Redis nodes.
* Default timeout per Redis node is 1000 milliseconds
* *
* @return <code>true</code> if all nodes have replied "PONG", <code>false</code> in other case. * @return <code>true</code> if all nodes replied "PONG", <code>false</code> in other case.
*/ */
boolean pingAll(); boolean pingAll();
/**
* Ping all Redis nodes with specified timeout per node
*
* @return <code>true</code> if all nodes replied "PONG", <code>false</code> in other case.
*/
boolean pingAll(long timeout, TimeUnit timeUnit);
} }

@ -136,7 +136,7 @@ public class RedisConnection implements RedisCommands {
} }
public <R> R await(RFuture<R> future) { public <R> R await(RFuture<R> future) {
final CountDownLatch l = new CountDownLatch(1); CountDownLatch l = new CountDownLatch(1);
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
l.countDown(); l.countDown();
}); });
@ -184,15 +184,15 @@ public class RedisConnection implements RedisCommands {
} }
public <T, R> RFuture<R> async(long timeout, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> async(long timeout, RedisCommand<T> command, Object... params) {
return async(null, command, params); return async(timeout, null, command, params);
} }
public <T, R> RFuture<R> async(Codec encoder, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> async(Codec encoder, RedisCommand<T> command, Object... params) {
return async(-1, encoder, command, params); return async(-1, encoder, command, params);
} }
public <T, R> RFuture<R> async(long timeout, Codec encoder, final RedisCommand<T> command, final Object... params) { public <T, R> RFuture<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object... params) {
final RPromise<R> promise = new RedissonPromise<R>(); RPromise<R> promise = new RedissonPromise<R>();
if (timeout == -1) { if (timeout == -1) {
timeout = redisClient.getCommandTimeout(); timeout = redisClient.getCommandTimeout();
} }
@ -202,7 +202,7 @@ public class RedisConnection implements RedisCommands {
return RedissonPromise.newFailedFuture(cause); return RedissonPromise.newFailedFuture(cause);
} }
final ScheduledFuture<?> scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() { ScheduledFuture<?> scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: " RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: "
@ -251,7 +251,7 @@ public class RedisConnection implements RedisCommands {
} }
private void close() { private void close() {
CommandData command = getCurrentCommand(); CommandData<?, ?> command = getCurrentCommand();
if (command != null && command.isBlockingCommand()) { if (command != null && command.isBlockingCommand()) {
channel.close(); channel.close();
} else { } else {

@ -17,11 +17,13 @@ package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.ClusterNode; import org.redisson.api.ClusterNode;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -62,8 +64,14 @@ public class RedisClientEntry implements ClusterNode {
return client.getAddr(); return client.getAddr();
} }
@Override
public RFuture<Boolean> pingAsync() { public RFuture<Boolean> pingAsync() {
final RPromise<Boolean> result = new RedissonPromise<Boolean>(); return pingAsync(1, TimeUnit.SECONDS);
}
@Override
public RFuture<Boolean> pingAsync(long timeout, TimeUnit timeUnit) {
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); RFuture<Boolean> f = commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL);
f.onComplete((res, e) -> { f.onComplete((res, e) -> {
if (e != null) { if (e != null) {
@ -73,6 +81,10 @@ public class RedisClientEntry implements ClusterNode {
result.trySuccess(res); result.trySuccess(res);
}); });
commandExecutor.getConnectionManager().newTimeout(t -> {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: PING, Redis client: " + client);
result.tryFailure(ex);
}, timeout, timeUnit);
return result; return result;
} }
@ -80,6 +92,11 @@ public class RedisClientEntry implements ClusterNode {
public boolean ping() { public boolean ping() {
return commandExecutor.get(pingAsync()); return commandExecutor.get(pingAsync());
} }
@Override
public boolean ping(long timeout, TimeUnit timeUnit) {
return commandExecutor.get(pingAsync(timeout, timeUnit));
}
@Override @Override
@SuppressWarnings("AvoidInlineConditionals") @SuppressWarnings("AvoidInlineConditionals")

Loading…
Cancel
Save