Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit dd36a912dc

@ -0,0 +1,11 @@
### Expected behavior
### Actual behavior
### Steps to reproduce or test case
### Redis version
### Redisson version
### Redisson configuration

@ -167,10 +167,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
return null;
}
return commandData.getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) {
} else if (command.equals("message")) {
String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder();
} else if (parts.get(0).equals("pmessage")) {
} else if (command.equals("pmessage")) {
String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder();
}
@ -191,6 +191,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
return entries.get(patternName).getDecoder().getDecoder(parts.size(), state());
}
}
if (data != null && data.getCommand().getName().equals(RedisCommands.PING.getName())) {
return data.getCodec().getValueDecoder();
}
return super.selectDecoder(data, parts);
}

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import io.netty.channel.ChannelHandler.Sharable;
@ -50,12 +51,12 @@ public class PingConnectionHandler extends ChannelInboundHandlerAdapter {
protected void sendPing(final ChannelHandlerContext ctx) {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
final RFuture<Object> future = connection.async(RedisCommands.PING);
final RFuture<String> future = connection.async(StringCodec.INSTANCE, RedisCommands.PING);
config.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (future.cancel(false)) {
if (future.cancel(false) || !future.isSuccess()) {
ctx.channel().close();
} else {
sendPing(ctx);

@ -87,8 +87,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private boolean isConfigEndpoint;
private AddressResolver<InetSocketAddress> resolver;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
@ -103,15 +101,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
if (cfg.getNodeAddresses().size() == 1) {
this.resolver = resolverGroup.getResolver(getGroup().next());
AddressResolver<InetSocketAddress> resolver = createResolverGroup().getResolver(getGroup().next());
Future<List<InetSocketAddress>> addrsFuture = resolver.resolveAll(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort()));
List<InetSocketAddress> allAddrs = addrsFuture.syncUninterruptibly().getNow();
if (allAddrs.size() > 1) {
configEndpointHostName = addr.getHost();
isConfigEndpoint = true;
} else {
resolver.close();
}
resolver.close();
}
clusterNodesCommand = RedisCommands.CLUSTER_NODES;
@ -296,12 +293,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void run() {
if (isConfigEndpoint) {
final URI uri = cfg.getNodeAddresses().iterator().next();
final AddressResolver<InetSocketAddress> resolver = createResolverGroup().getResolver(getGroup().next());
Future<List<InetSocketAddress>> allNodes = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(future.cause());
if (!future.isSuccess()) {
resolver.close();
checkClusterState(cfg, Collections.<URI>emptyList().iterator(), lastException);
return;
}
@ -312,6 +311,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
nodes.add(node);
}
resolver.close();
Iterator<URI> nodesIterator = nodes.iterator();
checkClusterState(cfg, nodesIterator, lastException);
}

@ -57,15 +57,6 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
*/
private int database = 0;
/**
* Should the server address be monitored for changes in DNS? Useful for
* AWS ElastiCache where the client is pointed at the endpoint for a replication group
* which is a DNS alias to the current master node.<br>
* <em>NB: applications must ensure the JVM DNS cache TTL is low enough to support this.</em>
* e.g., http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html
*/
private boolean dnsMonitoring = true;
/**
* Interval in milliseconds to check DNS
*/
@ -79,7 +70,6 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
setAddress(config.getAddress());
setConnectionPoolSize(config.getConnectionPoolSize());
setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
setDnsMonitoring(config.isDnsMonitoring());
setDnsMonitoringInterval(config.getDnsMonitoringInterval());
setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setConnectionMinimumIdleSize(config.getConnectionMinimumIdleSize());
@ -143,27 +133,11 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
}
/**
* Monitoring of the endpoint address for DNS changes.
* <p>
* Applications must ensure the JVM DNS cache TTL is low enough to support this
* Interval in milliseconds to check the endpoint's DNS<p>
* Applications must ensure the JVM DNS cache TTL is low enough to support this.<p>
* Set <code>-1</code> to disable.
* <p>
* Default is <code>true</code>
*
* @param dnsMonitoring flag
* @return config
*/
public SingleServerConfig setDnsMonitoring(boolean dnsMonitoring) {
this.dnsMonitoring = dnsMonitoring;
return this;
}
public boolean isDnsMonitoring() {
return dnsMonitoring;
}
/**
* Interval in milliseconds to check the endpoint DNS if {@link #isDnsMonitoring()} is true.
*
* Default is <code>5000</code>
* Default is <code>5000</code>.
*
* @param dnsMonitoringInterval time
* @return config

@ -389,6 +389,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
MasterSlaveServersConfig c = new MasterSlaveServersConfig();
c.setPingConnectionInterval(cfg.getPingConnectionInterval());
c.setSslEnableEndpointIdentification(cfg.isSslEnableEndpointIdentification());
c.setSslProvider(cfg.getSslProvider());
c.setSslTruststore(cfg.getSslTruststore());

@ -57,11 +57,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setConnectTimeout(cfg.getConnectTimeout());
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
if (cfg.isDnsMonitoring()) {
newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval());
} else {
newconfig.setDnsMonitoringInterval(-1);
}
newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval());
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());

@ -143,7 +143,6 @@ public class SpringNamespaceWikiTest {
assertEquals(13, single.getConnectionMinimumIdleSize());
assertEquals(14, single.getConnectionPoolSize());
assertEquals(15, single.getDatabase());
assertEquals(false, single.isDnsMonitoring());
assertEquals(80000, single.getDnsMonitoringInterval());
((ConfigurableApplicationContext) context).close();
} finally {

Loading…
Cancel
Save