diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9b5adb9a9..446ac3541 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -34,6 +34,7 @@ import org.redisson.ReadMode; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; @@ -201,12 +202,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (config.getReadMode() == ReadMode.MASTER) { SingleEntry entry = new SingleEntry(slots, this, config); Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - f.syncUninterruptibly(); + if (!f.awaitUninterruptibly(config.getConnectTimeout(), TimeUnit.MILLISECONDS)) { + throw new RedisConnectionException("Can't connect to server " + config.getMasterAddress()); + } addEntry(singleSlotRange, entry); } else { MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); List> fs = entry.initSlaveBalancer(); Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + if (!f.awaitUninterruptibly(config.getConnectTimeout(), TimeUnit.MILLISECONDS)) { + throw new RedisConnectionException("Can't connect to server " + config.getMasterAddress()); + } fs.add(f); for (Future future : fs) { future.syncUninterruptibly(); @@ -223,7 +229,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { c.setPingTimeout(cfg.getPingTimeout()); c.setLoadBalancer(cfg.getLoadBalancer()); c.setPassword(cfg.getPassword()); - c.setDatabase(cfg.getDatabase()); c.setClientName(cfg.getClientName()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); @@ -341,13 +346,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { }); } - public Promise subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) { + public Promise subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) { Promise promise = newPromise(); subscribe(codec, channelName, listener, promise); return promise; } - private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise) { + private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { synchronized (сonnEntry) { @@ -393,7 +398,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { connect(codec, channelName, listener, promise); } - private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, + private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise) { final int slot = 0; Future connFuture = nextPubSubConnection(slot); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 942ef8937..1e30f125d 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -29,15 +29,11 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.pubsub.PubSubType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PubSubConnectionEntry { public enum Status {ACTIVE, INACTIVE} - private final Logger log = LoggerFactory.getLogger(getClass()); - private volatile Status status = Status.ACTIVE; private final Semaphore subscribedChannelsAmount; private final RedisPubSubConnection conn;