|
|
|
@ -34,6 +34,7 @@ import org.redisson.ReadMode;
|
|
|
|
|
import org.redisson.client.BaseRedisPubSubListener;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
|
import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.RedisNodeNotFoundException;
|
|
|
|
|
import org.redisson.client.RedisPubSubConnection;
|
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
@ -201,12 +202,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
if (config.getReadMode() == ReadMode.MASTER) {
|
|
|
|
|
SingleEntry entry = new SingleEntry(slots, this, config);
|
|
|
|
|
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
|
|
|
|
|
f.syncUninterruptibly();
|
|
|
|
|
if (!f.awaitUninterruptibly(config.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
|
|
|
|
|
throw new RedisConnectionException("Can't connect to server " + config.getMasterAddress());
|
|
|
|
|
}
|
|
|
|
|
addEntry(singleSlotRange, entry);
|
|
|
|
|
} else {
|
|
|
|
|
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
|
|
|
|
|
List<Future<Void>> fs = entry.initSlaveBalancer();
|
|
|
|
|
Future<Void> f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
|
|
|
|
|
if (!f.awaitUninterruptibly(config.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
|
|
|
|
|
throw new RedisConnectionException("Can't connect to server " + config.getMasterAddress());
|
|
|
|
|
}
|
|
|
|
|
fs.add(f);
|
|
|
|
|
for (Future<Void> future : fs) {
|
|
|
|
|
future.syncUninterruptibly();
|
|
|
|
@ -223,7 +229,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
c.setPingTimeout(cfg.getPingTimeout());
|
|
|
|
|
c.setLoadBalancer(cfg.getLoadBalancer());
|
|
|
|
|
c.setPassword(cfg.getPassword());
|
|
|
|
|
c.setDatabase(cfg.getDatabase());
|
|
|
|
|
c.setClientName(cfg.getClientName());
|
|
|
|
|
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
|
|
|
|
|
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
|
|
|
|
@ -341,13 +346,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Promise<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener) {
|
|
|
|
|
public Promise<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) {
|
|
|
|
|
Promise<PubSubConnectionEntry> promise = newPromise();
|
|
|
|
|
subscribe(codec, channelName, listener, promise);
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise) {
|
|
|
|
|
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, final Promise<PubSubConnectionEntry> promise) {
|
|
|
|
|
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
|
|
|
|
|
if (сonnEntry != null) {
|
|
|
|
|
synchronized (сonnEntry) {
|
|
|
|
@ -393,7 +398,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
|
connect(codec, channelName, listener, promise);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,
|
|
|
|
|
private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
|
|
|
|
|
final Promise<PubSubConnectionEntry> promise) {
|
|
|
|
|
final int slot = 0;
|
|
|
|
|
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
|
|
|
|
|